aboutsummaryrefslogtreecommitdiff
path: root/src/fragmentation/fragmentation.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fragmentation/fragmentation.c')
-rw-r--r--src/fragmentation/fragmentation.c528
1 files changed, 0 insertions, 528 deletions
diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c
deleted file mode 100644
index a2e097b8b..000000000
--- a/src/fragmentation/fragmentation.c
+++ /dev/null
@@ -1,528 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2009-2013 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file src/fragmentation/fragmentation.c
22 * @brief library to help fragment messages
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "gnunet_fragmentation_lib.h"
27#include "gnunet_protocols.h"
28#include "fragmentation.h"
29
30
31/**
32 * Absolute minimum delay we impose between sending and expecting ACK to arrive.
33 */
34#define MIN_ACK_DELAY GNUNET_TIME_relative_multiply ( \
35 GNUNET_TIME_UNIT_MILLISECONDS, 1)
36
37
38/**
39 * Fragmentation context.
40 */
41struct GNUNET_FRAGMENT_Context
42{
43 /**
44 * Statistics to use.
45 */
46 struct GNUNET_STATISTICS_Handle *stats;
47
48 /**
49 * Tracker for flow control.
50 */
51 struct GNUNET_BANDWIDTH_Tracker *tracker;
52
53 /**
54 * Current expected delay for ACKs.
55 */
56 struct GNUNET_TIME_Relative ack_delay;
57
58 /**
59 * Current expected delay between messages.
60 */
61 struct GNUNET_TIME_Relative msg_delay;
62
63 /**
64 * Next allowed transmission time.
65 */
66 struct GNUNET_TIME_Absolute delay_until;
67
68 /**
69 * Time we transmitted the last message of the last round.
70 */
71 struct GNUNET_TIME_Absolute last_round;
72
73 /**
74 * Message to fragment (allocated at the end of this struct).
75 */
76 const struct GNUNET_MessageHeader *msg;
77
78 /**
79 * Function to call for transmissions.
80 */
81 GNUNET_FRAGMENT_MessageProcessor proc;
82
83 /**
84 * Closure for @e proc.
85 */
86 void *proc_cls;
87
88 /**
89 * Bitfield, set to 1 for each unacknowledged fragment.
90 */
91 uint64_t acks;
92
93 /**
94 * Bitfield with all possible bits for @e acks (used to mask the
95 * ack we get back).
96 */
97 uint64_t acks_mask;
98
99 /**
100 * Task performing work for the fragmenter.
101 */
102 struct GNUNET_SCHEDULER_Task *task;
103
104 /**
105 * Our fragmentation ID. (chosen at random)
106 */
107 uint32_t fragment_id;
108
109 /**
110 * Round-robin selector for the next transmission.
111 */
112 unsigned int next_transmission;
113
114 /**
115 * How many rounds of transmission have we completed so far?
116 */
117 unsigned int num_rounds;
118
119 /**
120 * How many transmission have we completed in this round?
121 */
122 unsigned int num_transmissions;
123
124 /**
125 * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
126 */
127 int8_t proc_busy;
128
129 /**
130 * #GNUNET_YES if we are waiting for an ACK.
131 */
132 int8_t wack;
133
134 /**
135 * Target fragment size.
136 */
137 uint16_t mtu;
138};
139
140
141/**
142 * Convert an ACK message to a printable format suitable for logging.
143 *
144 * @param ack message to print
145 * @return ack in human-readable format
146 */
147const char *
148GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
149{
150 static char buf[128];
151 const struct FragmentAcknowledgement *fa;
152
153 if (sizeof(struct FragmentAcknowledgement) !=
154 htons (ack->size))
155 return "<malformed ack>";
156 fa = (const struct FragmentAcknowledgement *) ack;
157 GNUNET_snprintf (buf,
158 sizeof(buf),
159 "%u-%llX",
160 ntohl (fa->fragment_id),
161 (unsigned long long) GNUNET_ntohll (fa->bits));
162 return buf;
163}
164
165
166/**
167 * Transmit the next fragment to the other peer.
168 *
169 * @param cls the `struct GNUNET_FRAGMENT_Context`
170 */
171static void
172transmit_next (void *cls)
173{
174 struct GNUNET_FRAGMENT_Context *fc = cls;
175 char msg[fc->mtu];
176 const char *mbuf;
177 struct FragmentHeader *fh;
178 struct GNUNET_TIME_Relative delay;
179 unsigned int bit;
180 size_t size;
181 size_t fsize;
182 int wrap;
183
184 fc->task = NULL;
185 GNUNET_assert (GNUNET_NO == fc->proc_busy);
186 if (0 == fc->acks)
187 return; /* all done */
188 /* calculate delay */
189 wrap = 0;
190 while (0 == (fc->acks & (1LLU << fc->next_transmission)))
191 {
192 fc->next_transmission = (fc->next_transmission + 1) % 64;
193 wrap |= (0 == fc->next_transmission);
194 }
195 bit = fc->next_transmission;
196 size = ntohs (fc->msg->size);
197 if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
198 fsize =
199 (size % (fc->mtu - sizeof(struct FragmentHeader)))
200 + sizeof(struct FragmentHeader);
201 else
202 fsize = fc->mtu;
203 if (NULL != fc->tracker)
204 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
205 fsize);
206 else
207 delay = GNUNET_TIME_UNIT_ZERO;
208 if (delay.rel_value_us > 0)
209 {
210 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
211 "Fragmentation logic delays transmission of next fragment by %s\n",
212 GNUNET_STRINGS_relative_time_to_string (delay,
213 GNUNET_YES));
214 fc->task = GNUNET_SCHEDULER_add_delayed (delay,
215 &transmit_next,
216 fc);
217 return;
218 }
219 fc->next_transmission = (fc->next_transmission + 1) % 64;
220 wrap |= (0 == fc->next_transmission);
221 while (0 == (fc->acks & (1LLU << fc->next_transmission)))
222 {
223 fc->next_transmission = (fc->next_transmission + 1) % 64;
224 wrap |= (0 == fc->next_transmission);
225 }
226
227 /* assemble fragmentation message */
228 mbuf = (const char *) &fc[1];
229 fh = (struct FragmentHeader *) msg;
230 fh->header.size = htons (fsize);
231 fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
232 fh->fragment_id = htonl (fc->fragment_id);
233 fh->total_size = fc->msg->size; /* already in big-endian */
234 fh->offset = htons ((fc->mtu - sizeof(struct FragmentHeader)) * bit);
235 GNUNET_memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof(struct FragmentHeader))],
236 fsize - sizeof(struct FragmentHeader));
237 if (NULL != fc->tracker)
238 GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
239 GNUNET_STATISTICS_update (fc->stats,
240 _ ("# fragments transmitted"),
241 1,
242 GNUNET_NO);
243 if (0 != fc->last_round.abs_value_us)
244 GNUNET_STATISTICS_update (fc->stats,
245 _ ("# fragments retransmitted"),
246 1,
247 GNUNET_NO);
248
249 /* select next message to calculate delay */
250 bit = fc->next_transmission;
251 size = ntohs (fc->msg->size);
252 if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
253 fsize = size % (fc->mtu - sizeof(struct FragmentHeader));
254 else
255 fsize = fc->mtu;
256 if (NULL != fc->tracker)
257 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
258 fsize);
259 else
260 delay = GNUNET_TIME_UNIT_ZERO;
261 if (fc->num_rounds < 64)
262 delay = GNUNET_TIME_relative_max (delay,
263 GNUNET_TIME_relative_saturating_multiply
264 (fc->msg_delay,
265 (1ULL << fc->num_rounds)));
266 else
267 delay = GNUNET_TIME_UNIT_FOREVER_REL;
268 if (wrap)
269 {
270 /* full round transmitted wait 2x delay for ACK before going again */
271 fc->num_rounds++;
272 delay = GNUNET_TIME_relative_saturating_multiply (fc->ack_delay, 2);
273 /* never use zero, need some time for ACK always */
274 delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
275 fc->wack = GNUNET_YES;
276 fc->last_round = GNUNET_TIME_absolute_get ();
277 GNUNET_STATISTICS_update (fc->stats,
278 _ ("# fragments wrap arounds"),
279 1,
280 GNUNET_NO);
281 }
282 fc->proc_busy = GNUNET_YES;
283 fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
284 fc->num_transmissions++;
285 fc->proc (fc->proc_cls,
286 &fh->header);
287}
288
289
290/**
291 * Create a fragmentation context for the given message.
292 * Fragments the message into fragments of size @a mtu or
293 * less. Calls @a proc on each un-acknowledged fragment,
294 * using both the expected @a msg_delay between messages and
295 * acknowledgements and the given @a tracker to guide the
296 * frequency of calls to @a proc.
297 *
298 * @param stats statistics context
299 * @param mtu the maximum message size for each fragment
300 * @param tracker bandwidth tracker to use for flow control (can be NULL)
301 * @param msg_delay initial delay to insert between fragment transmissions
302 * based on previous messages
303 * @param ack_delay expected delay between fragment transmission
304 * and ACK based on previous messages
305 * @param msg the message to fragment
306 * @param proc function to call for each fragment to transmit
307 * @param proc_cls closure for @a proc
308 * @return the fragmentation context
309 */
310struct GNUNET_FRAGMENT_Context *
311GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
312 uint16_t mtu,
313 struct GNUNET_BANDWIDTH_Tracker *tracker,
314 struct GNUNET_TIME_Relative msg_delay,
315 struct GNUNET_TIME_Relative ack_delay,
316 const struct GNUNET_MessageHeader *msg,
317 GNUNET_FRAGMENT_MessageProcessor proc,
318 void *proc_cls)
319{
320 struct GNUNET_FRAGMENT_Context *fc;
321 size_t size;
322 uint64_t bits;
323
324 GNUNET_STATISTICS_update (stats,
325 _ ("# messages fragmented"),
326 1,
327 GNUNET_NO);
328 GNUNET_assert (mtu >= 1024 + sizeof(struct FragmentHeader));
329 size = ntohs (msg->size);
330 GNUNET_STATISTICS_update (stats,
331 _ ("# total size of fragmented messages"),
332 size, GNUNET_NO);
333 GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
334 fc = GNUNET_malloc (sizeof(struct GNUNET_FRAGMENT_Context) + size);
335 fc->stats = stats;
336 fc->mtu = mtu;
337 fc->tracker = tracker;
338 fc->ack_delay = ack_delay;
339 fc->msg_delay = msg_delay;
340 fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
341 fc->proc = proc;
342 fc->proc_cls = proc_cls;
343 fc->fragment_id =
344 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
345 UINT32_MAX);
346 GNUNET_memcpy (&fc[1], msg, size);
347 bits =
348 (size + mtu - sizeof(struct FragmentHeader) - 1) / (mtu
349 - sizeof(struct
350 FragmentHeader));
351 GNUNET_assert (bits <= 64);
352 if (bits == 64)
353 fc->acks_mask = UINT64_MAX; /* set all 64 bit */
354 else
355 fc->acks_mask = (1LLU << bits) - 1; /* set lowest 'bits' bit */
356 fc->acks = fc->acks_mask;
357 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
358 return fc;
359}
360
361
362/**
363 * Continuation to call from the 'proc' function after the fragment
364 * has been transmitted (and hence the next fragment can now be
365 * given to proc).
366 *
367 * @param fc fragmentation context
368 */
369void
370GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
371{
372 GNUNET_assert (fc->proc_busy == GNUNET_YES);
373 fc->proc_busy = GNUNET_NO;
374 GNUNET_assert (fc->task == NULL);
375 fc->task =
376 GNUNET_SCHEDULER_add_at (fc->delay_until,
377 &transmit_next,
378 fc);
379}
380
381
382/**
383 * Process an acknowledgement message we got from the other
384 * side (to control re-transmits).
385 *
386 * @param fc fragmentation context
387 * @param msg acknowledgement message we received
388 * @return #GNUNET_OK if this ack completes the work of the 'fc'
389 * (all fragments have been received);
390 * #GNUNET_NO if more messages are pending
391 * #GNUNET_SYSERR if this ack is not valid for this fc
392 */
393int
394GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
395 const struct GNUNET_MessageHeader *msg)
396{
397 const struct FragmentAcknowledgement *fa;
398 uint64_t abits;
399 struct GNUNET_TIME_Relative ndelay;
400 unsigned int ack_cnt;
401 unsigned int snd_cnt;
402 unsigned int i;
403
404 if (sizeof(struct FragmentAcknowledgement) != ntohs (msg->size))
405 {
406 GNUNET_break_op (0);
407 return GNUNET_SYSERR;
408 }
409 fa = (const struct FragmentAcknowledgement *) msg;
410 if (ntohl (fa->fragment_id) != fc->fragment_id)
411 return GNUNET_SYSERR; /* not our ACK */
412 abits = GNUNET_ntohll (fa->bits);
413 if ((GNUNET_YES == fc->wack) &&
414 (0 != fc->num_transmissions))
415 {
416 /* normal ACK, can update running average of delay... */
417 fc->wack = GNUNET_NO;
418 ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
419 fc->ack_delay.rel_value_us =
420 (ndelay.rel_value_us / fc->num_transmissions + 3
421 * fc->ack_delay.rel_value_us) / 4;
422 /* calculate ratio msg sent vs. msg acked */
423 ack_cnt = 0;
424 snd_cnt = 0;
425 for (i = 0; i < 64; i++)
426 {
427 if (1 == (fc->acks_mask & (1ULL << i)))
428 {
429 snd_cnt++;
430 if (0 == (abits & (1ULL << i)))
431 ack_cnt++;
432 }
433 }
434 if (0 == ack_cnt)
435 {
436 /* complete loss */
437 fc->msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
438 snd_cnt);
439 }
440 else if (snd_cnt > ack_cnt)
441 {
442 /* some loss, slow down proportionally */
443 fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt)
444 / snd_cnt);
445 }
446 else if (snd_cnt == ack_cnt)
447 {
448 fc->msg_delay.rel_value_us =
449 (ndelay.rel_value_us / fc->num_transmissions + 3
450 * fc->msg_delay.rel_value_us) / 5;
451 }
452 fc->num_transmissions = 0;
453 fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
454 GNUNET_TIME_UNIT_SECONDS);
455 fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
456 GNUNET_TIME_UNIT_SECONDS);
457 }
458 GNUNET_STATISTICS_update (fc->stats,
459 _ ("# fragment acknowledgements received"),
460 1,
461 GNUNET_NO);
462 if (abits != (fc->acks & abits))
463 {
464 /* ID collision or message reordering, count! This should be rare! */
465 GNUNET_STATISTICS_update (fc->stats,
466 _ ("# bits removed from fragmentation ACKs"), 1,
467 GNUNET_NO);
468 }
469 fc->acks = abits & fc->acks_mask;
470 if (0 != fc->acks)
471 {
472 /* more to transmit, do so right now (if tracker permits...) */
473 if (fc->task != NULL)
474 {
475 /* schedule next transmission now, no point in waiting... */
476 GNUNET_SCHEDULER_cancel (fc->task);
477 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
478 }
479 else
480 {
481 /* only case where there is no task should be if we're waiting
482 * for the right to transmit again (proc_busy set to YES) */
483 GNUNET_assert (GNUNET_YES == fc->proc_busy);
484 }
485 return GNUNET_NO;
486 }
487
488 /* all done */
489 GNUNET_STATISTICS_update (fc->stats,
490 _ ("# fragmentation transmissions completed"),
491 1,
492 GNUNET_NO);
493 if (NULL != fc->task)
494 {
495 GNUNET_SCHEDULER_cancel (fc->task);
496 fc->task = NULL;
497 }
498 return GNUNET_OK;
499}
500
501
502/**
503 * Destroy the given fragmentation context (stop calling 'proc', free
504 * resources).
505 *
506 * @param fc fragmentation context
507 * @param msg_delay where to store average delay between individual message transmissions the
508 * last message (OUT only)
509 * @param ack_delay where to store average delay between transmission and ACK for the
510 * last message, set to FOREVER if the message was not fully transmitted (OUT only)
511 */
512void
513GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
514 struct GNUNET_TIME_Relative *msg_delay,
515 struct GNUNET_TIME_Relative *ack_delay)
516{
517 if (fc->task != NULL)
518 GNUNET_SCHEDULER_cancel (fc->task);
519 if (NULL != ack_delay)
520 *ack_delay = fc->ack_delay;
521 if (NULL != msg_delay)
522 *msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
523 fc->num_rounds);
524 GNUNET_free (fc);
525}
526
527
528/* end of fragmentation.c */