From 40ff2323130e2cbe383f9b00fdb5ddfef995d347 Mon Sep 17 00:00:00 2001 From: Ji Lu Date: Thu, 18 Mar 2010 20:33:07 +0000 Subject: to be continued --- src/fragmentation/Makefile.am | 2 +- src/fragmentation/fragmentation.c | 1016 +++++++++++++++++++------------------ src/fragmentation/test_frag_ji.c | 51 +- 3 files changed, 571 insertions(+), 498 deletions(-) (limited to 'src/fragmentation') diff --git a/src/fragmentation/Makefile.am b/src/fragmentation/Makefile.am index d3a47d920..47915a35f 100644 --- a/src/fragmentation/Makefile.am +++ b/src/fragmentation/Makefile.am @@ -21,7 +21,7 @@ check_PROGRAMS = \ TESTS = $(check_PROGRAMS) test_fragmentation_SOURCES = \ - test_fragmentation.c + test_frag_ji.c test_fragmentation_LDADD = \ $(top_builddir)/src/fragmentation/libgnunetfragmentation.la \ $(top_builddir)/src/util/libgnunetutil.la diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c index 49680bf42..c67059f34 100644 --- a/src/fragmentation/fragmentation.c +++ b/src/fragmentation/fragmentation.c @@ -16,7 +16,7 @@ along with GNUnet; see the file COPYING. If not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -*/ + */ /** * @file fragmentation/fragmentation.c * @brief fragmentation and defragmentation, this code allows @@ -42,31 +42,31 @@ struct Fragment { - struct GNUNET_MessageHeader header; - - /** - * Fragment offset. - */ - uint32_t off GNUNET_PACKED; + struct GNUNET_MessageHeader header; - /** - * "unique" id for the fragment - */ - uint64_t id GNUNET_PACKED; + /** + * Fragment offset. + */ + uint32_t off GNUNET_PACKED; - size_t mtu; - uint32_t totalNum; + /** + * "unique" id for the fragment + */ + uint64_t id GNUNET_PACKED; + size_t mtu; + uint32_t totalNum; + uint64_t totalSize; }; struct GNUNET_FRAGEMENT_Ctxbuffer{ - uint64_t id; - uint16_t size; - char * buff; - int counter; - struct GNUNET_TIME_Absolute receivedTime; - struct GNUNET_PeerIdentity *peerID; struct GNUNET_FRAGEMENT_Ctxbuffer *next; + uint64_t id; + uint16_t size; + char * buff; + int counter; + struct GNUNET_TIME_Absolute receivedTime; + struct GNUNET_PeerIdentity *peerID; int * num; }; @@ -78,6 +78,8 @@ struct GNUNET_FRAGMENT_Context { uint32_t maxNum; struct GNUNET_FRAGEMENT_Ctxbuffer *buffer; + GNUNET_FRAGMENT_MessageProcessor proc; + void *proc_cls; }; @@ -91,38 +93,59 @@ struct GNUNET_FRAGMENT_Context */ void GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg, - uint16_t mtu, - GNUNET_FRAGMENT_MessageProcessor proc, - void *proc_cls) + uint16_t mtu, + GNUNET_FRAGMENT_MessageProcessor proc, + void *proc_cls) { uint32_t id = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 256); size_t size = sizeof(struct Fragment); - if(msg->size > mtu){ - uint16_t lastSize = (msg->size) % (mtu-size); - int num = ceil(msg->size / mtu - size); - int i; - for(i = 0; isize) > mtu-size){ + uint16_t lastSize; + uint16_t num; + uint16_t i; + uint16_t actualNum; + lastSize = ntohs(msg->size) % (mtu-size); + num = ntohs(msg->size) / (mtu - size); + actualNum = num; + if(lastSize!=0){ + actualNum = num+1; + } + for(i = 0; iheader.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT); frag->id = htonl(id); - frag->off = htons(mtu*i); + frag->off = htons((mtu-size)*i); frag->mtu = htons(mtu); - if(lastSize!=0){ - frag->totalNum = htons(num+1); + frag->totalNum = htons(actualNum); + frag->totalSize = msg->size; + if(actualNum != num){ + if(i!=actualNum-1){ + frag->header.size = htons(mtu); + memcpy(&frag[1], msg + ntohs(frag->off), mtu - size); + } + else{ + frag->header.size = htons(lastSize+size); + memcpy(&frag[1], msg + ntohs(frag->off), lastSize); + } } else{ - frag->totalNum = htons(num); + frag->header.size = htons(mtu); + memcpy(&frag[1], msg + ntohs(frag->off), mtu - size); } - if(i!=num-1){ - frag->header.size = htons(mtu - size); - memcpy((char*)&frag[1], (char *)&msg[1]+frag->off, mtu - size); - } - else{ - frag->header.size = htons(lastSize); - memcpy((char*)&frag[1], (char *)&msg[1]+frag->off, lastSize); - } - proc(proc_cls, &frag->header); - free(frag); + proc(proc_cls, &frag->header); + GNUNET_free(frag); } } } @@ -138,14 +161,16 @@ GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg, */ struct GNUNET_FRAGMENT_Context * GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, - GNUNET_FRAGMENT_MessageProcessor proc, - void *proc_cls) -{ + GNUNET_FRAGMENT_MessageProcessor proc, + void *proc_cls) + { struct GNUNET_FRAGMENT_Context *ctx = (struct GNUNET_FRAGMENT_Context*)GNUNET_malloc(sizeof(struct GNUNET_FRAGMENT_Context)); ctx->maxNum = 100; + ctx->proc = proc; + ctx->proc_cls = proc_cls; ctx->buffer = NULL; return ctx; -} + } /** @@ -173,46 +198,77 @@ GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx) */ void GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_MessageHeader *msg) + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *msg) { - uint16_t type = ntohs(msg->type); - int exited = 0, received = 0; - if(type!=GNUNET_MESSAGE_TYPE_FRAGMENT){ - return; - } - struct Fragment *frag = (struct Fragment *)msg; - struct GNUNET_FRAGEMENT_Ctxbuffer* buffer; - for(buffer = ctx->buffer; buffer!= NULL; buffer = buffer->next){ - if(ctx->buffer->counter == ntohs(frag->totalNum)){return;} - if(buffer->id == ntohl(frag->id)&&(buffer->peerID==sender)){ - exited = 1; - int i; - for(i = 0; itotalNum); i++){ - if(buffer->num[i]==ntohs(frag->off)/ntohs(frag->mtu)){ - received = 1; - break; - } - } - if(!received){ - buffer->num[buffer->counter++]=ntohs(frag->off)/ntohs(frag->mtu); - } - buffer->receivedTime = GNUNET_TIME_absolute_get (); - uint16_t size = ntohs(frag->header.size); - memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], size); - break; - } - } - if(!exited){ - buffer = (struct GNUNET_FRAGEMENT_Ctxbuffer* )GNUNET_malloc(sizeof(struct GNUNET_FRAGEMENT_Ctxbuffer)); - buffer->num = (int*)GNUNET_malloc(ntohs(frag->totalNum)*sizeof(int)); - buffer->num[buffer->counter++]=ntohs(frag->off)/ntohs(frag->mtu); - memcpy(buffer->peerID,sender,sizeof(struct GNUNET_PeerIdentity)); - buffer->receivedTime = GNUNET_TIME_absolute_get (); - uint16_t size = ntohs(frag->header.size); - memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], size); - } + uint16_t type = ntohs(msg->type); + int exist = 0, received = 0; + if(type!=GNUNET_MESSAGE_TYPE_FRAGMENT){ + return; + } + struct Fragment *frag = (struct Fragment *)msg; + struct GNUNET_FRAGEMENT_Ctxbuffer* buffer; + struct GNUNET_FRAGEMENT_Ctxbuffer* prev; + prev = NULL; + buffer = ctx->buffer; + while (buffer != NULL) + { +//for(buffer = ctx->buffer; buffer != NULL; buffer = buffer->next){ + if(buffer->id == ntohl(frag->id)&&(buffer->peerID==sender)){ + exist = 1; + break; + } + prev = buffer; + buffer = buffer->next; + } + + if (exist) + { + int i; + for(i = 0; itotalNum); i++){ + if(buffer->num[i]==ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment))){ + received = 1; + break; + } + } + } + + if(!exist){ + buffer = GNUNET_malloc(sizeof(struct GNUNET_FRAGEMENT_Ctxbuffer)); + buffer->num = (int*)GNUNET_malloc(ntohs(frag->totalNum)*sizeof(int)); + int j; + for(j = 0; jtotalNum); j++){ + buffer->num[j] = -10; + } + buffer->peerID = sender; + buffer->id = ntohl(frag->id); + buffer->receivedTime = GNUNET_TIME_absolute_get (); + uint64_t si = ntohs(frag->totalSize); + buffer->size = si; + buffer->buff = (char *)GNUNET_malloc(si); + buffer->next = ctx->buffer; + ctx->buffer = buffer; + } + + if(!received){ + buffer->num[buffer->counter++]=ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment)); + uint16_t sizeoffrag = ntohs(frag->header.size) - sizeof(struct Fragment); + memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], sizeoffrag); + buffer->receivedTime = GNUNET_TIME_absolute_get (); + } + if(buffer->counter == ntohs(frag->totalNum)) + { + ctx->proc(ctx->proc_cls, (struct GNUNET_MessageHeader *)buffer->buff); + if(prev==NULL){ + ctx->buffer = buffer->next; + } + else{ + prev->next = buffer->next; + } + GNUNET_free(buffer); + return; + } } @@ -237,8 +293,8 @@ GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx, */ typedef struct FL { - struct FL *link; - P2P_fragmentation_MESSAGE *frag; + struct FL *link; + P2P_fragmentation_MESSAGE *frag; } FL; /** @@ -246,11 +302,11 @@ typedef struct FL */ typedef struct FC { - struct FC *next; - FL *head; - GNUNET_PeerIdentity sender; - int id; - GNUNET_CronTime ttl; + struct FC *next; + FL *head; + GNUNET_PeerIdentity sender; + int id; + GNUNET_CronTime ttl; } FC; #define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE))) @@ -278,15 +334,15 @@ static struct GNUNET_Mutex *defragCacheLock; static void freeFL (FL * fl, int c) { - while (fl != NULL) - { - FL *link = fl->link; - if (stats != NULL) - stats->change (stat_discarded, c); - GNUNET_free (fl->frag); - GNUNET_free (fl); - fl = link; - } + while (fl != NULL) + { + FL *link = fl->link; + if (stats != NULL) + stats->change (stat_discarded, c); + GNUNET_free (fl->frag); + GNUNET_free (fl); + fl = link; + } } /** @@ -302,38 +358,38 @@ freeFL (FL * fl, int c) static void defragmentationPurgeCron (void *unused) { - int i; - FC *smf; - FC *next; - FC *last; - - GNUNET_mutex_lock (defragCacheLock); - for (i = 0; i < DEFRAG_BUCKET_COUNT; i++) - { - last = NULL; - smf = defragmentationCache[i]; - while (smf != NULL) - { - if (smf->ttl < GNUNET_get_time ()) - { - /* free linked list of fragments */ - freeFL (smf->head, 1); - next = smf->next; - GNUNET_free (smf); - if (last == NULL) - defragmentationCache[i] = next; - else - last->next = next; - smf = next; - } - else - { - last = smf; - smf = smf->next; - } - } /* while smf != NULL */ - } /* for all buckets */ - GNUNET_mutex_unlock (defragCacheLock); + int i; + FC *smf; + FC *next; + FC *last; + + GNUNET_mutex_lock (defragCacheLock); + for (i = 0; i < DEFRAG_BUCKET_COUNT; i++) + { + last = NULL; + smf = defragmentationCache[i]; + while (smf != NULL) + { + if (smf->ttl < GNUNET_get_time ()) + { + /* free linked list of fragments */ + freeFL (smf->head, 1); + next = smf->next; + GNUNET_free (smf); + if (last == NULL) + defragmentationCache[i] = next; + else + last->next = next; + smf = next; + } + else + { + last = smf; + smf = smf->next; + } + } /* while smf != NULL */ + } /* for all buckets */ + GNUNET_mutex_unlock (defragCacheLock); } /** @@ -347,52 +403,52 @@ defragmentationPurgeCron (void *unused) static void checkComplete (FC * pep) { - FL *pos; - unsigned short off; - unsigned short len; - char *msg; - - GNUNET_GE_ASSERT (NULL, pep != NULL); - pos = pep->head; - if (pos == NULL) - return; - len = ntohs (pos->frag->len); - if (len == 0) - goto CLEANUP; /* really bad error! */ - off = 0; - while ((pos != NULL) && (ntohs (pos->frag->off) <= off)) - { - if (off >= off + FRAGSIZE (pos)) - goto CLEANUP; /* error! */ - if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off) - off = ntohs (pos->frag->off) + FRAGSIZE (pos); - else - goto CLEANUP; /* error! */ - pos = pos->link; - } - if (off < len) - return; /* some fragment is still missing */ - - msg = GNUNET_malloc (len); - pos = pep->head; - while (pos != NULL) - { - memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos)); - pos = pos->link; - } - if (stats != NULL) - stats->change (stat_defragmented, 1); + FL *pos; + unsigned short off; + unsigned short len; + char *msg; + + GNUNET_GE_ASSERT (NULL, pep != NULL); + pos = pep->head; + if (pos == NULL) + return; + len = ntohs (pos->frag->len); + if (len == 0) + goto CLEANUP; /* really bad error! */ + off = 0; + while ((pos != NULL) && (ntohs (pos->frag->off) <= off)) + { + if (off >= off + FRAGSIZE (pos)) + goto CLEANUP; /* error! */ + if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off) + off = ntohs (pos->frag->off) + FRAGSIZE (pos); + else + goto CLEANUP; /* error! */ + pos = pos->link; + } + if (off < len) + return; /* some fragment is still missing */ + + msg = GNUNET_malloc (len); + pos = pep->head; + while (pos != NULL) + { + memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos)); + pos = pos->link; + } + if (stats != NULL) + stats->change (stat_defragmented, 1); #if 0 - printf ("Finished defragmentation!\n"); + printf ("Finished defragmentation!\n"); #endif - /* handle message! */ - coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL); - GNUNET_free (msg); -CLEANUP: - /* free fragment buffers */ - freeFL (pep->head, 0); - pep->head = NULL; - pep->ttl = 0; + /* handle message! */ + coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL); + GNUNET_free (msg); + CLEANUP: + /* free fragment buffers */ + freeFL (pep->head, 0); + pep->head = NULL; + pep->ttl = 0; } /** @@ -408,140 +464,140 @@ CLEANUP: */ static int tryJoin (FC * entry, - const GNUNET_PeerIdentity * sender, - const P2P_fragmentation_MESSAGE * packet) + const GNUNET_PeerIdentity * sender, + const P2P_fragmentation_MESSAGE * packet) { - /* frame before ours; may end in the middle of + /* frame before ours; may end in the middle of our frame or before it starts; NULL if we are the earliest position we have received so far */ - FL *before; - /* frame after ours; may start in the middle of + FL *before; + /* frame after ours; may start in the middle of our frame or after it; NULL if we are the last fragment we have received so far */ - FL *after; - /* current position in the frame-list */ - FL *pos; - /* the new entry that we're inserting */ - FL *pep; - FL *tmp; - unsigned short end; - - GNUNET_GE_ASSERT (NULL, entry != NULL); - if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity))) - return GNUNET_SYSERR; /* wrong fragment list, try another! */ - if (ntohl (packet->id) != entry->id) - return GNUNET_SYSERR; /* wrong fragment list, try another! */ + FL *after; + /* current position in the frame-list */ + FL *pos; + /* the new entry that we're inserting */ + FL *pep; + FL *tmp; + unsigned short end; + + GNUNET_GE_ASSERT (NULL, entry != NULL); + if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity))) + return GNUNET_SYSERR; /* wrong fragment list, try another! */ + if (ntohl (packet->id) != entry->id) + return GNUNET_SYSERR; /* wrong fragment list, try another! */ #if 0 - printf ("Received fragment %u from %u to %u\n", - ntohl (packet->id), - ntohs (packet->off), - ntohs (packet->off) + ntohs (packet->header.size) - - sizeof (P2P_fragmentation_MESSAGE)); + printf ("Received fragment %u from %u to %u\n", + ntohl (packet->id), + ntohs (packet->off), + ntohs (packet->off) + ntohs (packet->header.size) - + sizeof (P2P_fragmentation_MESSAGE)); #endif - pos = entry->head; - if ((pos != NULL) && (packet->len != pos->frag->len)) - return GNUNET_SYSERR; /* wrong fragment size */ - - before = NULL; - /* find the before-frame */ - while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off))) - { - before = pos; - pos = pos->link; - } - - /* find the after-frame */ - end = - ntohs (packet->off) + ntohs (packet->header.size) - - sizeof (P2P_fragmentation_MESSAGE); - if (end <= ntohs (packet->off)) - { - GNUNET_GE_LOG (NULL, - GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK, - "Received invalid fragment at %s:%d\n", __FILE__, - __LINE__); - return GNUNET_SYSERR; /* yuck! integer overflow! */ - } - - if (before != NULL) - after = before; - else - after = entry->head; - while ((after != NULL) && (ntohs (after->frag->off) < end)) - after = after->link; - - if ((before != NULL) && (before == after)) - { - /* this implies after or before != NULL and thereby the new + pos = entry->head; + if ((pos != NULL) && (packet->len != pos->frag->len)) + return GNUNET_SYSERR; /* wrong fragment size */ + + before = NULL; + /* find the before-frame */ + while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off))) + { + before = pos; + pos = pos->link; + } + + /* find the after-frame */ + end = + ntohs (packet->off) + ntohs (packet->header.size) - + sizeof (P2P_fragmentation_MESSAGE); + if (end <= ntohs (packet->off)) + { + GNUNET_GE_LOG (NULL, + GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK, + "Received invalid fragment at %s:%d\n", __FILE__, + __LINE__); + return GNUNET_SYSERR; /* yuck! integer overflow! */ + } + + if (before != NULL) + after = before; + else + after = entry->head; + while ((after != NULL) && (ntohs (after->frag->off) < end)) + after = after->link; + + if ((before != NULL) && (before == after)) + { + /* this implies after or before != NULL and thereby the new fragment is redundant as it is fully enclosed in an earlier fragment */ - if (stats != NULL) - stats->change (stat_defragmented, 1); - return GNUNET_OK; /* drop, there is a packet that spans our range! */ - } - - if ((before != NULL) && - (after != NULL) && - ((htons (before->frag->off) + - FRAGSIZE (before)) >= htons (after->frag->off))) - { - /* this implies that the fragment that starts before us and the + if (stats != NULL) + stats->change (stat_defragmented, 1); + return GNUNET_OK; /* drop, there is a packet that spans our range! */ + } + + if ((before != NULL) && + (after != NULL) && + ((htons (before->frag->off) + + FRAGSIZE (before)) >= htons (after->frag->off))) + { + /* this implies that the fragment that starts before us and the fragment that comes after this one leave no space in the middle or even overlap; thus we can drop this redundant piece */ - if (stats != NULL) - stats->change (stat_defragmented, 1); - return GNUNET_OK; - } - - /* allocate pep */ - pep = GNUNET_malloc (sizeof (FC)); - pep->frag = GNUNET_malloc (ntohs (packet->header.size)); - memcpy (pep->frag, packet, ntohs (packet->header.size)); - pep->link = NULL; - - if (before == NULL) - { - pep->link = after; - pos = entry->head; - while (pos != after) - { - tmp = pos->link; - GNUNET_free (pos->frag); - GNUNET_free (pos); - pos = tmp; - } - entry->head = pep; - goto FINISH; - /* end of insert first */ - } - - if (after == NULL) - { - /* insert last: find the end, free everything after it */ - freeFL (before->link, 1); - before->link = pep; - goto FINISH; - } - - /* ok, we are filling the middle between two fragments; insert. If + if (stats != NULL) + stats->change (stat_defragmented, 1); + return GNUNET_OK; + } + + /* allocate pep */ + pep = GNUNET_malloc (sizeof (FC)); + pep->frag = GNUNET_malloc (ntohs (packet->header.size)); + memcpy (pep->frag, packet, ntohs (packet->header.size)); + pep->link = NULL; + + if (before == NULL) + { + pep->link = after; + pos = entry->head; + while (pos != after) + { + tmp = pos->link; + GNUNET_free (pos->frag); + GNUNET_free (pos); + pos = tmp; + } + entry->head = pep; + goto FINISH; + /* end of insert first */ + } + + if (after == NULL) + { + /* insert last: find the end, free everything after it */ + freeFL (before->link, 1); + before->link = pep; + goto FINISH; + } + + /* ok, we are filling the middle between two fragments; insert. If there is anything else in the middle, it can be dropped as we're bigger & cover that area as well */ - /* free everything between before and after */ - pos = before->link; - while (pos != after) - { - tmp = pos->link; - GNUNET_free (pos->frag); - GNUNET_free (pos); - pos = tmp; - } - before->link = pep; - pep->link = after; - -FINISH: - entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT; - checkComplete (entry); - return GNUNET_OK; + /* free everything between before and after */ + pos = before->link; + while (pos != after) + { + tmp = pos->link; + GNUNET_free (pos->frag); + GNUNET_free (pos); + pos = tmp; + } + before->link = pep; + pep->link = after; + + FINISH: + entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT; + checkComplete (entry); + return GNUNET_OK; } /** @@ -553,59 +609,59 @@ FINISH: */ static int processFragment (const GNUNET_PeerIdentity * sender, - const GNUNET_MessageHeader * frag) + const GNUNET_MessageHeader * frag) { - unsigned int hash; - FC *smf; - - if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE)) - return GNUNET_SYSERR; - - GNUNET_mutex_lock (defragCacheLock); - hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT; - smf = defragmentationCache[hash]; - while (smf != NULL) - { - if (GNUNET_OK == - tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag)) - { - GNUNET_mutex_unlock (defragCacheLock); - return GNUNET_OK; - } - if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity))) - { - freeFL (smf->head, 1); - break; - } - smf = smf->next; - } - if (smf == NULL) - { - smf = GNUNET_malloc (sizeof (FC)); - smf->next = defragmentationCache[hash]; - defragmentationCache[hash] = smf; - smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT; - smf->sender = *sender; - } - smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id); - smf->head = GNUNET_malloc (sizeof (FL)); - smf->head->link = NULL; - smf->head->frag = GNUNET_malloc (ntohs (frag->size)); - memcpy (smf->head->frag, frag, ntohs (frag->size)); - - GNUNET_mutex_unlock (defragCacheLock); - return GNUNET_OK; + unsigned int hash; + FC *smf; + + if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE)) + return GNUNET_SYSERR; + + GNUNET_mutex_lock (defragCacheLock); + hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT; + smf = defragmentationCache[hash]; + while (smf != NULL) + { + if (GNUNET_OK == + tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag)) + { + GNUNET_mutex_unlock (defragCacheLock); + return GNUNET_OK; + } + if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity))) + { + freeFL (smf->head, 1); + break; + } + smf = smf->next; + } + if (smf == NULL) + { + smf = GNUNET_malloc (sizeof (FC)); + smf->next = defragmentationCache[hash]; + defragmentationCache[hash] = smf; + smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT; + smf->sender = *sender; + } + smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id); + smf->head = GNUNET_malloc (sizeof (FL)); + smf->head->link = NULL; + smf->head->frag = GNUNET_malloc (ntohs (frag->size)); + memcpy (smf->head->frag, frag, ntohs (frag->size)); + + GNUNET_mutex_unlock (defragCacheLock); + return GNUNET_OK; } typedef struct { - GNUNET_PeerIdentity sender; - /* maximums size of each fragment */ - unsigned short mtu; - /** how long is this message part expected to be? */ - unsigned short len; - /** when did we intend to transmit? */ - GNUNET_CronTime transmissionTime; + GNUNET_PeerIdentity sender; + /* maximums size of each fragment */ + unsigned short mtu; + /** how long is this message part expected to be? */ + unsigned short len; + /** when did we intend to transmit? */ + GNUNET_CronTime transmissionTime; } FragmentBMC; /** @@ -624,57 +680,57 @@ typedef struct static int fragmentBMC (void *buf, void *cls, unsigned short len) { - FragmentBMC *ctx = cls; - static int idGen = 0; - P2P_fragmentation_MESSAGE *frag; - unsigned int pos; - int id; - unsigned short mlen; - - if ((len < ctx->mtu) || (buf == NULL)) - { - GNUNET_free (ctx); - return GNUNET_SYSERR; - } - if (stats != NULL) - stats->change (stat_fragmented, 1); - id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512); - /* write first fragment to buf */ - frag = (P2P_fragmentation_MESSAGE *) buf; - frag->header.size = htons (len); - frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT); - frag->id = id; - frag->off = htons (0); - frag->len = htons (ctx->len); - memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE)); - - /* create remaining fragments, add to queue! */ - pos = len - sizeof (P2P_fragmentation_MESSAGE); - frag = GNUNET_malloc (ctx->mtu); - while (pos < ctx->len) - { - mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos; - if (mlen > ctx->mtu) - mlen = ctx->mtu; - GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE)); - frag->header.size = htons (mlen); - frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT); - frag->id = id; - frag->off = htons (pos); - frag->len = htons (ctx->len); - memcpy (&frag[1], - &((char *) (&ctx[1]))[pos], - mlen - sizeof (P2P_fragmentation_MESSAGE)); - coreAPI->ciphertext_send (&ctx->sender, - &frag->header, - GNUNET_EXTREME_PRIORITY, - ctx->transmissionTime - GNUNET_get_time ()); - pos += mlen - sizeof (P2P_fragmentation_MESSAGE); - } - GNUNET_GE_ASSERT (NULL, pos == ctx->len); - GNUNET_free (frag); - GNUNET_free (ctx); - return GNUNET_OK; + FragmentBMC *ctx = cls; + static int idGen = 0; + P2P_fragmentation_MESSAGE *frag; + unsigned int pos; + int id; + unsigned short mlen; + + if ((len < ctx->mtu) || (buf == NULL)) + { + GNUNET_free (ctx); + return GNUNET_SYSERR; + } + if (stats != NULL) + stats->change (stat_fragmented, 1); + id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512); + /* write first fragment to buf */ + frag = (P2P_fragmentation_MESSAGE *) buf; + frag->header.size = htons (len); + frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT); + frag->id = id; + frag->off = htons (0); + frag->len = htons (ctx->len); + memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE)); + + /* create remaining fragments, add to queue! */ + pos = len - sizeof (P2P_fragmentation_MESSAGE); + frag = GNUNET_malloc (ctx->mtu); + while (pos < ctx->len) + { + mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos; + if (mlen > ctx->mtu) + mlen = ctx->mtu; + GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE)); + frag->header.size = htons (mlen); + frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT); + frag->id = id; + frag->off = htons (pos); + frag->len = htons (ctx->len); + memcpy (&frag[1], + &((char *) (&ctx[1]))[pos], + mlen - sizeof (P2P_fragmentation_MESSAGE)); + coreAPI->ciphertext_send (&ctx->sender, + &frag->header, + GNUNET_EXTREME_PRIORITY, + ctx->transmissionTime - GNUNET_get_time ()); + pos += mlen - sizeof (P2P_fragmentation_MESSAGE); + } + GNUNET_GE_ASSERT (NULL, pos == ctx->len); + GNUNET_free (frag); + GNUNET_free (ctx); + return GNUNET_OK; } /** @@ -685,37 +741,37 @@ fragmentBMC (void *buf, void *cls, unsigned short len) */ void fragment (const GNUNET_PeerIdentity * peer, - unsigned int mtu, - unsigned int prio, - unsigned int targetTime, - unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure) + unsigned int mtu, + unsigned int prio, + unsigned int targetTime, + unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure) { - FragmentBMC *fbmc; - int xlen; - - GNUNET_GE_ASSERT (NULL, len > mtu); - GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE)); - fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len); - fbmc->mtu = mtu; - fbmc->sender = *peer; - fbmc->transmissionTime = targetTime; - fbmc->len = len; - if (bmc == NULL) - { - memcpy (&fbmc[1], bmcClosure, len); - GNUNET_free (bmcClosure); - } - else - { - if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len)) - { - GNUNET_free (fbmc); - return; - } - } - xlen = mtu - sizeof (P2P_fragmentation_MESSAGE); - coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len, /* compute new priority */ - targetTime); + FragmentBMC *fbmc; + int xlen; + + GNUNET_GE_ASSERT (NULL, len > mtu); + GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE)); + fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len); + fbmc->mtu = mtu; + fbmc->sender = *peer; + fbmc->transmissionTime = targetTime; + fbmc->len = len; + if (bmc == NULL) + { + memcpy (&fbmc[1], bmcClosure, len); + GNUNET_free (bmcClosure); + } + else + { + if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len)) + { + GNUNET_free (fbmc); + return; + } + } + xlen = mtu - sizeof (P2P_fragmentation_MESSAGE); + coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len, /* compute new priority */ + targetTime); } /** @@ -724,35 +780,35 @@ fragment (const GNUNET_PeerIdentity * peer, GNUNET_Fragmentation_ServiceAPI * provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi) { - static GNUNET_Fragmentation_ServiceAPI ret; - int i; - - coreAPI = capi; - stats = coreAPI->service_request ("stats"); - if (stats != NULL) - { - stat_defragmented = - stats->create (gettext_noop ("# messages defragmented")); - stat_fragmented = - stats->create (gettext_noop ("# messages fragmented")); - stat_discarded = stats->create (gettext_noop ("# fragments discarded")); - } - for (i = 0; i < DEFRAG_BUCKET_COUNT; i++) - defragmentationCache[i] = NULL; - defragCacheLock = GNUNET_mutex_create (GNUNET_NO); - GNUNET_cron_add_job (coreAPI->cron, - &defragmentationPurgeCron, - 60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS, - NULL); - GNUNET_GE_LOG (capi->ectx, - GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST, - _("`%s' registering handler %d\n"), "fragmentation", - GNUNET_P2P_PROTO_MESSAGE_FRAGMENT); - capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, - &processFragment); - - ret.fragment = &fragment; - return &ret; + static GNUNET_Fragmentation_ServiceAPI ret; + int i; + + coreAPI = capi; + stats = coreAPI->service_request ("stats"); + if (stats != NULL) + { + stat_defragmented = + stats->create (gettext_noop ("# messages defragmented")); + stat_fragmented = + stats->create (gettext_noop ("# messages fragmented")); + stat_discarded = stats->create (gettext_noop ("# fragments discarded")); + } + for (i = 0; i < DEFRAG_BUCKET_COUNT; i++) + defragmentationCache[i] = NULL; + defragCacheLock = GNUNET_mutex_create (GNUNET_NO); + GNUNET_cron_add_job (coreAPI->cron, + &defragmentationPurgeCron, + 60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS, + NULL); + GNUNET_GE_LOG (capi->ectx, + GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST, + _("`%s' registering handler %d\n"), "fragmentation", + GNUNET_P2P_PROTO_MESSAGE_FRAGMENT); + capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, + &processFragment); + + ret.fragment = &fragment; + return &ret; } /** @@ -761,31 +817,31 @@ provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi) void release_module_fragmentation () { - int i; - - coreAPI->p2p_ciphertext_handler_unregister - (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment); - GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron, - 60 * GNUNET_CRON_SECONDS, NULL); - for (i = 0; i < DEFRAG_BUCKET_COUNT; i++) - { - FC *pos = defragmentationCache[i]; - while (pos != NULL) - { - FC *next = pos->next; - freeFL (pos->head, 1); - GNUNET_free (pos); - pos = next; - } - } - if (stats != NULL) - { - coreAPI->service_release (stats); - stats = NULL; - } - GNUNET_mutex_destroy (defragCacheLock); - defragCacheLock = NULL; - coreAPI = NULL; + int i; + + coreAPI->p2p_ciphertext_handler_unregister + (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment); + GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron, + 60 * GNUNET_CRON_SECONDS, NULL); + for (i = 0; i < DEFRAG_BUCKET_COUNT; i++) + { + FC *pos = defragmentationCache[i]; + while (pos != NULL) + { + FC *next = pos->next; + freeFL (pos->head, 1); + GNUNET_free (pos); + pos = next; + } + } + if (stats != NULL) + { + coreAPI->service_release (stats); + stats = NULL; + } + GNUNET_mutex_destroy (defragCacheLock); + defragCacheLock = NULL; + coreAPI = NULL; } #endif diff --git a/src/fragmentation/test_frag_ji.c b/src/fragmentation/test_frag_ji.c index 8cba654ad..059994f2b 100644 --- a/src/fragmentation/test_frag_ji.c +++ b/src/fragmentation/test_frag_ji.c @@ -1,38 +1,55 @@ #include "platform.h" +#include "gnunet_protocols.h" #include "gnunet_fragmentation_lib.h" +struct combine{ + struct GNUNET_FRAGMENT_Context* ctx; + struct GNUNET_PeerIdentity* sender; +}; + void message_proc1(void *cls, const struct GNUNET_MessageHeader * msg){ + fprintf(stderr, "enter into message_proc1\n"); struct GNUNET_MessageHeader * originalMsg = (struct GNUNET_MessageHeader *)cls; - if(originalMsg->size != msg->size){ - fprintf(stderr, "the received message has the different size with the sent one!"); + + if(ntohs(originalMsg->size) != ntohs(msg->size)){ + fprintf(stderr, "the received message has the different size with the sent one!\n"); } - if(originalMsg->type != msg->type){ - fprintf(stderr, "the received message has the different type with the sent one!"); + if(ntohs(originalMsg->type) != ntohs(msg->type)){ + fprintf(stderr, "the received message has the different type with the sent one!\n"); } - if(memcmp(&originalMsg[1], &msg[1], originalMsg->size - sizeof(struct GNUNET_MessageHeader))){ - fprintf(stderr, "the received message is not the sent one!"); + if(memcmp(msg+960, originalMsg+960, 68)){ + fprintf(stderr, "the received message is not the sent one!\n"); } } void message_proc2(void *cls, const struct GNUNET_MessageHeader * msg){ - struct GNUNET_FRAGMENT_Context * ctx = (struct GNUNET_FRAGMENT_Context * )cls; - struct Fragment *frag; - struct GNUNET_PeerIdentity sender; - GNUNET_FRAGMENT_process(ctx, &sender, msg); + printf("enter into message_proc2\n"); + struct combine * com2 = (struct combine* )cls; + GNUNET_FRAGMENT_process(com2->ctx, com2->sender, msg); } int main(int argc, char * argv[]){ - + + uint16_t mtu = 512; struct GNUNET_FRAGMENT_Context * ctx; - struct GNUNET_MessageHeader *msg; - ctx = GNUNET_FRAGMENT_context_create(stats, message_proc1, msg); - msg->size = sizeof(struct GNUNET_MessageHeader)+2*mtu; - msg->type = GNUNET_MESSAGE_TYPE_HELLO; - memcpy(&msg[1], 5, 2*mtu); - GNUNET_FRAGMENT_fragment(msg, mtu, message_proc2, ctx); + struct GNUNET_MessageHeader *msg = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader)+2*mtu); + ctx = GNUNET_FRAGMENT_context_create(NULL, message_proc1, msg); + msg->size = htons(sizeof(struct GNUNET_MessageHeader)+2*mtu); + msg->type = htons(GNUNET_MESSAGE_TYPE_HELLO); + struct GNUNET_PeerIdentity *sender; + sender = GNUNET_malloc(sizeof(struct GNUNET_PeerIdentity)); + + memset(sender, 9, sizeof(struct GNUNET_PeerIdentity)); + memset(&msg[1], 5, 2*mtu); + struct combine *com; + com = GNUNET_malloc(sizeof(struct combine)); + com->ctx = ctx; + com->sender = sender; + GNUNET_FRAGMENT_fragment(msg, mtu, message_proc2, com); + GNUNET_free(msg); return 0; } -- cgit v1.2.3