aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_intersection.c
diff options
context:
space:
mode:
authorSchanzenbach, Martin <mschanzenbach@posteo.de>2017-03-29 14:26:33 +0200
committerSchanzenbach, Martin <mschanzenbach@posteo.de>2017-03-29 14:26:33 +0200
commitab281595eeb270120f89ec954a572f4fcf78fc53 (patch)
tree335a2caf503596adc400c5ebb9fb742f097bc5a3 /src/set/gnunet-service-set_intersection.c
parent59d393a1124cfd1aaffdf994bf6f8a9baaac8361 (diff)
parent2b87f173e360aaf4a3bac3fbc6e5b4dc44cf58cd (diff)
downloadgnunet-ab281595eeb270120f89ec954a572f4fcf78fc53.tar.gz
gnunet-ab281595eeb270120f89ec954a572f4fcf78fc53.zip
- merge with master
Diffstat (limited to 'src/set/gnunet-service-set_intersection.c')
-rw-r--r--src/set/gnunet-service-set_intersection.c652
1 files changed, 348 insertions, 304 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 258ad6443..9dc421792 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2013, 2014 GNUnet e.V. 3 Copyright (C) 2013-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -28,6 +28,7 @@
28#include "gnunet-service-set.h" 28#include "gnunet-service-set.h"
29#include "gnunet_block_lib.h" 29#include "gnunet_block_lib.h"
30#include "gnunet-service-set_protocol.h" 30#include "gnunet-service-set_protocol.h"
31#include "gnunet-service-set_intersection.h"
31#include <gcrypt.h> 32#include <gcrypt.h>
32 33
33 34
@@ -55,6 +56,18 @@ enum IntersectionOperationPhase
55 PHASE_BF_EXCHANGE, 56 PHASE_BF_EXCHANGE,
56 57
57 /** 58 /**
59 * We must next send the P2P DONE message (after finishing mostly
60 * with the local client). Then we will wait for the channel to close.
61 */
62 PHASE_MUST_SEND_DONE,
63
64 /**
65 * We have received the P2P DONE message, and must finish with the
66 * local client before terminating the channel.
67 */
68 PHASE_DONE_RECEIVED,
69
70 /**
58 * The protocol is over. Results may still have to be sent to the 71 * The protocol is over. Results may still have to be sent to the
59 * client. 72 * client.
60 */ 73 */
@@ -161,6 +174,13 @@ struct OperationState
161 * Did we send the client that we are done? 174 * Did we send the client that we are done?
162 */ 175 */
163 int client_done_sent; 176 int client_done_sent;
177
178 /**
179 * Set whenever we reach the state where the death of the
180 * channel is perfectly find and should NOT result in the
181 * operation being cancelled.
182 */
183 int channel_death_expected;
164}; 184};
165 185
166 186
@@ -192,12 +212,12 @@ send_client_removed_element (struct Operation *op,
192 struct GNUNET_MQ_Envelope *ev; 212 struct GNUNET_MQ_Envelope *ev;
193 struct GNUNET_SET_ResultMessage *rm; 213 struct GNUNET_SET_ResultMessage *rm;
194 214
195 if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode) 215 if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
196 return; /* Wrong mode for transmitting removed elements */ 216 return; /* Wrong mode for transmitting removed elements */
197 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
198 "Sending removed element (size %u) to client\n", 218 "Sending removed element (size %u) to client\n",
199 element->size); 219 element->size);
200 GNUNET_assert (0 != op->spec->client_request_id); 220 GNUNET_assert (0 != op->client_request_id);
201 ev = GNUNET_MQ_msg_extra (rm, 221 ev = GNUNET_MQ_msg_extra (rm,
202 element->size, 222 element->size,
203 GNUNET_MESSAGE_TYPE_SET_RESULT); 223 GNUNET_MESSAGE_TYPE_SET_RESULT);
@@ -207,12 +227,12 @@ send_client_removed_element (struct Operation *op,
207 return; 227 return;
208 } 228 }
209 rm->result_status = htons (GNUNET_SET_STATUS_OK); 229 rm->result_status = htons (GNUNET_SET_STATUS_OK);
210 rm->request_id = htonl (op->spec->client_request_id); 230 rm->request_id = htonl (op->client_request_id);
211 rm->element_type = element->element_type; 231 rm->element_type = element->element_type;
212 GNUNET_memcpy (&rm[1], 232 GNUNET_memcpy (&rm[1],
213 element->data, 233 element->data,
214 element->size); 234 element->size);
215 GNUNET_MQ_send (op->spec->set->client_mq, 235 GNUNET_MQ_send (op->set->cs->mq,
216 ev); 236 ev);
217} 237}
218 238
@@ -396,9 +416,9 @@ fail_intersection_operation (struct Operation *op)
396 ev = GNUNET_MQ_msg (msg, 416 ev = GNUNET_MQ_msg (msg,
397 GNUNET_MESSAGE_TYPE_SET_RESULT); 417 GNUNET_MESSAGE_TYPE_SET_RESULT);
398 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 418 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
399 msg->request_id = htonl (op->spec->client_request_id); 419 msg->request_id = htonl (op->client_request_id);
400 msg->element_type = htons (0); 420 msg->element_type = htons (0);
401 GNUNET_MQ_send (op->spec->set->client_mq, 421 GNUNET_MQ_send (op->set->cs->mq,
402 ev); 422 ev);
403 _GSS_operation_destroy (op, 423 _GSS_operation_destroy (op,
404 GNUNET_YES); 424 GNUNET_YES);
@@ -427,8 +447,8 @@ send_bloomfilter (struct Operation *op)
427 should use more bits to maximize its set reduction 447 should use more bits to maximize its set reduction
428 potential and minimize overall bandwidth consumption. */ 448 potential and minimize overall bandwidth consumption. */
429 bf_elementbits = 2 + ceil (log2((double) 449 bf_elementbits = 2 + ceil (log2((double)
430 (op->spec->remote_element_count / 450 (op->remote_element_count /
431 (double) op->state->my_element_count))); 451 (double) op->state->my_element_count)));
432 if (bf_elementbits < 1) 452 if (bf_elementbits < 1)
433 bf_elementbits = 1; /* make sure k is not 0 */ 453 bf_elementbits = 1; /* make sure k is not 0 */
434 /* optimize BF-size to ~50% of bits set */ 454 /* optimize BF-size to ~50% of bits set */
@@ -514,12 +534,14 @@ send_client_done_and_destroy (void *cls)
514 struct GNUNET_MQ_Envelope *ev; 534 struct GNUNET_MQ_Envelope *ev;
515 struct GNUNET_SET_ResultMessage *rm; 535 struct GNUNET_SET_ResultMessage *rm;
516 536
537 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538 "Intersection succeeded, sending DONE to local client\n");
517 ev = GNUNET_MQ_msg (rm, 539 ev = GNUNET_MQ_msg (rm,
518 GNUNET_MESSAGE_TYPE_SET_RESULT); 540 GNUNET_MESSAGE_TYPE_SET_RESULT);
519 rm->request_id = htonl (op->spec->client_request_id); 541 rm->request_id = htonl (op->client_request_id);
520 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 542 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
521 rm->element_type = htons (0); 543 rm->element_type = htons (0);
522 GNUNET_MQ_send (op->spec->set->client_mq, 544 GNUNET_MQ_send (op->set->cs->mq,
523 ev); 545 ev);
524 _GSS_operation_destroy (op, 546 _GSS_operation_destroy (op,
525 GNUNET_YES); 547 GNUNET_YES);
@@ -527,6 +549,53 @@ send_client_done_and_destroy (void *cls)
527 549
528 550
529/** 551/**
552 * Remember that we are done dealing with the local client
553 * AND have sent the other peer our message that we are done,
554 * so we are not just waiting for the channel to die before
555 * telling the local client that we are done as our last act.
556 *
557 * @param cls the `struct Operation`.
558 */
559static void
560finished_local_operations (void *cls)
561{
562 struct Operation *op = cls;
563
564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565 "DONE sent to other peer, now waiting for other end to close the channel\n");
566 op->state->phase = PHASE_FINISHED;
567 op->state->channel_death_expected = GNUNET_YES;
568}
569
570
571/**
572 * Notify the other peer that we are done. Once this message
573 * is out, we still need to notify the local client that we
574 * are done.
575 *
576 * @param op operation to notify for.
577 */
578static void
579send_p2p_done (struct Operation *op)
580{
581 struct GNUNET_MQ_Envelope *ev;
582 struct IntersectionDoneMessage *idm;
583
584 GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
585 GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
586 ev = GNUNET_MQ_msg (idm,
587 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
588 idm->final_element_count = htonl (op->state->my_element_count);
589 idm->element_xor_hash = op->state->my_xor;
590 GNUNET_MQ_notify_sent (ev,
591 &finished_local_operations,
592 op);
593 GNUNET_MQ_send (op->mq,
594 ev);
595}
596
597
598/**
530 * Send all elements in the full result iterator. 599 * Send all elements in the full result iterator.
531 * 600 *
532 * @param cls the `struct Operation *` 601 * @param cls the `struct Operation *`
@@ -549,8 +618,21 @@ send_remaining_elements (void *cls)
549 { 618 {
550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
551 "Sending done and destroy because iterator ran out\n"); 620 "Sending done and destroy because iterator ran out\n");
552 op->keep--; 621 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
553 send_client_done_and_destroy (op); 622 op->state->full_result_iter = NULL;
623 if (PHASE_DONE_RECEIVED == op->state->phase)
624 {
625 op->state->phase = PHASE_FINISHED;
626 send_client_done_and_destroy (op);
627 }
628 else if (PHASE_MUST_SEND_DONE == op->state->phase)
629 {
630 send_p2p_done (op);
631 }
632 else
633 {
634 GNUNET_assert (0);
635 }
554 return; 636 return;
555 } 637 }
556 ee = nxt; 638 ee = nxt;
@@ -559,48 +641,136 @@ send_remaining_elements (void *cls)
559 "Sending element %s:%u to client (full set)\n", 641 "Sending element %s:%u to client (full set)\n",
560 GNUNET_h2s (&ee->element_hash), 642 GNUNET_h2s (&ee->element_hash),
561 element->size); 643 element->size);
562 GNUNET_assert (0 != op->spec->client_request_id); 644 GNUNET_assert (0 != op->client_request_id);
563 ev = GNUNET_MQ_msg_extra (rm, 645 ev = GNUNET_MQ_msg_extra (rm,
564 element->size, 646 element->size,
565 GNUNET_MESSAGE_TYPE_SET_RESULT); 647 GNUNET_MESSAGE_TYPE_SET_RESULT);
566 GNUNET_assert (NULL != ev); 648 GNUNET_assert (NULL != ev);
567 rm->result_status = htons (GNUNET_SET_STATUS_OK); 649 rm->result_status = htons (GNUNET_SET_STATUS_OK);
568 rm->request_id = htonl (op->spec->client_request_id); 650 rm->request_id = htonl (op->client_request_id);
569 rm->element_type = element->element_type; 651 rm->element_type = element->element_type;
570 GNUNET_memcpy (&rm[1], 652 GNUNET_memcpy (&rm[1],
571 element->data, 653 element->data,
572 element->size); 654 element->size);
573 GNUNET_MQ_notify_sent (ev, 655 GNUNET_MQ_notify_sent (ev,
574 &send_remaining_elements, 656 &send_remaining_elements,
575 op); 657 op);
576 GNUNET_MQ_send (op->spec->set->client_mq, 658 GNUNET_MQ_send (op->set->cs->mq,
577 ev); 659 ev);
578} 660}
579 661
580 662
581/** 663/**
582 * Inform the peer that this operation is complete. 664 * Fills the "my_elements" hashmap with the initial set of
665 * (non-deleted) elements from the set of the specification.
583 * 666 *
584 * @param op the intersection operation to fail 667 * @param cls closure with the `struct Operation *`
668 * @param key current key code for the element
669 * @param value value in the hash map with the `struct ElementEntry *`
670 * @return #GNUNET_YES (we should continue to iterate)
671 */
672static int
673initialize_map_unfiltered (void *cls,
674 const struct GNUNET_HashCode *key,
675 void *value)
676{
677 struct ElementEntry *ee = value;
678 struct Operation *op = cls;
679
680 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
681 return GNUNET_YES; /* element not live in operation's generation */
682 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
683 &ee->element_hash,
684 &op->state->my_xor);
685 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686 "Initial full initialization of my_elements, adding %s:%u\n",
687 GNUNET_h2s (&ee->element_hash),
688 ee->element.size);
689 GNUNET_break (GNUNET_YES ==
690 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
691 &ee->element_hash,
692 ee,
693 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
694 return GNUNET_YES;
695}
696
697
698/**
699 * Send our element count to the peer, in case our element count is
700 * lower than his.
701 *
702 * @param op intersection operation
585 */ 703 */
586static void 704static void
587send_peer_done (struct Operation *op) 705send_element_count (struct Operation *op)
588{ 706{
589 struct GNUNET_MQ_Envelope *ev; 707 struct GNUNET_MQ_Envelope *ev;
590 struct IntersectionDoneMessage *idm; 708 struct IntersectionElementInfoMessage *msg;
591 709
592 op->state->phase = PHASE_FINISHED;
593 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594 "Intersection succeeded, sending DONE\n"); 711 "Sending our element count (%u)\n",
595 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); 712 op->state->my_element_count);
596 op->state->local_bf = NULL; 713 ev = GNUNET_MQ_msg (msg,
714 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
715 msg->sender_element_count = htonl (op->state->my_element_count);
716 GNUNET_MQ_send (op->mq, ev);
717}
597 718
598 ev = GNUNET_MQ_msg (idm, 719
599 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); 720/**
600 idm->final_element_count = htonl (op->state->my_element_count); 721 * We go first, initialize our map with all elements and
601 idm->element_xor_hash = op->state->my_xor; 722 * send the first Bloom filter.
602 GNUNET_MQ_send (op->mq, 723 *
603 ev); 724 * @param op operation to start exchange for
725 */
726static void
727begin_bf_exchange (struct Operation *op)
728{
729 op->state->phase = PHASE_BF_EXCHANGE;
730 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
731 &initialize_map_unfiltered,
732 op);
733 send_bloomfilter (op);
734}
735
736
737/**
738 * Handle the initial `struct IntersectionElementInfoMessage` from a
739 * remote peer.
740 *
741 * @param cls the intersection operation
742 * @param mh the header of the message
743 */
744void
745handle_intersection_p2p_element_info (void *cls,
746 const struct IntersectionElementInfoMessage *msg)
747{
748 struct Operation *op = cls;
749
750 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
751 {
752 GNUNET_break_op (0);
753 fail_intersection_operation(op);
754 return;
755 }
756 op->remote_element_count = ntohl (msg->sender_element_count);
757 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
758 "Received remote element count (%u), I have %u\n",
759 op->remote_element_count,
760 op->state->my_element_count);
761 if ( ( (PHASE_INITIAL != op->state->phase) &&
762 (PHASE_COUNT_SENT != op->state->phase) ) ||
763 (op->state->my_element_count > op->remote_element_count) ||
764 (0 == op->state->my_element_count) ||
765 (0 == op->remote_element_count) )
766 {
767 GNUNET_break_op (0);
768 fail_intersection_operation(op);
769 return;
770 }
771 GNUNET_break (NULL == op->state->remote_bf);
772 begin_bf_exchange (op);
773 GNUNET_CADET_receive_done (op->channel);
604} 774}
605 775
606 776
@@ -615,9 +785,9 @@ process_bf (struct Operation *op)
615 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", 786 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
617 op->state->phase, 787 op->state->phase,
618 op->spec->remote_element_count, 788 op->remote_element_count,
619 op->state->my_element_count, 789 op->state->my_element_count,
620 GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); 790 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
621 switch (op->state->phase) 791 switch (op->state->phase)
622 { 792 {
623 case PHASE_INITIAL: 793 case PHASE_INITIAL:
@@ -627,11 +797,8 @@ process_bf (struct Operation *op)
627 case PHASE_COUNT_SENT: 797 case PHASE_COUNT_SENT:
628 /* This is the first BF being sent, build our initial map with 798 /* This is the first BF being sent, build our initial map with
629 filtering in place */ 799 filtering in place */
630 op->state->my_elements
631 = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
632 GNUNET_YES);
633 op->state->my_element_count = 0; 800 op->state->my_element_count = 0;
634 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, 801 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
635 &filtered_map_initialization, 802 &filtered_map_initialization,
636 op); 803 op);
637 break; 804 break;
@@ -641,6 +808,14 @@ process_bf (struct Operation *op)
641 &iterator_bf_reduce, 808 &iterator_bf_reduce,
642 op); 809 op);
643 break; 810 break;
811 case PHASE_MUST_SEND_DONE:
812 GNUNET_break_op (0);
813 fail_intersection_operation(op);
814 return;
815 case PHASE_DONE_RECEIVED:
816 GNUNET_break_op (0);
817 fail_intersection_operation(op);
818 return;
644 case PHASE_FINISHED: 819 case PHASE_FINISHED:
645 GNUNET_break_op (0); 820 GNUNET_break_op (0);
646 fail_intersection_operation(op); 821 fail_intersection_operation(op);
@@ -650,13 +825,28 @@ process_bf (struct Operation *op)
650 op->state->remote_bf = NULL; 825 op->state->remote_bf = NULL;
651 826
652 if ( (0 == op->state->my_element_count) || /* fully disjoint */ 827 if ( (0 == op->state->my_element_count) || /* fully disjoint */
653 ( (op->state->my_element_count == op->spec->remote_element_count) && 828 ( (op->state->my_element_count == op->remote_element_count) &&
654 (0 == memcmp (&op->state->my_xor, 829 (0 == memcmp (&op->state->my_xor,
655 &op->state->other_xor, 830 &op->state->other_xor,
656 sizeof (struct GNUNET_HashCode))) ) ) 831 sizeof (struct GNUNET_HashCode))) ) )
657 { 832 {
658 /* we are done */ 833 /* we are done */
659 send_peer_done (op); 834 op->state->phase = PHASE_MUST_SEND_DONE;
835 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836 "Intersection succeeded, sending DONE to other peer\n");
837 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
838 op->state->local_bf = NULL;
839 if (GNUNET_SET_RESULT_FULL == op->result_mode)
840 {
841 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842 "Sending full result set (%u elements)\n",
843 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
844 op->state->full_result_iter
845 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
846 send_remaining_elements (op);
847 return;
848 }
849 send_p2p_done (op);
660 return; 850 return;
661 } 851 }
662 op->state->phase = PHASE_BF_EXCHANGE; 852 op->state->phase = PHASE_BF_EXCHANGE;
@@ -665,41 +855,53 @@ process_bf (struct Operation *op)
665 855
666 856
667/** 857/**
858 * Check an BF message from a remote peer.
859 *
860 * @param cls the intersection operation
861 * @param msg the header of the message
862 * @return #GNUNET_OK if @a msg is well-formed
863 */
864int
865check_intersection_p2p_bf (void *cls,
866 const struct BFMessage *msg)
867{
868 struct Operation *op = cls;
869
870 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
871 {
872 GNUNET_break_op (0);
873 return GNUNET_SYSERR;
874 }
875 return GNUNET_OK;
876}
877
878
879/**
668 * Handle an BF message from a remote peer. 880 * Handle an BF message from a remote peer.
669 * 881 *
670 * @param cls the intersection operation 882 * @param cls the intersection operation
671 * @param mh the header of the message 883 * @param msg the header of the message
672 */ 884 */
673static void 885void
674handle_p2p_bf (void *cls, 886handle_intersection_p2p_bf (void *cls,
675 const struct GNUNET_MessageHeader *mh) 887 const struct BFMessage *msg)
676{ 888{
677 struct Operation *op = cls; 889 struct Operation *op = cls;
678 const struct BFMessage *msg;
679 uint32_t bf_size; 890 uint32_t bf_size;
680 uint32_t chunk_size; 891 uint32_t chunk_size;
681 uint32_t bf_bits_per_element; 892 uint32_t bf_bits_per_element;
682 uint16_t msize;
683 893
684 msize = htons (mh->size);
685 if (msize < sizeof (struct BFMessage))
686 {
687 GNUNET_break_op (0);
688 fail_intersection_operation (op);
689 return;
690 }
691 msg = (const struct BFMessage *) mh;
692 switch (op->state->phase) 894 switch (op->state->phase)
693 { 895 {
694 case PHASE_INITIAL: 896 case PHASE_INITIAL:
695 GNUNET_break_op (0); 897 GNUNET_break_op (0);
696 fail_intersection_operation (op); 898 fail_intersection_operation (op);
697 break; 899 return;
698 case PHASE_COUNT_SENT: 900 case PHASE_COUNT_SENT:
699 case PHASE_BF_EXCHANGE: 901 case PHASE_BF_EXCHANGE:
700 bf_size = ntohl (msg->bloomfilter_total_length); 902 bf_size = ntohl (msg->bloomfilter_total_length);
701 bf_bits_per_element = ntohl (msg->bits_per_element); 903 bf_bits_per_element = ntohl (msg->bits_per_element);
702 chunk_size = msize - sizeof (struct BFMessage); 904 chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
703 op->state->other_xor = msg->element_xor_hash; 905 op->state->other_xor = msg->element_xor_hash;
704 if (bf_size == chunk_size) 906 if (bf_size == chunk_size)
705 { 907 {
@@ -715,9 +917,9 @@ handle_p2p_bf (void *cls,
715 bf_size, 917 bf_size,
716 bf_bits_per_element); 918 bf_bits_per_element);
717 op->state->salt = ntohl (msg->sender_mutator); 919 op->state->salt = ntohl (msg->sender_mutator);
718 op->spec->remote_element_count = ntohl (msg->sender_element_count); 920 op->remote_element_count = ntohl (msg->sender_element_count);
719 process_bf (op); 921 process_bf (op);
720 return; 922 break;
721 } 923 }
722 /* multipart chunk */ 924 /* multipart chunk */
723 if (NULL == op->state->bf_data) 925 if (NULL == op->state->bf_data)
@@ -728,7 +930,7 @@ handle_p2p_bf (void *cls,
728 op->state->bf_bits_per_element = bf_bits_per_element; 930 op->state->bf_bits_per_element = bf_bits_per_element;
729 op->state->bf_data_offset = 0; 931 op->state->bf_data_offset = 0;
730 op->state->salt = ntohl (msg->sender_mutator); 932 op->state->salt = ntohl (msg->sender_mutator);
731 op->spec->remote_element_count = ntohl (msg->sender_element_count); 933 op->remote_element_count = ntohl (msg->sender_element_count);
732 } 934 }
733 else 935 else
734 { 936 {
@@ -737,7 +939,7 @@ handle_p2p_bf (void *cls,
737 (op->state->bf_bits_per_element != bf_bits_per_element) || 939 (op->state->bf_bits_per_element != bf_bits_per_element) ||
738 (op->state->bf_data_offset + chunk_size > bf_size) || 940 (op->state->bf_data_offset + chunk_size > bf_size) ||
739 (op->state->salt != ntohl (msg->sender_mutator)) || 941 (op->state->salt != ntohl (msg->sender_mutator)) ||
740 (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) 942 (op->remote_element_count != ntohl (msg->sender_element_count)) )
741 { 943 {
742 GNUNET_break_op (0); 944 GNUNET_break_op (0);
743 fail_intersection_operation (op); 945 fail_intersection_operation (op);
@@ -764,153 +966,9 @@ handle_p2p_bf (void *cls,
764 default: 966 default:
765 GNUNET_break_op (0); 967 GNUNET_break_op (0);
766 fail_intersection_operation (op); 968 fail_intersection_operation (op);
767 break;
768 }
769}
770
771
772/**
773 * Fills the "my_elements" hashmap with the initial set of
774 * (non-deleted) elements from the set of the specification.
775 *
776 * @param cls closure with the `struct Operation *`
777 * @param key current key code for the element
778 * @param value value in the hash map with the `struct ElementEntry *`
779 * @return #GNUNET_YES (we should continue to iterate)
780 */
781static int
782initialize_map_unfiltered (void *cls,
783 const struct GNUNET_HashCode *key,
784 void *value)
785{
786 struct ElementEntry *ee = value;
787 struct Operation *op = cls;
788
789 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
790 return GNUNET_YES; /* element not live in operation's generation */
791 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
792 &ee->element_hash,
793 &op->state->my_xor);
794 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795 "Initial full initialization of my_elements, adding %s:%u\n",
796 GNUNET_h2s (&ee->element_hash),
797 ee->element.size);
798 GNUNET_break (GNUNET_YES ==
799 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
800 &ee->element_hash,
801 ee,
802 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
803 return GNUNET_YES;
804}
805
806
807/**
808 * Send our element count to the peer, in case our element count is
809 * lower than his.
810 *
811 * @param op intersection operation
812 */
813static void
814send_element_count (struct Operation *op)
815{
816 struct GNUNET_MQ_Envelope *ev;
817 struct IntersectionElementInfoMessage *msg;
818
819 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
820 "Sending our element count (%u)\n",
821 op->state->my_element_count);
822 ev = GNUNET_MQ_msg (msg,
823 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
824 msg->sender_element_count = htonl (op->state->my_element_count);
825 GNUNET_MQ_send (op->mq, ev);
826}
827
828
829/**
830 * We go first, initialize our map with all elements and
831 * send the first Bloom filter.
832 *
833 * @param op operation to start exchange for
834 */
835static void
836begin_bf_exchange (struct Operation *op)
837{
838 op->state->phase = PHASE_BF_EXCHANGE;
839 op->state->my_elements
840 = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
841 GNUNET_YES);
842 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
843 &initialize_map_unfiltered,
844 op);
845 send_bloomfilter (op);
846}
847
848
849/**
850 * Handle the initial `struct IntersectionElementInfoMessage` from a
851 * remote peer.
852 *
853 * @param cls the intersection operation
854 * @param mh the header of the message
855 */
856static void
857handle_p2p_element_info (void *cls,
858 const struct GNUNET_MessageHeader *mh)
859{
860 struct Operation *op = cls;
861 const struct IntersectionElementInfoMessage *msg;
862
863 if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
864 {
865 GNUNET_break_op (0);
866 fail_intersection_operation(op);
867 return;
868 }
869 msg = (const struct IntersectionElementInfoMessage *) mh;
870 op->spec->remote_element_count = ntohl (msg->sender_element_count);
871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872 "Received remote element count (%u), I have %u\n",
873 op->spec->remote_element_count,
874 op->state->my_element_count);
875 if ( ( (PHASE_INITIAL != op->state->phase) &&
876 (PHASE_COUNT_SENT != op->state->phase) ) ||
877 (op->state->my_element_count > op->spec->remote_element_count) ||
878 (0 == op->state->my_element_count) ||
879 (0 == op->spec->remote_element_count) )
880 {
881 GNUNET_break_op (0);
882 fail_intersection_operation(op);
883 return;
884 }
885 GNUNET_break (NULL == op->state->remote_bf);
886 begin_bf_exchange (op);
887}
888
889
890/**
891 * Send a result message to the client indicating that the operation
892 * is over. After the result done message has been sent to the
893 * client, destroy the evaluate operation.
894 *
895 * @param op intersection operation
896 */
897static void
898finish_and_destroy (struct Operation *op)
899{
900 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
901
902 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
903 {
904 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
905 "Sending full result set (%u elements)\n",
906 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
907 op->state->full_result_iter
908 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
909 op->keep++;
910 send_remaining_elements (op);
911 return; 969 return;
912 } 970 }
913 send_client_done_and_destroy (op); 971 GNUNET_CADET_receive_done (op->channel);
914} 972}
915 973
916 974
@@ -955,28 +1013,26 @@ filter_all (void *cls,
955 * @param cls the intersection operation 1013 * @param cls the intersection operation
956 * @param mh the message 1014 * @param mh the message
957 */ 1015 */
958static void 1016void
959handle_p2p_done (void *cls, 1017handle_intersection_p2p_done (void *cls,
960 const struct GNUNET_MessageHeader *mh) 1018 const struct IntersectionDoneMessage *idm)
961{ 1019{
962 struct Operation *op = cls; 1020 struct Operation *op = cls;
963 const struct IntersectionDoneMessage *idm;
964 1021
965 if (PHASE_BF_EXCHANGE != op->state->phase) 1022 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
966 { 1023 {
967 /* wrong phase to conclude? FIXME: Or should we allow this
968 if the other peer has _initially_ already an empty set? */
969 GNUNET_break_op (0); 1024 GNUNET_break_op (0);
970 fail_intersection_operation (op); 1025 fail_intersection_operation (op);
971 return; 1026 return;
972 } 1027 }
973 if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage)) 1028 if (PHASE_BF_EXCHANGE != op->state->phase)
974 { 1029 {
1030 /* wrong phase to conclude? FIXME: Or should we allow this
1031 if the other peer has _initially_ already an empty set? */
975 GNUNET_break_op (0); 1032 GNUNET_break_op (0);
976 fail_intersection_operation (op); 1033 fail_intersection_operation (op);
977 return; 1034 return;
978 } 1035 }
979 idm = (const struct IntersectionDoneMessage *) mh;
980 if (0 == ntohl (idm->final_element_count)) 1036 if (0 == ntohl (idm->final_element_count))
981 { 1037 {
982 /* other peer determined empty set is the intersection, 1038 /* other peer determined empty set is the intersection,
@@ -998,8 +1054,22 @@ handle_p2p_done (void *cls,
998 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1054 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
999 "Got IntersectionDoneMessage, have %u elements in intersection\n", 1055 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1000 op->state->my_element_count); 1056 op->state->my_element_count);
1057 op->state->phase = PHASE_DONE_RECEIVED;
1058 GNUNET_CADET_receive_done (op->channel);
1059
1060 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1061 if (GNUNET_SET_RESULT_FULL == op->result_mode)
1062 {
1063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064 "Sending full result set to client (%u elements)\n",
1065 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1066 op->state->full_result_iter
1067 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
1068 send_remaining_elements (op);
1069 return;
1070 }
1001 op->state->phase = PHASE_FINISHED; 1071 op->state->phase = PHASE_FINISHED;
1002 finish_and_destroy (op); 1072 send_client_done_and_destroy (op);
1003} 1073}
1004 1074
1005 1075
@@ -1010,21 +1080,16 @@ handle_p2p_done (void *cls,
1010 * begin the evaluation 1080 * begin the evaluation
1011 * @param opaque_context message to be transmitted to the listener 1081 * @param opaque_context message to be transmitted to the listener
1012 * to convince him to accept, may be NULL 1082 * to convince him to accept, may be NULL
1083 * @return operation-specific state to keep in @a op
1013 */ 1084 */
1014static void 1085static struct OperationState *
1015intersection_evaluate (struct Operation *op, 1086intersection_evaluate (struct Operation *op,
1016 const struct GNUNET_MessageHeader *opaque_context) 1087 const struct GNUNET_MessageHeader *opaque_context)
1017{ 1088{
1089 struct OperationState *state;
1018 struct GNUNET_MQ_Envelope *ev; 1090 struct GNUNET_MQ_Envelope *ev;
1019 struct OperationRequestMessage *msg; 1091 struct OperationRequestMessage *msg;
1020 1092
1021 op->state = GNUNET_new (struct OperationState);
1022 /* we started the operation, thus we have to send the operation request */
1023 op->state->phase = PHASE_INITIAL;
1024 op->state->my_element_count = op->spec->set->state->current_set_element_count;
1025
1026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027 "Initiating intersection operation evaluation\n");
1028 ev = GNUNET_MQ_msg_nested_mh (msg, 1093 ev = GNUNET_MQ_msg_nested_mh (msg,
1029 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1094 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1030 opaque_context); 1095 opaque_context);
@@ -1032,20 +1097,30 @@ intersection_evaluate (struct Operation *op,
1032 { 1097 {
1033 /* the context message is too large!? */ 1098 /* the context message is too large!? */
1034 GNUNET_break (0); 1099 GNUNET_break (0);
1035 GNUNET_SERVER_client_disconnect (op->spec->set->client); 1100 return NULL;
1036 return;
1037 } 1101 }
1102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103 "Initiating intersection operation evaluation\n");
1104 state = GNUNET_new (struct OperationState);
1105 /* we started the operation, thus we have to send the operation request */
1106 state->phase = PHASE_INITIAL;
1107 state->my_element_count = op->set->state->current_set_element_count;
1108 state->my_elements
1109 = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1110 GNUNET_YES);
1111
1038 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); 1112 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1039 msg->element_count = htonl (op->state->my_element_count); 1113 msg->element_count = htonl (state->my_element_count);
1040 GNUNET_MQ_send (op->mq, 1114 GNUNET_MQ_send (op->mq,
1041 ev); 1115 ev);
1042 op->state->phase = PHASE_COUNT_SENT; 1116 state->phase = PHASE_COUNT_SENT;
1043 if (NULL != opaque_context) 1117 if (NULL != opaque_context)
1044 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1118 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1045 "Sent op request with context message\n"); 1119 "Sent op request with context message\n");
1046 else 1120 else
1047 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1121 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048 "Sent op request without context message\n"); 1122 "Sent op request without context message\n");
1123 return state;
1049} 1124}
1050 1125
1051 1126
@@ -1055,90 +1130,33 @@ intersection_evaluate (struct Operation *op,
1055 * 1130 *
1056 * @param op operation that will be accepted as an intersection operation 1131 * @param op operation that will be accepted as an intersection operation
1057 */ 1132 */
1058static void 1133static struct OperationState *
1059intersection_accept (struct Operation *op) 1134intersection_accept (struct Operation *op)
1060{ 1135{
1136 struct OperationState *state;
1137
1061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062 "Accepting set intersection operation\n"); 1139 "Accepting set intersection operation\n");
1063 op->state = GNUNET_new (struct OperationState); 1140 state = GNUNET_new (struct OperationState);
1064 op->state->phase = PHASE_INITIAL; 1141 state->phase = PHASE_INITIAL;
1065 op->state->my_element_count 1142 state->my_element_count
1066 = op->spec->set->state->current_set_element_count; 1143 = op->set->state->current_set_element_count;
1067 op->state->my_elements 1144 state->my_elements
1068 = GNUNET_CONTAINER_multihashmap_create 1145 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1069 (GNUNET_MIN (op->state->my_element_count, 1146 op->remote_element_count),
1070 op->spec->remote_element_count), 1147 GNUNET_YES);
1071 GNUNET_YES); 1148 op->state = state;
1072 if (op->spec->remote_element_count < op->state->my_element_count) 1149 if (op->remote_element_count < state->my_element_count)
1073 { 1150 {
1074 /* If the other peer (Alice) has fewer elements than us (Bob), 1151 /* If the other peer (Alice) has fewer elements than us (Bob),
1075 we just send the count as Alice should send the first BF */ 1152 we just send the count as Alice should send the first BF */
1076 send_element_count (op); 1153 send_element_count (op);
1077 op->state->phase = PHASE_COUNT_SENT; 1154 state->phase = PHASE_COUNT_SENT;
1078 return; 1155 return state;
1079 } 1156 }
1080 /* We have fewer elements, so we start with the BF */ 1157 /* We have fewer elements, so we start with the BF */
1081 begin_bf_exchange (op); 1158 begin_bf_exchange (op);
1082} 1159 return state;
1083
1084
1085/**
1086 * Dispatch messages for a intersection operation.
1087 *
1088 * @param op the state of the intersection evaluate operation
1089 * @param mh the received message
1090 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1091 * #GNUNET_OK otherwise
1092 */
1093static int
1094intersection_handle_p2p_message (struct Operation *op,
1095 const struct GNUNET_MessageHeader *mh)
1096{
1097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098 "Received p2p message (t: %u, s: %u)\n",
1099 ntohs (mh->type), ntohs (mh->size));
1100 switch (ntohs (mh->type))
1101 {
1102 /* this message handler is not active until after we received an
1103 * operation request message, thus the ops request is not handled here
1104 */
1105 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1106 handle_p2p_element_info (op, mh);
1107 break;
1108 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1109 handle_p2p_bf (op, mh);
1110 break;
1111 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
1112 handle_p2p_done (op, mh);
1113 break;
1114 default:
1115 /* something wrong with cadet's message handlers? */
1116 GNUNET_assert (0);
1117 }
1118 return GNUNET_OK;
1119}
1120
1121
1122/**
1123 * Handler for peer-disconnects, notifies the client about the aborted
1124 * operation. If we did not expect anything from the other peer, we
1125 * gracefully terminate the operation.
1126 *
1127 * @param op the destroyed operation
1128 */
1129static void
1130intersection_peer_disconnect (struct Operation *op)
1131{
1132 if (PHASE_FINISHED != op->state->phase)
1133 {
1134 fail_intersection_operation (op);
1135 return;
1136 }
1137 /* the session has already been concluded */
1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139 "Other peer disconnected (finished)\n");
1140 if (GNUNET_NO == op->state->client_done_sent)
1141 finish_and_destroy (op);
1142} 1160}
1143 1161
1144 1162
@@ -1168,6 +1186,11 @@ intersection_op_cancel (struct Operation *op)
1168 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); 1186 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1169 op->state->my_elements = NULL; 1187 op->state->my_elements = NULL;
1170 } 1188 }
1189 if (NULL != op->state->full_result_iter)
1190 {
1191 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
1192 op->state->full_result_iter = NULL;
1193 }
1171 GNUNET_free (op->state); 1194 GNUNET_free (op->state);
1172 op->state = NULL; 1195 op->state = NULL;
1173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1236,6 +1259,28 @@ intersection_remove (struct SetState *set_state,
1236 1259
1237 1260
1238/** 1261/**
1262 * Callback for channel death for the intersection operation.
1263 *
1264 * @param op operation that lost the channel
1265 */
1266static void
1267intersection_channel_death (struct Operation *op)
1268{
1269 if (GNUNET_YES == op->state->channel_death_expected)
1270 {
1271 /* oh goodie, we are done! */
1272 send_client_done_and_destroy (op);
1273 }
1274 else
1275 {
1276 /* sorry, channel went down early, too bad. */
1277 _GSS_operation_destroy (op,
1278 GNUNET_YES);
1279 }
1280}
1281
1282
1283/**
1239 * Get the table with implementing functions for set intersection. 1284 * Get the table with implementing functions for set intersection.
1240 * 1285 *
1241 * @return the operation specific VTable 1286 * @return the operation specific VTable
@@ -1245,14 +1290,13 @@ _GSS_intersection_vt ()
1245{ 1290{
1246 static const struct SetVT intersection_vt = { 1291 static const struct SetVT intersection_vt = {
1247 .create = &intersection_set_create, 1292 .create = &intersection_set_create,
1248 .msg_handler = &intersection_handle_p2p_message,
1249 .add = &intersection_add, 1293 .add = &intersection_add,
1250 .remove = &intersection_remove, 1294 .remove = &intersection_remove,
1251 .destroy_set = &intersection_set_destroy, 1295 .destroy_set = &intersection_set_destroy,
1252 .evaluate = &intersection_evaluate, 1296 .evaluate = &intersection_evaluate,
1253 .accept = &intersection_accept, 1297 .accept = &intersection_accept,
1254 .peer_disconnect = &intersection_peer_disconnect,
1255 .cancel = &intersection_op_cancel, 1298 .cancel = &intersection_op_cancel,
1299 .channel_death = &intersection_channel_death,
1256 }; 1300 };
1257 1301
1258 return &intersection_vt; 1302 return &intersection_vt;