summaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorMatthias Wachs <wachs@net.in.tum.de>2012-05-25 09:34:49 +0000
committerMatthias Wachs <wachs@net.in.tum.de>2012-05-25 09:34:49 +0000
commit95a032ed3619880651eda8f0a5abc9d3520a6f8e (patch)
treec26673078f8180b79386b6a5ff49b1c1e065f0ac /src/transport
parent1e46552077655aeb2f54e59cad77edcdffe3f1ce (diff)
session timeout for udp and tcp
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/plugin_transport_tcp.c122
-rw-r--r--src/transport/plugin_transport_udp.c176
2 files changed, 268 insertions, 30 deletions
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c
index 8e76398ea..18482b11e 100644
--- a/src/transport/plugin_transport_tcp.c
+++ b/src/transport/plugin_transport_tcp.c
@@ -273,6 +273,11 @@ struct Session
GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
/**
+ * Session timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
* Address of the other peer (either based on our 'connect'
* call or on our 'accept' call).
*
@@ -395,6 +400,26 @@ struct Plugin
};
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s);
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s);
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s);
+
+
/* DEBUG CODE */
static const char *
tcp_address_to_string (void *cls, const void *addr, size_t addrlen);
@@ -740,6 +765,8 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
gettext_noop ("# TCP sessions active"), 1,
GNUNET_NO);
}
+ start_session_timeout (ret);
+
return ret;
}
@@ -919,14 +946,16 @@ disconnect_session (struct Session *session)
GNUNET_i2s (&session->target),
tcp_address_to_string(NULL, session->addr, session->addrlen));
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session))
- {
+ stop_session_timeout (session);
+
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session))
+ {
GNUNET_STATISTICS_update (session->plugin->env->stats,
gettext_noop ("# TCP sessions active"), -1,
GNUNET_NO);
dec_sessions (plugin, session, __LINE__);
- }
- else GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->nat_wait_conns, &session->target.hashPubKey, session));
+ }
+ else GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->nat_wait_conns, &session->target.hashPubKey, session));
/* clean up state */
if (session->transmit_handle != NULL)
@@ -1037,6 +1066,8 @@ tcp_plugin_send (void *cls,
"Asked to transmit %u bytes to `%s', added message to list.\n",
msgbuf_size, GNUNET_i2s (&session->target));
+ reschedule_session_timeout (session);
+
if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessionmap, &session->target.hashPubKey, session))
{
GNUNET_assert (session->client != NULL);
@@ -1850,6 +1881,8 @@ delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
session->plugin->env->receive (session->plugin->env->cls,
&session->target, NULL, &ats, 0, session,
NULL, 0);
+ reschedule_session_timeout (session);
+
if (delay.rel_value == 0)
GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
else
@@ -1948,6 +1981,9 @@ handle_tcp_data (void *cls, struct GNUNET_SERVER_Client *client,
1, session,
(GNUNET_YES == session->inbound) ? NULL : session->addr,
(GNUNET_YES == session->inbound) ? 0 : session->addrlen);
+
+ reschedule_session_timeout (session);
+
if (delay.rel_value == 0)
{
GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -2087,6 +2123,84 @@ try_connection_reversal (void *cls, const struct sockaddr *addr,
/**
+ * Session was idle, so disconnect it
+ */
+static void
+session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_assert (NULL != cls);
+ struct Session *s = cls;
+
+ s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+
+ /* call session destroy function */
+ disconnect_session(s);
+
+}
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
+
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ s);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
+
+ GNUNET_SCHEDULER_cancel (s->timeout_task);
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ s);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+
+ if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (s->timeout_task);
+ s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n",
+ s);
+ }
+}
+
+
+/**
* Entry point for the plugin.
*
* @param cls closure, the 'struct GNUNET_TRANSPORT_PluginEnvironment*'
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
index 60814327c..5b133951b 100644
--- a/src/transport/plugin_transport_udp.c
+++ b/src/transport/plugin_transport_udp.c
@@ -111,6 +111,11 @@ struct Session
struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
/**
+ * Session timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
* expected delay for ACKs
*/
struct GNUNET_TIME_Relative last_expected_delay;
@@ -293,6 +298,11 @@ struct UDP_ACK_Message
};
+/**
+ * Encapsulation of all of the state of the plugin.
+ */
+struct Plugin * plugin;
+
/**
* We have been notified that our readset has something to read. We don't
@@ -317,6 +327,26 @@ static void
udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s);
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s);
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s);
+
+
+
+/**
* Function called for a quick conversion of the binary address to
* a numeric address. Note that the caller must not free the
* address and that the next call to this function is allowed
@@ -649,18 +679,15 @@ free_session (struct Session *s)
/**
- * Destroy a session, plugin is being unloaded.
+ * Functions with this signature are called whenever we need
+ * to close a session due to a disconnect or failure to
+ * establish a connection.
*
- * @param cls unused
- * @param key hash of public key of target peer
- * @param value a 'struct PeerSession*' to clean up
- * @return GNUNET_OK (continue to iterate)
+ * @param session session to close down
*/
-static int
-disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
+static void
+disconnect_session (struct Session *s)
{
- struct Plugin *plugin = cls;
- struct Session *s = value;
struct UDPMessageWrapper *udpw;
struct UDPMessageWrapper *next;
@@ -670,6 +697,7 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
s,
GNUNET_i2s (&s->target),
GNUNET_a2s (s->sock_addr, s->addrlen));
+ stop_session_timeout(s);
next = plugin->ipv4_queue_head;
while (NULL != (udpw = next))
{
@@ -718,6 +746,20 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
s->in_destroy = GNUNET_YES;
else
free_session (s);
+}
+
+/**
+ * Destroy a session, plugin is being unloaded.
+ *
+ * @param cls unused
+ * @param key hash of public key of target peer
+ * @param value a 'struct PeerSession*' to clean up
+ * @return GNUNET_OK (continue to iterate)
+ */
+static int
+disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
+{
+ disconnect_session(value);
return GNUNET_OK;
}
@@ -804,6 +846,8 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
+ start_session_timeout(s);
+
return s;
}
@@ -1129,6 +1173,7 @@ udp_plugin_send (void *cls,
udp->reserved = htonl (0);
udp->sender = *plugin->env->my_identity;
+ reschedule_session_timeout(s);
if (mlen <= UDP_MTU)
{
udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
@@ -1289,6 +1334,7 @@ process_inbound_tokenized_messages (void *cls, void *client,
si->arg,
si->args);
si->session->flow_delay_for_other_peer = delay;
+ reschedule_session_timeout(si->session);
return GNUNET_OK;
}
@@ -2141,6 +2187,82 @@ setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct
return sockets_created;
}
+/**
+ * Session was idle, so disconnect it
+ */
+static void
+session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_assert (NULL != cls);
+ struct Session *s = cls;
+
+ s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+
+ /* call session destroy function */
+ disconnect_session(s);
+
+}
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
+
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ s);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
+
+ GNUNET_SCHEDULER_cancel (s->timeout_task);
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ s);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+
+ if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (s->timeout_task);
+ s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n",
+ s);
+ }
+}
/**
* The exported method. Makes the core api available via a global and
@@ -2154,7 +2276,7 @@ libgnunet_plugin_transport_udp_init (void *cls)
{
struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
struct GNUNET_TRANSPORT_PluginFunctions *api;
- struct Plugin *plugin;
+ struct Plugin *p;
unsigned long long port;
unsigned long long aport;
unsigned long long broadcast;
@@ -2263,21 +2385,23 @@ libgnunet_plugin_transport_udp_init (void *cls)
udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */
}
- plugin = GNUNET_malloc (sizeof (struct Plugin));
+ p = GNUNET_malloc (sizeof (struct Plugin));
api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
- GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
+ GNUNET_BANDWIDTH_tracker_init (&p->tracker,
GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
- plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
- plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
- plugin->port = port;
- plugin->aport = aport;
- plugin->broadcast_interval = interval;
- plugin->enable_ipv6 = enable_v6;
- plugin->env = env;
-
- api->cls = plugin;
+ p->sessions = GNUNET_CONTAINER_multihashmap_create (10);
+ p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
+ p->port = port;
+ p->aport = aport;
+ p->broadcast_interval = interval;
+ p->enable_ipv6 = enable_v6;
+ p->env = env;
+
+ plugin = p;
+
+ api->cls = p;
api->send = NULL;
api->disconnect = &udp_disconnect;
api->address_pretty_printer = &udp_plugin_address_pretty_printer;
@@ -2288,11 +2412,11 @@ libgnunet_plugin_transport_udp_init (void *cls)
api->send = &udp_plugin_send;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
- res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
- if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
+ res = setup_sockets (p, &serverAddrv6, &serverAddrv4);
+ if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
{
LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
- GNUNET_free (plugin);
+ GNUNET_free (p);
GNUNET_free (api);
return NULL;
}
@@ -2300,7 +2424,7 @@ libgnunet_plugin_transport_udp_init (void *cls)
if (broadcast == GNUNET_YES)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
- setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
+ setup_broadcast (p, &serverAddrv6, &serverAddrv4);
}
GNUNET_free_non_null (bind4_address);