diff options
author | Florian Dold <florian.dold@gmail.com> | 2015-09-27 04:32:52 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2015-09-27 04:32:52 +0000 |
commit | 9aceae8a9f91642665fa28730c961c9f90360bc1 (patch) | |
tree | 5390aca855b836d17720a65671f994089cb93196 /src/set | |
parent | 55ad4fa34348aaa05fadedeb830ea72cea5d7cc4 (diff) | |
download | gnunet-9aceae8a9f91642665fa28730c961c9f90360bc1.tar.gz gnunet-9aceae8a9f91642665fa28730c961c9f90360bc1.zip |
SET service: accurate results for symmetric mode
Diffstat (limited to 'src/set')
-rw-r--r-- | src/set/Makefile.am | 8 | ||||
-rw-r--r-- | src/set/gnunet-service-set.c | 6 | ||||
-rw-r--r-- | src/set/gnunet-service-set_protocol.h | 8 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 776 | ||||
-rw-r--r-- | src/set/gnunet-set-profiler.c | 51 | ||||
-rw-r--r-- | src/set/set_api.c | 63 | ||||
-rw-r--r-- | src/set/test_set.conf | 44 | ||||
-rw-r--r-- | src/set/test_set_union_result_symmetric.c (renamed from src/set/test_set_union_result_full.c) | 20 |
8 files changed, 568 insertions, 408 deletions
diff --git a/src/set/Makefile.am b/src/set/Makefile.am index 669a28658..b7617db25 100644 --- a/src/set/Makefile.am +++ b/src/set/Makefile.am | |||
@@ -66,7 +66,7 @@ libgnunetset_la_LDFLAGS = \ | |||
66 | if HAVE_TESTING | 66 | if HAVE_TESTING |
67 | check_PROGRAMS = \ | 67 | check_PROGRAMS = \ |
68 | test_set_api \ | 68 | test_set_api \ |
69 | test_set_union_result_full \ | 69 | test_set_union_result_symmetric \ |
70 | test_set_intersection_result_full \ | 70 | test_set_intersection_result_full \ |
71 | test_set_union_copy | 71 | test_set_union_copy |
72 | endif | 72 | endif |
@@ -83,9 +83,9 @@ test_set_api_LDADD = \ | |||
83 | $(top_builddir)/src/testing/libgnunettesting.la \ | 83 | $(top_builddir)/src/testing/libgnunettesting.la \ |
84 | libgnunetset.la | 84 | libgnunetset.la |
85 | 85 | ||
86 | test_set_union_result_full_SOURCES = \ | 86 | test_set_union_result_symmetric_SOURCES = \ |
87 | test_set_union_result_full.c | 87 | test_set_union_result_symmetric.c |
88 | test_set_union_result_full_LDADD = \ | 88 | test_set_union_result_symmetric_LDADD = \ |
89 | $(top_builddir)/src/util/libgnunetutil.la \ | 89 | $(top_builddir)/src/util/libgnunetutil.la \ |
90 | $(top_builddir)/src/testing/libgnunettesting.la \ | 90 | $(top_builddir)/src/testing/libgnunettesting.la \ |
91 | libgnunetset.la | 91 | libgnunetset.la |
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index d8e8dfb78..754bc96e0 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -560,6 +560,7 @@ set_destroy (struct Set *set) | |||
560 | NULL); | 560 | NULL); |
561 | GNUNET_CONTAINER_multihashmap_destroy (content->elements); | 561 | GNUNET_CONTAINER_multihashmap_destroy (content->elements); |
562 | content->elements = NULL; | 562 | content->elements = NULL; |
563 | GNUNET_free (content); | ||
563 | } | 564 | } |
564 | } | 565 | } |
565 | GNUNET_free_non_null (set->excluded_generations); | 566 | GNUNET_free_non_null (set->excluded_generations); |
@@ -1951,8 +1952,11 @@ run (void *cls, | |||
1951 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, | 1952 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, |
1952 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, | 1953 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, |
1953 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, | 1954 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, |
1954 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0}, | 1955 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 0}, |
1956 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 0}, | ||
1957 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 0}, | ||
1955 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, | 1958 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, |
1959 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0}, | ||
1956 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, | 1960 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, |
1957 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, | 1961 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, |
1958 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, | 1962 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, |
diff --git a/src/set/gnunet-service-set_protocol.h b/src/set/gnunet-service-set_protocol.h index c8a3121a6..eee0dc5ae 100644 --- a/src/set/gnunet-service-set_protocol.h +++ b/src/set/gnunet-service-set_protocol.h | |||
@@ -58,6 +58,12 @@ struct OperationRequestMessage | |||
58 | }; | 58 | }; |
59 | 59 | ||
60 | 60 | ||
61 | /** | ||
62 | * Message containing buckets of an invertible bloom filter. | ||
63 | * | ||
64 | * If an IBF has too many buckets for an IBF message, | ||
65 | * it is split into multiple messages. | ||
66 | */ | ||
61 | struct IBFMessage | 67 | struct IBFMessage |
62 | { | 68 | { |
63 | /** | 69 | /** |
@@ -86,7 +92,7 @@ struct IBFMessage | |||
86 | */ | 92 | */ |
87 | uint32_t salt GNUNET_PACKED; | 93 | uint32_t salt GNUNET_PACKED; |
88 | 94 | ||
89 | /* rest: strata */ | 95 | /* rest: buckets */ |
90 | }; | 96 | }; |
91 | 97 | ||
92 | 98 | ||
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 5b452cae1..47abeaac8 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet | 2 | This file is part of GNUnet |
3 | Copyright (C) 2013 Christian Grothoff (and other contributing authors) | 3 | Copyright (C) 2013-2015 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -31,6 +31,11 @@ | |||
31 | #include <gcrypt.h> | 31 | #include <gcrypt.h> |
32 | 32 | ||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__) | ||
35 | |||
36 | #define LOG_OP(kind,msg,op,...) GNUNET_log_from (kind, "set-union","[OP %x] " msg,((void *)op),__VA_ARGS__) | ||
37 | |||
38 | |||
34 | /** | 39 | /** |
35 | * Number of IBFs in a strata estimator. | 40 | * Number of IBFs in a strata estimator. |
36 | */ | 41 | */ |
@@ -40,7 +45,7 @@ | |||
40 | */ | 45 | */ |
41 | #define SE_IBF_SIZE 80 | 46 | #define SE_IBF_SIZE 80 |
42 | /** | 47 | /** |
43 | * hash num parameter for the difference digests and strata estimators | 48 | * The hash num parameter for the difference digests and strata estimators. |
44 | */ | 49 | */ |
45 | #define SE_IBF_HASH_NUM 4 | 50 | #define SE_IBF_HASH_NUM 4 |
46 | 51 | ||
@@ -69,7 +74,7 @@ | |||
69 | enum UnionOperationPhase | 74 | enum UnionOperationPhase |
70 | { | 75 | { |
71 | /** | 76 | /** |
72 | * We sent the request message, and expect a strata estimator | 77 | * We sent the request message, and expect a strata estimator. |
73 | */ | 78 | */ |
74 | PHASE_EXPECT_SE, | 79 | PHASE_EXPECT_SE, |
75 | 80 | ||
@@ -77,6 +82,8 @@ enum UnionOperationPhase | |||
77 | * We sent the strata estimator, and expect an IBF. This phase is entered once | 82 | * We sent the strata estimator, and expect an IBF. This phase is entered once |
78 | * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. | 83 | * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. |
79 | * | 84 | * |
85 | * XXX: could use better wording. | ||
86 | * | ||
80 | * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS | 87 | * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS |
81 | */ | 88 | */ |
82 | PHASE_EXPECT_IBF, | 89 | PHASE_EXPECT_IBF, |
@@ -87,33 +94,33 @@ enum UnionOperationPhase | |||
87 | PHASE_EXPECT_IBF_CONT, | 94 | PHASE_EXPECT_IBF_CONT, |
88 | 95 | ||
89 | /** | 96 | /** |
90 | * We are sending request and elements, | 97 | * We are decoding an IBF. |
91 | * and thus only expect elements from the other peer. | ||
92 | * | ||
93 | * We are currently decoding an IBF until it can no longer be decoded, | ||
94 | * we currently send requests and expect elements | ||
95 | * The remote peer is in #PHASE_EXPECT_ELEMENTS_AND_REQUESTS | ||
96 | */ | 98 | */ |
97 | PHASE_EXPECT_ELEMENTS, | 99 | PHASE_INVENTORY_ACTIVE, |
98 | 100 | ||
99 | /** | 101 | /** |
100 | * We are expecting elements and requests, and send | 102 | * The other peer is decoding the IBF we just sent. |
101 | * requested elements back to the other peer. | ||
102 | * | ||
103 | * We are in this phase if we have SENT an IBF for the remote peer to decode. | ||
104 | * We expect requests, send elements or could receive an new IBF, which takes | ||
105 | * us via #PHASE_EXPECT_IBF to phase #PHASE_EXPECT_ELEMENTS | ||
106 | * | ||
107 | * The remote peer is thus in: | ||
108 | * #PHASE_EXPECT_ELEMENTS | ||
109 | */ | 103 | */ |
110 | PHASE_EXPECT_ELEMENTS_AND_REQUESTS, | 104 | PHASE_INVENTORY_PASSIVE, |
111 | 105 | ||
112 | /** | 106 | /** |
113 | * The protocol is over. | 107 | * The protocol is almost finished, but we still have to flush our message |
114 | * Results may still have to be sent to the client. | 108 | * queue and/or expect some elements. |
115 | */ | 109 | */ |
116 | PHASE_FINISHED | 110 | PHASE_FINISH_CLOSING, |
111 | |||
112 | /** | ||
113 | * In the penultimate phase, | ||
114 | * we wait until all our demands | ||
115 | * are satisfied. Then we send a done | ||
116 | * message, and wait for another done message.*/ | ||
117 | PHASE_FINISH_WAITING, | ||
118 | |||
119 | /** | ||
120 | * In the ultimate phase, we wait until | ||
121 | * our demands are satisfied and then | ||
122 | * quit (sending another DONE message). */ | ||
123 | PHASE_DONE, | ||
117 | }; | 124 | }; |
118 | 125 | ||
119 | 126 | ||
@@ -122,20 +129,19 @@ enum UnionOperationPhase | |||
122 | */ | 129 | */ |
123 | struct OperationState | 130 | struct OperationState |
124 | { | 131 | { |
125 | |||
126 | /** | 132 | /** |
127 | * Copy of the set's strata estimator at the time of | 133 | * Copy of the set's strata estimator at the time of |
128 | * creation of this operation | 134 | * creation of this operation. |
129 | */ | 135 | */ |
130 | struct StrataEstimator *se; | 136 | struct StrataEstimator *se; |
131 | 137 | ||
132 | /** | 138 | /** |
133 | * The ibf we currently receive | 139 | * The IBF we currently receive. |
134 | */ | 140 | */ |
135 | struct InvertibleBloomFilter *remote_ibf; | 141 | struct InvertibleBloomFilter *remote_ibf; |
136 | 142 | ||
137 | /** | 143 | /** |
138 | * IBF of the set's element. | 144 | * The IBF with the local set's element. |
139 | */ | 145 | */ |
140 | struct InvertibleBloomFilter *local_ibf; | 146 | struct InvertibleBloomFilter *local_ibf; |
141 | 147 | ||
@@ -147,11 +153,6 @@ struct OperationState | |||
147 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; | 153 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; |
148 | 154 | ||
149 | /** | 155 | /** |
150 | * Iterator for sending elements on the key to element mapping to the client. | ||
151 | */ | ||
152 | struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter; | ||
153 | |||
154 | /** | ||
155 | * Current state of the operation. | 156 | * Current state of the operation. |
156 | */ | 157 | */ |
157 | enum UnionOperationPhase phase; | 158 | enum UnionOperationPhase phase; |
@@ -162,10 +163,14 @@ struct OperationState | |||
162 | int client_done_sent; | 163 | int client_done_sent; |
163 | 164 | ||
164 | /** | 165 | /** |
165 | * Number of ibf buckets received | 166 | * Number of ibf buckets already received into the @a remote_ibf. |
166 | */ | 167 | */ |
167 | unsigned int ibf_buckets_received; | 168 | unsigned int ibf_buckets_received; |
168 | 169 | ||
170 | /** | ||
171 | * Hashes for elements that we have demanded from the other peer. | ||
172 | */ | ||
173 | struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes; | ||
169 | }; | 174 | }; |
170 | 175 | ||
171 | 176 | ||
@@ -181,14 +186,11 @@ struct KeyEntry | |||
181 | 186 | ||
182 | /** | 187 | /** |
183 | * The actual element associated with the key. | 188 | * The actual element associated with the key. |
189 | * | ||
190 | * Only owned by the union operation if element->operation | ||
191 | * is #GNUNET_YES. | ||
184 | */ | 192 | */ |
185 | struct ElementEntry *element; | 193 | struct ElementEntry *element; |
186 | |||
187 | /** | ||
188 | * Element that collides with this element | ||
189 | * on the ibf key. All colliding entries must have the same ibf key. | ||
190 | */ | ||
191 | struct KeyEntry *next_colliding; | ||
192 | }; | 194 | }; |
193 | 195 | ||
194 | 196 | ||
@@ -215,7 +217,7 @@ struct SendElementClosure | |||
215 | /** | 217 | /** |
216 | * Extra state required for efficient set union. | 218 | * Extra state required for efficient set union. |
217 | */ | 219 | */ |
218 | struct SetState | 220 | struct SetState |
219 | { | 221 | { |
220 | /** | 222 | /** |
221 | * The strata estimator is only generated once for | 223 | * The strata estimator is only generated once for |
@@ -244,18 +246,13 @@ destroy_key_to_element_iter (void *cls, | |||
244 | { | 246 | { |
245 | struct KeyEntry *k = value; | 247 | struct KeyEntry *k = value; |
246 | 248 | ||
247 | while (NULL != k) | 249 | GNUNET_assert (NULL != k); |
250 | if (GNUNET_YES == k->element->remote) | ||
248 | { | 251 | { |
249 | struct KeyEntry *k_tmp = k; | 252 | GNUNET_free (k->element); |
250 | 253 | k->element = NULL; | |
251 | k = k->next_colliding; | ||
252 | if (GNUNET_YES == k_tmp->element->remote) | ||
253 | { | ||
254 | GNUNET_free (k_tmp->element); | ||
255 | k_tmp->element = NULL; | ||
256 | } | ||
257 | GNUNET_free (k_tmp); | ||
258 | } | 254 | } |
255 | GNUNET_free (k); | ||
259 | return GNUNET_YES; | 256 | return GNUNET_YES; |
260 | } | 257 | } |
261 | 258 | ||
@@ -269,8 +266,8 @@ destroy_key_to_element_iter (void *cls, | |||
269 | static void | 266 | static void |
270 | union_op_cancel (struct Operation *op) | 267 | union_op_cancel (struct Operation *op) |
271 | { | 268 | { |
272 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 269 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
273 | "destroying union op\n"); | 270 | "destroying union op\n"); |
274 | /* check if the op was canceled twice */ | 271 | /* check if the op was canceled twice */ |
275 | GNUNET_assert (NULL != op->state); | 272 | GNUNET_assert (NULL != op->state); |
276 | if (NULL != op->state->remote_ibf) | 273 | if (NULL != op->state->remote_ibf) |
@@ -278,6 +275,11 @@ union_op_cancel (struct Operation *op) | |||
278 | ibf_destroy (op->state->remote_ibf); | 275 | ibf_destroy (op->state->remote_ibf); |
279 | op->state->remote_ibf = NULL; | 276 | op->state->remote_ibf = NULL; |
280 | } | 277 | } |
278 | if (NULL != op->state->demanded_hashes) | ||
279 | { | ||
280 | GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); | ||
281 | op->state->demanded_hashes = NULL; | ||
282 | } | ||
281 | if (NULL != op->state->local_ibf) | 283 | if (NULL != op->state->local_ibf) |
282 | { | 284 | { |
283 | ibf_destroy (op->state->local_ibf); | 285 | ibf_destroy (op->state->local_ibf); |
@@ -298,8 +300,8 @@ union_op_cancel (struct Operation *op) | |||
298 | } | 300 | } |
299 | GNUNET_free (op->state); | 301 | GNUNET_free (op->state); |
300 | op->state = NULL; | 302 | op->state = NULL; |
301 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 303 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
302 | "destroying union op done\n"); | 304 | "destroying union op done\n"); |
303 | } | 305 | } |
304 | 306 | ||
305 | 307 | ||
@@ -315,8 +317,8 @@ fail_union_operation (struct Operation *op) | |||
315 | struct GNUNET_MQ_Envelope *ev; | 317 | struct GNUNET_MQ_Envelope *ev; |
316 | struct GNUNET_SET_ResultMessage *msg; | 318 | struct GNUNET_SET_ResultMessage *msg; |
317 | 319 | ||
318 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 320 | LOG (GNUNET_ERROR_TYPE_ERROR, |
319 | "union operation failed\n"); | 321 | "union operation failed\n"); |
320 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 322 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
321 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 323 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
322 | msg->request_id = htonl (op->spec->client_request_id); | 324 | msg->request_id = htonl (op->spec->client_request_id); |
@@ -340,55 +342,23 @@ get_ibf_key (const struct GNUNET_HashCode *src, | |||
340 | { | 342 | { |
341 | struct IBF_Key key; | 343 | struct IBF_Key key; |
342 | 344 | ||
343 | GNUNET_CRYPTO_hkdf (&key, sizeof (key), | 345 | GNUNET_CRYPTO_kdf (&key, sizeof (key), |
344 | GCRY_MD_SHA512, GCRY_MD_SHA256, | 346 | src, sizeof *src, |
345 | src, sizeof *src, | 347 | &salt, sizeof (salt), |
346 | &salt, sizeof (salt), | 348 | NULL, 0); |
347 | NULL, 0); | ||
348 | return key; | 349 | return key; |
349 | } | 350 | } |
350 | 351 | ||
351 | 352 | ||
352 | /** | 353 | /** |
353 | * Iterator to create the mapping between ibf keys | 354 | * Iterator over the mapping from IBF keys to element entries. Checks if we |
354 | * and element entries. | 355 | * have an element with a given GNUNET_HashCode. |
355 | * | 356 | * |
356 | * @param cls closure | 357 | * @param cls closure |
357 | * @param key current key code | 358 | * @param key current key code |
358 | * @param value value in the hash map | 359 | * @param value value in the hash map |
359 | * @return #GNUNET_YES if we should continue to iterate, | 360 | * @return #GNUNET_YES if we should search further, |
360 | * #GNUNET_NO if not. | 361 | * #GNUNET_NO if we've found the element. |
361 | */ | ||
362 | static int | ||
363 | op_register_element_iterator (void *cls, | ||
364 | uint32_t key, | ||
365 | void *value) | ||
366 | { | ||
367 | struct KeyEntry *const new_k = cls; | ||
368 | struct KeyEntry *old_k = value; | ||
369 | |||
370 | GNUNET_assert (NULL != old_k); | ||
371 | /* check if our ibf key collides with the ibf key in the existing entry */ | ||
372 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) | ||
373 | { | ||
374 | /* insert the the new key in the collision chain */ | ||
375 | new_k->next_colliding = old_k->next_colliding; | ||
376 | old_k->next_colliding = new_k; | ||
377 | /* signal to the caller that we were able to insert into a colliding bucket */ | ||
378 | return GNUNET_NO; | ||
379 | } | ||
380 | return GNUNET_YES; | ||
381 | } | ||
382 | |||
383 | |||
384 | /** | ||
385 | * Iterator to create the mapping between ibf keys | ||
386 | * and element entries. | ||
387 | * | ||
388 | * @param cls closure | ||
389 | * @param key current key code | ||
390 | * @param value value in the hash map | ||
391 | * @return #GNUNET_YES (we should continue to iterate) | ||
392 | */ | 362 | */ |
393 | static int | 363 | static int |
394 | op_has_element_iterator (void *cls, | 364 | op_has_element_iterator (void *cls, |
@@ -399,13 +369,9 @@ op_has_element_iterator (void *cls, | |||
399 | struct KeyEntry *k = value; | 369 | struct KeyEntry *k = value; |
400 | 370 | ||
401 | GNUNET_assert (NULL != k); | 371 | GNUNET_assert (NULL != k); |
402 | while (NULL != k) | 372 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, |
403 | { | 373 | element_hash)) |
404 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, | 374 | return GNUNET_NO; |
405 | element_hash)) | ||
406 | return GNUNET_NO; | ||
407 | k = k->next_colliding; | ||
408 | } | ||
409 | return GNUNET_YES; | 375 | return GNUNET_YES; |
410 | } | 376 | } |
411 | 377 | ||
@@ -446,6 +412,8 @@ op_has_element (struct Operation *op, | |||
446 | * This is done to speed up re-tried operations, if some elements | 412 | * This is done to speed up re-tried operations, if some elements |
447 | * were transmitted, and then the IBF fails to decode. | 413 | * were transmitted, and then the IBF fails to decode. |
448 | * | 414 | * |
415 | * XXX: clarify ownership, doesn't sound right. | ||
416 | * | ||
449 | * @param op the union operation | 417 | * @param op the union operation |
450 | * @param ee the element entry | 418 | * @param ee the element entry |
451 | */ | 419 | */ |
@@ -453,7 +421,6 @@ static void | |||
453 | op_register_element (struct Operation *op, | 421 | op_register_element (struct Operation *op, |
454 | struct ElementEntry *ee) | 422 | struct ElementEntry *ee) |
455 | { | 423 | { |
456 | int ret; | ||
457 | struct IBF_Key ibf_key; | 424 | struct IBF_Key ibf_key; |
458 | struct KeyEntry *k; | 425 | struct KeyEntry *k; |
459 | 426 | ||
@@ -461,18 +428,11 @@ op_register_element (struct Operation *op, | |||
461 | k = GNUNET_new (struct KeyEntry); | 428 | k = GNUNET_new (struct KeyEntry); |
462 | k->element = ee; | 429 | k->element = ee; |
463 | k->ibf_key = ibf_key; | 430 | k->ibf_key = ibf_key; |
464 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, | 431 | GNUNET_assert (GNUNET_OK == |
432 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, | ||
465 | (uint32_t) ibf_key.key_val, | 433 | (uint32_t) ibf_key.key_val, |
466 | op_register_element_iterator, | 434 | k, |
467 | k); | 435 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
468 | |||
469 | /* was the element inserted into a colliding bucket? */ | ||
470 | if (GNUNET_SYSERR == ret) | ||
471 | return; | ||
472 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, | ||
473 | (uint32_t) ibf_key.key_val, | ||
474 | k, | ||
475 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
476 | } | 436 | } |
477 | 437 | ||
478 | 438 | ||
@@ -488,13 +448,15 @@ prepare_ibf_iterator (void *cls, | |||
488 | uint32_t key, | 448 | uint32_t key, |
489 | void *value) | 449 | void *value) |
490 | { | 450 | { |
491 | struct InvertibleBloomFilter *ibf = cls; | 451 | struct Operation *op = cls; |
492 | struct KeyEntry *ke = value; | 452 | struct KeyEntry *ke = value; |
493 | 453 | ||
494 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 454 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
495 | "inserting %x into ibf\n", | 455 | "[OP %x] inserting %lx (hash %s) into ibf\n", |
496 | ke->ibf_key.key_val); | 456 | (void *) op, |
497 | ibf_insert (ibf, ke->ibf_key); | 457 | (unsigned long) ke->ibf_key.key_val, |
458 | GNUNET_h2s (&ke->element->element_hash)); | ||
459 | ibf_insert (op->state->local_ibf, ke->ibf_key); | ||
498 | return GNUNET_YES; | 460 | return GNUNET_YES; |
499 | } | 461 | } |
500 | 462 | ||
@@ -554,13 +516,15 @@ prepare_ibf (struct Operation *op, | |||
554 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 516 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
555 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, | 517 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, |
556 | &prepare_ibf_iterator, | 518 | &prepare_ibf_iterator, |
557 | op->state->local_ibf); | 519 | op); |
558 | } | 520 | } |
559 | 521 | ||
560 | 522 | ||
561 | /** | 523 | /** |
562 | * Send an ibf of appropriate size. | 524 | * Send an ibf of appropriate size. |
563 | * | 525 | * |
526 | * Fragments the IBF into multiple messages if necessary. | ||
527 | * | ||
564 | * @param op the union operation | 528 | * @param op the union operation |
565 | * @param ibf_order order of the ibf to send, size=2^order | 529 | * @param ibf_order order of the ibf to send, size=2^order |
566 | */ | 530 | */ |
@@ -573,9 +537,9 @@ send_ibf (struct Operation *op, | |||
573 | 537 | ||
574 | prepare_ibf (op, 1<<ibf_order); | 538 | prepare_ibf (op, 1<<ibf_order); |
575 | 539 | ||
576 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 540 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
577 | "sending ibf of size %u\n", | 541 | "sending ibf of size %u\n", |
578 | 1<<ibf_order); | 542 | 1<<ibf_order); |
579 | 543 | ||
580 | ibf = op->state->local_ibf; | 544 | ibf = op->state->local_ibf; |
581 | 545 | ||
@@ -599,7 +563,7 @@ send_ibf (struct Operation *op, | |||
599 | ibf_write_slice (ibf, buckets_sent, | 563 | ibf_write_slice (ibf, buckets_sent, |
600 | buckets_in_message, &msg[1]); | 564 | buckets_in_message, &msg[1]); |
601 | buckets_sent += buckets_in_message; | 565 | buckets_sent += buckets_in_message; |
602 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 566 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
603 | "ibf chunk size %u, %u/%u sent\n", | 567 | "ibf chunk size %u, %u/%u sent\n", |
604 | buckets_in_message, | 568 | buckets_in_message, |
605 | buckets_sent, | 569 | buckets_sent, |
@@ -607,7 +571,9 @@ send_ibf (struct Operation *op, | |||
607 | GNUNET_MQ_send (op->mq, ev); | 571 | GNUNET_MQ_send (op->mq, ev); |
608 | } | 572 | } |
609 | 573 | ||
610 | op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; | 574 | /* The other peer must decode the IBF, so |
575 | * we're passive. */ | ||
576 | op->state->phase = PHASE_INVENTORY_PASSIVE; | ||
611 | } | 577 | } |
612 | 578 | ||
613 | 579 | ||
@@ -629,7 +595,7 @@ send_strata_estimator (struct Operation *op) | |||
629 | GNUNET_MQ_send (op->mq, | 595 | GNUNET_MQ_send (op->mq, |
630 | ev); | 596 | ev); |
631 | op->state->phase = PHASE_EXPECT_IBF; | 597 | op->state->phase = PHASE_EXPECT_IBF; |
632 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 598 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
633 | "sent SE, expecting IBF\n"); | 599 | "sent SE, expecting IBF\n"); |
634 | } | 600 | } |
635 | 601 | ||
@@ -696,7 +662,7 @@ handle_p2p_strata_estimator (void *cls, | |||
696 | strata_estimator_destroy (remote_se); | 662 | strata_estimator_destroy (remote_se); |
697 | strata_estimator_destroy (op->state->se); | 663 | strata_estimator_destroy (op->state->se); |
698 | op->state->se = NULL; | 664 | op->state->se = NULL; |
699 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 665 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
700 | "got se diff=%d, using ibf size %d\n", | 666 | "got se diff=%d, using ibf size %d\n", |
701 | diff, | 667 | diff, |
702 | 1<<get_order_from_difference (diff)); | 668 | 1<<get_order_from_difference (diff)); |
@@ -714,56 +680,44 @@ handle_p2p_strata_estimator (void *cls, | |||
714 | * @param value the key entry | 680 | * @param value the key entry |
715 | */ | 681 | */ |
716 | static int | 682 | static int |
717 | send_element_iterator (void *cls, | 683 | send_offers_iterator (void *cls, |
718 | uint32_t key, | 684 | uint32_t key, |
719 | void *value) | 685 | void *value) |
720 | { | 686 | { |
721 | struct SendElementClosure *sec = cls; | 687 | struct SendElementClosure *sec = cls; |
722 | struct IBF_Key ibf_key = sec->ibf_key; | ||
723 | struct Operation *op = sec->op; | 688 | struct Operation *op = sec->op; |
724 | struct KeyEntry *ke = value; | 689 | struct KeyEntry *ke = value; |
690 | struct GNUNET_MQ_Envelope *ev; | ||
691 | struct GNUNET_MessageHeader *mh; | ||
725 | 692 | ||
726 | if (ke->ibf_key.key_val != ibf_key.key_val) | 693 | /* Detect 32-bit key collision for the 64-bit IBF keys. */ |
694 | if (ke->ibf_key.key_val != sec->ibf_key.key_val) | ||
727 | return GNUNET_YES; | 695 | return GNUNET_YES; |
728 | while (NULL != ke) | ||
729 | { | ||
730 | const struct GNUNET_SET_Element *const element = &ke->element->element; | ||
731 | struct GNUNET_MQ_Envelope *ev; | ||
732 | struct GNUNET_MessageHeader *mh; | ||
733 | 696 | ||
734 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); | 697 | ev = GNUNET_MQ_msg_header_extra (mh, |
735 | ev = GNUNET_MQ_msg_header_extra (mh, | 698 | sizeof (struct GNUNET_HashCode), |
736 | element->size, | 699 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); |
737 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | 700 | |
738 | if (NULL == ev) | 701 | GNUNET_assert (NULL != ev); |
739 | { | 702 | *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; |
740 | /* element too large */ | 703 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
741 | GNUNET_break (0); | 704 | "[OP %x] sending element offer (%s) to peer\n", |
742 | continue; | 705 | (void *) op, |
743 | } | 706 | GNUNET_h2s (&ke->element->element_hash)); |
744 | memcpy (&mh[1], | 707 | GNUNET_MQ_send (op->mq, ev); |
745 | element->data, | 708 | return GNUNET_YES; |
746 | element->size); | ||
747 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
748 | "sending element (%s) to peer\n", | ||
749 | GNUNET_h2s (&ke->element->element_hash)); | ||
750 | GNUNET_MQ_send (op->mq, ev); | ||
751 | ke = ke->next_colliding; | ||
752 | } | ||
753 | return GNUNET_NO; | ||
754 | } | 709 | } |
755 | 710 | ||
756 | 711 | ||
757 | /** | 712 | /** |
758 | * Send all elements that have the specified IBF key | 713 | * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key. |
759 | * to the remote peer of the union operation | ||
760 | * | 714 | * |
761 | * @param op union operation | 715 | * @param op union operation |
762 | * @param ibf_key IBF key of interest | 716 | * @param ibf_key IBF key of interest |
763 | */ | 717 | */ |
764 | static void | 718 | static void |
765 | send_elements_for_key (struct Operation *op, | 719 | send_offers_for_key (struct Operation *op, |
766 | struct IBF_Key ibf_key) | 720 | struct IBF_Key ibf_key) |
767 | { | 721 | { |
768 | struct SendElementClosure send_cls; | 722 | struct SendElementClosure send_cls; |
769 | 723 | ||
@@ -771,14 +725,14 @@ send_elements_for_key (struct Operation *op, | |||
771 | send_cls.op = op; | 725 | send_cls.op = op; |
772 | (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, | 726 | (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, |
773 | (uint32_t) ibf_key.key_val, | 727 | (uint32_t) ibf_key.key_val, |
774 | &send_element_iterator, | 728 | &send_offers_iterator, |
775 | &send_cls); | 729 | &send_cls); |
776 | } | 730 | } |
777 | 731 | ||
778 | 732 | ||
779 | /** | 733 | /** |
780 | * Decode which elements are missing on each side, and | 734 | * Decode which elements are missing on each side, and |
781 | * send the appropriate elemens and requests | 735 | * send the appropriate offers and inquiries. |
782 | * | 736 | * |
783 | * @param op union operation | 737 | * @param op union operation |
784 | */ | 738 | */ |
@@ -791,7 +745,7 @@ decode_and_send (struct Operation *op) | |||
791 | unsigned int num_decoded; | 745 | unsigned int num_decoded; |
792 | struct InvertibleBloomFilter *diff_ibf; | 746 | struct InvertibleBloomFilter *diff_ibf; |
793 | 747 | ||
794 | GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase); | 748 | GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); |
795 | 749 | ||
796 | prepare_ibf (op, op->state->remote_ibf->size); | 750 | prepare_ibf (op, op->state->remote_ibf->size); |
797 | diff_ibf = ibf_dup (op->state->local_ibf); | 751 | diff_ibf = ibf_dup (op->state->local_ibf); |
@@ -800,7 +754,7 @@ decode_and_send (struct Operation *op) | |||
800 | ibf_destroy (op->state->remote_ibf); | 754 | ibf_destroy (op->state->remote_ibf); |
801 | op->state->remote_ibf = NULL; | 755 | op->state->remote_ibf = NULL; |
802 | 756 | ||
803 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 757 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
804 | "decoding IBF (size=%u)\n", | 758 | "decoding IBF (size=%u)\n", |
805 | diff_ibf->size); | 759 | diff_ibf->size); |
806 | 760 | ||
@@ -817,14 +771,14 @@ decode_and_send (struct Operation *op) | |||
817 | res = ibf_decode (diff_ibf, &side, &key); | 771 | res = ibf_decode (diff_ibf, &side, &key); |
818 | if (res == GNUNET_OK) | 772 | if (res == GNUNET_OK) |
819 | { | 773 | { |
820 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 774 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
821 | "decoded ibf key %lx\n", | 775 | "decoded ibf key %lx\n", |
822 | key.key_val); | 776 | (unsigned long) key.key_val); |
823 | num_decoded += 1; | 777 | num_decoded += 1; |
824 | if ( (num_decoded > diff_ibf->size) || | 778 | if ( (num_decoded > diff_ibf->size) || |
825 | (num_decoded > 1 && last_key.key_val == key.key_val) ) | 779 | (num_decoded > 1 && last_key.key_val == key.key_val) ) |
826 | { | 780 | { |
827 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 781 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
828 | "detected cyclic ibf (decoded %u/%u)\n", | 782 | "detected cyclic ibf (decoded %u/%u)\n", |
829 | num_decoded, | 783 | num_decoded, |
830 | diff_ibf->size); | 784 | diff_ibf->size); |
@@ -841,15 +795,17 @@ decode_and_send (struct Operation *op) | |||
841 | next_order++; | 795 | next_order++; |
842 | if (next_order <= MAX_IBF_ORDER) | 796 | if (next_order <= MAX_IBF_ORDER) |
843 | { | 797 | { |
844 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 798 | LOG_OP (GNUNET_ERROR_TYPE_DEBUG, |
845 | "decoding failed, sending larger ibf (size %u)\n", | 799 | "decoding failed, sending larger ibf (size %u)\n", |
846 | 1<<next_order); | 800 | op, |
801 | 1<<next_order); | ||
847 | send_ibf (op, next_order); | 802 | send_ibf (op, next_order); |
848 | } | 803 | } |
849 | else | 804 | else |
850 | { | 805 | { |
851 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 806 | // XXX: Send the whole set, element-by-element |
852 | "set union failed: reached ibf limit\n"); | 807 | LOG (GNUNET_ERROR_TYPE_ERROR, |
808 | "set union failed: reached ibf limit\n"); | ||
853 | } | 809 | } |
854 | break; | 810 | break; |
855 | } | 811 | } |
@@ -857,32 +813,36 @@ decode_and_send (struct Operation *op) | |||
857 | { | 813 | { |
858 | struct GNUNET_MQ_Envelope *ev; | 814 | struct GNUNET_MQ_Envelope *ev; |
859 | 815 | ||
860 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 816 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
861 | "transmitted all values, sending DONE\n"); | 817 | "transmitted all values, sending DONE\n"); |
862 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | 818 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); |
863 | GNUNET_MQ_send (op->mq, ev); | 819 | GNUNET_MQ_send (op->mq, ev); |
820 | /* We now wait until we get a DONE message back | ||
821 | * and then wait for our MQ to be flushed and all our | ||
822 | * demands be delivered. */ | ||
864 | break; | 823 | break; |
865 | } | 824 | } |
866 | if (1 == side) | 825 | if (1 == side) |
867 | { | 826 | { |
868 | send_elements_for_key (op, key); | 827 | send_offers_for_key (op, key); |
869 | } | 828 | } |
870 | else if (-1 == side) | 829 | else if (-1 == side) |
871 | { | 830 | { |
872 | struct GNUNET_MQ_Envelope *ev; | 831 | struct GNUNET_MQ_Envelope *ev; |
873 | struct GNUNET_MessageHeader *msg; | 832 | struct GNUNET_MessageHeader *msg; |
874 | 833 | ||
875 | /* It may be nice to merge multiple requests, but with cadet's corking it is not worth | 834 | /* It may be nice to merge multiple requests, but with CADET's corking it is not worth |
876 | * the effort additional complexity. */ | 835 | * the effort additional complexity. */ |
877 | ev = GNUNET_MQ_msg_header_extra (msg, | 836 | ev = GNUNET_MQ_msg_header_extra (msg, |
878 | sizeof (struct IBF_Key), | 837 | sizeof (struct IBF_Key), |
879 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); | 838 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); |
880 | 839 | ||
881 | memcpy (&msg[1], | 840 | memcpy (&msg[1], |
882 | &key, | 841 | &key, |
883 | sizeof (struct IBF_Key)); | 842 | sizeof (struct IBF_Key)); |
884 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 843 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
885 | "sending element request\n"); | 844 | "sending element inquiry for IBF key %lx\n", |
845 | (unsigned long) key.key_val); | ||
886 | GNUNET_MQ_send (op->mq, ev); | 846 | GNUNET_MQ_send (op->mq, ev); |
887 | } | 847 | } |
888 | else | 848 | else |
@@ -897,6 +857,9 @@ decode_and_send (struct Operation *op) | |||
897 | /** | 857 | /** |
898 | * Handle an IBF message from a remote peer. | 858 | * Handle an IBF message from a remote peer. |
899 | * | 859 | * |
860 | * Reassemble the IBF from multiple pieces, and | ||
861 | * process the whole IBF once possible. | ||
862 | * | ||
900 | * @param cls the union operation | 863 | * @param cls the union operation |
901 | * @param mh the header of the message | 864 | * @param mh the header of the message |
902 | * @return #GNUNET_SYSERR if the tunnel should be disconnected, | 865 | * @return #GNUNET_SYSERR if the tunnel should be disconnected, |
@@ -917,12 +880,12 @@ handle_p2p_ibf (void *cls, | |||
917 | return GNUNET_SYSERR; | 880 | return GNUNET_SYSERR; |
918 | } | 881 | } |
919 | msg = (const struct IBFMessage *) mh; | 882 | msg = (const struct IBFMessage *) mh; |
920 | if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || | 883 | if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || |
921 | (op->state->phase == PHASE_EXPECT_IBF) ) | 884 | (op->state->phase == PHASE_EXPECT_IBF) ) |
922 | { | 885 | { |
923 | op->state->phase = PHASE_EXPECT_IBF_CONT; | 886 | op->state->phase = PHASE_EXPECT_IBF_CONT; |
924 | GNUNET_assert (NULL == op->state->remote_ibf); | 887 | GNUNET_assert (NULL == op->state->remote_ibf); |
925 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 888 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
926 | "Creating new ibf of size %u\n", | 889 | "Creating new ibf of size %u\n", |
927 | 1 << msg->order); | 890 | 1 << msg->order); |
928 | op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | 891 | op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); |
@@ -944,6 +907,13 @@ handle_p2p_ibf (void *cls, | |||
944 | return GNUNET_SYSERR; | 907 | return GNUNET_SYSERR; |
945 | } | 908 | } |
946 | } | 909 | } |
910 | else | ||
911 | { | ||
912 | LOG_OP (GNUNET_ERROR_TYPE_DEBUG, | ||
913 | "wrong phase\n", | ||
914 | op, NULL); | ||
915 | GNUNET_assert (0); | ||
916 | } | ||
947 | 917 | ||
948 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | 918 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; |
949 | 919 | ||
@@ -961,6 +931,8 @@ handle_p2p_ibf (void *cls, | |||
961 | return GNUNET_SYSERR; | 931 | return GNUNET_SYSERR; |
962 | } | 932 | } |
963 | 933 | ||
934 | GNUNET_assert (NULL != op->state->remote_ibf); | ||
935 | |||
964 | ibf_read_slice (&msg[1], | 936 | ibf_read_slice (&msg[1], |
965 | op->state->ibf_buckets_received, | 937 | op->state->ibf_buckets_received, |
966 | buckets_in_message, | 938 | buckets_in_message, |
@@ -969,9 +941,9 @@ handle_p2p_ibf (void *cls, | |||
969 | 941 | ||
970 | if (op->state->ibf_buckets_received == op->state->remote_ibf->size) | 942 | if (op->state->ibf_buckets_received == op->state->remote_ibf->size) |
971 | { | 943 | { |
972 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 944 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
973 | "received full ibf\n"); | 945 | "received full ibf\n"); |
974 | op->state->phase = PHASE_EXPECT_ELEMENTS; | 946 | op->state->phase = PHASE_INVENTORY_ACTIVE; |
975 | decode_and_send (op); | 947 | decode_and_send (op); |
976 | } | 948 | } |
977 | return GNUNET_OK; | 949 | return GNUNET_OK; |
@@ -984,15 +956,17 @@ handle_p2p_ibf (void *cls, | |||
984 | * | 956 | * |
985 | * @param op union operation | 957 | * @param op union operation |
986 | * @param element element to send | 958 | * @param element element to send |
959 | * @param status status to send with the new element | ||
987 | */ | 960 | */ |
988 | static void | 961 | static void |
989 | send_client_element (struct Operation *op, | 962 | send_client_element (struct Operation *op, |
990 | struct GNUNET_SET_Element *element) | 963 | struct GNUNET_SET_Element *element, |
964 | int status) | ||
991 | { | 965 | { |
992 | struct GNUNET_MQ_Envelope *ev; | 966 | struct GNUNET_MQ_Envelope *ev; |
993 | struct GNUNET_SET_ResultMessage *rm; | 967 | struct GNUNET_SET_ResultMessage *rm; |
994 | 968 | ||
995 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 969 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
996 | "sending element (size %u) to client\n", | 970 | "sending element (size %u) to client\n", |
997 | element->size); | 971 | element->size); |
998 | GNUNET_assert (0 != op->spec->client_request_id); | 972 | GNUNET_assert (0 != op->spec->client_request_id); |
@@ -1003,12 +977,7 @@ send_client_element (struct Operation *op, | |||
1003 | GNUNET_break (0); | 977 | GNUNET_break (0); |
1004 | return; | 978 | return; |
1005 | } | 979 | } |
1006 | 980 | rm->result_status = htons (status); | |
1007 | if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode) | ||
1008 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | ||
1009 | else if (GNUNET_SET_RESULT_SYMMETRIC == op->spec->result_mode) | ||
1010 | rm->result_status = htons (GNUNET_SET_STATUS_ADD_LOCAL); | ||
1011 | |||
1012 | rm->request_id = htonl (op->spec->client_request_id); | 981 | rm->request_id = htonl (op->spec->client_request_id); |
1013 | rm->element_type = element->element_type; | 982 | rm->element_type = element->element_type; |
1014 | memcpy (&rm[1], element->data, element->size); | 983 | memcpy (&rm[1], element->data, element->size); |
@@ -1034,98 +1003,46 @@ send_done_and_destroy (void *cls) | |||
1034 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 1003 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1035 | rm->element_type = htons (0); | 1004 | rm->element_type = htons (0); |
1036 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | 1005 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
1006 | /* Will also call the union-specific cancel function. */ | ||
1037 | _GSS_operation_destroy (op, GNUNET_YES); | 1007 | _GSS_operation_destroy (op, GNUNET_YES); |
1038 | op->keep--; | ||
1039 | if (0 == op->keep) | ||
1040 | GNUNET_free (op); | ||
1041 | } | 1008 | } |
1042 | 1009 | ||
1043 | 1010 | ||
1044 | /** | ||
1045 | * Send all remaining elements in the full result iterator. | ||
1046 | * | ||
1047 | * @param cls operation | ||
1048 | */ | ||
1049 | static void | 1011 | static void |
1050 | send_remaining_elements (void *cls) | 1012 | maybe_finish (struct Operation *op) |
1051 | { | 1013 | { |
1052 | struct Operation *op = cls; | 1014 | unsigned int num_demanded; |
1053 | struct KeyEntry *ke; | ||
1054 | int res; | ||
1055 | 1015 | ||
1056 | res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, | 1016 | num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes); |
1057 | NULL, | 1017 | |
1058 | (const void **) &ke); | 1018 | if (PHASE_FINISH_WAITING == op->state->phase) |
1059 | if (GNUNET_NO == res) | ||
1060 | { | ||
1061 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1062 | "sending done and destroy because iterator ran out\n"); | ||
1063 | send_done_and_destroy (op); | ||
1064 | return; | ||
1065 | } | ||
1066 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1067 | "sending elements from key entry\n"); | ||
1068 | while (1) | ||
1069 | { | 1019 | { |
1070 | struct GNUNET_MQ_Envelope *ev; | 1020 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1071 | struct GNUNET_SET_ResultMessage *rm; | 1021 | "In PHASE_FINISH_WAITING, pending %u demands\n", |
1072 | struct GNUNET_SET_Element *element; | 1022 | num_demanded); |
1073 | 1023 | if (0 == num_demanded) | |
1074 | element = &ke->element->element; | ||
1075 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1076 | "sending element (size %u) to client (full set)\n", | ||
1077 | element->size); | ||
1078 | GNUNET_assert (0 != op->spec->client_request_id); | ||
1079 | ev = GNUNET_MQ_msg_extra (rm, | ||
1080 | element->size, | ||
1081 | GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
1082 | if (NULL == ev) | ||
1083 | { | 1024 | { |
1084 | GNUNET_MQ_discard (ev); | 1025 | struct GNUNET_MQ_Envelope *ev; |
1085 | GNUNET_break (0); | ||
1086 | continue; | ||
1087 | } | ||
1088 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | ||
1089 | rm->request_id = htonl (op->spec->client_request_id); | ||
1090 | rm->element_type = element->element_type; | ||
1091 | memcpy (&rm[1], element->data, element->size); | ||
1092 | if (NULL == ke->next_colliding) | ||
1093 | { | ||
1094 | GNUNET_MQ_notify_sent (ev, send_remaining_elements, op); | ||
1095 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
1096 | break; | ||
1097 | } | ||
1098 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
1099 | ke = ke->next_colliding; | ||
1100 | } | ||
1101 | } | ||
1102 | 1026 | ||
1027 | op->state->phase = PHASE_DONE; | ||
1028 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | ||
1029 | GNUNET_MQ_send (op->mq, ev); | ||
1103 | 1030 | ||
1104 | /** | 1031 | /* We now wait until the other peer closes the channel |
1105 | * Send a result message to the client indicating | 1032 | * after it got all elements from us. */ |
1106 | * that the operation is over. | 1033 | } |
1107 | * After the result done message has been sent to the client, | 1034 | } |
1108 | * destroy the evaluate operation. | 1035 | if (PHASE_FINISH_CLOSING == op->state->phase) |
1109 | * | ||
1110 | * @param op union operation | ||
1111 | */ | ||
1112 | static void | ||
1113 | finish_and_destroy (struct Operation *op) | ||
1114 | { | ||
1115 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); | ||
1116 | op->keep++; | ||
1117 | if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) | ||
1118 | { | 1036 | { |
1119 | /* prevent that the op is free'd by the tunnel end handler */ | 1037 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1120 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1038 | "In PHASE_FINISH_CLOSING, pending %u demands\n", |
1121 | "sending full result set\n"); | 1039 | num_demanded); |
1122 | GNUNET_assert (NULL == op->state->full_result_iter); | 1040 | if (0 == num_demanded) |
1123 | op->state->full_result_iter = | 1041 | { |
1124 | GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element); | 1042 | op->state->phase = PHASE_DONE; |
1125 | send_remaining_elements (op); | 1043 | send_done_and_destroy (op); |
1126 | return; | 1044 | } |
1127 | } | 1045 | } |
1128 | send_done_and_destroy (op); | ||
1129 | } | 1046 | } |
1130 | 1047 | ||
1131 | 1048 | ||
@@ -1141,65 +1058,106 @@ handle_p2p_elements (void *cls, | |||
1141 | { | 1058 | { |
1142 | struct Operation *op = cls; | 1059 | struct Operation *op = cls; |
1143 | struct ElementEntry *ee; | 1060 | struct ElementEntry *ee; |
1061 | const struct GNUNET_SET_ElementMessage *emsg; | ||
1144 | uint16_t element_size; | 1062 | uint16_t element_size; |
1145 | 1063 | ||
1146 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1064 | if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) |
1147 | "Got element from peer\n"); | ||
1148 | if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) && | ||
1149 | (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) | ||
1150 | { | 1065 | { |
1066 | GNUNET_break_op (0); | ||
1151 | fail_union_operation (op); | 1067 | fail_union_operation (op); |
1068 | return; | ||
1069 | } | ||
1070 | |||
1071 | if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) | ||
1072 | { | ||
1152 | GNUNET_break_op (0); | 1073 | GNUNET_break_op (0); |
1074 | fail_union_operation (op); | ||
1153 | return; | 1075 | return; |
1154 | } | 1076 | } |
1155 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); | 1077 | |
1078 | emsg = (struct GNUNET_SET_ElementMessage *) mh; | ||
1079 | |||
1080 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); | ||
1156 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); | 1081 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); |
1157 | memcpy (&ee[1], &mh[1], element_size); | 1082 | memcpy (&ee[1], &emsg[1], element_size); |
1158 | ee->element.size = element_size; | 1083 | ee->element.size = element_size; |
1159 | ee->element.data = &ee[1]; | 1084 | ee->element.data = &ee[1]; |
1085 | ee->element.element_type = ntohs (emsg->element_type); | ||
1160 | ee->remote = GNUNET_YES; | 1086 | ee->remote = GNUNET_YES; |
1161 | GNUNET_CRYPTO_hash (ee->element.data, | 1087 | GNUNET_CRYPTO_hash (ee->element.data, |
1162 | ee->element.size, | 1088 | ee->element.size, |
1163 | &ee->element_hash); | 1089 | &ee->element_hash); |
1164 | 1090 | ||
1165 | if (GNUNET_YES == op_has_element (op, &ee->element_hash)) | 1091 | if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, &ee->element_hash, NULL)) |
1166 | { | 1092 | { |
1167 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1093 | /* We got something we didn't demand, since it's not in our map. */ |
1168 | "got existing element from peer\n"); | 1094 | GNUNET_break_op (0); |
1169 | GNUNET_free (ee); | 1095 | GNUNET_free (ee); |
1096 | fail_union_operation (op); | ||
1170 | return; | 1097 | return; |
1171 | } | 1098 | } |
1172 | 1099 | ||
1173 | op_register_element (op, ee); | 1100 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1174 | /* only send results immediately if the client wants it */ | 1101 | "Got element (size %u, hash %s) from peer\n", |
1175 | if (GNUNET_SET_RESULT_FULL != op->spec->result_mode) | 1102 | (unsigned int) element_size, |
1176 | send_client_element (op, &ee->element); | 1103 | GNUNET_h2s (&ee->element_hash)); |
1104 | |||
1105 | if (GNUNET_YES == op_has_element (op, &ee->element_hash)) | ||
1106 | { | ||
1107 | /* Got repeated element. Should not happen since | ||
1108 | * we track demands. */ | ||
1109 | GNUNET_break (0); | ||
1110 | GNUNET_free (ee); | ||
1111 | } | ||
1112 | else | ||
1113 | { | ||
1114 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1115 | "Registering new element from remote peer\n"); | ||
1116 | op_register_element (op, ee); | ||
1117 | /* only send results immediately if the client wants it */ | ||
1118 | switch (op->spec->result_mode) | ||
1119 | { | ||
1120 | case GNUNET_SET_RESULT_ADDED: | ||
1121 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); | ||
1122 | break; | ||
1123 | case GNUNET_SET_RESULT_SYMMETRIC: | ||
1124 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); | ||
1125 | break; | ||
1126 | default: | ||
1127 | /* Result mode not supported, should have been caught earlier. */ | ||
1128 | GNUNET_break (0); | ||
1129 | break; | ||
1130 | } | ||
1131 | } | ||
1132 | |||
1133 | maybe_finish (op); | ||
1177 | } | 1134 | } |
1178 | 1135 | ||
1179 | 1136 | ||
1180 | /** | 1137 | /** |
1181 | * Handle an element request from a remote peer. | 1138 | * Send offers (for GNUNET_Hash-es) in response |
1139 | * to inquiries (for IBF_Key-s). | ||
1182 | * | 1140 | * |
1183 | * @param cls the union operation | 1141 | * @param cls the union operation |
1184 | * @param mh the message | 1142 | * @param mh the message |
1185 | */ | 1143 | */ |
1186 | static void | 1144 | static void |
1187 | handle_p2p_element_requests (void *cls, | 1145 | handle_p2p_inquiry (void *cls, |
1188 | const struct GNUNET_MessageHeader *mh) | 1146 | const struct GNUNET_MessageHeader *mh) |
1189 | { | 1147 | { |
1190 | struct Operation *op = cls; | 1148 | struct Operation *op = cls; |
1191 | const struct IBF_Key *ibf_key; | 1149 | const struct IBF_Key *ibf_key; |
1192 | unsigned int num_keys; | 1150 | unsigned int num_keys; |
1193 | 1151 | ||
1194 | /* look up elements and send them */ | 1152 | /* look up elements and send them */ |
1195 | if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1153 | if (op->state->phase != PHASE_INVENTORY_PASSIVE) |
1196 | { | 1154 | { |
1197 | GNUNET_break_op (0); | 1155 | GNUNET_break_op (0); |
1198 | fail_union_operation (op); | 1156 | fail_union_operation (op); |
1199 | return; | 1157 | return; |
1200 | } | 1158 | } |
1201 | num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | 1159 | num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) |
1202 | / sizeof (struct IBF_Key); | 1160 | / sizeof (struct IBF_Key); |
1203 | if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | 1161 | if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) |
1204 | != num_keys * sizeof (struct IBF_Key)) | 1162 | != num_keys * sizeof (struct IBF_Key)) |
1205 | { | 1163 | { |
@@ -1211,12 +1169,149 @@ handle_p2p_element_requests (void *cls, | |||
1211 | ibf_key = (const struct IBF_Key *) &mh[1]; | 1169 | ibf_key = (const struct IBF_Key *) &mh[1]; |
1212 | while (0 != num_keys--) | 1170 | while (0 != num_keys--) |
1213 | { | 1171 | { |
1214 | send_elements_for_key (op, *ibf_key); | 1172 | send_offers_for_key (op, *ibf_key); |
1215 | ibf_key++; | 1173 | ibf_key++; |
1216 | } | 1174 | } |
1217 | } | 1175 | } |
1218 | 1176 | ||
1219 | 1177 | ||
1178 | |||
1179 | static void | ||
1180 | handle_p2p_demand (void *cls, | ||
1181 | const struct GNUNET_MessageHeader *mh) | ||
1182 | { | ||
1183 | struct Operation *op = cls; | ||
1184 | struct ElementEntry *ee; | ||
1185 | struct GNUNET_SET_ElementMessage *emsg; | ||
1186 | const struct GNUNET_HashCode *hash; | ||
1187 | unsigned int num_hashes; | ||
1188 | struct GNUNET_MQ_Envelope *ev; | ||
1189 | |||
1190 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | ||
1191 | / sizeof (struct GNUNET_HashCode); | ||
1192 | if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | ||
1193 | != num_hashes * sizeof (struct GNUNET_HashCode)) | ||
1194 | { | ||
1195 | GNUNET_break_op (0); | ||
1196 | fail_union_operation (op); | ||
1197 | return; | ||
1198 | } | ||
1199 | |||
1200 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; | ||
1201 | num_hashes > 0; | ||
1202 | hash++, num_hashes--) | ||
1203 | { | ||
1204 | ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); | ||
1205 | if (NULL == ee) | ||
1206 | { | ||
1207 | /* Demand for non-existing element. */ | ||
1208 | GNUNET_break_op (0); | ||
1209 | fail_union_operation (op); | ||
1210 | return; | ||
1211 | } | ||
1212 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) | ||
1213 | { | ||
1214 | /* Probably confused lazily copied sets. */ | ||
1215 | GNUNET_break_op (0); | ||
1216 | fail_union_operation (op); | ||
1217 | return; | ||
1218 | } | ||
1219 | ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | ||
1220 | memcpy (&emsg[1], ee->element.data, ee->element.size); | ||
1221 | emsg->reserved = htons (0); | ||
1222 | emsg->element_type = htons (ee->element.element_type); | ||
1223 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1224 | "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", | ||
1225 | (void *) op, | ||
1226 | (unsigned int) ee->element.size, | ||
1227 | GNUNET_h2s (&ee->element_hash)); | ||
1228 | GNUNET_MQ_send (op->mq, ev); | ||
1229 | |||
1230 | switch (op->spec->result_mode) | ||
1231 | { | ||
1232 | case GNUNET_SET_RESULT_ADDED: | ||
1233 | /* Nothing to do. */ | ||
1234 | break; | ||
1235 | case GNUNET_SET_RESULT_SYMMETRIC: | ||
1236 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); | ||
1237 | break; | ||
1238 | default: | ||
1239 | /* Result mode not supported, should have been caught earlier. */ | ||
1240 | GNUNET_break (0); | ||
1241 | break; | ||
1242 | } | ||
1243 | } | ||
1244 | } | ||
1245 | |||
1246 | |||
1247 | /** | ||
1248 | * Handle offers (of GNUNET_HashCode-s) and | ||
1249 | * respond with demands (of GNUNET_HashCode-s). | ||
1250 | * | ||
1251 | * @param cls the union operation | ||
1252 | * @param mh the message | ||
1253 | */ | ||
1254 | static void | ||
1255 | handle_p2p_offer (void *cls, | ||
1256 | const struct GNUNET_MessageHeader *mh) | ||
1257 | { | ||
1258 | struct Operation *op = cls; | ||
1259 | const struct GNUNET_HashCode *hash; | ||
1260 | unsigned int num_hashes; | ||
1261 | |||
1262 | /* look up elements and send them */ | ||
1263 | if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && | ||
1264 | (op->state->phase != PHASE_INVENTORY_ACTIVE)) | ||
1265 | { | ||
1266 | GNUNET_break_op (0); | ||
1267 | fail_union_operation (op); | ||
1268 | return; | ||
1269 | } | ||
1270 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | ||
1271 | / sizeof (struct GNUNET_HashCode); | ||
1272 | if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | ||
1273 | != num_hashes * sizeof (struct GNUNET_HashCode)) | ||
1274 | { | ||
1275 | GNUNET_break_op (0); | ||
1276 | fail_union_operation (op); | ||
1277 | return; | ||
1278 | } | ||
1279 | |||
1280 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; | ||
1281 | num_hashes > 0; | ||
1282 | hash++, num_hashes--) | ||
1283 | { | ||
1284 | struct ElementEntry *ee; | ||
1285 | struct GNUNET_MessageHeader *demands; | ||
1286 | struct GNUNET_MQ_Envelope *ev; | ||
1287 | ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); | ||
1288 | if (NULL != ee) | ||
1289 | if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) | ||
1290 | continue; | ||
1291 | |||
1292 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, hash)) | ||
1293 | { | ||
1294 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1295 | "Skipped sending duplicate demand\n"); | ||
1296 | continue; | ||
1297 | } | ||
1298 | |||
1299 | GNUNET_assert (GNUNET_OK == | ||
1300 | GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes, | ||
1301 | hash, | ||
1302 | NULL, | ||
1303 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); | ||
1304 | |||
1305 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1306 | "[OP %x] Requesting element (hash %s)\n", | ||
1307 | (void *) op, GNUNET_h2s (hash)); | ||
1308 | ev = GNUNET_MQ_msg_header_extra (demands, sizeof (struct GNUNET_HashCode), GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); | ||
1309 | *(struct GNUNET_HashCode *) &demands[1] = *hash; | ||
1310 | GNUNET_MQ_send (op->mq, ev); | ||
1311 | } | ||
1312 | } | ||
1313 | |||
1314 | |||
1220 | /** | 1315 | /** |
1221 | * Handle a done message from a remote peer | 1316 | * Handle a done message from a remote peer |
1222 | * | 1317 | * |
@@ -1228,25 +1323,40 @@ handle_p2p_done (void *cls, | |||
1228 | const struct GNUNET_MessageHeader *mh) | 1323 | const struct GNUNET_MessageHeader *mh) |
1229 | { | 1324 | { |
1230 | struct Operation *op = cls; | 1325 | struct Operation *op = cls; |
1231 | struct GNUNET_MQ_Envelope *ev; | ||
1232 | 1326 | ||
1233 | if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1327 | if (op->state->phase == PHASE_INVENTORY_PASSIVE) |
1234 | { | 1328 | { |
1235 | /* we got all requests, but still have to send our elements as response */ | 1329 | /* We got all requests, but still have to send our elements in response. */ |
1236 | 1330 | ||
1237 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1331 | op->state->phase = PHASE_FINISH_WAITING; |
1238 | "got DONE, sending final DONE after elements\n"); | 1332 | |
1239 | op->state->phase = PHASE_FINISHED; | 1333 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1240 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); | 1334 | "got DONE (as passive partner), waiting for our demands to be satisfied\n"); |
1241 | GNUNET_MQ_send (op->mq, ev); | 1335 | /* The active peer is done sending offers |
1336 | * and inquiries. This means that all | ||
1337 | * our responses to that (demands and offers) | ||
1338 | * must be in flight (queued or in mesh). | ||
1339 | * | ||
1340 | * We should notify the active peer once | ||
1341 | * all our demands are satisfied, so that the active | ||
1342 | * peer can quit if we gave him everything. | ||
1343 | */ | ||
1344 | maybe_finish (op); | ||
1242 | return; | 1345 | return; |
1243 | } | 1346 | } |
1244 | if (op->state->phase == PHASE_EXPECT_ELEMENTS) | 1347 | if (op->state->phase == PHASE_INVENTORY_ACTIVE) |
1245 | { | 1348 | { |
1246 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1349 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1247 | "got final DONE\n"); | 1350 | "got DONE (as active partner), waiting to finish\n"); |
1248 | op->state->phase = PHASE_FINISHED; | 1351 | /* All demands of the other peer are satisfied, |
1249 | finish_and_destroy (op); | 1352 | * and we processed all offers, thus we know |
1353 | * exactly what our demands must be. | ||
1354 | * | ||
1355 | * We'll close the channel | ||
1356 | * to the other peer once our demands are met. | ||
1357 | */ | ||
1358 | op->state->phase = PHASE_FINISH_CLOSING; | ||
1359 | maybe_finish (op); | ||
1250 | return; | 1360 | return; |
1251 | } | 1361 | } |
1252 | GNUNET_break_op (0); | 1362 | GNUNET_break_op (0); |
@@ -1268,12 +1378,14 @@ union_evaluate (struct Operation *op, | |||
1268 | struct GNUNET_MQ_Envelope *ev; | 1378 | struct GNUNET_MQ_Envelope *ev; |
1269 | struct OperationRequestMessage *msg; | 1379 | struct OperationRequestMessage *msg; |
1270 | 1380 | ||
1381 | GNUNET_assert (NULL == op->state); | ||
1271 | op->state = GNUNET_new (struct OperationState); | 1382 | op->state = GNUNET_new (struct OperationState); |
1383 | op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); | ||
1272 | /* copy the current generation's strata estimator for this operation */ | 1384 | /* copy the current generation's strata estimator for this operation */ |
1273 | op->state->se = strata_estimator_dup (op->spec->set->state->se); | 1385 | op->state->se = strata_estimator_dup (op->spec->set->state->se); |
1274 | /* we started the operation, thus we have to send the operation request */ | 1386 | /* we started the operation, thus we have to send the operation request */ |
1275 | op->state->phase = PHASE_EXPECT_SE; | 1387 | op->state->phase = PHASE_EXPECT_SE; |
1276 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1388 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1277 | "Initiating union operation evaluation\n"); | 1389 | "Initiating union operation evaluation\n"); |
1278 | ev = GNUNET_MQ_msg_nested_mh (msg, | 1390 | ev = GNUNET_MQ_msg_nested_mh (msg, |
1279 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 1391 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
@@ -1291,10 +1403,10 @@ union_evaluate (struct Operation *op, | |||
1291 | ev); | 1403 | ev); |
1292 | 1404 | ||
1293 | if (NULL != opaque_context) | 1405 | if (NULL != opaque_context) |
1294 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1406 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1295 | "sent op request with context message\n"); | 1407 | "sent op request with context message\n"); |
1296 | else | 1408 | else |
1297 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1409 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1298 | "sent op request without context message\n"); | 1410 | "sent op request without context message\n"); |
1299 | } | 1411 | } |
1300 | 1412 | ||
@@ -1308,10 +1420,12 @@ union_evaluate (struct Operation *op, | |||
1308 | static void | 1420 | static void |
1309 | union_accept (struct Operation *op) | 1421 | union_accept (struct Operation *op) |
1310 | { | 1422 | { |
1311 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1423 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1312 | "accepting set union operation\n"); | 1424 | "accepting set union operation\n"); |
1425 | GNUNET_assert (NULL == op->state); | ||
1313 | op->state = GNUNET_new (struct OperationState); | 1426 | op->state = GNUNET_new (struct OperationState); |
1314 | op->state->se = strata_estimator_dup (op->spec->set->state->se); | 1427 | op->state->se = strata_estimator_dup (op->spec->set->state->se); |
1428 | op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); | ||
1315 | /* kick off the operation */ | 1429 | /* kick off the operation */ |
1316 | send_strata_estimator (op); | 1430 | send_strata_estimator (op); |
1317 | } | 1431 | } |
@@ -1330,7 +1444,7 @@ union_set_create (void) | |||
1330 | { | 1444 | { |
1331 | struct SetState *set_state; | 1445 | struct SetState *set_state; |
1332 | 1446 | ||
1333 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1447 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1334 | "union set created\n"); | 1448 | "union set created\n"); |
1335 | set_state = GNUNET_new (struct SetState); | 1449 | set_state = GNUNET_new (struct SetState); |
1336 | set_state->se = strata_estimator_create (SE_STRATA_COUNT, | 1450 | set_state->se = strata_estimator_create (SE_STRATA_COUNT, |
@@ -1397,10 +1511,10 @@ int | |||
1397 | union_handle_p2p_message (struct Operation *op, | 1511 | union_handle_p2p_message (struct Operation *op, |
1398 | const struct GNUNET_MessageHeader *mh) | 1512 | const struct GNUNET_MessageHeader *mh) |
1399 | { | 1513 | { |
1400 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1514 | //LOG (GNUNET_ERROR_TYPE_DEBUG, |
1401 | "received p2p message (t: %u, s: %u)\n", | 1515 | // "received p2p message (t: %u, s: %u)\n", |
1402 | ntohs (mh->type), | 1516 | // ntohs (mh->type), |
1403 | ntohs (mh->size)); | 1517 | // ntohs (mh->size)); |
1404 | switch (ntohs (mh->type)) | 1518 | switch (ntohs (mh->type)) |
1405 | { | 1519 | { |
1406 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: | 1520 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: |
@@ -1410,29 +1524,36 @@ union_handle_p2p_message (struct Operation *op, | |||
1410 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | 1524 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: |
1411 | handle_p2p_elements (op, mh); | 1525 | handle_p2p_elements (op, mh); |
1412 | break; | 1526 | break; |
1413 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: | 1527 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: |
1414 | handle_p2p_element_requests (op, mh); | 1528 | handle_p2p_inquiry (op, mh); |
1415 | break; | 1529 | break; |
1416 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: | 1530 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: |
1417 | handle_p2p_done (op, mh); | 1531 | handle_p2p_done (op, mh); |
1418 | break; | 1532 | break; |
1533 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER: | ||
1534 | handle_p2p_offer (op, mh); | ||
1535 | break; | ||
1536 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: | ||
1537 | handle_p2p_demand (op, mh); | ||
1538 | break; | ||
1419 | default: | 1539 | default: |
1420 | /* something wrong with cadet's message handlers? */ | 1540 | /* Something wrong with cadet's message handlers? */ |
1421 | GNUNET_assert (0); | 1541 | GNUNET_assert (0); |
1422 | } | 1542 | } |
1423 | return GNUNET_OK; | 1543 | return GNUNET_OK; |
1424 | } | 1544 | } |
1425 | 1545 | ||
1546 | |||
1426 | /** | 1547 | /** |
1427 | * handler for peer-disconnects, notifies the client | 1548 | * Handler for peer-disconnects, notifies the client |
1428 | * about the aborted operation in case the op was not concluded | 1549 | * about the aborted operation in case the op was not concluded. |
1429 | * | 1550 | * |
1430 | * @param op the destroyed operation | 1551 | * @param op the destroyed operation |
1431 | */ | 1552 | */ |
1432 | static void | 1553 | static void |
1433 | union_peer_disconnect (struct Operation *op) | 1554 | union_peer_disconnect (struct Operation *op) |
1434 | { | 1555 | { |
1435 | if (PHASE_FINISHED != op->state->phase) | 1556 | if (PHASE_DONE != op->state->phase) |
1436 | { | 1557 | { |
1437 | struct GNUNET_MQ_Envelope *ev; | 1558 | struct GNUNET_MQ_Envelope *ev; |
1438 | struct GNUNET_SET_ResultMessage *msg; | 1559 | struct GNUNET_SET_ResultMessage *msg; |
@@ -1444,19 +1565,27 @@ union_peer_disconnect (struct Operation *op) | |||
1444 | msg->element_type = htons (0); | 1565 | msg->element_type = htons (0); |
1445 | GNUNET_MQ_send (op->spec->set->client_mq, | 1566 | GNUNET_MQ_send (op->spec->set->client_mq, |
1446 | ev); | 1567 | ev); |
1447 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1568 | LOG (GNUNET_ERROR_TYPE_WARNING, |
1448 | "other peer disconnected prematurely\n"); | 1569 | "other peer disconnected prematurely, phase %u\n", |
1570 | op->state->phase); | ||
1449 | _GSS_operation_destroy (op, | 1571 | _GSS_operation_destroy (op, |
1450 | GNUNET_YES); | 1572 | GNUNET_YES); |
1451 | return; | 1573 | return; |
1452 | } | 1574 | } |
1453 | // else: the session has already been concluded | 1575 | // else: the session has already been concluded |
1454 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1576 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1455 | "other peer disconnected (finished)\n"); | 1577 | "other peer disconnected (finished)\n"); |
1456 | if (GNUNET_NO == op->state->client_done_sent) | 1578 | if (GNUNET_NO == op->state->client_done_sent) |
1457 | finish_and_destroy (op); | 1579 | send_done_and_destroy (op); |
1458 | } | 1580 | } |
1459 | 1581 | ||
1582 | |||
1583 | /** | ||
1584 | * Copy union-specific set state. | ||
1585 | * | ||
1586 | * @param set source set for copying the union state | ||
1587 | * @return a copy of the union-specific set state | ||
1588 | */ | ||
1460 | static struct SetState * | 1589 | static struct SetState * |
1461 | union_copy_state (struct Set *set) | 1590 | union_copy_state (struct Set *set) |
1462 | { | 1591 | { |
@@ -1469,6 +1598,7 @@ union_copy_state (struct Set *set) | |||
1469 | return new_state; | 1598 | return new_state; |
1470 | } | 1599 | } |
1471 | 1600 | ||
1601 | |||
1472 | /** | 1602 | /** |
1473 | * Get the table with implementing functions for | 1603 | * Get the table with implementing functions for |
1474 | * set union. | 1604 | * set union. |
diff --git a/src/set/gnunet-set-profiler.c b/src/set/gnunet-set-profiler.c index 1569c29b7..2ee1b762d 100644 --- a/src/set/gnunet-set-profiler.c +++ b/src/set/gnunet-set-profiler.c | |||
@@ -98,6 +98,7 @@ set_result_cb (void *cls, | |||
98 | enum GNUNET_SET_Status status) | 98 | enum GNUNET_SET_Status status) |
99 | { | 99 | { |
100 | struct SetInfo *info = cls; | 100 | struct SetInfo *info = cls; |
101 | struct GNUNET_HashCode hash; | ||
101 | 102 | ||
102 | GNUNET_assert (GNUNET_NO == info->done); | 103 | GNUNET_assert (GNUNET_NO == info->done); |
103 | switch (status) | 104 | switch (status) |
@@ -114,15 +115,22 @@ set_result_cb (void *cls, | |||
114 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n"); | 115 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n"); |
115 | GNUNET_SCHEDULER_shutdown (); | 116 | GNUNET_SCHEDULER_shutdown (); |
116 | return; | 117 | return; |
117 | case GNUNET_SET_STATUS_OK: | 118 | case GNUNET_SET_STATUS_ADD_LOCAL: |
119 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: local element\n", info->id); | ||
118 | break; | 120 | break; |
121 | case GNUNET_SET_STATUS_ADD_REMOTE: | ||
122 | GNUNET_CRYPTO_hash (element->data, element->size, &hash); | ||
123 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: remote element %s\n", info->id, | ||
124 | GNUNET_h2s (&hash)); | ||
125 | // XXX: record and check | ||
126 | return; | ||
119 | default: | 127 | default: |
120 | GNUNET_assert (0); | 128 | GNUNET_assert (0); |
121 | } | 129 | } |
122 | 130 | ||
123 | if (element->size != sizeof (struct GNUNET_HashCode)) | 131 | if (element->size != sizeof (struct GNUNET_HashCode)) |
124 | { | 132 | { |
125 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u\n", element->size); | 133 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u, expected %u\n", element->size, sizeof (struct GNUNET_HashCode)); |
126 | GNUNET_assert (0); | 134 | GNUNET_assert (0); |
127 | } | 135 | } |
128 | 136 | ||
@@ -180,6 +188,8 @@ static void | |||
180 | handle_shutdown (void *cls, | 188 | handle_shutdown (void *cls, |
181 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 189 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
182 | { | 190 | { |
191 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
192 | "Shutting down set profiler\n"); | ||
183 | if (NULL != set_listener) | 193 | if (NULL != set_listener) |
184 | { | 194 | { |
185 | GNUNET_SET_listen_cancel (set_listener); | 195 | GNUNET_SET_listen_cancel (set_listener); |
@@ -209,11 +219,13 @@ handle_shutdown (void *cls, | |||
209 | 219 | ||
210 | 220 | ||
211 | static void | 221 | static void |
212 | run (void *cls, char *const *args, const char *cfgfile, | 222 | run (void *cls, |
213 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 223 | const struct GNUNET_CONFIGURATION_Handle *cfg, |
224 | struct GNUNET_TESTING_Peer *peer) | ||
214 | { | 225 | { |
215 | unsigned int i; | 226 | unsigned int i; |
216 | struct GNUNET_HashCode hash; | 227 | struct GNUNET_HashCode hash; |
228 | struct GNUNET_HashCode hashhash; | ||
217 | 229 | ||
218 | config = cfg; | 230 | config = cfg; |
219 | 231 | ||
@@ -239,6 +251,9 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
239 | for (i = 0; i < num_a; i++) | 251 | for (i = 0; i < num_a; i++) |
240 | { | 252 | { |
241 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | 253 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); |
254 | GNUNET_CRYPTO_hash (&hash, sizeof (struct GNUNET_HashCode), &hashhash); | ||
255 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Set a: Created element %s\n", | ||
256 | GNUNET_h2s (&hashhash)); | ||
242 | GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, NULL, | 257 | GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, NULL, |
243 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | 258 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); |
244 | } | 259 | } |
@@ -246,6 +261,9 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
246 | for (i = 0; i < num_b; i++) | 261 | for (i = 0; i < num_b; i++) |
247 | { | 262 | { |
248 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | 263 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); |
264 | GNUNET_CRYPTO_hash (&hash, sizeof (struct GNUNET_HashCode), &hashhash); | ||
265 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Set b: Created element %s\n", | ||
266 | GNUNET_h2s (&hashhash)); | ||
249 | GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, NULL, | 267 | GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, NULL, |
250 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | 268 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); |
251 | } | 269 | } |
@@ -253,12 +271,14 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
253 | for (i = 0; i < num_c; i++) | 271 | for (i = 0; i < num_c; i++) |
254 | { | 272 | { |
255 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | 273 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); |
274 | GNUNET_CRYPTO_hash (&hash, sizeof (struct GNUNET_HashCode), &hashhash); | ||
275 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Set c: Created element %s\n", | ||
276 | GNUNET_h2s (&hashhash)); | ||
256 | GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, NULL, | 277 | GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, NULL, |
257 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | 278 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); |
258 | } | 279 | } |
259 | 280 | ||
260 | /* use last hash for app id */ | 281 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id); |
261 | app_id = hash; | ||
262 | 282 | ||
263 | /* FIXME: also implement intersection etc. */ | 283 | /* FIXME: also implement intersection etc. */ |
264 | info1.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); | 284 | info1.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); |
@@ -281,6 +301,17 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
281 | } | 301 | } |
282 | 302 | ||
283 | 303 | ||
304 | static void | ||
305 | pre_run (void *cls, char *const *args, const char *cfgfile, | ||
306 | const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
307 | { | ||
308 | if (0 != GNUNET_TESTING_peer_run ("set-profiler", | ||
309 | cfgfile, | ||
310 | &run, NULL)) | ||
311 | ret = 2; | ||
312 | } | ||
313 | |||
314 | |||
284 | int | 315 | int |
285 | main (int argc, char **argv) | 316 | main (int argc, char **argv) |
286 | { | 317 | { |
@@ -295,13 +326,13 @@ main (int argc, char **argv) | |||
295 | gettext_noop ("number of values"), | 326 | gettext_noop ("number of values"), |
296 | GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c }, | 327 | GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c }, |
297 | { 'x', "operation", NULL, | 328 | { 'x', "operation", NULL, |
298 | gettext_noop ("oeration to execute"), | 329 | gettext_noop ("operation to execute"), |
299 | GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str }, | 330 | GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str }, |
300 | GNUNET_GETOPT_OPTION_END | 331 | GNUNET_GETOPT_OPTION_END |
301 | }; | 332 | }; |
302 | GNUNET_PROGRAM_run (argc, argv, "gnunet-set-profiler", | 333 | GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set-profiler", |
303 | "help", | 334 | "help", |
304 | options, &run, NULL); | 335 | options, &pre_run, NULL, GNUNET_YES); |
305 | return ret; | 336 | return ret; |
306 | } | 337 | } |
307 | 338 | ||
diff --git a/src/set/set_api.c b/src/set/set_api.c index 51a494d7f..16aa87cd0 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c | |||
@@ -398,40 +398,49 @@ handle_result (void *cls, | |||
398 | "Ignoring result from canceled operation\n"); | 398 | "Ignoring result from canceled operation\n"); |
399 | return; | 399 | return; |
400 | } | 400 | } |
401 | if (GNUNET_SET_STATUS_OK != result_status) | 401 | |
402 | switch (result_status) | ||
402 | { | 403 | { |
403 | /* status is not #GNUNET_SET_STATUS_OK => there's no attached element, | ||
404 | * and this is the last result message we get */ | ||
405 | GNUNET_MQ_assoc_remove (set->mq, | ||
406 | ntohl (msg->request_id)); | ||
407 | GNUNET_CONTAINER_DLL_remove (set->ops_head, | ||
408 | set->ops_tail, | ||
409 | oh); | ||
410 | if ( (GNUNET_YES == set->destroy_requested) && | ||
411 | (NULL == set->ops_head) ) | ||
412 | GNUNET_SET_destroy (set); | ||
413 | if (NULL != oh->result_cb) | ||
414 | oh->result_cb (oh->result_cls, | ||
415 | NULL, | ||
416 | result_status); | ||
417 | switch (result_status) | ||
418 | { | ||
419 | case GNUNET_SET_STATUS_OK: | 404 | case GNUNET_SET_STATUS_OK: |
420 | case GNUNET_SET_STATUS_ADD_LOCAL: | 405 | case GNUNET_SET_STATUS_ADD_LOCAL: |
421 | case GNUNET_SET_STATUS_ADD_REMOTE: | 406 | case GNUNET_SET_STATUS_ADD_REMOTE: |
422 | break; | 407 | goto do_element; |
423 | case GNUNET_SET_STATUS_FAILURE: | 408 | case GNUNET_SET_STATUS_FAILURE: |
424 | oh->result_cb = NULL; | ||
425 | break; | ||
426 | case GNUNET_SET_STATUS_HALF_DONE: | ||
427 | break; | ||
428 | case GNUNET_SET_STATUS_DONE: | 409 | case GNUNET_SET_STATUS_DONE: |
429 | oh->result_cb = NULL; | 410 | goto do_final; |
430 | break; | 411 | case GNUNET_SET_STATUS_HALF_DONE: |
431 | } | 412 | /* not used anymore */ |
432 | GNUNET_free (oh); | 413 | GNUNET_assert (0); |
433 | return; | 414 | } |
415 | |||
416 | do_final: | ||
417 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
418 | "Treating result as final status\n"); | ||
419 | GNUNET_MQ_assoc_remove (set->mq, | ||
420 | ntohl (msg->request_id)); | ||
421 | GNUNET_CONTAINER_DLL_remove (set->ops_head, | ||
422 | set->ops_tail, | ||
423 | oh); | ||
424 | if (NULL != oh->result_cb) | ||
425 | { | ||
426 | oh->result_cb (oh->result_cls, | ||
427 | NULL, | ||
428 | result_status); | ||
434 | } | 429 | } |
430 | else | ||
431 | { | ||
432 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
433 | "No callback for final status\n"); | ||
434 | } | ||
435 | if ( (GNUNET_YES == set->destroy_requested) && | ||
436 | (NULL == set->ops_head) ) | ||
437 | GNUNET_SET_destroy (set); | ||
438 | GNUNET_free (oh); | ||
439 | return; | ||
440 | |||
441 | do_element: | ||
442 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
443 | "Treating result as element\n"); | ||
435 | e.data = &msg[1]; | 444 | e.data = &msg[1]; |
436 | e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); | 445 | e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); |
437 | e.element_type = msg->element_type; | 446 | e.element_type = msg->element_type; |
diff --git a/src/set/test_set.conf b/src/set/test_set.conf index f3c0770fe..4f32b8854 100644 --- a/src/set/test_set.conf +++ b/src/set/test_set.conf | |||
@@ -1,11 +1,12 @@ | |||
1 | @INLINE@ ../../contrib/no_forcestart.conf | ||
2 | |||
1 | [PATHS] | 3 | [PATHS] |
2 | GNUNET_TEST_HOME = /tmp/test-gnunet-set/ | 4 | GNUNET_TEST_HOME = /tmp/test-gnunet-set/ |
3 | 5 | ||
4 | [set] | 6 | [set] |
5 | AUTOSTART = YES | 7 | AUTOSTART = YES |
6 | @UNIXONLY@ PORT = 2106 | ||
7 | #PREFIX = valgrind | 8 | #PREFIX = valgrind |
8 | #PREFIX = valgrind -v --leak-check=full | 9 | PREFIX = valgrind --leak-check=full |
9 | #PREFIX = gdbserver :1234 | 10 | #PREFIX = gdbserver :1234 |
10 | OPTIONS = -L INFO | 11 | OPTIONS = -L INFO |
11 | 12 | ||
@@ -21,38 +22,13 @@ USE_LOCALADDR = YES | |||
21 | [peerinfo] | 22 | [peerinfo] |
22 | NO_IO = YES | 23 | NO_IO = YES |
23 | 24 | ||
24 | [nse] | 25 | [nat] |
25 | WORKBITS = 0 | 26 | # Use addresses from the local network interfaces (inluding loopback, but also others) |
26 | 27 | USE_LOCALADDR = YES | |
27 | [hostlist] | ||
28 | FORCESTART = NO | ||
29 | AUTOSTART = NO | ||
30 | |||
31 | [fs] | ||
32 | FORCESTART = NO | ||
33 | AUTOSTART = NO | ||
34 | |||
35 | [vpn] | ||
36 | FORCESTART = NO | ||
37 | AUTOSTART = NO | ||
38 | |||
39 | [revocation] | ||
40 | FORCESTART = NO | ||
41 | AUTOSTART = NO | ||
42 | |||
43 | [gns] | ||
44 | FORCESTART = NO | ||
45 | AUTOSTART = NO | ||
46 | |||
47 | [namestore] | ||
48 | FORCESTART = NO | ||
49 | AUTOSTART = NO | ||
50 | 28 | ||
51 | [namecache] | 29 | # Disable IPv6 support |
52 | FORCESTART = NO | 30 | DISABLEV6 = NO |
53 | AUTOSTART = NO | ||
54 | 31 | ||
55 | [topology] | 32 | # Do we use addresses from localhost address ranges? (::1, 127.0.0.0/8) |
56 | FORCESTART = NO | 33 | RETURN_LOCAL_ADDRESSES = YES |
57 | AUTOSTART = NO | ||
58 | 34 | ||
diff --git a/src/set/test_set_union_result_full.c b/src/set/test_set_union_result_symmetric.c index 7901c52be..7f119b528 100644 --- a/src/set/test_set_union_result_full.c +++ b/src/set/test_set_union_result_symmetric.c | |||
@@ -19,8 +19,8 @@ | |||
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file set/test_set_union_result_full.c | 22 | * @file set/test_set_union_result_smmetric |
23 | * @brief testcase for full result mode of the union set operation | 23 | * @brief testcase for symmetric result mode of the union set operation |
24 | */ | 24 | */ |
25 | #include "platform.h" | 25 | #include "platform.h" |
26 | #include "gnunet_util_lib.h" | 26 | #include "gnunet_util_lib.h" |
@@ -69,7 +69,7 @@ result_cb_set1 (void *cls, | |||
69 | { | 69 | { |
70 | switch (status) | 70 | switch (status) |
71 | { | 71 | { |
72 | case GNUNET_SET_STATUS_OK: | 72 | case GNUNET_SET_STATUS_ADD_LOCAL: |
73 | count_set1++; | 73 | count_set1++; |
74 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 74 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
75 | "set 1: got element\n"); | 75 | "set 1: got element\n"); |
@@ -88,6 +88,8 @@ result_cb_set1 (void *cls, | |||
88 | if (NULL == set2) | 88 | if (NULL == set2) |
89 | GNUNET_SCHEDULER_shutdown (); | 89 | GNUNET_SCHEDULER_shutdown (); |
90 | break; | 90 | break; |
91 | case GNUNET_SET_STATUS_ADD_REMOTE: | ||
92 | break; | ||
91 | default: | 93 | default: |
92 | GNUNET_assert (0); | 94 | GNUNET_assert (0); |
93 | } | 95 | } |
@@ -101,7 +103,7 @@ result_cb_set2 (void *cls, | |||
101 | { | 103 | { |
102 | switch (status) | 104 | switch (status) |
103 | { | 105 | { |
104 | case GNUNET_SET_STATUS_OK: | 106 | case GNUNET_SET_STATUS_ADD_LOCAL: |
105 | count_set2++; | 107 | count_set2++; |
106 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 108 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
107 | "set 2: got element\n"); | 109 | "set 2: got element\n"); |
@@ -120,6 +122,8 @@ result_cb_set2 (void *cls, | |||
120 | if (NULL == set1) | 122 | if (NULL == set1) |
121 | GNUNET_SCHEDULER_shutdown (); | 123 | GNUNET_SCHEDULER_shutdown (); |
122 | break; | 124 | break; |
125 | case GNUNET_SET_STATUS_ADD_REMOTE: | ||
126 | break; | ||
123 | default: | 127 | default: |
124 | GNUNET_assert (0); | 128 | GNUNET_assert (0); |
125 | } | 129 | } |
@@ -140,7 +144,7 @@ listen_cb (void *cls, | |||
140 | "listen cb called\n"); | 144 | "listen cb called\n"); |
141 | GNUNET_SET_listen_cancel (listen_handle); | 145 | GNUNET_SET_listen_cancel (listen_handle); |
142 | oh = GNUNET_SET_accept (request, | 146 | oh = GNUNET_SET_accept (request, |
143 | GNUNET_SET_RESULT_FULL, | 147 | GNUNET_SET_RESULT_SYMMETRIC, |
144 | &result_cb_set2, | 148 | &result_cb_set2, |
145 | NULL); | 149 | NULL); |
146 | GNUNET_SET_commit (oh, set2); | 150 | GNUNET_SET_commit (oh, set2); |
@@ -168,7 +172,7 @@ start (void *cls) | |||
168 | oh = GNUNET_SET_prepare (&local_id, | 172 | oh = GNUNET_SET_prepare (&local_id, |
169 | &app_id, | 173 | &app_id, |
170 | &context_msg, | 174 | &context_msg, |
171 | GNUNET_SET_RESULT_FULL, | 175 | GNUNET_SET_RESULT_SYMMETRIC, |
172 | &result_cb_set1, NULL); | 176 | &result_cb_set1, NULL); |
173 | GNUNET_SET_commit (oh, set1); | 177 | GNUNET_SET_commit (oh, set1); |
174 | } | 178 | } |
@@ -353,7 +357,7 @@ main (int argc, char **argv) | |||
353 | { | 357 | { |
354 | return 1; | 358 | return 1; |
355 | } | 359 | } |
356 | GNUNET_assert (4 == count_set1); | 360 | GNUNET_assert (2 == count_set1); |
357 | GNUNET_assert (4 == count_set2); | 361 | GNUNET_assert (1 == count_set2); |
358 | return ret; | 362 | return ret; |
359 | } | 363 | } |