diff options
Diffstat (limited to 'src/set/gnunet-service-set_intersection.c')
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 652 |
1 files changed, 348 insertions, 304 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 258ad6443..9dc421792 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet | 2 | This file is part of GNUnet |
3 | Copyright (C) 2013, 2014 GNUnet e.V. | 3 | Copyright (C) 2013-2017 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -28,6 +28,7 @@ | |||
28 | #include "gnunet-service-set.h" | 28 | #include "gnunet-service-set.h" |
29 | #include "gnunet_block_lib.h" | 29 | #include "gnunet_block_lib.h" |
30 | #include "gnunet-service-set_protocol.h" | 30 | #include "gnunet-service-set_protocol.h" |
31 | #include "gnunet-service-set_intersection.h" | ||
31 | #include <gcrypt.h> | 32 | #include <gcrypt.h> |
32 | 33 | ||
33 | 34 | ||
@@ -55,6 +56,18 @@ enum IntersectionOperationPhase | |||
55 | PHASE_BF_EXCHANGE, | 56 | PHASE_BF_EXCHANGE, |
56 | 57 | ||
57 | /** | 58 | /** |
59 | * We must next send the P2P DONE message (after finishing mostly | ||
60 | * with the local client). Then we will wait for the channel to close. | ||
61 | */ | ||
62 | PHASE_MUST_SEND_DONE, | ||
63 | |||
64 | /** | ||
65 | * We have received the P2P DONE message, and must finish with the | ||
66 | * local client before terminating the channel. | ||
67 | */ | ||
68 | PHASE_DONE_RECEIVED, | ||
69 | |||
70 | /** | ||
58 | * The protocol is over. Results may still have to be sent to the | 71 | * The protocol is over. Results may still have to be sent to the |
59 | * client. | 72 | * client. |
60 | */ | 73 | */ |
@@ -161,6 +174,13 @@ struct OperationState | |||
161 | * Did we send the client that we are done? | 174 | * Did we send the client that we are done? |
162 | */ | 175 | */ |
163 | int client_done_sent; | 176 | int client_done_sent; |
177 | |||
178 | /** | ||
179 | * Set whenever we reach the state where the death of the | ||
180 | * channel is perfectly find and should NOT result in the | ||
181 | * operation being cancelled. | ||
182 | */ | ||
183 | int channel_death_expected; | ||
164 | }; | 184 | }; |
165 | 185 | ||
166 | 186 | ||
@@ -192,12 +212,12 @@ send_client_removed_element (struct Operation *op, | |||
192 | struct GNUNET_MQ_Envelope *ev; | 212 | struct GNUNET_MQ_Envelope *ev; |
193 | struct GNUNET_SET_ResultMessage *rm; | 213 | struct GNUNET_SET_ResultMessage *rm; |
194 | 214 | ||
195 | if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode) | 215 | if (GNUNET_SET_RESULT_REMOVED != op->result_mode) |
196 | return; /* Wrong mode for transmitting removed elements */ | 216 | return; /* Wrong mode for transmitting removed elements */ |
197 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 217 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
198 | "Sending removed element (size %u) to client\n", | 218 | "Sending removed element (size %u) to client\n", |
199 | element->size); | 219 | element->size); |
200 | GNUNET_assert (0 != op->spec->client_request_id); | 220 | GNUNET_assert (0 != op->client_request_id); |
201 | ev = GNUNET_MQ_msg_extra (rm, | 221 | ev = GNUNET_MQ_msg_extra (rm, |
202 | element->size, | 222 | element->size, |
203 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 223 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
@@ -207,12 +227,12 @@ send_client_removed_element (struct Operation *op, | |||
207 | return; | 227 | return; |
208 | } | 228 | } |
209 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 229 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
210 | rm->request_id = htonl (op->spec->client_request_id); | 230 | rm->request_id = htonl (op->client_request_id); |
211 | rm->element_type = element->element_type; | 231 | rm->element_type = element->element_type; |
212 | GNUNET_memcpy (&rm[1], | 232 | GNUNET_memcpy (&rm[1], |
213 | element->data, | 233 | element->data, |
214 | element->size); | 234 | element->size); |
215 | GNUNET_MQ_send (op->spec->set->client_mq, | 235 | GNUNET_MQ_send (op->set->cs->mq, |
216 | ev); | 236 | ev); |
217 | } | 237 | } |
218 | 238 | ||
@@ -396,9 +416,9 @@ fail_intersection_operation (struct Operation *op) | |||
396 | ev = GNUNET_MQ_msg (msg, | 416 | ev = GNUNET_MQ_msg (msg, |
397 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 417 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
398 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 418 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
399 | msg->request_id = htonl (op->spec->client_request_id); | 419 | msg->request_id = htonl (op->client_request_id); |
400 | msg->element_type = htons (0); | 420 | msg->element_type = htons (0); |
401 | GNUNET_MQ_send (op->spec->set->client_mq, | 421 | GNUNET_MQ_send (op->set->cs->mq, |
402 | ev); | 422 | ev); |
403 | _GSS_operation_destroy (op, | 423 | _GSS_operation_destroy (op, |
404 | GNUNET_YES); | 424 | GNUNET_YES); |
@@ -427,8 +447,8 @@ send_bloomfilter (struct Operation *op) | |||
427 | should use more bits to maximize its set reduction | 447 | should use more bits to maximize its set reduction |
428 | potential and minimize overall bandwidth consumption. */ | 448 | potential and minimize overall bandwidth consumption. */ |
429 | bf_elementbits = 2 + ceil (log2((double) | 449 | bf_elementbits = 2 + ceil (log2((double) |
430 | (op->spec->remote_element_count / | 450 | (op->remote_element_count / |
431 | (double) op->state->my_element_count))); | 451 | (double) op->state->my_element_count))); |
432 | if (bf_elementbits < 1) | 452 | if (bf_elementbits < 1) |
433 | bf_elementbits = 1; /* make sure k is not 0 */ | 453 | bf_elementbits = 1; /* make sure k is not 0 */ |
434 | /* optimize BF-size to ~50% of bits set */ | 454 | /* optimize BF-size to ~50% of bits set */ |
@@ -514,12 +534,14 @@ send_client_done_and_destroy (void *cls) | |||
514 | struct GNUNET_MQ_Envelope *ev; | 534 | struct GNUNET_MQ_Envelope *ev; |
515 | struct GNUNET_SET_ResultMessage *rm; | 535 | struct GNUNET_SET_ResultMessage *rm; |
516 | 536 | ||
537 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
538 | "Intersection succeeded, sending DONE to local client\n"); | ||
517 | ev = GNUNET_MQ_msg (rm, | 539 | ev = GNUNET_MQ_msg (rm, |
518 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 540 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
519 | rm->request_id = htonl (op->spec->client_request_id); | 541 | rm->request_id = htonl (op->client_request_id); |
520 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 542 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
521 | rm->element_type = htons (0); | 543 | rm->element_type = htons (0); |
522 | GNUNET_MQ_send (op->spec->set->client_mq, | 544 | GNUNET_MQ_send (op->set->cs->mq, |
523 | ev); | 545 | ev); |
524 | _GSS_operation_destroy (op, | 546 | _GSS_operation_destroy (op, |
525 | GNUNET_YES); | 547 | GNUNET_YES); |
@@ -527,6 +549,53 @@ send_client_done_and_destroy (void *cls) | |||
527 | 549 | ||
528 | 550 | ||
529 | /** | 551 | /** |
552 | * Remember that we are done dealing with the local client | ||
553 | * AND have sent the other peer our message that we are done, | ||
554 | * so we are not just waiting for the channel to die before | ||
555 | * telling the local client that we are done as our last act. | ||
556 | * | ||
557 | * @param cls the `struct Operation`. | ||
558 | */ | ||
559 | static void | ||
560 | finished_local_operations (void *cls) | ||
561 | { | ||
562 | struct Operation *op = cls; | ||
563 | |||
564 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
565 | "DONE sent to other peer, now waiting for other end to close the channel\n"); | ||
566 | op->state->phase = PHASE_FINISHED; | ||
567 | op->state->channel_death_expected = GNUNET_YES; | ||
568 | } | ||
569 | |||
570 | |||
571 | /** | ||
572 | * Notify the other peer that we are done. Once this message | ||
573 | * is out, we still need to notify the local client that we | ||
574 | * are done. | ||
575 | * | ||
576 | * @param op operation to notify for. | ||
577 | */ | ||
578 | static void | ||
579 | send_p2p_done (struct Operation *op) | ||
580 | { | ||
581 | struct GNUNET_MQ_Envelope *ev; | ||
582 | struct IntersectionDoneMessage *idm; | ||
583 | |||
584 | GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); | ||
585 | GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); | ||
586 | ev = GNUNET_MQ_msg (idm, | ||
587 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); | ||
588 | idm->final_element_count = htonl (op->state->my_element_count); | ||
589 | idm->element_xor_hash = op->state->my_xor; | ||
590 | GNUNET_MQ_notify_sent (ev, | ||
591 | &finished_local_operations, | ||
592 | op); | ||
593 | GNUNET_MQ_send (op->mq, | ||
594 | ev); | ||
595 | } | ||
596 | |||
597 | |||
598 | /** | ||
530 | * Send all elements in the full result iterator. | 599 | * Send all elements in the full result iterator. |
531 | * | 600 | * |
532 | * @param cls the `struct Operation *` | 601 | * @param cls the `struct Operation *` |
@@ -549,8 +618,21 @@ send_remaining_elements (void *cls) | |||
549 | { | 618 | { |
550 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 619 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
551 | "Sending done and destroy because iterator ran out\n"); | 620 | "Sending done and destroy because iterator ran out\n"); |
552 | op->keep--; | 621 | GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter); |
553 | send_client_done_and_destroy (op); | 622 | op->state->full_result_iter = NULL; |
623 | if (PHASE_DONE_RECEIVED == op->state->phase) | ||
624 | { | ||
625 | op->state->phase = PHASE_FINISHED; | ||
626 | send_client_done_and_destroy (op); | ||
627 | } | ||
628 | else if (PHASE_MUST_SEND_DONE == op->state->phase) | ||
629 | { | ||
630 | send_p2p_done (op); | ||
631 | } | ||
632 | else | ||
633 | { | ||
634 | GNUNET_assert (0); | ||
635 | } | ||
554 | return; | 636 | return; |
555 | } | 637 | } |
556 | ee = nxt; | 638 | ee = nxt; |
@@ -559,48 +641,136 @@ send_remaining_elements (void *cls) | |||
559 | "Sending element %s:%u to client (full set)\n", | 641 | "Sending element %s:%u to client (full set)\n", |
560 | GNUNET_h2s (&ee->element_hash), | 642 | GNUNET_h2s (&ee->element_hash), |
561 | element->size); | 643 | element->size); |
562 | GNUNET_assert (0 != op->spec->client_request_id); | 644 | GNUNET_assert (0 != op->client_request_id); |
563 | ev = GNUNET_MQ_msg_extra (rm, | 645 | ev = GNUNET_MQ_msg_extra (rm, |
564 | element->size, | 646 | element->size, |
565 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 647 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
566 | GNUNET_assert (NULL != ev); | 648 | GNUNET_assert (NULL != ev); |
567 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 649 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
568 | rm->request_id = htonl (op->spec->client_request_id); | 650 | rm->request_id = htonl (op->client_request_id); |
569 | rm->element_type = element->element_type; | 651 | rm->element_type = element->element_type; |
570 | GNUNET_memcpy (&rm[1], | 652 | GNUNET_memcpy (&rm[1], |
571 | element->data, | 653 | element->data, |
572 | element->size); | 654 | element->size); |
573 | GNUNET_MQ_notify_sent (ev, | 655 | GNUNET_MQ_notify_sent (ev, |
574 | &send_remaining_elements, | 656 | &send_remaining_elements, |
575 | op); | 657 | op); |
576 | GNUNET_MQ_send (op->spec->set->client_mq, | 658 | GNUNET_MQ_send (op->set->cs->mq, |
577 | ev); | 659 | ev); |
578 | } | 660 | } |
579 | 661 | ||
580 | 662 | ||
581 | /** | 663 | /** |
582 | * Inform the peer that this operation is complete. | 664 | * Fills the "my_elements" hashmap with the initial set of |
665 | * (non-deleted) elements from the set of the specification. | ||
583 | * | 666 | * |
584 | * @param op the intersection operation to fail | 667 | * @param cls closure with the `struct Operation *` |
668 | * @param key current key code for the element | ||
669 | * @param value value in the hash map with the `struct ElementEntry *` | ||
670 | * @return #GNUNET_YES (we should continue to iterate) | ||
671 | */ | ||
672 | static int | ||
673 | initialize_map_unfiltered (void *cls, | ||
674 | const struct GNUNET_HashCode *key, | ||
675 | void *value) | ||
676 | { | ||
677 | struct ElementEntry *ee = value; | ||
678 | struct Operation *op = cls; | ||
679 | |||
680 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) | ||
681 | return GNUNET_YES; /* element not live in operation's generation */ | ||
682 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
683 | &ee->element_hash, | ||
684 | &op->state->my_xor); | ||
685 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
686 | "Initial full initialization of my_elements, adding %s:%u\n", | ||
687 | GNUNET_h2s (&ee->element_hash), | ||
688 | ee->element.size); | ||
689 | GNUNET_break (GNUNET_YES == | ||
690 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, | ||
691 | &ee->element_hash, | ||
692 | ee, | ||
693 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
694 | return GNUNET_YES; | ||
695 | } | ||
696 | |||
697 | |||
698 | /** | ||
699 | * Send our element count to the peer, in case our element count is | ||
700 | * lower than his. | ||
701 | * | ||
702 | * @param op intersection operation | ||
585 | */ | 703 | */ |
586 | static void | 704 | static void |
587 | send_peer_done (struct Operation *op) | 705 | send_element_count (struct Operation *op) |
588 | { | 706 | { |
589 | struct GNUNET_MQ_Envelope *ev; | 707 | struct GNUNET_MQ_Envelope *ev; |
590 | struct IntersectionDoneMessage *idm; | 708 | struct IntersectionElementInfoMessage *msg; |
591 | 709 | ||
592 | op->state->phase = PHASE_FINISHED; | ||
593 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 710 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
594 | "Intersection succeeded, sending DONE\n"); | 711 | "Sending our element count (%u)\n", |
595 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); | 712 | op->state->my_element_count); |
596 | op->state->local_bf = NULL; | 713 | ev = GNUNET_MQ_msg (msg, |
714 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); | ||
715 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
716 | GNUNET_MQ_send (op->mq, ev); | ||
717 | } | ||
597 | 718 | ||
598 | ev = GNUNET_MQ_msg (idm, | 719 | |
599 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); | 720 | /** |
600 | idm->final_element_count = htonl (op->state->my_element_count); | 721 | * We go first, initialize our map with all elements and |
601 | idm->element_xor_hash = op->state->my_xor; | 722 | * send the first Bloom filter. |
602 | GNUNET_MQ_send (op->mq, | 723 | * |
603 | ev); | 724 | * @param op operation to start exchange for |
725 | */ | ||
726 | static void | ||
727 | begin_bf_exchange (struct Operation *op) | ||
728 | { | ||
729 | op->state->phase = PHASE_BF_EXCHANGE; | ||
730 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
731 | &initialize_map_unfiltered, | ||
732 | op); | ||
733 | send_bloomfilter (op); | ||
734 | } | ||
735 | |||
736 | |||
737 | /** | ||
738 | * Handle the initial `struct IntersectionElementInfoMessage` from a | ||
739 | * remote peer. | ||
740 | * | ||
741 | * @param cls the intersection operation | ||
742 | * @param mh the header of the message | ||
743 | */ | ||
744 | void | ||
745 | handle_intersection_p2p_element_info (void *cls, | ||
746 | const struct IntersectionElementInfoMessage *msg) | ||
747 | { | ||
748 | struct Operation *op = cls; | ||
749 | |||
750 | if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) | ||
751 | { | ||
752 | GNUNET_break_op (0); | ||
753 | fail_intersection_operation(op); | ||
754 | return; | ||
755 | } | ||
756 | op->remote_element_count = ntohl (msg->sender_element_count); | ||
757 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
758 | "Received remote element count (%u), I have %u\n", | ||
759 | op->remote_element_count, | ||
760 | op->state->my_element_count); | ||
761 | if ( ( (PHASE_INITIAL != op->state->phase) && | ||
762 | (PHASE_COUNT_SENT != op->state->phase) ) || | ||
763 | (op->state->my_element_count > op->remote_element_count) || | ||
764 | (0 == op->state->my_element_count) || | ||
765 | (0 == op->remote_element_count) ) | ||
766 | { | ||
767 | GNUNET_break_op (0); | ||
768 | fail_intersection_operation(op); | ||
769 | return; | ||
770 | } | ||
771 | GNUNET_break (NULL == op->state->remote_bf); | ||
772 | begin_bf_exchange (op); | ||
773 | GNUNET_CADET_receive_done (op->channel); | ||
604 | } | 774 | } |
605 | 775 | ||
606 | 776 | ||
@@ -615,9 +785,9 @@ process_bf (struct Operation *op) | |||
615 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 785 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
616 | "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", | 786 | "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", |
617 | op->state->phase, | 787 | op->state->phase, |
618 | op->spec->remote_element_count, | 788 | op->remote_element_count, |
619 | op->state->my_element_count, | 789 | op->state->my_element_count, |
620 | GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); | 790 | GNUNET_CONTAINER_multihashmap_size (op->set->content->elements)); |
621 | switch (op->state->phase) | 791 | switch (op->state->phase) |
622 | { | 792 | { |
623 | case PHASE_INITIAL: | 793 | case PHASE_INITIAL: |
@@ -627,11 +797,8 @@ process_bf (struct Operation *op) | |||
627 | case PHASE_COUNT_SENT: | 797 | case PHASE_COUNT_SENT: |
628 | /* This is the first BF being sent, build our initial map with | 798 | /* This is the first BF being sent, build our initial map with |
629 | filtering in place */ | 799 | filtering in place */ |
630 | op->state->my_elements | ||
631 | = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, | ||
632 | GNUNET_YES); | ||
633 | op->state->my_element_count = 0; | 800 | op->state->my_element_count = 0; |
634 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | 801 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, |
635 | &filtered_map_initialization, | 802 | &filtered_map_initialization, |
636 | op); | 803 | op); |
637 | break; | 804 | break; |
@@ -641,6 +808,14 @@ process_bf (struct Operation *op) | |||
641 | &iterator_bf_reduce, | 808 | &iterator_bf_reduce, |
642 | op); | 809 | op); |
643 | break; | 810 | break; |
811 | case PHASE_MUST_SEND_DONE: | ||
812 | GNUNET_break_op (0); | ||
813 | fail_intersection_operation(op); | ||
814 | return; | ||
815 | case PHASE_DONE_RECEIVED: | ||
816 | GNUNET_break_op (0); | ||
817 | fail_intersection_operation(op); | ||
818 | return; | ||
644 | case PHASE_FINISHED: | 819 | case PHASE_FINISHED: |
645 | GNUNET_break_op (0); | 820 | GNUNET_break_op (0); |
646 | fail_intersection_operation(op); | 821 | fail_intersection_operation(op); |
@@ -650,13 +825,28 @@ process_bf (struct Operation *op) | |||
650 | op->state->remote_bf = NULL; | 825 | op->state->remote_bf = NULL; |
651 | 826 | ||
652 | if ( (0 == op->state->my_element_count) || /* fully disjoint */ | 827 | if ( (0 == op->state->my_element_count) || /* fully disjoint */ |
653 | ( (op->state->my_element_count == op->spec->remote_element_count) && | 828 | ( (op->state->my_element_count == op->remote_element_count) && |
654 | (0 == memcmp (&op->state->my_xor, | 829 | (0 == memcmp (&op->state->my_xor, |
655 | &op->state->other_xor, | 830 | &op->state->other_xor, |
656 | sizeof (struct GNUNET_HashCode))) ) ) | 831 | sizeof (struct GNUNET_HashCode))) ) ) |
657 | { | 832 | { |
658 | /* we are done */ | 833 | /* we are done */ |
659 | send_peer_done (op); | 834 | op->state->phase = PHASE_MUST_SEND_DONE; |
835 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
836 | "Intersection succeeded, sending DONE to other peer\n"); | ||
837 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); | ||
838 | op->state->local_bf = NULL; | ||
839 | if (GNUNET_SET_RESULT_FULL == op->result_mode) | ||
840 | { | ||
841 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
842 | "Sending full result set (%u elements)\n", | ||
843 | GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); | ||
844 | op->state->full_result_iter | ||
845 | = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); | ||
846 | send_remaining_elements (op); | ||
847 | return; | ||
848 | } | ||
849 | send_p2p_done (op); | ||
660 | return; | 850 | return; |
661 | } | 851 | } |
662 | op->state->phase = PHASE_BF_EXCHANGE; | 852 | op->state->phase = PHASE_BF_EXCHANGE; |
@@ -665,41 +855,53 @@ process_bf (struct Operation *op) | |||
665 | 855 | ||
666 | 856 | ||
667 | /** | 857 | /** |
858 | * Check an BF message from a remote peer. | ||
859 | * | ||
860 | * @param cls the intersection operation | ||
861 | * @param msg the header of the message | ||
862 | * @return #GNUNET_OK if @a msg is well-formed | ||
863 | */ | ||
864 | int | ||
865 | check_intersection_p2p_bf (void *cls, | ||
866 | const struct BFMessage *msg) | ||
867 | { | ||
868 | struct Operation *op = cls; | ||
869 | |||
870 | if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) | ||
871 | { | ||
872 | GNUNET_break_op (0); | ||
873 | return GNUNET_SYSERR; | ||
874 | } | ||
875 | return GNUNET_OK; | ||
876 | } | ||
877 | |||
878 | |||
879 | /** | ||
668 | * Handle an BF message from a remote peer. | 880 | * Handle an BF message from a remote peer. |
669 | * | 881 | * |
670 | * @param cls the intersection operation | 882 | * @param cls the intersection operation |
671 | * @param mh the header of the message | 883 | * @param msg the header of the message |
672 | */ | 884 | */ |
673 | static void | 885 | void |
674 | handle_p2p_bf (void *cls, | 886 | handle_intersection_p2p_bf (void *cls, |
675 | const struct GNUNET_MessageHeader *mh) | 887 | const struct BFMessage *msg) |
676 | { | 888 | { |
677 | struct Operation *op = cls; | 889 | struct Operation *op = cls; |
678 | const struct BFMessage *msg; | ||
679 | uint32_t bf_size; | 890 | uint32_t bf_size; |
680 | uint32_t chunk_size; | 891 | uint32_t chunk_size; |
681 | uint32_t bf_bits_per_element; | 892 | uint32_t bf_bits_per_element; |
682 | uint16_t msize; | ||
683 | 893 | ||
684 | msize = htons (mh->size); | ||
685 | if (msize < sizeof (struct BFMessage)) | ||
686 | { | ||
687 | GNUNET_break_op (0); | ||
688 | fail_intersection_operation (op); | ||
689 | return; | ||
690 | } | ||
691 | msg = (const struct BFMessage *) mh; | ||
692 | switch (op->state->phase) | 894 | switch (op->state->phase) |
693 | { | 895 | { |
694 | case PHASE_INITIAL: | 896 | case PHASE_INITIAL: |
695 | GNUNET_break_op (0); | 897 | GNUNET_break_op (0); |
696 | fail_intersection_operation (op); | 898 | fail_intersection_operation (op); |
697 | break; | 899 | return; |
698 | case PHASE_COUNT_SENT: | 900 | case PHASE_COUNT_SENT: |
699 | case PHASE_BF_EXCHANGE: | 901 | case PHASE_BF_EXCHANGE: |
700 | bf_size = ntohl (msg->bloomfilter_total_length); | 902 | bf_size = ntohl (msg->bloomfilter_total_length); |
701 | bf_bits_per_element = ntohl (msg->bits_per_element); | 903 | bf_bits_per_element = ntohl (msg->bits_per_element); |
702 | chunk_size = msize - sizeof (struct BFMessage); | 904 | chunk_size = htons (msg->header.size) - sizeof (struct BFMessage); |
703 | op->state->other_xor = msg->element_xor_hash; | 905 | op->state->other_xor = msg->element_xor_hash; |
704 | if (bf_size == chunk_size) | 906 | if (bf_size == chunk_size) |
705 | { | 907 | { |
@@ -715,9 +917,9 @@ handle_p2p_bf (void *cls, | |||
715 | bf_size, | 917 | bf_size, |
716 | bf_bits_per_element); | 918 | bf_bits_per_element); |
717 | op->state->salt = ntohl (msg->sender_mutator); | 919 | op->state->salt = ntohl (msg->sender_mutator); |
718 | op->spec->remote_element_count = ntohl (msg->sender_element_count); | 920 | op->remote_element_count = ntohl (msg->sender_element_count); |
719 | process_bf (op); | 921 | process_bf (op); |
720 | return; | 922 | break; |
721 | } | 923 | } |
722 | /* multipart chunk */ | 924 | /* multipart chunk */ |
723 | if (NULL == op->state->bf_data) | 925 | if (NULL == op->state->bf_data) |
@@ -728,7 +930,7 @@ handle_p2p_bf (void *cls, | |||
728 | op->state->bf_bits_per_element = bf_bits_per_element; | 930 | op->state->bf_bits_per_element = bf_bits_per_element; |
729 | op->state->bf_data_offset = 0; | 931 | op->state->bf_data_offset = 0; |
730 | op->state->salt = ntohl (msg->sender_mutator); | 932 | op->state->salt = ntohl (msg->sender_mutator); |
731 | op->spec->remote_element_count = ntohl (msg->sender_element_count); | 933 | op->remote_element_count = ntohl (msg->sender_element_count); |
732 | } | 934 | } |
733 | else | 935 | else |
734 | { | 936 | { |
@@ -737,7 +939,7 @@ handle_p2p_bf (void *cls, | |||
737 | (op->state->bf_bits_per_element != bf_bits_per_element) || | 939 | (op->state->bf_bits_per_element != bf_bits_per_element) || |
738 | (op->state->bf_data_offset + chunk_size > bf_size) || | 940 | (op->state->bf_data_offset + chunk_size > bf_size) || |
739 | (op->state->salt != ntohl (msg->sender_mutator)) || | 941 | (op->state->salt != ntohl (msg->sender_mutator)) || |
740 | (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) | 942 | (op->remote_element_count != ntohl (msg->sender_element_count)) ) |
741 | { | 943 | { |
742 | GNUNET_break_op (0); | 944 | GNUNET_break_op (0); |
743 | fail_intersection_operation (op); | 945 | fail_intersection_operation (op); |
@@ -764,153 +966,9 @@ handle_p2p_bf (void *cls, | |||
764 | default: | 966 | default: |
765 | GNUNET_break_op (0); | 967 | GNUNET_break_op (0); |
766 | fail_intersection_operation (op); | 968 | fail_intersection_operation (op); |
767 | break; | ||
768 | } | ||
769 | } | ||
770 | |||
771 | |||
772 | /** | ||
773 | * Fills the "my_elements" hashmap with the initial set of | ||
774 | * (non-deleted) elements from the set of the specification. | ||
775 | * | ||
776 | * @param cls closure with the `struct Operation *` | ||
777 | * @param key current key code for the element | ||
778 | * @param value value in the hash map with the `struct ElementEntry *` | ||
779 | * @return #GNUNET_YES (we should continue to iterate) | ||
780 | */ | ||
781 | static int | ||
782 | initialize_map_unfiltered (void *cls, | ||
783 | const struct GNUNET_HashCode *key, | ||
784 | void *value) | ||
785 | { | ||
786 | struct ElementEntry *ee = value; | ||
787 | struct Operation *op = cls; | ||
788 | |||
789 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) | ||
790 | return GNUNET_YES; /* element not live in operation's generation */ | ||
791 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
792 | &ee->element_hash, | ||
793 | &op->state->my_xor); | ||
794 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
795 | "Initial full initialization of my_elements, adding %s:%u\n", | ||
796 | GNUNET_h2s (&ee->element_hash), | ||
797 | ee->element.size); | ||
798 | GNUNET_break (GNUNET_YES == | ||
799 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, | ||
800 | &ee->element_hash, | ||
801 | ee, | ||
802 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
803 | return GNUNET_YES; | ||
804 | } | ||
805 | |||
806 | |||
807 | /** | ||
808 | * Send our element count to the peer, in case our element count is | ||
809 | * lower than his. | ||
810 | * | ||
811 | * @param op intersection operation | ||
812 | */ | ||
813 | static void | ||
814 | send_element_count (struct Operation *op) | ||
815 | { | ||
816 | struct GNUNET_MQ_Envelope *ev; | ||
817 | struct IntersectionElementInfoMessage *msg; | ||
818 | |||
819 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
820 | "Sending our element count (%u)\n", | ||
821 | op->state->my_element_count); | ||
822 | ev = GNUNET_MQ_msg (msg, | ||
823 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); | ||
824 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
825 | GNUNET_MQ_send (op->mq, ev); | ||
826 | } | ||
827 | |||
828 | |||
829 | /** | ||
830 | * We go first, initialize our map with all elements and | ||
831 | * send the first Bloom filter. | ||
832 | * | ||
833 | * @param op operation to start exchange for | ||
834 | */ | ||
835 | static void | ||
836 | begin_bf_exchange (struct Operation *op) | ||
837 | { | ||
838 | op->state->phase = PHASE_BF_EXCHANGE; | ||
839 | op->state->my_elements | ||
840 | = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, | ||
841 | GNUNET_YES); | ||
842 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | ||
843 | &initialize_map_unfiltered, | ||
844 | op); | ||
845 | send_bloomfilter (op); | ||
846 | } | ||
847 | |||
848 | |||
849 | /** | ||
850 | * Handle the initial `struct IntersectionElementInfoMessage` from a | ||
851 | * remote peer. | ||
852 | * | ||
853 | * @param cls the intersection operation | ||
854 | * @param mh the header of the message | ||
855 | */ | ||
856 | static void | ||
857 | handle_p2p_element_info (void *cls, | ||
858 | const struct GNUNET_MessageHeader *mh) | ||
859 | { | ||
860 | struct Operation *op = cls; | ||
861 | const struct IntersectionElementInfoMessage *msg; | ||
862 | |||
863 | if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage)) | ||
864 | { | ||
865 | GNUNET_break_op (0); | ||
866 | fail_intersection_operation(op); | ||
867 | return; | ||
868 | } | ||
869 | msg = (const struct IntersectionElementInfoMessage *) mh; | ||
870 | op->spec->remote_element_count = ntohl (msg->sender_element_count); | ||
871 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
872 | "Received remote element count (%u), I have %u\n", | ||
873 | op->spec->remote_element_count, | ||
874 | op->state->my_element_count); | ||
875 | if ( ( (PHASE_INITIAL != op->state->phase) && | ||
876 | (PHASE_COUNT_SENT != op->state->phase) ) || | ||
877 | (op->state->my_element_count > op->spec->remote_element_count) || | ||
878 | (0 == op->state->my_element_count) || | ||
879 | (0 == op->spec->remote_element_count) ) | ||
880 | { | ||
881 | GNUNET_break_op (0); | ||
882 | fail_intersection_operation(op); | ||
883 | return; | ||
884 | } | ||
885 | GNUNET_break (NULL == op->state->remote_bf); | ||
886 | begin_bf_exchange (op); | ||
887 | } | ||
888 | |||
889 | |||
890 | /** | ||
891 | * Send a result message to the client indicating that the operation | ||
892 | * is over. After the result done message has been sent to the | ||
893 | * client, destroy the evaluate operation. | ||
894 | * | ||
895 | * @param op intersection operation | ||
896 | */ | ||
897 | static void | ||
898 | finish_and_destroy (struct Operation *op) | ||
899 | { | ||
900 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); | ||
901 | |||
902 | if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) | ||
903 | { | ||
904 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
905 | "Sending full result set (%u elements)\n", | ||
906 | GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); | ||
907 | op->state->full_result_iter | ||
908 | = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); | ||
909 | op->keep++; | ||
910 | send_remaining_elements (op); | ||
911 | return; | 969 | return; |
912 | } | 970 | } |
913 | send_client_done_and_destroy (op); | 971 | GNUNET_CADET_receive_done (op->channel); |
914 | } | 972 | } |
915 | 973 | ||
916 | 974 | ||
@@ -955,28 +1013,26 @@ filter_all (void *cls, | |||
955 | * @param cls the intersection operation | 1013 | * @param cls the intersection operation |
956 | * @param mh the message | 1014 | * @param mh the message |
957 | */ | 1015 | */ |
958 | static void | 1016 | void |
959 | handle_p2p_done (void *cls, | 1017 | handle_intersection_p2p_done (void *cls, |
960 | const struct GNUNET_MessageHeader *mh) | 1018 | const struct IntersectionDoneMessage *idm) |
961 | { | 1019 | { |
962 | struct Operation *op = cls; | 1020 | struct Operation *op = cls; |
963 | const struct IntersectionDoneMessage *idm; | ||
964 | 1021 | ||
965 | if (PHASE_BF_EXCHANGE != op->state->phase) | 1022 | if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) |
966 | { | 1023 | { |
967 | /* wrong phase to conclude? FIXME: Or should we allow this | ||
968 | if the other peer has _initially_ already an empty set? */ | ||
969 | GNUNET_break_op (0); | 1024 | GNUNET_break_op (0); |
970 | fail_intersection_operation (op); | 1025 | fail_intersection_operation (op); |
971 | return; | 1026 | return; |
972 | } | 1027 | } |
973 | if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage)) | 1028 | if (PHASE_BF_EXCHANGE != op->state->phase) |
974 | { | 1029 | { |
1030 | /* wrong phase to conclude? FIXME: Or should we allow this | ||
1031 | if the other peer has _initially_ already an empty set? */ | ||
975 | GNUNET_break_op (0); | 1032 | GNUNET_break_op (0); |
976 | fail_intersection_operation (op); | 1033 | fail_intersection_operation (op); |
977 | return; | 1034 | return; |
978 | } | 1035 | } |
979 | idm = (const struct IntersectionDoneMessage *) mh; | ||
980 | if (0 == ntohl (idm->final_element_count)) | 1036 | if (0 == ntohl (idm->final_element_count)) |
981 | { | 1037 | { |
982 | /* other peer determined empty set is the intersection, | 1038 | /* other peer determined empty set is the intersection, |
@@ -998,8 +1054,22 @@ handle_p2p_done (void *cls, | |||
998 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1054 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
999 | "Got IntersectionDoneMessage, have %u elements in intersection\n", | 1055 | "Got IntersectionDoneMessage, have %u elements in intersection\n", |
1000 | op->state->my_element_count); | 1056 | op->state->my_element_count); |
1057 | op->state->phase = PHASE_DONE_RECEIVED; | ||
1058 | GNUNET_CADET_receive_done (op->channel); | ||
1059 | |||
1060 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); | ||
1061 | if (GNUNET_SET_RESULT_FULL == op->result_mode) | ||
1062 | { | ||
1063 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1064 | "Sending full result set to client (%u elements)\n", | ||
1065 | GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); | ||
1066 | op->state->full_result_iter | ||
1067 | = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); | ||
1068 | send_remaining_elements (op); | ||
1069 | return; | ||
1070 | } | ||
1001 | op->state->phase = PHASE_FINISHED; | 1071 | op->state->phase = PHASE_FINISHED; |
1002 | finish_and_destroy (op); | 1072 | send_client_done_and_destroy (op); |
1003 | } | 1073 | } |
1004 | 1074 | ||
1005 | 1075 | ||
@@ -1010,21 +1080,16 @@ handle_p2p_done (void *cls, | |||
1010 | * begin the evaluation | 1080 | * begin the evaluation |
1011 | * @param opaque_context message to be transmitted to the listener | 1081 | * @param opaque_context message to be transmitted to the listener |
1012 | * to convince him to accept, may be NULL | 1082 | * to convince him to accept, may be NULL |
1083 | * @return operation-specific state to keep in @a op | ||
1013 | */ | 1084 | */ |
1014 | static void | 1085 | static struct OperationState * |
1015 | intersection_evaluate (struct Operation *op, | 1086 | intersection_evaluate (struct Operation *op, |
1016 | const struct GNUNET_MessageHeader *opaque_context) | 1087 | const struct GNUNET_MessageHeader *opaque_context) |
1017 | { | 1088 | { |
1089 | struct OperationState *state; | ||
1018 | struct GNUNET_MQ_Envelope *ev; | 1090 | struct GNUNET_MQ_Envelope *ev; |
1019 | struct OperationRequestMessage *msg; | 1091 | struct OperationRequestMessage *msg; |
1020 | 1092 | ||
1021 | op->state = GNUNET_new (struct OperationState); | ||
1022 | /* we started the operation, thus we have to send the operation request */ | ||
1023 | op->state->phase = PHASE_INITIAL; | ||
1024 | op->state->my_element_count = op->spec->set->state->current_set_element_count; | ||
1025 | |||
1026 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1027 | "Initiating intersection operation evaluation\n"); | ||
1028 | ev = GNUNET_MQ_msg_nested_mh (msg, | 1093 | ev = GNUNET_MQ_msg_nested_mh (msg, |
1029 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 1094 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
1030 | opaque_context); | 1095 | opaque_context); |
@@ -1032,20 +1097,30 @@ intersection_evaluate (struct Operation *op, | |||
1032 | { | 1097 | { |
1033 | /* the context message is too large!? */ | 1098 | /* the context message is too large!? */ |
1034 | GNUNET_break (0); | 1099 | GNUNET_break (0); |
1035 | GNUNET_SERVER_client_disconnect (op->spec->set->client); | 1100 | return NULL; |
1036 | return; | ||
1037 | } | 1101 | } |
1102 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1103 | "Initiating intersection operation evaluation\n"); | ||
1104 | state = GNUNET_new (struct OperationState); | ||
1105 | /* we started the operation, thus we have to send the operation request */ | ||
1106 | state->phase = PHASE_INITIAL; | ||
1107 | state->my_element_count = op->set->state->current_set_element_count; | ||
1108 | state->my_elements | ||
1109 | = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, | ||
1110 | GNUNET_YES); | ||
1111 | |||
1038 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); | 1112 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); |
1039 | msg->element_count = htonl (op->state->my_element_count); | 1113 | msg->element_count = htonl (state->my_element_count); |
1040 | GNUNET_MQ_send (op->mq, | 1114 | GNUNET_MQ_send (op->mq, |
1041 | ev); | 1115 | ev); |
1042 | op->state->phase = PHASE_COUNT_SENT; | 1116 | state->phase = PHASE_COUNT_SENT; |
1043 | if (NULL != opaque_context) | 1117 | if (NULL != opaque_context) |
1044 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1118 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1045 | "Sent op request with context message\n"); | 1119 | "Sent op request with context message\n"); |
1046 | else | 1120 | else |
1047 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1121 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1048 | "Sent op request without context message\n"); | 1122 | "Sent op request without context message\n"); |
1123 | return state; | ||
1049 | } | 1124 | } |
1050 | 1125 | ||
1051 | 1126 | ||
@@ -1055,90 +1130,33 @@ intersection_evaluate (struct Operation *op, | |||
1055 | * | 1130 | * |
1056 | * @param op operation that will be accepted as an intersection operation | 1131 | * @param op operation that will be accepted as an intersection operation |
1057 | */ | 1132 | */ |
1058 | static void | 1133 | static struct OperationState * |
1059 | intersection_accept (struct Operation *op) | 1134 | intersection_accept (struct Operation *op) |
1060 | { | 1135 | { |
1136 | struct OperationState *state; | ||
1137 | |||
1061 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1138 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1062 | "Accepting set intersection operation\n"); | 1139 | "Accepting set intersection operation\n"); |
1063 | op->state = GNUNET_new (struct OperationState); | 1140 | state = GNUNET_new (struct OperationState); |
1064 | op->state->phase = PHASE_INITIAL; | 1141 | state->phase = PHASE_INITIAL; |
1065 | op->state->my_element_count | 1142 | state->my_element_count |
1066 | = op->spec->set->state->current_set_element_count; | 1143 | = op->set->state->current_set_element_count; |
1067 | op->state->my_elements | 1144 | state->my_elements |
1068 | = GNUNET_CONTAINER_multihashmap_create | 1145 | = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count, |
1069 | (GNUNET_MIN (op->state->my_element_count, | 1146 | op->remote_element_count), |
1070 | op->spec->remote_element_count), | 1147 | GNUNET_YES); |
1071 | GNUNET_YES); | 1148 | op->state = state; |
1072 | if (op->spec->remote_element_count < op->state->my_element_count) | 1149 | if (op->remote_element_count < state->my_element_count) |
1073 | { | 1150 | { |
1074 | /* If the other peer (Alice) has fewer elements than us (Bob), | 1151 | /* If the other peer (Alice) has fewer elements than us (Bob), |
1075 | we just send the count as Alice should send the first BF */ | 1152 | we just send the count as Alice should send the first BF */ |
1076 | send_element_count (op); | 1153 | send_element_count (op); |
1077 | op->state->phase = PHASE_COUNT_SENT; | 1154 | state->phase = PHASE_COUNT_SENT; |
1078 | return; | 1155 | return state; |
1079 | } | 1156 | } |
1080 | /* We have fewer elements, so we start with the BF */ | 1157 | /* We have fewer elements, so we start with the BF */ |
1081 | begin_bf_exchange (op); | 1158 | begin_bf_exchange (op); |
1082 | } | 1159 | return state; |
1083 | |||
1084 | |||
1085 | /** | ||
1086 | * Dispatch messages for a intersection operation. | ||
1087 | * | ||
1088 | * @param op the state of the intersection evaluate operation | ||
1089 | * @param mh the received message | ||
1090 | * @return #GNUNET_SYSERR if the tunnel should be disconnected, | ||
1091 | * #GNUNET_OK otherwise | ||
1092 | */ | ||
1093 | static int | ||
1094 | intersection_handle_p2p_message (struct Operation *op, | ||
1095 | const struct GNUNET_MessageHeader *mh) | ||
1096 | { | ||
1097 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1098 | "Received p2p message (t: %u, s: %u)\n", | ||
1099 | ntohs (mh->type), ntohs (mh->size)); | ||
1100 | switch (ntohs (mh->type)) | ||
1101 | { | ||
1102 | /* this message handler is not active until after we received an | ||
1103 | * operation request message, thus the ops request is not handled here | ||
1104 | */ | ||
1105 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO: | ||
1106 | handle_p2p_element_info (op, mh); | ||
1107 | break; | ||
1108 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: | ||
1109 | handle_p2p_bf (op, mh); | ||
1110 | break; | ||
1111 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE: | ||
1112 | handle_p2p_done (op, mh); | ||
1113 | break; | ||
1114 | default: | ||
1115 | /* something wrong with cadet's message handlers? */ | ||
1116 | GNUNET_assert (0); | ||
1117 | } | ||
1118 | return GNUNET_OK; | ||
1119 | } | ||
1120 | |||
1121 | |||
1122 | /** | ||
1123 | * Handler for peer-disconnects, notifies the client about the aborted | ||
1124 | * operation. If we did not expect anything from the other peer, we | ||
1125 | * gracefully terminate the operation. | ||
1126 | * | ||
1127 | * @param op the destroyed operation | ||
1128 | */ | ||
1129 | static void | ||
1130 | intersection_peer_disconnect (struct Operation *op) | ||
1131 | { | ||
1132 | if (PHASE_FINISHED != op->state->phase) | ||
1133 | { | ||
1134 | fail_intersection_operation (op); | ||
1135 | return; | ||
1136 | } | ||
1137 | /* the session has already been concluded */ | ||
1138 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1139 | "Other peer disconnected (finished)\n"); | ||
1140 | if (GNUNET_NO == op->state->client_done_sent) | ||
1141 | finish_and_destroy (op); | ||
1142 | } | 1160 | } |
1143 | 1161 | ||
1144 | 1162 | ||
@@ -1168,6 +1186,11 @@ intersection_op_cancel (struct Operation *op) | |||
1168 | GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); | 1186 | GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); |
1169 | op->state->my_elements = NULL; | 1187 | op->state->my_elements = NULL; |
1170 | } | 1188 | } |
1189 | if (NULL != op->state->full_result_iter) | ||
1190 | { | ||
1191 | GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter); | ||
1192 | op->state->full_result_iter = NULL; | ||
1193 | } | ||
1171 | GNUNET_free (op->state); | 1194 | GNUNET_free (op->state); |
1172 | op->state = NULL; | 1195 | op->state = NULL; |
1173 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1196 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1236,6 +1259,28 @@ intersection_remove (struct SetState *set_state, | |||
1236 | 1259 | ||
1237 | 1260 | ||
1238 | /** | 1261 | /** |
1262 | * Callback for channel death for the intersection operation. | ||
1263 | * | ||
1264 | * @param op operation that lost the channel | ||
1265 | */ | ||
1266 | static void | ||
1267 | intersection_channel_death (struct Operation *op) | ||
1268 | { | ||
1269 | if (GNUNET_YES == op->state->channel_death_expected) | ||
1270 | { | ||
1271 | /* oh goodie, we are done! */ | ||
1272 | send_client_done_and_destroy (op); | ||
1273 | } | ||
1274 | else | ||
1275 | { | ||
1276 | /* sorry, channel went down early, too bad. */ | ||
1277 | _GSS_operation_destroy (op, | ||
1278 | GNUNET_YES); | ||
1279 | } | ||
1280 | } | ||
1281 | |||
1282 | |||
1283 | /** | ||
1239 | * Get the table with implementing functions for set intersection. | 1284 | * Get the table with implementing functions for set intersection. |
1240 | * | 1285 | * |
1241 | * @return the operation specific VTable | 1286 | * @return the operation specific VTable |
@@ -1245,14 +1290,13 @@ _GSS_intersection_vt () | |||
1245 | { | 1290 | { |
1246 | static const struct SetVT intersection_vt = { | 1291 | static const struct SetVT intersection_vt = { |
1247 | .create = &intersection_set_create, | 1292 | .create = &intersection_set_create, |
1248 | .msg_handler = &intersection_handle_p2p_message, | ||
1249 | .add = &intersection_add, | 1293 | .add = &intersection_add, |
1250 | .remove = &intersection_remove, | 1294 | .remove = &intersection_remove, |
1251 | .destroy_set = &intersection_set_destroy, | 1295 | .destroy_set = &intersection_set_destroy, |
1252 | .evaluate = &intersection_evaluate, | 1296 | .evaluate = &intersection_evaluate, |
1253 | .accept = &intersection_accept, | 1297 | .accept = &intersection_accept, |
1254 | .peer_disconnect = &intersection_peer_disconnect, | ||
1255 | .cancel = &intersection_op_cancel, | 1298 | .cancel = &intersection_op_cancel, |
1299 | .channel_death = &intersection_channel_death, | ||
1256 | }; | 1300 | }; |
1257 | 1301 | ||
1258 | return &intersection_vt; | 1302 | return &intersection_vt; |