diff options
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r-- | src/set/gnunet-service-set_union.c | 262 |
1 files changed, 231 insertions, 31 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index efedbcef6..694fb6056 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -135,11 +135,6 @@ struct UnionEvaluateOperation | |||
135 | struct GNUNET_MQ_MessageQueue *mq; | 135 | struct GNUNET_MQ_MessageQueue *mq; |
136 | 136 | ||
137 | /** | 137 | /** |
138 | * Type of this operation | ||
139 | */ | ||
140 | enum GNUNET_SET_OperationType operation; | ||
141 | |||
142 | /** | ||
143 | * Request ID to multiplex set operations to | 138 | * Request ID to multiplex set operations to |
144 | * the client inhabiting the set. | 139 | * the client inhabiting the set. |
145 | */ | 140 | */ |
@@ -330,6 +325,45 @@ struct UnionState | |||
330 | }; | 325 | }; |
331 | 326 | ||
332 | 327 | ||
328 | |||
329 | /** | ||
330 | * Iterator over hash map entries. | ||
331 | * | ||
332 | * @param cls closure | ||
333 | * @param key current key code | ||
334 | * @param value value in the hash map | ||
335 | * @return GNUNET_YES if we should continue to | ||
336 | * iterate, | ||
337 | * GNUNET_NO if not. | ||
338 | */ | ||
339 | static int | ||
340 | destroy_elements_iterator (void *cls, | ||
341 | const struct GNUNET_HashCode * key, | ||
342 | void *value) | ||
343 | { | ||
344 | struct ElementEntry *ee = value; | ||
345 | |||
346 | GNUNET_free (ee); | ||
347 | return GNUNET_YES; | ||
348 | } | ||
349 | |||
350 | |||
351 | /** | ||
352 | * Destroy the elements belonging to a union set. | ||
353 | * | ||
354 | * @param us union state that contains the elements | ||
355 | */ | ||
356 | static void | ||
357 | destroy_elements (struct UnionState *us) | ||
358 | { | ||
359 | if (NULL == us->elements) | ||
360 | return; | ||
361 | GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL); | ||
362 | GNUNET_CONTAINER_multihashmap_destroy (us->elements); | ||
363 | us->elements = NULL; | ||
364 | } | ||
365 | |||
366 | |||
333 | /** | 367 | /** |
334 | * Destroy a union operation, and free all resources | 368 | * Destroy a union operation, and free all resources |
335 | * associated with it. | 369 | * associated with it. |
@@ -339,6 +373,38 @@ struct UnionState | |||
339 | static void | 373 | static void |
340 | destroy_union_operation (struct UnionEvaluateOperation *eo) | 374 | destroy_union_operation (struct UnionEvaluateOperation *eo) |
341 | { | 375 | { |
376 | if (NULL != eo->mq) | ||
377 | { | ||
378 | GNUNET_MQ_destroy (eo->mq); | ||
379 | eo->mq = NULL; | ||
380 | } | ||
381 | if (NULL != eo->socket) | ||
382 | { | ||
383 | GNUNET_STREAM_close (eo->socket); | ||
384 | eo->socket = NULL; | ||
385 | } | ||
386 | if (NULL != eo->remote_ibf) | ||
387 | { | ||
388 | ibf_destroy (eo->remote_ibf); | ||
389 | eo->remote_ibf = NULL; | ||
390 | } | ||
391 | if (NULL != eo->local_ibf) | ||
392 | { | ||
393 | ibf_destroy (eo->local_ibf); | ||
394 | eo->local_ibf = NULL; | ||
395 | } | ||
396 | if (NULL != eo->se) | ||
397 | { | ||
398 | strata_estimator_destroy (eo->se); | ||
399 | eo->se = NULL; | ||
400 | } | ||
401 | if (NULL != eo->key_to_element) | ||
402 | { | ||
403 | GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); | ||
404 | eo->key_to_element = NULL; | ||
405 | } | ||
406 | |||
407 | |||
342 | GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, | 408 | GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, |
343 | eo->set->state.u->ops_tail, | 409 | eo->set->state.u->ops_tail, |
344 | eo); | 410 | eo); |
@@ -361,7 +427,7 @@ fail_union_operation (struct UnionEvaluateOperation *eo) | |||
361 | 427 | ||
362 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 428 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
363 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 429 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
364 | msg->request_id = eo->request_id; | 430 | msg->request_id = htonl (eo->request_id); |
365 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 431 | GNUNET_MQ_send (eo->set->client_mq, mqm); |
366 | destroy_union_operation (eo); | 432 | destroy_union_operation (eo); |
367 | } | 433 | } |
@@ -405,12 +471,12 @@ send_operation_request (struct UnionEvaluateOperation *eo) | |||
405 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) | 471 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) |
406 | { | 472 | { |
407 | /* the context message is too large */ | 473 | /* the context message is too large */ |
408 | _GSS_client_disconnect (eo->set->client); | ||
409 | GNUNET_MQ_discard (mqm); | ||
410 | GNUNET_break (0); | 474 | GNUNET_break (0); |
475 | GNUNET_SERVER_client_disconnect (eo->set->client); | ||
476 | GNUNET_MQ_discard (mqm); | ||
411 | return; | 477 | return; |
412 | } | 478 | } |
413 | msg->operation = eo->operation; | 479 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); |
414 | msg->app_id = eo->app_id; | 480 | msg->app_id = eo->app_id; |
415 | GNUNET_MQ_send (eo->mq, mqm); | 481 | GNUNET_MQ_send (eo->mq, mqm); |
416 | 482 | ||
@@ -547,7 +613,7 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) | |||
547 | { | 613 | { |
548 | unsigned int len; | 614 | unsigned int len; |
549 | len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); | 615 | len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); |
550 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len); | 616 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); |
551 | GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, | 617 | GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, |
552 | init_key_to_element_iterator, eo); | 618 | init_key_to_element_iterator, eo); |
553 | } | 619 | } |
@@ -573,6 +639,8 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
573 | 639 | ||
574 | prepare_ibf (eo, 1<<ibf_order); | 640 | prepare_ibf (eo, 1<<ibf_order); |
575 | 641 | ||
642 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order); | ||
643 | |||
576 | ibf = eo->local_ibf; | 644 | ibf = eo->local_ibf; |
577 | 645 | ||
578 | while (buckets_sent < (1 << ibf_order)) | 646 | while (buckets_sent < (1 << ibf_order)) |
@@ -588,7 +656,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
588 | 656 | ||
589 | mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, | 657 | mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, |
590 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); | 658 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); |
591 | msg->order = htons (ibf_order); | 659 | msg->order = ibf_order; |
592 | msg->offset = htons (buckets_sent); | 660 | msg->offset = htons (buckets_sent); |
593 | ibf_write_slice (ibf, buckets_sent, | 661 | ibf_write_slice (ibf, buckets_sent, |
594 | buckets_in_message, &msg[1]); | 662 | buckets_in_message, &msg[1]); |
@@ -654,7 +722,6 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | |||
654 | struct StrataEstimator *remote_se; | 722 | struct StrataEstimator *remote_se; |
655 | int diff; | 723 | int diff; |
656 | 724 | ||
657 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n"); | ||
658 | 725 | ||
659 | if (eo->phase != PHASE_EXPECT_SE) | 726 | if (eo->phase != PHASE_EXPECT_SE) |
660 | { | 727 | { |
@@ -667,6 +734,7 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | |||
667 | strata_estimator_read (&mh[1], remote_se); | 734 | strata_estimator_read (&mh[1], remote_se); |
668 | GNUNET_assert (NULL != eo->se); | 735 | GNUNET_assert (NULL != eo->se); |
669 | diff = strata_estimator_difference (remote_se, eo->se); | 736 | diff = strata_estimator_difference (remote_se, eo->se); |
737 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff); | ||
670 | strata_estimator_destroy (remote_se); | 738 | strata_estimator_destroy (remote_se); |
671 | strata_estimator_destroy (eo->se); | 739 | strata_estimator_destroy (eo->se); |
672 | eo->se = NULL; | 740 | eo->se = NULL; |
@@ -708,6 +776,7 @@ send_element_iterator (void *cls, | |||
708 | continue; | 776 | continue; |
709 | } | 777 | } |
710 | GNUNET_MQ_send (eo->mq, mqm); | 778 | GNUNET_MQ_send (eo->mq, mqm); |
779 | ke = ke->next_colliding; | ||
711 | } | 780 | } |
712 | return GNUNET_NO; | 781 | return GNUNET_NO; |
713 | } | 782 | } |
@@ -731,7 +800,6 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key | |||
731 | } | 800 | } |
732 | 801 | ||
733 | 802 | ||
734 | |||
735 | /** | 803 | /** |
736 | * Decode which elements are missing on each side, and | 804 | * Decode which elements are missing on each side, and |
737 | * send the appropriate elemens and requests | 805 | * send the appropriate elemens and requests |
@@ -758,11 +826,22 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
758 | res = ibf_decode (diff_ibf, &side, &key); | 826 | res = ibf_decode (diff_ibf, &side, &key); |
759 | if (GNUNET_SYSERR == res) | 827 | if (GNUNET_SYSERR == res) |
760 | { | 828 | { |
761 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n", | 829 | int next_order; |
762 | diff_ibf->size * 2); | 830 | next_order = 0; |
763 | send_ibf (eo, diff_ibf->size * 2); | 831 | while (1<<next_order < diff_ibf->size) |
764 | ibf_destroy (diff_ibf); | 832 | next_order++; |
765 | return; | 833 | next_order++; |
834 | if (next_order <= MAX_IBF_ORDER) | ||
835 | { | ||
836 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n", | ||
837 | 1<<next_order); | ||
838 | send_ibf (eo, next_order); | ||
839 | } | ||
840 | else | ||
841 | { | ||
842 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "set union failed: reached ibf limit\n"); | ||
843 | } | ||
844 | break; | ||
766 | } | 845 | } |
767 | if (GNUNET_NO == res) | 846 | if (GNUNET_NO == res) |
768 | { | 847 | { |
@@ -771,7 +850,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
771 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); | 850 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); |
772 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 851 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
773 | GNUNET_MQ_send (eo->mq, mqm); | 852 | GNUNET_MQ_send (eo->mq, mqm); |
774 | return; | 853 | break; |
775 | } | 854 | } |
776 | if (1 == side) | 855 | if (1 == side) |
777 | { | 856 | { |
@@ -790,6 +869,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
790 | GNUNET_MQ_send (eo->mq, mqm); | 869 | GNUNET_MQ_send (eo->mq, mqm); |
791 | } | 870 | } |
792 | } | 871 | } |
872 | ibf_destroy (diff_ibf); | ||
793 | } | 873 | } |
794 | 874 | ||
795 | 875 | ||
@@ -811,6 +891,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
811 | { | 891 | { |
812 | eo->phase = PHASE_EXPECT_IBF_CONT; | 892 | eo->phase = PHASE_EXPECT_IBF_CONT; |
813 | GNUNET_assert (NULL == eo->remote_ibf); | 893 | GNUNET_assert (NULL == eo->remote_ibf); |
894 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order); | ||
814 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | 895 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); |
815 | if (0 != ntohs (msg->offset)) | 896 | if (0 != ntohs (msg->offset)) |
816 | { | 897 | { |
@@ -825,6 +906,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
825 | { | 906 | { |
826 | GNUNET_break (0); | 907 | GNUNET_break (0); |
827 | fail_union_operation (eo); | 908 | fail_union_operation (eo); |
909 | return; | ||
828 | } | 910 | } |
829 | } | 911 | } |
830 | 912 | ||
@@ -834,13 +916,16 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
834 | { | 916 | { |
835 | GNUNET_break (0); | 917 | GNUNET_break (0); |
836 | fail_union_operation (eo); | 918 | fail_union_operation (eo); |
919 | return; | ||
837 | } | 920 | } |
838 | 921 | ||
839 | ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); | 922 | ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); |
840 | eo->ibf_buckets_received += buckets_in_message; | 923 | eo->ibf_buckets_received += buckets_in_message; |
841 | 924 | ||
842 | if (eo->ibf_buckets_received == eo->remote_ibf->size) | 925 | if (eo->ibf_buckets_received == eo->remote_ibf->size) |
843 | { | 926 | { |
927 | |||
928 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n"); | ||
844 | eo->phase = PHASE_EXPECT_ELEMENTS; | 929 | eo->phase = PHASE_EXPECT_ELEMENTS; |
845 | decode_and_send (eo); | 930 | decode_and_send (eo); |
846 | } | 931 | } |
@@ -848,7 +933,8 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
848 | 933 | ||
849 | 934 | ||
850 | /** | 935 | /** |
851 | * Send an element to the client of the operations's set. | 936 | * Send a result message to the client indicating |
937 | * that there is a new element. | ||
852 | * | 938 | * |
853 | * @param eo union operation | 939 | * @param eo union operation |
854 | * @param element element to send | 940 | * @param element element to send |
@@ -862,6 +948,8 @@ send_client_element (struct UnionEvaluateOperation *eo, | |||
862 | 948 | ||
863 | GNUNET_assert (0 != eo->request_id); | 949 | GNUNET_assert (0 != eo->request_id); |
864 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 950 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); |
951 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | ||
952 | rm->request_id = htonl (eo->request_id); | ||
865 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) | 953 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) |
866 | { | 954 | { |
867 | GNUNET_MQ_discard (mqm); | 955 | GNUNET_MQ_discard (mqm); |
@@ -869,7 +957,46 @@ send_client_element (struct UnionEvaluateOperation *eo, | |||
869 | return; | 957 | return; |
870 | } | 958 | } |
871 | 959 | ||
872 | GNUNET_MQ_send (eo->mq, mqm); | 960 | GNUNET_MQ_send (eo->set->client_mq, mqm); |
961 | } | ||
962 | |||
963 | |||
964 | /** | ||
965 | * Callback used for notifications | ||
966 | * | ||
967 | * @param cls closure | ||
968 | */ | ||
969 | static void | ||
970 | client_done_sent_cb (void *cls) | ||
971 | { | ||
972 | //struct UnionEvaluateOperation *eo = cls; | ||
973 | /* FIXME: destroy eo */ | ||
974 | } | ||
975 | |||
976 | |||
977 | /** | ||
978 | * Send a result message to the client indicating | ||
979 | * that the operation is over. | ||
980 | * After the result done message has been sent to the client, | ||
981 | * destroy the evaluate operation. | ||
982 | * | ||
983 | * @param eo union operation | ||
984 | * @param element element to send | ||
985 | */ | ||
986 | static void | ||
987 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | ||
988 | { | ||
989 | struct GNUNET_MQ_Message *mqm; | ||
990 | struct ResultMessage *rm; | ||
991 | |||
992 | GNUNET_assert (0 != eo->request_id); | ||
993 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
994 | rm->request_id = htonl (eo->request_id); | ||
995 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | ||
996 | GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo); | ||
997 | GNUNET_MQ_send (eo->set->client_mq, mqm); | ||
998 | |||
999 | /* FIXME: destroy the eo */ | ||
873 | } | 1000 | } |
874 | 1001 | ||
875 | 1002 | ||
@@ -886,6 +1013,8 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
886 | struct ElementEntry *ee; | 1013 | struct ElementEntry *ee; |
887 | uint16_t element_size; | 1014 | uint16_t element_size; |
888 | 1015 | ||
1016 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n"); | ||
1017 | |||
889 | if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && | 1018 | if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && |
890 | (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) | 1019 | (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) |
891 | { | 1020 | { |
@@ -920,8 +1049,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | |||
920 | /* look up elements and send them */ | 1049 | /* look up elements and send them */ |
921 | if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1050 | if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) |
922 | { | 1051 | { |
923 | fail_union_operation (eo); | ||
924 | GNUNET_break (0); | 1052 | GNUNET_break (0); |
1053 | fail_union_operation (eo); | ||
925 | return; | 1054 | return; |
926 | } | 1055 | } |
927 | 1056 | ||
@@ -929,8 +1058,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | |||
929 | 1058 | ||
930 | if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) | 1059 | if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) |
931 | { | 1060 | { |
932 | fail_union_operation (eo); | ||
933 | GNUNET_break (0); | 1061 | GNUNET_break (0); |
1062 | fail_union_operation (eo); | ||
934 | return; | 1063 | return; |
935 | } | 1064 | } |
936 | 1065 | ||
@@ -944,6 +1073,20 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | |||
944 | 1073 | ||
945 | 1074 | ||
946 | /** | 1075 | /** |
1076 | * Callback used for notifications | ||
1077 | * | ||
1078 | * @param cls closure | ||
1079 | */ | ||
1080 | static void | ||
1081 | peer_done_sent_cb (void *cls) | ||
1082 | { | ||
1083 | struct UnionEvaluateOperation *eo = cls; | ||
1084 | |||
1085 | send_client_done_and_destroy (eo); | ||
1086 | } | ||
1087 | |||
1088 | |||
1089 | /** | ||
947 | * Handle a done message from a remote peer | 1090 | * Handle a done message from a remote peer |
948 | * | 1091 | * |
949 | * @param cls the union operation | 1092 | * @param cls the union operation |
@@ -959,15 +1102,18 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
959 | /* we got all requests, but still have to send our elements as response */ | 1102 | /* we got all requests, but still have to send our elements as response */ |
960 | struct GNUNET_MQ_Message *mqm; | 1103 | struct GNUNET_MQ_Message *mqm; |
961 | 1104 | ||
1105 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); | ||
962 | eo->phase = PHASE_FINISHED; | 1106 | eo->phase = PHASE_FINISHED; |
963 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 1107 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
1108 | GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); | ||
964 | GNUNET_MQ_send (eo->mq, mqm); | 1109 | GNUNET_MQ_send (eo->mq, mqm); |
965 | return; | 1110 | return; |
966 | } | 1111 | } |
967 | if (eo->phase == PHASE_EXPECT_ELEMENTS) | 1112 | if (eo->phase == PHASE_EXPECT_ELEMENTS) |
968 | { | 1113 | { |
969 | /* it's all over! */ | 1114 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n"); |
970 | eo->phase = PHASE_FINISHED; | 1115 | eo->phase = PHASE_FINISHED; |
1116 | send_client_done_and_destroy (eo); | ||
971 | return; | 1117 | return; |
972 | } | 1118 | } |
973 | GNUNET_break (0); | 1119 | GNUNET_break (0); |
@@ -1026,19 +1172,27 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) | |||
1026 | { | 1172 | { |
1027 | struct UnionEvaluateOperation *eo; | 1173 | struct UnionEvaluateOperation *eo; |
1028 | 1174 | ||
1029 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n"); | ||
1030 | |||
1031 | eo = GNUNET_new (struct UnionEvaluateOperation); | 1175 | eo = GNUNET_new (struct UnionEvaluateOperation); |
1032 | eo->peer = m->peer; | 1176 | eo->peer = m->peer; |
1033 | eo->set = set; | 1177 | eo->set = set; |
1034 | eo->request_id = htons(m->request_id); | 1178 | eo->request_id = htonl (m->request_id); |
1179 | GNUNET_assert (0 != eo->request_id); | ||
1035 | eo->se = strata_estimator_dup (set->state.u->se); | 1180 | eo->se = strata_estimator_dup (set->state.u->se); |
1036 | eo->salt = ntohs (m->salt); | 1181 | eo->salt = ntohs (m->salt); |
1037 | eo->app_id = m->app_id; | 1182 | eo->app_id = m->app_id; |
1183 | |||
1184 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation, (app %s)\n", | ||
1185 | GNUNET_h2s (&eo->app_id)); | ||
1186 | |||
1038 | eo->socket = | 1187 | eo->socket = |
1039 | GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, | 1188 | GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, |
1040 | stream_open_cb, eo, | 1189 | stream_open_cb, eo, |
1041 | GNUNET_STREAM_OPTION_END); | 1190 | GNUNET_STREAM_OPTION_END); |
1191 | |||
1192 | |||
1193 | GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, | ||
1194 | eo->set->state.u->ops_tail, | ||
1195 | eo); | ||
1042 | /* the stream open callback will kick off the operation */ | 1196 | /* the stream open callback will kick off the operation */ |
1043 | } | 1197 | } |
1044 | 1198 | ||
@@ -1056,18 +1210,30 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set, | |||
1056 | { | 1210 | { |
1057 | struct UnionEvaluateOperation *eo; | 1211 | struct UnionEvaluateOperation *eo; |
1058 | 1212 | ||
1213 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); | ||
1214 | |||
1059 | eo = GNUNET_new (struct UnionEvaluateOperation); | 1215 | eo = GNUNET_new (struct UnionEvaluateOperation); |
1060 | eo->generation_created = set->state.u->current_generation++; | 1216 | eo->generation_created = set->state.u->current_generation++; |
1061 | eo->set = set; | 1217 | eo->set = set; |
1062 | eo->peer = incoming->peer; | 1218 | eo->peer = incoming->peer; |
1063 | eo->salt = ntohs (incoming->salt); | 1219 | eo->salt = ntohs (incoming->salt); |
1064 | eo->request_id = m->request_id; | 1220 | GNUNET_assert (0 != ntohl (m->request_id)); |
1221 | eo->request_id = ntohl (m->request_id); | ||
1065 | eo->se = strata_estimator_dup (set->state.u->se); | 1222 | eo->se = strata_estimator_dup (set->state.u->se); |
1066 | eo->set = set; | 1223 | eo->set = set; |
1067 | eo->mq = incoming->mq; | 1224 | eo->mq = incoming->mq; |
1225 | /* transfer ownership of mq and socket from incoming to eo */ | ||
1226 | incoming->mq = NULL; | ||
1227 | eo->socket = incoming->socket; | ||
1228 | incoming->socket = NULL; | ||
1068 | /* the peer's socket is now ours, we'll receive all messages */ | 1229 | /* the peer's socket is now ours, we'll receive all messages */ |
1069 | GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); | 1230 | GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); |
1070 | /* kick of the operation */ | 1231 | |
1232 | GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, | ||
1233 | eo->set->state.u->ops_tail, | ||
1234 | eo); | ||
1235 | |||
1236 | /* kick off the operation */ | ||
1071 | send_strata_estimator (eo); | 1237 | send_strata_estimator (eo); |
1072 | } | 1238 | } |
1073 | 1239 | ||
@@ -1082,7 +1248,7 @@ _GSS_union_set_create (void) | |||
1082 | { | 1248 | { |
1083 | struct Set *set; | 1249 | struct Set *set; |
1084 | 1250 | ||
1085 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n"); | 1251 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n"); |
1086 | 1252 | ||
1087 | set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); | 1253 | set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); |
1088 | set->state.u = (struct UnionState *) &set[1]; | 1254 | set->state.u = (struct UnionState *) &set[1]; |
@@ -1109,6 +1275,8 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set) | |||
1109 | struct ElementEntry *ee_dup; | 1275 | struct ElementEntry *ee_dup; |
1110 | uint16_t element_size; | 1276 | uint16_t element_size; |
1111 | 1277 | ||
1278 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n"); | ||
1279 | |||
1112 | GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); | 1280 | GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); |
1113 | element_size = ntohs (m->header.size) - sizeof *m; | 1281 | element_size = ntohs (m->header.size) - sizeof *m; |
1114 | ee = GNUNET_malloc (element_size + sizeof *ee); | 1282 | ee = GNUNET_malloc (element_size + sizeof *ee); |
@@ -1131,6 +1299,38 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set) | |||
1131 | 1299 | ||
1132 | 1300 | ||
1133 | /** | 1301 | /** |
1302 | * Destroy a set that supports the union operation | ||
1303 | * | ||
1304 | * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION | ||
1305 | */ | ||
1306 | void | ||
1307 | _GSS_union_set_destroy (struct Set *set) | ||
1308 | { | ||
1309 | GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); | ||
1310 | if (NULL != set->client) | ||
1311 | { | ||
1312 | GNUNET_SERVER_client_drop (set->client); | ||
1313 | set->client = NULL; | ||
1314 | } | ||
1315 | if (NULL != set->client_mq) | ||
1316 | { | ||
1317 | GNUNET_MQ_destroy (set->client_mq); | ||
1318 | set->client_mq = NULL; | ||
1319 | } | ||
1320 | |||
1321 | if (NULL != set->state.u->se) | ||
1322 | { | ||
1323 | strata_estimator_destroy (set->state.u->se); | ||
1324 | set->state.u->se = NULL; | ||
1325 | } | ||
1326 | |||
1327 | destroy_elements (set->state.u); | ||
1328 | |||
1329 | while (NULL != set->state.u->ops_head) | ||
1330 | destroy_union_operation (set->state.u->ops_head); | ||
1331 | } | ||
1332 | |||
1333 | /** | ||
1134 | * Remove the element given in the element message from the set. | 1334 | * Remove the element given in the element message from the set. |
1135 | * Only marks the element as removed, so that older set operations can still exchange it. | 1335 | * Only marks the element as removed, so that older set operations can still exchange it. |
1136 | * | 1336 | * |