aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-10-17 20:28:16 +0000
committerChristian Grothoff <christian@grothoff.org>2015-10-17 20:28:16 +0000
commit41460fb1154acb7ce90bad5e32d13dcf985c6eea (patch)
tree71de5621b705304e52d033f2d9ba46abb603430c
parent5c3b8bfb02bbc5745383b4c6ea0be7823ddd9eb5 (diff)
downloadgnunet-41460fb1154acb7ce90bad5e32d13dcf985c6eea.tar.gz
gnunet-41460fb1154acb7ce90bad5e32d13dcf985c6eea.zip
fix misscalculation of per-session flow delays and apply flow delays properly per message instead of per session
-rw-r--r--src/transport/plugin_transport_udp.c138
1 files changed, 130 insertions, 8 deletions
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
index ca5166600..66843ed97 100644
--- a/src/transport/plugin_transport_udp.c
+++ b/src/transport/plugin_transport_udp.c
@@ -174,9 +174,11 @@ struct Session
174 struct GNUNET_TIME_Relative flow_delay_for_other_peer; 174 struct GNUNET_TIME_Relative flow_delay_for_other_peer;
175 175
176 /** 176 /**
177 * Desired delay for next sending we received from other peer 177 * Desired delay for transmissions we received from other peer.
178 * Adjusted to be per fragment (UDP_MTU), even though on the
179 * wire it was for "full messages".
178 */ 180 */
179 struct GNUNET_TIME_Absolute flow_delay_from_other_peer; 181 struct GNUNET_TIME_Relative flow_delay_from_other_peer;
180 182
181 /** 183 /**
182 * Session timeout task 184 * Session timeout task
@@ -189,6 +191,11 @@ struct Session
189 struct GNUNET_TIME_Absolute timeout; 191 struct GNUNET_TIME_Absolute timeout;
190 192
191 /** 193 /**
194 * What time did we last transmit?
195 */
196 struct GNUNET_TIME_Absolute last_transmit_time;
197
198 /**
192 * expected delay for ACKs 199 * expected delay for ACKs
193 */ 200 */
194 struct GNUNET_TIME_Relative last_expected_ack_delay; 201 struct GNUNET_TIME_Relative last_expected_ack_delay;
@@ -330,6 +337,18 @@ struct UDP_FragmentationContext
330 void *cont_cls; 337 void *cont_cls;
331 338
332 /** 339 /**
340 * Start time.
341 */
342 struct GNUNET_TIME_Absolute start_time;
343
344 /**
345 * Transmission time for the next fragment. Incremented by
346 * the "flow_delay_from_other_peer" for each fragment when
347 * we setup the fragments.
348 */
349 struct GNUNET_TIME_Absolute next_frag_time;
350
351 /**
333 * Message timeout 352 * Message timeout
334 */ 353 */
335 struct GNUNET_TIME_Absolute timeout; 354 struct GNUNET_TIME_Absolute timeout;
@@ -419,6 +438,17 @@ struct UDP_MessageWrapper
419 struct UDP_FragmentationContext *frag_ctx; 438 struct UDP_FragmentationContext *frag_ctx;
420 439
421 /** 440 /**
441 * Message enqueue time.
442 */
443 struct GNUNET_TIME_Absolute start_time;
444
445 /**
446 * Desired transmission time for this message, based on the
447 * flow limiting information we got from the other peer.
448 */
449 struct GNUNET_TIME_Absolute transmission_time;
450
451 /**
422 * Message timeout. 452 * Message timeout.
423 */ 453 */
424 struct GNUNET_TIME_Absolute timeout; 454 struct GNUNET_TIME_Absolute timeout;
@@ -728,9 +758,26 @@ schedule_select_v4 (struct Plugin *plugin)
728 min_delay = GNUNET_TIME_UNIT_FOREVER_REL; 758 min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
729 for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next) 759 for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
730 min_delay = GNUNET_TIME_relative_min (min_delay, 760 min_delay = GNUNET_TIME_relative_min (min_delay,
731 GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer)); 761 GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
732 if (NULL != plugin->select_task_v4) 762 if (NULL != plugin->select_task_v4)
733 GNUNET_SCHEDULER_cancel (plugin->select_task_v4); 763 GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
764 if (NULL != plugin->ipv4_queue_head)
765 {
766 if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
767 {
768 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
769 "Calculated flow delay for UDPv4 at %s\n",
770 GNUNET_STRINGS_relative_time_to_string (min_delay,
771 GNUNET_YES));
772 }
773 else
774 {
775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776 "Calculated flow delay for UDPv4 at %s\n",
777 GNUNET_STRINGS_relative_time_to_string (min_delay,
778 GNUNET_YES));
779 }
780 }
734 plugin->select_task_v4 781 plugin->select_task_v4
735 = GNUNET_SCHEDULER_add_read_net (min_delay, 782 = GNUNET_SCHEDULER_add_read_net (min_delay,
736 plugin->sockv4, 783 plugin->sockv4,
@@ -757,9 +804,26 @@ schedule_select_v6 (struct Plugin *plugin)
757 min_delay = GNUNET_TIME_UNIT_FOREVER_REL; 804 min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
758 for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next) 805 for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
759 min_delay = GNUNET_TIME_relative_min (min_delay, 806 min_delay = GNUNET_TIME_relative_min (min_delay,
760 GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer)); 807 GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
761 if (NULL != plugin->select_task_v6) 808 if (NULL != plugin->select_task_v6)
762 GNUNET_SCHEDULER_cancel (plugin->select_task_v6); 809 GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
810 if (NULL != plugin->ipv6_queue_head)
811 {
812 if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
813 {
814 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
815 "Calculated flow delay for UDPv6 at %s\n",
816 GNUNET_STRINGS_relative_time_to_string (min_delay,
817 GNUNET_YES));
818 }
819 else
820 {
821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822 "Calculated flow delay for UDPv6 at %s\n",
823 GNUNET_STRINGS_relative_time_to_string (min_delay,
824 GNUNET_YES));
825 }
826 }
763 plugin->select_task_v6 827 plugin->select_task_v6
764 = GNUNET_SCHEDULER_add_read_net (min_delay, 828 = GNUNET_SCHEDULER_add_read_net (min_delay,
765 plugin->sockv6, 829 plugin->sockv6,
@@ -1584,6 +1648,7 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
1584 struct UDP_MessageWrapper *udpw; 1648 struct UDP_MessageWrapper *udpw;
1585 struct UDP_MessageWrapper *tmp; 1649 struct UDP_MessageWrapper *tmp;
1586 size_t overhead; 1650 size_t overhead;
1651 struct GNUNET_TIME_Relative delay;
1587 1652
1588 LOG (GNUNET_ERROR_TYPE_DEBUG, 1653 LOG (GNUNET_ERROR_TYPE_DEBUG,
1589 "%p: Fragmented message removed with result %s\n", 1654 "%p: Fragmented message removed with result %s\n",
@@ -1594,6 +1659,22 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
1594 overhead = frag_ctx->on_wire_size - frag_ctx->payload_size; 1659 overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1595 else 1660 else
1596 overhead = frag_ctx->on_wire_size; 1661 overhead = frag_ctx->on_wire_size;
1662 delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
1663 if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1664 {
1665 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1666 "Fragmented message acknowledged after %s\n",
1667 GNUNET_STRINGS_relative_time_to_string (delay,
1668 GNUNET_YES));
1669 }
1670 else
1671 {
1672 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673 "Fragmented message acknowledged after %s\n",
1674 GNUNET_STRINGS_relative_time_to_string (delay,
1675 GNUNET_YES));
1676 }
1677
1597 if (NULL != frag_ctx->cont) 1678 if (NULL != frag_ctx->cont)
1598 frag_ctx->cont (frag_ctx->cont_cls, 1679 frag_ctx->cont (frag_ctx->cont_cls,
1599 &s->target, 1680 &s->target,
@@ -1765,6 +1846,11 @@ enqueue_fragment (void *cls,
1765 udpw->msg_size = msg_len; 1846 udpw->msg_size = msg_len;
1766 udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */ 1847 udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1767 udpw->timeout = frag_ctx->timeout; 1848 udpw->timeout = frag_ctx->timeout;
1849 udpw->start_time = frag_ctx->start_time;
1850 udpw->transmission_time = frag_ctx->next_frag_time;
1851 frag_ctx->next_frag_time
1852 = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
1853 session->flow_delay_from_other_peer);
1768 udpw->frag_ctx = frag_ctx; 1854 udpw->frag_ctx = frag_ctx;
1769 udpw->qc = &qc_fragment_sent; 1855 udpw->qc = &qc_fragment_sent;
1770 udpw->qc_cls = plugin; 1856 udpw->qc_cls = plugin;
@@ -1795,6 +1881,7 @@ qc_message_sent (void *cls,
1795{ 1881{
1796 struct Plugin *plugin = cls; 1882 struct Plugin *plugin = cls;
1797 size_t overhead; 1883 size_t overhead;
1884 struct GNUNET_TIME_Relative delay;
1798 1885
1799 if (udpw->msg_size >= udpw->payload_size) 1886 if (udpw->msg_size >= udpw->payload_size)
1800 overhead = udpw->msg_size - udpw->payload_size; 1887 overhead = udpw->msg_size - udpw->payload_size;
@@ -1802,11 +1889,28 @@ qc_message_sent (void *cls,
1802 overhead = udpw->msg_size; 1889 overhead = udpw->msg_size;
1803 1890
1804 if (NULL != udpw->cont) 1891 if (NULL != udpw->cont)
1892 {
1893 delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
1894 if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1895 {
1896 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1897 "Message sent via UDP with delay of %s\n",
1898 GNUNET_STRINGS_relative_time_to_string (delay,
1899 GNUNET_YES));
1900 }
1901 else
1902 {
1903 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1904 "Message sent via UDP with delay of %s\n",
1905 GNUNET_STRINGS_relative_time_to_string (delay,
1906 GNUNET_YES));
1907 }
1805 udpw->cont (udpw->cont_cls, 1908 udpw->cont (udpw->cont_cls,
1806 &udpw->session->target, 1909 &udpw->session->target,
1807 result, 1910 result,
1808 udpw->payload_size, 1911 udpw->payload_size,
1809 overhead); 1912 overhead);
1913 }
1810 if (GNUNET_OK == result) 1914 if (GNUNET_OK == result)
1811 { 1915 {
1812 GNUNET_STATISTICS_update (plugin->env->stats, 1916 GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1941,7 +2045,12 @@ udp_plugin_send (void *cls,
1941 udpw->msg_buf = (char *) &udpw[1]; 2045 udpw->msg_buf = (char *) &udpw[1];
1942 udpw->msg_size = udpmlen; /* message size with UDP overhead */ 2046 udpw->msg_size = udpmlen; /* message size with UDP overhead */
1943 udpw->payload_size = msgbuf_size; /* message size without UDP overhead */ 2047 udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2048 udpw->start_time = GNUNET_TIME_absolute_get ();
1944 udpw->timeout = GNUNET_TIME_relative_to_absolute (to); 2049 udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
2050 udpw->transmission_time = s->last_transmit_time;
2051 s->last_transmit_time
2052 = GNUNET_TIME_absolute_add (s->last_transmit_time,
2053 s->flow_delay_from_other_peer);
1945 udpw->cont = cont; 2054 udpw->cont = cont;
1946 udpw->cont_cls = cont_cls; 2055 udpw->cont_cls = cont_cls;
1947 udpw->frag_ctx = NULL; 2056 udpw->frag_ctx = NULL;
@@ -1977,6 +2086,8 @@ udp_plugin_send (void *cls,
1977 frag_ctx->session = s; 2086 frag_ctx->session = s;
1978 frag_ctx->cont = cont; 2087 frag_ctx->cont = cont;
1979 frag_ctx->cont_cls = cont_cls; 2088 frag_ctx->cont_cls = cont_cls;
2089 frag_ctx->start_time = GNUNET_TIME_absolute_get ();
2090 frag_ctx->next_frag_time = s->last_transmit_time;
1980 frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to); 2091 frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
1981 frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */ 2092 frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1982 frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */ 2093 frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
@@ -1989,6 +2100,7 @@ udp_plugin_send (void *cls,
1989 &enqueue_fragment, 2100 &enqueue_fragment,
1990 frag_ctx); 2101 frag_ctx);
1991 s->frag_ctx = frag_ctx; 2102 s->frag_ctx = frag_ctx;
2103 s->last_transmit_time = frag_ctx->next_frag_time;
1992 GNUNET_STATISTICS_update (plugin->env->stats, 2104 GNUNET_STATISTICS_update (plugin->env->stats,
1993 "# UDP, fragmented messages active", 2105 "# UDP, fragmented messages active",
1994 1, 2106 1,
@@ -2082,7 +2194,12 @@ read_process_ack (struct Plugin *plugin,
2082 GNUNET_STRINGS_relative_time_to_string (flow_delay, 2194 GNUNET_STRINGS_relative_time_to_string (flow_delay,
2083 GNUNET_YES), 2195 GNUNET_YES),
2084 GNUNET_i2s (&udp_ack->sender)); 2196 GNUNET_i2s (&udp_ack->sender));
2085 s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay); 2197 /* Flow delay is for the reassembled packet, however, our delay
2198 is per packet, so we need to adjust: */
2199 flow_delay = GNUNET_TIME_relative_divide (flow_delay,
2200 1 + (s->frag_ctx->payload_size /
2201 UDP_MTU));
2202 s->flow_delay_from_other_peer = flow_delay;
2086 2203
2087 2204
2088 if (GNUNET_OK != 2205 if (GNUNET_OK !=
@@ -2428,10 +2545,11 @@ udp_plugin_create_session (void *cls,
2428 s->plugin = plugin; 2545 s->plugin = plugin;
2429 s->address = GNUNET_HELLO_address_copy (address); 2546 s->address = GNUNET_HELLO_address_copy (address);
2430 s->target = address->peer; 2547 s->target = address->peer;
2548 s->last_transmit_time = GNUNET_TIME_absolute_get ();
2431 s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 2549 s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2432 250); 2550 250);
2433 s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS; 2551 s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
2434 s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS; 2552 s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
2435 s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; 2553 s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
2436 s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); 2554 s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
2437 s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, 2555 s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
@@ -2740,6 +2858,7 @@ ack_proc (void *cls,
2740 udpw->msg_size = msize; 2858 udpw->msg_size = msize;
2741 udpw->payload_size = 0; 2859 udpw->payload_size = 0;
2742 udpw->session = s; 2860 udpw->session = s;
2861 udpw->start_time = GNUNET_TIME_absolute_get ();
2743 udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; 2862 udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2744 udpw->msg_buf = (char *) &udpw[1]; 2863 udpw->msg_buf = (char *) &udpw[1];
2745 udpw->qc = &ack_message_sent; 2864 udpw->qc = &ack_message_sent;
@@ -3085,8 +3204,8 @@ remove_timeout_messages_and_select (struct Plugin *plugin,
3085 } 3204 }
3086 else 3205 else
3087 { 3206 {
3088 /* Message did not time out, check flow delay */ 3207 /* Message did not time out, check transmission time */
3089 remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer); 3208 remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
3090 if (0 == remaining.rel_value_us) 3209 if (0 == remaining.rel_value_us)
3091 { 3210 {
3092 /* this message is not delayed */ 3211 /* this message is not delayed */
@@ -3252,6 +3371,9 @@ udp_select_send (struct Plugin *plugin,
3252 udpw->msg_size, 3371 udpw->msg_size,
3253 a, 3372 a,
3254 slen); 3373 slen);
3374 udpw->session->last_transmit_time
3375 = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
3376 udpw->session->last_transmit_time);
3255 dequeue (plugin, 3377 dequeue (plugin,
3256 udpw); 3378 udpw);
3257 if (GNUNET_SYSERR == sent) 3379 if (GNUNET_SYSERR == sent)