summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-05-21 17:23:38 +0200
committerChristian Grothoff <christian@grothoff.org>2019-05-21 17:23:38 +0200
commit8f49f02287c203bb0c05d9bc7c53f2e92472ec07 (patch)
tree5278e9f8bfa9520b34a70ee732046a10e2c899ac /src/transport/gnunet-service-tng.c
parentc6777519ba8ef594b999209bac79a7f4b37ff30c (diff)
implement FC window tracking for incoming messages
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c157
1 files changed, 138 insertions, 19 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 2af699dc1..1e77937e4 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -34,16 +34,6 @@
* at the beginning when the limit is zero!)
* - Retransmit challenge if it goes unanswered!
*
- * for RECEIVING)
- * - increment incoming_fc_window_size_ram when receiving
- * incoming packets!
- * - OR drop if incoming_fc_window_size_ram goes
- * (significantly?) beyond available_fc_window_size
- * - decrement incoming_fc_window_size_ram when CORE is done
- * with incoming packets!
- * - increment incoming_fc_window_size_used when CORE is done
- * with incoming packets!
- *
* for DV)
* - send challenges via DV (when DVH is confirmed *and* we care about
* the target to get window size, or when DVH is unconfirmed (passive
@@ -92,7 +82,7 @@
* - re-sending challenge response without a challenge when we have
* significantly increased the FC window (upon CORE being done with messages)
* so as to avoid the sender having to give us a fresh challenge [BANDWIDTH]
- * Also can re-use signature in this case [CPU].
+ * Also can re-use signature in this case [CPU]. Marked with "TODO-M1"
*
* Design realizations / discussion:
* - communicators do flow control by calling MQ "notify sent"
@@ -1118,6 +1108,16 @@ struct PendingMessage;
*/
struct DistanceVectorHop;
+/**
+ * A virtual link is another reachable peer that is known to CORE. It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`. With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink;
+
/**
* Context from #handle_incoming_msg(). Closure for many
@@ -1157,6 +1157,42 @@ struct CommunicatorMessageContext
/**
+ * Closure for #core_env_sent_cb.
+ */
+struct CoreSentContext
+{
+
+ /**
+ * Kept in a DLL to clear @e vl in case @e vl is lost.
+ */
+ struct CoreSentContext *next;
+
+ /**
+ * Kept in a DLL to clear @e vl in case @e vl is lost.
+ */
+ struct CoreSentContext *prev;
+
+ /**
+ * Virtual link this is about.
+ */
+ struct VirtualLink *vl;
+
+ /**
+ * How big was the message.
+ */
+ uint16_t size;
+
+ /**
+ * By how much should we increment @e vl's
+ * incoming_fc_window_size_used once we are done sending to CORE?
+ * Use to ensure we do not increment twice if there is more than one
+ * CORE client.
+ */
+ uint16_t isize;
+};
+
+
+/**
* A virtual link is another reachable peer that is known to CORE. It
* can be either a `struct Neighbour` with at least one confirmed
* `struct Queue`, or a `struct DistanceVector` with at least one
@@ -1194,6 +1230,16 @@ struct VirtualLink
struct PendingMessage *pending_msg_tail;
/**
+ * Kept in a DLL to clear @e vl in case @e vl is lost.
+ */
+ struct CoreSentContext *csc_tail;
+
+ /**
+ * Kept in a DLL to clear @e vl in case @e vl is lost.
+ */
+ struct CoreSentContext *csc_head;
+
+ /**
* Task scheduled to possibly notfiy core that this peer is no
* longer counting as confirmed. Runs the #core_visibility_check(),
* which checks that some DV-path or a queue exists that is still
@@ -1300,7 +1346,10 @@ struct VirtualLink
* we must tell communicators to stop providing us more messages
* for this peer. In fact, the window can go negative as we
* have multiple communicators, so per communicator we can go
- * down by one into the negative range.
+ * down by one into the negative range. Furthermore, we count
+ * delivery per CORE client, so if we had multiple cores, that
+ * might also cause a negative window size here (as one message
+ * would decrement the window by one per CORE client).
*/
int core_recv_window;
};
@@ -2846,6 +2895,7 @@ static void
free_virtual_link (struct VirtualLink *vl)
{
struct PendingMessage *pm;
+ struct CoreSentContext *csc;
while (NULL != (pm = vl->pending_msg_head))
free_pending_message (pm);
@@ -2855,6 +2905,12 @@ free_virtual_link (struct VirtualLink *vl)
GNUNET_SCHEDULER_cancel (vl->visibility_task);
vl->visibility_task = NULL;
}
+ while (NULL != (csc = vl->csc_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc);
+ GNUNET_assert (vl == csc->vl);
+ csc->vl = NULL;
+ }
GNUNET_break (NULL == vl->n);
GNUNET_break (NULL == vl->dv);
GNUNET_free (vl);
@@ -4951,6 +5007,34 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
/**
+ * Function called when we are done giving a message of a certain
+ * size to CORE and should thus decrement the number of bytes of
+ * RAM reserved for that peer's MQ.
+ *
+ * @param cls a `struct CoreSentContext`
+ */
+static void
+core_env_sent_cb (void *cls)
+{
+ struct CoreSentContext *ctx = cls;
+ struct VirtualLink *vl = ctx->vl;
+
+ if (NULL == vl)
+ {
+ /* lost the link in the meantime, ignore */
+ GNUNET_free (ctx);
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, ctx);
+ GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size);
+ vl->incoming_fc_window_size_ram -= ctx->size;
+ vl->incoming_fc_window_size_used += ctx->isize;
+ /* TODO-M1 */
+ GNUNET_free (ctx);
+}
+
+
+/**
* Communicator gave us an unencapsulated message to pass as-is to
* CORE. Process the request.
*
@@ -4995,30 +5079,65 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
finish_cmc_handling (cmc);
return;
}
+ if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# CORE messages droped (FC arithmetic overflow)",
+ 1,
+ GNUNET_NO);
+
+ finish_cmc_handling (cmc);
+ return;
+ }
+ if (vl->incoming_fc_window_size_ram + size > vl->available_fc_window_size)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# CORE messages droped (FC window overflow)",
+ 1,
+ GNUNET_NO);
+ finish_cmc_handling (cmc);
+ return;
+ }
+
/* Forward to all CORE clients */
have_core = GNUNET_NO;
for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
{
struct GNUNET_MQ_Envelope *env;
struct InboundMessage *im;
+ struct CoreSentContext *ctx;
if (CT_CORE != tc->type)
continue;
- have_core = GNUNET_YES;
+ vl->incoming_fc_window_size_ram += size;
env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ ctx = GNUNET_new (struct CoreSentContext);
+ ctx->vl = vl;
+ ctx->size = size;
+ ctx->isize = (GNUNET_NO == have_core) ? size : 0;
+ have_core = GNUNET_YES;
+ GNUNET_CONTAINER_DLL_insert (vl->csc_head, vl->csc_tail, ctx);
+ GNUNET_MQ_notify_sent (env, &core_env_sent_cb, ctx);
im->peer = cmc->im.sender;
memcpy (&im[1], mh, size);
GNUNET_MQ_send (tc->mq, env);
+ vl->core_recv_window--;
}
- vl->core_recv_window--;
if (GNUNET_NO == have_core)
+ {
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Dropped message to CORE: no CORE client connected!\n");
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Delivered message from %s of type %u to CORE\n",
- GNUNET_i2s (&cmc->im.sender),
- ntohs (mh->type));
+ /* Nevertheless, count window as used, as it is from the
+ perspective of the other peer! */
+ vl->incoming_fc_window_size_used += size;
+ /* TODO-M1 */
+ finish_cmc_handling (cmc);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Delivered message from %s of type %u to CORE\n",
+ GNUNET_i2s (&cmc->im.sender),
+ ntohs (mh->type));
if (vl->core_recv_window > 0)
{
finish_cmc_handling (cmc);