From 8f49f02287c203bb0c05d9bc7c53f2e92472ec07 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 21 May 2019 17:23:38 +0200 Subject: implement FC window tracking for incoming messages --- src/transport/gnunet-service-tng.c | 157 ++++++++++++++++++++++++++++++++----- 1 file changed, 138 insertions(+), 19 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 2af699dc1..1e77937e4 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -34,16 +34,6 @@ * at the beginning when the limit is zero!) * - Retransmit challenge if it goes unanswered! * - * for RECEIVING) - * - increment incoming_fc_window_size_ram when receiving - * incoming packets! - * - OR drop if incoming_fc_window_size_ram goes - * (significantly?) beyond available_fc_window_size - * - decrement incoming_fc_window_size_ram when CORE is done - * with incoming packets! - * - increment incoming_fc_window_size_used when CORE is done - * with incoming packets! - * * for DV) * - send challenges via DV (when DVH is confirmed *and* we care about * the target to get window size, or when DVH is unconfirmed (passive @@ -92,7 +82,7 @@ * - re-sending challenge response without a challenge when we have * significantly increased the FC window (upon CORE being done with messages) * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH] - * Also can re-use signature in this case [CPU]. + * Also can re-use signature in this case [CPU]. Marked with "TODO-M1" * * Design realizations / discussion: * - communicators do flow control by calling MQ "notify sent" @@ -1118,6 +1108,16 @@ struct PendingMessage; */ struct DistanceVectorHop; +/** + * A virtual link is another reachable peer that is known to CORE. It + * can be either a `struct Neighbour` with at least one confirmed + * `struct Queue`, or a `struct DistanceVector` with at least one + * confirmed `struct DistanceVectorHop`. With a virtual link we track + * data that is per neighbour that is not specific to how the + * connectivity is established. + */ +struct VirtualLink; + /** * Context from #handle_incoming_msg(). Closure for many @@ -1156,6 +1156,42 @@ struct CommunicatorMessageContext }; +/** + * Closure for #core_env_sent_cb. + */ +struct CoreSentContext +{ + + /** + * Kept in a DLL to clear @e vl in case @e vl is lost. + */ + struct CoreSentContext *next; + + /** + * Kept in a DLL to clear @e vl in case @e vl is lost. + */ + struct CoreSentContext *prev; + + /** + * Virtual link this is about. + */ + struct VirtualLink *vl; + + /** + * How big was the message. + */ + uint16_t size; + + /** + * By how much should we increment @e vl's + * incoming_fc_window_size_used once we are done sending to CORE? + * Use to ensure we do not increment twice if there is more than one + * CORE client. + */ + uint16_t isize; +}; + + /** * A virtual link is another reachable peer that is known to CORE. It * can be either a `struct Neighbour` with at least one confirmed @@ -1193,6 +1229,16 @@ struct VirtualLink */ struct PendingMessage *pending_msg_tail; + /** + * Kept in a DLL to clear @e vl in case @e vl is lost. + */ + struct CoreSentContext *csc_tail; + + /** + * Kept in a DLL to clear @e vl in case @e vl is lost. + */ + struct CoreSentContext *csc_head; + /** * Task scheduled to possibly notfiy core that this peer is no * longer counting as confirmed. Runs the #core_visibility_check(), @@ -1300,7 +1346,10 @@ struct VirtualLink * we must tell communicators to stop providing us more messages * for this peer. In fact, the window can go negative as we * have multiple communicators, so per communicator we can go - * down by one into the negative range. + * down by one into the negative range. Furthermore, we count + * delivery per CORE client, so if we had multiple cores, that + * might also cause a negative window size here (as one message + * would decrement the window by one per CORE client). */ int core_recv_window; }; @@ -2846,6 +2895,7 @@ static void free_virtual_link (struct VirtualLink *vl) { struct PendingMessage *pm; + struct CoreSentContext *csc; while (NULL != (pm = vl->pending_msg_head)) free_pending_message (pm); @@ -2855,6 +2905,12 @@ free_virtual_link (struct VirtualLink *vl) GNUNET_SCHEDULER_cancel (vl->visibility_task); vl->visibility_task = NULL; } + while (NULL != (csc = vl->csc_head)) + { + GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc); + GNUNET_assert (vl == csc->vl); + csc->vl = NULL; + } GNUNET_break (NULL == vl->n); GNUNET_break (NULL == vl->dv); GNUNET_free (vl); @@ -4950,6 +5006,34 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, const struct GNUNET_MessageHeader *msg); +/** + * Function called when we are done giving a message of a certain + * size to CORE and should thus decrement the number of bytes of + * RAM reserved for that peer's MQ. + * + * @param cls a `struct CoreSentContext` + */ +static void +core_env_sent_cb (void *cls) +{ + struct CoreSentContext *ctx = cls; + struct VirtualLink *vl = ctx->vl; + + if (NULL == vl) + { + /* lost the link in the meantime, ignore */ + GNUNET_free (ctx); + return; + } + GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, ctx); + GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size); + vl->incoming_fc_window_size_ram -= ctx->size; + vl->incoming_fc_window_size_used += ctx->isize; + /* TODO-M1 */ + GNUNET_free (ctx); +} + + /** * Communicator gave us an unencapsulated message to pass as-is to * CORE. Process the request. @@ -4995,30 +5079,65 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) finish_cmc_handling (cmc); return; } + if (vl->incoming_fc_window_size_ram > UINT_MAX - size) + { + GNUNET_STATISTICS_update (GST_stats, + "# CORE messages droped (FC arithmetic overflow)", + 1, + GNUNET_NO); + + finish_cmc_handling (cmc); + return; + } + if (vl->incoming_fc_window_size_ram + size > vl->available_fc_window_size) + { + GNUNET_STATISTICS_update (GST_stats, + "# CORE messages droped (FC window overflow)", + 1, + GNUNET_NO); + finish_cmc_handling (cmc); + return; + } + /* Forward to all CORE clients */ have_core = GNUNET_NO; for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) { struct GNUNET_MQ_Envelope *env; struct InboundMessage *im; + struct CoreSentContext *ctx; if (CT_CORE != tc->type) continue; - have_core = GNUNET_YES; + vl->incoming_fc_window_size_ram += size; env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + ctx = GNUNET_new (struct CoreSentContext); + ctx->vl = vl; + ctx->size = size; + ctx->isize = (GNUNET_NO == have_core) ? size : 0; + have_core = GNUNET_YES; + GNUNET_CONTAINER_DLL_insert (vl->csc_head, vl->csc_tail, ctx); + GNUNET_MQ_notify_sent (env, &core_env_sent_cb, ctx); im->peer = cmc->im.sender; memcpy (&im[1], mh, size); GNUNET_MQ_send (tc->mq, env); + vl->core_recv_window--; } - vl->core_recv_window--; if (GNUNET_NO == have_core) + { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Dropped message to CORE: no CORE client connected!\n"); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Delivered message from %s of type %u to CORE\n", - GNUNET_i2s (&cmc->im.sender), - ntohs (mh->type)); + /* Nevertheless, count window as used, as it is from the + perspective of the other peer! */ + vl->incoming_fc_window_size_used += size; + /* TODO-M1 */ + finish_cmc_handling (cmc); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Delivered message from %s of type %u to CORE\n", + GNUNET_i2s (&cmc->im.sender), + ntohs (mh->type)); if (vl->core_recv_window > 0) { finish_cmc_handling (cmc); -- cgit v1.2.3