From 79c78b7fe41811b2d71f3f4d5853acef6ac9ec2b Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 22 Jun 2014 22:44:32 +0000 Subject: -actually perform monitor calls where appropriate --- src/transport/plugin_transport_unix.c | 192 ++++++++++++++++++++++------------ 1 file changed, 126 insertions(+), 66 deletions(-) (limited to 'src/transport/plugin_transport_unix.c') diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c index 6b90cf1f4..7dff9587f 100644 --- a/src/transport/plugin_transport_unix.c +++ b/src/transport/plugin_transport_unix.c @@ -337,6 +337,39 @@ struct Plugin }; +/** + * If a session monitor is attached, notify it about the new + * session state. + * + * @param plugin our plugin + * @param session session that changed state + * @param state new state of the session + */ +static void +notify_session_monitor (struct Plugin *plugin, + struct Session *session, + enum GNUNET_TRANSPORT_SessionState state) +{ + struct GNUNET_TRANSPORT_SessionInfo info; + + if (NULL == plugin->sic) + return; + memset (&info, 0, sizeof (info)); + info.state = state; + info.is_inbound = GNUNET_SYSERR; /* hard to say */ + info.num_msg_pending = session->msgs_in_queue; + info.num_bytes_pending = session->bytes_in_queue; + /* info.receive_delay remains zero as this is not supported by UNIX + (cannot selectively not receive from 'some' peer while continuing + to receive from others) */ + info.session_timeout = session->timeout; + info.address = session->address; + plugin->sic (plugin->sic_cls, + session, + &info); +} + + /** * Function called for a quick conversion of the binary address to * a numeric address. Note that the caller must not free the @@ -404,7 +437,7 @@ unix_address_to_string (void *cls, * to close a session due to a disconnect or failure to * establish a connection. * - * @param cls closure with the `struct Plugin` + * @param cls closure with the `struct Plugin *` * @param session session to close down * @return #GNUNET_OK on success */ @@ -460,7 +493,11 @@ unix_session_disconnect (void *cls, { GNUNET_SCHEDULER_cancel (session->timeout_task); session->timeout_task = GNUNET_SCHEDULER_NO_TASK; + session->timeout = GNUNET_TIME_UNIT_ZERO_ABS; } + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_DOWN); GNUNET_HELLO_address_free (session->address); GNUNET_break (0 == session->bytes_in_queue); GNUNET_break (0 == session->msgs_in_queue); @@ -472,50 +509,56 @@ unix_session_disconnect (void *cls, /** * Session was idle for too long, so disconnect it * - * @param cls the 'struct Session' to disconnect + * @param cls the `struct Session *` to disconnect * @param tc scheduler context */ static void session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct Session *s = cls; + struct Session *session = cls; struct GNUNET_TIME_Relative left; - s->timeout_task = GNUNET_SCHEDULER_NO_TASK; - left = GNUNET_TIME_absolute_get_remaining (s->timeout); + session->timeout_task = GNUNET_SCHEDULER_NO_TASK; + left = GNUNET_TIME_absolute_get_remaining (session->timeout); if (0 != left.rel_value_us) { - /* not actually our turn yet */ - s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, - &session_timeout, - s); + /* not actually our turn yet, but let's at least update + the monitor, it may think we're about to die ... */ + notify_session_monitor (session->plugin, + session, + GNUNET_TRANSPORT_SS_UP); + session->timeout_task = GNUNET_SCHEDULER_add_delayed (left, + &session_timeout, + session); return; } LOG (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %s, disconnecting\n", - s, + session, GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, GNUNET_YES)); - unix_session_disconnect (s->plugin, s); + unix_session_disconnect (session->plugin, session); } /** - * Increment session timeout due to activity + * Increment session timeout due to activity. We do not immediately + * notify the monitor here as that might generate excessive + * signalling. * - * @param s session for which the timeout should be rescheduled + * @param session session for which the timeout should be rescheduled */ static void -reschedule_session_timeout (struct Session *s) +reschedule_session_timeout (struct Session *session) { - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); - s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != session->timeout_task); + session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); } /** - * Convert unix path to a `struct sockaddr_un` + * Convert unix path to a `struct sockaddr_un *` * * @param unixpath path to convert * @param sock_len[out] set to the length of the address @@ -576,11 +619,12 @@ lookup_session_it (void *cls, void *value) { struct LookupCtx *lctx = cls; - struct Session *s = value; + struct Session *session = value; - if (0 == GNUNET_HELLO_address_cmp (lctx->address, s->address)) + if (0 == GNUNET_HELLO_address_cmp (lctx->address, + session->address)) { - lctx->res = s; + lctx->res = session; return GNUNET_NO; } return GNUNET_YES; @@ -785,7 +829,7 @@ unix_plugin_get_session (void *cls, const struct GNUNET_HELLO_Address *address) { struct Plugin *plugin = cls; - struct Session *s; + struct Session *session; struct UnixAddress *ua; char * addrstr; uint32_t addr_str_len; @@ -825,41 +869,43 @@ unix_plugin_get_session (void *cls, } /* Check if a session for this address already exists */ - if (NULL != (s = lookup_session (plugin, - address))) - { + if (NULL != (session = lookup_session (plugin, + address))) + { LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p for address `%s'\n", - s, + session, unix_address_to_string (NULL, address->address, address->address_length)); - return s; + return session; } /* create a new session */ - s = GNUNET_new (struct Session); - s->target = address->peer; - s->address = GNUNET_HELLO_address_copy (address); - s->plugin = plugin; - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); - s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &session_timeout, - s); + session = GNUNET_new (struct Session); + session->target = address->peer; + session->address = GNUNET_HELLO_address_copy (address); + session->plugin = plugin; + session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + session); LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a new session %p for address `%s'\n", - s, + session, unix_address_to_string (NULL, address->address, address->address_length)); (void) GNUNET_CONTAINER_multipeermap_put (plugin->session_map, - &address->peer, s, + &address->peer, session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GNUNET_STATISTICS_set (plugin->env->stats, "# UNIX sessions active", GNUNET_CONTAINER_multipeermap_size (plugin->session_map), GNUNET_NO); - return s; + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); + return session; } @@ -901,11 +947,12 @@ unix_plugin_update_session_timeout (void *cls, * @param ua_len length of the address @a ua */ static void -unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, +unix_demultiplexer (struct Plugin *plugin, + struct GNUNET_PeerIdentity *sender, const struct GNUNET_MessageHeader *currhdr, const struct UnixAddress *ua, size_t ua_len) { - struct Session *s = NULL; + struct Session *session; struct GNUNET_HELLO_Address *address; GNUNET_break (ntohl(plugin->ats_network.value) != GNUNET_ATS_NET_UNSPECIFIED); @@ -923,26 +970,31 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, PLUGIN_NAME, ua, ua_len, GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */ - s = lookup_session (plugin, address); - if (NULL == s) + session = lookup_session (plugin, address); + if (NULL == session) { - s = unix_plugin_get_session (plugin, address); + session = unix_plugin_get_session (plugin, address); /* Notify transport and ATS about new inbound session */ plugin->env->session_start (NULL, - s->address, - s, + session->address, + session, &plugin->ats_network, 1); + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); + } + else + { + reschedule_session_timeout (session); } GNUNET_HELLO_address_free (address); - reschedule_session_timeout (s); - plugin->env->receive (plugin->env->cls, - s->address, - s, + session->address, + session, currhdr); plugin->env->update_address_metrics (plugin->env->cls, - s->address, - s, + session->address, + session, &plugin->ats_network, 1); } @@ -1018,7 +1070,9 @@ unix_plugin_do_read (struct Plugin *plugin) return; } msgbuf = (char *) &msg[1]; - memcpy (&sender, &msg->sender, sizeof (struct GNUNET_PeerIdentity)); + memcpy (&sender, + &msg->sender, + sizeof (struct GNUNET_PeerIdentity)); offset = 0; tsize = csize - sizeof (struct UNIXMessage); while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) @@ -1049,12 +1103,15 @@ unix_plugin_do_write (struct Plugin *plugin) ssize_t sent = 0; struct UNIXMessageWrapper *msgw; struct Session *session; + int did_delete; + did_delete = GNUNET_NO; while (NULL != (msgw = plugin->msg_head)) { if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0) break; /* Message is ready for sending */ /* Message has a timeout */ + did_delete = GNUNET_YES; LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout for message with %u bytes \n", (unsigned int) msgw->msgsize); @@ -1085,7 +1142,13 @@ unix_plugin_do_write (struct Plugin *plugin) GNUNET_free (msgw); } if (NULL == msgw) + { + if (GNUNET_YES == did_delete) + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); return; /* Nothing to send at the moment */ + } sent = unix_real_send (plugin, plugin->unix_sock.desc, @@ -1098,12 +1161,14 @@ unix_plugin_do_write (struct Plugin *plugin) msgw->session->address->address_length, msgw->payload, msgw->cont, msgw->cont_cls); - if (RETRY == sent) { GNUNET_STATISTICS_update (plugin->env->stats, "# UNIX retry attempts", 1, GNUNET_NO); + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); return; } GNUNET_CONTAINER_DLL_remove (plugin->msg_head, @@ -1118,6 +1183,9 @@ unix_plugin_do_write (struct Plugin *plugin) GNUNET_STATISTICS_set (plugin->env->stats, "# bytes currently in UNIX buffers", plugin->bytes_in_queue, GNUNET_NO); + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); if (GNUNET_SYSERR == sent) { /* failed and no retry */ @@ -1293,6 +1361,9 @@ unix_plugin_send (void *cls, "# bytes currently in UNIX buffers", plugin->bytes_in_queue, GNUNET_NO); + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task) plugin->write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, @@ -1633,21 +1704,10 @@ send_session_info_iter (void *cls, { struct Plugin *plugin = cls; struct Session *session = value; - struct GNUNET_TRANSPORT_SessionInfo info; - memset (&info, 0, sizeof (info)); - info.state = GNUNET_TRANSPORT_SS_UP; /* all are up if we have them */ - info.is_inbound = GNUNET_SYSERR; /* hard to say */ - info.num_msg_pending = session->msgs_in_queue; - info.num_bytes_pending = session->bytes_in_queue; - /* info.receive_delay remains zero as this is not supported by UNIX - (cannot selectively not receive from 'some' peer while continuing - to receive from others) */ - info.session_timeout = session->timeout; - info.address = session->address; - plugin->sic (plugin->sic_cls, - session, - &info); + notify_session_monitor (plugin, + session, + GNUNET_TRANSPORT_SS_UP); return GNUNET_OK; } -- cgit v1.2.3