aboutsummaryrefslogtreecommitdiff
path: root/src/fragmentation/fragmentation.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fragmentation/fragmentation.c')
-rw-r--r--src/fragmentation/fragmentation.c702
1 files changed, 702 insertions, 0 deletions
diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c
new file mode 100644
index 000000000..9550663c7
--- /dev/null
+++ b/src/fragmentation/fragmentation.c
@@ -0,0 +1,702 @@
1/*
2 This file is part of GNUnet
3 (C) 2004, 2006, 2009 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20/**
21 * @file fragmentation/fragmentation.c
22 * @brief fragmentation and defragmentation, this code allows
23 * sending and receiving messages that are larger than
24 * the MTU of the transport. Messages are still limited
25 * to a maximum size of 65535 bytes, which is a good
26 * idea because otherwise we may need ungainly fragmentation
27 * buffers. Each connected peer can have at most one
28 * fragmented packet at any given point in time (prevents
29 * DoS attacks). Fragmented messages that have not been
30 * completed after a certain amount of time are discarded.
31 * @author Christian Grothoff
32 */
33
34#include "platform.h"
35#include "gnunet_fragmentation_lib.h"
36
37/**
38 * Message fragment. This header is followed
39 * by the actual data of the fragment.
40 */
41struct Fragment
42{
43
44 struct GNUNET_MessageHeader header;
45
46 /**
47 * Fragment offset.
48 */
49 uint32_t off GNUNET_PACKED;
50
51 /**
52 * "unique" id for the fragment
53 */
54 uint64_t id GNUNET_PACKED;
55
56};
57
58
59/**
60 * Defragmentation context.
61 */
62struct GNUNET_FRAGMENT_Context
63{
64};
65
66
67/**
68 * Fragment an over-sized message.
69 *
70 * @param msg the message to fragment
71 * @param mtu the maximum message size
72 * @param proc function to call for each fragment
73 * @param proc_cls closure for proc
74 */
75void
76GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg,
77 uint16_t mtu,
78 GNUNET_FRAGMENT_MessageProcessor proc,
79 void *proc_cls)
80{
81 GNUNET_assert (0);
82}
83
84
85/**
86 * Create a defragmentation context.
87 *
88 * @param stats statistics context
89 * @param proc function to call with defragmented messages
90 * @param proc_cls closure for proc
91 * @return the defragmentation context
92 */
93struct GNUNET_FRAGMENT_Context *
94GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
95 GNUNET_FRAGMENT_MessageProcessor proc,
96 void *proc_cls)
97{
98 return NULL;
99}
100
101
102/**
103 * Destroy the given defragmentation context.
104 */
105void
106GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx)
107{
108 GNUNET_assert (0);
109}
110
111
112/**
113 * We have received a fragment. Process it.
114 *
115 * @param ctx the context
116 * @param sender who transmitted the fragment
117 * @param msg the message that was received
118 */
119void
120GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx,
121 const struct GNUNET_PeerIdentity *sender,
122 const struct GNUNET_MessageHeader *msg)
123{
124 GNUNET_assert (0);
125}
126
127
128
129#if 0
130
131/**
132 * How many buckets does the fragment hash table
133 * have?
134 */
135#define DEFRAG_BUCKET_COUNT 16
136
137/**
138 * After how long do fragments time out?
139 */
140#ifndef DEFRAGMENTATION_TIMEOUT
141#define DEFRAGMENTATION_TIMEOUT (3 * GNUNET_CRON_MINUTES)
142#endif
143
144/**
145 * Entry in the linked list of fragments.
146 */
147typedef struct FL
148{
149 struct FL *link;
150 P2P_fragmentation_MESSAGE *frag;
151} FL;
152
153/**
154 * Entry in the GNUNET_hash table of fragments.
155 */
156typedef struct FC
157{
158 struct FC *next;
159 FL *head;
160 GNUNET_PeerIdentity sender;
161 int id;
162 GNUNET_CronTime ttl;
163} FC;
164
165#define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE)))
166
167static GNUNET_CoreAPIForPlugins *coreAPI;
168
169static GNUNET_Stats_ServiceAPI *stats;
170
171static int stat_defragmented;
172
173static int stat_fragmented;
174
175static int stat_discarded;
176
177/**
178 * Hashtable *with* collision management!
179 */
180static FC *defragmentationCache[DEFRAG_BUCKET_COUNT];
181
182/**
183 * Lock for the defragmentation cache.
184 */
185static struct GNUNET_Mutex *defragCacheLock;
186
187static void
188freeFL (FL * fl, int c)
189{
190 while (fl != NULL)
191 {
192 FL *link = fl->link;
193 if (stats != NULL)
194 stats->change (stat_discarded, c);
195 GNUNET_free (fl->frag);
196 GNUNET_free (fl);
197 fl = link;
198 }
199}
200
201/**
202 * This cron job ensures that we purge buffers of fragments
203 * that have timed out. It can run in much longer intervals
204 * than the defragmentationCron, e.g. every 60s.
205 * <p>
206 * This method goes through the hashtable, finds entries that
207 * have timed out and removes them (and all the fragments that
208 * belong to the entry). It's a bit more complicated as the
209 * collision list is also collapsed.
210 */
211static void
212defragmentationPurgeCron (void *unused)
213{
214 int i;
215 FC *smf;
216 FC *next;
217 FC *last;
218
219 GNUNET_mutex_lock (defragCacheLock);
220 for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
221 {
222 last = NULL;
223 smf = defragmentationCache[i];
224 while (smf != NULL)
225 {
226 if (smf->ttl < GNUNET_get_time ())
227 {
228 /* free linked list of fragments */
229 freeFL (smf->head, 1);
230 next = smf->next;
231 GNUNET_free (smf);
232 if (last == NULL)
233 defragmentationCache[i] = next;
234 else
235 last->next = next;
236 smf = next;
237 }
238 else
239 {
240 last = smf;
241 smf = smf->next;
242 }
243 } /* while smf != NULL */
244 } /* for all buckets */
245 GNUNET_mutex_unlock (defragCacheLock);
246}
247
248/**
249 * Check if this fragment-list is complete. If yes, put it together,
250 * process and free all buffers. Does not free the pep
251 * itself (but sets the TTL to 0 to have the cron free it
252 * in the next iteration).
253 *
254 * @param pep the entry in the GNUNET_hash table
255 */
256static void
257checkComplete (FC * pep)
258{
259 FL *pos;
260 unsigned short off;
261 unsigned short len;
262 char *msg;
263
264 GNUNET_GE_ASSERT (NULL, pep != NULL);
265 pos = pep->head;
266 if (pos == NULL)
267 return;
268 len = ntohs (pos->frag->len);
269 if (len == 0)
270 goto CLEANUP; /* really bad error! */
271 off = 0;
272 while ((pos != NULL) && (ntohs (pos->frag->off) <= off))
273 {
274 if (off >= off + FRAGSIZE (pos))
275 goto CLEANUP; /* error! */
276 if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off)
277 off = ntohs (pos->frag->off) + FRAGSIZE (pos);
278 else
279 goto CLEANUP; /* error! */
280 pos = pos->link;
281 }
282 if (off < len)
283 return; /* some fragment is still missing */
284
285 msg = GNUNET_malloc (len);
286 pos = pep->head;
287 while (pos != NULL)
288 {
289 memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos));
290 pos = pos->link;
291 }
292 if (stats != NULL)
293 stats->change (stat_defragmented, 1);
294#if 0
295 printf ("Finished defragmentation!\n");
296#endif
297 /* handle message! */
298 coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL);
299 GNUNET_free (msg);
300CLEANUP:
301 /* free fragment buffers */
302 freeFL (pep->head, 0);
303 pep->head = NULL;
304 pep->ttl = 0;
305}
306
307/**
308 * See if the new fragment is a part of this entry and join them if
309 * yes. Return GNUNET_SYSERR if the fragments do not match. Return GNUNET_OK if
310 * the fragments do match and the fragment has been processed. The
311 * defragCacheLock is already acquired by the caller whenever this
312 * method is called.<p>
313 *
314 * @param entry the entry in the cache
315 * @param pep the new entry
316 * @param packet the ip part in the new entry
317 */
318static int
319tryJoin (FC * entry,
320 const GNUNET_PeerIdentity * sender,
321 const P2P_fragmentation_MESSAGE * packet)
322{
323 /* frame before ours; may end in the middle of
324 our frame or before it starts; NULL if we are
325 the earliest position we have received so far */
326 FL *before;
327 /* frame after ours; may start in the middle of
328 our frame or after it; NULL if we are the last
329 fragment we have received so far */
330 FL *after;
331 /* current position in the frame-list */
332 FL *pos;
333 /* the new entry that we're inserting */
334 FL *pep;
335 FL *tmp;
336 unsigned short end;
337
338 GNUNET_GE_ASSERT (NULL, entry != NULL);
339 if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity)))
340 return GNUNET_SYSERR; /* wrong fragment list, try another! */
341 if (ntohl (packet->id) != entry->id)
342 return GNUNET_SYSERR; /* wrong fragment list, try another! */
343#if 0
344 printf ("Received fragment %u from %u to %u\n",
345 ntohl (packet->id),
346 ntohs (packet->off),
347 ntohs (packet->off) + ntohs (packet->header.size) -
348 sizeof (P2P_fragmentation_MESSAGE));
349#endif
350 pos = entry->head;
351 if ((pos != NULL) && (packet->len != pos->frag->len))
352 return GNUNET_SYSERR; /* wrong fragment size */
353
354 before = NULL;
355 /* find the before-frame */
356 while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off)))
357 {
358 before = pos;
359 pos = pos->link;
360 }
361
362 /* find the after-frame */
363 end =
364 ntohs (packet->off) + ntohs (packet->header.size) -
365 sizeof (P2P_fragmentation_MESSAGE);
366 if (end <= ntohs (packet->off))
367 {
368 GNUNET_GE_LOG (NULL,
369 GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK,
370 "Received invalid fragment at %s:%d\n", __FILE__,
371 __LINE__);
372 return GNUNET_SYSERR; /* yuck! integer overflow! */
373 }
374
375 if (before != NULL)
376 after = before;
377 else
378 after = entry->head;
379 while ((after != NULL) && (ntohs (after->frag->off) < end))
380 after = after->link;
381
382 if ((before != NULL) && (before == after))
383 {
384 /* this implies after or before != NULL and thereby the new
385 fragment is redundant as it is fully enclosed in an earlier
386 fragment */
387 if (stats != NULL)
388 stats->change (stat_defragmented, 1);
389 return GNUNET_OK; /* drop, there is a packet that spans our range! */
390 }
391
392 if ((before != NULL) &&
393 (after != NULL) &&
394 ((htons (before->frag->off) +
395 FRAGSIZE (before)) >= htons (after->frag->off)))
396 {
397 /* this implies that the fragment that starts before us and the
398 fragment that comes after this one leave no space in the middle
399 or even overlap; thus we can drop this redundant piece */
400 if (stats != NULL)
401 stats->change (stat_defragmented, 1);
402 return GNUNET_OK;
403 }
404
405 /* allocate pep */
406 pep = GNUNET_malloc (sizeof (FC));
407 pep->frag = GNUNET_malloc (ntohs (packet->header.size));
408 memcpy (pep->frag, packet, ntohs (packet->header.size));
409 pep->link = NULL;
410
411 if (before == NULL)
412 {
413 pep->link = after;
414 pos = entry->head;
415 while (pos != after)
416 {
417 tmp = pos->link;
418 GNUNET_free (pos->frag);
419 GNUNET_free (pos);
420 pos = tmp;
421 }
422 entry->head = pep;
423 goto FINISH;
424 /* end of insert first */
425 }
426
427 if (after == NULL)
428 {
429 /* insert last: find the end, free everything after it */
430 freeFL (before->link, 1);
431 before->link = pep;
432 goto FINISH;
433 }
434
435 /* ok, we are filling the middle between two fragments; insert. If
436 there is anything else in the middle, it can be dropped as we're
437 bigger & cover that area as well */
438 /* free everything between before and after */
439 pos = before->link;
440 while (pos != after)
441 {
442 tmp = pos->link;
443 GNUNET_free (pos->frag);
444 GNUNET_free (pos);
445 pos = tmp;
446 }
447 before->link = pep;
448 pep->link = after;
449
450FINISH:
451 entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
452 checkComplete (entry);
453 return GNUNET_OK;
454}
455
456/**
457 * Defragment the given fragment and pass to handler once
458 * defragmentation is complete.
459 *
460 * @param frag the packet to defragment
461 * @return GNUNET_SYSERR if the fragment is invalid
462 */
463static int
464processFragment (const GNUNET_PeerIdentity * sender,
465 const GNUNET_MessageHeader * frag)
466{
467 unsigned int hash;
468 FC *smf;
469
470 if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE))
471 return GNUNET_SYSERR;
472
473 GNUNET_mutex_lock (defragCacheLock);
474 hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
475 smf = defragmentationCache[hash];
476 while (smf != NULL)
477 {
478 if (GNUNET_OK ==
479 tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag))
480 {
481 GNUNET_mutex_unlock (defragCacheLock);
482 return GNUNET_OK;
483 }
484 if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity)))
485 {
486 freeFL (smf->head, 1);
487 break;
488 }
489 smf = smf->next;
490 }
491 if (smf == NULL)
492 {
493 smf = GNUNET_malloc (sizeof (FC));
494 smf->next = defragmentationCache[hash];
495 defragmentationCache[hash] = smf;
496 smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
497 smf->sender = *sender;
498 }
499 smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id);
500 smf->head = GNUNET_malloc (sizeof (FL));
501 smf->head->link = NULL;
502 smf->head->frag = GNUNET_malloc (ntohs (frag->size));
503 memcpy (smf->head->frag, frag, ntohs (frag->size));
504
505 GNUNET_mutex_unlock (defragCacheLock);
506 return GNUNET_OK;
507}
508
509typedef struct
510{
511 GNUNET_PeerIdentity sender;
512 /* maximums size of each fragment */
513 unsigned short mtu;
514 /** how long is this message part expected to be? */
515 unsigned short len;
516 /** when did we intend to transmit? */
517 GNUNET_CronTime transmissionTime;
518} FragmentBMC;
519
520/**
521 * Send a message that had to be fragmented (right now!). First grabs
522 * the first part of the message (obtained from ctx->se) and stores
523 * that in a P2P_fragmentation_MESSAGE envelope. The remaining fragments are
524 * added to the send queue with GNUNET_EXTREME_PRIORITY (to ensure that they
525 * will be transmitted next). The logic here is that if the priority
526 * for the first fragment was sufficiently high, the priority should
527 * also have been sufficiently high for all of the other fragments (at
528 * this time) since they have the same priority. And we want to make
529 * sure that we send all of them since just sending the first fragment
530 * and then going to other messages of equal priority would not be
531 * such a great idea (i.e. would just waste bandwidth).
532 */
533static int
534fragmentBMC (void *buf, void *cls, unsigned short len)
535{
536 FragmentBMC *ctx = cls;
537 static int idGen = 0;
538 P2P_fragmentation_MESSAGE *frag;
539 unsigned int pos;
540 int id;
541 unsigned short mlen;
542
543 if ((len < ctx->mtu) || (buf == NULL))
544 {
545 GNUNET_free (ctx);
546 return GNUNET_SYSERR;
547 }
548 if (stats != NULL)
549 stats->change (stat_fragmented, 1);
550 id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512);
551 /* write first fragment to buf */
552 frag = (P2P_fragmentation_MESSAGE *) buf;
553 frag->header.size = htons (len);
554 frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
555 frag->id = id;
556 frag->off = htons (0);
557 frag->len = htons (ctx->len);
558 memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE));
559
560 /* create remaining fragments, add to queue! */
561 pos = len - sizeof (P2P_fragmentation_MESSAGE);
562 frag = GNUNET_malloc (ctx->mtu);
563 while (pos < ctx->len)
564 {
565 mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos;
566 if (mlen > ctx->mtu)
567 mlen = ctx->mtu;
568 GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE));
569 frag->header.size = htons (mlen);
570 frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
571 frag->id = id;
572 frag->off = htons (pos);
573 frag->len = htons (ctx->len);
574 memcpy (&frag[1],
575 &((char *) (&ctx[1]))[pos],
576 mlen - sizeof (P2P_fragmentation_MESSAGE));
577 coreAPI->ciphertext_send (&ctx->sender,
578 &frag->header,
579 GNUNET_EXTREME_PRIORITY,
580 ctx->transmissionTime - GNUNET_get_time ());
581 pos += mlen - sizeof (P2P_fragmentation_MESSAGE);
582 }
583 GNUNET_GE_ASSERT (NULL, pos == ctx->len);
584 GNUNET_free (frag);
585 GNUNET_free (ctx);
586 return GNUNET_OK;
587}
588
589/**
590 * The given message must be fragmented. Produce a placeholder that
591 * corresponds to the first fragment. Once that fragment is scheduled
592 * for transmission, the placeholder should automatically add all of
593 * the other fragments (with very high priority).
594 */
595void
596fragment (const GNUNET_PeerIdentity * peer,
597 unsigned int mtu,
598 unsigned int prio,
599 unsigned int targetTime,
600 unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure)
601{
602 FragmentBMC *fbmc;
603 int xlen;
604
605 GNUNET_GE_ASSERT (NULL, len > mtu);
606 GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE));
607 fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len);
608 fbmc->mtu = mtu;
609 fbmc->sender = *peer;
610 fbmc->transmissionTime = targetTime;
611 fbmc->len = len;
612 if (bmc == NULL)
613 {
614 memcpy (&fbmc[1], bmcClosure, len);
615 GNUNET_free (bmcClosure);
616 }
617 else
618 {
619 if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len))
620 {
621 GNUNET_free (fbmc);
622 return;
623 }
624 }
625 xlen = mtu - sizeof (P2P_fragmentation_MESSAGE);
626 coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len, /* compute new priority */
627 targetTime);
628}
629
630/**
631 * Initialize Fragmentation module.
632 */
633GNUNET_Fragmentation_ServiceAPI *
634provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi)
635{
636 static GNUNET_Fragmentation_ServiceAPI ret;
637 int i;
638
639 coreAPI = capi;
640 stats = coreAPI->service_request ("stats");
641 if (stats != NULL)
642 {
643 stat_defragmented =
644 stats->create (gettext_noop ("# messages defragmented"));
645 stat_fragmented =
646 stats->create (gettext_noop ("# messages fragmented"));
647 stat_discarded = stats->create (gettext_noop ("# fragments discarded"));
648 }
649 for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
650 defragmentationCache[i] = NULL;
651 defragCacheLock = GNUNET_mutex_create (GNUNET_NO);
652 GNUNET_cron_add_job (coreAPI->cron,
653 &defragmentationPurgeCron,
654 60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS,
655 NULL);
656 GNUNET_GE_LOG (capi->ectx,
657 GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST,
658 _("`%s' registering handler %d\n"), "fragmentation",
659 GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
660 capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT,
661 &processFragment);
662
663 ret.fragment = &fragment;
664 return &ret;
665}
666
667/**
668 * Shutdown fragmentation.
669 */
670void
671release_module_fragmentation ()
672{
673 int i;
674
675 coreAPI->p2p_ciphertext_handler_unregister
676 (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment);
677 GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron,
678 60 * GNUNET_CRON_SECONDS, NULL);
679 for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
680 {
681 FC *pos = defragmentationCache[i];
682 while (pos != NULL)
683 {
684 FC *next = pos->next;
685 freeFL (pos->head, 1);
686 GNUNET_free (pos);
687 pos = next;
688 }
689 }
690 if (stats != NULL)
691 {
692 coreAPI->service_release (stats);
693 stats = NULL;
694 }
695 GNUNET_mutex_destroy (defragCacheLock);
696 defragCacheLock = NULL;
697 coreAPI = NULL;
698}
699
700#endif
701
702/* end of fragmentation.c */