aboutsummaryrefslogtreecommitdiff
path: root/src/fragmentation/defragmentation.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fragmentation/defragmentation.c')
-rw-r--r--src/fragmentation/defragmentation.c590
1 files changed, 0 insertions, 590 deletions
diff --git a/src/fragmentation/defragmentation.c b/src/fragmentation/defragmentation.c
deleted file mode 100644
index c0ca86b37..000000000
--- a/src/fragmentation/defragmentation.c
+++ /dev/null
@@ -1,590 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2009, 2011 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file src/fragmentation/defragmentation.c
22 * @brief library to help defragment messages
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "gnunet_fragmentation_lib.h"
27#include "fragmentation.h"
28
29/**
30 * Timestamps for fragments.
31 */
32struct FragTimes
33{
34 /**
35 * The time the fragment was received.
36 */
37 struct GNUNET_TIME_Absolute time;
38
39 /**
40 * Number of the bit for the fragment (in [0,..,63]).
41 */
42 unsigned int bit;
43};
44
45
46/**
47 * Information we keep for one message that is being assembled. Note
48 * that we keep the context around even after the assembly is done to
49 * handle 'stray' messages that are received 'late'. A message
50 * context is ONLY discarded when the queue gets too big.
51 */
52struct MessageContext
53{
54 /**
55 * This is a DLL.
56 */
57 struct MessageContext *next;
58
59 /**
60 * This is a DLL.
61 */
62 struct MessageContext *prev;
63
64 /**
65 * Associated defragmentation context.
66 */
67 struct GNUNET_DEFRAGMENT_Context *dc;
68
69 /**
70 * Pointer to the assembled message, allocated at the
71 * end of this struct.
72 */
73 const struct GNUNET_MessageHeader *msg;
74
75 /**
76 * Last time we received any update for this message
77 * (least-recently updated message will be discarded
78 * if we hit the queue size).
79 */
80 struct GNUNET_TIME_Absolute last_update;
81
82 /**
83 * Task scheduled for transmitting the next ACK to the
84 * other peer.
85 */
86 struct GNUNET_SCHEDULER_Task *ack_task;
87
88 /**
89 * When did we receive which fragment? Used to calculate
90 * the time we should send the ACK.
91 */
92 struct FragTimes frag_times[64];
93
94 /**
95 * Which fragments have we gotten yet? bits that are 1
96 * indicate missing fragments.
97 */
98 uint64_t bits;
99
100 /**
101 * Unique ID for this message.
102 */
103 uint32_t fragment_id;
104
105 /**
106 * Which 'bit' did the last fragment we received correspond to?
107 */
108 unsigned int last_bit;
109
110 /**
111 * For the current ACK round, which is the first relevant
112 * offset in @e frag_times?
113 */
114 unsigned int frag_times_start_offset;
115
116 /**
117 * Which offset would we write the next frag value into
118 * in the @e frag_times array? All smaller entries are valid.
119 */
120 unsigned int frag_times_write_offset;
121
122 /**
123 * Total size of the message that we are assembling.
124 */
125 uint16_t total_size;
126
127 /**
128 * Was the last fragment we got a duplicate?
129 */
130 int16_t last_duplicate;
131};
132
133
134/**
135 * Defragmentation context (one per connection).
136 */
137struct GNUNET_DEFRAGMENT_Context
138{
139 /**
140 * For statistics.
141 */
142 struct GNUNET_STATISTICS_Handle *stats;
143
144 /**
145 * Head of list of messages we're defragmenting.
146 */
147 struct MessageContext *head;
148
149 /**
150 * Tail of list of messages we're defragmenting.
151 */
152 struct MessageContext *tail;
153
154 /**
155 * Closure for @e proc and @e ackp.
156 */
157 void *cls;
158
159 /**
160 * Function to call with defragmented messages.
161 */
162 GNUNET_FRAGMENT_MessageProcessor proc;
163
164 /**
165 * Function to call with acknowledgements.
166 */
167 GNUNET_DEFRAGMENT_AckProcessor ackp;
168
169 /**
170 * Running average of the latency (delay between messages) for this
171 * connection.
172 */
173 struct GNUNET_TIME_Relative latency;
174
175 /**
176 * num_msgs how many fragmented messages
177 * to we defragment at most at the same time?
178 */
179 unsigned int num_msgs;
180
181 /**
182 * Current number of messages in the 'struct MessageContext'
183 * DLL (smaller or equal to 'num_msgs').
184 */
185 unsigned int list_size;
186
187 /**
188 * Maximum message size for each fragment.
189 */
190 uint16_t mtu;
191};
192
193
194/**
195 * Create a defragmentation context.
196 *
197 * @param stats statistics context
198 * @param mtu the maximum message size for each fragment
199 * @param num_msgs how many fragmented messages
200 * to we defragment at most at the same time?
201 * @param cls closure for @a proc and @a ackp
202 * @param proc function to call with defragmented messages
203 * @param ackp function to call with acknowledgements (to send
204 * back to the other side)
205 * @return the defragmentation context
206 */
207struct GNUNET_DEFRAGMENT_Context *
208GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
209 uint16_t mtu, unsigned int num_msgs,
210 void *cls,
211 GNUNET_FRAGMENT_MessageProcessor proc,
212 GNUNET_DEFRAGMENT_AckProcessor ackp)
213{
214 struct GNUNET_DEFRAGMENT_Context *dc;
215
216 dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
217 dc->stats = stats;
218 dc->cls = cls;
219 dc->proc = proc;
220 dc->ackp = ackp;
221 dc->num_msgs = num_msgs;
222 dc->mtu = mtu;
223 dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
224 return dc;
225}
226
227
228/**
229 * Destroy the given defragmentation context.
230 *
231 * @param dc defragmentation context
232 */
233void
234GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
235{
236 struct MessageContext *mc;
237
238 while (NULL != (mc = dc->head))
239 {
240 GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
241 dc->list_size--;
242 if (NULL != mc->ack_task)
243 {
244 GNUNET_SCHEDULER_cancel (mc->ack_task);
245 mc->ack_task = NULL;
246 }
247 GNUNET_free (mc);
248 }
249 GNUNET_assert (0 == dc->list_size);
250 GNUNET_free (dc);
251}
252
253
254/**
255 * Send acknowledgement to the other peer now.
256 *
257 * @param cls the message context
258 */
259static void
260send_ack (void *cls)
261{
262 struct MessageContext *mc = cls;
263 struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
264 struct FragmentAcknowledgement fa;
265
266 mc->ack_task = NULL;
267 fa.header.size = htons (sizeof(struct FragmentAcknowledgement));
268 fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
269 fa.fragment_id = htonl (mc->fragment_id);
270 fa.bits = GNUNET_htonll (mc->bits);
271 GNUNET_STATISTICS_update (mc->dc->stats,
272 _ ("# acknowledgements sent for fragment"),
273 1,
274 GNUNET_NO);
275 mc->last_duplicate = GNUNET_NO; /* clear flag */
276 dc->ackp (dc->cls,
277 mc->fragment_id,
278 &fa.header);
279}
280
281
282/**
283 * This function is from the GNU Scientific Library, linear/fit.c,
284 * Copyright (C) 2000 Brian Gough
285 */
286static void
287gsl_fit_mul (const double *x, const size_t xstride, const double *y,
288 const size_t ystride, const size_t n, double *c1, double *cov_11,
289 double *sumsq)
290{
291 double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
292
293 size_t i;
294
295 for (i = 0; i < n; i++)
296 {
297 m_x += (x[i * xstride] - m_x) / (i + 1.0);
298 m_y += (y[i * ystride] - m_y) / (i + 1.0);
299 }
300
301 for (i = 0; i < n; i++)
302 {
303 const double dx = x[i * xstride] - m_x;
304 const double dy = y[i * ystride] - m_y;
305
306 m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
307 m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
308 }
309
310 /* In terms of y = b x */
311
312 {
313 double s2 = 0, d2 = 0;
314 double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
315
316 *c1 = b;
317
318 /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
319
320 for (i = 0; i < n; i++)
321 {
322 const double dx = x[i * xstride] - m_x;
323 const double dy = y[i * ystride] - m_y;
324 const double d = (m_y - b * m_x) + dy - b * dx;
325
326 d2 += d * d;
327 }
328
329 s2 = d2 / (n - 1.0); /* chisq per degree of freedom */
330
331 *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
332
333 *sumsq = d2;
334 }
335}
336
337
338/**
339 * Estimate the latency between messages based on the most recent
340 * message time stamps.
341 *
342 * @param mc context with time stamps
343 * @return average delay between time stamps (based on least-squares fit)
344 */
345static struct GNUNET_TIME_Relative
346estimate_latency (struct MessageContext *mc)
347{
348 struct FragTimes *first;
349 size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
350 double x[total];
351 double y[total];
352 size_t i;
353 double c1;
354 double cov11;
355 double sumsq;
356 struct GNUNET_TIME_Relative ret;
357
358 first = &mc->frag_times[mc->frag_times_start_offset];
359 GNUNET_assert (total > 1);
360 for (i = 0; i < total; i++)
361 {
362 x[i] = (double) i;
363 y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
364 }
365 gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
366 c1 += sqrt (sumsq); /* add 1 std dev */
367 ret.rel_value_us = (uint64_t) c1;
368 if (0 == ret.rel_value_us)
369 ret = GNUNET_TIME_UNIT_MICROSECONDS; /* always at least 1 */
370 return ret;
371}
372
373
374/**
375 * Discard the message context that was inactive for the longest time.
376 *
377 * @param dc defragmentation context
378 */
379static void
380discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
381{
382 struct MessageContext *old;
383 struct MessageContext *pos;
384
385 old = NULL;
386 pos = dc->head;
387 while (NULL != pos)
388 {
389 if ((old == NULL) ||
390 (old->last_update.abs_value_us > pos->last_update.abs_value_us))
391 old = pos;
392 pos = pos->next;
393 }
394 GNUNET_assert (NULL != old);
395 GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
396 dc->list_size--;
397 if (NULL != old->ack_task)
398 {
399 GNUNET_SCHEDULER_cancel (old->ack_task);
400 old->ack_task = NULL;
401 }
402 GNUNET_free (old);
403}
404
405
406/**
407 * We have received a fragment. Process it.
408 *
409 * @param dc the context
410 * @param msg the message that was received
411 * @return #GNUNET_OK on success,
412 * #GNUNET_NO if this was a duplicate,
413 * #GNUNET_SYSERR on error
414 */
415int
416GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
417 const struct GNUNET_MessageHeader *msg)
418{
419 struct MessageContext *mc;
420 const struct FragmentHeader *fh;
421 uint16_t msize;
422 uint16_t foff;
423 uint32_t fid;
424 char *mbuf;
425 unsigned int bit;
426 struct GNUNET_TIME_Absolute now;
427 struct GNUNET_TIME_Relative delay;
428 unsigned int bc;
429 unsigned int b;
430 unsigned int n;
431 unsigned int num_fragments;
432 int duplicate;
433 int last;
434
435 if (ntohs (msg->size) < sizeof(struct FragmentHeader))
436 {
437 GNUNET_break_op (0);
438 return GNUNET_SYSERR;
439 }
440 if (ntohs (msg->size) > dc->mtu)
441 {
442 GNUNET_break_op (0);
443 return GNUNET_SYSERR;
444 }
445 fh = (const struct FragmentHeader *) msg;
446 msize = ntohs (fh->total_size);
447 if (msize < sizeof(struct GNUNET_MessageHeader))
448 {
449 GNUNET_break_op (0);
450 return GNUNET_SYSERR;
451 }
452 fid = ntohl (fh->fragment_id);
453 foff = ntohs (fh->offset);
454 if (foff >= msize)
455 {
456 GNUNET_break_op (0);
457 return GNUNET_SYSERR;
458 }
459 if (0 != (foff % (dc->mtu - sizeof(struct FragmentHeader))))
460 {
461 GNUNET_break_op (0);
462 return GNUNET_SYSERR;
463 }
464 GNUNET_STATISTICS_update (dc->stats,
465 _ ("# fragments received"),
466 1,
467 GNUNET_NO);
468 num_fragments = (ntohs (msg->size) + dc->mtu - sizeof(struct FragmentHeader)
469 - 1) / (dc->mtu - sizeof(struct FragmentHeader));
470 last = 0;
471 for (mc = dc->head; NULL != mc; mc = mc->next)
472 if (mc->fragment_id > fid)
473 last++;
474
475 mc = dc->head;
476 while ((NULL != mc) && (fid != mc->fragment_id))
477 mc = mc->next;
478 bit = foff / (dc->mtu - sizeof(struct FragmentHeader));
479 if (bit * (dc->mtu - sizeof(struct FragmentHeader)) + ntohs (msg->size)
480 - sizeof(struct FragmentHeader) > msize)
481 {
482 /* payload extends past total message size */
483 GNUNET_break_op (0);
484 return GNUNET_SYSERR;
485 }
486 if ((NULL != mc) && (msize != mc->total_size))
487 {
488 /* inconsistent message size */
489 GNUNET_break_op (0);
490 return GNUNET_SYSERR;
491 }
492 now = GNUNET_TIME_absolute_get ();
493 if (NULL == mc)
494 {
495 mc = GNUNET_malloc (sizeof(struct MessageContext) + msize);
496 mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
497 mc->dc = dc;
498 mc->total_size = msize;
499 mc->fragment_id = fid;
500 mc->last_update = now;
501 n = (msize + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu
502 - sizeof(struct
503 FragmentHeader));
504 if (n == 64)
505 mc->bits = UINT64_MAX; /* set all 64 bit */
506 else
507 mc->bits = (1LLU << n) - 1; /* set lowest 'bits' bit */
508 if (dc->list_size >= dc->num_msgs)
509 discard_oldest_mc (dc);
510 GNUNET_CONTAINER_DLL_insert (dc->head,
511 dc->tail,
512 mc);
513 dc->list_size++;
514 }
515
516 /* copy data to 'mc' */
517 if (0 != (mc->bits & (1LLU << bit)))
518 {
519 mc->bits -= 1LLU << bit;
520 mbuf = (char *) &mc[1];
521 GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof(struct FragmentHeader))],
522 &fh[1],
523 ntohs (msg->size) - sizeof(struct FragmentHeader));
524 mc->last_update = now;
525 if (bit < mc->last_bit)
526 mc->frag_times_start_offset = mc->frag_times_write_offset;
527 mc->last_bit = bit;
528 mc->frag_times[mc->frag_times_write_offset].time = now;
529 mc->frag_times[mc->frag_times_write_offset].bit = bit;
530 mc->frag_times_write_offset++;
531 duplicate = GNUNET_NO;
532 }
533 else
534 {
535 duplicate = GNUNET_YES;
536 GNUNET_STATISTICS_update (dc->stats,
537 _ ("# duplicate fragments received"),
538 1,
539 GNUNET_NO);
540 }
541
542 /* count number of missing fragments after the current one */
543 bc = 0;
544 for (b = bit; b < 64; b++)
545 if (0 != (mc->bits & (1LLU << b)))
546 bc++;
547 else
548 bc = 0;
549
550 /* notify about complete message */
551 if ((GNUNET_NO == duplicate) &&
552 (0 == mc->bits))
553 {
554 GNUNET_STATISTICS_update (dc->stats,
555 _ ("# messages defragmented"),
556 1,
557 GNUNET_NO);
558 /* message complete, notify! */
559 dc->proc (dc->cls, mc->msg);
560 }
561 /* send ACK */
562 if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
563 {
564 dc->latency = estimate_latency (mc);
565 }
566 delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
567 bc + 1);
568 if ((last + fid == num_fragments) ||
569 (0 == mc->bits) ||
570 (GNUNET_YES == duplicate))
571 {
572 /* message complete or duplicate or last missing fragment in
573 linear sequence; ACK now! */
574 delay = GNUNET_TIME_UNIT_ZERO;
575 }
576 if (NULL != mc->ack_task)
577 GNUNET_SCHEDULER_cancel (mc->ack_task);
578 mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
579 &send_ack,
580 mc);
581 if (GNUNET_YES == duplicate)
582 {
583 mc->last_duplicate = GNUNET_YES;
584 return GNUNET_NO;
585 }
586 return GNUNET_YES;
587}
588
589
590/* end of defragmentation.c */