From 95a032ed3619880651eda8f0a5abc9d3520a6f8e Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Fri, 25 May 2012 09:34:49 +0000 Subject: session timeout for udp and tcp --- src/transport/plugin_transport_tcp.c | 122 +++++++++++++++++++++++- src/transport/plugin_transport_udp.c | 176 +++++++++++++++++++++++++++++------ 2 files changed, 268 insertions(+), 30 deletions(-) diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index 8e76398ea..18482b11e 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -272,6 +272,11 @@ struct Session */ GNUNET_SCHEDULER_TaskIdentifier receive_delay_task; + /** + * Session timeout task + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + /** * Address of the other peer (either based on our 'connect' * call or on our 'accept' call). @@ -395,6 +400,26 @@ struct Plugin }; + +/** + * Start session timeout + */ +static void +start_session_timeout (struct Session *s); + +/** + * Increment session timeout due to activity + */ +static void +reschedule_session_timeout (struct Session *s); + +/** + * Cancel timeout + */ +static void +stop_session_timeout (struct Session *s); + + /* DEBUG CODE */ static const char * tcp_address_to_string (void *cls, const void *addr, size_t addrlen); @@ -740,6 +765,8 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, gettext_noop ("# TCP sessions active"), 1, GNUNET_NO); } + start_session_timeout (ret); + return ret; } @@ -919,14 +946,16 @@ disconnect_session (struct Session *session) GNUNET_i2s (&session->target), tcp_address_to_string(NULL, session->addr, session->addrlen)); - if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session)) - { + stop_session_timeout (session); + + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session)) + { GNUNET_STATISTICS_update (session->plugin->env->stats, gettext_noop ("# TCP sessions active"), -1, GNUNET_NO); dec_sessions (plugin, session, __LINE__); - } - else GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->nat_wait_conns, &session->target.hashPubKey, session)); + } + else GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->nat_wait_conns, &session->target.hashPubKey, session)); /* clean up state */ if (session->transmit_handle != NULL) @@ -1037,6 +1066,8 @@ tcp_plugin_send (void *cls, "Asked to transmit %u bytes to `%s', added message to list.\n", msgbuf_size, GNUNET_i2s (&session->target)); + reschedule_session_timeout (session); + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessionmap, &session->target.hashPubKey, session)) { GNUNET_assert (session->client != NULL); @@ -1850,6 +1881,8 @@ delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) session->plugin->env->receive (session->plugin->env->cls, &session->target, NULL, &ats, 0, session, NULL, 0); + reschedule_session_timeout (session); + if (delay.rel_value == 0) GNUNET_SERVER_receive_done (session->client, GNUNET_OK); else @@ -1948,6 +1981,9 @@ handle_tcp_data (void *cls, struct GNUNET_SERVER_Client *client, 1, session, (GNUNET_YES == session->inbound) ? NULL : session->addr, (GNUNET_YES == session->inbound) ? 0 : session->addrlen); + + reschedule_session_timeout (session); + if (delay.rel_value == 0) { GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -2086,6 +2122,84 @@ try_connection_reversal (void *cls, const struct sockaddr *addr, } +/** + * Session was idle, so disconnect it + */ +static void +session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_assert (NULL != cls); + struct Session *s = cls; + + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + + /* call session destroy function */ + disconnect_session(s); + +} + +/** + * Start session timeout + */ +static void +start_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); + + s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + s); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); +} + +/** + * Increment session timeout due to activity + */ +static void +reschedule_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); + + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + s); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); +} + +/** + * Cancel timeout + */ +static void +stop_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + + if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task) + { + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n", + s); + } +} + + /** * Entry point for the plugin. * diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index 60814327c..5b133951b 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -110,6 +110,11 @@ struct Session */ struct GNUNET_TIME_Absolute flow_delay_from_other_peer; + /** + * Session timeout task + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + /** * expected delay for ACKs */ @@ -293,6 +298,11 @@ struct UDP_ACK_Message }; +/** + * Encapsulation of all of the state of the plugin. + */ +struct Plugin * plugin; + /** * We have been notified that our readset has something to read. We don't @@ -316,6 +326,26 @@ udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); static void udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +/** + * Start session timeout + */ +static void +start_session_timeout (struct Session *s); + +/** + * Increment session timeout due to activity + */ +static void +reschedule_session_timeout (struct Session *s); + +/** + * Cancel timeout + */ +static void +stop_session_timeout (struct Session *s); + + + /** * Function called for a quick conversion of the binary address to * a numeric address. Note that the caller must not free the @@ -649,18 +679,15 @@ free_session (struct Session *s) /** - * Destroy a session, plugin is being unloaded. + * Functions with this signature are called whenever we need + * to close a session due to a disconnect or failure to + * establish a connection. * - * @param cls unused - * @param key hash of public key of target peer - * @param value a 'struct PeerSession*' to clean up - * @return GNUNET_OK (continue to iterate) + * @param session session to close down */ -static int -disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) +static void +disconnect_session (struct Session *s) { - struct Plugin *plugin = cls; - struct Session *s = value; struct UDPMessageWrapper *udpw; struct UDPMessageWrapper *next; @@ -670,6 +697,7 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) s, GNUNET_i2s (&s->target), GNUNET_a2s (s->sock_addr, s->addrlen)); + stop_session_timeout(s); next = plugin->ipv4_queue_head; while (NULL != (udpw = next)) { @@ -718,6 +746,20 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) s->in_destroy = GNUNET_YES; else free_session (s); +} + +/** + * Destroy a session, plugin is being unloaded. + * + * @param cls unused + * @param key hash of public key of target peer + * @param value a 'struct PeerSession*' to clean up + * @return GNUNET_OK (continue to iterate) + */ +static int +disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) +{ + disconnect_session(value); return GNUNET_OK; } @@ -804,6 +846,8 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero(); s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS; + start_session_timeout(s); + return s; } @@ -1129,6 +1173,7 @@ udp_plugin_send (void *cls, udp->reserved = htonl (0); udp->sender = *plugin->env->my_identity; + reschedule_session_timeout(s); if (mlen <= UDP_MTU) { udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen); @@ -1289,6 +1334,7 @@ process_inbound_tokenized_messages (void *cls, void *client, si->arg, si->args); si->session->flow_delay_for_other_peer = delay; + reschedule_session_timeout(si->session); return GNUNET_OK; } @@ -2141,6 +2187,82 @@ setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct return sockets_created; } +/** + * Session was idle, so disconnect it + */ +static void +session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_assert (NULL != cls); + struct Session *s = cls; + + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + + /* call session destroy function */ + disconnect_session(s); + +} + +/** + * Start session timeout + */ +static void +start_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); + + s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + s); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); +} + +/** + * Increment session timeout due to activity + */ +static void +reschedule_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); + + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + s); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); +} + +/** + * Cancel timeout + */ +static void +stop_session_timeout (struct Session *s) +{ + GNUNET_assert (NULL != s); + + if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task) + { + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n", + s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n", + s); + } +} /** * The exported method. Makes the core api available via a global and @@ -2154,7 +2276,7 @@ libgnunet_plugin_transport_udp_init (void *cls) { struct GNUNET_TRANSPORT_PluginEnvironment *env = cls; struct GNUNET_TRANSPORT_PluginFunctions *api; - struct Plugin *plugin; + struct Plugin *p; unsigned long long port; unsigned long long aport; unsigned long long broadcast; @@ -2263,21 +2385,23 @@ libgnunet_plugin_transport_udp_init (void *cls) udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */ } - plugin = GNUNET_malloc (sizeof (struct Plugin)); + p = GNUNET_malloc (sizeof (struct Plugin)); api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions)); - GNUNET_BANDWIDTH_tracker_init (&plugin->tracker, + GNUNET_BANDWIDTH_tracker_init (&p->tracker, GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30); - plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10); - plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin); - plugin->port = port; - plugin->aport = aport; - plugin->broadcast_interval = interval; - plugin->enable_ipv6 = enable_v6; - plugin->env = env; - - api->cls = plugin; + p->sessions = GNUNET_CONTAINER_multihashmap_create (10); + p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p); + p->port = port; + p->aport = aport; + p->broadcast_interval = interval; + p->enable_ipv6 = enable_v6; + p->env = env; + + plugin = p; + + api->cls = p; api->send = NULL; api->disconnect = &udp_disconnect; api->address_pretty_printer = &udp_plugin_address_pretty_printer; @@ -2288,11 +2412,11 @@ libgnunet_plugin_transport_udp_init (void *cls) api->send = &udp_plugin_send; LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n"); - res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4); - if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL))) + res = setup_sockets (p, &serverAddrv6, &serverAddrv4); + if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL))) { LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n"); - GNUNET_free (plugin); + GNUNET_free (p); GNUNET_free (api); return NULL; } @@ -2300,7 +2424,7 @@ libgnunet_plugin_transport_udp_init (void *cls) if (broadcast == GNUNET_YES) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n"); - setup_broadcast (plugin, &serverAddrv6, &serverAddrv4); + setup_broadcast (p, &serverAddrv6, &serverAddrv4); } GNUNET_free_non_null (bind4_address); -- cgit v1.2.3