aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2014-11-28 20:52:20 +0000
committerChristian Grothoff <christian@grothoff.org>2014-11-28 20:52:20 +0000
commitf4c9c6514494547973b8962f22fce8266afd4992 (patch)
tree2211f91a0e80f143cb066ca365c6ddcbb02759cf /src/set
parent264c9b7f668d0429eaae01075c742ae6ad6f49ee (diff)
downloadgnunet-f4c9c6514494547973b8962f22fce8266afd4992.tar.gz
gnunet-f4c9c6514494547973b8962f22fce8266afd4992.zip
-fixing misc issues and bugs, including better termination logic for intersection and salt handling
Diffstat (limited to 'src/set')
-rw-r--r--src/set/gnunet-service-set.c4
-rw-r--r--src/set/gnunet-service-set_intersection.c588
-rw-r--r--src/set/gnunet-service-set_protocol.h46
-rw-r--r--src/set/gnunet-service-set_union.c48
4 files changed, 361 insertions, 325 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index d5c02d69b..8dc111a4c 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -1498,12 +1498,12 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
1498 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, 1498 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
1499 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, 1499 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
1500 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, 1500 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
1501 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, 1501 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0},
1502 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, 1502 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
1503 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, 1503 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
1504 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, 1504 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
1505 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, 1505 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
1506 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART, 0}, 1506 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0},
1507 {NULL, 0, 0} 1507 {NULL, 0, 0}
1508 }; 1508 };
1509 static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; 1509 static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 19d6498f5..fb3cd23b5 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -21,6 +21,7 @@
21 * @file set/gnunet-service-set_intersection.c 21 * @file set/gnunet-service-set_intersection.c
22 * @brief two-peer set intersection 22 * @brief two-peer set intersection
23 * @author Christian Fuchs 23 * @author Christian Fuchs
24 * @author Christian Grothoff
24 */ 25 */
25#include "platform.h" 26#include "platform.h"
26#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
@@ -29,24 +30,6 @@
29#include "gnunet-service-set_protocol.h" 30#include "gnunet-service-set_protocol.h"
30#include <gcrypt.h> 31#include <gcrypt.h>
31 32
32#define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH
33
34/**
35 * Calculate the size of the bloom filter.
36 *
37 * @param A
38 * @param B
39 * @param s
40 * @param k
41 * @return
42 */
43#define CALCULATE_BF_SIZE(A, B, s, k) \
44 do { \
45 k = ceil(1 + log2((double) (2*B / (double) A)));\
46 if (k<1) k=1; /* k can be calculated as 0 */\
47 s = ceil((double) (A * k / log(2))); \
48 } while (0)
49
50 33
51/** 34/**
52 * Current phase we are in for a intersection operation. 35 * Current phase we are in for a intersection operation.
@@ -61,21 +44,13 @@ enum IntersectionOperationPhase
61 44
62 /** 45 /**
63 * Bob has accepted the operation, Bob and Alice are now exchanging bfs 46 * Bob has accepted the operation, Bob and Alice are now exchanging bfs
64 * until one notices the their element count is equal 47 * until one notices the their element hashes are equal.
65 */ 48 */
66 PHASE_BF_EXCHANGE, 49 PHASE_BF_EXCHANGE,
67 50
68 /** 51 /**
69 * if both peers have an equal peercount, they enter this state for 52 * The protocol is over. Results may still have to be sent to the
70 * one more turn, to see if they actually have agreed on a correct set. 53 * client.
71 * if a peer finds the same element count after the next iteration,
72 * it ends the the session
73 */
74 PHASE_MAYBE_FINISHED,
75
76 /**
77 * The protocol is over.
78 * Results may still have to be sent to the client.
79 */ 54 */
80 PHASE_FINISHED 55 PHASE_FINISHED
81}; 56};
@@ -118,12 +93,33 @@ struct OperationState
118 struct OperationState *prev; 93 struct OperationState *prev;
119 94
120 /** 95 /**
121 * for multipart msgs we have to store the bloomfilter-data until we fully sent it. 96 * For multipart BF transmissions, we have to store the
97 * bloomfilter-data until we fully received it.
122 */ 98 */
123 char *bf_data; 99 char *bf_data;
124 100
125 /** 101 /**
126 * Current element count contained within @e my_elements 102 * XOR of the keys of all of the elements (remaining) in my set.
103 * Always updated when elements are added or removed to
104 * @e my_elements.
105 */
106 struct GNUNET_HashCode my_xor;
107
108 /**
109 * XOR of the keys of all of the elements (remaining) in
110 * the other peer's set. Updated when we receive the
111 * other peer's Bloom filter.
112 */
113 struct GNUNET_HashCode other_xor;
114
115 /**
116 * How many bytes of @e bf_data are valid?
117 */
118 uint32_t bf_data_offset;
119
120 /**
121 * Current element count contained within @e my_elements.
122 * (May differ briefly during initialization.)
127 */ 123 */
128 uint32_t my_element_count; 124 uint32_t my_element_count;
129 125
@@ -138,6 +134,12 @@ struct OperationState
138 uint32_t bf_bits_per_element; 134 uint32_t bf_bits_per_element;
139 135
140 /** 136 /**
137 * Salt currently used for BF construction (by us or the other peer,
138 * depending on where we are in the code).
139 */
140 uint32_t salt;
141
142 /**
141 * Current state of the operation. 143 * Current state of the operation.
142 */ 144 */
143 enum IntersectionOperationPhase phase; 145 enum IntersectionOperationPhase phase;
@@ -162,7 +164,8 @@ struct OperationState
162struct SetState 164struct SetState
163{ 165{
164 /** 166 /**
165 * Number of currently valid elements in the set which have not been removed 167 * Number of currently valid elements in the set which have not been
168 * removed.
166 */ 169 */
167 uint32_t current_set_element_count; 170 uint32_t current_set_element_count;
168}; 171};
@@ -208,8 +211,7 @@ send_client_removed_element (struct Operation *op,
208 211
209 212
210/** 213/**
211 * Fills the "my_elements" hashmap with all relevant elements and 214 * Fills the "my_elements" hashmap with all relevant elements.
212 * adds their mutated hashes to our local bloomfilter with mutator+1.
213 * 215 *
214 * @param cls the `struct Operation *` we are performing 216 * @param cls the `struct Operation *` we are performing
215 * @param key current key code 217 * @param key current key code
@@ -217,9 +219,9 @@ send_client_removed_element (struct Operation *op,
217 * @return #GNUNET_YES (we should continue to iterate) 219 * @return #GNUNET_YES (we should continue to iterate)
218 */ 220 */
219static int 221static int
220filtered_map_and_bf_initialization (void *cls, 222filtered_map_initialization (void *cls,
221 const struct GNUNET_HashCode *key, 223 const struct GNUNET_HashCode *key,
222 void *value) 224 void *value)
223{ 225{
224 struct Operation *op = cls; 226 struct Operation *op = cls;
225 struct ElementEntry *ee = value; 227 struct ElementEntry *ee = value;
@@ -230,9 +232,8 @@ filtered_map_and_bf_initialization (void *cls,
230 return GNUNET_YES; /* element not valid in our operation's generation */ 232 return GNUNET_YES; /* element not valid in our operation's generation */
231 233
232 /* Test if element is in Bob's bloomfilter */ 234 /* Test if element is in Bob's bloomfilter */
233 // FIXME: where does this salt come from!?
234 GNUNET_BLOCK_mingle_hash (&ee->element_hash, 235 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
235 op->spec->salt, 236 op->state->salt,
236 &mutated_hash); 237 &mutated_hash);
237 if (GNUNET_NO == 238 if (GNUNET_NO ==
238 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, 239 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
@@ -244,6 +245,9 @@ filtered_map_and_bf_initialization (void *cls,
244 return GNUNET_YES; 245 return GNUNET_YES;
245 } 246 }
246 op->state->my_element_count++; 247 op->state->my_element_count++;
248 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
249 &ee->element_hash,
250 &op->state->my_xor);
247 GNUNET_break (GNUNET_YES == 251 GNUNET_break (GNUNET_YES ==
248 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, 252 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
249 &ee->element_hash, 253 &ee->element_hash,
@@ -272,15 +276,18 @@ iterator_bf_reduce (void *cls,
272 struct ElementEntry *ee = value; 276 struct ElementEntry *ee = value;
273 struct GNUNET_HashCode mutated_hash; 277 struct GNUNET_HashCode mutated_hash;
274 278
275 // FIXME: where does this salt come from!?
276 GNUNET_BLOCK_mingle_hash (&ee->element_hash, 279 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
277 op->spec->salt, 280 op->state->salt,
278 &mutated_hash); 281 &mutated_hash);
279 if (GNUNET_NO == 282 if (GNUNET_NO ==
280 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, 283 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
281 &mutated_hash)) 284 &mutated_hash))
282 { 285 {
286 GNUNET_break (0 < op->state->my_element_count);
283 op->state->my_element_count--; 287 op->state->my_element_count--;
288 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
289 &ee->element_hash,
290 &op->state->my_xor);
284 GNUNET_assert (GNUNET_YES == 291 GNUNET_assert (GNUNET_YES ==
285 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, 292 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
286 &ee->element_hash, 293 &ee->element_hash,
@@ -309,9 +316,8 @@ iterator_bf_create (void *cls,
309 struct ElementEntry *ee = value; 316 struct ElementEntry *ee = value;
310 struct GNUNET_HashCode mutated_hash; 317 struct GNUNET_HashCode mutated_hash;
311 318
312 // FIXME: where does this salt come from!?
313 GNUNET_BLOCK_mingle_hash (&ee->element_hash, 319 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
314 op->spec->salt, 320 op->state->salt,
315 &mutated_hash); 321 &mutated_hash);
316 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, 322 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
317 &mutated_hash); 323 &mutated_hash);
@@ -350,49 +356,6 @@ fail_intersection_operation (struct Operation *op)
350} 356}
351 357
352 358
353
354
355
356
357
358/**
359 *
360 * @param op
361 * @param offset
362 */
363static void
364send_bloomfilter_multipart (struct Operation *op,
365 uint32_t offset)
366{
367 struct GNUNET_MQ_Envelope *ev;
368 struct BFPart *msg;
369 uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart));
370 uint32_t todo_size = op->state->bf_data_size - offset;
371
372 if (todo_size < chunk_size)
373 chunk_size = todo_size;
374
375 ev = GNUNET_MQ_msg_extra (msg,
376 chunk_size,
377 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART);
378
379 msg->chunk_length = htonl (chunk_size);
380 msg->chunk_offset = htonl (offset);
381 memcpy(&msg[1], &op->state->bf_data[offset], chunk_size);
382
383 GNUNET_MQ_send (op->mq, ev);
384
385 if (op->state->bf_data_size == offset + chunk_size)
386 {
387 // done
388 GNUNET_free(op->state->bf_data);
389 op->state->bf_data = NULL;
390 return;
391 }
392 send_bloomfilter_multipart (op, offset + chunk_size);
393}
394
395
396/** 359/**
397 * Send a bloomfilter to our peer. After the result done message has 360 * Send a bloomfilter to our peer. After the result done message has
398 * been sent to the client, destroy the evaluate operation. 361 * been sent to the client, destroy the evaluate operation.
@@ -408,64 +371,83 @@ send_bloomfilter (struct Operation *op)
408 uint32_t bf_elementbits; 371 uint32_t bf_elementbits;
409 uint32_t chunk_size; 372 uint32_t chunk_size;
410 struct GNUNET_CONTAINER_BloomFilter *local_bf; 373 struct GNUNET_CONTAINER_BloomFilter *local_bf;
411 374 char *bf_data;
375 uint32_t offset;
376
377 /* We consider the ratio of the set sizes to determine
378 the number of bits per element, as the smaller set
379 should use more bits to maximize its set reduction
380 potential and minimize overall bandwidth consumption. */
381 bf_elementbits = 2 + ceil (log2((double)
382 (op->spec->remote_element_count /
383 (double) op->state->my_element_count)));
384 if (bf_elementbits < 1)
385 bf_elementbits = 1; /* make sure k is not 0 */
386 /* optimize BF-size to ~50% of bits set */
387 bf_size = ceil ((double) (op->state->my_element_count
388 * bf_elementbits / log(2)));
412 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 389 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413 "sending bf of size %u\n"); 390 "Sending bf of size %u\n",
414 391 (unsigned int) bf_size);
415 CALCULATE_BF_SIZE(op->state->my_element_count,
416 op->spec->remote_element_count,
417 bf_size,
418 bf_elementbits);
419
420 local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 392 local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
421 bf_size, 393 bf_size,
422 bf_elementbits); 394 bf_elementbits);
423 395 op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
424 op->spec->salt++; 396 UINT32_MAX);
425 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, 397 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
426 &iterator_bf_create, 398 &iterator_bf_create,
427 op); 399 op);
428 400
429 // send our bloomfilter 401 /* send our Bloom filter */
430 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) 402 chunk_size = 60 * 1024 - sizeof (struct BFMessage);
403 if (bf_size <= chunk_size)
431 { 404 {
432 // singlepart 405 /* singlepart */
433 chunk_size = bf_size; 406 chunk_size = bf_size;
434 ev = GNUNET_MQ_msg_extra (msg, 407 ev = GNUNET_MQ_msg_extra (msg,
435 chunk_size, 408 chunk_size,
436 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); 409 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
437 GNUNET_assert (GNUNET_SYSERR != 410 GNUNET_assert (GNUNET_SYSERR !=
438 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, 411 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
439 (char*)&msg[1], 412 (char*) &msg[1],
440 bf_size)); 413 bf_size));
414 msg->sender_element_count = htonl (op->state->my_element_count);
415 msg->bloomfilter_total_length = htonl (bf_size);
416 msg->bits_per_element = htonl (bf_elementbits);
417 msg->sender_mutator = htonl (op->state->salt);
418 msg->element_xor_hash = op->state->my_xor;
419 GNUNET_MQ_send (op->mq, ev);
441 } 420 }
442 else 421 else
443 { 422 {
444 //multipart 423 /* multipart */
445 chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage); 424 bf_data = GNUNET_malloc (bf_size);
446 ev = GNUNET_MQ_msg_extra (msg,
447 chunk_size,
448 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
449 op->state->bf_data = (char *) GNUNET_malloc (bf_size);
450 GNUNET_assert (GNUNET_SYSERR != 425 GNUNET_assert (GNUNET_SYSERR !=
451 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, 426 GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf,
452 op->state->bf_data, 427 bf_data,
453 bf_size)); 428 bf_size));
454 memcpy (&msg[1], op->state->bf_data, chunk_size); 429 offset = 0;
455 op->state->bf_data_size = bf_size; 430 while (offset < bf_size)
431 {
432 if (bf_size - chunk_size < offset)
433 chunk_size = bf_size - offset;
434 ev = GNUNET_MQ_msg_extra (msg,
435 chunk_size,
436 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
437 memcpy (&msg[1],
438 &bf_data[offset],
439 chunk_size);
440 offset += chunk_size;
441 msg->sender_element_count = htonl (op->state->my_element_count);
442 msg->bloomfilter_total_length = htonl (bf_size);
443 msg->bits_per_element = htonl (bf_elementbits);
444 msg->sender_mutator = htonl (op->state->salt);
445 msg->element_xor_hash = op->state->my_xor;
446 GNUNET_MQ_send (op->mq, ev);
447 }
448 GNUNET_free (bf_data);
456 } 449 }
457 GNUNET_CONTAINER_bloomfilter_free (local_bf); 450 GNUNET_CONTAINER_bloomfilter_free (local_bf);
458
459 msg->sender_element_count = htonl (op->state->my_element_count);
460 msg->bloomfilter_total_length = htonl (bf_size);
461 msg->bloomfilter_length = htonl (chunk_size);
462 msg->bits_per_element = htonl (bf_elementbits);
463 msg->sender_mutator = htonl (op->spec->salt);
464
465 GNUNET_MQ_send (op->mq, ev);
466
467 if (op->state->bf_data)
468 send_bloomfilter_multipart (op, chunk_size);
469} 451}
470 452
471 453
@@ -553,6 +535,7 @@ static void
553send_peer_done (struct Operation *op) 535send_peer_done (struct Operation *op)
554{ 536{
555 struct GNUNET_MQ_Envelope *ev; 537 struct GNUNET_MQ_Envelope *ev;
538 struct IntersectionDoneMessage *idm;
556 539
557 op->state->phase = PHASE_FINISHED; 540 op->state->phase = PHASE_FINISHED;
558 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 541 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -560,8 +543,12 @@ send_peer_done (struct Operation *op)
560 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); 543 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
561 op->state->local_bf = NULL; 544 op->state->local_bf = NULL;
562 545
563 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 546 ev = GNUNET_MQ_msg (idm,
564 GNUNET_MQ_send (op->mq, ev); 547 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
548 idm->final_element_count = htonl (op->state->my_element_count);
549 idm->element_xor_hash = op->state->my_xor;
550 GNUNET_MQ_send (op->mq,
551 ev);
565} 552}
566 553
567 554
@@ -573,107 +560,49 @@ send_peer_done (struct Operation *op)
573static void 560static void
574process_bf (struct Operation *op) 561process_bf (struct Operation *op)
575{ 562{
576 uint32_t old_elements;
577 uint32_t peer_elements;
578
579 old_elements = op->state->my_element_count;
580 peer_elements = op->spec->remote_element_count;
581 switch (op->state->phase) 563 switch (op->state->phase)
582 { 564 {
583 case PHASE_INITIAL: 565 case PHASE_INITIAL:
584 /* This is the first BF being sent, build our 566 /* This is the first BF being sent, build our initial map with
585 initial map with filtering in place */ 567 filtering in place */
586 op->state->my_elements 568 op->state->my_elements
587 = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, 569 = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
588 GNUNET_YES); 570 GNUNET_YES);
571 GNUNET_break (0 == op->state->my_element_count);
589 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, 572 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
590 &filtered_map_and_bf_initialization, 573 &filtered_map_initialization,
591 op); 574 op);
592 break; 575 break;
593 case PHASE_BF_EXCHANGE: 576 case PHASE_BF_EXCHANGE:
594 case PHASE_MAYBE_FINISHED:
595 /* Update our set by reduction */ 577 /* Update our set by reduction */
596 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, 578 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
597 &iterator_bf_reduce, 579 &iterator_bf_reduce,
598 op); 580 op);
599 break; 581 break;
600 default: 582 case PHASE_FINISHED:
601 GNUNET_break_op (0); 583 GNUNET_break_op (0);
602 fail_intersection_operation(op); 584 fail_intersection_operation(op);
585 return;
603 } 586 }
604 // the iterators created a new BF with salt+1
605 // the peer needs this information for decoding the next BF
606 // this behavior can be modified at will later on.
607 op->spec->salt++;
608
609 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); 587 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
610 op->state->remote_bf = NULL; 588 op->state->remote_bf = NULL;
611 589
612 if ((0 == op->state->my_element_count) // fully disjoint 590 if ( (0 == op->state->my_element_count) || /* fully disjoint */
613 || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements 591 ( (op->state->my_element_count == op->spec->remote_element_count) &&
614 && (old_elements == op->state->my_element_count) 592 (0 == memcmp (&op->state->my_xor,
615 && (op->state->my_element_count == peer_elements))) 593 &op->state->other_xor,
594 sizeof (struct GNUNET_HashCode))) ) )
616 { 595 {
617 // In the last round we though we were finished, we now know this is correct 596 /* we are done */
618 send_peer_done (op); 597 send_peer_done (op);
619 return; 598 return;
620 } 599 }
621
622 op->state->phase = PHASE_BF_EXCHANGE; 600 op->state->phase = PHASE_BF_EXCHANGE;
623 if (op->state->my_element_count == peer_elements)
624 // maybe we are finished, but we do one more round to make certain
625 // we don't have false positives ...
626 op->state->phase = PHASE_MAYBE_FINISHED;
627
628 send_bloomfilter (op); 601 send_bloomfilter (op);
629} 602}
630 603
631 604
632/** 605/**
633 * Handle an BF multipart message from a remote peer.
634 *
635 * @param cls the intersection operation
636 * @param mh the header of the message
637 */
638static void
639handle_p2p_bf_part (void *cls,
640 const struct GNUNET_MessageHeader *mh)
641{
642 struct Operation *op = cls;
643 const struct BFPart *msg = (const struct BFPart *) mh;
644 uint32_t chunk_size;
645 uint32_t chunk_offset;
646
647 chunk_size = ntohl(msg->chunk_length);
648 chunk_offset = ntohl(msg->chunk_offset);
649
650 if ((NULL == op->state->bf_data)
651 || (op->state->bf_data_size < chunk_size + chunk_offset))
652 {
653 // unexpected multipart chunk
654 GNUNET_break_op (0);
655 fail_intersection_operation(op);
656 return;
657 }
658
659 memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size);
660
661 if (op->state->bf_data_size != chunk_offset + chunk_size)
662 // wait for next chunk
663 return;
664
665 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
666 op->state->bf_data_size,
667 op->state->bf_bits_per_element);
668
669 GNUNET_free (op->state->bf_data);
670 op->state->bf_data = NULL;
671
672 process_bf (op);
673}
674
675
676/**
677 * Handle an BF message from a remote peer. 606 * Handle an BF message from a remote peer.
678 * 607 *
679 * @param cls the intersection operation 608 * @param cls the intersection operation
@@ -684,44 +613,94 @@ handle_p2p_bf (void *cls,
684 const struct GNUNET_MessageHeader *mh) 613 const struct GNUNET_MessageHeader *mh)
685{ 614{
686 struct Operation *op = cls; 615 struct Operation *op = cls;
687 const struct BFMessage *msg = (const struct BFMessage *) mh; 616 const struct BFMessage *msg;
688 uint32_t bf_size; 617 uint32_t bf_size;
689 uint32_t chunk_size; 618 uint32_t chunk_size;
690 uint32_t bf_bits_per_element; 619 uint32_t bf_bits_per_element;
620 uint16_t msize;
691 621
622 msize = htons (mh->size);
623 if (msize < sizeof (struct BFMessage))
624 {
625 GNUNET_break_op (0);
626 fail_intersection_operation (op);
627 return;
628 }
629 msg = (const struct BFMessage *) mh;
692 switch (op->state->phase) 630 switch (op->state->phase)
693 { 631 {
694 case PHASE_INITIAL: 632 case PHASE_INITIAL:
633 GNUNET_break_op (0);
634 fail_intersection_operation (op);
635 break;
695 case PHASE_BF_EXCHANGE: 636 case PHASE_BF_EXCHANGE:
696 case PHASE_MAYBE_FINISHED: 637 bf_size = ntohl (msg->bloomfilter_total_length);
697 if (NULL == op->state->bf_data) 638 bf_bits_per_element = ntohl (msg->bits_per_element);
639 chunk_size = msize - sizeof (struct BFMessage);
640 op->state->other_xor = msg->element_xor_hash;
641 if (bf_size == chunk_size)
698 { 642 {
699 // no colliding multipart transaction going on currently 643 if (NULL != op->state->bf_data)
700 op->spec->salt = ntohl (msg->sender_mutator);
701 bf_size = ntohl (msg->bloomfilter_total_length);
702 bf_bits_per_element = ntohl (msg->bits_per_element);
703 chunk_size = ntohl (msg->bloomfilter_length);
704 op->spec->remote_element_count = ntohl(msg->sender_element_count);
705 if (bf_size == chunk_size)
706 { 644 {
707 // single part, done here 645 GNUNET_break_op (0);
708 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], 646 fail_intersection_operation (op);
709 bf_size,
710 bf_bits_per_element);
711 process_bf (op);
712 return; 647 return;
713 } 648 }
714 649 /* single part, done here immediately */
715 //first multipart chunk 650 op->state->remote_bf
651 = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
652 bf_size,
653 bf_bits_per_element);
654 op->state->salt = ntohl (msg->sender_mutator);
655 process_bf (op);
656 return;
657 }
658 /* multipart chunk */
659 if (NULL == op->state->bf_data)
660 {
661 /* first chunk, initialize */
716 op->state->bf_data = GNUNET_malloc (bf_size); 662 op->state->bf_data = GNUNET_malloc (bf_size);
717 op->state->bf_data_size = bf_size; 663 op->state->bf_data_size = bf_size;
718 op->state->bf_bits_per_element = bf_bits_per_element; 664 op->state->bf_bits_per_element = bf_bits_per_element;
719 memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size); 665 op->state->bf_data_offset = 0;
720 return; 666 op->state->salt = ntohl (msg->sender_mutator);
667 op->spec->remote_element_count = ntohl (msg->sender_element_count);
668 }
669 else
670 {
671 /* increment */
672 if ( (op->state->bf_data_size != bf_size) ||
673 (op->state->bf_bits_per_element != bf_bits_per_element) ||
674 (op->state->bf_data_offset + chunk_size > bf_size) ||
675 (op->state->salt != ntohl (msg->sender_mutator)) ||
676 (op->spec->remote_element_count != ntohl (msg->sender_element_count)) )
677 {
678 GNUNET_break_op (0);
679 fail_intersection_operation (op);
680 return;
681 }
682 }
683 memcpy (&op->state->bf_data[op->state->bf_data_offset],
684 (const char*) &msg[1],
685 chunk_size);
686 op->state->bf_data_offset += chunk_size;
687 if (op->state->bf_data_offset == bf_size)
688 {
689 /* last chunk, run! */
690 op->state->remote_bf
691 = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
692 bf_size,
693 bf_bits_per_element);
694 GNUNET_free (op->state->bf_data);
695 op->state->bf_data = NULL;
696 op->state->bf_data_size = 0;
697 process_bf (op);
721 } 698 }
699 break;
722 default: 700 default:
723 GNUNET_break_op (0); 701 GNUNET_break_op (0);
724 fail_intersection_operation (op); 702 fail_intersection_operation (op);
703 break;
725 } 704 }
726} 705}
727 706
@@ -736,9 +715,9 @@ handle_p2p_bf (void *cls,
736 * @return #GNUNET_YES (we should continue to iterate) 715 * @return #GNUNET_YES (we should continue to iterate)
737 */ 716 */
738static int 717static int
739initialize_map (void *cls, 718initialize_map_unfiltered (void *cls,
740 const struct GNUNET_HashCode *key, 719 const struct GNUNET_HashCode *key,
741 void *value) 720 void *value)
742{ 721{
743 struct ElementEntry *ee = value; 722 struct ElementEntry *ee = value;
744 struct Operation *op = cls; 723 struct Operation *op = cls;
@@ -746,6 +725,9 @@ initialize_map (void *cls,
746 if ( (op->generation_created < ee->generation_removed) && 725 if ( (op->generation_created < ee->generation_removed) &&
747 (op->generation_created >= ee->generation_added) ) 726 (op->generation_created >= ee->generation_added) )
748 return GNUNET_YES; /* element not live in operation's generation */ 727 return GNUNET_YES; /* element not live in operation's generation */
728 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
729 &ee->element_hash,
730 &op->state->my_xor);
749 GNUNET_break (GNUNET_YES == 731 GNUNET_break (GNUNET_YES ==
750 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, 732 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
751 &ee->element_hash, 733 &ee->element_hash,
@@ -756,6 +738,48 @@ initialize_map (void *cls,
756 738
757 739
758/** 740/**
741 * Send our element count to the peer, in case our element count is
742 * lower than his.
743 *
744 * @param op intersection operation
745 */
746static void
747send_element_count (struct Operation *op)
748{
749 struct GNUNET_MQ_Envelope *ev;
750 struct IntersectionElementInfoMessage *msg;
751
752 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
753 "Sending our element count (bf_msg)\n");
754 ev = GNUNET_MQ_msg (msg,
755 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
756 msg->sender_element_count = htonl (op->state->my_element_count);
757 GNUNET_MQ_send (op->mq, ev);
758}
759
760
761/**
762 * We go first, initialize our map with all elements and
763 * send the first Bloom filter.
764 *
765 * @param op operation to start exchange for
766 */
767static void
768begin_bf_exchange (struct Operation *op)
769{
770 GNUNET_break (PHASE_INITIAL == op->state->phase);
771 op->state->phase = PHASE_BF_EXCHANGE;
772 op->state->my_elements
773 = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
774 GNUNET_YES);
775 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
776 &initialize_map_unfiltered,
777 op);
778 send_bloomfilter (op);
779}
780
781
782/**
759 * Handle the initial `struct IntersectionElementInfoMessage` from a 783 * Handle the initial `struct IntersectionElementInfoMessage` from a
760 * remote peer. 784 * remote peer.
761 * 785 *
@@ -786,41 +810,8 @@ handle_p2p_element_info (void *cls,
786 fail_intersection_operation(op); 810 fail_intersection_operation(op);
787 return; 811 return;
788 } 812 }
789 813 GNUNET_break (NULL == op->state->remote_bf);
790 op->state->phase = PHASE_BF_EXCHANGE; 814 begin_bf_exchange (op);
791 // FIXME... -- why a new map here!?
792 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1,
793 GNUNET_YES);
794 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
795 &initialize_map, // FIXME: filtering!?
796 op);
797 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
798 op->state->remote_bf = NULL;
799
800 if (op->state->my_element_count == ntohl (msg->sender_element_count))
801 op->state->phase = PHASE_MAYBE_FINISHED;
802
803 send_bloomfilter (op);
804}
805
806
807/**
808 * Send our element count to the peer, in case our element count is lower than his
809 *
810 * @param op intersection operation
811 */
812static void
813send_element_count (struct Operation *op)
814{
815 struct GNUNET_MQ_Envelope *ev;
816 struct IntersectionElementInfoMessage *msg;
817
818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819 "Sending our element count (bf_msg)\n");
820 ev = GNUNET_MQ_msg (msg,
821 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
822 msg->sender_element_count = htonl (op->state->my_element_count);
823 GNUNET_MQ_send (op->mq, ev);
824} 815}
825 816
826 817
@@ -850,6 +841,37 @@ finish_and_destroy (struct Operation *op)
850 841
851 842
852/** 843/**
844 * Remove all elements from our hashmap.
845 *
846 * @param cls closure with the `struct Operation *`
847 * @param key current key code
848 * @param value value in the hash map
849 * @return #GNUNET_YES (we should continue to iterate)
850 */
851static int
852filter_all (void *cls,
853 const struct GNUNET_HashCode *key,
854 void *value)
855{
856 struct Operation *op = cls;
857 struct ElementEntry *ee = value;
858
859 GNUNET_break (0 < op->state->my_element_count);
860 op->state->my_element_count--;
861 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
862 &ee->element_hash,
863 &op->state->my_xor);
864 GNUNET_assert (GNUNET_YES ==
865 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
866 &ee->element_hash,
867 ee));
868 send_client_removed_element (op,
869 &ee->element);
870 return GNUNET_YES;
871}
872
873
874/**
853 * Handle a done message from a remote peer 875 * Handle a done message from a remote peer
854 * 876 *
855 * @param cls the intersection operation 877 * @param cls the intersection operation
@@ -860,17 +882,45 @@ handle_p2p_done (void *cls,
860 const struct GNUNET_MessageHeader *mh) 882 const struct GNUNET_MessageHeader *mh)
861{ 883{
862 struct Operation *op = cls; 884 struct Operation *op = cls;
885 const struct IntersectionDoneMessage *idm;
863 886
864 if ( (op->state->phase = PHASE_FINISHED) || 887 if (PHASE_BF_EXCHANGE != op->state->phase)
865 (op->state->phase = PHASE_MAYBE_FINISHED) )
866 { 888 {
867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 889 /* wrong phase to conclude? FIXME: Or should we allow this
868 "Got final DONE\n"); 890 if the other peer has _initially_ already an empty set? */
869 finish_and_destroy (op); 891 GNUNET_break_op (0);
892 fail_intersection_operation (op);
893 return;
894 }
895 if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
896 {
897 GNUNET_break_op (0);
898 fail_intersection_operation (op);
870 return; 899 return;
871 } 900 }
872 GNUNET_break_op (0); 901 idm = (const struct IntersectionDoneMessage *) mh;
873 fail_intersection_operation (op); 902 if (0 == ntohl (idm->final_element_count))
903 {
904 /* other peer determined empty set is the intersection,
905 remove all elements */
906 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
907 &filter_all,
908 op);
909 }
910 if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
911 (0 != memcmp (&op->state->my_xor,
912 &idm->element_xor_hash,
913 sizeof (struct GNUNET_HashCode))) )
914 {
915 /* Other peer thinks we are done, but we disagree on the result! */
916 GNUNET_break_op (0);
917 fail_intersection_operation (op);
918 return;
919 }
920 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921 "Got final DONE\n");
922 op->state->phase = PHASE_FINISHED;
923 finish_and_destroy (op);
874} 924}
875 925
876 926
@@ -892,7 +942,6 @@ intersection_evaluate (struct Operation *op,
892 op->state = GNUNET_new (struct OperationState); 942 op->state = GNUNET_new (struct OperationState);
893 /* we started the operation, thus we have to send the operation request */ 943 /* we started the operation, thus we have to send the operation request */
894 op->state->phase = PHASE_INITIAL; 944 op->state->phase = PHASE_INITIAL;
895 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
896 op->state->my_element_count = op->spec->set->state->current_set_element_count; 945 op->state->my_element_count = op->spec->set->state->current_set_element_count;
897 946
898 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -909,8 +958,6 @@ intersection_evaluate (struct Operation *op,
909 } 958 }
910 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); 959 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
911 msg->app_id = op->spec->app_id; 960 msg->app_id = op->spec->app_id;
912 // FIXME: where does this 'salt' come from?
913 msg->salt = htonl (op->spec->salt);
914 msg->element_count = htonl (op->state->my_element_count); 961 msg->element_count = htonl (op->state->my_element_count);
915 GNUNET_MQ_send (op->mq, 962 GNUNET_MQ_send (op->mq,
916 ev); 963 ev);
@@ -935,29 +982,23 @@ intersection_accept (struct Operation *op)
935 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 982 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
936 "Accepting set intersection operation\n"); 983 "Accepting set intersection operation\n");
937 op->state = GNUNET_new (struct OperationState); 984 op->state = GNUNET_new (struct OperationState);
985 op->state->phase = PHASE_INITIAL;
938 op->state->my_element_count 986 op->state->my_element_count
939 = op->spec->set->state->current_set_element_count; 987 = op->spec->set->state->current_set_element_count;
940 op->state->my_elements 988 op->state->my_elements
941 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count, 989 = GNUNET_CONTAINER_multihashmap_create
942 op->spec->remote_element_count), 990 (GNUNET_MIN (op->state->my_element_count,
943 GNUNET_YES); 991 op->spec->remote_element_count),
992 GNUNET_YES);
944 if (op->spec->remote_element_count < op->state->my_element_count) 993 if (op->spec->remote_element_count < op->state->my_element_count)
945 { 994 {
946 /* If the other peer (Alice) has fewer elements than us (Bob), 995 /* If the other peer (Alice) has fewer elements than us (Bob),
947 we just send the count as Alice should send the first BF */ 996 we just send the count as Alice should send the first BF */
948 op->state->phase = PHASE_INITIAL;
949 send_element_count (op); 997 send_element_count (op);
950 return; 998 return;
951 } 999 }
952 /* We have fewer elements, so we start with the BF */ 1000 /* We have fewer elements, so we start with the BF */
953 op->state->phase = PHASE_BF_EXCHANGE; 1001 begin_bf_exchange (op);
954 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
955 BLOOMFILTER_SIZE,
956 GNUNET_CONSTANTS_BLOOMFILTER_K);
957 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
958 &initialize_map,
959 op);
960 send_bloomfilter (op);
961} 1002}
962 1003
963 1004
@@ -987,10 +1028,7 @@ intersection_handle_p2p_message (struct Operation *op,
987 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: 1028 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
988 handle_p2p_bf (op, mh); 1029 handle_p2p_bf (op, mh);
989 break; 1030 break;
990 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART: 1031 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
991 handle_p2p_bf_part (op, mh);
992 break;
993 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
994 handle_p2p_done (op, mh); 1032 handle_p2p_done (op, mh);
995 break; 1033 break;
996 default: 1034 default:
diff --git a/src/set/gnunet-service-set_protocol.h b/src/set/gnunet-service-set_protocol.h
index f21259804..1f14c06b0 100644
--- a/src/set/gnunet-service-set_protocol.h
+++ b/src/set/gnunet-service-set_protocol.h
@@ -17,9 +17,9 @@
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19*/ 19*/
20
21/** 20/**
22 * @author Florian Dold 21 * @author Florian Dold
22 * @author Christian Grothoff
23 * @file set/gnunet-service-set_protocol.h 23 * @file set/gnunet-service-set_protocol.h
24 * @brief Peer-to-Peer messages for gnunet set 24 * @brief Peer-to-Peer messages for gnunet set
25 */ 25 */
@@ -45,11 +45,6 @@ struct OperationRequestMessage
45 uint32_t operation GNUNET_PACKED; 45 uint32_t operation GNUNET_PACKED;
46 46
47 /** 47 /**
48 * Salt to use for this operation.
49 */
50 uint32_t salt GNUNET_PACKED;
51
52 /**
53 * For Intersection: my element count 48 * For Intersection: my element count
54 */ 49 */
55 uint32_t element_count GNUNET_PACKED; 50 uint32_t element_count GNUNET_PACKED;
@@ -126,27 +121,28 @@ struct BFMessage
126 struct GNUNET_MessageHeader header; 121 struct GNUNET_MessageHeader header;
127 122
128 /** 123 /**
129 * mutator used with this bloomfilter. 124 * Number of elements the sender still has in the set.
130 */ 125 */
131 uint32_t sender_element_count GNUNET_PACKED; 126 uint32_t sender_element_count GNUNET_PACKED;
132 127
133 /** 128 /**
134 * mutator used with this bloomfilter. 129 * XOR of all hashes over all elements remaining in the set.
130 * Used to determine termination.
135 */ 131 */
136 uint32_t sender_mutator GNUNET_PACKED; 132 struct GNUNET_HashCode element_xor_hash;
137 133
138 /** 134 /**
139 * Length of the bloomfilter data 135 * Mutator used with this bloomfilter.
140 */ 136 */
141 uint32_t bloomfilter_total_length GNUNET_PACKED; 137 uint32_t sender_mutator GNUNET_PACKED;
142 138
143 /** 139 /**
144 * Length of the appended bloomfilter data block 140 * Total length of the bloomfilter data.
145 */ 141 */
146 uint32_t bloomfilter_length GNUNET_PACKED; 142 uint32_t bloomfilter_total_length GNUNET_PACKED;
147 143
148 /** 144 /**
149 * Length of the bloomfilter data 145 * Number of bits (k-value) used in encoding the bloomfilter.
150 */ 146 */
151 uint32_t bits_per_element GNUNET_PACKED; 147 uint32_t bits_per_element GNUNET_PACKED;
152 148
@@ -156,26 +152,28 @@ struct BFMessage
156}; 152};
157 153
158 154
159struct BFPart 155/**
156 * Last message, send to confirm the final set. Contains the element
157 * count as it is possible that the peer determined that we were done
158 * by getting the empty set, which in that case also needs to be
159 * communicated.
160 */
161struct IntersectionDoneMessage
160{ 162{
161 /** 163 /**
162 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF 164 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE
163 */ 165 */
164 struct GNUNET_MessageHeader header; 166 struct GNUNET_MessageHeader header;
165 167
166 /** 168 /**
167 * Length of the appended bloomfilter data block 169 * Final number of elements in intersection.
168 */ 170 */
169 uint32_t chunk_length GNUNET_PACKED; 171 uint32_t final_element_count GNUNET_PACKED;
170 172
171 /** 173 /**
172 * offset in the bloolfilter data block, if multipart message 174 * XOR of all hashes over all elements remaining in the set.
173 */
174 uint32_t chunk_offset GNUNET_PACKED;
175
176 /**
177 * rest: the sender's bloomfilter
178 */ 175 */
176 struct GNUNET_HashCode element_xor_hash;
179}; 177};
180 178
181GNUNET_NETWORK_STRUCT_END 179GNUNET_NETWORK_STRUCT_END
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index b1f65ddcf..459996eca 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -170,8 +170,7 @@ struct OperationState
170 170
171 171
172/** 172/**
173 * The key entry is used to associate an ibf key with 173 * The key entry is used to associate an ibf key with an element.
174 * an element.
175 */ 174 */
176struct KeyEntry 175struct KeyEntry
177{ 176{
@@ -316,8 +315,8 @@ fail_union_operation (struct Operation *op)
316 struct GNUNET_MQ_Envelope *ev; 315 struct GNUNET_MQ_Envelope *ev;
317 struct GNUNET_SET_ResultMessage *msg; 316 struct GNUNET_SET_ResultMessage *msg;
318 317
319 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n"); 318 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
320 319 "union operation failed\n");
321 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 320 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
322 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 321 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
323 msg->request_id = htonl (op->spec->client_request_id); 322 msg->request_id = htonl (op->spec->client_request_id);
@@ -357,8 +356,7 @@ get_ibf_key (const struct GNUNET_HashCode *src,
357 * @param cls closure 356 * @param cls closure
358 * @param key current key code 357 * @param key current key code
359 * @param value value in the hash map 358 * @param value value in the hash map
360 * @return #GNUNET_YES if we should continue to 359 * @return #GNUNET_YES if we should continue to iterate,
361 * iterate,
362 * #GNUNET_NO if not. 360 * #GNUNET_NO if not.
363 */ 361 */
364static int 362static int
@@ -390,9 +388,7 @@ op_register_element_iterator (void *cls,
390 * @param cls closure 388 * @param cls closure
391 * @param key current key code 389 * @param key current key code
392 * @param value value in the hash map 390 * @param value value in the hash map
393 * @return #GNUNET_YES if we should continue to 391 * @return #GNUNET_YES (we should continue to iterate)
394 * iterate,
395 * #GNUNET_NO if not.
396 */ 392 */
397static int 393static int
398op_has_element_iterator (void *cls, 394op_has_element_iterator (void *cls,
@@ -405,7 +401,8 @@ op_has_element_iterator (void *cls,
405 GNUNET_assert (NULL != k); 401 GNUNET_assert (NULL != k);
406 while (NULL != k) 402 while (NULL != k)
407 { 403 {
408 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash)) 404 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
405 element_hash))
409 return GNUNET_NO; 406 return GNUNET_NO;
410 k = k->next_colliding; 407 k = k->next_colliding;
411 } 408 }
@@ -506,12 +503,11 @@ prepare_ibf_iterator (void *cls,
506 * Iterator for initializing the 503 * Iterator for initializing the
507 * key-to-element mapping of a union operation 504 * key-to-element mapping of a union operation
508 * 505 *
509 * @param cls the union operation 506 * @param cls the union operation `struct Operation *`
510 * @param key unised 507 * @param key unused
511 * @param value the element entry to insert 508 * @param value the `struct ElementEntry *` to insert
512 * into the key-to-element mapping 509 * into the key-to-element mapping
513 * @return GNUNET_YES to continue iterating, 510 * @return #GNUNET_YES (to continue iterating)
514 * GNUNET_NO to stop
515 */ 511 */
516static int 512static int
517init_key_to_element_iterator (void *cls, 513init_key_to_element_iterator (void *cls,
@@ -543,7 +539,8 @@ init_key_to_element_iterator (void *cls,
543 * @param size size of the ibf to create 539 * @param size size of the ibf to create
544 */ 540 */
545static void 541static void
546prepare_ibf (struct Operation *op, uint16_t size) 542prepare_ibf (struct Operation *op,
543 uint16_t size)
547{ 544{
548 if (NULL == op->state->key_to_element) 545 if (NULL == op->state->key_to_element)
549 { 546 {
@@ -557,7 +554,8 @@ prepare_ibf (struct Operation *op, uint16_t size)
557 ibf_destroy (op->state->local_ibf); 554 ibf_destroy (op->state->local_ibf);
558 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 555 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
559 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 556 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
560 prepare_ibf_iterator, op->state->local_ibf); 557 &prepare_ibf_iterator,
558 op->state->local_ibf);
561} 559}
562 560
563 561
@@ -568,7 +566,8 @@ prepare_ibf (struct Operation *op, uint16_t size)
568 * @param ibf_order order of the ibf to send, size=2^order 566 * @param ibf_order order of the ibf to send, size=2^order
569 */ 567 */
570static void 568static void
571send_ibf (struct Operation *op, uint16_t ibf_order) 569send_ibf (struct Operation *op,
570 uint16_t ibf_order)
572{ 571{
573 unsigned int buckets_sent = 0; 572 unsigned int buckets_sent = 0;
574 struct InvertibleBloomFilter *ibf; 573 struct InvertibleBloomFilter *ibf;
@@ -647,7 +646,8 @@ get_order_from_difference (unsigned int diff)
647 unsigned int ibf_order; 646 unsigned int ibf_order;
648 647
649 ibf_order = 2; 648 ibf_order = 2;
650 while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM) 649 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
650 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
651 ibf_order++; 651 ibf_order++;
652 if (ibf_order > MAX_IBF_ORDER) 652 if (ibf_order > MAX_IBF_ORDER)
653 ibf_order = MAX_IBF_ORDER; 653 ibf_order = MAX_IBF_ORDER;
@@ -662,7 +662,8 @@ get_order_from_difference (unsigned int diff)
662 * @param mh the message 662 * @param mh the message
663 */ 663 */
664static void 664static void
665handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) 665handle_p2p_strata_estimator (void *cls,
666 const struct GNUNET_MessageHeader *mh)
666{ 667{
667 struct Operation *op = cls; 668 struct Operation *op = cls;
668 struct StrataEstimator *remote_se; 669 struct StrataEstimator *remote_se;
@@ -843,7 +844,7 @@ decode_and_send (struct Operation *op)
843 844
844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 845 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
845 "transmitted all values, sending DONE\n"); 846 "transmitted all values, sending DONE\n");
846 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 847 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
847 GNUNET_MQ_send (op->mq, ev); 848 GNUNET_MQ_send (op->mq, ev);
848 break; 849 break;
849 } 850 }
@@ -1201,7 +1202,7 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1201 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1202 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1202 "got DONE, sending final DONE after elements\n"); 1203 "got DONE, sending final DONE after elements\n");
1203 op->state->phase = PHASE_FINISHED; 1204 op->state->phase = PHASE_FINISHED;
1204 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 1205 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1205 GNUNET_MQ_send (op->mq, ev); 1206 GNUNET_MQ_send (op->mq, ev);
1206 return; 1207 return;
1207 } 1208 }
@@ -1251,7 +1252,6 @@ union_evaluate (struct Operation *op,
1251 } 1252 }
1252 msg->operation = htonl (GNUNET_SET_OPERATION_UNION); 1253 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1253 msg->app_id = op->spec->app_id; 1254 msg->app_id = op->spec->app_id;
1254 msg->salt = htonl (op->spec->salt);
1255 GNUNET_MQ_send (op->mq, ev); 1255 GNUNET_MQ_send (op->mq, ev);
1256 1256
1257 if (NULL != opaque_context) 1257 if (NULL != opaque_context)
@@ -1379,7 +1379,7 @@ union_handle_p2p_message (struct Operation *op,
1379 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: 1379 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1380 handle_p2p_element_requests (op, mh); 1380 handle_p2p_element_requests (op, mh);
1381 break; 1381 break;
1382 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: 1382 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1383 handle_p2p_done (op, mh); 1383 handle_p2p_done (op, mh);
1384 break; 1384 break;
1385 default: 1385 default: