From 74b34ed9b400f74a7977e268626b85b51acfedd4 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Fri, 16 Sep 2011 16:15:52 +0000 Subject: transmitting data --- src/transport/plugin_transport_http.h | 67 +++++++++++- src/transport/plugin_transport_http_client.c | 90 ++++++++-------- src/transport/plugin_transport_http_new.c | 43 ++++++-- src/transport/plugin_transport_http_server.c | 152 +++++++++++++++++++++++++-- 4 files changed, 296 insertions(+), 56 deletions(-) (limited to 'src/transport') diff --git a/src/transport/plugin_transport_http.h b/src/transport/plugin_transport_http.h index b7b89e6e6..5e452ba84 100644 --- a/src/transport/plugin_transport_http.h +++ b/src/transport/plugin_transport_http.h @@ -45,6 +45,7 @@ #define DEBUG_HTTP GNUNET_YES #define VERBOSE_SERVER GNUNET_YES #define VERBOSE_CLIENT GNUNET_YES +#define VERBOSE_CURL GNUNET_NO #if BUILD_HTTPS #define LIBGNUNET_PLUGIN_TRANSPORT_INIT libgnunet_plugin_transport_https_init @@ -179,6 +180,17 @@ struct Session */ struct Plugin *plugin; + /** + * next pointer for double linked list + */ + struct HTTP_Message *msg_head; + + /** + * previous pointer for double linked list + */ + struct HTTP_Message *msg_tail; + + /** * message stream tokenizer for incoming data */ @@ -229,6 +241,7 @@ struct Session void *client_put; void *client_get; + int put_paused; void *server_recv; void *server_send; @@ -238,6 +251,49 @@ struct Session }; +/** + * Message to send using http + */ +struct HTTP_Message +{ + /** + * next pointer for double linked list + */ + struct HTTP_Message *next; + + /** + * previous pointer for double linked list + */ + struct HTTP_Message *prev; + + /** + * buffer containing data to send + */ + char *buf; + + /** + * amount of data already sent + */ + size_t pos; + + /** + * buffer length + */ + size_t size; + + /** + * Continuation function to call once the transmission buffer + * has again space available. NULL if there is no + * continuation to call. + */ + GNUNET_TRANSPORT_TransmitContinuation transmit_cont; + + /** + * Closure for transmit_cont. + */ + void *transmit_cont_cls; +}; + void delete_session (struct Session *s); @@ -246,6 +302,13 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, const void *addr, size_t addrlen, GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls); +struct GNUNET_TIME_Relative +http_plugin_receive (void *cls, const struct GNUNET_PeerIdentity * peer, + const struct GNUNET_MessageHeader * message, + struct Session * session, + const char *sender_address, + uint16_t sender_address_len); + const char * http_plugin_address_to_string (void *cls, const void *addr, size_t addrlen); @@ -256,7 +319,7 @@ int client_connect (struct Session *s); int -client_send (struct Session *s, const char *msgbuf, size_t msgbuf_size); +client_send (struct Session *s, struct HTTP_Message *msg); int client_start (struct Plugin *plugin); @@ -268,7 +331,7 @@ int server_disconnect (struct Session *s); int -server_send (struct Session *s, const char *msgbuf, size_t msgbuf_size); +server_send (struct Session *s, struct HTTP_Message * msg); int server_start (struct Plugin *plugin); diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c index 04a985906..3b3a4705b 100644 --- a/src/transport/plugin_transport_http_client.c +++ b/src/transport/plugin_transport_http_client.c @@ -26,7 +26,7 @@ #include "plugin_transport_http.h" -#if VERBOSE_CLIENT +#if VERBOSE_CURL /** * Function to log curl debug messages with GNUNET_log * @param curl handle @@ -58,8 +58,9 @@ client_log (CURL * curl, curl_infotype type, char *data, size_t size, void *cls) #endif int -client_send (struct Session *s, const char *msgbuf, size_t msgbuf_size) +client_send (struct Session *s, struct HTTP_Message *msg) { + GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg); return GNUNET_OK; } @@ -183,7 +184,7 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen)); #endif client_disconnect(s); - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen)); + //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen)); if (s->msg_tk != NULL) GNUNET_SERVER_mst_destroy (s->msg_tk); notify_session_end (plugin, &s->target, s); @@ -202,6 +203,8 @@ client_disconnect (struct Session *s) int res = GNUNET_OK; CURLMcode mret; struct Plugin *plugin = s->plugin; + struct HTTP_Message * msg; + struct HTTP_Message * t; #if 0 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, @@ -241,6 +244,17 @@ client_disconnect (struct Session *s) s->client_get = NULL; } + msg = s->msg_head; + while (msg != NULL) + { + t = msg->next; + if (NULL != msg->transmit_cont) + msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_SYSERR); + GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg); + GNUNET_free (msg); + msg = t; + } + plugin->cur_connections -= 2; /* Re-schedule since handles have changed */ if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK) @@ -255,20 +269,15 @@ client_disconnect (struct Session *s) } static void -curl_receive_mst_cb (void *cls, void *client, +client_receive_mst_cb (void *cls, void *client, const struct GNUNET_MessageHeader *message) { struct Session *s = cls; struct Plugin *plugin = s->plugin; - struct GNUNET_TRANSPORT_ATS_Information distance[2]; struct GNUNET_TIME_Relative delay; - distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE); - distance[0].value = htonl (1); - distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); - distance[1].value = htonl (0); + delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen); - delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen); s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay); if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) @@ -290,23 +299,23 @@ curl_receive_mst_cb (void *cls, void *client, * @return bytes read from stream */ static size_t -curl_receive_cb (void *stream, size_t size, size_t nmemb, void *cls) +client_receive (void *stream, size_t size, size_t nmemb, void *cls) { struct Session *s = cls; struct Plugin *plugin = s->plugin; if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) { -#if DEBUG_HTTP +#if DEBUG_CLIENT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n", + "no inbound bandwidth available! Next read was delayed for %llu ms\n", s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value); #endif return 0; } if (s->msg_tk == NULL) - s->msg_tk = GNUNET_SERVER_mst_create (&curl_receive_mst_cb, s); + s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s); GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO, GNUNET_NO); @@ -329,30 +338,30 @@ curl_receive_cb (void *stream, size_t size, size_t nmemb, void *cls) * @return bytes written to stream */ static size_t -curl_send_cb (void *stream, size_t size, size_t nmemb, void *ptr) +client_send_cb (void *stream, size_t size, size_t nmemb, void *cls) { + struct Session *s = cls; + //struct Plugin *plugin = s->plugin; size_t bytes_sent = 0; - -#if 0 - struct Session *ps = ptr; - struct HTTP_Message *msg = ps->pending_msgs_tail; - size_t len; - if (ps->send_active == GNUNET_NO) + struct HTTP_Message *msg = s->msg_head; +/* + if (s->put_paused == GNUNET_NO) return CURL_READFUNC_PAUSE; - if ((ps->pending_msgs_tail == NULL) && (ps->send_active == GNUNET_YES)) + if ((s->msg_head == NULL) && (s->put_paused == GNUNET_YES)) { -#if DEBUG_CONNECTIONS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Connection %X: No Message to send, pausing connection\n", ps); +#if VERBOSE_CLIENT + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Suspending handle `%s' `%s'\n", + GNUNET_i2s (&s->target),GNUNET_a2s (s->addr, s->addrlen)); #endif - ps->send_active = GNUNET_NO; + s->put_paused = GNUNET_NO; return CURL_READFUNC_PAUSE; } - +*/ + if (msg == NULL) + return bytes_sent; GNUNET_assert (msg != NULL); - /* data to send */ if (msg->pos < msg->size) { @@ -383,17 +392,14 @@ curl_send_cb (void *stream, size_t size, size_t nmemb, void *ptr) #if DEBUG_CONNECTIONS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connection %X: Message with %u bytes sent, removing message from queue\n", - ps, msg->pos); + s, msg->pos); #endif /* Calling transmit continuation */ - if (NULL != ps->pending_msgs_tail->transmit_cont) - msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls, - &(ps->peercontext)->identity, GNUNET_OK); - ps->queue_length_cur -= msg->size; - remove_http_message (ps, msg); + if (NULL != msg->transmit_cont) + msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK); + GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg); + GNUNET_free (msg); } - -#endif return bytes_sent; } @@ -423,7 +429,7 @@ client_connect (struct Session *s) #endif /* create get connection */ s->client_get = curl_easy_init (); -#if VERBOSE_CLIENT +#if VERBOSE_CURL curl_easy_setopt (s->client_get, CURLOPT_VERBOSE, 1L); curl_easy_setopt (s->client_get, CURLOPT_DEBUGFUNCTION, &client_log); curl_easy_setopt (s->client_get, CURLOPT_DEBUGDATA, s->client_get); @@ -436,9 +442,9 @@ client_connect (struct Session *s) curl_easy_setopt (s->client_get, CURLOPT_URL, url); //curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb); //curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps); - curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb); + curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, client_send_cb); curl_easy_setopt (s->client_get, CURLOPT_READDATA, s); - curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb); + curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, client_receive); curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, s); curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS, (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); @@ -453,7 +459,7 @@ client_connect (struct Session *s) /* create put connection */ s->client_put = curl_easy_init (); -#if VERBOSE_CLIENT +#if VERBOSE_CURL curl_easy_setopt (s->client_put, CURLOPT_VERBOSE, 1L); curl_easy_setopt (s->client_put, CURLOPT_DEBUGFUNCTION, &client_log); curl_easy_setopt (s->client_put, CURLOPT_DEBUGDATA, s->client_put); @@ -467,9 +473,9 @@ client_connect (struct Session *s) curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L); //curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb); //curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps); - curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb); + curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, client_send_cb); curl_easy_setopt (s->client_put, CURLOPT_READDATA, s); - curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb); + curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, client_receive); curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, s); curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS, (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); diff --git a/src/transport/plugin_transport_http_new.c b/src/transport/plugin_transport_http_new.c index 719182cc0..48477b809 100644 --- a/src/transport/plugin_transport_http_new.c +++ b/src/transport/plugin_transport_http_new.c @@ -292,6 +292,27 @@ http_plugin_address_suggested (void *cls, const void *addr, size_t addrlen) return GNUNET_SYSERR; } +struct GNUNET_TIME_Relative +http_plugin_receive (void *cls, const struct GNUNET_PeerIdentity * peer, + const struct GNUNET_MessageHeader * message, + struct Session * session, + const char *sender_address, + uint16_t sender_address_len) +{ + struct Session *s = cls; + struct Plugin *plugin = s->plugin; + struct GNUNET_TRANSPORT_ATS_Information distance[2]; + struct GNUNET_TIME_Relative delay; + + distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE); + distance[0].value = htonl (1); + distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); + distance[1].value = htonl (0); + + delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen); + return delay; +} + /** * Function called for a quick conversion of the binary address to * a numeric address. Note that the caller must not free the @@ -425,7 +446,7 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, s->transmit_cont = cont; s->transmit_cont_cls = cont_cls; s->next = NULL; - + s->delay = GNUNET_TIME_absolute_get_forever(); return s; } @@ -486,7 +507,7 @@ http_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) { struct Plugin *plugin = cls; - + struct HTTP_Message *msg; GNUNET_assert (plugin != NULL); int res = GNUNET_SYSERR; @@ -529,10 +550,20 @@ http_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target, return GNUNET_SYSERR; } } - else if (s->inbound == GNUNET_NO) - res = client_send (s, msgbuf, msgbuf_size); - else if (s->inbound == GNUNET_YES) - res = server_send (s, msgbuf, msgbuf_size); + + msg = GNUNET_malloc (sizeof (struct HTTP_Message) + msgbuf_size); + msg->next = NULL; + msg->size = msgbuf_size; + msg->pos = 0; + msg->buf = (char *) &msg[1]; + msg->transmit_cont = cont; + msg->transmit_cont_cls = cont_cls; + memcpy (msg->buf, msgbuf, msgbuf_size); + + if (s->inbound == GNUNET_NO) + res = client_send (s, msg); + if (s->inbound == GNUNET_YES) + res = server_send (s, msg); return res; } diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c index 43d9171eb..98fbfda1c 100644 --- a/src/transport/plugin_transport_http_server.c +++ b/src/transport/plugin_transport_http_server.c @@ -231,6 +231,81 @@ server_load_certificate (struct Plugin *plugin) #endif +/** + * Callback called by MessageStreamTokenizer when a message has arrived + * @param cls current session as closure + * @param client clien + * @param message the message to be forwarded to transport service + */ +static void +server_receive_mst_cb (void *cls, void *client, + const struct GNUNET_MessageHeader *message) +{ + struct Session *s = cls; + struct Plugin *plugin = s->plugin; + struct GNUNET_TIME_Relative delay; + + delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen); + + s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay); + + if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) + { +#if VERBOSE_CLIENT + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Server: peer `%s' address `%s' next read delayed for %llu ms\n", + GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), delay); +#endif + } +} + +/** + * Callback called by MHD when it needs data to send + * @param cls current session + * @param pos position in buffer + * @param buf the buffer to write data to + * @param max max number of bytes available in buffer + * @return bytes written to buffer + */ +static ssize_t +mhd_send_callback (void *cls, uint64_t pos, char *buf, size_t max) +{ + struct Session *s = cls; + struct HTTP_Message *msg; + int bytes_read = 0; + + msg = s->msg_head; + if (msg != NULL) + { + /* sending */ + if ((msg->size - msg->pos) <= max) + { + memcpy (buf, &msg->buf[msg->pos], (msg->size - msg->pos)); + bytes_read = msg->size - msg->pos; + msg->pos += (msg->size - msg->pos); + } + else + { + memcpy (buf, &msg->buf[msg->pos], max); + msg->pos += max; + bytes_read = max; + } + + /* removing message */ + if (msg->pos == msg->size) + { + if (NULL != msg->transmit_cont) + msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK); + GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg); + GNUNET_free (msg); + } + } +#if DEBUG_CONNECTIONS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connection %X: MHD has sent %u bytes\n", + s, bytes_read); +#endif + return bytes_read; +} + /** * Process GET or PUT request received via MHD. For * GET, queue response that will send back our pending @@ -403,11 +478,7 @@ error: res = MHD_queue_response (mhd_connection, MHD_HTTP_NOT_FOUND, response); MHD_destroy_response (response); return res; - - found: - - sc = GNUNET_malloc (sizeof (struct ServerConnection)); sc->mhd_conn = mhd_connection; sc->direction = direction; @@ -418,8 +489,9 @@ found: s->server_recv = sc; (*httpSessionCache) = sc; - return MHD_YES; } + + /* existing connection */ sc = (*httpSessionCache); s = sc->session; @@ -437,6 +509,67 @@ found: return MHD_YES; } + GNUNET_assert (s != NULL); + if (sc->direction == _SEND) + { + response = + MHD_create_response_from_callback (-1, 32 * 1024, &mhd_send_callback, + s, NULL); + res = MHD_queue_response (mhd_connection, MHD_HTTP_OK, response); + MHD_destroy_response (response); + return MHD_YES; + } + if (sc->direction == _RECEIVE) + { + if (*upload_data_size == 0) + { +#if VERBOSE_SERVER + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Server: peer `%s' PUT on address `%s' connected\n", + GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); +#endif + return MHD_YES; + } + + /* Recieving data */ + if ((*upload_data_size > 0)) + { +#if VERBOSE_SERVER + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, + "Server: peer `%s' PUT on address `%s' received %u bytes\n", + GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); +#endif + if ((GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection %X: PUT with %u bytes forwarded to MST\n", s, + *upload_data_size); + + if (s->msg_tk == NULL) + { + s->msg_tk = GNUNET_SERVER_mst_create (&server_receive_mst_cb, s); + } + res = GNUNET_SERVER_mst_receive (s->msg_tk, s, upload_data, *upload_data_size, GNUNET_NO, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Server: Received %u bytes\n", + *upload_data_size); + (*upload_data_size) = 0; + } + else + { +/* +#if DEBUG_HTTP + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n", + s, ps->peercontext->delay.rel_value); +#endif +*/ + } + return MHD_YES; + } + else + return MHD_NO; + } return res; } @@ -484,6 +617,8 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection, tc = s->server_send; tc->disconnect = GNUNET_YES; } + if (s->msg_tk != NULL) + GNUNET_SERVER_mst_destroy(s->msg_tk); } GNUNET_free (sc); @@ -500,6 +635,7 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection, } plugin->cur_connections--; + if ((s->server_send == NULL) && (s->server_recv == NULL)) { #if VERBOSE_SERVER @@ -507,6 +643,7 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection, "Server: peer `%s' on address `%s' disconnected\n", GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); #endif + notify_session_end(s->plugin, &s->target, s); } } @@ -538,8 +675,9 @@ server_disconnect (struct Session *s) } int -server_send (struct Session *s, const char *msgbuf, size_t msgbuf_size) +server_send (struct Session *s, struct HTTP_Message * msg) { + GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg); return GNUNET_OK; } @@ -809,6 +947,8 @@ server_stop (struct Plugin *plugin) while (s != NULL) { t = s->next; + if (s->msg_tk != NULL) + GNUNET_SERVER_mst_destroy(s->msg_tk); delete_session (s); s = t; } -- cgit v1.2.3