aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-05-21 17:23:38 +0200
committerChristian Grothoff <christian@grothoff.org>2019-05-21 17:23:38 +0200
commit8f49f02287c203bb0c05d9bc7c53f2e92472ec07 (patch)
tree5278e9f8bfa9520b34a70ee732046a10e2c899ac /src/transport/gnunet-service-tng.c
parentc6777519ba8ef594b999209bac79a7f4b37ff30c (diff)
downloadgnunet-8f49f02287c203bb0c05d9bc7c53f2e92472ec07.tar.gz
gnunet-8f49f02287c203bb0c05d9bc7c53f2e92472ec07.zip
implement FC window tracking for incoming messages
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c157
1 files changed, 138 insertions, 19 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 2af699dc1..1e77937e4 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -34,16 +34,6 @@
34 * at the beginning when the limit is zero!) 34 * at the beginning when the limit is zero!)
35 * - Retransmit challenge if it goes unanswered! 35 * - Retransmit challenge if it goes unanswered!
36 * 36 *
37 * for RECEIVING)
38 * - increment incoming_fc_window_size_ram when receiving
39 * incoming packets!
40 * - OR drop if incoming_fc_window_size_ram goes
41 * (significantly?) beyond available_fc_window_size
42 * - decrement incoming_fc_window_size_ram when CORE is done
43 * with incoming packets!
44 * - increment incoming_fc_window_size_used when CORE is done
45 * with incoming packets!
46 *
47 * for DV) 37 * for DV)
48 * - send challenges via DV (when DVH is confirmed *and* we care about 38 * - send challenges via DV (when DVH is confirmed *and* we care about
49 * the target to get window size, or when DVH is unconfirmed (passive 39 * the target to get window size, or when DVH is unconfirmed (passive
@@ -92,7 +82,7 @@
92 * - re-sending challenge response without a challenge when we have 82 * - re-sending challenge response without a challenge when we have
93 * significantly increased the FC window (upon CORE being done with messages) 83 * significantly increased the FC window (upon CORE being done with messages)
94 * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH] 84 * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH]
95 * Also can re-use signature in this case [CPU]. 85 * Also can re-use signature in this case [CPU]. Marked with "TODO-M1"
96 * 86 *
97 * Design realizations / discussion: 87 * Design realizations / discussion:
98 * - communicators do flow control by calling MQ "notify sent" 88 * - communicators do flow control by calling MQ "notify sent"
@@ -1118,6 +1108,16 @@ struct PendingMessage;
1118 */ 1108 */
1119struct DistanceVectorHop; 1109struct DistanceVectorHop;
1120 1110
1111/**
1112 * A virtual link is another reachable peer that is known to CORE. It
1113 * can be either a `struct Neighbour` with at least one confirmed
1114 * `struct Queue`, or a `struct DistanceVector` with at least one
1115 * confirmed `struct DistanceVectorHop`. With a virtual link we track
1116 * data that is per neighbour that is not specific to how the
1117 * connectivity is established.
1118 */
1119struct VirtualLink;
1120
1121 1121
1122/** 1122/**
1123 * Context from #handle_incoming_msg(). Closure for many 1123 * Context from #handle_incoming_msg(). Closure for many
@@ -1157,6 +1157,42 @@ struct CommunicatorMessageContext
1157 1157
1158 1158
1159/** 1159/**
1160 * Closure for #core_env_sent_cb.
1161 */
1162struct CoreSentContext
1163{
1164
1165 /**
1166 * Kept in a DLL to clear @e vl in case @e vl is lost.
1167 */
1168 struct CoreSentContext *next;
1169
1170 /**
1171 * Kept in a DLL to clear @e vl in case @e vl is lost.
1172 */
1173 struct CoreSentContext *prev;
1174
1175 /**
1176 * Virtual link this is about.
1177 */
1178 struct VirtualLink *vl;
1179
1180 /**
1181 * How big was the message.
1182 */
1183 uint16_t size;
1184
1185 /**
1186 * By how much should we increment @e vl's
1187 * incoming_fc_window_size_used once we are done sending to CORE?
1188 * Use to ensure we do not increment twice if there is more than one
1189 * CORE client.
1190 */
1191 uint16_t isize;
1192};
1193
1194
1195/**
1160 * A virtual link is another reachable peer that is known to CORE. It 1196 * A virtual link is another reachable peer that is known to CORE. It
1161 * can be either a `struct Neighbour` with at least one confirmed 1197 * can be either a `struct Neighbour` with at least one confirmed
1162 * `struct Queue`, or a `struct DistanceVector` with at least one 1198 * `struct Queue`, or a `struct DistanceVector` with at least one
@@ -1194,6 +1230,16 @@ struct VirtualLink
1194 struct PendingMessage *pending_msg_tail; 1230 struct PendingMessage *pending_msg_tail;
1195 1231
1196 /** 1232 /**
1233 * Kept in a DLL to clear @e vl in case @e vl is lost.
1234 */
1235 struct CoreSentContext *csc_tail;
1236
1237 /**
1238 * Kept in a DLL to clear @e vl in case @e vl is lost.
1239 */
1240 struct CoreSentContext *csc_head;
1241
1242 /**
1197 * Task scheduled to possibly notfiy core that this peer is no 1243 * Task scheduled to possibly notfiy core that this peer is no
1198 * longer counting as confirmed. Runs the #core_visibility_check(), 1244 * longer counting as confirmed. Runs the #core_visibility_check(),
1199 * which checks that some DV-path or a queue exists that is still 1245 * which checks that some DV-path or a queue exists that is still
@@ -1300,7 +1346,10 @@ struct VirtualLink
1300 * we must tell communicators to stop providing us more messages 1346 * we must tell communicators to stop providing us more messages
1301 * for this peer. In fact, the window can go negative as we 1347 * for this peer. In fact, the window can go negative as we
1302 * have multiple communicators, so per communicator we can go 1348 * have multiple communicators, so per communicator we can go
1303 * down by one into the negative range. 1349 * down by one into the negative range. Furthermore, we count
1350 * delivery per CORE client, so if we had multiple cores, that
1351 * might also cause a negative window size here (as one message
1352 * would decrement the window by one per CORE client).
1304 */ 1353 */
1305 int core_recv_window; 1354 int core_recv_window;
1306}; 1355};
@@ -2846,6 +2895,7 @@ static void
2846free_virtual_link (struct VirtualLink *vl) 2895free_virtual_link (struct VirtualLink *vl)
2847{ 2896{
2848 struct PendingMessage *pm; 2897 struct PendingMessage *pm;
2898 struct CoreSentContext *csc;
2849 2899
2850 while (NULL != (pm = vl->pending_msg_head)) 2900 while (NULL != (pm = vl->pending_msg_head))
2851 free_pending_message (pm); 2901 free_pending_message (pm);
@@ -2855,6 +2905,12 @@ free_virtual_link (struct VirtualLink *vl)
2855 GNUNET_SCHEDULER_cancel (vl->visibility_task); 2905 GNUNET_SCHEDULER_cancel (vl->visibility_task);
2856 vl->visibility_task = NULL; 2906 vl->visibility_task = NULL;
2857 } 2907 }
2908 while (NULL != (csc = vl->csc_head))
2909 {
2910 GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc);
2911 GNUNET_assert (vl == csc->vl);
2912 csc->vl = NULL;
2913 }
2858 GNUNET_break (NULL == vl->n); 2914 GNUNET_break (NULL == vl->n);
2859 GNUNET_break (NULL == vl->dv); 2915 GNUNET_break (NULL == vl->dv);
2860 GNUNET_free (vl); 2916 GNUNET_free (vl);
@@ -4951,6 +5007,34 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
4951 5007
4952 5008
4953/** 5009/**
5010 * Function called when we are done giving a message of a certain
5011 * size to CORE and should thus decrement the number of bytes of
5012 * RAM reserved for that peer's MQ.
5013 *
5014 * @param cls a `struct CoreSentContext`
5015 */
5016static void
5017core_env_sent_cb (void *cls)
5018{
5019 struct CoreSentContext *ctx = cls;
5020 struct VirtualLink *vl = ctx->vl;
5021
5022 if (NULL == vl)
5023 {
5024 /* lost the link in the meantime, ignore */
5025 GNUNET_free (ctx);
5026 return;
5027 }
5028 GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, ctx);
5029 GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size);
5030 vl->incoming_fc_window_size_ram -= ctx->size;
5031 vl->incoming_fc_window_size_used += ctx->isize;
5032 /* TODO-M1 */
5033 GNUNET_free (ctx);
5034}
5035
5036
5037/**
4954 * Communicator gave us an unencapsulated message to pass as-is to 5038 * Communicator gave us an unencapsulated message to pass as-is to
4955 * CORE. Process the request. 5039 * CORE. Process the request.
4956 * 5040 *
@@ -4995,30 +5079,65 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
4995 finish_cmc_handling (cmc); 5079 finish_cmc_handling (cmc);
4996 return; 5080 return;
4997 } 5081 }
5082 if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
5083 {
5084 GNUNET_STATISTICS_update (GST_stats,
5085 "# CORE messages droped (FC arithmetic overflow)",
5086 1,
5087 GNUNET_NO);
5088
5089 finish_cmc_handling (cmc);
5090 return;
5091 }
5092 if (vl->incoming_fc_window_size_ram + size > vl->available_fc_window_size)
5093 {
5094 GNUNET_STATISTICS_update (GST_stats,
5095 "# CORE messages droped (FC window overflow)",
5096 1,
5097 GNUNET_NO);
5098 finish_cmc_handling (cmc);
5099 return;
5100 }
5101
4998 /* Forward to all CORE clients */ 5102 /* Forward to all CORE clients */
4999 have_core = GNUNET_NO; 5103 have_core = GNUNET_NO;
5000 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) 5104 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
5001 { 5105 {
5002 struct GNUNET_MQ_Envelope *env; 5106 struct GNUNET_MQ_Envelope *env;
5003 struct InboundMessage *im; 5107 struct InboundMessage *im;
5108 struct CoreSentContext *ctx;
5004 5109
5005 if (CT_CORE != tc->type) 5110 if (CT_CORE != tc->type)
5006 continue; 5111 continue;
5007 have_core = GNUNET_YES; 5112 vl->incoming_fc_window_size_ram += size;
5008 env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); 5113 env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
5114 ctx = GNUNET_new (struct CoreSentContext);
5115 ctx->vl = vl;
5116 ctx->size = size;
5117 ctx->isize = (GNUNET_NO == have_core) ? size : 0;
5118 have_core = GNUNET_YES;
5119 GNUNET_CONTAINER_DLL_insert (vl->csc_head, vl->csc_tail, ctx);
5120 GNUNET_MQ_notify_sent (env, &core_env_sent_cb, ctx);
5009 im->peer = cmc->im.sender; 5121 im->peer = cmc->im.sender;
5010 memcpy (&im[1], mh, size); 5122 memcpy (&im[1], mh, size);
5011 GNUNET_MQ_send (tc->mq, env); 5123 GNUNET_MQ_send (tc->mq, env);
5124 vl->core_recv_window--;
5012 } 5125 }
5013 vl->core_recv_window--;
5014 if (GNUNET_NO == have_core) 5126 if (GNUNET_NO == have_core)
5127 {
5015 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 5128 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
5016 "Dropped message to CORE: no CORE client connected!\n"); 5129 "Dropped message to CORE: no CORE client connected!\n");
5017 else 5130 /* Nevertheless, count window as used, as it is from the
5018 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5131 perspective of the other peer! */
5019 "Delivered message from %s of type %u to CORE\n", 5132 vl->incoming_fc_window_size_used += size;
5020 GNUNET_i2s (&cmc->im.sender), 5133 /* TODO-M1 */
5021 ntohs (mh->type)); 5134 finish_cmc_handling (cmc);
5135 return;
5136 }
5137 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5138 "Delivered message from %s of type %u to CORE\n",
5139 GNUNET_i2s (&cmc->im.sender),
5140 ntohs (mh->type));
5022 if (vl->core_recv_window > 0) 5141 if (vl->core_recv_window > 0)
5023 { 5142 {
5024 finish_cmc_handling (cmc); 5143 finish_cmc_handling (cmc);