summaryrefslogtreecommitdiff
path: root/src/transport/plugin_transport_unix.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2014-06-22 22:44:32 +0000
committerChristian Grothoff <christian@grothoff.org>2014-06-22 22:44:32 +0000
commit79c78b7fe41811b2d71f3f4d5853acef6ac9ec2b (patch)
tree1a7e346f1f9ad1d7baf8fca809a204002999c4e8 /src/transport/plugin_transport_unix.c
parent705ce0c1e6b6e8fd877f94c6bad977718db4b647 (diff)
-actually perform monitor calls where appropriate
Diffstat (limited to 'src/transport/plugin_transport_unix.c')
-rw-r--r--src/transport/plugin_transport_unix.c192
1 files changed, 126 insertions, 66 deletions
diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c
index 6b90cf1f4..7dff9587f 100644
--- a/src/transport/plugin_transport_unix.c
+++ b/src/transport/plugin_transport_unix.c
@@ -338,6 +338,39 @@ struct Plugin
/**
+ * If a session monitor is attached, notify it about the new
+ * session state.
+ *
+ * @param plugin our plugin
+ * @param session session that changed state
+ * @param state new state of the session
+ */
+static void
+notify_session_monitor (struct Plugin *plugin,
+ struct Session *session,
+ enum GNUNET_TRANSPORT_SessionState state)
+{
+ struct GNUNET_TRANSPORT_SessionInfo info;
+
+ if (NULL == plugin->sic)
+ return;
+ memset (&info, 0, sizeof (info));
+ info.state = state;
+ info.is_inbound = GNUNET_SYSERR; /* hard to say */
+ info.num_msg_pending = session->msgs_in_queue;
+ info.num_bytes_pending = session->bytes_in_queue;
+ /* info.receive_delay remains zero as this is not supported by UNIX
+ (cannot selectively not receive from 'some' peer while continuing
+ to receive from others) */
+ info.session_timeout = session->timeout;
+ info.address = session->address;
+ plugin->sic (plugin->sic_cls,
+ session,
+ &info);
+}
+
+
+/**
* Function called for a quick conversion of the binary address to
* a numeric address. Note that the caller must not free the
* address and that the next call to this function is allowed
@@ -404,7 +437,7 @@ unix_address_to_string (void *cls,
* to close a session due to a disconnect or failure to
* establish a connection.
*
- * @param cls closure with the `struct Plugin`
+ * @param cls closure with the `struct Plugin *`
* @param session session to close down
* @return #GNUNET_OK on success
*/
@@ -460,7 +493,11 @@ unix_session_disconnect (void *cls,
{
GNUNET_SCHEDULER_cancel (session->timeout_task);
session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ session->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
}
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_DOWN);
GNUNET_HELLO_address_free (session->address);
GNUNET_break (0 == session->bytes_in_queue);
GNUNET_break (0 == session->msgs_in_queue);
@@ -472,50 +509,56 @@ unix_session_disconnect (void *cls,
/**
* Session was idle for too long, so disconnect it
*
- * @param cls the 'struct Session' to disconnect
+ * @param cls the `struct Session *` to disconnect
* @param tc scheduler context
*/
static void
session_timeout (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct Session *s = cls;
+ struct Session *session = cls;
struct GNUNET_TIME_Relative left;
- s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- left = GNUNET_TIME_absolute_get_remaining (s->timeout);
+ session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ left = GNUNET_TIME_absolute_get_remaining (session->timeout);
if (0 != left.rel_value_us)
{
- /* not actually our turn yet */
- s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
- &session_timeout,
- s);
+ /* not actually our turn yet, but let's at least update
+ the monitor, it may think we're about to die ... */
+ notify_session_monitor (session->plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
+ session->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
+ &session_timeout,
+ session);
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Session %p was idle for %s, disconnecting\n",
- s,
+ session,
GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
GNUNET_YES));
- unix_session_disconnect (s->plugin, s);
+ unix_session_disconnect (session->plugin, session);
}
/**
- * Increment session timeout due to activity
+ * Increment session timeout due to activity. We do not immediately
+ * notify the monitor here as that might generate excessive
+ * signalling.
*
- * @param s session for which the timeout should be rescheduled
+ * @param session session for which the timeout should be rescheduled
*/
static void
-reschedule_session_timeout (struct Session *s)
+reschedule_session_timeout (struct Session *session)
{
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
- s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != session->timeout_task);
+ session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
}
/**
- * Convert unix path to a `struct sockaddr_un`
+ * Convert unix path to a `struct sockaddr_un *`
*
* @param unixpath path to convert
* @param sock_len[out] set to the length of the address
@@ -576,11 +619,12 @@ lookup_session_it (void *cls,
void *value)
{
struct LookupCtx *lctx = cls;
- struct Session *s = value;
+ struct Session *session = value;
- if (0 == GNUNET_HELLO_address_cmp (lctx->address, s->address))
+ if (0 == GNUNET_HELLO_address_cmp (lctx->address,
+ session->address))
{
- lctx->res = s;
+ lctx->res = session;
return GNUNET_NO;
}
return GNUNET_YES;
@@ -785,7 +829,7 @@ unix_plugin_get_session (void *cls,
const struct GNUNET_HELLO_Address *address)
{
struct Plugin *plugin = cls;
- struct Session *s;
+ struct Session *session;
struct UnixAddress *ua;
char * addrstr;
uint32_t addr_str_len;
@@ -825,41 +869,43 @@ unix_plugin_get_session (void *cls,
}
/* Check if a session for this address already exists */
- if (NULL != (s = lookup_session (plugin,
- address)))
- {
+ if (NULL != (session = lookup_session (plugin,
+ address)))
+ {
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Found existing session %p for address `%s'\n",
- s,
+ session,
unix_address_to_string (NULL,
address->address,
address->address_length));
- return s;
+ return session;
}
/* create a new session */
- s = GNUNET_new (struct Session);
- s->target = address->peer;
- s->address = GNUNET_HELLO_address_copy (address);
- s->plugin = plugin;
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
- s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &session_timeout,
- s);
+ session = GNUNET_new (struct Session);
+ session->target = address->peer;
+ session->address = GNUNET_HELLO_address_copy (address);
+ session->plugin = plugin;
+ session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ session);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Creating a new session %p for address `%s'\n",
- s,
+ session,
unix_address_to_string (NULL,
address->address,
address->address_length));
(void) GNUNET_CONTAINER_multipeermap_put (plugin->session_map,
- &address->peer, s,
+ &address->peer, session,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_STATISTICS_set (plugin->env->stats,
"# UNIX sessions active",
GNUNET_CONTAINER_multipeermap_size (plugin->session_map),
GNUNET_NO);
- return s;
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
+ return session;
}
@@ -901,11 +947,12 @@ unix_plugin_update_session_timeout (void *cls,
* @param ua_len length of the address @a ua
*/
static void
-unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
+unix_demultiplexer (struct Plugin *plugin,
+ struct GNUNET_PeerIdentity *sender,
const struct GNUNET_MessageHeader *currhdr,
const struct UnixAddress *ua, size_t ua_len)
{
- struct Session *s = NULL;
+ struct Session *session;
struct GNUNET_HELLO_Address *address;
GNUNET_break (ntohl(plugin->ats_network.value) != GNUNET_ATS_NET_UNSPECIFIED);
@@ -923,26 +970,31 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
PLUGIN_NAME,
ua, ua_len,
GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */
- s = lookup_session (plugin, address);
- if (NULL == s)
+ session = lookup_session (plugin, address);
+ if (NULL == session)
{
- s = unix_plugin_get_session (plugin, address);
+ session = unix_plugin_get_session (plugin, address);
/* Notify transport and ATS about new inbound session */
plugin->env->session_start (NULL,
- s->address,
- s,
+ session->address,
+ session,
&plugin->ats_network, 1);
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
+ }
+ else
+ {
+ reschedule_session_timeout (session);
}
GNUNET_HELLO_address_free (address);
- reschedule_session_timeout (s);
-
plugin->env->receive (plugin->env->cls,
- s->address,
- s,
+ session->address,
+ session,
currhdr);
plugin->env->update_address_metrics (plugin->env->cls,
- s->address,
- s,
+ session->address,
+ session,
&plugin->ats_network, 1);
}
@@ -1018,7 +1070,9 @@ unix_plugin_do_read (struct Plugin *plugin)
return;
}
msgbuf = (char *) &msg[1];
- memcpy (&sender, &msg->sender, sizeof (struct GNUNET_PeerIdentity));
+ memcpy (&sender,
+ &msg->sender,
+ sizeof (struct GNUNET_PeerIdentity));
offset = 0;
tsize = csize - sizeof (struct UNIXMessage);
while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
@@ -1049,12 +1103,15 @@ unix_plugin_do_write (struct Plugin *plugin)
ssize_t sent = 0;
struct UNIXMessageWrapper *msgw;
struct Session *session;
+ int did_delete;
+ did_delete = GNUNET_NO;
while (NULL != (msgw = plugin->msg_head))
{
if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0)
break; /* Message is ready for sending */
/* Message has a timeout */
+ did_delete = GNUNET_YES;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Timeout for message with %u bytes \n",
(unsigned int) msgw->msgsize);
@@ -1085,7 +1142,13 @@ unix_plugin_do_write (struct Plugin *plugin)
GNUNET_free (msgw);
}
if (NULL == msgw)
+ {
+ if (GNUNET_YES == did_delete)
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
return; /* Nothing to send at the moment */
+ }
sent = unix_real_send (plugin,
plugin->unix_sock.desc,
@@ -1098,12 +1161,14 @@ unix_plugin_do_write (struct Plugin *plugin)
msgw->session->address->address_length,
msgw->payload,
msgw->cont, msgw->cont_cls);
-
if (RETRY == sent)
{
GNUNET_STATISTICS_update (plugin->env->stats,
"# UNIX retry attempts",
1, GNUNET_NO);
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
return;
}
GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
@@ -1118,6 +1183,9 @@ unix_plugin_do_write (struct Plugin *plugin)
GNUNET_STATISTICS_set (plugin->env->stats,
"# bytes currently in UNIX buffers",
plugin->bytes_in_queue, GNUNET_NO);
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
if (GNUNET_SYSERR == sent)
{
/* failed and no retry */
@@ -1293,6 +1361,9 @@ unix_plugin_send (void *cls,
"# bytes currently in UNIX buffers",
plugin->bytes_in_queue,
GNUNET_NO);
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task)
plugin->write_task =
GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1633,21 +1704,10 @@ send_session_info_iter (void *cls,
{
struct Plugin *plugin = cls;
struct Session *session = value;
- struct GNUNET_TRANSPORT_SessionInfo info;
- memset (&info, 0, sizeof (info));
- info.state = GNUNET_TRANSPORT_SS_UP; /* all are up if we have them */
- info.is_inbound = GNUNET_SYSERR; /* hard to say */
- info.num_msg_pending = session->msgs_in_queue;
- info.num_bytes_pending = session->bytes_in_queue;
- /* info.receive_delay remains zero as this is not supported by UNIX
- (cannot selectively not receive from 'some' peer while continuing
- to receive from others) */
- info.session_timeout = session->timeout;
- info.address = session->address;
- plugin->sic (plugin->sic_cls,
- session,
- &info);
+ notify_session_monitor (plugin,
+ session,
+ GNUNET_TRANSPORT_SS_UP);
return GNUNET_OK;
}