aboutsummaryrefslogtreecommitdiff
path: root/src/fragmentation/fragmentation.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fragmentation/fragmentation.c')
-rw-r--r--src/fragmentation/fragmentation.c527
1 files changed, 0 insertions, 527 deletions
diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c
deleted file mode 100644
index b35ccc100..000000000
--- a/src/fragmentation/fragmentation.c
+++ /dev/null
@@ -1,527 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2009-2013 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file src/fragmentation/fragmentation.c
22 * @brief library to help fragment messages
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "gnunet_protocols.h"
27#include "fragmentation.h"
28
29
30/**
31 * Absolute minimum delay we impose between sending and expecting ACK to arrive.
32 */
33#define MIN_ACK_DELAY GNUNET_TIME_relative_multiply ( \
34 GNUNET_TIME_UNIT_MILLISECONDS, 1)
35
36
37/**
38 * Fragmentation context.
39 */
40struct GNUNET_FRAGMENT_Context
41{
42 /**
43 * Statistics to use.
44 */
45 struct GNUNET_STATISTICS_Handle *stats;
46
47 /**
48 * Tracker for flow control.
49 */
50 struct GNUNET_BANDWIDTH_Tracker *tracker;
51
52 /**
53 * Current expected delay for ACKs.
54 */
55 struct GNUNET_TIME_Relative ack_delay;
56
57 /**
58 * Current expected delay between messages.
59 */
60 struct GNUNET_TIME_Relative msg_delay;
61
62 /**
63 * Next allowed transmission time.
64 */
65 struct GNUNET_TIME_Absolute delay_until;
66
67 /**
68 * Time we transmitted the last message of the last round.
69 */
70 struct GNUNET_TIME_Absolute last_round;
71
72 /**
73 * Message to fragment (allocated at the end of this struct).
74 */
75 const struct GNUNET_MessageHeader *msg;
76
77 /**
78 * Function to call for transmissions.
79 */
80 GNUNET_FRAGMENT_MessageProcessor proc;
81
82 /**
83 * Closure for @e proc.
84 */
85 void *proc_cls;
86
87 /**
88 * Bitfield, set to 1 for each unacknowledged fragment.
89 */
90 uint64_t acks;
91
92 /**
93 * Bitfield with all possible bits for @e acks (used to mask the
94 * ack we get back).
95 */
96 uint64_t acks_mask;
97
98 /**
99 * Task performing work for the fragmenter.
100 */
101 struct GNUNET_SCHEDULER_Task *task;
102
103 /**
104 * Our fragmentation ID. (chosen at random)
105 */
106 uint32_t fragment_id;
107
108 /**
109 * Round-robin selector for the next transmission.
110 */
111 unsigned int next_transmission;
112
113 /**
114 * How many rounds of transmission have we completed so far?
115 */
116 unsigned int num_rounds;
117
118 /**
119 * How many transmission have we completed in this round?
120 */
121 unsigned int num_transmissions;
122
123 /**
124 * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
125 */
126 int8_t proc_busy;
127
128 /**
129 * #GNUNET_YES if we are waiting for an ACK.
130 */
131 int8_t wack;
132
133 /**
134 * Target fragment size.
135 */
136 uint16_t mtu;
137};
138
139
140/**
141 * Convert an ACK message to a printable format suitable for logging.
142 *
143 * @param ack message to print
144 * @return ack in human-readable format
145 */
146const char *
147GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
148{
149 static char buf[128];
150 const struct FragmentAcknowledgement *fa;
151
152 if (sizeof(struct FragmentAcknowledgement) !=
153 htons (ack->size))
154 return "<malformed ack>";
155 fa = (const struct FragmentAcknowledgement *) ack;
156 GNUNET_snprintf (buf,
157 sizeof(buf),
158 "%u-%llX",
159 ntohl (fa->fragment_id),
160 (unsigned long long) GNUNET_ntohll (fa->bits));
161 return buf;
162}
163
164
165/**
166 * Transmit the next fragment to the other peer.
167 *
168 * @param cls the `struct GNUNET_FRAGMENT_Context`
169 */
170static void
171transmit_next (void *cls)
172{
173 struct GNUNET_FRAGMENT_Context *fc = cls;
174 char msg[fc->mtu];
175 const char *mbuf;
176 struct FragmentHeader *fh;
177 struct GNUNET_TIME_Relative delay;
178 unsigned int bit;
179 size_t size;
180 size_t fsize;
181 int wrap;
182
183 fc->task = NULL;
184 GNUNET_assert (GNUNET_NO == fc->proc_busy);
185 if (0 == fc->acks)
186 return; /* all done */
187 /* calculate delay */
188 wrap = 0;
189 while (0 == (fc->acks & (1LLU << fc->next_transmission)))
190 {
191 fc->next_transmission = (fc->next_transmission + 1) % 64;
192 wrap |= (0 == fc->next_transmission);
193 }
194 bit = fc->next_transmission;
195 size = ntohs (fc->msg->size);
196 if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
197 fsize =
198 (size % (fc->mtu - sizeof(struct FragmentHeader)))
199 + sizeof(struct FragmentHeader);
200 else
201 fsize = fc->mtu;
202 if (NULL != fc->tracker)
203 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
204 fsize);
205 else
206 delay = GNUNET_TIME_UNIT_ZERO;
207 if (delay.rel_value_us > 0)
208 {
209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
210 "Fragmentation logic delays transmission of next fragment by %s\n",
211 GNUNET_STRINGS_relative_time_to_string (delay,
212 GNUNET_YES));
213 fc->task = GNUNET_SCHEDULER_add_delayed (delay,
214 &transmit_next,
215 fc);
216 return;
217 }
218 fc->next_transmission = (fc->next_transmission + 1) % 64;
219 wrap |= (0 == fc->next_transmission);
220 while (0 == (fc->acks & (1LLU << fc->next_transmission)))
221 {
222 fc->next_transmission = (fc->next_transmission + 1) % 64;
223 wrap |= (0 == fc->next_transmission);
224 }
225
226 /* assemble fragmentation message */
227 mbuf = (const char *) &fc[1];
228 fh = (struct FragmentHeader *) msg;
229 fh->header.size = htons (fsize);
230 fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
231 fh->fragment_id = htonl (fc->fragment_id);
232 fh->total_size = fc->msg->size; /* already in big-endian */
233 fh->offset = htons ((fc->mtu - sizeof(struct FragmentHeader)) * bit);
234 GNUNET_memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof(struct FragmentHeader))],
235 fsize - sizeof(struct FragmentHeader));
236 if (NULL != fc->tracker)
237 GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
238 GNUNET_STATISTICS_update (fc->stats,
239 _ ("# fragments transmitted"),
240 1,
241 GNUNET_NO);
242 if (0 != fc->last_round.abs_value_us)
243 GNUNET_STATISTICS_update (fc->stats,
244 _ ("# fragments retransmitted"),
245 1,
246 GNUNET_NO);
247
248 /* select next message to calculate delay */
249 bit = fc->next_transmission;
250 size = ntohs (fc->msg->size);
251 if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
252 fsize = size % (fc->mtu - sizeof(struct FragmentHeader));
253 else
254 fsize = fc->mtu;
255 if (NULL != fc->tracker)
256 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
257 fsize);
258 else
259 delay = GNUNET_TIME_UNIT_ZERO;
260 if (fc->num_rounds < 64)
261 delay = GNUNET_TIME_relative_max (delay,
262 GNUNET_TIME_relative_saturating_multiply
263 (fc->msg_delay,
264 (1ULL << fc->num_rounds)));
265 else
266 delay = GNUNET_TIME_UNIT_FOREVER_REL;
267 if (wrap)
268 {
269 /* full round transmitted wait 2x delay for ACK before going again */
270 fc->num_rounds++;
271 delay = GNUNET_TIME_relative_saturating_multiply (fc->ack_delay, 2);
272 /* never use zero, need some time for ACK always */
273 delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
274 fc->wack = GNUNET_YES;
275 fc->last_round = GNUNET_TIME_absolute_get ();
276 GNUNET_STATISTICS_update (fc->stats,
277 _ ("# fragments wrap arounds"),
278 1,
279 GNUNET_NO);
280 }
281 fc->proc_busy = GNUNET_YES;
282 fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
283 fc->num_transmissions++;
284 fc->proc (fc->proc_cls,
285 &fh->header);
286}
287
288
289/**
290 * Create a fragmentation context for the given message.
291 * Fragments the message into fragments of size @a mtu or
292 * less. Calls @a proc on each un-acknowledged fragment,
293 * using both the expected @a msg_delay between messages and
294 * acknowledgements and the given @a tracker to guide the
295 * frequency of calls to @a proc.
296 *
297 * @param stats statistics context
298 * @param mtu the maximum message size for each fragment
299 * @param tracker bandwidth tracker to use for flow control (can be NULL)
300 * @param msg_delay initial delay to insert between fragment transmissions
301 * based on previous messages
302 * @param ack_delay expected delay between fragment transmission
303 * and ACK based on previous messages
304 * @param msg the message to fragment
305 * @param proc function to call for each fragment to transmit
306 * @param proc_cls closure for @a proc
307 * @return the fragmentation context
308 */
309struct GNUNET_FRAGMENT_Context *
310GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
311 uint16_t mtu,
312 struct GNUNET_BANDWIDTH_Tracker *tracker,
313 struct GNUNET_TIME_Relative msg_delay,
314 struct GNUNET_TIME_Relative ack_delay,
315 const struct GNUNET_MessageHeader *msg,
316 GNUNET_FRAGMENT_MessageProcessor proc,
317 void *proc_cls)
318{
319 struct GNUNET_FRAGMENT_Context *fc;
320 size_t size;
321 uint64_t bits;
322
323 GNUNET_STATISTICS_update (stats,
324 _ ("# messages fragmented"),
325 1,
326 GNUNET_NO);
327 GNUNET_assert (mtu >= 1024 + sizeof(struct FragmentHeader));
328 size = ntohs (msg->size);
329 GNUNET_STATISTICS_update (stats,
330 _ ("# total size of fragmented messages"),
331 size, GNUNET_NO);
332 GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
333 fc = GNUNET_malloc (sizeof(struct GNUNET_FRAGMENT_Context) + size);
334 fc->stats = stats;
335 fc->mtu = mtu;
336 fc->tracker = tracker;
337 fc->ack_delay = ack_delay;
338 fc->msg_delay = msg_delay;
339 fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
340 fc->proc = proc;
341 fc->proc_cls = proc_cls;
342 fc->fragment_id =
343 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
344 UINT32_MAX);
345 GNUNET_memcpy (&fc[1], msg, size);
346 bits =
347 (size + mtu - sizeof(struct FragmentHeader) - 1) / (mtu
348 - sizeof(struct
349 FragmentHeader));
350 GNUNET_assert (bits <= 64);
351 if (bits == 64)
352 fc->acks_mask = UINT64_MAX; /* set all 64 bit */
353 else
354 fc->acks_mask = (1LLU << bits) - 1; /* set lowest 'bits' bit */
355 fc->acks = fc->acks_mask;
356 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
357 return fc;
358}
359
360
361/**
362 * Continuation to call from the 'proc' function after the fragment
363 * has been transmitted (and hence the next fragment can now be
364 * given to proc).
365 *
366 * @param fc fragmentation context
367 */
368void
369GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
370{
371 GNUNET_assert (fc->proc_busy == GNUNET_YES);
372 fc->proc_busy = GNUNET_NO;
373 GNUNET_assert (fc->task == NULL);
374 fc->task =
375 GNUNET_SCHEDULER_add_at (fc->delay_until,
376 &transmit_next,
377 fc);
378}
379
380
381/**
382 * Process an acknowledgement message we got from the other
383 * side (to control re-transmits).
384 *
385 * @param fc fragmentation context
386 * @param msg acknowledgement message we received
387 * @return #GNUNET_OK if this ack completes the work of the 'fc'
388 * (all fragments have been received);
389 * #GNUNET_NO if more messages are pending
390 * #GNUNET_SYSERR if this ack is not valid for this fc
391 */
392int
393GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
394 const struct GNUNET_MessageHeader *msg)
395{
396 const struct FragmentAcknowledgement *fa;
397 uint64_t abits;
398 struct GNUNET_TIME_Relative ndelay;
399 unsigned int ack_cnt;
400 unsigned int snd_cnt;
401 unsigned int i;
402
403 if (sizeof(struct FragmentAcknowledgement) != ntohs (msg->size))
404 {
405 GNUNET_break_op (0);
406 return GNUNET_SYSERR;
407 }
408 fa = (const struct FragmentAcknowledgement *) msg;
409 if (ntohl (fa->fragment_id) != fc->fragment_id)
410 return GNUNET_SYSERR; /* not our ACK */
411 abits = GNUNET_ntohll (fa->bits);
412 if ((GNUNET_YES == fc->wack) &&
413 (0 != fc->num_transmissions))
414 {
415 /* normal ACK, can update running average of delay... */
416 fc->wack = GNUNET_NO;
417 ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
418 fc->ack_delay.rel_value_us =
419 (ndelay.rel_value_us / fc->num_transmissions + 3
420 * fc->ack_delay.rel_value_us) / 4;
421 /* calculate ratio msg sent vs. msg acked */
422 ack_cnt = 0;
423 snd_cnt = 0;
424 for (i = 0; i < 64; i++)
425 {
426 if (1 == (fc->acks_mask & (1ULL << i)))
427 {
428 snd_cnt++;
429 if (0 == (abits & (1ULL << i)))
430 ack_cnt++;
431 }
432 }
433 if (0 == ack_cnt)
434 {
435 /* complete loss */
436 fc->msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
437 snd_cnt);
438 }
439 else if (snd_cnt > ack_cnt)
440 {
441 /* some loss, slow down proportionally */
442 fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt)
443 / snd_cnt);
444 }
445 else if (snd_cnt == ack_cnt)
446 {
447 fc->msg_delay.rel_value_us =
448 (ndelay.rel_value_us / fc->num_transmissions + 3
449 * fc->msg_delay.rel_value_us) / 5;
450 }
451 fc->num_transmissions = 0;
452 fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
453 GNUNET_TIME_UNIT_SECONDS);
454 fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
455 GNUNET_TIME_UNIT_SECONDS);
456 }
457 GNUNET_STATISTICS_update (fc->stats,
458 _ ("# fragment acknowledgements received"),
459 1,
460 GNUNET_NO);
461 if (abits != (fc->acks & abits))
462 {
463 /* ID collision or message reordering, count! This should be rare! */
464 GNUNET_STATISTICS_update (fc->stats,
465 _ ("# bits removed from fragmentation ACKs"), 1,
466 GNUNET_NO);
467 }
468 fc->acks = abits & fc->acks_mask;
469 if (0 != fc->acks)
470 {
471 /* more to transmit, do so right now (if tracker permits...) */
472 if (fc->task != NULL)
473 {
474 /* schedule next transmission now, no point in waiting... */
475 GNUNET_SCHEDULER_cancel (fc->task);
476 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
477 }
478 else
479 {
480 /* only case where there is no task should be if we're waiting
481 * for the right to transmit again (proc_busy set to YES) */
482 GNUNET_assert (GNUNET_YES == fc->proc_busy);
483 }
484 return GNUNET_NO;
485 }
486
487 /* all done */
488 GNUNET_STATISTICS_update (fc->stats,
489 _ ("# fragmentation transmissions completed"),
490 1,
491 GNUNET_NO);
492 if (NULL != fc->task)
493 {
494 GNUNET_SCHEDULER_cancel (fc->task);
495 fc->task = NULL;
496 }
497 return GNUNET_OK;
498}
499
500
501/**
502 * Destroy the given fragmentation context (stop calling 'proc', free
503 * resources).
504 *
505 * @param fc fragmentation context
506 * @param msg_delay where to store average delay between individual message transmissions the
507 * last message (OUT only)
508 * @param ack_delay where to store average delay between transmission and ACK for the
509 * last message, set to FOREVER if the message was not fully transmitted (OUT only)
510 */
511void
512GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
513 struct GNUNET_TIME_Relative *msg_delay,
514 struct GNUNET_TIME_Relative *ack_delay)
515{
516 if (fc->task != NULL)
517 GNUNET_SCHEDULER_cancel (fc->task);
518 if (NULL != ack_delay)
519 *ack_delay = fc->ack_delay;
520 if (NULL != msg_delay)
521 *msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
522 fc->num_rounds);
523 GNUNET_free (fc);
524}
525
526
527/* end of fragmentation.c */