aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2015-09-27 04:32:52 +0000
committerFlorian Dold <florian.dold@gmail.com>2015-09-27 04:32:52 +0000
commit9aceae8a9f91642665fa28730c961c9f90360bc1 (patch)
tree5390aca855b836d17720a65671f994089cb93196 /src/set
parent55ad4fa34348aaa05fadedeb830ea72cea5d7cc4 (diff)
downloadgnunet-9aceae8a9f91642665fa28730c961c9f90360bc1.tar.gz
gnunet-9aceae8a9f91642665fa28730c961c9f90360bc1.zip
SET service: accurate results for symmetric mode
Diffstat (limited to 'src/set')
-rw-r--r--src/set/Makefile.am8
-rw-r--r--src/set/gnunet-service-set.c6
-rw-r--r--src/set/gnunet-service-set_protocol.h8
-rw-r--r--src/set/gnunet-service-set_union.c776
-rw-r--r--src/set/gnunet-set-profiler.c51
-rw-r--r--src/set/set_api.c63
-rw-r--r--src/set/test_set.conf44
-rw-r--r--src/set/test_set_union_result_symmetric.c (renamed from src/set/test_set_union_result_full.c)20
8 files changed, 568 insertions, 408 deletions
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index 669a28658..b7617db25 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -66,7 +66,7 @@ libgnunetset_la_LDFLAGS = \
66if HAVE_TESTING 66if HAVE_TESTING
67check_PROGRAMS = \ 67check_PROGRAMS = \
68 test_set_api \ 68 test_set_api \
69 test_set_union_result_full \ 69 test_set_union_result_symmetric \
70 test_set_intersection_result_full \ 70 test_set_intersection_result_full \
71 test_set_union_copy 71 test_set_union_copy
72endif 72endif
@@ -83,9 +83,9 @@ test_set_api_LDADD = \
83 $(top_builddir)/src/testing/libgnunettesting.la \ 83 $(top_builddir)/src/testing/libgnunettesting.la \
84 libgnunetset.la 84 libgnunetset.la
85 85
86test_set_union_result_full_SOURCES = \ 86test_set_union_result_symmetric_SOURCES = \
87 test_set_union_result_full.c 87 test_set_union_result_symmetric.c
88test_set_union_result_full_LDADD = \ 88test_set_union_result_symmetric_LDADD = \
89 $(top_builddir)/src/util/libgnunetutil.la \ 89 $(top_builddir)/src/util/libgnunetutil.la \
90 $(top_builddir)/src/testing/libgnunettesting.la \ 90 $(top_builddir)/src/testing/libgnunettesting.la \
91 libgnunetset.la 91 libgnunetset.la
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index d8e8dfb78..754bc96e0 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -560,6 +560,7 @@ set_destroy (struct Set *set)
560 NULL); 560 NULL);
561 GNUNET_CONTAINER_multihashmap_destroy (content->elements); 561 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
562 content->elements = NULL; 562 content->elements = NULL;
563 GNUNET_free (content);
563 } 564 }
564 } 565 }
565 GNUNET_free_non_null (set->excluded_generations); 566 GNUNET_free_non_null (set->excluded_generations);
@@ -1951,8 +1952,11 @@ run (void *cls,
1951 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, 1952 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
1952 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, 1953 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
1953 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, 1954 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
1954 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0}, 1955 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 0},
1956 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 0},
1957 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 0},
1955 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, 1958 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
1959 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0},
1956 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, 1960 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
1957 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, 1961 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
1958 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, 1962 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
diff --git a/src/set/gnunet-service-set_protocol.h b/src/set/gnunet-service-set_protocol.h
index c8a3121a6..eee0dc5ae 100644
--- a/src/set/gnunet-service-set_protocol.h
+++ b/src/set/gnunet-service-set_protocol.h
@@ -58,6 +58,12 @@ struct OperationRequestMessage
58}; 58};
59 59
60 60
61/**
62 * Message containing buckets of an invertible bloom filter.
63 *
64 * If an IBF has too many buckets for an IBF message,
65 * it is split into multiple messages.
66 */
61struct IBFMessage 67struct IBFMessage
62{ 68{
63 /** 69 /**
@@ -86,7 +92,7 @@ struct IBFMessage
86 */ 92 */
87 uint32_t salt GNUNET_PACKED; 93 uint32_t salt GNUNET_PACKED;
88 94
89 /* rest: strata */ 95 /* rest: buckets */
90}; 96};
91 97
92 98
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 5b452cae1..47abeaac8 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2013 Christian Grothoff (and other contributing authors) 3 Copyright (C) 2013-2015 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -31,6 +31,11 @@
31#include <gcrypt.h> 31#include <gcrypt.h>
32 32
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
35
36#define LOG_OP(kind,msg,op,...) GNUNET_log_from (kind, "set-union","[OP %x] " msg,((void *)op),__VA_ARGS__)
37
38
34/** 39/**
35 * Number of IBFs in a strata estimator. 40 * Number of IBFs in a strata estimator.
36 */ 41 */
@@ -40,7 +45,7 @@
40 */ 45 */
41#define SE_IBF_SIZE 80 46#define SE_IBF_SIZE 80
42/** 47/**
43 * hash num parameter for the difference digests and strata estimators 48 * The hash num parameter for the difference digests and strata estimators.
44 */ 49 */
45#define SE_IBF_HASH_NUM 4 50#define SE_IBF_HASH_NUM 4
46 51
@@ -69,7 +74,7 @@
69enum UnionOperationPhase 74enum UnionOperationPhase
70{ 75{
71 /** 76 /**
72 * We sent the request message, and expect a strata estimator 77 * We sent the request message, and expect a strata estimator.
73 */ 78 */
74 PHASE_EXPECT_SE, 79 PHASE_EXPECT_SE,
75 80
@@ -77,6 +82,8 @@ enum UnionOperationPhase
77 * We sent the strata estimator, and expect an IBF. This phase is entered once 82 * We sent the strata estimator, and expect an IBF. This phase is entered once
78 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. 83 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
79 * 84 *
85 * XXX: could use better wording.
86 *
80 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS 87 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
81 */ 88 */
82 PHASE_EXPECT_IBF, 89 PHASE_EXPECT_IBF,
@@ -87,33 +94,33 @@ enum UnionOperationPhase
87 PHASE_EXPECT_IBF_CONT, 94 PHASE_EXPECT_IBF_CONT,
88 95
89 /** 96 /**
90 * We are sending request and elements, 97 * We are decoding an IBF.
91 * and thus only expect elements from the other peer.
92 *
93 * We are currently decoding an IBF until it can no longer be decoded,
94 * we currently send requests and expect elements
95 * The remote peer is in #PHASE_EXPECT_ELEMENTS_AND_REQUESTS
96 */ 98 */
97 PHASE_EXPECT_ELEMENTS, 99 PHASE_INVENTORY_ACTIVE,
98 100
99 /** 101 /**
100 * We are expecting elements and requests, and send 102 * The other peer is decoding the IBF we just sent.
101 * requested elements back to the other peer.
102 *
103 * We are in this phase if we have SENT an IBF for the remote peer to decode.
104 * We expect requests, send elements or could receive an new IBF, which takes
105 * us via #PHASE_EXPECT_IBF to phase #PHASE_EXPECT_ELEMENTS
106 *
107 * The remote peer is thus in:
108 * #PHASE_EXPECT_ELEMENTS
109 */ 103 */
110 PHASE_EXPECT_ELEMENTS_AND_REQUESTS, 104 PHASE_INVENTORY_PASSIVE,
111 105
112 /** 106 /**
113 * The protocol is over. 107 * The protocol is almost finished, but we still have to flush our message
114 * Results may still have to be sent to the client. 108 * queue and/or expect some elements.
115 */ 109 */
116 PHASE_FINISHED 110 PHASE_FINISH_CLOSING,
111
112 /**
113 * In the penultimate phase,
114 * we wait until all our demands
115 * are satisfied. Then we send a done
116 * message, and wait for another done message.*/
117 PHASE_FINISH_WAITING,
118
119 /**
120 * In the ultimate phase, we wait until
121 * our demands are satisfied and then
122 * quit (sending another DONE message). */
123 PHASE_DONE,
117}; 124};
118 125
119 126
@@ -122,20 +129,19 @@ enum UnionOperationPhase
122 */ 129 */
123struct OperationState 130struct OperationState
124{ 131{
125
126 /** 132 /**
127 * Copy of the set's strata estimator at the time of 133 * Copy of the set's strata estimator at the time of
128 * creation of this operation 134 * creation of this operation.
129 */ 135 */
130 struct StrataEstimator *se; 136 struct StrataEstimator *se;
131 137
132 /** 138 /**
133 * The ibf we currently receive 139 * The IBF we currently receive.
134 */ 140 */
135 struct InvertibleBloomFilter *remote_ibf; 141 struct InvertibleBloomFilter *remote_ibf;
136 142
137 /** 143 /**
138 * IBF of the set's element. 144 * The IBF with the local set's element.
139 */ 145 */
140 struct InvertibleBloomFilter *local_ibf; 146 struct InvertibleBloomFilter *local_ibf;
141 147
@@ -147,11 +153,6 @@ struct OperationState
147 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; 153 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
148 154
149 /** 155 /**
150 * Iterator for sending elements on the key to element mapping to the client.
151 */
152 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
153
154 /**
155 * Current state of the operation. 156 * Current state of the operation.
156 */ 157 */
157 enum UnionOperationPhase phase; 158 enum UnionOperationPhase phase;
@@ -162,10 +163,14 @@ struct OperationState
162 int client_done_sent; 163 int client_done_sent;
163 164
164 /** 165 /**
165 * Number of ibf buckets received 166 * Number of ibf buckets already received into the @a remote_ibf.
166 */ 167 */
167 unsigned int ibf_buckets_received; 168 unsigned int ibf_buckets_received;
168 169
170 /**
171 * Hashes for elements that we have demanded from the other peer.
172 */
173 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
169}; 174};
170 175
171 176
@@ -181,14 +186,11 @@ struct KeyEntry
181 186
182 /** 187 /**
183 * The actual element associated with the key. 188 * The actual element associated with the key.
189 *
190 * Only owned by the union operation if element->operation
191 * is #GNUNET_YES.
184 */ 192 */
185 struct ElementEntry *element; 193 struct ElementEntry *element;
186
187 /**
188 * Element that collides with this element
189 * on the ibf key. All colliding entries must have the same ibf key.
190 */
191 struct KeyEntry *next_colliding;
192}; 194};
193 195
194 196
@@ -215,7 +217,7 @@ struct SendElementClosure
215/** 217/**
216 * Extra state required for efficient set union. 218 * Extra state required for efficient set union.
217 */ 219 */
218struct SetState 220 struct SetState
219{ 221{
220 /** 222 /**
221 * The strata estimator is only generated once for 223 * The strata estimator is only generated once for
@@ -244,18 +246,13 @@ destroy_key_to_element_iter (void *cls,
244{ 246{
245 struct KeyEntry *k = value; 247 struct KeyEntry *k = value;
246 248
247 while (NULL != k) 249 GNUNET_assert (NULL != k);
250 if (GNUNET_YES == k->element->remote)
248 { 251 {
249 struct KeyEntry *k_tmp = k; 252 GNUNET_free (k->element);
250 253 k->element = NULL;
251 k = k->next_colliding;
252 if (GNUNET_YES == k_tmp->element->remote)
253 {
254 GNUNET_free (k_tmp->element);
255 k_tmp->element = NULL;
256 }
257 GNUNET_free (k_tmp);
258 } 254 }
255 GNUNET_free (k);
259 return GNUNET_YES; 256 return GNUNET_YES;
260} 257}
261 258
@@ -269,8 +266,8 @@ destroy_key_to_element_iter (void *cls,
269static void 266static void
270union_op_cancel (struct Operation *op) 267union_op_cancel (struct Operation *op)
271{ 268{
272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 269 LOG (GNUNET_ERROR_TYPE_DEBUG,
273 "destroying union op\n"); 270 "destroying union op\n");
274 /* check if the op was canceled twice */ 271 /* check if the op was canceled twice */
275 GNUNET_assert (NULL != op->state); 272 GNUNET_assert (NULL != op->state);
276 if (NULL != op->state->remote_ibf) 273 if (NULL != op->state->remote_ibf)
@@ -278,6 +275,11 @@ union_op_cancel (struct Operation *op)
278 ibf_destroy (op->state->remote_ibf); 275 ibf_destroy (op->state->remote_ibf);
279 op->state->remote_ibf = NULL; 276 op->state->remote_ibf = NULL;
280 } 277 }
278 if (NULL != op->state->demanded_hashes)
279 {
280 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
281 op->state->demanded_hashes = NULL;
282 }
281 if (NULL != op->state->local_ibf) 283 if (NULL != op->state->local_ibf)
282 { 284 {
283 ibf_destroy (op->state->local_ibf); 285 ibf_destroy (op->state->local_ibf);
@@ -298,8 +300,8 @@ union_op_cancel (struct Operation *op)
298 } 300 }
299 GNUNET_free (op->state); 301 GNUNET_free (op->state);
300 op->state = NULL; 302 op->state = NULL;
301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 303 LOG (GNUNET_ERROR_TYPE_DEBUG,
302 "destroying union op done\n"); 304 "destroying union op done\n");
303} 305}
304 306
305 307
@@ -315,8 +317,8 @@ fail_union_operation (struct Operation *op)
315 struct GNUNET_MQ_Envelope *ev; 317 struct GNUNET_MQ_Envelope *ev;
316 struct GNUNET_SET_ResultMessage *msg; 318 struct GNUNET_SET_ResultMessage *msg;
317 319
318 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 320 LOG (GNUNET_ERROR_TYPE_ERROR,
319 "union operation failed\n"); 321 "union operation failed\n");
320 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 322 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
321 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 323 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
322 msg->request_id = htonl (op->spec->client_request_id); 324 msg->request_id = htonl (op->spec->client_request_id);
@@ -340,55 +342,23 @@ get_ibf_key (const struct GNUNET_HashCode *src,
340{ 342{
341 struct IBF_Key key; 343 struct IBF_Key key;
342 344
343 GNUNET_CRYPTO_hkdf (&key, sizeof (key), 345 GNUNET_CRYPTO_kdf (&key, sizeof (key),
344 GCRY_MD_SHA512, GCRY_MD_SHA256, 346 src, sizeof *src,
345 src, sizeof *src, 347 &salt, sizeof (salt),
346 &salt, sizeof (salt), 348 NULL, 0);
347 NULL, 0);
348 return key; 349 return key;
349} 350}
350 351
351 352
352/** 353/**
353 * Iterator to create the mapping between ibf keys 354 * Iterator over the mapping from IBF keys to element entries. Checks if we
354 * and element entries. 355 * have an element with a given GNUNET_HashCode.
355 * 356 *
356 * @param cls closure 357 * @param cls closure
357 * @param key current key code 358 * @param key current key code
358 * @param value value in the hash map 359 * @param value value in the hash map
359 * @return #GNUNET_YES if we should continue to iterate, 360 * @return #GNUNET_YES if we should search further,
360 * #GNUNET_NO if not. 361 * #GNUNET_NO if we've found the element.
361 */
362static int
363op_register_element_iterator (void *cls,
364 uint32_t key,
365 void *value)
366{
367 struct KeyEntry *const new_k = cls;
368 struct KeyEntry *old_k = value;
369
370 GNUNET_assert (NULL != old_k);
371 /* check if our ibf key collides with the ibf key in the existing entry */
372 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
373 {
374 /* insert the the new key in the collision chain */
375 new_k->next_colliding = old_k->next_colliding;
376 old_k->next_colliding = new_k;
377 /* signal to the caller that we were able to insert into a colliding bucket */
378 return GNUNET_NO;
379 }
380 return GNUNET_YES;
381}
382
383
384/**
385 * Iterator to create the mapping between ibf keys
386 * and element entries.
387 *
388 * @param cls closure
389 * @param key current key code
390 * @param value value in the hash map
391 * @return #GNUNET_YES (we should continue to iterate)
392 */ 362 */
393static int 363static int
394op_has_element_iterator (void *cls, 364op_has_element_iterator (void *cls,
@@ -399,13 +369,9 @@ op_has_element_iterator (void *cls,
399 struct KeyEntry *k = value; 369 struct KeyEntry *k = value;
400 370
401 GNUNET_assert (NULL != k); 371 GNUNET_assert (NULL != k);
402 while (NULL != k) 372 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
403 { 373 element_hash))
404 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, 374 return GNUNET_NO;
405 element_hash))
406 return GNUNET_NO;
407 k = k->next_colliding;
408 }
409 return GNUNET_YES; 375 return GNUNET_YES;
410} 376}
411 377
@@ -446,6 +412,8 @@ op_has_element (struct Operation *op,
446 * This is done to speed up re-tried operations, if some elements 412 * This is done to speed up re-tried operations, if some elements
447 * were transmitted, and then the IBF fails to decode. 413 * were transmitted, and then the IBF fails to decode.
448 * 414 *
415 * XXX: clarify ownership, doesn't sound right.
416 *
449 * @param op the union operation 417 * @param op the union operation
450 * @param ee the element entry 418 * @param ee the element entry
451 */ 419 */
@@ -453,7 +421,6 @@ static void
453op_register_element (struct Operation *op, 421op_register_element (struct Operation *op,
454 struct ElementEntry *ee) 422 struct ElementEntry *ee)
455{ 423{
456 int ret;
457 struct IBF_Key ibf_key; 424 struct IBF_Key ibf_key;
458 struct KeyEntry *k; 425 struct KeyEntry *k;
459 426
@@ -461,18 +428,11 @@ op_register_element (struct Operation *op,
461 k = GNUNET_new (struct KeyEntry); 428 k = GNUNET_new (struct KeyEntry);
462 k->element = ee; 429 k->element = ee;
463 k->ibf_key = ibf_key; 430 k->ibf_key = ibf_key;
464 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, 431 GNUNET_assert (GNUNET_OK ==
432 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
465 (uint32_t) ibf_key.key_val, 433 (uint32_t) ibf_key.key_val,
466 op_register_element_iterator, 434 k,
467 k); 435 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
468
469 /* was the element inserted into a colliding bucket? */
470 if (GNUNET_SYSERR == ret)
471 return;
472 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
473 (uint32_t) ibf_key.key_val,
474 k,
475 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
476} 436}
477 437
478 438
@@ -488,13 +448,15 @@ prepare_ibf_iterator (void *cls,
488 uint32_t key, 448 uint32_t key,
489 void *value) 449 void *value)
490{ 450{
491 struct InvertibleBloomFilter *ibf = cls; 451 struct Operation *op = cls;
492 struct KeyEntry *ke = value; 452 struct KeyEntry *ke = value;
493 453
494 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 454 LOG (GNUNET_ERROR_TYPE_DEBUG,
495 "inserting %x into ibf\n", 455 "[OP %x] inserting %lx (hash %s) into ibf\n",
496 ke->ibf_key.key_val); 456 (void *) op,
497 ibf_insert (ibf, ke->ibf_key); 457 (unsigned long) ke->ibf_key.key_val,
458 GNUNET_h2s (&ke->element->element_hash));
459 ibf_insert (op->state->local_ibf, ke->ibf_key);
498 return GNUNET_YES; 460 return GNUNET_YES;
499} 461}
500 462
@@ -554,13 +516,15 @@ prepare_ibf (struct Operation *op,
554 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 516 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
555 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 517 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
556 &prepare_ibf_iterator, 518 &prepare_ibf_iterator,
557 op->state->local_ibf); 519 op);
558} 520}
559 521
560 522
561/** 523/**
562 * Send an ibf of appropriate size. 524 * Send an ibf of appropriate size.
563 * 525 *
526 * Fragments the IBF into multiple messages if necessary.
527 *
564 * @param op the union operation 528 * @param op the union operation
565 * @param ibf_order order of the ibf to send, size=2^order 529 * @param ibf_order order of the ibf to send, size=2^order
566 */ 530 */
@@ -573,9 +537,9 @@ send_ibf (struct Operation *op,
573 537
574 prepare_ibf (op, 1<<ibf_order); 538 prepare_ibf (op, 1<<ibf_order);
575 539
576 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 540 LOG (GNUNET_ERROR_TYPE_DEBUG,
577 "sending ibf of size %u\n", 541 "sending ibf of size %u\n",
578 1<<ibf_order); 542 1<<ibf_order);
579 543
580 ibf = op->state->local_ibf; 544 ibf = op->state->local_ibf;
581 545
@@ -599,7 +563,7 @@ send_ibf (struct Operation *op,
599 ibf_write_slice (ibf, buckets_sent, 563 ibf_write_slice (ibf, buckets_sent,
600 buckets_in_message, &msg[1]); 564 buckets_in_message, &msg[1]);
601 buckets_sent += buckets_in_message; 565 buckets_sent += buckets_in_message;
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 566 LOG (GNUNET_ERROR_TYPE_DEBUG,
603 "ibf chunk size %u, %u/%u sent\n", 567 "ibf chunk size %u, %u/%u sent\n",
604 buckets_in_message, 568 buckets_in_message,
605 buckets_sent, 569 buckets_sent,
@@ -607,7 +571,9 @@ send_ibf (struct Operation *op,
607 GNUNET_MQ_send (op->mq, ev); 571 GNUNET_MQ_send (op->mq, ev);
608 } 572 }
609 573
610 op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; 574 /* The other peer must decode the IBF, so
575 * we're passive. */
576 op->state->phase = PHASE_INVENTORY_PASSIVE;
611} 577}
612 578
613 579
@@ -629,7 +595,7 @@ send_strata_estimator (struct Operation *op)
629 GNUNET_MQ_send (op->mq, 595 GNUNET_MQ_send (op->mq,
630 ev); 596 ev);
631 op->state->phase = PHASE_EXPECT_IBF; 597 op->state->phase = PHASE_EXPECT_IBF;
632 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 598 LOG (GNUNET_ERROR_TYPE_DEBUG,
633 "sent SE, expecting IBF\n"); 599 "sent SE, expecting IBF\n");
634} 600}
635 601
@@ -696,7 +662,7 @@ handle_p2p_strata_estimator (void *cls,
696 strata_estimator_destroy (remote_se); 662 strata_estimator_destroy (remote_se);
697 strata_estimator_destroy (op->state->se); 663 strata_estimator_destroy (op->state->se);
698 op->state->se = NULL; 664 op->state->se = NULL;
699 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 665 LOG (GNUNET_ERROR_TYPE_DEBUG,
700 "got se diff=%d, using ibf size %d\n", 666 "got se diff=%d, using ibf size %d\n",
701 diff, 667 diff,
702 1<<get_order_from_difference (diff)); 668 1<<get_order_from_difference (diff));
@@ -714,56 +680,44 @@ handle_p2p_strata_estimator (void *cls,
714 * @param value the key entry 680 * @param value the key entry
715 */ 681 */
716static int 682static int
717send_element_iterator (void *cls, 683send_offers_iterator (void *cls,
718 uint32_t key, 684 uint32_t key,
719 void *value) 685 void *value)
720{ 686{
721 struct SendElementClosure *sec = cls; 687 struct SendElementClosure *sec = cls;
722 struct IBF_Key ibf_key = sec->ibf_key;
723 struct Operation *op = sec->op; 688 struct Operation *op = sec->op;
724 struct KeyEntry *ke = value; 689 struct KeyEntry *ke = value;
690 struct GNUNET_MQ_Envelope *ev;
691 struct GNUNET_MessageHeader *mh;
725 692
726 if (ke->ibf_key.key_val != ibf_key.key_val) 693 /* Detect 32-bit key collision for the 64-bit IBF keys. */
694 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
727 return GNUNET_YES; 695 return GNUNET_YES;
728 while (NULL != ke)
729 {
730 const struct GNUNET_SET_Element *const element = &ke->element->element;
731 struct GNUNET_MQ_Envelope *ev;
732 struct GNUNET_MessageHeader *mh;
733 696
734 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); 697 ev = GNUNET_MQ_msg_header_extra (mh,
735 ev = GNUNET_MQ_msg_header_extra (mh, 698 sizeof (struct GNUNET_HashCode),
736 element->size, 699 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
737 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); 700
738 if (NULL == ev) 701 GNUNET_assert (NULL != ev);
739 { 702 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
740 /* element too large */ 703 LOG (GNUNET_ERROR_TYPE_DEBUG,
741 GNUNET_break (0); 704 "[OP %x] sending element offer (%s) to peer\n",
742 continue; 705 (void *) op,
743 } 706 GNUNET_h2s (&ke->element->element_hash));
744 memcpy (&mh[1], 707 GNUNET_MQ_send (op->mq, ev);
745 element->data, 708 return GNUNET_YES;
746 element->size);
747 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748 "sending element (%s) to peer\n",
749 GNUNET_h2s (&ke->element->element_hash));
750 GNUNET_MQ_send (op->mq, ev);
751 ke = ke->next_colliding;
752 }
753 return GNUNET_NO;
754} 709}
755 710
756 711
757/** 712/**
758 * Send all elements that have the specified IBF key 713 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
759 * to the remote peer of the union operation
760 * 714 *
761 * @param op union operation 715 * @param op union operation
762 * @param ibf_key IBF key of interest 716 * @param ibf_key IBF key of interest
763 */ 717 */
764static void 718static void
765send_elements_for_key (struct Operation *op, 719send_offers_for_key (struct Operation *op,
766 struct IBF_Key ibf_key) 720 struct IBF_Key ibf_key)
767{ 721{
768 struct SendElementClosure send_cls; 722 struct SendElementClosure send_cls;
769 723
@@ -771,14 +725,14 @@ send_elements_for_key (struct Operation *op,
771 send_cls.op = op; 725 send_cls.op = op;
772 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, 726 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
773 (uint32_t) ibf_key.key_val, 727 (uint32_t) ibf_key.key_val,
774 &send_element_iterator, 728 &send_offers_iterator,
775 &send_cls); 729 &send_cls);
776} 730}
777 731
778 732
779/** 733/**
780 * Decode which elements are missing on each side, and 734 * Decode which elements are missing on each side, and
781 * send the appropriate elemens and requests 735 * send the appropriate offers and inquiries.
782 * 736 *
783 * @param op union operation 737 * @param op union operation
784 */ 738 */
@@ -791,7 +745,7 @@ decode_and_send (struct Operation *op)
791 unsigned int num_decoded; 745 unsigned int num_decoded;
792 struct InvertibleBloomFilter *diff_ibf; 746 struct InvertibleBloomFilter *diff_ibf;
793 747
794 GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase); 748 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
795 749
796 prepare_ibf (op, op->state->remote_ibf->size); 750 prepare_ibf (op, op->state->remote_ibf->size);
797 diff_ibf = ibf_dup (op->state->local_ibf); 751 diff_ibf = ibf_dup (op->state->local_ibf);
@@ -800,7 +754,7 @@ decode_and_send (struct Operation *op)
800 ibf_destroy (op->state->remote_ibf); 754 ibf_destroy (op->state->remote_ibf);
801 op->state->remote_ibf = NULL; 755 op->state->remote_ibf = NULL;
802 756
803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 757 LOG (GNUNET_ERROR_TYPE_DEBUG,
804 "decoding IBF (size=%u)\n", 758 "decoding IBF (size=%u)\n",
805 diff_ibf->size); 759 diff_ibf->size);
806 760
@@ -817,14 +771,14 @@ decode_and_send (struct Operation *op)
817 res = ibf_decode (diff_ibf, &side, &key); 771 res = ibf_decode (diff_ibf, &side, &key);
818 if (res == GNUNET_OK) 772 if (res == GNUNET_OK)
819 { 773 {
820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 774 LOG (GNUNET_ERROR_TYPE_DEBUG,
821 "decoded ibf key %lx\n", 775 "decoded ibf key %lx\n",
822 key.key_val); 776 (unsigned long) key.key_val);
823 num_decoded += 1; 777 num_decoded += 1;
824 if ( (num_decoded > diff_ibf->size) || 778 if ( (num_decoded > diff_ibf->size) ||
825 (num_decoded > 1 && last_key.key_val == key.key_val) ) 779 (num_decoded > 1 && last_key.key_val == key.key_val) )
826 { 780 {
827 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 781 LOG (GNUNET_ERROR_TYPE_DEBUG,
828 "detected cyclic ibf (decoded %u/%u)\n", 782 "detected cyclic ibf (decoded %u/%u)\n",
829 num_decoded, 783 num_decoded,
830 diff_ibf->size); 784 diff_ibf->size);
@@ -841,15 +795,17 @@ decode_and_send (struct Operation *op)
841 next_order++; 795 next_order++;
842 if (next_order <= MAX_IBF_ORDER) 796 if (next_order <= MAX_IBF_ORDER)
843 { 797 {
844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 798 LOG_OP (GNUNET_ERROR_TYPE_DEBUG,
845 "decoding failed, sending larger ibf (size %u)\n", 799 "decoding failed, sending larger ibf (size %u)\n",
846 1<<next_order); 800 op,
801 1<<next_order);
847 send_ibf (op, next_order); 802 send_ibf (op, next_order);
848 } 803 }
849 else 804 else
850 { 805 {
851 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 806 // XXX: Send the whole set, element-by-element
852 "set union failed: reached ibf limit\n"); 807 LOG (GNUNET_ERROR_TYPE_ERROR,
808 "set union failed: reached ibf limit\n");
853 } 809 }
854 break; 810 break;
855 } 811 }
@@ -857,32 +813,36 @@ decode_and_send (struct Operation *op)
857 { 813 {
858 struct GNUNET_MQ_Envelope *ev; 814 struct GNUNET_MQ_Envelope *ev;
859 815
860 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 816 LOG (GNUNET_ERROR_TYPE_DEBUG,
861 "transmitted all values, sending DONE\n"); 817 "transmitted all values, sending DONE\n");
862 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); 818 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
863 GNUNET_MQ_send (op->mq, ev); 819 GNUNET_MQ_send (op->mq, ev);
820 /* We now wait until we get a DONE message back
821 * and then wait for our MQ to be flushed and all our
822 * demands be delivered. */
864 break; 823 break;
865 } 824 }
866 if (1 == side) 825 if (1 == side)
867 { 826 {
868 send_elements_for_key (op, key); 827 send_offers_for_key (op, key);
869 } 828 }
870 else if (-1 == side) 829 else if (-1 == side)
871 { 830 {
872 struct GNUNET_MQ_Envelope *ev; 831 struct GNUNET_MQ_Envelope *ev;
873 struct GNUNET_MessageHeader *msg; 832 struct GNUNET_MessageHeader *msg;
874 833
875 /* It may be nice to merge multiple requests, but with cadet's corking it is not worth 834 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
876 * the effort additional complexity. */ 835 * the effort additional complexity. */
877 ev = GNUNET_MQ_msg_header_extra (msg, 836 ev = GNUNET_MQ_msg_header_extra (msg,
878 sizeof (struct IBF_Key), 837 sizeof (struct IBF_Key),
879 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); 838 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
880 839
881 memcpy (&msg[1], 840 memcpy (&msg[1],
882 &key, 841 &key,
883 sizeof (struct IBF_Key)); 842 sizeof (struct IBF_Key));
884 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 843 LOG (GNUNET_ERROR_TYPE_DEBUG,
885 "sending element request\n"); 844 "sending element inquiry for IBF key %lx\n",
845 (unsigned long) key.key_val);
886 GNUNET_MQ_send (op->mq, ev); 846 GNUNET_MQ_send (op->mq, ev);
887 } 847 }
888 else 848 else
@@ -897,6 +857,9 @@ decode_and_send (struct Operation *op)
897/** 857/**
898 * Handle an IBF message from a remote peer. 858 * Handle an IBF message from a remote peer.
899 * 859 *
860 * Reassemble the IBF from multiple pieces, and
861 * process the whole IBF once possible.
862 *
900 * @param cls the union operation 863 * @param cls the union operation
901 * @param mh the header of the message 864 * @param mh the header of the message
902 * @return #GNUNET_SYSERR if the tunnel should be disconnected, 865 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
@@ -917,12 +880,12 @@ handle_p2p_ibf (void *cls,
917 return GNUNET_SYSERR; 880 return GNUNET_SYSERR;
918 } 881 }
919 msg = (const struct IBFMessage *) mh; 882 msg = (const struct IBFMessage *) mh;
920 if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || 883 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
921 (op->state->phase == PHASE_EXPECT_IBF) ) 884 (op->state->phase == PHASE_EXPECT_IBF) )
922 { 885 {
923 op->state->phase = PHASE_EXPECT_IBF_CONT; 886 op->state->phase = PHASE_EXPECT_IBF_CONT;
924 GNUNET_assert (NULL == op->state->remote_ibf); 887 GNUNET_assert (NULL == op->state->remote_ibf);
925 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 888 LOG (GNUNET_ERROR_TYPE_DEBUG,
926 "Creating new ibf of size %u\n", 889 "Creating new ibf of size %u\n",
927 1 << msg->order); 890 1 << msg->order);
928 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 891 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
@@ -944,6 +907,13 @@ handle_p2p_ibf (void *cls,
944 return GNUNET_SYSERR; 907 return GNUNET_SYSERR;
945 } 908 }
946 } 909 }
910 else
911 {
912 LOG_OP (GNUNET_ERROR_TYPE_DEBUG,
913 "wrong phase\n",
914 op, NULL);
915 GNUNET_assert (0);
916 }
947 917
948 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; 918 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
949 919
@@ -961,6 +931,8 @@ handle_p2p_ibf (void *cls,
961 return GNUNET_SYSERR; 931 return GNUNET_SYSERR;
962 } 932 }
963 933
934 GNUNET_assert (NULL != op->state->remote_ibf);
935
964 ibf_read_slice (&msg[1], 936 ibf_read_slice (&msg[1],
965 op->state->ibf_buckets_received, 937 op->state->ibf_buckets_received,
966 buckets_in_message, 938 buckets_in_message,
@@ -969,9 +941,9 @@ handle_p2p_ibf (void *cls,
969 941
970 if (op->state->ibf_buckets_received == op->state->remote_ibf->size) 942 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
971 { 943 {
972 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 944 LOG (GNUNET_ERROR_TYPE_DEBUG,
973 "received full ibf\n"); 945 "received full ibf\n");
974 op->state->phase = PHASE_EXPECT_ELEMENTS; 946 op->state->phase = PHASE_INVENTORY_ACTIVE;
975 decode_and_send (op); 947 decode_and_send (op);
976 } 948 }
977 return GNUNET_OK; 949 return GNUNET_OK;
@@ -984,15 +956,17 @@ handle_p2p_ibf (void *cls,
984 * 956 *
985 * @param op union operation 957 * @param op union operation
986 * @param element element to send 958 * @param element element to send
959 * @param status status to send with the new element
987 */ 960 */
988static void 961static void
989send_client_element (struct Operation *op, 962send_client_element (struct Operation *op,
990 struct GNUNET_SET_Element *element) 963 struct GNUNET_SET_Element *element,
964 int status)
991{ 965{
992 struct GNUNET_MQ_Envelope *ev; 966 struct GNUNET_MQ_Envelope *ev;
993 struct GNUNET_SET_ResultMessage *rm; 967 struct GNUNET_SET_ResultMessage *rm;
994 968
995 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 969 LOG (GNUNET_ERROR_TYPE_DEBUG,
996 "sending element (size %u) to client\n", 970 "sending element (size %u) to client\n",
997 element->size); 971 element->size);
998 GNUNET_assert (0 != op->spec->client_request_id); 972 GNUNET_assert (0 != op->spec->client_request_id);
@@ -1003,12 +977,7 @@ send_client_element (struct Operation *op,
1003 GNUNET_break (0); 977 GNUNET_break (0);
1004 return; 978 return;
1005 } 979 }
1006 980 rm->result_status = htons (status);
1007 if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1008 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1009 else if (GNUNET_SET_RESULT_SYMMETRIC == op->spec->result_mode)
1010 rm->result_status = htons (GNUNET_SET_STATUS_ADD_LOCAL);
1011
1012 rm->request_id = htonl (op->spec->client_request_id); 981 rm->request_id = htonl (op->spec->client_request_id);
1013 rm->element_type = element->element_type; 982 rm->element_type = element->element_type;
1014 memcpy (&rm[1], element->data, element->size); 983 memcpy (&rm[1], element->data, element->size);
@@ -1034,98 +1003,46 @@ send_done_and_destroy (void *cls)
1034 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1003 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1035 rm->element_type = htons (0); 1004 rm->element_type = htons (0);
1036 GNUNET_MQ_send (op->spec->set->client_mq, ev); 1005 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1006 /* Will also call the union-specific cancel function. */
1037 _GSS_operation_destroy (op, GNUNET_YES); 1007 _GSS_operation_destroy (op, GNUNET_YES);
1038 op->keep--;
1039 if (0 == op->keep)
1040 GNUNET_free (op);
1041} 1008}
1042 1009
1043 1010
1044/**
1045 * Send all remaining elements in the full result iterator.
1046 *
1047 * @param cls operation
1048 */
1049static void 1011static void
1050send_remaining_elements (void *cls) 1012maybe_finish (struct Operation *op)
1051{ 1013{
1052 struct Operation *op = cls; 1014 unsigned int num_demanded;
1053 struct KeyEntry *ke;
1054 int res;
1055 1015
1056 res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, 1016 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1057 NULL, 1017
1058 (const void **) &ke); 1018 if (PHASE_FINISH_WAITING == op->state->phase)
1059 if (GNUNET_NO == res)
1060 {
1061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062 "sending done and destroy because iterator ran out\n");
1063 send_done_and_destroy (op);
1064 return;
1065 }
1066 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1067 "sending elements from key entry\n");
1068 while (1)
1069 { 1019 {
1070 struct GNUNET_MQ_Envelope *ev; 1020 LOG (GNUNET_ERROR_TYPE_DEBUG,
1071 struct GNUNET_SET_ResultMessage *rm; 1021 "In PHASE_FINISH_WAITING, pending %u demands\n",
1072 struct GNUNET_SET_Element *element; 1022 num_demanded);
1073 1023 if (0 == num_demanded)
1074 element = &ke->element->element;
1075 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076 "sending element (size %u) to client (full set)\n",
1077 element->size);
1078 GNUNET_assert (0 != op->spec->client_request_id);
1079 ev = GNUNET_MQ_msg_extra (rm,
1080 element->size,
1081 GNUNET_MESSAGE_TYPE_SET_RESULT);
1082 if (NULL == ev)
1083 { 1024 {
1084 GNUNET_MQ_discard (ev); 1025 struct GNUNET_MQ_Envelope *ev;
1085 GNUNET_break (0);
1086 continue;
1087 }
1088 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1089 rm->request_id = htonl (op->spec->client_request_id);
1090 rm->element_type = element->element_type;
1091 memcpy (&rm[1], element->data, element->size);
1092 if (NULL == ke->next_colliding)
1093 {
1094 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1095 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1096 break;
1097 }
1098 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1099 ke = ke->next_colliding;
1100 }
1101}
1102 1026
1027 op->state->phase = PHASE_DONE;
1028 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1029 GNUNET_MQ_send (op->mq, ev);
1103 1030
1104/** 1031 /* We now wait until the other peer closes the channel
1105 * Send a result message to the client indicating 1032 * after it got all elements from us. */
1106 * that the operation is over. 1033 }
1107 * After the result done message has been sent to the client, 1034 }
1108 * destroy the evaluate operation. 1035 if (PHASE_FINISH_CLOSING == op->state->phase)
1109 *
1110 * @param op union operation
1111 */
1112static void
1113finish_and_destroy (struct Operation *op)
1114{
1115 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1116 op->keep++;
1117 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1118 { 1036 {
1119 /* prevent that the op is free'd by the tunnel end handler */ 1037 LOG (GNUNET_ERROR_TYPE_DEBUG,
1120 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1038 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1121 "sending full result set\n"); 1039 num_demanded);
1122 GNUNET_assert (NULL == op->state->full_result_iter); 1040 if (0 == num_demanded)
1123 op->state->full_result_iter = 1041 {
1124 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element); 1042 op->state->phase = PHASE_DONE;
1125 send_remaining_elements (op); 1043 send_done_and_destroy (op);
1126 return; 1044 }
1127 } 1045 }
1128 send_done_and_destroy (op);
1129} 1046}
1130 1047
1131 1048
@@ -1141,65 +1058,106 @@ handle_p2p_elements (void *cls,
1141{ 1058{
1142 struct Operation *op = cls; 1059 struct Operation *op = cls;
1143 struct ElementEntry *ee; 1060 struct ElementEntry *ee;
1061 const struct GNUNET_SET_ElementMessage *emsg;
1144 uint16_t element_size; 1062 uint16_t element_size;
1145 1063
1146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1064 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1147 "Got element from peer\n");
1148 if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1149 (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1150 { 1065 {
1066 GNUNET_break_op (0);
1151 fail_union_operation (op); 1067 fail_union_operation (op);
1068 return;
1069 }
1070
1071 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1072 {
1152 GNUNET_break_op (0); 1073 GNUNET_break_op (0);
1074 fail_union_operation (op);
1153 return; 1075 return;
1154 } 1076 }
1155 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); 1077
1078 emsg = (struct GNUNET_SET_ElementMessage *) mh;
1079
1080 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1156 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); 1081 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1157 memcpy (&ee[1], &mh[1], element_size); 1082 memcpy (&ee[1], &emsg[1], element_size);
1158 ee->element.size = element_size; 1083 ee->element.size = element_size;
1159 ee->element.data = &ee[1]; 1084 ee->element.data = &ee[1];
1085 ee->element.element_type = ntohs (emsg->element_type);
1160 ee->remote = GNUNET_YES; 1086 ee->remote = GNUNET_YES;
1161 GNUNET_CRYPTO_hash (ee->element.data, 1087 GNUNET_CRYPTO_hash (ee->element.data,
1162 ee->element.size, 1088 ee->element.size,
1163 &ee->element_hash); 1089 &ee->element_hash);
1164 1090
1165 if (GNUNET_YES == op_has_element (op, &ee->element_hash)) 1091 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, &ee->element_hash, NULL))
1166 { 1092 {
1167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1093 /* We got something we didn't demand, since it's not in our map. */
1168 "got existing element from peer\n"); 1094 GNUNET_break_op (0);
1169 GNUNET_free (ee); 1095 GNUNET_free (ee);
1096 fail_union_operation (op);
1170 return; 1097 return;
1171 } 1098 }
1172 1099
1173 op_register_element (op, ee); 1100 LOG (GNUNET_ERROR_TYPE_DEBUG,
1174 /* only send results immediately if the client wants it */ 1101 "Got element (size %u, hash %s) from peer\n",
1175 if (GNUNET_SET_RESULT_FULL != op->spec->result_mode) 1102 (unsigned int) element_size,
1176 send_client_element (op, &ee->element); 1103 GNUNET_h2s (&ee->element_hash));
1104
1105 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1106 {
1107 /* Got repeated element. Should not happen since
1108 * we track demands. */
1109 GNUNET_break (0);
1110 GNUNET_free (ee);
1111 }
1112 else
1113 {
1114 LOG (GNUNET_ERROR_TYPE_DEBUG,
1115 "Registering new element from remote peer\n");
1116 op_register_element (op, ee);
1117 /* only send results immediately if the client wants it */
1118 switch (op->spec->result_mode)
1119 {
1120 case GNUNET_SET_RESULT_ADDED:
1121 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1122 break;
1123 case GNUNET_SET_RESULT_SYMMETRIC:
1124 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1125 break;
1126 default:
1127 /* Result mode not supported, should have been caught earlier. */
1128 GNUNET_break (0);
1129 break;
1130 }
1131 }
1132
1133 maybe_finish (op);
1177} 1134}
1178 1135
1179 1136
1180/** 1137/**
1181 * Handle an element request from a remote peer. 1138 * Send offers (for GNUNET_Hash-es) in response
1139 * to inquiries (for IBF_Key-s).
1182 * 1140 *
1183 * @param cls the union operation 1141 * @param cls the union operation
1184 * @param mh the message 1142 * @param mh the message
1185 */ 1143 */
1186static void 1144static void
1187handle_p2p_element_requests (void *cls, 1145handle_p2p_inquiry (void *cls,
1188 const struct GNUNET_MessageHeader *mh) 1146 const struct GNUNET_MessageHeader *mh)
1189{ 1147{
1190 struct Operation *op = cls; 1148 struct Operation *op = cls;
1191 const struct IBF_Key *ibf_key; 1149 const struct IBF_Key *ibf_key;
1192 unsigned int num_keys; 1150 unsigned int num_keys;
1193 1151
1194 /* look up elements and send them */ 1152 /* look up elements and send them */
1195 if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1153 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1196 { 1154 {
1197 GNUNET_break_op (0); 1155 GNUNET_break_op (0);
1198 fail_union_operation (op); 1156 fail_union_operation (op);
1199 return; 1157 return;
1200 } 1158 }
1201 num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 1159 num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1202 / sizeof (struct IBF_Key); 1160 / sizeof (struct IBF_Key);
1203 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 1161 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1204 != num_keys * sizeof (struct IBF_Key)) 1162 != num_keys * sizeof (struct IBF_Key))
1205 { 1163 {
@@ -1211,12 +1169,149 @@ handle_p2p_element_requests (void *cls,
1211 ibf_key = (const struct IBF_Key *) &mh[1]; 1169 ibf_key = (const struct IBF_Key *) &mh[1];
1212 while (0 != num_keys--) 1170 while (0 != num_keys--)
1213 { 1171 {
1214 send_elements_for_key (op, *ibf_key); 1172 send_offers_for_key (op, *ibf_key);
1215 ibf_key++; 1173 ibf_key++;
1216 } 1174 }
1217} 1175}
1218 1176
1219 1177
1178
1179static void
1180handle_p2p_demand (void *cls,
1181 const struct GNUNET_MessageHeader *mh)
1182{
1183 struct Operation *op = cls;
1184 struct ElementEntry *ee;
1185 struct GNUNET_SET_ElementMessage *emsg;
1186 const struct GNUNET_HashCode *hash;
1187 unsigned int num_hashes;
1188 struct GNUNET_MQ_Envelope *ev;
1189
1190 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1191 / sizeof (struct GNUNET_HashCode);
1192 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1193 != num_hashes * sizeof (struct GNUNET_HashCode))
1194 {
1195 GNUNET_break_op (0);
1196 fail_union_operation (op);
1197 return;
1198 }
1199
1200 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1201 num_hashes > 0;
1202 hash++, num_hashes--)
1203 {
1204 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1205 if (NULL == ee)
1206 {
1207 /* Demand for non-existing element. */
1208 GNUNET_break_op (0);
1209 fail_union_operation (op);
1210 return;
1211 }
1212 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1213 {
1214 /* Probably confused lazily copied sets. */
1215 GNUNET_break_op (0);
1216 fail_union_operation (op);
1217 return;
1218 }
1219 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1220 memcpy (&emsg[1], ee->element.data, ee->element.size);
1221 emsg->reserved = htons (0);
1222 emsg->element_type = htons (ee->element.element_type);
1223 LOG (GNUNET_ERROR_TYPE_DEBUG,
1224 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1225 (void *) op,
1226 (unsigned int) ee->element.size,
1227 GNUNET_h2s (&ee->element_hash));
1228 GNUNET_MQ_send (op->mq, ev);
1229
1230 switch (op->spec->result_mode)
1231 {
1232 case GNUNET_SET_RESULT_ADDED:
1233 /* Nothing to do. */
1234 break;
1235 case GNUNET_SET_RESULT_SYMMETRIC:
1236 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1237 break;
1238 default:
1239 /* Result mode not supported, should have been caught earlier. */
1240 GNUNET_break (0);
1241 break;
1242 }
1243 }
1244}
1245
1246
1247/**
1248 * Handle offers (of GNUNET_HashCode-s) and
1249 * respond with demands (of GNUNET_HashCode-s).
1250 *
1251 * @param cls the union operation
1252 * @param mh the message
1253 */
1254static void
1255handle_p2p_offer (void *cls,
1256 const struct GNUNET_MessageHeader *mh)
1257{
1258 struct Operation *op = cls;
1259 const struct GNUNET_HashCode *hash;
1260 unsigned int num_hashes;
1261
1262 /* look up elements and send them */
1263 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1264 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1265 {
1266 GNUNET_break_op (0);
1267 fail_union_operation (op);
1268 return;
1269 }
1270 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1271 / sizeof (struct GNUNET_HashCode);
1272 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1273 != num_hashes * sizeof (struct GNUNET_HashCode))
1274 {
1275 GNUNET_break_op (0);
1276 fail_union_operation (op);
1277 return;
1278 }
1279
1280 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1281 num_hashes > 0;
1282 hash++, num_hashes--)
1283 {
1284 struct ElementEntry *ee;
1285 struct GNUNET_MessageHeader *demands;
1286 struct GNUNET_MQ_Envelope *ev;
1287 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1288 if (NULL != ee)
1289 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1290 continue;
1291
1292 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, hash))
1293 {
1294 LOG (GNUNET_ERROR_TYPE_DEBUG,
1295 "Skipped sending duplicate demand\n");
1296 continue;
1297 }
1298
1299 GNUNET_assert (GNUNET_OK ==
1300 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1301 hash,
1302 NULL,
1303 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
1304
1305 LOG (GNUNET_ERROR_TYPE_DEBUG,
1306 "[OP %x] Requesting element (hash %s)\n",
1307 (void *) op, GNUNET_h2s (hash));
1308 ev = GNUNET_MQ_msg_header_extra (demands, sizeof (struct GNUNET_HashCode), GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1309 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1310 GNUNET_MQ_send (op->mq, ev);
1311 }
1312}
1313
1314
1220/** 1315/**
1221 * Handle a done message from a remote peer 1316 * Handle a done message from a remote peer
1222 * 1317 *
@@ -1228,25 +1323,40 @@ handle_p2p_done (void *cls,
1228 const struct GNUNET_MessageHeader *mh) 1323 const struct GNUNET_MessageHeader *mh)
1229{ 1324{
1230 struct Operation *op = cls; 1325 struct Operation *op = cls;
1231 struct GNUNET_MQ_Envelope *ev;
1232 1326
1233 if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1327 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1234 { 1328 {
1235 /* we got all requests, but still have to send our elements as response */ 1329 /* We got all requests, but still have to send our elements in response. */
1236 1330
1237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1331 op->state->phase = PHASE_FINISH_WAITING;
1238 "got DONE, sending final DONE after elements\n"); 1332
1239 op->state->phase = PHASE_FINISHED; 1333 LOG (GNUNET_ERROR_TYPE_DEBUG,
1240 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); 1334 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1241 GNUNET_MQ_send (op->mq, ev); 1335 /* The active peer is done sending offers
1336 * and inquiries. This means that all
1337 * our responses to that (demands and offers)
1338 * must be in flight (queued or in mesh).
1339 *
1340 * We should notify the active peer once
1341 * all our demands are satisfied, so that the active
1342 * peer can quit if we gave him everything.
1343 */
1344 maybe_finish (op);
1242 return; 1345 return;
1243 } 1346 }
1244 if (op->state->phase == PHASE_EXPECT_ELEMENTS) 1347 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1245 { 1348 {
1246 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1349 LOG (GNUNET_ERROR_TYPE_DEBUG,
1247 "got final DONE\n"); 1350 "got DONE (as active partner), waiting to finish\n");
1248 op->state->phase = PHASE_FINISHED; 1351 /* All demands of the other peer are satisfied,
1249 finish_and_destroy (op); 1352 * and we processed all offers, thus we know
1353 * exactly what our demands must be.
1354 *
1355 * We'll close the channel
1356 * to the other peer once our demands are met.
1357 */
1358 op->state->phase = PHASE_FINISH_CLOSING;
1359 maybe_finish (op);
1250 return; 1360 return;
1251 } 1361 }
1252 GNUNET_break_op (0); 1362 GNUNET_break_op (0);
@@ -1268,12 +1378,14 @@ union_evaluate (struct Operation *op,
1268 struct GNUNET_MQ_Envelope *ev; 1378 struct GNUNET_MQ_Envelope *ev;
1269 struct OperationRequestMessage *msg; 1379 struct OperationRequestMessage *msg;
1270 1380
1381 GNUNET_assert (NULL == op->state);
1271 op->state = GNUNET_new (struct OperationState); 1382 op->state = GNUNET_new (struct OperationState);
1383 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1272 /* copy the current generation's strata estimator for this operation */ 1384 /* copy the current generation's strata estimator for this operation */
1273 op->state->se = strata_estimator_dup (op->spec->set->state->se); 1385 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1274 /* we started the operation, thus we have to send the operation request */ 1386 /* we started the operation, thus we have to send the operation request */
1275 op->state->phase = PHASE_EXPECT_SE; 1387 op->state->phase = PHASE_EXPECT_SE;
1276 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1388 LOG (GNUNET_ERROR_TYPE_DEBUG,
1277 "Initiating union operation evaluation\n"); 1389 "Initiating union operation evaluation\n");
1278 ev = GNUNET_MQ_msg_nested_mh (msg, 1390 ev = GNUNET_MQ_msg_nested_mh (msg,
1279 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1391 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
@@ -1291,10 +1403,10 @@ union_evaluate (struct Operation *op,
1291 ev); 1403 ev);
1292 1404
1293 if (NULL != opaque_context) 1405 if (NULL != opaque_context)
1294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1406 LOG (GNUNET_ERROR_TYPE_DEBUG,
1295 "sent op request with context message\n"); 1407 "sent op request with context message\n");
1296 else 1408 else
1297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1409 LOG (GNUNET_ERROR_TYPE_DEBUG,
1298 "sent op request without context message\n"); 1410 "sent op request without context message\n");
1299} 1411}
1300 1412
@@ -1308,10 +1420,12 @@ union_evaluate (struct Operation *op,
1308static void 1420static void
1309union_accept (struct Operation *op) 1421union_accept (struct Operation *op)
1310{ 1422{
1311 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1423 LOG (GNUNET_ERROR_TYPE_DEBUG,
1312 "accepting set union operation\n"); 1424 "accepting set union operation\n");
1425 GNUNET_assert (NULL == op->state);
1313 op->state = GNUNET_new (struct OperationState); 1426 op->state = GNUNET_new (struct OperationState);
1314 op->state->se = strata_estimator_dup (op->spec->set->state->se); 1427 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1428 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1315 /* kick off the operation */ 1429 /* kick off the operation */
1316 send_strata_estimator (op); 1430 send_strata_estimator (op);
1317} 1431}
@@ -1330,7 +1444,7 @@ union_set_create (void)
1330{ 1444{
1331 struct SetState *set_state; 1445 struct SetState *set_state;
1332 1446
1333 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1447 LOG (GNUNET_ERROR_TYPE_DEBUG,
1334 "union set created\n"); 1448 "union set created\n");
1335 set_state = GNUNET_new (struct SetState); 1449 set_state = GNUNET_new (struct SetState);
1336 set_state->se = strata_estimator_create (SE_STRATA_COUNT, 1450 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
@@ -1397,10 +1511,10 @@ int
1397union_handle_p2p_message (struct Operation *op, 1511union_handle_p2p_message (struct Operation *op,
1398 const struct GNUNET_MessageHeader *mh) 1512 const struct GNUNET_MessageHeader *mh)
1399{ 1513{
1400 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1514 //LOG (GNUNET_ERROR_TYPE_DEBUG,
1401 "received p2p message (t: %u, s: %u)\n", 1515 // "received p2p message (t: %u, s: %u)\n",
1402 ntohs (mh->type), 1516 // ntohs (mh->type),
1403 ntohs (mh->size)); 1517 // ntohs (mh->size));
1404 switch (ntohs (mh->type)) 1518 switch (ntohs (mh->type))
1405 { 1519 {
1406 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: 1520 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
@@ -1410,29 +1524,36 @@ union_handle_p2p_message (struct Operation *op,
1410 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: 1524 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1411 handle_p2p_elements (op, mh); 1525 handle_p2p_elements (op, mh);
1412 break; 1526 break;
1413 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: 1527 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1414 handle_p2p_element_requests (op, mh); 1528 handle_p2p_inquiry (op, mh);
1415 break; 1529 break;
1416 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: 1530 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1417 handle_p2p_done (op, mh); 1531 handle_p2p_done (op, mh);
1418 break; 1532 break;
1533 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1534 handle_p2p_offer (op, mh);
1535 break;
1536 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1537 handle_p2p_demand (op, mh);
1538 break;
1419 default: 1539 default:
1420 /* something wrong with cadet's message handlers? */ 1540 /* Something wrong with cadet's message handlers? */
1421 GNUNET_assert (0); 1541 GNUNET_assert (0);
1422 } 1542 }
1423 return GNUNET_OK; 1543 return GNUNET_OK;
1424} 1544}
1425 1545
1546
1426/** 1547/**
1427 * handler for peer-disconnects, notifies the client 1548 * Handler for peer-disconnects, notifies the client
1428 * about the aborted operation in case the op was not concluded 1549 * about the aborted operation in case the op was not concluded.
1429 * 1550 *
1430 * @param op the destroyed operation 1551 * @param op the destroyed operation
1431 */ 1552 */
1432static void 1553static void
1433union_peer_disconnect (struct Operation *op) 1554union_peer_disconnect (struct Operation *op)
1434{ 1555{
1435 if (PHASE_FINISHED != op->state->phase) 1556 if (PHASE_DONE != op->state->phase)
1436 { 1557 {
1437 struct GNUNET_MQ_Envelope *ev; 1558 struct GNUNET_MQ_Envelope *ev;
1438 struct GNUNET_SET_ResultMessage *msg; 1559 struct GNUNET_SET_ResultMessage *msg;
@@ -1444,19 +1565,27 @@ union_peer_disconnect (struct Operation *op)
1444 msg->element_type = htons (0); 1565 msg->element_type = htons (0);
1445 GNUNET_MQ_send (op->spec->set->client_mq, 1566 GNUNET_MQ_send (op->spec->set->client_mq,
1446 ev); 1567 ev);
1447 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1568 LOG (GNUNET_ERROR_TYPE_WARNING,
1448 "other peer disconnected prematurely\n"); 1569 "other peer disconnected prematurely, phase %u\n",
1570 op->state->phase);
1449 _GSS_operation_destroy (op, 1571 _GSS_operation_destroy (op,
1450 GNUNET_YES); 1572 GNUNET_YES);
1451 return; 1573 return;
1452 } 1574 }
1453 // else: the session has already been concluded 1575 // else: the session has already been concluded
1454 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1576 LOG (GNUNET_ERROR_TYPE_DEBUG,
1455 "other peer disconnected (finished)\n"); 1577 "other peer disconnected (finished)\n");
1456 if (GNUNET_NO == op->state->client_done_sent) 1578 if (GNUNET_NO == op->state->client_done_sent)
1457 finish_and_destroy (op); 1579 send_done_and_destroy (op);
1458} 1580}
1459 1581
1582
1583/**
1584 * Copy union-specific set state.
1585 *
1586 * @param set source set for copying the union state
1587 * @return a copy of the union-specific set state
1588 */
1460static struct SetState * 1589static struct SetState *
1461union_copy_state (struct Set *set) 1590union_copy_state (struct Set *set)
1462{ 1591{
@@ -1469,6 +1598,7 @@ union_copy_state (struct Set *set)
1469 return new_state; 1598 return new_state;
1470} 1599}
1471 1600
1601
1472/** 1602/**
1473 * Get the table with implementing functions for 1603 * Get the table with implementing functions for
1474 * set union. 1604 * set union.
diff --git a/src/set/gnunet-set-profiler.c b/src/set/gnunet-set-profiler.c
index 1569c29b7..2ee1b762d 100644
--- a/src/set/gnunet-set-profiler.c
+++ b/src/set/gnunet-set-profiler.c
@@ -98,6 +98,7 @@ set_result_cb (void *cls,
98 enum GNUNET_SET_Status status) 98 enum GNUNET_SET_Status status)
99{ 99{
100 struct SetInfo *info = cls; 100 struct SetInfo *info = cls;
101 struct GNUNET_HashCode hash;
101 102
102 GNUNET_assert (GNUNET_NO == info->done); 103 GNUNET_assert (GNUNET_NO == info->done);
103 switch (status) 104 switch (status)
@@ -114,15 +115,22 @@ set_result_cb (void *cls,
114 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n"); 115 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n");
115 GNUNET_SCHEDULER_shutdown (); 116 GNUNET_SCHEDULER_shutdown ();
116 return; 117 return;
117 case GNUNET_SET_STATUS_OK: 118 case GNUNET_SET_STATUS_ADD_LOCAL:
119 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: local element\n", info->id);
118 break; 120 break;
121 case GNUNET_SET_STATUS_ADD_REMOTE:
122 GNUNET_CRYPTO_hash (element->data, element->size, &hash);
123 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: remote element %s\n", info->id,
124 GNUNET_h2s (&hash));
125 // XXX: record and check
126 return;
119 default: 127 default:
120 GNUNET_assert (0); 128 GNUNET_assert (0);
121 } 129 }
122 130
123 if (element->size != sizeof (struct GNUNET_HashCode)) 131 if (element->size != sizeof (struct GNUNET_HashCode))
124 { 132 {
125 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u\n", element->size); 133 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u, expected %u\n", element->size, sizeof (struct GNUNET_HashCode));
126 GNUNET_assert (0); 134 GNUNET_assert (0);
127 } 135 }
128 136
@@ -180,6 +188,8 @@ static void
180handle_shutdown (void *cls, 188handle_shutdown (void *cls,
181 const struct GNUNET_SCHEDULER_TaskContext *tc) 189 const struct GNUNET_SCHEDULER_TaskContext *tc)
182{ 190{
191 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
192 "Shutting down set profiler\n");
183 if (NULL != set_listener) 193 if (NULL != set_listener)
184 { 194 {
185 GNUNET_SET_listen_cancel (set_listener); 195 GNUNET_SET_listen_cancel (set_listener);
@@ -209,11 +219,13 @@ handle_shutdown (void *cls,
209 219
210 220
211static void 221static void
212run (void *cls, char *const *args, const char *cfgfile, 222run (void *cls,
213 const struct GNUNET_CONFIGURATION_Handle *cfg) 223 const struct GNUNET_CONFIGURATION_Handle *cfg,
224 struct GNUNET_TESTING_Peer *peer)
214{ 225{
215 unsigned int i; 226 unsigned int i;
216 struct GNUNET_HashCode hash; 227 struct GNUNET_HashCode hash;
228 struct GNUNET_HashCode hashhash;
217 229
218 config = cfg; 230 config = cfg;
219 231
@@ -239,6 +251,9 @@ run (void *cls, char *const *args, const char *cfgfile,
239 for (i = 0; i < num_a; i++) 251 for (i = 0; i < num_a; i++)
240 { 252 {
241 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); 253 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
254 GNUNET_CRYPTO_hash (&hash, sizeof (struct GNUNET_HashCode), &hashhash);
255 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Set a: Created element %s\n",
256 GNUNET_h2s (&hashhash));
242 GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, NULL, 257 GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, NULL,
243 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); 258 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
244 } 259 }
@@ -246,6 +261,9 @@ run (void *cls, char *const *args, const char *cfgfile,
246 for (i = 0; i < num_b; i++) 261 for (i = 0; i < num_b; i++)
247 { 262 {
248 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); 263 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
264 GNUNET_CRYPTO_hash (&hash, sizeof (struct GNUNET_HashCode), &hashhash);
265 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Set b: Created element %s\n",
266 GNUNET_h2s (&hashhash));
249 GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, NULL, 267 GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, NULL,
250 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); 268 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
251 } 269 }
@@ -253,12 +271,14 @@ run (void *cls, char *const *args, const char *cfgfile,
253 for (i = 0; i < num_c; i++) 271 for (i = 0; i < num_c; i++)
254 { 272 {
255 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); 273 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
274 GNUNET_CRYPTO_hash (&hash, sizeof (struct GNUNET_HashCode), &hashhash);
275 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Set c: Created element %s\n",
276 GNUNET_h2s (&hashhash));
256 GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, NULL, 277 GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, NULL,
257 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); 278 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
258 } 279 }
259 280
260 /* use last hash for app id */ 281 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id);
261 app_id = hash;
262 282
263 /* FIXME: also implement intersection etc. */ 283 /* FIXME: also implement intersection etc. */
264 info1.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); 284 info1.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
@@ -281,6 +301,17 @@ run (void *cls, char *const *args, const char *cfgfile,
281} 301}
282 302
283 303
304static void
305pre_run (void *cls, char *const *args, const char *cfgfile,
306 const struct GNUNET_CONFIGURATION_Handle *cfg)
307{
308 if (0 != GNUNET_TESTING_peer_run ("set-profiler",
309 cfgfile,
310 &run, NULL))
311 ret = 2;
312}
313
314
284int 315int
285main (int argc, char **argv) 316main (int argc, char **argv)
286{ 317{
@@ -295,13 +326,13 @@ main (int argc, char **argv)
295 gettext_noop ("number of values"), 326 gettext_noop ("number of values"),
296 GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c }, 327 GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c },
297 { 'x', "operation", NULL, 328 { 'x', "operation", NULL,
298 gettext_noop ("oeration to execute"), 329 gettext_noop ("operation to execute"),
299 GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str }, 330 GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str },
300 GNUNET_GETOPT_OPTION_END 331 GNUNET_GETOPT_OPTION_END
301 }; 332 };
302 GNUNET_PROGRAM_run (argc, argv, "gnunet-set-profiler", 333 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set-profiler",
303 "help", 334 "help",
304 options, &run, NULL); 335 options, &pre_run, NULL, GNUNET_YES);
305 return ret; 336 return ret;
306} 337}
307 338
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 51a494d7f..16aa87cd0 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -398,40 +398,49 @@ handle_result (void *cls,
398 "Ignoring result from canceled operation\n"); 398 "Ignoring result from canceled operation\n");
399 return; 399 return;
400 } 400 }
401 if (GNUNET_SET_STATUS_OK != result_status) 401
402 switch (result_status)
402 { 403 {
403 /* status is not #GNUNET_SET_STATUS_OK => there's no attached element,
404 * and this is the last result message we get */
405 GNUNET_MQ_assoc_remove (set->mq,
406 ntohl (msg->request_id));
407 GNUNET_CONTAINER_DLL_remove (set->ops_head,
408 set->ops_tail,
409 oh);
410 if ( (GNUNET_YES == set->destroy_requested) &&
411 (NULL == set->ops_head) )
412 GNUNET_SET_destroy (set);
413 if (NULL != oh->result_cb)
414 oh->result_cb (oh->result_cls,
415 NULL,
416 result_status);
417 switch (result_status)
418 {
419 case GNUNET_SET_STATUS_OK: 404 case GNUNET_SET_STATUS_OK:
420 case GNUNET_SET_STATUS_ADD_LOCAL: 405 case GNUNET_SET_STATUS_ADD_LOCAL:
421 case GNUNET_SET_STATUS_ADD_REMOTE: 406 case GNUNET_SET_STATUS_ADD_REMOTE:
422 break; 407 goto do_element;
423 case GNUNET_SET_STATUS_FAILURE: 408 case GNUNET_SET_STATUS_FAILURE:
424 oh->result_cb = NULL;
425 break;
426 case GNUNET_SET_STATUS_HALF_DONE:
427 break;
428 case GNUNET_SET_STATUS_DONE: 409 case GNUNET_SET_STATUS_DONE:
429 oh->result_cb = NULL; 410 goto do_final;
430 break; 411 case GNUNET_SET_STATUS_HALF_DONE:
431 } 412 /* not used anymore */
432 GNUNET_free (oh); 413 GNUNET_assert (0);
433 return; 414 }
415
416do_final:
417 LOG (GNUNET_ERROR_TYPE_DEBUG,
418 "Treating result as final status\n");
419 GNUNET_MQ_assoc_remove (set->mq,
420 ntohl (msg->request_id));
421 GNUNET_CONTAINER_DLL_remove (set->ops_head,
422 set->ops_tail,
423 oh);
424 if (NULL != oh->result_cb)
425 {
426 oh->result_cb (oh->result_cls,
427 NULL,
428 result_status);
434 } 429 }
430 else
431 {
432 LOG (GNUNET_ERROR_TYPE_DEBUG,
433 "No callback for final status\n");
434 }
435 if ( (GNUNET_YES == set->destroy_requested) &&
436 (NULL == set->ops_head) )
437 GNUNET_SET_destroy (set);
438 GNUNET_free (oh);
439 return;
440
441do_element:
442 LOG (GNUNET_ERROR_TYPE_DEBUG,
443 "Treating result as element\n");
435 e.data = &msg[1]; 444 e.data = &msg[1];
436 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); 445 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
437 e.element_type = msg->element_type; 446 e.element_type = msg->element_type;
diff --git a/src/set/test_set.conf b/src/set/test_set.conf
index f3c0770fe..4f32b8854 100644
--- a/src/set/test_set.conf
+++ b/src/set/test_set.conf
@@ -1,11 +1,12 @@
1@INLINE@ ../../contrib/no_forcestart.conf
2
1[PATHS] 3[PATHS]
2GNUNET_TEST_HOME = /tmp/test-gnunet-set/ 4GNUNET_TEST_HOME = /tmp/test-gnunet-set/
3 5
4[set] 6[set]
5AUTOSTART = YES 7AUTOSTART = YES
6@UNIXONLY@ PORT = 2106
7#PREFIX = valgrind 8#PREFIX = valgrind
8#PREFIX = valgrind -v --leak-check=full 9PREFIX = valgrind --leak-check=full
9#PREFIX = gdbserver :1234 10#PREFIX = gdbserver :1234
10OPTIONS = -L INFO 11OPTIONS = -L INFO
11 12
@@ -21,38 +22,13 @@ USE_LOCALADDR = YES
21[peerinfo] 22[peerinfo]
22NO_IO = YES 23NO_IO = YES
23 24
24[nse] 25[nat]
25WORKBITS = 0 26# Use addresses from the local network interfaces (inluding loopback, but also others)
26 27USE_LOCALADDR = YES
27[hostlist]
28FORCESTART = NO
29AUTOSTART = NO
30
31[fs]
32FORCESTART = NO
33AUTOSTART = NO
34
35[vpn]
36FORCESTART = NO
37AUTOSTART = NO
38
39[revocation]
40FORCESTART = NO
41AUTOSTART = NO
42
43[gns]
44FORCESTART = NO
45AUTOSTART = NO
46
47[namestore]
48FORCESTART = NO
49AUTOSTART = NO
50 28
51[namecache] 29# Disable IPv6 support
52FORCESTART = NO 30DISABLEV6 = NO
53AUTOSTART = NO
54 31
55[topology] 32# Do we use addresses from localhost address ranges? (::1, 127.0.0.0/8)
56FORCESTART = NO 33RETURN_LOCAL_ADDRESSES = YES
57AUTOSTART = NO
58 34
diff --git a/src/set/test_set_union_result_full.c b/src/set/test_set_union_result_symmetric.c
index 7901c52be..7f119b528 100644
--- a/src/set/test_set_union_result_full.c
+++ b/src/set/test_set_union_result_symmetric.c
@@ -19,8 +19,8 @@
19*/ 19*/
20 20
21/** 21/**
22 * @file set/test_set_union_result_full.c 22 * @file set/test_set_union_result_smmetric
23 * @brief testcase for full result mode of the union set operation 23 * @brief testcase for symmetric result mode of the union set operation
24 */ 24 */
25#include "platform.h" 25#include "platform.h"
26#include "gnunet_util_lib.h" 26#include "gnunet_util_lib.h"
@@ -69,7 +69,7 @@ result_cb_set1 (void *cls,
69{ 69{
70 switch (status) 70 switch (status)
71 { 71 {
72 case GNUNET_SET_STATUS_OK: 72 case GNUNET_SET_STATUS_ADD_LOCAL:
73 count_set1++; 73 count_set1++;
74 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 74 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
75 "set 1: got element\n"); 75 "set 1: got element\n");
@@ -88,6 +88,8 @@ result_cb_set1 (void *cls,
88 if (NULL == set2) 88 if (NULL == set2)
89 GNUNET_SCHEDULER_shutdown (); 89 GNUNET_SCHEDULER_shutdown ();
90 break; 90 break;
91 case GNUNET_SET_STATUS_ADD_REMOTE:
92 break;
91 default: 93 default:
92 GNUNET_assert (0); 94 GNUNET_assert (0);
93 } 95 }
@@ -101,7 +103,7 @@ result_cb_set2 (void *cls,
101{ 103{
102 switch (status) 104 switch (status)
103 { 105 {
104 case GNUNET_SET_STATUS_OK: 106 case GNUNET_SET_STATUS_ADD_LOCAL:
105 count_set2++; 107 count_set2++;
106 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 108 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
107 "set 2: got element\n"); 109 "set 2: got element\n");
@@ -120,6 +122,8 @@ result_cb_set2 (void *cls,
120 if (NULL == set1) 122 if (NULL == set1)
121 GNUNET_SCHEDULER_shutdown (); 123 GNUNET_SCHEDULER_shutdown ();
122 break; 124 break;
125 case GNUNET_SET_STATUS_ADD_REMOTE:
126 break;
123 default: 127 default:
124 GNUNET_assert (0); 128 GNUNET_assert (0);
125 } 129 }
@@ -140,7 +144,7 @@ listen_cb (void *cls,
140 "listen cb called\n"); 144 "listen cb called\n");
141 GNUNET_SET_listen_cancel (listen_handle); 145 GNUNET_SET_listen_cancel (listen_handle);
142 oh = GNUNET_SET_accept (request, 146 oh = GNUNET_SET_accept (request,
143 GNUNET_SET_RESULT_FULL, 147 GNUNET_SET_RESULT_SYMMETRIC,
144 &result_cb_set2, 148 &result_cb_set2,
145 NULL); 149 NULL);
146 GNUNET_SET_commit (oh, set2); 150 GNUNET_SET_commit (oh, set2);
@@ -168,7 +172,7 @@ start (void *cls)
168 oh = GNUNET_SET_prepare (&local_id, 172 oh = GNUNET_SET_prepare (&local_id,
169 &app_id, 173 &app_id,
170 &context_msg, 174 &context_msg,
171 GNUNET_SET_RESULT_FULL, 175 GNUNET_SET_RESULT_SYMMETRIC,
172 &result_cb_set1, NULL); 176 &result_cb_set1, NULL);
173 GNUNET_SET_commit (oh, set1); 177 GNUNET_SET_commit (oh, set1);
174} 178}
@@ -353,7 +357,7 @@ main (int argc, char **argv)
353 { 357 {
354 return 1; 358 return 1;
355 } 359 }
356 GNUNET_assert (4 == count_set1); 360 GNUNET_assert (2 == count_set1);
357 GNUNET_assert (4 == count_set2); 361 GNUNET_assert (1 == count_set2);
358 return ret; 362 return ret;
359} 363}