aboutsummaryrefslogtreecommitdiff
path: root/src/fragmentation/defragmentation.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-07-13 11:08:01 +0000
committerChristian Grothoff <christian@grothoff.org>2011-07-13 11:08:01 +0000
commitb2c1c35b6344bd03d9a1b07afcd064b7be34094a (patch)
tree420f2529be00eb6ab40a29b568910440f8cc590a /src/fragmentation/defragmentation.c
parent1ad6c770476ec896d2a5c9f8e26237de36eb05e0 (diff)
downloadgnunet-b2c1c35b6344bd03d9a1b07afcd064b7be34094a.tar.gz
gnunet-b2c1c35b6344bd03d9a1b07afcd064b7be34094a.zip
rename
Diffstat (limited to 'src/fragmentation/defragmentation.c')
-rw-r--r--src/fragmentation/defragmentation.c549
1 files changed, 549 insertions, 0 deletions
diff --git a/src/fragmentation/defragmentation.c b/src/fragmentation/defragmentation.c
new file mode 100644
index 000000000..cc42d3e75
--- /dev/null
+++ b/src/fragmentation/defragmentation.c
@@ -0,0 +1,549 @@
1/*
2 This file is part of GNUnet
3 (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20/**
21 * @file src/fragmentation/defragmentation_new.c
22 * @brief library to help defragment messages
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "gnunet_fragmentation_lib.h"
27#include "fragmentation.h"
28
29/**
30 * Timestamps for fragments.
31 */
32struct FragTimes
33{
34 /**
35 * The time the fragment was received.
36 */
37 struct GNUNET_TIME_Absolute time;
38
39 /**
40 * Number of the bit for the fragment (in [0,..,63]).
41 */
42 unsigned int bit;
43};
44
45
46/**
47 * Information we keep for one message that is being assembled. Note
48 * that we keep the context around even after the assembly is done to
49 * handle 'stray' messages that are received 'late'. A message
50 * context is ONLY discarded when the queue gets too big.
51 */
52struct MessageContext
53{
54 /**
55 * This is a DLL.
56 */
57 struct MessageContext *next;
58
59 /**
60 * This is a DLL.
61 */
62 struct MessageContext *prev;
63
64 /**
65 * Associated defragmentation context.
66 */
67 struct GNUNET_DEFRAGMENT_Context *dc;
68
69 /**
70 * Pointer to the assembled message, allocated at the
71 * end of this struct.
72 */
73 const struct GNUNET_MessageHeader *msg;
74
75 /**
76 * Last time we received any update for this message
77 * (least-recently updated message will be discarded
78 * if we hit the queue size).
79 */
80 struct GNUNET_TIME_Absolute last_update;
81
82 /**
83 * Task scheduled for transmitting the next ACK to the
84 * other peer.
85 */
86 GNUNET_SCHEDULER_TaskIdentifier ack_task;
87
88 /**
89 * When did we receive which fragment? Used to calculate
90 * the time we should send the ACK.
91 */
92 struct FragTimes frag_times[64];
93
94 /**
95 * Which fragments have we gotten yet? bits that are 1
96 * indicate missing fragments.
97 */
98 uint64_t bits;
99
100 /**
101 * Unique ID for this message.
102 */
103 uint32_t fragment_id;
104
105 /**
106 * Which 'bit' did the last fragment we received correspond to?
107 */
108 unsigned int last_bit;
109
110 /**
111 * For the current ACK round, which is the first relevant
112 * offset in 'frag_times'?
113 */
114 unsigned int frag_times_start_offset;
115
116 /**
117 * Which offset whould we write the next frag value into
118 * in the 'frag_times' array? All smaller entries are valid.
119 */
120 unsigned int frag_times_write_offset;
121
122 /**
123 * Total size of the message that we are assembling.
124 */
125 uint16_t total_size;
126
127};
128
129
130/**
131 * Defragmentation context (one per connection).
132 */
133struct GNUNET_DEFRAGMENT_Context
134{
135
136 /**
137 * For statistics.
138 */
139 struct GNUNET_STATISTICS_Handle *stats;
140
141 /**
142 * Head of list of messages we're defragmenting.
143 */
144 struct MessageContext *head;
145
146 /**
147 * Tail of list of messages we're defragmenting.
148 */
149 struct MessageContext *tail;
150
151 /**
152 * Closure for 'proc' and 'ackp'.
153 */
154 void *cls;
155
156 /**
157 * Function to call with defragmented messages.
158 */
159 GNUNET_FRAGMENT_MessageProcessor proc;
160
161 /**
162 * Function to call with acknowledgements.
163 */
164 GNUNET_DEFRAGMENT_AckProcessor ackp;
165
166 /**
167 * Running average of the latency (delay between messages) for this
168 * connection.
169 */
170 struct GNUNET_TIME_Relative latency;
171
172 /**
173 * num_msgs how many fragmented messages
174 * to we defragment at most at the same time?
175 */
176 unsigned int num_msgs;
177
178 /**
179 * Current number of messages in the 'struct MessageContext'
180 * DLL (smaller or equal to 'num_msgs').
181 */
182 unsigned int list_size;
183
184 /**
185 * Maximum message size for each fragment.
186 */
187 uint16_t mtu;
188};
189
190
191/**
192 * Create a defragmentation context.
193 *
194 * @param stats statistics context
195 * @param mtu the maximum message size for each fragment
196 * @param num_msgs how many fragmented messages
197 * to we defragment at most at the same time?
198 * @param cls closure for proc and ackp
199 * @param proc function to call with defragmented messages
200 * @param ackp function to call with acknowledgements (to send
201 * back to the other side)
202 * @return the defragmentation context
203 */
204struct GNUNET_DEFRAGMENT_Context *
205GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
206 uint16_t mtu,
207 unsigned int num_msgs,
208 void *cls,
209 GNUNET_FRAGMENT_MessageProcessor proc,
210 GNUNET_DEFRAGMENT_AckProcessor ackp)
211{
212 struct GNUNET_DEFRAGMENT_Context *dc;
213
214 dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context));
215 dc->stats = stats;
216 dc->cls = cls;
217 dc->proc = proc;
218 dc->ackp = ackp;
219 dc->num_msgs = num_msgs;
220 dc->mtu = mtu;
221 dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
222 return dc;
223}
224
225
226/**
227 * Destroy the given defragmentation context.
228 *
229 * @param dc defragmentation context
230 */
231void
232GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
233{
234 struct MessageContext *mc;
235
236 while (NULL != (mc = dc->head))
237 {
238 GNUNET_CONTAINER_DLL_remove (dc->head,
239 dc->tail,
240 mc);
241 dc->list_size--;
242 if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
243 {
244 GNUNET_SCHEDULER_cancel (mc->ack_task);
245 mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
246 }
247 GNUNET_free (mc);
248 }
249 GNUNET_assert (0 == dc->list_size);
250 GNUNET_free (dc);
251}
252
253
254/**
255 * Send acknowledgement to the other peer now.
256 *
257 * @param cls the message context
258 * @param tc the scheduler context
259 */
260static void
261send_ack (void *cls,
262 const struct GNUNET_SCHEDULER_TaskContext *tc)
263{
264 struct MessageContext *mc = cls;
265 struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
266 struct FragmentAcknowledgement fa;
267
268 mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
269 fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
270 fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
271 fa.fragment_id = htonl (mc->fragment_id);
272 fa.bits = GNUNET_htonll (mc->bits);
273 dc->ackp (dc->cls, mc->fragment_id, &fa.header);
274}
275
276
277/**
278 * This function is from the GNU Scientific Library, linear/fit.c,
279 * (C) 2000 Brian Gough
280 */
281static void
282gsl_fit_mul (const double *x, const size_t xstride,
283 const double *y, const size_t ystride,
284 const size_t n,
285 double *c1, double *cov_11, double *sumsq)
286{
287 double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
288
289 size_t i;
290
291 for (i = 0; i < n; i++)
292 {
293 m_x += (x[i * xstride] - m_x) / (i + 1.0);
294 m_y += (y[i * ystride] - m_y) / (i + 1.0);
295 }
296
297 for (i = 0; i < n; i++)
298 {
299 const double dx = x[i * xstride] - m_x;
300 const double dy = y[i * ystride] - m_y;
301
302 m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
303 m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
304 }
305
306 /* In terms of y = b x */
307
308 {
309 double s2 = 0, d2 = 0;
310 double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
311
312 *c1 = b;
313
314 /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
315
316 for (i = 0; i < n; i++)
317 {
318 const double dx = x[i * xstride] - m_x;
319 const double dy = y[i * ystride] - m_y;
320 const double d = (m_y - b * m_x) + dy - b * dx;
321 d2 += d * d;
322 }
323
324 s2 = d2 / (n - 1.0); /* chisq per degree of freedom */
325
326 *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
327
328 *sumsq = d2;
329 }
330}
331
332
333/**
334 * Estimate the latency between messages based on the most recent
335 * message time stamps.
336 *
337 * @param mc context with time stamps
338 * @return average delay between time stamps (based on least-squares fit)
339 */
340static struct GNUNET_TIME_Relative
341estimate_latency (struct MessageContext *mc)
342{
343 struct FragTimes *first;
344 size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
345 double x[total];
346 double y[total];
347 size_t i;
348 double c1;
349 double cov11;
350 double sumsq;
351 struct GNUNET_TIME_Relative ret;
352
353 first = &mc->frag_times[mc->frag_times_start_offset];
354 GNUNET_assert (total > 1);
355 for (i=0;i<total;i++)
356 {
357 x[i] = (double) i;
358 y[i] = (double) (first[i].time.abs_value - first[0].time.abs_value);
359 }
360 gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
361 ret.rel_value = (uint64_t) c1;
362 return ret;
363};
364
365
366/**
367 * Discard the message context that was inactive for the longest time.
368 *
369 * @param dc defragmentation context
370 */
371static void
372discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
373{
374 struct MessageContext *old;
375 struct MessageContext *pos;
376
377 old = NULL;
378 pos = dc->head;
379 while (NULL != pos)
380 {
381 if ( (old == NULL) ||
382 (old->last_update.abs_value > pos->last_update.abs_value) )
383 old = pos;
384 pos = pos->next;
385 }
386 GNUNET_assert (NULL != old);
387 GNUNET_CONTAINER_DLL_remove (dc->head,
388 dc->tail,
389 old);
390 dc->list_size--;
391 if (GNUNET_SCHEDULER_NO_TASK != old->ack_task)
392 GNUNET_SCHEDULER_cancel (old->ack_task);
393 GNUNET_free (old);
394 fprintf (stderr, "D");
395}
396
397
398/**
399 * We have received a fragment. Process it.
400 *
401 * @param dc the context
402 * @param msg the message that was received
403 * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error
404 */
405int
406GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
407 const struct GNUNET_MessageHeader *msg)
408{
409 struct MessageContext *mc;
410 const struct FragmentHeader *fh;
411 uint16_t msize;
412 uint16_t foff;
413 uint32_t fid;
414 char *mbuf;
415 unsigned int bit;
416 struct GNUNET_TIME_Absolute now;
417 struct GNUNET_TIME_Relative delay;
418 unsigned int bc;
419 unsigned int b;
420 unsigned int n;
421 int duplicate;
422
423 if (ntohs(msg->size) < sizeof (struct FragmentHeader))
424 {
425 GNUNET_break_op (0);
426 return GNUNET_SYSERR;
427 }
428 if (ntohs (msg->size) > dc->mtu)
429 {
430 GNUNET_break_op (0);
431 return GNUNET_SYSERR;
432 }
433 fh = (const struct FragmentHeader*) msg;
434 msize = ntohs (fh->total_size);
435 fid = ntohl (fh->fragment_id);
436 foff = ntohs (fh->offset);
437 if (foff >= msize)
438 {
439 GNUNET_break_op (0);
440 return GNUNET_SYSERR;
441 }
442 if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
443 {
444 GNUNET_break_op (0);
445 return GNUNET_SYSERR;
446 }
447 GNUNET_STATISTICS_update (dc->stats,
448 _("Fragments received"),
449 1,
450 GNUNET_NO);
451 mc = dc->head;
452 while ( (NULL != mc) &&
453 (fid != mc->fragment_id) )
454 mc = mc->next;
455 bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
456 if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size)
457 - sizeof (struct FragmentHeader) > msize)
458 {
459 /* payload extends past total message size */
460 GNUNET_break_op (0);
461 return GNUNET_SYSERR;
462 }
463 if ( (NULL != mc) && (msize != mc->total_size) )
464 {
465 /* inconsistent message size */
466 GNUNET_break_op (0);
467 return GNUNET_SYSERR;
468 }
469 now = GNUNET_TIME_absolute_get ();
470 if (NULL == mc)
471 {
472 mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
473 mc->msg = (const struct GNUNET_MessageHeader*) &mc[1];
474 mc->dc = dc;
475 mc->total_size = msize;
476 mc->fragment_id = fid;
477 mc->last_update = now;
478 n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - sizeof (struct FragmentHeader));
479 if (n == 64)
480 mc->bits = UINT64_MAX; /* set all 64 bit */
481 else
482 mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */
483 GNUNET_CONTAINER_DLL_insert (dc->head,
484 dc->tail,
485 mc);
486 dc->list_size++;
487 if (dc->list_size > dc->num_msgs)
488 discard_oldest_mc (dc);
489 }
490
491 /* copy data to 'mc' */
492 if (0 != (mc->bits & (1LL << bit)))
493 {
494 mc->bits -= 1LL << bit;
495 mbuf = (char* )&mc[1];
496 memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))],
497 &fh[1],
498 ntohs (msg->size) - sizeof (struct FragmentHeader));
499 mc->last_update = now;
500 if (bit < mc->last_bit)
501 mc->frag_times_start_offset = mc->frag_times_write_offset;
502 mc->last_bit = bit;
503 mc->frag_times[mc->frag_times_write_offset].time = now;
504 mc->frag_times[mc->frag_times_write_offset].bit = bit;
505 mc->frag_times_write_offset++;
506 duplicate = GNUNET_NO;
507 }
508 else
509 {
510 duplicate = GNUNET_YES;
511 GNUNET_STATISTICS_update (dc->stats,
512 _("Duplicate fragments received"),
513 1,
514 GNUNET_NO);
515 }
516
517 /* count number of missing fragments */
518 bc = 0;
519 for (b=0;b<64;b++)
520 if (0 != (mc->bits & (1LL << b))) bc++;
521 if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
522 dc->latency = estimate_latency (mc);
523 delay = GNUNET_TIME_relative_multiply (dc->latency,
524 bc + 1);
525 if ( (0 == mc->bits) || (GNUNET_YES == duplicate) ) /* message complete or duplicate, ACK now! */
526 delay = GNUNET_TIME_UNIT_ZERO;
527 if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
528 GNUNET_SCHEDULER_cancel (mc->ack_task);
529 mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
530 &send_ack,
531 mc);
532 if ( (duplicate == GNUNET_NO) &&
533 (0 == mc->bits) )
534 {
535 GNUNET_STATISTICS_update (dc->stats,
536 _("Messages defragmented"),
537 1,
538 GNUNET_NO);
539 /* message complete, notify! */
540 dc->proc (dc->cls,
541 mc->msg);
542 }
543 if (duplicate == GNUNET_YES)
544 return GNUNET_NO;
545 return GNUNET_YES;
546}
547
548/* end of defragmentation_new.c */
549