aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_intersection.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set_intersection.c')
-rw-r--r--src/set/gnunet-service-set_intersection.c1244
1 files changed, 627 insertions, 617 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 964a26b91..1ae4dcdac 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -36,7 +36,8 @@
36/** 36/**
37 * Current phase we are in for a intersection operation. 37 * Current phase we are in for a intersection operation.
38 */ 38 */
39enum IntersectionOperationPhase { 39enum IntersectionOperationPhase
40{
40 /** 41 /**
41 * We are just starting. 42 * We are just starting.
42 */ 43 */
@@ -78,7 +79,8 @@ enum IntersectionOperationPhase {
78/** 79/**
79 * State of an evaluate operation with another peer. 80 * State of an evaluate operation with another peer.
80 */ 81 */
81struct OperationState { 82struct OperationState
83{
82 /** 84 /**
83 * The bf we currently receive 85 * The bf we currently receive
84 */ 86 */
@@ -186,7 +188,8 @@ struct OperationState {
186 * Extra state required for efficient set intersection. 188 * Extra state required for efficient set intersection.
187 * Merely tracks the total number of elements. 189 * Merely tracks the total number of elements.
188 */ 190 */
189struct SetState { 191struct SetState
192{
190 /** 193 /**
191 * Number of currently valid elements in the set which have not been 194 * Number of currently valid elements in the set which have not been
192 * removed. 195 * removed.
@@ -203,38 +206,38 @@ struct SetState {
203 * @param element element to send 206 * @param element element to send
204 */ 207 */
205static void 208static void
206send_client_removed_element(struct Operation *op, 209send_client_removed_element (struct Operation *op,
207 struct GNUNET_SET_Element *element) 210 struct GNUNET_SET_Element *element)
208{ 211{
209 struct GNUNET_MQ_Envelope *ev; 212 struct GNUNET_MQ_Envelope *ev;
210 struct GNUNET_SET_ResultMessage *rm; 213 struct GNUNET_SET_ResultMessage *rm;
211 214
212 if (GNUNET_SET_RESULT_REMOVED != op->result_mode) 215 if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
213 return; /* Wrong mode for transmitting removed elements */ 216 return; /* Wrong mode for transmitting removed elements */
214 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
215 "Sending removed element (size %u) to client\n", 218 "Sending removed element (size %u) to client\n",
216 element->size); 219 element->size);
217 GNUNET_STATISTICS_update(_GSS_statistics, 220 GNUNET_STATISTICS_update (_GSS_statistics,
218 "# Element removed messages sent", 221 "# Element removed messages sent",
219 1, 222 1,
220 GNUNET_NO); 223 GNUNET_NO);
221 GNUNET_assert(0 != op->client_request_id); 224 GNUNET_assert (0 != op->client_request_id);
222 ev = GNUNET_MQ_msg_extra(rm, 225 ev = GNUNET_MQ_msg_extra (rm,
223 element->size, 226 element->size,
224 GNUNET_MESSAGE_TYPE_SET_RESULT); 227 GNUNET_MESSAGE_TYPE_SET_RESULT);
225 if (NULL == ev) 228 if (NULL == ev)
226 { 229 {
227 GNUNET_break(0); 230 GNUNET_break (0);
228 return; 231 return;
229 } 232 }
230 rm->result_status = htons(GNUNET_SET_STATUS_OK); 233 rm->result_status = htons (GNUNET_SET_STATUS_OK);
231 rm->request_id = htonl(op->client_request_id); 234 rm->request_id = htonl (op->client_request_id);
232 rm->element_type = element->element_type; 235 rm->element_type = element->element_type;
233 GNUNET_memcpy(&rm[1], 236 GNUNET_memcpy (&rm[1],
234 element->data, 237 element->data,
235 element->size); 238 element->size);
236 GNUNET_MQ_send(op->set->cs->mq, 239 GNUNET_MQ_send (op->set->cs->mq,
237 ev); 240 ev);
238} 241}
239 242
240 243
@@ -247,63 +250,63 @@ send_client_removed_element(struct Operation *op,
247 * @return #GNUNET_YES (we should continue to iterate) 250 * @return #GNUNET_YES (we should continue to iterate)
248 */ 251 */
249static int 252static int
250filtered_map_initialization(void *cls, 253filtered_map_initialization (void *cls,
251 const struct GNUNET_HashCode *key, 254 const struct GNUNET_HashCode *key,
252 void *value) 255 void *value)
253{ 256{
254 struct Operation *op = cls; 257 struct Operation *op = cls;
255 struct ElementEntry *ee = value; 258 struct ElementEntry *ee = value;
256 struct GNUNET_HashCode mutated_hash; 259 struct GNUNET_HashCode mutated_hash;
257 260
258 261
259 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 262 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
260 "FIMA called for %s:%u\n", 263 "FIMA called for %s:%u\n",
261 GNUNET_h2s(&ee->element_hash), 264 GNUNET_h2s (&ee->element_hash),
262 ee->element.size); 265 ee->element.size);
263 266
264 if (GNUNET_NO == _GSS_is_element_of_operation(ee, op)) 267 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
265 { 268 {
266 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 269 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
267 "Reduced initialization, not starting with %s:%u (wrong generation)\n", 270 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
268 GNUNET_h2s(&ee->element_hash), 271 GNUNET_h2s (&ee->element_hash),
269 ee->element.size); 272 ee->element.size);
270 return GNUNET_YES; /* element not valid in our operation's generation */ 273 return GNUNET_YES; /* element not valid in our operation's generation */
271 } 274 }
272 275
273 /* Test if element is in other peer's bloomfilter */ 276 /* Test if element is in other peer's bloomfilter */
274 GNUNET_BLOCK_mingle_hash(&ee->element_hash, 277 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
275 op->state->salt, 278 op->state->salt,
276 &mutated_hash); 279 &mutated_hash);
277 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 280 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
278 "Testing mingled hash %s with salt %u\n", 281 "Testing mingled hash %s with salt %u\n",
279 GNUNET_h2s(&mutated_hash), 282 GNUNET_h2s (&mutated_hash),
280 op->state->salt); 283 op->state->salt);
281 if (GNUNET_NO == 284 if (GNUNET_NO ==
282 GNUNET_CONTAINER_bloomfilter_test(op->state->remote_bf, 285 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
283 &mutated_hash)) 286 &mutated_hash))
284 { 287 {
285 /* remove this element */ 288 /* remove this element */
286 send_client_removed_element(op, 289 send_client_removed_element (op,
287 &ee->element); 290 &ee->element);
288 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 291 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
289 "Reduced initialization, not starting with %s:%u\n", 292 "Reduced initialization, not starting with %s:%u\n",
290 GNUNET_h2s(&ee->element_hash), 293 GNUNET_h2s (&ee->element_hash),
291 ee->element.size); 294 ee->element.size);
292 return GNUNET_YES; 295 return GNUNET_YES;
293 } 296 }
294 op->state->my_element_count++; 297 op->state->my_element_count++;
295 GNUNET_CRYPTO_hash_xor(&op->state->my_xor, 298 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
296 &ee->element_hash, 299 &ee->element_hash,
297 &op->state->my_xor); 300 &op->state->my_xor);
298 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
299 "Filtered initialization of my_elements, adding %s:%u\n", 302 "Filtered initialization of my_elements, adding %s:%u\n",
300 GNUNET_h2s(&ee->element_hash), 303 GNUNET_h2s (&ee->element_hash),
301 ee->element.size); 304 ee->element.size);
302 GNUNET_break(GNUNET_YES == 305 GNUNET_break (GNUNET_YES ==
303 GNUNET_CONTAINER_multihashmap_put(op->state->my_elements, 306 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
304 &ee->element_hash, 307 &ee->element_hash,
305 ee, 308 ee,
306 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 309 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
307 310
308 return GNUNET_YES; 311 return GNUNET_YES;
309} 312}
@@ -319,48 +322,48 @@ filtered_map_initialization(void *cls,
319 * @return #GNUNET_YES (we should continue to iterate) 322 * @return #GNUNET_YES (we should continue to iterate)
320 */ 323 */
321static int 324static int
322iterator_bf_reduce(void *cls, 325iterator_bf_reduce (void *cls,
323 const struct GNUNET_HashCode *key, 326 const struct GNUNET_HashCode *key,
324 void *value) 327 void *value)
325{ 328{
326 struct Operation *op = cls; 329 struct Operation *op = cls;
327 struct ElementEntry *ee = value; 330 struct ElementEntry *ee = value;
328 struct GNUNET_HashCode mutated_hash; 331 struct GNUNET_HashCode mutated_hash;
329 332
330 GNUNET_BLOCK_mingle_hash(&ee->element_hash, 333 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
331 op->state->salt, 334 op->state->salt,
332 &mutated_hash); 335 &mutated_hash);
333 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
334 "Testing mingled hash %s with salt %u\n", 337 "Testing mingled hash %s with salt %u\n",
335 GNUNET_h2s(&mutated_hash), 338 GNUNET_h2s (&mutated_hash),
336 op->state->salt); 339 op->state->salt);
337 if (GNUNET_NO == 340 if (GNUNET_NO ==
338 GNUNET_CONTAINER_bloomfilter_test(op->state->remote_bf, 341 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
339 &mutated_hash)) 342 &mutated_hash))
340 { 343 {
341 GNUNET_break(0 < op->state->my_element_count); 344 GNUNET_break (0 < op->state->my_element_count);
342 op->state->my_element_count--; 345 op->state->my_element_count--;
343 GNUNET_CRYPTO_hash_xor(&op->state->my_xor, 346 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
344 &ee->element_hash, 347 &ee->element_hash,
345 &op->state->my_xor); 348 &op->state->my_xor);
346 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
347 "Bloom filter reduction of my_elements, removing %s:%u\n", 350 "Bloom filter reduction of my_elements, removing %s:%u\n",
348 GNUNET_h2s(&ee->element_hash), 351 GNUNET_h2s (&ee->element_hash),
349 ee->element.size); 352 ee->element.size);
350 GNUNET_assert(GNUNET_YES == 353 GNUNET_assert (GNUNET_YES ==
351 GNUNET_CONTAINER_multihashmap_remove(op->state->my_elements, 354 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
352 &ee->element_hash, 355 &ee->element_hash,
353 ee)); 356 ee));
354 send_client_removed_element(op, 357 send_client_removed_element (op,
355 &ee->element); 358 &ee->element);
356 } 359 }
357 else 360 else
358 { 361 {
359 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 362 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360 "Bloom filter reduction of my_elements, keeping %s:%u\n", 363 "Bloom filter reduction of my_elements, keeping %s:%u\n",
361 GNUNET_h2s(&ee->element_hash), 364 GNUNET_h2s (&ee->element_hash),
362 ee->element.size); 365 ee->element.size);
363 } 366 }
364 return GNUNET_YES; 367 return GNUNET_YES;
365} 368}
366 369
@@ -374,23 +377,23 @@ iterator_bf_reduce(void *cls,
374 * @return #GNUNET_YES (we should continue to iterate) 377 * @return #GNUNET_YES (we should continue to iterate)
375 */ 378 */
376static int 379static int
377iterator_bf_create(void *cls, 380iterator_bf_create (void *cls,
378 const struct GNUNET_HashCode *key, 381 const struct GNUNET_HashCode *key,
379 void *value) 382 void *value)
380{ 383{
381 struct Operation *op = cls; 384 struct Operation *op = cls;
382 struct ElementEntry *ee = value; 385 struct ElementEntry *ee = value;
383 struct GNUNET_HashCode mutated_hash; 386 struct GNUNET_HashCode mutated_hash;
384 387
385 GNUNET_BLOCK_mingle_hash(&ee->element_hash, 388 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
386 op->state->salt, 389 op->state->salt,
387 &mutated_hash); 390 &mutated_hash);
388 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
389 "Initializing BF with hash %s with salt %u\n", 392 "Initializing BF with hash %s with salt %u\n",
390 GNUNET_h2s(&mutated_hash), 393 GNUNET_h2s (&mutated_hash),
391 op->state->salt); 394 op->state->salt);
392 GNUNET_CONTAINER_bloomfilter_add(op->state->local_bf, 395 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
393 &mutated_hash); 396 &mutated_hash);
394 return GNUNET_YES; 397 return GNUNET_YES;
395} 398}
396 399
@@ -402,31 +405,31 @@ iterator_bf_create(void *cls,
402 * @param op the intersection operation to fail 405 * @param op the intersection operation to fail
403 */ 406 */
404static void 407static void
405fail_intersection_operation(struct Operation *op) 408fail_intersection_operation (struct Operation *op)
406{ 409{
407 struct GNUNET_MQ_Envelope *ev; 410 struct GNUNET_MQ_Envelope *ev;
408 struct GNUNET_SET_ResultMessage *msg; 411 struct GNUNET_SET_ResultMessage *msg;
409 412
410 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, 413 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
411 "Intersection operation failed\n"); 414 "Intersection operation failed\n");
412 GNUNET_STATISTICS_update(_GSS_statistics, 415 GNUNET_STATISTICS_update (_GSS_statistics,
413 "# Intersection operations failed", 416 "# Intersection operations failed",
414 1, 417 1,
415 GNUNET_NO); 418 GNUNET_NO);
416 if (NULL != op->state->my_elements) 419 if (NULL != op->state->my_elements)
417 { 420 {
418 GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements); 421 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
419 op->state->my_elements = NULL; 422 op->state->my_elements = NULL;
420 } 423 }
421 ev = GNUNET_MQ_msg(msg, 424 ev = GNUNET_MQ_msg (msg,
422 GNUNET_MESSAGE_TYPE_SET_RESULT); 425 GNUNET_MESSAGE_TYPE_SET_RESULT);
423 msg->result_status = htons(GNUNET_SET_STATUS_FAILURE); 426 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
424 msg->request_id = htonl(op->client_request_id); 427 msg->request_id = htonl (op->client_request_id);
425 msg->element_type = htons(0); 428 msg->element_type = htons (0);
426 GNUNET_MQ_send(op->set->cs->mq, 429 GNUNET_MQ_send (op->set->cs->mq,
427 ev); 430 ev);
428 _GSS_operation_destroy(op, 431 _GSS_operation_destroy (op,
429 GNUNET_YES); 432 GNUNET_YES);
430} 433}
431 434
432 435
@@ -437,7 +440,7 @@ fail_intersection_operation(struct Operation *op)
437 * @param op intersection operation 440 * @param op intersection operation
438 */ 441 */
439static void 442static void
440send_bloomfilter(struct Operation *op) 443send_bloomfilter (struct Operation *op)
441{ 444{
442 struct GNUNET_MQ_Envelope *ev; 445 struct GNUNET_MQ_Envelope *ev;
443 struct BFMessage *msg; 446 struct BFMessage *msg;
@@ -451,81 +454,83 @@ send_bloomfilter(struct Operation *op)
451 the number of bits per element, as the smaller set 454 the number of bits per element, as the smaller set
452 should use more bits to maximize its set reduction 455 should use more bits to maximize its set reduction
453 potential and minimize overall bandwidth consumption. */ 456 potential and minimize overall bandwidth consumption. */
454 bf_elementbits = 2 + ceil(log2((double) 457 bf_elementbits = 2 + ceil (log2 ((double)
455 (op->remote_element_count / 458 (op->remote_element_count
456 (double)op->state->my_element_count))); 459 / (double) op->state->my_element_count)));
457 if (bf_elementbits < 1) 460 if (bf_elementbits < 1)
458 bf_elementbits = 1; /* make sure k is not 0 */ 461 bf_elementbits = 1; /* make sure k is not 0 */
459 /* optimize BF-size to ~50% of bits set */ 462 /* optimize BF-size to ~50% of bits set */
460 bf_size = ceil((double)(op->state->my_element_count 463 bf_size = ceil ((double) (op->state->my_element_count
461 * bf_elementbits / log(2))); 464 * bf_elementbits / log (2)));
462 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 465 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
463 "Sending Bloom filter (%u) of size %u bytes\n", 466 "Sending Bloom filter (%u) of size %u bytes\n",
464 (unsigned int)bf_elementbits, 467 (unsigned int) bf_elementbits,
465 (unsigned int)bf_size); 468 (unsigned int) bf_size);
466 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init(NULL, 469 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
467 bf_size, 470 bf_size,
468 bf_elementbits); 471 bf_elementbits);
469 op->state->salt = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_NONCE, 472 op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
470 UINT32_MAX); 473 UINT32_MAX);
471 GNUNET_CONTAINER_multihashmap_iterate(op->state->my_elements, 474 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
472 &iterator_bf_create, 475 &iterator_bf_create,
473 op); 476 op);
474 477
475 /* send our Bloom filter */ 478 /* send our Bloom filter */
476 GNUNET_STATISTICS_update(_GSS_statistics, 479 GNUNET_STATISTICS_update (_GSS_statistics,
477 "# Intersection Bloom filters sent", 480 "# Intersection Bloom filters sent",
478 1, 481 1,
479 GNUNET_NO); 482 GNUNET_NO);
480 chunk_size = 60 * 1024 - sizeof(struct BFMessage); 483 chunk_size = 60 * 1024 - sizeof(struct BFMessage);
481 if (bf_size <= chunk_size) 484 if (bf_size <= chunk_size)
482 { 485 {
483 /* singlepart */ 486 /* singlepart */
484 chunk_size = bf_size; 487 chunk_size = bf_size;
485 ev = GNUNET_MQ_msg_extra(msg, 488 ev = GNUNET_MQ_msg_extra (msg,
486 chunk_size, 489 chunk_size,
487 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); 490 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
488 GNUNET_assert(GNUNET_SYSERR != 491 GNUNET_assert (GNUNET_SYSERR !=
489 GNUNET_CONTAINER_bloomfilter_get_raw_data(op->state->local_bf, 492 GNUNET_CONTAINER_bloomfilter_get_raw_data (
490 (char*)&msg[1], 493 op->state->local_bf,
491 bf_size)); 494 (char*) &msg[1],
492 msg->sender_element_count = htonl(op->state->my_element_count); 495 bf_size));
493 msg->bloomfilter_total_length = htonl(bf_size); 496 msg->sender_element_count = htonl (op->state->my_element_count);
494 msg->bits_per_element = htonl(bf_elementbits); 497 msg->bloomfilter_total_length = htonl (bf_size);
495 msg->sender_mutator = htonl(op->state->salt); 498 msg->bits_per_element = htonl (bf_elementbits);
496 msg->element_xor_hash = op->state->my_xor; 499 msg->sender_mutator = htonl (op->state->salt);
497 GNUNET_MQ_send(op->mq, ev); 500 msg->element_xor_hash = op->state->my_xor;
498 } 501 GNUNET_MQ_send (op->mq, ev);
502 }
499 else 503 else
504 {
505 /* multipart */
506 bf_data = GNUNET_malloc (bf_size);
507 GNUNET_assert (GNUNET_SYSERR !=
508 GNUNET_CONTAINER_bloomfilter_get_raw_data (
509 op->state->local_bf,
510 bf_data,
511 bf_size));
512 offset = 0;
513 while (offset < bf_size)
500 { 514 {
501 /* multipart */ 515 if (bf_size - chunk_size < offset)
502 bf_data = GNUNET_malloc(bf_size); 516 chunk_size = bf_size - offset;
503 GNUNET_assert(GNUNET_SYSERR != 517 ev = GNUNET_MQ_msg_extra (msg,
504 GNUNET_CONTAINER_bloomfilter_get_raw_data(op->state->local_bf, 518 chunk_size,
505 bf_data, 519 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
506 bf_size)); 520 GNUNET_memcpy (&msg[1],
507 offset = 0; 521 &bf_data[offset],
508 while (offset < bf_size) 522 chunk_size);
509 { 523 offset += chunk_size;
510 if (bf_size - chunk_size < offset) 524 msg->sender_element_count = htonl (op->state->my_element_count);
511 chunk_size = bf_size - offset; 525 msg->bloomfilter_total_length = htonl (bf_size);
512 ev = GNUNET_MQ_msg_extra(msg, 526 msg->bits_per_element = htonl (bf_elementbits);
513 chunk_size, 527 msg->sender_mutator = htonl (op->state->salt);
514 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); 528 msg->element_xor_hash = op->state->my_xor;
515 GNUNET_memcpy(&msg[1], 529 GNUNET_MQ_send (op->mq, ev);
516 &bf_data[offset],
517 chunk_size);
518 offset += chunk_size;
519 msg->sender_element_count = htonl(op->state->my_element_count);
520 msg->bloomfilter_total_length = htonl(bf_size);
521 msg->bits_per_element = htonl(bf_elementbits);
522 msg->sender_mutator = htonl(op->state->salt);
523 msg->element_xor_hash = op->state->my_xor;
524 GNUNET_MQ_send(op->mq, ev);
525 }
526 GNUNET_free(bf_data);
527 } 530 }
528 GNUNET_CONTAINER_bloomfilter_free(op->state->local_bf); 531 GNUNET_free (bf_data);
532 }
533 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
529 op->state->local_bf = NULL; 534 op->state->local_bf = NULL;
530} 535}
531 536
@@ -537,27 +542,27 @@ send_bloomfilter(struct Operation *op)
537 * @param cls operation to destroy 542 * @param cls operation to destroy
538 */ 543 */
539static void 544static void
540send_client_done_and_destroy(void *cls) 545send_client_done_and_destroy (void *cls)
541{ 546{
542 struct Operation *op = cls; 547 struct Operation *op = cls;
543 struct GNUNET_MQ_Envelope *ev; 548 struct GNUNET_MQ_Envelope *ev;
544 struct GNUNET_SET_ResultMessage *rm; 549 struct GNUNET_SET_ResultMessage *rm;
545 550
546 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 551 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
547 "Intersection succeeded, sending DONE to local client\n"); 552 "Intersection succeeded, sending DONE to local client\n");
548 GNUNET_STATISTICS_update(_GSS_statistics, 553 GNUNET_STATISTICS_update (_GSS_statistics,
549 "# Intersection operations succeeded", 554 "# Intersection operations succeeded",
550 1, 555 1,
551 GNUNET_NO); 556 GNUNET_NO);
552 ev = GNUNET_MQ_msg(rm, 557 ev = GNUNET_MQ_msg (rm,
553 GNUNET_MESSAGE_TYPE_SET_RESULT); 558 GNUNET_MESSAGE_TYPE_SET_RESULT);
554 rm->request_id = htonl(op->client_request_id); 559 rm->request_id = htonl (op->client_request_id);
555 rm->result_status = htons(GNUNET_SET_STATUS_DONE); 560 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
556 rm->element_type = htons(0); 561 rm->element_type = htons (0);
557 GNUNET_MQ_send(op->set->cs->mq, 562 GNUNET_MQ_send (op->set->cs->mq,
558 ev); 563 ev);
559 _GSS_operation_destroy(op, 564 _GSS_operation_destroy (op,
560 GNUNET_YES); 565 GNUNET_YES);
561} 566}
562 567
563 568
@@ -570,12 +575,12 @@ send_client_done_and_destroy(void *cls)
570 * @param cls the `struct Operation`. 575 * @param cls the `struct Operation`.
571 */ 576 */
572static void 577static void
573finished_local_operations(void *cls) 578finished_local_operations (void *cls)
574{ 579{
575 struct Operation *op = cls; 580 struct Operation *op = cls;
576 581
577 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 582 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578 "DONE sent to other peer, now waiting for other end to close the channel\n"); 583 "DONE sent to other peer, now waiting for other end to close the channel\n");
579 op->state->phase = PHASE_FINISHED; 584 op->state->phase = PHASE_FINISHED;
580 op->state->channel_death_expected = GNUNET_YES; 585 op->state->channel_death_expected = GNUNET_YES;
581} 586}
@@ -589,22 +594,22 @@ finished_local_operations(void *cls)
589 * @param op operation to notify for. 594 * @param op operation to notify for.
590 */ 595 */
591static void 596static void
592send_p2p_done(struct Operation *op) 597send_p2p_done (struct Operation *op)
593{ 598{
594 struct GNUNET_MQ_Envelope *ev; 599 struct GNUNET_MQ_Envelope *ev;
595 struct IntersectionDoneMessage *idm; 600 struct IntersectionDoneMessage *idm;
596 601
597 GNUNET_assert(PHASE_MUST_SEND_DONE == op->state->phase); 602 GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
598 GNUNET_assert(GNUNET_NO == op->state->channel_death_expected); 603 GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
599 ev = GNUNET_MQ_msg(idm, 604 ev = GNUNET_MQ_msg (idm,
600 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); 605 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
601 idm->final_element_count = htonl(op->state->my_element_count); 606 idm->final_element_count = htonl (op->state->my_element_count);
602 idm->element_xor_hash = op->state->my_xor; 607 idm->element_xor_hash = op->state->my_xor;
603 GNUNET_MQ_notify_sent(ev, 608 GNUNET_MQ_notify_sent (ev,
604 &finished_local_operations, 609 &finished_local_operations,
605 op); 610 op);
606 GNUNET_MQ_send(op->mq, 611 GNUNET_MQ_send (op->mq,
607 ev); 612 ev);
608} 613}
609 614
610 615
@@ -614,7 +619,7 @@ send_p2p_done(struct Operation *op)
614 * @param cls the `struct Operation *` 619 * @param cls the `struct Operation *`
615 */ 620 */
616static void 621static void
617send_remaining_elements(void *cls) 622send_remaining_elements (void *cls)
618{ 623{
619 struct Operation *op = cls; 624 struct Operation *op = cls;
620 const void *nxt; 625 const void *nxt;
@@ -624,52 +629,54 @@ send_remaining_elements(void *cls)
624 const struct GNUNET_SET_Element *element; 629 const struct GNUNET_SET_Element *element;
625 int res; 630 int res;
626 631
627 res = GNUNET_CONTAINER_multihashmap_iterator_next(op->state->full_result_iter, 632 res = GNUNET_CONTAINER_multihashmap_iterator_next (
628 NULL, 633 op->state->full_result_iter,
629 &nxt); 634 NULL,
635 &nxt);
630 if (GNUNET_NO == res) 636 if (GNUNET_NO == res)
637 {
638 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
639 "Sending done and destroy because iterator ran out\n");
640 GNUNET_CONTAINER_multihashmap_iterator_destroy (
641 op->state->full_result_iter);
642 op->state->full_result_iter = NULL;
643 if (PHASE_DONE_RECEIVED == op->state->phase)
631 { 644 {
632 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 645 op->state->phase = PHASE_FINISHED;
633 "Sending done and destroy because iterator ran out\n"); 646 send_client_done_and_destroy (op);
634 GNUNET_CONTAINER_multihashmap_iterator_destroy(op->state->full_result_iter); 647 }
635 op->state->full_result_iter = NULL; 648 else if (PHASE_MUST_SEND_DONE == op->state->phase)
636 if (PHASE_DONE_RECEIVED == op->state->phase) 649 {
637 { 650 send_p2p_done (op);
638 op->state->phase = PHASE_FINISHED;
639 send_client_done_and_destroy(op);
640 }
641 else if (PHASE_MUST_SEND_DONE == op->state->phase)
642 {
643 send_p2p_done(op);
644 }
645 else
646 {
647 GNUNET_assert(0);
648 }
649 return;
650 } 651 }
652 else
653 {
654 GNUNET_assert (0);
655 }
656 return;
657 }
651 ee = nxt; 658 ee = nxt;
652 element = &ee->element; 659 element = &ee->element;
653 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 660 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
654 "Sending element %s:%u to client (full set)\n", 661 "Sending element %s:%u to client (full set)\n",
655 GNUNET_h2s(&ee->element_hash), 662 GNUNET_h2s (&ee->element_hash),
656 element->size); 663 element->size);
657 GNUNET_assert(0 != op->client_request_id); 664 GNUNET_assert (0 != op->client_request_id);
658 ev = GNUNET_MQ_msg_extra(rm, 665 ev = GNUNET_MQ_msg_extra (rm,
659 element->size, 666 element->size,
660 GNUNET_MESSAGE_TYPE_SET_RESULT); 667 GNUNET_MESSAGE_TYPE_SET_RESULT);
661 GNUNET_assert(NULL != ev); 668 GNUNET_assert (NULL != ev);
662 rm->result_status = htons(GNUNET_SET_STATUS_OK); 669 rm->result_status = htons (GNUNET_SET_STATUS_OK);
663 rm->request_id = htonl(op->client_request_id); 670 rm->request_id = htonl (op->client_request_id);
664 rm->element_type = element->element_type; 671 rm->element_type = element->element_type;
665 GNUNET_memcpy(&rm[1], 672 GNUNET_memcpy (&rm[1],
666 element->data, 673 element->data,
667 element->size); 674 element->size);
668 GNUNET_MQ_notify_sent(ev, 675 GNUNET_MQ_notify_sent (ev,
669 &send_remaining_elements, 676 &send_remaining_elements,
670 op); 677 op);
671 GNUNET_MQ_send(op->set->cs->mq, 678 GNUNET_MQ_send (op->set->cs->mq,
672 ev); 679 ev);
673} 680}
674 681
675 682
@@ -683,27 +690,27 @@ send_remaining_elements(void *cls)
683 * @return #GNUNET_YES (we should continue to iterate) 690 * @return #GNUNET_YES (we should continue to iterate)
684 */ 691 */
685static int 692static int
686initialize_map_unfiltered(void *cls, 693initialize_map_unfiltered (void *cls,
687 const struct GNUNET_HashCode *key, 694 const struct GNUNET_HashCode *key,
688 void *value) 695 void *value)
689{ 696{
690 struct ElementEntry *ee = value; 697 struct ElementEntry *ee = value;
691 struct Operation *op = cls; 698 struct Operation *op = cls;
692 699
693 if (GNUNET_NO == _GSS_is_element_of_operation(ee, op)) 700 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
694 return GNUNET_YES; /* element not live in operation's generation */ 701 return GNUNET_YES; /* element not live in operation's generation */
695 GNUNET_CRYPTO_hash_xor(&op->state->my_xor, 702 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
696 &ee->element_hash, 703 &ee->element_hash,
697 &op->state->my_xor); 704 &op->state->my_xor);
698 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 705 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
699 "Initial full initialization of my_elements, adding %s:%u\n", 706 "Initial full initialization of my_elements, adding %s:%u\n",
700 GNUNET_h2s(&ee->element_hash), 707 GNUNET_h2s (&ee->element_hash),
701 ee->element.size); 708 ee->element.size);
702 GNUNET_break(GNUNET_YES == 709 GNUNET_break (GNUNET_YES ==
703 GNUNET_CONTAINER_multihashmap_put(op->state->my_elements, 710 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
704 &ee->element_hash, 711 &ee->element_hash,
705 ee, 712 ee,
706 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 713 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
707 return GNUNET_YES; 714 return GNUNET_YES;
708} 715}
709 716
@@ -715,18 +722,18 @@ initialize_map_unfiltered(void *cls,
715 * @param op intersection operation 722 * @param op intersection operation
716 */ 723 */
717static void 724static void
718send_element_count(struct Operation *op) 725send_element_count (struct Operation *op)
719{ 726{
720 struct GNUNET_MQ_Envelope *ev; 727 struct GNUNET_MQ_Envelope *ev;
721 struct IntersectionElementInfoMessage *msg; 728 struct IntersectionElementInfoMessage *msg;
722 729
723 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 730 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724 "Sending our element count (%u)\n", 731 "Sending our element count (%u)\n",
725 op->state->my_element_count); 732 op->state->my_element_count);
726 ev = GNUNET_MQ_msg(msg, 733 ev = GNUNET_MQ_msg (msg,
727 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); 734 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
728 msg->sender_element_count = htonl(op->state->my_element_count); 735 msg->sender_element_count = htonl (op->state->my_element_count);
729 GNUNET_MQ_send(op->mq, ev); 736 GNUNET_MQ_send (op->mq, ev);
730} 737}
731 738
732 739
@@ -737,13 +744,13 @@ send_element_count(struct Operation *op)
737 * @param op operation to start exchange for 744 * @param op operation to start exchange for
738 */ 745 */
739static void 746static void
740begin_bf_exchange(struct Operation *op) 747begin_bf_exchange (struct Operation *op)
741{ 748{
742 op->state->phase = PHASE_BF_EXCHANGE; 749 op->state->phase = PHASE_BF_EXCHANGE;
743 GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements, 750 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
744 &initialize_map_unfiltered, 751 &initialize_map_unfiltered,
745 op); 752 op);
746 send_bloomfilter(op); 753 send_bloomfilter (op);
747} 754}
748 755
749 756
@@ -755,35 +762,36 @@ begin_bf_exchange(struct Operation *op)
755 * @param mh the header of the message 762 * @param mh the header of the message
756 */ 763 */
757void 764void
758handle_intersection_p2p_element_info(void *cls, 765handle_intersection_p2p_element_info (void *cls,
759 const struct IntersectionElementInfoMessage *msg) 766 const struct
767 IntersectionElementInfoMessage *msg)
760{ 768{
761 struct Operation *op = cls; 769 struct Operation *op = cls;
762 770
763 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) 771 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
764 { 772 {
765 GNUNET_break_op(0); 773 GNUNET_break_op (0);
766 fail_intersection_operation(op); 774 fail_intersection_operation (op);
767 return; 775 return;
768 } 776 }
769 op->remote_element_count = ntohl(msg->sender_element_count); 777 op->remote_element_count = ntohl (msg->sender_element_count);
770 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 778 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
771 "Received remote element count (%u), I have %u\n", 779 "Received remote element count (%u), I have %u\n",
772 op->remote_element_count, 780 op->remote_element_count,
773 op->state->my_element_count); 781 op->state->my_element_count);
774 if (((PHASE_INITIAL != op->state->phase) && 782 if (((PHASE_INITIAL != op->state->phase) &&
775 (PHASE_COUNT_SENT != op->state->phase)) || 783 (PHASE_COUNT_SENT != op->state->phase)) ||
776 (op->state->my_element_count > op->remote_element_count) || 784 (op->state->my_element_count > op->remote_element_count) ||
777 (0 == op->state->my_element_count) || 785 (0 == op->state->my_element_count) ||
778 (0 == op->remote_element_count)) 786 (0 == op->remote_element_count))
779 { 787 {
780 GNUNET_break_op(0); 788 GNUNET_break_op (0);
781 fail_intersection_operation(op); 789 fail_intersection_operation (op);
782 return; 790 return;
783 } 791 }
784 GNUNET_break(NULL == op->state->remote_bf); 792 GNUNET_break (NULL == op->state->remote_bf);
785 begin_bf_exchange(op); 793 begin_bf_exchange (op);
786 GNUNET_CADET_receive_done(op->channel); 794 GNUNET_CADET_receive_done (op->channel);
787} 795}
788 796
789 797
@@ -793,81 +801,82 @@ handle_intersection_p2p_element_info(void *cls,
793 * @param op the intersection operation 801 * @param op the intersection operation
794 */ 802 */
795static void 803static void
796process_bf(struct Operation *op) 804process_bf (struct Operation *op)
797{ 805{
798 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", 807 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
800 op->state->phase, 808 op->state->phase,
801 op->remote_element_count, 809 op->remote_element_count,
802 op->state->my_element_count, 810 op->state->my_element_count,
803 GNUNET_CONTAINER_multihashmap_size(op->set->content->elements)); 811 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
804 switch (op->state->phase) 812 switch (op->state->phase)
805 { 813 {
806 case PHASE_INITIAL: 814 case PHASE_INITIAL:
807 GNUNET_break_op(0); 815 GNUNET_break_op (0);
808 fail_intersection_operation(op); 816 fail_intersection_operation (op);
809 return; 817 return;
810 818
811 case PHASE_COUNT_SENT: 819 case PHASE_COUNT_SENT:
812 /* This is the first BF being sent, build our initial map with 820 /* This is the first BF being sent, build our initial map with
813 filtering in place */ 821 filtering in place */
814 op->state->my_element_count = 0; 822 op->state->my_element_count = 0;
815 GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements, 823 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
816 &filtered_map_initialization, 824 &filtered_map_initialization,
817 op); 825 op);
818 break; 826 break;
819 827
820 case PHASE_BF_EXCHANGE: 828 case PHASE_BF_EXCHANGE:
821 /* Update our set by reduction */ 829 /* Update our set by reduction */
822 GNUNET_CONTAINER_multihashmap_iterate(op->state->my_elements, 830 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
823 &iterator_bf_reduce, 831 &iterator_bf_reduce,
824 op); 832 op);
825 break; 833 break;
826 834
827 case PHASE_MUST_SEND_DONE: 835 case PHASE_MUST_SEND_DONE:
828 GNUNET_break_op(0); 836 GNUNET_break_op (0);
829 fail_intersection_operation(op); 837 fail_intersection_operation (op);
830 return; 838 return;
831 839
832 case PHASE_DONE_RECEIVED: 840 case PHASE_DONE_RECEIVED:
833 GNUNET_break_op(0); 841 GNUNET_break_op (0);
834 fail_intersection_operation(op); 842 fail_intersection_operation (op);
835 return; 843 return;
836 844
837 case PHASE_FINISHED: 845 case PHASE_FINISHED:
838 GNUNET_break_op(0); 846 GNUNET_break_op (0);
839 fail_intersection_operation(op); 847 fail_intersection_operation (op);
840 return; 848 return;
841 } 849 }
842 GNUNET_CONTAINER_bloomfilter_free(op->state->remote_bf); 850 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
843 op->state->remote_bf = NULL; 851 op->state->remote_bf = NULL;
844 852
845 if ((0 == op->state->my_element_count) || /* fully disjoint */ 853 if ((0 == op->state->my_element_count) || /* fully disjoint */
846 ((op->state->my_element_count == op->remote_element_count) && 854 ((op->state->my_element_count == op->remote_element_count) &&
847 (0 == GNUNET_memcmp(&op->state->my_xor, 855 (0 == GNUNET_memcmp (&op->state->my_xor,
848 &op->state->other_xor)))) 856 &op->state->other_xor))))
857 {
858 /* we are done */
859 op->state->phase = PHASE_MUST_SEND_DONE;
860 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861 "Intersection succeeded, sending DONE to other peer\n");
862 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
863 op->state->local_bf = NULL;
864 if (GNUNET_SET_RESULT_FULL == op->result_mode)
849 { 865 {
850 /* we are done */ 866 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
851 op->state->phase = PHASE_MUST_SEND_DONE; 867 "Sending full result set (%u elements)\n",
852 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 868 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
853 "Intersection succeeded, sending DONE to other peer\n"); 869 op->state->full_result_iter
854 GNUNET_CONTAINER_bloomfilter_free(op->state->local_bf); 870 = GNUNET_CONTAINER_multihashmap_iterator_create (
855 op->state->local_bf = NULL; 871 op->state->my_elements);
856 if (GNUNET_SET_RESULT_FULL == op->result_mode) 872 send_remaining_elements (op);
857 {
858 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
859 "Sending full result set (%u elements)\n",
860 GNUNET_CONTAINER_multihashmap_size(op->state->my_elements));
861 op->state->full_result_iter
862 = GNUNET_CONTAINER_multihashmap_iterator_create(op->state->my_elements);
863 send_remaining_elements(op);
864 return;
865 }
866 send_p2p_done(op);
867 return; 873 return;
868 } 874 }
875 send_p2p_done (op);
876 return;
877 }
869 op->state->phase = PHASE_BF_EXCHANGE; 878 op->state->phase = PHASE_BF_EXCHANGE;
870 send_bloomfilter(op); 879 send_bloomfilter (op);
871} 880}
872 881
873 882
@@ -879,16 +888,16 @@ process_bf(struct Operation *op)
879 * @return #GNUNET_OK if @a msg is well-formed 888 * @return #GNUNET_OK if @a msg is well-formed
880 */ 889 */
881int 890int
882check_intersection_p2p_bf(void *cls, 891check_intersection_p2p_bf (void *cls,
883 const struct BFMessage *msg) 892 const struct BFMessage *msg)
884{ 893{
885 struct Operation *op = cls; 894 struct Operation *op = cls;
886 895
887 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) 896 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
888 { 897 {
889 GNUNET_break_op(0); 898 GNUNET_break_op (0);
890 return GNUNET_SYSERR; 899 return GNUNET_SYSERR;
891 } 900 }
892 return GNUNET_OK; 901 return GNUNET_OK;
893} 902}
894 903
@@ -900,8 +909,8 @@ check_intersection_p2p_bf(void *cls,
900 * @param msg the header of the message 909 * @param msg the header of the message
901 */ 910 */
902void 911void
903handle_intersection_p2p_bf(void *cls, 912handle_intersection_p2p_bf (void *cls,
904 const struct BFMessage *msg) 913 const struct BFMessage *msg)
905{ 914{
906 struct Operation *op = cls; 915 struct Operation *op = cls;
907 uint32_t bf_size; 916 uint32_t bf_size;
@@ -909,85 +918,85 @@ handle_intersection_p2p_bf(void *cls,
909 uint32_t bf_bits_per_element; 918 uint32_t bf_bits_per_element;
910 919
911 switch (op->state->phase) 920 switch (op->state->phase)
921 {
922 case PHASE_INITIAL:
923 GNUNET_break_op (0);
924 fail_intersection_operation (op);
925 return;
926
927 case PHASE_COUNT_SENT:
928 case PHASE_BF_EXCHANGE:
929 bf_size = ntohl (msg->bloomfilter_total_length);
930 bf_bits_per_element = ntohl (msg->bits_per_element);
931 chunk_size = htons (msg->header.size) - sizeof(struct BFMessage);
932 op->state->other_xor = msg->element_xor_hash;
933 if (bf_size == chunk_size)
912 { 934 {
913 case PHASE_INITIAL: 935 if (NULL != op->state->bf_data)
914 GNUNET_break_op(0); 936 {
915 fail_intersection_operation(op); 937 GNUNET_break_op (0);
916 return; 938 fail_intersection_operation (op);
917 939 return;
918 case PHASE_COUNT_SENT: 940 }
919 case PHASE_BF_EXCHANGE: 941 /* single part, done here immediately */
920 bf_size = ntohl(msg->bloomfilter_total_length); 942 op->state->remote_bf
921 bf_bits_per_element = ntohl(msg->bits_per_element); 943 = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
922 chunk_size = htons(msg->header.size) - sizeof(struct BFMessage); 944 bf_size,
923 op->state->other_xor = msg->element_xor_hash; 945 bf_bits_per_element);
924 if (bf_size == chunk_size) 946 op->state->salt = ntohl (msg->sender_mutator);
925 { 947 op->remote_element_count = ntohl (msg->sender_element_count);
926 if (NULL != op->state->bf_data) 948 process_bf (op);
927 {
928 GNUNET_break_op(0);
929 fail_intersection_operation(op);
930 return;
931 }
932 /* single part, done here immediately */
933 op->state->remote_bf
934 = GNUNET_CONTAINER_bloomfilter_init((const char*)&msg[1],
935 bf_size,
936 bf_bits_per_element);
937 op->state->salt = ntohl(msg->sender_mutator);
938 op->remote_element_count = ntohl(msg->sender_element_count);
939 process_bf(op);
940 break;
941 }
942 /* multipart chunk */
943 if (NULL == op->state->bf_data)
944 {
945 /* first chunk, initialize */
946 op->state->bf_data = GNUNET_malloc(bf_size);
947 op->state->bf_data_size = bf_size;
948 op->state->bf_bits_per_element = bf_bits_per_element;
949 op->state->bf_data_offset = 0;
950 op->state->salt = ntohl(msg->sender_mutator);
951 op->remote_element_count = ntohl(msg->sender_element_count);
952 }
953 else
954 {
955 /* increment */
956 if ((op->state->bf_data_size != bf_size) ||
957 (op->state->bf_bits_per_element != bf_bits_per_element) ||
958 (op->state->bf_data_offset + chunk_size > bf_size) ||
959 (op->state->salt != ntohl(msg->sender_mutator)) ||
960 (op->remote_element_count != ntohl(msg->sender_element_count)))
961 {
962 GNUNET_break_op(0);
963 fail_intersection_operation(op);
964 return;
965 }
966 }
967 GNUNET_memcpy(&op->state->bf_data[op->state->bf_data_offset],
968 (const char*)&msg[1],
969 chunk_size);
970 op->state->bf_data_offset += chunk_size;
971 if (op->state->bf_data_offset == bf_size)
972 {
973 /* last chunk, run! */
974 op->state->remote_bf
975 = GNUNET_CONTAINER_bloomfilter_init(op->state->bf_data,
976 bf_size,
977 bf_bits_per_element);
978 GNUNET_free(op->state->bf_data);
979 op->state->bf_data = NULL;
980 op->state->bf_data_size = 0;
981 process_bf(op);
982 }
983 break; 949 break;
984
985 default:
986 GNUNET_break_op(0);
987 fail_intersection_operation(op);
988 return;
989 } 950 }
990 GNUNET_CADET_receive_done(op->channel); 951 /* multipart chunk */
952 if (NULL == op->state->bf_data)
953 {
954 /* first chunk, initialize */
955 op->state->bf_data = GNUNET_malloc (bf_size);
956 op->state->bf_data_size = bf_size;
957 op->state->bf_bits_per_element = bf_bits_per_element;
958 op->state->bf_data_offset = 0;
959 op->state->salt = ntohl (msg->sender_mutator);
960 op->remote_element_count = ntohl (msg->sender_element_count);
961 }
962 else
963 {
964 /* increment */
965 if ((op->state->bf_data_size != bf_size) ||
966 (op->state->bf_bits_per_element != bf_bits_per_element) ||
967 (op->state->bf_data_offset + chunk_size > bf_size) ||
968 (op->state->salt != ntohl (msg->sender_mutator)) ||
969 (op->remote_element_count != ntohl (msg->sender_element_count)))
970 {
971 GNUNET_break_op (0);
972 fail_intersection_operation (op);
973 return;
974 }
975 }
976 GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
977 (const char*) &msg[1],
978 chunk_size);
979 op->state->bf_data_offset += chunk_size;
980 if (op->state->bf_data_offset == bf_size)
981 {
982 /* last chunk, run! */
983 op->state->remote_bf
984 = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
985 bf_size,
986 bf_bits_per_element);
987 GNUNET_free (op->state->bf_data);
988 op->state->bf_data = NULL;
989 op->state->bf_data_size = 0;
990 process_bf (op);
991 }
992 break;
993
994 default:
995 GNUNET_break_op (0);
996 fail_intersection_operation (op);
997 return;
998 }
999 GNUNET_CADET_receive_done (op->channel);
991} 1000}
992 1001
993 1002
@@ -1000,28 +1009,28 @@ handle_intersection_p2p_bf(void *cls,
1000 * @return #GNUNET_YES (we should continue to iterate) 1009 * @return #GNUNET_YES (we should continue to iterate)
1001 */ 1010 */
1002static int 1011static int
1003filter_all(void *cls, 1012filter_all (void *cls,
1004 const struct GNUNET_HashCode *key, 1013 const struct GNUNET_HashCode *key,
1005 void *value) 1014 void *value)
1006{ 1015{
1007 struct Operation *op = cls; 1016 struct Operation *op = cls;
1008 struct ElementEntry *ee = value; 1017 struct ElementEntry *ee = value;
1009 1018
1010 GNUNET_break(0 < op->state->my_element_count); 1019 GNUNET_break (0 < op->state->my_element_count);
1011 op->state->my_element_count--; 1020 op->state->my_element_count--;
1012 GNUNET_CRYPTO_hash_xor(&op->state->my_xor, 1021 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
1013 &ee->element_hash, 1022 &ee->element_hash,
1014 &op->state->my_xor); 1023 &op->state->my_xor);
1015 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1024 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1016 "Final reduction of my_elements, removing %s:%u\n", 1025 "Final reduction of my_elements, removing %s:%u\n",
1017 GNUNET_h2s(&ee->element_hash), 1026 GNUNET_h2s (&ee->element_hash),
1018 ee->element.size); 1027 ee->element.size);
1019 GNUNET_assert(GNUNET_YES == 1028 GNUNET_assert (GNUNET_YES ==
1020 GNUNET_CONTAINER_multihashmap_remove(op->state->my_elements, 1029 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
1021 &ee->element_hash, 1030 &ee->element_hash,
1022 ee)); 1031 ee));
1023 send_client_removed_element(op, 1032 send_client_removed_element (op,
1024 &ee->element); 1033 &ee->element);
1025 return GNUNET_YES; 1034 return GNUNET_YES;
1026} 1035}
1027 1036
@@ -1033,61 +1042,61 @@ filter_all(void *cls,
1033 * @param mh the message 1042 * @param mh the message
1034 */ 1043 */
1035void 1044void
1036handle_intersection_p2p_done(void *cls, 1045handle_intersection_p2p_done (void *cls,
1037 const struct IntersectionDoneMessage *idm) 1046 const struct IntersectionDoneMessage *idm)
1038{ 1047{
1039 struct Operation *op = cls; 1048 struct Operation *op = cls;
1040 1049
1041 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) 1050 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
1042 { 1051 {
1043 GNUNET_break_op(0); 1052 GNUNET_break_op (0);
1044 fail_intersection_operation(op); 1053 fail_intersection_operation (op);
1045 return; 1054 return;
1046 } 1055 }
1047 if (PHASE_BF_EXCHANGE != op->state->phase) 1056 if (PHASE_BF_EXCHANGE != op->state->phase)
1048 { 1057 {
1049 /* wrong phase to conclude? FIXME: Or should we allow this 1058 /* wrong phase to conclude? FIXME: Or should we allow this
1050 if the other peer has _initially_ already an empty set? */ 1059 if the other peer has _initially_ already an empty set? */
1051 GNUNET_break_op(0); 1060 GNUNET_break_op (0);
1052 fail_intersection_operation(op); 1061 fail_intersection_operation (op);
1053 return; 1062 return;
1054 } 1063 }
1055 if (0 == ntohl(idm->final_element_count)) 1064 if (0 == ntohl (idm->final_element_count))
1056 { 1065 {
1057 /* other peer determined empty set is the intersection, 1066 /* other peer determined empty set is the intersection,
1058 remove all elements */ 1067 remove all elements */
1059 GNUNET_CONTAINER_multihashmap_iterate(op->state->my_elements, 1068 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
1060 &filter_all, 1069 &filter_all,
1061 op); 1070 op);
1062 } 1071 }
1063 if ((op->state->my_element_count != ntohl(idm->final_element_count)) || 1072 if ((op->state->my_element_count != ntohl (idm->final_element_count)) ||
1064 (0 != GNUNET_memcmp(&op->state->my_xor, 1073 (0 != GNUNET_memcmp (&op->state->my_xor,
1065 &idm->element_xor_hash))) 1074 &idm->element_xor_hash)))
1066 { 1075 {
1067 /* Other peer thinks we are done, but we disagree on the result! */ 1076 /* Other peer thinks we are done, but we disagree on the result! */
1068 GNUNET_break_op(0); 1077 GNUNET_break_op (0);
1069 fail_intersection_operation(op); 1078 fail_intersection_operation (op);
1070 return; 1079 return;
1071 } 1080 }
1072 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073 "Got IntersectionDoneMessage, have %u elements in intersection\n", 1082 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1074 op->state->my_element_count); 1083 op->state->my_element_count);
1075 op->state->phase = PHASE_DONE_RECEIVED; 1084 op->state->phase = PHASE_DONE_RECEIVED;
1076 GNUNET_CADET_receive_done(op->channel); 1085 GNUNET_CADET_receive_done (op->channel);
1077 1086
1078 GNUNET_assert(GNUNET_NO == op->state->client_done_sent); 1087 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1079 if (GNUNET_SET_RESULT_FULL == op->result_mode) 1088 if (GNUNET_SET_RESULT_FULL == op->result_mode)
1080 { 1089 {
1081 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1090 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082 "Sending full result set to client (%u elements)\n", 1091 "Sending full result set to client (%u elements)\n",
1083 GNUNET_CONTAINER_multihashmap_size(op->state->my_elements)); 1092 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1084 op->state->full_result_iter 1093 op->state->full_result_iter
1085 = GNUNET_CONTAINER_multihashmap_iterator_create(op->state->my_elements); 1094 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
1086 send_remaining_elements(op); 1095 send_remaining_elements (op);
1087 return; 1096 return;
1088 } 1097 }
1089 op->state->phase = PHASE_FINISHED; 1098 op->state->phase = PHASE_FINISHED;
1090 send_client_done_and_destroy(op); 1099 send_client_done_and_destroy (op);
1091} 1100}
1092 1101
1093 1102
@@ -1101,43 +1110,43 @@ handle_intersection_p2p_done(void *cls,
1101 * @return operation-specific state to keep in @a op 1110 * @return operation-specific state to keep in @a op
1102 */ 1111 */
1103static struct OperationState * 1112static struct OperationState *
1104intersection_evaluate(struct Operation *op, 1113intersection_evaluate (struct Operation *op,
1105 const struct GNUNET_MessageHeader *opaque_context) 1114 const struct GNUNET_MessageHeader *opaque_context)
1106{ 1115{
1107 struct OperationState *state; 1116 struct OperationState *state;
1108 struct GNUNET_MQ_Envelope *ev; 1117 struct GNUNET_MQ_Envelope *ev;
1109 struct OperationRequestMessage *msg; 1118 struct OperationRequestMessage *msg;
1110 1119
1111 ev = GNUNET_MQ_msg_nested_mh(msg, 1120 ev = GNUNET_MQ_msg_nested_mh (msg,
1112 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1121 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1113 opaque_context); 1122 opaque_context);
1114 if (NULL == ev) 1123 if (NULL == ev)
1115 { 1124 {
1116 /* the context message is too large!? */ 1125 /* the context message is too large!? */
1117 GNUNET_break(0); 1126 GNUNET_break (0);
1118 return NULL; 1127 return NULL;
1119 } 1128 }
1120 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1129 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121 "Initiating intersection operation evaluation\n"); 1130 "Initiating intersection operation evaluation\n");
1122 state = GNUNET_new(struct OperationState); 1131 state = GNUNET_new (struct OperationState);
1123 /* we started the operation, thus we have to send the operation request */ 1132 /* we started the operation, thus we have to send the operation request */
1124 state->phase = PHASE_INITIAL; 1133 state->phase = PHASE_INITIAL;
1125 state->my_element_count = op->set->state->current_set_element_count; 1134 state->my_element_count = op->set->state->current_set_element_count;
1126 state->my_elements 1135 state->my_elements
1127 = GNUNET_CONTAINER_multihashmap_create(state->my_element_count, 1136 = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1128 GNUNET_YES); 1137 GNUNET_YES);
1129 1138
1130 msg->operation = htonl(GNUNET_SET_OPERATION_INTERSECTION); 1139 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1131 msg->element_count = htonl(state->my_element_count); 1140 msg->element_count = htonl (state->my_element_count);
1132 GNUNET_MQ_send(op->mq, 1141 GNUNET_MQ_send (op->mq,
1133 ev); 1142 ev);
1134 state->phase = PHASE_COUNT_SENT; 1143 state->phase = PHASE_COUNT_SENT;
1135 if (NULL != opaque_context) 1144 if (NULL != opaque_context)
1136 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1145 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137 "Sent op request with context message\n"); 1146 "Sent op request with context message\n");
1138 else 1147 else
1139 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140 "Sent op request without context message\n"); 1149 "Sent op request without context message\n");
1141 return state; 1150 return state;
1142} 1151}
1143 1152
@@ -1149,31 +1158,31 @@ intersection_evaluate(struct Operation *op,
1149 * @param op operation that will be accepted as an intersection operation 1158 * @param op operation that will be accepted as an intersection operation
1150 */ 1159 */
1151static struct OperationState * 1160static struct OperationState *
1152intersection_accept(struct Operation *op) 1161intersection_accept (struct Operation *op)
1153{ 1162{
1154 struct OperationState *state; 1163 struct OperationState *state;
1155 1164
1156 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1165 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157 "Accepting set intersection operation\n"); 1166 "Accepting set intersection operation\n");
1158 state = GNUNET_new(struct OperationState); 1167 state = GNUNET_new (struct OperationState);
1159 state->phase = PHASE_INITIAL; 1168 state->phase = PHASE_INITIAL;
1160 state->my_element_count 1169 state->my_element_count
1161 = op->set->state->current_set_element_count; 1170 = op->set->state->current_set_element_count;
1162 state->my_elements 1171 state->my_elements
1163 = GNUNET_CONTAINER_multihashmap_create(GNUNET_MIN(state->my_element_count, 1172 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1164 op->remote_element_count), 1173 op->remote_element_count),
1165 GNUNET_YES); 1174 GNUNET_YES);
1166 op->state = state; 1175 op->state = state;
1167 if (op->remote_element_count < state->my_element_count) 1176 if (op->remote_element_count < state->my_element_count)
1168 { 1177 {
1169 /* If the other peer (Alice) has fewer elements than us (Bob), 1178 /* If the other peer (Alice) has fewer elements than us (Bob),
1170 we just send the count as Alice should send the first BF */ 1179 we just send the count as Alice should send the first BF */
1171 send_element_count(op); 1180 send_element_count (op);
1172 state->phase = PHASE_COUNT_SENT; 1181 state->phase = PHASE_COUNT_SENT;
1173 return state; 1182 return state;
1174 } 1183 }
1175 /* We have fewer elements, so we start with the BF */ 1184 /* We have fewer elements, so we start with the BF */
1176 begin_bf_exchange(op); 1185 begin_bf_exchange (op);
1177 return state; 1186 return state;
1178} 1187}
1179 1188
@@ -1185,34 +1194,35 @@ intersection_accept(struct Operation *op)
1185 * @param op intersection operation to destroy 1194 * @param op intersection operation to destroy
1186 */ 1195 */
1187static void 1196static void
1188intersection_op_cancel(struct Operation *op) 1197intersection_op_cancel (struct Operation *op)
1189{ 1198{
1190 /* check if the op was canceled twice */ 1199 /* check if the op was canceled twice */
1191 GNUNET_assert(NULL != op->state); 1200 GNUNET_assert (NULL != op->state);
1192 if (NULL != op->state->remote_bf) 1201 if (NULL != op->state->remote_bf)
1193 { 1202 {
1194 GNUNET_CONTAINER_bloomfilter_free(op->state->remote_bf); 1203 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1195 op->state->remote_bf = NULL; 1204 op->state->remote_bf = NULL;
1196 } 1205 }
1197 if (NULL != op->state->local_bf) 1206 if (NULL != op->state->local_bf)
1198 { 1207 {
1199 GNUNET_CONTAINER_bloomfilter_free(op->state->local_bf); 1208 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1200 op->state->local_bf = NULL; 1209 op->state->local_bf = NULL;
1201 } 1210 }
1202 if (NULL != op->state->my_elements) 1211 if (NULL != op->state->my_elements)
1203 { 1212 {
1204 GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements); 1213 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1205 op->state->my_elements = NULL; 1214 op->state->my_elements = NULL;
1206 } 1215 }
1207 if (NULL != op->state->full_result_iter) 1216 if (NULL != op->state->full_result_iter)
1208 { 1217 {
1209 GNUNET_CONTAINER_multihashmap_iterator_destroy(op->state->full_result_iter); 1218 GNUNET_CONTAINER_multihashmap_iterator_destroy (
1210 op->state->full_result_iter = NULL; 1219 op->state->full_result_iter);
1211 } 1220 op->state->full_result_iter = NULL;
1212 GNUNET_free(op->state); 1221 }
1222 GNUNET_free (op->state);
1213 op->state = NULL; 1223 op->state = NULL;
1214 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215 "Destroying intersection op state done\n"); 1225 "Destroying intersection op state done\n");
1216} 1226}
1217 1227
1218 1228
@@ -1222,13 +1232,13 @@ intersection_op_cancel(struct Operation *op)
1222 * @return the newly created set 1232 * @return the newly created set
1223 */ 1233 */
1224static struct SetState * 1234static struct SetState *
1225intersection_set_create() 1235intersection_set_create ()
1226{ 1236{
1227 struct SetState *set_state; 1237 struct SetState *set_state;
1228 1238
1229 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1239 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230 "Intersection set created\n"); 1240 "Intersection set created\n");
1231 set_state = GNUNET_new(struct SetState); 1241 set_state = GNUNET_new (struct SetState);
1232 set_state->current_set_element_count = 0; 1242 set_state->current_set_element_count = 0;
1233 1243
1234 return set_state; 1244 return set_state;
@@ -1242,8 +1252,8 @@ intersection_set_create()
1242 * @param ee the element to add to the set 1252 * @param ee the element to add to the set
1243 */ 1253 */
1244static void 1254static void
1245intersection_add(struct SetState *set_state, 1255intersection_add (struct SetState *set_state,
1246 struct ElementEntry *ee) 1256 struct ElementEntry *ee)
1247{ 1257{
1248 set_state->current_set_element_count++; 1258 set_state->current_set_element_count++;
1249} 1259}
@@ -1255,9 +1265,9 @@ intersection_add(struct SetState *set_state,
1255 * @param set_state the set to destroy 1265 * @param set_state the set to destroy
1256 */ 1266 */
1257static void 1267static void
1258intersection_set_destroy(struct SetState *set_state) 1268intersection_set_destroy (struct SetState *set_state)
1259{ 1269{
1260 GNUNET_free(set_state); 1270 GNUNET_free (set_state);
1261} 1271}
1262 1272
1263 1273
@@ -1268,10 +1278,10 @@ intersection_set_destroy(struct SetState *set_state)
1268 * @param element set element to remove 1278 * @param element set element to remove
1269 */ 1279 */
1270static void 1280static void
1271intersection_remove(struct SetState *set_state, 1281intersection_remove (struct SetState *set_state,
1272 struct ElementEntry *element) 1282 struct ElementEntry *element)
1273{ 1283{
1274 GNUNET_assert(0 < set_state->current_set_element_count); 1284 GNUNET_assert (0 < set_state->current_set_element_count);
1275 set_state->current_set_element_count--; 1285 set_state->current_set_element_count--;
1276} 1286}
1277 1287
@@ -1282,19 +1292,19 @@ intersection_remove(struct SetState *set_state,
1282 * @param op operation that lost the channel 1292 * @param op operation that lost the channel
1283 */ 1293 */
1284static void 1294static void
1285intersection_channel_death(struct Operation *op) 1295intersection_channel_death (struct Operation *op)
1286{ 1296{
1287 if (GNUNET_YES == op->state->channel_death_expected) 1297 if (GNUNET_YES == op->state->channel_death_expected)
1288 { 1298 {
1289 /* oh goodie, we are done! */ 1299 /* oh goodie, we are done! */
1290 send_client_done_and_destroy(op); 1300 send_client_done_and_destroy (op);
1291 } 1301 }
1292 else 1302 else
1293 { 1303 {
1294 /* sorry, channel went down early, too bad. */ 1304 /* sorry, channel went down early, too bad. */
1295 _GSS_operation_destroy(op, 1305 _GSS_operation_destroy (op,
1296 GNUNET_YES); 1306 GNUNET_YES);
1297 } 1307 }
1298} 1308}
1299 1309
1300 1310
@@ -1304,7 +1314,7 @@ intersection_channel_death(struct Operation *op)
1304 * @return the operation specific VTable 1314 * @return the operation specific VTable
1305 */ 1315 */
1306const struct SetVT * 1316const struct SetVT *
1307_GSS_intersection_vt() 1317_GSS_intersection_vt ()
1308{ 1318{
1309 static const struct SetVT intersection_vt = { 1319 static const struct SetVT intersection_vt = {
1310 .create = &intersection_set_create, 1320 .create = &intersection_set_create,