aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorChristian Fuchs <christian.fuchs@cfuchs.net>2013-12-18 15:39:21 +0000
committerChristian Fuchs <christian.fuchs@cfuchs.net>2013-12-18 15:39:21 +0000
commitf9c6eb1e0a52ee619997e8f3772d99457adac3b5 (patch)
treebecb437393b7677fe9afe11aa4daa9963aa0a4b9 /src/set
parent86cc996b969e9de27e00e7e7794c2381c93e9665 (diff)
downloadgnunet-f9c6eb1e0a52ee619997e8f3772d99457adac3b5.tar.gz
gnunet-f9c6eb1e0a52ee619997e8f3772d99457adac3b5.zip
- further work on multipart receiving
- removed the multipart-state from the statemachine again, as we can recognize multipart sending based on wether or not the bf_data pointer is null or not - simplified & refactored the multipart message format a bit
Diffstat (limited to 'src/set')
-rw-r--r--src/set/gnunet-service-set_intersection.c182
-rw-r--r--src/set/set_protocol.h4
2 files changed, 123 insertions, 63 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 7152eec06..886d4c6dd 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -54,10 +54,6 @@ enum IntersectionOperationPhase
54 */ 54 */
55 PHASE_BF_EXCHANGE, 55 PHASE_BF_EXCHANGE,
56 /** 56 /**
57 * Multipart continuation of BF_exchange
58 */
59 PHASE_BF_AWAIT_MULTIPART,
60 /**
61 * if both peers have an equal peercount, they enter this state for 57 * if both peers have an equal peercount, they enter this state for
62 * one more turn, to see if they actually have agreed on a correct set. 58 * one more turn, to see if they actually have agreed on a correct set.
63 * if a peer finds the same element count after the next iteration, 59 * if a peer finds the same element count after the next iteration,
@@ -91,12 +87,17 @@ struct OperationState
91 /** 87 /**
92 * for multipart msgs we have to store the bloomfilter-data until we fully sent it. 88 * for multipart msgs we have to store the bloomfilter-data until we fully sent it.
93 */ 89 */
94 char * local_bf_data; 90 char * bf_data;
95 91
96 /** 92 /**
97 * size of the bloomfilter 93 * size of the bloomfilter
98 */ 94 */
99 uint32_t local_bf_data_size; 95 uint32_t bf_data_size;
96
97 /**
98 * size of the bloomfilter
99 */
100 uint32_t bf_bits_per_element;
100 101
101 /** 102 /**
102 * Current state of the operation. 103 * Current state of the operation.
@@ -217,7 +218,6 @@ iterator_initialization (void *cls,
217{ 218{
218 struct ElementEntry *ee = value; 219 struct ElementEntry *ee = value;
219 struct Operation *op = cls; 220 struct Operation *op = cls;
220 struct GNUNET_HashCode mutated_hash;
221 221
222 //only consider this element, if it is valid for us 222 //only consider this element, if it is valid for us
223 if ((op->generation_created >= ee->generation_removed) 223 if ((op->generation_created >= ee->generation_removed)
@@ -366,24 +366,24 @@ send_bloomfilter_multipart (struct Operation *op, uint32_t offset)
366 struct GNUNET_MQ_Envelope *ev; 366 struct GNUNET_MQ_Envelope *ev;
367 struct BFPart *msg; 367 struct BFPart *msg;
368 uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart)); 368 uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
369 uint32_t todo_size = op->state->local_bf_data_size - offset; 369 uint32_t todo_size = op->state->bf_data_size - offset;
370 370
371 if (todo_size < chunk_size) 371 if (todo_size < chunk_size)
372 chunk_size = todo_size; 372 chunk_size = todo_size;
373 373
374 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART); 374 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
375 375
376 msg->bloomfilter_length = htonl (chunk_size); 376 msg->chunk_length = htonl (chunk_size);
377 msg->bloomfilter_offset = htonl (offset); 377 msg->chunk_offset = htonl (offset);
378 memcpy(&msg[1], &op->state->local_bf_data[offset], chunk_size); 378 memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
379 379
380 GNUNET_MQ_send (op->mq, ev); 380 GNUNET_MQ_send (op->mq, ev);
381 381
382 if (op->state->local_bf_data_size == offset + chunk_size) 382 if (op->state->bf_data_size == offset + chunk_size)
383 { 383 {
384 // done 384 // done
385 GNUNET_free(op->state->local_bf_data); 385 GNUNET_free(op->state->bf_data);
386 op->state->local_bf_data = NULL; 386 op->state->bf_data = NULL;
387 return; 387 return;
388 } 388 }
389 389
@@ -431,20 +431,20 @@ send_bloomfilter (struct Operation *op)
431 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); 431 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
432 GNUNET_assert (GNUNET_SYSERR != 432 GNUNET_assert (GNUNET_SYSERR !=
433 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, 433 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
434 &msg[1], 434 (char*)&msg[1],
435 bf_size)); 435 bf_size));
436 } 436 }
437 else { 437 else {
438 //multipart 438 //multipart
439 chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage); 439 chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage);
440 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); 440 ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
441 op->state->local_bf_data = (char *) GNUNET_malloc (bf_size); 441 op->state->bf_data = (char *) GNUNET_malloc (bf_size);
442 GNUNET_assert (GNUNET_SYSERR != 442 GNUNET_assert (GNUNET_SYSERR !=
443 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, 443 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
444 op->state->local_bf_data, 444 op->state->bf_data,
445 bf_size)); 445 bf_size));
446 memcpy (&msg[1], op->state->local_bf_data, chunk_size); 446 memcpy (&msg[1], op->state->bf_data, chunk_size);
447 op->state->local_bf_data_size = bf_size; 447 op->state->bf_data_size = bf_size;
448 } 448 }
449 GNUNET_CONTAINER_bloomfilter_free (local_bf); 449 GNUNET_CONTAINER_bloomfilter_free (local_bf);
450 450
@@ -456,7 +456,7 @@ send_bloomfilter (struct Operation *op)
456 456
457 GNUNET_MQ_send (op->mq, ev); 457 GNUNET_MQ_send (op->mq, ev);
458 458
459 if (op->state->local_bf_data) 459 if (op->state->bf_data)
460 send_bloomfilter_multipart (op, chunk_size); 460 send_bloomfilter_multipart (op, chunk_size);
461} 461}
462 462
@@ -540,50 +540,19 @@ send_peer_done (struct Operation *op)
540 GNUNET_MQ_send (op->mq, ev); 540 GNUNET_MQ_send (op->mq, ev);
541} 541}
542 542
543/**
544 * Handle an BF multipart message from a remote peer.
545 *
546 * @param cls the intersection operation
547 * @param mh the header of the message
548 */
549static void
550handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
551{
552 struct Operation *op = cls;
553 const struct BFPart *msg = (const struct BFPart *) mh;
554
555 if (op->state->phase != PHASE_BF_AWAIT_MULTIPART){
556 GNUNET_break_op (0);
557 fail_intersection_operation(op);
558 return;
559 }
560
561
562}
563 543
564/** 544/**
565 * Handle an BF message from a remote peer. 545 * Process a Bloomfilter once we got all the chunks
566 * 546 *
567 * @param cls the intersection operation 547 * @param op the intersection operation
568 * @param mh the header of the message
569 */ 548 */
570static void 549static void
571handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) 550process_bf (struct Operation *op){
572{
573 struct Operation *op = cls;
574 const struct BFMessage *msg = (const struct BFMessage *) mh;
575 uint32_t old_elements; 551 uint32_t old_elements;
576 uint32_t peer_elements; 552 uint32_t peer_elements;
577 553
578 old_elements = op->state->my_element_count; 554 old_elements = op->state->my_element_count;
579 op->spec->salt = ntohl (msg->sender_mutator); 555 peer_elements = op->spec->remote_element_count;
580
581 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
582 ntohl (msg->bloomfilter_total_length),
583 ntohl (msg->bits_per_element));
584 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
585 BLOOMFILTER_SIZE,
586 GNUNET_CONSTANTS_BLOOMFILTER_K);
587 switch (op->state->phase) 556 switch (op->state->phase)
588 { 557 {
589 case PHASE_INITIAL: 558 case PHASE_INITIAL:
@@ -613,26 +582,117 @@ handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
613 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); 582 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
614 op->state->remote_bf = NULL; 583 op->state->remote_bf = NULL;
615 584
616 peer_elements = ntohl(msg->sender_element_count);
617 if ((op->state->phase == PHASE_MAYBE_FINISHED) 585 if ((op->state->phase == PHASE_MAYBE_FINISHED)
618 && (old_elements == op->state->my_element_count) 586 && (old_elements == op->state->my_element_count)
619 && (op->state->my_element_count == peer_elements)){ 587 && (op->state->my_element_count == peer_elements)){
620 // In the last round we though we were finished, we now know this is correct 588 // In the last round we though we were finished, we now know this is correct
621 send_peer_done(op); 589 send_peer_done (op);
622 return; 590 return;
623 } 591 }
624 592
625 op->state->phase = PHASE_BF_EXCHANGE; 593 op->state->phase = PHASE_BF_EXCHANGE;
626 // maybe we are finished, but we do one more round to make certain
627 // we don't have false positives ...
628 if (op->state->my_element_count == peer_elements) 594 if (op->state->my_element_count == peer_elements)
629 op->state->phase = PHASE_MAYBE_FINISHED; 595 // maybe we are finished, but we do one more round to make certain
596 // we don't have false positives ...
597 op->state->phase = PHASE_MAYBE_FINISHED;
630 598
631 send_bloomfilter (op); 599 send_bloomfilter (op);
632} 600}
633 601
634 602
635/** 603/**
604 * Handle an BF multipart message from a remote peer.
605 *
606 * @param cls the intersection operation
607 * @param mh the header of the message
608 */
609static void
610handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh)
611{
612 struct Operation *op = cls;
613 const struct BFPart *msg = (const struct BFPart *) mh;
614 uint32_t chunk_size;
615 uint32_t chunk_offset;
616
617 chunk_size = ntohl(msg->chunk_length);
618 chunk_offset = ntohl(msg->chunk_offset);
619
620 if ((NULL == op->state->bf_data)
621 || (op->state->bf_data_size < chunk_size + chunk_offset)){
622 // unexpected multipart chunk
623 GNUNET_break_op (0);
624 fail_intersection_operation(op);
625 return;
626 }
627
628 memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
629
630 if (op->state->bf_data_size > chunk_size + chunk_offset)
631 // wait for next chunk
632 return;
633
634 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
635 op->state->bf_data_size,
636 op->state->bf_bits_per_element);
637
638 GNUNET_free (op->state->bf_data);
639 op->state->bf_data = NULL;
640
641 process_bf (op);
642}
643
644
645/**
646 * Handle an BF message from a remote peer.
647 *
648 * @param cls the intersection operation
649 * @param mh the header of the message
650 */
651static void
652handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
653{
654 struct Operation *op = cls;
655 const struct BFMessage *msg = (const struct BFMessage *) mh;
656 uint32_t bf_size;
657 uint32_t chunk_size;
658 uint32_t bf_bits_per_element;
659
660 switch (op->state->phase)
661 {
662 case PHASE_INITIAL:
663 case PHASE_BF_EXCHANGE:
664 case PHASE_MAYBE_FINISHED:
665 if (NULL == op->state->bf_data) {
666 // no colliding multipart transaction going on currently
667 op->spec->salt = ntohl (msg->sender_mutator);
668 bf_size = ntohl (msg->bloomfilter_total_length);
669 bf_bits_per_element = ntohl (msg->bits_per_element);
670 chunk_size = ntohl (msg->bloomfilter_length);
671 op->spec->remote_element_count = ntohl(msg->sender_element_count);
672 if (bf_size == chunk_size) {
673 // single part, done here
674 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
675 bf_size,
676 bf_bits_per_element);
677 process_bf (op);
678 return;
679 }
680
681 //first multipart chunk
682 op->state->bf_data = GNUNET_malloc (bf_size);
683 op->state->bf_data_size = bf_size;
684 op->state->bf_bits_per_element = bf_bits_per_element;
685 memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size);
686 return;
687 }
688 default:
689 GNUNET_break_op (0);
690 fail_intersection_operation (op);
691 }
692}
693
694
695/**
636 * Handle an BF message from a remote peer. 696 * Handle an BF message from a remote peer.
637 * 697 *
638 * @param cls the intersection operation 698 * @param cls the intersection operation
diff --git a/src/set/set_protocol.h b/src/set/set_protocol.h
index 9d39abba8..b48809a3c 100644
--- a/src/set/set_protocol.h
+++ b/src/set/set_protocol.h
@@ -140,12 +140,12 @@ struct BFPart
140 /** 140 /**
141 * Length of the appended bloomfilter data block 141 * Length of the appended bloomfilter data block
142 */ 142 */
143 uint32_t bloomfilter_length GNUNET_PACKED; 143 uint32_t chunk_length GNUNET_PACKED;
144 144
145 /** 145 /**
146 * offset in the bloolfilter data block, if multipart message 146 * offset in the bloolfilter data block, if multipart message
147 */ 147 */
148 uint32_t bloomfilter_offset GNUNET_PACKED; 148 uint32_t chunk_offset GNUNET_PACKED;
149 149
150 /** 150 /**
151 * rest: the sender's bloomfilter 151 * rest: the sender's bloomfilter