diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-05-21 17:23:38 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-05-21 17:23:38 +0200 |
commit | 8f49f02287c203bb0c05d9bc7c53f2e92472ec07 (patch) | |
tree | 5278e9f8bfa9520b34a70ee732046a10e2c899ac /src | |
parent | c6777519ba8ef594b999209bac79a7f4b37ff30c (diff) | |
download | gnunet-8f49f02287c203bb0c05d9bc7c53f2e92472ec07.tar.gz gnunet-8f49f02287c203bb0c05d9bc7c53f2e92472ec07.zip |
implement FC window tracking for incoming messages
Diffstat (limited to 'src')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 157 |
1 files changed, 138 insertions, 19 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 2af699dc1..1e77937e4 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -34,16 +34,6 @@ | |||
34 | * at the beginning when the limit is zero!) | 34 | * at the beginning when the limit is zero!) |
35 | * - Retransmit challenge if it goes unanswered! | 35 | * - Retransmit challenge if it goes unanswered! |
36 | * | 36 | * |
37 | * for RECEIVING) | ||
38 | * - increment incoming_fc_window_size_ram when receiving | ||
39 | * incoming packets! | ||
40 | * - OR drop if incoming_fc_window_size_ram goes | ||
41 | * (significantly?) beyond available_fc_window_size | ||
42 | * - decrement incoming_fc_window_size_ram when CORE is done | ||
43 | * with incoming packets! | ||
44 | * - increment incoming_fc_window_size_used when CORE is done | ||
45 | * with incoming packets! | ||
46 | * | ||
47 | * for DV) | 37 | * for DV) |
48 | * - send challenges via DV (when DVH is confirmed *and* we care about | 38 | * - send challenges via DV (when DVH is confirmed *and* we care about |
49 | * the target to get window size, or when DVH is unconfirmed (passive | 39 | * the target to get window size, or when DVH is unconfirmed (passive |
@@ -92,7 +82,7 @@ | |||
92 | * - re-sending challenge response without a challenge when we have | 82 | * - re-sending challenge response without a challenge when we have |
93 | * significantly increased the FC window (upon CORE being done with messages) | 83 | * significantly increased the FC window (upon CORE being done with messages) |
94 | * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH] | 84 | * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH] |
95 | * Also can re-use signature in this case [CPU]. | 85 | * Also can re-use signature in this case [CPU]. Marked with "TODO-M1" |
96 | * | 86 | * |
97 | * Design realizations / discussion: | 87 | * Design realizations / discussion: |
98 | * - communicators do flow control by calling MQ "notify sent" | 88 | * - communicators do flow control by calling MQ "notify sent" |
@@ -1118,6 +1108,16 @@ struct PendingMessage; | |||
1118 | */ | 1108 | */ |
1119 | struct DistanceVectorHop; | 1109 | struct DistanceVectorHop; |
1120 | 1110 | ||
1111 | /** | ||
1112 | * A virtual link is another reachable peer that is known to CORE. It | ||
1113 | * can be either a `struct Neighbour` with at least one confirmed | ||
1114 | * `struct Queue`, or a `struct DistanceVector` with at least one | ||
1115 | * confirmed `struct DistanceVectorHop`. With a virtual link we track | ||
1116 | * data that is per neighbour that is not specific to how the | ||
1117 | * connectivity is established. | ||
1118 | */ | ||
1119 | struct VirtualLink; | ||
1120 | |||
1121 | 1121 | ||
1122 | /** | 1122 | /** |
1123 | * Context from #handle_incoming_msg(). Closure for many | 1123 | * Context from #handle_incoming_msg(). Closure for many |
@@ -1157,6 +1157,42 @@ struct CommunicatorMessageContext | |||
1157 | 1157 | ||
1158 | 1158 | ||
1159 | /** | 1159 | /** |
1160 | * Closure for #core_env_sent_cb. | ||
1161 | */ | ||
1162 | struct CoreSentContext | ||
1163 | { | ||
1164 | |||
1165 | /** | ||
1166 | * Kept in a DLL to clear @e vl in case @e vl is lost. | ||
1167 | */ | ||
1168 | struct CoreSentContext *next; | ||
1169 | |||
1170 | /** | ||
1171 | * Kept in a DLL to clear @e vl in case @e vl is lost. | ||
1172 | */ | ||
1173 | struct CoreSentContext *prev; | ||
1174 | |||
1175 | /** | ||
1176 | * Virtual link this is about. | ||
1177 | */ | ||
1178 | struct VirtualLink *vl; | ||
1179 | |||
1180 | /** | ||
1181 | * How big was the message. | ||
1182 | */ | ||
1183 | uint16_t size; | ||
1184 | |||
1185 | /** | ||
1186 | * By how much should we increment @e vl's | ||
1187 | * incoming_fc_window_size_used once we are done sending to CORE? | ||
1188 | * Use to ensure we do not increment twice if there is more than one | ||
1189 | * CORE client. | ||
1190 | */ | ||
1191 | uint16_t isize; | ||
1192 | }; | ||
1193 | |||
1194 | |||
1195 | /** | ||
1160 | * A virtual link is another reachable peer that is known to CORE. It | 1196 | * A virtual link is another reachable peer that is known to CORE. It |
1161 | * can be either a `struct Neighbour` with at least one confirmed | 1197 | * can be either a `struct Neighbour` with at least one confirmed |
1162 | * `struct Queue`, or a `struct DistanceVector` with at least one | 1198 | * `struct Queue`, or a `struct DistanceVector` with at least one |
@@ -1194,6 +1230,16 @@ struct VirtualLink | |||
1194 | struct PendingMessage *pending_msg_tail; | 1230 | struct PendingMessage *pending_msg_tail; |
1195 | 1231 | ||
1196 | /** | 1232 | /** |
1233 | * Kept in a DLL to clear @e vl in case @e vl is lost. | ||
1234 | */ | ||
1235 | struct CoreSentContext *csc_tail; | ||
1236 | |||
1237 | /** | ||
1238 | * Kept in a DLL to clear @e vl in case @e vl is lost. | ||
1239 | */ | ||
1240 | struct CoreSentContext *csc_head; | ||
1241 | |||
1242 | /** | ||
1197 | * Task scheduled to possibly notfiy core that this peer is no | 1243 | * Task scheduled to possibly notfiy core that this peer is no |
1198 | * longer counting as confirmed. Runs the #core_visibility_check(), | 1244 | * longer counting as confirmed. Runs the #core_visibility_check(), |
1199 | * which checks that some DV-path or a queue exists that is still | 1245 | * which checks that some DV-path or a queue exists that is still |
@@ -1300,7 +1346,10 @@ struct VirtualLink | |||
1300 | * we must tell communicators to stop providing us more messages | 1346 | * we must tell communicators to stop providing us more messages |
1301 | * for this peer. In fact, the window can go negative as we | 1347 | * for this peer. In fact, the window can go negative as we |
1302 | * have multiple communicators, so per communicator we can go | 1348 | * have multiple communicators, so per communicator we can go |
1303 | * down by one into the negative range. | 1349 | * down by one into the negative range. Furthermore, we count |
1350 | * delivery per CORE client, so if we had multiple cores, that | ||
1351 | * might also cause a negative window size here (as one message | ||
1352 | * would decrement the window by one per CORE client). | ||
1304 | */ | 1353 | */ |
1305 | int core_recv_window; | 1354 | int core_recv_window; |
1306 | }; | 1355 | }; |
@@ -2846,6 +2895,7 @@ static void | |||
2846 | free_virtual_link (struct VirtualLink *vl) | 2895 | free_virtual_link (struct VirtualLink *vl) |
2847 | { | 2896 | { |
2848 | struct PendingMessage *pm; | 2897 | struct PendingMessage *pm; |
2898 | struct CoreSentContext *csc; | ||
2849 | 2899 | ||
2850 | while (NULL != (pm = vl->pending_msg_head)) | 2900 | while (NULL != (pm = vl->pending_msg_head)) |
2851 | free_pending_message (pm); | 2901 | free_pending_message (pm); |
@@ -2855,6 +2905,12 @@ free_virtual_link (struct VirtualLink *vl) | |||
2855 | GNUNET_SCHEDULER_cancel (vl->visibility_task); | 2905 | GNUNET_SCHEDULER_cancel (vl->visibility_task); |
2856 | vl->visibility_task = NULL; | 2906 | vl->visibility_task = NULL; |
2857 | } | 2907 | } |
2908 | while (NULL != (csc = vl->csc_head)) | ||
2909 | { | ||
2910 | GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, csc); | ||
2911 | GNUNET_assert (vl == csc->vl); | ||
2912 | csc->vl = NULL; | ||
2913 | } | ||
2858 | GNUNET_break (NULL == vl->n); | 2914 | GNUNET_break (NULL == vl->n); |
2859 | GNUNET_break (NULL == vl->dv); | 2915 | GNUNET_break (NULL == vl->dv); |
2860 | GNUNET_free (vl); | 2916 | GNUNET_free (vl); |
@@ -4951,6 +5007,34 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, | |||
4951 | 5007 | ||
4952 | 5008 | ||
4953 | /** | 5009 | /** |
5010 | * Function called when we are done giving a message of a certain | ||
5011 | * size to CORE and should thus decrement the number of bytes of | ||
5012 | * RAM reserved for that peer's MQ. | ||
5013 | * | ||
5014 | * @param cls a `struct CoreSentContext` | ||
5015 | */ | ||
5016 | static void | ||
5017 | core_env_sent_cb (void *cls) | ||
5018 | { | ||
5019 | struct CoreSentContext *ctx = cls; | ||
5020 | struct VirtualLink *vl = ctx->vl; | ||
5021 | |||
5022 | if (NULL == vl) | ||
5023 | { | ||
5024 | /* lost the link in the meantime, ignore */ | ||
5025 | GNUNET_free (ctx); | ||
5026 | return; | ||
5027 | } | ||
5028 | GNUNET_CONTAINER_DLL_remove (vl->csc_head, vl->csc_tail, ctx); | ||
5029 | GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size); | ||
5030 | vl->incoming_fc_window_size_ram -= ctx->size; | ||
5031 | vl->incoming_fc_window_size_used += ctx->isize; | ||
5032 | /* TODO-M1 */ | ||
5033 | GNUNET_free (ctx); | ||
5034 | } | ||
5035 | |||
5036 | |||
5037 | /** | ||
4954 | * Communicator gave us an unencapsulated message to pass as-is to | 5038 | * Communicator gave us an unencapsulated message to pass as-is to |
4955 | * CORE. Process the request. | 5039 | * CORE. Process the request. |
4956 | * | 5040 | * |
@@ -4995,30 +5079,65 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) | |||
4995 | finish_cmc_handling (cmc); | 5079 | finish_cmc_handling (cmc); |
4996 | return; | 5080 | return; |
4997 | } | 5081 | } |
5082 | if (vl->incoming_fc_window_size_ram > UINT_MAX - size) | ||
5083 | { | ||
5084 | GNUNET_STATISTICS_update (GST_stats, | ||
5085 | "# CORE messages droped (FC arithmetic overflow)", | ||
5086 | 1, | ||
5087 | GNUNET_NO); | ||
5088 | |||
5089 | finish_cmc_handling (cmc); | ||
5090 | return; | ||
5091 | } | ||
5092 | if (vl->incoming_fc_window_size_ram + size > vl->available_fc_window_size) | ||
5093 | { | ||
5094 | GNUNET_STATISTICS_update (GST_stats, | ||
5095 | "# CORE messages droped (FC window overflow)", | ||
5096 | 1, | ||
5097 | GNUNET_NO); | ||
5098 | finish_cmc_handling (cmc); | ||
5099 | return; | ||
5100 | } | ||
5101 | |||
4998 | /* Forward to all CORE clients */ | 5102 | /* Forward to all CORE clients */ |
4999 | have_core = GNUNET_NO; | 5103 | have_core = GNUNET_NO; |
5000 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) | 5104 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) |
5001 | { | 5105 | { |
5002 | struct GNUNET_MQ_Envelope *env; | 5106 | struct GNUNET_MQ_Envelope *env; |
5003 | struct InboundMessage *im; | 5107 | struct InboundMessage *im; |
5108 | struct CoreSentContext *ctx; | ||
5004 | 5109 | ||
5005 | if (CT_CORE != tc->type) | 5110 | if (CT_CORE != tc->type) |
5006 | continue; | 5111 | continue; |
5007 | have_core = GNUNET_YES; | 5112 | vl->incoming_fc_window_size_ram += size; |
5008 | env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); | 5113 | env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); |
5114 | ctx = GNUNET_new (struct CoreSentContext); | ||
5115 | ctx->vl = vl; | ||
5116 | ctx->size = size; | ||
5117 | ctx->isize = (GNUNET_NO == have_core) ? size : 0; | ||
5118 | have_core = GNUNET_YES; | ||
5119 | GNUNET_CONTAINER_DLL_insert (vl->csc_head, vl->csc_tail, ctx); | ||
5120 | GNUNET_MQ_notify_sent (env, &core_env_sent_cb, ctx); | ||
5009 | im->peer = cmc->im.sender; | 5121 | im->peer = cmc->im.sender; |
5010 | memcpy (&im[1], mh, size); | 5122 | memcpy (&im[1], mh, size); |
5011 | GNUNET_MQ_send (tc->mq, env); | 5123 | GNUNET_MQ_send (tc->mq, env); |
5124 | vl->core_recv_window--; | ||
5012 | } | 5125 | } |
5013 | vl->core_recv_window--; | ||
5014 | if (GNUNET_NO == have_core) | 5126 | if (GNUNET_NO == have_core) |
5127 | { | ||
5015 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 5128 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
5016 | "Dropped message to CORE: no CORE client connected!\n"); | 5129 | "Dropped message to CORE: no CORE client connected!\n"); |
5017 | else | 5130 | /* Nevertheless, count window as used, as it is from the |
5018 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 5131 | perspective of the other peer! */ |
5019 | "Delivered message from %s of type %u to CORE\n", | 5132 | vl->incoming_fc_window_size_used += size; |
5020 | GNUNET_i2s (&cmc->im.sender), | 5133 | /* TODO-M1 */ |
5021 | ntohs (mh->type)); | 5134 | finish_cmc_handling (cmc); |
5135 | return; | ||
5136 | } | ||
5137 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
5138 | "Delivered message from %s of type %u to CORE\n", | ||
5139 | GNUNET_i2s (&cmc->im.sender), | ||
5140 | ntohs (mh->type)); | ||
5022 | if (vl->core_recv_window > 0) | 5141 | if (vl->core_recv_window > 0) |
5023 | { | 5142 | { |
5024 | finish_cmc_handling (cmc); | 5143 | finish_cmc_handling (cmc); |