summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-03-01 14:32:34 +0000
committerChristian Grothoff <christian@grothoff.org>2015-03-01 14:32:34 +0000
commit8ea46cc3c928bd5f74859dc74f305c94b687aad2 (patch)
tree8a18d4d5a51a7ad7572aa7b5e2386d14e1d1c6b1
parent3c8d61656ddd2f9fcfd86345618086223dde363b (diff)
-simplifying event loop for UDP, separting v4/v6 for better performance (in theory at least)
-rw-r--r--src/transport/plugin_transport_udp.c503
-rw-r--r--src/transport/plugin_transport_udp.h58
-rw-r--r--src/transport/plugin_transport_udp_broadcasting.c12
3 files changed, 285 insertions, 288 deletions
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
index 3702d99be..f6b1ac251 100644
--- a/src/transport/plugin_transport_udp.c
+++ b/src/transport/plugin_transport_udp.c
@@ -68,6 +68,29 @@
/**
+ * UDP Message-Packet header (after defragmentation).
+ */
+struct UDPMessage
+{
+ /**
+ * Message header.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Always zero for now.
+ */
+ uint32_t reserved;
+
+ /**
+ * What is the identity of the sender
+ */
+ struct GNUNET_PeerIdentity sender;
+
+};
+
+
+/**
* Closure for #append_port().
*/
struct PrettyPrinterContext
@@ -213,32 +236,6 @@ struct Session
};
-/**
- * Closure for #find_receive_context().
- */
-struct FindReceiveContext
-{
- /**
- * Where to store the result.
- */
- struct DefragContext *rc;
-
- /**
- * Session associated with this context.
- */
- struct Session *session;
-
- /**
- * Address to find.
- */
- const union UdpAddress *udp_addr;
-
- /**
- * Number of bytes in @e udp_addr.
- */
- size_t udp_addr_len;
-
-};
/**
* Data structure to track defragmentation contexts based
@@ -263,6 +260,12 @@ struct DefragContext
struct GNUNET_CONTAINER_HeapNode *hnode;
/**
+ * Source address this receive context is for (allocated at the
+ * end of the struct).
+ */
+ const union UdpAddress *udp_addr;
+
+ /**
* Who's message(s) are we defragmenting here?
* Only initialized once we succeeded and
* @e have_sender is set.
@@ -270,12 +273,6 @@ struct DefragContext
struct GNUNET_PeerIdentity sender;
/**
- * Source address this receive context is for (allocated at the
- * end of the struct).
- */
- const union UdpAddress *udp_addr;
-
- /**
* Length of @e udp_addr.
*/
size_t udp_addr_len;
@@ -313,7 +310,7 @@ struct UDP_FragmentationContext
struct Plugin *plugin;
/**
- * Handle for GNUNET_FRAGMENT context
+ * Handle for fragmentation.
*/
struct GNUNET_FRAGMENT_Context *frag;
@@ -347,11 +344,6 @@ struct UDP_FragmentationContext
*/
size_t on_wire_size;
- /**
- * FIXME.
- */
- unsigned int fragments_used;
-
};
@@ -366,22 +358,26 @@ enum UDP_MessageType
UMT_UNDEFINED = 0,
/**
- * Fragment of a message.
+ * This queue entry represents a fragment of a message.
*/
UMT_MSG_FRAGMENTED = 1,
/**
- *
+ * This queue entry does not include a message, but merely
+ * represents that we finished sending a fragmented message
+ * (all fragments confirmed, or timeout).
*/
UMT_MSG_FRAGMENTED_COMPLETE = 2,
/**
- * Unfragmented message.
+ * This queue entry represents a unfragmented message
+ * (was small enough to not require fragmentation).
*/
UMT_MSG_UNFRAGMENTED = 3,
/**
- * Receipt confirmation.
+ * This queue entry represents the acknowledgement of us
+ * receiving a fragment.
*/
UMT_MSG_ACK = 4
@@ -399,24 +395,22 @@ struct UDP_MessageWrapper
struct Session *session;
/**
- * DLL of messages
- * previous element
+ * DLL of messages, previous element
*/
struct UDP_MessageWrapper *prev;
/**
- * DLL of messages
- * previous element
+ * DLL of messages, next element
*/
struct UDP_MessageWrapper *next;
/**
- * Message with size msg_size including UDP specific overhead
+ * Message with @e msg_size bytes including UDP-specific overhead.
*/
char *msg_buf;
/**
- * Function to call upon completion of the transmission.
+ * Function to call upon completion of the transmission, can be NULL.
*/
GNUNET_TRANSPORT_TransmitContinuation cont;
@@ -426,29 +420,29 @@ struct UDP_MessageWrapper
void *cont_cls;
/**
- * Fragmentation context
+ * Fragmentation context.
* frag_ctx == NULL if transport <= MTU
* frag_ctx != NULL if transport > MTU
*/
struct UDP_FragmentationContext *frag_ctx;
/**
- * Message timeout
+ * Message timeout.
*/
struct GNUNET_TIME_Absolute timeout;
/**
- * Size of UDP message to send including UDP specific overhead
+ * Size of UDP message to send, including UDP-specific overhead.
*/
size_t msg_size;
/**
- * Payload size of original message
+ * Payload size of original message.
*/
size_t payload_size;
/**
- * Message type
+ * Message type (what does this entry in the queue represent).
*/
enum UDP_MessageType msg_type;
@@ -466,7 +460,7 @@ struct UDP_ACK_Message
struct GNUNET_MessageHeader header;
/**
- * Desired delay for flow control
+ * Desired delay for flow control, in us (in NBO).
*/
uint32_t delay;
@@ -497,7 +491,9 @@ notify_session_monitor (struct Plugin *plugin,
return;
if (GNUNET_YES == session->in_destroy)
return; /* already destroyed, just RC>0 left-over actions */
- memset (&info, 0, sizeof (info));
+ memset (&info,
+ 0,
+ sizeof (info));
info.state = state;
info.is_inbound = GNUNET_SYSERR; /* hard to say */
info.num_msg_pending = session->msgs_in_queue;
@@ -522,8 +518,8 @@ notify_session_monitor (struct Plugin *plugin,
* @param tc the scheduling context (for rescheduling this function again)
*/
static void
-udp_plugin_select (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
+udp_plugin_select_v4 (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
/**
@@ -540,55 +536,61 @@ udp_plugin_select_v6 (void *cls,
/**
- * (re)schedule select tasks for this plugin.
+ * (re)schedule IPv4-select tasks for this plugin.
*
* @param plugin plugin to reschedule
*/
static void
-schedule_select (struct Plugin *plugin)
+schedule_select_v4 (struct Plugin *plugin)
{
struct GNUNET_TIME_Relative min_delay;
struct UDP_MessageWrapper *udpw;
- if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
+ if ( (GNUNET_YES == plugin->enable_ipv4) &&
+ (NULL != plugin->sockv4) )
{
/* Find a message ready to send:
* Flow delay from other peer is expired or not set (0) */
min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
min_delay = GNUNET_TIME_relative_min (min_delay,
- GNUNET_TIME_absolute_get_remaining (
- udpw->session->flow_delay_from_other_peer));
+ GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+ if (NULL != plugin->select_task_v4)
+ GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
+ plugin->select_task_v4
+ = GNUNET_SCHEDULER_add_read_net (min_delay,
+ plugin->sockv4,
+ &udp_plugin_select_v4,
+ plugin);
+ }
+}
- if (plugin->select_task != NULL )
- GNUNET_SCHEDULER_cancel (plugin->select_task);
- /* Schedule with:
- * - write active set if message is ready
- * - timeout minimum delay */
- plugin->select_task = GNUNET_SCHEDULER_add_select (
- GNUNET_SCHEDULER_PRIORITY_DEFAULT,
- (0 == min_delay.rel_value_us) ?
- GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v4,
- (0 == min_delay.rel_value_us) ? plugin->ws_v4 : NULL,
- &udp_plugin_select, plugin);
- }
- if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
+/**
+ * (re)schedule IPv6-select tasks for this plugin.
+ *
+ * @param plugin plugin to reschedule
+ */
+static void
+schedule_select_v6 (struct Plugin *plugin)
+{
+ struct GNUNET_TIME_Relative min_delay;
+ struct UDP_MessageWrapper *udpw;
+
+ if ( (GNUNET_YES == plugin->enable_ipv6) &&
+ (NULL != plugin->sockv6) )
{
min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
min_delay = GNUNET_TIME_relative_min (min_delay,
- GNUNET_TIME_absolute_get_remaining (
- udpw->session->flow_delay_from_other_peer));
-
+ GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
if (NULL != plugin->select_task_v6)
GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
- plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (
- GNUNET_SCHEDULER_PRIORITY_DEFAULT,
- (0 == min_delay.rel_value_us) ?
- GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v6,
- (0 == min_delay.rel_value_us) ? plugin->ws_v6 : NULL,
- &udp_plugin_select_v6, plugin);
+ plugin->select_task_v6
+ = GNUNET_SCHEDULER_add_read_net (min_delay,
+ plugin->sockv6,
+ &udp_plugin_select_v6,
+ plugin);
}
}
@@ -1347,6 +1349,34 @@ fragmented_message_done (struct UDP_FragmentationContext *fc,
/**
+ * Closure for #find_receive_context().
+ */
+struct FindReceiveContext
+{
+ /**
+ * Where to store the result.
+ */
+ struct DefragContext *rc;
+
+ /**
+ * Session associated with this context.
+ */
+ struct Session *session;
+
+ /**
+ * Address to find.
+ */
+ const union UdpAddress *udp_addr;
+
+ /**
+ * Number of bytes in @e udp_addr.
+ */
+ size_t udp_addr_len;
+
+};
+
+
+/**
* Scan the heap for a receive context with the given address.
*
* @param cls the `struct FindReceiveContext`
@@ -1979,13 +2009,12 @@ enqueue_fragment (void *cls,
{
struct UDP_FragmentationContext *frag_ctx = cls;
struct Plugin *plugin = frag_ctx->plugin;
- struct UDP_MessageWrapper * udpw;
+ struct UDP_MessageWrapper *udpw;
size_t msg_len = ntohs (msg->size);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Enqueuing fragment with %u bytes\n",
msg_len);
- frag_ctx->fragments_used++;
udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
udpw->session = frag_ctx->session;
udpw->msg_buf = (char *) &udpw[1];
@@ -1997,8 +2026,12 @@ enqueue_fragment (void *cls,
udpw->frag_ctx = frag_ctx;
udpw->msg_type = UMT_MSG_FRAGMENTED;
memcpy (udpw->msg_buf, msg, msg_len);
- enqueue (plugin, udpw);
- schedule_select (plugin);
+ enqueue (plugin,
+ udpw);
+ if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
+ schedule_select_v4 (plugin);
+ else
+ schedule_select_v6 (plugin);
}
@@ -2151,7 +2184,10 @@ udp_plugin_send (void *cls,
notify_session_monitor (s->plugin,
s,
GNUNET_TRANSPORT_SS_UPDATE);
- schedule_select (plugin);
+ if (s->address->address_length == sizeof (struct IPv4UdpAddress))
+ schedule_select_v4 (plugin);
+ else
+ schedule_select_v6 (plugin);
return udpmlen;
}
@@ -2334,7 +2370,7 @@ static void
fragment_msg_proc (void *cls,
const struct GNUNET_MessageHeader *msg)
{
- struct DefragContext *rc = cls;
+ struct DefragContext *dc = cls;
const struct UDPMessage *um;
if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
@@ -2348,13 +2384,13 @@ fragment_msg_proc (void *cls,
return;
}
um = (const struct UDPMessage *) msg;
- rc->sender = um->sender;
- rc->have_sender = GNUNET_YES;
- process_udp_message (rc->plugin,
+ dc->sender = um->sender;
+ dc->have_sender = GNUNET_YES;
+ process_udp_message (dc->plugin,
um,
- rc->udp_addr,
- rc->udp_addr_len,
- rc->network_type);
+ dc->udp_addr,
+ dc->udp_addr_len,
+ dc->network_type);
}
@@ -2373,7 +2409,7 @@ ack_proc (void *cls,
struct DefragContext *rc = cls;
size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
struct UDP_ACK_Message *udp_ack;
- uint32_t delay = 0;
+ uint32_t delay;
struct UDP_MessageWrapper *udpw;
struct Session *s;
struct GNUNET_HELLO_Address *address;
@@ -2406,6 +2442,8 @@ ack_proc (void *cls,
}
if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
delay = s->flow_delay_for_other_peer.rel_value_us;
+ else
+ delay = UINT32_MAX;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sending ACK to `%s' including delay of %s\n",
@@ -2431,7 +2469,10 @@ ack_proc (void *cls,
notify_session_monitor (s->plugin,
s,
GNUNET_TRANSPORT_SS_UPDATE);
- schedule_select (rc->plugin);
+ if (s->address->address_length == sizeof (struct IPv4UdpAddress))
+ schedule_select_v4 (rc->plugin);
+ else
+ schedule_select_v6 (rc->plugin);
}
@@ -2940,7 +2981,8 @@ remove_timeout_messages_and_select (struct Plugin *plugin,
static void
analyze_send_error (struct Plugin *plugin,
const struct sockaddr *sa,
- socklen_t slen, int error)
+ socklen_t slen,
+ int error)
{
enum GNUNET_ATS_Network_Type type;
@@ -2989,9 +3031,8 @@ analyze_send_error (struct Plugin *plugin,
*
* @param plugin the plugin
* @param sock which socket (v4/v6) to send on
- * @return number of bytes transmitted, #GNUNET_SYSERR on failure
*/
-static size_t
+static void
udp_select_send (struct Plugin *plugin,
struct GNUNET_NETWORK_Handle *sock)
{
@@ -3004,103 +3045,102 @@ udp_select_send (struct Plugin *plugin,
struct sockaddr_in6 a6;
struct UDP_MessageWrapper *udpw;
- /* Find message to send */
- udpw = remove_timeout_messages_and_select (plugin,
- sock);
- if (NULL == udpw)
- return 0; /* No message to send */
-
- if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
+ /* Find message(s) to send */
+ while (NULL != (udpw = remove_timeout_messages_and_select (plugin,
+ sock)))
{
- u4 = udpw->session->address->address;
- memset (&a4, 0, sizeof(a4));
- a4.sin_family = AF_INET;
+ if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
+ {
+ u4 = udpw->session->address->address;
+ memset (&a4, 0, sizeof(a4));
+ a4.sin_family = AF_INET;
#if HAVE_SOCKADDR_IN_SIN_LEN
- a4.sin_len = sizeof (a4);
+ a4.sin_len = sizeof (a4);
#endif
- a4.sin_port = u4->u4_port;
- memcpy (&a4.sin_addr,
- &u4->ipv4_addr,
- sizeof(struct in_addr));
- a = (struct sockaddr *) &a4;
- slen = sizeof (a4);
- }
- else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
- {
- u6 = udpw->session->address->address;
- memset (&a6, 0, sizeof(a6));
- a6.sin6_family = AF_INET6;
+ a4.sin_port = u4->u4_port;
+ memcpy (&a4.sin_addr,
+ &u4->ipv4_addr,
+ sizeof(struct in_addr));
+ a = (struct sockaddr *) &a4;
+ slen = sizeof (a4);
+ }
+ else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
+ {
+ u6 = udpw->session->address->address;
+ memset (&a6, 0, sizeof(a6));
+ a6.sin6_family = AF_INET6;
#if HAVE_SOCKADDR_IN_SIN_LEN
- a6.sin6_len = sizeof (a6);
+ a6.sin6_len = sizeof (a6);
#endif
- a6.sin6_port = u6->u6_port;
- memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
- a = (struct sockaddr *) &a6;
- slen = sizeof (a6);
- }
- else
- {
- call_continuation (udpw,
- GNUNET_OK);
+ a6.sin6_port = u6->u6_port;
+ memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
+ a = (struct sockaddr *) &a6;
+ slen = sizeof (a6);
+ }
+ else
+ {
+ call_continuation (udpw,
+ GNUNET_OK);
+ dequeue (plugin,
+ udpw);
+ notify_session_monitor (plugin,
+ udpw->session,
+ GNUNET_TRANSPORT_SS_UPDATE);
+ GNUNET_free (udpw);
+ continue;
+ }
+ sent = GNUNET_NETWORK_socket_sendto (sock,
+ udpw->msg_buf,
+ udpw->msg_size,
+ a,
+ slen);
+ if (GNUNET_SYSERR == sent)
+ {
+ /* Failure */
+ analyze_send_error (plugin,
+ a,
+ slen,
+ errno);
+ call_continuation (udpw,
+ GNUNET_SYSERR);
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# UDP, total, bytes, sent, failure",
+ sent,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# UDP, total, messages, sent, failure",
+ 1,
+ GNUNET_NO);
+ }
+ else
+ {
+ /* Success */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "UDP transmitted %u-byte message to `%s' `%s' (%d: %s)\n",
+ (unsigned int) (udpw->msg_size),
+ GNUNET_i2s (&udpw->session->target),
+ GNUNET_a2s (a, slen),
+ (int ) sent,
+ (sent < 0) ? STRERROR (errno) : "ok");
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# UDP, total, bytes, sent, success",
+ sent,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# UDP, total, messages, sent, success",
+ 1,
+ GNUNET_NO);
+ if (NULL != udpw->frag_ctx)
+ udpw->frag_ctx->on_wire_size += udpw->msg_size;
+ call_continuation (udpw, GNUNET_OK);
+ }
dequeue (plugin,
- udpw);
+ udpw);
notify_session_monitor (plugin,
udpw->session,
GNUNET_TRANSPORT_SS_UPDATE);
GNUNET_free (udpw);
- return GNUNET_SYSERR;
- }
- sent = GNUNET_NETWORK_socket_sendto (sock,
- udpw->msg_buf,
- udpw->msg_size,
- a,
- slen);
- if (GNUNET_SYSERR == sent)
- {
- /* Failure */
- analyze_send_error (plugin,
- a,
- slen,
- errno);
- call_continuation (udpw,
- GNUNET_SYSERR);
- GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, bytes, sent, failure",
- sent,
- GNUNET_NO);
- GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, messages, sent, failure",
- 1,
- GNUNET_NO);
- }
- else
- {
- /* Success */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "UDP transmitted %u-byte message to `%s' `%s' (%d: %s)\n",
- (unsigned int) (udpw->msg_size),
- GNUNET_i2s (&udpw->session->target),
- GNUNET_a2s (a, slen),
- (int ) sent,
- (sent < 0) ? STRERROR (errno) : "ok");
- GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, bytes, sent, success",
- sent,
- GNUNET_NO);
- GNUNET_STATISTICS_update (plugin->env->stats,
- "# UDP, total, messages, sent, success",
- 1,
- GNUNET_NO);
- if (NULL != udpw->frag_ctx)
- udpw->frag_ctx->on_wire_size += udpw->msg_size;
- call_continuation (udpw, GNUNET_OK);
}
- dequeue (plugin, udpw);
- notify_session_monitor (plugin,
- udpw->session,
- GNUNET_TRANSPORT_SS_UPDATE);
- GNUNET_free (udpw);
- return sent;
}
@@ -3110,26 +3150,26 @@ udp_select_send (struct Plugin *plugin,
* Then reschedule this function to be called again once more is available.
*
* @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
+ * @param tc the scheduling context
*/
static void
-udp_plugin_select (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+udp_plugin_select_v4 (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Plugin *plugin = cls;
- plugin->select_task = NULL;
+ plugin->select_task_v4 = NULL;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
- && (NULL != plugin->sockv4)
- && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
- udp_select_read (plugin, plugin->sockv4);
- if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
- && (NULL != plugin->sockv4) && (NULL != plugin->ipv4_queue_head)
- && (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
- udp_select_send (plugin, plugin->sockv4);
- schedule_select (plugin);
+ if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
+ (NULL != plugin->sockv4) &&
+ (GNUNET_NETWORK_fdset_isset (tc->read_ready,
+ plugin->sockv4)))
+ udp_select_read (plugin,
+ plugin->sockv4);
+ udp_select_send (plugin,
+ plugin->sockv4);
+ schedule_select_v4 (plugin);
}
@@ -3139,7 +3179,7 @@ udp_plugin_select (void *cls,
* Then reschedule this function to be called again once more is available.
*
* @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
+ * @param tc the scheduling context
*/
static void
udp_plugin_select_v6 (void *cls,
@@ -3150,14 +3190,15 @@ udp_plugin_select_v6 (void *cls,
plugin->select_task_v6 = NULL;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- if (((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
- && (NULL != plugin->sockv6)
- && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
- udp_select_read (plugin, plugin->sockv6);
- if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
- && (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL )&&
- (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )udp_select_send (plugin, plugin->sockv6);
- schedule_select (plugin);
+ if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
+ (NULL != plugin->sockv6) &&
+ (GNUNET_NETWORK_fdset_isset (tc->read_ready,
+ plugin->sockv6)) )
+ udp_select_read (plugin,
+ plugin->sockv6);
+ udp_select_send (plugin,
+ plugin->sockv6);
+ schedule_select_v6 (plugin);
}
@@ -3365,35 +3406,8 @@ setup_sockets (struct Plugin *plugin,
_("Failed to open UDP sockets\n"));
return 0; /* No sockets created, return */
}
-
- /* Create file descriptors */
- if (plugin->enable_ipv4 == GNUNET_YES)
- {
- plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
- plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
- GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
- GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
- if (NULL != plugin->sockv4)
- {
- GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
- GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
- }
- }
-
- if (plugin->enable_ipv6 == GNUNET_YES)
- {
- plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
- plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
- GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
- GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
- if (NULL != plugin->sockv6)
- {
- GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
- GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
- }
- }
-
- schedule_select (plugin);
+ schedule_select_v4 (plugin);
+ schedule_select_v6 (plugin);
plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
GNUNET_NO,
plugin->port,
@@ -3719,12 +3733,12 @@ libgnunet_plugin_transport_udp_done (void *cls)
return NULL;
}
stop_broadcast (plugin);
- if (plugin->select_task != NULL)
+ if (NULL != plugin->select_task_v4)
{
- GNUNET_SCHEDULER_cancel (plugin->select_task);
- plugin->select_task = NULL;
+ GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
+ plugin->select_task_v4 = NULL;
}
- if (plugin->select_task_v6 != NULL)
+ if (NULL != plugin->select_task_v6)
{
GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
plugin->select_task_v6 = NULL;
@@ -3739,8 +3753,6 @@ libgnunet_plugin_transport_udp_done (void *cls)
GNUNET_NETWORK_socket_close (plugin->sockv4));
plugin->sockv4 = NULL;
}
- GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
- GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
}
if (GNUNET_YES == plugin->enable_ipv6)
{
@@ -3749,9 +3761,6 @@ libgnunet_plugin_transport_udp_done (void *cls)
GNUNET_break (GNUNET_OK ==
GNUNET_NETWORK_socket_close (plugin->sockv6));
plugin->sockv6 = NULL;
-
- GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
- GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
}
}
if (NULL != plugin->nat)
diff --git a/src/transport/plugin_transport_udp.h b/src/transport/plugin_transport_udp.h
index e1ff5e9cf..e6cb5b450 100644
--- a/src/transport/plugin_transport_udp.h
+++ b/src/transport/plugin_transport_udp.h
@@ -121,29 +121,14 @@ union UdpAddress
/**
- * UDP Message-Packet header (after defragmentation).
+ * Information we track for each message in the queue.
*/
-struct UDPMessage
-{
- /**
- * Message header.
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Always zero for now.
- */
- uint32_t reserved;
-
- /**
- * What is the identity of the sender
- */
- struct GNUNET_PeerIdentity sender;
-
-};
-
struct UDP_MessageWrapper;
+
+/**
+ * Closure for #append_port().
+ */
struct PrettyPrinterContext;
@@ -172,7 +157,7 @@ struct Plugin
/**
* ID of select task for IPv4
*/
- struct GNUNET_SCHEDULER_Task *select_task;
+ struct GNUNET_SCHEDULER_Task *select_task_v4;
/**
* ID of select task for IPv6
@@ -205,31 +190,11 @@ struct Plugin
struct GNUNET_NAT_Handle *nat;
/**
- * FD Read set
- */
- struct GNUNET_NETWORK_FDSet *rs_v4;
-
- /**
- * FD Write set
- */
- struct GNUNET_NETWORK_FDSet *ws_v4;
-
- /**
* The read socket for IPv4
*/
struct GNUNET_NETWORK_Handle *sockv4;
/**
- * FD Read set
- */
- struct GNUNET_NETWORK_FDSet *rs_v6;
-
- /**
- * FD Write set
- */
- struct GNUNET_NETWORK_FDSet *ws_v6;
-
- /**
* The read socket for IPv6
*/
struct GNUNET_NETWORK_Handle *sockv6;
@@ -347,6 +312,17 @@ struct Plugin
};
+/**
+ * Function called for a quick conversion of the binary address to
+ * a numeric address. Note that the caller must not free the
+ * address and that the next call to this function is allowed
+ * to override the address again.
+ *
+ * @param cls closure
+ * @param addr binary address (a `union UdpAddress`)
+ * @param addrlen length of the @a addr
+ * @return string representing the same address
+ */
const char *
udp_address_to_string (void *cls,
const void *addr,
diff --git a/src/transport/plugin_transport_udp_broadcasting.c b/src/transport/plugin_transport_udp_broadcasting.c
index e7b7cdc23..ea8797f29 100644
--- a/src/transport/plugin_transport_udp_broadcasting.c
+++ b/src/transport/plugin_transport_udp_broadcasting.c
@@ -527,6 +527,13 @@ iface_proc (void *cls,
}
+/**
+ * Setup broadcasting subsystem.
+ *
+ * @param plugin
+ * @param server_addrv6
+ * @param server_addrv4
+ */
void
setup_broadcast (struct Plugin *plugin,
struct sockaddr_in6 *server_addrv6,
@@ -577,6 +584,11 @@ setup_broadcast (struct Plugin *plugin,
}
+/**
+ * Stop broadcasting subsystem.
+ *
+ * @param plugin
+ */
void
stop_broadcast (struct Plugin *plugin)
{