diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-09-03 09:56:30 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-09-03 09:56:30 +0000 |
commit | 6fa19ae05f226451db78316919908c44d6444ac4 (patch) | |
tree | d3f2520a441b05d12e2001a3d3e3bbc95fe65543 | |
parent | 5395e6196d8f469f95d904eff8afd731a9cceb29 (diff) |
implementing 'Connection: upgrade' for thread-per-connection modes, but untested
-rw-r--r-- | ChangeLog | 4 | ||||
-rw-r--r-- | src/include/microhttpd.h | 16 | ||||
-rw-r--r-- | src/microhttpd/Makefile.am | 2 | ||||
-rw-r--r-- | src/microhttpd/connection.c | 4 | ||||
-rw-r--r-- | src/microhttpd/daemon.c | 546 | ||||
-rw-r--r-- | src/microhttpd/internal.h | 26 | ||||
-rw-r--r-- | src/microhttpd/mhd_locks.h | 47 | ||||
-rw-r--r-- | src/microhttpd/mhd_sem.c | 138 | ||||
-rw-r--r-- | src/microhttpd/response.c | 78 |
9 files changed, 677 insertions, 184 deletions
@@ -1,3 +1,7 @@ +Sat Sep 3 11:56:20 CEST 2016 + Adding logic for handling HTTP "Upgrade" in thread-per-connection + mode. Also still untested. -CG + Sat Aug 27 21:01:43 CEST 2016 Adding a few extra safety checks around HTTP "Upgrade" (against wrong uses of API), and a testcase. -CG diff --git a/src/include/microhttpd.h b/src/include/microhttpd.h index 7347447c..afb97571 100644 --- a/src/include/microhttpd.h +++ b/src/include/microhttpd.h @@ -1,6 +1,6 @@ /* This file is part of libmicrohttpd - Copyright (C) 2006-2015 Christian Grothoff (and other contributing authors) + Copyright (C) 2006-2016 Christian Grothoff (and other contributing authors) This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -2261,7 +2261,19 @@ enum MHD_UpgradeAction * NOTE: it is unclear if we want to have this in the * "final" API, this is just an idea right now. */ - MHD_UPGRADE_ACTION_CORK + MHD_UPGRADE_ACTION_CORK, + + /** + * Try to "flush" our write buffer (to the network), returning + * #MHD_YES on success (buffer is empty) and #MHD_NO on failure + * (unsent bytes remain in buffers). This option is useful if + * the application wants to make sure that all data has been sent, + * which may be a good idea before closing the socket. + * + * NOTE: it is unclear if we want to have this in the + * "final" API, this is just an idea right now. + */ + MHD_UPGRADE_ACTION_FLUSH }; diff --git a/src/microhttpd/Makefile.am b/src/microhttpd/Makefile.am index 6973af8a..9a32f598 100644 --- a/src/microhttpd/Makefile.am +++ b/src/microhttpd/Makefile.am @@ -63,7 +63,7 @@ libmicrohttpd_la_SOURCES = \ sysfdsetsize.c sysfdsetsize.h \ mhd_str.c mhd_str.h \ mhd_threads.c mhd_threads.h \ - mhd_locks.h \ + mhd_locks.h mhd_sem.c \ mhd_sockets.c mhd_sockets.h \ mhd_itc.c mhd_itc.h \ mhd_compat.c mhd_compat.h \ diff --git a/src/microhttpd/connection.c b/src/microhttpd/connection.c index e3880e8a..0c21bc57 100644 --- a/src/microhttpd/connection.c +++ b/src/microhttpd/connection.c @@ -413,7 +413,8 @@ MHD_set_connection_value (struct MHD_Connection *connection, */ const char * MHD_lookup_connection_value (struct MHD_Connection *connection, - enum MHD_ValueKind kind, const char *key) + enum MHD_ValueKind kind, + const char *key) { struct MHD_HTTP_Header *pos; @@ -2772,7 +2773,6 @@ MHD_connection_handle_idle (struct MHD_Connection *connection) /* Buffering for flushable socket was already enabled*/ if (MHD_NO == socket_flush_possible (connection)) socket_start_no_buffering (connection); - break; } /* not ready, no socket action */ diff --git a/src/microhttpd/daemon.c b/src/microhttpd/daemon.c index 791e17d4..a298a1a4 100644 --- a/src/microhttpd/daemon.c +++ b/src/microhttpd/daemon.c @@ -636,6 +636,79 @@ MHD_get_fdset (struct MHD_Daemon *daemon, /** + * Obtain the select() file descriptor sets for the + * given @a urh. + * + * @param urh upgrade handle to wait for + * @param[out] rs read set to initialize + * @param[out] ws write set to initialize + * @param[out] max_fd maximum FD to update + * @param fd_setsize value of FD_SETSIZE + * @return #MHD_YES on success, #MHD_NO on error + */ +static int +urh_to_fdset (struct MHD_UpgradeResponseHandle *urh, + fd_set *rs, + fd_set *ws, + MHD_socket *max_fd, + unsigned int fd_setsize) +{ + if ( (0 == (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) && + (! MHD_add_to_fd_set_ (urh->mhd.socket, + rs, + max_fd, + fd_setsize)) ) + return MHD_NO; + if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) && + (! MHD_add_to_fd_set_ (urh->mhd.socket, + ws, + max_fd, + fd_setsize)) ) + return MHD_NO; + if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) && + (! MHD_add_to_fd_set_ (urh->connection->socket_fd, + rs, + max_fd, + fd_setsize)) ) + return MHD_NO; + if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) && + (! MHD_add_to_fd_set_ (urh->connection->socket_fd, + ws, + max_fd, + fd_setsize)) ) + return MHD_NO; + return MHD_YES; +} + + +/** + * Update the @a urh based on the ready FDs in the @a rs and @a ws. + * + * @param urh upgrade handle to update + * @param rs read result from select() + * @param ws write result from select() + */ +static void +urh_from_fdset (struct MHD_UpgradeResponseHandle *urh, + const fd_set *rs, + const fd_set *ws) +{ + if (FD_ISSET (urh->connection->socket_fd, + rs)) + urh->app.celi |= MHD_EPOLL_STATE_READ_READY; + if (FD_ISSET (urh->connection->socket_fd, + ws)) + urh->app.celi |= MHD_EPOLL_STATE_WRITE_READY; + if (FD_ISSET (urh->mhd.socket, + rs)) + urh->mhd.celi |= MHD_EPOLL_STATE_READ_READY; + if (FD_ISSET (urh->mhd.socket, + ws)) + urh->mhd.celi |= MHD_EPOLL_STATE_WRITE_READY; +} + + +/** * Obtain the `select()` sets for this daemon. * Daemon's FDs will be added to fd_sets. To get only * daemon FDs in fd_sets, call FD_ZERO for each fd_set @@ -733,29 +806,12 @@ MHD_get_fdset2 (struct MHD_Daemon *daemon, } for (urh = daemon->urh_head; NULL != urh; urh = urh->next) { - if ( (0 == (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) && - (! MHD_add_to_fd_set_ (urh->mhd.socket, - read_fd_set, - max_fd, - fd_setsize)) ) - result = MHD_NO; - if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) && - (! MHD_add_to_fd_set_ (urh->mhd.socket, - write_fd_set, - max_fd, - fd_setsize)) ) - result = MHD_NO; - if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) && - (! MHD_add_to_fd_set_ (urh->connection->socket_fd, - read_fd_set, - max_fd, - fd_setsize)) ) - result = MHD_NO; - if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) && - (! MHD_add_to_fd_set_ (urh->connection->socket_fd, - write_fd_set, - max_fd, - fd_setsize)) ) + if (MHD_NO == + urh_to_fdset (urh, + read_fd_set, + write_fd_set, + max_fd, + fd_setsize)) result = MHD_NO; } #if DEBUG_CONNECT @@ -825,6 +881,251 @@ call_handlers (struct MHD_Connection *con, } +#if HTTPS_SUPPORT +/** + * Performs bi-directional forwarding on upgraded HTTPS connections + * based on the readyness state stored in the @a urh handle. + * + * @param urh handle to process + */ +static void +process_urh (struct MHD_UpgradeResponseHandle *urh) +{ + /* handle reading from TLS client and writing to application */ + if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) && + (urh->in_buffer_off < urh->in_buffer_size) ) + { + ssize_t res; + + res = gnutls_record_recv (urh->connection->tls_session, + &urh->in_buffer[urh->in_buffer_off], + urh->in_buffer_size - urh->in_buffer_off); + if ( (GNUTLS_E_AGAIN == res) || + (GNUTLS_E_INTERRUPTED == res) ) + { + urh->app.celi &= ~MHD_EPOLL_STATE_READ_READY; + } + else if (res > 0) + { + urh->in_buffer_off += res; + } + } + if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) && + (urh->in_buffer_off > 0) ) + { + size_t res; + + res = write (urh->mhd.socket, + urh->in_buffer, + urh->in_buffer_off); + if (-1 == res) + { + /* FIXME: differenciate by errno? */ + urh->mhd.celi &= ~MHD_EPOLL_STATE_WRITE_READY; + } + else + { + if (urh->in_buffer_off != res) + { + memmove (urh->in_buffer, + &urh->in_buffer[res], + urh->in_buffer_off - res); + urh->in_buffer_off -= res; + } + else + { + urh->in_buffer_off = 0; + } + } + } + + /* handle reading from application and writing to HTTPS client */ + if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) && + (urh->out_buffer_off < urh->out_buffer_size) ) + { + size_t res; + + res = read (urh->mhd.socket, + &urh->out_buffer[urh->out_buffer_off], + urh->out_buffer_size - urh->out_buffer_off); + if (-1 == res) + { + /* FIXME: differenciate by errno? */ + urh->mhd.celi &= ~MHD_EPOLL_STATE_READ_READY; + } + else + { + urh->out_buffer_off += res; + } + } + if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) && + (urh->out_buffer_off > 0) ) + { + ssize_t res; + + res = gnutls_record_send (urh->connection->tls_session, + urh->out_buffer, + urh->out_buffer_off); + if ( (GNUTLS_E_AGAIN == res) || + (GNUTLS_E_INTERRUPTED == res) ) + { + urh->app.celi &= ~MHD_EPOLL_STATE_WRITE_READY; + } + else if (res > 0) + { + if (urh->out_buffer_off != res) + { + memmove (urh->out_buffer, + &urh->out_buffer[res], + urh->out_buffer_off - res); + urh->out_buffer_off -= res; + } + else + { + urh->out_buffer_off = 0; + } + } + } +} +#endif + + +/** + * Main function of the thread that handles an individual connection + * after it was "upgraded" when #MHD_USE_THREAD_PER_CONNECTION is set. + * + * @param con the connection this thread will handle + */ +static void +thread_main_connection_upgrade (struct MHD_Connection *con) +{ + struct MHD_Daemon *daemon = con->daemon; + + if (0 == (daemon->options & MHD_USE_SSL)) + { + /* Here, we need to block until the application + signals us that it is done with the socket */ + MHD_semaphore_down (con->upgrade_sem); + MHD_semaphore_destroy (con->upgrade_sem); + con->upgrade_sem = NULL; + return; + } +#if HTTPS_SUPPORT + { + struct MHD_UpgradeResponseHandle *urh = con->urh; + + /* Here, we need to bi-directionally forward + until the application tells us that it is done + with the socket; */ + if (0 == (daemon->options & MHD_USE_POLL)) + { + while (MHD_CONNECTION_UPGRADE == con->state) + { + /* use select */ + fd_set rs; + fd_set ws; + MHD_socket max_fd; + int num_ready; + int result; + + FD_ZERO (&rs); + FD_ZERO (&ws); + max_fd = MHD_INVALID_SOCKET; + result = urh_to_fdset (urh, + &rs, + &ws, + &max_fd, + FD_SETSIZE); + if (MHD_NO == result) + { +#ifdef HAVE_MESSAGES + MHD_DLOG (con->daemon, + "Error preparing select\n"); +#endif + break; + } + num_ready = MHD_SYS_select_ (max_fd + 1, + &rs, + &ws, + NULL, + NULL); + if (num_ready < 0) + { + const int err = MHD_socket_get_error_(); + + if (MHD_SCKT_ERR_IS_EINTR_(err)) + continue; +#ifdef HAVE_MESSAGES + MHD_DLOG (con->daemon, + "Error during select (%d): `%s'\n", + err, + MHD_socket_strerr_ (err)); +#endif + break; + } + urh_from_fdset (urh, + &rs, + &ws); + process_urh (urh); + } + } +#ifdef HAVE_POLL + else + { + /* use poll() */ + struct pollfd p[2]; + const unsigned int timeout = UINT_MAX; + + p[0].fd = urh->connection->socket_fd; + p[1].fd = urh->mhd.socket; + while (MHD_CONNECTION_UPGRADE == con->state) + { + if (0 == (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) + p[0].events |= POLLIN; + if (0 == (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) + p[0].events |= POLLOUT; + if (0 == (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) + p[1].events |= POLLIN; + if (0 == (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) + p[1].events |= POLLOUT; + + if (MHD_sys_poll_ (p, + 2, + timeout) < 0) + { + const int err = MHD_socket_get_error_ (); + + if (MHD_SCKT_ERR_IS_EINTR_ (err)) + continue; +#ifdef HAVE_MESSAGES + MHD_DLOG (con->daemon, + "Error during poll: `%s'\n", + MHD_socket_strerr_ (err)); +#endif + break; + } + if (0 != (p[0].revents & POLLIN)) + urh->app.celi |= MHD_EPOLL_STATE_READ_READY; + if (0 != (p[0].revents & POLLOUT)) + urh->app.celi |= MHD_EPOLL_STATE_WRITE_READY; + if (0 != (p[1].revents & POLLIN)) + urh->mhd.celi |= MHD_EPOLL_STATE_READ_READY; + if (0 != (p[1].revents & POLLOUT)) + urh->mhd.celi |= MHD_EPOLL_STATE_WRITE_READY; + process_urh (urh); + } + } + /* end POLL */ +#endif + /* end HTTPS */ +#else + /* HTTPS option set, but compiled without HTTPS */ + MHD_PANIC ("This should not be possible\n"); +#endif + } +} + + /** * Main function of the thread that handles an individual * connection when #MHD_USE_THREAD_PER_CONNECTION is set. @@ -836,6 +1137,7 @@ static MHD_THRD_RTRN_TYPE_ MHD_THRD_CALL_SPEC_ thread_main_handle_connection (void *data) { struct MHD_Connection *con = data; + struct MHD_Daemon *daemon = con->daemon; int num_ready; fd_set rs; fd_set ws; @@ -844,7 +1146,7 @@ thread_main_handle_connection (void *data) struct timeval *tvp; time_t now; #if WINDOWS - MHD_pipe spipe = con->daemon->wpipe[0]; + MHD_pipe spipe = daemon->wpipe[0]; #ifdef HAVE_POLL int extra_slot; #endif /* HAVE_POLL */ @@ -855,11 +1157,13 @@ thread_main_handle_connection (void *data) #ifdef HAVE_POLL struct pollfd p[1 + EXTRA_SLOTS]; #endif +#undef EXTRA_SLOTS - while ( (MHD_YES != con->daemon->shutdown) && + while ( (MHD_YES != daemon->shutdown) && (MHD_CONNECTION_CLOSED != con->state) ) { - unsigned const int timeout = con->daemon->connection_timeout; + const unsigned int timeout = daemon->connection_timeout; + tvp = NULL; #if HTTPS_SUPPORT if (MHD_YES == con->tls_read_ready) @@ -870,7 +1174,8 @@ thread_main_handle_connection (void *data) tvp = &tv; } #endif - if (NULL == tvp && timeout > 0) + if ( (NULL == tvp) && + (timeout > 0) ) { now = MHD_monotonic_sec_counter(); if (now - con->last_activity > timeout) @@ -884,35 +1189,48 @@ thread_main_handle_connection (void *data) if (seconds_left > TIMEVAL_TV_SEC_MAX) tv.tv_sec = TIMEVAL_TV_SEC_MAX; else - tv.tv_sec = (_MHD_TIMEVAL_TV_SEC_TYPE)seconds_left; + tv.tv_sec = (_MHD_TIMEVAL_TV_SEC_TYPE) seconds_left; #endif /* _WIN32 */ } tv.tv_usec = 0; tvp = &tv; } - if (0 == (con->daemon->options & MHD_USE_POLL)) + if (0 == (daemon->options & MHD_USE_POLL)) { /* use select */ int err_state = 0; + FD_ZERO (&rs); FD_ZERO (&ws); maxsock = MHD_INVALID_SOCKET; switch (con->event_loop_info) { case MHD_EVENT_LOOP_INFO_READ: - if (!MHD_add_to_fd_set_ (con->socket_fd, &rs, &maxsock, FD_SETSIZE)) + if (! MHD_add_to_fd_set_ (con->socket_fd, + &rs, + &maxsock, + FD_SETSIZE)) err_state = 1; break; case MHD_EVENT_LOOP_INFO_WRITE: - if (!MHD_add_to_fd_set_ (con->socket_fd, &ws, &maxsock, FD_SETSIZE)) + if (! MHD_add_to_fd_set_ (con->socket_fd, + &ws, + &maxsock, + FD_SETSIZE)) err_state = 1; if ( (con->read_buffer_size > con->read_buffer_offset) && - (!MHD_add_to_fd_set_ (con->socket_fd, &rs, &maxsock, FD_SETSIZE)) ) + (! MHD_add_to_fd_set_ (con->socket_fd, + &rs, + &maxsock, + FD_SETSIZE)) ) err_state = 1; break; case MHD_EVENT_LOOP_INFO_BLOCK: if ( (con->read_buffer_size > con->read_buffer_offset) && - (!MHD_add_to_fd_set_ (con->socket_fd, &rs, &maxsock, FD_SETSIZE)) ) + (! MHD_add_to_fd_set_ (con->socket_fd, + &rs, + &maxsock, + FD_SETSIZE)) ) err_state = 1; tv.tv_sec = 0; tv.tv_usec = 0; @@ -925,7 +1243,10 @@ thread_main_handle_connection (void *data) #if WINDOWS if (MHD_INVALID_PIPE_ != spipe) { - if (!MHD_add_to_fd_set_ (spipe, &rs, &maxsock, FD_SETSIZE)) + if (! MHD_add_to_fd_set_ (spipe, + &rs, + &maxsock, + FD_SETSIZE)) err_state = 1; } #endif @@ -938,10 +1259,15 @@ thread_main_handle_connection (void *data) goto exit; } - num_ready = MHD_SYS_select_ (maxsock + 1, &rs, &ws, NULL, tvp); + num_ready = MHD_SYS_select_ (maxsock + 1, + &rs, + &ws, + NULL, + tvp); if (num_ready < 0) { const int err = MHD_socket_get_error_(); + if (MHD_SCKT_ERR_IS_EINTR_(err)) continue; #ifdef HAVE_MESSAGES @@ -960,8 +1286,10 @@ thread_main_handle_connection (void *data) #endif if (MHD_NO == call_handlers (con, - FD_ISSET (con->socket_fd, &rs), - FD_ISSET (con->socket_fd, &ws), + FD_ISSET (con->socket_fd, + &rs), + FD_ISSET (con->socket_fd, + &ws), MHD_NO)) goto exit; } @@ -969,7 +1297,9 @@ thread_main_handle_connection (void *data) else { /* use poll */ - memset (&p, 0, sizeof (p)); + memset (&p, + 0, + sizeof (p)); p[0].fd = con->socket_fd; switch (con->event_loop_info) { @@ -1004,11 +1334,11 @@ thread_main_handle_connection (void *data) #endif if (MHD_sys_poll_ (p, #if WINDOWS - 1 + extra_slot, + 1 + extra_slot, #else - 1, + 1, #endif - (NULL == tvp) ? -1 : tv.tv_sec * 1000) < 0) + (NULL == tvp) ? -1 : tv.tv_sec * 1000) < 0) { if (MHD_SCKT_LAST_ERR_IS_(MHD_SCKT_EINTR_)) continue; @@ -1033,6 +1363,11 @@ thread_main_handle_connection (void *data) goto exit; } #endif + if (MHD_CONNECTION_UPGRADE == con->state) + { + thread_main_connection_upgrade (con); + break; + } } if (MHD_CONNECTION_IN_CLEANUP != con->state) { @@ -1054,14 +1389,15 @@ exit: con->response = NULL; } - if (NULL != con->daemon->notify_connection) - con->daemon->notify_connection (con->daemon->notify_connection_cls, + if (NULL != daemon->notify_connection) + con->daemon->notify_connection (daemon->notify_connection_cls, con, &con->socket_context, MHD_CONNECTION_NOTIFY_CLOSED); if (MHD_INVALID_SOCKET != con->socket_fd) { - shutdown (con->socket_fd, SHUT_WR); + shutdown (con->socket_fd, + SHUT_WR); if (0 != MHD_socket_close_ (con->socket_fd)) MHD_PANIC ("close failed\n"); con->socket_fd = MHD_INVALID_SOCKET; @@ -1433,7 +1769,7 @@ internal_add_connection (struct MHD_Daemon *daemon, { /* in turbo mode, we assume that non-blocking was already set by 'accept4' or whoever calls 'MHD_add_connection' */ - if (!MHD_socket_nonblocking_ (connection->socket_fd)) + if (! MHD_socket_nonblocking_ (connection->socket_fd)) { #ifdef HAVE_MESSAGES MHD_DLOG (connection->daemon, @@ -2168,115 +2504,6 @@ MHD_get_timeout (struct MHD_Daemon *daemon, } -#if HTTPS_SUPPORT -/** - * Performs bi-directional forwarding on upgraded HTTPS connections - * based on the readyness state stored in the @a urh handle. - * - * @param urh handle to process - */ -static void -process_urh (struct MHD_UpgradeResponseHandle *urh) -{ - /* handle reading from TLS client and writing to application */ - if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) && - (urh->in_buffer_off < urh->in_buffer_size) ) - { - ssize_t res; - - res = gnutls_record_recv (urh->connection->tls_session, - &urh->in_buffer[urh->in_buffer_off], - urh->in_buffer_size - urh->in_buffer_off); - if ( (GNUTLS_E_AGAIN == res) || - (GNUTLS_E_INTERRUPTED == res) ) - { - urh->app.celi &= ~MHD_EPOLL_STATE_READ_READY; - } - else if (res > 0) - { - urh->in_buffer_off += res; - } - } - if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) && - (urh->in_buffer_off > 0) ) - { - size_t res; - - res = write (urh->mhd.socket, - urh->in_buffer, - urh->in_buffer_off); - if (-1 == res) - { - /* FIXME: differenciate by errno? */ - urh->mhd.celi &= ~MHD_EPOLL_STATE_WRITE_READY; - } - else - { - if (urh->in_buffer_off != res) - { - memmove (urh->in_buffer, - &urh->in_buffer[res], - urh->in_buffer_off - res); - urh->in_buffer_off -= res; - } - else - { - urh->in_buffer_off = 0; - } - } - } - - /* handle reading from application and writing to HTTPS client */ - if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) && - (urh->out_buffer_off < urh->out_buffer_size) ) - { - size_t res; - - res = read (urh->mhd.socket, - &urh->out_buffer[urh->out_buffer_off], - urh->out_buffer_size - urh->out_buffer_off); - if (-1 == res) - { - /* FIXME: differenciate by errno? */ - urh->mhd.celi &= ~MHD_EPOLL_STATE_READ_READY; - } - else - { - urh->out_buffer_off += res; - } - } - if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) && - (urh->out_buffer_off > 0) ) - { - ssize_t res; - - res = gnutls_record_send (urh->connection->tls_session, - urh->out_buffer, - urh->out_buffer_off); - if ( (GNUTLS_E_AGAIN == res) || - (GNUTLS_E_INTERRUPTED == res) ) - { - urh->app.celi &= ~MHD_EPOLL_STATE_WRITE_READY; - } - else if (res > 0) - { - if (urh->out_buffer_off != res) - { - memmove (urh->out_buffer, - &urh->out_buffer[res], - urh->out_buffer_off - res); - urh->out_buffer_off -= res; - } - else - { - urh->out_buffer_off = 0; - } - } - } -} -#endif - - /** * Run webserver operations. This method should be called by clients * in combination with #MHD_get_fdset if the client-controlled select @@ -2362,14 +2589,9 @@ MHD_run_from_select (struct MHD_Daemon *daemon, for (urh = daemon->urh_head; NULL != urh; urh = urh->next) { /* update urh state based on select() output */ - if (FD_ISSET (urh->connection->socket_fd, read_fd_set)) - urh->app.celi |= MHD_EPOLL_STATE_READ_READY; - if (FD_ISSET (urh->connection->socket_fd, write_fd_set)) - urh->app.celi |= MHD_EPOLL_STATE_WRITE_READY; - if (FD_ISSET (urh->mhd.socket, read_fd_set)) - urh->mhd.celi |= MHD_EPOLL_STATE_READ_READY; - if (FD_ISSET (urh->mhd.socket, write_fd_set)) - urh->mhd.celi |= MHD_EPOLL_STATE_WRITE_READY; + urh_from_fdset (urh, + read_fd_set, + write_fd_set); /* call generic forwarding function for passing data */ process_urh (urh); } diff --git a/src/microhttpd/internal.h b/src/microhttpd/internal.h index 00843729..7a1e58e1 100644 --- a/src/microhttpd/internal.h +++ b/src/microhttpd/internal.h @@ -480,7 +480,14 @@ enum MHD_CONNECTION_STATE * Connection was "upgraded" and socket is now under the * control of the application. */ - MHD_CONNECTION_UPGRADE = MHD_TLS_CONNECTION_INIT + 1 + MHD_CONNECTION_UPGRADE = MHD_TLS_CONNECTION_INIT + 1, + + /** + * Connection was "upgraded" and subsequently closed + * by the application. We now need to do our own + * internal cleanup. + */ + MHD_CONNECTION_UPGRADE_CLOSED = MHD_TLS_CONNECTION_INIT + 1 }; @@ -854,6 +861,23 @@ struct MHD_Connection #if HTTPS_SUPPORT /** + * If this connection was upgraded and if we are using + * #MHD_USE_THREAD_PER_CONNECTION, this points to the + * upgrade response details such that the + * #thread_main_connection_upgrade()-logic can perform + * the bi-directional forwarding. + */ + struct MHD_UpgradeResponseHandle *urh; + + /** + * If this connection was upgraded and if we are using + * #MHD_USE_THREAD_PER_CONNECTION without encryption, + * this points to the semaphore we use to signal termination + * to the thread handling the connection. + */ + struct MHD_Semaphore *upgrade_sem; + + /** * State required for HTTPS/SSL/TLS support. */ gnutls_session_t tls_session; diff --git a/src/microhttpd/mhd_locks.h b/src/microhttpd/mhd_locks.h index cf10c0d1..1d8376f0 100644 --- a/src/microhttpd/mhd_locks.h +++ b/src/microhttpd/mhd_locks.h @@ -22,8 +22,9 @@ * @file microhttpd/mhd_locks.h * @brief Header for platform-independent locks abstraction * @author Karlson2k (Evgeny Grin) + * @author Christian Grothoff * - * Provides basic abstraction for locks and mutex. + * Provides basic abstraction for locks/mutex and semaphores. * Any functions can be implemented as macro on some platforms * unless explicitly marked otherwise. * Any function argument can be skipped in macro, so avoid @@ -147,4 +148,48 @@ #define MHD_mutex_unlock_(pmutex) (LeaveCriticalSection((pmutex)), !0) #endif + +/** + * A semaphore. + */ +struct MHD_Semaphore; + + +/** + * Create a semaphore with an initial counter of @a init + * + * @param init initial counter + * @return the semaphore, NULL on error + */ +struct MHD_Semaphore * +MHD_semaphore_create (unsigned int init); + + +/** + * Count down the semaphore, block if necessary. + * + * @param sem semaphore to count down. + */ +void +MHD_semaphore_down (struct MHD_Semaphore *sem); + + +/** + * Increment the semaphore. + * + * @param sem semaphore to increment. + */ +void +MHD_semaphore_up (struct MHD_Semaphore *sem); + + +/** + * Destroys the semaphore. + * + * @param sem semaphore to destroy. + */ +void +MHD_semaphore_destroy (struct MHD_Semaphore *sem); + + #endif /* ! MHD_LOCKS_H */ diff --git a/src/microhttpd/mhd_sem.c b/src/microhttpd/mhd_sem.c new file mode 100644 index 00000000..fdd5dbe4 --- /dev/null +++ b/src/microhttpd/mhd_sem.c @@ -0,0 +1,138 @@ +/* + This file is part of libmicrohttpd + Copyright (C) 2016 Christian Grothoff + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +*/ + +/** + * @file microhttpd/mhd_sem.c + * @brief implementation of semaphores + * @author Christian Grothoff + */ +#include "internal.h" +#include "mhd_locks.h" + +/** + * A semaphore. + */ +struct MHD_Semaphore +{ + /** + * Mutex we use internally. + */ + pthread_mutex_t mutex; + + /** + * Condition variable used to implement the semaphore. + */ + pthread_cond_t cv; + + /** + * Current value of the semaphore. + */ + unsigned int counter; +}; + + +/** + * Create a semaphore with an initial counter of @a init + * + * @param init initial counter + * @return the semaphore, NULL on error + */ +struct MHD_Semaphore * +MHD_semaphore_create (unsigned int init) +{ + struct MHD_Semaphore *sem; + + sem = malloc (sizeof (struct MHD_Semaphore)); + if (NULL == sem) + return NULL; + sem->counter = init; + if (0 != pthread_mutex_init (&sem->mutex, + NULL)) + { + free (sem); + return NULL; + } + if (0 != pthread_cond_init (&sem->cv, + NULL)) + { + (void) pthread_mutex_destroy (&sem->mutex); + free (sem); + return NULL; + } + return sem; +} + + +/** + * Count down the semaphore, block if necessary. + * + * @param sem semaphore to count down. + */ +void +MHD_semaphore_down (struct MHD_Semaphore *sem) +{ + if (! pthread_mutex_lock (&sem->mutex)) + MHD_PANIC ("pthread_mutex_lock for semaphore failed\n"); + while (0 == sem->counter) + { + if (0 != pthread_cond_wait (&sem->cv, + &sem->mutex)) + MHD_PANIC ("pthread_cond_wait failed\n"); + } + sem->counter--; + if (! pthread_mutex_unlock (&sem->mutex)) + MHD_PANIC ("pthread_mutex_unlock for semaphore failed\n"); +} + + +/** + * Increment the semaphore. + * + * @param sem semaphore to increment. + */ +void +MHD_semaphore_up (struct MHD_Semaphore *sem) +{ + if (! pthread_mutex_lock (&sem->mutex)) + MHD_PANIC ("pthread_mutex_lock for semaphore failed\n"); + sem->counter++; + pthread_cond_signal (&sem->cv); + if (! pthread_mutex_unlock (&sem->mutex)) + MHD_PANIC ("pthread_mutex_unlock for semaphore failed\n"); +} + + +/** + * Destroys the semaphore. + * + * @param sem semaphore to destroy. + */ +void +MHD_semaphore_destroy (struct MHD_Semaphore *sem) +{ + if (0 != pthread_cond_destroy (&sem->cv)) + MHD_PANIC ("pthread_cond_destroy failed\n"); + if (0 != pthread_mutex_destroy (&sem->mutex)) + MHD_PANIC ("pthread_mutex_destroy failed\n"); + free (sem); +} + + +/* end of mhd_sem.c */ diff --git a/src/microhttpd/response.c b/src/microhttpd/response.c index 670be983..ca729765 100644 --- a/src/microhttpd/response.c +++ b/src/microhttpd/response.c @@ -605,7 +605,23 @@ MHD_upgrade_action (struct MHD_UpgradeResponseHandle *urh, switch (action) { case MHD_UPGRADE_ACTION_CLOSE: + /* transition to special 'closed' state for start of cleanup */ + connection->state = MHD_CONNECTION_UPGRADE_CLOSED; /* Application is done with this connection, tear it down! */ + if (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION) ) + { + if (0 == (daemon->options & MHD_USE_SSL) ) + { + /* just need to signal the thread that we are done */ + MHD_semaphore_up (connection->upgrade_sem); + } + else + { + /* signal thread by shutdown() of 'app' socket */ + shutdown (urh->app.socket, SHUT_RDWR); + } + return MHD_YES; + } #if HTTPS_SUPPORT if (0 != (daemon->options & MHD_USE_SSL) ) { @@ -658,6 +674,9 @@ MHD_upgrade_action (struct MHD_UpgradeResponseHandle *urh, case MHD_UPGRADE_ACTION_CORK: /* FIXME: not implemented */ return MHD_NO; + case MHD_UPGRADE_ACTION_FLUSH: + /* FIXME: not implemented */ + return MHD_NO; default: /* we don't understand this one */ return MHD_NO; @@ -784,11 +803,6 @@ MHD_response_execute_upgrade_ (struct MHD_Response *response, rbo, urh->app.socket, urh); - /* As far as MHD is concerned, this connection is - suspended; it will be resumed once we are done - in the #MHD_upgrade_action() function */ - MHD_suspend_connection (connection); - /* Launch IO processing by the event loop */ if (0 != (daemon->options & MHD_USE_EPOLL)) { @@ -846,12 +860,25 @@ MHD_response_execute_upgrade_ (struct MHD_Response *response, return MHD_NO; } } - - /* This takes care of most event loops: simply add to DLL */ - DLL_insert (daemon->urh_head, - daemon->urh_tail, - urh); - /* FIXME: None of the above will not work (yet) for thread-per-connection processing */ + if (0 == (daemon->options & MHD_USE_THREAD_PER_CONNECTION) ) + { + /* As far as MHD's event loops are concerned, this connection + is suspended; it will be resumed once we are done in the + #MHD_upgrade_action() function */ + MHD_suspend_connection (connection); + /* This takes care of further processing for most event loops: + simply add to DLL for bi-direcitonal processing */ + DLL_insert (daemon->urh_head, + daemon->urh_tail, + urh); + } + else + { + /* Our caller will set 'connection->state' to + MHD_CONNECTION_UPGRADE, thereby triggering the main method + of the thread to switch to bi-directional forwarding. */ + connection->urh = urh; + } return MHD_YES; } urh->app.socket = MHD_INVALID_SOCKET; @@ -864,10 +891,31 @@ MHD_response_execute_upgrade_ (struct MHD_Response *response, rbo, connection->socket_fd, urh); - /* As far as MHD is concerned, this connection is - suspended; it will be resumed once we are done - in the #MHD_upgrade_action() function */ - MHD_suspend_connection (connection); + if (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION) ) + { + /* Need to give the thread something to block on... */ + connection->upgrade_sem = MHD_semaphore_create (0); + if (NULL == connection->upgrade_sem) + { +#ifdef HAVE_MESSAGES + MHD_DLOG (daemon, + "Failed to create semaphore for upgrade handling\n"); +#endif + MHD_connection_close_ (connection, + MHD_REQUEST_TERMINATED_WITH_ERROR); + return MHD_NO; + } + /* Our caller will set 'connection->state' to + MHD_CONNECTION_UPGRADE, thereby triggering the + main method of the thread to block on the semaphore. */ + } + else + { + /* As far as MHD's event loops are concerned, this connection is + suspended; it will be resumed once we are done in the + #MHD_upgrade_action() function */ + MHD_suspend_connection (connection); + } return MHD_YES; } |