diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-03-13 01:24:22 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-03-13 01:24:34 +0100 |
commit | bf6f552fdefe75425635f66343f98995e2f602f6 (patch) | |
tree | add6ea146823579a137763b78e89839ff97b3902 /src/set/gnunet-service-set_intersection.c | |
parent | a9a5994e518ded483edb87513d5197b6539ed4ff (diff) | |
download | gnunet-bf6f552fdefe75425635f66343f98995e2f602f6.tar.gz gnunet-bf6f552fdefe75425635f66343f98995e2f602f6.zip |
major clean up and bugfixes of SET
Diffstat (limited to 'src/set/gnunet-service-set_intersection.c')
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 542 |
1 files changed, 304 insertions, 238 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 8307672b9..9dc421792 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -56,6 +56,18 @@ enum IntersectionOperationPhase | |||
56 | PHASE_BF_EXCHANGE, | 56 | PHASE_BF_EXCHANGE, |
57 | 57 | ||
58 | /** | 58 | /** |
59 | * We must next send the P2P DONE message (after finishing mostly | ||
60 | * with the local client). Then we will wait for the channel to close. | ||
61 | */ | ||
62 | PHASE_MUST_SEND_DONE, | ||
63 | |||
64 | /** | ||
65 | * We have received the P2P DONE message, and must finish with the | ||
66 | * local client before terminating the channel. | ||
67 | */ | ||
68 | PHASE_DONE_RECEIVED, | ||
69 | |||
70 | /** | ||
59 | * The protocol is over. Results may still have to be sent to the | 71 | * The protocol is over. Results may still have to be sent to the |
60 | * client. | 72 | * client. |
61 | */ | 73 | */ |
@@ -162,6 +174,13 @@ struct OperationState | |||
162 | * Did we send the client that we are done? | 174 | * Did we send the client that we are done? |
163 | */ | 175 | */ |
164 | int client_done_sent; | 176 | int client_done_sent; |
177 | |||
178 | /** | ||
179 | * Set whenever we reach the state where the death of the | ||
180 | * channel is perfectly find and should NOT result in the | ||
181 | * operation being cancelled. | ||
182 | */ | ||
183 | int channel_death_expected; | ||
165 | }; | 184 | }; |
166 | 185 | ||
167 | 186 | ||
@@ -193,12 +212,12 @@ send_client_removed_element (struct Operation *op, | |||
193 | struct GNUNET_MQ_Envelope *ev; | 212 | struct GNUNET_MQ_Envelope *ev; |
194 | struct GNUNET_SET_ResultMessage *rm; | 213 | struct GNUNET_SET_ResultMessage *rm; |
195 | 214 | ||
196 | if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode) | 215 | if (GNUNET_SET_RESULT_REMOVED != op->result_mode) |
197 | return; /* Wrong mode for transmitting removed elements */ | 216 | return; /* Wrong mode for transmitting removed elements */ |
198 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 217 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
199 | "Sending removed element (size %u) to client\n", | 218 | "Sending removed element (size %u) to client\n", |
200 | element->size); | 219 | element->size); |
201 | GNUNET_assert (0 != op->spec->client_request_id); | 220 | GNUNET_assert (0 != op->client_request_id); |
202 | ev = GNUNET_MQ_msg_extra (rm, | 221 | ev = GNUNET_MQ_msg_extra (rm, |
203 | element->size, | 222 | element->size, |
204 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 223 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
@@ -208,12 +227,12 @@ send_client_removed_element (struct Operation *op, | |||
208 | return; | 227 | return; |
209 | } | 228 | } |
210 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 229 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
211 | rm->request_id = htonl (op->spec->client_request_id); | 230 | rm->request_id = htonl (op->client_request_id); |
212 | rm->element_type = element->element_type; | 231 | rm->element_type = element->element_type; |
213 | GNUNET_memcpy (&rm[1], | 232 | GNUNET_memcpy (&rm[1], |
214 | element->data, | 233 | element->data, |
215 | element->size); | 234 | element->size); |
216 | GNUNET_MQ_send (op->spec->set->client_mq, | 235 | GNUNET_MQ_send (op->set->cs->mq, |
217 | ev); | 236 | ev); |
218 | } | 237 | } |
219 | 238 | ||
@@ -397,9 +416,9 @@ fail_intersection_operation (struct Operation *op) | |||
397 | ev = GNUNET_MQ_msg (msg, | 416 | ev = GNUNET_MQ_msg (msg, |
398 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 417 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
399 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 418 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
400 | msg->request_id = htonl (op->spec->client_request_id); | 419 | msg->request_id = htonl (op->client_request_id); |
401 | msg->element_type = htons (0); | 420 | msg->element_type = htons (0); |
402 | GNUNET_MQ_send (op->spec->set->client_mq, | 421 | GNUNET_MQ_send (op->set->cs->mq, |
403 | ev); | 422 | ev); |
404 | _GSS_operation_destroy (op, | 423 | _GSS_operation_destroy (op, |
405 | GNUNET_YES); | 424 | GNUNET_YES); |
@@ -428,8 +447,8 @@ send_bloomfilter (struct Operation *op) | |||
428 | should use more bits to maximize its set reduction | 447 | should use more bits to maximize its set reduction |
429 | potential and minimize overall bandwidth consumption. */ | 448 | potential and minimize overall bandwidth consumption. */ |
430 | bf_elementbits = 2 + ceil (log2((double) | 449 | bf_elementbits = 2 + ceil (log2((double) |
431 | (op->spec->remote_element_count / | 450 | (op->remote_element_count / |
432 | (double) op->state->my_element_count))); | 451 | (double) op->state->my_element_count))); |
433 | if (bf_elementbits < 1) | 452 | if (bf_elementbits < 1) |
434 | bf_elementbits = 1; /* make sure k is not 0 */ | 453 | bf_elementbits = 1; /* make sure k is not 0 */ |
435 | /* optimize BF-size to ~50% of bits set */ | 454 | /* optimize BF-size to ~50% of bits set */ |
@@ -515,12 +534,14 @@ send_client_done_and_destroy (void *cls) | |||
515 | struct GNUNET_MQ_Envelope *ev; | 534 | struct GNUNET_MQ_Envelope *ev; |
516 | struct GNUNET_SET_ResultMessage *rm; | 535 | struct GNUNET_SET_ResultMessage *rm; |
517 | 536 | ||
537 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
538 | "Intersection succeeded, sending DONE to local client\n"); | ||
518 | ev = GNUNET_MQ_msg (rm, | 539 | ev = GNUNET_MQ_msg (rm, |
519 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 540 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
520 | rm->request_id = htonl (op->spec->client_request_id); | 541 | rm->request_id = htonl (op->client_request_id); |
521 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 542 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
522 | rm->element_type = htons (0); | 543 | rm->element_type = htons (0); |
523 | GNUNET_MQ_send (op->spec->set->client_mq, | 544 | GNUNET_MQ_send (op->set->cs->mq, |
524 | ev); | 545 | ev); |
525 | _GSS_operation_destroy (op, | 546 | _GSS_operation_destroy (op, |
526 | GNUNET_YES); | 547 | GNUNET_YES); |
@@ -528,6 +549,53 @@ send_client_done_and_destroy (void *cls) | |||
528 | 549 | ||
529 | 550 | ||
530 | /** | 551 | /** |
552 | * Remember that we are done dealing with the local client | ||
553 | * AND have sent the other peer our message that we are done, | ||
554 | * so we are not just waiting for the channel to die before | ||
555 | * telling the local client that we are done as our last act. | ||
556 | * | ||
557 | * @param cls the `struct Operation`. | ||
558 | */ | ||
559 | static void | ||
560 | finished_local_operations (void *cls) | ||
561 | { | ||
562 | struct Operation *op = cls; | ||
563 | |||
564 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
565 | "DONE sent to other peer, now waiting for other end to close the channel\n"); | ||
566 | op->state->phase = PHASE_FINISHED; | ||
567 | op->state->channel_death_expected = GNUNET_YES; | ||
568 | } | ||
569 | |||
570 | |||
571 | /** | ||
572 | * Notify the other peer that we are done. Once this message | ||
573 | * is out, we still need to notify the local client that we | ||
574 | * are done. | ||
575 | * | ||
576 | * @param op operation to notify for. | ||
577 | */ | ||
578 | static void | ||
579 | send_p2p_done (struct Operation *op) | ||
580 | { | ||
581 | struct GNUNET_MQ_Envelope *ev; | ||
582 | struct IntersectionDoneMessage *idm; | ||
583 | |||
584 | GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); | ||
585 | GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); | ||
586 | ev = GNUNET_MQ_msg (idm, | ||
587 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); | ||
588 | idm->final_element_count = htonl (op->state->my_element_count); | ||
589 | idm->element_xor_hash = op->state->my_xor; | ||
590 | GNUNET_MQ_notify_sent (ev, | ||
591 | &finished_local_operations, | ||
592 | op); | ||
593 | GNUNET_MQ_send (op->mq, | ||
594 | ev); | ||
595 | } | ||
596 | |||
597 | |||
598 | /** | ||
531 | * Send all elements in the full result iterator. | 599 | * Send all elements in the full result iterator. |
532 | * | 600 | * |
533 | * @param cls the `struct Operation *` | 601 | * @param cls the `struct Operation *` |
@@ -550,10 +618,21 @@ send_remaining_elements (void *cls) | |||
550 | { | 618 | { |
551 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 619 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
552 | "Sending done and destroy because iterator ran out\n"); | 620 | "Sending done and destroy because iterator ran out\n"); |
553 | op->keep--; | ||
554 | GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter); | 621 | GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter); |
555 | op->state->full_result_iter = NULL; | 622 | op->state->full_result_iter = NULL; |
556 | send_client_done_and_destroy (op); | 623 | if (PHASE_DONE_RECEIVED == op->state->phase) |
624 | { | ||
625 | op->state->phase = PHASE_FINISHED; | ||
626 | send_client_done_and_destroy (op); | ||
627 | } | ||
628 | else if (PHASE_MUST_SEND_DONE == op->state->phase) | ||
629 | { | ||
630 | send_p2p_done (op); | ||
631 | } | ||
632 | else | ||
633 | { | ||
634 | GNUNET_assert (0); | ||
635 | } | ||
557 | return; | 636 | return; |
558 | } | 637 | } |
559 | ee = nxt; | 638 | ee = nxt; |
@@ -562,48 +641,136 @@ send_remaining_elements (void *cls) | |||
562 | "Sending element %s:%u to client (full set)\n", | 641 | "Sending element %s:%u to client (full set)\n", |
563 | GNUNET_h2s (&ee->element_hash), | 642 | GNUNET_h2s (&ee->element_hash), |
564 | element->size); | 643 | element->size); |
565 | GNUNET_assert (0 != op->spec->client_request_id); | 644 | GNUNET_assert (0 != op->client_request_id); |
566 | ev = GNUNET_MQ_msg_extra (rm, | 645 | ev = GNUNET_MQ_msg_extra (rm, |
567 | element->size, | 646 | element->size, |
568 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 647 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
569 | GNUNET_assert (NULL != ev); | 648 | GNUNET_assert (NULL != ev); |
570 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 649 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
571 | rm->request_id = htonl (op->spec->client_request_id); | 650 | rm->request_id = htonl (op->client_request_id); |
572 | rm->element_type = element->element_type; | 651 | rm->element_type = element->element_type; |
573 | GNUNET_memcpy (&rm[1], | 652 | GNUNET_memcpy (&rm[1], |
574 | element->data, | 653 | element->data, |
575 | element->size); | 654 | element->size); |
576 | GNUNET_MQ_notify_sent (ev, | 655 | GNUNET_MQ_notify_sent (ev, |
577 | &send_remaining_elements, | 656 | &send_remaining_elements, |
578 | op); | 657 | op); |
579 | GNUNET_MQ_send (op->spec->set->client_mq, | 658 | GNUNET_MQ_send (op->set->cs->mq, |
580 | ev); | 659 | ev); |
581 | } | 660 | } |
582 | 661 | ||
583 | 662 | ||
584 | /** | 663 | /** |
585 | * Inform the peer that this operation is complete. | 664 | * Fills the "my_elements" hashmap with the initial set of |
665 | * (non-deleted) elements from the set of the specification. | ||
586 | * | 666 | * |
587 | * @param op the intersection operation to fail | 667 | * @param cls closure with the `struct Operation *` |
668 | * @param key current key code for the element | ||
669 | * @param value value in the hash map with the `struct ElementEntry *` | ||
670 | * @return #GNUNET_YES (we should continue to iterate) | ||
671 | */ | ||
672 | static int | ||
673 | initialize_map_unfiltered (void *cls, | ||
674 | const struct GNUNET_HashCode *key, | ||
675 | void *value) | ||
676 | { | ||
677 | struct ElementEntry *ee = value; | ||
678 | struct Operation *op = cls; | ||
679 | |||
680 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) | ||
681 | return GNUNET_YES; /* element not live in operation's generation */ | ||
682 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
683 | &ee->element_hash, | ||
684 | &op->state->my_xor); | ||
685 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
686 | "Initial full initialization of my_elements, adding %s:%u\n", | ||
687 | GNUNET_h2s (&ee->element_hash), | ||
688 | ee->element.size); | ||
689 | GNUNET_break (GNUNET_YES == | ||
690 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, | ||
691 | &ee->element_hash, | ||
692 | ee, | ||
693 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
694 | return GNUNET_YES; | ||
695 | } | ||
696 | |||
697 | |||
698 | /** | ||
699 | * Send our element count to the peer, in case our element count is | ||
700 | * lower than his. | ||
701 | * | ||
702 | * @param op intersection operation | ||
588 | */ | 703 | */ |
589 | static void | 704 | static void |
590 | send_peer_done (struct Operation *op) | 705 | send_element_count (struct Operation *op) |
591 | { | 706 | { |
592 | struct GNUNET_MQ_Envelope *ev; | 707 | struct GNUNET_MQ_Envelope *ev; |
593 | struct IntersectionDoneMessage *idm; | 708 | struct IntersectionElementInfoMessage *msg; |
594 | 709 | ||
595 | op->state->phase = PHASE_FINISHED; | ||
596 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 710 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
597 | "Intersection succeeded, sending DONE\n"); | 711 | "Sending our element count (%u)\n", |
598 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); | 712 | op->state->my_element_count); |
599 | op->state->local_bf = NULL; | 713 | ev = GNUNET_MQ_msg (msg, |
714 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); | ||
715 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
716 | GNUNET_MQ_send (op->mq, ev); | ||
717 | } | ||
600 | 718 | ||
601 | ev = GNUNET_MQ_msg (idm, | 719 | |
602 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); | 720 | /** |
603 | idm->final_element_count = htonl (op->state->my_element_count); | 721 | * We go first, initialize our map with all elements and |
604 | idm->element_xor_hash = op->state->my_xor; | 722 | * send the first Bloom filter. |
605 | GNUNET_MQ_send (op->mq, | 723 | * |
606 | ev); | 724 | * @param op operation to start exchange for |
725 | */ | ||
726 | static void | ||
727 | begin_bf_exchange (struct Operation *op) | ||
728 | { | ||
729 | op->state->phase = PHASE_BF_EXCHANGE; | ||
730 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
731 | &initialize_map_unfiltered, | ||
732 | op); | ||
733 | send_bloomfilter (op); | ||
734 | } | ||
735 | |||
736 | |||
737 | /** | ||
738 | * Handle the initial `struct IntersectionElementInfoMessage` from a | ||
739 | * remote peer. | ||
740 | * | ||
741 | * @param cls the intersection operation | ||
742 | * @param mh the header of the message | ||
743 | */ | ||
744 | void | ||
745 | handle_intersection_p2p_element_info (void *cls, | ||
746 | const struct IntersectionElementInfoMessage *msg) | ||
747 | { | ||
748 | struct Operation *op = cls; | ||
749 | |||
750 | if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) | ||
751 | { | ||
752 | GNUNET_break_op (0); | ||
753 | fail_intersection_operation(op); | ||
754 | return; | ||
755 | } | ||
756 | op->remote_element_count = ntohl (msg->sender_element_count); | ||
757 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
758 | "Received remote element count (%u), I have %u\n", | ||
759 | op->remote_element_count, | ||
760 | op->state->my_element_count); | ||
761 | if ( ( (PHASE_INITIAL != op->state->phase) && | ||
762 | (PHASE_COUNT_SENT != op->state->phase) ) || | ||
763 | (op->state->my_element_count > op->remote_element_count) || | ||
764 | (0 == op->state->my_element_count) || | ||
765 | (0 == op->remote_element_count) ) | ||
766 | { | ||
767 | GNUNET_break_op (0); | ||
768 | fail_intersection_operation(op); | ||
769 | return; | ||
770 | } | ||
771 | GNUNET_break (NULL == op->state->remote_bf); | ||
772 | begin_bf_exchange (op); | ||
773 | GNUNET_CADET_receive_done (op->channel); | ||
607 | } | 774 | } |
608 | 775 | ||
609 | 776 | ||
@@ -618,9 +785,9 @@ process_bf (struct Operation *op) | |||
618 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 785 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
619 | "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", | 786 | "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", |
620 | op->state->phase, | 787 | op->state->phase, |
621 | op->spec->remote_element_count, | 788 | op->remote_element_count, |
622 | op->state->my_element_count, | 789 | op->state->my_element_count, |
623 | GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); | 790 | GNUNET_CONTAINER_multihashmap_size (op->set->content->elements)); |
624 | switch (op->state->phase) | 791 | switch (op->state->phase) |
625 | { | 792 | { |
626 | case PHASE_INITIAL: | 793 | case PHASE_INITIAL: |
@@ -631,7 +798,7 @@ process_bf (struct Operation *op) | |||
631 | /* This is the first BF being sent, build our initial map with | 798 | /* This is the first BF being sent, build our initial map with |
632 | filtering in place */ | 799 | filtering in place */ |
633 | op->state->my_element_count = 0; | 800 | op->state->my_element_count = 0; |
634 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | 801 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, |
635 | &filtered_map_initialization, | 802 | &filtered_map_initialization, |
636 | op); | 803 | op); |
637 | break; | 804 | break; |
@@ -641,6 +808,14 @@ process_bf (struct Operation *op) | |||
641 | &iterator_bf_reduce, | 808 | &iterator_bf_reduce, |
642 | op); | 809 | op); |
643 | break; | 810 | break; |
811 | case PHASE_MUST_SEND_DONE: | ||
812 | GNUNET_break_op (0); | ||
813 | fail_intersection_operation(op); | ||
814 | return; | ||
815 | case PHASE_DONE_RECEIVED: | ||
816 | GNUNET_break_op (0); | ||
817 | fail_intersection_operation(op); | ||
818 | return; | ||
644 | case PHASE_FINISHED: | 819 | case PHASE_FINISHED: |
645 | GNUNET_break_op (0); | 820 | GNUNET_break_op (0); |
646 | fail_intersection_operation(op); | 821 | fail_intersection_operation(op); |
@@ -650,13 +825,28 @@ process_bf (struct Operation *op) | |||
650 | op->state->remote_bf = NULL; | 825 | op->state->remote_bf = NULL; |
651 | 826 | ||
652 | if ( (0 == op->state->my_element_count) || /* fully disjoint */ | 827 | if ( (0 == op->state->my_element_count) || /* fully disjoint */ |
653 | ( (op->state->my_element_count == op->spec->remote_element_count) && | 828 | ( (op->state->my_element_count == op->remote_element_count) && |
654 | (0 == memcmp (&op->state->my_xor, | 829 | (0 == memcmp (&op->state->my_xor, |
655 | &op->state->other_xor, | 830 | &op->state->other_xor, |
656 | sizeof (struct GNUNET_HashCode))) ) ) | 831 | sizeof (struct GNUNET_HashCode))) ) ) |
657 | { | 832 | { |
658 | /* we are done */ | 833 | /* we are done */ |
659 | send_peer_done (op); | 834 | op->state->phase = PHASE_MUST_SEND_DONE; |
835 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
836 | "Intersection succeeded, sending DONE to other peer\n"); | ||
837 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); | ||
838 | op->state->local_bf = NULL; | ||
839 | if (GNUNET_SET_RESULT_FULL == op->result_mode) | ||
840 | { | ||
841 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
842 | "Sending full result set (%u elements)\n", | ||
843 | GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); | ||
844 | op->state->full_result_iter | ||
845 | = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); | ||
846 | send_remaining_elements (op); | ||
847 | return; | ||
848 | } | ||
849 | send_p2p_done (op); | ||
660 | return; | 850 | return; |
661 | } | 851 | } |
662 | op->state->phase = PHASE_BF_EXCHANGE; | 852 | op->state->phase = PHASE_BF_EXCHANGE; |
@@ -677,7 +867,7 @@ check_intersection_p2p_bf (void *cls, | |||
677 | { | 867 | { |
678 | struct Operation *op = cls; | 868 | struct Operation *op = cls; |
679 | 869 | ||
680 | if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) | 870 | if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) |
681 | { | 871 | { |
682 | GNUNET_break_op (0); | 872 | GNUNET_break_op (0); |
683 | return GNUNET_SYSERR; | 873 | return GNUNET_SYSERR; |
@@ -727,7 +917,7 @@ handle_intersection_p2p_bf (void *cls, | |||
727 | bf_size, | 917 | bf_size, |
728 | bf_bits_per_element); | 918 | bf_bits_per_element); |
729 | op->state->salt = ntohl (msg->sender_mutator); | 919 | op->state->salt = ntohl (msg->sender_mutator); |
730 | op->spec->remote_element_count = ntohl (msg->sender_element_count); | 920 | op->remote_element_count = ntohl (msg->sender_element_count); |
731 | process_bf (op); | 921 | process_bf (op); |
732 | break; | 922 | break; |
733 | } | 923 | } |
@@ -740,7 +930,7 @@ handle_intersection_p2p_bf (void *cls, | |||
740 | op->state->bf_bits_per_element = bf_bits_per_element; | 930 | op->state->bf_bits_per_element = bf_bits_per_element; |
741 | op->state->bf_data_offset = 0; | 931 | op->state->bf_data_offset = 0; |
742 | op->state->salt = ntohl (msg->sender_mutator); | 932 | op->state->salt = ntohl (msg->sender_mutator); |
743 | op->spec->remote_element_count = ntohl (msg->sender_element_count); | 933 | op->remote_element_count = ntohl (msg->sender_element_count); |
744 | } | 934 | } |
745 | else | 935 | else |
746 | { | 936 | { |
@@ -749,7 +939,7 @@ handle_intersection_p2p_bf (void *cls, | |||
749 | (op->state->bf_bits_per_element != bf_bits_per_element) || | 939 | (op->state->bf_bits_per_element != bf_bits_per_element) || |
750 | (op->state->bf_data_offset + chunk_size > bf_size) || | 940 | (op->state->bf_data_offset + chunk_size > bf_size) || |
751 | (op->state->salt != ntohl (msg->sender_mutator)) || | 941 | (op->state->salt != ntohl (msg->sender_mutator)) || |
752 | (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) | 942 | (op->remote_element_count != ntohl (msg->sender_element_count)) ) |
753 | { | 943 | { |
754 | GNUNET_break_op (0); | 944 | GNUNET_break_op (0); |
755 | fail_intersection_operation (op); | 945 | fail_intersection_operation (op); |
@@ -783,147 +973,6 @@ handle_intersection_p2p_bf (void *cls, | |||
783 | 973 | ||
784 | 974 | ||
785 | /** | 975 | /** |
786 | * Fills the "my_elements" hashmap with the initial set of | ||
787 | * (non-deleted) elements from the set of the specification. | ||
788 | * | ||
789 | * @param cls closure with the `struct Operation *` | ||
790 | * @param key current key code for the element | ||
791 | * @param value value in the hash map with the `struct ElementEntry *` | ||
792 | * @return #GNUNET_YES (we should continue to iterate) | ||
793 | */ | ||
794 | static int | ||
795 | initialize_map_unfiltered (void *cls, | ||
796 | const struct GNUNET_HashCode *key, | ||
797 | void *value) | ||
798 | { | ||
799 | struct ElementEntry *ee = value; | ||
800 | struct Operation *op = cls; | ||
801 | |||
802 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) | ||
803 | return GNUNET_YES; /* element not live in operation's generation */ | ||
804 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
805 | &ee->element_hash, | ||
806 | &op->state->my_xor); | ||
807 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
808 | "Initial full initialization of my_elements, adding %s:%u\n", | ||
809 | GNUNET_h2s (&ee->element_hash), | ||
810 | ee->element.size); | ||
811 | GNUNET_break (GNUNET_YES == | ||
812 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, | ||
813 | &ee->element_hash, | ||
814 | ee, | ||
815 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
816 | return GNUNET_YES; | ||
817 | } | ||
818 | |||
819 | |||
820 | /** | ||
821 | * Send our element count to the peer, in case our element count is | ||
822 | * lower than his. | ||
823 | * | ||
824 | * @param op intersection operation | ||
825 | */ | ||
826 | static void | ||
827 | send_element_count (struct Operation *op) | ||
828 | { | ||
829 | struct GNUNET_MQ_Envelope *ev; | ||
830 | struct IntersectionElementInfoMessage *msg; | ||
831 | |||
832 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
833 | "Sending our element count (%u)\n", | ||
834 | op->state->my_element_count); | ||
835 | ev = GNUNET_MQ_msg (msg, | ||
836 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); | ||
837 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
838 | GNUNET_MQ_send (op->mq, ev); | ||
839 | } | ||
840 | |||
841 | |||
842 | /** | ||
843 | * We go first, initialize our map with all elements and | ||
844 | * send the first Bloom filter. | ||
845 | * | ||
846 | * @param op operation to start exchange for | ||
847 | */ | ||
848 | static void | ||
849 | begin_bf_exchange (struct Operation *op) | ||
850 | { | ||
851 | op->state->phase = PHASE_BF_EXCHANGE; | ||
852 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | ||
853 | &initialize_map_unfiltered, | ||
854 | op); | ||
855 | send_bloomfilter (op); | ||
856 | } | ||
857 | |||
858 | |||
859 | /** | ||
860 | * Handle the initial `struct IntersectionElementInfoMessage` from a | ||
861 | * remote peer. | ||
862 | * | ||
863 | * @param cls the intersection operation | ||
864 | * @param mh the header of the message | ||
865 | */ | ||
866 | void | ||
867 | handle_intersection_p2p_element_info (void *cls, | ||
868 | const struct IntersectionElementInfoMessage *msg) | ||
869 | { | ||
870 | struct Operation *op = cls; | ||
871 | |||
872 | if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) | ||
873 | { | ||
874 | GNUNET_break_op (0); | ||
875 | fail_intersection_operation(op); | ||
876 | return; | ||
877 | } | ||
878 | op->spec->remote_element_count = ntohl (msg->sender_element_count); | ||
879 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
880 | "Received remote element count (%u), I have %u\n", | ||
881 | op->spec->remote_element_count, | ||
882 | op->state->my_element_count); | ||
883 | if ( ( (PHASE_INITIAL != op->state->phase) && | ||
884 | (PHASE_COUNT_SENT != op->state->phase) ) || | ||
885 | (op->state->my_element_count > op->spec->remote_element_count) || | ||
886 | (0 == op->state->my_element_count) || | ||
887 | (0 == op->spec->remote_element_count) ) | ||
888 | { | ||
889 | GNUNET_break_op (0); | ||
890 | fail_intersection_operation(op); | ||
891 | return; | ||
892 | } | ||
893 | GNUNET_break (NULL == op->state->remote_bf); | ||
894 | begin_bf_exchange (op); | ||
895 | GNUNET_CADET_receive_done (op->channel); | ||
896 | } | ||
897 | |||
898 | |||
899 | /** | ||
900 | * Send a result message to the client indicating that the operation | ||
901 | * is over. After the result done message has been sent to the | ||
902 | * client, destroy the evaluate operation. | ||
903 | * | ||
904 | * @param op intersection operation | ||
905 | */ | ||
906 | static void | ||
907 | finish_and_destroy (struct Operation *op) | ||
908 | { | ||
909 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); | ||
910 | |||
911 | if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) | ||
912 | { | ||
913 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
914 | "Sending full result set (%u elements)\n", | ||
915 | GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); | ||
916 | op->state->full_result_iter | ||
917 | = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); | ||
918 | op->keep++; | ||
919 | send_remaining_elements (op); | ||
920 | return; | ||
921 | } | ||
922 | send_client_done_and_destroy (op); | ||
923 | } | ||
924 | |||
925 | |||
926 | /** | ||
927 | * Remove all elements from our hashmap. | 976 | * Remove all elements from our hashmap. |
928 | * | 977 | * |
929 | * @param cls closure with the `struct Operation *` | 978 | * @param cls closure with the `struct Operation *` |
@@ -970,10 +1019,10 @@ handle_intersection_p2p_done (void *cls, | |||
970 | { | 1019 | { |
971 | struct Operation *op = cls; | 1020 | struct Operation *op = cls; |
972 | 1021 | ||
973 | if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) | 1022 | if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) |
974 | { | 1023 | { |
975 | GNUNET_break_op (0); | 1024 | GNUNET_break_op (0); |
976 | fail_intersection_operation(op); | 1025 | fail_intersection_operation (op); |
977 | return; | 1026 | return; |
978 | } | 1027 | } |
979 | if (PHASE_BF_EXCHANGE != op->state->phase) | 1028 | if (PHASE_BF_EXCHANGE != op->state->phase) |
@@ -1005,9 +1054,22 @@ handle_intersection_p2p_done (void *cls, | |||
1005 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1054 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1006 | "Got IntersectionDoneMessage, have %u elements in intersection\n", | 1055 | "Got IntersectionDoneMessage, have %u elements in intersection\n", |
1007 | op->state->my_element_count); | 1056 | op->state->my_element_count); |
1008 | op->state->phase = PHASE_FINISHED; | 1057 | op->state->phase = PHASE_DONE_RECEIVED; |
1009 | finish_and_destroy (op); | ||
1010 | GNUNET_CADET_receive_done (op->channel); | 1058 | GNUNET_CADET_receive_done (op->channel); |
1059 | |||
1060 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); | ||
1061 | if (GNUNET_SET_RESULT_FULL == op->result_mode) | ||
1062 | { | ||
1063 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1064 | "Sending full result set to client (%u elements)\n", | ||
1065 | GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); | ||
1066 | op->state->full_result_iter | ||
1067 | = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); | ||
1068 | send_remaining_elements (op); | ||
1069 | return; | ||
1070 | } | ||
1071 | op->state->phase = PHASE_FINISHED; | ||
1072 | send_client_done_and_destroy (op); | ||
1011 | } | 1073 | } |
1012 | 1074 | ||
1013 | 1075 | ||
@@ -1018,24 +1080,16 @@ handle_intersection_p2p_done (void *cls, | |||
1018 | * begin the evaluation | 1080 | * begin the evaluation |
1019 | * @param opaque_context message to be transmitted to the listener | 1081 | * @param opaque_context message to be transmitted to the listener |
1020 | * to convince him to accept, may be NULL | 1082 | * to convince him to accept, may be NULL |
1083 | * @return operation-specific state to keep in @a op | ||
1021 | */ | 1084 | */ |
1022 | static void | 1085 | static struct OperationState * |
1023 | intersection_evaluate (struct Operation *op, | 1086 | intersection_evaluate (struct Operation *op, |
1024 | const struct GNUNET_MessageHeader *opaque_context) | 1087 | const struct GNUNET_MessageHeader *opaque_context) |
1025 | { | 1088 | { |
1089 | struct OperationState *state; | ||
1026 | struct GNUNET_MQ_Envelope *ev; | 1090 | struct GNUNET_MQ_Envelope *ev; |
1027 | struct OperationRequestMessage *msg; | 1091 | struct OperationRequestMessage *msg; |
1028 | 1092 | ||
1029 | op->state = GNUNET_new (struct OperationState); | ||
1030 | /* we started the operation, thus we have to send the operation request */ | ||
1031 | op->state->phase = PHASE_INITIAL; | ||
1032 | op->state->my_element_count = op->spec->set->state->current_set_element_count; | ||
1033 | op->state->my_elements | ||
1034 | = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, | ||
1035 | GNUNET_YES); | ||
1036 | |||
1037 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1038 | "Initiating intersection operation evaluation\n"); | ||
1039 | ev = GNUNET_MQ_msg_nested_mh (msg, | 1093 | ev = GNUNET_MQ_msg_nested_mh (msg, |
1040 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 1094 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
1041 | opaque_context); | 1095 | opaque_context); |
@@ -1043,20 +1097,30 @@ intersection_evaluate (struct Operation *op, | |||
1043 | { | 1097 | { |
1044 | /* the context message is too large!? */ | 1098 | /* the context message is too large!? */ |
1045 | GNUNET_break (0); | 1099 | GNUNET_break (0); |
1046 | GNUNET_SERVICE_client_drop (op->spec->set->client); | 1100 | return NULL; |
1047 | return; | ||
1048 | } | 1101 | } |
1102 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1103 | "Initiating intersection operation evaluation\n"); | ||
1104 | state = GNUNET_new (struct OperationState); | ||
1105 | /* we started the operation, thus we have to send the operation request */ | ||
1106 | state->phase = PHASE_INITIAL; | ||
1107 | state->my_element_count = op->set->state->current_set_element_count; | ||
1108 | state->my_elements | ||
1109 | = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, | ||
1110 | GNUNET_YES); | ||
1111 | |||
1049 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); | 1112 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); |
1050 | msg->element_count = htonl (op->state->my_element_count); | 1113 | msg->element_count = htonl (state->my_element_count); |
1051 | GNUNET_MQ_send (op->mq, | 1114 | GNUNET_MQ_send (op->mq, |
1052 | ev); | 1115 | ev); |
1053 | op->state->phase = PHASE_COUNT_SENT; | 1116 | state->phase = PHASE_COUNT_SENT; |
1054 | if (NULL != opaque_context) | 1117 | if (NULL != opaque_context) |
1055 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1118 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1056 | "Sent op request with context message\n"); | 1119 | "Sent op request with context message\n"); |
1057 | else | 1120 | else |
1058 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1121 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1059 | "Sent op request without context message\n"); | 1122 | "Sent op request without context message\n"); |
1123 | return state; | ||
1060 | } | 1124 | } |
1061 | 1125 | ||
1062 | 1126 | ||
@@ -1066,53 +1130,33 @@ intersection_evaluate (struct Operation *op, | |||
1066 | * | 1130 | * |
1067 | * @param op operation that will be accepted as an intersection operation | 1131 | * @param op operation that will be accepted as an intersection operation |
1068 | */ | 1132 | */ |
1069 | static void | 1133 | static struct OperationState * |
1070 | intersection_accept (struct Operation *op) | 1134 | intersection_accept (struct Operation *op) |
1071 | { | 1135 | { |
1136 | struct OperationState *state; | ||
1137 | |||
1072 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1138 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1073 | "Accepting set intersection operation\n"); | 1139 | "Accepting set intersection operation\n"); |
1074 | op->state = GNUNET_new (struct OperationState); | 1140 | state = GNUNET_new (struct OperationState); |
1075 | op->state->phase = PHASE_INITIAL; | 1141 | state->phase = PHASE_INITIAL; |
1076 | op->state->my_element_count | 1142 | state->my_element_count |
1077 | = op->spec->set->state->current_set_element_count; | 1143 | = op->set->state->current_set_element_count; |
1078 | GNUNET_assert (NULL == op->state->my_elements); | 1144 | state->my_elements |
1079 | op->state->my_elements | 1145 | = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count, |
1080 | = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count, | 1146 | op->remote_element_count), |
1081 | op->spec->remote_element_count), | ||
1082 | GNUNET_YES); | 1147 | GNUNET_YES); |
1083 | if (op->spec->remote_element_count < op->state->my_element_count) | 1148 | op->state = state; |
1149 | if (op->remote_element_count < state->my_element_count) | ||
1084 | { | 1150 | { |
1085 | /* If the other peer (Alice) has fewer elements than us (Bob), | 1151 | /* If the other peer (Alice) has fewer elements than us (Bob), |
1086 | we just send the count as Alice should send the first BF */ | 1152 | we just send the count as Alice should send the first BF */ |
1087 | send_element_count (op); | 1153 | send_element_count (op); |
1088 | op->state->phase = PHASE_COUNT_SENT; | 1154 | state->phase = PHASE_COUNT_SENT; |
1089 | return; | 1155 | return state; |
1090 | } | 1156 | } |
1091 | /* We have fewer elements, so we start with the BF */ | 1157 | /* We have fewer elements, so we start with the BF */ |
1092 | begin_bf_exchange (op); | 1158 | begin_bf_exchange (op); |
1093 | } | 1159 | return state; |
1094 | |||
1095 | |||
1096 | /** | ||
1097 | * Handler for peer-disconnects, notifies the client about the aborted | ||
1098 | * operation. If we did not expect anything from the other peer, we | ||
1099 | * gracefully terminate the operation. | ||
1100 | * | ||
1101 | * @param op the destroyed operation | ||
1102 | */ | ||
1103 | static void | ||
1104 | intersection_peer_disconnect (struct Operation *op) | ||
1105 | { | ||
1106 | if (PHASE_FINISHED != op->state->phase) | ||
1107 | { | ||
1108 | fail_intersection_operation (op); | ||
1109 | return; | ||
1110 | } | ||
1111 | /* the session has already been concluded */ | ||
1112 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1113 | "Other peer disconnected (finished)\n"); | ||
1114 | if (GNUNET_NO == op->state->client_done_sent) | ||
1115 | finish_and_destroy (op); | ||
1116 | } | 1160 | } |
1117 | 1161 | ||
1118 | 1162 | ||
@@ -1215,6 +1259,28 @@ intersection_remove (struct SetState *set_state, | |||
1215 | 1259 | ||
1216 | 1260 | ||
1217 | /** | 1261 | /** |
1262 | * Callback for channel death for the intersection operation. | ||
1263 | * | ||
1264 | * @param op operation that lost the channel | ||
1265 | */ | ||
1266 | static void | ||
1267 | intersection_channel_death (struct Operation *op) | ||
1268 | { | ||
1269 | if (GNUNET_YES == op->state->channel_death_expected) | ||
1270 | { | ||
1271 | /* oh goodie, we are done! */ | ||
1272 | send_client_done_and_destroy (op); | ||
1273 | } | ||
1274 | else | ||
1275 | { | ||
1276 | /* sorry, channel went down early, too bad. */ | ||
1277 | _GSS_operation_destroy (op, | ||
1278 | GNUNET_YES); | ||
1279 | } | ||
1280 | } | ||
1281 | |||
1282 | |||
1283 | /** | ||
1218 | * Get the table with implementing functions for set intersection. | 1284 | * Get the table with implementing functions for set intersection. |
1219 | * | 1285 | * |
1220 | * @return the operation specific VTable | 1286 | * @return the operation specific VTable |
@@ -1229,8 +1295,8 @@ _GSS_intersection_vt () | |||
1229 | .destroy_set = &intersection_set_destroy, | 1295 | .destroy_set = &intersection_set_destroy, |
1230 | .evaluate = &intersection_evaluate, | 1296 | .evaluate = &intersection_evaluate, |
1231 | .accept = &intersection_accept, | 1297 | .accept = &intersection_accept, |
1232 | .peer_disconnect = &intersection_peer_disconnect, | ||
1233 | .cancel = &intersection_op_cancel, | 1298 | .cancel = &intersection_op_cancel, |
1299 | .channel_death = &intersection_channel_death, | ||
1234 | }; | 1300 | }; |
1235 | 1301 | ||
1236 | return &intersection_vt; | 1302 | return &intersection_vt; |