From 705ce0c1e6b6e8fd877f94c6bad977718db4b647 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 22 Jun 2014 22:25:02 +0000 Subject: implement monitoring API in UNIX, simplify code a bit --- src/transport/plugin_transport_unix.c | 940 ++++++++++++++++++---------------- 1 file changed, 485 insertions(+), 455 deletions(-) (limited to 'src/transport/plugin_transport_unix.c') diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c index 881a4b9ff..6b90cf1f4 100644 --- a/src/transport/plugin_transport_unix.c +++ b/src/transport/plugin_transport_unix.c @@ -43,11 +43,24 @@ */ #define RETRY 0 +/** + * Name of the plugin. + */ #define PLUGIN_NAME "unix" +/** + * Options for UNIX Domain addresses. + */ enum UNIX_ADDRESS_OPTIONS { + /** + * No special options. + */ UNIX_OPTIONS_NONE = 0, + + /** + * Linux abstract domain sockets should be used. + */ UNIX_OPTIONS_USE_ABSTRACT_SOCKETS = 1 }; @@ -100,38 +113,9 @@ struct UNIXMessage GNUNET_NETWORK_STRUCT_END -/** - * Handle for a session. - */ -struct Session -{ - /** - * To whom are we talking to (set to our identity - * if we are still waiting for the welcome message). - * - * FIXME: information duplicated with 'peer' in address! - */ - struct GNUNET_PeerIdentity target; - - /** - * Pointer to the global plugin struct. - */ - struct Plugin *plugin; - - /** - * Address of the other peer. - */ - struct GNUNET_HELLO_Address *address; - - /** - * Session timeout task - */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; -}; - /** - * + * Information we track for a message awaiting transmission. */ struct UNIXMessageWrapper { @@ -187,6 +171,63 @@ struct UNIXMessageWrapper }; +/** + * Handle for a session. + */ +struct Session +{ + + /** + * Sessions with pending messages (!) are kept in a DLL. + */ + struct Session *next; + + /** + * Sessions with pending messages (!) are kept in a DLL. + */ + struct Session *prev; + + /** + * To whom are we talking to (set to our identity + * if we are still waiting for the welcome message). + * + * FIXME: information duplicated with 'peer' in address! + */ + struct GNUNET_PeerIdentity target; + + /** + * Pointer to the global plugin struct. + */ + struct Plugin *plugin; + + /** + * Address of the other peer. + */ + struct GNUNET_HELLO_Address *address; + + /** + * Number of bytes we currently have in our write queue. + */ + unsigned long long bytes_in_queue; + + /** + * Timeout for this session. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Session timeout task. + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Number of messages we currently have in our write queue. + */ + unsigned int msgs_in_queue; + +}; + + /** * Encapsulation of all of the state of the plugin. */ @@ -224,12 +265,17 @@ struct Plugin GNUNET_SCHEDULER_TaskIdentifier address_update_task; /** - * ID of select task + * ID of read task */ - GNUNET_SCHEDULER_TaskIdentifier select_task; + GNUNET_SCHEDULER_TaskIdentifier read_task; /** - * Number of bytes we currently have in our write queue. + * ID of write task + */ + GNUNET_SCHEDULER_TaskIdentifier write_task; + + /** + * Number of bytes we currently have in our write queues. */ unsigned long long bytes_in_queue; @@ -244,30 +290,20 @@ struct Plugin struct GNUNET_CONTAINER_MultiPeerMap *session_map; /** - * FD Read set + * Head of queue of messages to transmit. */ - struct GNUNET_NETWORK_FDSet *rs; + struct UNIXMessageWrapper *msg_head; /** - * FD Write set + * Tail of queue of messages to transmit. */ - struct GNUNET_NETWORK_FDSet *ws; + struct UNIXMessageWrapper *msg_tail; /** * Path of our unix domain socket (/tmp/unix-plugin-PORT) */ char *unix_socket_path; - /** - * Head of queue of messages to transmit. - */ - struct UNIXMessageWrapper *msg_head; - - /** - * Tail of queue of messages to transmit. - */ - struct UNIXMessageWrapper *msg_tail; - /** * Function to call about session status changes. */ @@ -293,74 +329,14 @@ struct Plugin */ struct GNUNET_ATS_Information ats_network; - /** - * Is the write set in the current 'select' task? #GNUNET_NO if the - * write queue was empty when the main task was scheduled, - * #GNUNET_YES if we're already waiting for being allowed to write. - */ - int with_ws; - /** * Are we using an abstract UNIX domain socket? */ - int abstract; + int is_abstract; }; -/** - * Increment session timeout due to activity - * - * @param s session for which the timeout should be moved - */ -static void -reschedule_session_timeout (struct Session *s); - - -/** - * We have been notified that our writeset has something to read. We don't - * know which socket needs to be read, so we have to check each one - * Then reschedule this function to be called again once more is available. - * - * @param cls the plugin handle - * @param tc the scheduling context (for rescheduling this function again) - */ -static void -unix_plugin_select (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); - - -/** - * Convert unix path to a `struct sockaddr_un` - * - * @param unixpath path to convert - * @param sock_len[out] set to the length of the address - * @return converted unix path - */ -static struct sockaddr_un * -unix_address_to_sockaddr (const char *unixpath, - socklen_t *sock_len) -{ - struct sockaddr_un *un; - size_t slen; - - GNUNET_assert (0 < strlen (unixpath)); /* sanity check */ - un = GNUNET_new (struct sockaddr_un); - un->sun_family = AF_UNIX; - slen = strlen (unixpath); - if (slen >= sizeof (un->sun_path)) - slen = sizeof (un->sun_path) - 1; - memcpy (un->sun_path, unixpath, slen); - un->sun_path[slen] = '\0'; - slen = sizeof (struct sockaddr_un); -#if HAVE_SOCKADDR_IN_SIN_LEN - un->sun_len = (u_char) slen; -#endif - (*sock_len) = slen; - return un; -} - - /** * Function called for a quick conversion of the binary address to * a numeric address. Note that the caller must not free the @@ -424,39 +400,148 @@ unix_address_to_string (void *cls, /** - * Re-schedule the main 'select' callback (#unix_plugin_select()) - * for this plugin. + * Functions with this signature are called whenever we need + * to close a session due to a disconnect or failure to + * establish a connection. * - * @param plugin the plugin context + * @param cls closure with the `struct Plugin` + * @param session session to close down + * @return #GNUNET_OK on success */ -static void -reschedule_select (struct Plugin *plugin) +static int +unix_session_disconnect (void *cls, + struct Session *session) { - if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (plugin->select_task); - plugin->select_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != plugin->msg_head) + struct Plugin *plugin = cls; + struct UNIXMessageWrapper *msgw; + struct UNIXMessageWrapper *next; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Disconnecting session for peer `%s' `%s'\n", + GNUNET_i2s (&session->target), + unix_address_to_string (NULL, + session->address->address, + session->address->address_length)); + plugin->env->session_end (plugin->env->cls, + session->address, + session); + next = plugin->msg_head; + while (NULL != next) { - plugin->select_task = - GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, - GNUNET_TIME_UNIT_FOREVER_REL, - plugin->rs, - plugin->ws, - &unix_plugin_select, plugin); - plugin->with_ws = GNUNET_YES; + msgw = next; + next = msgw->next; + if (msgw->session != session) + continue; + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, + plugin->msg_tail, + msgw); + session->msgs_in_queue--; + GNUNET_assert (session->bytes_in_queue >= msgw->msgsize); + session->bytes_in_queue -= msgw->msgsize; + GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); + plugin->bytes_in_queue -= msgw->msgsize; + if (NULL != msgw->cont) + msgw->cont (msgw->cont_cls, + &msgw->session->target, + GNUNET_SYSERR, + msgw->payload, 0); + GNUNET_free (msgw->msg); + GNUNET_free (msgw); } - else + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (plugin->session_map, + &session->target, + session)); + GNUNET_STATISTICS_set (plugin->env->stats, + "# UNIX sessions active", + GNUNET_CONTAINER_multipeermap_size (plugin->session_map), + GNUNET_NO); + if (GNUNET_SCHEDULER_NO_TASK != session->timeout_task) { - plugin->select_task = - GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, - GNUNET_TIME_UNIT_FOREVER_REL, - plugin->rs, - NULL, - &unix_plugin_select, plugin); - plugin->with_ws = GNUNET_NO; + GNUNET_SCHEDULER_cancel (session->timeout_task); + session->timeout_task = GNUNET_SCHEDULER_NO_TASK; } + GNUNET_HELLO_address_free (session->address); + GNUNET_break (0 == session->bytes_in_queue); + GNUNET_break (0 == session->msgs_in_queue); + GNUNET_free (session); + return GNUNET_OK; +} + + +/** + * Session was idle for too long, so disconnect it + * + * @param cls the 'struct Session' to disconnect + * @param tc scheduler context + */ +static void +session_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Session *s = cls; + struct GNUNET_TIME_Relative left; + + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + left = GNUNET_TIME_absolute_get_remaining (s->timeout); + if (0 != left.rel_value_us) + { + /* not actually our turn yet */ + s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, + &session_timeout, + s); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Session %p was idle for %s, disconnecting\n", + s, + GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + GNUNET_YES)); + unix_session_disconnect (s->plugin, s); +} + + +/** + * Increment session timeout due to activity + * + * @param s session for which the timeout should be rescheduled + */ +static void +reschedule_session_timeout (struct Session *s) +{ + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); + s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); +} + + +/** + * Convert unix path to a `struct sockaddr_un` + * + * @param unixpath path to convert + * @param sock_len[out] set to the length of the address + * @return converted unix path + */ +static struct sockaddr_un * +unix_address_to_sockaddr (const char *unixpath, + socklen_t *sock_len) +{ + struct sockaddr_un *un; + size_t slen; + + GNUNET_assert (0 < strlen (unixpath)); /* sanity check */ + un = GNUNET_new (struct sockaddr_un); + un->sun_family = AF_UNIX; + slen = strlen (unixpath); + if (slen >= sizeof (un->sun_path)) + slen = sizeof (un->sun_path) - 1; + memcpy (un->sun_path, unixpath, slen); + un->sun_path[slen] = '\0'; + slen = sizeof (struct sockaddr_un); +#if HAVE_SOCKADDR_IN_SIN_LEN + un->sun_len = (u_char) slen; +#endif + (*sock_len) = slen; + return un; } @@ -524,70 +609,6 @@ lookup_session (struct Plugin *plugin, } -/** - * Functions with this signature are called whenever we need - * to close a session due to a disconnect or failure to - * establish a connection. - * - * @param cls closure with the `struct Plugin` - * @param s session to close down - * @return #GNUNET_OK on success - */ -static int -unix_session_disconnect (void *cls, - struct Session *s) -{ - struct Plugin *plugin = cls; - struct UNIXMessageWrapper *msgw; - struct UNIXMessageWrapper *next; - int removed; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Disconnecting session for peer `%s' `%s'\n", - GNUNET_i2s (&s->target), - unix_address_to_string (NULL, - s->address->address, - s->address->address_length)); - plugin->env->session_end (plugin->env->cls, s->address, s); - removed = GNUNET_NO; - next = plugin->msg_head; - while (NULL != next) - { - msgw = next; - next = msgw->next; - if (msgw->session != s) - continue; - GNUNET_CONTAINER_DLL_remove (plugin->msg_head, - plugin->msg_tail, - msgw); - if (NULL != msgw->cont) - msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR, - msgw->payload, 0); - GNUNET_free (msgw->msg); - GNUNET_free (msgw); - removed = GNUNET_YES; - } - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (plugin->session_map, - &s->target, - s)); - GNUNET_STATISTICS_set (plugin->env->stats, - "# UNIX sessions active", - GNUNET_CONTAINER_multipeermap_size (plugin->session_map), - GNUNET_NO); - if ((GNUNET_YES == removed) && (NULL == plugin->msg_head)) - reschedule_select (plugin); - if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task) - { - GNUNET_SCHEDULER_cancel (s->timeout_task); - s->timeout_task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_HELLO_address_free (s->address); - GNUNET_free (s); - return GNUNET_OK; -} - - /** * Function that is called to get the keepalive factor. * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to @@ -611,7 +632,7 @@ unix_query_keepalive_factor (void *cls) * @param send_handle which handle to send message on * @param target who should receive this message (ignored by UNIX) * @param msgbuf one or more GNUNET_MessageHeader(s) strung together - * @param msgbuf_size the size of the msgbuf to send + * @param msgbuf_size the size of the @a msgbuf to send * @param priority how important is the message (ignored by UNIX) * @param timeout when should we time out (give up) if we can not transmit? * @param addr the addr to send the message to, needs to be a sockaddr for us @@ -644,7 +665,6 @@ unix_real_send (void *cls, socklen_t un_len; const char *unixpath; - GNUNET_assert (NULL != plugin); if (NULL == send_handle) { GNUNET_break (0); /* We do not have a send handle */ @@ -665,15 +685,18 @@ unix_real_send (void *cls, return -1; } - if ((GNUNET_YES == plugin->abstract) && + if ((GNUNET_YES == plugin->is_abstract) && (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & ntohl(addr->options) )) ) { un->sun_path[0] = '\0'; } resend: /* Send the data */ - sent = GNUNET_NETWORK_socket_sendto (send_handle, msgbuf, msgbuf_size, - (const struct sockaddr *) un, un_len); + sent = GNUNET_NETWORK_socket_sendto (send_handle, + msgbuf, + msgbuf_size, + (const struct sockaddr *) un, + un_len); if (GNUNET_SYSERR == sent) { if ( (EAGAIN == errno) || @@ -693,12 +716,15 @@ resend: if (size < msgbuf_size) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Trying to increase socket buffer size from %i to %i for message size %i\n", - size, ((msgbuf_size / 1000) + 2) * 1000, msgbuf_size); + "Trying to increase socket buffer size from %u to %u for message size %u\n", + (unsigned int) size, + (unsigned int) ((msgbuf_size / 1000) + 2) * 1000, + (unsigned int) msgbuf_size); size = ((msgbuf_size / 1000) + 2) * 1000; - if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt - ((struct GNUNET_NETWORK_Handle *) send_handle, SOL_SOCKET, SO_SNDBUF, - &size, sizeof (size))) + if (GNUNET_OK == + GNUNET_NETWORK_socket_setsockopt ((struct GNUNET_NETWORK_Handle *) send_handle, + SOL_SOCKET, SO_SNDBUF, + &size, sizeof (size))) goto resend; /* Increased buffer size, retry sending */ else { @@ -720,35 +746,13 @@ resend: } LOG (GNUNET_ERROR_TYPE_DEBUG, - "UNIX transmit %u-byte message to %s (%d: %s)\n", - (unsigned int) msgbuf_size, - GNUNET_a2s ((const struct sockaddr *)un, un_len), - (int) sent, - (sent < 0) ? STRERROR (errno) : "ok"); - GNUNET_free (un); - return sent; -} - - -/** - * Session was idle for too long, so disconnect it - * - * @param cls the 'struct Session' to disconnect - * @param tc scheduler context - */ -static void -session_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct Session *s = cls; - - s->timeout_task = GNUNET_SCHEDULER_NO_TASK; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Session %p was idle for %s, disconnecting\n", - s, - GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - GNUNET_YES)); - unix_session_disconnect (s->plugin, s); + "UNIX transmitted %u-byte message to %s (%d: %s)\n", + (unsigned int) msgbuf_size, + GNUNET_a2s ((const struct sockaddr *)un, un_len), + (int) sent, + (sent < 0) ? STRERROR (errno) : "ok"); + GNUNET_free (un); + return sent; } @@ -787,9 +791,6 @@ unix_plugin_get_session (void *cls, uint32_t addr_str_len; uint32_t addr_option; - GNUNET_assert (NULL != plugin); - GNUNET_assert (NULL != address); - ua = (struct UnixAddress *) address->address; if ((NULL == address->address) || (0 == address->address_length) || (sizeof (struct UnixAddress) > address->address_length)) @@ -802,7 +803,7 @@ unix_plugin_get_session (void *cls, addr_option = ntohl (ua->options); if ( (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & addr_option)) && - (GNUNET_NO == plugin->abstract)) + (GNUNET_NO == plugin->is_abstract)) { return NULL; } @@ -882,101 +883,11 @@ unix_plugin_update_session_timeout (void *cls, GNUNET_CONTAINER_multipeermap_contains_value (plugin->session_map, &session->target, session)) - return; - reschedule_session_timeout (session); -} - - -/** - * Function that can be used by the transport service to transmit - * a message using the plugin. Note that in the case of a - * peer disconnecting, the continuation MUST be called - * prior to the disconnect notification itself. This function - * will be called with this peer's HELLO message to initiate - * a fresh connection to another peer. - * - * @param cls closure - * @param session which session must be used - * @param msgbuf the message to transmit - * @param msgbuf_size number of bytes in @a msgbuf - * @param priority how important is the message (most plugins will - * ignore message priority and just FIFO) - * @param to how long to wait at most for the transmission (does not - * require plugins to discard the message after the timeout, - * just advisory for the desired delay; most plugins will ignore - * this as well) - * @param cont continuation to call once the message has - * been transmitted (or if the transport is ready - * for the next transmission call; or if the - * peer disconnected...); can be NULL - * @param cont_cls closure for @a cont - * @return number of bytes used (on the physical network, with overheads); - * -1 on hard errors (i.e. address invalid); 0 is a legal value - * and does NOT mean that the message was not transmitted (DV) - */ -static ssize_t -unix_plugin_send (void *cls, - struct Session *session, - const char *msgbuf, - size_t msgbuf_size, - unsigned int priority, - struct GNUNET_TIME_Relative to, - GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) -{ - struct Plugin *plugin = cls; - struct UNIXMessageWrapper *wrapper; - struct UNIXMessage *message; - int ssize; - - if (GNUNET_OK != - GNUNET_CONTAINER_multipeermap_contains_value (plugin->session_map, - &session->target, - session)) { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Invalid session for peer `%s' `%s'\n", - GNUNET_i2s (&session->target), - unix_address_to_string(NULL, - session->address->address, - session->address->address_length)); GNUNET_break (0); - return GNUNET_SYSERR; + return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending %u bytes with session for peer `%s' `%s'\n", - msgbuf_size, - GNUNET_i2s (&session->target), - unix_address_to_string (NULL, - session->address->address, - session->address->address_length)); - ssize = sizeof (struct UNIXMessage) + msgbuf_size; - message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size); - message->header.size = htons (ssize); - message->header.type = htons (0); - memcpy (&message->sender, plugin->env->my_identity, - sizeof (struct GNUNET_PeerIdentity)); - memcpy (&message[1], msgbuf, msgbuf_size); - wrapper = GNUNET_new (struct UNIXMessageWrapper); - wrapper->msg = message; - wrapper->msgsize = ssize; - wrapper->payload = msgbuf_size; - wrapper->priority = priority; - wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), - to); - wrapper->cont = cont; - wrapper->cont_cls = cont_cls; - wrapper->session = session; - GNUNET_CONTAINER_DLL_insert (plugin->msg_head, - plugin->msg_tail, - wrapper); - plugin->bytes_in_queue += ssize; - GNUNET_STATISTICS_set (plugin->env->stats, - "# bytes currently in UNIX buffers", - plugin->bytes_in_queue, - GNUNET_NO); - if (GNUNET_NO == plugin->with_ws) - reschedule_select (plugin); - return ssize; + reschedule_session_timeout (session); } @@ -1008,21 +919,30 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, GNUNET_NO); /* Look for existing session */ - address = GNUNET_HELLO_address_allocate (sender, PLUGIN_NAME, ua, ua_len, - GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */ + address = GNUNET_HELLO_address_allocate (sender, + PLUGIN_NAME, + ua, ua_len, + GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */ s = lookup_session (plugin, address); - if (NULL == s) { s = unix_plugin_get_session (plugin, address); /* Notify transport and ATS about new inbound session */ - plugin->env->session_start (NULL, s->address, s, &plugin->ats_network, 1); + plugin->env->session_start (NULL, + s->address, + s, + &plugin->ats_network, 1); } GNUNET_HELLO_address_free (address); reschedule_session_timeout (s); - plugin->env->receive (plugin->env->cls, s->address, s, currhdr); - plugin->env->update_address_metrics (plugin->env->cls, s->address, s, + plugin->env->receive (plugin->env->cls, + s->address, + s, + currhdr); + plugin->env->update_address_metrics (plugin->env->cls, + s->address, + s, &plugin->ats_network, 1); } @@ -1033,7 +953,7 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, * @param plugin the plugin */ static void -unix_plugin_select_read (struct Plugin *plugin) +unix_plugin_do_read (struct Plugin *plugin) { char buf[65536] GNUNET_ALIGN; struct UnixAddress *ua; @@ -1052,17 +972,16 @@ unix_plugin_select_read (struct Plugin *plugin) addrlen = sizeof (un); memset (&un, 0, sizeof (un)); - - ret = - GNUNET_NETWORK_socket_recvfrom (plugin->unix_sock.desc, buf, sizeof (buf), - (struct sockaddr *) &un, &addrlen); - + ret = GNUNET_NETWORK_socket_recvfrom (plugin->unix_sock.desc, + buf, sizeof (buf), + (struct sockaddr *) &un, + &addrlen); if ((GNUNET_SYSERR == ret) && ((errno == EAGAIN) || (errno == ENOBUFS))) return; - - if (ret == GNUNET_SYSERR) + if (GNUNET_SYSERR == ret) { - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom"); + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, + "recvfrom"); return; } else @@ -1122,15 +1041,16 @@ unix_plugin_select_read (struct Plugin *plugin) /** * Write to UNIX domain socket (it is ready). * - * @param plugin the plugin + * @param session session to write data for */ static void -unix_plugin_select_write (struct Plugin *plugin) +unix_plugin_do_write (struct Plugin *plugin) { - int sent = 0; - struct UNIXMessageWrapper * msgw; + ssize_t sent = 0; + struct UNIXMessageWrapper *msgw; + struct Session *session; - while (NULL != (msgw = plugin->msg_tail)) + while (NULL != (msgw = plugin->msg_head)) { if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0) break; /* Message is ready for sending */ @@ -1138,11 +1058,19 @@ unix_plugin_select_write (struct Plugin *plugin) LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout for message with %u bytes \n", (unsigned int) msgw->msgsize); - GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw); + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, + plugin->msg_tail, + msgw); + session = msgw->session; + session->msgs_in_queue--; + GNUNET_assert (session->bytes_in_queue >= msgw->msgsize); + session->bytes_in_queue -= msgw->msgsize; + GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); plugin->bytes_in_queue -= msgw->msgsize; GNUNET_STATISTICS_set (plugin->env->stats, "# bytes currently in UNIX buffers", - plugin->bytes_in_queue, GNUNET_NO); + plugin->bytes_in_queue, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, "# UNIX bytes discarded", msgw->msgsize, @@ -1178,45 +1106,43 @@ unix_plugin_select_write (struct Plugin *plugin) 1, GNUNET_NO); return; } + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, + plugin->msg_tail, + msgw); + session = msgw->session; + session->msgs_in_queue--; + GNUNET_assert (session->bytes_in_queue >= msgw->msgsize); + session->bytes_in_queue -= msgw->msgsize; + GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); + plugin->bytes_in_queue -= msgw->msgsize; + GNUNET_STATISTICS_set (plugin->env->stats, + "# bytes currently in UNIX buffers", + plugin->bytes_in_queue, GNUNET_NO); if (GNUNET_SYSERR == sent) { /* failed and no retry */ if (NULL != msgw->cont) - msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR, msgw->payload, 0); - - GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw); - - GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); - plugin->bytes_in_queue -= msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats, - "# bytes currently in UNIX buffers", - plugin->bytes_in_queue, GNUNET_NO); + msgw->cont (msgw->cont_cls, + &msgw->session->target, + GNUNET_SYSERR, + msgw->payload, 0); GNUNET_STATISTICS_update (plugin->env->stats, "# UNIX bytes discarded", msgw->msgsize, GNUNET_NO); - GNUNET_free (msgw->msg); GNUNET_free (msgw); return; } /* successfully sent bytes */ GNUNET_break (sent > 0); - GNUNET_CONTAINER_DLL_remove (plugin->msg_head, - plugin->msg_tail, - msgw); - GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); - plugin->bytes_in_queue -= msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats, - "# bytes currently in UNIX buffers", - plugin->bytes_in_queue, - GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, "# bytes transmitted via UNIX", msgw->msgsize, GNUNET_NO); if (NULL != msgw->cont) - msgw->cont (msgw->cont_cls, &msgw->session->target, + msgw->cont (msgw->cont_cls, + &msgw->session->target, GNUNET_OK, msgw->payload, msgw->msgsize); @@ -1226,47 +1152,160 @@ unix_plugin_select_write (struct Plugin *plugin) /** - * We have been notified that our writeset has something to read. We don't - * know which socket needs to be read, so we have to check each one + * We have been notified that our socket has something to read. * Then reschedule this function to be called again once more is available. * * @param cls the plugin handle - * @param tc the scheduling context (for rescheduling this function again) + * @param tc the scheduling context */ static void -unix_plugin_select (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +unix_plugin_select_read (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Plugin *plugin = cls; - plugin->select_task = GNUNET_SCHEDULER_NO_TASK; - if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) + plugin->read_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) + unix_plugin_do_read (plugin); + plugin->read_task = + GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + plugin->unix_sock.desc, + &unix_plugin_select_read, plugin); +} - if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0) - { - /* Ready to send data */ - GNUNET_assert (GNUNET_NETWORK_fdset_isset - (tc->write_ready, plugin->unix_sock.desc)); - if (NULL != plugin->msg_head) - unix_plugin_select_write (plugin); - } - if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) +/** + * We have been notified that our socket is ready to write. + * Then reschedule this function to be called again once more is available. + * + * @param cls the plugin handle + * @param tc the scheduling context + */ +static void +unix_plugin_select_write (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Plugin *plugin = cls; + + plugin->write_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) + unix_plugin_do_write (plugin); + if (NULL == plugin->msg_head) + return; /* write queue empty */ + plugin->write_task = + GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, + plugin->unix_sock.desc, + &unix_plugin_select_write, plugin); +} + + +/** + * Function that can be used by the transport service to transmit + * a message using the plugin. Note that in the case of a + * peer disconnecting, the continuation MUST be called + * prior to the disconnect notification itself. This function + * will be called with this peer's HELLO message to initiate + * a fresh connection to another peer. + * + * @param cls closure + * @param session which session must be used + * @param msgbuf the message to transmit + * @param msgbuf_size number of bytes in @a msgbuf + * @param priority how important is the message (most plugins will + * ignore message priority and just FIFO) + * @param to how long to wait at most for the transmission (does not + * require plugins to discard the message after the timeout, + * just advisory for the desired delay; most plugins will ignore + * this as well) + * @param cont continuation to call once the message has + * been transmitted (or if the transport is ready + * for the next transmission call; or if the + * peer disconnected...); can be NULL + * @param cont_cls closure for @a cont + * @return number of bytes used (on the physical network, with overheads); + * -1 on hard errors (i.e. address invalid); 0 is a legal value + * and does NOT mean that the message was not transmitted (DV) + */ +static ssize_t +unix_plugin_send (void *cls, + struct Session *session, + const char *msgbuf, + size_t msgbuf_size, + unsigned int priority, + struct GNUNET_TIME_Relative to, + GNUNET_TRANSPORT_TransmitContinuation cont, + void *cont_cls) +{ + struct Plugin *plugin = cls; + struct UNIXMessageWrapper *wrapper; + struct UNIXMessage *message; + int ssize; + + if (GNUNET_OK != + GNUNET_CONTAINER_multipeermap_contains_value (plugin->session_map, + &session->target, + session)) { - /* Ready to receive data */ - GNUNET_assert (GNUNET_NETWORK_fdset_isset - (tc->read_ready, plugin->unix_sock.desc)); - unix_plugin_select_read (plugin); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Invalid session for peer `%s' `%s'\n", + GNUNET_i2s (&session->target), + unix_address_to_string(NULL, + session->address->address, + session->address->address_length)); + GNUNET_break (0); + return GNUNET_SYSERR; } - reschedule_select (plugin); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending %u bytes with session for peer `%s' `%s'\n", + msgbuf_size, + GNUNET_i2s (&session->target), + unix_address_to_string (NULL, + session->address->address, + session->address->address_length)); + ssize = sizeof (struct UNIXMessage) + msgbuf_size; + message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size); + message->header.size = htons (ssize); + message->header.type = htons (0); + memcpy (&message->sender, plugin->env->my_identity, + sizeof (struct GNUNET_PeerIdentity)); + memcpy (&message[1], msgbuf, msgbuf_size); + wrapper = GNUNET_new (struct UNIXMessageWrapper); + wrapper->msg = message; + wrapper->msgsize = ssize; + wrapper->payload = msgbuf_size; + wrapper->priority = priority; + wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), + to); + wrapper->cont = cont; + wrapper->cont_cls = cont_cls; + wrapper->session = session; + GNUNET_CONTAINER_DLL_insert_tail (plugin->msg_head, + plugin->msg_tail, + wrapper); + plugin->bytes_in_queue += ssize; + session->bytes_in_queue += ssize; + session->msgs_in_queue++; + GNUNET_STATISTICS_set (plugin->env->stats, + "# bytes currently in UNIX buffers", + plugin->bytes_in_queue, + GNUNET_NO); + if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task) + plugin->write_task = + GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, + plugin->unix_sock.desc, + &unix_plugin_select_write, plugin); + return ssize; } /** * Create a slew of UNIX sockets. If possible, use IPv6 and IPv4. * - * @param cls closure for server start, should be a struct Plugin * + * @param cls closure for server start, should be a `struct Plugin *` * @return number of sockets created or #GNUNET_SYSERR on error */ static int @@ -1278,7 +1317,7 @@ unix_transport_server_start (void *cls) un = unix_address_to_sockaddr (plugin->unix_socket_path, &un_len); - if (GNUNET_YES == plugin->abstract) + if (GNUNET_YES == plugin->is_abstract) { plugin->unix_socket_path[0] = '@'; un->sun_path[0] = '\0'; @@ -1304,7 +1343,8 @@ unix_transport_server_start (void *cls) } } if (GNUNET_OK != - GNUNET_NETWORK_socket_bind (plugin->unix_sock.desc, (const struct sockaddr *) un, un_len)) + GNUNET_NETWORK_socket_bind (plugin->unix_sock.desc, + (const struct sockaddr *) un, un_len)) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "bind"); GNUNET_NETWORK_socket_close (plugin->unix_sock.desc); @@ -1312,15 +1352,13 @@ unix_transport_server_start (void *cls) GNUNET_free (un); return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", plugin->unix_socket_path); - plugin->rs = GNUNET_NETWORK_fdset_create (); - plugin->ws = GNUNET_NETWORK_fdset_create (); - GNUNET_NETWORK_fdset_zero (plugin->rs); - GNUNET_NETWORK_fdset_zero (plugin->ws); - GNUNET_NETWORK_fdset_set (plugin->rs, plugin->unix_sock.desc); - GNUNET_NETWORK_fdset_set (plugin->ws, plugin->unix_sock.desc); - - reschedule_select (plugin); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Bound to `%s'\n", + plugin->unix_socket_path); + plugin->read_task = + GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + plugin->unix_sock.desc, + &unix_plugin_select_read, plugin); GNUNET_free (un); return 1; } @@ -1430,7 +1468,8 @@ unix_plugin_address_pretty_printer (void *cls, const char *type, */ static int unix_string_to_address (void *cls, - const char *addr, uint16_t addrlen, + const char *addr, + uint16_t addrlen, void **buf, size_t *added) { struct UnixAddress *ua; @@ -1537,28 +1576,6 @@ address_notification (void *cls, } -/** - * Increment session timeout due to activity - * - * @param s session for which the timeout should be rescheduled - */ -static void -reschedule_session_timeout (struct Session *s) -{ - GNUNET_assert (NULL != s); - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); - GNUNET_SCHEDULER_cancel (s->timeout_task); - s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &session_timeout, - s); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout rescheduled for session %p set to %s\n", - s, - GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - GNUNET_YES)); -} - - /** * Function called on sessions to disconnect * @@ -1619,12 +1636,14 @@ send_session_info_iter (void *cls, struct GNUNET_TRANSPORT_SessionInfo info; memset (&info, 0, sizeof (info)); - info.state = GNUNET_TRANSPORT_SS_UP; /* ??? */ - // FIXME: info->is_inbound = ? - // FIXME: info->num_msg_pending = ? - // FIXME: info->num_bytes_pending = ? - // FIXME: info->receive_delay = ? - // FIXME: info->session_timeout = ? + info.state = GNUNET_TRANSPORT_SS_UP; /* all are up if we have them */ + info.is_inbound = GNUNET_SYSERR; /* hard to say */ + info.num_msg_pending = session->msgs_in_queue; + info.num_bytes_pending = session->bytes_in_queue; + /* info.receive_delay remains zero as this is not supported by UNIX + (cannot selectively not receive from 'some' peer while continuing + to receive from others) */ + info.session_timeout = session->timeout; info.address = session->address; plugin->sic (plugin->sic_cls, session, @@ -1690,11 +1709,14 @@ libgnunet_plugin_transport_unix_init (void *cls) plugin = GNUNET_new (struct Plugin); if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_filename(env->cfg, "transport-unix", "UNIXPATH", - &plugin->unix_socket_path)) + GNUNET_CONFIGURATION_get_value_filename (env->cfg, + "transport-unix", + "UNIXPATH", + &plugin->unix_socket_path)) { - LOG (GNUNET_ERROR_TYPE_ERROR, - _("No UNIXPATH given in configuration!\n")); + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "transport-unix", + "UNIXPATH"); GNUNET_free (plugin); return NULL; } @@ -1703,14 +1725,13 @@ libgnunet_plugin_transport_unix_init (void *cls) /* Initialize my flags */ #ifdef LINUX - plugin->abstract = GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg, - "testing", "USE_ABSTRACT_SOCKETS"); + plugin->is_abstract = GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg, + "testing", + "USE_ABSTRACT_SOCKETS"); #endif plugin->myoptions = UNIX_OPTIONS_NONE; - if (GNUNET_YES == plugin->abstract) - { + if (GNUNET_YES == plugin->is_abstract) plugin->myoptions = UNIX_OPTIONS_USE_ABSTRACT_SOCKETS; - } api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions); api->cls = plugin; @@ -1758,6 +1779,7 @@ libgnunet_plugin_transport_unix_done (void *cls) struct UNIXMessageWrapper * msgw; struct UnixAddress *ua; size_t len; + struct Session *session; if (NULL == plugin) { @@ -1787,6 +1809,12 @@ libgnunet_plugin_transport_unix_done (void *cls) GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw); + session = msgw->session; + session->msgs_in_queue--; + GNUNET_assert (session->bytes_in_queue >= msgw->msgsize); + session->bytes_in_queue -= msgw->msgsize; + GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); + plugin->bytes_in_queue -= msgw->msgsize; if (NULL != msgw->cont) msgw->cont (msgw->cont_cls, &msgw->session->target, @@ -1796,10 +1824,15 @@ libgnunet_plugin_transport_unix_done (void *cls) GNUNET_free (msgw); } - if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task) + if (GNUNET_SCHEDULER_NO_TASK != plugin->read_task) + { + GNUNET_SCHEDULER_cancel (plugin->read_task); + plugin->read_task = GNUNET_SCHEDULER_NO_TASK; + } + if (GNUNET_SCHEDULER_NO_TASK != plugin->write_task) { - GNUNET_SCHEDULER_cancel (plugin->select_task); - plugin->select_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SCHEDULER_cancel (plugin->write_task); + plugin->write_task = GNUNET_SCHEDULER_NO_TASK; } if (GNUNET_SCHEDULER_NO_TASK != plugin->address_update_task) { @@ -1811,15 +1844,12 @@ libgnunet_plugin_transport_unix_done (void *cls) GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->unix_sock.desc)); plugin->unix_sock.desc = NULL; - plugin->with_ws = GNUNET_NO; } GNUNET_CONTAINER_multipeermap_iterate (plugin->session_map, - &get_session_delete_it, plugin); + &get_session_delete_it, + plugin); GNUNET_CONTAINER_multipeermap_destroy (plugin->session_map); - if (NULL != plugin->rs) - GNUNET_NETWORK_fdset_destroy (plugin->rs); - if (NULL != plugin->ws) - GNUNET_NETWORK_fdset_destroy (plugin->ws); + GNUNET_break (0 == plugin->bytes_in_queue); GNUNET_free (plugin->unix_socket_path); GNUNET_free (plugin); GNUNET_free (api); -- cgit v1.2.3