diff options
author | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-11-18 17:49:57 +0000 |
---|---|---|
committer | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-11-18 17:49:57 +0000 |
commit | ce25faf22dabe2ba33ed8bd8bea4d0ed6cd7acf8 (patch) | |
tree | 49398cd5c65375332da6681bb657e9ffdc456e4f /src/set | |
parent | e317089337c59773aecd7aef7c761be4804462f3 (diff) | |
download | gnunet-ce25faf22dabe2ba33ed8bd8bea4d0ed6cd7acf8.tar.gz gnunet-ce25faf22dabe2ba33ed8bd8bea4d0ed6cd7acf8.zip |
more work on set intersection
Diffstat (limited to 'src/set')
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 106 |
1 files changed, 75 insertions, 31 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 1c79d3c73..879f65809 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -558,38 +558,51 @@ send_element_count (struct Operation *op) | |||
558 | GNUNET_MQ_send (op->mq, ev); | 558 | GNUNET_MQ_send (op->mq, ev); |
559 | } | 559 | } |
560 | 560 | ||
561 | /** | ||
562 | * Send a result message to the client indicating | ||
563 | * that the operation is over. | ||
564 | * After the result done message has been sent to the client, | ||
565 | * destroy the evaluate operation. | ||
566 | * | ||
567 | * @param op intersection operation | ||
568 | */ | ||
569 | static void | ||
570 | finish_and_destroy (struct Operation *op) | ||
571 | { | ||
572 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); | ||
573 | |||
574 | if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) | ||
575 | { | ||
576 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); | ||
577 | op->state->full_result_iter = | ||
578 | GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->my_elements); | ||
579 | send_remaining_elements (op); | ||
580 | return; | ||
581 | } | ||
582 | send_done_and_destroy (op); | ||
583 | } | ||
561 | 584 | ||
562 | /** | 585 | /** |
563 | * Handle a done message from a remote peer | 586 | * Handle a done message from a remote peer |
564 | * | 587 | * |
565 | * @param cls the intersection operation | 588 | * @param cls the union operation |
566 | * @param mh the message | 589 | * @param mh the message |
567 | */ | 590 | */ |
568 | static void | 591 | static void |
569 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | 592 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) |
570 | { | 593 | { |
571 | struct OperationState *eo = cls; | 594 | struct Operation *op = cls; |
572 | struct GNUNET_MQ_Envelope *ev; | 595 | struct GNUNET_MQ_Envelope *ev; |
573 | 596 | ||
574 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 597 | if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){ |
575 | { | ||
576 | /* we got all requests, but still have to send our elements as response */ | ||
577 | |||
578 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); | ||
579 | eo->phase = PHASE_FINISHED; | ||
580 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | ||
581 | GNUNET_MQ_send (eo->mq, ev); | ||
582 | return; | ||
583 | } | ||
584 | if (eo->phase == PHASE_EXPECT_ELEMENTS) | ||
585 | { | ||
586 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); | 598 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); |
587 | eo->phase = PHASE_FINISHED; | 599 | |
588 | send_client_done_and_destroy (eo); | 600 | finish_and_destroy (op); |
589 | return; | 601 | return; |
590 | } | 602 | } |
591 | GNUNET_break (0); | 603 | |
592 | fail_intersection_operation (eo); | 604 | GNUNET_break_op (0); |
605 | fail_intersection_operation (op); | ||
593 | } | 606 | } |
594 | 607 | ||
595 | 608 | ||
@@ -766,27 +779,57 @@ send_done_and_destroy (void *cls) | |||
766 | } | 779 | } |
767 | 780 | ||
768 | /** | 781 | /** |
769 | * Send a result message to the client indicating | 782 | * Send all elements in the full result iterator. |
770 | * that the operation is over. | ||
771 | * After the result done message has been sent to the client, | ||
772 | * destroy the evaluate operation. | ||
773 | * | 783 | * |
774 | * @param op union operation | 784 | * @param cls operation |
775 | */ | 785 | */ |
776 | static void | 786 | static void |
777 | finish_and_destroy (struct Operation *op) | 787 | send_remaining_elements (void *cls) |
778 | { | 788 | { |
779 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); | 789 | struct Operation *op = cls; |
790 | struct KeyEntry *ke; | ||
791 | int res; | ||
780 | 792 | ||
781 | if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) | 793 | res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke); |
794 | res = GNUNET_NO; | ||
795 | if (GNUNET_NO == res) | ||
782 | { | 796 | { |
783 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); | 797 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n"); |
784 | GNUNET_assert (NULL == op->state->full_result_iter); | 798 | send_done_and_destroy (op); |
785 | op->state->full_result_iter = | ||
786 | GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->my_elements); | ||
787 | return; | 799 | return; |
788 | } | 800 | } |
789 | send_done_and_destroy (op); | 801 | |
802 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n"); | ||
803 | |||
804 | while (1) | ||
805 | { | ||
806 | struct GNUNET_MQ_Envelope *ev; | ||
807 | struct GNUNET_SET_ResultMessage *rm; | ||
808 | struct GNUNET_SET_Element *element; | ||
809 | element = &ke->element->element; | ||
810 | |||
811 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size); | ||
812 | GNUNET_assert (0 != op->spec->client_request_id); | ||
813 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
814 | if (NULL == ev) | ||
815 | { | ||
816 | GNUNET_MQ_discard (ev); | ||
817 | GNUNET_break (0); | ||
818 | continue; | ||
819 | } | ||
820 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | ||
821 | rm->request_id = htonl (op->spec->client_request_id); | ||
822 | rm->element_type = element->type; | ||
823 | memcpy (&rm[1], element->data, element->size); | ||
824 | if (ke->next_colliding == NULL) | ||
825 | { | ||
826 | GNUNET_MQ_notify_sent (ev, send_remaining_elements, op); | ||
827 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
828 | break; | ||
829 | } | ||
830 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
831 | ke = ke->next_colliding; | ||
832 | } | ||
790 | } | 833 | } |
791 | 834 | ||
792 | /** | 835 | /** |
@@ -811,6 +854,7 @@ intersection_peer_disconnect (struct Operation *op) | |||
811 | _GSS_operation_destroy (op); | 854 | _GSS_operation_destroy (op); |
812 | return; | 855 | return; |
813 | } | 856 | } |
857 | // else: the session has already been concluded | ||
814 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); | 858 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); |
815 | if (GNUNET_NO == op->state->client_done_sent) | 859 | if (GNUNET_NO == op->state->client_done_sent) |
816 | finish_and_destroy (op); | 860 | finish_and_destroy (op); |