From 7ce8b45c6220fef72b572ea33d077397119f1754 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Fri, 24 Aug 2012 08:12:46 +0000 Subject: mod --- src/transport/plugin_transport_http_server.c | 255 +++++++++++++++++++-------- 1 file changed, 182 insertions(+), 73 deletions(-) (limited to 'src/transport/plugin_transport_http_server.c') diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c index 293dbd060..e3cd73559 100644 --- a/src/transport/plugin_transport_http_server.c +++ b/src/transport/plugin_transport_http_server.c @@ -67,21 +67,20 @@ */ struct Plugin; - /** * Session handle for connections. */ -struct HttpServerSession +struct Session { /** * Stored in a linked list. */ - struct HttpServerSession *next; + struct Session *next; /** * Stored in a linked list. */ - struct HttpServerSession *prev; + struct Session *prev; /** * To whom are we talking to (set to our identity @@ -164,10 +163,13 @@ struct ServerConnection int disconnect; /* The session this server connection belongs to */ - struct HttpServerSession *session; + struct Session *session; /* The MHD connection */ struct MHD_Connection *mhd_conn; + + /* The MHD daemon */ + struct MHD_Daemon *mhd_daemon; }; /** @@ -184,12 +186,12 @@ struct HTTP_Server_Plugin * Linked list head of open sessions. */ - struct HttpServerSession *head; + struct Session *head; /** * Linked list tail of open sessions. */ - struct HttpServerSession *tail; + struct Session *tail; /** * Plugin name @@ -259,9 +261,9 @@ struct HTTP_Server_Plugin * A full session consists of 2 semi-connections: send and receive * If not both directions are established the server keeps this sessions here */ - struct HttpServerSession *server_semi_head; + struct Session *server_semi_head; - struct HttpServerSession *server_semi_tail; + struct Session *server_semi_tail; /** * List of own addresses @@ -406,26 +408,25 @@ struct HTTP_Message }; -static struct Plugin * p; +static struct HTTP_Server_Plugin * p; /** * Start session timeout */ static void -server_start_session_timeout (struct HttpServerSession *s); +server_start_session_timeout (struct Session *s); -#if 0 /** * Increment session timeout due to activity */ static void -server_reschedule_session_timeout (struct HttpServerSession *s); -#endif +server_reschedule_session_timeout (struct Session *s); + /** * Cancel timeout */ static void -server_stop_session_timeout (struct HttpServerSession *s); +server_stop_session_timeout (struct Session *s); /** * Function that can be used by the transport service to transmit @@ -549,7 +550,7 @@ http_server_plugin_get_session (void *cls, */ void -server_delete_session (struct HttpServerSession *s) +server_delete_session (struct Session *s) { struct HTTP_Server_Plugin *plugin = s->plugin; server_stop_session_timeout(s); @@ -583,7 +584,7 @@ server_delete_session (struct HttpServerSession *s) } int -server_disconnect (struct HttpServerSession *s) +server_disconnect (struct Session *s) { struct ServerConnection * send; struct ServerConnection * recv; @@ -636,7 +637,7 @@ server_disconnect (struct HttpServerSession *s) * Cancel timeout */ static void -server_stop_session_timeout (struct HttpServerSession *s) +server_stop_session_timeout (struct Session *s) { GNUNET_assert (NULL != s); @@ -708,8 +709,8 @@ server_lookup_connection (struct HTTP_Server_Plugin *plugin, struct MHD_Connection *mhd_connection, const char *url, const char *method) { - struct HttpServerSession *s = NULL; - struct HttpServerSession *t; + struct Session *s = NULL; + struct Session *t; struct ServerConnection *sc = NULL; const union MHD_ConnectionInfo *conn_info; struct GNUNET_ATS_Information ats; @@ -744,15 +745,18 @@ server_lookup_connection (struct HTTP_Server_Plugin *plugin, if (url_len < 105) { + GNUNET_break (0); goto error; /* too short */ } hash_start = strrchr (url, '/'); if (NULL == hash_start) { + GNUNET_break (0); goto error; /* '/' delimiter not found */ } if (hash_start >= url_end) { + GNUNET_break (0); goto error; /* mal formed */ } hash_start++; @@ -762,16 +766,19 @@ server_lookup_connection (struct HTTP_Server_Plugin *plugin, goto error; /* ';' delimiter not found */ if (hash_end >= url_end) { + GNUNET_break (0); goto error; /* mal formed */ } if (hash_start >= hash_end) { + GNUNET_break (0); goto error; /* mal formed */ } if ((strlen(hash_start) - strlen(hash_end)) != 103) { + GNUNET_break (0); goto error; /* invalid hash length */ } @@ -780,11 +787,13 @@ server_lookup_connection (struct HTTP_Server_Plugin *plugin, hash[103] = '\0'; if (GNUNET_OK != GNUNET_CRYPTO_hash_from_string ((const char *) hash, &(target.hashPubKey))) { + GNUNET_break (0); goto error; /* mal formed */ } if (hash_end >= url_end) { + GNUNET_break (0); goto error; /* mal formed */ } @@ -902,9 +911,7 @@ server_lookup_connection (struct HTTP_Server_Plugin *plugin, create: /* create new session */ - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Creating new session for peer `%s' \n", - GNUNET_i2s (&target)); + switch (conn_info->client_addr->sa_family) { case (AF_INET): @@ -922,7 +929,12 @@ create: goto error; } - s = GNUNET_malloc (sizeof (struct HttpServerSession)); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Creating new session for peer `%s' connecting from `%s'\n", + GNUNET_i2s (&target), + http_common_plugin_address_to_string (NULL, addr, addr_len)); + + s = GNUNET_malloc (sizeof (struct Session)); memcpy (&s->target, &target, sizeof (struct GNUNET_PeerIdentity)); s->plugin = plugin; s->addr = addr; @@ -945,7 +957,13 @@ error: return NULL; found: + sc = GNUNET_malloc (sizeof (struct ServerConnection)); + + if (conn_info->client_addr->sa_family == AF_INET) + sc->mhd_daemon = plugin->server_v4; + if (conn_info->client_addr->sa_family == AF_INET6) + sc->mhd_daemon = plugin->server_v6; sc->mhd_conn = mhd_connection; sc->direction = direction; sc->session = s; @@ -961,24 +979,17 @@ found: "Setting timeout for %p to %u sec.\n", sc, to); MHD_set_connection_option (mhd_connection, MHD_CONNECTION_OPTION_TIMEOUT, to); - struct MHD_Daemon *d = NULL; -#if 0 - if (s->addrlen == sizeof (struct IPv6HttpAddress)) - d = plugin->server_v6; - if (s->addrlen == sizeof (struct IPv4HttpAddress)) - d = plugin->server_v4; -#endif - server_reschedule (plugin, d, GNUNET_NO); + server_reschedule (plugin, sc->mhd_daemon, GNUNET_NO); #endif return sc; } -static struct HttpServerSession * +static struct Session * server_lookup_session (struct HTTP_Server_Plugin *plugin, struct ServerConnection * sc) { - struct HttpServerSession *s; + struct Session *s; for (s = plugin->head; NULL != s; s = s->next) if ((s->server_recv == sc) || (s->server_send == sc)) @@ -989,6 +1000,110 @@ server_lookup_session (struct HTTP_Server_Plugin *plugin, return NULL; } +int +server_exist_session (struct HTTP_Server_Plugin *plugin, struct Session *s) +{ + struct Session * head; + + GNUNET_assert (NULL != plugin); + GNUNET_assert (NULL != s); + + for (head = plugin->head; head != NULL; head = head->next) + { + if (head == s) + return GNUNET_YES; + } + return GNUNET_NO; +} + + +/** + * Callback called by MHD when it needs data to send + * @param cls current session + * @param pos position in buffer + * @param buf the buffer to write data to + * @param max max number of bytes available in buffer + * @return bytes written to buffer + */ +static ssize_t +server_send_callback (void *cls, uint64_t pos, char *buf, size_t max) +{ + struct Session *s = cls; + ssize_t bytes_read = 0; + struct HTTP_Message *msg; + + GNUNET_assert (NULL != p); + if (GNUNET_NO == server_exist_session (p, s)) + return 0; + msg = s->msg_head; + if (NULL != msg) + { + /* sending */ + bytes_read = GNUNET_MIN (msg->size - msg->pos, + max); + memcpy (buf, &msg->buf[msg->pos], bytes_read); + msg->pos += bytes_read; + + /* removing message */ + if (msg->pos == msg->size) + { + GNUNET_CONTAINER_DLL_remove (s->msg_head, s->msg_tail, msg); + if (NULL != msg->transmit_cont) + msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK); + GNUNET_free (msg); + } + } + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name, + "Sent %u bytes\n", s, bytes_read); + return bytes_read; +} + +/** + * Callback called by MessageStreamTokenizer when a message has arrived + * @param cls current session as closure + * @param client clien + * @param message the message to be forwarded to transport service + */ +static int +server_receive_mst_cb (void *cls, void *client, + const struct GNUNET_MessageHeader *message) +{ + struct Session *s = cls; + struct GNUNET_ATS_Information atsi[2]; + struct GNUNET_TIME_Relative delay; + + GNUNET_assert (NULL != p); + if (GNUNET_NO == server_exist_session(p, s)) + return GNUNET_OK; + + struct HTTP_Server_Plugin *plugin = s->plugin; + + atsi[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE); + atsi[0].value = htonl (1); + atsi[1].type = htonl (GNUNET_ATS_NETWORK_TYPE); + atsi[1].value = s->ats_address_network_type; + GNUNET_break (s->ats_address_network_type != ntohl (GNUNET_ATS_NET_UNSPECIFIED)); + + delay = plugin->env->receive (plugin->env->cls, + &s->target, + message, + (const struct GNUNET_ATS_Information *) &atsi, 2, + s, s->addr, s->addrlen); + + s->next_receive = + GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), delay); + if (delay.rel_value > 0) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Server: peer `%s' address `%s' next read delayed for %llu ms\n", + GNUNET_i2s (&s->target), + http_common_plugin_address_to_string (NULL, s->addr, s->addrlen), + delay); + } + server_reschedule_session_timeout (s); + return GNUNET_OK; +} + static int server_access_cb (void *cls, struct MHD_Connection *mhd_connection, @@ -1000,7 +1115,7 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, int res = MHD_YES; struct ServerConnection *sc = *httpSessionCache; - struct HttpServerSession *s; + struct Session *s; struct MHD_Response *response; GNUNET_assert (cls != NULL); @@ -1033,7 +1148,6 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, s = sc->session; GNUNET_assert (NULL != s); -#if 0 /* connection is to be disconnected */ if (sc->disconnect == GNUNET_YES) { @@ -1056,11 +1170,10 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, if (sc->direction == _SEND) { - response = - MHD_create_response_from_callback (MHD_SIZE_UNKNOWN, - 32 * 1024, - &server_send_callback, s, - NULL); + response = MHD_create_response_from_callback (MHD_SIZE_UNKNOWN, + 32 * 1024, + &server_send_callback, s, + NULL); MHD_queue_response (mhd_connection, MHD_HTTP_OK, response); MHD_destroy_response (response); return MHD_YES; @@ -1070,10 +1183,11 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, if (*upload_data_size == 0) { GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Server: Peer `%s' PUT on address `%s' connected\n", + "Peer `%s' PUT on address `%s' connected\n", GNUNET_i2s (&s->target), - http_plugin_address_to_string (NULL, s->addr, - s->addrlen)); + http_common_plugin_address_to_string (NULL, + s->addr, + s->addrlen)); return MHD_YES; } @@ -1081,17 +1195,18 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, if ((*upload_data_size > 0)) { GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Server: peer `%s' PUT on address `%s' received %u bytes\n", + "Peer `%s' PUT on address `%s' received %u bytes\n", GNUNET_i2s (&s->target), - http_plugin_address_to_string (NULL, s->addr, - s->addrlen), + http_common_plugin_address_to_string (NULL, + s->addr, + s->addrlen), *upload_data_size); struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); if ((s->next_receive.abs_value <= now.abs_value)) { GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Server: %p: PUT with %u bytes forwarded to MST\n", s, + "PUT with %u bytes forwarded to MST\n", s, *upload_data_size); if (s->msg_tk == NULL) { @@ -1105,33 +1220,30 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, struct ServerConnection *t = NULL; GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Server: Received %u bytes\n", *upload_data_size); + "Received %u bytes\n", *upload_data_size); /* Setting timeouts for other connections */ if (s->server_recv != NULL) { t = s->server_recv; GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Server: Setting timeout for %p to %u sec.\n", t, + "Setting timeout for %p to %u sec.\n", t, to); - MHD_set_connection_option (t->mhd_conn, MHD_CONNECTION_OPTION_TIMEOUT, + MHD_set_connection_option (t->mhd_conn, + MHD_CONNECTION_OPTION_TIMEOUT, to); + server_reschedule (plugin, t->mhd_daemon, GNUNET_NO); } if (s->server_send != NULL) { t = s->server_send; GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Server: Setting timeout for %p to %u sec.\n", t, + "Setting timeout for %p to %u sec.\n", t, to); - MHD_set_connection_option (t->mhd_conn, MHD_CONNECTION_OPTION_TIMEOUT, + MHD_set_connection_option (t->mhd_conn, + MHD_CONNECTION_OPTION_TIMEOUT, to); + server_reschedule (plugin, t->mhd_daemon, GNUNET_NO); } - struct MHD_Daemon *d = NULL; - - if (s->addrlen == sizeof (struct IPv6HttpAddress)) - d = plugin->server_v6; - if (s->addrlen == sizeof (struct IPv4HttpAddress)) - d = plugin->server_v4; - server_reschedule (plugin, d, GNUNET_NO); #endif (*upload_data_size) = 0; } @@ -1146,7 +1258,6 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, else return MHD_NO; } -#endif return res; } @@ -1617,8 +1728,8 @@ server_start (struct HTTP_Server_Plugin *plugin) void server_stop (struct HTTP_Server_Plugin *plugin) { - struct HttpServerSession *s = NULL; - struct HttpServerSession *t = NULL; + struct Session *s = NULL; + struct Session *t = NULL; struct MHD_Daemon *server_v4_tmp = plugin->server_v4; plugin->server_v4 = NULL; @@ -2225,12 +2336,15 @@ server_configure_plugin (struct HTTP_Server_Plugin *plugin) return GNUNET_OK; } +/** + * Session was idle, so disconnect it + */ static void server_session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_assert (NULL != cls); - struct HttpServerSession *s = cls; + struct Session *s = cls; s->timeout_task = GNUNET_SCHEDULER_NO_TASK; GNUNET_log (TIMEOUT_LOG, @@ -2245,7 +2359,7 @@ server_session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc * Start session timeout */ static void -server_start_session_timeout (struct HttpServerSession *s) +server_start_session_timeout (struct Session *s) { GNUNET_assert (NULL != s); GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); @@ -2257,30 +2371,24 @@ server_start_session_timeout (struct HttpServerSession *s) s, (unsigned long long) TIMEOUT.rel_value); } -#if 0 -/** - * Session was idle, so disconnect it - */ - /** * Increment session timeout due to activity */ static void -server_reschedule_session_timeout (struct HttpServerSession *s) +server_reschedule_session_timeout (struct Session *s) { GNUNET_assert (NULL != s); GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); GNUNET_SCHEDULER_cancel (s->timeout_task); s->timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, - &session_timeout, + &server_session_timeout, s); GNUNET_log (TIMEOUT_LOG, "Timeout rescheduled for session %p set to %llu ms\n", s, (unsigned long long) TIMEOUT.rel_value); } -#endif /** * Exit point from the plugin. @@ -2339,6 +2447,7 @@ LIBGNUNET_PLUGIN_TRANSPORT_INIT (void *cls) plugin = GNUNET_malloc (sizeof (struct HTTP_Server_Plugin)); plugin->env = env; + p = plugin; api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions)); api->cls = plugin; api->send = &http_server_plugin_send; -- cgit v1.2.3