summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-06-09 18:38:21 +0200
committerChristian Grothoff <christian@grothoff.org>2019-06-09 18:38:21 +0200
commit5c0d8339072c343e7f5d5578acecdd99d0fe842b (patch)
treea39512eb467ca842bf43474ebad8717ecdada8b4
parentf531d06a9d64d627034c2220769c1af63fb72297 (diff)
add FC retransmission logic
-rw-r--r--src/transport/gnunet-service-tng.c138
1 files changed, 113 insertions, 25 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 3e4d750a2..bb477fc1e 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -26,9 +26,7 @@
* Implement next:
* - FIXME-FC: realize transport-to-transport flow control (needed in case
* communicators do not offer flow control).
- * We do transmit FC window sizes now. Left:
- * for SENDING)
- * - need to call consider_sending_fc() periodically if it goes unanswered!
+ * We do transmit FC window sizes now.
*
* for DV)
* - send challenges via DV (when DVH is confirmed *and* we care about
@@ -118,6 +116,20 @@
#define MAX_CUMMULATIVE_ACKS 64
/**
+ * What is the 1:n chance that we send a Flow control response when
+ * receiving a flow control message that did not change anything for
+ * us? Basically, this is used in the case where both peers are stuck
+ * on flow control (no window changes), but one might continue sending
+ * flow control messages to the other peer as the first FC message
+ * when things stalled got lost, and then subsequently the other peer
+ * does *usually* not respond as nothing changed. So to ensure that
+ * eventually the FC messages stop, we do send with 1/8th probability
+ * an FC message even if nothing changed. That prevents one peer
+ * being stuck in sending (useless) FC messages "forever".
+ */
+#define FC_NO_CHANGE_REPLY_PROBABILITY 8
+
+/**
* What is the size we assume for a read operation in the
* absence of an MTU for the purpose of flow control?
*/
@@ -1290,6 +1302,12 @@ struct VirtualLink
struct GNUNET_SCHEDULER_Task *visibility_task;
/**
+ * Task scheduled to periodically retransmit FC messages (in
+ * case one got lost).
+ */
+ struct GNUNET_SCHEDULER_Task *fc_retransmit_task;
+
+ /**
* Neighbour used by this virtual link, NULL if @e dv is used.
*/
struct Neighbour *n;
@@ -1335,6 +1353,12 @@ struct VirtualLink
struct GNUNET_TIME_Absolute last_fc_timestamp;
/**
+ * Expected RTT from the last FC transmission. (Zero if the last
+ * attempt failed, but could theoretically be zero even on success.)
+ */
+ struct GNUNET_TIME_Relative last_fc_rtt;
+
+ /**
* Used to generate unique UUIDs for messages that are being
* fragmented.
*/
@@ -1400,6 +1424,17 @@ struct VirtualLink
uint64_t outbound_fc_window_size_used;
/**
+ * What is the most recent FC window the other peer sent us
+ * in `outbound_window_size`? This is basically the window
+ * size value the other peer has definitively received from
+ * us. If it matches @e incoming_fc_window_size, we should
+ * not send a FC message to increase the FC window. However,
+ * we may still send an FC message to notify the other peer
+ * that we received the other peer's FC message.
+ */
+ uint64_t last_outbound_window_size_received;
+
+ /**
* Generator for the sequence numbers of
* #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL messages we send.
*/
@@ -2987,6 +3022,11 @@ free_virtual_link (struct VirtualLink *vl)
GNUNET_SCHEDULER_cancel (vl->visibility_task);
vl->visibility_task = NULL;
}
+ if (NULL != vl->fc_retransmit_task)
+ {
+ GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+ vl->fc_retransmit_task = NULL;
+ }
while (NULL != (csc = vl->csc_head))
{
GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc);
@@ -4237,8 +4277,9 @@ queue_send_msg (struct Queue *queue,
* @param hdr message to send as payload
* @param options whether queues must be confirmed or not,
* and whether we may pick multiple (2) queues
+ * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed
*/
-static void
+static struct GNUNET_TIME_Relative
route_via_neighbour (const struct Neighbour *n,
const struct GNUNET_MessageHeader *hdr,
enum RouteMessageOptions options)
@@ -4247,6 +4288,7 @@ route_via_neighbour (const struct Neighbour *n,
unsigned int candidates;
unsigned int sel1;
unsigned int sel2;
+ struct GNUNET_TIME_Relative rtt;
/* Pick one or two 'random' queues from n (under constraints of options) */
now = GNUNET_TIME_absolute_get ();
@@ -4274,9 +4316,10 @@ route_via_neighbour (const struct Neighbour *n,
"# route selection failed (all no valid queue)",
1,
GNUNET_NO);
- return;
+ return GNUNET_TIME_UNIT_FOREVER_REL;
}
+ rtt = GNUNET_TIME_UNIT_FOREVER_REL;
sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
if (0 == (options & RMO_REDUNDANT))
sel2 = candidates; /* picks none! */
@@ -4297,11 +4340,13 @@ route_via_neighbour (const struct Neighbour *n,
GNUNET_i2s (&n->pid),
pos->address,
(sel1 == candidates) ? 1 : 2);
+ rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
}
candidates++;
}
}
+ return rtt;
}
@@ -4523,8 +4568,9 @@ typedef void (*DVMessageHandler) (void *cls,
* @param use function to call with the encapsulated message
* @param use_cls closure for @a use
* @param options whether path must be confirmed or not, to be passed to @a use
+ * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed
*/
-static void
+static struct GNUNET_TIME_Relative
encapsulate_for_dv (struct DistanceVector *dv,
unsigned int num_dvhs,
struct DistanceVectorHop **dvhs,
@@ -4540,6 +4586,7 @@ encapsulate_for_dv (struct DistanceVector *dv,
struct TransportDVBoxPayloadP *enc_payload_hdr =
(struct TransportDVBoxPayloadP *) enc;
struct DVKeyState key;
+ struct GNUNET_TIME_Relative rtt;
/* Encrypt payload */
box_hdr.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
@@ -4560,7 +4607,7 @@ encapsulate_for_dv (struct DistanceVector *dv,
enc_body_size);
dv_hmac (&key, &box_hdr.hmac, enc, sizeof (enc));
dv_key_clean (&key);
-
+ rtt = GNUNET_TIME_UNIT_FOREVER_REL;
/* For each selected path, take the pre-computed header and body
and add the path in the middle of the message; then send it. */
for (unsigned int i = 0; i < num_dvhs; i++)
@@ -4603,13 +4650,14 @@ encapsulate_for_dv (struct DistanceVector *dv,
path);
GNUNET_free (path);
}
-
+ rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
memcpy (&dhops[num_hops], enc, sizeof (enc));
use (use_cls,
dvh->next_hop,
(const struct GNUNET_MessageHeader *) buf,
options);
}
+ return rtt;
}
@@ -4629,7 +4677,7 @@ send_dv_to_neighbour (void *cls,
enum RouteMessageOptions options)
{
(void) cls;
- route_via_neighbour (next_hop, hdr, options);
+ (void) route_via_neighbour (next_hop, hdr, options);
}
@@ -4642,8 +4690,9 @@ send_dv_to_neighbour (void *cls,
* @param target peer to receive @a hdr
* @param hdr header of the message to route and #GNUNET_free()
* @param options which transmission channels are allowed
+ * @return expected RTT for transmission, #GNUNET_TIME_UNIT_FOREVER_REL if sending failed
*/
-static void
+static struct GNUNET_TIME_Relative
route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
const struct GNUNET_MessageHeader *hdr,
enum RouteMessageOptions options)
@@ -4651,6 +4700,8 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
struct VirtualLink *vl;
struct Neighbour *n;
struct DistanceVector *dv;
+ struct GNUNET_TIME_Relative rtt1;
+ struct GNUNET_TIME_Relative rtt2;
vl = lookup_virtual_link (target);
GNUNET_assert (NULL != vl);
@@ -4675,7 +4726,7 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
"# Messages dropped in routing: no acceptable method",
1,
GNUNET_NO);
- return;
+ return GNUNET_TIME_UNIT_FOREVER_REL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Routing message of type %u to %s with options %X\n",
@@ -4694,9 +4745,11 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
if ((NULL != n) && (NULL != dv))
options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
enough for redunancy, so clear the flag. */
+ rtt1 = GNUNET_TIME_UNIT_FOREVER_REL;
+ rtt2 = GNUNET_TIME_UNIT_FOREVER_REL;
if (NULL != n)
{
- route_via_neighbour (n, hdr, options);
+ rtt1 = route_via_neighbour (n, hdr, options);
}
if (NULL != dv)
{
@@ -4711,16 +4764,17 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Failed to route message, could not determine DV path\n");
- return;
+ return rtt1;
}
- encapsulate_for_dv (dv,
- res,
- hops,
- hdr,
- &send_dv_to_neighbour,
- NULL,
- options & (~RMO_REDUNDANT));
+ rtt2 = encapsulate_for_dv (dv,
+ res,
+ hops,
+ hdr,
+ &send_dv_to_neighbour,
+ NULL,
+ options & (~RMO_REDUNDANT));
}
+ return GNUNET_TIME_relative_min (rtt1, rtt2);
}
@@ -4728,21 +4782,23 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
* Something changed on the virtual link with respect to flow
* control. Consider retransmitting the FC window size.
*
- * @param vl virtual link to work with
+ * @param cls a `struct VirtualLink` to work with
*/
static void
-consider_sending_fc (struct VirtualLink *vl)
+consider_sending_fc (void *cls)
{
+ struct VirtualLink *vl = cls;
struct GNUNET_TIME_Absolute monotime;
struct TransportFlowControlMessage fc;
struct GNUNET_TIME_Relative duration;
+ struct GNUNET_TIME_Relative rtt;
duration = GNUNET_TIME_absolute_get_duration (vl->last_fc_transmission);
/* FIXME: decide sane criteria on when to do this, instead of doing
it always! */
/* For example, we should probably ONLY do this if a bit more than
an RTT has passed, or if the window changed "significantly" since
- then. */
+ then. See vl->last_fc_rtt! */
(void) duration;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -4759,7 +4815,24 @@ consider_sending_fc (struct VirtualLink *vl)
fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used);
fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size);
fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
- route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE);
+ rtt = route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE);
+ if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
+ {
+ rtt = GNUNET_TIME_UNIT_SECONDS;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "FC retransmission to %s failed, will retry in %s\n",
+ GNUNET_i2s (&vl->target),
+ GNUNET_STRINGS_relative_time_to_string (rtt, GNUNET_YES));
+ vl->last_fc_rtt = GNUNET_TIME_UNIT_ZERO;
+ }
+ else
+ {
+ vl->last_fc_rtt = rtt;
+ }
+ if (NULL != vl->fc_retransmit_task)
+ GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+ vl->fc_retransmit_task =
+ GNUNET_SCHEDULER_add_delayed (rtt, &consider_sending_fc, vl);
}
@@ -8181,12 +8254,27 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
(unsigned long long) vl->outbound_fc_window_size,
(long long) vl->incoming_fc_window_size_loss);
wnd = GNUNET_ntohll (fc->outbound_window_size);
- if (wnd < vl->incoming_fc_window_size)
+ if ((wnd < vl->incoming_fc_window_size) ||
+ (vl->last_outbound_window_size_received != wnd) ||
+ (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX) %
+ FC_NO_CHANGE_REPLY_PROBABILITY))
{
/* Consider re-sending our FC message, as clearly the
other peer's idea of the window is not up-to-date */
consider_sending_fc (vl);
}
+ if ((wnd == vl->incoming_fc_window_size) &&
+ (vl->last_outbound_window_size_received == wnd) &&
+ (NULL != vl->fc_retransmit_task))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Stopping FC retransmission to %s: peer is current at window %llu\n",
+ GNUNET_i2s (&vl->target),
+ (unsigned long long) wnd);
+ GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+ vl->fc_retransmit_task = NULL;
+ }
+ vl->last_outbound_window_size_received = wnd;
/* FC window likely increased, check transmission possibilities! */
check_vl_transmission (vl);
finish_cmc_handling (cmc);