libmicrohttpd2

HTTP server C library (MHD 2.x, alpha)
Log | Files | Refs | README | LICENSE

commit f994a286234c0bd8a0670cd1ae32ce0cff4e54fd
parent a0d33195c944047f5d28a7878325d5f7973f3d19
Author: Evgeny Grin (Karlson2k) <k2k@drgrin.dev>
Date:   Mon, 29 Dec 2025 13:36:26 +0100

Implemented externally added connections

The new implementation can be used as a basis for inter-thread data
exchange.

Also the new implementation uses completely fair connection distribution
between worker and could be even more efficient than internal
"accept()".

Diffstat:
Msrc/include/microhttpd2.h | 14+++++++-------
Msrc/include/microhttpd2_main.h.in | 14+++++++-------
Msrc/mhd2/daemon_add_conn.c | 458++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
Msrc/mhd2/daemon_add_conn.h | 10++++++++++
Msrc/mhd2/daemon_start.c | 286+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Msrc/mhd2/mhd_daemon.h | 99+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 661 insertions(+), 220 deletions(-)

diff --git a/src/include/microhttpd2.h b/src/include/microhttpd2.h @@ -5132,20 +5132,20 @@ MHD_FN_PAR_NONNULL_ (1); * The client socket will be closed by MHD even if error returned. * * @param daemon daemon that manages the connection - * @param client_socket socket to manage (MHD will expect - * to receive an HTTP request from this socket next). - * @param[in] addr IP address of the client - * @param addrlen number of bytes in @a addr + * @param new_socket socket to manage (MHD will expect to receive an + HTTP request from this socket next). + * @param addr_size number of bytes in @a addr + * @param addr IP address of the client, ignored when @a addrlen is zero * @param connection_cntx meta data the application wants to * associate with the new connection object * @return #MHD_SC_OK on success, - * error on failure (the @a client_socket is closed) + * error on failure (the @a new_socket is closed) * @ingroup specialized */ MHD_EXTERN_ enum MHD_StatusCode MHD_daemon_add_connection (struct MHD_Daemon *MHD_RESTRICT daemon, - MHD_Socket client_socket, - size_t addrlen, + MHD_Socket new_socket, + size_t addr_size, const struct sockaddr *MHD_RESTRICT addr, void *connection_cntx) MHD_FN_PAR_NONNULL_ (1) diff --git a/src/include/microhttpd2_main.h.in b/src/include/microhttpd2_main.h.in @@ -255,20 +255,20 @@ MHD_FN_PAR_NONNULL_ (1); * The client socket will be closed by MHD even if error returned. * * @param daemon daemon that manages the connection - * @param client_socket socket to manage (MHD will expect - * to receive an HTTP request from this socket next). - * @param[in] addr IP address of the client - * @param addrlen number of bytes in @a addr + * @param new_socket socket to manage (MHD will expect to receive an + HTTP request from this socket next). + * @param addr_size number of bytes in @a addr + * @param addr IP address of the client, ignored when @a addrlen is zero * @param connection_cntx meta data the application wants to * associate with the new connection object * @return #MHD_SC_OK on success, - * error on failure (the @a client_socket is closed) + * error on failure (the @a new_socket is closed) * @ingroup specialized */ MHD_EXTERN_ enum MHD_StatusCode MHD_daemon_add_connection (struct MHD_Daemon *MHD_RESTRICT daemon, - MHD_Socket client_socket, - size_t addrlen, + MHD_Socket new_socket, + size_t addr_size, const struct sockaddr *MHD_RESTRICT addr, void *connection_cntx) MHD_FN_PAR_NONNULL_ (1) diff --git a/src/mhd2/daemon_add_conn.c b/src/mhd2/daemon_add_conn.c @@ -61,6 +61,10 @@ #include "mhd_assume.h" #include "sys_offsetof.h" + +#include "mhd_locks.h" +#include "mhd_atomic_counter.h" + #include "sys_sockets_types.h" #include "sys_sockets_headers.h" #include "sys_ip_headers.h" @@ -89,6 +93,7 @@ #include "mempool_funcs.h" #include "events_process.h" +#include "daemon_funcs.h" #include "response_from.h" #include "response_destroy.h" #include "conn_timeout.h" @@ -199,8 +204,9 @@ notify_app_conn (struct MHD_Daemon *restrict daemon, * @param daemon daemon that manages the connection * @param client_socket socket to manage (MHD will expect * to receive an HTTP request from this socket next). - * @param addr IP address of the client * @param addrlen number of bytes in @a addr + * @param addr IP address of the client, + * will be deallocated by free() if @a external_add is 'true' * @param external_add indicate that socket has been added externally * @param non_blck indicate that socket in non-blocking mode * @param sk_spipe_supprs indicate that the @a client_socket has @@ -212,12 +218,12 @@ notify_app_conn (struct MHD_Daemon *restrict daemon, * error on failure (the @a client_socket is closed) */ static MHD_FN_MUST_CHECK_RESULT_ -MHD_FN_PAR_NONNULL_ (1) +MHD_FN_PAR_NONNULL_ (1) MHD_FN_PAR_INOUT_SIZE_ (4,3) MHD_FN_PAR_NONNULL_ (9) MHD_FN_PAR_OUT_ (9) enum MHD_StatusCode new_connection_prepare_ (struct MHD_Daemon *restrict daemon, MHD_Socket client_socket, - const struct sockaddr *restrict addr, size_t addrlen, + struct sockaddr_storage *restrict addr, bool external_add, bool non_blck, bool sk_spipe_supprs, @@ -286,33 +292,47 @@ new_connection_prepare_ (struct MHD_Daemon *restrict daemon, c->sk.state.nodelay = mhd_T_MAYBE; } - if (0 < addrlen) + if ((0 < addrlen)) { - // TODO: combine into single allocation. Alignment should be taken into account - c->sk.addr.data = (struct sockaddr_storage *) malloc (addrlen); - if (NULL == c->sk.addr.data) + if (! external_add) { - mhd_LOG_MSG (daemon, \ - MHD_SC_CONNECTION_MEM_ALLOC_FAILURE, \ - "Failed to allocate memory for the new connection"); - ret = MHD_SC_CONNECTION_MEM_ALLOC_FAILURE; + c->sk.addr.data = (struct sockaddr_storage *) malloc (addrlen); + if (NULL == c->sk.addr.data) + { + mhd_LOG_MSG (daemon, \ + MHD_SC_CONNECTION_MEM_ALLOC_FAILURE, \ + "Failed to allocate memory for the new connection"); + ret = MHD_SC_CONNECTION_MEM_ALLOC_FAILURE; + } + else + { + memcpy (c->sk.addr.data, + addr, + addrlen); + c->sk.addr.size = addrlen; +#ifdef HAVE_STRUCT_SOCKADDR_STORAGE_SS_LEN + c->sk.addr.data->ss_len = addrlen; +#endif /* HAVE_STRUCT_SOCKADDR_STORAGE_SS_LEN */ + } } else { - memcpy (c->sk.addr.data, - addr, - addrlen); -#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN - ((struct sockaddr*) c->sk.addr.data)->sa_len = addrlen; /* Force correct value */ -#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */ + c->sk.addr.data = addr; + c->sk.addr.size = addrlen; +#ifdef HAVE_STRUCT_SOCKADDR_STORAGE_SS_LEN + c->sk.addr.data->ss_len = addrlen; +#endif /* HAVE_STRUCT_SOCKADDR_STORAGE_SS_LEN */ + addr = NULL; } } else + { c->sk.addr.data = NULL; + c->sk.addr.size = 0u; + } if (MHD_SC_OK == ret) { - c->sk.addr.size = addrlen; c->sk.fd = client_socket; c->sk.props.is_nonblck = non_blck; c->sk.props.is_nonip = sk_is_nonip; @@ -359,6 +379,10 @@ new_connection_prepare_ (struct MHD_Daemon *restrict daemon, } free (c); } + + if ((NULL != addr) && external_add) + free (addr); + mhd_assert (MHD_SC_OK != ret); return ret; /* Failure exit point */ } @@ -568,8 +592,9 @@ new_connection_process_ (struct MHD_Daemon *restrict daemon, * @param daemon daemon that manages the connection * @param client_socket socket to manage (MHD will expect * to receive an HTTP request from this socket next). - * @param addr IP address of the client * @param addrlen number of bytes in @a addr + * @param addr IP address of the client, + * will be deallocated by free() if @a external_add is 'true' * @param external_add perform additional operations needed due * to the application calling us directly * @param non_blck indicate that socket in non-blocking mode @@ -579,11 +604,11 @@ new_connection_process_ (struct MHD_Daemon *restrict daemon, * @return #MHD_SC_OK on success, * error on failure (the @a client_socket is closed) */ -static enum MHD_StatusCode +static MHD_FN_PAR_NONNULL_ (1) MHD_FN_PAR_INOUT_SIZE_ (4,3) enum MHD_StatusCode internal_add_connection (struct MHD_Daemon *daemon, MHD_Socket client_socket, - const struct sockaddr *addr, size_t addrlen, + struct sockaddr_storage *addr, bool external_add, bool non_blck, bool sk_spipe_supprs, @@ -622,12 +647,13 @@ internal_add_connection (struct MHD_Daemon *daemon, { res = new_connection_prepare_ (daemon, client_socket, - addr, addrlen, + addrlen, addr, external_add, non_blck, sk_spipe_supprs, sk_is_nonip, &connection); + addr = NULL; /* Cleaned up with 'connection' if needed */ if (MHD_SC_OK == res) { @@ -641,6 +667,8 @@ internal_add_connection (struct MHD_Daemon *daemon, } } + if ((NULL != addr) && external_add) + free (addr); mhd_socket_close (client_socket); mhd_assert (MHD_SC_OK != res); @@ -649,167 +677,123 @@ internal_add_connection (struct MHD_Daemon *daemon, } -#if 0 // TODO: implement -static void -new_connections_list_process_ (struct MHD_Daemon *daemon) -{ - struct MHD_Connection *local_head; - struct MHD_Connection *local_tail; - mhd_assert (daemon->events.act_req); - // mhd_assert (MHD_D_IS_THREAD_SAFE_ (daemon)); - - /* Detach DL-list of new connections from the daemon for - * following local processing. */ - MHD_mutex_lock_chk_ (&daemon->new_connections_mutex); - mhd_assert (NULL != daemon->new_connections_head); - local_head = daemon->new_connections_head; - local_tail = daemon->new_connections_tail; - daemon->new_connections_head = NULL; - daemon->new_connections_tail = NULL; - daemon->have_new = false; - MHD_mutex_unlock_chk_ (&daemon->new_connections_mutex); - (void) local_head; /* Mute compiler warning */ - - /* Process new connections in FIFO order. */ - do - { - struct MHD_Connection *c; /**< Currently processed connection */ - - c = local_tail; - DLL_remove (local_head, - local_tail, - c); - mhd_assert (daemon == c->daemon); - if (MHD_NO == new_connection_process_ (daemon, c)) - { -#ifdef HAVE_MESSAGES - MHD_DLOG (daemon, - _ ("Failed to start serving new connection.\n")); -#endif - (void) 0; - } - } while (NULL != local_tail); - -} - - -#endif - - MHD_FN_PAR_NONNULL_ (1) MHD_FN_PAR_IN_ (4) MHD_EXTERN_ enum MHD_StatusCode MHD_daemon_add_connection (struct MHD_Daemon *MHD_RESTRICT daemon, - MHD_Socket client_socket, - size_t addrlen, + MHD_Socket new_socket, + size_t addr_size, const struct sockaddr *MHD_RESTRICT addr, void *connection_cntx) { + enum MHD_StatusCode ret; bool sk_nonbl; bool sk_spipe_supprs; + + sk_nonbl = false; + // TODO: global daemon lock for external events - (void) connection_cntx; // FIXME: is it really needed? Where it is used? + (void) connection_cntx; // TODO: add support for connection's context - if ((! mhd_D_HAS_THREADS (daemon)) && - (daemon->conns.block_new)) - (void) 0; // FIXME: remove already pending connections? + ret = MHD_SC_OK; if (! mhd_D_TYPE_HAS_WORKERS (daemon->threading.d_type) && daemon->conns.block_new) - { - (void) mhd_socket_close (client_socket); - return MHD_SC_LIMIT_CONNECTIONS_REACHED; - } + ret = MHD_SC_LIMIT_CONNECTIONS_REACHED; - if (0 != addrlen) + if ((MHD_SC_OK == ret) && + (0 != addr_size)) { - if (addrlen < (sizeof(addr->sa_family) - + offsetof (struct sockaddr, sa_family))) + if (addr_size < (sizeof(addr->sa_family) + + offsetof (struct sockaddr, sa_family))) { mhd_LOG_MSG (daemon, MHD_SC_CONFIGURATION_WRONG_SA_SIZE, \ "MHD_add_connection() has been called with " \ - "incorrect 'addrlen' value."); - (void) mhd_socket_close (client_socket); - return MHD_SC_CONFIGURATION_WRONG_SA_SIZE; + "incorrect 'addr_size' value."); + ret = MHD_SC_CONFIGURATION_WRONG_SA_SIZE; } - if (AF_INET == addr->sa_family) + else if (AF_INET == addr->sa_family) { - if (sizeof(struct sockaddr_in) > addrlen) + if (sizeof(struct sockaddr_in) > addr_size) { mhd_LOG_MSG (daemon, MHD_SC_CONFIGURATION_WRONG_SA_SIZE, \ "MHD_add_connection() has been called with " \ - "incorrect 'addrlen' value."); - (void) mhd_socket_close (client_socket); - return MHD_SC_CONFIGURATION_WRONG_SA_SIZE; + "incorrect 'addr_size' value."); + ret = MHD_SC_CONFIGURATION_WRONG_SA_SIZE; } #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN - if ((0 != addr->sa_len) && - (sizeof(struct sockaddr_in) > (size_t) addr->sa_len) ) + else if ((0 != addr->sa_len) && + (sizeof(struct sockaddr_in) > (size_t) addr->sa_len) ) { mhd_LOG_MSG (daemon, MHD_SC_CONFIGURATION_WRONG_SA_SIZE, \ "MHD_add_connection() has been called with " \ "non-zero value of 'sa_len' member of " \ "'struct sockaddr' which does not match 'sa_family'."); - (void) mhd_socket_close (client_socket); - return MHD_SC_CONFIGURATION_WRONG_SA_SIZE; + ret = MHD_SC_CONFIGURATION_WRONG_SA_SIZE; } #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */ } #ifdef HAVE_INET6 else if (AF_INET6 == addr->sa_family) { - if (sizeof(struct sockaddr_in6) > addrlen) + if (sizeof(struct sockaddr_in6) > addr_size) { mhd_LOG_MSG (daemon, MHD_SC_CONFIGURATION_WRONG_SA_SIZE, \ "MHD_add_connection() has been called with " \ - "incorrect 'addrlen' value."); - (void) mhd_socket_close (client_socket); - return MHD_SC_CONFIGURATION_WRONG_SA_SIZE; + "incorrect 'addr_size' value."); + ret = MHD_SC_CONFIGURATION_WRONG_SA_SIZE; } #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN - if ((0 != addr->sa_len) && - (sizeof(struct sockaddr_in6) > (size_t) addr->sa_len) ) + else if ((0 != addr->sa_len) && + (sizeof(struct sockaddr_in6) > (size_t) addr->sa_len) ) { mhd_LOG_MSG (daemon, MHD_SC_CONFIGURATION_WRONG_SA_SIZE, \ "MHD_add_connection() has been called with " \ "non-zero value of 'sa_len' member of " \ "'struct sockaddr' which does not match 'sa_family'."); - (void) mhd_socket_close (client_socket); - return MHD_SC_CONFIGURATION_WRONG_SA_SIZE; + ret = MHD_SC_CONFIGURATION_WRONG_SA_SIZE; } #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */ } #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN - else if (addrlen < (sizeof(addr->sa_len) - + offsetof (struct sockaddr, sa_len))) + else if (addr_size < (sizeof(addr->sa_len) + + offsetof (struct sockaddr, sa_len))) { mhd_LOG_MSG (daemon, MHD_SC_CONFIGURATION_WRONG_SA_SIZE, \ "MHD_add_connection() has been called with " \ - "incorrect 'addrlen' value."); - (void) mhd_socket_close (client_socket); - return MHD_SC_CONFIGURATION_WRONG_SA_SIZE; + "incorrect 'addr_size' value."); + ret = MHD_SC_CONFIGURATION_WRONG_SA_SIZE; } if ((0 != addr->sa_len) && - (addrlen > (size_t) addr->sa_len)) - addrlen = (size_t) addr->sa_len; /* Use safest value */ + (addr_size > (size_t) addr->sa_len)) + addr_size = (size_t) addr->sa_len; /* Use safest value */ #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */ #endif /* HAVE_INET6 */ } - if (! mhd_FD_FITS_DAEMON (daemon, client_socket)) + if (MHD_SC_OK == ret) { - mhd_LOG_MSG (daemon, MHD_SC_NEW_CONN_FD_OUTSIDE_OF_SET_RANGE, \ - "The new connection FD value is higher than allowed"); - (void) mhd_socket_close (client_socket); - return MHD_SC_NEW_CONN_FD_OUTSIDE_OF_SET_RANGE; + if (! mhd_FD_FITS_DAEMON (daemon, + new_socket)) + { + mhd_LOG_MSG (daemon, MHD_SC_NEW_CONN_FD_OUTSIDE_OF_SET_RANGE, \ + "The new connection FD value is higher than allowed"); + ret = MHD_SC_NEW_CONN_FD_OUTSIDE_OF_SET_RANGE; + } } - if (! mhd_socket_nonblocking (client_socket)) + if (MHD_SC_OK == ret) { - mhd_LOG_MSG (daemon, MHD_SC_ACCEPT_CONFIGURE_NONBLOCKING_FAILED, \ - "Failed to set nonblocking mode on the new client socket."); - sk_nonbl = false; + sk_nonbl = mhd_socket_nonblocking (new_socket); + if (! sk_nonbl) + mhd_LOG_MSG (daemon, MHD_SC_ACCEPT_CONFIGURE_NONBLOCKING_FAILED, \ + "Failed to set nonblocking mode on the new client socket."); + + if (1) // TODO: implement turbo + { + if (! mhd_socket_noninheritable (new_socket)) + mhd_LOG_MSG (daemon, MHD_SC_ACCEPT_CONFIGURE_NOINHERIT_FAILED, \ + "Failed to set noninheritable mode on new client socket."); + } } - else - sk_nonbl = true; #ifndef MHD_SOCKETS_KIND_WINSOCK sk_spipe_supprs = false; @@ -817,90 +801,130 @@ MHD_daemon_add_connection (struct MHD_Daemon *MHD_RESTRICT daemon, sk_spipe_supprs = true; /* Nothing to suppress on W32 */ #endif /* MHD_SOCKETS_KIND_WINSOCK */ #if defined(mhd_socket_nosignal) - if (! sk_spipe_supprs) - sk_spipe_supprs = mhd_socket_nosignal (client_socket); - if (! sk_spipe_supprs) + if (MHD_SC_OK == ret) { - mhd_LOG_MSG (daemon, MHD_SC_ACCEPT_CONFIGURE_NOSIGPIPE_FAILED, \ - "Failed to suppress SIGPIPE on the new client socket."); -#ifndef HAVE_DCLR_MSG_NOSIGNAL - /* Application expects that SIGPIPE will be suppressed, - * but suppression failed and SIGPIPE cannot be suppressed with send(). */ - if (! daemon->sigpipe_blocked) + if (! sk_spipe_supprs) + sk_spipe_supprs = mhd_socket_nosignal (new_socket); + if (! sk_spipe_supprs) { - int err = MHD_socket_get_error_ (); - MHD_socket_close_ (client_socket); - MHD_socket_fset_error_ (err); - return MHD_SC_ACCEPT_CONFIGURE_NOSIGPIPE_FAILED; + mhd_LOG_MSG (daemon, MHD_SC_ACCEPT_CONFIGURE_NOSIGPIPE_FAILED, \ + "Failed to suppress SIGPIPE on the new client socket."); +# ifndef HAVE_DCLR_MSG_NOSIGNAL + /* Application expects that SIGPIPE will be suppressed, + * but suppression failed and SIGPIPE cannot be suppressed with send(). */ + if (! daemon->sigpipe_blocked) + ret = MHD_SC_ACCEPT_CONFIGURE_NOSIGPIPE_FAILED; +# endif /* HAVE_DCLR_MSG_NOSIGNAL */ } -#endif /* HAVE_DCLR_MSG_NOSIGNAL */ } #endif /* mhd_socket_nosignal */ - if (1) // TODO: implement turbo + if (MHD_SC_OK == ret) { - if (! mhd_socket_noninheritable (client_socket)) - mhd_LOG_MSG (daemon, MHD_SC_ACCEPT_CONFIGURE_NOINHERIT_FAILED, \ - "Failed to set noninheritable mode on new client socket."); - } + struct mhd_DaemonExtAddedConn *new_conn; -#ifdef HAVE_STRUCT_SOCKADDR_STORAGE_SS_LEN - addrstorage.ss_len = addrlen; /* Force set the right length */ -#endif /* HAVE_STRUCT_SOCKADDR_STORAGE_SS_LEN */ + new_conn = + (struct mhd_DaemonExtAddedConn*) + malloc (sizeof(struct mhd_DaemonExtAddedConn)); + + if (NULL == new_conn) + ret = MHD_SC_CONNECTION_MEM_ALLOC_FAILURE; + else + { + mhd_DLINKEDL_INIT_LINKS (new_conn, queue); + new_conn->skt = new_socket; + new_conn->is_nonblock = sk_nonbl; + new_conn->has_spipe_suppr = sk_spipe_supprs; + new_conn->addr_size = addr_size; + + if (0 != addr_size) + { + new_conn->addr = (struct sockaddr_storage *) malloc (addr_size); + if (NULL == new_conn->addr) + ret = MHD_SC_CONNECTION_MEM_ALLOC_FAILURE; + else + memcpy (new_conn->addr, + addr, + addr_size); + } + else + new_conn->addr = NULL; + if (MHD_SC_OK == ret) + { + struct MHD_Daemon *d_to_add; + if (! mhd_D_TYPE_HAS_WORKERS (daemon->threading.d_type)) + d_to_add = daemon; + else + { #if defined(MHD_SUPPORT_THREADS) - if (mhd_D_TYPE_HAS_WORKERS (daemon->threading.d_type)) - { - unsigned int i; - /* have a pool, try to find a pool with capacity; we use the - socket as the initial offset into the pool for load - balancing */ - unsigned int offset; -#ifdef MHD_SOCKETS_KIND_WINSOCK - uint_fast64_t osb = (uint_fast64_t) client_socket; - osb ^= (((uint_fast64_t) client_socket) >> 9); - osb ^= (((uint_fast64_t) client_socket) >> 18); - osb ^= (((uint_fast64_t) client_socket) >> 27); - osb ^= (((uint_fast64_t) client_socket) >> 36); - osb ^= (((uint_fast64_t) client_socket) >> 45); - osb ^= (((uint_fast64_t) client_socket) >> 54); - osb ^= (((uint_fast64_t) client_socket) >> 63); - offset = (unsigned int) osb; -#else - offset = (unsigned int) client_socket; -#endif + size_t d_offset; - for (i = 0; i < daemon->threading.hier.pool.num; ++i) - { - struct MHD_Daemon *const restrict worker = - daemon->threading.hier.pool.workers - + (i + offset) % daemon->threading.hier.pool.num; - if (worker->conns.block_new) - continue; - return internal_add_connection (worker, - client_socket, - addr, - addrlen, - true, - sk_nonbl, - sk_spipe_supprs, - mhd_T_MAYBE); - } + d_offset = + mhd_atomic_counter_get_inc_wrap ( + &(daemon->events.act_req.ext_added.master.next_d_idx)); + + d_to_add = (daemon->threading.hier.pool.workers + + (d_offset % daemon->threading.hier.pool.num)); + mhd_ASSUME (NULL != d_to_add); + + if (d_to_add->conns.block_new) + { + /* Try to find daemon with available connection slots */ + size_t i; + + /* Start from the other side of the workers pool to avoid + conflict with the next called "add external connection". */ + d_offset += daemon->threading.hier.pool.num / 2; + + for (i = 0u; i < daemon->threading.hier.pool.num; ++i) + { + d_to_add = (daemon->threading.hier.pool.workers + + ((d_offset + i) % daemon->threading.hier.pool.num)); + + if (d_to_add->conns.block_new) + d_to_add = NULL; + else + break; + } + } +#else /* ! MHD_SUPPORT_THREADS */ + mhd_UNREACHABLE (); + d_to_add = NULL; +#endif /* ! MHD_SUPPORT_THREADS */ + } + + if (NULL == d_to_add) + ret = MHD_SC_LIMIT_CONNECTIONS_REACHED; + else + { + mhd_mutex_lock_chk ( + &(d_to_add->events.act_req.ext_added.worker.q_lock)); + mhd_DLINKEDL_INS_LAST (&(d_to_add->events.act_req.ext_added.worker), + new_conn, + queue); + mhd_mutex_unlock_chk ( + &(d_to_add->events.act_req.ext_added.worker.q_lock)); - /* all pools are at their connection limit, must refuse */ - (void) mhd_socket_close (client_socket); - return MHD_SC_LIMIT_CONNECTIONS_REACHED; + mhd_daemon_trigger_itc (d_to_add); + + return MHD_SC_OK; /* Success exit point */ + } + + /* Below is a clean-up path */ + + if (NULL != new_conn->addr) + free (new_conn->addr); + } + free (new_conn); + } } -#endif /* MHD_SUPPORT_THREADS */ - return internal_add_connection (daemon, - client_socket, - addr, - addrlen, - true, - sk_nonbl, - sk_spipe_supprs, - mhd_T_MAYBE); + (void) mhd_socket_close (new_socket); + + mhd_assert (MHD_SC_OK != ret); + + return ret; } @@ -1115,9 +1139,8 @@ mhd_daemon_accept_connection (struct MHD_Daemon *restrict daemon) #endif /* mhd_socket_nosignal */ return (MHD_SC_OK == internal_add_connection (daemon, s, - (const struct sockaddr *) - addrstorage, (size_t) addrlen, + addrstorage, false, sk_nonbl, sk_spipe_supprs, @@ -1213,3 +1236,54 @@ mhd_conn_close_final (struct MHD_Connection *restrict c) free (c); } + + +MHD_INTERNAL MHD_FN_PAR_NONNULL_ALL_ void +mhd_daemon_process_ext_added_conns (struct MHD_Daemon *restrict d) +{ + struct mhd_DaemonExtAddedConn *ext_added; + mhd_DLNKDL_LIST (mhd_DaemonExtAddedConn, detached_q); + + mhd_assert (! mhd_D_HAS_WORKERS (d)); + + if (NULL == + mhd_DLINKEDL_GET_FIRST (&(d->events.act_req.ext_added.worker), + queue)) + return; /* Shortcut: the queue is empty */ + + /* Detach the queue to quickly manipulate the lock one time only */ + mhd_mutex_lock_chk (&(d->events.act_req.ext_added.worker.q_lock)); + detached_q = d->events.act_req.ext_added.worker.queue; + mhd_DLINKEDL_INIT_LIST (&(d->events.act_req.ext_added.worker), + queue); + mhd_mutex_unlock_chk (&(d->events.act_req.ext_added.worker.q_lock)); + + /* Process without lock the detached queue in FIFO order */ + for (ext_added = mhd_DLINKEDL_GET_FIRST_D (&detached_q); + NULL != ext_added; + ext_added = mhd_DLINKEDL_GET_FIRST_D (&detached_q)) + { + mhd_ASSUME (NULL == mhd_DLINKEDL_GET_PREV (ext_added, + queue)); + mhd_DLINKEDL_DEL_D (&detached_q, ext_added, + queue); + + if (! d->conns.block_new) + { + (void) internal_add_connection (d, + ext_added->skt, + ext_added->addr_size, + ext_added->addr, + true, + ext_added->is_nonblock, + ext_added->has_spipe_suppr, + mhd_T_MAYBE); + } + else + { + if (NULL != ext_added->addr) + free (ext_added->addr); + } + free (ext_added); + } +} diff --git a/src/mhd2/daemon_add_conn.h b/src/mhd2/daemon_add_conn.h @@ -124,4 +124,14 @@ mhd_conn_close_final (struct MHD_Connection *restrict c) MHD_FN_PAR_NONNULL_ALL_; +/** + * Process externally added connections (if any). + * If the daemon at the connection limit, then any pending externally + * added connections will be closed without processing. + * @param d the daemon handle to use + */ +MHD_INTERNAL void +mhd_daemon_process_ext_added_conns (struct MHD_Daemon *restrict d) +MHD_FN_PAR_NONNULL_ALL_; + #endif /* ! MHD_DAEMON_ADD_CONN */ diff --git a/src/mhd2/daemon_start.c b/src/mhd2/daemon_start.c @@ -46,6 +46,7 @@ #include "mhd_assert.h" #include "mhd_unreachable.h" +#include "mhd_assume.h" #include "mhd_constexpr.h" @@ -60,6 +61,8 @@ #include "mhd_sockets_macros.h" #include "sys_ip_headers.h" +#include "mhd_atomic_counter.h" + #ifdef MHD_SOCKETS_KIND_POSIX # include "sys_errno.h" #endif @@ -2601,6 +2604,150 @@ deinit_individual_thread_data_events_conns (struct MHD_Daemon *restrict d) /** + * Initialise the data specific only for the worker daemon. + * @param d the daemon object + * @return #MHD_SC_OK on success, + * the error code otherwise + */ +static MHD_FN_PAR_NONNULL_ (1) +MHD_FN_MUST_CHECK_RESULT_ enum MHD_StatusCode +init_worker_only_data (struct MHD_Daemon *restrict d) +{ + enum MHD_StatusCode res; + struct mhd_DaemonExtAddedConnectionsWorker *worker_only_data = + &(d->events.act_req.ext_added.worker); + + mhd_assert (! mhd_D_HAS_WORKERS (d)); + mhd_assert (d->dbg.net_inited); + mhd_assert (! d->dbg.worker_only_inited); + mhd_assert (mhd_D_HAS_MASTER (d) || ! d->dbg.master_only_inited); + mhd_assert (! mhd_D_HAS_MASTER (d) || d->dbg.master_only_inited); /* Copied from master daemon */ + +#ifndef NDEBUG + /* "master"-only data will be overwritten here without de-initialising */ + d->dbg.master_only_inited = false; +#endif /* ! NDEBUG */ + + mhd_DLINKEDL_INIT_LIST (worker_only_data, queue); + + if (! mhd_mutex_init (&(worker_only_data->q_lock))) + { + mhd_LOG_MSG (d, MHD_SC_MUTEX_INIT_FAILURE, \ + "Failed to initialise mutex for externally added " + "connections"); + res = MHD_SC_MUTEX_INIT_FAILURE; + } + else + res = MHD_SC_OK; + +#ifndef NDEBUG + if (MHD_SC_OK == res) + d->dbg.worker_only_inited = true; +#endif /* ! NDEBUG */ + + return res; +} + + +/** + * De-initialise the data specific only for the worker daemon. + * @param d the daemon object + */ +static MHD_FN_PAR_NONNULL_ (1) void +deinit_worker_only_data (struct MHD_Daemon *restrict d) +{ + struct mhd_DaemonExtAddedConnectionsWorker *worker_only_data = + &(d->events.act_req.ext_added.worker); + struct mhd_DaemonExtAddedConn *q_e; + + mhd_assert (! mhd_D_HAS_WORKERS (d)); + mhd_assert (d->dbg.net_inited); + mhd_assert (d->dbg.worker_only_inited); + mhd_assert (! d->dbg.master_only_inited); + + /* Clean-up all unprocessed entries */ + + for (q_e = mhd_DLINKEDL_GET_FIRST (worker_only_data, queue); + NULL != q_e; + q_e = mhd_DLINKEDL_GET_FIRST (worker_only_data, queue)) + { + mhd_ASSUME (NULL == mhd_DLINKEDL_GET_PREV (q_e, queue)); + mhd_DLINKEDL_DEL (worker_only_data, q_e, queue); + mhd_socket_close (q_e->skt); + + if (NULL != q_e->addr) + free (q_e->addr); + + free (q_e); + } + + mhd_mutex_destroy_chk (&(worker_only_data->q_lock)); + +#ifndef NDEBUG + d->dbg.worker_only_inited = false; +#endif /* ! NDEBUG */ +} + + +/** + * Initialise worker daemon (the only daemon or member of the worker pool) + * worker-specific daemon data, individual thread data and finish events + * initialising. + * To be used only with non-master daemons. + * Do not start the thread even if configured for the internal threads. + * @param d the daemon object + * @return #MHD_SC_OK on success, + * the error code otherwise + */ +static MHD_FN_PAR_NONNULL_ (1) MHD_FN_PAR_NONNULL_ (2) +MHD_FN_MUST_CHECK_RESULT_ enum MHD_StatusCode +init_worker (struct MHD_Daemon *restrict d, + struct DaemonOptions *restrict s) +{ + enum MHD_StatusCode res; + mhd_assert (! mhd_D_TYPE_HAS_WORKERS (d->threading.d_type)); + + res = init_worker_only_data (d); + + if (MHD_SC_OK == res) + { + res = init_individual_thread_data_events_conns (d, + s); + + if (MHD_SC_OK == res) + return MHD_SC_OK; + + /* Below is a clean-up path */ + + deinit_worker_only_data (d); + } + + mhd_assert (MHD_SC_OK != res); + + return res; +} + + +/** + * De-initialise worker daemon (the only daemon or member of the worker pool) + * worker-specific daemon data, individual thread data and finish events + * initialising. + * To be used only with non-master daemons. + * The internal thread (is any) must be stopped already. + * @param d the daemon object + */ +static MHD_FN_PAR_NONNULL_ (1) void +deinit_worker (struct MHD_Daemon *restrict d) +{ + mhd_assert (! mhd_D_TYPE_HAS_WORKERS (d->threading.d_type)); + + deinit_individual_thread_data_events_conns (d); + + deinit_worker_only_data (d); +} + + +/** * Set the maximum number of handled connections for the daemon. * Works only for global limit, does not work for the worker daemon. * @param d the daemon object @@ -2855,7 +3002,7 @@ deinit_workers_pool (struct MHD_Daemon *restrict d, for (i = num_workers - 1; num_workers > i; --i) { /* Note: loop exits after underflow of 'i' */ struct MHD_Daemon *const worker = d->threading.hier.pool.workers + i; - deinit_individual_thread_data_events_conns (worker); + deinit_worker (worker); #ifdef MHD_SUPPORT_EPOLL if (mhd_POLL_TYPE_EPOLL == worker->events.poll_type) deinit_epoll (worker); @@ -2975,7 +3122,8 @@ init_workers_pool (struct MHD_Daemon *restrict d, #endif /* MHD_SUPPORT_EPOLL */ if (MHD_SC_OK == res) { - res = init_individual_thread_data_events_conns (worker, s); + res = init_worker (worker, + s); if (MHD_SC_OK == res) continue; /* Process the next worker */ @@ -3007,6 +3155,122 @@ init_workers_pool (struct MHD_Daemon *restrict d, } +/** + * Initialise data specific only for the master daemon. + * @param d the daemon object + * @return #MHD_SC_OK on success, + * the error code otherwise + */ +static MHD_FN_PAR_NONNULL_ (1) +MHD_FN_MUST_CHECK_RESULT_ enum MHD_StatusCode +init_master_only_data (struct MHD_Daemon *restrict d) +{ + enum MHD_StatusCode res; + + mhd_assert (mhd_D_HAS_WORKERS (d)); + mhd_assert (d->dbg.net_inited); + mhd_assert (! d->dbg.master_only_inited); + mhd_assert (! d->dbg.worker_only_inited); + + if (! mhd_atomic_counter_init ( \ + &(d->events.act_req.ext_added.master.next_d_idx), + 0)) + { + mhd_LOG_MSG (d, MHD_SC_MUTEX_INIT_FAILURE, \ + "Failed to initialise atomic counter for externally added " + "connections"); + res = MHD_SC_MUTEX_INIT_FAILURE; + } + else + res = MHD_SC_OK; + +#ifndef NDEBUG + if (MHD_SC_OK == res) + d->dbg.master_only_inited = true; +#endif /* ! NDEBUG */ + + return res; +} + + +/** + * De-initialise data specific only for the master daemon. + * @param d the daemon object + */ +static MHD_FN_PAR_NONNULL_ (1) void +deinit_master_only_data (struct MHD_Daemon *restrict d) +{ + mhd_assert (mhd_D_HAS_WORKERS (d)); + mhd_assert (d->dbg.master_only_inited); + mhd_assert (! d->dbg.worker_only_inited); + + mhd_atomic_counter_deinit (&(d->events.act_req.ext_added.master.next_d_idx)); + +#ifndef NDEBUG + d->dbg.master_only_inited = false; +#endif /* ! NDEBUG */ +} + + +/** + * Initialise individual events, connection data for the "master" daemon, + * including master-only data, the workers pool, and the workers daemons, + * including individual worker-specific threading and other data. + * Do not start the threads. + * @param d the daemon object + * @param s the user settings + * @return #MHD_SC_OK on success, + * the error code otherwise + */ +static MHD_FN_PAR_NONNULL_ (1) MHD_FN_PAR_NONNULL_ (2) +MHD_FN_MUST_CHECK_RESULT_ enum MHD_StatusCode +init_master (struct MHD_Daemon *restrict d, + struct DaemonOptions *restrict s) +{ + enum MHD_StatusCode res; + mhd_assert (mhd_D_TYPE_HAS_WORKERS (d->threading.d_type)); + + res = init_master_only_data (d); + if (MHD_SC_OK != res) + return res; + + res = init_workers_pool (d, + s); + if (MHD_SC_OK == res) + { + /* Copy some settings to the master daemon */ + d->conns.cfg.mem_pool_size = + d->threading.hier.pool.workers[0].conns.cfg.mem_pool_size; + + return res; + } + + /* Below is a clean-up path */ + + deinit_master_only_data (d); + + mhd_assert (MHD_SC_OK != res); + return res; +} + + +/** + * De-initialise individual events, connection data for the "master" daemon, + * including master-only data, the workers pool, and the workers daemons, + * including individual worker-specific threading and other data. + * The threads must be not running. + * @param d the daemon object + */ +static MHD_FN_PAR_NONNULL_ (1) void +deinit_master (struct MHD_Daemon *restrict d) +{ + deinit_workers_pool (d, + d->threading.hier.pool.num); + + deinit_master_only_data (d); +} + + #endif /* MHD_SUPPORT_THREADS */ /** @@ -3043,18 +3307,12 @@ daemon_init_threading_and_conn (struct MHD_Daemon *restrict d, #endif /* MHD_SUPPORT_THREADS */ if (! mhd_D_HAS_WORKERS (d)) - res = init_individual_thread_data_events_conns (d, s); + res = init_worker (d, + s); #ifdef MHD_SUPPORT_THREADS else - { - res = init_workers_pool (d, s); - if (MHD_SC_OK == res) - { - /* Copy some settings to the master daemon */ - d->conns.cfg.mem_pool_size = - d->threading.hier.pool.workers[0].conns.cfg.mem_pool_size; - } - } + res = init_master (d, + s); #endif /* MHD_SUPPORT_THREADS */ if (MHD_SC_OK == res) @@ -3094,7 +3352,7 @@ daemon_deinit_threading_and_conn (struct MHD_Daemon *restrict d) mhd_assert (d->dbg.connections_inited); mhd_assert (d->dbg.events_allocated); mhd_assert (! d->dbg.thread_pool_inited); - deinit_individual_thread_data_events_conns (d); + deinit_worker (d); } else { @@ -3103,7 +3361,7 @@ daemon_deinit_threading_and_conn (struct MHD_Daemon *restrict d) mhd_assert (! d->dbg.connections_inited); mhd_assert (! d->dbg.events_allocated); mhd_assert (d->dbg.thread_pool_inited); - deinit_workers_pool (d, d->threading.hier.pool.num); + deinit_master (d); #else /* ! MHD_SUPPORT_THREADS */ mhd_assert (0 && "Impossible value"); mhd_UNREACHABLE (); diff --git a/src/mhd2/mhd_daemon.h b/src/mhd2/mhd_daemon.h @@ -83,6 +83,7 @@ struct DaemonOptions; /* Forward declaration */ struct MHD_Connection; /* Forward declaration */ +struct sockaddr_storage; /* Forward declaration */ /** * The helper struct for the connections list @@ -487,6 +488,97 @@ union mhd_DaemonEventMonitoringTypeSpecificData }; +struct mhd_DaemonExtAddedConn; /* Forward declaration */ +mhd_DLINKEDL_STRUCTS_DEFS (mhd_DaemonExtAddedConn); + +/** + * Information about externally added connection + */ +struct mhd_DaemonExtAddedConn +{ + /** + * The links to the other externally added connections in the queue + */ + mhd_DLNKDL_LINKS (mhd_DaemonExtAddedConn, queue); + + /** + * The socket of the externally added connection + */ + MHD_Socket skt; + + /** + * The size of the data pointed by @a addr. + * Zero fi @a addr is NULL. + */ + size_t addr_size; + + /** + * Pointer to socket address. + * Allocated by malloc(), must be freed; + * May be NULL. + */ + struct sockaddr_storage *addr; + + /** + * 'true' if @a skt is non-blocking + */ + bool is_nonblock; + + /** + * 'true' if @a skt has SIGPIPE suppressed + */ + bool has_spipe_suppr; +}; + + +/** + * Information about externally added connections for the worker daemon + */ +struct mhd_DaemonExtAddedConnectionsWorker +{ + /** + * The lock to access @a queue list + */ + mhd_mutex q_lock; + + /** + * The list with the queue of externally added connections + */ + mhd_DLNKDL_LIST (mhd_DaemonExtAddedConn, queue); +}; + +#ifdef MHD_SUPPORT_THREADS +/** + * Information about externally added connections for the master daemon + */ +struct mhd_DaemonExtAddedConnectionsMaster +{ + /** + * The index (modulo number of workers) of the next worker daemon to add + * connection + */ + struct mhd_AtomicCounter next_d_idx; +}; +#endif /* MHD_SUPPORT_THREADS */ + +/** + * Information about externally added connections + */ +union mhd_DaemonExtAddedConnections +{ +#ifdef MHD_SUPPORT_THREADS + /** + * Information about externally added connections for the master daemon + */ + struct mhd_DaemonExtAddedConnectionsMaster master; +#endif /* MHD_SUPPORT_THREADS */ + /** + * Information about externally added connections for the worker daemon + */ + struct mhd_DaemonExtAddedConnectionsWorker worker; +}; + + /** * The required actions for the daemon */ @@ -496,6 +588,11 @@ struct mhd_DaemonEventActionRequired * If 'true' connection resuming is required. */ bool resume; + + /** + * Information about externally added connections + */ + union mhd_DaemonExtAddedConnections ext_added; }; /** @@ -1170,6 +1267,8 @@ struct mhd_daemon_debug { bool net_inited; bool net_deinited; + bool master_only_inited; + bool worker_only_inited; bool tls_inited; bool events_allocated; unsigned int num_events_elements;