libmicrohttpd

HTTP/1.x server C library (MHD 1.x, stable)
Log | Files | Refs | Submodules | README | LICENSE

commit 19b038f68272a423ae74c89427c8b5dcfc7ce1ae
parent 3460db01ec7e2b042296d7f703dfa344c893f5ea
Author: Evgeny Grin (Karlson2k) <k2k@narod.ru>
Date:   Thu, 22 Oct 2020 16:38:59 +0300

Fixed thread-safety for externally added connections
Fully re-implemented scheme of adding connections
from external thread (application)

Diffstat:
Msrc/microhttpd/daemon.c | 237+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------
Msrc/microhttpd/internal.h | 21+++++++++++++++++++++
2 files changed, 210 insertions(+), 48 deletions(-)

diff --git a/src/microhttpd/daemon.c b/src/microhttpd/daemon.c @@ -2607,34 +2607,55 @@ new_connection_prepare_ (struct MHD_Daemon *daemon, /** + * Close prepared, but not yet processed connection. + * @param daemon the daemon + * @param connection the connection to close + */ +static void +new_connection_close_ (struct MHD_Daemon *daemon, + struct MHD_Connection *connection) +{ + mhd_assert (connection->daemon == daemon); + mhd_assert (! connection->in_cleanup); + mhd_assert (NULL == connection->next); + mhd_assert (NULL == connection->nextX); +#ifdef EPOLL_SUPPORT + mhd_assert (NULL == connection->nextE); +#endif /* EPOLL_SUPPORT */ + +#ifdef HTTPS_SUPPORT + if (NULL != connection->tls_session) + { + mhd_assert (0 != (daemon->options & MHD_USE_TLS)); + gnutls_deinit (connection->tls_session); + } +#endif /* HTTPS_SUPPORT */ + MHD_socket_close_chk_ (connection->socket_fd); + MHD_ip_limit_del (daemon, + connection->addr, + connection->addr_len); + free (connection->addr); + free (connection); +} + + +/** * Finally insert the new connection to the list of connections - * served by the daemon. + * served by the daemon and start processing. * @remark To be called only from thread that process * daemon's select()/poll()/etc. * * @param daemon daemon that manages the connection - * @param client_socket socket to manage (MHD will expect - * to receive an HTTP request from this socket next). - * @param addr IP address of the client - * @param addrlen number of bytes in @a addr - * @param external_add perform additional operations needed due - * to the application calling us directly * @param connection the newly created connection - * @return #MHD_YES on success, #MHD_NO if this daemon could - * not handle the connection (i.e. malloc failed, etc). - * The socket will be closed in any case; 'errno' is - * set to indicate further details about the error. + * @return #MHD_YES on success, #MHD_NO on error */ static enum MHD_Result -new_connection_insert_ (struct MHD_Daemon *daemon, - MHD_socket client_socket, - const struct sockaddr *addr, - socklen_t addrlen, - bool external_add, - struct MHD_Connection *connection) +new_connection_process_ (struct MHD_Daemon *daemon, + struct MHD_Connection *connection) { int eno = 0; + mhd_assert (connection->daemon == daemon); /* Allocate memory pool in the processing thread so * intensively used memory area is allocated in "good" * (for the thread) memory region. It is important with @@ -2647,10 +2668,10 @@ new_connection_insert_ (struct MHD_Daemon *daemon, _ ("Error allocating memory: %s\n"), MHD_strerror_ (errno)); #endif - MHD_socket_close_chk_ (client_socket); + MHD_socket_close_chk_ (connection->socket_fd); MHD_ip_limit_del (daemon, - addr, - addrlen); + connection->addr, + connection->addr_len); free (connection); #if ENOMEM errno = ENOMEM; @@ -2721,15 +2742,15 @@ new_connection_insert_ (struct MHD_Daemon *daemon, #ifdef EPOLL_SUPPORT if (0 != (daemon->options & MHD_USE_EPOLL)) { - if ((0 == (daemon->options & MHD_USE_TURBO)) || (external_add)) - { /* Do not manipulate EReady DL-list in 'external_add' mode. */ + if (0 == (daemon->options & MHD_USE_TURBO)) + { struct epoll_event event; event.events = EPOLLIN | EPOLLOUT | EPOLLPRI | EPOLLET; event.data.ptr = connection; if (0 != epoll_ctl (daemon->epoll_fd, EPOLL_CTL_ADD, - client_socket, + connection->socket_fd, &event)) { eno = errno; @@ -2752,20 +2773,10 @@ new_connection_insert_ (struct MHD_Daemon *daemon, connection); } } - else /* This 'else' is combined with next 'if'. */ -#endif - if ( (0 == (daemon->options & MHD_USE_THREAD_PER_CONNECTION)) && - (external_add) && - (MHD_ITC_IS_VALID_ (daemon->itc)) && - (! MHD_itc_activate_ (daemon->itc, "n")) ) - { -#ifdef HAVE_MESSAGES - MHD_DLOG (daemon, - _ ( - "Failed to signal new connection via inter-thread communication channel.\n")); #endif - } + return MHD_YES; + cleanup: if (NULL != daemon->notify_connection) daemon->notify_connection (daemon->notify_connection_cls, @@ -2776,10 +2787,10 @@ cleanup: if (NULL != connection->tls_session) gnutls_deinit (connection->tls_session); #endif /* HTTPS_SUPPORT */ - MHD_socket_close_chk_ (client_socket); + MHD_socket_close_chk_ (connection->socket_fd); MHD_ip_limit_del (daemon, - addr, - addrlen); + connection->addr, + connection->addr_len); #if defined(MHD_USE_POSIX_THREADS) || defined(MHD_USE_W32_THREADS) MHD_mutex_lock_chk_ (&daemon->cleanup_connection_mutex); #endif @@ -2880,8 +2891,82 @@ internal_add_connection (struct MHD_Daemon *daemon, non_blck, &connection)) return MHD_NO; - return new_connection_insert_ (daemon, client_socket, addr, addrlen, - external_add, connection); + if ((external_add) && + (0 != (daemon->options & MHD_USE_INTERNAL_POLLING_THREAD))) + { + /* Connection is added externally and MHD is handling its own threads. */ + MHD_mutex_lock_chk_ (&daemon->new_connections_mutex); + DLL_insert (daemon->new_connections_head, + daemon->new_connections_tail, + connection); + daemon->have_new = true; + MHD_mutex_unlock_chk_ (&daemon->new_connections_mutex); + + /* The rest of connection processing must be handled in + * the daemon thread. */ + if ((MHD_ITC_IS_VALID_ (daemon->itc)) && + (! MHD_itc_activate_ (daemon->itc, "n"))) + { + #ifdef HAVE_MESSAGES + MHD_DLOG (daemon, + _ ("Failed to signal new connection via inter-thread " \ + "communication channel.\n")); + #endif + } + return MHD_YES; + } + + return new_connection_process_ (daemon, connection); +} + + +static void +new_connections_list_process_ (struct MHD_Daemon *daemon) +{ + struct MHD_Connection *local_head; + struct MHD_Connection *local_tail; + struct MHD_Connection *c; /**< Currently processed connection */ + mhd_assert (daemon->have_new); + mhd_assert (0 != (daemon->options & MHD_USE_INTERNAL_POLLING_THREAD)); + + local_head = NULL; + local_tail = NULL; + + /* Move all new connections to the local DL-list to release the mutex + * as quick as possible. */ + MHD_mutex_lock_chk_ (&daemon->new_connections_mutex); + mhd_assert (NULL != daemon->new_connections_head); + do + { /* Move connection in FIFO order. */ + c = daemon->new_connections_tail; + DLL_remove (daemon->new_connections_head, + daemon->new_connections_tail, + c); + DLL_insert (local_head, + local_tail, + c); + } while (NULL != daemon->new_connections_tail); + daemon->have_new = false; + MHD_mutex_unlock_chk_ (&daemon->new_connections_mutex); + + /* Process new connections in FIFO order. */ + do + { + c = local_tail; + DLL_remove (local_head, + local_tail, + c); + mhd_assert (daemon == c->daemon); + if (MHD_NO == new_connection_process_ (daemon, c)) + { +#ifdef HAVE_MESSAGES + MHD_DLOG (daemon, + _ ("Failed to start serving new connection.\n")); +#endif + (void) 0; + } + } while (NULL != local_tail); + } @@ -3710,6 +3795,10 @@ internal_run_from_select (struct MHD_Daemon *daemon, read_fd_set)) ) MHD_itc_clear_ (daemon->itc); + /* Process externally added connection if any */ + if (daemon->have_new) + new_connections_list_process_ (daemon); + /* select connection thread handling type */ if ( (MHD_INVALID_SOCKET != (ds = daemon->listen_fd)) && (! daemon->was_quiesced) && @@ -4141,9 +4230,6 @@ MHD_poll_all (struct MHD_Daemon *daemon, return MHD_NO; } - /* Reset. New value will be set when connections are processed. */ - daemon->data_already_pending = false; - /* handle ITC FD */ /* do it before any other processing so new signals will be processed in next loop */ @@ -4157,6 +4243,19 @@ MHD_poll_all (struct MHD_Daemon *daemon, free (p); return MHD_NO; } + + /* Process externally added connection if any */ + if (daemon->have_new) + new_connections_list_process_ (daemon); + + /* handle 'listen' FD */ + if ( (-1 != poll_listen) && + (0 != (p[poll_listen].revents & POLLIN)) ) + (void) MHD_accept_connection (daemon); + + /* Reset. New value will be set when connections are processed. */ + daemon->data_already_pending = false; + i = 0; prev = daemon->connections_tail; while (NULL != (pos = prev)) @@ -4209,10 +4308,6 @@ MHD_poll_all (struct MHD_Daemon *daemon, } } #endif /* HTTPS_SUPPORT && UPGRADE_SUPPORT */ - /* handle 'listen' FD */ - if ( (-1 != poll_listen) && - (0 != (p[poll_listen].revents & POLLIN)) ) - (void) MHD_accept_connection (daemon); free (p); } @@ -4294,6 +4389,11 @@ MHD_poll_listen_socket (struct MHD_Daemon *daemon, /* handle shutdown */ if (daemon->shutdown) return MHD_NO; + + /* Process externally added connection if any */ + if (daemon->have_new) + new_connections_list_process_ (daemon); + if ( (-1 != poll_listen) && (0 != (p[poll_listen].revents & POLLIN)) ) (void) MHD_accept_connection (daemon); @@ -4739,6 +4839,10 @@ MHD_epoll (struct MHD_Daemon *daemon, } } + /* Process externally added connection if any */ + if (daemon->have_new) + new_connections_list_process_ (daemon); + if (need_to_accept) { unsigned int series_length = 0; @@ -6633,6 +6737,18 @@ MHD_start_daemon_va (unsigned int flags, #endif /* ! HAVE_LISTEN_SHUTDOWN */ if (0 == daemon->worker_pool_size) { + if (! MHD_mutex_init_ (&daemon->new_connections_mutex)) + { +#ifdef HAVE_MESSAGES + MHD_DLOG (daemon, + _ ("Failed to initialise mutex.\n")); +#endif + MHD_mutex_destroy_chk_ (&daemon->cleanup_connection_mutex); + MHD_mutex_destroy_chk_ (&daemon->per_ip_connection_mutex); + if (MHD_INVALID_SOCKET != listen_fd) + MHD_socket_close_chk_ (listen_fd); + goto free_and_fail; + } if (! MHD_create_named_thread_ (&daemon->pid, (*pflags & MHD_USE_THREAD_PER_CONNECTION) ? @@ -6646,6 +6762,7 @@ MHD_start_daemon_va (unsigned int flags, _ ("Failed to create listen thread: %s\n"), MHD_strerror_ (errno)); #endif + MHD_mutex_destroy_chk_ (&daemon->new_connections_mutex); MHD_mutex_destroy_chk_ (&daemon->cleanup_connection_mutex); MHD_mutex_destroy_chk_ (&daemon->per_ip_connection_mutex); if (MHD_INVALID_SOCKET != listen_fd) @@ -6685,7 +6802,14 @@ MHD_start_daemon_va (unsigned int flags, d->master = daemon; d->worker_pool_size = 0; d->worker_pool = NULL; - + if (! MHD_mutex_init_ (&d->new_connections_mutex)) + { + #ifdef HAVE_MESSAGES + MHD_DLOG (daemon, + _ ("Failed to initialise mutex.\n")); + #endif + goto thread_failed; + } if (0 != (*pflags & MHD_USE_ITC)) { if (! MHD_itc_init_ (d->itc)) @@ -6696,6 +6820,7 @@ MHD_start_daemon_va (unsigned int flags, "Failed to create worker inter-thread communication channel: %s\n"), MHD_itc_last_strerror_ () ); #endif + MHD_mutex_destroy_chk_ (&d->new_connections_mutex); goto thread_failed; } if ( (0 == (*pflags & (MHD_USE_POLL | MHD_USE_EPOLL))) && @@ -6707,6 +6832,7 @@ MHD_start_daemon_va (unsigned int flags, _ ( "File descriptor for worker inter-thread communication channel exceeds maximum value.\n")); #endif + MHD_mutex_destroy_chk_ (&d->new_connections_mutex); MHD_itc_destroy_chk_ (d->itc); goto thread_failed; } @@ -6733,6 +6859,7 @@ MHD_start_daemon_va (unsigned int flags, { if (MHD_ITC_IS_VALID_ (d->itc)) MHD_itc_destroy_chk_ (d->itc); + MHD_mutex_destroy_chk_ (&d->new_connections_mutex); goto thread_failed; } #endif @@ -6745,6 +6872,7 @@ MHD_start_daemon_va (unsigned int flags, #endif if (MHD_ITC_IS_VALID_ (d->itc)) MHD_itc_destroy_chk_ (d->itc); + MHD_mutex_destroy_chk_ (&d->new_connections_mutex); goto thread_failed; } @@ -6765,6 +6893,7 @@ MHD_start_daemon_va (unsigned int flags, MHD_mutex_destroy_chk_ (&d->cleanup_connection_mutex); if (MHD_ITC_IS_VALID_ (d->itc)) MHD_itc_destroy_chk_ (d->itc); + MHD_mutex_destroy_chk_ (&d->new_connections_mutex); goto thread_failed; } } @@ -6879,6 +7008,17 @@ close_all_connections (struct MHD_Daemon *daemon) mhd_assert (NULL == daemon->worker_pool); #endif mhd_assert (daemon->shutdown); + + /* Remove externally added new connections that are + * not processed by the daemon thread. */ + while (NULL != (pos = daemon->new_connections_tail)) + { + mhd_assert (0 != (daemon->options & MHD_USE_INTERNAL_POLLING_THREAD)); + DLL_remove (daemon->new_connections_head, + daemon->new_connections_tail, + pos); + new_connection_close_ (daemon, pos); + } /* give upgraded HTTPS connections a chance to finish */ /* 'daemon->urh_head' is not used in thread-per-connection mode. */ for (urh = daemon->urh_tail; NULL != urh; urh = urhn) @@ -7126,6 +7266,7 @@ MHD_stop_daemon (struct MHD_Daemon *daemon) } if (MHD_ITC_IS_VALID_ (daemon->itc)) MHD_itc_destroy_chk_ (daemon->itc); + MHD_mutex_destroy_chk_ (&daemon->new_connections_mutex); #ifdef EPOLL_SUPPORT if ( (0 != (daemon->options & MHD_USE_EPOLL)) && diff --git a/src/microhttpd/internal.h b/src/microhttpd/internal.h @@ -1302,6 +1302,16 @@ struct MHD_Daemon void *default_handler_cls; /** + * Head of doubly-linked list of new, externally added connections. + */ + struct MHD_Connection *new_connections_head; + + /** + * Tail of doubly-linked list of new, externally added connections. + */ + struct MHD_Connection *new_connections_tail; + + /** * Head of doubly-linked list of our current, active connections. */ struct MHD_Connection *connections_head; @@ -1516,6 +1526,11 @@ struct MHD_Daemon * "manual_timeout" DLLs. */ MHD_mutex_ cleanup_connection_mutex; + + /** + * Mutex for any access to the "new connections" DL-list. + */ + MHD_mutex_ new_connections_mutex; #endif /** @@ -1601,6 +1616,12 @@ struct MHD_Daemon volatile bool resuming; /** + * Indicate that new connections in @e new_connections_head list + * need to be processed. + */ + volatile bool have_new; + + /** * 'True' if some data is already waiting to be processed. * If set to 'true' - zero timeout for select()/poll*() * is used.