From fd512b7696978adc5241f83092756afbdfcfd1af Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 23 Jun 2014 11:26:59 +0000 Subject: -towards having the monitoring API supported by TCP --- src/transport/plugin_transport_tcp.c | 310 ++++++++++++++++++++++++++-------- src/transport/plugin_transport_udp.c | 1 + src/transport/plugin_transport_unix.c | 1 + 3 files changed, 240 insertions(+), 72 deletions(-) (limited to 'src/transport') diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index 4d9ab1b41..f3791c138 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -271,6 +271,11 @@ struct Session */ struct GNUNET_SERVER_TransmitHandle *transmit_handle; + /** + * Address of the other peer. + */ + struct GNUNET_HELLO_Address *address; + /** * ID of task used to delay receiving more to throttle sender. */ @@ -281,25 +286,27 @@ struct Session */ GNUNET_SCHEDULER_TaskIdentifier timeout_task; - struct GNUNET_HELLO_Address *address; - - /** - * Address of the other peer (either based on our 'connect' - * call or on our 'accept' call). - * - * struct IPv4TcpAddress or struct IPv6TcpAddress - */ - //void *addr; /** - * Length of @e addr. + * When will this session time out? */ - //size_t addrlen; + struct GNUNET_TIME_Absolute timeout; + /** * Last activity on this connection. Used to select preferred * connection. */ struct GNUNET_TIME_Absolute last_activity; + /** + * Number of bytes waiting for transmission to this peer. + */ + unsigned long long bytes_in_queue; + + /** + * Number of messages waiting for transmission to this peer. + */ + unsigned int msgs_in_queue; + /** * Are we still expecting the welcome message? (#GNUNET_YES/#GNUNET_NO) */ @@ -377,6 +384,16 @@ struct Plugin */ struct GNUNET_RESOLVER_RequestHandle *ext_dns; + /** + * Function to call about session status changes. + */ + GNUNET_TRANSPORT_SessionInfoCallback sic; + + /** + * Closure for @e sic. + */ + void *sic_cls; + /** * How many more TCP sessions are we allowed to open right now? */ @@ -410,6 +427,40 @@ struct Plugin }; + +/** + * If a session monitor is attached, notify it about the new + * session state. + * + * @param plugin our plugin + * @param session session that changed state + * @param state new state of the session + */ +static void +notify_session_monitor (struct Plugin *plugin, + struct Session *session, + enum GNUNET_TRANSPORT_SessionState state) +{ + struct GNUNET_TRANSPORT_SessionInfo info; + + if (NULL == plugin->sic) + return; + memset (&info, 0, sizeof (info)); + info.state = state; + info.is_inbound = GNUNET_SYSERR; /* hard to say */ + info.num_msg_pending = session->msgs_in_queue; + info.num_bytes_pending = session->bytes_in_queue; + /* info.receive_delay remains zero as this is not supported by UDP + (cannot selectively not receive from 'some' peer while continuing + to receive from others) */ + info.session_timeout = session->timeout; + info.address = session->address; + plugin->sic (plugin->sic_cls, + session, + &info); +} + + /** * Function called for a quick conversion of the binary address to * a numeric address. Note that the caller must not free the @@ -780,6 +831,7 @@ tcp_disconnect_session (void *cls, { GNUNET_SCHEDULER_cancel (session->timeout_task); session->timeout_task = GNUNET_SCHEDULER_NO_TASK; + session->timeout = GNUNET_TIME_UNIT_ZERO_ABS; } if (GNUNET_YES @@ -825,16 +877,26 @@ tcp_disconnect_session (void *cls, : "Could not deliver message to `%4s', notifying.\n", GNUNET_i2s (&session->target)); GNUNET_STATISTICS_update (session->plugin->env->stats, - gettext_noop ("# bytes currently in TCP buffers"), - -(int64_t) pm->message_size, GNUNET_NO); - GNUNET_STATISTICS_update (session->plugin->env->stats, gettext_noop - ("# bytes discarded by TCP (disconnect)"), pm->message_size, GNUNET_NO); - GNUNET_CONTAINER_DLL_remove(session->pending_messages_head, - session->pending_messages_tail, pm); + gettext_noop ("# bytes currently in TCP buffers"), + -(int64_t) pm->message_size, GNUNET_NO); + GNUNET_STATISTICS_update (session->plugin->env->stats, + gettext_noop ("# bytes discarded by TCP (disconnect)"), + pm->message_size, + GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, + session->pending_messages_tail, + pm); + GNUNET_assert (0 < session->msgs_in_queue); + session->msgs_in_queue--; + GNUNET_assert (pm->message_size <= session->bytes_in_queue); + session->bytes_in_queue -= pm->message_size; if (NULL != pm->transmit_cont) - pm->transmit_cont (pm->transmit_cont_cls, &session->target, GNUNET_SYSERR, - pm->message_size, 0); - GNUNET_free(pm); + pm->transmit_cont (pm->transmit_cont_cls, + &session->target, + GNUNET_SYSERR, + pm->message_size, + 0); + GNUNET_free (pm); } if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK ) { @@ -881,10 +943,25 @@ session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Session *s = cls; + struct GNUNET_TIME_Relative left; s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + left = GNUNET_TIME_absolute_get_remaining (s->timeout); + if (0 != left.rel_value_us) + { + /* not actually our turn yet, but let's at least update + the monitor, it may think we're about to die ... */ + notify_session_monitor (s->plugin, + s, + GNUNET_TRANSPORT_SS_UP); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, + &session_timeout, + s); + return; + } GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, - "Session %p was idle for %s, disconnecting\n", s, + "Session %p was idle for %s, disconnecting\n", + s, GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, GNUNET_YES)); /* call session destroy function */ @@ -901,14 +978,10 @@ static void reschedule_session_timeout (struct Session *s) { GNUNET_assert(GNUNET_SCHEDULER_NO_TASK != s->timeout_task); - GNUNET_SCHEDULER_cancel (s->timeout_task); - s->timeout_task = GNUNET_SCHEDULER_add_delayed ( - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, s); - GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, - "Timeout rescheduled for session %p set to %s\n", s, - GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, GNUNET_YES)); + s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); } + /** * Create a new session. Also queues a welcome message. * @@ -959,8 +1032,11 @@ create_session (struct Plugin *plugin, GNUNET_STATISTICS_update (plugin->env->stats, gettext_noop ("# bytes currently in TCP buffers"), pm->message_size, GNUNET_NO); - GNUNET_CONTAINER_DLL_insert(session->pending_messages_head, - session->pending_messages_tail, pm); + GNUNET_CONTAINER_DLL_insert (session->pending_messages_head, + session->pending_messages_tail, + pm); + session->msgs_in_queue++; + session->bytes_in_queue += pm->message_size; if (GNUNET_YES != is_nat) { GNUNET_STATISTICS_update (plugin->env->stats, @@ -968,8 +1044,10 @@ create_session (struct Plugin *plugin, } plugin->env->register_quota_notification (plugin->env->cls, &address->peer, PLUGIN_NAME, session); - session->timeout_task = GNUNET_SCHEDULER_add_delayed ( - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session); + session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + session); return session; } @@ -1012,9 +1090,9 @@ do_transmit (void *cls, size_t size, void *buf) plugin = session->plugin; if (NULL == buf) { - LOG(GNUNET_ERROR_TYPE_DEBUG, - "Timeout trying to transmit to peer `%4s', discarding message queue.\n", - GNUNET_i2s (&session->target)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Timeout trying to transmit to peer `%4s', discarding message queue.\n", + GNUNET_i2s (&session->target)); /* timeout; cancel all messages that have already expired */ hd = NULL; tl = NULL; @@ -1023,13 +1101,19 @@ do_transmit (void *cls, size_t size, void *buf) while ((NULL != (pos = session->pending_messages_head)) && (pos->timeout.abs_value_us <= now.abs_value_us)) { - GNUNET_CONTAINER_DLL_remove(session->pending_messages_head, - session->pending_messages_tail, pos); - LOG(GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit %u byte message to `%4s'.\n", pos->message_size, - GNUNET_i2s (&session->target)); + GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, + session->pending_messages_tail, + pos); + GNUNET_assert (0 < session->msgs_in_queue); + session->msgs_in_queue--; + GNUNET_assert (pos->message_size <= session->bytes_in_queue); + session->bytes_in_queue -= pos->message_size; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Failed to transmit %u byte message to `%4s'.\n", + pos->message_size, + GNUNET_i2s (&session->target)); ret += pos->message_size; - GNUNET_CONTAINER_DLL_insert_after(hd, tl, tl, pos); + GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos); } /* do this call before callbacks (so that if callbacks destroy * session, they have a chance to cancel actions done by this @@ -1040,10 +1124,12 @@ do_transmit (void *cls, size_t size, void *buf) * the callbacks may abort the session */ while (NULL != (pos = hd)) { - GNUNET_CONTAINER_DLL_remove(hd, tl, pos); - if (pos->transmit_cont != NULL ) - pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_SYSERR, - pos->message_size, 0); + GNUNET_CONTAINER_DLL_remove (hd, tl, pos); + if (pos->transmit_cont != NULL) + pos->transmit_cont (pos->transmit_cont_cls, + &pid, + GNUNET_SYSERR, + pos->message_size, 0); GNUNET_free(pos); } GNUNET_STATISTICS_update (plugin->env->stats, @@ -1062,10 +1148,16 @@ do_transmit (void *cls, size_t size, void *buf) { if (ret + pos->message_size > size) break; - GNUNET_CONTAINER_DLL_remove(session->pending_messages_head, - session->pending_messages_tail, pos); + GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, + session->pending_messages_tail, + pos); + GNUNET_assert (0 < session->msgs_in_queue); + session->msgs_in_queue--; + GNUNET_assert (pos->message_size <= session->bytes_in_queue); + session->bytes_in_queue -= pos->message_size; GNUNET_assert(size >= pos->message_size); - LOG(GNUNET_ERROR_TYPE_DEBUG, "Transmitting message of type %u size %u\n", + LOG(GNUNET_ERROR_TYPE_DEBUG, + "Transmitting message of type %u size %u\n", ntohs (((struct GNUNET_MessageHeader * ) pos->msg)->type), pos->message_size); /* FIXME: this memcpy can be up to 7% of our total runtime */ @@ -1073,7 +1165,7 @@ do_transmit (void *cls, size_t size, void *buf) cbuf += pos->message_size; ret += pos->message_size; size -= pos->message_size; - GNUNET_CONTAINER_DLL_insert_tail(hd, tl, pos); + GNUNET_CONTAINER_DLL_insert_tail (hd, tl, pos); } /* schedule 'continuation' before callbacks so that callbacks that * cancel everything don't cause us to use a session that no longer @@ -1085,10 +1177,13 @@ do_transmit (void *cls, size_t size, void *buf) * we should not use 'session' after this point */ while (NULL != (pos = hd)) { - GNUNET_CONTAINER_DLL_remove(hd, tl, pos); - if (pos->transmit_cont != NULL ) - pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_OK, - pos->message_size, pos->message_size); /* FIXME: include TCP overhead */ + GNUNET_CONTAINER_DLL_remove (hd, tl, pos); + if (pos->transmit_cont != NULL) + pos->transmit_cont (pos->transmit_cont_cls, + &pid, + GNUNET_OK, + pos->message_size, + pos->message_size); /* FIXME: include TCP overhead */ GNUNET_free(pos); } GNUNET_assert(hd == NULL); @@ -1253,11 +1348,12 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf, "Asked to transmit %u bytes to `%s', added message to list.\n", msgbuf_size, GNUNET_i2s (&session->target)); - if (GNUNET_YES - == GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap, - &session->target, session)) + if (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap, + &session->target, + session)) { - GNUNET_assert(NULL != session->client); + GNUNET_assert (NULL != session->client); GNUNET_SERVER_client_set_timeout (session->client, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); GNUNET_STATISTICS_update (plugin->env->stats, @@ -1265,9 +1361,11 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf, GNUNET_NO); /* append pm to pending_messages list */ - GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head, - session->pending_messages_tail, pm); - + GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head, + session->pending_messages_tail, + pm); + session->msgs_in_queue++; + session->bytes_in_queue += pm->message_size; process_pending_messages (session); return msgbuf_size; } @@ -1283,17 +1381,25 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf, GNUNET_NO); /* append pm to pending_messages list */ - GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head, - session->pending_messages_tail, pm); + GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head, + session->pending_messages_tail, + pm); + session->msgs_in_queue++; + session->bytes_in_queue += pm->message_size; return msgbuf_size; } else { - LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid session %p\n", session); + LOG(GNUNET_ERROR_TYPE_ERROR, + "Invalid session %p\n", session); if (NULL != cont) - cont (cont_cls, &session->target, GNUNET_SYSERR, pm->message_size, 0); - GNUNET_break(0); - GNUNET_free(pm); + cont (cont_cls, + &session->target, + GNUNET_SYSERR, + pm->message_size, + 0); + GNUNET_break (0); + GNUNET_free (pm); return GNUNET_SYSERR; /* session does not exist here */ } } @@ -2346,9 +2452,10 @@ notify_send_probe (void *cls, size_t ret; tcp_probe_ctx->transmit_handle = NULL; - GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail, - tcp_probe_ctx); - if (buf == NULL ) + GNUNET_CONTAINER_DLL_remove (plugin->probe_head, + plugin->probe_tail, + tcp_probe_ctx); + if (buf == NULL) { GNUNET_CONNECTION_destroy (tcp_probe_ctx->sock); GNUNET_free(tcp_probe_ctx); @@ -2402,8 +2509,9 @@ try_connection_reversal (void *cls, sizeof(struct GNUNET_PeerIdentity)); tcp_probe_ctx->plugin = plugin; tcp_probe_ctx->sock = sock; - GNUNET_CONTAINER_DLL_insert(plugin->probe_head, plugin->probe_tail, - tcp_probe_ctx); + GNUNET_CONTAINER_DLL_insert (plugin->probe_head, + plugin->probe_tail, + tcp_probe_ctx); tcp_probe_ctx->transmit_handle = GNUNET_CONNECTION_notify_transmit_ready ( sock, ntohs (tcp_probe_ctx->message.header.size), GNUNET_TIME_UNIT_FOREVER_REL, ¬ify_send_probe, tcp_probe_ctx); @@ -2432,6 +2540,62 @@ tcp_get_network (void *cls, } +/** + * Return information about the given session to the + * monitor callback. + * + * @param cls the `struct Plugin` with the monitor callback (`sic`) + * @param peer peer we send information about + * @param value our `struct Session` to send information about + * @return #GNUNET_OK (continue to iterate) + */ +static int +send_session_info_iter (void *cls, + const struct GNUNET_PeerIdentity *peer, + void *value) +{ + struct Plugin *plugin = cls; + struct Session *session = value; + + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); + return GNUNET_OK; +} + + +/** + * Begin monitoring sessions of a plugin. There can only + * be one active monitor per plugin (i.e. if there are + * multiple monitors, the transport service needs to + * multiplex the generated events over all of them). + * + * @param cls closure of the plugin + * @param sic callback to invoke, NULL to disable monitor; + * plugin will being by iterating over all active + * sessions immediately and then enter monitor mode + * @param sic_cls closure for @a sic + */ +static void +tcp_plugin_setup_monitor (void *cls, + GNUNET_TRANSPORT_SessionInfoCallback sic, + void *sic_cls) +{ + struct Plugin *plugin = cls; + + plugin->sic = sic; + plugin->sic_cls = sic_cls; + if (NULL != sic) + { + GNUNET_CONTAINER_multipeermap_iterate (plugin->sessionmap, + &send_session_info_iter, + plugin); + /* signal end of first iteration */ + sic (sic_cls, NULL, NULL); + } +} + + /** * Entry point for the plugin. * @@ -2569,6 +2733,7 @@ libgnunet_plugin_transport_tcp_init (void *cls) api->get_network = &tcp_get_network; api->update_session_timeout = &tcp_plugin_update_session_timeout; api->update_inbound_delay = &tcp_plugin_update_inbound_delay; + api->setup_monitor = &tcp_plugin_setup_monitor; plugin->service = service; if (NULL != service) { @@ -2678,8 +2843,9 @@ libgnunet_plugin_transport_tcp_done (void *cls) GNUNET_NAT_unregister (plugin->nat); while (NULL != (tcp_probe = plugin->probe_head)) { - GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail, - tcp_probe); + GNUNET_CONTAINER_DLL_remove (plugin->probe_head, + plugin->probe_tail, + tcp_probe); GNUNET_CONNECTION_destroy (tcp_probe->sock); GNUNET_free(tcp_probe); } diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index 782a5ea8f..e9d6b54d4 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -1573,6 +1573,7 @@ create_session (struct Plugin *plugin, s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS; s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS; s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; + s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, &session_timeout, s); return s; diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c index 0c3d29733..9ca58ef51 100644 --- a/src/transport/plugin_transport_unix.c +++ b/src/transport/plugin_transport_unix.c @@ -886,6 +886,7 @@ unix_plugin_get_session (void *cls, session->target = address->peer; session->address = GNUNET_HELLO_address_copy (address); session->plugin = plugin; + session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session); -- cgit v1.2.3