summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-09-03 09:56:30 +0000
committerChristian Grothoff <christian@grothoff.org>2016-09-03 09:56:30 +0000
commit6fa19ae05f226451db78316919908c44d6444ac4 (patch)
treed3f2520a441b05d12e2001a3d3e3bbc95fe65543
parent5395e6196d8f469f95d904eff8afd731a9cceb29 (diff)
implementing 'Connection: upgrade' for thread-per-connection modes, but untested
-rw-r--r--ChangeLog4
-rw-r--r--src/include/microhttpd.h16
-rw-r--r--src/microhttpd/Makefile.am2
-rw-r--r--src/microhttpd/connection.c4
-rw-r--r--src/microhttpd/daemon.c546
-rw-r--r--src/microhttpd/internal.h26
-rw-r--r--src/microhttpd/mhd_locks.h47
-rw-r--r--src/microhttpd/mhd_sem.c138
-rw-r--r--src/microhttpd/response.c78
9 files changed, 677 insertions, 184 deletions
diff --git a/ChangeLog b/ChangeLog
index a606d2fe..a918505e 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+Sat Sep 3 11:56:20 CEST 2016
+ Adding logic for handling HTTP "Upgrade" in thread-per-connection
+ mode. Also still untested. -CG
+
Sat Aug 27 21:01:43 CEST 2016
Adding a few extra safety checks around HTTP "Upgrade"
(against wrong uses of API), and a testcase. -CG
diff --git a/src/include/microhttpd.h b/src/include/microhttpd.h
index 7347447c..afb97571 100644
--- a/src/include/microhttpd.h
+++ b/src/include/microhttpd.h
@@ -1,6 +1,6 @@
/*
This file is part of libmicrohttpd
- Copyright (C) 2006-2015 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2006-2016 Christian Grothoff (and other contributing authors)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
@@ -2261,7 +2261,19 @@ enum MHD_UpgradeAction
* NOTE: it is unclear if we want to have this in the
* "final" API, this is just an idea right now.
*/
- MHD_UPGRADE_ACTION_CORK
+ MHD_UPGRADE_ACTION_CORK,
+
+ /**
+ * Try to "flush" our write buffer (to the network), returning
+ * #MHD_YES on success (buffer is empty) and #MHD_NO on failure
+ * (unsent bytes remain in buffers). This option is useful if
+ * the application wants to make sure that all data has been sent,
+ * which may be a good idea before closing the socket.
+ *
+ * NOTE: it is unclear if we want to have this in the
+ * "final" API, this is just an idea right now.
+ */
+ MHD_UPGRADE_ACTION_FLUSH
};
diff --git a/src/microhttpd/Makefile.am b/src/microhttpd/Makefile.am
index 6973af8a..9a32f598 100644
--- a/src/microhttpd/Makefile.am
+++ b/src/microhttpd/Makefile.am
@@ -63,7 +63,7 @@ libmicrohttpd_la_SOURCES = \
sysfdsetsize.c sysfdsetsize.h \
mhd_str.c mhd_str.h \
mhd_threads.c mhd_threads.h \
- mhd_locks.h \
+ mhd_locks.h mhd_sem.c \
mhd_sockets.c mhd_sockets.h \
mhd_itc.c mhd_itc.h \
mhd_compat.c mhd_compat.h \
diff --git a/src/microhttpd/connection.c b/src/microhttpd/connection.c
index e3880e8a..0c21bc57 100644
--- a/src/microhttpd/connection.c
+++ b/src/microhttpd/connection.c
@@ -413,7 +413,8 @@ MHD_set_connection_value (struct MHD_Connection *connection,
*/
const char *
MHD_lookup_connection_value (struct MHD_Connection *connection,
- enum MHD_ValueKind kind, const char *key)
+ enum MHD_ValueKind kind,
+ const char *key)
{
struct MHD_HTTP_Header *pos;
@@ -2772,7 +2773,6 @@ MHD_connection_handle_idle (struct MHD_Connection *connection)
/* Buffering for flushable socket was already enabled*/
if (MHD_NO == socket_flush_possible (connection))
socket_start_no_buffering (connection);
-
break;
}
/* not ready, no socket action */
diff --git a/src/microhttpd/daemon.c b/src/microhttpd/daemon.c
index 791e17d4..a298a1a4 100644
--- a/src/microhttpd/daemon.c
+++ b/src/microhttpd/daemon.c
@@ -636,6 +636,79 @@ MHD_get_fdset (struct MHD_Daemon *daemon,
/**
+ * Obtain the select() file descriptor sets for the
+ * given @a urh.
+ *
+ * @param urh upgrade handle to wait for
+ * @param[out] rs read set to initialize
+ * @param[out] ws write set to initialize
+ * @param[out] max_fd maximum FD to update
+ * @param fd_setsize value of FD_SETSIZE
+ * @return #MHD_YES on success, #MHD_NO on error
+ */
+static int
+urh_to_fdset (struct MHD_UpgradeResponseHandle *urh,
+ fd_set *rs,
+ fd_set *ws,
+ MHD_socket *max_fd,
+ unsigned int fd_setsize)
+{
+ if ( (0 == (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) &&
+ (! MHD_add_to_fd_set_ (urh->mhd.socket,
+ rs,
+ max_fd,
+ fd_setsize)) )
+ return MHD_NO;
+ if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) &&
+ (! MHD_add_to_fd_set_ (urh->mhd.socket,
+ ws,
+ max_fd,
+ fd_setsize)) )
+ return MHD_NO;
+ if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) &&
+ (! MHD_add_to_fd_set_ (urh->connection->socket_fd,
+ rs,
+ max_fd,
+ fd_setsize)) )
+ return MHD_NO;
+ if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) &&
+ (! MHD_add_to_fd_set_ (urh->connection->socket_fd,
+ ws,
+ max_fd,
+ fd_setsize)) )
+ return MHD_NO;
+ return MHD_YES;
+}
+
+
+/**
+ * Update the @a urh based on the ready FDs in the @a rs and @a ws.
+ *
+ * @param urh upgrade handle to update
+ * @param rs read result from select()
+ * @param ws write result from select()
+ */
+static void
+urh_from_fdset (struct MHD_UpgradeResponseHandle *urh,
+ const fd_set *rs,
+ const fd_set *ws)
+{
+ if (FD_ISSET (urh->connection->socket_fd,
+ rs))
+ urh->app.celi |= MHD_EPOLL_STATE_READ_READY;
+ if (FD_ISSET (urh->connection->socket_fd,
+ ws))
+ urh->app.celi |= MHD_EPOLL_STATE_WRITE_READY;
+ if (FD_ISSET (urh->mhd.socket,
+ rs))
+ urh->mhd.celi |= MHD_EPOLL_STATE_READ_READY;
+ if (FD_ISSET (urh->mhd.socket,
+ ws))
+ urh->mhd.celi |= MHD_EPOLL_STATE_WRITE_READY;
+}
+
+
+/**
* Obtain the `select()` sets for this daemon.
* Daemon's FDs will be added to fd_sets. To get only
* daemon FDs in fd_sets, call FD_ZERO for each fd_set
@@ -733,29 +806,12 @@ MHD_get_fdset2 (struct MHD_Daemon *daemon,
}
for (urh = daemon->urh_head; NULL != urh; urh = urh->next)
{
- if ( (0 == (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) &&
- (! MHD_add_to_fd_set_ (urh->mhd.socket,
- read_fd_set,
- max_fd,
- fd_setsize)) )
- result = MHD_NO;
- if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) &&
- (! MHD_add_to_fd_set_ (urh->mhd.socket,
- write_fd_set,
- max_fd,
- fd_setsize)) )
- result = MHD_NO;
- if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) &&
- (! MHD_add_to_fd_set_ (urh->connection->socket_fd,
- read_fd_set,
- max_fd,
- fd_setsize)) )
- result = MHD_NO;
- if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) &&
- (! MHD_add_to_fd_set_ (urh->connection->socket_fd,
- write_fd_set,
- max_fd,
- fd_setsize)) )
+ if (MHD_NO ==
+ urh_to_fdset (urh,
+ read_fd_set,
+ write_fd_set,
+ max_fd,
+ fd_setsize))
result = MHD_NO;
}
#if DEBUG_CONNECT
@@ -825,6 +881,251 @@ call_handlers (struct MHD_Connection *con,
}
+#if HTTPS_SUPPORT
+/**
+ * Performs bi-directional forwarding on upgraded HTTPS connections
+ * based on the readyness state stored in the @a urh handle.
+ *
+ * @param urh handle to process
+ */
+static void
+process_urh (struct MHD_UpgradeResponseHandle *urh)
+{
+ /* handle reading from TLS client and writing to application */
+ if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) &&
+ (urh->in_buffer_off < urh->in_buffer_size) )
+ {
+ ssize_t res;
+
+ res = gnutls_record_recv (urh->connection->tls_session,
+ &urh->in_buffer[urh->in_buffer_off],
+ urh->in_buffer_size - urh->in_buffer_off);
+ if ( (GNUTLS_E_AGAIN == res) ||
+ (GNUTLS_E_INTERRUPTED == res) )
+ {
+ urh->app.celi &= ~MHD_EPOLL_STATE_READ_READY;
+ }
+ else if (res > 0)
+ {
+ urh->in_buffer_off += res;
+ }
+ }
+ if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) &&
+ (urh->in_buffer_off > 0) )
+ {
+ size_t res;
+
+ res = write (urh->mhd.socket,
+ urh->in_buffer,
+ urh->in_buffer_off);
+ if (-1 == res)
+ {
+ /* FIXME: differenciate by errno? */
+ urh->mhd.celi &= ~MHD_EPOLL_STATE_WRITE_READY;
+ }
+ else
+ {
+ if (urh->in_buffer_off != res)
+ {
+ memmove (urh->in_buffer,
+ &urh->in_buffer[res],
+ urh->in_buffer_off - res);
+ urh->in_buffer_off -= res;
+ }
+ else
+ {
+ urh->in_buffer_off = 0;
+ }
+ }
+ }
+
+ /* handle reading from application and writing to HTTPS client */
+ if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) &&
+ (urh->out_buffer_off < urh->out_buffer_size) )
+ {
+ size_t res;
+
+ res = read (urh->mhd.socket,
+ &urh->out_buffer[urh->out_buffer_off],
+ urh->out_buffer_size - urh->out_buffer_off);
+ if (-1 == res)
+ {
+ /* FIXME: differenciate by errno? */
+ urh->mhd.celi &= ~MHD_EPOLL_STATE_READ_READY;
+ }
+ else
+ {
+ urh->out_buffer_off += res;
+ }
+ }
+ if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) &&
+ (urh->out_buffer_off > 0) )
+ {
+ ssize_t res;
+
+ res = gnutls_record_send (urh->connection->tls_session,
+ urh->out_buffer,
+ urh->out_buffer_off);
+ if ( (GNUTLS_E_AGAIN == res) ||
+ (GNUTLS_E_INTERRUPTED == res) )
+ {
+ urh->app.celi &= ~MHD_EPOLL_STATE_WRITE_READY;
+ }
+ else if (res > 0)
+ {
+ if (urh->out_buffer_off != res)
+ {
+ memmove (urh->out_buffer,
+ &urh->out_buffer[res],
+ urh->out_buffer_off - res);
+ urh->out_buffer_off -= res;
+ }
+ else
+ {
+ urh->out_buffer_off = 0;
+ }
+ }
+ }
+}
+#endif
+
+
+/**
+ * Main function of the thread that handles an individual connection
+ * after it was "upgraded" when #MHD_USE_THREAD_PER_CONNECTION is set.
+ *
+ * @param con the connection this thread will handle
+ */
+static void
+thread_main_connection_upgrade (struct MHD_Connection *con)
+{
+ struct MHD_Daemon *daemon = con->daemon;
+
+ if (0 == (daemon->options & MHD_USE_SSL))
+ {
+ /* Here, we need to block until the application
+ signals us that it is done with the socket */
+ MHD_semaphore_down (con->upgrade_sem);
+ MHD_semaphore_destroy (con->upgrade_sem);
+ con->upgrade_sem = NULL;
+ return;
+ }
+#if HTTPS_SUPPORT
+ {
+ struct MHD_UpgradeResponseHandle *urh = con->urh;
+
+ /* Here, we need to bi-directionally forward
+ until the application tells us that it is done
+ with the socket; */
+ if (0 == (daemon->options & MHD_USE_POLL))
+ {
+ while (MHD_CONNECTION_UPGRADE == con->state)
+ {
+ /* use select */
+ fd_set rs;
+ fd_set ws;
+ MHD_socket max_fd;
+ int num_ready;
+ int result;
+
+ FD_ZERO (&rs);
+ FD_ZERO (&ws);
+ max_fd = MHD_INVALID_SOCKET;
+ result = urh_to_fdset (urh,
+ &rs,
+ &ws,
+ &max_fd,
+ FD_SETSIZE);
+ if (MHD_NO == result)
+ {
+#ifdef HAVE_MESSAGES
+ MHD_DLOG (con->daemon,
+ "Error preparing select\n");
+#endif
+ break;
+ }
+ num_ready = MHD_SYS_select_ (max_fd + 1,
+ &rs,
+ &ws,
+ NULL,
+ NULL);
+ if (num_ready < 0)
+ {
+ const int err = MHD_socket_get_error_();
+
+ if (MHD_SCKT_ERR_IS_EINTR_(err))
+ continue;
+#ifdef HAVE_MESSAGES
+ MHD_DLOG (con->daemon,
+ "Error during select (%d): `%s'\n",
+ err,
+ MHD_socket_strerr_ (err));
+#endif
+ break;
+ }
+ urh_from_fdset (urh,
+ &rs,
+ &ws);
+ process_urh (urh);
+ }
+ }
+#ifdef HAVE_POLL
+ else
+ {
+ /* use poll() */
+ struct pollfd p[2];
+ const unsigned int timeout = UINT_MAX;
+
+ p[0].fd = urh->connection->socket_fd;
+ p[1].fd = urh->mhd.socket;
+ while (MHD_CONNECTION_UPGRADE == con->state)
+ {
+ if (0 == (MHD_EPOLL_STATE_READ_READY & urh->app.celi))
+ p[0].events |= POLLIN;
+ if (0 == (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi))
+ p[0].events |= POLLOUT;
+ if (0 == (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi))
+ p[1].events |= POLLIN;
+ if (0 == (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi))
+ p[1].events |= POLLOUT;
+
+ if (MHD_sys_poll_ (p,
+ 2,
+ timeout) < 0)
+ {
+ const int err = MHD_socket_get_error_ ();
+
+ if (MHD_SCKT_ERR_IS_EINTR_ (err))
+ continue;
+#ifdef HAVE_MESSAGES
+ MHD_DLOG (con->daemon,
+ "Error during poll: `%s'\n",
+ MHD_socket_strerr_ (err));
+#endif
+ break;
+ }
+ if (0 != (p[0].revents & POLLIN))
+ urh->app.celi |= MHD_EPOLL_STATE_READ_READY;
+ if (0 != (p[0].revents & POLLOUT))
+ urh->app.celi |= MHD_EPOLL_STATE_WRITE_READY;
+ if (0 != (p[1].revents & POLLIN))
+ urh->mhd.celi |= MHD_EPOLL_STATE_READ_READY;
+ if (0 != (p[1].revents & POLLOUT))
+ urh->mhd.celi |= MHD_EPOLL_STATE_WRITE_READY;
+ process_urh (urh);
+ }
+ }
+ /* end POLL */
+#endif
+ /* end HTTPS */
+#else
+ /* HTTPS option set, but compiled without HTTPS */
+ MHD_PANIC ("This should not be possible\n");
+#endif
+ }
+}
+
+
/**
* Main function of the thread that handles an individual
* connection when #MHD_USE_THREAD_PER_CONNECTION is set.
@@ -836,6 +1137,7 @@ static MHD_THRD_RTRN_TYPE_ MHD_THRD_CALL_SPEC_
thread_main_handle_connection (void *data)
{
struct MHD_Connection *con = data;
+ struct MHD_Daemon *daemon = con->daemon;
int num_ready;
fd_set rs;
fd_set ws;
@@ -844,7 +1146,7 @@ thread_main_handle_connection (void *data)
struct timeval *tvp;
time_t now;
#if WINDOWS
- MHD_pipe spipe = con->daemon->wpipe[0];
+ MHD_pipe spipe = daemon->wpipe[0];
#ifdef HAVE_POLL
int extra_slot;
#endif /* HAVE_POLL */
@@ -855,11 +1157,13 @@ thread_main_handle_connection (void *data)
#ifdef HAVE_POLL
struct pollfd p[1 + EXTRA_SLOTS];
#endif
+#undef EXTRA_SLOTS
- while ( (MHD_YES != con->daemon->shutdown) &&
+ while ( (MHD_YES != daemon->shutdown) &&
(MHD_CONNECTION_CLOSED != con->state) )
{
- unsigned const int timeout = con->daemon->connection_timeout;
+ const unsigned int timeout = daemon->connection_timeout;
+
tvp = NULL;
#if HTTPS_SUPPORT
if (MHD_YES == con->tls_read_ready)
@@ -870,7 +1174,8 @@ thread_main_handle_connection (void *data)
tvp = &tv;
}
#endif
- if (NULL == tvp && timeout > 0)
+ if ( (NULL == tvp) &&
+ (timeout > 0) )
{
now = MHD_monotonic_sec_counter();
if (now - con->last_activity > timeout)
@@ -884,35 +1189,48 @@ thread_main_handle_connection (void *data)
if (seconds_left > TIMEVAL_TV_SEC_MAX)
tv.tv_sec = TIMEVAL_TV_SEC_MAX;
else
- tv.tv_sec = (_MHD_TIMEVAL_TV_SEC_TYPE)seconds_left;
+ tv.tv_sec = (_MHD_TIMEVAL_TV_SEC_TYPE) seconds_left;
#endif /* _WIN32 */
}
tv.tv_usec = 0;
tvp = &tv;
}
- if (0 == (con->daemon->options & MHD_USE_POLL))
+ if (0 == (daemon->options & MHD_USE_POLL))
{
/* use select */
int err_state = 0;
+
FD_ZERO (&rs);
FD_ZERO (&ws);
maxsock = MHD_INVALID_SOCKET;
switch (con->event_loop_info)
{
case MHD_EVENT_LOOP_INFO_READ:
- if (!MHD_add_to_fd_set_ (con->socket_fd, &rs, &maxsock, FD_SETSIZE))
+ if (! MHD_add_to_fd_set_ (con->socket_fd,
+ &rs,
+ &maxsock,
+ FD_SETSIZE))
err_state = 1;
break;
case MHD_EVENT_LOOP_INFO_WRITE:
- if (!MHD_add_to_fd_set_ (con->socket_fd, &ws, &maxsock, FD_SETSIZE))
+ if (! MHD_add_to_fd_set_ (con->socket_fd,
+ &ws,
+ &maxsock,
+ FD_SETSIZE))
err_state = 1;
if ( (con->read_buffer_size > con->read_buffer_offset) &&
- (!MHD_add_to_fd_set_ (con->socket_fd, &rs, &maxsock, FD_SETSIZE)) )
+ (! MHD_add_to_fd_set_ (con->socket_fd,
+ &rs,
+ &maxsock,
+ FD_SETSIZE)) )
err_state = 1;
break;
case MHD_EVENT_LOOP_INFO_BLOCK:
if ( (con->read_buffer_size > con->read_buffer_offset) &&
- (!MHD_add_to_fd_set_ (con->socket_fd, &rs, &maxsock, FD_SETSIZE)) )
+ (! MHD_add_to_fd_set_ (con->socket_fd,
+ &rs,
+ &maxsock,
+ FD_SETSIZE)) )
err_state = 1;
tv.tv_sec = 0;
tv.tv_usec = 0;
@@ -925,7 +1243,10 @@ thread_main_handle_connection (void *data)
#if WINDOWS
if (MHD_INVALID_PIPE_ != spipe)
{
- if (!MHD_add_to_fd_set_ (spipe, &rs, &maxsock, FD_SETSIZE))
+ if (! MHD_add_to_fd_set_ (spipe,
+ &rs,
+ &maxsock,
+ FD_SETSIZE))
err_state = 1;
}
#endif
@@ -938,10 +1259,15 @@ thread_main_handle_connection (void *data)
goto exit;
}
- num_ready = MHD_SYS_select_ (maxsock + 1, &rs, &ws, NULL, tvp);
+ num_ready = MHD_SYS_select_ (maxsock + 1,
+ &rs,
+ &ws,
+ NULL,
+ tvp);
if (num_ready < 0)
{
const int err = MHD_socket_get_error_();
+
if (MHD_SCKT_ERR_IS_EINTR_(err))
continue;
#ifdef HAVE_MESSAGES
@@ -960,8 +1286,10 @@ thread_main_handle_connection (void *data)
#endif
if (MHD_NO ==
call_handlers (con,
- FD_ISSET (con->socket_fd, &rs),
- FD_ISSET (con->socket_fd, &ws),
+ FD_ISSET (con->socket_fd,
+ &rs),
+ FD_ISSET (con->socket_fd,
+ &ws),
MHD_NO))
goto exit;
}
@@ -969,7 +1297,9 @@ thread_main_handle_connection (void *data)
else
{
/* use poll */
- memset (&p, 0, sizeof (p));
+ memset (&p,
+ 0,
+ sizeof (p));
p[0].fd = con->socket_fd;
switch (con->event_loop_info)
{
@@ -1004,11 +1334,11 @@ thread_main_handle_connection (void *data)
#endif
if (MHD_sys_poll_ (p,
#if WINDOWS
- 1 + extra_slot,
+ 1 + extra_slot,
#else
- 1,
+ 1,
#endif
- (NULL == tvp) ? -1 : tv.tv_sec * 1000) < 0)
+ (NULL == tvp) ? -1 : tv.tv_sec * 1000) < 0)
{
if (MHD_SCKT_LAST_ERR_IS_(MHD_SCKT_EINTR_))
continue;
@@ -1033,6 +1363,11 @@ thread_main_handle_connection (void *data)
goto exit;
}
#endif
+ if (MHD_CONNECTION_UPGRADE == con->state)
+ {
+ thread_main_connection_upgrade (con);
+ break;
+ }
}
if (MHD_CONNECTION_IN_CLEANUP != con->state)
{
@@ -1054,14 +1389,15 @@ exit:
con->response = NULL;
}
- if (NULL != con->daemon->notify_connection)
- con->daemon->notify_connection (con->daemon->notify_connection_cls,
+ if (NULL != daemon->notify_connection)
+ con->daemon->notify_connection (daemon->notify_connection_cls,
con,
&con->socket_context,
MHD_CONNECTION_NOTIFY_CLOSED);
if (MHD_INVALID_SOCKET != con->socket_fd)
{
- shutdown (con->socket_fd, SHUT_WR);
+ shutdown (con->socket_fd,
+ SHUT_WR);
if (0 != MHD_socket_close_ (con->socket_fd))
MHD_PANIC ("close failed\n");
con->socket_fd = MHD_INVALID_SOCKET;
@@ -1433,7 +1769,7 @@ internal_add_connection (struct MHD_Daemon *daemon,
{
/* in turbo mode, we assume that non-blocking was already set
by 'accept4' or whoever calls 'MHD_add_connection' */
- if (!MHD_socket_nonblocking_ (connection->socket_fd))
+ if (! MHD_socket_nonblocking_ (connection->socket_fd))
{
#ifdef HAVE_MESSAGES
MHD_DLOG (connection->daemon,
@@ -2168,115 +2504,6 @@ MHD_get_timeout (struct MHD_Daemon *daemon,
}
-#if HTTPS_SUPPORT
-/**
- * Performs bi-directional forwarding on upgraded HTTPS connections
- * based on the readyness state stored in the @a urh handle.
- *
- * @param urh handle to process
- */
-static void
-process_urh (struct MHD_UpgradeResponseHandle *urh)
-{
- /* handle reading from TLS client and writing to application */
- if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->app.celi)) &&
- (urh->in_buffer_off < urh->in_buffer_size) )
- {
- ssize_t res;
-
- res = gnutls_record_recv (urh->connection->tls_session,
- &urh->in_buffer[urh->in_buffer_off],
- urh->in_buffer_size - urh->in_buffer_off);
- if ( (GNUTLS_E_AGAIN == res) ||
- (GNUTLS_E_INTERRUPTED == res) )
- {
- urh->app.celi &= ~MHD_EPOLL_STATE_READ_READY;
- }
- else if (res > 0)
- {
- urh->in_buffer_off += res;
- }
- }
- if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->mhd.celi)) &&
- (urh->in_buffer_off > 0) )
- {
- size_t res;
-
- res = write (urh->mhd.socket,
- urh->in_buffer,
- urh->in_buffer_off);
- if (-1 == res)
- {
- /* FIXME: differenciate by errno? */
- urh->mhd.celi &= ~MHD_EPOLL_STATE_WRITE_READY;
- }
- else
- {
- if (urh->in_buffer_off != res)
- {
- memmove (urh->in_buffer,
- &urh->in_buffer[res],
- urh->in_buffer_off - res);
- urh->in_buffer_off -= res;
- }
- else
- {
- urh->in_buffer_off = 0;
- }
- }
- }
-
- /* handle reading from application and writing to HTTPS client */
- if ( (0 != (MHD_EPOLL_STATE_READ_READY & urh->mhd.celi)) &&
- (urh->out_buffer_off < urh->out_buffer_size) )
- {
- size_t res;
-
- res = read (urh->mhd.socket,
- &urh->out_buffer[urh->out_buffer_off],
- urh->out_buffer_size - urh->out_buffer_off);
- if (-1 == res)
- {
- /* FIXME: differenciate by errno? */
- urh->mhd.celi &= ~MHD_EPOLL_STATE_READ_READY;
- }
- else
- {
- urh->out_buffer_off += res;
- }
- }
- if ( (0 != (MHD_EPOLL_STATE_WRITE_READY & urh->app.celi)) &&
- (urh->out_buffer_off > 0) )
- {
- ssize_t res;
-
- res = gnutls_record_send (urh->connection->tls_session,
- urh->out_buffer,
- urh->out_buffer_off);
- if ( (GNUTLS_E_AGAIN == res) ||
- (GNUTLS_E_INTERRUPTED == res) )
- {
- urh->app.celi &= ~MHD_EPOLL_STATE_WRITE_READY;
- }
- else if (res > 0)
- {
- if (urh->out_buffer_off != res)
- {
- memmove (urh->out_buffer,
- &urh->out_buffer[res],
- urh->out_buffer_off - res);
- urh->out_buffer_off -= res;
- }
- else
- {
- urh->out_buffer_off = 0;
- }
- }
- }
-}
-#endif
-
-
/**
* Run webserver operations. This method should be called by clients
* in combination with #MHD_get_fdset if the client-controlled select
@@ -2362,14 +2589,9 @@ MHD_run_from_select (struct MHD_Daemon *daemon,
for (urh = daemon->urh_head; NULL != urh; urh = urh->next)
{
/* update urh state based on select() output */
- if (FD_ISSET (urh->connection->socket_fd, read_fd_set))
- urh->app.celi |= MHD_EPOLL_STATE_READ_READY;
- if (FD_ISSET (urh->connection->socket_fd, write_fd_set))
- urh->app.celi |= MHD_EPOLL_STATE_WRITE_READY;
- if (FD_ISSET (urh->mhd.socket, read_fd_set))
- urh->mhd.celi |= MHD_EPOLL_STATE_READ_READY;
- if (FD_ISSET (urh->mhd.socket, write_fd_set))
- urh->mhd.celi |= MHD_EPOLL_STATE_WRITE_READY;
+ urh_from_fdset (urh,
+ read_fd_set,
+ write_fd_set);
/* call generic forwarding function for passing data */
process_urh (urh);
}
diff --git a/src/microhttpd/internal.h b/src/microhttpd/internal.h
index 00843729..7a1e58e1 100644
--- a/src/microhttpd/internal.h
+++ b/src/microhttpd/internal.h
@@ -480,7 +480,14 @@ enum MHD_CONNECTION_STATE
* Connection was "upgraded" and socket is now under the
* control of the application.
*/
- MHD_CONNECTION_UPGRADE = MHD_TLS_CONNECTION_INIT + 1
+ MHD_CONNECTION_UPGRADE = MHD_TLS_CONNECTION_INIT + 1,
+
+ /**
+ * Connection was "upgraded" and subsequently closed
+ * by the application. We now need to do our own
+ * internal cleanup.
+ */
+ MHD_CONNECTION_UPGRADE_CLOSED = MHD_TLS_CONNECTION_INIT + 1
};
@@ -854,6 +861,23 @@ struct MHD_Connection
#if HTTPS_SUPPORT
/**
+ * If this connection was upgraded and if we are using
+ * #MHD_USE_THREAD_PER_CONNECTION, this points to the
+ * upgrade response details such that the
+ * #thread_main_connection_upgrade()-logic can perform
+ * the bi-directional forwarding.
+ */
+ struct MHD_UpgradeResponseHandle *urh;
+
+ /**
+ * If this connection was upgraded and if we are using
+ * #MHD_USE_THREAD_PER_CONNECTION without encryption,
+ * this points to the semaphore we use to signal termination
+ * to the thread handling the connection.
+ */
+ struct MHD_Semaphore *upgrade_sem;
+
+ /**
* State required for HTTPS/SSL/TLS support.
*/
gnutls_session_t tls_session;
diff --git a/src/microhttpd/mhd_locks.h b/src/microhttpd/mhd_locks.h
index cf10c0d1..1d8376f0 100644
--- a/src/microhttpd/mhd_locks.h
+++ b/src/microhttpd/mhd_locks.h
@@ -22,8 +22,9 @@
* @file microhttpd/mhd_locks.h
* @brief Header for platform-independent locks abstraction
* @author Karlson2k (Evgeny Grin)
+ * @author Christian Grothoff
*
- * Provides basic abstraction for locks and mutex.
+ * Provides basic abstraction for locks/mutex and semaphores.
* Any functions can be implemented as macro on some platforms
* unless explicitly marked otherwise.
* Any function argument can be skipped in macro, so avoid
@@ -147,4 +148,48 @@
#define MHD_mutex_unlock_(pmutex) (LeaveCriticalSection((pmutex)), !0)
#endif
+
+/**
+ * A semaphore.
+ */
+struct MHD_Semaphore;
+
+
+/**
+ * Create a semaphore with an initial counter of @a init
+ *
+ * @param init initial counter
+ * @return the semaphore, NULL on error
+ */
+struct MHD_Semaphore *
+MHD_semaphore_create (unsigned int init);
+
+
+/**
+ * Count down the semaphore, block if necessary.
+ *
+ * @param sem semaphore to count down.
+ */
+void
+MHD_semaphore_down (struct MHD_Semaphore *sem);
+
+
+/**
+ * Increment the semaphore.
+ *
+ * @param sem semaphore to increment.
+ */
+void
+MHD_semaphore_up (struct MHD_Semaphore *sem);
+
+
+/**
+ * Destroys the semaphore.
+ *
+ * @param sem semaphore to destroy.
+ */
+void
+MHD_semaphore_destroy (struct MHD_Semaphore *sem);
+
+
#endif /* ! MHD_LOCKS_H */
diff --git a/src/microhttpd/mhd_sem.c b/src/microhttpd/mhd_sem.c
new file mode 100644
index 00000000..fdd5dbe4
--- /dev/null
+++ b/src/microhttpd/mhd_sem.c
@@ -0,0 +1,138 @@
+/*
+ This file is part of libmicrohttpd
+ Copyright (C) 2016 Christian Grothoff
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+*/
+
+/**
+ * @file microhttpd/mhd_sem.c
+ * @brief implementation of semaphores
+ * @author Christian Grothoff
+ */
+#include "internal.h"
+#include "mhd_locks.h"
+
+/**
+ * A semaphore.
+ */
+struct MHD_Semaphore
+{
+ /**
+ * Mutex we use internally.
+ */
+ pthread_mutex_t mutex;
+
+ /**
+ * Condition variable used to implement the semaphore.
+ */
+ pthread_cond_t cv;
+
+ /**
+ * Current value of the semaphore.
+ */
+ unsigned int counter;
+};
+
+
+/**
+ * Create a semaphore with an initial counter of @a init
+ *
+ * @param init initial counter
+ * @return the semaphore, NULL on error
+ */
+struct MHD_Semaphore *
+MHD_semaphore_create (unsigned int init)
+{
+ struct MHD_Semaphore *sem;
+
+ sem = malloc (sizeof (struct MHD_Semaphore));
+ if (NULL == sem)
+ return NULL;
+ sem->counter = init;
+ if (0 != pthread_mutex_init (&sem->mutex,
+ NULL))
+ {
+ free (sem);
+ return NULL;
+ }
+ if (0 != pthread_cond_init (&sem->cv,
+ NULL))
+ {
+ (void) pthread_mutex_destroy (&sem->mutex);
+ free (sem);
+ return NULL;
+ }
+ return sem;
+}
+
+
+/**
+ * Count down the semaphore, block if necessary.
+ *
+ * @param sem semaphore to count down.
+ */
+void
+MHD_semaphore_down (struct MHD_Semaphore *sem)
+{
+ if (! pthread_mutex_lock (&sem->mutex))
+ MHD_PANIC ("pthread_mutex_lock for semaphore failed\n");
+ while (0 == sem->counter)
+ {
+ if (0 != pthread_cond_wait (&sem->cv,
+ &sem->mutex))
+ MHD_PANIC ("pthread_cond_wait failed\n");
+ }
+ sem->counter--;
+ if (! pthread_mutex_unlock (&sem->mutex))
+ MHD_PANIC ("pthread_mutex_unlock for semaphore failed\n");
+}
+
+
+/**
+ * Increment the semaphore.
+ *
+ * @param sem semaphore to increment.
+ */
+void
+MHD_semaphore_up (struct MHD_Semaphore *sem)
+{
+ if (! pthread_mutex_lock (&sem->mutex))
+ MHD_PANIC ("pthread_mutex_lock for semaphore failed\n");
+ sem->counter++;
+ pthread_cond_signal (&sem->cv);
+ if (! pthread_mutex_unlock (&sem->mutex))
+ MHD_PANIC ("pthread_mutex_unlock for semaphore failed\n");
+}
+
+
+/**
+ * Destroys the semaphore.
+ *
+ * @param sem semaphore to destroy.
+ */
+void
+MHD_semaphore_destroy (struct MHD_Semaphore *sem)
+{
+ if (0 != pthread_cond_destroy (&sem->cv))
+ MHD_PANIC ("pthread_cond_destroy failed\n");
+ if (0 != pthread_mutex_destroy (&sem->mutex))
+ MHD_PANIC ("pthread_mutex_destroy failed\n");
+ free (sem);
+}
+
+
+/* end of mhd_sem.c */
diff --git a/src/microhttpd/response.c b/src/microhttpd/response.c
index 670be983..ca729765 100644
--- a/src/microhttpd/response.c
+++ b/src/microhttpd/response.c
@@ -605,7 +605,23 @@ MHD_upgrade_action (struct MHD_UpgradeResponseHandle *urh,
switch (action)
{
case MHD_UPGRADE_ACTION_CLOSE:
+ /* transition to special 'closed' state for start of cleanup */
+ connection->state = MHD_CONNECTION_UPGRADE_CLOSED;
/* Application is done with this connection, tear it down! */
+ if (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION) )
+ {
+ if (0 == (daemon->options & MHD_USE_SSL) )
+ {
+ /* just need to signal the thread that we are done */
+ MHD_semaphore_up (connection->upgrade_sem);
+ }
+ else
+ {
+ /* signal thread by shutdown() of 'app' socket */
+ shutdown (urh->app.socket, SHUT_RDWR);
+ }
+ return MHD_YES;
+ }
#if HTTPS_SUPPORT
if (0 != (daemon->options & MHD_USE_SSL) )
{
@@ -658,6 +674,9 @@ MHD_upgrade_action (struct MHD_UpgradeResponseHandle *urh,
case MHD_UPGRADE_ACTION_CORK:
/* FIXME: not implemented */
return MHD_NO;
+ case MHD_UPGRADE_ACTION_FLUSH:
+ /* FIXME: not implemented */
+ return MHD_NO;
default:
/* we don't understand this one */
return MHD_NO;
@@ -784,11 +803,6 @@ MHD_response_execute_upgrade_ (struct MHD_Response *response,
rbo,
urh->app.socket,
urh);
- /* As far as MHD is concerned, this connection is
- suspended; it will be resumed once we are done
- in the #MHD_upgrade_action() function */
- MHD_suspend_connection (connection);
-
/* Launch IO processing by the event loop */
if (0 != (daemon->options & MHD_USE_EPOLL))
{
@@ -846,12 +860,25 @@ MHD_response_execute_upgrade_ (struct MHD_Response *response,
return MHD_NO;
}
}
-
- /* This takes care of most event loops: simply add to DLL */
- DLL_insert (daemon->urh_head,
- daemon->urh_tail,
- urh);
- /* FIXME: None of the above will not work (yet) for thread-per-connection processing */
+ if (0 == (daemon->options & MHD_USE_THREAD_PER_CONNECTION) )
+ {
+ /* As far as MHD's event loops are concerned, this connection
+ is suspended; it will be resumed once we are done in the
+ #MHD_upgrade_action() function */
+ MHD_suspend_connection (connection);
+ /* This takes care of further processing for most event loops:
+ simply add to DLL for bi-direcitonal processing */
+ DLL_insert (daemon->urh_head,
+ daemon->urh_tail,
+ urh);
+ }
+ else
+ {
+ /* Our caller will set 'connection->state' to
+ MHD_CONNECTION_UPGRADE, thereby triggering the main method
+ of the thread to switch to bi-directional forwarding. */
+ connection->urh = urh;
+ }
return MHD_YES;
}
urh->app.socket = MHD_INVALID_SOCKET;
@@ -864,10 +891,31 @@ MHD_response_execute_upgrade_ (struct MHD_Response *response,
rbo,
connection->socket_fd,
urh);
- /* As far as MHD is concerned, this connection is
- suspended; it will be resumed once we are done
- in the #MHD_upgrade_action() function */
- MHD_suspend_connection (connection);
+ if (0 != (daemon->options & MHD_USE_THREAD_PER_CONNECTION) )
+ {
+ /* Need to give the thread something to block on... */
+ connection->upgrade_sem = MHD_semaphore_create (0);
+ if (NULL == connection->upgrade_sem)
+ {
+#ifdef HAVE_MESSAGES
+ MHD_DLOG (daemon,
+ "Failed to create semaphore for upgrade handling\n");
+#endif
+ MHD_connection_close_ (connection,
+ MHD_REQUEST_TERMINATED_WITH_ERROR);
+ return MHD_NO;
+ }
+ /* Our caller will set 'connection->state' to
+ MHD_CONNECTION_UPGRADE, thereby triggering the
+ main method of the thread to block on the semaphore. */
+ }
+ else
+ {
+ /* As far as MHD's event loops are concerned, this connection is
+ suspended; it will be resumed once we are done in the
+ #MHD_upgrade_action() function */
+ MHD_suspend_connection (connection);
+ }
return MHD_YES;
}