aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_intersection.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-03-13 01:24:22 +0100
committerChristian Grothoff <christian@grothoff.org>2017-03-13 01:24:34 +0100
commitbf6f552fdefe75425635f66343f98995e2f602f6 (patch)
treeadd6ea146823579a137763b78e89839ff97b3902 /src/set/gnunet-service-set_intersection.c
parenta9a5994e518ded483edb87513d5197b6539ed4ff (diff)
downloadgnunet-bf6f552fdefe75425635f66343f98995e2f602f6.tar.gz
gnunet-bf6f552fdefe75425635f66343f98995e2f602f6.zip
major clean up and bugfixes of SET
Diffstat (limited to 'src/set/gnunet-service-set_intersection.c')
-rw-r--r--src/set/gnunet-service-set_intersection.c542
1 files changed, 304 insertions, 238 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 8307672b9..9dc421792 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -56,6 +56,18 @@ enum IntersectionOperationPhase
56 PHASE_BF_EXCHANGE, 56 PHASE_BF_EXCHANGE,
57 57
58 /** 58 /**
59 * We must next send the P2P DONE message (after finishing mostly
60 * with the local client). Then we will wait for the channel to close.
61 */
62 PHASE_MUST_SEND_DONE,
63
64 /**
65 * We have received the P2P DONE message, and must finish with the
66 * local client before terminating the channel.
67 */
68 PHASE_DONE_RECEIVED,
69
70 /**
59 * The protocol is over. Results may still have to be sent to the 71 * The protocol is over. Results may still have to be sent to the
60 * client. 72 * client.
61 */ 73 */
@@ -162,6 +174,13 @@ struct OperationState
162 * Did we send the client that we are done? 174 * Did we send the client that we are done?
163 */ 175 */
164 int client_done_sent; 176 int client_done_sent;
177
178 /**
179 * Set whenever we reach the state where the death of the
180 * channel is perfectly find and should NOT result in the
181 * operation being cancelled.
182 */
183 int channel_death_expected;
165}; 184};
166 185
167 186
@@ -193,12 +212,12 @@ send_client_removed_element (struct Operation *op,
193 struct GNUNET_MQ_Envelope *ev; 212 struct GNUNET_MQ_Envelope *ev;
194 struct GNUNET_SET_ResultMessage *rm; 213 struct GNUNET_SET_ResultMessage *rm;
195 214
196 if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode) 215 if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
197 return; /* Wrong mode for transmitting removed elements */ 216 return; /* Wrong mode for transmitting removed elements */
198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
199 "Sending removed element (size %u) to client\n", 218 "Sending removed element (size %u) to client\n",
200 element->size); 219 element->size);
201 GNUNET_assert (0 != op->spec->client_request_id); 220 GNUNET_assert (0 != op->client_request_id);
202 ev = GNUNET_MQ_msg_extra (rm, 221 ev = GNUNET_MQ_msg_extra (rm,
203 element->size, 222 element->size,
204 GNUNET_MESSAGE_TYPE_SET_RESULT); 223 GNUNET_MESSAGE_TYPE_SET_RESULT);
@@ -208,12 +227,12 @@ send_client_removed_element (struct Operation *op,
208 return; 227 return;
209 } 228 }
210 rm->result_status = htons (GNUNET_SET_STATUS_OK); 229 rm->result_status = htons (GNUNET_SET_STATUS_OK);
211 rm->request_id = htonl (op->spec->client_request_id); 230 rm->request_id = htonl (op->client_request_id);
212 rm->element_type = element->element_type; 231 rm->element_type = element->element_type;
213 GNUNET_memcpy (&rm[1], 232 GNUNET_memcpy (&rm[1],
214 element->data, 233 element->data,
215 element->size); 234 element->size);
216 GNUNET_MQ_send (op->spec->set->client_mq, 235 GNUNET_MQ_send (op->set->cs->mq,
217 ev); 236 ev);
218} 237}
219 238
@@ -397,9 +416,9 @@ fail_intersection_operation (struct Operation *op)
397 ev = GNUNET_MQ_msg (msg, 416 ev = GNUNET_MQ_msg (msg,
398 GNUNET_MESSAGE_TYPE_SET_RESULT); 417 GNUNET_MESSAGE_TYPE_SET_RESULT);
399 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 418 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
400 msg->request_id = htonl (op->spec->client_request_id); 419 msg->request_id = htonl (op->client_request_id);
401 msg->element_type = htons (0); 420 msg->element_type = htons (0);
402 GNUNET_MQ_send (op->spec->set->client_mq, 421 GNUNET_MQ_send (op->set->cs->mq,
403 ev); 422 ev);
404 _GSS_operation_destroy (op, 423 _GSS_operation_destroy (op,
405 GNUNET_YES); 424 GNUNET_YES);
@@ -428,8 +447,8 @@ send_bloomfilter (struct Operation *op)
428 should use more bits to maximize its set reduction 447 should use more bits to maximize its set reduction
429 potential and minimize overall bandwidth consumption. */ 448 potential and minimize overall bandwidth consumption. */
430 bf_elementbits = 2 + ceil (log2((double) 449 bf_elementbits = 2 + ceil (log2((double)
431 (op->spec->remote_element_count / 450 (op->remote_element_count /
432 (double) op->state->my_element_count))); 451 (double) op->state->my_element_count)));
433 if (bf_elementbits < 1) 452 if (bf_elementbits < 1)
434 bf_elementbits = 1; /* make sure k is not 0 */ 453 bf_elementbits = 1; /* make sure k is not 0 */
435 /* optimize BF-size to ~50% of bits set */ 454 /* optimize BF-size to ~50% of bits set */
@@ -515,12 +534,14 @@ send_client_done_and_destroy (void *cls)
515 struct GNUNET_MQ_Envelope *ev; 534 struct GNUNET_MQ_Envelope *ev;
516 struct GNUNET_SET_ResultMessage *rm; 535 struct GNUNET_SET_ResultMessage *rm;
517 536
537 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538 "Intersection succeeded, sending DONE to local client\n");
518 ev = GNUNET_MQ_msg (rm, 539 ev = GNUNET_MQ_msg (rm,
519 GNUNET_MESSAGE_TYPE_SET_RESULT); 540 GNUNET_MESSAGE_TYPE_SET_RESULT);
520 rm->request_id = htonl (op->spec->client_request_id); 541 rm->request_id = htonl (op->client_request_id);
521 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 542 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
522 rm->element_type = htons (0); 543 rm->element_type = htons (0);
523 GNUNET_MQ_send (op->spec->set->client_mq, 544 GNUNET_MQ_send (op->set->cs->mq,
524 ev); 545 ev);
525 _GSS_operation_destroy (op, 546 _GSS_operation_destroy (op,
526 GNUNET_YES); 547 GNUNET_YES);
@@ -528,6 +549,53 @@ send_client_done_and_destroy (void *cls)
528 549
529 550
530/** 551/**
552 * Remember that we are done dealing with the local client
553 * AND have sent the other peer our message that we are done,
554 * so we are not just waiting for the channel to die before
555 * telling the local client that we are done as our last act.
556 *
557 * @param cls the `struct Operation`.
558 */
559static void
560finished_local_operations (void *cls)
561{
562 struct Operation *op = cls;
563
564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565 "DONE sent to other peer, now waiting for other end to close the channel\n");
566 op->state->phase = PHASE_FINISHED;
567 op->state->channel_death_expected = GNUNET_YES;
568}
569
570
571/**
572 * Notify the other peer that we are done. Once this message
573 * is out, we still need to notify the local client that we
574 * are done.
575 *
576 * @param op operation to notify for.
577 */
578static void
579send_p2p_done (struct Operation *op)
580{
581 struct GNUNET_MQ_Envelope *ev;
582 struct IntersectionDoneMessage *idm;
583
584 GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
585 GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
586 ev = GNUNET_MQ_msg (idm,
587 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
588 idm->final_element_count = htonl (op->state->my_element_count);
589 idm->element_xor_hash = op->state->my_xor;
590 GNUNET_MQ_notify_sent (ev,
591 &finished_local_operations,
592 op);
593 GNUNET_MQ_send (op->mq,
594 ev);
595}
596
597
598/**
531 * Send all elements in the full result iterator. 599 * Send all elements in the full result iterator.
532 * 600 *
533 * @param cls the `struct Operation *` 601 * @param cls the `struct Operation *`
@@ -550,10 +618,21 @@ send_remaining_elements (void *cls)
550 { 618 {
551 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
552 "Sending done and destroy because iterator ran out\n"); 620 "Sending done and destroy because iterator ran out\n");
553 op->keep--;
554 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter); 621 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
555 op->state->full_result_iter = NULL; 622 op->state->full_result_iter = NULL;
556 send_client_done_and_destroy (op); 623 if (PHASE_DONE_RECEIVED == op->state->phase)
624 {
625 op->state->phase = PHASE_FINISHED;
626 send_client_done_and_destroy (op);
627 }
628 else if (PHASE_MUST_SEND_DONE == op->state->phase)
629 {
630 send_p2p_done (op);
631 }
632 else
633 {
634 GNUNET_assert (0);
635 }
557 return; 636 return;
558 } 637 }
559 ee = nxt; 638 ee = nxt;
@@ -562,48 +641,136 @@ send_remaining_elements (void *cls)
562 "Sending element %s:%u to client (full set)\n", 641 "Sending element %s:%u to client (full set)\n",
563 GNUNET_h2s (&ee->element_hash), 642 GNUNET_h2s (&ee->element_hash),
564 element->size); 643 element->size);
565 GNUNET_assert (0 != op->spec->client_request_id); 644 GNUNET_assert (0 != op->client_request_id);
566 ev = GNUNET_MQ_msg_extra (rm, 645 ev = GNUNET_MQ_msg_extra (rm,
567 element->size, 646 element->size,
568 GNUNET_MESSAGE_TYPE_SET_RESULT); 647 GNUNET_MESSAGE_TYPE_SET_RESULT);
569 GNUNET_assert (NULL != ev); 648 GNUNET_assert (NULL != ev);
570 rm->result_status = htons (GNUNET_SET_STATUS_OK); 649 rm->result_status = htons (GNUNET_SET_STATUS_OK);
571 rm->request_id = htonl (op->spec->client_request_id); 650 rm->request_id = htonl (op->client_request_id);
572 rm->element_type = element->element_type; 651 rm->element_type = element->element_type;
573 GNUNET_memcpy (&rm[1], 652 GNUNET_memcpy (&rm[1],
574 element->data, 653 element->data,
575 element->size); 654 element->size);
576 GNUNET_MQ_notify_sent (ev, 655 GNUNET_MQ_notify_sent (ev,
577 &send_remaining_elements, 656 &send_remaining_elements,
578 op); 657 op);
579 GNUNET_MQ_send (op->spec->set->client_mq, 658 GNUNET_MQ_send (op->set->cs->mq,
580 ev); 659 ev);
581} 660}
582 661
583 662
584/** 663/**
585 * Inform the peer that this operation is complete. 664 * Fills the "my_elements" hashmap with the initial set of
665 * (non-deleted) elements from the set of the specification.
586 * 666 *
587 * @param op the intersection operation to fail 667 * @param cls closure with the `struct Operation *`
668 * @param key current key code for the element
669 * @param value value in the hash map with the `struct ElementEntry *`
670 * @return #GNUNET_YES (we should continue to iterate)
671 */
672static int
673initialize_map_unfiltered (void *cls,
674 const struct GNUNET_HashCode *key,
675 void *value)
676{
677 struct ElementEntry *ee = value;
678 struct Operation *op = cls;
679
680 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
681 return GNUNET_YES; /* element not live in operation's generation */
682 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
683 &ee->element_hash,
684 &op->state->my_xor);
685 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686 "Initial full initialization of my_elements, adding %s:%u\n",
687 GNUNET_h2s (&ee->element_hash),
688 ee->element.size);
689 GNUNET_break (GNUNET_YES ==
690 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
691 &ee->element_hash,
692 ee,
693 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
694 return GNUNET_YES;
695}
696
697
698/**
699 * Send our element count to the peer, in case our element count is
700 * lower than his.
701 *
702 * @param op intersection operation
588 */ 703 */
589static void 704static void
590send_peer_done (struct Operation *op) 705send_element_count (struct Operation *op)
591{ 706{
592 struct GNUNET_MQ_Envelope *ev; 707 struct GNUNET_MQ_Envelope *ev;
593 struct IntersectionDoneMessage *idm; 708 struct IntersectionElementInfoMessage *msg;
594 709
595 op->state->phase = PHASE_FINISHED;
596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
597 "Intersection succeeded, sending DONE\n"); 711 "Sending our element count (%u)\n",
598 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); 712 op->state->my_element_count);
599 op->state->local_bf = NULL; 713 ev = GNUNET_MQ_msg (msg,
714 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
715 msg->sender_element_count = htonl (op->state->my_element_count);
716 GNUNET_MQ_send (op->mq, ev);
717}
600 718
601 ev = GNUNET_MQ_msg (idm, 719
602 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); 720/**
603 idm->final_element_count = htonl (op->state->my_element_count); 721 * We go first, initialize our map with all elements and
604 idm->element_xor_hash = op->state->my_xor; 722 * send the first Bloom filter.
605 GNUNET_MQ_send (op->mq, 723 *
606 ev); 724 * @param op operation to start exchange for
725 */
726static void
727begin_bf_exchange (struct Operation *op)
728{
729 op->state->phase = PHASE_BF_EXCHANGE;
730 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
731 &initialize_map_unfiltered,
732 op);
733 send_bloomfilter (op);
734}
735
736
737/**
738 * Handle the initial `struct IntersectionElementInfoMessage` from a
739 * remote peer.
740 *
741 * @param cls the intersection operation
742 * @param mh the header of the message
743 */
744void
745handle_intersection_p2p_element_info (void *cls,
746 const struct IntersectionElementInfoMessage *msg)
747{
748 struct Operation *op = cls;
749
750 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
751 {
752 GNUNET_break_op (0);
753 fail_intersection_operation(op);
754 return;
755 }
756 op->remote_element_count = ntohl (msg->sender_element_count);
757 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
758 "Received remote element count (%u), I have %u\n",
759 op->remote_element_count,
760 op->state->my_element_count);
761 if ( ( (PHASE_INITIAL != op->state->phase) &&
762 (PHASE_COUNT_SENT != op->state->phase) ) ||
763 (op->state->my_element_count > op->remote_element_count) ||
764 (0 == op->state->my_element_count) ||
765 (0 == op->remote_element_count) )
766 {
767 GNUNET_break_op (0);
768 fail_intersection_operation(op);
769 return;
770 }
771 GNUNET_break (NULL == op->state->remote_bf);
772 begin_bf_exchange (op);
773 GNUNET_CADET_receive_done (op->channel);
607} 774}
608 775
609 776
@@ -618,9 +785,9 @@ process_bf (struct Operation *op)
618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", 786 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
620 op->state->phase, 787 op->state->phase,
621 op->spec->remote_element_count, 788 op->remote_element_count,
622 op->state->my_element_count, 789 op->state->my_element_count,
623 GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); 790 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
624 switch (op->state->phase) 791 switch (op->state->phase)
625 { 792 {
626 case PHASE_INITIAL: 793 case PHASE_INITIAL:
@@ -631,7 +798,7 @@ process_bf (struct Operation *op)
631 /* This is the first BF being sent, build our initial map with 798 /* This is the first BF being sent, build our initial map with
632 filtering in place */ 799 filtering in place */
633 op->state->my_element_count = 0; 800 op->state->my_element_count = 0;
634 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, 801 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
635 &filtered_map_initialization, 802 &filtered_map_initialization,
636 op); 803 op);
637 break; 804 break;
@@ -641,6 +808,14 @@ process_bf (struct Operation *op)
641 &iterator_bf_reduce, 808 &iterator_bf_reduce,
642 op); 809 op);
643 break; 810 break;
811 case PHASE_MUST_SEND_DONE:
812 GNUNET_break_op (0);
813 fail_intersection_operation(op);
814 return;
815 case PHASE_DONE_RECEIVED:
816 GNUNET_break_op (0);
817 fail_intersection_operation(op);
818 return;
644 case PHASE_FINISHED: 819 case PHASE_FINISHED:
645 GNUNET_break_op (0); 820 GNUNET_break_op (0);
646 fail_intersection_operation(op); 821 fail_intersection_operation(op);
@@ -650,13 +825,28 @@ process_bf (struct Operation *op)
650 op->state->remote_bf = NULL; 825 op->state->remote_bf = NULL;
651 826
652 if ( (0 == op->state->my_element_count) || /* fully disjoint */ 827 if ( (0 == op->state->my_element_count) || /* fully disjoint */
653 ( (op->state->my_element_count == op->spec->remote_element_count) && 828 ( (op->state->my_element_count == op->remote_element_count) &&
654 (0 == memcmp (&op->state->my_xor, 829 (0 == memcmp (&op->state->my_xor,
655 &op->state->other_xor, 830 &op->state->other_xor,
656 sizeof (struct GNUNET_HashCode))) ) ) 831 sizeof (struct GNUNET_HashCode))) ) )
657 { 832 {
658 /* we are done */ 833 /* we are done */
659 send_peer_done (op); 834 op->state->phase = PHASE_MUST_SEND_DONE;
835 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836 "Intersection succeeded, sending DONE to other peer\n");
837 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
838 op->state->local_bf = NULL;
839 if (GNUNET_SET_RESULT_FULL == op->result_mode)
840 {
841 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842 "Sending full result set (%u elements)\n",
843 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
844 op->state->full_result_iter
845 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
846 send_remaining_elements (op);
847 return;
848 }
849 send_p2p_done (op);
660 return; 850 return;
661 } 851 }
662 op->state->phase = PHASE_BF_EXCHANGE; 852 op->state->phase = PHASE_BF_EXCHANGE;
@@ -677,7 +867,7 @@ check_intersection_p2p_bf (void *cls,
677{ 867{
678 struct Operation *op = cls; 868 struct Operation *op = cls;
679 869
680 if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) 870 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
681 { 871 {
682 GNUNET_break_op (0); 872 GNUNET_break_op (0);
683 return GNUNET_SYSERR; 873 return GNUNET_SYSERR;
@@ -727,7 +917,7 @@ handle_intersection_p2p_bf (void *cls,
727 bf_size, 917 bf_size,
728 bf_bits_per_element); 918 bf_bits_per_element);
729 op->state->salt = ntohl (msg->sender_mutator); 919 op->state->salt = ntohl (msg->sender_mutator);
730 op->spec->remote_element_count = ntohl (msg->sender_element_count); 920 op->remote_element_count = ntohl (msg->sender_element_count);
731 process_bf (op); 921 process_bf (op);
732 break; 922 break;
733 } 923 }
@@ -740,7 +930,7 @@ handle_intersection_p2p_bf (void *cls,
740 op->state->bf_bits_per_element = bf_bits_per_element; 930 op->state->bf_bits_per_element = bf_bits_per_element;
741 op->state->bf_data_offset = 0; 931 op->state->bf_data_offset = 0;
742 op->state->salt = ntohl (msg->sender_mutator); 932 op->state->salt = ntohl (msg->sender_mutator);
743 op->spec->remote_element_count = ntohl (msg->sender_element_count); 933 op->remote_element_count = ntohl (msg->sender_element_count);
744 } 934 }
745 else 935 else
746 { 936 {
@@ -749,7 +939,7 @@ handle_intersection_p2p_bf (void *cls,
749 (op->state->bf_bits_per_element != bf_bits_per_element) || 939 (op->state->bf_bits_per_element != bf_bits_per_element) ||
750 (op->state->bf_data_offset + chunk_size > bf_size) || 940 (op->state->bf_data_offset + chunk_size > bf_size) ||
751 (op->state->salt != ntohl (msg->sender_mutator)) || 941 (op->state->salt != ntohl (msg->sender_mutator)) ||
752 (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) 942 (op->remote_element_count != ntohl (msg->sender_element_count)) )
753 { 943 {
754 GNUNET_break_op (0); 944 GNUNET_break_op (0);
755 fail_intersection_operation (op); 945 fail_intersection_operation (op);
@@ -783,147 +973,6 @@ handle_intersection_p2p_bf (void *cls,
783 973
784 974
785/** 975/**
786 * Fills the "my_elements" hashmap with the initial set of
787 * (non-deleted) elements from the set of the specification.
788 *
789 * @param cls closure with the `struct Operation *`
790 * @param key current key code for the element
791 * @param value value in the hash map with the `struct ElementEntry *`
792 * @return #GNUNET_YES (we should continue to iterate)
793 */
794static int
795initialize_map_unfiltered (void *cls,
796 const struct GNUNET_HashCode *key,
797 void *value)
798{
799 struct ElementEntry *ee = value;
800 struct Operation *op = cls;
801
802 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
803 return GNUNET_YES; /* element not live in operation's generation */
804 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
805 &ee->element_hash,
806 &op->state->my_xor);
807 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808 "Initial full initialization of my_elements, adding %s:%u\n",
809 GNUNET_h2s (&ee->element_hash),
810 ee->element.size);
811 GNUNET_break (GNUNET_YES ==
812 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
813 &ee->element_hash,
814 ee,
815 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
816 return GNUNET_YES;
817}
818
819
820/**
821 * Send our element count to the peer, in case our element count is
822 * lower than his.
823 *
824 * @param op intersection operation
825 */
826static void
827send_element_count (struct Operation *op)
828{
829 struct GNUNET_MQ_Envelope *ev;
830 struct IntersectionElementInfoMessage *msg;
831
832 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
833 "Sending our element count (%u)\n",
834 op->state->my_element_count);
835 ev = GNUNET_MQ_msg (msg,
836 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
837 msg->sender_element_count = htonl (op->state->my_element_count);
838 GNUNET_MQ_send (op->mq, ev);
839}
840
841
842/**
843 * We go first, initialize our map with all elements and
844 * send the first Bloom filter.
845 *
846 * @param op operation to start exchange for
847 */
848static void
849begin_bf_exchange (struct Operation *op)
850{
851 op->state->phase = PHASE_BF_EXCHANGE;
852 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
853 &initialize_map_unfiltered,
854 op);
855 send_bloomfilter (op);
856}
857
858
859/**
860 * Handle the initial `struct IntersectionElementInfoMessage` from a
861 * remote peer.
862 *
863 * @param cls the intersection operation
864 * @param mh the header of the message
865 */
866void
867handle_intersection_p2p_element_info (void *cls,
868 const struct IntersectionElementInfoMessage *msg)
869{
870 struct Operation *op = cls;
871
872 if (GNUNET_SET_OPERATION_INTERSECTION != op->operation)
873 {
874 GNUNET_break_op (0);
875 fail_intersection_operation(op);
876 return;
877 }
878 op->spec->remote_element_count = ntohl (msg->sender_element_count);
879 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880 "Received remote element count (%u), I have %u\n",
881 op->spec->remote_element_count,
882 op->state->my_element_count);
883 if ( ( (PHASE_INITIAL != op->state->phase) &&
884 (PHASE_COUNT_SENT != op->state->phase) ) ||
885 (op->state->my_element_count > op->spec->remote_element_count) ||
886 (0 == op->state->my_element_count) ||
887 (0 == op->spec->remote_element_count) )
888 {
889 GNUNET_break_op (0);
890 fail_intersection_operation(op);
891 return;
892 }
893 GNUNET_break (NULL == op->state->remote_bf);
894 begin_bf_exchange (op);
895 GNUNET_CADET_receive_done (op->channel);
896}
897
898
899/**
900 * Send a result message to the client indicating that the operation
901 * is over. After the result done message has been sent to the
902 * client, destroy the evaluate operation.
903 *
904 * @param op intersection operation
905 */
906static void
907finish_and_destroy (struct Operation *op)
908{
909 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
910
911 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
912 {
913 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914 "Sending full result set (%u elements)\n",
915 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
916 op->state->full_result_iter
917 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
918 op->keep++;
919 send_remaining_elements (op);
920 return;
921 }
922 send_client_done_and_destroy (op);
923}
924
925
926/**
927 * Remove all elements from our hashmap. 976 * Remove all elements from our hashmap.
928 * 977 *
929 * @param cls closure with the `struct Operation *` 978 * @param cls closure with the `struct Operation *`
@@ -970,10 +1019,10 @@ handle_intersection_p2p_done (void *cls,
970{ 1019{
971 struct Operation *op = cls; 1020 struct Operation *op = cls;
972 1021
973 if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) 1022 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
974 { 1023 {
975 GNUNET_break_op (0); 1024 GNUNET_break_op (0);
976 fail_intersection_operation(op); 1025 fail_intersection_operation (op);
977 return; 1026 return;
978 } 1027 }
979 if (PHASE_BF_EXCHANGE != op->state->phase) 1028 if (PHASE_BF_EXCHANGE != op->state->phase)
@@ -1005,9 +1054,22 @@ handle_intersection_p2p_done (void *cls,
1005 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1054 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006 "Got IntersectionDoneMessage, have %u elements in intersection\n", 1055 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1007 op->state->my_element_count); 1056 op->state->my_element_count);
1008 op->state->phase = PHASE_FINISHED; 1057 op->state->phase = PHASE_DONE_RECEIVED;
1009 finish_and_destroy (op);
1010 GNUNET_CADET_receive_done (op->channel); 1058 GNUNET_CADET_receive_done (op->channel);
1059
1060 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1061 if (GNUNET_SET_RESULT_FULL == op->result_mode)
1062 {
1063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064 "Sending full result set to client (%u elements)\n",
1065 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1066 op->state->full_result_iter
1067 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
1068 send_remaining_elements (op);
1069 return;
1070 }
1071 op->state->phase = PHASE_FINISHED;
1072 send_client_done_and_destroy (op);
1011} 1073}
1012 1074
1013 1075
@@ -1018,24 +1080,16 @@ handle_intersection_p2p_done (void *cls,
1018 * begin the evaluation 1080 * begin the evaluation
1019 * @param opaque_context message to be transmitted to the listener 1081 * @param opaque_context message to be transmitted to the listener
1020 * to convince him to accept, may be NULL 1082 * to convince him to accept, may be NULL
1083 * @return operation-specific state to keep in @a op
1021 */ 1084 */
1022static void 1085static struct OperationState *
1023intersection_evaluate (struct Operation *op, 1086intersection_evaluate (struct Operation *op,
1024 const struct GNUNET_MessageHeader *opaque_context) 1087 const struct GNUNET_MessageHeader *opaque_context)
1025{ 1088{
1089 struct OperationState *state;
1026 struct GNUNET_MQ_Envelope *ev; 1090 struct GNUNET_MQ_Envelope *ev;
1027 struct OperationRequestMessage *msg; 1091 struct OperationRequestMessage *msg;
1028 1092
1029 op->state = GNUNET_new (struct OperationState);
1030 /* we started the operation, thus we have to send the operation request */
1031 op->state->phase = PHASE_INITIAL;
1032 op->state->my_element_count = op->spec->set->state->current_set_element_count;
1033 op->state->my_elements
1034 = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
1035 GNUNET_YES);
1036
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "Initiating intersection operation evaluation\n");
1039 ev = GNUNET_MQ_msg_nested_mh (msg, 1093 ev = GNUNET_MQ_msg_nested_mh (msg,
1040 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1094 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1041 opaque_context); 1095 opaque_context);
@@ -1043,20 +1097,30 @@ intersection_evaluate (struct Operation *op,
1043 { 1097 {
1044 /* the context message is too large!? */ 1098 /* the context message is too large!? */
1045 GNUNET_break (0); 1099 GNUNET_break (0);
1046 GNUNET_SERVICE_client_drop (op->spec->set->client); 1100 return NULL;
1047 return;
1048 } 1101 }
1102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103 "Initiating intersection operation evaluation\n");
1104 state = GNUNET_new (struct OperationState);
1105 /* we started the operation, thus we have to send the operation request */
1106 state->phase = PHASE_INITIAL;
1107 state->my_element_count = op->set->state->current_set_element_count;
1108 state->my_elements
1109 = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1110 GNUNET_YES);
1111
1049 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); 1112 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1050 msg->element_count = htonl (op->state->my_element_count); 1113 msg->element_count = htonl (state->my_element_count);
1051 GNUNET_MQ_send (op->mq, 1114 GNUNET_MQ_send (op->mq,
1052 ev); 1115 ev);
1053 op->state->phase = PHASE_COUNT_SENT; 1116 state->phase = PHASE_COUNT_SENT;
1054 if (NULL != opaque_context) 1117 if (NULL != opaque_context)
1055 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1118 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056 "Sent op request with context message\n"); 1119 "Sent op request with context message\n");
1057 else 1120 else
1058 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1121 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059 "Sent op request without context message\n"); 1122 "Sent op request without context message\n");
1123 return state;
1060} 1124}
1061 1125
1062 1126
@@ -1066,53 +1130,33 @@ intersection_evaluate (struct Operation *op,
1066 * 1130 *
1067 * @param op operation that will be accepted as an intersection operation 1131 * @param op operation that will be accepted as an intersection operation
1068 */ 1132 */
1069static void 1133static struct OperationState *
1070intersection_accept (struct Operation *op) 1134intersection_accept (struct Operation *op)
1071{ 1135{
1136 struct OperationState *state;
1137
1072 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073 "Accepting set intersection operation\n"); 1139 "Accepting set intersection operation\n");
1074 op->state = GNUNET_new (struct OperationState); 1140 state = GNUNET_new (struct OperationState);
1075 op->state->phase = PHASE_INITIAL; 1141 state->phase = PHASE_INITIAL;
1076 op->state->my_element_count 1142 state->my_element_count
1077 = op->spec->set->state->current_set_element_count; 1143 = op->set->state->current_set_element_count;
1078 GNUNET_assert (NULL == op->state->my_elements); 1144 state->my_elements
1079 op->state->my_elements 1145 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1080 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count, 1146 op->remote_element_count),
1081 op->spec->remote_element_count),
1082 GNUNET_YES); 1147 GNUNET_YES);
1083 if (op->spec->remote_element_count < op->state->my_element_count) 1148 op->state = state;
1149 if (op->remote_element_count < state->my_element_count)
1084 { 1150 {
1085 /* If the other peer (Alice) has fewer elements than us (Bob), 1151 /* If the other peer (Alice) has fewer elements than us (Bob),
1086 we just send the count as Alice should send the first BF */ 1152 we just send the count as Alice should send the first BF */
1087 send_element_count (op); 1153 send_element_count (op);
1088 op->state->phase = PHASE_COUNT_SENT; 1154 state->phase = PHASE_COUNT_SENT;
1089 return; 1155 return state;
1090 } 1156 }
1091 /* We have fewer elements, so we start with the BF */ 1157 /* We have fewer elements, so we start with the BF */
1092 begin_bf_exchange (op); 1158 begin_bf_exchange (op);
1093} 1159 return state;
1094
1095
1096/**
1097 * Handler for peer-disconnects, notifies the client about the aborted
1098 * operation. If we did not expect anything from the other peer, we
1099 * gracefully terminate the operation.
1100 *
1101 * @param op the destroyed operation
1102 */
1103static void
1104intersection_peer_disconnect (struct Operation *op)
1105{
1106 if (PHASE_FINISHED != op->state->phase)
1107 {
1108 fail_intersection_operation (op);
1109 return;
1110 }
1111 /* the session has already been concluded */
1112 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113 "Other peer disconnected (finished)\n");
1114 if (GNUNET_NO == op->state->client_done_sent)
1115 finish_and_destroy (op);
1116} 1160}
1117 1161
1118 1162
@@ -1215,6 +1259,28 @@ intersection_remove (struct SetState *set_state,
1215 1259
1216 1260
1217/** 1261/**
1262 * Callback for channel death for the intersection operation.
1263 *
1264 * @param op operation that lost the channel
1265 */
1266static void
1267intersection_channel_death (struct Operation *op)
1268{
1269 if (GNUNET_YES == op->state->channel_death_expected)
1270 {
1271 /* oh goodie, we are done! */
1272 send_client_done_and_destroy (op);
1273 }
1274 else
1275 {
1276 /* sorry, channel went down early, too bad. */
1277 _GSS_operation_destroy (op,
1278 GNUNET_YES);
1279 }
1280}
1281
1282
1283/**
1218 * Get the table with implementing functions for set intersection. 1284 * Get the table with implementing functions for set intersection.
1219 * 1285 *
1220 * @return the operation specific VTable 1286 * @return the operation specific VTable
@@ -1229,8 +1295,8 @@ _GSS_intersection_vt ()
1229 .destroy_set = &intersection_set_destroy, 1295 .destroy_set = &intersection_set_destroy,
1230 .evaluate = &intersection_evaluate, 1296 .evaluate = &intersection_evaluate,
1231 .accept = &intersection_accept, 1297 .accept = &intersection_accept,
1232 .peer_disconnect = &intersection_peer_disconnect,
1233 .cancel = &intersection_op_cancel, 1298 .cancel = &intersection_op_cancel,
1299 .channel_death = &intersection_channel_death,
1234 }; 1300 };
1235 1301
1236 return &intersection_vt; 1302 return &intersection_vt;