summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/transport/plugin_transport_tcp.c310
-rw-r--r--src/transport/plugin_transport_udp.c1
-rw-r--r--src/transport/plugin_transport_unix.c1
3 files changed, 240 insertions, 72 deletions
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c
index 4d9ab1b41..f3791c138 100644
--- a/src/transport/plugin_transport_tcp.c
+++ b/src/transport/plugin_transport_tcp.c
@@ -272,6 +272,11 @@ struct Session
struct GNUNET_SERVER_TransmitHandle *transmit_handle;
/**
+ * Address of the other peer.
+ */
+ struct GNUNET_HELLO_Address *address;
+
+ /**
* ID of task used to delay receiving more to throttle sender.
*/
GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
@@ -281,19 +286,11 @@ struct Session
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
- struct GNUNET_HELLO_Address *address;
-
- /**
- * Address of the other peer (either based on our 'connect'
- * call or on our 'accept' call).
- *
- * struct IPv4TcpAddress or struct IPv6TcpAddress
- */
- //void *addr;
/**
- * Length of @e addr.
+ * When will this session time out?
*/
- //size_t addrlen;
+ struct GNUNET_TIME_Absolute timeout;
+
/**
* Last activity on this connection. Used to select preferred
* connection.
@@ -301,6 +298,16 @@ struct Session
struct GNUNET_TIME_Absolute last_activity;
/**
+ * Number of bytes waiting for transmission to this peer.
+ */
+ unsigned long long bytes_in_queue;
+
+ /**
+ * Number of messages waiting for transmission to this peer.
+ */
+ unsigned int msgs_in_queue;
+
+ /**
* Are we still expecting the welcome message? (#GNUNET_YES/#GNUNET_NO)
*/
int expecting_welcome;
@@ -378,6 +385,16 @@ struct Plugin
struct GNUNET_RESOLVER_RequestHandle *ext_dns;
/**
+ * Function to call about session status changes.
+ */
+ GNUNET_TRANSPORT_SessionInfoCallback sic;
+
+ /**
+ * Closure for @e sic.
+ */
+ void *sic_cls;
+
+ /**
* How many more TCP sessions are we allowed to open right now?
*/
unsigned long long max_connections;
@@ -410,6 +427,40 @@ struct Plugin
};
+
+/**
+ * If a session monitor is attached, notify it about the new
+ * session state.
+ *
+ * @param plugin our plugin
+ * @param session session that changed state
+ * @param state new state of the session
+ */
+static void
+notify_session_monitor (struct Plugin *plugin,
+ struct Session *session,
+ enum GNUNET_TRANSPORT_SessionState state)
+{
+ struct GNUNET_TRANSPORT_SessionInfo info;
+
+ if (NULL == plugin->sic)
+ return;
+ memset (&info, 0, sizeof (info));
+ info.state = state;
+ info.is_inbound = GNUNET_SYSERR; /* hard to say */
+ info.num_msg_pending = session->msgs_in_queue;
+ info.num_bytes_pending = session->bytes_in_queue;
+ /* info.receive_delay remains zero as this is not supported by UDP
+ (cannot selectively not receive from 'some' peer while continuing
+ to receive from others) */
+ info.session_timeout = session->timeout;
+ info.address = session->address;
+ plugin->sic (plugin->sic_cls,
+ session,
+ &info);
+}
+
+
/**
* Function called for a quick conversion of the binary address to
* a numeric address. Note that the caller must not free the
@@ -780,6 +831,7 @@ tcp_disconnect_session (void *cls,
{
GNUNET_SCHEDULER_cancel (session->timeout_task);
session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ session->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
}
if (GNUNET_YES
@@ -825,16 +877,26 @@ tcp_disconnect_session (void *cls,
: "Could not deliver message to `%4s', notifying.\n",
GNUNET_i2s (&session->target));
GNUNET_STATISTICS_update (session->plugin->env->stats,
- gettext_noop ("# bytes currently in TCP buffers"),
- -(int64_t) pm->message_size, GNUNET_NO);
- GNUNET_STATISTICS_update (session->plugin->env->stats, gettext_noop
- ("# bytes discarded by TCP (disconnect)"), pm->message_size, GNUNET_NO);
- GNUNET_CONTAINER_DLL_remove(session->pending_messages_head,
- session->pending_messages_tail, pm);
+ gettext_noop ("# bytes currently in TCP buffers"),
+ -(int64_t) pm->message_size, GNUNET_NO);
+ GNUNET_STATISTICS_update (session->plugin->env->stats,
+ gettext_noop ("# bytes discarded by TCP (disconnect)"),
+ pm->message_size,
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
+ session->pending_messages_tail,
+ pm);
+ GNUNET_assert (0 < session->msgs_in_queue);
+ session->msgs_in_queue--;
+ GNUNET_assert (pm->message_size <= session->bytes_in_queue);
+ session->bytes_in_queue -= pm->message_size;
if (NULL != pm->transmit_cont)
- pm->transmit_cont (pm->transmit_cont_cls, &session->target, GNUNET_SYSERR,
- pm->message_size, 0);
- GNUNET_free(pm);
+ pm->transmit_cont (pm->transmit_cont_cls,
+ &session->target,
+ GNUNET_SYSERR,
+ pm->message_size,
+ 0);
+ GNUNET_free (pm);
}
if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK )
{
@@ -881,10 +943,25 @@ session_timeout (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Session *s = cls;
+ struct GNUNET_TIME_Relative left;
s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ left = GNUNET_TIME_absolute_get_remaining (s->timeout);
+ if (0 != left.rel_value_us)
+ {
+ /* not actually our turn yet, but let's at least update
+ the monitor, it may think we're about to die ... */
+ notify_session_monitor (s->plugin,
+ s,
+ GNUNET_TRANSPORT_SS_UP);
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
+ &session_timeout,
+ s);
+ return;
+ }
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
- "Session %p was idle for %s, disconnecting\n", s,
+ "Session %p was idle for %s, disconnecting\n",
+ s,
GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
GNUNET_YES));
/* call session destroy function */
@@ -901,14 +978,10 @@ static void
reschedule_session_timeout (struct Session *s)
{
GNUNET_assert(GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
- GNUNET_SCHEDULER_cancel (s->timeout_task);
- s->timeout_task = GNUNET_SCHEDULER_add_delayed (
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, s);
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
- "Timeout rescheduled for session %p set to %s\n", s,
- GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, GNUNET_YES));
+ s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
}
+
/**
* Create a new session. Also queues a welcome message.
*
@@ -959,8 +1032,11 @@ create_session (struct Plugin *plugin,
GNUNET_STATISTICS_update (plugin->env->stats,
gettext_noop ("# bytes currently in TCP buffers"), pm->message_size,
GNUNET_NO);
- GNUNET_CONTAINER_DLL_insert(session->pending_messages_head,
- session->pending_messages_tail, pm);
+ GNUNET_CONTAINER_DLL_insert (session->pending_messages_head,
+ session->pending_messages_tail,
+ pm);
+ session->msgs_in_queue++;
+ session->bytes_in_queue += pm->message_size;
if (GNUNET_YES != is_nat)
{
GNUNET_STATISTICS_update (plugin->env->stats,
@@ -968,8 +1044,10 @@ create_session (struct Plugin *plugin,
}
plugin->env->register_quota_notification (plugin->env->cls,
&address->peer, PLUGIN_NAME, session);
- session->timeout_task = GNUNET_SCHEDULER_add_delayed (
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session);
+ session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ session);
return session;
}
@@ -1012,9 +1090,9 @@ do_transmit (void *cls, size_t size, void *buf)
plugin = session->plugin;
if (NULL == buf)
{
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Timeout trying to transmit to peer `%4s', discarding message queue.\n",
- GNUNET_i2s (&session->target));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout trying to transmit to peer `%4s', discarding message queue.\n",
+ GNUNET_i2s (&session->target));
/* timeout; cancel all messages that have already expired */
hd = NULL;
tl = NULL;
@@ -1023,13 +1101,19 @@ do_transmit (void *cls, size_t size, void *buf)
while ((NULL != (pos = session->pending_messages_head))
&& (pos->timeout.abs_value_us <= now.abs_value_us))
{
- GNUNET_CONTAINER_DLL_remove(session->pending_messages_head,
- session->pending_messages_tail, pos);
- LOG(GNUNET_ERROR_TYPE_DEBUG,
- "Failed to transmit %u byte message to `%4s'.\n", pos->message_size,
- GNUNET_i2s (&session->target));
+ GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
+ session->pending_messages_tail,
+ pos);
+ GNUNET_assert (0 < session->msgs_in_queue);
+ session->msgs_in_queue--;
+ GNUNET_assert (pos->message_size <= session->bytes_in_queue);
+ session->bytes_in_queue -= pos->message_size;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to transmit %u byte message to `%4s'.\n",
+ pos->message_size,
+ GNUNET_i2s (&session->target));
ret += pos->message_size;
- GNUNET_CONTAINER_DLL_insert_after(hd, tl, tl, pos);
+ GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
}
/* do this call before callbacks (so that if callbacks destroy
* session, they have a chance to cancel actions done by this
@@ -1040,10 +1124,12 @@ do_transmit (void *cls, size_t size, void *buf)
* the callbacks may abort the session */
while (NULL != (pos = hd))
{
- GNUNET_CONTAINER_DLL_remove(hd, tl, pos);
- if (pos->transmit_cont != NULL )
- pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_SYSERR,
- pos->message_size, 0);
+ GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+ if (pos->transmit_cont != NULL)
+ pos->transmit_cont (pos->transmit_cont_cls,
+ &pid,
+ GNUNET_SYSERR,
+ pos->message_size, 0);
GNUNET_free(pos);
}
GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1062,10 +1148,16 @@ do_transmit (void *cls, size_t size, void *buf)
{
if (ret + pos->message_size > size)
break;
- GNUNET_CONTAINER_DLL_remove(session->pending_messages_head,
- session->pending_messages_tail, pos);
+ GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
+ session->pending_messages_tail,
+ pos);
+ GNUNET_assert (0 < session->msgs_in_queue);
+ session->msgs_in_queue--;
+ GNUNET_assert (pos->message_size <= session->bytes_in_queue);
+ session->bytes_in_queue -= pos->message_size;
GNUNET_assert(size >= pos->message_size);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Transmitting message of type %u size %u\n",
+ LOG(GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting message of type %u size %u\n",
ntohs (((struct GNUNET_MessageHeader * ) pos->msg)->type),
pos->message_size);
/* FIXME: this memcpy can be up to 7% of our total runtime */
@@ -1073,7 +1165,7 @@ do_transmit (void *cls, size_t size, void *buf)
cbuf += pos->message_size;
ret += pos->message_size;
size -= pos->message_size;
- GNUNET_CONTAINER_DLL_insert_tail(hd, tl, pos);
+ GNUNET_CONTAINER_DLL_insert_tail (hd, tl, pos);
}
/* schedule 'continuation' before callbacks so that callbacks that
* cancel everything don't cause us to use a session that no longer
@@ -1085,10 +1177,13 @@ do_transmit (void *cls, size_t size, void *buf)
* we should not use 'session' after this point */
while (NULL != (pos = hd))
{
- GNUNET_CONTAINER_DLL_remove(hd, tl, pos);
- if (pos->transmit_cont != NULL )
- pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_OK,
- pos->message_size, pos->message_size); /* FIXME: include TCP overhead */
+ GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+ if (pos->transmit_cont != NULL)
+ pos->transmit_cont (pos->transmit_cont_cls,
+ &pid,
+ GNUNET_OK,
+ pos->message_size,
+ pos->message_size); /* FIXME: include TCP overhead */
GNUNET_free(pos);
}
GNUNET_assert(hd == NULL);
@@ -1253,11 +1348,12 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf,
"Asked to transmit %u bytes to `%s', added message to list.\n",
msgbuf_size, GNUNET_i2s (&session->target));
- if (GNUNET_YES
- == GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap,
- &session->target, session))
+ if (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap,
+ &session->target,
+ session))
{
- GNUNET_assert(NULL != session->client);
+ GNUNET_assert (NULL != session->client);
GNUNET_SERVER_client_set_timeout (session->client,
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1265,9 +1361,11 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf,
GNUNET_NO);
/* append pm to pending_messages list */
- GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head,
- session->pending_messages_tail, pm);
-
+ GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
+ session->pending_messages_tail,
+ pm);
+ session->msgs_in_queue++;
+ session->bytes_in_queue += pm->message_size;
process_pending_messages (session);
return msgbuf_size;
}
@@ -1283,17 +1381,25 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf,
GNUNET_NO);
/* append pm to pending_messages list */
- GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head,
- session->pending_messages_tail, pm);
+ GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
+ session->pending_messages_tail,
+ pm);
+ session->msgs_in_queue++;
+ session->bytes_in_queue += pm->message_size;
return msgbuf_size;
}
else
{
- LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid session %p\n", session);
+ LOG(GNUNET_ERROR_TYPE_ERROR,
+ "Invalid session %p\n", session);
if (NULL != cont)
- cont (cont_cls, &session->target, GNUNET_SYSERR, pm->message_size, 0);
- GNUNET_break(0);
- GNUNET_free(pm);
+ cont (cont_cls,
+ &session->target,
+ GNUNET_SYSERR,
+ pm->message_size,
+ 0);
+ GNUNET_break (0);
+ GNUNET_free (pm);
return GNUNET_SYSERR; /* session does not exist here */
}
}
@@ -2346,9 +2452,10 @@ notify_send_probe (void *cls,
size_t ret;
tcp_probe_ctx->transmit_handle = NULL;
- GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail,
- tcp_probe_ctx);
- if (buf == NULL )
+ GNUNET_CONTAINER_DLL_remove (plugin->probe_head,
+ plugin->probe_tail,
+ tcp_probe_ctx);
+ if (buf == NULL)
{
GNUNET_CONNECTION_destroy (tcp_probe_ctx->sock);
GNUNET_free(tcp_probe_ctx);
@@ -2402,8 +2509,9 @@ try_connection_reversal (void *cls,
sizeof(struct GNUNET_PeerIdentity));
tcp_probe_ctx->plugin = plugin;
tcp_probe_ctx->sock = sock;
- GNUNET_CONTAINER_DLL_insert(plugin->probe_head, plugin->probe_tail,
- tcp_probe_ctx);
+ GNUNET_CONTAINER_DLL_insert (plugin->probe_head,
+ plugin->probe_tail,
+ tcp_probe_ctx);
tcp_probe_ctx->transmit_handle = GNUNET_CONNECTION_notify_transmit_ready (
sock, ntohs (tcp_probe_ctx->message.header.size),
GNUNET_TIME_UNIT_FOREVER_REL, &notify_send_probe, tcp_probe_ctx);
@@ -2433,6 +2541,62 @@ tcp_get_network (void *cls,
/**
+ * Return information about the given session to the
+ * monitor callback.
+ *
+ * @param cls the `struct Plugin` with the monitor callback (`sic`)
+ * @param peer peer we send information about
+ * @param value our `struct Session` to send information about
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+send_session_info_iter (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ void *value)
+{
+ struct Plugin *plugin = cls;
+ struct Session *session = value;
+
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Begin monitoring sessions of a plugin. There can only
+ * be one active monitor per plugin (i.e. if there are
+ * multiple monitors, the transport service needs to
+ * multiplex the generated events over all of them).
+ *
+ * @param cls closure of the plugin
+ * @param sic callback to invoke, NULL to disable monitor;
+ * plugin will being by iterating over all active
+ * sessions immediately and then enter monitor mode
+ * @param sic_cls closure for @a sic
+ */
+static void
+tcp_plugin_setup_monitor (void *cls,
+ GNUNET_TRANSPORT_SessionInfoCallback sic,
+ void *sic_cls)
+{
+ struct Plugin *plugin = cls;
+
+ plugin->sic = sic;
+ plugin->sic_cls = sic_cls;
+ if (NULL != sic)
+ {
+ GNUNET_CONTAINER_multipeermap_iterate (plugin->sessionmap,
+ &send_session_info_iter,
+ plugin);
+ /* signal end of first iteration */
+ sic (sic_cls, NULL, NULL);
+ }
+}
+
+
+/**
* Entry point for the plugin.
*
* @param cls closure, the 'struct GNUNET_TRANSPORT_PluginEnvironment*'
@@ -2569,6 +2733,7 @@ libgnunet_plugin_transport_tcp_init (void *cls)
api->get_network = &tcp_get_network;
api->update_session_timeout = &tcp_plugin_update_session_timeout;
api->update_inbound_delay = &tcp_plugin_update_inbound_delay;
+ api->setup_monitor = &tcp_plugin_setup_monitor;
plugin->service = service;
if (NULL != service)
{
@@ -2678,8 +2843,9 @@ libgnunet_plugin_transport_tcp_done (void *cls)
GNUNET_NAT_unregister (plugin->nat);
while (NULL != (tcp_probe = plugin->probe_head))
{
- GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail,
- tcp_probe);
+ GNUNET_CONTAINER_DLL_remove (plugin->probe_head,
+ plugin->probe_tail,
+ tcp_probe);
GNUNET_CONNECTION_destroy (tcp_probe->sock);
GNUNET_free(tcp_probe);
}
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
index 782a5ea8f..e9d6b54d4 100644
--- a/src/transport/plugin_transport_udp.c
+++ b/src/transport/plugin_transport_udp.c
@@ -1573,6 +1573,7 @@ create_session (struct Plugin *plugin,
s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
+ s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
&session_timeout, s);
return s;
diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c
index 0c3d29733..9ca58ef51 100644
--- a/src/transport/plugin_transport_unix.c
+++ b/src/transport/plugin_transport_unix.c
@@ -886,6 +886,7 @@ unix_plugin_get_session (void *cls,
session->target = address->peer;
session->address = GNUNET_HELLO_address_copy (address);
session->plugin = plugin;
+ session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
&session_timeout,
session);