diff options
author | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-10-21 14:02:39 +0000 |
---|---|---|
committer | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-10-21 14:02:39 +0000 |
commit | 15389f2525da19c32e040ac1d32d3473b43456df (patch) | |
tree | 3f985987b66303bc93b22de0f29efd329f1c9e59 /src/set | |
parent | 1d58e25444716142e5a76367e6bf22ba3e036aea (diff) | |
download | gnunet-15389f2525da19c32e040ac1d32d3473b43456df.tar.gz gnunet-15389f2525da19c32e040ac1d32d3473b43456df.zip |
re-synced set intersection code with union as a starting point
added create for set-intersection
Diffstat (limited to 'src/set')
-rw-r--r-- | src/set/gnunet-service-set.c | 2 | ||||
-rw-r--r-- | src/set/gnunet-service-set.h | 7 | ||||
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 805 |
3 files changed, 389 insertions, 425 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 99deb3b23..a5f132b41 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -583,7 +583,7 @@ handle_client_create (void *cls, | |||
583 | switch (ntohs (msg->operation)) | 583 | switch (ntohs (msg->operation)) |
584 | { | 584 | { |
585 | case GNUNET_SET_OPERATION_INTERSECTION: | 585 | case GNUNET_SET_OPERATION_INTERSECTION: |
586 | // FIXME | 586 | set->vt = _GSS_intersection_vt (); |
587 | break; | 587 | break; |
588 | case GNUNET_SET_OPERATION_UNION: | 588 | case GNUNET_SET_OPERATION_UNION: |
589 | set->vt = _GSS_union_vt (); | 589 | set->vt = _GSS_union_vt (); |
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index f26ff3fc3..e69f2a09a 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h | |||
@@ -377,5 +377,12 @@ struct TunnelContext | |||
377 | const struct SetVT * | 377 | const struct SetVT * |
378 | _GSS_union_vt (void); | 378 | _GSS_union_vt (void); |
379 | 379 | ||
380 | /** | ||
381 | * Get the table with implementing functions for | ||
382 | * set intersection. | ||
383 | */ | ||
384 | const struct SetVT * | ||
385 | _GSS_intersection_vt (void); | ||
386 | |||
380 | 387 | ||
381 | #endif | 388 | #endif |
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 3082f94b5..7e8bd9cbf 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -21,13 +21,11 @@ | |||
21 | /** | 21 | /** |
22 | * @file set/gnunet-service-set_intersection.c | 22 | * @file set/gnunet-service-set_intersection.c |
23 | * @brief two-peer set intersection | 23 | * @brief two-peer set intersection |
24 | * @author Christian M. Fuchs | 24 | * @author Christian Fuchs |
25 | */ | 25 | */ |
26 | 26 | #include "platform.h" | |
27 | 27 | #include "gnunet_util_lib.h" | |
28 | #include "gnunet-service-set.h" | 28 | #include "gnunet-service-set.h" |
29 | #include "gnunet_container_lib.h" | ||
30 | #include "gnunet_crypto_lib.h" | ||
31 | #include "ibf.h" | 29 | #include "ibf.h" |
32 | #include "strata_estimator.h" | 30 | #include "strata_estimator.h" |
33 | #include "set_protocol.h" | 31 | #include "set_protocol.h" |
@@ -45,7 +43,7 @@ | |||
45 | /** | 43 | /** |
46 | * hash num parameter for the difference digests and strata estimators | 44 | * hash num parameter for the difference digests and strata estimators |
47 | */ | 45 | */ |
48 | #define SE_IBF_HASH_NUM 3 | 46 | #define SE_IBF_HASH_NUM 4 |
49 | 47 | ||
50 | /** | 48 | /** |
51 | * Number of buckets that can be transmitted in one message. | 49 | * Number of buckets that can be transmitted in one message. |
@@ -59,9 +57,15 @@ | |||
59 | */ | 57 | */ |
60 | #define MAX_IBF_ORDER (16) | 58 | #define MAX_IBF_ORDER (16) |
61 | 59 | ||
60 | /** | ||
61 | * Number of buckets used in the ibf per estimated | ||
62 | * difference. | ||
63 | */ | ||
64 | #define IBF_ALPHA 4 | ||
65 | |||
62 | 66 | ||
63 | /** | 67 | /** |
64 | * Current phase we are in for a union operation | 68 | * Current phase we are in for a union operation. |
65 | */ | 69 | */ |
66 | enum IntersectionOperationPhase | 70 | enum IntersectionOperationPhase |
67 | { | 71 | { |
@@ -100,39 +104,23 @@ enum IntersectionOperationPhase | |||
100 | * State of an evaluate operation | 104 | * State of an evaluate operation |
101 | * with another peer. | 105 | * with another peer. |
102 | */ | 106 | */ |
103 | struct IntersectionEvaluateOperation | 107 | struct OperationState |
104 | { | 108 | { |
105 | /** | 109 | /** |
106 | * Local set the operation is evaluated on. | 110 | * Tunnel to the remote peer. |
107 | */ | 111 | */ |
108 | struct Set *set; | 112 | struct GNUNET_MESH_Tunnel *tunnel; |
109 | |||
110 | /** | ||
111 | * Peer with the remote set | ||
112 | */ | ||
113 | struct GNUNET_PeerIdentity peer; | ||
114 | |||
115 | /** | ||
116 | * Application-specific identifier | ||
117 | */ | ||
118 | struct GNUNET_HashCode app_id; | ||
119 | |||
120 | /** | ||
121 | * Context message, given to us | ||
122 | * by the client, may be NULL. | ||
123 | */ | ||
124 | struct GNUNET_MessageHeader *context_msg; | ||
125 | 113 | ||
126 | /** | 114 | /** |
127 | * Tunnel to the other peer. | 115 | * Detail information about the set operation, |
116 | * including the set to use. | ||
128 | */ | 117 | */ |
129 | struct GNUNET_MESH_Tunnel *tunnel; | 118 | struct OperationSpecification *spec; |
130 | 119 | ||
131 | /** | 120 | /** |
132 | * Request ID to multiplex set operations to | 121 | * Message queue for the peer. |
133 | * the client inhabiting the set. | ||
134 | */ | 122 | */ |
135 | uint32_t request_id; | 123 | struct GNUNET_MQ_Handle *mq; |
136 | 124 | ||
137 | /** | 125 | /** |
138 | * Number of ibf buckets received | 126 | * Number of ibf buckets received |
@@ -157,6 +145,8 @@ struct IntersectionEvaluateOperation | |||
157 | 145 | ||
158 | /** | 146 | /** |
159 | * Maps IBF-Keys (specific to the current salt) to elements. | 147 | * Maps IBF-Keys (specific to the current salt) to elements. |
148 | * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. | ||
149 | * Colliding IBF-Keys are linked. | ||
160 | */ | 150 | */ |
161 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; | 151 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; |
162 | 152 | ||
@@ -166,80 +156,39 @@ struct IntersectionEvaluateOperation | |||
166 | enum IntersectionOperationPhase phase; | 156 | enum IntersectionOperationPhase phase; |
167 | 157 | ||
168 | /** | 158 | /** |
169 | * Salt to use for this operation. | ||
170 | */ | ||
171 | uint16_t salt; | ||
172 | |||
173 | /** | ||
174 | * Generation in which the operation handle | 159 | * Generation in which the operation handle |
175 | * was created. | 160 | * was created. |
176 | */ | 161 | */ |
177 | unsigned int generation_created; | 162 | unsigned int generation_created; |
178 | 163 | ||
179 | /** | 164 | /** |
180 | * Evaluate operations are held in | 165 | * Set state of the set that this operation |
181 | * a linked list. | 166 | * belongs to. |
182 | */ | 167 | */ |
183 | struct IntersectionEvaluateOperation *next; | 168 | struct Set *set; |
184 | 169 | ||
185 | /** | 170 | /** |
186 | * Evaluate operations are held in | 171 | * Evaluate operations are held in |
187 | * a linked list. | 172 | * a linked list. |
188 | */ | 173 | */ |
189 | struct IntersectionEvaluateOperation *prev; | 174 | struct OperationState *next; |
190 | }; | ||
191 | |||
192 | 175 | ||
193 | /** | 176 | /** |
194 | * Information about the element in a set. | 177 | * Evaluate operations are held in |
195 | * All elements are stored in a hash-table | 178 | * a linked list. |
196 | * from their hash-code to their 'struct Element', | 179 | */ |
197 | * so that the remove and add operations are reasonably | 180 | struct OperationState *prev; |
198 | * fast. | ||
199 | */ | ||
200 | struct ElementEntry | ||
201 | { | ||
202 | /** | ||
203 | * The actual element. The data for the element | ||
204 | * should be allocated at the end of this struct. | ||
205 | */ | ||
206 | struct GNUNET_SET_Element element; | ||
207 | |||
208 | /** | ||
209 | * Hash of the element. | ||
210 | * Will be used to derive the different IBF keys | ||
211 | * for different salts. | ||
212 | */ | ||
213 | struct GNUNET_HashCode element_hash; | ||
214 | |||
215 | /** | ||
216 | * Generation the element was added by the client. | ||
217 | * Operations of earlier generations will not consider the element. | ||
218 | */ | ||
219 | unsigned int generation_added; | ||
220 | |||
221 | /** | ||
222 | * GNUNET_YES if the element has been removed in some generation. | ||
223 | */ | ||
224 | int removed; | ||
225 | |||
226 | /** | ||
227 | * Generation the element was removed by the client. | ||
228 | * Operations of later generations will not consider the element. | ||
229 | * Only valid if is_removed is GNUNET_YES. | ||
230 | */ | ||
231 | unsigned int generation_removed; | ||
232 | 181 | ||
233 | /** | 182 | /** |
234 | * GNUNET_YES if the element is a remote element, and does not belong | 183 | * Did we send the client that we are done? |
235 | * to the operation's set. | ||
236 | */ | 184 | */ |
237 | int remote; | 185 | int client_done_sent; |
238 | }; | 186 | }; |
239 | 187 | ||
240 | 188 | ||
241 | /** | 189 | /** |
242 | * Entries in the key-to-element map of the union set. | 190 | * The key entry is used to associate an ibf key with |
191 | * an element. | ||
243 | */ | 192 | */ |
244 | struct KeyEntry | 193 | struct KeyEntry |
245 | { | 194 | { |
@@ -260,6 +209,7 @@ struct KeyEntry | |||
260 | struct KeyEntry *next_colliding; | 209 | struct KeyEntry *next_colliding; |
261 | }; | 210 | }; |
262 | 211 | ||
212 | |||
263 | /** | 213 | /** |
264 | * Used as a closure for sending elements | 214 | * Used as a closure for sending elements |
265 | * with a specific IBF key. | 215 | * with a specific IBF key. |
@@ -276,14 +226,14 @@ struct SendElementClosure | |||
276 | * Operation for which the elements | 226 | * Operation for which the elements |
277 | * should be sent. | 227 | * should be sent. |
278 | */ | 228 | */ |
279 | struct IntersectionEvaluateOperation *eo; | 229 | struct OperationState *eo; |
280 | }; | 230 | }; |
281 | 231 | ||
282 | 232 | ||
283 | /** | 233 | /** |
284 | * Extra state required for efficient set union. | 234 | * Extra state required for efficient set union. |
285 | */ | 235 | */ |
286 | struct IntersectionState | 236 | struct SetState |
287 | { | 237 | { |
288 | /** | 238 | /** |
289 | * The strata estimator is only generated once for | 239 | * The strata estimator is only generated once for |
@@ -294,70 +244,19 @@ struct IntersectionState | |||
294 | struct StrataEstimator *se; | 244 | struct StrataEstimator *se; |
295 | 245 | ||
296 | /** | 246 | /** |
297 | * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'. | ||
298 | */ | ||
299 | struct GNUNET_CONTAINER_MultiHashMap *elements; | ||
300 | |||
301 | /** | ||
302 | * Evaluate operations are held in | 247 | * Evaluate operations are held in |
303 | * a linked list. | 248 | * a linked list. |
304 | */ | 249 | */ |
305 | struct IntersectionEvaluateOperation *ops_head; | 250 | struct OperationState *ops_head; |
306 | 251 | ||
307 | /** | 252 | /** |
308 | * Evaluate operations are held in | 253 | * Evaluate operations are held in |
309 | * a linked list. | 254 | * a linked list. |
310 | */ | 255 | */ |
311 | struct IntersectionEvaluateOperation *ops_tail; | 256 | struct OperationState *ops_tail; |
312 | |||
313 | /** | ||
314 | * Current generation, that is, number of | ||
315 | * previously executed operations on this set | ||
316 | */ | ||
317 | unsigned int current_generation; | ||
318 | }; | 257 | }; |
319 | 258 | ||
320 | 259 | ||
321 | |||
322 | /** | ||
323 | * Iterator over hash map entries. | ||
324 | * | ||
325 | * @param cls closure | ||
326 | * @param key current key code | ||
327 | * @param value value in the hash map | ||
328 | * @return GNUNET_YES if we should continue to | ||
329 | * iterate, | ||
330 | * GNUNET_NO if not. | ||
331 | */ | ||
332 | static int | ||
333 | destroy_elements_iterator (void *cls, | ||
334 | const struct GNUNET_HashCode * key, | ||
335 | void *value) | ||
336 | { | ||
337 | struct ElementEntry *ee = value; | ||
338 | |||
339 | GNUNET_free (ee); | ||
340 | return GNUNET_YES; | ||
341 | } | ||
342 | |||
343 | |||
344 | /** | ||
345 | * Destroy the elements belonging to a union set. | ||
346 | * | ||
347 | * @param us union state that contains the elements | ||
348 | */ | ||
349 | static void | ||
350 | destroy_elements (struct IntersectionState *us) | ||
351 | { | ||
352 | if (NULL == us->elements) | ||
353 | return; | ||
354 | GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL); | ||
355 | GNUNET_CONTAINER_multihashmap_destroy (us->elements); | ||
356 | us->elements = NULL; | ||
357 | } | ||
358 | |||
359 | |||
360 | |||
361 | /** | 260 | /** |
362 | * Iterator over hash map entries. | 261 | * Iterator over hash map entries. |
363 | * | 262 | * |
@@ -379,6 +278,11 @@ destroy_key_to_element_iter (void *cls, | |||
379 | { | 278 | { |
380 | struct KeyEntry *k_tmp = k; | 279 | struct KeyEntry *k_tmp = k; |
381 | k = k->next_colliding; | 280 | k = k->next_colliding; |
281 | if (GNUNET_YES == k_tmp->element->remote) | ||
282 | { | ||
283 | GNUNET_free (k_tmp->element); | ||
284 | k_tmp->element = NULL; | ||
285 | } | ||
382 | GNUNET_free (k_tmp); | 286 | GNUNET_free (k_tmp); |
383 | } | 287 | } |
384 | return GNUNET_YES; | 288 | return GNUNET_YES; |
@@ -391,18 +295,24 @@ destroy_key_to_element_iter (void *cls, | |||
391 | * | 295 | * |
392 | * @param eo the union operation to destroy | 296 | * @param eo the union operation to destroy |
393 | */ | 297 | */ |
394 | void | 298 | static void |
395 | _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | 299 | intersection_operation_destroy (struct OperationState *eo) |
396 | { | 300 | { |
397 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); | 301 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); |
398 | 302 | GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head, | |
303 | eo->set->state->ops_tail, | ||
304 | eo); | ||
305 | if (NULL != eo->mq) | ||
306 | { | ||
307 | GNUNET_MQ_destroy (eo->mq); | ||
308 | eo->mq = NULL; | ||
309 | } | ||
399 | if (NULL != eo->tunnel) | 310 | if (NULL != eo->tunnel) |
400 | { | 311 | { |
401 | GNUNET_MESH_tunnel_destroy (eo->tunnel); | 312 | struct GNUNET_MESH_Tunnel *t = eo->tunnel; |
402 | /* wait for the final destruction by the tunnel cleaner */ | 313 | eo->tunnel = NULL; |
403 | return; | 314 | GNUNET_MESH_tunnel_destroy (t); |
404 | } | 315 | } |
405 | |||
406 | if (NULL != eo->remote_ibf) | 316 | if (NULL != eo->remote_ibf) |
407 | { | 317 | { |
408 | ibf_destroy (eo->remote_ibf); | 318 | ibf_destroy (eo->remote_ibf); |
@@ -424,13 +334,19 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | |||
424 | GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); | 334 | GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); |
425 | eo->key_to_element = NULL; | 335 | eo->key_to_element = NULL; |
426 | } | 336 | } |
427 | 337 | if (NULL != eo->spec) | |
428 | GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, | 338 | { |
429 | eo->set->state.u->ops_tail, | 339 | if (NULL != eo->spec->context_msg) |
430 | eo); | 340 | { |
341 | GNUNET_free (eo->spec->context_msg); | ||
342 | eo->spec->context_msg = NULL; | ||
343 | } | ||
344 | GNUNET_free (eo->spec); | ||
345 | eo->spec = NULL; | ||
346 | } | ||
431 | GNUNET_free (eo); | 347 | GNUNET_free (eo); |
432 | 348 | ||
433 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); | 349 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); |
434 | 350 | ||
435 | /* FIXME: do a garbage collection of the set generations */ | 351 | /* FIXME: do a garbage collection of the set generations */ |
436 | } | 352 | } |
@@ -443,16 +359,17 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) | |||
443 | * @param eo the union operation to fail | 359 | * @param eo the union operation to fail |
444 | */ | 360 | */ |
445 | static void | 361 | static void |
446 | fail_union_operation (struct UnionEvaluateOperation *eo) | 362 | fail_intersection_operation (struct OperationState *eo) |
447 | { | 363 | { |
448 | struct GNUNET_MQ_Envelope *mqm; | 364 | struct GNUNET_MQ_Envelope *ev; |
449 | struct GNUNET_SET_ResultMessage *msg; | 365 | struct GNUNET_SET_ResultMessage *msg; |
450 | 366 | ||
451 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 367 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
452 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 368 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
453 | msg->request_id = htonl (eo->request_id); | 369 | msg->request_id = htonl (eo->spec->client_request_id); |
454 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 370 | msg->element_type = htons (0); |
455 | _GSS_union_operation_destroy (eo); | 371 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); |
372 | union_operation_destroy (eo); | ||
456 | } | 373 | } |
457 | 374 | ||
458 | 375 | ||
@@ -484,31 +401,37 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | |||
484 | * @param eo operation with the other peer | 401 | * @param eo operation with the other peer |
485 | */ | 402 | */ |
486 | static void | 403 | static void |
487 | send_operation_request (struct IntersectionEvaluateOperation *eo) | 404 | send_operation_request (struct OperationState *eo) |
488 | { | 405 | { |
489 | struct GNUNET_MQ_Envelope *mqm; | 406 | struct GNUNET_MQ_Envelope *ev; |
490 | struct OperationRequestMessage *msg; | 407 | struct OperationRequestMessage *msg; |
491 | 408 | ||
492 | mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); | 409 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
410 | eo->spec->context_msg); | ||
493 | 411 | ||
494 | if (NULL == mqm) | 412 | if (NULL == ev) |
495 | { | 413 | { |
496 | /* the context message is too large */ | 414 | /* the context message is too large */ |
497 | GNUNET_break (0); | 415 | GNUNET_break (0); |
498 | GNUNET_SERVER_client_disconnect (eo->set->client); | 416 | GNUNET_SERVER_client_disconnect (eo->spec->set->client); |
499 | return; | 417 | return; |
500 | } | 418 | } |
501 | msg->operation = htons (GNUNET_SET_OPERATION_INTERSECTION); | 419 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); |
502 | msg->app_id = eo->app_id; | 420 | msg->app_id = eo->spec->app_id; |
503 | GNUNET_MQ_send (eo->tc->mq, mqm); | 421 | msg->salt = htonl (eo->spec->salt); |
422 | GNUNET_MQ_send (eo->mq, ev); | ||
423 | |||
424 | if (NULL != eo->spec->context_msg) | ||
425 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); | ||
426 | else | ||
427 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); | ||
504 | 428 | ||
505 | if (NULL != eo->context_msg) | 429 | if (NULL != eo->spec->context_msg) |
506 | { | 430 | { |
507 | GNUNET_free (eo->context_msg); | 431 | GNUNET_free (eo->spec->context_msg); |
508 | eo->context_msg = NULL; | 432 | eo->spec->context_msg = NULL; |
509 | } | 433 | } |
510 | 434 | ||
511 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); | ||
512 | } | 435 | } |
513 | 436 | ||
514 | 437 | ||
@@ -524,7 +447,7 @@ send_operation_request (struct IntersectionEvaluateOperation *eo) | |||
524 | * GNUNET_NO if not. | 447 | * GNUNET_NO if not. |
525 | */ | 448 | */ |
526 | static int | 449 | static int |
527 | insert_element_iterator (void *cls, | 450 | op_register_element_iterator (void *cls, |
528 | uint32_t key, | 451 | uint32_t key, |
529 | void *value) | 452 | void *value) |
530 | { | 453 | { |
@@ -548,32 +471,36 @@ insert_element_iterator (void *cls, | |||
548 | 471 | ||
549 | /** | 472 | /** |
550 | * Insert an element into the union operation's | 473 | * Insert an element into the union operation's |
551 | * key-to-element mapping | 474 | * key-to-element mapping. Takes ownership of 'ee'. |
475 | * Note that this does not insert the element in the set, | ||
476 | * only in the operation's key-element mapping. | ||
477 | * This is done to speed up re-tried operations, if some elements | ||
478 | * were transmitted, and then the IBF fails to decode. | ||
552 | * | 479 | * |
553 | * @param eo the union operation | 480 | * @param eo the union operation |
554 | * @param ee the element entry | 481 | * @param ee the element entry |
555 | */ | 482 | */ |
556 | static void | 483 | static void |
557 | insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) | 484 | op_register_element (struct OperationState *eo, struct ElementEntry *ee) |
558 | { | 485 | { |
559 | int ret; | 486 | int ret; |
560 | struct IBF_Key ibf_key; | 487 | struct IBF_Key ibf_key; |
561 | struct KeyEntry *k; | 488 | struct KeyEntry *k; |
562 | 489 | ||
563 | ibf_key = get_ibf_key (&ee->element_hash, eo->salt); | 490 | ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt); |
564 | k = GNUNET_new (struct KeyEntry); | 491 | k = GNUNET_new (struct KeyEntry); |
565 | k->element = ee; | 492 | k->element = ee; |
566 | k->ibf_key = ibf_key; | 493 | k->ibf_key = ibf_key; |
567 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, | 494 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, |
568 | (uint32_t) ibf_key.key_val, | 495 | (uint32_t) ibf_key.key_val, |
569 | insert_element_iterator, k); | 496 | op_register_element_iterator, k); |
570 | 497 | ||
571 | /* was the element inserted into a colliding bucket? */ | 498 | /* was the element inserted into a colliding bucket? */ |
572 | if (GNUNET_SYSERR == ret) | 499 | if (GNUNET_SYSERR == ret) |
573 | return; | 500 | return; |
574 | 501 | ||
575 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, | 502 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, |
576 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 503 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
577 | } | 504 | } |
578 | 505 | ||
579 | 506 | ||
@@ -592,6 +519,8 @@ prepare_ibf_iterator (void *cls, | |||
592 | struct InvertibleBloomFilter *ibf = cls; | 519 | struct InvertibleBloomFilter *ibf = cls; |
593 | struct KeyEntry *ke = value; | 520 | struct KeyEntry *ke = value; |
594 | 521 | ||
522 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val); | ||
523 | |||
595 | ibf_insert (ibf, ke->ibf_key); | 524 | ibf_insert (ibf, ke->ibf_key); |
596 | return GNUNET_YES; | 525 | return GNUNET_YES; |
597 | } | 526 | } |
@@ -605,13 +534,15 @@ prepare_ibf_iterator (void *cls, | |||
605 | * @param key unised | 534 | * @param key unised |
606 | * @param value the element entry to insert | 535 | * @param value the element entry to insert |
607 | * into the key-to-element mapping | 536 | * into the key-to-element mapping |
537 | * @return GNUNET_YES to continue iterating, | ||
538 | * GNUNET_NO to stop | ||
608 | */ | 539 | */ |
609 | static int | 540 | static int |
610 | init_key_to_element_iterator (void *cls, | 541 | init_key_to_element_iterator (void *cls, |
611 | const struct GNUNET_HashCode *key, | 542 | const struct GNUNET_HashCode *key, |
612 | void *value) | 543 | void *value) |
613 | { | 544 | { |
614 | struct UnionEvaluateOperation *eo = cls; | 545 | struct OperationState *eo = cls; |
615 | struct ElementEntry *e = value; | 546 | struct ElementEntry *e = value; |
616 | 547 | ||
617 | /* make sure that the element belongs to the set at the time | 548 | /* make sure that the element belongs to the set at the time |
@@ -621,7 +552,9 @@ init_key_to_element_iterator (void *cls, | |||
621 | (e->generation_removed < eo->generation_created))) | 552 | (e->generation_removed < eo->generation_created))) |
622 | return GNUNET_YES; | 553 | return GNUNET_YES; |
623 | 554 | ||
624 | insert_element (eo, e); | 555 | GNUNET_assert (GNUNET_NO == e->remote); |
556 | |||
557 | op_register_element (eo, e); | ||
625 | return GNUNET_YES; | 558 | return GNUNET_YES; |
626 | } | 559 | } |
627 | 560 | ||
@@ -634,15 +567,15 @@ init_key_to_element_iterator (void *cls, | |||
634 | * @param size size of the ibf to create | 567 | * @param size size of the ibf to create |
635 | */ | 568 | */ |
636 | static void | 569 | static void |
637 | prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) | 570 | prepare_ibf (struct OperationState *eo, uint16_t size) |
638 | { | 571 | { |
639 | if (NULL == eo->key_to_element) | 572 | if (NULL == eo->key_to_element) |
640 | { | 573 | { |
641 | unsigned int len; | 574 | unsigned int len; |
642 | len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); | 575 | len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements); |
643 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | 576 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); |
644 | GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, | 577 | GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements, |
645 | init_key_to_element_iterator, eo); | 578 | init_key_to_element_iterator, eo); |
646 | } | 579 | } |
647 | if (NULL != eo->local_ibf) | 580 | if (NULL != eo->local_ibf) |
648 | ibf_destroy (eo->local_ibf); | 581 | ibf_destroy (eo->local_ibf); |
@@ -659,21 +592,21 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) | |||
659 | * @param ibf_order order of the ibf to send, size=2^order | 592 | * @param ibf_order order of the ibf to send, size=2^order |
660 | */ | 593 | */ |
661 | static void | 594 | static void |
662 | send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | 595 | send_ibf (struct OperationState *eo, uint16_t ibf_order) |
663 | { | 596 | { |
664 | unsigned int buckets_sent = 0; | 597 | unsigned int buckets_sent = 0; |
665 | struct InvertibleBloomFilter *ibf; | 598 | struct InvertibleBloomFilter *ibf; |
666 | 599 | ||
667 | prepare_ibf (eo, 1<<ibf_order); | 600 | prepare_ibf (eo, 1<<ibf_order); |
668 | 601 | ||
669 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order); | 602 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order); |
670 | 603 | ||
671 | ibf = eo->local_ibf; | 604 | ibf = eo->local_ibf; |
672 | 605 | ||
673 | while (buckets_sent < (1 << ibf_order)) | 606 | while (buckets_sent < (1 << ibf_order)) |
674 | { | 607 | { |
675 | unsigned int buckets_in_message; | 608 | unsigned int buckets_in_message; |
676 | struct GNUNET_MQ_Envelope *mqm; | 609 | struct GNUNET_MQ_Envelope *ev; |
677 | struct IBFMessage *msg; | 610 | struct IBFMessage *msg; |
678 | 611 | ||
679 | buckets_in_message = (1 << ibf_order) - buckets_sent; | 612 | buckets_in_message = (1 << ibf_order) - buckets_sent; |
@@ -681,14 +614,17 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
681 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) | 614 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) |
682 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; | 615 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; |
683 | 616 | ||
684 | mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, | 617 | ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, |
685 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); | 618 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); |
619 | msg->reserved = 0; | ||
686 | msg->order = ibf_order; | 620 | msg->order = ibf_order; |
687 | msg->offset = htons (buckets_sent); | 621 | msg->offset = htons (buckets_sent); |
688 | ibf_write_slice (ibf, buckets_sent, | 622 | ibf_write_slice (ibf, buckets_sent, |
689 | buckets_in_message, &msg[1]); | 623 | buckets_in_message, &msg[1]); |
690 | buckets_sent += buckets_in_message; | 624 | buckets_sent += buckets_in_message; |
691 | GNUNET_MQ_send (eo->tc->mq, mqm); | 625 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", |
626 | buckets_in_message, buckets_sent, 1<<ibf_order); | ||
627 | GNUNET_MQ_send (eo->mq, ev); | ||
692 | } | 628 | } |
693 | 629 | ||
694 | eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; | 630 | eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; |
@@ -701,17 +637,18 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
701 | * @param eo the union operation with the remote peer | 637 | * @param eo the union operation with the remote peer |
702 | */ | 638 | */ |
703 | static void | 639 | static void |
704 | send_strata_estimator (struct IntersectionEvaluateOperation *eo) | 640 | send_strata_estimator (struct OperationState *eo) |
705 | { | 641 | { |
706 | struct GNUNET_MQ_Envelope *mqm; | 642 | struct GNUNET_MQ_Envelope *ev; |
707 | struct GNUNET_MessageHeader *strata_msg; | 643 | struct GNUNET_MessageHeader *strata_msg; |
708 | 644 | ||
709 | mqm = GNUNET_MQ_msg_header_extra (strata_msg, | 645 | ev = GNUNET_MQ_msg_header_extra (strata_msg, |
710 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, | 646 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, |
711 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); | 647 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); |
712 | strata_estimator_write (eo->set->state.i->se, &strata_msg[1]); | 648 | strata_estimator_write (eo->set->state->se, &strata_msg[1]); |
713 | GNUNET_MQ_send (eo->tc->mq, mqm); | 649 | GNUNET_MQ_send (eo->mq, ev); |
714 | eo->phase = PHASE_EXPECT_IBF; | 650 | eo->phase = PHASE_EXPECT_IBF; |
651 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); | ||
715 | } | 652 | } |
716 | 653 | ||
717 | 654 | ||
@@ -728,7 +665,7 @@ get_order_from_difference (unsigned int diff) | |||
728 | unsigned int ibf_order; | 665 | unsigned int ibf_order; |
729 | 666 | ||
730 | ibf_order = 2; | 667 | ibf_order = 2; |
731 | while ((1<<ibf_order) < (2 * diff)) | 668 | while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM) |
732 | ibf_order++; | 669 | ibf_order++; |
733 | if (ibf_order > MAX_IBF_ORDER) | 670 | if (ibf_order > MAX_IBF_ORDER) |
734 | ibf_order = MAX_IBF_ORDER; | 671 | ibf_order = MAX_IBF_ORDER; |
@@ -745,11 +682,10 @@ get_order_from_difference (unsigned int diff) | |||
745 | static void | 682 | static void |
746 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | 683 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) |
747 | { | 684 | { |
748 | struct UnionEvaluateOperation *eo = cls; | 685 | struct OperationState *eo = cls; |
749 | struct StrataEstimator *remote_se; | 686 | struct StrataEstimator *remote_se; |
750 | int diff; | 687 | int diff; |
751 | 688 | ||
752 | |||
753 | if (eo->phase != PHASE_EXPECT_SE) | 689 | if (eo->phase != PHASE_EXPECT_SE) |
754 | { | 690 | { |
755 | fail_union_operation (eo); | 691 | fail_union_operation (eo); |
@@ -761,10 +697,11 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | |||
761 | strata_estimator_read (&mh[1], remote_se); | 697 | strata_estimator_read (&mh[1], remote_se); |
762 | GNUNET_assert (NULL != eo->se); | 698 | GNUNET_assert (NULL != eo->se); |
763 | diff = strata_estimator_difference (remote_se, eo->se); | 699 | diff = strata_estimator_difference (remote_se, eo->se); |
764 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff); | ||
765 | strata_estimator_destroy (remote_se); | 700 | strata_estimator_destroy (remote_se); |
766 | strata_estimator_destroy (eo->se); | 701 | strata_estimator_destroy (eo->se); |
767 | eo->se = NULL; | 702 | eo->se = NULL; |
703 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", | ||
704 | diff, 1<<get_order_from_difference (diff)); | ||
768 | send_ibf (eo, get_order_from_difference (diff)); | 705 | send_ibf (eo, get_order_from_difference (diff)); |
769 | } | 706 | } |
770 | 707 | ||
@@ -784,7 +721,7 @@ send_element_iterator (void *cls, | |||
784 | { | 721 | { |
785 | struct SendElementClosure *sec = cls; | 722 | struct SendElementClosure *sec = cls; |
786 | struct IBF_Key ibf_key = sec->ibf_key; | 723 | struct IBF_Key ibf_key = sec->ibf_key; |
787 | struct UnionEvaluateOperation *eo = sec->eo; | 724 | struct OperationState *eo = sec->eo; |
788 | struct KeyEntry *ke = value; | 725 | struct KeyEntry *ke = value; |
789 | 726 | ||
790 | if (ke->ibf_key.key_val != ibf_key.key_val) | 727 | if (ke->ibf_key.key_val != ibf_key.key_val) |
@@ -792,20 +729,21 @@ send_element_iterator (void *cls, | |||
792 | while (NULL != ke) | 729 | while (NULL != ke) |
793 | { | 730 | { |
794 | const struct GNUNET_SET_Element *const element = &ke->element->element; | 731 | const struct GNUNET_SET_Element *const element = &ke->element->element; |
795 | struct GNUNET_MQ_Envelope *mqm; | 732 | struct GNUNET_MQ_Envelope *ev; |
796 | struct GNUNET_MessageHeader *mh; | 733 | struct GNUNET_MessageHeader *mh; |
797 | 734 | ||
798 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); | 735 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); |
799 | mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | 736 | ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); |
800 | if (NULL == mqm) | 737 | if (NULL == ev) |
801 | { | 738 | { |
802 | /* element too large */ | 739 | /* element too large */ |
803 | GNUNET_break (0); | 740 | GNUNET_break (0); |
804 | continue; | 741 | continue; |
805 | } | 742 | } |
806 | memcpy (&mh[1], element->data, element->size); | 743 | memcpy (&mh[1], element->data, element->size); |
807 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); | 744 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", |
808 | GNUNET_MQ_send (eo->tc->mq, mqm); | 745 | GNUNET_h2s (&ke->element->element_hash)); |
746 | GNUNET_MQ_send (eo->mq, ev); | ||
809 | ke = ke->next_colliding; | 747 | ke = ke->next_colliding; |
810 | } | 748 | } |
811 | return GNUNET_NO; | 749 | return GNUNET_NO; |
@@ -819,7 +757,7 @@ send_element_iterator (void *cls, | |||
819 | * @param ibf_key IBF key of interest | 757 | * @param ibf_key IBF key of interest |
820 | */ | 758 | */ |
821 | static void | 759 | static void |
822 | send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key) | 760 | send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) |
823 | { | 761 | { |
824 | struct SendElementClosure send_cls; | 762 | struct SendElementClosure send_cls; |
825 | 763 | ||
@@ -837,10 +775,12 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key | |||
837 | * @param eo union operation | 775 | * @param eo union operation |
838 | */ | 776 | */ |
839 | static void | 777 | static void |
840 | decode_and_send (struct UnionEvaluateOperation *eo) | 778 | decode_and_send (struct OperationState *eo) |
841 | { | 779 | { |
842 | struct IBF_Key key; | 780 | struct IBF_Key key; |
781 | struct IBF_Key last_key; | ||
843 | int side; | 782 | int side; |
783 | unsigned int num_decoded; | ||
844 | struct InvertibleBloomFilter *diff_ibf; | 784 | struct InvertibleBloomFilter *diff_ibf; |
845 | 785 | ||
846 | GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); | 786 | GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); |
@@ -849,12 +789,35 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
849 | diff_ibf = ibf_dup (eo->local_ibf); | 789 | diff_ibf = ibf_dup (eo->local_ibf); |
850 | ibf_subtract (diff_ibf, eo->remote_ibf); | 790 | ibf_subtract (diff_ibf, eo->remote_ibf); |
851 | 791 | ||
792 | ibf_destroy (eo->remote_ibf); | ||
793 | eo->remote_ibf = NULL; | ||
794 | |||
795 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); | ||
796 | |||
797 | num_decoded = 0; | ||
798 | last_key.key_val = 0; | ||
799 | |||
852 | while (1) | 800 | while (1) |
853 | { | 801 | { |
854 | int res; | 802 | int res; |
803 | int cycle_detected = GNUNET_NO; | ||
804 | |||
805 | last_key = key; | ||
855 | 806 | ||
856 | res = ibf_decode (diff_ibf, &side, &key); | 807 | res = ibf_decode (diff_ibf, &side, &key); |
857 | if (GNUNET_SYSERR == res) | 808 | if (res == GNUNET_OK) |
809 | { | ||
810 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n", | ||
811 | key.key_val); | ||
812 | num_decoded += 1; | ||
813 | if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) | ||
814 | { | ||
815 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", | ||
816 | num_decoded, diff_ibf->size); | ||
817 | cycle_detected = GNUNET_YES; | ||
818 | } | ||
819 | } | ||
820 | if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected)) | ||
858 | { | 821 | { |
859 | int next_order; | 822 | int next_order; |
860 | next_order = 0; | 823 | next_order = 0; |
@@ -877,28 +840,36 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
877 | } | 840 | } |
878 | if (GNUNET_NO == res) | 841 | if (GNUNET_NO == res) |
879 | { | 842 | { |
880 | struct GNUNET_MQ_Envelope *mqm; | 843 | struct GNUNET_MQ_Envelope *ev; |
881 | 844 | ||
882 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); | 845 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); |
883 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 846 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
884 | GNUNET_MQ_send (eo->tc->mq, mqm); | 847 | GNUNET_MQ_send (eo->mq, ev); |
885 | break; | 848 | break; |
886 | } | 849 | } |
887 | if (1 == side) | 850 | if (1 == side) |
888 | { | 851 | { |
889 | send_elements_for_key (eo, key); | 852 | send_elements_for_key (eo, key); |
890 | } | 853 | } |
891 | else | 854 | else if (-1 == side) |
892 | { | 855 | { |
893 | struct GNUNET_MQ_Envelope *mqm; | 856 | struct GNUNET_MQ_Envelope *ev; |
894 | struct GNUNET_MessageHeader *msg; | 857 | struct GNUNET_MessageHeader *msg; |
895 | 858 | ||
896 | /* FIXME: before sending the request, check if we may just have the element */ | 859 | /* FIXME: before sending the request, check if we may just have the element */ |
897 | /* FIXME: merge multiple requests */ | 860 | /* FIXME: merge multiple requests */ |
898 | mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), | 861 | /* FIXME: remember somewhere that we already requested the element, |
862 | * so that we don't request it again with the next ibf if decoding fails */ | ||
863 | ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), | ||
899 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); | 864 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); |
865 | |||
900 | *(struct IBF_Key *) &msg[1] = key; | 866 | *(struct IBF_Key *) &msg[1] = key; |
901 | GNUNET_MQ_send (eo->tc->mq, mqm); | 867 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); |
868 | GNUNET_MQ_send (eo->mq, ev); | ||
869 | } | ||
870 | else | ||
871 | { | ||
872 | GNUNET_assert (0); | ||
902 | } | 873 | } |
903 | } | 874 | } |
904 | ibf_destroy (diff_ibf); | 875 | ibf_destroy (diff_ibf); |
@@ -914,7 +885,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
914 | static void | 885 | static void |
915 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | 886 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) |
916 | { | 887 | { |
917 | struct UnionEvaluateOperation *eo = cls; | 888 | struct OperationState *eo = cls; |
918 | struct IBFMessage *msg = (struct IBFMessage *) mh; | 889 | struct IBFMessage *msg = (struct IBFMessage *) mh; |
919 | unsigned int buckets_in_message; | 890 | unsigned int buckets_in_message; |
920 | 891 | ||
@@ -923,12 +894,14 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
923 | { | 894 | { |
924 | eo->phase = PHASE_EXPECT_IBF_CONT; | 895 | eo->phase = PHASE_EXPECT_IBF_CONT; |
925 | GNUNET_assert (NULL == eo->remote_ibf); | 896 | GNUNET_assert (NULL == eo->remote_ibf); |
926 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order); | 897 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order); |
927 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | 898 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); |
899 | eo->ibf_buckets_received = 0; | ||
928 | if (0 != ntohs (msg->offset)) | 900 | if (0 != ntohs (msg->offset)) |
929 | { | 901 | { |
930 | GNUNET_break (0); | 902 | GNUNET_break (0); |
931 | fail_union_operation (eo); | 903 | fail_union_operation (eo); |
904 | return; | ||
932 | } | 905 | } |
933 | } | 906 | } |
934 | else if (eo->phase == PHASE_EXPECT_IBF_CONT) | 907 | else if (eo->phase == PHASE_EXPECT_IBF_CONT) |
@@ -944,6 +917,13 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
944 | 917 | ||
945 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | 918 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; |
946 | 919 | ||
920 | if (0 == buckets_in_message) | ||
921 | { | ||
922 | GNUNET_break_op (0); | ||
923 | fail_union_operation (eo); | ||
924 | return; | ||
925 | } | ||
926 | |||
947 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) | 927 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) |
948 | { | 928 | { |
949 | GNUNET_break (0); | 929 | GNUNET_break (0); |
@@ -956,8 +936,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
956 | 936 | ||
957 | if (eo->ibf_buckets_received == eo->remote_ibf->size) | 937 | if (eo->ibf_buckets_received == eo->remote_ibf->size) |
958 | { | 938 | { |
959 | 939 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); | |
960 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n"); | ||
961 | eo->phase = PHASE_EXPECT_ELEMENTS; | 940 | eo->phase = PHASE_EXPECT_ELEMENTS; |
962 | decode_and_send (eo); | 941 | decode_and_send (eo); |
963 | } | 942 | } |
@@ -972,24 +951,26 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | |||
972 | * @param element element to send | 951 | * @param element element to send |
973 | */ | 952 | */ |
974 | static void | 953 | static void |
975 | send_client_element (struct UnionEvaluateOperation *eo, | 954 | send_client_element (struct OperationState *eo, |
976 | struct GNUNET_SET_Element *element) | 955 | struct GNUNET_SET_Element *element) |
977 | { | 956 | { |
978 | struct GNUNET_MQ_Envelope *mqm; | 957 | struct GNUNET_MQ_Envelope *ev; |
979 | struct GNUNET_SET_ResultMessage *rm; | 958 | struct GNUNET_SET_ResultMessage *rm; |
980 | 959 | ||
981 | GNUNET_assert (0 != eo->request_id); | 960 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); |
982 | mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); | 961 | GNUNET_assert (0 != eo->spec->client_request_id); |
983 | if (NULL == mqm) | 962 | ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); |
963 | if (NULL == ev) | ||
984 | { | 964 | { |
985 | GNUNET_MQ_discard (mqm); | 965 | GNUNET_MQ_discard (ev); |
986 | GNUNET_break (0); | 966 | GNUNET_break (0); |
987 | return; | 967 | return; |
988 | } | 968 | } |
989 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 969 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
990 | rm->request_id = htonl (eo->request_id); | 970 | rm->request_id = htonl (eo->spec->client_request_id); |
971 | rm->element_type = element->type; | ||
991 | memcpy (&rm[1], element->data, element->size); | 972 | memcpy (&rm[1], element->data, element->size); |
992 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 973 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); |
993 | } | 974 | } |
994 | 975 | ||
995 | 976 | ||
@@ -1002,17 +983,22 @@ send_client_element (struct UnionEvaluateOperation *eo, | |||
1002 | * @param eo union operation | 983 | * @param eo union operation |
1003 | */ | 984 | */ |
1004 | static void | 985 | static void |
1005 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | 986 | send_client_done_and_destroy (struct OperationState *eo) |
1006 | { | 987 | { |
1007 | struct GNUNET_MQ_Envelope *mqm; | 988 | struct GNUNET_MQ_Envelope *ev; |
1008 | struct GNUNET_SET_ResultMessage *rm; | 989 | struct GNUNET_SET_ResultMessage *rm; |
1009 | 990 | ||
1010 | GNUNET_assert (0 != eo->request_id); | 991 | GNUNET_assert (GNUNET_NO == eo->client_done_sent); |
1011 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 992 | |
1012 | rm->request_id = htonl (eo->request_id); | 993 | eo->client_done_sent = GNUNET_YES; |
994 | |||
995 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
996 | rm->request_id = htonl (eo->spec->client_request_id); | ||
1013 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 997 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1014 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 998 | rm->element_type = htons (0); |
999 | GNUNET_MQ_send (eo->spec->set->client_mq, ev); | ||
1015 | 1000 | ||
1001 | union_operation_destroy (eo); | ||
1016 | } | 1002 | } |
1017 | 1003 | ||
1018 | 1004 | ||
@@ -1025,11 +1011,11 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | |||
1025 | static void | 1011 | static void |
1026 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | 1012 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) |
1027 | { | 1013 | { |
1028 | struct UnionEvaluateOperation *eo = cls; | 1014 | struct OperationState *eo = cls; |
1029 | struct ElementEntry *ee; | 1015 | struct ElementEntry *ee; |
1030 | uint16_t element_size; | 1016 | uint16_t element_size; |
1031 | 1017 | ||
1032 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n"); | 1018 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); |
1033 | 1019 | ||
1034 | if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && | 1020 | if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && |
1035 | (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) | 1021 | (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) |
@@ -1041,13 +1027,15 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1041 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); | 1027 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); |
1042 | ee = GNUNET_malloc (sizeof *ee + element_size); | 1028 | ee = GNUNET_malloc (sizeof *ee + element_size); |
1043 | memcpy (&ee[1], &mh[1], element_size); | 1029 | memcpy (&ee[1], &mh[1], element_size); |
1030 | ee->element.size = element_size; | ||
1044 | ee->element.data = &ee[1]; | 1031 | ee->element.data = &ee[1]; |
1045 | ee->remote = GNUNET_YES; | 1032 | ee->remote = GNUNET_YES; |
1033 | GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); | ||
1046 | 1034 | ||
1047 | insert_element (eo, ee); | 1035 | /* FIXME: see if the element has already been inserted! */ |
1048 | send_client_element (eo, &ee->element); | ||
1049 | 1036 | ||
1050 | GNUNET_free (ee); | 1037 | op_register_element (eo, ee); |
1038 | send_client_element (eo, &ee->element); | ||
1051 | } | 1039 | } |
1052 | 1040 | ||
1053 | 1041 | ||
@@ -1060,7 +1048,7 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1060 | static void | 1048 | static void |
1061 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | 1049 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) |
1062 | { | 1050 | { |
1063 | struct UnionEvaluateOperation *eo = cls; | 1051 | struct OperationState *eo = cls; |
1064 | struct IBF_Key *ibf_key; | 1052 | struct IBF_Key *ibf_key; |
1065 | unsigned int num_keys; | 1053 | unsigned int num_keys; |
1066 | 1054 | ||
@@ -1091,20 +1079,6 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1091 | 1079 | ||
1092 | 1080 | ||
1093 | /** | 1081 | /** |
1094 | * Callback used for notifications | ||
1095 | * | ||
1096 | * @param cls closure | ||
1097 | */ | ||
1098 | static void | ||
1099 | peer_done_sent_cb (void *cls) | ||
1100 | { | ||
1101 | struct UnionEvaluateOperation *eo = cls; | ||
1102 | |||
1103 | send_client_done_and_destroy (eo); | ||
1104 | } | ||
1105 | |||
1106 | |||
1107 | /** | ||
1108 | * Handle a done message from a remote peer | 1082 | * Handle a done message from a remote peer |
1109 | * | 1083 | * |
1110 | * @param cls the union operation | 1084 | * @param cls the union operation |
@@ -1113,23 +1087,22 @@ peer_done_sent_cb (void *cls) | |||
1113 | static void | 1087 | static void |
1114 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | 1088 | handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) |
1115 | { | 1089 | { |
1116 | struct UnionEvaluateOperation *eo = cls; | 1090 | struct OperationState *eo = cls; |
1091 | struct GNUNET_MQ_Envelope *ev; | ||
1117 | 1092 | ||
1118 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1093 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) |
1119 | { | 1094 | { |
1120 | /* we got all requests, but still have to send our elements as response */ | 1095 | /* we got all requests, but still have to send our elements as response */ |
1121 | struct GNUNET_MQ_Envelope *mqm; | ||
1122 | 1096 | ||
1123 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); | 1097 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); |
1124 | eo->phase = PHASE_FINISHED; | 1098 | eo->phase = PHASE_FINISHED; |
1125 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 1099 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
1126 | GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); | 1100 | GNUNET_MQ_send (eo->mq, ev); |
1127 | GNUNET_MQ_send (eo->tc->mq, mqm); | ||
1128 | return; | 1101 | return; |
1129 | } | 1102 | } |
1130 | if (eo->phase == PHASE_EXPECT_ELEMENTS) | 1103 | if (eo->phase == PHASE_EXPECT_ELEMENTS) |
1131 | { | 1104 | { |
1132 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n"); | 1105 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); |
1133 | eo->phase = PHASE_FINISHED; | 1106 | eo->phase = PHASE_FINISHED; |
1134 | send_client_done_and_destroy (eo); | 1107 | send_client_done_and_destroy (eo); |
1135 | return; | 1108 | return; |
@@ -1143,45 +1116,38 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1143 | * Evaluate a union operation with | 1116 | * Evaluate a union operation with |
1144 | * a remote peer. | 1117 | * a remote peer. |
1145 | * | 1118 | * |
1146 | * @param m the evaluate request message from the client | 1119 | * @param spec specification of the operation the evaluate |
1147 | * @param set the set to evaluate the operation with | 1120 | * @param tunnel tunnel already connected to the partner peer |
1121 | * @param tc tunnel context, passed here so all new incoming | ||
1122 | * messages are directly going to the union operations | ||
1123 | * @return a handle to the operation | ||
1148 | */ | 1124 | */ |
1149 | void | 1125 | static void |
1150 | _GSS_intersection_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) | 1126 | intersection_evaluate (struct OperationSpecification *spec, |
1127 | struct GNUNET_MESH_Tunnel *tunnel, | ||
1128 | struct TunnelContext *tc) | ||
1151 | { | 1129 | { |
1152 | struct IntersectionEvaluateOperation *eo; | 1130 | struct OperationState *eo; |
1153 | struct GNUNET_MessageHeader *context_msg; | 1131 | |
1154 | 1132 | eo = GNUNET_new (struct OperationState); | |
1155 | eo = GNUNET_new (struct IntersectionEvaluateOperation); | 1133 | tc->vt = _GSS_union_vt (); |
1156 | eo->peer = m->target_peer; | 1134 | tc->op = eo; |
1157 | eo->set = set; | 1135 | eo->se = strata_estimator_dup (spec->set->state->se); |
1158 | eo->request_id = htonl (m->request_id); | 1136 | eo->generation_created = spec->set->current_generation++; |
1159 | GNUNET_assert (0 != eo->request_id); | 1137 | eo->set = spec->set; |
1160 | eo->se = strata_estimator_dup (set->state.i->se); | 1138 | eo->spec = spec; |
1161 | eo->salt = ntohs (m->salt); | 1139 | eo->tunnel = tunnel; |
1162 | eo->app_id = m->app_id; | 1140 | eo->mq = GNUNET_MESH_mq_create (tunnel); |
1163 | 1141 | ||
1164 | context_msg = GNUNET_MQ_extract_nested_mh (m); | 1142 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1165 | if (NULL != context_msg) | 1143 | "evaluating union operation, (app %s)\n", |
1166 | { | 1144 | GNUNET_h2s (&eo->spec->app_id)); |
1167 | eo->context_msg = GNUNET_copy_message (context_msg); | ||
1168 | } | ||
1169 | |||
1170 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1171 | "evaluating intersection operation, (app %s)\n", | ||
1172 | GNUNET_h2s (&eo->app_id)); | ||
1173 | 1145 | ||
1174 | eo->tc = GNUNET_new (struct TunnelContext); | ||
1175 | eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer, | ||
1176 | GNUNET_APPLICATION_TYPE_SET); | ||
1177 | GNUNET_assert (NULL != eo->tc->tunnel); | ||
1178 | eo->tc->peer = eo->peer; | ||
1179 | eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel); | ||
1180 | /* we started the operation, thus we have to send the operation request */ | 1146 | /* we started the operation, thus we have to send the operation request */ |
1181 | eo->phase = PHASE_EXPECT_SE; | 1147 | eo->phase = PHASE_EXPECT_SE; |
1182 | 1148 | ||
1183 | GNUNET_CONTAINER_DLL_insert (eo->set->state.i->ops_head, | 1149 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, |
1184 | eo->set->state.i->ops_tail, | 1150 | eo->set->state->ops_tail, |
1185 | eo); | 1151 | eo); |
1186 | 1152 | ||
1187 | send_operation_request (eo); | 1153 | send_operation_request (eo); |
@@ -1191,29 +1157,33 @@ _GSS_intersection_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *se | |||
1191 | /** | 1157 | /** |
1192 | * Accept an union operation request from a remote peer | 1158 | * Accept an union operation request from a remote peer |
1193 | * | 1159 | * |
1194 | * @param m the accept message from the client | 1160 | * @param spec all necessary information about the operation |
1195 | * @param set the set of the client | 1161 | * @param tunnel open tunnel to the partner's peer |
1196 | * @param incoming information about the requesting remote peer | 1162 | * @param tc tunnel context, passed here so all new incoming |
1163 | * messages are directly going to the union operations | ||
1164 | * @return operation | ||
1197 | */ | 1165 | */ |
1198 | void | 1166 | static void |
1199 | _GSS_intersection_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, | 1167 | intersection_accept (struct OperationSpecification *spec, |
1200 | struct Incoming *incoming) | 1168 | struct GNUNET_MESH_Tunnel *tunnel, |
1169 | struct TunnelContext *tc) | ||
1201 | { | 1170 | { |
1202 | struct IntersectionEvaluateOperation *eo; | 1171 | struct OperationState *eo; |
1203 | 1172 | ||
1204 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); | 1173 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); |
1205 | 1174 | ||
1206 | eo = GNUNET_new (struct IntersectionEvaluateOperation); | 1175 | eo = GNUNET_new (struct OperationState); |
1207 | eo->tc = incoming->tc; | 1176 | tc->vt = _GSS_union_vt (); |
1208 | eo->generation_created = set->state.i->current_generation++; | 1177 | tc->op = eo; |
1209 | eo->set = set; | 1178 | eo->set = spec->set; |
1210 | eo->salt = ntohs (incoming->salt); | 1179 | eo->generation_created = eo->set->current_generation++; |
1211 | GNUNET_assert (0 != ntohl (m->request_id)); | 1180 | eo->spec = spec; |
1212 | eo->request_id = ntohl (m->request_id); | 1181 | eo->tunnel = tunnel; |
1213 | eo->se = strata_estimator_dup (set->state.i->se); | 1182 | eo->mq = GNUNET_MESH_mq_create (tunnel); |
1183 | eo->se = strata_estimator_dup (eo->set->state->se); | ||
1214 | /* transfer ownership of mq and socket from incoming to eo */ | 1184 | /* transfer ownership of mq and socket from incoming to eo */ |
1215 | GNUNET_CONTAINER_DLL_insert (eo->set->state.i->ops_head, | 1185 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, |
1216 | eo->set->state.i->ops_tail, | 1186 | eo->set->state->ops_tail, |
1217 | eo); | 1187 | eo); |
1218 | /* kick off the operation */ | 1188 | /* kick off the operation */ |
1219 | send_strata_estimator (eo); | 1189 | send_strata_estimator (eo); |
@@ -1221,156 +1191,86 @@ _GSS_intersection_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set * | |||
1221 | 1191 | ||
1222 | 1192 | ||
1223 | /** | 1193 | /** |
1224 | * Create a new set supporting the intersection operation | 1194 | * Create a new set supporting the union operation |
1225 | * | 1195 | * |
1226 | * @return the newly created set | 1196 | * @return the newly created set |
1227 | */ | 1197 | */ |
1228 | struct Set * | 1198 | static struct SetState * |
1229 | _GSS_intersection_set_create (void) | 1199 | intersection_set_create (void) |
1230 | { | 1200 | { |
1231 | struct Set *set; | 1201 | struct SetState *set_state; |
1232 | 1202 | ||
1233 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "intersection set created\n"); | 1203 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n"); |
1234 | 1204 | ||
1235 | set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct IntersectionState)); | 1205 | set_state = GNUNET_new (struct SetState); |
1236 | set->state.i = (struct IntersectionState *) &set[1]; | 1206 | set_state->se = strata_estimator_create (SE_STRATA_COUNT, |
1237 | set->operation = GNUNET_SET_OPERATION_INTERSECTION; | ||
1238 | /* keys of the hash map are stored in the element entrys, thus we do not | ||
1239 | * want the hash map to copy them */ | ||
1240 | set->state.i->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | ||
1241 | set->state.i->se = strata_estimator_create (SE_STRATA_COUNT, | ||
1242 | SE_IBF_SIZE, SE_IBF_HASH_NUM); | 1207 | SE_IBF_SIZE, SE_IBF_HASH_NUM); |
1243 | return set; | 1208 | return set_state; |
1244 | } | 1209 | } |
1245 | 1210 | ||
1246 | 1211 | ||
1247 | /** | 1212 | /** |
1248 | * Add the element from the given element message to the set. | 1213 | * Add the element from the given element message to the set. |
1249 | * | 1214 | * |
1250 | * @param m message with the element | 1215 | * @param set_state state of the set want to add to |
1251 | * @param set set to add the element to | 1216 | * @param ee the element to add to the set |
1252 | */ | 1217 | */ |
1253 | void | 1218 | static void |
1254 | _GSS_intersection_add (struct GNUNET_SET_ElementMessage *m, struct Set *set) | 1219 | intersection_add (struct SetState *set_state, struct ElementEntry *ee) |
1255 | { | 1220 | { |
1256 | struct ElementEntry *ee; | 1221 | strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0)); |
1257 | struct ElementEntry *ee_dup; | ||
1258 | uint16_t element_size; | ||
1259 | |||
1260 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n"); | ||
1261 | |||
1262 | GNUNET_assert (GNUNET_SET_OPERATION_INTERSECTION == set->operation); | ||
1263 | element_size = ntohs (m->header.size) - sizeof *m; | ||
1264 | ee = GNUNET_malloc (element_size + sizeof *ee); | ||
1265 | ee->element.size = element_size; | ||
1266 | memcpy (&ee[1], &m[1], element_size); | ||
1267 | ee->element.data = &ee[1]; | ||
1268 | ee->generation_added = set->state.i->current_generation; | ||
1269 | GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash); | ||
1270 | ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &ee->element_hash); | ||
1271 | if (NULL != ee_dup) | ||
1272 | { | ||
1273 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n"); | ||
1274 | GNUNET_free (ee); | ||
1275 | return; | ||
1276 | } | ||
1277 | GNUNET_CONTAINER_multihashmap_put (set->state.i->elements, &ee->element_hash, ee, | ||
1278 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
1279 | strata_estimator_insert (set->state.i->se, get_ibf_key (&ee->element_hash, 0)); | ||
1280 | } | 1222 | } |
1281 | 1223 | ||
1282 | 1224 | ||
1283 | /** | 1225 | /** |
1284 | * Destroy a set that supports the union operation | 1226 | * Destroy a set that supports the union operation |
1285 | * | 1227 | * |
1286 | * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION | 1228 | * @param set_state the set to destroy |
1287 | */ | 1229 | */ |
1288 | void | 1230 | static void |
1289 | _GSS_union_set_destroy (struct Set *set) | 1231 | intersection_set_destroy (struct SetState *set_state) |
1290 | { | 1232 | { |
1291 | GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); | 1233 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n"); |
1292 | if (NULL != set->client) | 1234 | /* important to destroy operations before the rest of the set */ |
1293 | { | 1235 | while (NULL != set_state->ops_head) |
1294 | GNUNET_SERVER_client_drop (set->client); | 1236 | union_operation_destroy (set_state->ops_head); |
1295 | set->client = NULL; | 1237 | if (NULL != set_state->se) |
1296 | } | ||
1297 | if (NULL != set->client_mq) | ||
1298 | { | ||
1299 | GNUNET_MQ_destroy (set->client_mq); | ||
1300 | set->client_mq = NULL; | ||
1301 | } | ||
1302 | |||
1303 | if (NULL != set->state.u->se) | ||
1304 | { | ||
1305 | strata_estimator_destroy (set->state.u->se); | ||
1306 | set->state.u->se = NULL; | ||
1307 | } | ||
1308 | |||
1309 | destroy_elements (set->state.u); | ||
1310 | |||
1311 | while (NULL != set->state.u->ops_head) | ||
1312 | { | 1238 | { |
1313 | _GSS_union_operation_destroy (set->state.u->ops_head); | 1239 | strata_estimator_destroy (set_state->se); |
1240 | set_state->se = NULL; | ||
1314 | } | 1241 | } |
1242 | GNUNET_free (set_state); | ||
1315 | } | 1243 | } |
1316 | 1244 | ||
1245 | |||
1317 | /** | 1246 | /** |
1318 | * Remove the element given in the element message from the set. | 1247 | * Remove the element given in the element message from the set. |
1319 | * Only marks the element as removed, so that older set operations can still exchange it. | 1248 | * Only marks the element as removed, so that older set operations can still exchange it. |
1320 | * | 1249 | * |
1321 | * @param m message with the element | 1250 | * @param set_state state of the set to remove from |
1322 | * @param set set to remove the element from | 1251 | * @param element set element to remove |
1323 | */ | 1252 | */ |
1324 | void | 1253 | static void |
1325 | _GSS_intersection_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) | 1254 | intersection_remove (struct SetState *set_state, struct ElementEntry *element) |
1326 | { | 1255 | { |
1327 | struct GNUNET_HashCode hash; | 1256 | /* FIXME: remove from strata estimator */ |
1328 | struct ElementEntry *ee; | ||
1329 | |||
1330 | GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); | ||
1331 | GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash); | ||
1332 | ee = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &hash); | ||
1333 | if (NULL == ee) | ||
1334 | { | ||
1335 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n"); | ||
1336 | return; | ||
1337 | } | ||
1338 | if (GNUNET_YES == ee->removed) | ||
1339 | { | ||
1340 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n"); | ||
1341 | return; | ||
1342 | } | ||
1343 | ee->removed = GNUNET_YES; | ||
1344 | ee->generation_removed = set->state.i->current_generation; | ||
1345 | } | 1257 | } |
1346 | 1258 | ||
1347 | 1259 | ||
1348 | /** | 1260 | /** |
1349 | * Dispatch messages for a union operation. | 1261 | * Dispatch messages for a union operation. |
1350 | * | 1262 | * |
1351 | * @param cls closure | 1263 | * @param eo the state of the union evaluate operation |
1352 | * @param tunnel mesh tunnel | 1264 | * @param mh the received message |
1353 | * @param tunnel_ctx tunnel context | 1265 | * @return GNUNET_SYSERR if the tunnel should be disconnected, |
1354 | * @param mh message to process | 1266 | * GNUNET_OK otherwise |
1355 | * @return ??? | ||
1356 | */ | 1267 | */ |
1357 | int | 1268 | int |
1358 | _GSS_union_handle_p2p_message (void *cls, | 1269 | intersection_handle_p2p_message (struct OperationState *eo, |
1359 | struct GNUNET_MESH_Tunnel *tunnel, | 1270 | const struct GNUNET_MessageHeader *mh) |
1360 | void **tunnel_ctx, | ||
1361 | const struct GNUNET_MessageHeader *mh) | ||
1362 | { | 1271 | { |
1363 | struct TunnelContext *tc = *tunnel_ctx; | 1272 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", |
1364 | struct UnionEvaluateOperation *eo; | 1273 | ntohs (mh->type), ntohs (mh->size)); |
1365 | |||
1366 | if (CONTEXT_OPERATION_UNION != tc->type) | ||
1367 | { | ||
1368 | /* never kill mesh */ | ||
1369 | return GNUNET_OK; | ||
1370 | } | ||
1371 | |||
1372 | eo = tc->data; | ||
1373 | |||
1374 | switch (ntohs (mh->type)) | 1274 | switch (ntohs (mh->type)) |
1375 | { | 1275 | { |
1376 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: | 1276 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: |
@@ -1392,6 +1292,63 @@ _GSS_union_handle_p2p_message (void *cls, | |||
1392 | /* something wrong with mesh's message handlers? */ | 1292 | /* something wrong with mesh's message handlers? */ |
1393 | GNUNET_assert (0); | 1293 | GNUNET_assert (0); |
1394 | } | 1294 | } |
1395 | /* never kill mesh! */ | ||
1396 | return GNUNET_OK; | 1295 | return GNUNET_OK; |
1397 | } | 1296 | } |
1297 | |||
1298 | |||
1299 | static void | ||
1300 | intersection_peer_disconnect (struct OperationState *op) | ||
1301 | { | ||
1302 | /* Are we already disconnected? */ | ||
1303 | if (NULL == op->tunnel) | ||
1304 | return; | ||
1305 | op->tunnel = NULL; | ||
1306 | if (NULL != op->mq) | ||
1307 | { | ||
1308 | GNUNET_MQ_destroy (op->mq); | ||
1309 | op->mq = NULL; | ||
1310 | } | ||
1311 | if (PHASE_FINISHED != op->phase) | ||
1312 | { | ||
1313 | struct GNUNET_MQ_Envelope *ev; | ||
1314 | struct GNUNET_SET_ResultMessage *msg; | ||
1315 | |||
1316 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
1317 | msg->request_id = htonl (op->spec->client_request_id); | ||
1318 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | ||
1319 | msg->element_type = htons (0); | ||
1320 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
1321 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); | ||
1322 | union_operation_destroy (op); | ||
1323 | return; | ||
1324 | } | ||
1325 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); | ||
1326 | if (GNUNET_NO == op->client_done_sent) | ||
1327 | send_client_done_and_destroy (op); | ||
1328 | } | ||
1329 | |||
1330 | |||
1331 | static void | ||
1332 | intersection_op_cancel (struct SetState *set_state, uint32_t op_id) | ||
1333 | { | ||
1334 | /* FIXME: implement */ | ||
1335 | } | ||
1336 | |||
1337 | |||
1338 | const struct SetVT * | ||
1339 | _GSS_intersection_vt () | ||
1340 | { | ||
1341 | static const struct SetVT union_vt = { | ||
1342 | .create = &union_set_create, | ||
1343 | .msg_handler = &union_handle_p2p_message, | ||
1344 | .add = &union_add, | ||
1345 | .remove = &union_remove, | ||
1346 | .destroy_set = &union_set_destroy, | ||
1347 | .evaluate = &union_evaluate, | ||
1348 | .accept = &union_accept, | ||
1349 | .peer_disconnect = &union_peer_disconnect, | ||
1350 | .cancel = &union_op_cancel, | ||
1351 | }; | ||
1352 | |||
1353 | return &union_vt; | ||
1354 | } | ||