From 41460fb1154acb7ce90bad5e32d13dcf985c6eea Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 17 Oct 2015 20:28:16 +0000 Subject: fix misscalculation of per-session flow delays and apply flow delays properly per message instead of per session --- src/transport/plugin_transport_udp.c | 138 +++++++++++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 8 deletions(-) diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index ca5166600..66843ed97 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -174,9 +174,11 @@ struct Session struct GNUNET_TIME_Relative flow_delay_for_other_peer; /** - * Desired delay for next sending we received from other peer + * Desired delay for transmissions we received from other peer. + * Adjusted to be per fragment (UDP_MTU), even though on the + * wire it was for "full messages". */ - struct GNUNET_TIME_Absolute flow_delay_from_other_peer; + struct GNUNET_TIME_Relative flow_delay_from_other_peer; /** * Session timeout task @@ -188,6 +190,11 @@ struct Session */ struct GNUNET_TIME_Absolute timeout; + /** + * What time did we last transmit? + */ + struct GNUNET_TIME_Absolute last_transmit_time; + /** * expected delay for ACKs */ @@ -329,6 +336,18 @@ struct UDP_FragmentationContext */ void *cont_cls; + /** + * Start time. + */ + struct GNUNET_TIME_Absolute start_time; + + /** + * Transmission time for the next fragment. Incremented by + * the "flow_delay_from_other_peer" for each fragment when + * we setup the fragments. + */ + struct GNUNET_TIME_Absolute next_frag_time; + /** * Message timeout */ @@ -418,6 +437,17 @@ struct UDP_MessageWrapper */ struct UDP_FragmentationContext *frag_ctx; + /** + * Message enqueue time. + */ + struct GNUNET_TIME_Absolute start_time; + + /** + * Desired transmission time for this message, based on the + * flow limiting information we got from the other peer. + */ + struct GNUNET_TIME_Absolute transmission_time; + /** * Message timeout. */ @@ -728,9 +758,26 @@ schedule_select_v4 (struct Plugin *plugin) min_delay = GNUNET_TIME_UNIT_FOREVER_REL; for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next) min_delay = GNUNET_TIME_relative_min (min_delay, - GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer)); + GNUNET_TIME_absolute_get_remaining (udpw->transmission_time)); if (NULL != plugin->select_task_v4) GNUNET_SCHEDULER_cancel (plugin->select_task_v4); + if (NULL != plugin->ipv4_queue_head) + { + if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Calculated flow delay for UDPv4 at %s\n", + GNUNET_STRINGS_relative_time_to_string (min_delay, + GNUNET_YES)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calculated flow delay for UDPv4 at %s\n", + GNUNET_STRINGS_relative_time_to_string (min_delay, + GNUNET_YES)); + } + } plugin->select_task_v4 = GNUNET_SCHEDULER_add_read_net (min_delay, plugin->sockv4, @@ -757,9 +804,26 @@ schedule_select_v6 (struct Plugin *plugin) min_delay = GNUNET_TIME_UNIT_FOREVER_REL; for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next) min_delay = GNUNET_TIME_relative_min (min_delay, - GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer)); + GNUNET_TIME_absolute_get_remaining (udpw->transmission_time)); if (NULL != plugin->select_task_v6) GNUNET_SCHEDULER_cancel (plugin->select_task_v6); + if (NULL != plugin->ipv6_queue_head) + { + if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Calculated flow delay for UDPv6 at %s\n", + GNUNET_STRINGS_relative_time_to_string (min_delay, + GNUNET_YES)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calculated flow delay for UDPv6 at %s\n", + GNUNET_STRINGS_relative_time_to_string (min_delay, + GNUNET_YES)); + } + } plugin->select_task_v6 = GNUNET_SCHEDULER_add_read_net (min_delay, plugin->sockv6, @@ -1584,6 +1648,7 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx, struct UDP_MessageWrapper *udpw; struct UDP_MessageWrapper *tmp; size_t overhead; + struct GNUNET_TIME_Relative delay; LOG (GNUNET_ERROR_TYPE_DEBUG, "%p: Fragmented message removed with result %s\n", @@ -1594,6 +1659,22 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx, overhead = frag_ctx->on_wire_size - frag_ctx->payload_size; else overhead = frag_ctx->on_wire_size; + delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time); + if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Fragmented message acknowledged after %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Fragmented message acknowledged after %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); + } + if (NULL != frag_ctx->cont) frag_ctx->cont (frag_ctx->cont_cls, &s->target, @@ -1765,6 +1846,11 @@ enqueue_fragment (void *cls, udpw->msg_size = msg_len; udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */ udpw->timeout = frag_ctx->timeout; + udpw->start_time = frag_ctx->start_time; + udpw->transmission_time = frag_ctx->next_frag_time; + frag_ctx->next_frag_time + = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time, + session->flow_delay_from_other_peer); udpw->frag_ctx = frag_ctx; udpw->qc = &qc_fragment_sent; udpw->qc_cls = plugin; @@ -1795,6 +1881,7 @@ qc_message_sent (void *cls, { struct Plugin *plugin = cls; size_t overhead; + struct GNUNET_TIME_Relative delay; if (udpw->msg_size >= udpw->payload_size) overhead = udpw->msg_size - udpw->payload_size; @@ -1802,11 +1889,28 @@ qc_message_sent (void *cls, overhead = udpw->msg_size; if (NULL != udpw->cont) + { + delay = GNUNET_TIME_absolute_get_duration (udpw->start_time); + if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Message sent via UDP with delay of %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Message sent via UDP with delay of %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); + } udpw->cont (udpw->cont_cls, &udpw->session->target, result, udpw->payload_size, overhead); + } if (GNUNET_OK == result) { GNUNET_STATISTICS_update (plugin->env->stats, @@ -1941,7 +2045,12 @@ udp_plugin_send (void *cls, udpw->msg_buf = (char *) &udpw[1]; udpw->msg_size = udpmlen; /* message size with UDP overhead */ udpw->payload_size = msgbuf_size; /* message size without UDP overhead */ + udpw->start_time = GNUNET_TIME_absolute_get (); udpw->timeout = GNUNET_TIME_relative_to_absolute (to); + udpw->transmission_time = s->last_transmit_time; + s->last_transmit_time + = GNUNET_TIME_absolute_add (s->last_transmit_time, + s->flow_delay_from_other_peer); udpw->cont = cont; udpw->cont_cls = cont_cls; udpw->frag_ctx = NULL; @@ -1977,6 +2086,8 @@ udp_plugin_send (void *cls, frag_ctx->session = s; frag_ctx->cont = cont; frag_ctx->cont_cls = cont_cls; + frag_ctx->start_time = GNUNET_TIME_absolute_get (); + frag_ctx->next_frag_time = s->last_transmit_time; frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to); frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */ frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */ @@ -1989,6 +2100,7 @@ udp_plugin_send (void *cls, &enqueue_fragment, frag_ctx); s->frag_ctx = frag_ctx; + s->last_transmit_time = frag_ctx->next_frag_time; GNUNET_STATISTICS_update (plugin->env->stats, "# UDP, fragmented messages active", 1, @@ -2082,7 +2194,12 @@ read_process_ack (struct Plugin *plugin, GNUNET_STRINGS_relative_time_to_string (flow_delay, GNUNET_YES), GNUNET_i2s (&udp_ack->sender)); - s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay); + /* Flow delay is for the reassembled packet, however, our delay + is per packet, so we need to adjust: */ + flow_delay = GNUNET_TIME_relative_divide (flow_delay, + 1 + (s->frag_ctx->payload_size / + UDP_MTU)); + s->flow_delay_from_other_peer = flow_delay; if (GNUNET_OK != @@ -2428,10 +2545,11 @@ udp_plugin_create_session (void *cls, s->plugin = plugin; s->address = GNUNET_HELLO_address_copy (address); s->target = address->peer; + s->last_transmit_time = GNUNET_TIME_absolute_get (); s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250); s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS; - s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS; + s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO; s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, @@ -2740,6 +2858,7 @@ ack_proc (void *cls, udpw->msg_size = msize; udpw->payload_size = 0; udpw->session = s; + udpw->start_time = GNUNET_TIME_absolute_get (); udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; udpw->msg_buf = (char *) &udpw[1]; udpw->qc = &ack_message_sent; @@ -3085,8 +3204,8 @@ remove_timeout_messages_and_select (struct Plugin *plugin, } else { - /* Message did not time out, check flow delay */ - remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer); + /* Message did not time out, check transmission time */ + remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time); if (0 == remaining.rel_value_us) { /* this message is not delayed */ @@ -3252,6 +3371,9 @@ udp_select_send (struct Plugin *plugin, udpw->msg_size, a, slen); + udpw->session->last_transmit_time + = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (), + udpw->session->last_transmit_time); dequeue (plugin, udpw); if (GNUNET_SYSERR == sent) -- cgit v1.2.3