From 6ce274c7e9fb01c76dc904d907fce9c893d0fc44 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 11 Jul 2011 22:34:11 +0000 Subject: frag --- src/fragmentation/defragmentation_new.c | 69 +++++++++++------- src/fragmentation/fragmentation_new.c | 24 ++++--- src/fragmentation/test_fragmentation.c | 120 ++++++++++++++++++++++++++++---- 3 files changed, 167 insertions(+), 46 deletions(-) (limited to 'src/fragmentation') diff --git a/src/fragmentation/defragmentation_new.c b/src/fragmentation/defragmentation_new.c index 2e1136f9f..379ec57d4 100644 --- a/src/fragmentation/defragmentation_new.c +++ b/src/fragmentation/defragmentation_new.c @@ -391,6 +391,7 @@ discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc) if (GNUNET_SCHEDULER_NO_TASK != old->ack_task) GNUNET_SCHEDULER_cancel (old->ack_task); GNUNET_free (old); + fprintf (stderr, "D"); } @@ -399,8 +400,9 @@ discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc) * * @param dc the context * @param msg the message that was received + * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error */ -void +int GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, const struct GNUNET_MessageHeader *msg) { @@ -415,25 +417,32 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, struct GNUNET_TIME_Relative delay; unsigned int bc; unsigned int b; + unsigned int n; + int duplicate; if (ntohs(msg->size) < sizeof (struct FragmentHeader)) { GNUNET_break_op (0); - return; + return GNUNET_SYSERR; } if (ntohs (msg->size) > dc->mtu) { GNUNET_break_op (0); - return; + return GNUNET_SYSERR; } fh = (const struct FragmentHeader*) msg; msize = ntohs (fh->total_size); fid = ntohl (fh->fragment_id); - foff = ntohl (fh->offset); + foff = ntohs (fh->offset); if (foff >= msize) { GNUNET_break_op (0); - return; + return GNUNET_SYSERR; + } + if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader)))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; } GNUNET_STATISTICS_update (dc->stats, _("Fragments received"), @@ -443,19 +452,19 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, while ( (NULL != mc) && (fid != mc->fragment_id) ) mc = mc->next; - bit = foff / dc->mtu; - if (bit * dc->mtu + ntohs (msg->size) + bit = foff / (dc->mtu - sizeof (struct FragmentHeader)); + if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) - sizeof (struct FragmentHeader) > msize) { /* payload extends past total message size */ GNUNET_break_op (0); - return; + return GNUNET_SYSERR; } if ( (NULL != mc) && (msize != mc->total_size) ) { /* inconsistent message size */ GNUNET_break_op (0); - return; + return GNUNET_SYSERR; } now = GNUNET_TIME_absolute_get (); if (NULL == mc) @@ -466,7 +475,11 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, mc->total_size = msize; mc->fragment_id = fid; mc->last_update = now; - mc->bits = (msize + dc->mtu - 1) / (dc->mtu - sizeof (struct FragmentHeader)); + n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - sizeof (struct FragmentHeader)); + if (n == 64) + mc->bits = UINT64_MAX; /* set all 64 bit */ + else + mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */ GNUNET_CONTAINER_DLL_insert (dc->head, dc->tail, mc); @@ -476,11 +489,11 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, } /* copy data to 'mc' */ - if (0 != (mc->bits & (1 << bit))) + if (0 != (mc->bits & (1LL << bit))) { - mc->bits -= 1 << bit; + mc->bits -= 1LL << bit; mbuf = (char* )&mc[1]; - memcpy (&mbuf[bit * dc->mtu], + memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1], ntohs (msg->size) - sizeof (struct FragmentHeader)); mc->last_update = now; @@ -490,19 +503,11 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, mc->frag_times[mc->frag_times_write_offset].time = now; mc->frag_times[mc->frag_times_write_offset].bit = bit; mc->frag_times_write_offset++; - if (0 == mc->bits) - { - /* message complete, notify! */ - dc->proc (dc->cls, - mc->msg); - GNUNET_STATISTICS_update (dc->stats, - _("Messages defragmented"), - 1, - GNUNET_NO); - } + duplicate = GNUNET_NO; } else { + duplicate = GNUNET_YES; GNUNET_STATISTICS_update (dc->stats, _("Duplicate fragments received"), 1, @@ -512,18 +517,32 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, /* count number of missing fragments */ bc = 0; for (b=0;b<64;b++) - if (0 != (mc->bits & (1 << b))) bc++; + if (0 != (mc->bits & (1LL << b))) bc++; if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1) dc->latency = estimate_latency (mc); delay = GNUNET_TIME_relative_multiply (dc->latency, bc + 1); - if (0 == mc->bits) /* message complete, ACK now! */ + if ( (0 == mc->bits) || (GNUNET_YES == duplicate) ) /* message complete or duplicate, ACK now! */ delay = GNUNET_TIME_UNIT_ZERO; if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) GNUNET_SCHEDULER_cancel (mc->ack_task); mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, &send_ack, mc); + if ( (duplicate == GNUNET_NO) && + (0 == mc->bits) ) + { + GNUNET_STATISTICS_update (dc->stats, + _("Messages defragmented"), + 1, + GNUNET_NO); + /* message complete, notify! */ + dc->proc (dc->cls, + mc->msg); + } + if (duplicate == GNUNET_YES) + return GNUNET_NO; + return GNUNET_YES; } /* end of defragmentation_new.c */ diff --git a/src/fragmentation/fragmentation_new.c b/src/fragmentation/fragmentation_new.c index 17b4bb0ab..dbbaa859b 100644 --- a/src/fragmentation/fragmentation_new.c +++ b/src/fragmentation/fragmentation_new.c @@ -127,7 +127,7 @@ transmit_next (void *cls, /* calculate delay */ wrap = 0; - while (0 == (fc->acks & (1 << fc->next_transmission))) + while (0 == (fc->acks & (1LL << fc->next_transmission))) { fc->next_transmission = (fc->next_transmission + 1) % 64; wrap |= (fc->next_transmission == 0); @@ -160,13 +160,12 @@ transmit_next (void *cls, fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT); fh->fragment_id = htonl (fc->fragment_id); fh->total_size = fc->msg->size; /* already in big-endian */ - fh->offset = htons (fc->mtu * bit); - memcpy (&fc[1], + fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit); + memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], fsize - sizeof (struct FragmentHeader)); - fc->proc (fc->proc_cls, &fh->header); if (NULL != fc->tracker) - GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); + GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); GNUNET_STATISTICS_update (fc->stats, _("Fragments transmitted"), 1, GNUNET_NO); @@ -198,6 +197,7 @@ transmit_next (void *cls, fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc); + fc->proc (fc->proc_cls, &fh->header); } @@ -252,12 +252,12 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); memcpy (&fc[1], msg, size); - bits = (size + mtu - 1) / (mtu - sizeof (struct FragmentHeader)); + bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader)); GNUNET_assert (bits <= 64); if (bits == 64) fc->acks = UINT64_MAX; /* set all 64 bit */ else - fc->acks = (1 << bits) - 1; /* set lowest 'bits' bit */ + fc->acks = (1LL << bits) - 1; /* set lowest 'bits' bit */ fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc); return fc; @@ -300,8 +300,14 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round); fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4; } - - fc->acks &= abits; + if (abits != (fc->acks & abits)) + { + /* ID collission or message reordering, count! This should be rare! */ + GNUNET_STATISTICS_update (fc->stats, + _("Bits removed from ACK"), + 1, GNUNET_NO); + } + fc->acks = abits; if (0 != fc->acks) { /* more to transmit, do so right now (if tracker permits...) */ diff --git a/src/fragmentation/test_fragmentation.c b/src/fragmentation/test_fragmentation.c index 639c0ef17..8a9af04e7 100644 --- a/src/fragmentation/test_fragmentation.c +++ b/src/fragmentation/test_fragmentation.c @@ -25,14 +25,41 @@ #include "platform.h" #include "gnunet_fragmentation_lib.h" -#define NUM_MSGS 1 +#define VERBOSE GNUNET_NO +#define DETAILS GNUNET_NO + +/** + * Number of messages to transmit (note: each uses ~32k memory!) + */ +#define NUM_MSGS 500 + +/** + * MTU to force on fragmentation (must be > 1k + 12) + */ #define MTU 1111 +/** + * Simulate dropping of 1 out of how many messages? (must be > 1) + */ +#define DROPRATE 2 + static int ret = 1; +static unsigned int dups; + +static unsigned int fragc; + +static unsigned int frag_drops; + +static unsigned int acks; + +static unsigned int ack_drops; + static struct GNUNET_DEFRAGMENT_Context *defrag; +static struct GNUNET_BANDWIDTH_Tracker trackers[NUM_MSGS]; + static struct GNUNET_FRAGMENT_Context *frags[NUM_MSGS]; static void @@ -40,17 +67,36 @@ proc_msgs (void *cls, const struct GNUNET_MessageHeader *hdr) { static unsigned int total; + unsigned int i; + const char *buf; - fprintf (stderr, "!"); +#if DETAILS + fprintf (stderr, "!"); /* message complete, good! */ +#endif + buf = (const char*) hdr; + for (i=sizeof (struct GNUNET_MessageHeader);isize);i++) + GNUNET_assert (buf[i] == (char) i); total++; +#if ! DETAILS + if (0 == (total % (NUM_MSGS / 100))) + fprintf (stderr, "."); +#endif if (total == NUM_MSGS) { ret = 0; GNUNET_DEFRAGMENT_context_destroy (defrag); defrag = NULL; + for (i=0;itype = htons ((uint16_t) i); - msg->size = htons (MTU + 1 + i % (32 * 1024)); + msg->size = htons (MTU + 1 + (17 * i) % (32 * 1024)); frags[i] = GNUNET_FRAGMENT_context_create (NULL /* no stats */, MTU, - NULL /* no tracker -- infinite BW */, - GNUNET_TIME_UNIT_MILLISECONDS, + &trackers[i], + GNUNET_TIME_UNIT_SECONDS, msg, &proc_frac, NULL); @@ -149,6 +234,7 @@ main (int argc, char *argv[]) #endif NULL }; + unsigned int i; GNUNET_log_setup ("test-fragmentation", #if VERBOSE @@ -157,6 +243,16 @@ main (int argc, char *argv[]) "WARNING", #endif NULL); + for (i=0;i