From 8ea46cc3c928bd5f74859dc74f305c94b687aad2 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 1 Mar 2015 14:32:34 +0000 Subject: -simplifying event loop for UDP, separting v4/v6 for better performance (in theory at least) --- src/transport/plugin_transport_udp.c | 503 +++++++++++----------- src/transport/plugin_transport_udp.h | 58 +-- src/transport/plugin_transport_udp_broadcasting.c | 12 + 3 files changed, 285 insertions(+), 288 deletions(-) (limited to 'src') diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index 3702d99be..f6b1ac251 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -67,6 +67,29 @@ #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128 +/** + * UDP Message-Packet header (after defragmentation). + */ +struct UDPMessage +{ + /** + * Message header. + */ + struct GNUNET_MessageHeader header; + + /** + * Always zero for now. + */ + uint32_t reserved; + + /** + * What is the identity of the sender + */ + struct GNUNET_PeerIdentity sender; + +}; + + /** * Closure for #append_port(). */ @@ -213,32 +236,6 @@ struct Session }; -/** - * Closure for #find_receive_context(). - */ -struct FindReceiveContext -{ - /** - * Where to store the result. - */ - struct DefragContext *rc; - - /** - * Session associated with this context. - */ - struct Session *session; - - /** - * Address to find. - */ - const union UdpAddress *udp_addr; - - /** - * Number of bytes in @e udp_addr. - */ - size_t udp_addr_len; - -}; /** * Data structure to track defragmentation contexts based @@ -262,6 +259,12 @@ struct DefragContext */ struct GNUNET_CONTAINER_HeapNode *hnode; + /** + * Source address this receive context is for (allocated at the + * end of the struct). + */ + const union UdpAddress *udp_addr; + /** * Who's message(s) are we defragmenting here? * Only initialized once we succeeded and @@ -269,12 +272,6 @@ struct DefragContext */ struct GNUNET_PeerIdentity sender; - /** - * Source address this receive context is for (allocated at the - * end of the struct). - */ - const union UdpAddress *udp_addr; - /** * Length of @e udp_addr. */ @@ -313,7 +310,7 @@ struct UDP_FragmentationContext struct Plugin *plugin; /** - * Handle for GNUNET_FRAGMENT context + * Handle for fragmentation. */ struct GNUNET_FRAGMENT_Context *frag; @@ -347,11 +344,6 @@ struct UDP_FragmentationContext */ size_t on_wire_size; - /** - * FIXME. - */ - unsigned int fragments_used; - }; @@ -366,22 +358,26 @@ enum UDP_MessageType UMT_UNDEFINED = 0, /** - * Fragment of a message. + * This queue entry represents a fragment of a message. */ UMT_MSG_FRAGMENTED = 1, /** - * + * This queue entry does not include a message, but merely + * represents that we finished sending a fragmented message + * (all fragments confirmed, or timeout). */ UMT_MSG_FRAGMENTED_COMPLETE = 2, /** - * Unfragmented message. + * This queue entry represents a unfragmented message + * (was small enough to not require fragmentation). */ UMT_MSG_UNFRAGMENTED = 3, /** - * Receipt confirmation. + * This queue entry represents the acknowledgement of us + * receiving a fragment. */ UMT_MSG_ACK = 4 @@ -399,24 +395,22 @@ struct UDP_MessageWrapper struct Session *session; /** - * DLL of messages - * previous element + * DLL of messages, previous element */ struct UDP_MessageWrapper *prev; /** - * DLL of messages - * previous element + * DLL of messages, next element */ struct UDP_MessageWrapper *next; /** - * Message with size msg_size including UDP specific overhead + * Message with @e msg_size bytes including UDP-specific overhead. */ char *msg_buf; /** - * Function to call upon completion of the transmission. + * Function to call upon completion of the transmission, can be NULL. */ GNUNET_TRANSPORT_TransmitContinuation cont; @@ -426,29 +420,29 @@ struct UDP_MessageWrapper void *cont_cls; /** - * Fragmentation context + * Fragmentation context. * frag_ctx == NULL if transport <= MTU * frag_ctx != NULL if transport > MTU */ struct UDP_FragmentationContext *frag_ctx; /** - * Message timeout + * Message timeout. */ struct GNUNET_TIME_Absolute timeout; /** - * Size of UDP message to send including UDP specific overhead + * Size of UDP message to send, including UDP-specific overhead. */ size_t msg_size; /** - * Payload size of original message + * Payload size of original message. */ size_t payload_size; /** - * Message type + * Message type (what does this entry in the queue represent). */ enum UDP_MessageType msg_type; @@ -466,7 +460,7 @@ struct UDP_ACK_Message struct GNUNET_MessageHeader header; /** - * Desired delay for flow control + * Desired delay for flow control, in us (in NBO). */ uint32_t delay; @@ -497,7 +491,9 @@ notify_session_monitor (struct Plugin *plugin, return; if (GNUNET_YES == session->in_destroy) return; /* already destroyed, just RC>0 left-over actions */ - memset (&info, 0, sizeof (info)); + memset (&info, + 0, + sizeof (info)); info.state = state; info.is_inbound = GNUNET_SYSERR; /* hard to say */ info.num_msg_pending = session->msgs_in_queue; @@ -522,8 +518,8 @@ notify_session_monitor (struct Plugin *plugin, * @param tc the scheduling context (for rescheduling this function again) */ static void -udp_plugin_select (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); +udp_plugin_select_v4 (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); /** @@ -540,55 +536,61 @@ udp_plugin_select_v6 (void *cls, /** - * (re)schedule select tasks for this plugin. + * (re)schedule IPv4-select tasks for this plugin. * * @param plugin plugin to reschedule */ static void -schedule_select (struct Plugin *plugin) +schedule_select_v4 (struct Plugin *plugin) { struct GNUNET_TIME_Relative min_delay; struct UDP_MessageWrapper *udpw; - if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4)) + if ( (GNUNET_YES == plugin->enable_ipv4) && + (NULL != plugin->sockv4) ) { /* Find a message ready to send: * Flow delay from other peer is expired or not set (0) */ min_delay = GNUNET_TIME_UNIT_FOREVER_REL; for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next) min_delay = GNUNET_TIME_relative_min (min_delay, - GNUNET_TIME_absolute_get_remaining ( - udpw->session->flow_delay_from_other_peer)); + GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer)); + if (NULL != plugin->select_task_v4) + GNUNET_SCHEDULER_cancel (plugin->select_task_v4); + plugin->select_task_v4 + = GNUNET_SCHEDULER_add_read_net (min_delay, + plugin->sockv4, + &udp_plugin_select_v4, + plugin); + } +} - if (plugin->select_task != NULL ) - GNUNET_SCHEDULER_cancel (plugin->select_task); - /* Schedule with: - * - write active set if message is ready - * - timeout minimum delay */ - plugin->select_task = GNUNET_SCHEDULER_add_select ( - GNUNET_SCHEDULER_PRIORITY_DEFAULT, - (0 == min_delay.rel_value_us) ? - GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v4, - (0 == min_delay.rel_value_us) ? plugin->ws_v4 : NULL, - &udp_plugin_select, plugin); - } - if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6)) +/** + * (re)schedule IPv6-select tasks for this plugin. + * + * @param plugin plugin to reschedule + */ +static void +schedule_select_v6 (struct Plugin *plugin) +{ + struct GNUNET_TIME_Relative min_delay; + struct UDP_MessageWrapper *udpw; + + if ( (GNUNET_YES == plugin->enable_ipv6) && + (NULL != plugin->sockv6) ) { min_delay = GNUNET_TIME_UNIT_FOREVER_REL; for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next) min_delay = GNUNET_TIME_relative_min (min_delay, - GNUNET_TIME_absolute_get_remaining ( - udpw->session->flow_delay_from_other_peer)); - + GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer)); if (NULL != plugin->select_task_v6) GNUNET_SCHEDULER_cancel (plugin->select_task_v6); - plugin->select_task_v6 = GNUNET_SCHEDULER_add_select ( - GNUNET_SCHEDULER_PRIORITY_DEFAULT, - (0 == min_delay.rel_value_us) ? - GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v6, - (0 == min_delay.rel_value_us) ? plugin->ws_v6 : NULL, - &udp_plugin_select_v6, plugin); + plugin->select_task_v6 + = GNUNET_SCHEDULER_add_read_net (min_delay, + plugin->sockv6, + &udp_plugin_select_v6, + plugin); } } @@ -1346,6 +1348,34 @@ fragmented_message_done (struct UDP_FragmentationContext *fc, } +/** + * Closure for #find_receive_context(). + */ +struct FindReceiveContext +{ + /** + * Where to store the result. + */ + struct DefragContext *rc; + + /** + * Session associated with this context. + */ + struct Session *session; + + /** + * Address to find. + */ + const union UdpAddress *udp_addr; + + /** + * Number of bytes in @e udp_addr. + */ + size_t udp_addr_len; + +}; + + /** * Scan the heap for a receive context with the given address. * @@ -1979,13 +2009,12 @@ enqueue_fragment (void *cls, { struct UDP_FragmentationContext *frag_ctx = cls; struct Plugin *plugin = frag_ctx->plugin; - struct UDP_MessageWrapper * udpw; + struct UDP_MessageWrapper *udpw; size_t msg_len = ntohs (msg->size); LOG (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes\n", msg_len); - frag_ctx->fragments_used++; udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len); udpw->session = frag_ctx->session; udpw->msg_buf = (char *) &udpw[1]; @@ -1997,8 +2026,12 @@ enqueue_fragment (void *cls, udpw->frag_ctx = frag_ctx; udpw->msg_type = UMT_MSG_FRAGMENTED; memcpy (udpw->msg_buf, msg, msg_len); - enqueue (plugin, udpw); - schedule_select (plugin); + enqueue (plugin, + udpw); + if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress)) + schedule_select_v4 (plugin); + else + schedule_select_v6 (plugin); } @@ -2151,7 +2184,10 @@ udp_plugin_send (void *cls, notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_UPDATE); - schedule_select (plugin); + if (s->address->address_length == sizeof (struct IPv4UdpAddress)) + schedule_select_v4 (plugin); + else + schedule_select_v6 (plugin); return udpmlen; } @@ -2334,7 +2370,7 @@ static void fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg) { - struct DefragContext *rc = cls; + struct DefragContext *dc = cls; const struct UDPMessage *um; if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE) @@ -2348,13 +2384,13 @@ fragment_msg_proc (void *cls, return; } um = (const struct UDPMessage *) msg; - rc->sender = um->sender; - rc->have_sender = GNUNET_YES; - process_udp_message (rc->plugin, + dc->sender = um->sender; + dc->have_sender = GNUNET_YES; + process_udp_message (dc->plugin, um, - rc->udp_addr, - rc->udp_addr_len, - rc->network_type); + dc->udp_addr, + dc->udp_addr_len, + dc->network_type); } @@ -2373,7 +2409,7 @@ ack_proc (void *cls, struct DefragContext *rc = cls; size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size); struct UDP_ACK_Message *udp_ack; - uint32_t delay = 0; + uint32_t delay; struct UDP_MessageWrapper *udpw; struct Session *s; struct GNUNET_HELLO_Address *address; @@ -2406,6 +2442,8 @@ ack_proc (void *cls, } if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX) delay = s->flow_delay_for_other_peer.rel_value_us; + else + delay = UINT32_MAX; LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %s\n", @@ -2431,7 +2469,10 @@ ack_proc (void *cls, notify_session_monitor (s->plugin, s, GNUNET_TRANSPORT_SS_UPDATE); - schedule_select (rc->plugin); + if (s->address->address_length == sizeof (struct IPv4UdpAddress)) + schedule_select_v4 (rc->plugin); + else + schedule_select_v6 (rc->plugin); } @@ -2940,7 +2981,8 @@ remove_timeout_messages_and_select (struct Plugin *plugin, static void analyze_send_error (struct Plugin *plugin, const struct sockaddr *sa, - socklen_t slen, int error) + socklen_t slen, + int error) { enum GNUNET_ATS_Network_Type type; @@ -2989,9 +3031,8 @@ analyze_send_error (struct Plugin *plugin, * * @param plugin the plugin * @param sock which socket (v4/v6) to send on - * @return number of bytes transmitted, #GNUNET_SYSERR on failure */ -static size_t +static void udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) { @@ -3004,103 +3045,102 @@ udp_select_send (struct Plugin *plugin, struct sockaddr_in6 a6; struct UDP_MessageWrapper *udpw; - /* Find message to send */ - udpw = remove_timeout_messages_and_select (plugin, - sock); - if (NULL == udpw) - return 0; /* No message to send */ - - if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length) + /* Find message(s) to send */ + while (NULL != (udpw = remove_timeout_messages_and_select (plugin, + sock))) { - u4 = udpw->session->address->address; - memset (&a4, 0, sizeof(a4)); - a4.sin_family = AF_INET; + if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length) + { + u4 = udpw->session->address->address; + memset (&a4, 0, sizeof(a4)); + a4.sin_family = AF_INET; #if HAVE_SOCKADDR_IN_SIN_LEN - a4.sin_len = sizeof (a4); + a4.sin_len = sizeof (a4); #endif - a4.sin_port = u4->u4_port; - memcpy (&a4.sin_addr, - &u4->ipv4_addr, - sizeof(struct in_addr)); - a = (struct sockaddr *) &a4; - slen = sizeof (a4); - } - else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length) - { - u6 = udpw->session->address->address; - memset (&a6, 0, sizeof(a6)); - a6.sin6_family = AF_INET6; + a4.sin_port = u4->u4_port; + memcpy (&a4.sin_addr, + &u4->ipv4_addr, + sizeof(struct in_addr)); + a = (struct sockaddr *) &a4; + slen = sizeof (a4); + } + else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length) + { + u6 = udpw->session->address->address; + memset (&a6, 0, sizeof(a6)); + a6.sin6_family = AF_INET6; #if HAVE_SOCKADDR_IN_SIN_LEN - a6.sin6_len = sizeof (a6); + a6.sin6_len = sizeof (a6); #endif - a6.sin6_port = u6->u6_port; - memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr)); - a = (struct sockaddr *) &a6; - slen = sizeof (a6); - } - else - { - call_continuation (udpw, - GNUNET_OK); + a6.sin6_port = u6->u6_port; + memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr)); + a = (struct sockaddr *) &a6; + slen = sizeof (a6); + } + else + { + call_continuation (udpw, + GNUNET_OK); + dequeue (plugin, + udpw); + notify_session_monitor (plugin, + udpw->session, + GNUNET_TRANSPORT_SS_UPDATE); + GNUNET_free (udpw); + continue; + } + sent = GNUNET_NETWORK_socket_sendto (sock, + udpw->msg_buf, + udpw->msg_size, + a, + slen); + if (GNUNET_SYSERR == sent) + { + /* Failure */ + analyze_send_error (plugin, + a, + slen, + errno); + call_continuation (udpw, + GNUNET_SYSERR); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total, bytes, sent, failure", + sent, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total, messages, sent, failure", + 1, + GNUNET_NO); + } + else + { + /* Success */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UDP transmitted %u-byte message to `%s' `%s' (%d: %s)\n", + (unsigned int) (udpw->msg_size), + GNUNET_i2s (&udpw->session->target), + GNUNET_a2s (a, slen), + (int ) sent, + (sent < 0) ? STRERROR (errno) : "ok"); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total, bytes, sent, success", + sent, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UDP, total, messages, sent, success", + 1, + GNUNET_NO); + if (NULL != udpw->frag_ctx) + udpw->frag_ctx->on_wire_size += udpw->msg_size; + call_continuation (udpw, GNUNET_OK); + } dequeue (plugin, - udpw); + udpw); notify_session_monitor (plugin, udpw->session, GNUNET_TRANSPORT_SS_UPDATE); GNUNET_free (udpw); - return GNUNET_SYSERR; - } - sent = GNUNET_NETWORK_socket_sendto (sock, - udpw->msg_buf, - udpw->msg_size, - a, - slen); - if (GNUNET_SYSERR == sent) - { - /* Failure */ - analyze_send_error (plugin, - a, - slen, - errno); - call_continuation (udpw, - GNUNET_SYSERR); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes, sent, failure", - sent, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, messages, sent, failure", - 1, - GNUNET_NO); - } - else - { - /* Success */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "UDP transmitted %u-byte message to `%s' `%s' (%d: %s)\n", - (unsigned int) (udpw->msg_size), - GNUNET_i2s (&udpw->session->target), - GNUNET_a2s (a, slen), - (int ) sent, - (sent < 0) ? STRERROR (errno) : "ok"); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes, sent, success", - sent, - GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, messages, sent, success", - 1, - GNUNET_NO); - if (NULL != udpw->frag_ctx) - udpw->frag_ctx->on_wire_size += udpw->msg_size; - call_continuation (udpw, GNUNET_OK); } - dequeue (plugin, udpw); - notify_session_monitor (plugin, - udpw->session, - GNUNET_TRANSPORT_SS_UPDATE); - GNUNET_free (udpw); - return sent; } @@ -3110,26 +3150,26 @@ udp_select_send (struct Plugin *plugin, * Then reschedule this function to be called again once more is available. * * @param cls the plugin handle - * @param tc the scheduling context (for rescheduling this function again) + * @param tc the scheduling context */ static void -udp_plugin_select (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +udp_plugin_select_v4 (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Plugin *plugin = cls; - plugin->select_task = NULL; + plugin->select_task_v4 = NULL; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) - && (NULL != plugin->sockv4) - && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4))) - udp_select_read (plugin, plugin->sockv4); - if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) - && (NULL != plugin->sockv4) && (NULL != plugin->ipv4_queue_head) - && (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4))) - udp_select_send (plugin, plugin->sockv4); - schedule_select (plugin); + if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) && + (NULL != plugin->sockv4) && + (GNUNET_NETWORK_fdset_isset (tc->read_ready, + plugin->sockv4))) + udp_select_read (plugin, + plugin->sockv4); + udp_select_send (plugin, + plugin->sockv4); + schedule_select_v4 (plugin); } @@ -3139,7 +3179,7 @@ udp_plugin_select (void *cls, * Then reschedule this function to be called again once more is available. * * @param cls the plugin handle - * @param tc the scheduling context (for rescheduling this function again) + * @param tc the scheduling context */ static void udp_plugin_select_v6 (void *cls, @@ -3150,14 +3190,15 @@ udp_plugin_select_v6 (void *cls, plugin->select_task_v6 = NULL; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - if (((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) - && (NULL != plugin->sockv6) - && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6))) - udp_select_read (plugin, plugin->sockv6); - if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) - && (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL )&& - (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )udp_select_send (plugin, plugin->sockv6); - schedule_select (plugin); + if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) && + (NULL != plugin->sockv6) && + (GNUNET_NETWORK_fdset_isset (tc->read_ready, + plugin->sockv6)) ) + udp_select_read (plugin, + plugin->sockv6); + udp_select_send (plugin, + plugin->sockv6); + schedule_select_v6 (plugin); } @@ -3365,35 +3406,8 @@ setup_sockets (struct Plugin *plugin, _("Failed to open UDP sockets\n")); return 0; /* No sockets created, return */ } - - /* Create file descriptors */ - if (plugin->enable_ipv4 == GNUNET_YES) - { - plugin->rs_v4 = GNUNET_NETWORK_fdset_create (); - plugin->ws_v4 = GNUNET_NETWORK_fdset_create (); - GNUNET_NETWORK_fdset_zero (plugin->rs_v4); - GNUNET_NETWORK_fdset_zero (plugin->ws_v4); - if (NULL != plugin->sockv4) - { - GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4); - GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4); - } - } - - if (plugin->enable_ipv6 == GNUNET_YES) - { - plugin->rs_v6 = GNUNET_NETWORK_fdset_create (); - plugin->ws_v6 = GNUNET_NETWORK_fdset_create (); - GNUNET_NETWORK_fdset_zero (plugin->rs_v6); - GNUNET_NETWORK_fdset_zero (plugin->ws_v6); - if (NULL != plugin->sockv6) - { - GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6); - GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6); - } - } - - schedule_select (plugin); + schedule_select_v4 (plugin); + schedule_select_v6 (plugin); plugin->nat = GNUNET_NAT_register (plugin->env->cfg, GNUNET_NO, plugin->port, @@ -3719,12 +3733,12 @@ libgnunet_plugin_transport_udp_done (void *cls) return NULL; } stop_broadcast (plugin); - if (plugin->select_task != NULL) + if (NULL != plugin->select_task_v4) { - GNUNET_SCHEDULER_cancel (plugin->select_task); - plugin->select_task = NULL; + GNUNET_SCHEDULER_cancel (plugin->select_task_v4); + plugin->select_task_v4 = NULL; } - if (plugin->select_task_v6 != NULL) + if (NULL != plugin->select_task_v6) { GNUNET_SCHEDULER_cancel (plugin->select_task_v6); plugin->select_task_v6 = NULL; @@ -3739,8 +3753,6 @@ libgnunet_plugin_transport_udp_done (void *cls) GNUNET_NETWORK_socket_close (plugin->sockv4)); plugin->sockv4 = NULL; } - GNUNET_NETWORK_fdset_destroy (plugin->rs_v4); - GNUNET_NETWORK_fdset_destroy (plugin->ws_v4); } if (GNUNET_YES == plugin->enable_ipv6) { @@ -3749,9 +3761,6 @@ libgnunet_plugin_transport_udp_done (void *cls) GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (plugin->sockv6)); plugin->sockv6 = NULL; - - GNUNET_NETWORK_fdset_destroy (plugin->rs_v6); - GNUNET_NETWORK_fdset_destroy (plugin->ws_v6); } } if (NULL != plugin->nat) diff --git a/src/transport/plugin_transport_udp.h b/src/transport/plugin_transport_udp.h index e1ff5e9cf..e6cb5b450 100644 --- a/src/transport/plugin_transport_udp.h +++ b/src/transport/plugin_transport_udp.h @@ -121,29 +121,14 @@ union UdpAddress /** - * UDP Message-Packet header (after defragmentation). + * Information we track for each message in the queue. */ -struct UDPMessage -{ - /** - * Message header. - */ - struct GNUNET_MessageHeader header; - - /** - * Always zero for now. - */ - uint32_t reserved; - - /** - * What is the identity of the sender - */ - struct GNUNET_PeerIdentity sender; - -}; - struct UDP_MessageWrapper; + +/** + * Closure for #append_port(). + */ struct PrettyPrinterContext; @@ -172,7 +157,7 @@ struct Plugin /** * ID of select task for IPv4 */ - struct GNUNET_SCHEDULER_Task *select_task; + struct GNUNET_SCHEDULER_Task *select_task_v4; /** * ID of select task for IPv6 @@ -204,31 +189,11 @@ struct Plugin */ struct GNUNET_NAT_Handle *nat; - /** - * FD Read set - */ - struct GNUNET_NETWORK_FDSet *rs_v4; - - /** - * FD Write set - */ - struct GNUNET_NETWORK_FDSet *ws_v4; - /** * The read socket for IPv4 */ struct GNUNET_NETWORK_Handle *sockv4; - /** - * FD Read set - */ - struct GNUNET_NETWORK_FDSet *rs_v6; - - /** - * FD Write set - */ - struct GNUNET_NETWORK_FDSet *ws_v6; - /** * The read socket for IPv6 */ @@ -347,6 +312,17 @@ struct Plugin }; +/** + * Function called for a quick conversion of the binary address to + * a numeric address. Note that the caller must not free the + * address and that the next call to this function is allowed + * to override the address again. + * + * @param cls closure + * @param addr binary address (a `union UdpAddress`) + * @param addrlen length of the @a addr + * @return string representing the same address + */ const char * udp_address_to_string (void *cls, const void *addr, diff --git a/src/transport/plugin_transport_udp_broadcasting.c b/src/transport/plugin_transport_udp_broadcasting.c index e7b7cdc23..ea8797f29 100644 --- a/src/transport/plugin_transport_udp_broadcasting.c +++ b/src/transport/plugin_transport_udp_broadcasting.c @@ -527,6 +527,13 @@ iface_proc (void *cls, } +/** + * Setup broadcasting subsystem. + * + * @param plugin + * @param server_addrv6 + * @param server_addrv4 + */ void setup_broadcast (struct Plugin *plugin, struct sockaddr_in6 *server_addrv6, @@ -577,6 +584,11 @@ setup_broadcast (struct Plugin *plugin, } +/** + * Stop broadcasting subsystem. + * + * @param plugin + */ void stop_broadcast (struct Plugin *plugin) { -- cgit v1.2.3