summaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-10-17 20:28:16 +0000
committerChristian Grothoff <christian@grothoff.org>2015-10-17 20:28:16 +0000
commit41460fb1154acb7ce90bad5e32d13dcf985c6eea (patch)
tree71de5621b705304e52d033f2d9ba46abb603430c /src/transport
parent5c3b8bfb02bbc5745383b4c6ea0be7823ddd9eb5 (diff)
fix misscalculation of per-session flow delays and apply flow delays properly per message instead of per session
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/plugin_transport_udp.c138
1 files changed, 130 insertions, 8 deletions
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
index ca5166600..66843ed97 100644
--- a/src/transport/plugin_transport_udp.c
+++ b/src/transport/plugin_transport_udp.c
@@ -174,9 +174,11 @@ struct Session
struct GNUNET_TIME_Relative flow_delay_for_other_peer;
/**
- * Desired delay for next sending we received from other peer
+ * Desired delay for transmissions we received from other peer.
+ * Adjusted to be per fragment (UDP_MTU), even though on the
+ * wire it was for "full messages".
*/
- struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
+ struct GNUNET_TIME_Relative flow_delay_from_other_peer;
/**
* Session timeout task
@@ -189,6 +191,11 @@ struct Session
struct GNUNET_TIME_Absolute timeout;
/**
+ * What time did we last transmit?
+ */
+ struct GNUNET_TIME_Absolute last_transmit_time;
+
+ /**
* expected delay for ACKs
*/
struct GNUNET_TIME_Relative last_expected_ack_delay;
@@ -330,6 +337,18 @@ struct UDP_FragmentationContext
void *cont_cls;
/**
+ * Start time.
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
+ * Transmission time for the next fragment. Incremented by
+ * the "flow_delay_from_other_peer" for each fragment when
+ * we setup the fragments.
+ */
+ struct GNUNET_TIME_Absolute next_frag_time;
+
+ /**
* Message timeout
*/
struct GNUNET_TIME_Absolute timeout;
@@ -419,6 +438,17 @@ struct UDP_MessageWrapper
struct UDP_FragmentationContext *frag_ctx;
/**
+ * Message enqueue time.
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
+ * Desired transmission time for this message, based on the
+ * flow limiting information we got from the other peer.
+ */
+ struct GNUNET_TIME_Absolute transmission_time;
+
+ /**
* Message timeout.
*/
struct GNUNET_TIME_Absolute timeout;
@@ -728,9 +758,26 @@ schedule_select_v4 (struct Plugin *plugin)
min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
min_delay = GNUNET_TIME_relative_min (min_delay,
- GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+ GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
if (NULL != plugin->select_task_v4)
GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
+ if (NULL != plugin->ipv4_queue_head)
+ {
+ if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Calculated flow delay for UDPv4 at %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calculated flow delay for UDPv4 at %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES));
+ }
+ }
plugin->select_task_v4
= GNUNET_SCHEDULER_add_read_net (min_delay,
plugin->sockv4,
@@ -757,9 +804,26 @@ schedule_select_v6 (struct Plugin *plugin)
min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
min_delay = GNUNET_TIME_relative_min (min_delay,
- GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+ GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
if (NULL != plugin->select_task_v6)
GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
+ if (NULL != plugin->ipv6_queue_head)
+ {
+ if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Calculated flow delay for UDPv6 at %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calculated flow delay for UDPv6 at %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES));
+ }
+ }
plugin->select_task_v6
= GNUNET_SCHEDULER_add_read_net (min_delay,
plugin->sockv6,
@@ -1584,6 +1648,7 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
struct UDP_MessageWrapper *udpw;
struct UDP_MessageWrapper *tmp;
size_t overhead;
+ struct GNUNET_TIME_Relative delay;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%p: Fragmented message removed with result %s\n",
@@ -1594,6 +1659,22 @@ fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
else
overhead = frag_ctx->on_wire_size;
+ delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
+ if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Fragmented message acknowledged after %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Fragmented message acknowledged after %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
+
if (NULL != frag_ctx->cont)
frag_ctx->cont (frag_ctx->cont_cls,
&s->target,
@@ -1765,6 +1846,11 @@ enqueue_fragment (void *cls,
udpw->msg_size = msg_len;
udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
udpw->timeout = frag_ctx->timeout;
+ udpw->start_time = frag_ctx->start_time;
+ udpw->transmission_time = frag_ctx->next_frag_time;
+ frag_ctx->next_frag_time
+ = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
+ session->flow_delay_from_other_peer);
udpw->frag_ctx = frag_ctx;
udpw->qc = &qc_fragment_sent;
udpw->qc_cls = plugin;
@@ -1795,6 +1881,7 @@ qc_message_sent (void *cls,
{
struct Plugin *plugin = cls;
size_t overhead;
+ struct GNUNET_TIME_Relative delay;
if (udpw->msg_size >= udpw->payload_size)
overhead = udpw->msg_size - udpw->payload_size;
@@ -1802,11 +1889,28 @@ qc_message_sent (void *cls,
overhead = udpw->msg_size;
if (NULL != udpw->cont)
+ {
+ delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
+ if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Message sent via UDP with delay of %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Message sent via UDP with delay of %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
udpw->cont (udpw->cont_cls,
&udpw->session->target,
result,
udpw->payload_size,
overhead);
+ }
if (GNUNET_OK == result)
{
GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1941,7 +2045,12 @@ udp_plugin_send (void *cls,
udpw->msg_buf = (char *) &udpw[1];
udpw->msg_size = udpmlen; /* message size with UDP overhead */
udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
+ udpw->start_time = GNUNET_TIME_absolute_get ();
udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
+ udpw->transmission_time = s->last_transmit_time;
+ s->last_transmit_time
+ = GNUNET_TIME_absolute_add (s->last_transmit_time,
+ s->flow_delay_from_other_peer);
udpw->cont = cont;
udpw->cont_cls = cont_cls;
udpw->frag_ctx = NULL;
@@ -1977,6 +2086,8 @@ udp_plugin_send (void *cls,
frag_ctx->session = s;
frag_ctx->cont = cont;
frag_ctx->cont_cls = cont_cls;
+ frag_ctx->start_time = GNUNET_TIME_absolute_get ();
+ frag_ctx->next_frag_time = s->last_transmit_time;
frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
@@ -1989,6 +2100,7 @@ udp_plugin_send (void *cls,
&enqueue_fragment,
frag_ctx);
s->frag_ctx = frag_ctx;
+ s->last_transmit_time = frag_ctx->next_frag_time;
GNUNET_STATISTICS_update (plugin->env->stats,
"# UDP, fragmented messages active",
1,
@@ -2082,7 +2194,12 @@ read_process_ack (struct Plugin *plugin,
GNUNET_STRINGS_relative_time_to_string (flow_delay,
GNUNET_YES),
GNUNET_i2s (&udp_ack->sender));
- s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
+ /* Flow delay is for the reassembled packet, however, our delay
+ is per packet, so we need to adjust: */
+ flow_delay = GNUNET_TIME_relative_divide (flow_delay,
+ 1 + (s->frag_ctx->payload_size /
+ UDP_MTU));
+ s->flow_delay_from_other_peer = flow_delay;
if (GNUNET_OK !=
@@ -2428,10 +2545,11 @@ udp_plugin_create_session (void *cls,
s->plugin = plugin;
s->address = GNUNET_HELLO_address_copy (address);
s->target = address->peer;
+ s->last_transmit_time = GNUNET_TIME_absolute_get ();
s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
250);
s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
- s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
+ s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
@@ -2740,6 +2858,7 @@ ack_proc (void *cls,
udpw->msg_size = msize;
udpw->payload_size = 0;
udpw->session = s;
+ udpw->start_time = GNUNET_TIME_absolute_get ();
udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
udpw->msg_buf = (char *) &udpw[1];
udpw->qc = &ack_message_sent;
@@ -3085,8 +3204,8 @@ remove_timeout_messages_and_select (struct Plugin *plugin,
}
else
{
- /* Message did not time out, check flow delay */
- remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
+ /* Message did not time out, check transmission time */
+ remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
if (0 == remaining.rel_value_us)
{
/* this message is not delayed */
@@ -3252,6 +3371,9 @@ udp_select_send (struct Plugin *plugin,
udpw->msg_size,
a,
slen);
+ udpw->session->last_transmit_time
+ = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
+ udpw->session->last_transmit_time);
dequeue (plugin,
udpw);
if (GNUNET_SYSERR == sent)