aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c262
1 files changed, 231 insertions, 31 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index efedbcef6..694fb6056 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -135,11 +135,6 @@ struct UnionEvaluateOperation
135 struct GNUNET_MQ_MessageQueue *mq; 135 struct GNUNET_MQ_MessageQueue *mq;
136 136
137 /** 137 /**
138 * Type of this operation
139 */
140 enum GNUNET_SET_OperationType operation;
141
142 /**
143 * Request ID to multiplex set operations to 138 * Request ID to multiplex set operations to
144 * the client inhabiting the set. 139 * the client inhabiting the set.
145 */ 140 */
@@ -330,6 +325,45 @@ struct UnionState
330}; 325};
331 326
332 327
328
329/**
330 * Iterator over hash map entries.
331 *
332 * @param cls closure
333 * @param key current key code
334 * @param value value in the hash map
335 * @return GNUNET_YES if we should continue to
336 * iterate,
337 * GNUNET_NO if not.
338 */
339static int
340destroy_elements_iterator (void *cls,
341 const struct GNUNET_HashCode * key,
342 void *value)
343{
344 struct ElementEntry *ee = value;
345
346 GNUNET_free (ee);
347 return GNUNET_YES;
348}
349
350
351/**
352 * Destroy the elements belonging to a union set.
353 *
354 * @param us union state that contains the elements
355 */
356static void
357destroy_elements (struct UnionState *us)
358{
359 if (NULL == us->elements)
360 return;
361 GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
362 GNUNET_CONTAINER_multihashmap_destroy (us->elements);
363 us->elements = NULL;
364}
365
366
333/** 367/**
334 * Destroy a union operation, and free all resources 368 * Destroy a union operation, and free all resources
335 * associated with it. 369 * associated with it.
@@ -339,6 +373,38 @@ struct UnionState
339static void 373static void
340destroy_union_operation (struct UnionEvaluateOperation *eo) 374destroy_union_operation (struct UnionEvaluateOperation *eo)
341{ 375{
376 if (NULL != eo->mq)
377 {
378 GNUNET_MQ_destroy (eo->mq);
379 eo->mq = NULL;
380 }
381 if (NULL != eo->socket)
382 {
383 GNUNET_STREAM_close (eo->socket);
384 eo->socket = NULL;
385 }
386 if (NULL != eo->remote_ibf)
387 {
388 ibf_destroy (eo->remote_ibf);
389 eo->remote_ibf = NULL;
390 }
391 if (NULL != eo->local_ibf)
392 {
393 ibf_destroy (eo->local_ibf);
394 eo->local_ibf = NULL;
395 }
396 if (NULL != eo->se)
397 {
398 strata_estimator_destroy (eo->se);
399 eo->se = NULL;
400 }
401 if (NULL != eo->key_to_element)
402 {
403 GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
404 eo->key_to_element = NULL;
405 }
406
407
342 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, 408 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
343 eo->set->state.u->ops_tail, 409 eo->set->state.u->ops_tail,
344 eo); 410 eo);
@@ -361,7 +427,7 @@ fail_union_operation (struct UnionEvaluateOperation *eo)
361 427
362 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 428 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
363 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 429 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
364 msg->request_id = eo->request_id; 430 msg->request_id = htonl (eo->request_id);
365 GNUNET_MQ_send (eo->set->client_mq, mqm); 431 GNUNET_MQ_send (eo->set->client_mq, mqm);
366 destroy_union_operation (eo); 432 destroy_union_operation (eo);
367} 433}
@@ -405,12 +471,12 @@ send_operation_request (struct UnionEvaluateOperation *eo)
405 if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) 471 if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
406 { 472 {
407 /* the context message is too large */ 473 /* the context message is too large */
408 _GSS_client_disconnect (eo->set->client);
409 GNUNET_MQ_discard (mqm);
410 GNUNET_break (0); 474 GNUNET_break (0);
475 GNUNET_SERVER_client_disconnect (eo->set->client);
476 GNUNET_MQ_discard (mqm);
411 return; 477 return;
412 } 478 }
413 msg->operation = eo->operation; 479 msg->operation = htons (GNUNET_SET_OPERATION_UNION);
414 msg->app_id = eo->app_id; 480 msg->app_id = eo->app_id;
415 GNUNET_MQ_send (eo->mq, mqm); 481 GNUNET_MQ_send (eo->mq, mqm);
416 482
@@ -547,7 +613,7 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
547 { 613 {
548 unsigned int len; 614 unsigned int len;
549 len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); 615 len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
550 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len); 616 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
551 GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, 617 GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
552 init_key_to_element_iterator, eo); 618 init_key_to_element_iterator, eo);
553 } 619 }
@@ -573,6 +639,8 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
573 639
574 prepare_ibf (eo, 1<<ibf_order); 640 prepare_ibf (eo, 1<<ibf_order);
575 641
642 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
643
576 ibf = eo->local_ibf; 644 ibf = eo->local_ibf;
577 645
578 while (buckets_sent < (1 << ibf_order)) 646 while (buckets_sent < (1 << ibf_order))
@@ -588,7 +656,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
588 656
589 mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, 657 mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
590 GNUNET_MESSAGE_TYPE_SET_P2P_IBF); 658 GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
591 msg->order = htons (ibf_order); 659 msg->order = ibf_order;
592 msg->offset = htons (buckets_sent); 660 msg->offset = htons (buckets_sent);
593 ibf_write_slice (ibf, buckets_sent, 661 ibf_write_slice (ibf, buckets_sent,
594 buckets_in_message, &msg[1]); 662 buckets_in_message, &msg[1]);
@@ -654,7 +722,6 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
654 struct StrataEstimator *remote_se; 722 struct StrataEstimator *remote_se;
655 int diff; 723 int diff;
656 724
657 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
658 725
659 if (eo->phase != PHASE_EXPECT_SE) 726 if (eo->phase != PHASE_EXPECT_SE)
660 { 727 {
@@ -667,6 +734,7 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
667 strata_estimator_read (&mh[1], remote_se); 734 strata_estimator_read (&mh[1], remote_se);
668 GNUNET_assert (NULL != eo->se); 735 GNUNET_assert (NULL != eo->se);
669 diff = strata_estimator_difference (remote_se, eo->se); 736 diff = strata_estimator_difference (remote_se, eo->se);
737 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
670 strata_estimator_destroy (remote_se); 738 strata_estimator_destroy (remote_se);
671 strata_estimator_destroy (eo->se); 739 strata_estimator_destroy (eo->se);
672 eo->se = NULL; 740 eo->se = NULL;
@@ -708,6 +776,7 @@ send_element_iterator (void *cls,
708 continue; 776 continue;
709 } 777 }
710 GNUNET_MQ_send (eo->mq, mqm); 778 GNUNET_MQ_send (eo->mq, mqm);
779 ke = ke->next_colliding;
711 } 780 }
712 return GNUNET_NO; 781 return GNUNET_NO;
713} 782}
@@ -731,7 +800,6 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key
731} 800}
732 801
733 802
734
735/** 803/**
736 * Decode which elements are missing on each side, and 804 * Decode which elements are missing on each side, and
737 * send the appropriate elemens and requests 805 * send the appropriate elemens and requests
@@ -758,11 +826,22 @@ decode_and_send (struct UnionEvaluateOperation *eo)
758 res = ibf_decode (diff_ibf, &side, &key); 826 res = ibf_decode (diff_ibf, &side, &key);
759 if (GNUNET_SYSERR == res) 827 if (GNUNET_SYSERR == res)
760 { 828 {
761 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n", 829 int next_order;
762 diff_ibf->size * 2); 830 next_order = 0;
763 send_ibf (eo, diff_ibf->size * 2); 831 while (1<<next_order < diff_ibf->size)
764 ibf_destroy (diff_ibf); 832 next_order++;
765 return; 833 next_order++;
834 if (next_order <= MAX_IBF_ORDER)
835 {
836 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
837 1<<next_order);
838 send_ibf (eo, next_order);
839 }
840 else
841 {
842 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "set union failed: reached ibf limit\n");
843 }
844 break;
766 } 845 }
767 if (GNUNET_NO == res) 846 if (GNUNET_NO == res)
768 { 847 {
@@ -771,7 +850,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
771 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); 850 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
772 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 851 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
773 GNUNET_MQ_send (eo->mq, mqm); 852 GNUNET_MQ_send (eo->mq, mqm);
774 return; 853 break;
775 } 854 }
776 if (1 == side) 855 if (1 == side)
777 { 856 {
@@ -790,6 +869,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
790 GNUNET_MQ_send (eo->mq, mqm); 869 GNUNET_MQ_send (eo->mq, mqm);
791 } 870 }
792 } 871 }
872 ibf_destroy (diff_ibf);
793} 873}
794 874
795 875
@@ -811,6 +891,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
811 { 891 {
812 eo->phase = PHASE_EXPECT_IBF_CONT; 892 eo->phase = PHASE_EXPECT_IBF_CONT;
813 GNUNET_assert (NULL == eo->remote_ibf); 893 GNUNET_assert (NULL == eo->remote_ibf);
894 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
814 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 895 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
815 if (0 != ntohs (msg->offset)) 896 if (0 != ntohs (msg->offset))
816 { 897 {
@@ -825,6 +906,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
825 { 906 {
826 GNUNET_break (0); 907 GNUNET_break (0);
827 fail_union_operation (eo); 908 fail_union_operation (eo);
909 return;
828 } 910 }
829 } 911 }
830 912
@@ -834,13 +916,16 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
834 { 916 {
835 GNUNET_break (0); 917 GNUNET_break (0);
836 fail_union_operation (eo); 918 fail_union_operation (eo);
919 return;
837 } 920 }
838 921
839 ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); 922 ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
840 eo->ibf_buckets_received += buckets_in_message; 923 eo->ibf_buckets_received += buckets_in_message;
841 924
842 if (eo->ibf_buckets_received == eo->remote_ibf->size) 925 if (eo->ibf_buckets_received == eo->remote_ibf->size)
843 { 926 {
927
928 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
844 eo->phase = PHASE_EXPECT_ELEMENTS; 929 eo->phase = PHASE_EXPECT_ELEMENTS;
845 decode_and_send (eo); 930 decode_and_send (eo);
846 } 931 }
@@ -848,7 +933,8 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
848 933
849 934
850/** 935/**
851 * Send an element to the client of the operations's set. 936 * Send a result message to the client indicating
937 * that there is a new element.
852 * 938 *
853 * @param eo union operation 939 * @param eo union operation
854 * @param element element to send 940 * @param element element to send
@@ -862,6 +948,8 @@ send_client_element (struct UnionEvaluateOperation *eo,
862 948
863 GNUNET_assert (0 != eo->request_id); 949 GNUNET_assert (0 != eo->request_id);
864 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 950 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
951 rm->result_status = htons (GNUNET_SET_STATUS_OK);
952 rm->request_id = htonl (eo->request_id);
865 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) 953 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
866 { 954 {
867 GNUNET_MQ_discard (mqm); 955 GNUNET_MQ_discard (mqm);
@@ -869,7 +957,46 @@ send_client_element (struct UnionEvaluateOperation *eo,
869 return; 957 return;
870 } 958 }
871 959
872 GNUNET_MQ_send (eo->mq, mqm); 960 GNUNET_MQ_send (eo->set->client_mq, mqm);
961}
962
963
964/**
965 * Callback used for notifications
966 *
967 * @param cls closure
968 */
969static void
970client_done_sent_cb (void *cls)
971{
972 //struct UnionEvaluateOperation *eo = cls;
973 /* FIXME: destroy eo */
974}
975
976
977/**
978 * Send a result message to the client indicating
979 * that the operation is over.
980 * After the result done message has been sent to the client,
981 * destroy the evaluate operation.
982 *
983 * @param eo union operation
984 * @param element element to send
985 */
986static void
987send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
988{
989 struct GNUNET_MQ_Message *mqm;
990 struct ResultMessage *rm;
991
992 GNUNET_assert (0 != eo->request_id);
993 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
994 rm->request_id = htonl (eo->request_id);
995 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
996 GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
997 GNUNET_MQ_send (eo->set->client_mq, mqm);
998
999 /* FIXME: destroy the eo */
873} 1000}
874 1001
875 1002
@@ -886,6 +1013,8 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
886 struct ElementEntry *ee; 1013 struct ElementEntry *ee;
887 uint16_t element_size; 1014 uint16_t element_size;
888 1015
1016 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
1017
889 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && 1018 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
890 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) 1019 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
891 { 1020 {
@@ -920,8 +1049,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
920 /* look up elements and send them */ 1049 /* look up elements and send them */
921 if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1050 if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
922 { 1051 {
923 fail_union_operation (eo);
924 GNUNET_break (0); 1052 GNUNET_break (0);
1053 fail_union_operation (eo);
925 return; 1054 return;
926 } 1055 }
927 1056
@@ -929,8 +1058,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
929 1058
930 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) 1059 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
931 { 1060 {
932 fail_union_operation (eo);
933 GNUNET_break (0); 1061 GNUNET_break (0);
1062 fail_union_operation (eo);
934 return; 1063 return;
935 } 1064 }
936 1065
@@ -944,6 +1073,20 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
944 1073
945 1074
946/** 1075/**
1076 * Callback used for notifications
1077 *
1078 * @param cls closure
1079 */
1080static void
1081peer_done_sent_cb (void *cls)
1082{
1083 struct UnionEvaluateOperation *eo = cls;
1084
1085 send_client_done_and_destroy (eo);
1086}
1087
1088
1089/**
947 * Handle a done message from a remote peer 1090 * Handle a done message from a remote peer
948 * 1091 *
949 * @param cls the union operation 1092 * @param cls the union operation
@@ -959,15 +1102,18 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
959 /* we got all requests, but still have to send our elements as response */ 1102 /* we got all requests, but still have to send our elements as response */
960 struct GNUNET_MQ_Message *mqm; 1103 struct GNUNET_MQ_Message *mqm;
961 1104
1105 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
962 eo->phase = PHASE_FINISHED; 1106 eo->phase = PHASE_FINISHED;
963 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 1107 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1108 GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
964 GNUNET_MQ_send (eo->mq, mqm); 1109 GNUNET_MQ_send (eo->mq, mqm);
965 return; 1110 return;
966 } 1111 }
967 if (eo->phase == PHASE_EXPECT_ELEMENTS) 1112 if (eo->phase == PHASE_EXPECT_ELEMENTS)
968 { 1113 {
969 /* it's all over! */ 1114 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
970 eo->phase = PHASE_FINISHED; 1115 eo->phase = PHASE_FINISHED;
1116 send_client_done_and_destroy (eo);
971 return; 1117 return;
972 } 1118 }
973 GNUNET_break (0); 1119 GNUNET_break (0);
@@ -1026,19 +1172,27 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1026{ 1172{
1027 struct UnionEvaluateOperation *eo; 1173 struct UnionEvaluateOperation *eo;
1028 1174
1029 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
1030
1031 eo = GNUNET_new (struct UnionEvaluateOperation); 1175 eo = GNUNET_new (struct UnionEvaluateOperation);
1032 eo->peer = m->peer; 1176 eo->peer = m->peer;
1033 eo->set = set; 1177 eo->set = set;
1034 eo->request_id = htons(m->request_id); 1178 eo->request_id = htonl (m->request_id);
1179 GNUNET_assert (0 != eo->request_id);
1035 eo->se = strata_estimator_dup (set->state.u->se); 1180 eo->se = strata_estimator_dup (set->state.u->se);
1036 eo->salt = ntohs (m->salt); 1181 eo->salt = ntohs (m->salt);
1037 eo->app_id = m->app_id; 1182 eo->app_id = m->app_id;
1183
1184 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation, (app %s)\n",
1185 GNUNET_h2s (&eo->app_id));
1186
1038 eo->socket = 1187 eo->socket =
1039 GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, 1188 GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
1040 stream_open_cb, eo, 1189 stream_open_cb, eo,
1041 GNUNET_STREAM_OPTION_END); 1190 GNUNET_STREAM_OPTION_END);
1191
1192
1193 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1194 eo->set->state.u->ops_tail,
1195 eo);
1042 /* the stream open callback will kick off the operation */ 1196 /* the stream open callback will kick off the operation */
1043} 1197}
1044 1198
@@ -1056,18 +1210,30 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1056{ 1210{
1057 struct UnionEvaluateOperation *eo; 1211 struct UnionEvaluateOperation *eo;
1058 1212
1213 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1214
1059 eo = GNUNET_new (struct UnionEvaluateOperation); 1215 eo = GNUNET_new (struct UnionEvaluateOperation);
1060 eo->generation_created = set->state.u->current_generation++; 1216 eo->generation_created = set->state.u->current_generation++;
1061 eo->set = set; 1217 eo->set = set;
1062 eo->peer = incoming->peer; 1218 eo->peer = incoming->peer;
1063 eo->salt = ntohs (incoming->salt); 1219 eo->salt = ntohs (incoming->salt);
1064 eo->request_id = m->request_id; 1220 GNUNET_assert (0 != ntohl (m->request_id));
1221 eo->request_id = ntohl (m->request_id);
1065 eo->se = strata_estimator_dup (set->state.u->se); 1222 eo->se = strata_estimator_dup (set->state.u->se);
1066 eo->set = set; 1223 eo->set = set;
1067 eo->mq = incoming->mq; 1224 eo->mq = incoming->mq;
1225 /* transfer ownership of mq and socket from incoming to eo */
1226 incoming->mq = NULL;
1227 eo->socket = incoming->socket;
1228 incoming->socket = NULL;
1068 /* the peer's socket is now ours, we'll receive all messages */ 1229 /* the peer's socket is now ours, we'll receive all messages */
1069 GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); 1230 GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
1070 /* kick of the operation */ 1231
1232 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1233 eo->set->state.u->ops_tail,
1234 eo);
1235
1236 /* kick off the operation */
1071 send_strata_estimator (eo); 1237 send_strata_estimator (eo);
1072} 1238}
1073 1239
@@ -1082,7 +1248,7 @@ _GSS_union_set_create (void)
1082{ 1248{
1083 struct Set *set; 1249 struct Set *set;
1084 1250
1085 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n"); 1251 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
1086 1252
1087 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); 1253 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1088 set->state.u = (struct UnionState *) &set[1]; 1254 set->state.u = (struct UnionState *) &set[1];
@@ -1109,6 +1275,8 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set)
1109 struct ElementEntry *ee_dup; 1275 struct ElementEntry *ee_dup;
1110 uint16_t element_size; 1276 uint16_t element_size;
1111 1277
1278 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1279
1112 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); 1280 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1113 element_size = ntohs (m->header.size) - sizeof *m; 1281 element_size = ntohs (m->header.size) - sizeof *m;
1114 ee = GNUNET_malloc (element_size + sizeof *ee); 1282 ee = GNUNET_malloc (element_size + sizeof *ee);
@@ -1131,6 +1299,38 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set)
1131 1299
1132 1300
1133/** 1301/**
1302 * Destroy a set that supports the union operation
1303 *
1304 * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
1305 */
1306void
1307_GSS_union_set_destroy (struct Set *set)
1308{
1309 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1310 if (NULL != set->client)
1311 {
1312 GNUNET_SERVER_client_drop (set->client);
1313 set->client = NULL;
1314 }
1315 if (NULL != set->client_mq)
1316 {
1317 GNUNET_MQ_destroy (set->client_mq);
1318 set->client_mq = NULL;
1319 }
1320
1321 if (NULL != set->state.u->se)
1322 {
1323 strata_estimator_destroy (set->state.u->se);
1324 set->state.u->se = NULL;
1325 }
1326
1327 destroy_elements (set->state.u);
1328
1329 while (NULL != set->state.u->ops_head)
1330 destroy_union_operation (set->state.u->ops_head);
1331}
1332
1333/**
1134 * Remove the element given in the element message from the set. 1334 * Remove the element given in the element message from the set.
1135 * Only marks the element as removed, so that older set operations can still exchange it. 1335 * Only marks the element as removed, so that older set operations can still exchange it.
1136 * 1336 *