aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c2266
1 files changed, 1145 insertions, 1121 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index fd7bc24d4..ca4ef2092 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -34,7 +34,7 @@
34#include <gcrypt.h> 34#include <gcrypt.h>
35 35
36 36
37#define LOG(kind, ...) GNUNET_log_from(kind, "set-union", __VA_ARGS__) 37#define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__)
38 38
39 39
40/** 40/**
@@ -74,7 +74,8 @@
74/** 74/**
75 * Current phase we are in for a union operation. 75 * Current phase we are in for a union operation.
76 */ 76 */
77enum UnionOperationPhase { 77enum UnionOperationPhase
78{
78 /** 79 /**
79 * We sent the request message, and expect a strata estimator. 80 * We sent the request message, and expect a strata estimator.
80 */ 81 */
@@ -138,7 +139,8 @@ enum UnionOperationPhase {
138/** 139/**
139 * State of an evaluate operation with another peer. 140 * State of an evaluate operation with another peer.
140 */ 141 */
141struct OperationState { 142struct OperationState
143{
142 /** 144 /**
143 * Copy of the set's strata estimator at the time of 145 * Copy of the set's strata estimator at the time of
144 * creation of this operation. 146 * creation of this operation.
@@ -214,7 +216,8 @@ struct OperationState {
214/** 216/**
215 * The key entry is used to associate an ibf key with an element. 217 * The key entry is used to associate an ibf key with an element.
216 */ 218 */
217struct KeyEntry { 219struct KeyEntry
220{
218 /** 221 /**
219 * IBF key for the entry, derived from the current salt. 222 * IBF key for the entry, derived from the current salt.
220 */ 223 */
@@ -242,7 +245,8 @@ struct KeyEntry {
242 * Used as a closure for sending elements 245 * Used as a closure for sending elements
243 * with a specific IBF key. 246 * with a specific IBF key.
244 */ 247 */
245struct SendElementClosure { 248struct SendElementClosure
249{
246 /** 250 /**
247 * The IBF key whose matching elements should be 251 * The IBF key whose matching elements should be
248 * sent. 252 * sent.
@@ -260,7 +264,8 @@ struct SendElementClosure {
260/** 264/**
261 * Extra state required for efficient set union. 265 * Extra state required for efficient set union.
262 */ 266 */
263struct SetState { 267struct SetState
268{
264 /** 269 /**
265 * The strata estimator is only generated once for 270 * The strata estimator is only generated once for
266 * each set. 271 * each set.
@@ -282,19 +287,19 @@ struct SetState {
282 * #GNUNET_NO if not. 287 * #GNUNET_NO if not.
283 */ 288 */
284static int 289static int
285destroy_key_to_element_iter(void *cls, 290destroy_key_to_element_iter (void *cls,
286 uint32_t key, 291 uint32_t key,
287 void *value) 292 void *value)
288{ 293{
289 struct KeyEntry *k = value; 294 struct KeyEntry *k = value;
290 295
291 GNUNET_assert(NULL != k); 296 GNUNET_assert (NULL != k);
292 if (GNUNET_YES == k->element->remote) 297 if (GNUNET_YES == k->element->remote)
293 { 298 {
294 GNUNET_free(k->element); 299 GNUNET_free (k->element);
295 k->element = NULL; 300 k->element = NULL;
296 } 301 }
297 GNUNET_free(k); 302 GNUNET_free (k);
298 return GNUNET_YES; 303 return GNUNET_YES;
299} 304}
300 305
@@ -306,44 +311,44 @@ destroy_key_to_element_iter(void *cls,
306 * @param op union operation to destroy 311 * @param op union operation to destroy
307 */ 312 */
308static void 313static void
309union_op_cancel(struct Operation *op) 314union_op_cancel (struct Operation *op)
310{ 315{
311 LOG(GNUNET_ERROR_TYPE_DEBUG, 316 LOG (GNUNET_ERROR_TYPE_DEBUG,
312 "destroying union op\n"); 317 "destroying union op\n");
313 /* check if the op was canceled twice */ 318 /* check if the op was canceled twice */
314 GNUNET_assert(NULL != op->state); 319 GNUNET_assert (NULL != op->state);
315 if (NULL != op->state->remote_ibf) 320 if (NULL != op->state->remote_ibf)
316 { 321 {
317 ibf_destroy(op->state->remote_ibf); 322 ibf_destroy (op->state->remote_ibf);
318 op->state->remote_ibf = NULL; 323 op->state->remote_ibf = NULL;
319 } 324 }
320 if (NULL != op->state->demanded_hashes) 325 if (NULL != op->state->demanded_hashes)
321 { 326 {
322 GNUNET_CONTAINER_multihashmap_destroy(op->state->demanded_hashes); 327 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
323 op->state->demanded_hashes = NULL; 328 op->state->demanded_hashes = NULL;
324 } 329 }
325 if (NULL != op->state->local_ibf) 330 if (NULL != op->state->local_ibf)
326 { 331 {
327 ibf_destroy(op->state->local_ibf); 332 ibf_destroy (op->state->local_ibf);
328 op->state->local_ibf = NULL; 333 op->state->local_ibf = NULL;
329 } 334 }
330 if (NULL != op->state->se) 335 if (NULL != op->state->se)
331 { 336 {
332 strata_estimator_destroy(op->state->se); 337 strata_estimator_destroy (op->state->se);
333 op->state->se = NULL; 338 op->state->se = NULL;
334 } 339 }
335 if (NULL != op->state->key_to_element) 340 if (NULL != op->state->key_to_element)
336 { 341 {
337 GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element, 342 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
338 &destroy_key_to_element_iter, 343 &destroy_key_to_element_iter,
339 NULL); 344 NULL);
340 GNUNET_CONTAINER_multihashmap32_destroy(op->state->key_to_element); 345 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
341 op->state->key_to_element = NULL; 346 op->state->key_to_element = NULL;
342 } 347 }
343 GNUNET_free(op->state); 348 GNUNET_free (op->state);
344 op->state = NULL; 349 op->state = NULL;
345 LOG(GNUNET_ERROR_TYPE_DEBUG, 350 LOG (GNUNET_ERROR_TYPE_DEBUG,
346 "destroying union op done\n"); 351 "destroying union op done\n");
347} 352}
348 353
349 354
@@ -354,20 +359,20 @@ union_op_cancel(struct Operation *op)
354 * @param op the union operation to fail 359 * @param op the union operation to fail
355 */ 360 */
356static void 361static void
357fail_union_operation(struct Operation *op) 362fail_union_operation (struct Operation *op)
358{ 363{
359 struct GNUNET_MQ_Envelope *ev; 364 struct GNUNET_MQ_Envelope *ev;
360 struct GNUNET_SET_ResultMessage *msg; 365 struct GNUNET_SET_ResultMessage *msg;
361 366
362 LOG(GNUNET_ERROR_TYPE_WARNING, 367 LOG (GNUNET_ERROR_TYPE_WARNING,
363 "union operation failed\n"); 368 "union operation failed\n");
364 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 369 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
365 msg->result_status = htons(GNUNET_SET_STATUS_FAILURE); 370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
366 msg->request_id = htonl(op->client_request_id); 371 msg->request_id = htonl (op->client_request_id);
367 msg->element_type = htons(0); 372 msg->element_type = htons (0);
368 GNUNET_MQ_send(op->set->cs->mq, 373 GNUNET_MQ_send (op->set->cs->mq,
369 ev); 374 ev);
370 _GSS_operation_destroy(op, GNUNET_YES); 375 _GSS_operation_destroy (op, GNUNET_YES);
371} 376}
372 377
373 378
@@ -379,16 +384,16 @@ fail_union_operation(struct Operation *op)
379 * @return the derived IBF key 384 * @return the derived IBF key
380 */ 385 */
381static struct IBF_Key 386static struct IBF_Key
382get_ibf_key(const struct GNUNET_HashCode *src) 387get_ibf_key (const struct GNUNET_HashCode *src)
383{ 388{
384 struct IBF_Key key; 389 struct IBF_Key key;
385 uint16_t salt = 0; 390 uint16_t salt = 0;
386 391
387 GNUNET_assert(GNUNET_OK == 392 GNUNET_assert (GNUNET_OK ==
388 GNUNET_CRYPTO_kdf(&key, sizeof(key), 393 GNUNET_CRYPTO_kdf (&key, sizeof(key),
389 src, sizeof *src, 394 src, sizeof *src,
390 &salt, sizeof(salt), 395 &salt, sizeof(salt),
391 NULL, 0)); 396 NULL, 0));
392 return key; 397 return key;
393} 398}
394 399
@@ -396,7 +401,8 @@ get_ibf_key(const struct GNUNET_HashCode *src)
396/** 401/**
397 * Context for #op_get_element_iterator 402 * Context for #op_get_element_iterator
398 */ 403 */
399struct GetElementContext { 404struct GetElementContext
405{
400 /** 406 /**
401 * FIXME. 407 * FIXME.
402 */ 408 */
@@ -420,20 +426,20 @@ struct GetElementContext {
420 * #GNUNET_NO if we've found the element. 426 * #GNUNET_NO if we've found the element.
421 */ 427 */
422static int 428static int
423op_get_element_iterator(void *cls, 429op_get_element_iterator (void *cls,
424 uint32_t key, 430 uint32_t key,
425 void *value) 431 void *value)
426{ 432{
427 struct GetElementContext *ctx = cls; 433 struct GetElementContext *ctx = cls;
428 struct KeyEntry *k = value; 434 struct KeyEntry *k = value;
429 435
430 GNUNET_assert(NULL != k); 436 GNUNET_assert (NULL != k);
431 if (0 == GNUNET_CRYPTO_hash_cmp(&k->element->element_hash, 437 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
432 &ctx->hash)) 438 &ctx->hash))
433 { 439 {
434 ctx->k = k; 440 ctx->k = k;
435 return GNUNET_NO; 441 return GNUNET_NO;
436 } 442 }
437 return GNUNET_YES; 443 return GNUNET_YES;
438} 444}
439 445
@@ -447,8 +453,8 @@ op_get_element_iterator(void *cls,
447 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise 453 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
448 */ 454 */
449static struct KeyEntry * 455static struct KeyEntry *
450op_get_element(struct Operation *op, 456op_get_element (struct Operation *op,
451 const struct GNUNET_HashCode *element_hash) 457 const struct GNUNET_HashCode *element_hash)
452{ 458{
453 int ret; 459 int ret;
454 struct IBF_Key ibf_key; 460 struct IBF_Key ibf_key;
@@ -456,18 +462,18 @@ op_get_element(struct Operation *op,
456 462
457 ctx.hash = *element_hash; 463 ctx.hash = *element_hash;
458 464
459 ibf_key = get_ibf_key(element_hash); 465 ibf_key = get_ibf_key (element_hash);
460 ret = GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element, 466 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
461 (uint32_t)ibf_key.key_val, 467 (uint32_t) ibf_key.key_val,
462 op_get_element_iterator, 468 op_get_element_iterator,
463 &ctx); 469 &ctx);
464 470
465 /* was the iteration aborted because we found the element? */ 471 /* was the iteration aborted because we found the element? */
466 if (GNUNET_SYSERR == ret) 472 if (GNUNET_SYSERR == ret)
467 { 473 {
468 GNUNET_assert(NULL != ctx.k); 474 GNUNET_assert (NULL != ctx.k);
469 return ctx.k; 475 return ctx.k;
470 } 476 }
471 return NULL; 477 return NULL;
472} 478}
473 479
@@ -487,23 +493,23 @@ op_get_element(struct Operation *op,
487 * @parem received was this element received from the remote peer? 493 * @parem received was this element received from the remote peer?
488 */ 494 */
489static void 495static void
490op_register_element(struct Operation *op, 496op_register_element (struct Operation *op,
491 struct ElementEntry *ee, 497 struct ElementEntry *ee,
492 int received) 498 int received)
493{ 499{
494 struct IBF_Key ibf_key; 500 struct IBF_Key ibf_key;
495 struct KeyEntry *k; 501 struct KeyEntry *k;
496 502
497 ibf_key = get_ibf_key(&ee->element_hash); 503 ibf_key = get_ibf_key (&ee->element_hash);
498 k = GNUNET_new(struct KeyEntry); 504 k = GNUNET_new (struct KeyEntry);
499 k->element = ee; 505 k->element = ee;
500 k->ibf_key = ibf_key; 506 k->ibf_key = ibf_key;
501 k->received = received; 507 k->received = received;
502 GNUNET_assert(GNUNET_OK == 508 GNUNET_assert (GNUNET_OK ==
503 GNUNET_CONTAINER_multihashmap32_put(op->state->key_to_element, 509 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
504 (uint32_t)ibf_key.key_val, 510 (uint32_t) ibf_key.key_val,
505 k, 511 k,
506 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 512 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
507} 513}
508 514
509 515
@@ -511,9 +517,9 @@ op_register_element(struct Operation *op,
511 * FIXME. 517 * FIXME.
512 */ 518 */
513static void 519static void
514salt_key(const struct IBF_Key *k_in, 520salt_key (const struct IBF_Key *k_in,
515 uint32_t salt, 521 uint32_t salt,
516 struct IBF_Key *k_out) 522 struct IBF_Key *k_out)
517{ 523{
518 int s = salt % 64; 524 int s = salt % 64;
519 uint64_t x = k_in->key_val; 525 uint64_t x = k_in->key_val;
@@ -528,9 +534,9 @@ salt_key(const struct IBF_Key *k_in,
528 * FIXME. 534 * FIXME.
529 */ 535 */
530static void 536static void
531unsalt_key(const struct IBF_Key *k_in, 537unsalt_key (const struct IBF_Key *k_in,
532 uint32_t salt, 538 uint32_t salt,
533 struct IBF_Key *k_out) 539 struct IBF_Key *k_out)
534{ 540{
535 int s = salt % 64; 541 int s = salt % 64;
536 uint64_t x = k_in->key_val; 542 uint64_t x = k_in->key_val;
@@ -548,23 +554,23 @@ unsalt_key(const struct IBF_Key *k_in,
548 * @param value the key entry to get the key from 554 * @param value the key entry to get the key from
549 */ 555 */
550static int 556static int
551prepare_ibf_iterator(void *cls, 557prepare_ibf_iterator (void *cls,
552 uint32_t key, 558 uint32_t key,
553 void *value) 559 void *value)
554{ 560{
555 struct Operation *op = cls; 561 struct Operation *op = cls;
556 struct KeyEntry *ke = value; 562 struct KeyEntry *ke = value;
557 struct IBF_Key salted_key; 563 struct IBF_Key salted_key;
558 564
559 LOG(GNUNET_ERROR_TYPE_DEBUG, 565 LOG (GNUNET_ERROR_TYPE_DEBUG,
560 "[OP %x] inserting %lx (hash %s) into ibf\n", 566 "[OP %x] inserting %lx (hash %s) into ibf\n",
561 (void *)op, 567 (void *) op,
562 (unsigned long)ke->ibf_key.key_val, 568 (unsigned long) ke->ibf_key.key_val,
563 GNUNET_h2s(&ke->element->element_hash)); 569 GNUNET_h2s (&ke->element->element_hash));
564 salt_key(&ke->ibf_key, 570 salt_key (&ke->ibf_key,
565 op->state->salt_send, 571 op->state->salt_send,
566 &salted_key); 572 &salted_key);
567 ibf_insert(op->state->local_ibf, salted_key); 573 ibf_insert (op->state->local_ibf, salted_key);
568 return GNUNET_YES; 574 return GNUNET_YES;
569} 575}
570 576
@@ -580,9 +586,9 @@ prepare_ibf_iterator(void *cls,
580 * @return #GNUNET_YES (to continue iterating) 586 * @return #GNUNET_YES (to continue iterating)
581 */ 587 */
582static int 588static int
583init_key_to_element_iterator(void *cls, 589init_key_to_element_iterator (void *cls,
584 const struct GNUNET_HashCode *key, 590 const struct GNUNET_HashCode *key,
585 void *value) 591 void *value)
586{ 592{
587 struct Operation *op = cls; 593 struct Operation *op = cls;
588 struct ElementEntry *ee = value; 594 struct ElementEntry *ee = value;
@@ -590,13 +596,13 @@ init_key_to_element_iterator(void *cls,
590 /* make sure that the element belongs to the set at the time 596 /* make sure that the element belongs to the set at the time
591 * of creating the operation */ 597 * of creating the operation */
592 if (GNUNET_NO == 598 if (GNUNET_NO ==
593 _GSS_is_element_of_operation(ee, 599 _GSS_is_element_of_operation (ee,
594 op)) 600 op))
595 return GNUNET_YES; 601 return GNUNET_YES;
596 GNUNET_assert(GNUNET_NO == ee->remote); 602 GNUNET_assert (GNUNET_NO == ee->remote);
597 op_register_element(op, 603 op_register_element (op,
598 ee, 604 ee,
599 GNUNET_NO); 605 GNUNET_NO);
600 return GNUNET_YES; 606 return GNUNET_YES;
601} 607}
602 608
@@ -608,16 +614,16 @@ init_key_to_element_iterator(void *cls,
608 * @param op the set union operation 614 * @param op the set union operation
609 */ 615 */
610static void 616static void
611initialize_key_to_element(struct Operation *op) 617initialize_key_to_element (struct Operation *op)
612{ 618{
613 unsigned int len; 619 unsigned int len;
614 620
615 GNUNET_assert(NULL == op->state->key_to_element); 621 GNUNET_assert (NULL == op->state->key_to_element);
616 len = GNUNET_CONTAINER_multihashmap_size(op->set->content->elements); 622 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
617 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create(len + 1); 623 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
618 GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements, 624 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
619 &init_key_to_element_iterator, 625 &init_key_to_element_iterator,
620 op); 626 op);
621} 627}
622 628
623 629
@@ -630,23 +636,23 @@ initialize_key_to_element(struct Operation *op)
630 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure 636 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
631 */ 637 */
632static int 638static int
633prepare_ibf(struct Operation *op, 639prepare_ibf (struct Operation *op,
634 uint32_t size) 640 uint32_t size)
635{ 641{
636 GNUNET_assert(NULL != op->state->key_to_element); 642 GNUNET_assert (NULL != op->state->key_to_element);
637 643
638 if (NULL != op->state->local_ibf) 644 if (NULL != op->state->local_ibf)
639 ibf_destroy(op->state->local_ibf); 645 ibf_destroy (op->state->local_ibf);
640 op->state->local_ibf = ibf_create(size, SE_IBF_HASH_NUM); 646 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
641 if (NULL == op->state->local_ibf) 647 if (NULL == op->state->local_ibf)
642 { 648 {
643 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, 649 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
644 "Failed to allocate local IBF\n"); 650 "Failed to allocate local IBF\n");
645 return GNUNET_SYSERR; 651 return GNUNET_SYSERR;
646 } 652 }
647 GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element, 653 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
648 &prepare_ibf_iterator, 654 &prepare_ibf_iterator,
649 op); 655 op);
650 return GNUNET_OK; 656 return GNUNET_OK;
651} 657}
652 658
@@ -661,60 +667,60 @@ prepare_ibf(struct Operation *op,
661 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure 667 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
662 */ 668 */
663static int 669static int
664send_ibf(struct Operation *op, 670send_ibf (struct Operation *op,
665 uint16_t ibf_order) 671 uint16_t ibf_order)
666{ 672{
667 unsigned int buckets_sent = 0; 673 unsigned int buckets_sent = 0;
668 struct InvertibleBloomFilter *ibf; 674 struct InvertibleBloomFilter *ibf;
669 675
670 if (GNUNET_OK != 676 if (GNUNET_OK !=
671 prepare_ibf(op, 1 << ibf_order)) 677 prepare_ibf (op, 1 << ibf_order))
672 { 678 {
673 /* allocation failed */ 679 /* allocation failed */
674 return GNUNET_SYSERR; 680 return GNUNET_SYSERR;
675 } 681 }
676 682
677 LOG(GNUNET_ERROR_TYPE_DEBUG, 683 LOG (GNUNET_ERROR_TYPE_DEBUG,
678 "sending ibf of size %u\n", 684 "sending ibf of size %u\n",
679 1 << ibf_order); 685 1 << ibf_order);
680 686
681 { 687 {
682 char name[64] = { 0 }; 688 char name[64] = { 0 };
683 snprintf(name, sizeof(name), "# sent IBF (order %u)", ibf_order); 689 snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
684 GNUNET_STATISTICS_update(_GSS_statistics, name, 1, GNUNET_NO); 690 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
685 } 691 }
686 692
687 ibf = op->state->local_ibf; 693 ibf = op->state->local_ibf;
688 694
689 while (buckets_sent < (1 << ibf_order)) 695 while (buckets_sent < (1 << ibf_order))
690 { 696 {
691 unsigned int buckets_in_message; 697 unsigned int buckets_in_message;
692 struct GNUNET_MQ_Envelope *ev; 698 struct GNUNET_MQ_Envelope *ev;
693 struct IBFMessage *msg; 699 struct IBFMessage *msg;
694 700
695 buckets_in_message = (1 << ibf_order) - buckets_sent; 701 buckets_in_message = (1 << ibf_order) - buckets_sent;
696 /* limit to maximum */ 702 /* limit to maximum */
697 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) 703 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
698 buckets_in_message = MAX_BUCKETS_PER_MESSAGE; 704 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
699 705
700 ev = GNUNET_MQ_msg_extra(msg, 706 ev = GNUNET_MQ_msg_extra (msg,
701 buckets_in_message * IBF_BUCKET_SIZE, 707 buckets_in_message * IBF_BUCKET_SIZE,
702 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); 708 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
703 msg->reserved1 = 0; 709 msg->reserved1 = 0;
704 msg->reserved2 = 0; 710 msg->reserved2 = 0;
705 msg->order = ibf_order; 711 msg->order = ibf_order;
706 msg->offset = htonl(buckets_sent); 712 msg->offset = htonl (buckets_sent);
707 msg->salt = htonl(op->state->salt_send); 713 msg->salt = htonl (op->state->salt_send);
708 ibf_write_slice(ibf, buckets_sent, 714 ibf_write_slice (ibf, buckets_sent,
709 buckets_in_message, &msg[1]); 715 buckets_in_message, &msg[1]);
710 buckets_sent += buckets_in_message; 716 buckets_sent += buckets_in_message;
711 LOG(GNUNET_ERROR_TYPE_DEBUG, 717 LOG (GNUNET_ERROR_TYPE_DEBUG,
712 "ibf chunk size %u, %u/%u sent\n", 718 "ibf chunk size %u, %u/%u sent\n",
713 buckets_in_message, 719 buckets_in_message,
714 buckets_sent, 720 buckets_sent,
715 1 << ibf_order); 721 1 << ibf_order);
716 GNUNET_MQ_send(op->mq, ev); 722 GNUNET_MQ_send (op->mq, ev);
717 } 723 }
718 724
719 /* The other peer must decode the IBF, so 725 /* The other peer must decode the IBF, so
720 * we're passive. */ 726 * we're passive. */
@@ -731,7 +737,7 @@ send_ibf(struct Operation *op,
731 * @return the required size of the ibf 737 * @return the required size of the ibf
732 */ 738 */
733static unsigned int 739static unsigned int
734get_order_from_difference(unsigned int diff) 740get_order_from_difference (unsigned int diff)
735{ 741{
736 unsigned int ibf_order; 742 unsigned int ibf_order;
737 743
@@ -755,9 +761,9 @@ get_order_from_difference(unsigned int diff)
755 * @return #GNUNET_YES (to continue iterating) 761 * @return #GNUNET_YES (to continue iterating)
756 */ 762 */
757static int 763static int
758send_full_element_iterator(void *cls, 764send_full_element_iterator (void *cls,
759 const struct GNUNET_HashCode *key, 765 const struct GNUNET_HashCode *key,
760 void *value) 766 void *value)
761{ 767{
762 struct Operation *op = cls; 768 struct Operation *op = cls;
763 struct GNUNET_SET_ElementMessage *emsg; 769 struct GNUNET_SET_ElementMessage *emsg;
@@ -765,18 +771,18 @@ send_full_element_iterator(void *cls,
765 struct GNUNET_SET_Element *el = &ee->element; 771 struct GNUNET_SET_Element *el = &ee->element;
766 struct GNUNET_MQ_Envelope *ev; 772 struct GNUNET_MQ_Envelope *ev;
767 773
768 LOG(GNUNET_ERROR_TYPE_DEBUG, 774 LOG (GNUNET_ERROR_TYPE_DEBUG,
769 "Sending element %s\n", 775 "Sending element %s\n",
770 GNUNET_h2s(key)); 776 GNUNET_h2s (key));
771 ev = GNUNET_MQ_msg_extra(emsg, 777 ev = GNUNET_MQ_msg_extra (emsg,
772 el->size, 778 el->size,
773 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); 779 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
774 emsg->element_type = htons(el->element_type); 780 emsg->element_type = htons (el->element_type);
775 GNUNET_memcpy(&emsg[1], 781 GNUNET_memcpy (&emsg[1],
776 el->data, 782 el->data,
777 el->size); 783 el->size);
778 GNUNET_MQ_send(op->mq, 784 GNUNET_MQ_send (op->mq,
779 ev); 785 ev);
780 return GNUNET_YES; 786 return GNUNET_YES;
781} 787}
782 788
@@ -787,21 +793,21 @@ send_full_element_iterator(void *cls,
787 * @param op operation to switch to full set transmission. 793 * @param op operation to switch to full set transmission.
788 */ 794 */
789static void 795static void
790send_full_set(struct Operation *op) 796send_full_set (struct Operation *op)
791{ 797{
792 struct GNUNET_MQ_Envelope *ev; 798 struct GNUNET_MQ_Envelope *ev;
793 799
794 op->state->phase = PHASE_FULL_SENDING; 800 op->state->phase = PHASE_FULL_SENDING;
795 LOG(GNUNET_ERROR_TYPE_DEBUG, 801 LOG (GNUNET_ERROR_TYPE_DEBUG,
796 "Dedicing to transmit the full set\n"); 802 "Dedicing to transmit the full set\n");
797 /* FIXME: use a more memory-friendly way of doing this with an 803 /* FIXME: use a more memory-friendly way of doing this with an
798 iterator, just as we do in the non-full case! */ 804 iterator, just as we do in the non-full case! */
799 (void)GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements, 805 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
800 &send_full_element_iterator, 806 &send_full_element_iterator,
801 op); 807 op);
802 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 808 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
803 GNUNET_MQ_send(op->mq, 809 GNUNET_MQ_send (op->mq,
804 ev); 810 ev);
805} 811}
806 812
807 813
@@ -812,26 +818,27 @@ send_full_set(struct Operation *op)
812 * @param msg the message 818 * @param msg the message
813 */ 819 */
814int 820int
815check_union_p2p_strata_estimator(void *cls, 821check_union_p2p_strata_estimator (void *cls,
816 const struct StrataEstimatorMessage *msg) 822 const struct StrataEstimatorMessage *msg)
817{ 823{
818 struct Operation *op = cls; 824 struct Operation *op = cls;
819 int is_compressed; 825 int is_compressed;
820 size_t len; 826 size_t len;
821 827
822 if (op->state->phase != PHASE_EXPECT_SE) 828 if (op->state->phase != PHASE_EXPECT_SE)
823 { 829 {
824 GNUNET_break(0); 830 GNUNET_break (0);
825 return GNUNET_SYSERR; 831 return GNUNET_SYSERR;
826 } 832 }
827 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type)); 833 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
828 len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage); 834 msg->header.type));
835 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
829 if ((GNUNET_NO == is_compressed) && 836 if ((GNUNET_NO == is_compressed) &&
830 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) 837 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
831 { 838 {
832 GNUNET_break(0); 839 GNUNET_break (0);
833 return GNUNET_SYSERR; 840 return GNUNET_SYSERR;
834 } 841 }
835 return GNUNET_OK; 842 return GNUNET_OK;
836} 843}
837 844
@@ -843,8 +850,8 @@ check_union_p2p_strata_estimator(void *cls,
843 * @param msg the message 850 * @param msg the message
844 */ 851 */
845void 852void
846handle_union_p2p_strata_estimator(void *cls, 853handle_union_p2p_strata_estimator (void *cls,
847 const struct StrataEstimatorMessage *msg) 854 const struct StrataEstimatorMessage *msg)
848{ 855{
849 struct Operation *op = cls; 856 struct Operation *op = cls;
850 struct StrataEstimator *remote_se; 857 struct StrataEstimator *remote_se;
@@ -853,116 +860,118 @@ handle_union_p2p_strata_estimator(void *cls,
853 size_t len; 860 size_t len;
854 int is_compressed; 861 int is_compressed;
855 862
856 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type)); 863 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
857 GNUNET_STATISTICS_update(_GSS_statistics, 864 msg->header.type));
858 "# bytes of SE received", 865 GNUNET_STATISTICS_update (_GSS_statistics,
859 ntohs(msg->header.size), 866 "# bytes of SE received",
860 GNUNET_NO); 867 ntohs (msg->header.size),
861 len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage); 868 GNUNET_NO);
862 other_size = GNUNET_ntohll(msg->set_size); 869 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
863 remote_se = strata_estimator_create(SE_STRATA_COUNT, 870 other_size = GNUNET_ntohll (msg->set_size);
864 SE_IBF_SIZE, 871 remote_se = strata_estimator_create (SE_STRATA_COUNT,
865 SE_IBF_HASH_NUM); 872 SE_IBF_SIZE,
873 SE_IBF_HASH_NUM);
866 if (NULL == remote_se) 874 if (NULL == remote_se)
867 { 875 {
868 /* insufficient resources, fail */ 876 /* insufficient resources, fail */
869 fail_union_operation(op); 877 fail_union_operation (op);
870 return; 878 return;
871 } 879 }
872 if (GNUNET_OK != 880 if (GNUNET_OK !=
873 strata_estimator_read(&msg[1], 881 strata_estimator_read (&msg[1],
874 len, 882 len,
875 is_compressed, 883 is_compressed,
876 remote_se)) 884 remote_se))
877 { 885 {
878 /* decompression failed */ 886 /* decompression failed */
879 strata_estimator_destroy(remote_se); 887 strata_estimator_destroy (remote_se);
880 fail_union_operation(op); 888 fail_union_operation (op);
881 return; 889 return;
882 } 890 }
883 GNUNET_assert(NULL != op->state->se); 891 GNUNET_assert (NULL != op->state->se);
884 diff = strata_estimator_difference(remote_se, 892 diff = strata_estimator_difference (remote_se,
885 op->state->se); 893 op->state->se);
886 894
887 if (diff > 200) 895 if (diff > 200)
888 diff = diff * 3 / 2; 896 diff = diff * 3 / 2;
889 897
890 strata_estimator_destroy(remote_se); 898 strata_estimator_destroy (remote_se);
891 strata_estimator_destroy(op->state->se); 899 strata_estimator_destroy (op->state->se);
892 op->state->se = NULL; 900 op->state->se = NULL;
893 LOG(GNUNET_ERROR_TYPE_DEBUG, 901 LOG (GNUNET_ERROR_TYPE_DEBUG,
894 "got se diff=%d, using ibf size %d\n", 902 "got se diff=%d, using ibf size %d\n",
895 diff, 903 diff,
896 1U << get_order_from_difference(diff)); 904 1U << get_order_from_difference (diff));
897 905
898 { 906 {
899 char *set_debug; 907 char *set_debug;
900 908
901 set_debug = getenv("GNUNET_SET_BENCHMARK"); 909 set_debug = getenv ("GNUNET_SET_BENCHMARK");
902 if ((NULL != set_debug) && 910 if ((NULL != set_debug) &&
903 (0 == strcmp(set_debug, "1"))) 911 (0 == strcmp (set_debug, "1")))
904 { 912 {
905 FILE *f = fopen("set.log", "a"); 913 FILE *f = fopen ("set.log", "a");
906 fprintf(f, "%llu\n", (unsigned long long)diff); 914 fprintf (f, "%llu\n", (unsigned long long) diff);
907 fclose(f); 915 fclose (f);
908 } 916 }
909 } 917 }
910 918
911 if ((GNUNET_YES == op->byzantine) && 919 if ((GNUNET_YES == op->byzantine) &&
912 (other_size < op->byzantine_lower_bound)) 920 (other_size < op->byzantine_lower_bound))
913 { 921 {
914 GNUNET_break(0); 922 GNUNET_break (0);
915 fail_union_operation(op); 923 fail_union_operation (op);
916 return; 924 return;
917 } 925 }
918 926
919 if ((GNUNET_YES == op->force_full) || 927 if ((GNUNET_YES == op->force_full) ||
920 (diff > op->state->initial_size / 4) || 928 (diff > op->state->initial_size / 4) ||
921 (0 == other_size)) 929 (0 == other_size))
930 {
931 LOG (GNUNET_ERROR_TYPE_DEBUG,
932 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
933 diff,
934 op->state->initial_size);
935 GNUNET_STATISTICS_update (_GSS_statistics,
936 "# of full sends",
937 1,
938 GNUNET_NO);
939 if ((op->state->initial_size <= other_size) ||
940 (0 == other_size))
922 { 941 {
923 LOG(GNUNET_ERROR_TYPE_DEBUG, 942 send_full_set (op);
924 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
925 diff,
926 op->state->initial_size);
927 GNUNET_STATISTICS_update(_GSS_statistics,
928 "# of full sends",
929 1,
930 GNUNET_NO);
931 if ((op->state->initial_size <= other_size) ||
932 (0 == other_size))
933 {
934 send_full_set(op);
935 }
936 else
937 {
938 struct GNUNET_MQ_Envelope *ev;
939
940 LOG(GNUNET_ERROR_TYPE_DEBUG,
941 "Telling other peer that we expect its full set\n");
942 op->state->phase = PHASE_EXPECT_IBF;
943 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
944 GNUNET_MQ_send(op->mq,
945 ev);
946 }
947 } 943 }
948 else 944 else
949 { 945 {
950 GNUNET_STATISTICS_update(_GSS_statistics, 946 struct GNUNET_MQ_Envelope *ev;
951 "# of ibf sends", 947
952 1, 948 LOG (GNUNET_ERROR_TYPE_DEBUG,
953 GNUNET_NO); 949 "Telling other peer that we expect its full set\n");
954 if (GNUNET_OK != 950 op->state->phase = PHASE_EXPECT_IBF;
955 send_ibf(op, 951 ev = GNUNET_MQ_msg_header (
956 get_order_from_difference(diff))) 952 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
957 { 953 GNUNET_MQ_send (op->mq,
958 /* Internal error, best we can do is shut the connection */ 954 ev);
959 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
960 "Failed to send IBF, closing connection\n");
961 fail_union_operation(op);
962 return;
963 }
964 } 955 }
965 GNUNET_CADET_receive_done(op->channel); 956 }
957 else
958 {
959 GNUNET_STATISTICS_update (_GSS_statistics,
960 "# of ibf sends",
961 1,
962 GNUNET_NO);
963 if (GNUNET_OK !=
964 send_ibf (op,
965 get_order_from_difference (diff)))
966 {
967 /* Internal error, best we can do is shut the connection */
968 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
969 "Failed to send IBF, closing connection\n");
970 fail_union_operation (op);
971 return;
972 }
973 }
974 GNUNET_CADET_receive_done (op->channel);
966} 975}
967 976
968 977
@@ -974,9 +983,9 @@ handle_union_p2p_strata_estimator(void *cls,
974 * @param value the key entry 983 * @param value the key entry
975 */ 984 */
976static int 985static int
977send_offers_iterator(void *cls, 986send_offers_iterator (void *cls,
978 uint32_t key, 987 uint32_t key,
979 void *value) 988 void *value)
980{ 989{
981 struct SendElementClosure *sec = cls; 990 struct SendElementClosure *sec = cls;
982 struct Operation *op = sec->op; 991 struct Operation *op = sec->op;
@@ -988,17 +997,17 @@ send_offers_iterator(void *cls,
988 if (ke->ibf_key.key_val != sec->ibf_key.key_val) 997 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
989 return GNUNET_YES; 998 return GNUNET_YES;
990 999
991 ev = GNUNET_MQ_msg_header_extra(mh, 1000 ev = GNUNET_MQ_msg_header_extra (mh,
992 sizeof(struct GNUNET_HashCode), 1001 sizeof(struct GNUNET_HashCode),
993 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); 1002 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
994 1003
995 GNUNET_assert(NULL != ev); 1004 GNUNET_assert (NULL != ev);
996 *(struct GNUNET_HashCode *)&mh[1] = ke->element->element_hash; 1005 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
997 LOG(GNUNET_ERROR_TYPE_DEBUG, 1006 LOG (GNUNET_ERROR_TYPE_DEBUG,
998 "[OP %x] sending element offer (%s) to peer\n", 1007 "[OP %x] sending element offer (%s) to peer\n",
999 (void *)op, 1008 (void *) op,
1000 GNUNET_h2s(&ke->element->element_hash)); 1009 GNUNET_h2s (&ke->element->element_hash));
1001 GNUNET_MQ_send(op->mq, ev); 1010 GNUNET_MQ_send (op->mq, ev);
1002 return GNUNET_YES; 1011 return GNUNET_YES;
1003} 1012}
1004 1013
@@ -1010,17 +1019,19 @@ send_offers_iterator(void *cls,
1010 * @param ibf_key IBF key of interest 1019 * @param ibf_key IBF key of interest
1011 */ 1020 */
1012static void 1021static void
1013send_offers_for_key(struct Operation *op, 1022send_offers_for_key (struct Operation *op,
1014 struct IBF_Key ibf_key) 1023 struct IBF_Key ibf_key)
1015{ 1024{
1016 struct SendElementClosure send_cls; 1025 struct SendElementClosure send_cls;
1017 1026
1018 send_cls.ibf_key = ibf_key; 1027 send_cls.ibf_key = ibf_key;
1019 send_cls.op = op; 1028 send_cls.op = op;
1020 (void)GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element, 1029 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
1021 (uint32_t)ibf_key.key_val, 1030 op->state->key_to_element,
1022 &send_offers_iterator, 1031 (uint32_t) ibf_key.
1023 &send_cls); 1032 key_val,
1033 &send_offers_iterator,
1034 &send_cls);
1024} 1035}
1025 1036
1026 1037
@@ -1032,7 +1043,7 @@ send_offers_for_key(struct Operation *op,
1032 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure 1043 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1033 */ 1044 */
1034static int 1045static int
1035decode_and_send(struct Operation *op) 1046decode_and_send (struct Operation *op)
1036{ 1047{
1037 struct IBF_Key key; 1048 struct IBF_Key key;
1038 struct IBF_Key last_key; 1049 struct IBF_Key last_key;
@@ -1040,147 +1051,147 @@ decode_and_send(struct Operation *op)
1040 unsigned int num_decoded; 1051 unsigned int num_decoded;
1041 struct InvertibleBloomFilter *diff_ibf; 1052 struct InvertibleBloomFilter *diff_ibf;
1042 1053
1043 GNUNET_assert(PHASE_INVENTORY_ACTIVE == op->state->phase); 1054 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1044 1055
1045 if (GNUNET_OK != 1056 if (GNUNET_OK !=
1046 prepare_ibf(op, 1057 prepare_ibf (op,
1047 op->state->remote_ibf->size)) 1058 op->state->remote_ibf->size))
1048 { 1059 {
1049 GNUNET_break(0); 1060 GNUNET_break (0);
1050 /* allocation failed */ 1061 /* allocation failed */
1051 return GNUNET_SYSERR; 1062 return GNUNET_SYSERR;
1052 } 1063 }
1053 diff_ibf = ibf_dup(op->state->local_ibf); 1064 diff_ibf = ibf_dup (op->state->local_ibf);
1054 ibf_subtract(diff_ibf, 1065 ibf_subtract (diff_ibf,
1055 op->state->remote_ibf); 1066 op->state->remote_ibf);
1056 1067
1057 ibf_destroy(op->state->remote_ibf); 1068 ibf_destroy (op->state->remote_ibf);
1058 op->state->remote_ibf = NULL; 1069 op->state->remote_ibf = NULL;
1059 1070
1060 LOG(GNUNET_ERROR_TYPE_DEBUG, 1071 LOG (GNUNET_ERROR_TYPE_DEBUG,
1061 "decoding IBF (size=%u)\n", 1072 "decoding IBF (size=%u)\n",
1062 diff_ibf->size); 1073 diff_ibf->size);
1063 1074
1064 num_decoded = 0; 1075 num_decoded = 0;
1065 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ 1076 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1066 1077
1067 while (1) 1078 while (1)
1068 { 1079 {
1069 int res; 1080 int res;
1070 int cycle_detected = GNUNET_NO; 1081 int cycle_detected = GNUNET_NO;
1071
1072 last_key = key;
1073 1082
1074 res = ibf_decode(diff_ibf, &side, &key); 1083 last_key = key;
1075 if (res == GNUNET_OK)
1076 {
1077 LOG(GNUNET_ERROR_TYPE_DEBUG,
1078 "decoded ibf key %lx\n",
1079 (unsigned long)key.key_val);
1080 num_decoded += 1;
1081 if ((num_decoded > diff_ibf->size) ||
1082 ((num_decoded > 1) &&
1083 (last_key.key_val == key.key_val)))
1084 {
1085 LOG(GNUNET_ERROR_TYPE_DEBUG,
1086 "detected cyclic ibf (decoded %u/%u)\n",
1087 num_decoded,
1088 diff_ibf->size);
1089 cycle_detected = GNUNET_YES;
1090 }
1091 }
1092 if ((GNUNET_SYSERR == res) ||
1093 (GNUNET_YES == cycle_detected))
1094 {
1095 int next_order;
1096 next_order = 0;
1097 while (1 << next_order < diff_ibf->size)
1098 next_order++;
1099 next_order++;
1100 if (next_order <= MAX_IBF_ORDER)
1101 {
1102 LOG(GNUNET_ERROR_TYPE_DEBUG,
1103 "decoding failed, sending larger ibf (size %u)\n",
1104 1 << next_order);
1105 GNUNET_STATISTICS_update(_GSS_statistics,
1106 "# of IBF retries",
1107 1,
1108 GNUNET_NO);
1109 op->state->salt_send++;
1110 if (GNUNET_OK !=
1111 send_ibf(op, next_order))
1112 {
1113 /* Internal error, best we can do is shut the connection */
1114 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1115 "Failed to send IBF, closing connection\n");
1116 fail_union_operation(op);
1117 ibf_destroy(diff_ibf);
1118 return GNUNET_SYSERR;
1119 }
1120 }
1121 else
1122 {
1123 GNUNET_STATISTICS_update(_GSS_statistics,
1124 "# of failed union operations (too large)",
1125 1,
1126 GNUNET_NO);
1127 // XXX: Send the whole set, element-by-element
1128 LOG(GNUNET_ERROR_TYPE_ERROR,
1129 "set union failed: reached ibf limit\n");
1130 fail_union_operation(op);
1131 ibf_destroy(diff_ibf);
1132 return GNUNET_SYSERR;
1133 }
1134 break;
1135 }
1136 if (GNUNET_NO == res)
1137 {
1138 struct GNUNET_MQ_Envelope *ev;
1139
1140 LOG(GNUNET_ERROR_TYPE_DEBUG,
1141 "transmitted all values, sending DONE\n");
1142 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1143 GNUNET_MQ_send(op->mq, ev);
1144 /* We now wait until we get a DONE message back
1145 * and then wait for our MQ to be flushed and all our
1146 * demands be delivered. */
1147 break;
1148 }
1149 if (1 == side)
1150 {
1151 struct IBF_Key unsalted_key;
1152 1084
1153 unsalt_key(&key, 1085 res = ibf_decode (diff_ibf, &side, &key);
1154 op->state->salt_receive, 1086 if (res == GNUNET_OK)
1155 &unsalted_key); 1087 {
1156 send_offers_for_key(op, 1088 LOG (GNUNET_ERROR_TYPE_DEBUG,
1157 unsalted_key); 1089 "decoded ibf key %lx\n",
1158 } 1090 (unsigned long) key.key_val);
1159 else if (-1 == side) 1091 num_decoded += 1;
1092 if ((num_decoded > diff_ibf->size) ||
1093 ((num_decoded > 1) &&
1094 (last_key.key_val == key.key_val)))
1095 {
1096 LOG (GNUNET_ERROR_TYPE_DEBUG,
1097 "detected cyclic ibf (decoded %u/%u)\n",
1098 num_decoded,
1099 diff_ibf->size);
1100 cycle_detected = GNUNET_YES;
1101 }
1102 }
1103 if ((GNUNET_SYSERR == res) ||
1104 (GNUNET_YES == cycle_detected))
1105 {
1106 int next_order;
1107 next_order = 0;
1108 while (1 << next_order < diff_ibf->size)
1109 next_order++;
1110 next_order++;
1111 if (next_order <= MAX_IBF_ORDER)
1112 {
1113 LOG (GNUNET_ERROR_TYPE_DEBUG,
1114 "decoding failed, sending larger ibf (size %u)\n",
1115 1 << next_order);
1116 GNUNET_STATISTICS_update (_GSS_statistics,
1117 "# of IBF retries",
1118 1,
1119 GNUNET_NO);
1120 op->state->salt_send++;
1121 if (GNUNET_OK !=
1122 send_ibf (op, next_order))
1160 { 1123 {
1161 struct GNUNET_MQ_Envelope *ev; 1124 /* Internal error, best we can do is shut the connection */
1162 struct InquiryMessage *msg; 1125 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1163 1126 "Failed to send IBF, closing connection\n");
1164 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth 1127 fail_union_operation (op);
1165 * the effort additional complexity. */ 1128 ibf_destroy (diff_ibf);
1166 ev = GNUNET_MQ_msg_extra(msg, 1129 return GNUNET_SYSERR;
1167 sizeof(struct IBF_Key),
1168 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1169 msg->salt = htonl(op->state->salt_receive);
1170 GNUNET_memcpy(&msg[1],
1171 &key,
1172 sizeof(struct IBF_Key));
1173 LOG(GNUNET_ERROR_TYPE_DEBUG,
1174 "sending element inquiry for IBF key %lx\n",
1175 (unsigned long)key.key_val);
1176 GNUNET_MQ_send(op->mq, ev);
1177 } 1130 }
1131 }
1178 else 1132 else
1179 { 1133 {
1180 GNUNET_assert(0); 1134 GNUNET_STATISTICS_update (_GSS_statistics,
1181 } 1135 "# of failed union operations (too large)",
1136 1,
1137 GNUNET_NO);
1138 // XXX: Send the whole set, element-by-element
1139 LOG (GNUNET_ERROR_TYPE_ERROR,
1140 "set union failed: reached ibf limit\n");
1141 fail_union_operation (op);
1142 ibf_destroy (diff_ibf);
1143 return GNUNET_SYSERR;
1144 }
1145 break;
1146 }
1147 if (GNUNET_NO == res)
1148 {
1149 struct GNUNET_MQ_Envelope *ev;
1150
1151 LOG (GNUNET_ERROR_TYPE_DEBUG,
1152 "transmitted all values, sending DONE\n");
1153 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1154 GNUNET_MQ_send (op->mq, ev);
1155 /* We now wait until we get a DONE message back
1156 * and then wait for our MQ to be flushed and all our
1157 * demands be delivered. */
1158 break;
1182 } 1159 }
1183 ibf_destroy(diff_ibf); 1160 if (1 == side)
1161 {
1162 struct IBF_Key unsalted_key;
1163
1164 unsalt_key (&key,
1165 op->state->salt_receive,
1166 &unsalted_key);
1167 send_offers_for_key (op,
1168 unsalted_key);
1169 }
1170 else if (-1 == side)
1171 {
1172 struct GNUNET_MQ_Envelope *ev;
1173 struct InquiryMessage *msg;
1174
1175 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1176 * the effort additional complexity. */
1177 ev = GNUNET_MQ_msg_extra (msg,
1178 sizeof(struct IBF_Key),
1179 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1180 msg->salt = htonl (op->state->salt_receive);
1181 GNUNET_memcpy (&msg[1],
1182 &key,
1183 sizeof(struct IBF_Key));
1184 LOG (GNUNET_ERROR_TYPE_DEBUG,
1185 "sending element inquiry for IBF key %lx\n",
1186 (unsigned long) key.key_val);
1187 GNUNET_MQ_send (op->mq, ev);
1188 }
1189 else
1190 {
1191 GNUNET_assert (0);
1192 }
1193 }
1194 ibf_destroy (diff_ibf);
1184 return GNUNET_OK; 1195 return GNUNET_OK;
1185} 1196}
1186 1197
@@ -1196,52 +1207,54 @@ decode_and_send(struct Operation *op)
1196 * @return #GNUNET_OK if @a msg is well-formed 1207 * @return #GNUNET_OK if @a msg is well-formed
1197 */ 1208 */
1198int 1209int
1199check_union_p2p_ibf(void *cls, 1210check_union_p2p_ibf (void *cls,
1200 const struct IBFMessage *msg) 1211 const struct IBFMessage *msg)
1201{ 1212{
1202 struct Operation *op = cls; 1213 struct Operation *op = cls;
1203 unsigned int buckets_in_message; 1214 unsigned int buckets_in_message;
1204 1215
1205 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1216 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1206 { 1217 {
1207 GNUNET_break_op(0); 1218 GNUNET_break_op (0);
1208 return GNUNET_SYSERR; 1219 return GNUNET_SYSERR;
1209 } 1220 }
1210 buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; 1221 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1222 / IBF_BUCKET_SIZE;
1211 if (0 == buckets_in_message) 1223 if (0 == buckets_in_message)
1224 {
1225 GNUNET_break_op (0);
1226 return GNUNET_SYSERR;
1227 }
1228 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1229 * IBF_BUCKET_SIZE)
1230 {
1231 GNUNET_break_op (0);
1232 return GNUNET_SYSERR;
1233 }
1234 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1235 {
1236 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1212 { 1237 {
1213 GNUNET_break_op(0); 1238 GNUNET_break_op (0);
1214 return GNUNET_SYSERR; 1239 return GNUNET_SYSERR;
1215 } 1240 }
1216 if ((ntohs(msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) 1241 if (1 << msg->order != op->state->remote_ibf->size)
1217 { 1242 {
1218 GNUNET_break_op(0); 1243 GNUNET_break_op (0);
1219 return GNUNET_SYSERR; 1244 return GNUNET_SYSERR;
1220 } 1245 }
1221 if (op->state->phase == PHASE_EXPECT_IBF_CONT) 1246 if (ntohl (msg->salt) != op->state->salt_receive)
1222 { 1247 {
1223 if (ntohl(msg->offset) != op->state->ibf_buckets_received) 1248 GNUNET_break_op (0);
1224 { 1249 return GNUNET_SYSERR;
1225 GNUNET_break_op(0);
1226 return GNUNET_SYSERR;
1227 }
1228 if (1 << msg->order != op->state->remote_ibf->size)
1229 {
1230 GNUNET_break_op(0);
1231 return GNUNET_SYSERR;
1232 }
1233 if (ntohl(msg->salt) != op->state->salt_receive)
1234 {
1235 GNUNET_break_op(0);
1236 return GNUNET_SYSERR;
1237 }
1238 } 1250 }
1251 }
1239 else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && 1252 else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1240 (op->state->phase != PHASE_EXPECT_IBF)) 1253 (op->state->phase != PHASE_EXPECT_IBF))
1241 { 1254 {
1242 GNUNET_break_op(0); 1255 GNUNET_break_op (0);
1243 return GNUNET_SYSERR; 1256 return GNUNET_SYSERR;
1244 } 1257 }
1245 1258
1246 return GNUNET_OK; 1259 return GNUNET_OK;
1247} 1260}
@@ -1257,71 +1270,72 @@ check_union_p2p_ibf(void *cls,
1257 * @param msg the header of the message 1270 * @param msg the header of the message
1258 */ 1271 */
1259void 1272void
1260handle_union_p2p_ibf(void *cls, 1273handle_union_p2p_ibf (void *cls,
1261 const struct IBFMessage *msg) 1274 const struct IBFMessage *msg)
1262{ 1275{
1263 struct Operation *op = cls; 1276 struct Operation *op = cls;
1264 unsigned int buckets_in_message; 1277 unsigned int buckets_in_message;
1265 1278
1266 buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; 1279 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1280 / IBF_BUCKET_SIZE;
1267 if ((op->state->phase == PHASE_INVENTORY_PASSIVE) || 1281 if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1268 (op->state->phase == PHASE_EXPECT_IBF)) 1282 (op->state->phase == PHASE_EXPECT_IBF))
1269 { 1283 {
1270 op->state->phase = PHASE_EXPECT_IBF_CONT; 1284 op->state->phase = PHASE_EXPECT_IBF_CONT;
1271 GNUNET_assert(NULL == op->state->remote_ibf); 1285 GNUNET_assert (NULL == op->state->remote_ibf);
1272 LOG(GNUNET_ERROR_TYPE_DEBUG, 1286 LOG (GNUNET_ERROR_TYPE_DEBUG,
1273 "Creating new ibf of size %u\n", 1287 "Creating new ibf of size %u\n",
1274 1 << msg->order); 1288 1 << msg->order);
1275 op->state->remote_ibf = ibf_create(1 << msg->order, SE_IBF_HASH_NUM); 1289 op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1276 op->state->salt_receive = ntohl(msg->salt); 1290 op->state->salt_receive = ntohl (msg->salt);
1277 LOG(GNUNET_ERROR_TYPE_DEBUG, 1291 LOG (GNUNET_ERROR_TYPE_DEBUG,
1278 "Receiving new IBF with salt %u\n", 1292 "Receiving new IBF with salt %u\n",
1279 op->state->salt_receive); 1293 op->state->salt_receive);
1280 if (NULL == op->state->remote_ibf) 1294 if (NULL == op->state->remote_ibf)
1281 { 1295 {
1282 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, 1296 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1283 "Failed to parse remote IBF, closing connection\n"); 1297 "Failed to parse remote IBF, closing connection\n");
1284 fail_union_operation(op); 1298 fail_union_operation (op);
1285 return; 1299 return;
1286 }
1287 op->state->ibf_buckets_received = 0;
1288 if (0 != ntohl(msg->offset))
1289 {
1290 GNUNET_break_op(0);
1291 fail_union_operation(op);
1292 return;
1293 }
1294 } 1300 }
1295 else 1301 op->state->ibf_buckets_received = 0;
1302 if (0 != ntohl (msg->offset))
1296 { 1303 {
1297 GNUNET_assert(op->state->phase == PHASE_EXPECT_IBF_CONT); 1304 GNUNET_break_op (0);
1298 LOG(GNUNET_ERROR_TYPE_DEBUG, 1305 fail_union_operation (op);
1299 "Received more of IBF\n"); 1306 return;
1300 } 1307 }
1301 GNUNET_assert(NULL != op->state->remote_ibf); 1308 }
1309 else
1310 {
1311 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1312 LOG (GNUNET_ERROR_TYPE_DEBUG,
1313 "Received more of IBF\n");
1314 }
1315 GNUNET_assert (NULL != op->state->remote_ibf);
1302 1316
1303 ibf_read_slice(&msg[1], 1317 ibf_read_slice (&msg[1],
1304 op->state->ibf_buckets_received, 1318 op->state->ibf_buckets_received,
1305 buckets_in_message, 1319 buckets_in_message,
1306 op->state->remote_ibf); 1320 op->state->remote_ibf);
1307 op->state->ibf_buckets_received += buckets_in_message; 1321 op->state->ibf_buckets_received += buckets_in_message;
1308 1322
1309 if (op->state->ibf_buckets_received == op->state->remote_ibf->size) 1323 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1310 { 1324 {
1311 LOG(GNUNET_ERROR_TYPE_DEBUG, 1325 LOG (GNUNET_ERROR_TYPE_DEBUG,
1312 "received full ibf\n"); 1326 "received full ibf\n");
1313 op->state->phase = PHASE_INVENTORY_ACTIVE; 1327 op->state->phase = PHASE_INVENTORY_ACTIVE;
1314 if (GNUNET_OK != 1328 if (GNUNET_OK !=
1315 decode_and_send(op)) 1329 decode_and_send (op))
1316 { 1330 {
1317 /* Internal error, best we can do is shut down */ 1331 /* Internal error, best we can do is shut down */
1318 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, 1332 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1319 "Failed to decode IBF, closing connection\n"); 1333 "Failed to decode IBF, closing connection\n");
1320 fail_union_operation(op); 1334 fail_union_operation (op);
1321 return; 1335 return;
1322 }
1323 } 1336 }
1324 GNUNET_CADET_receive_done(op->channel); 1337 }
1338 GNUNET_CADET_receive_done (op->channel);
1325} 1339}
1326 1340
1327 1341
@@ -1334,33 +1348,34 @@ handle_union_p2p_ibf(void *cls,
1334 * @param status status to send with the new element 1348 * @param status status to send with the new element
1335 */ 1349 */
1336static void 1350static void
1337send_client_element(struct Operation *op, 1351send_client_element (struct Operation *op,
1338 struct GNUNET_SET_Element *element, 1352 struct GNUNET_SET_Element *element,
1339 int status) 1353 int status)
1340{ 1354{
1341 struct GNUNET_MQ_Envelope *ev; 1355 struct GNUNET_MQ_Envelope *ev;
1342 struct GNUNET_SET_ResultMessage *rm; 1356 struct GNUNET_SET_ResultMessage *rm;
1343 1357
1344 LOG(GNUNET_ERROR_TYPE_DEBUG, 1358 LOG (GNUNET_ERROR_TYPE_DEBUG,
1345 "sending element (size %u) to client\n", 1359 "sending element (size %u) to client\n",
1346 element->size); 1360 element->size);
1347 GNUNET_assert(0 != op->client_request_id); 1361 GNUNET_assert (0 != op->client_request_id);
1348 ev = GNUNET_MQ_msg_extra(rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); 1362 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1349 if (NULL == ev) 1363 if (NULL == ev)
1350 { 1364 {
1351 GNUNET_MQ_discard(ev); 1365 GNUNET_MQ_discard (ev);
1352 GNUNET_break(0); 1366 GNUNET_break (0);
1353 return; 1367 return;
1354 } 1368 }
1355 rm->result_status = htons(status); 1369 rm->result_status = htons (status);
1356 rm->request_id = htonl(op->client_request_id); 1370 rm->request_id = htonl (op->client_request_id);
1357 rm->element_type = htons(element->element_type); 1371 rm->element_type = htons (element->element_type);
1358 rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element)); 1372 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1359 GNUNET_memcpy(&rm[1], 1373 op->state->key_to_element));
1360 element->data, 1374 GNUNET_memcpy (&rm[1],
1361 element->size); 1375 element->data,
1362 GNUNET_MQ_send(op->set->cs->mq, 1376 element->size);
1363 ev); 1377 GNUNET_MQ_send (op->set->cs->mq,
1378 ev);
1364} 1379}
1365 1380
1366 1381
@@ -1371,50 +1386,51 @@ send_client_element(struct Operation *op,
1371 * @param cls operation to destroy 1386 * @param cls operation to destroy
1372 */ 1387 */
1373static void 1388static void
1374send_client_done(void *cls) 1389send_client_done (void *cls)
1375{ 1390{
1376 struct Operation *op = cls; 1391 struct Operation *op = cls;
1377 struct GNUNET_MQ_Envelope *ev; 1392 struct GNUNET_MQ_Envelope *ev;
1378 struct GNUNET_SET_ResultMessage *rm; 1393 struct GNUNET_SET_ResultMessage *rm;
1379 1394
1380 if (GNUNET_YES == op->state->client_done_sent) 1395 if (GNUNET_YES == op->state->client_done_sent)
1381 { 1396 {
1382 return; 1397 return;
1383 } 1398 }
1384 1399
1385 if (PHASE_DONE != op->state->phase) 1400 if (PHASE_DONE != op->state->phase)
1386 { 1401 {
1387 LOG(GNUNET_ERROR_TYPE_WARNING, 1402 LOG (GNUNET_ERROR_TYPE_WARNING,
1388 "Union operation failed\n"); 1403 "Union operation failed\n");
1389 GNUNET_STATISTICS_update(_GSS_statistics, 1404 GNUNET_STATISTICS_update (_GSS_statistics,
1390 "# Union operations failed", 1405 "# Union operations failed",
1391 1, 1406 1,
1392 GNUNET_NO); 1407 GNUNET_NO);
1393 ev = GNUNET_MQ_msg(rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 1408 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1394 rm->result_status = htons(GNUNET_SET_STATUS_FAILURE); 1409 rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1395 rm->request_id = htonl(op->client_request_id); 1410 rm->request_id = htonl (op->client_request_id);
1396 rm->element_type = htons(0); 1411 rm->element_type = htons (0);
1397 GNUNET_MQ_send(op->set->cs->mq, 1412 GNUNET_MQ_send (op->set->cs->mq,
1398 ev); 1413 ev);
1399 return; 1414 return;
1400 } 1415 }
1401 1416
1402 op->state->client_done_sent = GNUNET_YES; 1417 op->state->client_done_sent = GNUNET_YES;
1403 1418
1404 GNUNET_STATISTICS_update(_GSS_statistics, 1419 GNUNET_STATISTICS_update (_GSS_statistics,
1405 "# Union operations succeeded", 1420 "# Union operations succeeded",
1406 1, 1421 1,
1407 GNUNET_NO); 1422 GNUNET_NO);
1408 LOG(GNUNET_ERROR_TYPE_INFO, 1423 LOG (GNUNET_ERROR_TYPE_INFO,
1409 "Signalling client that union operation is done\n"); 1424 "Signalling client that union operation is done\n");
1410 ev = GNUNET_MQ_msg(rm, 1425 ev = GNUNET_MQ_msg (rm,
1411 GNUNET_MESSAGE_TYPE_SET_RESULT); 1426 GNUNET_MESSAGE_TYPE_SET_RESULT);
1412 rm->request_id = htonl(op->client_request_id); 1427 rm->request_id = htonl (op->client_request_id);
1413 rm->result_status = htons(GNUNET_SET_STATUS_DONE); 1428 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1414 rm->element_type = htons(0); 1429 rm->element_type = htons (0);
1415 rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element)); 1430 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1416 GNUNET_MQ_send(op->set->cs->mq, 1431 op->state->key_to_element));
1417 ev); 1432 GNUNET_MQ_send (op->set->cs->mq,
1433 ev);
1418} 1434}
1419 1435
1420 1436
@@ -1424,41 +1440,42 @@ send_client_done(void *cls)
1424 * @param op operation to check 1440 * @param op operation to check
1425 */ 1441 */
1426static void 1442static void
1427maybe_finish(struct Operation *op) 1443maybe_finish (struct Operation *op)
1428{ 1444{
1429 unsigned int num_demanded; 1445 unsigned int num_demanded;
1430 1446
1431 num_demanded = GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes); 1447 num_demanded = GNUNET_CONTAINER_multihashmap_size (
1448 op->state->demanded_hashes);
1432 1449
1433 if (PHASE_FINISH_WAITING == op->state->phase) 1450 if (PHASE_FINISH_WAITING == op->state->phase)
1451 {
1452 LOG (GNUNET_ERROR_TYPE_DEBUG,
1453 "In PHASE_FINISH_WAITING, pending %u demands\n",
1454 num_demanded);
1455 if (0 == num_demanded)
1434 { 1456 {
1435 LOG(GNUNET_ERROR_TYPE_DEBUG, 1457 struct GNUNET_MQ_Envelope *ev;
1436 "In PHASE_FINISH_WAITING, pending %u demands\n", 1458
1437 num_demanded); 1459 op->state->phase = PHASE_DONE;
1438 if (0 == num_demanded) 1460 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1439 { 1461 GNUNET_MQ_send (op->mq,
1440 struct GNUNET_MQ_Envelope *ev; 1462 ev);
1441 1463 /* We now wait until the other peer sends P2P_OVER
1442 op->state->phase = PHASE_DONE; 1464 * after it got all elements from us. */
1443 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1444 GNUNET_MQ_send(op->mq,
1445 ev);
1446 /* We now wait until the other peer sends P2P_OVER
1447 * after it got all elements from us. */
1448 }
1449 } 1465 }
1466 }
1450 if (PHASE_FINISH_CLOSING == op->state->phase) 1467 if (PHASE_FINISH_CLOSING == op->state->phase)
1468 {
1469 LOG (GNUNET_ERROR_TYPE_DEBUG,
1470 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1471 num_demanded);
1472 if (0 == num_demanded)
1451 { 1473 {
1452 LOG(GNUNET_ERROR_TYPE_DEBUG, 1474 op->state->phase = PHASE_DONE;
1453 "In PHASE_FINISH_CLOSING, pending %u demands\n", 1475 send_client_done (op);
1454 num_demanded); 1476 _GSS_operation_destroy2 (op);
1455 if (0 == num_demanded)
1456 {
1457 op->state->phase = PHASE_DONE;
1458 send_client_done(op);
1459 _GSS_operation_destroy2(op);
1460 }
1461 } 1477 }
1478 }
1462} 1479}
1463 1480
1464 1481
@@ -1469,21 +1486,21 @@ maybe_finish(struct Operation *op)
1469 * @param emsg the message 1486 * @param emsg the message
1470 */ 1487 */
1471int 1488int
1472check_union_p2p_elements(void *cls, 1489check_union_p2p_elements (void *cls,
1473 const struct GNUNET_SET_ElementMessage *emsg) 1490 const struct GNUNET_SET_ElementMessage *emsg)
1474{ 1491{
1475 struct Operation *op = cls; 1492 struct Operation *op = cls;
1476 1493
1477 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1494 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1478 { 1495 {
1479 GNUNET_break_op(0); 1496 GNUNET_break_op (0);
1480 return GNUNET_SYSERR; 1497 return GNUNET_SYSERR;
1481 } 1498 }
1482 if (0 == GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes)) 1499 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1483 { 1500 {
1484 GNUNET_break_op(0); 1501 GNUNET_break_op (0);
1485 return GNUNET_SYSERR; 1502 return GNUNET_SYSERR;
1486 } 1503 }
1487 return GNUNET_OK; 1504 return GNUNET_OK;
1488} 1505}
1489 1506
@@ -1497,98 +1514,99 @@ check_union_p2p_elements(void *cls,
1497 * @param emsg the message 1514 * @param emsg the message
1498 */ 1515 */
1499void 1516void
1500handle_union_p2p_elements(void *cls, 1517handle_union_p2p_elements (void *cls,
1501 const struct GNUNET_SET_ElementMessage *emsg) 1518 const struct GNUNET_SET_ElementMessage *emsg)
1502{ 1519{
1503 struct Operation *op = cls; 1520 struct Operation *op = cls;
1504 struct ElementEntry *ee; 1521 struct ElementEntry *ee;
1505 struct KeyEntry *ke; 1522 struct KeyEntry *ke;
1506 uint16_t element_size; 1523 uint16_t element_size;
1507 1524
1508 element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage); 1525 element_size = ntohs (emsg->header.size) - sizeof(struct
1509 ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size); 1526 GNUNET_SET_ElementMessage);
1510 GNUNET_memcpy(&ee[1], 1527 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1511 &emsg[1], 1528 GNUNET_memcpy (&ee[1],
1512 element_size); 1529 &emsg[1],
1530 element_size);
1513 ee->element.size = element_size; 1531 ee->element.size = element_size;
1514 ee->element.data = &ee[1]; 1532 ee->element.data = &ee[1];
1515 ee->element.element_type = ntohs(emsg->element_type); 1533 ee->element.element_type = ntohs (emsg->element_type);
1516 ee->remote = GNUNET_YES; 1534 ee->remote = GNUNET_YES;
1517 GNUNET_SET_element_hash(&ee->element, 1535 GNUNET_SET_element_hash (&ee->element,
1518 &ee->element_hash); 1536 &ee->element_hash);
1519 if (GNUNET_NO == 1537 if (GNUNET_NO ==
1520 GNUNET_CONTAINER_multihashmap_remove(op->state->demanded_hashes, 1538 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1521 &ee->element_hash, 1539 &ee->element_hash,
1522 NULL)) 1540 NULL))
1523 { 1541 {
1524 /* We got something we didn't demand, since it's not in our map. */ 1542 /* We got something we didn't demand, since it's not in our map. */
1525 GNUNET_break_op(0); 1543 GNUNET_break_op (0);
1526 fail_union_operation(op); 1544 fail_union_operation (op);
1527 return; 1545 return;
1528 } 1546 }
1529 1547
1530 LOG(GNUNET_ERROR_TYPE_DEBUG, 1548 LOG (GNUNET_ERROR_TYPE_DEBUG,
1531 "Got element (size %u, hash %s) from peer\n", 1549 "Got element (size %u, hash %s) from peer\n",
1532 (unsigned int)element_size, 1550 (unsigned int) element_size,
1533 GNUNET_h2s(&ee->element_hash)); 1551 GNUNET_h2s (&ee->element_hash));
1534 1552
1535 GNUNET_STATISTICS_update(_GSS_statistics, 1553 GNUNET_STATISTICS_update (_GSS_statistics,
1536 "# received elements", 1554 "# received elements",
1537 1, 1555 1,
1538 GNUNET_NO); 1556 GNUNET_NO);
1539 GNUNET_STATISTICS_update(_GSS_statistics, 1557 GNUNET_STATISTICS_update (_GSS_statistics,
1540 "# exchanged elements", 1558 "# exchanged elements",
1541 1, 1559 1,
1542 GNUNET_NO); 1560 GNUNET_NO);
1543 1561
1544 op->state->received_total++; 1562 op->state->received_total++;
1545 1563
1546 ke = op_get_element(op, &ee->element_hash); 1564 ke = op_get_element (op, &ee->element_hash);
1547 if (NULL != ke) 1565 if (NULL != ke)
1548 { 1566 {
1549 /* Got repeated element. Should not happen since 1567 /* Got repeated element. Should not happen since
1550 * we track demands. */ 1568 * we track demands. */
1551 GNUNET_STATISTICS_update(_GSS_statistics, 1569 GNUNET_STATISTICS_update (_GSS_statistics,
1552 "# repeated elements", 1570 "# repeated elements",
1553 1, 1571 1,
1554 GNUNET_NO); 1572 GNUNET_NO);
1555 ke->received = GNUNET_YES; 1573 ke->received = GNUNET_YES;
1556 GNUNET_free(ee); 1574 GNUNET_free (ee);
1557 } 1575 }
1558 else 1576 else
1577 {
1578 LOG (GNUNET_ERROR_TYPE_DEBUG,
1579 "Registering new element from remote peer\n");
1580 op->state->received_fresh++;
1581 op_register_element (op, ee, GNUNET_YES);
1582 /* only send results immediately if the client wants it */
1583 switch (op->result_mode)
1559 { 1584 {
1560 LOG(GNUNET_ERROR_TYPE_DEBUG, 1585 case GNUNET_SET_RESULT_ADDED:
1561 "Registering new element from remote peer\n"); 1586 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1562 op->state->received_fresh++; 1587 break;
1563 op_register_element(op, ee, GNUNET_YES); 1588
1564 /* only send results immediately if the client wants it */ 1589 case GNUNET_SET_RESULT_SYMMETRIC:
1565 switch (op->result_mode) 1590 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1566 { 1591 break;
1567 case GNUNET_SET_RESULT_ADDED: 1592
1568 send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK); 1593 default:
1569 break; 1594 /* Result mode not supported, should have been caught earlier. */
1570 1595 GNUNET_break (0);
1571 case GNUNET_SET_RESULT_SYMMETRIC: 1596 break;
1572 send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1573 break;
1574
1575 default:
1576 /* Result mode not supported, should have been caught earlier. */
1577 GNUNET_break(0);
1578 break;
1579 }
1580 } 1597 }
1598 }
1581 1599
1582 if ((op->state->received_total > 8) && 1600 if ((op->state->received_total > 8) &&
1583 (op->state->received_fresh < op->state->received_total / 3)) 1601 (op->state->received_fresh < op->state->received_total / 3))
1584 { 1602 {
1585 /* The other peer gave us lots of old elements, there's something wrong. */ 1603 /* The other peer gave us lots of old elements, there's something wrong. */
1586 GNUNET_break_op(0); 1604 GNUNET_break_op (0);
1587 fail_union_operation(op); 1605 fail_union_operation (op);
1588 return; 1606 return;
1589 } 1607 }
1590 GNUNET_CADET_receive_done(op->channel); 1608 GNUNET_CADET_receive_done (op->channel);
1591 maybe_finish(op); 1609 maybe_finish (op);
1592} 1610}
1593 1611
1594 1612
@@ -1599,16 +1617,16 @@ handle_union_p2p_elements(void *cls,
1599 * @param emsg the message 1617 * @param emsg the message
1600 */ 1618 */
1601int 1619int
1602check_union_p2p_full_element(void *cls, 1620check_union_p2p_full_element (void *cls,
1603 const struct GNUNET_SET_ElementMessage *emsg) 1621 const struct GNUNET_SET_ElementMessage *emsg)
1604{ 1622{
1605 struct Operation *op = cls; 1623 struct Operation *op = cls;
1606 1624
1607 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1625 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1608 { 1626 {
1609 GNUNET_break_op(0); 1627 GNUNET_break_op (0);
1610 return GNUNET_SYSERR; 1628 return GNUNET_SYSERR;
1611 } 1629 }
1612 // FIXME: check that we expect full elements here? 1630 // FIXME: check that we expect full elements here?
1613 return GNUNET_OK; 1631 return GNUNET_OK;
1614} 1632}
@@ -1621,89 +1639,90 @@ check_union_p2p_full_element(void *cls,
1621 * @param emsg the message 1639 * @param emsg the message
1622 */ 1640 */
1623void 1641void
1624handle_union_p2p_full_element(void *cls, 1642handle_union_p2p_full_element (void *cls,
1625 const struct GNUNET_SET_ElementMessage *emsg) 1643 const struct GNUNET_SET_ElementMessage *emsg)
1626{ 1644{
1627 struct Operation *op = cls; 1645 struct Operation *op = cls;
1628 struct ElementEntry *ee; 1646 struct ElementEntry *ee;
1629 struct KeyEntry *ke; 1647 struct KeyEntry *ke;
1630 uint16_t element_size; 1648 uint16_t element_size;
1631 1649
1632 element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage); 1650 element_size = ntohs (emsg->header.size) - sizeof(struct
1633 ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size); 1651 GNUNET_SET_ElementMessage);
1634 GNUNET_memcpy(&ee[1], &emsg[1], element_size); 1652 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1653 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1635 ee->element.size = element_size; 1654 ee->element.size = element_size;
1636 ee->element.data = &ee[1]; 1655 ee->element.data = &ee[1];
1637 ee->element.element_type = ntohs(emsg->element_type); 1656 ee->element.element_type = ntohs (emsg->element_type);
1638 ee->remote = GNUNET_YES; 1657 ee->remote = GNUNET_YES;
1639 GNUNET_SET_element_hash(&ee->element, &ee->element_hash); 1658 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1640 1659
1641 LOG(GNUNET_ERROR_TYPE_DEBUG, 1660 LOG (GNUNET_ERROR_TYPE_DEBUG,
1642 "Got element (full diff, size %u, hash %s) from peer\n", 1661 "Got element (full diff, size %u, hash %s) from peer\n",
1643 (unsigned int)element_size, 1662 (unsigned int) element_size,
1644 GNUNET_h2s(&ee->element_hash)); 1663 GNUNET_h2s (&ee->element_hash));
1645 1664
1646 GNUNET_STATISTICS_update(_GSS_statistics, 1665 GNUNET_STATISTICS_update (_GSS_statistics,
1647 "# received elements", 1666 "# received elements",
1648 1, 1667 1,
1649 GNUNET_NO); 1668 GNUNET_NO);
1650 GNUNET_STATISTICS_update(_GSS_statistics, 1669 GNUNET_STATISTICS_update (_GSS_statistics,
1651 "# exchanged elements", 1670 "# exchanged elements",
1652 1, 1671 1,
1653 GNUNET_NO); 1672 GNUNET_NO);
1654 1673
1655 op->state->received_total++; 1674 op->state->received_total++;
1656 1675
1657 ke = op_get_element(op, &ee->element_hash); 1676 ke = op_get_element (op, &ee->element_hash);
1658 if (NULL != ke) 1677 if (NULL != ke)
1659 { 1678 {
1660 /* Got repeated element. Should not happen since 1679 /* Got repeated element. Should not happen since
1661 * we track demands. */ 1680 * we track demands. */
1662 GNUNET_STATISTICS_update(_GSS_statistics, 1681 GNUNET_STATISTICS_update (_GSS_statistics,
1663 "# repeated elements", 1682 "# repeated elements",
1664 1, 1683 1,
1665 GNUNET_NO); 1684 GNUNET_NO);
1666 ke->received = GNUNET_YES; 1685 ke->received = GNUNET_YES;
1667 GNUNET_free(ee); 1686 GNUNET_free (ee);
1668 } 1687 }
1669 else 1688 else
1689 {
1690 LOG (GNUNET_ERROR_TYPE_DEBUG,
1691 "Registering new element from remote peer\n");
1692 op->state->received_fresh++;
1693 op_register_element (op, ee, GNUNET_YES);
1694 /* only send results immediately if the client wants it */
1695 switch (op->result_mode)
1670 { 1696 {
1671 LOG(GNUNET_ERROR_TYPE_DEBUG, 1697 case GNUNET_SET_RESULT_ADDED:
1672 "Registering new element from remote peer\n"); 1698 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1673 op->state->received_fresh++; 1699 break;
1674 op_register_element(op, ee, GNUNET_YES); 1700
1675 /* only send results immediately if the client wants it */ 1701 case GNUNET_SET_RESULT_SYMMETRIC:
1676 switch (op->result_mode) 1702 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1677 { 1703 break;
1678 case GNUNET_SET_RESULT_ADDED: 1704
1679 send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK); 1705 default:
1680 break; 1706 /* Result mode not supported, should have been caught earlier. */
1681 1707 GNUNET_break (0);
1682 case GNUNET_SET_RESULT_SYMMETRIC: 1708 break;
1683 send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1684 break;
1685
1686 default:
1687 /* Result mode not supported, should have been caught earlier. */
1688 GNUNET_break(0);
1689 break;
1690 }
1691 } 1709 }
1710 }
1692 1711
1693 if ((GNUNET_YES == op->byzantine) && 1712 if ((GNUNET_YES == op->byzantine) &&
1694 (op->state->received_total > 384 + op->state->received_fresh * 4) && 1713 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1695 (op->state->received_fresh < op->state->received_total / 6)) 1714 (op->state->received_fresh < op->state->received_total / 6))
1696 { 1715 {
1697 /* The other peer gave us lots of old elements, there's something wrong. */ 1716 /* The other peer gave us lots of old elements, there's something wrong. */
1698 LOG(GNUNET_ERROR_TYPE_ERROR, 1717 LOG (GNUNET_ERROR_TYPE_ERROR,
1699 "Other peer sent only %llu/%llu fresh elements, failing operation\n", 1718 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1700 (unsigned long long)op->state->received_fresh, 1719 (unsigned long long) op->state->received_fresh,
1701 (unsigned long long)op->state->received_total); 1720 (unsigned long long) op->state->received_total);
1702 GNUNET_break_op(0); 1721 GNUNET_break_op (0);
1703 fail_union_operation(op); 1722 fail_union_operation (op);
1704 return; 1723 return;
1705 } 1724 }
1706 GNUNET_CADET_receive_done(op->channel); 1725 GNUNET_CADET_receive_done (op->channel);
1707} 1726}
1708 1727
1709 1728
@@ -1715,30 +1734,30 @@ handle_union_p2p_full_element(void *cls,
1715 * @param msg the message 1734 * @param msg the message
1716 */ 1735 */
1717int 1736int
1718check_union_p2p_inquiry(void *cls, 1737check_union_p2p_inquiry (void *cls,
1719 const struct InquiryMessage *msg) 1738 const struct InquiryMessage *msg)
1720{ 1739{
1721 struct Operation *op = cls; 1740 struct Operation *op = cls;
1722 unsigned int num_keys; 1741 unsigned int num_keys;
1723 1742
1724 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1743 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1725 { 1744 {
1726 GNUNET_break_op(0); 1745 GNUNET_break_op (0);
1727 return GNUNET_SYSERR; 1746 return GNUNET_SYSERR;
1728 } 1747 }
1729 if (op->state->phase != PHASE_INVENTORY_PASSIVE) 1748 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1730 { 1749 {
1731 GNUNET_break_op(0); 1750 GNUNET_break_op (0);
1732 return GNUNET_SYSERR; 1751 return GNUNET_SYSERR;
1733 } 1752 }
1734 num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage)) 1753 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1735 / sizeof(struct IBF_Key); 1754 / sizeof(struct IBF_Key);
1736 if ((ntohs(msg->header.size) - sizeof(struct InquiryMessage)) 1755 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1737 != num_keys * sizeof(struct IBF_Key)) 1756 != num_keys * sizeof(struct IBF_Key))
1738 { 1757 {
1739 GNUNET_break_op(0); 1758 GNUNET_break_op (0);
1740 return GNUNET_SYSERR; 1759 return GNUNET_SYSERR;
1741 } 1760 }
1742 return GNUNET_OK; 1761 return GNUNET_OK;
1743} 1762}
1744 1763
@@ -1751,30 +1770,30 @@ check_union_p2p_inquiry(void *cls,
1751 * @param msg the message 1770 * @param msg the message
1752 */ 1771 */
1753void 1772void
1754handle_union_p2p_inquiry(void *cls, 1773handle_union_p2p_inquiry (void *cls,
1755 const struct InquiryMessage *msg) 1774 const struct InquiryMessage *msg)
1756{ 1775{
1757 struct Operation *op = cls; 1776 struct Operation *op = cls;
1758 const struct IBF_Key *ibf_key; 1777 const struct IBF_Key *ibf_key;
1759 unsigned int num_keys; 1778 unsigned int num_keys;
1760 1779
1761 LOG(GNUNET_ERROR_TYPE_DEBUG, 1780 LOG (GNUNET_ERROR_TYPE_DEBUG,
1762 "Received union inquiry\n"); 1781 "Received union inquiry\n");
1763 num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage)) 1782 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1764 / sizeof(struct IBF_Key); 1783 / sizeof(struct IBF_Key);
1765 ibf_key = (const struct IBF_Key *)&msg[1]; 1784 ibf_key = (const struct IBF_Key *) &msg[1];
1766 while (0 != num_keys--) 1785 while (0 != num_keys--)
1767 { 1786 {
1768 struct IBF_Key unsalted_key; 1787 struct IBF_Key unsalted_key;
1769 1788
1770 unsalt_key(ibf_key, 1789 unsalt_key (ibf_key,
1771 ntohl(msg->salt), 1790 ntohl (msg->salt),
1772 &unsalted_key); 1791 &unsalted_key);
1773 send_offers_for_key(op, 1792 send_offers_for_key (op,
1774 unsalted_key); 1793 unsalted_key);
1775 ibf_key++; 1794 ibf_key++;
1776 } 1795 }
1777 GNUNET_CADET_receive_done(op->channel); 1796 GNUNET_CADET_receive_done (op->channel);
1778} 1797}
1779 1798
1780 1799
@@ -1789,9 +1808,9 @@ handle_union_p2p_inquiry(void *cls,
1789 * #GNUNET_NO if not. 1808 * #GNUNET_NO if not.
1790 */ 1809 */
1791static int 1810static int
1792send_missing_full_elements_iter(void *cls, 1811send_missing_full_elements_iter (void *cls,
1793 uint32_t key, 1812 uint32_t key,
1794 void *value) 1813 void *value)
1795{ 1814{
1796 struct Operation *op = cls; 1815 struct Operation *op = cls;
1797 struct KeyEntry *ke = value; 1816 struct KeyEntry *ke = value;
@@ -1801,15 +1820,15 @@ send_missing_full_elements_iter(void *cls,
1801 1820
1802 if (GNUNET_YES == ke->received) 1821 if (GNUNET_YES == ke->received)
1803 return GNUNET_YES; 1822 return GNUNET_YES;
1804 ev = GNUNET_MQ_msg_extra(emsg, 1823 ev = GNUNET_MQ_msg_extra (emsg,
1805 ee->element.size, 1824 ee->element.size,
1806 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); 1825 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1807 GNUNET_memcpy(&emsg[1], 1826 GNUNET_memcpy (&emsg[1],
1808 ee->element.data, 1827 ee->element.data,
1809 ee->element.size); 1828 ee->element.size);
1810 emsg->element_type = htons(ee->element.element_type); 1829 emsg->element_type = htons (ee->element.element_type);
1811 GNUNET_MQ_send(op->mq, 1830 GNUNET_MQ_send (op->mq,
1812 ev); 1831 ev);
1813 return GNUNET_YES; 1832 return GNUNET_YES;
1814} 1833}
1815 1834
@@ -1821,30 +1840,30 @@ send_missing_full_elements_iter(void *cls,
1821 * @param mh the demand message 1840 * @param mh the demand message
1822 */ 1841 */
1823void 1842void
1824handle_union_p2p_request_full(void *cls, 1843handle_union_p2p_request_full (void *cls,
1825 const struct GNUNET_MessageHeader *mh) 1844 const struct GNUNET_MessageHeader *mh)
1826{ 1845{
1827 struct Operation *op = cls; 1846 struct Operation *op = cls;
1828 1847
1829 LOG(GNUNET_ERROR_TYPE_DEBUG, 1848 LOG (GNUNET_ERROR_TYPE_DEBUG,
1830 "Received request for full set transmission\n"); 1849 "Received request for full set transmission\n");
1831 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1850 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1832 { 1851 {
1833 GNUNET_break_op(0); 1852 GNUNET_break_op (0);
1834 fail_union_operation(op); 1853 fail_union_operation (op);
1835 return; 1854 return;
1836 } 1855 }
1837 if (PHASE_EXPECT_IBF != op->state->phase) 1856 if (PHASE_EXPECT_IBF != op->state->phase)
1838 { 1857 {
1839 GNUNET_break_op(0); 1858 GNUNET_break_op (0);
1840 fail_union_operation(op); 1859 fail_union_operation (op);
1841 return; 1860 return;
1842 } 1861 }
1843 1862
1844 // FIXME: we need to check that our set is larger than the 1863 // FIXME: we need to check that our set is larger than the
1845 // byzantine_lower_bound by some threshold 1864 // byzantine_lower_bound by some threshold
1846 send_full_set(op); 1865 send_full_set (op);
1847 GNUNET_CADET_receive_done(op->channel); 1866 GNUNET_CADET_receive_done (op->channel);
1848} 1867}
1849 1868
1850 1869
@@ -1855,55 +1874,55 @@ handle_union_p2p_request_full(void *cls,
1855 * @param mh the demand message 1874 * @param mh the demand message
1856 */ 1875 */
1857void 1876void
1858handle_union_p2p_full_done(void *cls, 1877handle_union_p2p_full_done (void *cls,
1859 const struct GNUNET_MessageHeader *mh) 1878 const struct GNUNET_MessageHeader *mh)
1860{ 1879{
1861 struct Operation *op = cls; 1880 struct Operation *op = cls;
1862 1881
1863 switch (op->state->phase) 1882 switch (op->state->phase)
1864 { 1883 {
1865 case PHASE_EXPECT_IBF: 1884 case PHASE_EXPECT_IBF:
1866 { 1885 {
1867 struct GNUNET_MQ_Envelope *ev; 1886 struct GNUNET_MQ_Envelope *ev;
1868 1887
1869 LOG(GNUNET_ERROR_TYPE_DEBUG, 1888 LOG (GNUNET_ERROR_TYPE_DEBUG,
1870 "got FULL DONE, sending elements that other peer is missing\n"); 1889 "got FULL DONE, sending elements that other peer is missing\n");
1871 1890
1872 /* send all the elements that did not come from the remote peer */ 1891 /* send all the elements that did not come from the remote peer */
1873 GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element, 1892 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1874 &send_missing_full_elements_iter, 1893 &send_missing_full_elements_iter,
1875 op); 1894 op);
1876 1895
1877 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 1896 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1878 GNUNET_MQ_send(op->mq, 1897 GNUNET_MQ_send (op->mq,
1879 ev); 1898 ev);
1880 op->state->phase = PHASE_DONE; 1899 op->state->phase = PHASE_DONE;
1881 /* we now wait until the other peer sends us the OVER message*/ 1900 /* we now wait until the other peer sends us the OVER message*/
1882 } 1901 }
1883 break; 1902 break;
1884 1903
1885 case PHASE_FULL_SENDING: 1904 case PHASE_FULL_SENDING:
1886 { 1905 {
1887 LOG(GNUNET_ERROR_TYPE_DEBUG, 1906 LOG (GNUNET_ERROR_TYPE_DEBUG,
1888 "got FULL DONE, finishing\n"); 1907 "got FULL DONE, finishing\n");
1889 /* We sent the full set, and got the response for that. We're done. */ 1908 /* We sent the full set, and got the response for that. We're done. */
1890 op->state->phase = PHASE_DONE; 1909 op->state->phase = PHASE_DONE;
1891 GNUNET_CADET_receive_done(op->channel); 1910 GNUNET_CADET_receive_done (op->channel);
1892 send_client_done(op); 1911 send_client_done (op);
1893 _GSS_operation_destroy2(op); 1912 _GSS_operation_destroy2 (op);
1894 return; 1913 return;
1895 } 1914 }
1896 break; 1915 break;
1897 1916
1898 default: 1917 default:
1899 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1900 "Handle full done phase is %u\n", 1919 "Handle full done phase is %u\n",
1901 (unsigned)op->state->phase); 1920 (unsigned) op->state->phase);
1902 GNUNET_break_op(0); 1921 GNUNET_break_op (0);
1903 fail_union_operation(op); 1922 fail_union_operation (op);
1904 return; 1923 return;
1905 } 1924 }
1906 GNUNET_CADET_receive_done(op->channel); 1925 GNUNET_CADET_receive_done (op->channel);
1907} 1926}
1908 1927
1909 1928
@@ -1916,25 +1935,25 @@ handle_union_p2p_full_done(void *cls,
1916 * @return #GNUNET_OK if @a mh is well-formed 1935 * @return #GNUNET_OK if @a mh is well-formed
1917 */ 1936 */
1918int 1937int
1919check_union_p2p_demand(void *cls, 1938check_union_p2p_demand (void *cls,
1920 const struct GNUNET_MessageHeader *mh) 1939 const struct GNUNET_MessageHeader *mh)
1921{ 1940{
1922 struct Operation *op = cls; 1941 struct Operation *op = cls;
1923 unsigned int num_hashes; 1942 unsigned int num_hashes;
1924 1943
1925 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1944 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1926 { 1945 {
1927 GNUNET_break_op(0); 1946 GNUNET_break_op (0);
1928 return GNUNET_SYSERR; 1947 return GNUNET_SYSERR;
1929 } 1948 }
1930 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) 1949 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1931 / sizeof(struct GNUNET_HashCode); 1950 / sizeof(struct GNUNET_HashCode);
1932 if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) 1951 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1933 != num_hashes * sizeof(struct GNUNET_HashCode)) 1952 != num_hashes * sizeof(struct GNUNET_HashCode))
1934 { 1953 {
1935 GNUNET_break_op(0); 1954 GNUNET_break_op (0);
1936 return GNUNET_SYSERR; 1955 return GNUNET_SYSERR;
1937 } 1956 }
1938 return GNUNET_OK; 1957 return GNUNET_OK;
1939} 1958}
1940 1959
@@ -1947,8 +1966,8 @@ check_union_p2p_demand(void *cls,
1947 * @param mh the demand message 1966 * @param mh the demand message
1948 */ 1967 */
1949void 1968void
1950handle_union_p2p_demand(void *cls, 1969handle_union_p2p_demand (void *cls,
1951 const struct GNUNET_MessageHeader *mh) 1970 const struct GNUNET_MessageHeader *mh)
1952{ 1971{
1953 struct Operation *op = cls; 1972 struct Operation *op = cls;
1954 struct ElementEntry *ee; 1973 struct ElementEntry *ee;
@@ -1957,60 +1976,61 @@ handle_union_p2p_demand(void *cls,
1957 unsigned int num_hashes; 1976 unsigned int num_hashes;
1958 struct GNUNET_MQ_Envelope *ev; 1977 struct GNUNET_MQ_Envelope *ev;
1959 1978
1960 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) 1979 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1961 / sizeof(struct GNUNET_HashCode); 1980 / sizeof(struct GNUNET_HashCode);
1962 for (hash = (const struct GNUNET_HashCode *)&mh[1]; 1981 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1963 num_hashes > 0; 1982 num_hashes > 0;
1964 hash++, num_hashes--) 1983 hash++, num_hashes--)
1984 {
1985 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1986 hash);
1987 if (NULL == ee)
1965 { 1988 {
1966 ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements, 1989 /* Demand for non-existing element. */
1967 hash); 1990 GNUNET_break_op (0);
1968 if (NULL == ee) 1991 fail_union_operation (op);
1969 { 1992 return;
1970 /* Demand for non-existing element. */ 1993 }
1971 GNUNET_break_op(0); 1994 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1972 fail_union_operation(op); 1995 {
1973 return; 1996 /* Probably confused lazily copied sets. */
1974 } 1997 GNUNET_break_op (0);
1975 if (GNUNET_NO == _GSS_is_element_of_operation(ee, op)) 1998 fail_union_operation (op);
1976 { 1999 return;
1977 /* Probably confused lazily copied sets. */
1978 GNUNET_break_op(0);
1979 fail_union_operation(op);
1980 return;
1981 }
1982 ev = GNUNET_MQ_msg_extra(emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1983 GNUNET_memcpy(&emsg[1], ee->element.data, ee->element.size);
1984 emsg->reserved = htons(0);
1985 emsg->element_type = htons(ee->element.element_type);
1986 LOG(GNUNET_ERROR_TYPE_DEBUG,
1987 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1988 (void *)op,
1989 (unsigned int)ee->element.size,
1990 GNUNET_h2s(&ee->element_hash));
1991 GNUNET_MQ_send(op->mq, ev);
1992 GNUNET_STATISTICS_update(_GSS_statistics,
1993 "# exchanged elements",
1994 1,
1995 GNUNET_NO);
1996
1997 switch (op->result_mode)
1998 {
1999 case GNUNET_SET_RESULT_ADDED:
2000 /* Nothing to do. */
2001 break;
2002
2003 case GNUNET_SET_RESULT_SYMMETRIC:
2004 send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2005 break;
2006
2007 default:
2008 /* Result mode not supported, should have been caught earlier. */
2009 GNUNET_break(0);
2010 break;
2011 }
2012 } 2000 }
2013 GNUNET_CADET_receive_done(op->channel); 2001 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size,
2002 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
2003 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
2004 emsg->reserved = htons (0);
2005 emsg->element_type = htons (ee->element.element_type);
2006 LOG (GNUNET_ERROR_TYPE_DEBUG,
2007 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2008 (void *) op,
2009 (unsigned int) ee->element.size,
2010 GNUNET_h2s (&ee->element_hash));
2011 GNUNET_MQ_send (op->mq, ev);
2012 GNUNET_STATISTICS_update (_GSS_statistics,
2013 "# exchanged elements",
2014 1,
2015 GNUNET_NO);
2016
2017 switch (op->result_mode)
2018 {
2019 case GNUNET_SET_RESULT_ADDED:
2020 /* Nothing to do. */
2021 break;
2022
2023 case GNUNET_SET_RESULT_SYMMETRIC:
2024 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2025 break;
2026
2027 default:
2028 /* Result mode not supported, should have been caught earlier. */
2029 GNUNET_break (0);
2030 break;
2031 }
2032 }
2033 GNUNET_CADET_receive_done (op->channel);
2014} 2034}
2015 2035
2016 2036
@@ -2022,32 +2042,32 @@ handle_union_p2p_demand(void *cls,
2022 * @return #GNUNET_OK if @a mh is well-formed 2042 * @return #GNUNET_OK if @a mh is well-formed
2023 */ 2043 */
2024int 2044int
2025check_union_p2p_offer(void *cls, 2045check_union_p2p_offer (void *cls,
2026 const struct GNUNET_MessageHeader *mh) 2046 const struct GNUNET_MessageHeader *mh)
2027{ 2047{
2028 struct Operation *op = cls; 2048 struct Operation *op = cls;
2029 unsigned int num_hashes; 2049 unsigned int num_hashes;
2030 2050
2031 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 2051 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2032 { 2052 {
2033 GNUNET_break_op(0); 2053 GNUNET_break_op (0);
2034 return GNUNET_SYSERR; 2054 return GNUNET_SYSERR;
2035 } 2055 }
2036 /* look up elements and send them */ 2056 /* look up elements and send them */
2037 if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && 2057 if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2038 (op->state->phase != PHASE_INVENTORY_ACTIVE)) 2058 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2039 { 2059 {
2040 GNUNET_break_op(0); 2060 GNUNET_break_op (0);
2041 return GNUNET_SYSERR; 2061 return GNUNET_SYSERR;
2042 } 2062 }
2043 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) 2063 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2044 / sizeof(struct GNUNET_HashCode); 2064 / sizeof(struct GNUNET_HashCode);
2045 if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) != 2065 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2046 num_hashes * sizeof(struct GNUNET_HashCode)) 2066 num_hashes * sizeof(struct GNUNET_HashCode))
2047 { 2067 {
2048 GNUNET_break_op(0); 2068 GNUNET_break_op (0);
2049 return GNUNET_SYSERR; 2069 return GNUNET_SYSERR;
2050 } 2070 }
2051 return GNUNET_OK; 2071 return GNUNET_OK;
2052} 2072}
2053 2073
@@ -2060,56 +2080,57 @@ check_union_p2p_offer(void *cls,
2060 * @param mh the message 2080 * @param mh the message
2061 */ 2081 */
2062void 2082void
2063handle_union_p2p_offer(void *cls, 2083handle_union_p2p_offer (void *cls,
2064 const struct GNUNET_MessageHeader *mh) 2084 const struct GNUNET_MessageHeader *mh)
2065{ 2085{
2066 struct Operation *op = cls; 2086 struct Operation *op = cls;
2067 const struct GNUNET_HashCode *hash; 2087 const struct GNUNET_HashCode *hash;
2068 unsigned int num_hashes; 2088 unsigned int num_hashes;
2069 2089
2070 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) 2090 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2071 / sizeof(struct GNUNET_HashCode); 2091 / sizeof(struct GNUNET_HashCode);
2072 for (hash = (const struct GNUNET_HashCode *)&mh[1]; 2092 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2073 num_hashes > 0; 2093 num_hashes > 0;
2074 hash++, num_hashes--) 2094 hash++, num_hashes--)
2075 { 2095 {
2076 struct ElementEntry *ee; 2096 struct ElementEntry *ee;
2077 struct GNUNET_MessageHeader *demands; 2097 struct GNUNET_MessageHeader *demands;
2078 struct GNUNET_MQ_Envelope *ev; 2098 struct GNUNET_MQ_Envelope *ev;
2079 2099
2080 ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements, 2100 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2081 hash); 2101 hash);
2082 if (NULL != ee) 2102 if (NULL != ee)
2083 if (GNUNET_YES == _GSS_is_element_of_operation(ee, op)) 2103 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2084 continue; 2104 continue;
2085 2105
2086 if (GNUNET_YES == 2106 if (GNUNET_YES ==
2087 GNUNET_CONTAINER_multihashmap_contains(op->state->demanded_hashes, 2107 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2088 hash)) 2108 hash))
2089 { 2109 {
2090 LOG(GNUNET_ERROR_TYPE_DEBUG, 2110 LOG (GNUNET_ERROR_TYPE_DEBUG,
2091 "Skipped sending duplicate demand\n"); 2111 "Skipped sending duplicate demand\n");
2092 continue; 2112 continue;
2093 } 2113 }
2094 2114
2095 GNUNET_assert(GNUNET_OK == 2115 GNUNET_assert (GNUNET_OK ==
2096 GNUNET_CONTAINER_multihashmap_put(op->state->demanded_hashes, 2116 GNUNET_CONTAINER_multihashmap_put (
2097 hash, 2117 op->state->demanded_hashes,
2098 NULL, 2118 hash,
2099 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); 2119 NULL,
2100 2120 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2101 LOG(GNUNET_ERROR_TYPE_DEBUG, 2121
2102 "[OP %x] Requesting element (hash %s)\n", 2122 LOG (GNUNET_ERROR_TYPE_DEBUG,
2103 (void *)op, GNUNET_h2s(hash)); 2123 "[OP %x] Requesting element (hash %s)\n",
2104 ev = GNUNET_MQ_msg_header_extra(demands, 2124 (void *) op, GNUNET_h2s (hash));
2105 sizeof(struct GNUNET_HashCode), 2125 ev = GNUNET_MQ_msg_header_extra (demands,
2106 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); 2126 sizeof(struct GNUNET_HashCode),
2107 GNUNET_memcpy(&demands[1], 2127 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2108 hash, 2128 GNUNET_memcpy (&demands[1],
2109 sizeof(struct GNUNET_HashCode)); 2129 hash,
2110 GNUNET_MQ_send(op->mq, ev); 2130 sizeof(struct GNUNET_HashCode));
2111 } 2131 GNUNET_MQ_send (op->mq, ev);
2112 GNUNET_CADET_receive_done(op->channel); 2132 }
2133 GNUNET_CADET_receive_done (op->channel);
2113} 2134}
2114 2135
2115 2136
@@ -2120,58 +2141,58 @@ handle_union_p2p_offer(void *cls,
2120 * @param mh the message 2141 * @param mh the message
2121 */ 2142 */
2122void 2143void
2123handle_union_p2p_done(void *cls, 2144handle_union_p2p_done (void *cls,
2124 const struct GNUNET_MessageHeader *mh) 2145 const struct GNUNET_MessageHeader *mh)
2125{ 2146{
2126 struct Operation *op = cls; 2147 struct Operation *op = cls;
2127 2148
2128 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 2149 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2129 { 2150 {
2130 GNUNET_break_op(0); 2151 GNUNET_break_op (0);
2131 fail_union_operation(op); 2152 fail_union_operation (op);
2132 return; 2153 return;
2133 } 2154 }
2134 switch (op->state->phase) 2155 switch (op->state->phase)
2135 { 2156 {
2136 case PHASE_INVENTORY_PASSIVE: 2157 case PHASE_INVENTORY_PASSIVE:
2137 /* We got all requests, but still have to send our elements in response. */ 2158 /* We got all requests, but still have to send our elements in response. */
2138 op->state->phase = PHASE_FINISH_WAITING; 2159 op->state->phase = PHASE_FINISH_WAITING;
2139 2160
2140 LOG(GNUNET_ERROR_TYPE_DEBUG, 2161 LOG (GNUNET_ERROR_TYPE_DEBUG,
2141 "got DONE (as passive partner), waiting for our demands to be satisfied\n"); 2162 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2142 /* The active peer is done sending offers 2163 /* The active peer is done sending offers
2143 * and inquiries. This means that all 2164 * and inquiries. This means that all
2144 * our responses to that (demands and offers) 2165 * our responses to that (demands and offers)
2145 * must be in flight (queued or in mesh). 2166 * must be in flight (queued or in mesh).
2146 * 2167 *
2147 * We should notify the active peer once 2168 * We should notify the active peer once
2148 * all our demands are satisfied, so that the active 2169 * all our demands are satisfied, so that the active
2149 * peer can quit if we gave it everything. 2170 * peer can quit if we gave it everything.
2150 */ 2171 */
2151 GNUNET_CADET_receive_done(op->channel); 2172 GNUNET_CADET_receive_done (op->channel);
2152 maybe_finish(op); 2173 maybe_finish (op);
2153 return; 2174 return;
2154 2175
2155 case PHASE_INVENTORY_ACTIVE: 2176 case PHASE_INVENTORY_ACTIVE:
2156 LOG(GNUNET_ERROR_TYPE_DEBUG, 2177 LOG (GNUNET_ERROR_TYPE_DEBUG,
2157 "got DONE (as active partner), waiting to finish\n"); 2178 "got DONE (as active partner), waiting to finish\n");
2158 /* All demands of the other peer are satisfied, 2179 /* All demands of the other peer are satisfied,
2159 * and we processed all offers, thus we know 2180 * and we processed all offers, thus we know
2160 * exactly what our demands must be. 2181 * exactly what our demands must be.
2161 * 2182 *
2162 * We'll close the channel 2183 * We'll close the channel
2163 * to the other peer once our demands are met. 2184 * to the other peer once our demands are met.
2164 */ 2185 */
2165 op->state->phase = PHASE_FINISH_CLOSING; 2186 op->state->phase = PHASE_FINISH_CLOSING;
2166 GNUNET_CADET_receive_done(op->channel); 2187 GNUNET_CADET_receive_done (op->channel);
2167 maybe_finish(op); 2188 maybe_finish (op);
2168 return; 2189 return;
2169 2190
2170 default: 2191 default:
2171 GNUNET_break_op(0); 2192 GNUNET_break_op (0);
2172 fail_union_operation(op); 2193 fail_union_operation (op);
2173 return; 2194 return;
2174 } 2195 }
2175} 2196}
2176 2197
2177/** 2198/**
@@ -2181,10 +2202,10 @@ handle_union_p2p_done(void *cls,
2181 * @param mh the message 2202 * @param mh the message
2182 */ 2203 */
2183void 2204void
2184handle_union_p2p_over(void *cls, 2205handle_union_p2p_over (void *cls,
2185 const struct GNUNET_MessageHeader *mh) 2206 const struct GNUNET_MessageHeader *mh)
2186{ 2207{
2187 send_client_done(cls); 2208 send_client_done (cls);
2188} 2209}
2189 2210
2190 2211
@@ -2196,54 +2217,55 @@ handle_union_p2p_over(void *cls,
2196 * to convince it to accept, may be NULL 2217 * to convince it to accept, may be NULL
2197 */ 2218 */
2198static struct OperationState * 2219static struct OperationState *
2199union_evaluate(struct Operation *op, 2220union_evaluate (struct Operation *op,
2200 const struct GNUNET_MessageHeader *opaque_context) 2221 const struct GNUNET_MessageHeader *opaque_context)
2201{ 2222{
2202 struct OperationState *state; 2223 struct OperationState *state;
2203 struct GNUNET_MQ_Envelope *ev; 2224 struct GNUNET_MQ_Envelope *ev;
2204 struct OperationRequestMessage *msg; 2225 struct OperationRequestMessage *msg;
2205 2226
2206 ev = GNUNET_MQ_msg_nested_mh(msg, 2227 ev = GNUNET_MQ_msg_nested_mh (msg,
2207 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 2228 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2208 opaque_context); 2229 opaque_context);
2209 if (NULL == ev) 2230 if (NULL == ev)
2210 { 2231 {
2211 /* the context message is too large */ 2232 /* the context message is too large */
2212 GNUNET_break(0); 2233 GNUNET_break (0);
2213 return NULL; 2234 return NULL;
2214 } 2235 }
2215 state = GNUNET_new(struct OperationState); 2236 state = GNUNET_new (struct OperationState);
2216 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32, 2237 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2217 GNUNET_NO); 2238 GNUNET_NO);
2218 /* copy the current generation's strata estimator for this operation */ 2239 /* copy the current generation's strata estimator for this operation */
2219 state->se = strata_estimator_dup(op->set->state->se); 2240 state->se = strata_estimator_dup (op->set->state->se);
2220 /* we started the operation, thus we have to send the operation request */ 2241 /* we started the operation, thus we have to send the operation request */
2221 state->phase = PHASE_EXPECT_SE; 2242 state->phase = PHASE_EXPECT_SE;
2222 state->salt_receive = state->salt_send = 42; // FIXME????? 2243 state->salt_receive = state->salt_send = 42; // FIXME?????
2223 LOG(GNUNET_ERROR_TYPE_DEBUG, 2244 LOG (GNUNET_ERROR_TYPE_DEBUG,
2224 "Initiating union operation evaluation\n"); 2245 "Initiating union operation evaluation\n");
2225 GNUNET_STATISTICS_update(_GSS_statistics, 2246 GNUNET_STATISTICS_update (_GSS_statistics,
2226 "# of total union operations", 2247 "# of total union operations",
2227 1, 2248 1,
2228 GNUNET_NO); 2249 GNUNET_NO);
2229 GNUNET_STATISTICS_update(_GSS_statistics, 2250 GNUNET_STATISTICS_update (_GSS_statistics,
2230 "# of initiated union operations", 2251 "# of initiated union operations",
2231 1, 2252 1,
2232 GNUNET_NO); 2253 GNUNET_NO);
2233 msg->operation = htonl(GNUNET_SET_OPERATION_UNION); 2254 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2234 GNUNET_MQ_send(op->mq, 2255 GNUNET_MQ_send (op->mq,
2235 ev); 2256 ev);
2236 2257
2237 if (NULL != opaque_context) 2258 if (NULL != opaque_context)
2238 LOG(GNUNET_ERROR_TYPE_DEBUG, 2259 LOG (GNUNET_ERROR_TYPE_DEBUG,
2239 "sent op request with context message\n"); 2260 "sent op request with context message\n");
2240 else 2261 else
2241 LOG(GNUNET_ERROR_TYPE_DEBUG, 2262 LOG (GNUNET_ERROR_TYPE_DEBUG,
2242 "sent op request without context message\n"); 2263 "sent op request without context message\n");
2243 2264
2244 op->state = state; 2265 op->state = state;
2245 initialize_key_to_element(op); 2266 initialize_key_to_element (op);
2246 state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element); 2267 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
2268 state->key_to_element);
2247 return state; 2269 return state;
2248} 2270}
2249 2271
@@ -2255,7 +2277,7 @@ union_evaluate(struct Operation *op,
2255 * @param op operation that will be accepted as a union operation 2277 * @param op operation that will be accepted as a union operation
2256 */ 2278 */
2257static struct OperationState * 2279static struct OperationState *
2258union_accept(struct Operation *op) 2280union_accept (struct Operation *op)
2259{ 2281{
2260 struct OperationState *state; 2282 struct OperationState *state;
2261 const struct StrataEstimator *se; 2283 const struct StrataEstimator *se;
@@ -2265,46 +2287,48 @@ union_accept(struct Operation *op)
2265 size_t len; 2287 size_t len;
2266 uint16_t type; 2288 uint16_t type;
2267 2289
2268 LOG(GNUNET_ERROR_TYPE_DEBUG, 2290 LOG (GNUNET_ERROR_TYPE_DEBUG,
2269 "accepting set union operation\n"); 2291 "accepting set union operation\n");
2270 GNUNET_STATISTICS_update(_GSS_statistics, 2292 GNUNET_STATISTICS_update (_GSS_statistics,
2271 "# of accepted union operations", 2293 "# of accepted union operations",
2272 1, 2294 1,
2273 GNUNET_NO); 2295 GNUNET_NO);
2274 GNUNET_STATISTICS_update(_GSS_statistics, 2296 GNUNET_STATISTICS_update (_GSS_statistics,
2275 "# of total union operations", 2297 "# of total union operations",
2276 1, 2298 1,
2277 GNUNET_NO); 2299 GNUNET_NO);
2278 2300
2279 state = GNUNET_new(struct OperationState); 2301 state = GNUNET_new (struct OperationState);
2280 state->se = strata_estimator_dup(op->set->state->se); 2302 state->se = strata_estimator_dup (op->set->state->se);
2281 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32, 2303 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2282 GNUNET_NO); 2304 GNUNET_NO);
2283 state->salt_receive = state->salt_send = 42; // FIXME????? 2305 state->salt_receive = state->salt_send = 42; // FIXME?????
2284 op->state = state; 2306 op->state = state;
2285 initialize_key_to_element(op); 2307 initialize_key_to_element (op);
2286 state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element); 2308 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
2309 state->key_to_element);
2287 2310
2288 /* kick off the operation */ 2311 /* kick off the operation */
2289 se = state->se; 2312 se = state->se;
2290 buf = GNUNET_malloc(se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); 2313 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2291 len = strata_estimator_write(se, 2314 len = strata_estimator_write (se,
2292 buf); 2315 buf);
2293 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) 2316 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2294 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; 2317 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2295 else 2318 else
2296 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; 2319 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2297 ev = GNUNET_MQ_msg_extra(strata_msg, 2320 ev = GNUNET_MQ_msg_extra (strata_msg,
2298 len, 2321 len,
2299 type); 2322 type);
2300 GNUNET_memcpy(&strata_msg[1], 2323 GNUNET_memcpy (&strata_msg[1],
2301 buf, 2324 buf,
2302 len); 2325 len);
2303 GNUNET_free(buf); 2326 GNUNET_free (buf);
2304 strata_msg->set_size 2327 strata_msg->set_size
2305 = GNUNET_htonll(GNUNET_CONTAINER_multihashmap_size(op->set->content->elements)); 2328 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
2306 GNUNET_MQ_send(op->mq, 2329 op->set->content->elements));
2307 ev); 2330 GNUNET_MQ_send (op->mq,
2331 ev);
2308 state->phase = PHASE_EXPECT_IBF; 2332 state->phase = PHASE_EXPECT_IBF;
2309 return state; 2333 return state;
2310} 2334}
@@ -2319,22 +2343,22 @@ union_accept(struct Operation *op)
2319 * @return the newly created set, NULL on error 2343 * @return the newly created set, NULL on error
2320 */ 2344 */
2321static struct SetState * 2345static struct SetState *
2322union_set_create(void) 2346union_set_create (void)
2323{ 2347{
2324 struct SetState *set_state; 2348 struct SetState *set_state;
2325 2349
2326 LOG(GNUNET_ERROR_TYPE_DEBUG, 2350 LOG (GNUNET_ERROR_TYPE_DEBUG,
2327 "union set created\n"); 2351 "union set created\n");
2328 set_state = GNUNET_new(struct SetState); 2352 set_state = GNUNET_new (struct SetState);
2329 set_state->se = strata_estimator_create(SE_STRATA_COUNT, 2353 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2330 SE_IBF_SIZE, SE_IBF_HASH_NUM); 2354 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2331 if (NULL == set_state->se) 2355 if (NULL == set_state->se)
2332 { 2356 {
2333 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, 2357 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2334 "Failed to allocate strata estimator\n"); 2358 "Failed to allocate strata estimator\n");
2335 GNUNET_free(set_state); 2359 GNUNET_free (set_state);
2336 return NULL; 2360 return NULL;
2337 } 2361 }
2338 return set_state; 2362 return set_state;
2339} 2363}
2340 2364
@@ -2346,11 +2370,11 @@ union_set_create(void)
2346 * @param ee the element to add to the set 2370 * @param ee the element to add to the set
2347 */ 2371 */
2348static void 2372static void
2349union_add(struct SetState *set_state, 2373union_add (struct SetState *set_state,
2350 struct ElementEntry *ee) 2374 struct ElementEntry *ee)
2351{ 2375{
2352 strata_estimator_insert(set_state->se, 2376 strata_estimator_insert (set_state->se,
2353 get_ibf_key(&ee->element_hash)); 2377 get_ibf_key (&ee->element_hash));
2354} 2378}
2355 2379
2356 2380
@@ -2362,11 +2386,11 @@ union_add(struct SetState *set_state,
2362 * @param ee set element to remove 2386 * @param ee set element to remove
2363 */ 2387 */
2364static void 2388static void
2365union_remove(struct SetState *set_state, 2389union_remove (struct SetState *set_state,
2366 struct ElementEntry *ee) 2390 struct ElementEntry *ee)
2367{ 2391{
2368 strata_estimator_remove(set_state->se, 2392 strata_estimator_remove (set_state->se,
2369 get_ibf_key(&ee->element_hash)); 2393 get_ibf_key (&ee->element_hash));
2370} 2394}
2371 2395
2372 2396
@@ -2376,14 +2400,14 @@ union_remove(struct SetState *set_state,
2376 * @param set_state the set to destroy 2400 * @param set_state the set to destroy
2377 */ 2401 */
2378static void 2402static void
2379union_set_destroy(struct SetState *set_state) 2403union_set_destroy (struct SetState *set_state)
2380{ 2404{
2381 if (NULL != set_state->se) 2405 if (NULL != set_state->se)
2382 { 2406 {
2383 strata_estimator_destroy(set_state->se); 2407 strata_estimator_destroy (set_state->se);
2384 set_state->se = NULL; 2408 set_state->se = NULL;
2385 } 2409 }
2386 GNUNET_free(set_state); 2410 GNUNET_free (set_state);
2387} 2411}
2388 2412
2389 2413
@@ -2394,14 +2418,14 @@ union_set_destroy(struct SetState *set_state)
2394 * @return a copy of the union-specific set state 2418 * @return a copy of the union-specific set state
2395 */ 2419 */
2396static struct SetState * 2420static struct SetState *
2397union_copy_state(struct SetState *state) 2421union_copy_state (struct SetState *state)
2398{ 2422{
2399 struct SetState *new_state; 2423 struct SetState *new_state;
2400 2424
2401 GNUNET_assert((NULL != state) && 2425 GNUNET_assert ((NULL != state) &&
2402 (NULL != state->se)); 2426 (NULL != state->se));
2403 new_state = GNUNET_new(struct SetState); 2427 new_state = GNUNET_new (struct SetState);
2404 new_state->se = strata_estimator_dup(state->se); 2428 new_state->se = strata_estimator_dup (state->se);
2405 2429
2406 return new_state; 2430 return new_state;
2407} 2431}
@@ -2413,11 +2437,11 @@ union_copy_state(struct SetState *state)
2413 * @param op operation that lost the channel 2437 * @param op operation that lost the channel
2414 */ 2438 */
2415static void 2439static void
2416union_channel_death(struct Operation *op) 2440union_channel_death (struct Operation *op)
2417{ 2441{
2418 send_client_done(op); 2442 send_client_done (op);
2419 _GSS_operation_destroy(op, 2443 _GSS_operation_destroy (op,
2420 GNUNET_YES); 2444 GNUNET_YES);
2421} 2445}
2422 2446
2423 2447
@@ -2428,7 +2452,7 @@ union_channel_death(struct Operation *op)
2428 * @return the operation specific VTable 2452 * @return the operation specific VTable
2429 */ 2453 */
2430const struct SetVT * 2454const struct SetVT *
2431_GSS_union_vt() 2455_GSS_union_vt ()
2432{ 2456{
2433 static const struct SetVT union_vt = { 2457 static const struct SetVT union_vt = {
2434 .create = &union_set_create, 2458 .create = &union_set_create,