From 463bedc5891215a4e496a7e45e71851dc700c54b Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Thu, 2 Feb 2012 12:47:26 +0000 Subject: - working with fragmentation now fine tuning --- src/transport/plugin_transport_udp_new.c | 214 ++++++++++++++++++++++--------- 1 file changed, 153 insertions(+), 61 deletions(-) diff --git a/src/transport/plugin_transport_udp_new.c b/src/transport/plugin_transport_udp_new.c index efb131dd4..d914e68eb 100644 --- a/src/transport/plugin_transport_udp_new.c +++ b/src/transport/plugin_transport_udp_new.c @@ -108,8 +108,10 @@ struct Session struct GNUNET_ATS_Information ats; - struct FragmentationContext * head; - struct FragmentationContext * tail; + struct FragmentationContext * frag_ctx; + +// struct FragmentationContext * head; +// struct FragmentationContext * tail; }; @@ -201,9 +203,6 @@ struct DefragContext * Length of 'src_addr' */ size_t addr_len; - - struct GNUNET_PeerIdentity id; - }; @@ -257,7 +256,7 @@ struct UDPMessageWrapper */ void *cont_cls; - struct FragmentationContext *frag; + struct FragmentationContext *frag_ctx; }; @@ -526,6 +525,7 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) { struct Plugin *plugin = cls; struct Session *s = value; + struct UDPMessageWrapper *udpw; #if DEBUG_UDP LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -534,22 +534,35 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value) GNUNET_i2s (&s->target), GNUNET_a2s (s->sock_addr, s->addrlen)); #endif - struct FragmentationContext *fctx = s->head; - while (fctx != NULL) + plugin->env->session_end (plugin->env->cls, &s->target, s); + + while (s->frag_ctx != NULL) { - GNUNET_FRAGMENT_context_destroy(fctx->frag); - GNUNET_CONTAINER_DLL_remove(s->head, s->tail, fctx); - GNUNET_free (fctx); - fctx = s->head; + GNUNET_FRAGMENT_context_destroy(s->frag_ctx->frag); + GNUNET_free (s->frag_ctx); + s->frag_ctx = NULL; } - plugin->env->session_end (plugin->env->cls, &s->target, s); + udpw = plugin->msg_head; + while (udpw != NULL) + { + if (udpw->session == s) + { + GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw); + + if (udpw->cont != NULL) + udpw->cont (udpw->cont_cls, &s->target, GNUNET_SYSERR); + GNUNET_free (udpw); + } + udpw = plugin->msg_head; + } GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (plugin->sessions, &s->target.hashPubKey, s)); + GNUNET_free (s); return GNUNET_OK; } @@ -576,6 +589,8 @@ udp_disconnect (void *cls, const struct GNUNET_PeerIdentity *target) /* Clean up sessions */ GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessions, &target->hashPubKey, &disconnect_and_free_it, plugin); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "FREEED SESSIONS from peer `%s'\n", GNUNET_i2s (target)); } static struct Session * @@ -650,7 +665,7 @@ static int session_cmp_it (void *cls, socklen_t s_addrlen = s->addrlen; -#if VERBOSE +#if VERBOSE_UDP GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing address %s <-> %s\n", udp_address_to_string (NULL, (void *) address->address, address->address_length), GNUNET_a2s (s->sock_addr, s->addrlen)); @@ -719,13 +734,13 @@ udp_plugin_get_session (void *cls, struct SessionCompareContext cctx; cctx.addr = address; cctx.res = NULL; -#if DEBUG_UDP +#if VERBOSE_UDP GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length)); #endif GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx); if (cctx.res != NULL) { -#if DEBUG_UDP +#if VERBOSE_UDP GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res); #endif return cctx.res; @@ -737,7 +752,7 @@ udp_plugin_get_session (void *cls, address->address, address->address_length, NULL, NULL); -#if DEBUG_UDP +#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating new session %p for peer `%s' address `%s'\n", s, @@ -771,10 +786,9 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) size_t msg_len = ntohs (msg->size); -#if DEBUG_UDP +#if VERBOSE_UDP + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper)); #endif - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Enqueueing fragment with %u bytes %u\n", msg_len , sizeof (struct UDPMessageWrapper)); - udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len); udpw->session = frag_ctx->session; @@ -784,7 +798,7 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) udpw->cont = frag_ctx->cont; udpw->cont_cls = frag_ctx->cont_cls; udpw->timeout = frag_ctx->timeout; - udpw->frag = frag_ctx; + udpw->frag_ctx = frag_ctx; memcpy (udpw->udp, msg, msg_len); GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw); @@ -848,7 +862,7 @@ udp_plugin_send (void *cls, return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_ERROR, + LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP transmits %u-byte message to `%s' using address `%s'\n", msgbuf_size, GNUNET_i2s (&s->target), @@ -870,7 +884,7 @@ udp_plugin_send (void *cls, udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to); udpw->cont = cont; udpw->cont_cls = cont_cls; - udpw->frag = NULL; + udpw->frag_ctx = NULL; memcpy (udpw->udp, udp, sizeof (struct UDPMessage)); memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size); @@ -879,8 +893,10 @@ udp_plugin_send (void *cls, } else { - LOG (GNUNET_ERROR_TYPE_ERROR, + LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP has to fragment message \n"); + if (s->frag_ctx != NULL) + return GNUNET_SYSERR; memcpy (&udp[1], msgbuf, msgbuf_size); struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct FragmentationContext)); @@ -898,7 +914,7 @@ udp_plugin_send (void *cls, &enqueue_fragment, frag_ctx); - GNUNET_CONTAINER_DLL_insert(s->head, s->tail, frag_ctx); + s->frag_ctx = frag_ctx; } @@ -1013,7 +1029,7 @@ process_inbound_tokenized_messages (void *cls, void *client, &si->sender, hdr, (const struct GNUNET_ATS_Information *) &ats, 2, - si->session, + NULL, si->arg, si->args); si->session->flow_delay_for_other_peer = delay; @@ -1147,13 +1163,16 @@ fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_break (0); return; } + LOG (GNUNET_ERROR_TYPE_ERROR, "Sending fragment_msg_proc ms\n"); process_udp_message (rc->plugin, (const struct UDPMessage *) msg, rc->src_addr, rc->addr_len); } struct LookupContext { - struct DefragContext *rc; + const struct sockaddr * addr; + size_t addrlen; + struct Session *res; }; @@ -1163,11 +1182,9 @@ lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value) struct LookupContext *l_ctx = cls; struct Session * s = value; - if ((s->addrlen == l_ctx->rc->addr_len) && - (0 == memcmp (s->sock_addr, l_ctx->rc->src_addr, s->addrlen))) + if ((s->addrlen == l_ctx->addrlen) && + (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen))) { - LOG (GNUNET_ERROR_TYPE_ERROR, - "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY \n"); l_ctx->res = s; return GNUNET_NO; } @@ -1185,6 +1202,7 @@ static void ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) { struct DefragContext *rc = cls; + LOG (GNUNET_ERROR_TYPE_ERROR, "Sending ACK ms\n"); size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size); struct UDP_ACK_Message *udp_ack; @@ -1193,21 +1211,18 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) struct Session *s; struct LookupContext l_ctx; - l_ctx.rc = rc; + l_ctx.addr = rc->src_addr; + l_ctx.addrlen = rc->addr_len; l_ctx.res = NULL; - GNUNET_CONTAINER_multihashmap_get_multiple(rc->plugin->sessions, - &rc->id.hashPubKey, + GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions, &lookup_session_by_addr_it, &l_ctx); s = l_ctx.res; - if (s != NULL) - { - if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX) - delay = s->flow_delay_for_other_peer.rel_value; - else - delay = UINT32_MAX; - } + GNUNET_assert (s != NULL); + + if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX) + delay = s->flow_delay_for_other_peer.rel_value; #if DEBUG_UDP LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1221,7 +1236,7 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize); udpw->cont = NULL; udpw->cont_cls = NULL; - udpw->frag = NULL; + udpw->frag_ctx = NULL; udpw->msg_size = msize; udpw->session = s; udpw->timeout = GNUNET_TIME_absolute_get_forever(); @@ -1253,15 +1268,79 @@ static void read_process_msg (struct Plugin *plugin, return; } -static void read_process_ack () +static void read_process_ack (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + char *addr, + socklen_t fromlen) { - //const struct GNUNET_MessageHeader *ack; - //struct Session *peer_session; - //const struct UDP_ACK_Message *udp_ack; - //struct Session *s = NULL; - //struct GNUNET_TIME_Relative flow_delay; - //struct GNUNET_ATS_Information ats; - GNUNET_break_op (0); + const struct GNUNET_MessageHeader *ack; + const struct UDP_ACK_Message *udp_ack; + struct LookupContext l_ctx; + struct Session *s = NULL; + struct GNUNET_TIME_Relative flow_delay; + + if (ntohs (msg->size) < + sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return; + } + + udp_ack = (const struct UDP_ACK_Message *) msg; + + l_ctx.addr = (const struct sockaddr *) addr; + l_ctx.addrlen = fromlen; + l_ctx.res = NULL; + GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions, + &lookup_session_by_addr_it, + &l_ctx); + s = l_ctx.res; + GNUNET_assert (s != NULL); + + if (s != NULL) + { + flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay); + + LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n", + flow_delay.rel_value); + + s->flow_delay_from_other_peer = + GNUNET_TIME_relative_to_absolute (flow_delay); + } + + ack = (const struct GNUNET_MessageHeader *) &udp_ack[1]; + if (ntohs (ack->size) != + ntohs (msg->size) - sizeof (struct UDP_ACK_Message)) + { + GNUNET_break_op (0); + return; + } + + if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack)) + { +#if DEBUG_UDP + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UDP processes %u-byte acknowledgement from `%s' at `%s'\n", + (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); +#endif + return; + } + +#if DEBUG_UDP + LOG (GNUNET_ERROR_TYPE_DEBUG, + "FULL MESSAGE ACKed\n", + (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); +#endif + plugin->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag); + + if (s->frag_ctx->cont != NULL) + s->frag_ctx->cont + (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK); + GNUNET_free (s->frag_ctx); + s->frag_ctx = NULL; + return; } static void read_process_fragment (struct Plugin *plugin, @@ -1307,6 +1386,19 @@ static void read_process_fragment (struct Plugin *plugin, GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx, (GNUNET_CONTAINER_HeapCostType) now.abs_value); +#if DEBUG_UDP + LOG (GNUNET_ERROR_TYPE_DEBUG, "Created new defragmentation context for %u-byte fragment from `%s'\n", + (unsigned int) ntohs (msg->size), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); +#endif + } + else + { +#if DEBUG_UDP + LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing defragmentation context for %u-byte fragment from `%s'\n", + (unsigned int) ntohs (msg->size), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); +#endif } if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg)) @@ -1375,7 +1467,7 @@ udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) return; case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK: - read_process_ack (); + read_process_ack (plugin, msg, addr, fromlen);; return; case GNUNET_MESSAGE_TYPE_FRAGMENT: @@ -1409,13 +1501,13 @@ udp_select_send (struct Plugin *plugin) if (udpw->cont != NULL) udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR); - if (udpw->frag != NULL) + if (udpw->frag_ctx != NULL) { #if DEBUG_UDP - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmendted message for peer `%s' with size %u timed out\n", - GNUNET_i2s(&udpw->session->target), udpw->frag->bytes_to_send); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Fragmented message for peer `%s' with size %u timed out\n", + GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->bytes_to_send); #endif - GNUNET_FRAGMENT_context_destroy(udpw->frag->frag); + GNUNET_FRAGMENT_context_destroy(udpw->frag_ctx->frag); } else { @@ -1462,22 +1554,22 @@ udp_select_send (struct Plugin *plugin) if (GNUNET_SYSERR == sent) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "sendto"); - LOG (GNUNET_ERROR_TYPE_ERROR, + LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP transmitted %u-byte message to %s (%d: %s)\n", - (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent, + (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent, (sent < 0) ? STRERROR (errno) : "ok"); if (udpw->cont != NULL) udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR); } LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP transmitted %u-byte message to %s (%d: %s)\n", - (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent, + (unsigned int) (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent, (sent < 0) ? STRERROR (errno) : "ok"); /* This was just a message fragment */ - if (udpw->frag != NULL) + if (udpw->frag_ctx != NULL) { - GNUNET_FRAGMENT_context_transmission_done (udpw->frag->frag); + GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag); } /* This was a complete message*/ else @@ -1865,7 +1957,7 @@ libgnunet_plugin_transport_udp_done (void *cls) { struct GNUNET_TRANSPORT_PluginFunctions *api = cls; struct Plugin *plugin = api->cls; - +GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "libgnunet_plugin_transport_udp_done\n "); stop_broadcast (plugin); if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK) -- cgit v1.2.3