diff options
author | Christian Grothoff <christian@grothoff.org> | 2014-11-28 20:52:20 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2014-11-28 20:52:20 +0000 |
commit | f4c9c6514494547973b8962f22fce8266afd4992 (patch) | |
tree | 2211f91a0e80f143cb066ca365c6ddcbb02759cf /src/set | |
parent | 264c9b7f668d0429eaae01075c742ae6ad6f49ee (diff) | |
download | gnunet-f4c9c6514494547973b8962f22fce8266afd4992.tar.gz gnunet-f4c9c6514494547973b8962f22fce8266afd4992.zip |
-fixing misc issues and bugs, including better termination logic for intersection and salt handling
Diffstat (limited to 'src/set')
-rw-r--r-- | src/set/gnunet-service-set.c | 4 | ||||
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 588 | ||||
-rw-r--r-- | src/set/gnunet-service-set_protocol.h | 46 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 48 |
4 files changed, 361 insertions, 325 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index d5c02d69b..8dc111a4c 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -1498,12 +1498,12 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
1498 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, | 1498 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, |
1499 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, | 1499 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, |
1500 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, | 1500 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, |
1501 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, | 1501 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0}, |
1502 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, | 1502 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, |
1503 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, | 1503 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, |
1504 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, | 1504 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, |
1505 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, | 1505 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, |
1506 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART, 0}, | 1506 | { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0}, |
1507 | {NULL, 0, 0} | 1507 | {NULL, 0, 0} |
1508 | }; | 1508 | }; |
1509 | static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; | 1509 | static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; |
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 19d6498f5..fb3cd23b5 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -21,6 +21,7 @@ | |||
21 | * @file set/gnunet-service-set_intersection.c | 21 | * @file set/gnunet-service-set_intersection.c |
22 | * @brief two-peer set intersection | 22 | * @brief two-peer set intersection |
23 | * @author Christian Fuchs | 23 | * @author Christian Fuchs |
24 | * @author Christian Grothoff | ||
24 | */ | 25 | */ |
25 | #include "platform.h" | 26 | #include "platform.h" |
26 | #include "gnunet_util_lib.h" | 27 | #include "gnunet_util_lib.h" |
@@ -29,24 +30,6 @@ | |||
29 | #include "gnunet-service-set_protocol.h" | 30 | #include "gnunet-service-set_protocol.h" |
30 | #include <gcrypt.h> | 31 | #include <gcrypt.h> |
31 | 32 | ||
32 | #define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH | ||
33 | |||
34 | /** | ||
35 | * Calculate the size of the bloom filter. | ||
36 | * | ||
37 | * @param A | ||
38 | * @param B | ||
39 | * @param s | ||
40 | * @param k | ||
41 | * @return | ||
42 | */ | ||
43 | #define CALCULATE_BF_SIZE(A, B, s, k) \ | ||
44 | do { \ | ||
45 | k = ceil(1 + log2((double) (2*B / (double) A)));\ | ||
46 | if (k<1) k=1; /* k can be calculated as 0 */\ | ||
47 | s = ceil((double) (A * k / log(2))); \ | ||
48 | } while (0) | ||
49 | |||
50 | 33 | ||
51 | /** | 34 | /** |
52 | * Current phase we are in for a intersection operation. | 35 | * Current phase we are in for a intersection operation. |
@@ -61,21 +44,13 @@ enum IntersectionOperationPhase | |||
61 | 44 | ||
62 | /** | 45 | /** |
63 | * Bob has accepted the operation, Bob and Alice are now exchanging bfs | 46 | * Bob has accepted the operation, Bob and Alice are now exchanging bfs |
64 | * until one notices the their element count is equal | 47 | * until one notices the their element hashes are equal. |
65 | */ | 48 | */ |
66 | PHASE_BF_EXCHANGE, | 49 | PHASE_BF_EXCHANGE, |
67 | 50 | ||
68 | /** | 51 | /** |
69 | * if both peers have an equal peercount, they enter this state for | 52 | * The protocol is over. Results may still have to be sent to the |
70 | * one more turn, to see if they actually have agreed on a correct set. | 53 | * client. |
71 | * if a peer finds the same element count after the next iteration, | ||
72 | * it ends the the session | ||
73 | */ | ||
74 | PHASE_MAYBE_FINISHED, | ||
75 | |||
76 | /** | ||
77 | * The protocol is over. | ||
78 | * Results may still have to be sent to the client. | ||
79 | */ | 54 | */ |
80 | PHASE_FINISHED | 55 | PHASE_FINISHED |
81 | }; | 56 | }; |
@@ -118,12 +93,33 @@ struct OperationState | |||
118 | struct OperationState *prev; | 93 | struct OperationState *prev; |
119 | 94 | ||
120 | /** | 95 | /** |
121 | * for multipart msgs we have to store the bloomfilter-data until we fully sent it. | 96 | * For multipart BF transmissions, we have to store the |
97 | * bloomfilter-data until we fully received it. | ||
122 | */ | 98 | */ |
123 | char *bf_data; | 99 | char *bf_data; |
124 | 100 | ||
125 | /** | 101 | /** |
126 | * Current element count contained within @e my_elements | 102 | * XOR of the keys of all of the elements (remaining) in my set. |
103 | * Always updated when elements are added or removed to | ||
104 | * @e my_elements. | ||
105 | */ | ||
106 | struct GNUNET_HashCode my_xor; | ||
107 | |||
108 | /** | ||
109 | * XOR of the keys of all of the elements (remaining) in | ||
110 | * the other peer's set. Updated when we receive the | ||
111 | * other peer's Bloom filter. | ||
112 | */ | ||
113 | struct GNUNET_HashCode other_xor; | ||
114 | |||
115 | /** | ||
116 | * How many bytes of @e bf_data are valid? | ||
117 | */ | ||
118 | uint32_t bf_data_offset; | ||
119 | |||
120 | /** | ||
121 | * Current element count contained within @e my_elements. | ||
122 | * (May differ briefly during initialization.) | ||
127 | */ | 123 | */ |
128 | uint32_t my_element_count; | 124 | uint32_t my_element_count; |
129 | 125 | ||
@@ -138,6 +134,12 @@ struct OperationState | |||
138 | uint32_t bf_bits_per_element; | 134 | uint32_t bf_bits_per_element; |
139 | 135 | ||
140 | /** | 136 | /** |
137 | * Salt currently used for BF construction (by us or the other peer, | ||
138 | * depending on where we are in the code). | ||
139 | */ | ||
140 | uint32_t salt; | ||
141 | |||
142 | /** | ||
141 | * Current state of the operation. | 143 | * Current state of the operation. |
142 | */ | 144 | */ |
143 | enum IntersectionOperationPhase phase; | 145 | enum IntersectionOperationPhase phase; |
@@ -162,7 +164,8 @@ struct OperationState | |||
162 | struct SetState | 164 | struct SetState |
163 | { | 165 | { |
164 | /** | 166 | /** |
165 | * Number of currently valid elements in the set which have not been removed | 167 | * Number of currently valid elements in the set which have not been |
168 | * removed. | ||
166 | */ | 169 | */ |
167 | uint32_t current_set_element_count; | 170 | uint32_t current_set_element_count; |
168 | }; | 171 | }; |
@@ -208,8 +211,7 @@ send_client_removed_element (struct Operation *op, | |||
208 | 211 | ||
209 | 212 | ||
210 | /** | 213 | /** |
211 | * Fills the "my_elements" hashmap with all relevant elements and | 214 | * Fills the "my_elements" hashmap with all relevant elements. |
212 | * adds their mutated hashes to our local bloomfilter with mutator+1. | ||
213 | * | 215 | * |
214 | * @param cls the `struct Operation *` we are performing | 216 | * @param cls the `struct Operation *` we are performing |
215 | * @param key current key code | 217 | * @param key current key code |
@@ -217,9 +219,9 @@ send_client_removed_element (struct Operation *op, | |||
217 | * @return #GNUNET_YES (we should continue to iterate) | 219 | * @return #GNUNET_YES (we should continue to iterate) |
218 | */ | 220 | */ |
219 | static int | 221 | static int |
220 | filtered_map_and_bf_initialization (void *cls, | 222 | filtered_map_initialization (void *cls, |
221 | const struct GNUNET_HashCode *key, | 223 | const struct GNUNET_HashCode *key, |
222 | void *value) | 224 | void *value) |
223 | { | 225 | { |
224 | struct Operation *op = cls; | 226 | struct Operation *op = cls; |
225 | struct ElementEntry *ee = value; | 227 | struct ElementEntry *ee = value; |
@@ -230,9 +232,8 @@ filtered_map_and_bf_initialization (void *cls, | |||
230 | return GNUNET_YES; /* element not valid in our operation's generation */ | 232 | return GNUNET_YES; /* element not valid in our operation's generation */ |
231 | 233 | ||
232 | /* Test if element is in Bob's bloomfilter */ | 234 | /* Test if element is in Bob's bloomfilter */ |
233 | // FIXME: where does this salt come from!? | ||
234 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, | 235 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, |
235 | op->spec->salt, | 236 | op->state->salt, |
236 | &mutated_hash); | 237 | &mutated_hash); |
237 | if (GNUNET_NO == | 238 | if (GNUNET_NO == |
238 | GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, | 239 | GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, |
@@ -244,6 +245,9 @@ filtered_map_and_bf_initialization (void *cls, | |||
244 | return GNUNET_YES; | 245 | return GNUNET_YES; |
245 | } | 246 | } |
246 | op->state->my_element_count++; | 247 | op->state->my_element_count++; |
248 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
249 | &ee->element_hash, | ||
250 | &op->state->my_xor); | ||
247 | GNUNET_break (GNUNET_YES == | 251 | GNUNET_break (GNUNET_YES == |
248 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, | 252 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, |
249 | &ee->element_hash, | 253 | &ee->element_hash, |
@@ -272,15 +276,18 @@ iterator_bf_reduce (void *cls, | |||
272 | struct ElementEntry *ee = value; | 276 | struct ElementEntry *ee = value; |
273 | struct GNUNET_HashCode mutated_hash; | 277 | struct GNUNET_HashCode mutated_hash; |
274 | 278 | ||
275 | // FIXME: where does this salt come from!? | ||
276 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, | 279 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, |
277 | op->spec->salt, | 280 | op->state->salt, |
278 | &mutated_hash); | 281 | &mutated_hash); |
279 | if (GNUNET_NO == | 282 | if (GNUNET_NO == |
280 | GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, | 283 | GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, |
281 | &mutated_hash)) | 284 | &mutated_hash)) |
282 | { | 285 | { |
286 | GNUNET_break (0 < op->state->my_element_count); | ||
283 | op->state->my_element_count--; | 287 | op->state->my_element_count--; |
288 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
289 | &ee->element_hash, | ||
290 | &op->state->my_xor); | ||
284 | GNUNET_assert (GNUNET_YES == | 291 | GNUNET_assert (GNUNET_YES == |
285 | GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, | 292 | GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, |
286 | &ee->element_hash, | 293 | &ee->element_hash, |
@@ -309,9 +316,8 @@ iterator_bf_create (void *cls, | |||
309 | struct ElementEntry *ee = value; | 316 | struct ElementEntry *ee = value; |
310 | struct GNUNET_HashCode mutated_hash; | 317 | struct GNUNET_HashCode mutated_hash; |
311 | 318 | ||
312 | // FIXME: where does this salt come from!? | ||
313 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, | 319 | GNUNET_BLOCK_mingle_hash (&ee->element_hash, |
314 | op->spec->salt, | 320 | op->state->salt, |
315 | &mutated_hash); | 321 | &mutated_hash); |
316 | GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, | 322 | GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, |
317 | &mutated_hash); | 323 | &mutated_hash); |
@@ -350,49 +356,6 @@ fail_intersection_operation (struct Operation *op) | |||
350 | } | 356 | } |
351 | 357 | ||
352 | 358 | ||
353 | |||
354 | |||
355 | |||
356 | |||
357 | |||
358 | /** | ||
359 | * | ||
360 | * @param op | ||
361 | * @param offset | ||
362 | */ | ||
363 | static void | ||
364 | send_bloomfilter_multipart (struct Operation *op, | ||
365 | uint32_t offset) | ||
366 | { | ||
367 | struct GNUNET_MQ_Envelope *ev; | ||
368 | struct BFPart *msg; | ||
369 | uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart)); | ||
370 | uint32_t todo_size = op->state->bf_data_size - offset; | ||
371 | |||
372 | if (todo_size < chunk_size) | ||
373 | chunk_size = todo_size; | ||
374 | |||
375 | ev = GNUNET_MQ_msg_extra (msg, | ||
376 | chunk_size, | ||
377 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART); | ||
378 | |||
379 | msg->chunk_length = htonl (chunk_size); | ||
380 | msg->chunk_offset = htonl (offset); | ||
381 | memcpy(&msg[1], &op->state->bf_data[offset], chunk_size); | ||
382 | |||
383 | GNUNET_MQ_send (op->mq, ev); | ||
384 | |||
385 | if (op->state->bf_data_size == offset + chunk_size) | ||
386 | { | ||
387 | // done | ||
388 | GNUNET_free(op->state->bf_data); | ||
389 | op->state->bf_data = NULL; | ||
390 | return; | ||
391 | } | ||
392 | send_bloomfilter_multipart (op, offset + chunk_size); | ||
393 | } | ||
394 | |||
395 | |||
396 | /** | 359 | /** |
397 | * Send a bloomfilter to our peer. After the result done message has | 360 | * Send a bloomfilter to our peer. After the result done message has |
398 | * been sent to the client, destroy the evaluate operation. | 361 | * been sent to the client, destroy the evaluate operation. |
@@ -408,64 +371,83 @@ send_bloomfilter (struct Operation *op) | |||
408 | uint32_t bf_elementbits; | 371 | uint32_t bf_elementbits; |
409 | uint32_t chunk_size; | 372 | uint32_t chunk_size; |
410 | struct GNUNET_CONTAINER_BloomFilter *local_bf; | 373 | struct GNUNET_CONTAINER_BloomFilter *local_bf; |
411 | 374 | char *bf_data; | |
375 | uint32_t offset; | ||
376 | |||
377 | /* We consider the ratio of the set sizes to determine | ||
378 | the number of bits per element, as the smaller set | ||
379 | should use more bits to maximize its set reduction | ||
380 | potential and minimize overall bandwidth consumption. */ | ||
381 | bf_elementbits = 2 + ceil (log2((double) | ||
382 | (op->spec->remote_element_count / | ||
383 | (double) op->state->my_element_count))); | ||
384 | if (bf_elementbits < 1) | ||
385 | bf_elementbits = 1; /* make sure k is not 0 */ | ||
386 | /* optimize BF-size to ~50% of bits set */ | ||
387 | bf_size = ceil ((double) (op->state->my_element_count | ||
388 | * bf_elementbits / log(2))); | ||
412 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 389 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
413 | "sending bf of size %u\n"); | 390 | "Sending bf of size %u\n", |
414 | 391 | (unsigned int) bf_size); | |
415 | CALCULATE_BF_SIZE(op->state->my_element_count, | ||
416 | op->spec->remote_element_count, | ||
417 | bf_size, | ||
418 | bf_elementbits); | ||
419 | |||
420 | local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, | 392 | local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, |
421 | bf_size, | 393 | bf_size, |
422 | bf_elementbits); | 394 | bf_elementbits); |
423 | 395 | op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, | |
424 | op->spec->salt++; | 396 | UINT32_MAX); |
425 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, | 397 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, |
426 | &iterator_bf_create, | 398 | &iterator_bf_create, |
427 | op); | 399 | op); |
428 | 400 | ||
429 | // send our bloomfilter | 401 | /* send our Bloom filter */ |
430 | if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) | 402 | chunk_size = 60 * 1024 - sizeof (struct BFMessage); |
403 | if (bf_size <= chunk_size) | ||
431 | { | 404 | { |
432 | // singlepart | 405 | /* singlepart */ |
433 | chunk_size = bf_size; | 406 | chunk_size = bf_size; |
434 | ev = GNUNET_MQ_msg_extra (msg, | 407 | ev = GNUNET_MQ_msg_extra (msg, |
435 | chunk_size, | 408 | chunk_size, |
436 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | 409 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); |
437 | GNUNET_assert (GNUNET_SYSERR != | 410 | GNUNET_assert (GNUNET_SYSERR != |
438 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, | 411 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, |
439 | (char*)&msg[1], | 412 | (char*) &msg[1], |
440 | bf_size)); | 413 | bf_size)); |
414 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
415 | msg->bloomfilter_total_length = htonl (bf_size); | ||
416 | msg->bits_per_element = htonl (bf_elementbits); | ||
417 | msg->sender_mutator = htonl (op->state->salt); | ||
418 | msg->element_xor_hash = op->state->my_xor; | ||
419 | GNUNET_MQ_send (op->mq, ev); | ||
441 | } | 420 | } |
442 | else | 421 | else |
443 | { | 422 | { |
444 | //multipart | 423 | /* multipart */ |
445 | chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage); | 424 | bf_data = GNUNET_malloc (bf_size); |
446 | ev = GNUNET_MQ_msg_extra (msg, | ||
447 | chunk_size, | ||
448 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | ||
449 | op->state->bf_data = (char *) GNUNET_malloc (bf_size); | ||
450 | GNUNET_assert (GNUNET_SYSERR != | 425 | GNUNET_assert (GNUNET_SYSERR != |
451 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, | 426 | GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, |
452 | op->state->bf_data, | 427 | bf_data, |
453 | bf_size)); | 428 | bf_size)); |
454 | memcpy (&msg[1], op->state->bf_data, chunk_size); | 429 | offset = 0; |
455 | op->state->bf_data_size = bf_size; | 430 | while (offset < bf_size) |
431 | { | ||
432 | if (bf_size - chunk_size < offset) | ||
433 | chunk_size = bf_size - offset; | ||
434 | ev = GNUNET_MQ_msg_extra (msg, | ||
435 | chunk_size, | ||
436 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | ||
437 | memcpy (&msg[1], | ||
438 | &bf_data[offset], | ||
439 | chunk_size); | ||
440 | offset += chunk_size; | ||
441 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
442 | msg->bloomfilter_total_length = htonl (bf_size); | ||
443 | msg->bits_per_element = htonl (bf_elementbits); | ||
444 | msg->sender_mutator = htonl (op->state->salt); | ||
445 | msg->element_xor_hash = op->state->my_xor; | ||
446 | GNUNET_MQ_send (op->mq, ev); | ||
447 | } | ||
448 | GNUNET_free (bf_data); | ||
456 | } | 449 | } |
457 | GNUNET_CONTAINER_bloomfilter_free (local_bf); | 450 | GNUNET_CONTAINER_bloomfilter_free (local_bf); |
458 | |||
459 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
460 | msg->bloomfilter_total_length = htonl (bf_size); | ||
461 | msg->bloomfilter_length = htonl (chunk_size); | ||
462 | msg->bits_per_element = htonl (bf_elementbits); | ||
463 | msg->sender_mutator = htonl (op->spec->salt); | ||
464 | |||
465 | GNUNET_MQ_send (op->mq, ev); | ||
466 | |||
467 | if (op->state->bf_data) | ||
468 | send_bloomfilter_multipart (op, chunk_size); | ||
469 | } | 451 | } |
470 | 452 | ||
471 | 453 | ||
@@ -553,6 +535,7 @@ static void | |||
553 | send_peer_done (struct Operation *op) | 535 | send_peer_done (struct Operation *op) |
554 | { | 536 | { |
555 | struct GNUNET_MQ_Envelope *ev; | 537 | struct GNUNET_MQ_Envelope *ev; |
538 | struct IntersectionDoneMessage *idm; | ||
556 | 539 | ||
557 | op->state->phase = PHASE_FINISHED; | 540 | op->state->phase = PHASE_FINISHED; |
558 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 541 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -560,8 +543,12 @@ send_peer_done (struct Operation *op) | |||
560 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); | 543 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); |
561 | op->state->local_bf = NULL; | 544 | op->state->local_bf = NULL; |
562 | 545 | ||
563 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 546 | ev = GNUNET_MQ_msg (idm, |
564 | GNUNET_MQ_send (op->mq, ev); | 547 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); |
548 | idm->final_element_count = htonl (op->state->my_element_count); | ||
549 | idm->element_xor_hash = op->state->my_xor; | ||
550 | GNUNET_MQ_send (op->mq, | ||
551 | ev); | ||
565 | } | 552 | } |
566 | 553 | ||
567 | 554 | ||
@@ -573,107 +560,49 @@ send_peer_done (struct Operation *op) | |||
573 | static void | 560 | static void |
574 | process_bf (struct Operation *op) | 561 | process_bf (struct Operation *op) |
575 | { | 562 | { |
576 | uint32_t old_elements; | ||
577 | uint32_t peer_elements; | ||
578 | |||
579 | old_elements = op->state->my_element_count; | ||
580 | peer_elements = op->spec->remote_element_count; | ||
581 | switch (op->state->phase) | 563 | switch (op->state->phase) |
582 | { | 564 | { |
583 | case PHASE_INITIAL: | 565 | case PHASE_INITIAL: |
584 | /* This is the first BF being sent, build our | 566 | /* This is the first BF being sent, build our initial map with |
585 | initial map with filtering in place */ | 567 | filtering in place */ |
586 | op->state->my_elements | 568 | op->state->my_elements |
587 | = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, | 569 | = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, |
588 | GNUNET_YES); | 570 | GNUNET_YES); |
571 | GNUNET_break (0 == op->state->my_element_count); | ||
589 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | 572 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, |
590 | &filtered_map_and_bf_initialization, | 573 | &filtered_map_initialization, |
591 | op); | 574 | op); |
592 | break; | 575 | break; |
593 | case PHASE_BF_EXCHANGE: | 576 | case PHASE_BF_EXCHANGE: |
594 | case PHASE_MAYBE_FINISHED: | ||
595 | /* Update our set by reduction */ | 577 | /* Update our set by reduction */ |
596 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, | 578 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, |
597 | &iterator_bf_reduce, | 579 | &iterator_bf_reduce, |
598 | op); | 580 | op); |
599 | break; | 581 | break; |
600 | default: | 582 | case PHASE_FINISHED: |
601 | GNUNET_break_op (0); | 583 | GNUNET_break_op (0); |
602 | fail_intersection_operation(op); | 584 | fail_intersection_operation(op); |
585 | return; | ||
603 | } | 586 | } |
604 | // the iterators created a new BF with salt+1 | ||
605 | // the peer needs this information for decoding the next BF | ||
606 | // this behavior can be modified at will later on. | ||
607 | op->spec->salt++; | ||
608 | |||
609 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); | 587 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); |
610 | op->state->remote_bf = NULL; | 588 | op->state->remote_bf = NULL; |
611 | 589 | ||
612 | if ((0 == op->state->my_element_count) // fully disjoint | 590 | if ( (0 == op->state->my_element_count) || /* fully disjoint */ |
613 | || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements | 591 | ( (op->state->my_element_count == op->spec->remote_element_count) && |
614 | && (old_elements == op->state->my_element_count) | 592 | (0 == memcmp (&op->state->my_xor, |
615 | && (op->state->my_element_count == peer_elements))) | 593 | &op->state->other_xor, |
594 | sizeof (struct GNUNET_HashCode))) ) ) | ||
616 | { | 595 | { |
617 | // In the last round we though we were finished, we now know this is correct | 596 | /* we are done */ |
618 | send_peer_done (op); | 597 | send_peer_done (op); |
619 | return; | 598 | return; |
620 | } | 599 | } |
621 | |||
622 | op->state->phase = PHASE_BF_EXCHANGE; | 600 | op->state->phase = PHASE_BF_EXCHANGE; |
623 | if (op->state->my_element_count == peer_elements) | ||
624 | // maybe we are finished, but we do one more round to make certain | ||
625 | // we don't have false positives ... | ||
626 | op->state->phase = PHASE_MAYBE_FINISHED; | ||
627 | |||
628 | send_bloomfilter (op); | 601 | send_bloomfilter (op); |
629 | } | 602 | } |
630 | 603 | ||
631 | 604 | ||
632 | /** | 605 | /** |
633 | * Handle an BF multipart message from a remote peer. | ||
634 | * | ||
635 | * @param cls the intersection operation | ||
636 | * @param mh the header of the message | ||
637 | */ | ||
638 | static void | ||
639 | handle_p2p_bf_part (void *cls, | ||
640 | const struct GNUNET_MessageHeader *mh) | ||
641 | { | ||
642 | struct Operation *op = cls; | ||
643 | const struct BFPart *msg = (const struct BFPart *) mh; | ||
644 | uint32_t chunk_size; | ||
645 | uint32_t chunk_offset; | ||
646 | |||
647 | chunk_size = ntohl(msg->chunk_length); | ||
648 | chunk_offset = ntohl(msg->chunk_offset); | ||
649 | |||
650 | if ((NULL == op->state->bf_data) | ||
651 | || (op->state->bf_data_size < chunk_size + chunk_offset)) | ||
652 | { | ||
653 | // unexpected multipart chunk | ||
654 | GNUNET_break_op (0); | ||
655 | fail_intersection_operation(op); | ||
656 | return; | ||
657 | } | ||
658 | |||
659 | memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size); | ||
660 | |||
661 | if (op->state->bf_data_size != chunk_offset + chunk_size) | ||
662 | // wait for next chunk | ||
663 | return; | ||
664 | |||
665 | op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], | ||
666 | op->state->bf_data_size, | ||
667 | op->state->bf_bits_per_element); | ||
668 | |||
669 | GNUNET_free (op->state->bf_data); | ||
670 | op->state->bf_data = NULL; | ||
671 | |||
672 | process_bf (op); | ||
673 | } | ||
674 | |||
675 | |||
676 | /** | ||
677 | * Handle an BF message from a remote peer. | 606 | * Handle an BF message from a remote peer. |
678 | * | 607 | * |
679 | * @param cls the intersection operation | 608 | * @param cls the intersection operation |
@@ -684,44 +613,94 @@ handle_p2p_bf (void *cls, | |||
684 | const struct GNUNET_MessageHeader *mh) | 613 | const struct GNUNET_MessageHeader *mh) |
685 | { | 614 | { |
686 | struct Operation *op = cls; | 615 | struct Operation *op = cls; |
687 | const struct BFMessage *msg = (const struct BFMessage *) mh; | 616 | const struct BFMessage *msg; |
688 | uint32_t bf_size; | 617 | uint32_t bf_size; |
689 | uint32_t chunk_size; | 618 | uint32_t chunk_size; |
690 | uint32_t bf_bits_per_element; | 619 | uint32_t bf_bits_per_element; |
620 | uint16_t msize; | ||
691 | 621 | ||
622 | msize = htons (mh->size); | ||
623 | if (msize < sizeof (struct BFMessage)) | ||
624 | { | ||
625 | GNUNET_break_op (0); | ||
626 | fail_intersection_operation (op); | ||
627 | return; | ||
628 | } | ||
629 | msg = (const struct BFMessage *) mh; | ||
692 | switch (op->state->phase) | 630 | switch (op->state->phase) |
693 | { | 631 | { |
694 | case PHASE_INITIAL: | 632 | case PHASE_INITIAL: |
633 | GNUNET_break_op (0); | ||
634 | fail_intersection_operation (op); | ||
635 | break; | ||
695 | case PHASE_BF_EXCHANGE: | 636 | case PHASE_BF_EXCHANGE: |
696 | case PHASE_MAYBE_FINISHED: | 637 | bf_size = ntohl (msg->bloomfilter_total_length); |
697 | if (NULL == op->state->bf_data) | 638 | bf_bits_per_element = ntohl (msg->bits_per_element); |
639 | chunk_size = msize - sizeof (struct BFMessage); | ||
640 | op->state->other_xor = msg->element_xor_hash; | ||
641 | if (bf_size == chunk_size) | ||
698 | { | 642 | { |
699 | // no colliding multipart transaction going on currently | 643 | if (NULL != op->state->bf_data) |
700 | op->spec->salt = ntohl (msg->sender_mutator); | ||
701 | bf_size = ntohl (msg->bloomfilter_total_length); | ||
702 | bf_bits_per_element = ntohl (msg->bits_per_element); | ||
703 | chunk_size = ntohl (msg->bloomfilter_length); | ||
704 | op->spec->remote_element_count = ntohl(msg->sender_element_count); | ||
705 | if (bf_size == chunk_size) | ||
706 | { | 644 | { |
707 | // single part, done here | 645 | GNUNET_break_op (0); |
708 | op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], | 646 | fail_intersection_operation (op); |
709 | bf_size, | ||
710 | bf_bits_per_element); | ||
711 | process_bf (op); | ||
712 | return; | 647 | return; |
713 | } | 648 | } |
714 | 649 | /* single part, done here immediately */ | |
715 | //first multipart chunk | 650 | op->state->remote_bf |
651 | = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], | ||
652 | bf_size, | ||
653 | bf_bits_per_element); | ||
654 | op->state->salt = ntohl (msg->sender_mutator); | ||
655 | process_bf (op); | ||
656 | return; | ||
657 | } | ||
658 | /* multipart chunk */ | ||
659 | if (NULL == op->state->bf_data) | ||
660 | { | ||
661 | /* first chunk, initialize */ | ||
716 | op->state->bf_data = GNUNET_malloc (bf_size); | 662 | op->state->bf_data = GNUNET_malloc (bf_size); |
717 | op->state->bf_data_size = bf_size; | 663 | op->state->bf_data_size = bf_size; |
718 | op->state->bf_bits_per_element = bf_bits_per_element; | 664 | op->state->bf_bits_per_element = bf_bits_per_element; |
719 | memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size); | 665 | op->state->bf_data_offset = 0; |
720 | return; | 666 | op->state->salt = ntohl (msg->sender_mutator); |
667 | op->spec->remote_element_count = ntohl (msg->sender_element_count); | ||
668 | } | ||
669 | else | ||
670 | { | ||
671 | /* increment */ | ||
672 | if ( (op->state->bf_data_size != bf_size) || | ||
673 | (op->state->bf_bits_per_element != bf_bits_per_element) || | ||
674 | (op->state->bf_data_offset + chunk_size > bf_size) || | ||
675 | (op->state->salt != ntohl (msg->sender_mutator)) || | ||
676 | (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) | ||
677 | { | ||
678 | GNUNET_break_op (0); | ||
679 | fail_intersection_operation (op); | ||
680 | return; | ||
681 | } | ||
682 | } | ||
683 | memcpy (&op->state->bf_data[op->state->bf_data_offset], | ||
684 | (const char*) &msg[1], | ||
685 | chunk_size); | ||
686 | op->state->bf_data_offset += chunk_size; | ||
687 | if (op->state->bf_data_offset == bf_size) | ||
688 | { | ||
689 | /* last chunk, run! */ | ||
690 | op->state->remote_bf | ||
691 | = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data, | ||
692 | bf_size, | ||
693 | bf_bits_per_element); | ||
694 | GNUNET_free (op->state->bf_data); | ||
695 | op->state->bf_data = NULL; | ||
696 | op->state->bf_data_size = 0; | ||
697 | process_bf (op); | ||
721 | } | 698 | } |
699 | break; | ||
722 | default: | 700 | default: |
723 | GNUNET_break_op (0); | 701 | GNUNET_break_op (0); |
724 | fail_intersection_operation (op); | 702 | fail_intersection_operation (op); |
703 | break; | ||
725 | } | 704 | } |
726 | } | 705 | } |
727 | 706 | ||
@@ -736,9 +715,9 @@ handle_p2p_bf (void *cls, | |||
736 | * @return #GNUNET_YES (we should continue to iterate) | 715 | * @return #GNUNET_YES (we should continue to iterate) |
737 | */ | 716 | */ |
738 | static int | 717 | static int |
739 | initialize_map (void *cls, | 718 | initialize_map_unfiltered (void *cls, |
740 | const struct GNUNET_HashCode *key, | 719 | const struct GNUNET_HashCode *key, |
741 | void *value) | 720 | void *value) |
742 | { | 721 | { |
743 | struct ElementEntry *ee = value; | 722 | struct ElementEntry *ee = value; |
744 | struct Operation *op = cls; | 723 | struct Operation *op = cls; |
@@ -746,6 +725,9 @@ initialize_map (void *cls, | |||
746 | if ( (op->generation_created < ee->generation_removed) && | 725 | if ( (op->generation_created < ee->generation_removed) && |
747 | (op->generation_created >= ee->generation_added) ) | 726 | (op->generation_created >= ee->generation_added) ) |
748 | return GNUNET_YES; /* element not live in operation's generation */ | 727 | return GNUNET_YES; /* element not live in operation's generation */ |
728 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
729 | &ee->element_hash, | ||
730 | &op->state->my_xor); | ||
749 | GNUNET_break (GNUNET_YES == | 731 | GNUNET_break (GNUNET_YES == |
750 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, | 732 | GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, |
751 | &ee->element_hash, | 733 | &ee->element_hash, |
@@ -756,6 +738,48 @@ initialize_map (void *cls, | |||
756 | 738 | ||
757 | 739 | ||
758 | /** | 740 | /** |
741 | * Send our element count to the peer, in case our element count is | ||
742 | * lower than his. | ||
743 | * | ||
744 | * @param op intersection operation | ||
745 | */ | ||
746 | static void | ||
747 | send_element_count (struct Operation *op) | ||
748 | { | ||
749 | struct GNUNET_MQ_Envelope *ev; | ||
750 | struct IntersectionElementInfoMessage *msg; | ||
751 | |||
752 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
753 | "Sending our element count (bf_msg)\n"); | ||
754 | ev = GNUNET_MQ_msg (msg, | ||
755 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); | ||
756 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
757 | GNUNET_MQ_send (op->mq, ev); | ||
758 | } | ||
759 | |||
760 | |||
761 | /** | ||
762 | * We go first, initialize our map with all elements and | ||
763 | * send the first Bloom filter. | ||
764 | * | ||
765 | * @param op operation to start exchange for | ||
766 | */ | ||
767 | static void | ||
768 | begin_bf_exchange (struct Operation *op) | ||
769 | { | ||
770 | GNUNET_break (PHASE_INITIAL == op->state->phase); | ||
771 | op->state->phase = PHASE_BF_EXCHANGE; | ||
772 | op->state->my_elements | ||
773 | = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, | ||
774 | GNUNET_YES); | ||
775 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | ||
776 | &initialize_map_unfiltered, | ||
777 | op); | ||
778 | send_bloomfilter (op); | ||
779 | } | ||
780 | |||
781 | |||
782 | /** | ||
759 | * Handle the initial `struct IntersectionElementInfoMessage` from a | 783 | * Handle the initial `struct IntersectionElementInfoMessage` from a |
760 | * remote peer. | 784 | * remote peer. |
761 | * | 785 | * |
@@ -786,41 +810,8 @@ handle_p2p_element_info (void *cls, | |||
786 | fail_intersection_operation(op); | 810 | fail_intersection_operation(op); |
787 | return; | 811 | return; |
788 | } | 812 | } |
789 | 813 | GNUNET_break (NULL == op->state->remote_bf); | |
790 | op->state->phase = PHASE_BF_EXCHANGE; | 814 | begin_bf_exchange (op); |
791 | // FIXME... -- why a new map here!? | ||
792 | op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, | ||
793 | GNUNET_YES); | ||
794 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | ||
795 | &initialize_map, // FIXME: filtering!? | ||
796 | op); | ||
797 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); | ||
798 | op->state->remote_bf = NULL; | ||
799 | |||
800 | if (op->state->my_element_count == ntohl (msg->sender_element_count)) | ||
801 | op->state->phase = PHASE_MAYBE_FINISHED; | ||
802 | |||
803 | send_bloomfilter (op); | ||
804 | } | ||
805 | |||
806 | |||
807 | /** | ||
808 | * Send our element count to the peer, in case our element count is lower than his | ||
809 | * | ||
810 | * @param op intersection operation | ||
811 | */ | ||
812 | static void | ||
813 | send_element_count (struct Operation *op) | ||
814 | { | ||
815 | struct GNUNET_MQ_Envelope *ev; | ||
816 | struct IntersectionElementInfoMessage *msg; | ||
817 | |||
818 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
819 | "Sending our element count (bf_msg)\n"); | ||
820 | ev = GNUNET_MQ_msg (msg, | ||
821 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); | ||
822 | msg->sender_element_count = htonl (op->state->my_element_count); | ||
823 | GNUNET_MQ_send (op->mq, ev); | ||
824 | } | 815 | } |
825 | 816 | ||
826 | 817 | ||
@@ -850,6 +841,37 @@ finish_and_destroy (struct Operation *op) | |||
850 | 841 | ||
851 | 842 | ||
852 | /** | 843 | /** |
844 | * Remove all elements from our hashmap. | ||
845 | * | ||
846 | * @param cls closure with the `struct Operation *` | ||
847 | * @param key current key code | ||
848 | * @param value value in the hash map | ||
849 | * @return #GNUNET_YES (we should continue to iterate) | ||
850 | */ | ||
851 | static int | ||
852 | filter_all (void *cls, | ||
853 | const struct GNUNET_HashCode *key, | ||
854 | void *value) | ||
855 | { | ||
856 | struct Operation *op = cls; | ||
857 | struct ElementEntry *ee = value; | ||
858 | |||
859 | GNUNET_break (0 < op->state->my_element_count); | ||
860 | op->state->my_element_count--; | ||
861 | GNUNET_CRYPTO_hash_xor (&op->state->my_xor, | ||
862 | &ee->element_hash, | ||
863 | &op->state->my_xor); | ||
864 | GNUNET_assert (GNUNET_YES == | ||
865 | GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, | ||
866 | &ee->element_hash, | ||
867 | ee)); | ||
868 | send_client_removed_element (op, | ||
869 | &ee->element); | ||
870 | return GNUNET_YES; | ||
871 | } | ||
872 | |||
873 | |||
874 | /** | ||
853 | * Handle a done message from a remote peer | 875 | * Handle a done message from a remote peer |
854 | * | 876 | * |
855 | * @param cls the intersection operation | 877 | * @param cls the intersection operation |
@@ -860,17 +882,45 @@ handle_p2p_done (void *cls, | |||
860 | const struct GNUNET_MessageHeader *mh) | 882 | const struct GNUNET_MessageHeader *mh) |
861 | { | 883 | { |
862 | struct Operation *op = cls; | 884 | struct Operation *op = cls; |
885 | const struct IntersectionDoneMessage *idm; | ||
863 | 886 | ||
864 | if ( (op->state->phase = PHASE_FINISHED) || | 887 | if (PHASE_BF_EXCHANGE != op->state->phase) |
865 | (op->state->phase = PHASE_MAYBE_FINISHED) ) | ||
866 | { | 888 | { |
867 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 889 | /* wrong phase to conclude? FIXME: Or should we allow this |
868 | "Got final DONE\n"); | 890 | if the other peer has _initially_ already an empty set? */ |
869 | finish_and_destroy (op); | 891 | GNUNET_break_op (0); |
892 | fail_intersection_operation (op); | ||
893 | return; | ||
894 | } | ||
895 | if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage)) | ||
896 | { | ||
897 | GNUNET_break_op (0); | ||
898 | fail_intersection_operation (op); | ||
870 | return; | 899 | return; |
871 | } | 900 | } |
872 | GNUNET_break_op (0); | 901 | idm = (const struct IntersectionDoneMessage *) mh; |
873 | fail_intersection_operation (op); | 902 | if (0 == ntohl (idm->final_element_count)) |
903 | { | ||
904 | /* other peer determined empty set is the intersection, | ||
905 | remove all elements */ | ||
906 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | ||
907 | &filter_all, | ||
908 | op); | ||
909 | } | ||
910 | if ( (op->state->my_element_count != ntohl (idm->final_element_count)) || | ||
911 | (0 != memcmp (&op->state->my_xor, | ||
912 | &idm->element_xor_hash, | ||
913 | sizeof (struct GNUNET_HashCode))) ) | ||
914 | { | ||
915 | /* Other peer thinks we are done, but we disagree on the result! */ | ||
916 | GNUNET_break_op (0); | ||
917 | fail_intersection_operation (op); | ||
918 | return; | ||
919 | } | ||
920 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
921 | "Got final DONE\n"); | ||
922 | op->state->phase = PHASE_FINISHED; | ||
923 | finish_and_destroy (op); | ||
874 | } | 924 | } |
875 | 925 | ||
876 | 926 | ||
@@ -892,7 +942,6 @@ intersection_evaluate (struct Operation *op, | |||
892 | op->state = GNUNET_new (struct OperationState); | 942 | op->state = GNUNET_new (struct OperationState); |
893 | /* we started the operation, thus we have to send the operation request */ | 943 | /* we started the operation, thus we have to send the operation request */ |
894 | op->state->phase = PHASE_INITIAL; | 944 | op->state->phase = PHASE_INITIAL; |
895 | op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES); | ||
896 | op->state->my_element_count = op->spec->set->state->current_set_element_count; | 945 | op->state->my_element_count = op->spec->set->state->current_set_element_count; |
897 | 946 | ||
898 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 947 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -909,8 +958,6 @@ intersection_evaluate (struct Operation *op, | |||
909 | } | 958 | } |
910 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); | 959 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); |
911 | msg->app_id = op->spec->app_id; | 960 | msg->app_id = op->spec->app_id; |
912 | // FIXME: where does this 'salt' come from? | ||
913 | msg->salt = htonl (op->spec->salt); | ||
914 | msg->element_count = htonl (op->state->my_element_count); | 961 | msg->element_count = htonl (op->state->my_element_count); |
915 | GNUNET_MQ_send (op->mq, | 962 | GNUNET_MQ_send (op->mq, |
916 | ev); | 963 | ev); |
@@ -935,29 +982,23 @@ intersection_accept (struct Operation *op) | |||
935 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 982 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
936 | "Accepting set intersection operation\n"); | 983 | "Accepting set intersection operation\n"); |
937 | op->state = GNUNET_new (struct OperationState); | 984 | op->state = GNUNET_new (struct OperationState); |
985 | op->state->phase = PHASE_INITIAL; | ||
938 | op->state->my_element_count | 986 | op->state->my_element_count |
939 | = op->spec->set->state->current_set_element_count; | 987 | = op->spec->set->state->current_set_element_count; |
940 | op->state->my_elements | 988 | op->state->my_elements |
941 | = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count, | 989 | = GNUNET_CONTAINER_multihashmap_create |
942 | op->spec->remote_element_count), | 990 | (GNUNET_MIN (op->state->my_element_count, |
943 | GNUNET_YES); | 991 | op->spec->remote_element_count), |
992 | GNUNET_YES); | ||
944 | if (op->spec->remote_element_count < op->state->my_element_count) | 993 | if (op->spec->remote_element_count < op->state->my_element_count) |
945 | { | 994 | { |
946 | /* If the other peer (Alice) has fewer elements than us (Bob), | 995 | /* If the other peer (Alice) has fewer elements than us (Bob), |
947 | we just send the count as Alice should send the first BF */ | 996 | we just send the count as Alice should send the first BF */ |
948 | op->state->phase = PHASE_INITIAL; | ||
949 | send_element_count (op); | 997 | send_element_count (op); |
950 | return; | 998 | return; |
951 | } | 999 | } |
952 | /* We have fewer elements, so we start with the BF */ | 1000 | /* We have fewer elements, so we start with the BF */ |
953 | op->state->phase = PHASE_BF_EXCHANGE; | 1001 | begin_bf_exchange (op); |
954 | op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, | ||
955 | BLOOMFILTER_SIZE, | ||
956 | GNUNET_CONSTANTS_BLOOMFILTER_K); | ||
957 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, | ||
958 | &initialize_map, | ||
959 | op); | ||
960 | send_bloomfilter (op); | ||
961 | } | 1002 | } |
962 | 1003 | ||
963 | 1004 | ||
@@ -987,10 +1028,7 @@ intersection_handle_p2p_message (struct Operation *op, | |||
987 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: | 1028 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: |
988 | handle_p2p_bf (op, mh); | 1029 | handle_p2p_bf (op, mh); |
989 | break; | 1030 | break; |
990 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART: | 1031 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE: |
991 | handle_p2p_bf_part (op, mh); | ||
992 | break; | ||
993 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: | ||
994 | handle_p2p_done (op, mh); | 1032 | handle_p2p_done (op, mh); |
995 | break; | 1033 | break; |
996 | default: | 1034 | default: |
diff --git a/src/set/gnunet-service-set_protocol.h b/src/set/gnunet-service-set_protocol.h index f21259804..1f14c06b0 100644 --- a/src/set/gnunet-service-set_protocol.h +++ b/src/set/gnunet-service-set_protocol.h | |||
@@ -17,9 +17,9 @@ | |||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | 17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | |||
21 | /** | 20 | /** |
22 | * @author Florian Dold | 21 | * @author Florian Dold |
22 | * @author Christian Grothoff | ||
23 | * @file set/gnunet-service-set_protocol.h | 23 | * @file set/gnunet-service-set_protocol.h |
24 | * @brief Peer-to-Peer messages for gnunet set | 24 | * @brief Peer-to-Peer messages for gnunet set |
25 | */ | 25 | */ |
@@ -45,11 +45,6 @@ struct OperationRequestMessage | |||
45 | uint32_t operation GNUNET_PACKED; | 45 | uint32_t operation GNUNET_PACKED; |
46 | 46 | ||
47 | /** | 47 | /** |
48 | * Salt to use for this operation. | ||
49 | */ | ||
50 | uint32_t salt GNUNET_PACKED; | ||
51 | |||
52 | /** | ||
53 | * For Intersection: my element count | 48 | * For Intersection: my element count |
54 | */ | 49 | */ |
55 | uint32_t element_count GNUNET_PACKED; | 50 | uint32_t element_count GNUNET_PACKED; |
@@ -126,27 +121,28 @@ struct BFMessage | |||
126 | struct GNUNET_MessageHeader header; | 121 | struct GNUNET_MessageHeader header; |
127 | 122 | ||
128 | /** | 123 | /** |
129 | * mutator used with this bloomfilter. | 124 | * Number of elements the sender still has in the set. |
130 | */ | 125 | */ |
131 | uint32_t sender_element_count GNUNET_PACKED; | 126 | uint32_t sender_element_count GNUNET_PACKED; |
132 | 127 | ||
133 | /** | 128 | /** |
134 | * mutator used with this bloomfilter. | 129 | * XOR of all hashes over all elements remaining in the set. |
130 | * Used to determine termination. | ||
135 | */ | 131 | */ |
136 | uint32_t sender_mutator GNUNET_PACKED; | 132 | struct GNUNET_HashCode element_xor_hash; |
137 | 133 | ||
138 | /** | 134 | /** |
139 | * Length of the bloomfilter data | 135 | * Mutator used with this bloomfilter. |
140 | */ | 136 | */ |
141 | uint32_t bloomfilter_total_length GNUNET_PACKED; | 137 | uint32_t sender_mutator GNUNET_PACKED; |
142 | 138 | ||
143 | /** | 139 | /** |
144 | * Length of the appended bloomfilter data block | 140 | * Total length of the bloomfilter data. |
145 | */ | 141 | */ |
146 | uint32_t bloomfilter_length GNUNET_PACKED; | 142 | uint32_t bloomfilter_total_length GNUNET_PACKED; |
147 | 143 | ||
148 | /** | 144 | /** |
149 | * Length of the bloomfilter data | 145 | * Number of bits (k-value) used in encoding the bloomfilter. |
150 | */ | 146 | */ |
151 | uint32_t bits_per_element GNUNET_PACKED; | 147 | uint32_t bits_per_element GNUNET_PACKED; |
152 | 148 | ||
@@ -156,26 +152,28 @@ struct BFMessage | |||
156 | }; | 152 | }; |
157 | 153 | ||
158 | 154 | ||
159 | struct BFPart | 155 | /** |
156 | * Last message, send to confirm the final set. Contains the element | ||
157 | * count as it is possible that the peer determined that we were done | ||
158 | * by getting the empty set, which in that case also needs to be | ||
159 | * communicated. | ||
160 | */ | ||
161 | struct IntersectionDoneMessage | ||
160 | { | 162 | { |
161 | /** | 163 | /** |
162 | * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF | 164 | * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE |
163 | */ | 165 | */ |
164 | struct GNUNET_MessageHeader header; | 166 | struct GNUNET_MessageHeader header; |
165 | 167 | ||
166 | /** | 168 | /** |
167 | * Length of the appended bloomfilter data block | 169 | * Final number of elements in intersection. |
168 | */ | 170 | */ |
169 | uint32_t chunk_length GNUNET_PACKED; | 171 | uint32_t final_element_count GNUNET_PACKED; |
170 | 172 | ||
171 | /** | 173 | /** |
172 | * offset in the bloolfilter data block, if multipart message | 174 | * XOR of all hashes over all elements remaining in the set. |
173 | */ | ||
174 | uint32_t chunk_offset GNUNET_PACKED; | ||
175 | |||
176 | /** | ||
177 | * rest: the sender's bloomfilter | ||
178 | */ | 175 | */ |
176 | struct GNUNET_HashCode element_xor_hash; | ||
179 | }; | 177 | }; |
180 | 178 | ||
181 | GNUNET_NETWORK_STRUCT_END | 179 | GNUNET_NETWORK_STRUCT_END |
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index b1f65ddcf..459996eca 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -170,8 +170,7 @@ struct OperationState | |||
170 | 170 | ||
171 | 171 | ||
172 | /** | 172 | /** |
173 | * The key entry is used to associate an ibf key with | 173 | * The key entry is used to associate an ibf key with an element. |
174 | * an element. | ||
175 | */ | 174 | */ |
176 | struct KeyEntry | 175 | struct KeyEntry |
177 | { | 176 | { |
@@ -316,8 +315,8 @@ fail_union_operation (struct Operation *op) | |||
316 | struct GNUNET_MQ_Envelope *ev; | 315 | struct GNUNET_MQ_Envelope *ev; |
317 | struct GNUNET_SET_ResultMessage *msg; | 316 | struct GNUNET_SET_ResultMessage *msg; |
318 | 317 | ||
319 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n"); | 318 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
320 | 319 | "union operation failed\n"); | |
321 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 320 | ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
322 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 321 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
323 | msg->request_id = htonl (op->spec->client_request_id); | 322 | msg->request_id = htonl (op->spec->client_request_id); |
@@ -357,8 +356,7 @@ get_ibf_key (const struct GNUNET_HashCode *src, | |||
357 | * @param cls closure | 356 | * @param cls closure |
358 | * @param key current key code | 357 | * @param key current key code |
359 | * @param value value in the hash map | 358 | * @param value value in the hash map |
360 | * @return #GNUNET_YES if we should continue to | 359 | * @return #GNUNET_YES if we should continue to iterate, |
361 | * iterate, | ||
362 | * #GNUNET_NO if not. | 360 | * #GNUNET_NO if not. |
363 | */ | 361 | */ |
364 | static int | 362 | static int |
@@ -390,9 +388,7 @@ op_register_element_iterator (void *cls, | |||
390 | * @param cls closure | 388 | * @param cls closure |
391 | * @param key current key code | 389 | * @param key current key code |
392 | * @param value value in the hash map | 390 | * @param value value in the hash map |
393 | * @return #GNUNET_YES if we should continue to | 391 | * @return #GNUNET_YES (we should continue to iterate) |
394 | * iterate, | ||
395 | * #GNUNET_NO if not. | ||
396 | */ | 392 | */ |
397 | static int | 393 | static int |
398 | op_has_element_iterator (void *cls, | 394 | op_has_element_iterator (void *cls, |
@@ -405,7 +401,8 @@ op_has_element_iterator (void *cls, | |||
405 | GNUNET_assert (NULL != k); | 401 | GNUNET_assert (NULL != k); |
406 | while (NULL != k) | 402 | while (NULL != k) |
407 | { | 403 | { |
408 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash)) | 404 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, |
405 | element_hash)) | ||
409 | return GNUNET_NO; | 406 | return GNUNET_NO; |
410 | k = k->next_colliding; | 407 | k = k->next_colliding; |
411 | } | 408 | } |
@@ -506,12 +503,11 @@ prepare_ibf_iterator (void *cls, | |||
506 | * Iterator for initializing the | 503 | * Iterator for initializing the |
507 | * key-to-element mapping of a union operation | 504 | * key-to-element mapping of a union operation |
508 | * | 505 | * |
509 | * @param cls the union operation | 506 | * @param cls the union operation `struct Operation *` |
510 | * @param key unised | 507 | * @param key unused |
511 | * @param value the element entry to insert | 508 | * @param value the `struct ElementEntry *` to insert |
512 | * into the key-to-element mapping | 509 | * into the key-to-element mapping |
513 | * @return GNUNET_YES to continue iterating, | 510 | * @return #GNUNET_YES (to continue iterating) |
514 | * GNUNET_NO to stop | ||
515 | */ | 511 | */ |
516 | static int | 512 | static int |
517 | init_key_to_element_iterator (void *cls, | 513 | init_key_to_element_iterator (void *cls, |
@@ -543,7 +539,8 @@ init_key_to_element_iterator (void *cls, | |||
543 | * @param size size of the ibf to create | 539 | * @param size size of the ibf to create |
544 | */ | 540 | */ |
545 | static void | 541 | static void |
546 | prepare_ibf (struct Operation *op, uint16_t size) | 542 | prepare_ibf (struct Operation *op, |
543 | uint16_t size) | ||
547 | { | 544 | { |
548 | if (NULL == op->state->key_to_element) | 545 | if (NULL == op->state->key_to_element) |
549 | { | 546 | { |
@@ -557,7 +554,8 @@ prepare_ibf (struct Operation *op, uint16_t size) | |||
557 | ibf_destroy (op->state->local_ibf); | 554 | ibf_destroy (op->state->local_ibf); |
558 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 555 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
559 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, | 556 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, |
560 | prepare_ibf_iterator, op->state->local_ibf); | 557 | &prepare_ibf_iterator, |
558 | op->state->local_ibf); | ||
561 | } | 559 | } |
562 | 560 | ||
563 | 561 | ||
@@ -568,7 +566,8 @@ prepare_ibf (struct Operation *op, uint16_t size) | |||
568 | * @param ibf_order order of the ibf to send, size=2^order | 566 | * @param ibf_order order of the ibf to send, size=2^order |
569 | */ | 567 | */ |
570 | static void | 568 | static void |
571 | send_ibf (struct Operation *op, uint16_t ibf_order) | 569 | send_ibf (struct Operation *op, |
570 | uint16_t ibf_order) | ||
572 | { | 571 | { |
573 | unsigned int buckets_sent = 0; | 572 | unsigned int buckets_sent = 0; |
574 | struct InvertibleBloomFilter *ibf; | 573 | struct InvertibleBloomFilter *ibf; |
@@ -647,7 +646,8 @@ get_order_from_difference (unsigned int diff) | |||
647 | unsigned int ibf_order; | 646 | unsigned int ibf_order; |
648 | 647 | ||
649 | ibf_order = 2; | 648 | ibf_order = 2; |
650 | while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM) | 649 | while ( (1<<ibf_order) < (IBF_ALPHA * diff) || |
650 | ((1<<ibf_order) < SE_IBF_HASH_NUM) ) | ||
651 | ibf_order++; | 651 | ibf_order++; |
652 | if (ibf_order > MAX_IBF_ORDER) | 652 | if (ibf_order > MAX_IBF_ORDER) |
653 | ibf_order = MAX_IBF_ORDER; | 653 | ibf_order = MAX_IBF_ORDER; |
@@ -662,7 +662,8 @@ get_order_from_difference (unsigned int diff) | |||
662 | * @param mh the message | 662 | * @param mh the message |
663 | */ | 663 | */ |
664 | static void | 664 | static void |
665 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | 665 | handle_p2p_strata_estimator (void *cls, |
666 | const struct GNUNET_MessageHeader *mh) | ||
666 | { | 667 | { |
667 | struct Operation *op = cls; | 668 | struct Operation *op = cls; |
668 | struct StrataEstimator *remote_se; | 669 | struct StrataEstimator *remote_se; |
@@ -843,7 +844,7 @@ decode_and_send (struct Operation *op) | |||
843 | 844 | ||
844 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 845 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
845 | "transmitted all values, sending DONE\n"); | 846 | "transmitted all values, sending DONE\n"); |
846 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 847 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); |
847 | GNUNET_MQ_send (op->mq, ev); | 848 | GNUNET_MQ_send (op->mq, ev); |
848 | break; | 849 | break; |
849 | } | 850 | } |
@@ -1201,7 +1202,7 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1201 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1202 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1202 | "got DONE, sending final DONE after elements\n"); | 1203 | "got DONE, sending final DONE after elements\n"); |
1203 | op->state->phase = PHASE_FINISHED; | 1204 | op->state->phase = PHASE_FINISHED; |
1204 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 1205 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); |
1205 | GNUNET_MQ_send (op->mq, ev); | 1206 | GNUNET_MQ_send (op->mq, ev); |
1206 | return; | 1207 | return; |
1207 | } | 1208 | } |
@@ -1251,7 +1252,6 @@ union_evaluate (struct Operation *op, | |||
1251 | } | 1252 | } |
1252 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); | 1253 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); |
1253 | msg->app_id = op->spec->app_id; | 1254 | msg->app_id = op->spec->app_id; |
1254 | msg->salt = htonl (op->spec->salt); | ||
1255 | GNUNET_MQ_send (op->mq, ev); | 1255 | GNUNET_MQ_send (op->mq, ev); |
1256 | 1256 | ||
1257 | if (NULL != opaque_context) | 1257 | if (NULL != opaque_context) |
@@ -1379,7 +1379,7 @@ union_handle_p2p_message (struct Operation *op, | |||
1379 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: | 1379 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: |
1380 | handle_p2p_element_requests (op, mh); | 1380 | handle_p2p_element_requests (op, mh); |
1381 | break; | 1381 | break; |
1382 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: | 1382 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: |
1383 | handle_p2p_done (op, mh); | 1383 | handle_p2p_done (op, mh); |
1384 | break; | 1384 | break; |
1385 | default: | 1385 | default: |