diff options
author | Christian Grothoff <christian@grothoff.org> | 2014-11-28 20:52:20 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2014-11-28 20:52:20 +0000 |
commit | f4c9c6514494547973b8962f22fce8266afd4992 (patch) | |
tree | 2211f91a0e80f143cb066ca365c6ddcbb02759cf /src/set/gnunet-service-set_intersection.c | |
parent | 264c9b7f668d0429eaae01075c742ae6ad6f49ee (diff) | |
download | gnunet-f4c9c6514494547973b8962f22fce8266afd4992.tar.gz gnunet-f4c9c6514494547973b8962f22fce8266afd4992.zip |
-fixing misc issues and bugs, including better termination logic for intersection and salt handling
Diffstat (limited to 'src/set/gnunet-service-set_intersection.c')
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 588 |
1 files changed, 313 insertions, 275 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 19d6498f5..fb3cd23b5 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -21,6 +21,7 @@ | |||
21 | * @file set/gnunet-service-set_intersection.c | 21 | * @file set/gnunet-service-set_intersection.c |
22 | * @brief two-peer set intersection | 22 | * @brief two-peer set intersection |
23 | * @author Christian Fuchs | 23 | * @author Christian Fuchs |
24 | * @author Christian Grothoff | ||
24 | */ | 25 | */ |
25 | #include "platform.h" | 26 | #include "platform.h" |
26 | #include "gnunet_util_lib.h" | 27 | #include "gnunet_util_lib.h" |
@@ -29,24 +30,6 @@ | |||
29 | #include "gnunet-service-set_protocol.h" | 30 | #include "gnunet-service-set_protocol.h" |
30 | #include <gcrypt.h> | 31 | #include <gcrypt.h> |
31 | 32 | ||
32 | #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH | ||
33 | |||
34 | /** | ||
35 | * Calculate the size of the bloom filter. | ||
36 | * | ||
37 | * @param A | ||
38 | * @param B | ||
39 | * @param s | ||
40 | * @param k | ||
41 | * @return | ||
42 | */ | ||
43 | #define CALCULATE_BF_SIZE(A, B, s, k) \ | ||
44 | do { \ | ||
45 | k = ceil(1 + log2((double) (2*B / (double) A)));\ | ||
46 | if (k<1) k=1; /* k can be calculated as 0 */\ | ||
47 | s = ceil((double) (A * k / log(2))); \ | ||
48 | } while (0) | ||
49 | |||
50 | 33 | ||
51 | /** | 34 | /** |
52 | * Current phase we are in for a intersection operation. | 35 | * Current phase we are in for a intersection operation. |
@@ -61,21 +44,13 @@ enum IntersectionOperationPhase | |||
61 | 44 | ||
62 | /** | 45 | /** |
63 | * Bob has accepted the operation, Bob and Alice are now exchanging bfs | 46 | * Bob has accepted the operation, Bob and Alice are now exchanging bfs |
64 | * until one notices the their element count is equal | 47 | * until one notices the their element hashes are equal. |
65 | */ | 48 | */ |
66 | PHASE_BF_EXCHANGE, | 49 | PHASE_BF_EXCHANGE, |
67 | 50 | ||
68 | /** | 51 | /** |
69 | * if both peers have an equal peercount, they enter this state for | 52 | * The protocol is over. Results may still have to be sent to the |
70 | * one more turn, to see if they actually have agreed on a correct set. | 53 | * client. |
71 | * if a peer finds the same element count after the next iteration, | ||
72 | * it ends the the session | ||
73 | */ | ||
74 | PHASE_MAYBE_FINISHED, | ||
75 | |||
76 | /** | ||
77 | * The protocol is over. | ||
78 | * Results may still have to be sent to the client. | ||
79 | */ | 54 | */ |
80 | PHASE_FINISHED | 55 | PHASE_FINISHED |
81 | }; | 56 | }; |
@@ -118,12 +93,33 @@ struct OperationState | |||
118 | struct OperationState *prev; | 93 | struct OperationState *prev; |
119 | 94 | ||
120 | /** | 95 | /** |
121 | * for multipart msgs we have to store the bloomfilter-data until we fully sent it. | 96 | * For multipart BF transmissions, we have to store the |
97 | * bloomfilter-data until we fully received it. | ||
122 | */ | 98 | */ |
123 | char *bf_data; | 99 | char *bf_data; |
124 | 100 | ||
125 | /** | 101 | /** |
126 | * Current element count contained within @e my_elements | 102 | * XOR of the keys of all of the elements (remaining) in my set. |
103 | * Always updated when elements are added or removed to | ||
104 | * @e my_elements. | ||
105 | */ | ||
106 | struct GNUNET_HashCode my_xor; | ||
107 | |||
108 | /** | ||
109 | * XOR of the keys of all of the elements (remaining) in | ||
110 | * the other peer's set. Updated when we receive the | ||
111 | * other peer's Bloom filter. | ||
112 | */ | ||
113 | struct GNUNET_HashCode other_xor; | ||
114 | |||
115 | /** | ||
116 | * How many bytes of @e bf_data are valid? | ||
117 | */ | ||
118 | uint32_t bf_data_offset; | ||
119 | |||
120 | /** | ||
121 | * Current element count contained within @e my_elements. | ||
122 | * (May differ briefly during initialization.) | ||
127 | */ | 123 | */ |
128 | uint32_t my_element_count; | 124 | uint32_t my_element_count; |
129 | 125 | ||
@@ -138,6 +134,12 @@ struct OperationState | |||
138 | uint32_t bf_bits_per_element; | 134 | uint32_t bf_bits_per_element; |
139 | 135 | ||
140 | /** | 136 | /** |
137 | * Salt currently used for BF construction (by us or the other peer, | ||
138 | * depending on where we are in the code). | ||
139 | */ | ||
140 | uint32_t salt; | ||
141 | |||
142 | /** | ||
141 | * Current state of the operation. | 143 | * Current state of the operation. |
142 | */ | 144 | */ |
143 | enum IntersectionOperationPhase phase; | 145 | enum IntersectionOperationPhase phase; |
@@ -162,7 +164,8 @@ struct OperationState | |||
162 | struct SetState | 164 | struct SetState |
163 | { | 165 | { |
164 | /** | 166 | /** |
165 | * Number of currently valid elements in the set which have not been removed | 167 | * Number of currently valid elements in the set which have not been |
168 | * removed. | ||
166 | */ | 169 | */ |
167 | uint32_t current_set_element_count; | 170 | uint32_t current_set_element_count; |
168 | }; | 171 | }; |
@@ -208,8 +211,7 @@ send_client_removed_element (struct Operation *op, | |||
208 | 211 | ||
209 | 212 | ||
210 | /** | 213 | /** |
211 | * Fills the "my_elements" hashmap with all relevant elements and | 214 | * Fills the "my_elements" hashmap with all relevant elements. |
212 | * adds their mutated hashes to our local bloomfilter with mutator+1. | ||
213 | * | 215 | * |
214 | * @param cls the `struct Operation *` we are performing | 216 | * @param cls the `struct Operation *` we are performing |
215 | * @param key current key code | 217 | * @param key current key code |
@@ -217,9 +219,9 @@ send_client_removed_element (struct Operation *op, | |||
217 | * @return #GNUNET_YES (we should continue to iterate) | 219 | * @return #GNUNET_YES (we should continue to iterate) |
218 | */ | 220 | */ |
219 | static int | 221 | static int |
220 | filtered_map_and_bf_initialization (void *cls, | 222 | filtered_map_initialization (void *cls, |
221 | const struct GNUNET_HashCode *key, | 223 | const struct GNUNET_HashCode *key, |
222 | void *value) | 224 | void *value) |
223 | { | 225 | { |
224 | struct Operation *op = cls; | 226 | struct Operation *op = cls; |
225 | struct ElementEntry *ee = value; | 227 | struct ElementEntry *ee = value; |
@@ -230,9 +232,8 @@ filtered_map_and_bf_initialization (void *cls, | |||
230 | return GNUNET_YES; /* element not valid in our operation's generation */ | 232 | return GNUNET_YES; /* element not valid in our operation's generation */ |
231 | 233 | ||
232 | /* Test if element is in Bob's bloomfilter */ | 234 | /* Test if element is in Bob's bloomfilter */ |
233 | // FIXME: where does this salt come from!? | ||
234 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, | 235 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, |
235 | op->spec->salt, | 236 | op->state->salt, |
236 | &mutated_hash); | 237 | &mutated_hash); |
237 | if (GNUNET_NO == | 238 | if (GNUNET_NO == |
238 | GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, | 239 | GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, |
@@ -244,6 +245,9 @@ filtered_map_and_bf_initialization (void *cls, | |||
244 | return GNUNET_YES; | 245 | return GNUNET_YES; |
245 | } | 246 | } |
246 | op->state->my_element_count++; | 247 | op->state->my_element_count++; |
248 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
249 | &ee->element_hash, | ||
250 | &op->state->my_xor); | ||
247 | GNUNET_break (GNUNET_YES == | 251 | GNUNET_break (GNUNET_YES == |
248 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, | 252 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, |
249 | &ee->element_hash, | 253 | &ee->element_hash, |
@@ -272,15 +276,18 @@ iterator_bf_reduce (void *cls, | |||
272 | struct ElementEntry *ee = value; | 276 | struct ElementEntry *ee = value; |
273 | struct GNUNET_HashCode mutated_hash; | 277 | struct GNUNET_HashCode mutated_hash; |
274 | 278 | ||
275 | // FIXME: where does this salt come from!? | ||
276 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, | 279 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, |
277 | op->spec->salt, | 280 | op->state->salt, |
278 | &mutated_hash); | 281 | &mutated_hash); |
279 | if (GNUNET_NO == | 282 | if (GNUNET_NO == |
280 | GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, | 283 | GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, |
281 | &mutated_hash)) | 284 | &mutated_hash)) |
282 | { | 285 | { |
286 | GNUNET_break (0 < op->state->my_element_count); | ||
283 | op->state->my_element_count--; | 287 | op->state->my_element_count--; |
288 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
289 | &ee->element_hash, | ||
290 | &op->state->my_xor); | ||
284 | GNUNET_assert (GNUNET_YES == | 291 | GNUNET_assert (GNUNET_YES == |
285 | GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, | 292 | GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, |
286 | &ee->element_hash, | 293 | &ee->element_hash, |
@@ -309,9 +316,8 @@ iterator_bf_create (void *cls, | |||
309 | struct ElementEntry *ee = value; | 316 | struct ElementEntry *ee = value; |
310 | struct GNUNET_HashCode mutated_hash; | 317 | struct GNUNET_HashCode mutated_hash; |
311 | 318 | ||
312 | // FIXME: where does this salt come from!? | ||
313 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, | 319 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, |
314 | op->spec->salt, | 320 | op->state->salt, |
315 | &mutated_hash); | 321 | &mutated_hash); |
316 | GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, | 322 | GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, |
317 | &mutated_hash); | 323 | &mutated_hash); |
@@ -350,49 +356,6 @@ fail_intersection_operation (struct Operation *op) | |||
350 | } | 356 | } |
351 | 357 | ||
352 | 358 | ||
353 | |||
354 | |||
355 | |||
356 | |||
357 | |||
358 | /** | ||
359 | * | ||
360 | * @param op | ||
361 | * @param offset | ||
362 | */ | ||
363 | static void | ||
364 | send_bloomfilter_multipart (struct Operation *op, | ||
365 | uint32_t offset) | ||
366 | { | ||
367 | struct GNUNET_MQ_Envelope *ev; | ||
368 | struct BFPart *msg; | ||
369 | uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart)); | ||
370 | uint32_t todo_size = op->state->bf_data_size - offset; | ||
371 | |||
372 | if (todo_size < chunk_size) | ||
373 | chunk_size = todo_size; | ||
374 | |||
375 | ev = GNUNET_MQ_msg_extra (msg, | ||
376 | chunk_size, | ||
377 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART); | ||
378 | |||
379 | msg->chunk_length = htonl (chunk_size); | ||
380 | msg->chunk_offset = htonl (offset); | ||
381 | memcpy(&msg[1], &op->state->bf_data[offset], chunk_size); | ||
382 | |||
383 | GNUNET_MQ_send (op->mq, ev); | ||
384 | |||
385 | if (op->state->bf_data_size == offset + chunk_size) | ||
386 | { | ||
387 | // done | ||
388 | GNUNET_free(op->state->bf_data); | ||
389 | op->state->bf_data = NULL; | ||
390 | return; | ||
391 | } | ||
392 | send_bloomfilter_multipart (op, offset + chunk_size); | ||
393 | } | ||
394 | |||
395 | |||
396 | /** | 359 | /** |
397 | * Send a bloomfilter to our peer. After the result done message has | 360 | * Send a bloomfilter to our peer. After the result done message has |
398 | * been sent to the client, destroy the evaluate operation. | 361 | * been sent to the client, destroy the evaluate operation. |
@@ -408,64 +371,83 @@ send_bloomfilter (struct Operation *op) | |||
408 | uint32_t bf_elementbits; | 371 | uint32_t bf_elementbits; |
409 | uint32_t chunk_size; | 372 | uint32_t chunk_size; |
410 | struct GNUNET_CONTAINER_BloomFilter *local_bf; | 373 | struct GNUNET_CONTAINER_BloomFilter *local_bf; |
411 | 374 | char *bf_data; | |
375 | uint32_t offset; | ||
376 | |||
377 | /* We consider the ratio of the set sizes to determine | ||
378 | the number of bits per element, as the smaller set | ||
379 | should use more bits to maximize its set reduction | ||
380 | potential and minimize overall bandwidth consumption. */ | ||
381 | bf_elementbits = 2 + ceil (log2((double) | ||
382 | (op->spec->remote_element_count / | ||
383 | (double) op->state->my_element_count))); | ||
384 | if (bf_elementbits < 1) | ||
385 | bf_elementbits = 1; /* make sure k is not 0 */ | ||
386 | /* optimize BF-size to ~50% of bits set */ | ||
387 | bf_size = ceil ((double) (op->state->my_element_count | ||
388 | * bf_elementbits / log(2))); | ||
412 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 389 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
413 | "sending bf of size %u\n"); | 390 | "Sending bf of size %u\n", |
414 | 391 | (unsigned int) bf_size); | |
415 | CALCULATE_BF_SIZE(op->state->my_element_count, | ||
416 | op->spec->remote_element_count, | ||
417 | bf_size, | ||
418 | bf_elementbits); | ||
419 | |||
420 | local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, | 392 | local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, |
421 | bf_size, | 393 | bf_size, |
422 | bf_elementbits); | 394 | bf_elementbits); |
423 | 395 | op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, | |
424 | op->spec->salt++; | 396 | UINT32_MAX); |
425 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, | 397 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, |
426 | &iterator_bf_create, | 398 | &iterator_bf_create, |
427 | op); | 399 | op); |
428 | 400 | ||
429 | // send our bloomfilter | 401 | /* send our Bloom filter */ |
430 | if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) | 402 | chunk_size = 60 * 1024 - sizeof (struct BFMessage); |
403 | if (bf_size <= chunk_size) | ||
431 | { | 404 | { |
432 | // singlepart | 405 | /* singlepart */ |
433 | chunk_size = bf_size; | 406 | chunk_size = bf_size; |
434 | ev = GNUNET_MQ_msg_extra (msg, | 407 | ev = GNUNET_MQ_msg_extra (msg, |
435 | chunk_size, | 408 | chunk_size, |
436 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | 409 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); |
437 | GNUNET_assert (GNUNET_SYSERR != | 410 | GNUNET_assert (GNUNET_SYSERR != |
438 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, | 411 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, |
439 | (char*)&msg[1], | 412 | (char*) &msg[1], |
440 | bf_size)); | 413 | bf_size)); |
414 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
415 | msg->bloomfilter_total_length = htonl (bf_size); | ||
416 | msg->bits_per_element = htonl (bf_elementbits); | ||
417 | msg->sender_mutator = htonl (op->state->salt); | ||
418 | msg->element_xor_hash = op->state->my_xor; | ||
419 | GNUNET_MQ_send (op->mq, ev); | ||
441 | } | 420 | } |
442 | else | 421 | else |
443 | { | 422 | { |
444 | //multipart | 423 | /* multipart */ |
445 | chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage); | 424 | bf_data = GNUNET_malloc (bf_size); |
446 | ev = GNUNET_MQ_msg_extra (msg, | ||
447 | chunk_size, | ||
448 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | ||
449 | op->state->bf_data = (char *) GNUNET_malloc (bf_size); | ||
450 | GNUNET_assert (GNUNET_SYSERR != | 425 | GNUNET_assert (GNUNET_SYSERR != |
451 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, | 426 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, |
452 | op->state->bf_data, | 427 | bf_data, |
453 | bf_size)); | 428 | bf_size)); |
454 | memcpy (&msg[1], op->state->bf_data, chunk_size); | 429 | offset = 0; |
455 | op->state->bf_data_size = bf_size; | 430 | while (offset < bf_size) |
431 | { | ||
432 | if (bf_size - chunk_size < offset) | ||
433 | chunk_size = bf_size - offset; | ||
434 | ev = GNUNET_MQ_msg_extra (msg, | ||
435 | chunk_size, | ||
436 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | ||
437 | memcpy (&msg[1], | ||
438 | &bf_data[offset], | ||
439 | chunk_size); | ||
440 | offset += chunk_size; | ||
441 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
442 | msg->bloomfilter_total_length = htonl (bf_size); | ||
443 | msg->bits_per_element = htonl (bf_elementbits); | ||
444 | msg->sender_mutator = htonl (op->state->salt); | ||
445 | msg->element_xor_hash = op->state->my_xor; | ||
446 | GNUNET_MQ_send (op->mq, ev); | ||
447 | } | ||
448 | GNUNET_free (bf_data); | ||
456 | } | 449 | } |
457 | GNUNET_CONTAINER_bloomfilter_free (local_bf); | 450 | GNUNET_CONTAINER_bloomfilter_free (local_bf); |
458 | |||
459 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
460 | msg->bloomfilter_total_length = htonl (bf_size); | ||
461 | msg->bloomfilter_length = htonl (chunk_size); | ||
462 | msg->bits_per_element = htonl (bf_elementbits); | ||
463 | msg->sender_mutator = htonl (op->spec->salt); | ||
464 | |||
465 | GNUNET_MQ_send (op->mq, ev); | ||
466 | |||
467 | if (op->state->bf_data) | ||
468 | send_bloomfilter_multipart (op, chunk_size); | ||
469 | } | 451 | } |
470 | 452 | ||
471 | 453 | ||
@@ -553,6 +535,7 @@ static void | |||
553 | send_peer_done (struct Operation *op) | 535 | send_peer_done (struct Operation *op) |
554 | { | 536 | { |
555 | struct GNUNET_MQ_Envelope *ev; | 537 | struct GNUNET_MQ_Envelope *ev; |
538 | struct IntersectionDoneMessage *idm; | ||
556 | 539 | ||
557 | op->state->phase = PHASE_FINISHED; | 540 | op->state->phase = PHASE_FINISHED; |
558 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 541 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -560,8 +543,12 @@ send_peer_done (struct Operation *op) | |||
560 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); | 543 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); |
561 | op->state->local_bf = NULL; | 544 | op->state->local_bf = NULL; |
562 | 545 | ||
563 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 546 | ev = GNUNET_MQ_msg (idm, |
564 | GNUNET_MQ_send (op->mq, ev); | 547 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); |
548 | idm->final_element_count = htonl (op->state->my_element_count); | ||
549 | idm->element_xor_hash = op->state->my_xor; | ||
550 | GNUNET_MQ_send (op->mq, | ||
551 | ev); | ||
565 | } | 552 | } |
566 | 553 | ||
567 | 554 | ||
@@ -573,107 +560,49 @@ send_peer_done (struct Operation *op) | |||
573 | static void | 560 | static void |
574 | process_bf (struct Operation *op) | 561 | process_bf (struct Operation *op) |
575 | { | 562 | { |
576 | uint32_t old_elements; | ||
577 | uint32_t peer_elements; | ||
578 | |||
579 | old_elements = op->state->my_element_count; | ||
580 | peer_elements = op->spec->remote_element_count; | ||
581 | switch (op->state->phase) | 563 | switch (op->state->phase) |
582 | { | 564 | { |
583 | case PHASE_INITIAL: | 565 | case PHASE_INITIAL: |
584 | /* This is the first BF being sent, build our | 566 | /* This is the first BF being sent, build our initial map with |
585 | initial map with filtering in place */ | 567 | filtering in place */ |
586 | op->state->my_elements | 568 | op->state->my_elements |
587 | = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, | 569 | = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, |
588 | GNUNET_YES); | 570 | GNUNET_YES); |
571 | GNUNET_break (0 == op->state->my_element_count); | ||
589 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | 572 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, |
590 | &filtered_map_and_bf_initialization, | 573 | &filtered_map_initialization, |
591 | op); | 574 | op); |
592 | break; | 575 | break; |
593 | case PHASE_BF_EXCHANGE: | 576 | case PHASE_BF_EXCHANGE: |
594 | case PHASE_MAYBE_FINISHED: | ||
595 | /* Update our set by reduction */ | 577 | /* Update our set by reduction */ |
596 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, | 578 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, |
597 | &iterator_bf_reduce, | 579 | &iterator_bf_reduce, |
598 | op); | 580 | op); |
599 | break; | 581 | break; |
600 | default: | 582 | case PHASE_FINISHED: |
601 | GNUNET_break_op (0); | 583 | GNUNET_break_op (0); |
602 | fail_intersection_operation(op); | 584 | fail_intersection_operation(op); |
585 | return; | ||
603 | } | 586 | } |
604 | // the iterators created a new BF with salt+1 | ||
605 | // the peer needs this information for decoding the next BF | ||
606 | // this behavior can be modified at will later on. | ||
607 | op->spec->salt++; | ||
608 | |||
609 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); | 587 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); |
610 | op->state->remote_bf = NULL; | 588 | op->state->remote_bf = NULL; |
611 | 589 | ||
612 | if ((0 == op->state->my_element_count) // fully disjoint | 590 | if ( (0 == op->state->my_element_count) || /* fully disjoint */ |
613 | || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements | 591 | ( (op->state->my_element_count == op->spec->remote_element_count) && |
614 | && (old_elements == op->state->my_element_count) | 592 | (0 == memcmp (&op->state->my_xor, |
615 | && (op->state->my_element_count == peer_elements))) | 593 | &op->state->other_xor, |
594 | sizeof (struct GNUNET_HashCode))) ) ) | ||
616 | { | 595 | { |
617 | // In the last round we though we were finished, we now know this is correct | 596 | /* we are done */ |
618 | send_peer_done (op); | 597 | send_peer_done (op); |
619 | return; | 598 | return; |
620 | } | 599 | } |
621 | |||
622 | op->state->phase = PHASE_BF_EXCHANGE; | 600 | op->state->phase = PHASE_BF_EXCHANGE; |
623 | if (op->state->my_element_count == peer_elements) | ||
624 | // maybe we are finished, but we do one more round to make certain | ||
625 | // we don't have false positives ... | ||
626 | op->state->phase = PHASE_MAYBE_FINISHED; | ||
627 | |||
628 | send_bloomfilter (op); | 601 | send_bloomfilter (op); |
629 | } | 602 | } |
630 | 603 | ||
631 | 604 | ||
632 | /** | 605 | /** |
633 | * Handle an BF multipart message from a remote peer. | ||
634 | * | ||
635 | * @param cls the intersection operation | ||
636 | * @param mh the header of the message | ||
637 | */ | ||
638 | static void | ||
639 | handle_p2p_bf_part (void *cls, | ||
640 | const struct GNUNET_MessageHeader *mh) | ||
641 | { | ||
642 | struct Operation *op = cls; | ||
643 | const struct BFPart *msg = (const struct BFPart *) mh; | ||
644 | uint32_t chunk_size; | ||
645 | uint32_t chunk_offset; | ||
646 | |||
647 | chunk_size = ntohl(msg->chunk_length); | ||
648 | chunk_offset = ntohl(msg->chunk_offset); | ||
649 | |||
650 | if ((NULL == op->state->bf_data) | ||
651 | || (op->state->bf_data_size < chunk_size + chunk_offset)) | ||
652 | { | ||
653 | // unexpected multipart chunk | ||
654 | GNUNET_break_op (0); | ||
655 | fail_intersection_operation(op); | ||
656 | return; | ||
657 | } | ||
658 | |||
659 | memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size); | ||
660 | |||
661 | if (op->state->bf_data_size != chunk_offset + chunk_size) | ||
662 | // wait for next chunk | ||
663 | return; | ||
664 | |||
665 | op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], | ||
666 | op->state->bf_data_size, | ||
667 | op->state->bf_bits_per_element); | ||
668 | |||
669 | GNUNET_free (op->state->bf_data); | ||
670 | op->state->bf_data = NULL; | ||
671 | |||
672 | process_bf (op); | ||
673 | } | ||
674 | |||
675 | |||
676 | /** | ||
677 | * Handle an BF message from a remote peer. | 606 | * Handle an BF message from a remote peer. |
678 | * | 607 | * |
679 | * @param cls the intersection operation | 608 | * @param cls the intersection operation |
@@ -684,44 +613,94 @@ handle_p2p_bf (void *cls, | |||
684 | const struct GNUNET_MessageHeader *mh) | 613 | const struct GNUNET_MessageHeader *mh) |
685 | { | 614 | { |
686 | struct Operation *op = cls; | 615 | struct Operation *op = cls; |
687 | const struct BFMessage *msg = (const struct BFMessage *) mh; | 616 | const struct BFMessage *msg; |
688 | uint32_t bf_size; | 617 | uint32_t bf_size; |
689 | uint32_t chunk_size; | 618 | uint32_t chunk_size; |
690 | uint32_t bf_bits_per_element; | 619 | uint32_t bf_bits_per_element; |
620 | uint16_t msize; | ||
691 | 621 | ||
622 | msize = htons (mh->size); | ||
623 | if (msize < sizeof (struct BFMessage)) | ||
624 | { | ||
625 | GNUNET_break_op (0); | ||
626 | fail_intersection_operation (op); | ||
627 | return; | ||
628 | } | ||
629 | msg = (const struct BFMessage *) mh; | ||
692 | switch (op->state->phase) | 630 | switch (op->state->phase) |
693 | { | 631 | { |
694 | case PHASE_INITIAL: | 632 | case PHASE_INITIAL: |
633 | GNUNET_break_op (0); | ||
634 | fail_intersection_operation (op); | ||
635 | break; | ||
695 | case PHASE_BF_EXCHANGE: | 636 | case PHASE_BF_EXCHANGE: |
696 | case PHASE_MAYBE_FINISHED: | 637 | bf_size = ntohl (msg->bloomfilter_total_length); |
697 | if (NULL == op->state->bf_data) | 638 | bf_bits_per_element = ntohl (msg->bits_per_element); |
639 | chunk_size = msize - sizeof (struct BFMessage); | ||
640 | op->state->other_xor = msg->element_xor_hash; | ||
641 | if (bf_size == chunk_size) | ||
698 | { | 642 | { |
699 | // no colliding multipart transaction going on currently | 643 | if (NULL != op->state->bf_data) |
700 | op->spec->salt = ntohl (msg->sender_mutator); | ||
701 | bf_size = ntohl (msg->bloomfilter_total_length); | ||
702 | bf_bits_per_element = ntohl (msg->bits_per_element); | ||
703 | chunk_size = ntohl (msg->bloomfilter_length); | ||
704 | op->spec->remote_element_count = ntohl(msg->sender_element_count); | ||
705 | if (bf_size == chunk_size) | ||
706 | { | 644 | { |
707 | // single part, done here | 645 | GNUNET_break_op (0); |
708 | op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], | 646 | fail_intersection_operation (op); |
709 | bf_size, | ||
710 | bf_bits_per_element); | ||
711 | process_bf (op); | ||
712 | return; | 647 | return; |
713 | } | 648 | } |
714 | 649 | /* single part, done here immediately */ | |
715 | //first multipart chunk | 650 | op->state->remote_bf |
651 | = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], | ||
652 | bf_size, | ||
653 | bf_bits_per_element); | ||
654 | op->state->salt = ntohl (msg->sender_mutator); | ||
655 | process_bf (op); | ||
656 | return; | ||
657 | } | ||
658 | /* multipart chunk */ | ||
659 | if (NULL == op->state->bf_data) | ||
660 | { | ||
661 | /* first chunk, initialize */ | ||
716 | op->state->bf_data = GNUNET_malloc (bf_size); | 662 | op->state->bf_data = GNUNET_malloc (bf_size); |
717 | op->state->bf_data_size = bf_size; | 663 | op->state->bf_data_size = bf_size; |
718 | op->state->bf_bits_per_element = bf_bits_per_element; | 664 | op->state->bf_bits_per_element = bf_bits_per_element; |
719 | memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size); | 665 | op->state->bf_data_offset = 0; |
720 | return; | 666 | op->state->salt = ntohl (msg->sender_mutator); |
667 | op->spec->remote_element_count = ntohl (msg->sender_element_count); | ||
668 | } | ||
669 | else | ||
670 | { | ||
671 | /* increment */ | ||
672 | if ( (op->state->bf_data_size != bf_size) || | ||
673 | (op->state->bf_bits_per_element != bf_bits_per_element) || | ||
674 | (op->state->bf_data_offset + chunk_size > bf_size) || | ||
675 | (op->state->salt != ntohl (msg->sender_mutator)) || | ||
676 | (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) | ||
677 | { | ||
678 | GNUNET_break_op (0); | ||
679 | fail_intersection_operation (op); | ||
680 | return; | ||
681 | } | ||
682 | } | ||
683 | memcpy (&op->state->bf_data[op->state->bf_data_offset], | ||
684 | (const char*) &msg[1], | ||
685 | chunk_size); | ||
686 | op->state->bf_data_offset += chunk_size; | ||
687 | if (op->state->bf_data_offset == bf_size) | ||
688 | { | ||
689 | /* last chunk, run! */ | ||
690 | op->state->remote_bf | ||
691 | = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data, | ||
692 | bf_size, | ||
693 | bf_bits_per_element); | ||
694 | GNUNET_free (op->state->bf_data); | ||
695 | op->state->bf_data = NULL; | ||
696 | op->state->bf_data_size = 0; | ||
697 | process_bf (op); | ||
721 | } | 698 | } |
699 | break; | ||
722 | default: | 700 | default: |
723 | GNUNET_break_op (0); | 701 | GNUNET_break_op (0); |
724 | fail_intersection_operation (op); | 702 | fail_intersection_operation (op); |
703 | break; | ||
725 | } | 704 | } |
726 | } | 705 | } |
727 | 706 | ||
@@ -736,9 +715,9 @@ handle_p2p_bf (void *cls, | |||
736 | * @return #GNUNET_YES (we should continue to iterate) | 715 | * @return #GNUNET_YES (we should continue to iterate) |
737 | */ | 716 | */ |
738 | static int | 717 | static int |
739 | initialize_map (void *cls, | 718 | initialize_map_unfiltered (void *cls, |
740 | const struct GNUNET_HashCode *key, | 719 | const struct GNUNET_HashCode *key, |
741 | void *value) | 720 | void *value) |
742 | { | 721 | { |
743 | struct ElementEntry *ee = value; | 722 | struct ElementEntry *ee = value; |
744 | struct Operation *op = cls; | 723 | struct Operation *op = cls; |
@@ -746,6 +725,9 @@ initialize_map (void *cls, | |||
746 | if ( (op->generation_created < ee->generation_removed) && | 725 | if ( (op->generation_created < ee->generation_removed) && |
747 | (op->generation_created >= ee->generation_added) ) | 726 | (op->generation_created >= ee->generation_added) ) |
748 | return GNUNET_YES; /* element not live in operation's generation */ | 727 | return GNUNET_YES; /* element not live in operation's generation */ |
728 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
729 | &ee->element_hash, | ||
730 | &op->state->my_xor); | ||
749 | GNUNET_break (GNUNET_YES == | 731 | GNUNET_break (GNUNET_YES == |
750 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, | 732 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, |
751 | &ee->element_hash, | 733 | &ee->element_hash, |
@@ -756,6 +738,48 @@ initialize_map (void *cls, | |||
756 | 738 | ||
757 | 739 | ||
758 | /** | 740 | /** |
741 | * Send our element count to the peer, in case our element count is | ||
742 | * lower than his. | ||
743 | * | ||
744 | * @param op intersection operation | ||
745 | */ | ||
746 | static void | ||
747 | send_element_count (struct Operation *op) | ||
748 | { | ||
749 | struct GNUNET_MQ_Envelope *ev; | ||
750 | struct IntersectionElementInfoMessage *msg; | ||
751 | |||
752 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
753 | "Sending our element count (bf_msg)\n"); | ||
754 | ev = GNUNET_MQ_msg (msg, | ||
755 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); | ||
756 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
757 | GNUNET_MQ_send (op->mq, ev); | ||
758 | } | ||
759 | |||
760 | |||
761 | /** | ||
762 | * We go first, initialize our map with all elements and | ||
763 | * send the first Bloom filter. | ||
764 | * | ||
765 | * @param op operation to start exchange for | ||
766 | */ | ||
767 | static void | ||
768 | begin_bf_exchange (struct Operation *op) | ||
769 | { | ||
770 | GNUNET_break (PHASE_INITIAL == op->state->phase); | ||
771 | op->state->phase = PHASE_BF_EXCHANGE; | ||
772 | op->state->my_elements | ||
773 | = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, | ||
774 | GNUNET_YES); | ||
775 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | ||
776 | &initialize_map_unfiltered, | ||
777 | op); | ||
778 | send_bloomfilter (op); | ||
779 | } | ||
780 | |||
781 | |||
782 | /** | ||
759 | * Handle the initial `struct IntersectionElementInfoMessage` from a | 783 | * Handle the initial `struct IntersectionElementInfoMessage` from a |
760 | * remote peer. | 784 | * remote peer. |
761 | * | 785 | * |
@@ -786,41 +810,8 @@ handle_p2p_element_info (void *cls, | |||
786 | fail_intersection_operation(op); | 810 | fail_intersection_operation(op); |
787 | return; | 811 | return; |
788 | } | 812 | } |
789 | 813 | GNUNET_break (NULL == op->state->remote_bf); | |
790 | op->state->phase = PHASE_BF_EXCHANGE; | 814 | begin_bf_exchange (op); |
791 | // FIXME... -- why a new map here!? | ||
792 | op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, | ||
793 | GNUNET_YES); | ||
794 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | ||
795 | &initialize_map, // FIXME: filtering!? | ||
796 | op); | ||
797 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); | ||
798 | op->state->remote_bf = NULL; | ||
799 | |||
800 | if (op->state->my_element_count == ntohl (msg->sender_element_count)) | ||
801 | op->state->phase = PHASE_MAYBE_FINISHED; | ||
802 | |||
803 | send_bloomfilter (op); | ||
804 | } | ||
805 | |||
806 | |||
807 | /** | ||
808 | * Send our element count to the peer, in case our element count is lower than his | ||
809 | * | ||
810 | * @param op intersection operation | ||
811 | */ | ||
812 | static void | ||
813 | send_element_count (struct Operation *op) | ||
814 | { | ||
815 | struct GNUNET_MQ_Envelope *ev; | ||
816 | struct IntersectionElementInfoMessage *msg; | ||
817 | |||
818 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
819 | "Sending our element count (bf_msg)\n"); | ||
820 | ev = GNUNET_MQ_msg (msg, | ||
821 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); | ||
822 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
823 | GNUNET_MQ_send (op->mq, ev); | ||
824 | } | 815 | } |
825 | 816 | ||
826 | 817 | ||
@@ -850,6 +841,37 @@ finish_and_destroy (struct Operation *op) | |||
850 | 841 | ||
851 | 842 | ||
852 | /** | 843 | /** |
844 | * Remove all elements from our hashmap. | ||
845 | * | ||
846 | * @param cls closure with the `struct Operation *` | ||
847 | * @param key current key code | ||
848 | * @param value value in the hash map | ||
849 | * @return #GNUNET_YES (we should continue to iterate) | ||
850 | */ | ||
851 | static int | ||
852 | filter_all (void *cls, | ||
853 | const struct GNUNET_HashCode *key, | ||
854 | void *value) | ||
855 | { | ||
856 | struct Operation *op = cls; | ||
857 | struct ElementEntry *ee = value; | ||
858 | |||
859 | GNUNET_break (0 < op->state->my_element_count); | ||
860 | op->state->my_element_count--; | ||
861 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
862 | &ee->element_hash, | ||
863 | &op->state->my_xor); | ||
864 | GNUNET_assert (GNUNET_YES == | ||
865 | GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, | ||
866 | &ee->element_hash, | ||
867 | ee)); | ||
868 | send_client_removed_element (op, | ||
869 | &ee->element); | ||
870 | return GNUNET_YES; | ||
871 | } | ||
872 | |||
873 | |||
874 | /** | ||
853 | * Handle a done message from a remote peer | 875 | * Handle a done message from a remote peer |
854 | * | 876 | * |
855 | * @param cls the intersection operation | 877 | * @param cls the intersection operation |
@@ -860,17 +882,45 @@ handle_p2p_done (void *cls, | |||
860 | const struct GNUNET_MessageHeader *mh) | 882 | const struct GNUNET_MessageHeader *mh) |
861 | { | 883 | { |
862 | struct Operation *op = cls; | 884 | struct Operation *op = cls; |
885 | const struct IntersectionDoneMessage *idm; | ||
863 | 886 | ||
864 | if ( (op->state->phase = PHASE_FINISHED) || | 887 | if (PHASE_BF_EXCHANGE != op->state->phase) |
865 | (op->state->phase = PHASE_MAYBE_FINISHED) ) | ||
866 | { | 888 | { |
867 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 889 | /* wrong phase to conclude? FIXME: Or should we allow this |
868 | "Got final DONE\n"); | 890 | if the other peer has _initially_ already an empty set? */ |
869 | finish_and_destroy (op); | 891 | GNUNET_break_op (0); |
892 | fail_intersection_operation (op); | ||
893 | return; | ||
894 | } | ||
895 | if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage)) | ||
896 | { | ||
897 | GNUNET_break_op (0); | ||
898 | fail_intersection_operation (op); | ||
870 | return; | 899 | return; |
871 | } | 900 | } |
872 | GNUNET_break_op (0); | 901 | idm = (const struct IntersectionDoneMessage *) mh; |
873 | fail_intersection_operation (op); | 902 | if (0 == ntohl (idm->final_element_count)) |
903 | { | ||
904 | /* other peer determined empty set is the intersection, | ||
905 | remove all elements */ | ||
906 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | ||
907 | &filter_all, | ||
908 | op); | ||
909 | } | ||
910 | if ( (op->state->my_element_count != ntohl (idm->final_element_count)) || | ||
911 | (0 != memcmp (&op->state->my_xor, | ||
912 | &idm->element_xor_hash, | ||
913 | sizeof (struct GNUNET_HashCode))) ) | ||
914 | { | ||
915 | /* Other peer thinks we are done, but we disagree on the result! */ | ||
916 | GNUNET_break_op (0); | ||
917 | fail_intersection_operation (op); | ||
918 | return; | ||
919 | } | ||
920 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
921 | "Got final DONE\n"); | ||
922 | op->state->phase = PHASE_FINISHED; | ||
923 | finish_and_destroy (op); | ||
874 | } | 924 | } |
875 | 925 | ||
876 | 926 | ||
@@ -892,7 +942,6 @@ intersection_evaluate (struct Operation *op, | |||
892 | op->state = GNUNET_new (struct OperationState); | 942 | op->state = GNUNET_new (struct OperationState); |
893 | /* we started the operation, thus we have to send the operation request */ | 943 | /* we started the operation, thus we have to send the operation request */ |
894 | op->state->phase = PHASE_INITIAL; | 944 | op->state->phase = PHASE_INITIAL; |
895 | op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES); | ||
896 | op->state->my_element_count = op->spec->set->state->current_set_element_count; | 945 | op->state->my_element_count = op->spec->set->state->current_set_element_count; |
897 | 946 | ||
898 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 947 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -909,8 +958,6 @@ intersection_evaluate (struct Operation *op, | |||
909 | } | 958 | } |
910 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); | 959 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); |
911 | msg->app_id = op->spec->app_id; | 960 | msg->app_id = op->spec->app_id; |
912 | // FIXME: where does this 'salt' come from? | ||
913 | msg->salt = htonl (op->spec->salt); | ||
914 | msg->element_count = htonl (op->state->my_element_count); | 961 | msg->element_count = htonl (op->state->my_element_count); |
915 | GNUNET_MQ_send (op->mq, | 962 | GNUNET_MQ_send (op->mq, |
916 | ev); | 963 | ev); |
@@ -935,29 +982,23 @@ intersection_accept (struct Operation *op) | |||
935 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 982 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
936 | "Accepting set intersection operation\n"); | 983 | "Accepting set intersection operation\n"); |
937 | op->state = GNUNET_new (struct OperationState); | 984 | op->state = GNUNET_new (struct OperationState); |
985 | op->state->phase = PHASE_INITIAL; | ||
938 | op->state->my_element_count | 986 | op->state->my_element_count |
939 | = op->spec->set->state->current_set_element_count; | 987 | = op->spec->set->state->current_set_element_count; |
940 | op->state->my_elements | 988 | op->state->my_elements |
941 | = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count, | 989 | = GNUNET_CONTAINER_multihashmap_create |
942 | op->spec->remote_element_count), | 990 | (GNUNET_MIN (op->state->my_element_count, |
943 | GNUNET_YES); | 991 | op->spec->remote_element_count), |
992 | GNUNET_YES); | ||
944 | if (op->spec->remote_element_count < op->state->my_element_count) | 993 | if (op->spec->remote_element_count < op->state->my_element_count) |
945 | { | 994 | { |
946 | /* If the other peer (Alice) has fewer elements than us (Bob), | 995 | /* If the other peer (Alice) has fewer elements than us (Bob), |
947 | we just send the count as Alice should send the first BF */ | 996 | we just send the count as Alice should send the first BF */ |
948 | op->state->phase = PHASE_INITIAL; | ||
949 | send_element_count (op); | 997 | send_element_count (op); |
950 | return; | 998 | return; |
951 | } | 999 | } |
952 | /* We have fewer elements, so we start with the BF */ | 1000 | /* We have fewer elements, so we start with the BF */ |
953 | op->state->phase = PHASE_BF_EXCHANGE; | 1001 | begin_bf_exchange (op); |
954 | op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, | ||
955 | BLOOMFILTER_SIZE, | ||
956 | GNUNET_CONSTANTS_BLOOMFILTER_K); | ||
957 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | ||
958 | &initialize_map, | ||
959 | op); | ||
960 | send_bloomfilter (op); | ||
961 | } | 1002 | } |
962 | 1003 | ||
963 | 1004 | ||
@@ -987,10 +1028,7 @@ intersection_handle_p2p_message (struct Operation *op, | |||
987 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: | 1028 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: |
988 | handle_p2p_bf (op, mh); | 1029 | handle_p2p_bf (op, mh); |
989 | break; | 1030 | break; |
990 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART: | 1031 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE: |
991 | handle_p2p_bf_part (op, mh); | ||
992 | break; | ||
993 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: | ||
994 | handle_p2p_done (op, mh); | 1032 | handle_p2p_done (op, mh); |
995 | break; | 1033 | break; |
996 | default: | 1034 | default: |