diff options
author | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-12-18 15:39:21 +0000 |
---|---|---|
committer | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-12-18 15:39:21 +0000 |
commit | f9c6eb1e0a52ee619997e8f3772d99457adac3b5 (patch) | |
tree | becb437393b7677fe9afe11aa4daa9963aa0a4b9 /src/set | |
parent | 86cc996b969e9de27e00e7e7794c2381c93e9665 (diff) | |
download | gnunet-f9c6eb1e0a52ee619997e8f3772d99457adac3b5.tar.gz gnunet-f9c6eb1e0a52ee619997e8f3772d99457adac3b5.zip |
- further work on multipart receiving
- removed the multipart-state from the statemachine again, as we can recognize multipart sending based on wether or not the bf_data pointer is null or not
- simplified & refactored the multipart message format a bit
Diffstat (limited to 'src/set')
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 182 | ||||
-rw-r--r-- | src/set/set_protocol.h | 4 |
2 files changed, 123 insertions, 63 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 7152eec06..886d4c6dd 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -54,10 +54,6 @@ enum IntersectionOperationPhase | |||
54 | */ | 54 | */ |
55 | PHASE_BF_EXCHANGE, | 55 | PHASE_BF_EXCHANGE, |
56 | /** | 56 | /** |
57 | * Multipart continuation of BF_exchange | ||
58 | */ | ||
59 | PHASE_BF_AWAIT_MULTIPART, | ||
60 | /** | ||
61 | * if both peers have an equal peercount, they enter this state for | 57 | * if both peers have an equal peercount, they enter this state for |
62 | * one more turn, to see if they actually have agreed on a correct set. | 58 | * one more turn, to see if they actually have agreed on a correct set. |
63 | * if a peer finds the same element count after the next iteration, | 59 | * if a peer finds the same element count after the next iteration, |
@@ -91,12 +87,17 @@ struct OperationState | |||
91 | /** | 87 | /** |
92 | * for multipart msgs we have to store the bloomfilter-data until we fully sent it. | 88 | * for multipart msgs we have to store the bloomfilter-data until we fully sent it. |
93 | */ | 89 | */ |
94 | char * local_bf_data; | 90 | char * bf_data; |
95 | 91 | ||
96 | /** | 92 | /** |
97 | * size of the bloomfilter | 93 | * size of the bloomfilter |
98 | */ | 94 | */ |
99 | uint32_t local_bf_data_size; | 95 | uint32_t bf_data_size; |
96 | |||
97 | /** | ||
98 | * size of the bloomfilter | ||
99 | */ | ||
100 | uint32_t bf_bits_per_element; | ||
100 | 101 | ||
101 | /** | 102 | /** |
102 | * Current state of the operation. | 103 | * Current state of the operation. |
@@ -217,7 +218,6 @@ iterator_initialization (void *cls, | |||
217 | { | 218 | { |
218 | struct ElementEntry *ee = value; | 219 | struct ElementEntry *ee = value; |
219 | struct Operation *op = cls; | 220 | struct Operation *op = cls; |
220 | struct GNUNET_HashCode mutated_hash; | ||
221 | 221 | ||
222 | //only consider this element, if it is valid for us | 222 | //only consider this element, if it is valid for us |
223 | if ((op->generation_created >= ee->generation_removed) | 223 | if ((op->generation_created >= ee->generation_removed) |
@@ -366,24 +366,24 @@ send_bloomfilter_multipart (struct Operation *op, uint32_t offset) | |||
366 | struct GNUNET_MQ_Envelope *ev; | 366 | struct GNUNET_MQ_Envelope *ev; |
367 | struct BFPart *msg; | 367 | struct BFPart *msg; |
368 | uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart)); | 368 | uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart)); |
369 | uint32_t todo_size = op->state->local_bf_data_size - offset; | 369 | uint32_t todo_size = op->state->bf_data_size - offset; |
370 | 370 | ||
371 | if (todo_size < chunk_size) | 371 | if (todo_size < chunk_size) |
372 | chunk_size = todo_size; | 372 | chunk_size = todo_size; |
373 | 373 | ||
374 | ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART); | 374 | ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART); |
375 | 375 | ||
376 | msg->bloomfilter_length = htonl (chunk_size); | 376 | msg->chunk_length = htonl (chunk_size); |
377 | msg->bloomfilter_offset = htonl (offset); | 377 | msg->chunk_offset = htonl (offset); |
378 | memcpy(&msg[1], &op->state->local_bf_data[offset], chunk_size); | 378 | memcpy(&msg[1], &op->state->bf_data[offset], chunk_size); |
379 | 379 | ||
380 | GNUNET_MQ_send (op->mq, ev); | 380 | GNUNET_MQ_send (op->mq, ev); |
381 | 381 | ||
382 | if (op->state->local_bf_data_size == offset + chunk_size) | 382 | if (op->state->bf_data_size == offset + chunk_size) |
383 | { | 383 | { |
384 | // done | 384 | // done |
385 | GNUNET_free(op->state->local_bf_data); | 385 | GNUNET_free(op->state->bf_data); |
386 | op->state->local_bf_data = NULL; | 386 | op->state->bf_data = NULL; |
387 | return; | 387 | return; |
388 | } | 388 | } |
389 | 389 | ||
@@ -431,20 +431,20 @@ send_bloomfilter (struct Operation *op) | |||
431 | ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | 431 | ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); |
432 | GNUNET_assert (GNUNET_SYSERR != | 432 | GNUNET_assert (GNUNET_SYSERR != |
433 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, | 433 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, |
434 | &msg[1], | 434 | (char*)&msg[1], |
435 | bf_size)); | 435 | bf_size)); |
436 | } | 436 | } |
437 | else { | 437 | else { |
438 | //multipart | 438 | //multipart |
439 | chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage); | 439 | chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage); |
440 | ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | 440 | ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); |
441 | op->state->local_bf_data = (char *) GNUNET_malloc (bf_size); | 441 | op->state->bf_data = (char *) GNUNET_malloc (bf_size); |
442 | GNUNET_assert (GNUNET_SYSERR != | 442 | GNUNET_assert (GNUNET_SYSERR != |
443 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, | 443 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, |
444 | op->state->local_bf_data, | 444 | op->state->bf_data, |
445 | bf_size)); | 445 | bf_size)); |
446 | memcpy (&msg[1], op->state->local_bf_data, chunk_size); | 446 | memcpy (&msg[1], op->state->bf_data, chunk_size); |
447 | op->state->local_bf_data_size = bf_size; | 447 | op->state->bf_data_size = bf_size; |
448 | } | 448 | } |
449 | GNUNET_CONTAINER_bloomfilter_free (local_bf); | 449 | GNUNET_CONTAINER_bloomfilter_free (local_bf); |
450 | 450 | ||
@@ -456,7 +456,7 @@ send_bloomfilter (struct Operation *op) | |||
456 | 456 | ||
457 | GNUNET_MQ_send (op->mq, ev); | 457 | GNUNET_MQ_send (op->mq, ev); |
458 | 458 | ||
459 | if (op->state->local_bf_data) | 459 | if (op->state->bf_data) |
460 | send_bloomfilter_multipart (op, chunk_size); | 460 | send_bloomfilter_multipart (op, chunk_size); |
461 | } | 461 | } |
462 | 462 | ||
@@ -540,50 +540,19 @@ send_peer_done (struct Operation *op) | |||
540 | GNUNET_MQ_send (op->mq, ev); | 540 | GNUNET_MQ_send (op->mq, ev); |
541 | } | 541 | } |
542 | 542 | ||
543 | /** | ||
544 | * Handle an BF multipart message from a remote peer. | ||
545 | * | ||
546 | * @param cls the intersection operation | ||
547 | * @param mh the header of the message | ||
548 | */ | ||
549 | static void | ||
550 | handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh) | ||
551 | { | ||
552 | struct Operation *op = cls; | ||
553 | const struct BFPart *msg = (const struct BFPart *) mh; | ||
554 | |||
555 | if (op->state->phase != PHASE_BF_AWAIT_MULTIPART){ | ||
556 | GNUNET_break_op (0); | ||
557 | fail_intersection_operation(op); | ||
558 | return; | ||
559 | } | ||
560 | |||
561 | |||
562 | } | ||
563 | 543 | ||
564 | /** | 544 | /** |
565 | * Handle an BF message from a remote peer. | 545 | * Process a Bloomfilter once we got all the chunks |
566 | * | 546 | * |
567 | * @param cls the intersection operation | 547 | * @param op the intersection operation |
568 | * @param mh the header of the message | ||
569 | */ | 548 | */ |
570 | static void | 549 | static void |
571 | handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) | 550 | process_bf (struct Operation *op){ |
572 | { | ||
573 | struct Operation *op = cls; | ||
574 | const struct BFMessage *msg = (const struct BFMessage *) mh; | ||
575 | uint32_t old_elements; | 551 | uint32_t old_elements; |
576 | uint32_t peer_elements; | 552 | uint32_t peer_elements; |
577 | 553 | ||
578 | old_elements = op->state->my_element_count; | 554 | old_elements = op->state->my_element_count; |
579 | op->spec->salt = ntohl (msg->sender_mutator); | 555 | peer_elements = op->spec->remote_element_count; |
580 | |||
581 | op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], | ||
582 | ntohl (msg->bloomfilter_total_length), | ||
583 | ntohl (msg->bits_per_element)); | ||
584 | op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, | ||
585 | BLOOMFILTER_SIZE, | ||
586 | GNUNET_CONSTANTS_BLOOMFILTER_K); | ||
587 | switch (op->state->phase) | 556 | switch (op->state->phase) |
588 | { | 557 | { |
589 | case PHASE_INITIAL: | 558 | case PHASE_INITIAL: |
@@ -613,26 +582,117 @@ handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
613 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); | 582 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); |
614 | op->state->remote_bf = NULL; | 583 | op->state->remote_bf = NULL; |
615 | 584 | ||
616 | peer_elements = ntohl(msg->sender_element_count); | ||
617 | if ((op->state->phase == PHASE_MAYBE_FINISHED) | 585 | if ((op->state->phase == PHASE_MAYBE_FINISHED) |
618 | && (old_elements == op->state->my_element_count) | 586 | && (old_elements == op->state->my_element_count) |
619 | && (op->state->my_element_count == peer_elements)){ | 587 | && (op->state->my_element_count == peer_elements)){ |
620 | // In the last round we though we were finished, we now know this is correct | 588 | // In the last round we though we were finished, we now know this is correct |
621 | send_peer_done(op); | 589 | send_peer_done (op); |
622 | return; | 590 | return; |
623 | } | 591 | } |
624 | 592 | ||
625 | op->state->phase = PHASE_BF_EXCHANGE; | 593 | op->state->phase = PHASE_BF_EXCHANGE; |
626 | // maybe we are finished, but we do one more round to make certain | ||
627 | // we don't have false positives ... | ||
628 | if (op->state->my_element_count == peer_elements) | 594 | if (op->state->my_element_count == peer_elements) |
629 | op->state->phase = PHASE_MAYBE_FINISHED; | 595 | // maybe we are finished, but we do one more round to make certain |
596 | // we don't have false positives ... | ||
597 | op->state->phase = PHASE_MAYBE_FINISHED; | ||
630 | 598 | ||
631 | send_bloomfilter (op); | 599 | send_bloomfilter (op); |
632 | } | 600 | } |
633 | 601 | ||
634 | 602 | ||
635 | /** | 603 | /** |
604 | * Handle an BF multipart message from a remote peer. | ||
605 | * | ||
606 | * @param cls the intersection operation | ||
607 | * @param mh the header of the message | ||
608 | */ | ||
609 | static void | ||
610 | handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh) | ||
611 | { | ||
612 | struct Operation *op = cls; | ||
613 | const struct BFPart *msg = (const struct BFPart *) mh; | ||
614 | uint32_t chunk_size; | ||
615 | uint32_t chunk_offset; | ||
616 | |||
617 | chunk_size = ntohl(msg->chunk_length); | ||
618 | chunk_offset = ntohl(msg->chunk_offset); | ||
619 | |||
620 | if ((NULL == op->state->bf_data) | ||
621 | || (op->state->bf_data_size < chunk_size + chunk_offset)){ | ||
622 | // unexpected multipart chunk | ||
623 | GNUNET_break_op (0); | ||
624 | fail_intersection_operation(op); | ||
625 | return; | ||
626 | } | ||
627 | |||
628 | memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size); | ||
629 | |||
630 | if (op->state->bf_data_size > chunk_size + chunk_offset) | ||
631 | // wait for next chunk | ||
632 | return; | ||
633 | |||
634 | op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], | ||
635 | op->state->bf_data_size, | ||
636 | op->state->bf_bits_per_element); | ||
637 | |||
638 | GNUNET_free (op->state->bf_data); | ||
639 | op->state->bf_data = NULL; | ||
640 | |||
641 | process_bf (op); | ||
642 | } | ||
643 | |||
644 | |||
645 | /** | ||
646 | * Handle an BF message from a remote peer. | ||
647 | * | ||
648 | * @param cls the intersection operation | ||
649 | * @param mh the header of the message | ||
650 | */ | ||
651 | static void | ||
652 | handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) | ||
653 | { | ||
654 | struct Operation *op = cls; | ||
655 | const struct BFMessage *msg = (const struct BFMessage *) mh; | ||
656 | uint32_t bf_size; | ||
657 | uint32_t chunk_size; | ||
658 | uint32_t bf_bits_per_element; | ||
659 | |||
660 | switch (op->state->phase) | ||
661 | { | ||
662 | case PHASE_INITIAL: | ||
663 | case PHASE_BF_EXCHANGE: | ||
664 | case PHASE_MAYBE_FINISHED: | ||
665 | if (NULL == op->state->bf_data) { | ||
666 | // no colliding multipart transaction going on currently | ||
667 | op->spec->salt = ntohl (msg->sender_mutator); | ||
668 | bf_size = ntohl (msg->bloomfilter_total_length); | ||
669 | bf_bits_per_element = ntohl (msg->bits_per_element); | ||
670 | chunk_size = ntohl (msg->bloomfilter_length); | ||
671 | op->spec->remote_element_count = ntohl(msg->sender_element_count); | ||
672 | if (bf_size == chunk_size) { | ||
673 | // single part, done here | ||
674 | op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], | ||
675 | bf_size, | ||
676 | bf_bits_per_element); | ||
677 | process_bf (op); | ||
678 | return; | ||
679 | } | ||
680 | |||
681 | //first multipart chunk | ||
682 | op->state->bf_data = GNUNET_malloc (bf_size); | ||
683 | op->state->bf_data_size = bf_size; | ||
684 | op->state->bf_bits_per_element = bf_bits_per_element; | ||
685 | memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size); | ||
686 | return; | ||
687 | } | ||
688 | default: | ||
689 | GNUNET_break_op (0); | ||
690 | fail_intersection_operation (op); | ||
691 | } | ||
692 | } | ||
693 | |||
694 | |||
695 | /** | ||
636 | * Handle an BF message from a remote peer. | 696 | * Handle an BF message from a remote peer. |
637 | * | 697 | * |
638 | * @param cls the intersection operation | 698 | * @param cls the intersection operation |
diff --git a/src/set/set_protocol.h b/src/set/set_protocol.h index 9d39abba8..b48809a3c 100644 --- a/src/set/set_protocol.h +++ b/src/set/set_protocol.h | |||
@@ -140,12 +140,12 @@ struct BFPart | |||
140 | /** | 140 | /** |
141 | * Length of the appended bloomfilter data block | 141 | * Length of the appended bloomfilter data block |
142 | */ | 142 | */ |
143 | uint32_t bloomfilter_length GNUNET_PACKED; | 143 | uint32_t chunk_length GNUNET_PACKED; |
144 | 144 | ||
145 | /** | 145 | /** |
146 | * offset in the bloolfilter data block, if multipart message | 146 | * offset in the bloolfilter data block, if multipart message |
147 | */ | 147 | */ |
148 | uint32_t bloomfilter_offset GNUNET_PACKED; | 148 | uint32_t chunk_offset GNUNET_PACKED; |
149 | 149 | ||
150 | /** | 150 | /** |
151 | * rest: the sender's bloomfilter | 151 | * rest: the sender's bloomfilter |