aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c2299
1 files changed, 1153 insertions, 1146 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 8786807dc..fd7bc24d4 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -16,7 +16,7 @@
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 SPDX-License-Identifier: AGPL3.0-or-later 18 SPDX-License-Identifier: AGPL3.0-or-later
19*/ 19 */
20/** 20/**
21 * @file set/gnunet-service-set_union.c 21 * @file set/gnunet-service-set_union.c
22 * @brief two-peer set operations 22 * @brief two-peer set operations
@@ -34,7 +34,7 @@
34#include <gcrypt.h> 34#include <gcrypt.h>
35 35
36 36
37#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__) 37#define LOG(kind, ...) GNUNET_log_from(kind, "set-union", __VA_ARGS__)
38 38
39 39
40/** 40/**
@@ -55,7 +55,7 @@
55/** 55/**
56 * Number of buckets that can be transmitted in one message. 56 * Number of buckets that can be transmitted in one message.
57 */ 57 */
58#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) 58#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
59 59
60/** 60/**
61 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). 61 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
@@ -74,8 +74,7 @@
74/** 74/**
75 * Current phase we are in for a union operation. 75 * Current phase we are in for a union operation.
76 */ 76 */
77enum UnionOperationPhase 77enum UnionOperationPhase {
78{
79 /** 78 /**
80 * We sent the request message, and expect a strata estimator. 79 * We sent the request message, and expect a strata estimator.
81 */ 80 */
@@ -139,8 +138,7 @@ enum UnionOperationPhase
139/** 138/**
140 * State of an evaluate operation with another peer. 139 * State of an evaluate operation with another peer.
141 */ 140 */
142struct OperationState 141struct OperationState {
143{
144 /** 142 /**
145 * Copy of the set's strata estimator at the time of 143 * Copy of the set's strata estimator at the time of
146 * creation of this operation. 144 * creation of this operation.
@@ -216,8 +214,7 @@ struct OperationState
216/** 214/**
217 * The key entry is used to associate an ibf key with an element. 215 * The key entry is used to associate an ibf key with an element.
218 */ 216 */
219struct KeyEntry 217struct KeyEntry {
220{
221 /** 218 /**
222 * IBF key for the entry, derived from the current salt. 219 * IBF key for the entry, derived from the current salt.
223 */ 220 */
@@ -245,8 +242,7 @@ struct KeyEntry
245 * Used as a closure for sending elements 242 * Used as a closure for sending elements
246 * with a specific IBF key. 243 * with a specific IBF key.
247 */ 244 */
248struct SendElementClosure 245struct SendElementClosure {
249{
250 /** 246 /**
251 * The IBF key whose matching elements should be 247 * The IBF key whose matching elements should be
252 * sent. 248 * sent.
@@ -264,8 +260,7 @@ struct SendElementClosure
264/** 260/**
265 * Extra state required for efficient set union. 261 * Extra state required for efficient set union.
266 */ 262 */
267struct SetState 263struct SetState {
268{
269 /** 264 /**
270 * The strata estimator is only generated once for 265 * The strata estimator is only generated once for
271 * each set. 266 * each set.
@@ -287,19 +282,19 @@ struct SetState
287 * #GNUNET_NO if not. 282 * #GNUNET_NO if not.
288 */ 283 */
289static int 284static int
290destroy_key_to_element_iter (void *cls, 285destroy_key_to_element_iter(void *cls,
291 uint32_t key, 286 uint32_t key,
292 void *value) 287 void *value)
293{ 288{
294 struct KeyEntry *k = value; 289 struct KeyEntry *k = value;
295 290
296 GNUNET_assert (NULL != k); 291 GNUNET_assert(NULL != k);
297 if (GNUNET_YES == k->element->remote) 292 if (GNUNET_YES == k->element->remote)
298 { 293 {
299 GNUNET_free (k->element); 294 GNUNET_free(k->element);
300 k->element = NULL; 295 k->element = NULL;
301 } 296 }
302 GNUNET_free (k); 297 GNUNET_free(k);
303 return GNUNET_YES; 298 return GNUNET_YES;
304} 299}
305 300
@@ -311,44 +306,44 @@ destroy_key_to_element_iter (void *cls,
311 * @param op union operation to destroy 306 * @param op union operation to destroy
312 */ 307 */
313static void 308static void
314union_op_cancel (struct Operation *op) 309union_op_cancel(struct Operation *op)
315{ 310{
316 LOG (GNUNET_ERROR_TYPE_DEBUG, 311 LOG(GNUNET_ERROR_TYPE_DEBUG,
317 "destroying union op\n"); 312 "destroying union op\n");
318 /* check if the op was canceled twice */ 313 /* check if the op was canceled twice */
319 GNUNET_assert (NULL != op->state); 314 GNUNET_assert(NULL != op->state);
320 if (NULL != op->state->remote_ibf) 315 if (NULL != op->state->remote_ibf)
321 { 316 {
322 ibf_destroy (op->state->remote_ibf); 317 ibf_destroy(op->state->remote_ibf);
323 op->state->remote_ibf = NULL; 318 op->state->remote_ibf = NULL;
324 } 319 }
325 if (NULL != op->state->demanded_hashes) 320 if (NULL != op->state->demanded_hashes)
326 { 321 {
327 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); 322 GNUNET_CONTAINER_multihashmap_destroy(op->state->demanded_hashes);
328 op->state->demanded_hashes = NULL; 323 op->state->demanded_hashes = NULL;
329 } 324 }
330 if (NULL != op->state->local_ibf) 325 if (NULL != op->state->local_ibf)
331 { 326 {
332 ibf_destroy (op->state->local_ibf); 327 ibf_destroy(op->state->local_ibf);
333 op->state->local_ibf = NULL; 328 op->state->local_ibf = NULL;
334 } 329 }
335 if (NULL != op->state->se) 330 if (NULL != op->state->se)
336 { 331 {
337 strata_estimator_destroy (op->state->se); 332 strata_estimator_destroy(op->state->se);
338 op->state->se = NULL; 333 op->state->se = NULL;
339 } 334 }
340 if (NULL != op->state->key_to_element) 335 if (NULL != op->state->key_to_element)
341 { 336 {
342 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 337 GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element,
343 &destroy_key_to_element_iter, 338 &destroy_key_to_element_iter,
344 NULL); 339 NULL);
345 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); 340 GNUNET_CONTAINER_multihashmap32_destroy(op->state->key_to_element);
346 op->state->key_to_element = NULL; 341 op->state->key_to_element = NULL;
347 } 342 }
348 GNUNET_free (op->state); 343 GNUNET_free(op->state);
349 op->state = NULL; 344 op->state = NULL;
350 LOG (GNUNET_ERROR_TYPE_DEBUG, 345 LOG(GNUNET_ERROR_TYPE_DEBUG,
351 "destroying union op done\n"); 346 "destroying union op done\n");
352} 347}
353 348
354 349
@@ -359,20 +354,20 @@ union_op_cancel (struct Operation *op)
359 * @param op the union operation to fail 354 * @param op the union operation to fail
360 */ 355 */
361static void 356static void
362fail_union_operation (struct Operation *op) 357fail_union_operation(struct Operation *op)
363{ 358{
364 struct GNUNET_MQ_Envelope *ev; 359 struct GNUNET_MQ_Envelope *ev;
365 struct GNUNET_SET_ResultMessage *msg; 360 struct GNUNET_SET_ResultMessage *msg;
366 361
367 LOG (GNUNET_ERROR_TYPE_WARNING, 362 LOG(GNUNET_ERROR_TYPE_WARNING,
368 "union operation failed\n"); 363 "union operation failed\n");
369 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 364 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 365 msg->result_status = htons(GNUNET_SET_STATUS_FAILURE);
371 msg->request_id = htonl (op->client_request_id); 366 msg->request_id = htonl(op->client_request_id);
372 msg->element_type = htons (0); 367 msg->element_type = htons(0);
373 GNUNET_MQ_send (op->set->cs->mq, 368 GNUNET_MQ_send(op->set->cs->mq,
374 ev); 369 ev);
375 _GSS_operation_destroy (op, GNUNET_YES); 370 _GSS_operation_destroy(op, GNUNET_YES);
376} 371}
377 372
378 373
@@ -384,16 +379,16 @@ fail_union_operation (struct Operation *op)
384 * @return the derived IBF key 379 * @return the derived IBF key
385 */ 380 */
386static struct IBF_Key 381static struct IBF_Key
387get_ibf_key (const struct GNUNET_HashCode *src) 382get_ibf_key(const struct GNUNET_HashCode *src)
388{ 383{
389 struct IBF_Key key; 384 struct IBF_Key key;
390 uint16_t salt = 0; 385 uint16_t salt = 0;
391 386
392 GNUNET_assert (GNUNET_OK == 387 GNUNET_assert(GNUNET_OK ==
393 GNUNET_CRYPTO_kdf (&key, sizeof (key), 388 GNUNET_CRYPTO_kdf(&key, sizeof(key),
394 src, sizeof *src, 389 src, sizeof *src,
395 &salt, sizeof (salt), 390 &salt, sizeof(salt),
396 NULL, 0)); 391 NULL, 0));
397 return key; 392 return key;
398} 393}
399 394
@@ -401,8 +396,7 @@ get_ibf_key (const struct GNUNET_HashCode *src)
401/** 396/**
402 * Context for #op_get_element_iterator 397 * Context for #op_get_element_iterator
403 */ 398 */
404struct GetElementContext 399struct GetElementContext {
405{
406 /** 400 /**
407 * FIXME. 401 * FIXME.
408 */ 402 */
@@ -426,20 +420,20 @@ struct GetElementContext
426 * #GNUNET_NO if we've found the element. 420 * #GNUNET_NO if we've found the element.
427 */ 421 */
428static int 422static int
429op_get_element_iterator (void *cls, 423op_get_element_iterator(void *cls,
430 uint32_t key, 424 uint32_t key,
431 void *value) 425 void *value)
432{ 426{
433 struct GetElementContext *ctx = cls; 427 struct GetElementContext *ctx = cls;
434 struct KeyEntry *k = value; 428 struct KeyEntry *k = value;
435 429
436 GNUNET_assert (NULL != k); 430 GNUNET_assert(NULL != k);
437 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, 431 if (0 == GNUNET_CRYPTO_hash_cmp(&k->element->element_hash,
438 &ctx->hash)) 432 &ctx->hash))
439 { 433 {
440 ctx->k = k; 434 ctx->k = k;
441 return GNUNET_NO; 435 return GNUNET_NO;
442 } 436 }
443 return GNUNET_YES; 437 return GNUNET_YES;
444} 438}
445 439
@@ -453,27 +447,27 @@ op_get_element_iterator (void *cls,
453 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise 447 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
454 */ 448 */
455static struct KeyEntry * 449static struct KeyEntry *
456op_get_element (struct Operation *op, 450op_get_element(struct Operation *op,
457 const struct GNUNET_HashCode *element_hash) 451 const struct GNUNET_HashCode *element_hash)
458{ 452{
459 int ret; 453 int ret;
460 struct IBF_Key ibf_key; 454 struct IBF_Key ibf_key;
461 struct GetElementContext ctx = {{{ 0 }} , 0}; 455 struct GetElementContext ctx = { { { 0 } }, 0 };
462 456
463 ctx.hash = *element_hash; 457 ctx.hash = *element_hash;
464 458
465 ibf_key = get_ibf_key (element_hash); 459 ibf_key = get_ibf_key(element_hash);
466 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, 460 ret = GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element,
467 (uint32_t) ibf_key.key_val, 461 (uint32_t)ibf_key.key_val,
468 op_get_element_iterator, 462 op_get_element_iterator,
469 &ctx); 463 &ctx);
470 464
471 /* was the iteration aborted because we found the element? */ 465 /* was the iteration aborted because we found the element? */
472 if (GNUNET_SYSERR == ret) 466 if (GNUNET_SYSERR == ret)
473 { 467 {
474 GNUNET_assert (NULL != ctx.k); 468 GNUNET_assert(NULL != ctx.k);
475 return ctx.k; 469 return ctx.k;
476 } 470 }
477 return NULL; 471 return NULL;
478} 472}
479 473
@@ -493,23 +487,23 @@ op_get_element (struct Operation *op,
493 * @parem received was this element received from the remote peer? 487 * @parem received was this element received from the remote peer?
494 */ 488 */
495static void 489static void
496op_register_element (struct Operation *op, 490op_register_element(struct Operation *op,
497 struct ElementEntry *ee, 491 struct ElementEntry *ee,
498 int received) 492 int received)
499{ 493{
500 struct IBF_Key ibf_key; 494 struct IBF_Key ibf_key;
501 struct KeyEntry *k; 495 struct KeyEntry *k;
502 496
503 ibf_key = get_ibf_key (&ee->element_hash); 497 ibf_key = get_ibf_key(&ee->element_hash);
504 k = GNUNET_new (struct KeyEntry); 498 k = GNUNET_new(struct KeyEntry);
505 k->element = ee; 499 k->element = ee;
506 k->ibf_key = ibf_key; 500 k->ibf_key = ibf_key;
507 k->received = received; 501 k->received = received;
508 GNUNET_assert (GNUNET_OK == 502 GNUNET_assert(GNUNET_OK ==
509 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, 503 GNUNET_CONTAINER_multihashmap32_put(op->state->key_to_element,
510 (uint32_t) ibf_key.key_val, 504 (uint32_t)ibf_key.key_val,
511 k, 505 k,
512 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 506 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
513} 507}
514 508
515 509
@@ -517,12 +511,13 @@ op_register_element (struct Operation *op,
517 * FIXME. 511 * FIXME.
518 */ 512 */
519static void 513static void
520salt_key (const struct IBF_Key *k_in, 514salt_key(const struct IBF_Key *k_in,
521 uint32_t salt, 515 uint32_t salt,
522 struct IBF_Key *k_out) 516 struct IBF_Key *k_out)
523{ 517{
524 int s = salt % 64; 518 int s = salt % 64;
525 uint64_t x = k_in->key_val; 519 uint64_t x = k_in->key_val;
520
526 /* rotate ibf key */ 521 /* rotate ibf key */
527 x = (x >> s) | (x << (64 - s)); 522 x = (x >> s) | (x << (64 - s));
528 k_out->key_val = x; 523 k_out->key_val = x;
@@ -533,12 +528,13 @@ salt_key (const struct IBF_Key *k_in,
533 * FIXME. 528 * FIXME.
534 */ 529 */
535static void 530static void
536unsalt_key (const struct IBF_Key *k_in, 531unsalt_key(const struct IBF_Key *k_in,
537 uint32_t salt, 532 uint32_t salt,
538 struct IBF_Key *k_out) 533 struct IBF_Key *k_out)
539{ 534{
540 int s = salt % 64; 535 int s = salt % 64;
541 uint64_t x = k_in->key_val; 536 uint64_t x = k_in->key_val;
537
542 x = (x << s) | (x >> (64 - s)); 538 x = (x << s) | (x >> (64 - s));
543 k_out->key_val = x; 539 k_out->key_val = x;
544} 540}
@@ -552,23 +548,23 @@ unsalt_key (const struct IBF_Key *k_in,
552 * @param value the key entry to get the key from 548 * @param value the key entry to get the key from
553 */ 549 */
554static int 550static int
555prepare_ibf_iterator (void *cls, 551prepare_ibf_iterator(void *cls,
556 uint32_t key, 552 uint32_t key,
557 void *value) 553 void *value)
558{ 554{
559 struct Operation *op = cls; 555 struct Operation *op = cls;
560 struct KeyEntry *ke = value; 556 struct KeyEntry *ke = value;
561 struct IBF_Key salted_key; 557 struct IBF_Key salted_key;
562 558
563 LOG (GNUNET_ERROR_TYPE_DEBUG, 559 LOG(GNUNET_ERROR_TYPE_DEBUG,
564 "[OP %x] inserting %lx (hash %s) into ibf\n", 560 "[OP %x] inserting %lx (hash %s) into ibf\n",
565 (void *) op, 561 (void *)op,
566 (unsigned long) ke->ibf_key.key_val, 562 (unsigned long)ke->ibf_key.key_val,
567 GNUNET_h2s (&ke->element->element_hash)); 563 GNUNET_h2s(&ke->element->element_hash));
568 salt_key (&ke->ibf_key, 564 salt_key(&ke->ibf_key,
569 op->state->salt_send, 565 op->state->salt_send,
570 &salted_key); 566 &salted_key);
571 ibf_insert (op->state->local_ibf, salted_key); 567 ibf_insert(op->state->local_ibf, salted_key);
572 return GNUNET_YES; 568 return GNUNET_YES;
573} 569}
574 570
@@ -584,9 +580,9 @@ prepare_ibf_iterator (void *cls,
584 * @return #GNUNET_YES (to continue iterating) 580 * @return #GNUNET_YES (to continue iterating)
585 */ 581 */
586static int 582static int
587init_key_to_element_iterator (void *cls, 583init_key_to_element_iterator(void *cls,
588 const struct GNUNET_HashCode *key, 584 const struct GNUNET_HashCode *key,
589 void *value) 585 void *value)
590{ 586{
591 struct Operation *op = cls; 587 struct Operation *op = cls;
592 struct ElementEntry *ee = value; 588 struct ElementEntry *ee = value;
@@ -594,13 +590,13 @@ init_key_to_element_iterator (void *cls,
594 /* make sure that the element belongs to the set at the time 590 /* make sure that the element belongs to the set at the time
595 * of creating the operation */ 591 * of creating the operation */
596 if (GNUNET_NO == 592 if (GNUNET_NO ==
597 _GSS_is_element_of_operation (ee, 593 _GSS_is_element_of_operation(ee,
598 op)) 594 op))
599 return GNUNET_YES; 595 return GNUNET_YES;
600 GNUNET_assert (GNUNET_NO == ee->remote); 596 GNUNET_assert(GNUNET_NO == ee->remote);
601 op_register_element (op, 597 op_register_element(op,
602 ee, 598 ee,
603 GNUNET_NO); 599 GNUNET_NO);
604 return GNUNET_YES; 600 return GNUNET_YES;
605} 601}
606 602
@@ -612,16 +608,16 @@ init_key_to_element_iterator (void *cls,
612 * @param op the set union operation 608 * @param op the set union operation
613 */ 609 */
614static void 610static void
615initialize_key_to_element (struct Operation *op) 611initialize_key_to_element(struct Operation *op)
616{ 612{
617 unsigned int len; 613 unsigned int len;
618 614
619 GNUNET_assert (NULL == op->state->key_to_element); 615 GNUNET_assert(NULL == op->state->key_to_element);
620 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements); 616 len = GNUNET_CONTAINER_multihashmap_size(op->set->content->elements);
621 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); 617 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create(len + 1);
622 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, 618 GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements,
623 &init_key_to_element_iterator, 619 &init_key_to_element_iterator,
624 op); 620 op);
625} 621}
626 622
627 623
@@ -634,23 +630,23 @@ initialize_key_to_element (struct Operation *op)
634 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure 630 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
635 */ 631 */
636static int 632static int
637prepare_ibf (struct Operation *op, 633prepare_ibf(struct Operation *op,
638 uint32_t size) 634 uint32_t size)
639{ 635{
640 GNUNET_assert (NULL != op->state->key_to_element); 636 GNUNET_assert(NULL != op->state->key_to_element);
641 637
642 if (NULL != op->state->local_ibf) 638 if (NULL != op->state->local_ibf)
643 ibf_destroy (op->state->local_ibf); 639 ibf_destroy(op->state->local_ibf);
644 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 640 op->state->local_ibf = ibf_create(size, SE_IBF_HASH_NUM);
645 if (NULL == op->state->local_ibf) 641 if (NULL == op->state->local_ibf)
646 { 642 {
647 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 643 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
648 "Failed to allocate local IBF\n"); 644 "Failed to allocate local IBF\n");
649 return GNUNET_SYSERR; 645 return GNUNET_SYSERR;
650 } 646 }
651 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 647 GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element,
652 &prepare_ibf_iterator, 648 &prepare_ibf_iterator,
653 op); 649 op);
654 return GNUNET_OK; 650 return GNUNET_OK;
655} 651}
656 652
@@ -665,60 +661,60 @@ prepare_ibf (struct Operation *op,
665 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure 661 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
666 */ 662 */
667static int 663static int
668send_ibf (struct Operation *op, 664send_ibf(struct Operation *op,
669 uint16_t ibf_order) 665 uint16_t ibf_order)
670{ 666{
671 unsigned int buckets_sent = 0; 667 unsigned int buckets_sent = 0;
672 struct InvertibleBloomFilter *ibf; 668 struct InvertibleBloomFilter *ibf;
673 669
674 if (GNUNET_OK != 670 if (GNUNET_OK !=
675 prepare_ibf (op, 1<<ibf_order)) 671 prepare_ibf(op, 1 << ibf_order))
676 { 672 {
677 /* allocation failed */ 673 /* allocation failed */
678 return GNUNET_SYSERR; 674 return GNUNET_SYSERR;
679 } 675 }
680 676
681 LOG (GNUNET_ERROR_TYPE_DEBUG, 677 LOG(GNUNET_ERROR_TYPE_DEBUG,
682 "sending ibf of size %u\n", 678 "sending ibf of size %u\n",
683 1<<ibf_order); 679 1 << ibf_order);
684 680
685 { 681 {
686 char name[64] = { 0 }; 682 char name[64] = { 0 };
687 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order); 683 snprintf(name, sizeof(name), "# sent IBF (order %u)", ibf_order);
688 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); 684 GNUNET_STATISTICS_update(_GSS_statistics, name, 1, GNUNET_NO);
689 } 685 }
690 686
691 ibf = op->state->local_ibf; 687 ibf = op->state->local_ibf;
692 688
693 while (buckets_sent < (1 << ibf_order)) 689 while (buckets_sent < (1 << ibf_order))
694 { 690 {
695 unsigned int buckets_in_message; 691 unsigned int buckets_in_message;
696 struct GNUNET_MQ_Envelope *ev; 692 struct GNUNET_MQ_Envelope *ev;
697 struct IBFMessage *msg; 693 struct IBFMessage *msg;
698 694
699 buckets_in_message = (1 << ibf_order) - buckets_sent; 695 buckets_in_message = (1 << ibf_order) - buckets_sent;
700 /* limit to maximum */ 696 /* limit to maximum */
701 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) 697 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
702 buckets_in_message = MAX_BUCKETS_PER_MESSAGE; 698 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
703 699
704 ev = GNUNET_MQ_msg_extra (msg, 700 ev = GNUNET_MQ_msg_extra(msg,
705 buckets_in_message * IBF_BUCKET_SIZE, 701 buckets_in_message * IBF_BUCKET_SIZE,
706 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); 702 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
707 msg->reserved1 = 0; 703 msg->reserved1 = 0;
708 msg->reserved2 = 0; 704 msg->reserved2 = 0;
709 msg->order = ibf_order; 705 msg->order = ibf_order;
710 msg->offset = htonl (buckets_sent); 706 msg->offset = htonl(buckets_sent);
711 msg->salt = htonl (op->state->salt_send); 707 msg->salt = htonl(op->state->salt_send);
712 ibf_write_slice (ibf, buckets_sent, 708 ibf_write_slice(ibf, buckets_sent,
713 buckets_in_message, &msg[1]); 709 buckets_in_message, &msg[1]);
714 buckets_sent += buckets_in_message; 710 buckets_sent += buckets_in_message;
715 LOG (GNUNET_ERROR_TYPE_DEBUG, 711 LOG(GNUNET_ERROR_TYPE_DEBUG,
716 "ibf chunk size %u, %u/%u sent\n", 712 "ibf chunk size %u, %u/%u sent\n",
717 buckets_in_message, 713 buckets_in_message,
718 buckets_sent, 714 buckets_sent,
719 1<<ibf_order); 715 1 << ibf_order);
720 GNUNET_MQ_send (op->mq, ev); 716 GNUNET_MQ_send(op->mq, ev);
721 } 717 }
722 718
723 /* The other peer must decode the IBF, so 719 /* The other peer must decode the IBF, so
724 * we're passive. */ 720 * we're passive. */
@@ -735,14 +731,14 @@ send_ibf (struct Operation *op,
735 * @return the required size of the ibf 731 * @return the required size of the ibf
736 */ 732 */
737static unsigned int 733static unsigned int
738get_order_from_difference (unsigned int diff) 734get_order_from_difference(unsigned int diff)
739{ 735{
740 unsigned int ibf_order; 736 unsigned int ibf_order;
741 737
742 ibf_order = 2; 738 ibf_order = 2;
743 while ( ( (1<<ibf_order) < (IBF_ALPHA * diff) || 739 while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
744 ((1<<ibf_order) < SE_IBF_HASH_NUM) ) && 740 ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
745 (ibf_order < MAX_IBF_ORDER) ) 741 (ibf_order < MAX_IBF_ORDER))
746 ibf_order++; 742 ibf_order++;
747 // add one for correction 743 // add one for correction
748 return ibf_order + 1; 744 return ibf_order + 1;
@@ -759,9 +755,9 @@ get_order_from_difference (unsigned int diff)
759 * @return #GNUNET_YES (to continue iterating) 755 * @return #GNUNET_YES (to continue iterating)
760 */ 756 */
761static int 757static int
762send_full_element_iterator (void *cls, 758send_full_element_iterator(void *cls,
763 const struct GNUNET_HashCode *key, 759 const struct GNUNET_HashCode *key,
764 void *value) 760 void *value)
765{ 761{
766 struct Operation *op = cls; 762 struct Operation *op = cls;
767 struct GNUNET_SET_ElementMessage *emsg; 763 struct GNUNET_SET_ElementMessage *emsg;
@@ -769,18 +765,18 @@ send_full_element_iterator (void *cls,
769 struct GNUNET_SET_Element *el = &ee->element; 765 struct GNUNET_SET_Element *el = &ee->element;
770 struct GNUNET_MQ_Envelope *ev; 766 struct GNUNET_MQ_Envelope *ev;
771 767
772 LOG (GNUNET_ERROR_TYPE_DEBUG, 768 LOG(GNUNET_ERROR_TYPE_DEBUG,
773 "Sending element %s\n", 769 "Sending element %s\n",
774 GNUNET_h2s (key)); 770 GNUNET_h2s(key));
775 ev = GNUNET_MQ_msg_extra (emsg, 771 ev = GNUNET_MQ_msg_extra(emsg,
776 el->size, 772 el->size,
777 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); 773 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
778 emsg->element_type = htons (el->element_type); 774 emsg->element_type = htons(el->element_type);
779 GNUNET_memcpy (&emsg[1], 775 GNUNET_memcpy(&emsg[1],
780 el->data, 776 el->data,
781 el->size); 777 el->size);
782 GNUNET_MQ_send (op->mq, 778 GNUNET_MQ_send(op->mq,
783 ev); 779 ev);
784 return GNUNET_YES; 780 return GNUNET_YES;
785} 781}
786 782
@@ -791,21 +787,21 @@ send_full_element_iterator (void *cls,
791 * @param op operation to switch to full set transmission. 787 * @param op operation to switch to full set transmission.
792 */ 788 */
793static void 789static void
794send_full_set (struct Operation *op) 790send_full_set(struct Operation *op)
795{ 791{
796 struct GNUNET_MQ_Envelope *ev; 792 struct GNUNET_MQ_Envelope *ev;
797 793
798 op->state->phase = PHASE_FULL_SENDING; 794 op->state->phase = PHASE_FULL_SENDING;
799 LOG (GNUNET_ERROR_TYPE_DEBUG, 795 LOG(GNUNET_ERROR_TYPE_DEBUG,
800 "Dedicing to transmit the full set\n"); 796 "Dedicing to transmit the full set\n");
801 /* FIXME: use a more memory-friendly way of doing this with an 797 /* FIXME: use a more memory-friendly way of doing this with an
802 iterator, just as we do in the non-full case! */ 798 iterator, just as we do in the non-full case! */
803 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, 799 (void)GNUNET_CONTAINER_multihashmap_iterate(op->set->content->elements,
804 &send_full_element_iterator, 800 &send_full_element_iterator,
805 op); 801 op);
806 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 802 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
807 GNUNET_MQ_send (op->mq, 803 GNUNET_MQ_send(op->mq,
808 ev); 804 ev);
809} 805}
810 806
811 807
@@ -816,26 +812,26 @@ send_full_set (struct Operation *op)
816 * @param msg the message 812 * @param msg the message
817 */ 813 */
818int 814int
819check_union_p2p_strata_estimator (void *cls, 815check_union_p2p_strata_estimator(void *cls,
820 const struct StrataEstimatorMessage *msg) 816 const struct StrataEstimatorMessage *msg)
821{ 817{
822 struct Operation *op = cls; 818 struct Operation *op = cls;
823 int is_compressed; 819 int is_compressed;
824 size_t len; 820 size_t len;
825 821
826 if (op->state->phase != PHASE_EXPECT_SE) 822 if (op->state->phase != PHASE_EXPECT_SE)
827 { 823 {
828 GNUNET_break (0); 824 GNUNET_break(0);
829 return GNUNET_SYSERR; 825 return GNUNET_SYSERR;
830 } 826 }
831 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type)); 827 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type));
832 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage); 828 len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage);
833 if ( (GNUNET_NO == is_compressed) && 829 if ((GNUNET_NO == is_compressed) &&
834 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) 830 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
835 { 831 {
836 GNUNET_break (0); 832 GNUNET_break(0);
837 return GNUNET_SYSERR; 833 return GNUNET_SYSERR;
838 } 834 }
839 return GNUNET_OK; 835 return GNUNET_OK;
840} 836}
841 837
@@ -847,8 +843,8 @@ check_union_p2p_strata_estimator (void *cls,
847 * @param msg the message 843 * @param msg the message
848 */ 844 */
849void 845void
850handle_union_p2p_strata_estimator (void *cls, 846handle_union_p2p_strata_estimator(void *cls,
851 const struct StrataEstimatorMessage *msg) 847 const struct StrataEstimatorMessage *msg)
852{ 848{
853 struct Operation *op = cls; 849 struct Operation *op = cls;
854 struct StrataEstimator *remote_se; 850 struct StrataEstimator *remote_se;
@@ -857,116 +853,116 @@ handle_union_p2p_strata_estimator (void *cls,
857 size_t len; 853 size_t len;
858 int is_compressed; 854 int is_compressed;
859 855
860 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type)); 856 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type));
861 GNUNET_STATISTICS_update (_GSS_statistics, 857 GNUNET_STATISTICS_update(_GSS_statistics,
862 "# bytes of SE received", 858 "# bytes of SE received",
863 ntohs (msg->header.size), 859 ntohs(msg->header.size),
864 GNUNET_NO); 860 GNUNET_NO);
865 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage); 861 len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage);
866 other_size = GNUNET_ntohll (msg->set_size); 862 other_size = GNUNET_ntohll(msg->set_size);
867 remote_se = strata_estimator_create (SE_STRATA_COUNT, 863 remote_se = strata_estimator_create(SE_STRATA_COUNT,
868 SE_IBF_SIZE, 864 SE_IBF_SIZE,
869 SE_IBF_HASH_NUM); 865 SE_IBF_HASH_NUM);
870 if (NULL == remote_se) 866 if (NULL == remote_se)
871 { 867 {
872 /* insufficient resources, fail */ 868 /* insufficient resources, fail */
873 fail_union_operation (op); 869 fail_union_operation(op);
874 return; 870 return;
875 } 871 }
876 if (GNUNET_OK != 872 if (GNUNET_OK !=
877 strata_estimator_read (&msg[1], 873 strata_estimator_read(&msg[1],
878 len, 874 len,
879 is_compressed, 875 is_compressed,
880 remote_se)) 876 remote_se))
881 { 877 {
882 /* decompression failed */ 878 /* decompression failed */
883 strata_estimator_destroy (remote_se); 879 strata_estimator_destroy(remote_se);
884 fail_union_operation (op); 880 fail_union_operation(op);
885 return; 881 return;
886 } 882 }
887 GNUNET_assert (NULL != op->state->se); 883 GNUNET_assert(NULL != op->state->se);
888 diff = strata_estimator_difference (remote_se, 884 diff = strata_estimator_difference(remote_se,
889 op->state->se); 885 op->state->se);
890 886
891 if (diff > 200) 887 if (diff > 200)
892 diff = diff * 3 / 2; 888 diff = diff * 3 / 2;
893 889
894 strata_estimator_destroy (remote_se); 890 strata_estimator_destroy(remote_se);
895 strata_estimator_destroy (op->state->se); 891 strata_estimator_destroy(op->state->se);
896 op->state->se = NULL; 892 op->state->se = NULL;
897 LOG (GNUNET_ERROR_TYPE_DEBUG, 893 LOG(GNUNET_ERROR_TYPE_DEBUG,
898 "got se diff=%d, using ibf size %d\n", 894 "got se diff=%d, using ibf size %d\n",
899 diff, 895 diff,
900 1U << get_order_from_difference (diff)); 896 1U << get_order_from_difference(diff));
901 897
902 { 898 {
903 char *set_debug; 899 char *set_debug;
904 900
905 set_debug = getenv ("GNUNET_SET_BENCHMARK"); 901 set_debug = getenv("GNUNET_SET_BENCHMARK");
906 if ( (NULL != set_debug) && 902 if ((NULL != set_debug) &&
907 (0 == strcmp (set_debug, "1")) ) 903 (0 == strcmp(set_debug, "1")))
908 { 904 {
909 FILE *f = fopen ("set.log", "a"); 905 FILE *f = fopen("set.log", "a");
910 fprintf (f, "%llu\n", (unsigned long long) diff); 906 fprintf(f, "%llu\n", (unsigned long long)diff);
911 fclose (f); 907 fclose(f);
912 } 908 }
913 }
914
915 if ( (GNUNET_YES == op->byzantine) &&
916 (other_size < op->byzantine_lower_bound) )
917 {
918 GNUNET_break (0);
919 fail_union_operation (op);
920 return;
921 } 909 }
922 910
923 if ( (GNUNET_YES == op->force_full) || 911 if ((GNUNET_YES == op->byzantine) &&
924 (diff > op->state->initial_size / 4) || 912 (other_size < op->byzantine_lower_bound))
925 (0 == other_size) )
926 {
927 LOG (GNUNET_ERROR_TYPE_DEBUG,
928 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
929 diff,
930 op->state->initial_size);
931 GNUNET_STATISTICS_update (_GSS_statistics,
932 "# of full sends",
933 1,
934 GNUNET_NO);
935 if ( (op->state->initial_size <= other_size) ||
936 (0 == other_size) )
937 { 913 {
938 send_full_set (op); 914 GNUNET_break(0);
915 fail_union_operation(op);
916 return;
939 } 917 }
940 else
941 {
942 struct GNUNET_MQ_Envelope *ev;
943 918
944 LOG (GNUNET_ERROR_TYPE_DEBUG, 919 if ((GNUNET_YES == op->force_full) ||
945 "Telling other peer that we expect its full set\n"); 920 (diff > op->state->initial_size / 4) ||
946 op->state->phase = PHASE_EXPECT_IBF; 921 (0 == other_size))
947 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); 922 {
948 GNUNET_MQ_send (op->mq, 923 LOG(GNUNET_ERROR_TYPE_DEBUG,
949 ev); 924 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
925 diff,
926 op->state->initial_size);
927 GNUNET_STATISTICS_update(_GSS_statistics,
928 "# of full sends",
929 1,
930 GNUNET_NO);
931 if ((op->state->initial_size <= other_size) ||
932 (0 == other_size))
933 {
934 send_full_set(op);
935 }
936 else
937 {
938 struct GNUNET_MQ_Envelope *ev;
939
940 LOG(GNUNET_ERROR_TYPE_DEBUG,
941 "Telling other peer that we expect its full set\n");
942 op->state->phase = PHASE_EXPECT_IBF;
943 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
944 GNUNET_MQ_send(op->mq,
945 ev);
946 }
950 } 947 }
951 }
952 else 948 else
953 { 949 {
954 GNUNET_STATISTICS_update (_GSS_statistics, 950 GNUNET_STATISTICS_update(_GSS_statistics,
955 "# of ibf sends", 951 "# of ibf sends",
956 1, 952 1,
957 GNUNET_NO); 953 GNUNET_NO);
958 if (GNUNET_OK != 954 if (GNUNET_OK !=
959 send_ibf (op, 955 send_ibf(op,
960 get_order_from_difference (diff))) 956 get_order_from_difference(diff)))
961 { 957 {
962 /* Internal error, best we can do is shut the connection */ 958 /* Internal error, best we can do is shut the connection */
963 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 959 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
964 "Failed to send IBF, closing connection\n"); 960 "Failed to send IBF, closing connection\n");
965 fail_union_operation (op); 961 fail_union_operation(op);
966 return; 962 return;
963 }
967 } 964 }
968 } 965 GNUNET_CADET_receive_done(op->channel);
969 GNUNET_CADET_receive_done (op->channel);
970} 966}
971 967
972 968
@@ -978,9 +974,9 @@ handle_union_p2p_strata_estimator (void *cls,
978 * @param value the key entry 974 * @param value the key entry
979 */ 975 */
980static int 976static int
981send_offers_iterator (void *cls, 977send_offers_iterator(void *cls,
982 uint32_t key, 978 uint32_t key,
983 void *value) 979 void *value)
984{ 980{
985 struct SendElementClosure *sec = cls; 981 struct SendElementClosure *sec = cls;
986 struct Operation *op = sec->op; 982 struct Operation *op = sec->op;
@@ -992,17 +988,17 @@ send_offers_iterator (void *cls,
992 if (ke->ibf_key.key_val != sec->ibf_key.key_val) 988 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
993 return GNUNET_YES; 989 return GNUNET_YES;
994 990
995 ev = GNUNET_MQ_msg_header_extra (mh, 991 ev = GNUNET_MQ_msg_header_extra(mh,
996 sizeof (struct GNUNET_HashCode), 992 sizeof(struct GNUNET_HashCode),
997 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); 993 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
998 994
999 GNUNET_assert (NULL != ev); 995 GNUNET_assert(NULL != ev);
1000 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; 996 *(struct GNUNET_HashCode *)&mh[1] = ke->element->element_hash;
1001 LOG (GNUNET_ERROR_TYPE_DEBUG, 997 LOG(GNUNET_ERROR_TYPE_DEBUG,
1002 "[OP %x] sending element offer (%s) to peer\n", 998 "[OP %x] sending element offer (%s) to peer\n",
1003 (void *) op, 999 (void *)op,
1004 GNUNET_h2s (&ke->element->element_hash)); 1000 GNUNET_h2s(&ke->element->element_hash));
1005 GNUNET_MQ_send (op->mq, ev); 1001 GNUNET_MQ_send(op->mq, ev);
1006 return GNUNET_YES; 1002 return GNUNET_YES;
1007} 1003}
1008 1004
@@ -1014,17 +1010,17 @@ send_offers_iterator (void *cls,
1014 * @param ibf_key IBF key of interest 1010 * @param ibf_key IBF key of interest
1015 */ 1011 */
1016static void 1012static void
1017send_offers_for_key (struct Operation *op, 1013send_offers_for_key(struct Operation *op,
1018 struct IBF_Key ibf_key) 1014 struct IBF_Key ibf_key)
1019{ 1015{
1020 struct SendElementClosure send_cls; 1016 struct SendElementClosure send_cls;
1021 1017
1022 send_cls.ibf_key = ibf_key; 1018 send_cls.ibf_key = ibf_key;
1023 send_cls.op = op; 1019 send_cls.op = op;
1024 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, 1020 (void)GNUNET_CONTAINER_multihashmap32_get_multiple(op->state->key_to_element,
1025 (uint32_t) ibf_key.key_val, 1021 (uint32_t)ibf_key.key_val,
1026 &send_offers_iterator, 1022 &send_offers_iterator,
1027 &send_cls); 1023 &send_cls);
1028} 1024}
1029 1025
1030 1026
@@ -1036,7 +1032,7 @@ send_offers_for_key (struct Operation *op,
1036 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure 1032 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1037 */ 1033 */
1038static int 1034static int
1039decode_and_send (struct Operation *op) 1035decode_and_send(struct Operation *op)
1040{ 1036{
1041 struct IBF_Key key; 1037 struct IBF_Key key;
1042 struct IBF_Key last_key; 1038 struct IBF_Key last_key;
@@ -1044,147 +1040,147 @@ decode_and_send (struct Operation *op)
1044 unsigned int num_decoded; 1040 unsigned int num_decoded;
1045 struct InvertibleBloomFilter *diff_ibf; 1041 struct InvertibleBloomFilter *diff_ibf;
1046 1042
1047 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); 1043 GNUNET_assert(PHASE_INVENTORY_ACTIVE == op->state->phase);
1048 1044
1049 if (GNUNET_OK != 1045 if (GNUNET_OK !=
1050 prepare_ibf (op, 1046 prepare_ibf(op,
1051 op->state->remote_ibf->size)) 1047 op->state->remote_ibf->size))
1052 { 1048 {
1053 GNUNET_break (0); 1049 GNUNET_break(0);
1054 /* allocation failed */ 1050 /* allocation failed */
1055 return GNUNET_SYSERR; 1051 return GNUNET_SYSERR;
1056 } 1052 }
1057 diff_ibf = ibf_dup (op->state->local_ibf); 1053 diff_ibf = ibf_dup(op->state->local_ibf);
1058 ibf_subtract (diff_ibf, 1054 ibf_subtract(diff_ibf,
1059 op->state->remote_ibf); 1055 op->state->remote_ibf);
1060 1056
1061 ibf_destroy (op->state->remote_ibf); 1057 ibf_destroy(op->state->remote_ibf);
1062 op->state->remote_ibf = NULL; 1058 op->state->remote_ibf = NULL;
1063 1059
1064 LOG (GNUNET_ERROR_TYPE_DEBUG, 1060 LOG(GNUNET_ERROR_TYPE_DEBUG,
1065 "decoding IBF (size=%u)\n", 1061 "decoding IBF (size=%u)\n",
1066 diff_ibf->size); 1062 diff_ibf->size);
1067 1063
1068 num_decoded = 0; 1064 num_decoded = 0;
1069 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ 1065 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1070 1066
1071 while (1) 1067 while (1)
1072 { 1068 {
1073 int res; 1069 int res;
1074 int cycle_detected = GNUNET_NO; 1070 int cycle_detected = GNUNET_NO;
1075 1071
1076 last_key = key; 1072 last_key = key;
1077 1073
1078 res = ibf_decode (diff_ibf, &side, &key); 1074 res = ibf_decode(diff_ibf, &side, &key);
1079 if (res == GNUNET_OK) 1075 if (res == GNUNET_OK)
1080 {
1081 LOG (GNUNET_ERROR_TYPE_DEBUG,
1082 "decoded ibf key %lx\n",
1083 (unsigned long) key.key_val);
1084 num_decoded += 1;
1085 if ( (num_decoded > diff_ibf->size) ||
1086 ( (num_decoded > 1) &&
1087 (last_key.key_val == key.key_val) ) )
1088 {
1089 LOG (GNUNET_ERROR_TYPE_DEBUG,
1090 "detected cyclic ibf (decoded %u/%u)\n",
1091 num_decoded,
1092 diff_ibf->size);
1093 cycle_detected = GNUNET_YES;
1094 }
1095 }
1096 if ( (GNUNET_SYSERR == res) ||
1097 (GNUNET_YES == cycle_detected) )
1098 {
1099 int next_order;
1100 next_order = 0;
1101 while (1<<next_order < diff_ibf->size)
1102 next_order++;
1103 next_order++;
1104 if (next_order <= MAX_IBF_ORDER)
1105 {
1106 LOG (GNUNET_ERROR_TYPE_DEBUG,
1107 "decoding failed, sending larger ibf (size %u)\n",
1108 1<<next_order);
1109 GNUNET_STATISTICS_update (_GSS_statistics,
1110 "# of IBF retries",
1111 1,
1112 GNUNET_NO);
1113 op->state->salt_send++;
1114 if (GNUNET_OK !=
1115 send_ibf (op, next_order))
1116 { 1076 {
1117 /* Internal error, best we can do is shut the connection */ 1077 LOG(GNUNET_ERROR_TYPE_DEBUG,
1118 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1078 "decoded ibf key %lx\n",
1119 "Failed to send IBF, closing connection\n"); 1079 (unsigned long)key.key_val);
1120 fail_union_operation (op); 1080 num_decoded += 1;
1121 ibf_destroy (diff_ibf); 1081 if ((num_decoded > diff_ibf->size) ||
1122 return GNUNET_SYSERR; 1082 ((num_decoded > 1) &&
1083 (last_key.key_val == key.key_val)))
1084 {
1085 LOG(GNUNET_ERROR_TYPE_DEBUG,
1086 "detected cyclic ibf (decoded %u/%u)\n",
1087 num_decoded,
1088 diff_ibf->size);
1089 cycle_detected = GNUNET_YES;
1090 }
1123 } 1091 }
1124 } 1092 if ((GNUNET_SYSERR == res) ||
1125 else 1093 (GNUNET_YES == cycle_detected))
1126 { 1094 {
1127 GNUNET_STATISTICS_update (_GSS_statistics, 1095 int next_order;
1128 "# of failed union operations (too large)", 1096 next_order = 0;
1129 1, 1097 while (1 << next_order < diff_ibf->size)
1130 GNUNET_NO); 1098 next_order++;
1131 // XXX: Send the whole set, element-by-element 1099 next_order++;
1132 LOG (GNUNET_ERROR_TYPE_ERROR, 1100 if (next_order <= MAX_IBF_ORDER)
1133 "set union failed: reached ibf limit\n"); 1101 {
1134 fail_union_operation (op); 1102 LOG(GNUNET_ERROR_TYPE_DEBUG,
1135 ibf_destroy (diff_ibf); 1103 "decoding failed, sending larger ibf (size %u)\n",
1136 return GNUNET_SYSERR; 1104 1 << next_order);
1137 } 1105 GNUNET_STATISTICS_update(_GSS_statistics,
1138 break; 1106 "# of IBF retries",
1139 } 1107 1,
1140 if (GNUNET_NO == res) 1108 GNUNET_NO);
1141 { 1109 op->state->salt_send++;
1142 struct GNUNET_MQ_Envelope *ev; 1110 if (GNUNET_OK !=
1143 1111 send_ibf(op, next_order))
1144 LOG (GNUNET_ERROR_TYPE_DEBUG, 1112 {
1145 "transmitted all values, sending DONE\n"); 1113 /* Internal error, best we can do is shut the connection */
1146 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); 1114 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1147 GNUNET_MQ_send (op->mq, ev); 1115 "Failed to send IBF, closing connection\n");
1148 /* We now wait until we get a DONE message back 1116 fail_union_operation(op);
1149 * and then wait for our MQ to be flushed and all our 1117 ibf_destroy(diff_ibf);
1150 * demands be delivered. */ 1118 return GNUNET_SYSERR;
1151 break; 1119 }
1152 } 1120 }
1153 if (1 == side) 1121 else
1154 { 1122 {
1155 struct IBF_Key unsalted_key; 1123 GNUNET_STATISTICS_update(_GSS_statistics,
1124 "# of failed union operations (too large)",
1125 1,
1126 GNUNET_NO);
1127 // XXX: Send the whole set, element-by-element
1128 LOG(GNUNET_ERROR_TYPE_ERROR,
1129 "set union failed: reached ibf limit\n");
1130 fail_union_operation(op);
1131 ibf_destroy(diff_ibf);
1132 return GNUNET_SYSERR;
1133 }
1134 break;
1135 }
1136 if (GNUNET_NO == res)
1137 {
1138 struct GNUNET_MQ_Envelope *ev;
1139
1140 LOG(GNUNET_ERROR_TYPE_DEBUG,
1141 "transmitted all values, sending DONE\n");
1142 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1143 GNUNET_MQ_send(op->mq, ev);
1144 /* We now wait until we get a DONE message back
1145 * and then wait for our MQ to be flushed and all our
1146 * demands be delivered. */
1147 break;
1148 }
1149 if (1 == side)
1150 {
1151 struct IBF_Key unsalted_key;
1156 1152
1157 unsalt_key (&key, 1153 unsalt_key(&key,
1158 op->state->salt_receive, 1154 op->state->salt_receive,
1159 &unsalted_key); 1155 &unsalted_key);
1160 send_offers_for_key (op, 1156 send_offers_for_key(op,
1161 unsalted_key); 1157 unsalted_key);
1162 } 1158 }
1163 else if (-1 == side) 1159 else if (-1 == side)
1164 { 1160 {
1165 struct GNUNET_MQ_Envelope *ev; 1161 struct GNUNET_MQ_Envelope *ev;
1166 struct InquiryMessage *msg; 1162 struct InquiryMessage *msg;
1167 1163
1168 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth 1164 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1169 * the effort additional complexity. */ 1165 * the effort additional complexity. */
1170 ev = GNUNET_MQ_msg_extra (msg, 1166 ev = GNUNET_MQ_msg_extra(msg,
1171 sizeof (struct IBF_Key), 1167 sizeof(struct IBF_Key),
1172 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); 1168 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1173 msg->salt = htonl (op->state->salt_receive); 1169 msg->salt = htonl(op->state->salt_receive);
1174 GNUNET_memcpy (&msg[1], 1170 GNUNET_memcpy(&msg[1],
1175 &key, 1171 &key,
1176 sizeof (struct IBF_Key)); 1172 sizeof(struct IBF_Key));
1177 LOG (GNUNET_ERROR_TYPE_DEBUG, 1173 LOG(GNUNET_ERROR_TYPE_DEBUG,
1178 "sending element inquiry for IBF key %lx\n", 1174 "sending element inquiry for IBF key %lx\n",
1179 (unsigned long) key.key_val); 1175 (unsigned long)key.key_val);
1180 GNUNET_MQ_send (op->mq, ev); 1176 GNUNET_MQ_send(op->mq, ev);
1181 } 1177 }
1182 else 1178 else
1183 { 1179 {
1184 GNUNET_assert (0); 1180 GNUNET_assert(0);
1181 }
1185 } 1182 }
1186 } 1183 ibf_destroy(diff_ibf);
1187 ibf_destroy (diff_ibf);
1188 return GNUNET_OK; 1184 return GNUNET_OK;
1189} 1185}
1190 1186
@@ -1200,52 +1196,52 @@ decode_and_send (struct Operation *op)
1200 * @return #GNUNET_OK if @a msg is well-formed 1196 * @return #GNUNET_OK if @a msg is well-formed
1201 */ 1197 */
1202int 1198int
1203check_union_p2p_ibf (void *cls, 1199check_union_p2p_ibf(void *cls,
1204 const struct IBFMessage *msg) 1200 const struct IBFMessage *msg)
1205{ 1201{
1206 struct Operation *op = cls; 1202 struct Operation *op = cls;
1207 unsigned int buckets_in_message; 1203 unsigned int buckets_in_message;
1208 1204
1209 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1205 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1210 { 1206 {
1211 GNUNET_break_op (0); 1207 GNUNET_break_op(0);
1212 return GNUNET_SYSERR; 1208 return GNUNET_SYSERR;
1213 } 1209 }
1214 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; 1210 buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1215 if (0 == buckets_in_message) 1211 if (0 == buckets_in_message)
1216 {
1217 GNUNET_break_op (0);
1218 return GNUNET_SYSERR;
1219 }
1220 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1221 {
1222 GNUNET_break_op (0);
1223 return GNUNET_SYSERR;
1224 }
1225 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1226 {
1227 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1228 { 1212 {
1229 GNUNET_break_op (0); 1213 GNUNET_break_op(0);
1230 return GNUNET_SYSERR; 1214 return GNUNET_SYSERR;
1231 } 1215 }
1232 if (1<<msg->order != op->state->remote_ibf->size) 1216 if ((ntohs(msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1233 { 1217 {
1234 GNUNET_break_op (0); 1218 GNUNET_break_op(0);
1235 return GNUNET_SYSERR; 1219 return GNUNET_SYSERR;
1236 } 1220 }
1237 if (ntohl (msg->salt) != op->state->salt_receive) 1221 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1238 { 1222 {
1239 GNUNET_break_op (0); 1223 if (ntohl(msg->offset) != op->state->ibf_buckets_received)
1224 {
1225 GNUNET_break_op(0);
1226 return GNUNET_SYSERR;
1227 }
1228 if (1 << msg->order != op->state->remote_ibf->size)
1229 {
1230 GNUNET_break_op(0);
1231 return GNUNET_SYSERR;
1232 }
1233 if (ntohl(msg->salt) != op->state->salt_receive)
1234 {
1235 GNUNET_break_op(0);
1236 return GNUNET_SYSERR;
1237 }
1238 }
1239 else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1240 (op->state->phase != PHASE_EXPECT_IBF))
1241 {
1242 GNUNET_break_op(0);
1240 return GNUNET_SYSERR; 1243 return GNUNET_SYSERR;
1241 } 1244 }
1242 }
1243 else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1244 (op->state->phase != PHASE_EXPECT_IBF) )
1245 {
1246 GNUNET_break_op (0);
1247 return GNUNET_SYSERR;
1248 }
1249 1245
1250 return GNUNET_OK; 1246 return GNUNET_OK;
1251} 1247}
@@ -1261,71 +1257,71 @@ check_union_p2p_ibf (void *cls,
1261 * @param msg the header of the message 1257 * @param msg the header of the message
1262 */ 1258 */
1263void 1259void
1264handle_union_p2p_ibf (void *cls, 1260handle_union_p2p_ibf(void *cls,
1265 const struct IBFMessage *msg) 1261 const struct IBFMessage *msg)
1266{ 1262{
1267 struct Operation *op = cls; 1263 struct Operation *op = cls;
1268 unsigned int buckets_in_message; 1264 unsigned int buckets_in_message;
1269 1265
1270 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; 1266 buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1271 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || 1267 if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1272 (op->state->phase == PHASE_EXPECT_IBF) ) 1268 (op->state->phase == PHASE_EXPECT_IBF))
1273 {
1274 op->state->phase = PHASE_EXPECT_IBF_CONT;
1275 GNUNET_assert (NULL == op->state->remote_ibf);
1276 LOG (GNUNET_ERROR_TYPE_DEBUG,
1277 "Creating new ibf of size %u\n",
1278 1 << msg->order);
1279 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1280 op->state->salt_receive = ntohl (msg->salt);
1281 LOG (GNUNET_ERROR_TYPE_DEBUG,
1282 "Receiving new IBF with salt %u\n",
1283 op->state->salt_receive);
1284 if (NULL == op->state->remote_ibf)
1285 {
1286 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1287 "Failed to parse remote IBF, closing connection\n");
1288 fail_union_operation (op);
1289 return;
1290 }
1291 op->state->ibf_buckets_received = 0;
1292 if (0 != ntohl (msg->offset))
1293 { 1269 {
1294 GNUNET_break_op (0); 1270 op->state->phase = PHASE_EXPECT_IBF_CONT;
1295 fail_union_operation (op); 1271 GNUNET_assert(NULL == op->state->remote_ibf);
1296 return; 1272 LOG(GNUNET_ERROR_TYPE_DEBUG,
1273 "Creating new ibf of size %u\n",
1274 1 << msg->order);
1275 op->state->remote_ibf = ibf_create(1 << msg->order, SE_IBF_HASH_NUM);
1276 op->state->salt_receive = ntohl(msg->salt);
1277 LOG(GNUNET_ERROR_TYPE_DEBUG,
1278 "Receiving new IBF with salt %u\n",
1279 op->state->salt_receive);
1280 if (NULL == op->state->remote_ibf)
1281 {
1282 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1283 "Failed to parse remote IBF, closing connection\n");
1284 fail_union_operation(op);
1285 return;
1286 }
1287 op->state->ibf_buckets_received = 0;
1288 if (0 != ntohl(msg->offset))
1289 {
1290 GNUNET_break_op(0);
1291 fail_union_operation(op);
1292 return;
1293 }
1297 } 1294 }
1298 }
1299 else 1295 else
1300 { 1296 {
1301 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); 1297 GNUNET_assert(op->state->phase == PHASE_EXPECT_IBF_CONT);
1302 LOG (GNUNET_ERROR_TYPE_DEBUG, 1298 LOG(GNUNET_ERROR_TYPE_DEBUG,
1303 "Received more of IBF\n"); 1299 "Received more of IBF\n");
1304 } 1300 }
1305 GNUNET_assert (NULL != op->state->remote_ibf); 1301 GNUNET_assert(NULL != op->state->remote_ibf);
1306 1302
1307 ibf_read_slice (&msg[1], 1303 ibf_read_slice(&msg[1],
1308 op->state->ibf_buckets_received, 1304 op->state->ibf_buckets_received,
1309 buckets_in_message, 1305 buckets_in_message,
1310 op->state->remote_ibf); 1306 op->state->remote_ibf);
1311 op->state->ibf_buckets_received += buckets_in_message; 1307 op->state->ibf_buckets_received += buckets_in_message;
1312 1308
1313 if (op->state->ibf_buckets_received == op->state->remote_ibf->size) 1309 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1314 { 1310 {
1315 LOG (GNUNET_ERROR_TYPE_DEBUG, 1311 LOG(GNUNET_ERROR_TYPE_DEBUG,
1316 "received full ibf\n"); 1312 "received full ibf\n");
1317 op->state->phase = PHASE_INVENTORY_ACTIVE; 1313 op->state->phase = PHASE_INVENTORY_ACTIVE;
1318 if (GNUNET_OK != 1314 if (GNUNET_OK !=
1319 decode_and_send (op)) 1315 decode_and_send(op))
1320 { 1316 {
1321 /* Internal error, best we can do is shut down */ 1317 /* Internal error, best we can do is shut down */
1322 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1318 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
1323 "Failed to decode IBF, closing connection\n"); 1319 "Failed to decode IBF, closing connection\n");
1324 fail_union_operation (op); 1320 fail_union_operation(op);
1325 return; 1321 return;
1322 }
1326 } 1323 }
1327 } 1324 GNUNET_CADET_receive_done(op->channel);
1328 GNUNET_CADET_receive_done (op->channel);
1329} 1325}
1330 1326
1331 1327
@@ -1338,33 +1334,33 @@ handle_union_p2p_ibf (void *cls,
1338 * @param status status to send with the new element 1334 * @param status status to send with the new element
1339 */ 1335 */
1340static void 1336static void
1341send_client_element (struct Operation *op, 1337send_client_element(struct Operation *op,
1342 struct GNUNET_SET_Element *element, 1338 struct GNUNET_SET_Element *element,
1343 int status) 1339 int status)
1344{ 1340{
1345 struct GNUNET_MQ_Envelope *ev; 1341 struct GNUNET_MQ_Envelope *ev;
1346 struct GNUNET_SET_ResultMessage *rm; 1342 struct GNUNET_SET_ResultMessage *rm;
1347 1343
1348 LOG (GNUNET_ERROR_TYPE_DEBUG, 1344 LOG(GNUNET_ERROR_TYPE_DEBUG,
1349 "sending element (size %u) to client\n", 1345 "sending element (size %u) to client\n",
1350 element->size); 1346 element->size);
1351 GNUNET_assert (0 != op->client_request_id); 1347 GNUNET_assert(0 != op->client_request_id);
1352 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); 1348 ev = GNUNET_MQ_msg_extra(rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1353 if (NULL == ev) 1349 if (NULL == ev)
1354 { 1350 {
1355 GNUNET_MQ_discard (ev); 1351 GNUNET_MQ_discard(ev);
1356 GNUNET_break (0); 1352 GNUNET_break(0);
1357 return; 1353 return;
1358 } 1354 }
1359 rm->result_status = htons (status); 1355 rm->result_status = htons(status);
1360 rm->request_id = htonl (op->client_request_id); 1356 rm->request_id = htonl(op->client_request_id);
1361 rm->element_type = htons (element->element_type); 1357 rm->element_type = htons(element->element_type);
1362 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); 1358 rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element));
1363 GNUNET_memcpy (&rm[1], 1359 GNUNET_memcpy(&rm[1],
1364 element->data, 1360 element->data,
1365 element->size); 1361 element->size);
1366 GNUNET_MQ_send (op->set->cs->mq, 1362 GNUNET_MQ_send(op->set->cs->mq,
1367 ev); 1363 ev);
1368} 1364}
1369 1365
1370 1366
@@ -1375,49 +1371,50 @@ send_client_element (struct Operation *op,
1375 * @param cls operation to destroy 1371 * @param cls operation to destroy
1376 */ 1372 */
1377static void 1373static void
1378send_client_done (void *cls) 1374send_client_done(void *cls)
1379{ 1375{
1380 struct Operation *op = cls; 1376 struct Operation *op = cls;
1381 struct GNUNET_MQ_Envelope *ev; 1377 struct GNUNET_MQ_Envelope *ev;
1382 struct GNUNET_SET_ResultMessage *rm; 1378 struct GNUNET_SET_ResultMessage *rm;
1383 1379
1384 if (GNUNET_YES == op->state->client_done_sent) 1380 if (GNUNET_YES == op->state->client_done_sent)
1385 { 1381 {
1386 return; 1382 return;
1387 } 1383 }
1388 1384
1389 if (PHASE_DONE != op->state->phase) { 1385 if (PHASE_DONE != op->state->phase)
1390 LOG (GNUNET_ERROR_TYPE_WARNING, 1386 {
1391 "Union operation failed\n"); 1387 LOG(GNUNET_ERROR_TYPE_WARNING,
1392 GNUNET_STATISTICS_update (_GSS_statistics, 1388 "Union operation failed\n");
1393 "# Union operations failed", 1389 GNUNET_STATISTICS_update(_GSS_statistics,
1394 1, 1390 "# Union operations failed",
1395 GNUNET_NO); 1391 1,
1396 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 1392 GNUNET_NO);
1397 rm->result_status = htons (GNUNET_SET_STATUS_FAILURE); 1393 ev = GNUNET_MQ_msg(rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1398 rm->request_id = htonl (op->client_request_id); 1394 rm->result_status = htons(GNUNET_SET_STATUS_FAILURE);
1399 rm->element_type = htons (0); 1395 rm->request_id = htonl(op->client_request_id);
1400 GNUNET_MQ_send (op->set->cs->mq, 1396 rm->element_type = htons(0);
1401 ev); 1397 GNUNET_MQ_send(op->set->cs->mq,
1402 return; 1398 ev);
1403 } 1399 return;
1400 }
1404 1401
1405 op->state->client_done_sent = GNUNET_YES; 1402 op->state->client_done_sent = GNUNET_YES;
1406 1403
1407 GNUNET_STATISTICS_update (_GSS_statistics, 1404 GNUNET_STATISTICS_update(_GSS_statistics,
1408 "# Union operations succeeded", 1405 "# Union operations succeeded",
1409 1, 1406 1,
1410 GNUNET_NO); 1407 GNUNET_NO);
1411 LOG (GNUNET_ERROR_TYPE_INFO, 1408 LOG(GNUNET_ERROR_TYPE_INFO,
1412 "Signalling client that union operation is done\n"); 1409 "Signalling client that union operation is done\n");
1413 ev = GNUNET_MQ_msg (rm, 1410 ev = GNUNET_MQ_msg(rm,
1414 GNUNET_MESSAGE_TYPE_SET_RESULT); 1411 GNUNET_MESSAGE_TYPE_SET_RESULT);
1415 rm->request_id = htonl (op->client_request_id); 1412 rm->request_id = htonl(op->client_request_id);
1416 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1413 rm->result_status = htons(GNUNET_SET_STATUS_DONE);
1417 rm->element_type = htons (0); 1414 rm->element_type = htons(0);
1418 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); 1415 rm->current_size = GNUNET_htonll(GNUNET_CONTAINER_multihashmap32_size(op->state->key_to_element));
1419 GNUNET_MQ_send (op->set->cs->mq, 1416 GNUNET_MQ_send(op->set->cs->mq,
1420 ev); 1417 ev);
1421} 1418}
1422 1419
1423 1420
@@ -1427,41 +1424,41 @@ send_client_done (void *cls)
1427 * @param op operation to check 1424 * @param op operation to check
1428 */ 1425 */
1429static void 1426static void
1430maybe_finish (struct Operation *op) 1427maybe_finish(struct Operation *op)
1431{ 1428{
1432 unsigned int num_demanded; 1429 unsigned int num_demanded;
1433 1430
1434 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes); 1431 num_demanded = GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes);
1435 1432
1436 if (PHASE_FINISH_WAITING == op->state->phase) 1433 if (PHASE_FINISH_WAITING == op->state->phase)
1437 {
1438 LOG (GNUNET_ERROR_TYPE_DEBUG,
1439 "In PHASE_FINISH_WAITING, pending %u demands\n",
1440 num_demanded);
1441 if (0 == num_demanded)
1442 { 1434 {
1443 struct GNUNET_MQ_Envelope *ev; 1435 LOG(GNUNET_ERROR_TYPE_DEBUG,
1444 1436 "In PHASE_FINISH_WAITING, pending %u demands\n",
1445 op->state->phase = PHASE_DONE; 1437 num_demanded);
1446 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); 1438 if (0 == num_demanded)
1447 GNUNET_MQ_send (op->mq, 1439 {
1448 ev); 1440 struct GNUNET_MQ_Envelope *ev;
1449 /* We now wait until the other peer sends P2P_OVER 1441
1450 * after it got all elements from us. */ 1442 op->state->phase = PHASE_DONE;
1443 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1444 GNUNET_MQ_send(op->mq,
1445 ev);
1446 /* We now wait until the other peer sends P2P_OVER
1447 * after it got all elements from us. */
1448 }
1451 } 1449 }
1452 }
1453 if (PHASE_FINISH_CLOSING == op->state->phase) 1450 if (PHASE_FINISH_CLOSING == op->state->phase)
1454 {
1455 LOG (GNUNET_ERROR_TYPE_DEBUG,
1456 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1457 num_demanded);
1458 if (0 == num_demanded)
1459 { 1451 {
1460 op->state->phase = PHASE_DONE; 1452 LOG(GNUNET_ERROR_TYPE_DEBUG,
1461 send_client_done (op); 1453 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1462 _GSS_operation_destroy2 (op); 1454 num_demanded);
1455 if (0 == num_demanded)
1456 {
1457 op->state->phase = PHASE_DONE;
1458 send_client_done(op);
1459 _GSS_operation_destroy2(op);
1460 }
1463 } 1461 }
1464 }
1465} 1462}
1466 1463
1467 1464
@@ -1472,21 +1469,21 @@ maybe_finish (struct Operation *op)
1472 * @param emsg the message 1469 * @param emsg the message
1473 */ 1470 */
1474int 1471int
1475check_union_p2p_elements (void *cls, 1472check_union_p2p_elements(void *cls,
1476 const struct GNUNET_SET_ElementMessage *emsg) 1473 const struct GNUNET_SET_ElementMessage *emsg)
1477{ 1474{
1478 struct Operation *op = cls; 1475 struct Operation *op = cls;
1479 1476
1480 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1477 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1481 { 1478 {
1482 GNUNET_break_op (0); 1479 GNUNET_break_op(0);
1483 return GNUNET_SYSERR; 1480 return GNUNET_SYSERR;
1484 } 1481 }
1485 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) 1482 if (0 == GNUNET_CONTAINER_multihashmap_size(op->state->demanded_hashes))
1486 { 1483 {
1487 GNUNET_break_op (0); 1484 GNUNET_break_op(0);
1488 return GNUNET_SYSERR; 1485 return GNUNET_SYSERR;
1489 } 1486 }
1490 return GNUNET_OK; 1487 return GNUNET_OK;
1491} 1488}
1492 1489
@@ -1500,96 +1497,98 @@ check_union_p2p_elements (void *cls,
1500 * @param emsg the message 1497 * @param emsg the message
1501 */ 1498 */
1502void 1499void
1503handle_union_p2p_elements (void *cls, 1500handle_union_p2p_elements(void *cls,
1504 const struct GNUNET_SET_ElementMessage *emsg) 1501 const struct GNUNET_SET_ElementMessage *emsg)
1505{ 1502{
1506 struct Operation *op = cls; 1503 struct Operation *op = cls;
1507 struct ElementEntry *ee; 1504 struct ElementEntry *ee;
1508 struct KeyEntry *ke; 1505 struct KeyEntry *ke;
1509 uint16_t element_size; 1506 uint16_t element_size;
1510 1507
1511 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage); 1508 element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage);
1512 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); 1509 ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size);
1513 GNUNET_memcpy (&ee[1], 1510 GNUNET_memcpy(&ee[1],
1514 &emsg[1], 1511 &emsg[1],
1515 element_size); 1512 element_size);
1516 ee->element.size = element_size; 1513 ee->element.size = element_size;
1517 ee->element.data = &ee[1]; 1514 ee->element.data = &ee[1];
1518 ee->element.element_type = ntohs (emsg->element_type); 1515 ee->element.element_type = ntohs(emsg->element_type);
1519 ee->remote = GNUNET_YES; 1516 ee->remote = GNUNET_YES;
1520 GNUNET_SET_element_hash (&ee->element, 1517 GNUNET_SET_element_hash(&ee->element,
1521 &ee->element_hash); 1518 &ee->element_hash);
1522 if (GNUNET_NO == 1519 if (GNUNET_NO ==
1523 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, 1520 GNUNET_CONTAINER_multihashmap_remove(op->state->demanded_hashes,
1524 &ee->element_hash, 1521 &ee->element_hash,
1525 NULL)) 1522 NULL))
1526 { 1523 {
1527 /* We got something we didn't demand, since it's not in our map. */ 1524 /* We got something we didn't demand, since it's not in our map. */
1528 GNUNET_break_op (0); 1525 GNUNET_break_op(0);
1529 fail_union_operation (op); 1526 fail_union_operation(op);
1530 return; 1527 return;
1531 } 1528 }
1532 1529
1533 LOG (GNUNET_ERROR_TYPE_DEBUG, 1530 LOG(GNUNET_ERROR_TYPE_DEBUG,
1534 "Got element (size %u, hash %s) from peer\n", 1531 "Got element (size %u, hash %s) from peer\n",
1535 (unsigned int) element_size, 1532 (unsigned int)element_size,
1536 GNUNET_h2s (&ee->element_hash)); 1533 GNUNET_h2s(&ee->element_hash));
1537 1534
1538 GNUNET_STATISTICS_update (_GSS_statistics, 1535 GNUNET_STATISTICS_update(_GSS_statistics,
1539 "# received elements", 1536 "# received elements",
1540 1, 1537 1,
1541 GNUNET_NO); 1538 GNUNET_NO);
1542 GNUNET_STATISTICS_update (_GSS_statistics, 1539 GNUNET_STATISTICS_update(_GSS_statistics,
1543 "# exchanged elements", 1540 "# exchanged elements",
1544 1, 1541 1,
1545 GNUNET_NO); 1542 GNUNET_NO);
1546 1543
1547 op->state->received_total++; 1544 op->state->received_total++;
1548 1545
1549 ke = op_get_element (op, &ee->element_hash); 1546 ke = op_get_element(op, &ee->element_hash);
1550 if (NULL != ke) 1547 if (NULL != ke)
1551 { 1548 {
1552 /* Got repeated element. Should not happen since 1549 /* Got repeated element. Should not happen since
1553 * we track demands. */ 1550 * we track demands. */
1554 GNUNET_STATISTICS_update (_GSS_statistics, 1551 GNUNET_STATISTICS_update(_GSS_statistics,
1555 "# repeated elements", 1552 "# repeated elements",
1556 1, 1553 1,
1557 GNUNET_NO); 1554 GNUNET_NO);
1558 ke->received = GNUNET_YES; 1555 ke->received = GNUNET_YES;
1559 GNUNET_free (ee); 1556 GNUNET_free(ee);
1560 } 1557 }
1561 else 1558 else
1562 { 1559 {
1563 LOG (GNUNET_ERROR_TYPE_DEBUG, 1560 LOG(GNUNET_ERROR_TYPE_DEBUG,
1564 "Registering new element from remote peer\n"); 1561 "Registering new element from remote peer\n");
1565 op->state->received_fresh++; 1562 op->state->received_fresh++;
1566 op_register_element (op, ee, GNUNET_YES); 1563 op_register_element(op, ee, GNUNET_YES);
1567 /* only send results immediately if the client wants it */ 1564 /* only send results immediately if the client wants it */
1568 switch (op->result_mode) 1565 switch (op->result_mode)
1569 { 1566 {
1570 case GNUNET_SET_RESULT_ADDED: 1567 case GNUNET_SET_RESULT_ADDED:
1571 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); 1568 send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK);
1572 break; 1569 break;
1573 case GNUNET_SET_RESULT_SYMMETRIC: 1570
1574 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); 1571 case GNUNET_SET_RESULT_SYMMETRIC:
1575 break; 1572 send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1576 default: 1573 break;
1577 /* Result mode not supported, should have been caught earlier. */ 1574
1578 GNUNET_break (0); 1575 default:
1579 break; 1576 /* Result mode not supported, should have been caught earlier. */
1577 GNUNET_break(0);
1578 break;
1579 }
1580 } 1580 }
1581 }
1582 1581
1583 if ( (op->state->received_total > 8) && 1582 if ((op->state->received_total > 8) &&
1584 (op->state->received_fresh < op->state->received_total / 3) ) 1583 (op->state->received_fresh < op->state->received_total / 3))
1585 { 1584 {
1586 /* The other peer gave us lots of old elements, there's something wrong. */ 1585 /* The other peer gave us lots of old elements, there's something wrong. */
1587 GNUNET_break_op (0); 1586 GNUNET_break_op(0);
1588 fail_union_operation (op); 1587 fail_union_operation(op);
1589 return; 1588 return;
1590 } 1589 }
1591 GNUNET_CADET_receive_done (op->channel); 1590 GNUNET_CADET_receive_done(op->channel);
1592 maybe_finish (op); 1591 maybe_finish(op);
1593} 1592}
1594 1593
1595 1594
@@ -1600,16 +1599,16 @@ handle_union_p2p_elements (void *cls,
1600 * @param emsg the message 1599 * @param emsg the message
1601 */ 1600 */
1602int 1601int
1603check_union_p2p_full_element (void *cls, 1602check_union_p2p_full_element(void *cls,
1604 const struct GNUNET_SET_ElementMessage *emsg) 1603 const struct GNUNET_SET_ElementMessage *emsg)
1605{ 1604{
1606 struct Operation *op = cls; 1605 struct Operation *op = cls;
1607 1606
1608 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1607 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1609 { 1608 {
1610 GNUNET_break_op (0); 1609 GNUNET_break_op(0);
1611 return GNUNET_SYSERR; 1610 return GNUNET_SYSERR;
1612 } 1611 }
1613 // FIXME: check that we expect full elements here? 1612 // FIXME: check that we expect full elements here?
1614 return GNUNET_OK; 1613 return GNUNET_OK;
1615} 1614}
@@ -1622,87 +1621,89 @@ check_union_p2p_full_element (void *cls,
1622 * @param emsg the message 1621 * @param emsg the message
1623 */ 1622 */
1624void 1623void
1625handle_union_p2p_full_element (void *cls, 1624handle_union_p2p_full_element(void *cls,
1626 const struct GNUNET_SET_ElementMessage *emsg) 1625 const struct GNUNET_SET_ElementMessage *emsg)
1627{ 1626{
1628 struct Operation *op = cls; 1627 struct Operation *op = cls;
1629 struct ElementEntry *ee; 1628 struct ElementEntry *ee;
1630 struct KeyEntry *ke; 1629 struct KeyEntry *ke;
1631 uint16_t element_size; 1630 uint16_t element_size;
1632 1631
1633 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage); 1632 element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage);
1634 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); 1633 ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size);
1635 GNUNET_memcpy (&ee[1], &emsg[1], element_size); 1634 GNUNET_memcpy(&ee[1], &emsg[1], element_size);
1636 ee->element.size = element_size; 1635 ee->element.size = element_size;
1637 ee->element.data = &ee[1]; 1636 ee->element.data = &ee[1];
1638 ee->element.element_type = ntohs (emsg->element_type); 1637 ee->element.element_type = ntohs(emsg->element_type);
1639 ee->remote = GNUNET_YES; 1638 ee->remote = GNUNET_YES;
1640 GNUNET_SET_element_hash (&ee->element, &ee->element_hash); 1639 GNUNET_SET_element_hash(&ee->element, &ee->element_hash);
1641 1640
1642 LOG (GNUNET_ERROR_TYPE_DEBUG, 1641 LOG(GNUNET_ERROR_TYPE_DEBUG,
1643 "Got element (full diff, size %u, hash %s) from peer\n", 1642 "Got element (full diff, size %u, hash %s) from peer\n",
1644 (unsigned int) element_size, 1643 (unsigned int)element_size,
1645 GNUNET_h2s (&ee->element_hash)); 1644 GNUNET_h2s(&ee->element_hash));
1646 1645
1647 GNUNET_STATISTICS_update (_GSS_statistics, 1646 GNUNET_STATISTICS_update(_GSS_statistics,
1648 "# received elements", 1647 "# received elements",
1649 1, 1648 1,
1650 GNUNET_NO); 1649 GNUNET_NO);
1651 GNUNET_STATISTICS_update (_GSS_statistics, 1650 GNUNET_STATISTICS_update(_GSS_statistics,
1652 "# exchanged elements", 1651 "# exchanged elements",
1653 1, 1652 1,
1654 GNUNET_NO); 1653 GNUNET_NO);
1655 1654
1656 op->state->received_total++; 1655 op->state->received_total++;
1657 1656
1658 ke = op_get_element (op, &ee->element_hash); 1657 ke = op_get_element(op, &ee->element_hash);
1659 if (NULL != ke) 1658 if (NULL != ke)
1660 { 1659 {
1661 /* Got repeated element. Should not happen since 1660 /* Got repeated element. Should not happen since
1662 * we track demands. */ 1661 * we track demands. */
1663 GNUNET_STATISTICS_update (_GSS_statistics, 1662 GNUNET_STATISTICS_update(_GSS_statistics,
1664 "# repeated elements", 1663 "# repeated elements",
1665 1, 1664 1,
1666 GNUNET_NO); 1665 GNUNET_NO);
1667 ke->received = GNUNET_YES; 1666 ke->received = GNUNET_YES;
1668 GNUNET_free (ee); 1667 GNUNET_free(ee);
1669 } 1668 }
1670 else 1669 else
1671 { 1670 {
1672 LOG (GNUNET_ERROR_TYPE_DEBUG, 1671 LOG(GNUNET_ERROR_TYPE_DEBUG,
1673 "Registering new element from remote peer\n"); 1672 "Registering new element from remote peer\n");
1674 op->state->received_fresh++; 1673 op->state->received_fresh++;
1675 op_register_element (op, ee, GNUNET_YES); 1674 op_register_element(op, ee, GNUNET_YES);
1676 /* only send results immediately if the client wants it */ 1675 /* only send results immediately if the client wants it */
1677 switch (op->result_mode) 1676 switch (op->result_mode)
1678 { 1677 {
1679 case GNUNET_SET_RESULT_ADDED: 1678 case GNUNET_SET_RESULT_ADDED:
1680 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); 1679 send_client_element(op, &ee->element, GNUNET_SET_STATUS_OK);
1681 break; 1680 break;
1682 case GNUNET_SET_RESULT_SYMMETRIC: 1681
1683 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); 1682 case GNUNET_SET_RESULT_SYMMETRIC:
1684 break; 1683 send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1685 default: 1684 break;
1686 /* Result mode not supported, should have been caught earlier. */ 1685
1687 GNUNET_break (0); 1686 default:
1688 break; 1687 /* Result mode not supported, should have been caught earlier. */
1688 GNUNET_break(0);
1689 break;
1690 }
1689 } 1691 }
1690 }
1691 1692
1692 if ( (GNUNET_YES == op->byzantine) && 1693 if ((GNUNET_YES == op->byzantine) &&
1693 (op->state->received_total > 384 + op->state->received_fresh * 4) && 1694 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1694 (op->state->received_fresh < op->state->received_total / 6) ) 1695 (op->state->received_fresh < op->state->received_total / 6))
1695 { 1696 {
1696 /* The other peer gave us lots of old elements, there's something wrong. */ 1697 /* The other peer gave us lots of old elements, there's something wrong. */
1697 LOG (GNUNET_ERROR_TYPE_ERROR, 1698 LOG(GNUNET_ERROR_TYPE_ERROR,
1698 "Other peer sent only %llu/%llu fresh elements, failing operation\n", 1699 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1699 (unsigned long long) op->state->received_fresh, 1700 (unsigned long long)op->state->received_fresh,
1700 (unsigned long long) op->state->received_total); 1701 (unsigned long long)op->state->received_total);
1701 GNUNET_break_op (0); 1702 GNUNET_break_op(0);
1702 fail_union_operation (op); 1703 fail_union_operation(op);
1703 return; 1704 return;
1704 } 1705 }
1705 GNUNET_CADET_receive_done (op->channel); 1706 GNUNET_CADET_receive_done(op->channel);
1706} 1707}
1707 1708
1708 1709
@@ -1714,30 +1715,30 @@ handle_union_p2p_full_element (void *cls,
1714 * @param msg the message 1715 * @param msg the message
1715 */ 1716 */
1716int 1717int
1717check_union_p2p_inquiry (void *cls, 1718check_union_p2p_inquiry(void *cls,
1718 const struct InquiryMessage *msg) 1719 const struct InquiryMessage *msg)
1719{ 1720{
1720 struct Operation *op = cls; 1721 struct Operation *op = cls;
1721 unsigned int num_keys; 1722 unsigned int num_keys;
1722 1723
1723 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1724 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1724 { 1725 {
1725 GNUNET_break_op (0); 1726 GNUNET_break_op(0);
1726 return GNUNET_SYSERR; 1727 return GNUNET_SYSERR;
1727 } 1728 }
1728 if (op->state->phase != PHASE_INVENTORY_PASSIVE) 1729 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1729 { 1730 {
1730 GNUNET_break_op (0); 1731 GNUNET_break_op(0);
1731 return GNUNET_SYSERR; 1732 return GNUNET_SYSERR;
1732 } 1733 }
1733 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) 1734 num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1734 / sizeof (struct IBF_Key); 1735 / sizeof(struct IBF_Key);
1735 if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage)) 1736 if ((ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1736 != num_keys * sizeof (struct IBF_Key)) 1737 != num_keys * sizeof(struct IBF_Key))
1737 { 1738 {
1738 GNUNET_break_op (0); 1739 GNUNET_break_op(0);
1739 return GNUNET_SYSERR; 1740 return GNUNET_SYSERR;
1740 } 1741 }
1741 return GNUNET_OK; 1742 return GNUNET_OK;
1742} 1743}
1743 1744
@@ -1750,30 +1751,30 @@ check_union_p2p_inquiry (void *cls,
1750 * @param msg the message 1751 * @param msg the message
1751 */ 1752 */
1752void 1753void
1753handle_union_p2p_inquiry (void *cls, 1754handle_union_p2p_inquiry(void *cls,
1754 const struct InquiryMessage *msg) 1755 const struct InquiryMessage *msg)
1755{ 1756{
1756 struct Operation *op = cls; 1757 struct Operation *op = cls;
1757 const struct IBF_Key *ibf_key; 1758 const struct IBF_Key *ibf_key;
1758 unsigned int num_keys; 1759 unsigned int num_keys;
1759 1760
1760 LOG (GNUNET_ERROR_TYPE_DEBUG, 1761 LOG(GNUNET_ERROR_TYPE_DEBUG,
1761 "Received union inquiry\n"); 1762 "Received union inquiry\n");
1762 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) 1763 num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1763 / sizeof (struct IBF_Key); 1764 / sizeof(struct IBF_Key);
1764 ibf_key = (const struct IBF_Key *) &msg[1]; 1765 ibf_key = (const struct IBF_Key *)&msg[1];
1765 while (0 != num_keys--) 1766 while (0 != num_keys--)
1766 { 1767 {
1767 struct IBF_Key unsalted_key; 1768 struct IBF_Key unsalted_key;
1768 1769
1769 unsalt_key (ibf_key, 1770 unsalt_key(ibf_key,
1770 ntohl (msg->salt), 1771 ntohl(msg->salt),
1771 &unsalted_key); 1772 &unsalted_key);
1772 send_offers_for_key (op, 1773 send_offers_for_key(op,
1773 unsalted_key); 1774 unsalted_key);
1774 ibf_key++; 1775 ibf_key++;
1775 } 1776 }
1776 GNUNET_CADET_receive_done (op->channel); 1777 GNUNET_CADET_receive_done(op->channel);
1777} 1778}
1778 1779
1779 1780
@@ -1788,9 +1789,9 @@ handle_union_p2p_inquiry (void *cls,
1788 * #GNUNET_NO if not. 1789 * #GNUNET_NO if not.
1789 */ 1790 */
1790static int 1791static int
1791send_missing_full_elements_iter (void *cls, 1792send_missing_full_elements_iter(void *cls,
1792 uint32_t key, 1793 uint32_t key,
1793 void *value) 1794 void *value)
1794{ 1795{
1795 struct Operation *op = cls; 1796 struct Operation *op = cls;
1796 struct KeyEntry *ke = value; 1797 struct KeyEntry *ke = value;
@@ -1800,15 +1801,15 @@ send_missing_full_elements_iter (void *cls,
1800 1801
1801 if (GNUNET_YES == ke->received) 1802 if (GNUNET_YES == ke->received)
1802 return GNUNET_YES; 1803 return GNUNET_YES;
1803 ev = GNUNET_MQ_msg_extra (emsg, 1804 ev = GNUNET_MQ_msg_extra(emsg,
1804 ee->element.size, 1805 ee->element.size,
1805 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); 1806 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1806 GNUNET_memcpy (&emsg[1], 1807 GNUNET_memcpy(&emsg[1],
1807 ee->element.data, 1808 ee->element.data,
1808 ee->element.size); 1809 ee->element.size);
1809 emsg->element_type = htons (ee->element.element_type); 1810 emsg->element_type = htons(ee->element.element_type);
1810 GNUNET_MQ_send (op->mq, 1811 GNUNET_MQ_send(op->mq,
1811 ev); 1812 ev);
1812 return GNUNET_YES; 1813 return GNUNET_YES;
1813} 1814}
1814 1815
@@ -1820,30 +1821,30 @@ send_missing_full_elements_iter (void *cls,
1820 * @param mh the demand message 1821 * @param mh the demand message
1821 */ 1822 */
1822void 1823void
1823handle_union_p2p_request_full (void *cls, 1824handle_union_p2p_request_full(void *cls,
1824 const struct GNUNET_MessageHeader *mh) 1825 const struct GNUNET_MessageHeader *mh)
1825{ 1826{
1826 struct Operation *op = cls; 1827 struct Operation *op = cls;
1827 1828
1828 LOG (GNUNET_ERROR_TYPE_DEBUG, 1829 LOG(GNUNET_ERROR_TYPE_DEBUG,
1829 "Received request for full set transmission\n"); 1830 "Received request for full set transmission\n");
1830 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1831 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1831 { 1832 {
1832 GNUNET_break_op (0); 1833 GNUNET_break_op(0);
1833 fail_union_operation (op); 1834 fail_union_operation(op);
1834 return; 1835 return;
1835 } 1836 }
1836 if (PHASE_EXPECT_IBF != op->state->phase) 1837 if (PHASE_EXPECT_IBF != op->state->phase)
1837 { 1838 {
1838 GNUNET_break_op (0); 1839 GNUNET_break_op(0);
1839 fail_union_operation (op); 1840 fail_union_operation(op);
1840 return; 1841 return;
1841 } 1842 }
1842 1843
1843 // FIXME: we need to check that our set is larger than the 1844 // FIXME: we need to check that our set is larger than the
1844 // byzantine_lower_bound by some threshold 1845 // byzantine_lower_bound by some threshold
1845 send_full_set (op); 1846 send_full_set(op);
1846 GNUNET_CADET_receive_done (op->channel); 1847 GNUNET_CADET_receive_done(op->channel);
1847} 1848}
1848 1849
1849 1850
@@ -1854,53 +1855,55 @@ handle_union_p2p_request_full (void *cls,
1854 * @param mh the demand message 1855 * @param mh the demand message
1855 */ 1856 */
1856void 1857void
1857handle_union_p2p_full_done (void *cls, 1858handle_union_p2p_full_done(void *cls,
1858 const struct GNUNET_MessageHeader *mh) 1859 const struct GNUNET_MessageHeader *mh)
1859{ 1860{
1860 struct Operation *op = cls; 1861 struct Operation *op = cls;
1861 1862
1862 switch (op->state->phase) 1863 switch (op->state->phase)
1863 { 1864 {
1864 case PHASE_EXPECT_IBF: 1865 case PHASE_EXPECT_IBF:
1865 { 1866 {
1866 struct GNUNET_MQ_Envelope *ev; 1867 struct GNUNET_MQ_Envelope *ev;
1867 1868
1868 LOG (GNUNET_ERROR_TYPE_DEBUG, 1869 LOG(GNUNET_ERROR_TYPE_DEBUG,
1869 "got FULL DONE, sending elements that other peer is missing\n"); 1870 "got FULL DONE, sending elements that other peer is missing\n");
1870 1871
1871 /* send all the elements that did not come from the remote peer */ 1872 /* send all the elements that did not come from the remote peer */
1872 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 1873 GNUNET_CONTAINER_multihashmap32_iterate(op->state->key_to_element,
1873 &send_missing_full_elements_iter, 1874 &send_missing_full_elements_iter,
1874 op); 1875 op);
1875 1876
1876 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 1877 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1877 GNUNET_MQ_send (op->mq, 1878 GNUNET_MQ_send(op->mq,
1878 ev); 1879 ev);
1879 op->state->phase = PHASE_DONE; 1880 op->state->phase = PHASE_DONE;
1880 /* we now wait until the other peer sends us the OVER message*/ 1881 /* we now wait until the other peer sends us the OVER message*/
1881 } 1882 }
1882 break; 1883 break;
1883 case PHASE_FULL_SENDING: 1884
1885 case PHASE_FULL_SENDING:
1884 { 1886 {
1885 LOG (GNUNET_ERROR_TYPE_DEBUG, 1887 LOG(GNUNET_ERROR_TYPE_DEBUG,
1886 "got FULL DONE, finishing\n"); 1888 "got FULL DONE, finishing\n");
1887 /* We sent the full set, and got the response for that. We're done. */ 1889 /* We sent the full set, and got the response for that. We're done. */
1888 op->state->phase = PHASE_DONE; 1890 op->state->phase = PHASE_DONE;
1889 GNUNET_CADET_receive_done (op->channel); 1891 GNUNET_CADET_receive_done(op->channel);
1890 send_client_done (op); 1892 send_client_done(op);
1891 _GSS_operation_destroy2 (op); 1893 _GSS_operation_destroy2(op);
1892 return; 1894 return;
1893 } 1895 }
1894 break; 1896 break;
1895 default: 1897
1896 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1898 default:
1897 "Handle full done phase is %u\n", 1899 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1898 (unsigned) op->state->phase); 1900 "Handle full done phase is %u\n",
1899 GNUNET_break_op (0); 1901 (unsigned)op->state->phase);
1900 fail_union_operation (op); 1902 GNUNET_break_op(0);
1901 return; 1903 fail_union_operation(op);
1902 } 1904 return;
1903 GNUNET_CADET_receive_done (op->channel); 1905 }
1906 GNUNET_CADET_receive_done(op->channel);
1904} 1907}
1905 1908
1906 1909
@@ -1913,25 +1916,25 @@ handle_union_p2p_full_done (void *cls,
1913 * @return #GNUNET_OK if @a mh is well-formed 1916 * @return #GNUNET_OK if @a mh is well-formed
1914 */ 1917 */
1915int 1918int
1916check_union_p2p_demand (void *cls, 1919check_union_p2p_demand(void *cls,
1917 const struct GNUNET_MessageHeader *mh) 1920 const struct GNUNET_MessageHeader *mh)
1918{ 1921{
1919 struct Operation *op = cls; 1922 struct Operation *op = cls;
1920 unsigned int num_hashes; 1923 unsigned int num_hashes;
1921 1924
1922 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1925 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1923 { 1926 {
1924 GNUNET_break_op (0); 1927 GNUNET_break_op(0);
1925 return GNUNET_SYSERR; 1928 return GNUNET_SYSERR;
1926 } 1929 }
1927 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 1930 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1928 / sizeof (struct GNUNET_HashCode); 1931 / sizeof(struct GNUNET_HashCode);
1929 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 1932 if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1930 != num_hashes * sizeof (struct GNUNET_HashCode)) 1933 != num_hashes * sizeof(struct GNUNET_HashCode))
1931 { 1934 {
1932 GNUNET_break_op (0); 1935 GNUNET_break_op(0);
1933 return GNUNET_SYSERR; 1936 return GNUNET_SYSERR;
1934 } 1937 }
1935 return GNUNET_OK; 1938 return GNUNET_OK;
1936} 1939}
1937 1940
@@ -1944,8 +1947,8 @@ check_union_p2p_demand (void *cls,
1944 * @param mh the demand message 1947 * @param mh the demand message
1945 */ 1948 */
1946void 1949void
1947handle_union_p2p_demand (void *cls, 1950handle_union_p2p_demand(void *cls,
1948 const struct GNUNET_MessageHeader *mh) 1951 const struct GNUNET_MessageHeader *mh)
1949{ 1952{
1950 struct Operation *op = cls; 1953 struct Operation *op = cls;
1951 struct ElementEntry *ee; 1954 struct ElementEntry *ee;
@@ -1954,58 +1957,60 @@ handle_union_p2p_demand (void *cls,
1954 unsigned int num_hashes; 1957 unsigned int num_hashes;
1955 struct GNUNET_MQ_Envelope *ev; 1958 struct GNUNET_MQ_Envelope *ev;
1956 1959
1957 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 1960 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1958 / sizeof (struct GNUNET_HashCode); 1961 / sizeof(struct GNUNET_HashCode);
1959 for (hash = (const struct GNUNET_HashCode *) &mh[1]; 1962 for (hash = (const struct GNUNET_HashCode *)&mh[1];
1960 num_hashes > 0; 1963 num_hashes > 0;
1961 hash++, num_hashes--) 1964 hash++, num_hashes--)
1962 {
1963 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1964 hash);
1965 if (NULL == ee)
1966 { 1965 {
1967 /* Demand for non-existing element. */ 1966 ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements,
1968 GNUNET_break_op (0); 1967 hash);
1969 fail_union_operation (op); 1968 if (NULL == ee)
1970 return; 1969 {
1971 } 1970 /* Demand for non-existing element. */
1972 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) 1971 GNUNET_break_op(0);
1973 { 1972 fail_union_operation(op);
1974 /* Probably confused lazily copied sets. */ 1973 return;
1975 GNUNET_break_op (0); 1974 }
1976 fail_union_operation (op); 1975 if (GNUNET_NO == _GSS_is_element_of_operation(ee, op))
1977 return; 1976 {
1978 } 1977 /* Probably confused lazily copied sets. */
1979 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); 1978 GNUNET_break_op(0);
1980 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); 1979 fail_union_operation(op);
1981 emsg->reserved = htons (0); 1980 return;
1982 emsg->element_type = htons (ee->element.element_type); 1981 }
1983 LOG (GNUNET_ERROR_TYPE_DEBUG, 1982 ev = GNUNET_MQ_msg_extra(emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1984 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", 1983 GNUNET_memcpy(&emsg[1], ee->element.data, ee->element.size);
1985 (void *) op, 1984 emsg->reserved = htons(0);
1986 (unsigned int) ee->element.size, 1985 emsg->element_type = htons(ee->element.element_type);
1987 GNUNET_h2s (&ee->element_hash)); 1986 LOG(GNUNET_ERROR_TYPE_DEBUG,
1988 GNUNET_MQ_send (op->mq, ev); 1987 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1989 GNUNET_STATISTICS_update (_GSS_statistics, 1988 (void *)op,
1990 "# exchanged elements", 1989 (unsigned int)ee->element.size,
1991 1, 1990 GNUNET_h2s(&ee->element_hash));
1992 GNUNET_NO); 1991 GNUNET_MQ_send(op->mq, ev);
1993 1992 GNUNET_STATISTICS_update(_GSS_statistics,
1994 switch (op->result_mode) 1993 "# exchanged elements",
1995 { 1994 1,
1996 case GNUNET_SET_RESULT_ADDED: 1995 GNUNET_NO);
1997 /* Nothing to do. */ 1996
1998 break; 1997 switch (op->result_mode)
1999 case GNUNET_SET_RESULT_SYMMETRIC: 1998 {
2000 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); 1999 case GNUNET_SET_RESULT_ADDED:
2001 break; 2000 /* Nothing to do. */
2002 default: 2001 break;
2003 /* Result mode not supported, should have been caught earlier. */ 2002
2004 GNUNET_break (0); 2003 case GNUNET_SET_RESULT_SYMMETRIC:
2005 break; 2004 send_client_element(op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
2005 break;
2006
2007 default:
2008 /* Result mode not supported, should have been caught earlier. */
2009 GNUNET_break(0);
2010 break;
2011 }
2006 } 2012 }
2007 } 2013 GNUNET_CADET_receive_done(op->channel);
2008 GNUNET_CADET_receive_done (op->channel);
2009} 2014}
2010 2015
2011 2016
@@ -2017,32 +2022,32 @@ handle_union_p2p_demand (void *cls,
2017 * @return #GNUNET_OK if @a mh is well-formed 2022 * @return #GNUNET_OK if @a mh is well-formed
2018 */ 2023 */
2019int 2024int
2020check_union_p2p_offer (void *cls, 2025check_union_p2p_offer(void *cls,
2021 const struct GNUNET_MessageHeader *mh) 2026 const struct GNUNET_MessageHeader *mh)
2022{ 2027{
2023 struct Operation *op = cls; 2028 struct Operation *op = cls;
2024 unsigned int num_hashes; 2029 unsigned int num_hashes;
2025 2030
2026 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 2031 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2027 { 2032 {
2028 GNUNET_break_op (0); 2033 GNUNET_break_op(0);
2029 return GNUNET_SYSERR; 2034 return GNUNET_SYSERR;
2030 } 2035 }
2031 /* look up elements and send them */ 2036 /* look up elements and send them */
2032 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && 2037 if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2033 (op->state->phase != PHASE_INVENTORY_ACTIVE)) 2038 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2034 { 2039 {
2035 GNUNET_break_op (0); 2040 GNUNET_break_op(0);
2036 return GNUNET_SYSERR; 2041 return GNUNET_SYSERR;
2037 } 2042 }
2038 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 2043 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
2039 / sizeof (struct GNUNET_HashCode); 2044 / sizeof(struct GNUNET_HashCode);
2040 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) != 2045 if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2041 num_hashes * sizeof (struct GNUNET_HashCode)) 2046 num_hashes * sizeof(struct GNUNET_HashCode))
2042 { 2047 {
2043 GNUNET_break_op (0); 2048 GNUNET_break_op(0);
2044 return GNUNET_SYSERR; 2049 return GNUNET_SYSERR;
2045 } 2050 }
2046 return GNUNET_OK; 2051 return GNUNET_OK;
2047} 2052}
2048 2053
@@ -2055,56 +2060,56 @@ check_union_p2p_offer (void *cls,
2055 * @param mh the message 2060 * @param mh the message
2056 */ 2061 */
2057void 2062void
2058handle_union_p2p_offer (void *cls, 2063handle_union_p2p_offer(void *cls,
2059 const struct GNUNET_MessageHeader *mh) 2064 const struct GNUNET_MessageHeader *mh)
2060{ 2065{
2061 struct Operation *op = cls; 2066 struct Operation *op = cls;
2062 const struct GNUNET_HashCode *hash; 2067 const struct GNUNET_HashCode *hash;
2063 unsigned int num_hashes; 2068 unsigned int num_hashes;
2064 2069
2065 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 2070 num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
2066 / sizeof (struct GNUNET_HashCode); 2071 / sizeof(struct GNUNET_HashCode);
2067 for (hash = (const struct GNUNET_HashCode *) &mh[1]; 2072 for (hash = (const struct GNUNET_HashCode *)&mh[1];
2068 num_hashes > 0; 2073 num_hashes > 0;
2069 hash++, num_hashes--) 2074 hash++, num_hashes--)
2070 { 2075 {
2071 struct ElementEntry *ee; 2076 struct ElementEntry *ee;
2072 struct GNUNET_MessageHeader *demands; 2077 struct GNUNET_MessageHeader *demands;
2073 struct GNUNET_MQ_Envelope *ev; 2078 struct GNUNET_MQ_Envelope *ev;
2074 2079
2075 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements, 2080 ee = GNUNET_CONTAINER_multihashmap_get(op->set->content->elements,
2076 hash); 2081 hash);
2077 if (NULL != ee) 2082 if (NULL != ee)
2078 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) 2083 if (GNUNET_YES == _GSS_is_element_of_operation(ee, op))
2079 continue; 2084 continue;
2080 2085
2081 if (GNUNET_YES == 2086 if (GNUNET_YES ==
2082 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, 2087 GNUNET_CONTAINER_multihashmap_contains(op->state->demanded_hashes,
2083 hash)) 2088 hash))
2084 { 2089 {
2085 LOG (GNUNET_ERROR_TYPE_DEBUG, 2090 LOG(GNUNET_ERROR_TYPE_DEBUG,
2086 "Skipped sending duplicate demand\n"); 2091 "Skipped sending duplicate demand\n");
2087 continue; 2092 continue;
2088 } 2093 }
2089 2094
2090 GNUNET_assert (GNUNET_OK == 2095 GNUNET_assert(GNUNET_OK ==
2091 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes, 2096 GNUNET_CONTAINER_multihashmap_put(op->state->demanded_hashes,
2092 hash, 2097 hash,
2093 NULL, 2098 NULL,
2094 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); 2099 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2095 2100
2096 LOG (GNUNET_ERROR_TYPE_DEBUG, 2101 LOG(GNUNET_ERROR_TYPE_DEBUG,
2097 "[OP %x] Requesting element (hash %s)\n", 2102 "[OP %x] Requesting element (hash %s)\n",
2098 (void *) op, GNUNET_h2s (hash)); 2103 (void *)op, GNUNET_h2s(hash));
2099 ev = GNUNET_MQ_msg_header_extra (demands, 2104 ev = GNUNET_MQ_msg_header_extra(demands,
2100 sizeof (struct GNUNET_HashCode), 2105 sizeof(struct GNUNET_HashCode),
2101 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); 2106 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2102 GNUNET_memcpy (&demands[1], 2107 GNUNET_memcpy(&demands[1],
2103 hash, 2108 hash,
2104 sizeof (struct GNUNET_HashCode)); 2109 sizeof(struct GNUNET_HashCode));
2105 GNUNET_MQ_send (op->mq, ev); 2110 GNUNET_MQ_send(op->mq, ev);
2106 } 2111 }
2107 GNUNET_CADET_receive_done (op->channel); 2112 GNUNET_CADET_receive_done(op->channel);
2108} 2113}
2109 2114
2110 2115
@@ -2115,56 +2120,58 @@ handle_union_p2p_offer (void *cls,
2115 * @param mh the message 2120 * @param mh the message
2116 */ 2121 */
2117void 2122void
2118handle_union_p2p_done (void *cls, 2123handle_union_p2p_done(void *cls,
2119 const struct GNUNET_MessageHeader *mh) 2124 const struct GNUNET_MessageHeader *mh)
2120{ 2125{
2121 struct Operation *op = cls; 2126 struct Operation *op = cls;
2122 2127
2123 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 2128 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2124 { 2129 {
2125 GNUNET_break_op (0); 2130 GNUNET_break_op(0);
2126 fail_union_operation (op); 2131 fail_union_operation(op);
2127 return; 2132 return;
2128 } 2133 }
2129 switch (op->state->phase) 2134 switch (op->state->phase)
2130 { 2135 {
2131 case PHASE_INVENTORY_PASSIVE: 2136 case PHASE_INVENTORY_PASSIVE:
2132 /* We got all requests, but still have to send our elements in response. */ 2137 /* We got all requests, but still have to send our elements in response. */
2133 op->state->phase = PHASE_FINISH_WAITING; 2138 op->state->phase = PHASE_FINISH_WAITING;
2134 2139
2135 LOG (GNUNET_ERROR_TYPE_DEBUG, 2140 LOG(GNUNET_ERROR_TYPE_DEBUG,
2136 "got DONE (as passive partner), waiting for our demands to be satisfied\n"); 2141 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2137 /* The active peer is done sending offers 2142 /* The active peer is done sending offers
2138 * and inquiries. This means that all 2143 * and inquiries. This means that all
2139 * our responses to that (demands and offers) 2144 * our responses to that (demands and offers)
2140 * must be in flight (queued or in mesh). 2145 * must be in flight (queued or in mesh).
2141 * 2146 *
2142 * We should notify the active peer once 2147 * We should notify the active peer once
2143 * all our demands are satisfied, so that the active 2148 * all our demands are satisfied, so that the active
2144 * peer can quit if we gave it everything. 2149 * peer can quit if we gave it everything.
2145 */ 2150 */
2146 GNUNET_CADET_receive_done (op->channel); 2151 GNUNET_CADET_receive_done(op->channel);
2147 maybe_finish (op); 2152 maybe_finish(op);
2148 return; 2153 return;
2149 case PHASE_INVENTORY_ACTIVE: 2154
2150 LOG (GNUNET_ERROR_TYPE_DEBUG, 2155 case PHASE_INVENTORY_ACTIVE:
2151 "got DONE (as active partner), waiting to finish\n"); 2156 LOG(GNUNET_ERROR_TYPE_DEBUG,
2152 /* All demands of the other peer are satisfied, 2157 "got DONE (as active partner), waiting to finish\n");
2153 * and we processed all offers, thus we know 2158 /* All demands of the other peer are satisfied,
2154 * exactly what our demands must be. 2159 * and we processed all offers, thus we know
2155 * 2160 * exactly what our demands must be.
2156 * We'll close the channel 2161 *
2157 * to the other peer once our demands are met. 2162 * We'll close the channel
2158 */ 2163 * to the other peer once our demands are met.
2159 op->state->phase = PHASE_FINISH_CLOSING; 2164 */
2160 GNUNET_CADET_receive_done (op->channel); 2165 op->state->phase = PHASE_FINISH_CLOSING;
2161 maybe_finish (op); 2166 GNUNET_CADET_receive_done(op->channel);
2162 return; 2167 maybe_finish(op);
2163 default: 2168 return;
2164 GNUNET_break_op (0); 2169
2165 fail_union_operation (op); 2170 default:
2166 return; 2171 GNUNET_break_op(0);
2167 } 2172 fail_union_operation(op);
2173 return;
2174 }
2168} 2175}
2169 2176
2170/** 2177/**
@@ -2174,10 +2181,10 @@ handle_union_p2p_done (void *cls,
2174 * @param mh the message 2181 * @param mh the message
2175 */ 2182 */
2176void 2183void
2177handle_union_p2p_over (void *cls, 2184handle_union_p2p_over(void *cls,
2178 const struct GNUNET_MessageHeader *mh) 2185 const struct GNUNET_MessageHeader *mh)
2179{ 2186{
2180 send_client_done (cls); 2187 send_client_done(cls);
2181} 2188}
2182 2189
2183 2190
@@ -2189,54 +2196,54 @@ handle_union_p2p_over (void *cls,
2189 * to convince it to accept, may be NULL 2196 * to convince it to accept, may be NULL
2190 */ 2197 */
2191static struct OperationState * 2198static struct OperationState *
2192union_evaluate (struct Operation *op, 2199union_evaluate(struct Operation *op,
2193 const struct GNUNET_MessageHeader *opaque_context) 2200 const struct GNUNET_MessageHeader *opaque_context)
2194{ 2201{
2195 struct OperationState *state; 2202 struct OperationState *state;
2196 struct GNUNET_MQ_Envelope *ev; 2203 struct GNUNET_MQ_Envelope *ev;
2197 struct OperationRequestMessage *msg; 2204 struct OperationRequestMessage *msg;
2198 2205
2199 ev = GNUNET_MQ_msg_nested_mh (msg, 2206 ev = GNUNET_MQ_msg_nested_mh(msg,
2200 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 2207 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2201 opaque_context); 2208 opaque_context);
2202 if (NULL == ev) 2209 if (NULL == ev)
2203 { 2210 {
2204 /* the context message is too large */ 2211 /* the context message is too large */
2205 GNUNET_break (0); 2212 GNUNET_break(0);
2206 return NULL; 2213 return NULL;
2207 } 2214 }
2208 state = GNUNET_new (struct OperationState); 2215 state = GNUNET_new(struct OperationState);
2209 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 2216 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32,
2210 GNUNET_NO); 2217 GNUNET_NO);
2211 /* copy the current generation's strata estimator for this operation */ 2218 /* copy the current generation's strata estimator for this operation */
2212 state->se = strata_estimator_dup (op->set->state->se); 2219 state->se = strata_estimator_dup(op->set->state->se);
2213 /* we started the operation, thus we have to send the operation request */ 2220 /* we started the operation, thus we have to send the operation request */
2214 state->phase = PHASE_EXPECT_SE; 2221 state->phase = PHASE_EXPECT_SE;
2215 state->salt_receive = state->salt_send = 42; // FIXME????? 2222 state->salt_receive = state->salt_send = 42; // FIXME?????
2216 LOG (GNUNET_ERROR_TYPE_DEBUG, 2223 LOG(GNUNET_ERROR_TYPE_DEBUG,
2217 "Initiating union operation evaluation\n"); 2224 "Initiating union operation evaluation\n");
2218 GNUNET_STATISTICS_update (_GSS_statistics, 2225 GNUNET_STATISTICS_update(_GSS_statistics,
2219 "# of total union operations", 2226 "# of total union operations",
2220 1, 2227 1,
2221 GNUNET_NO); 2228 GNUNET_NO);
2222 GNUNET_STATISTICS_update (_GSS_statistics, 2229 GNUNET_STATISTICS_update(_GSS_statistics,
2223 "# of initiated union operations", 2230 "# of initiated union operations",
2224 1, 2231 1,
2225 GNUNET_NO); 2232 GNUNET_NO);
2226 msg->operation = htonl (GNUNET_SET_OPERATION_UNION); 2233 msg->operation = htonl(GNUNET_SET_OPERATION_UNION);
2227 GNUNET_MQ_send (op->mq, 2234 GNUNET_MQ_send(op->mq,
2228 ev); 2235 ev);
2229 2236
2230 if (NULL != opaque_context) 2237 if (NULL != opaque_context)
2231 LOG (GNUNET_ERROR_TYPE_DEBUG, 2238 LOG(GNUNET_ERROR_TYPE_DEBUG,
2232 "sent op request with context message\n"); 2239 "sent op request with context message\n");
2233 else 2240 else
2234 LOG (GNUNET_ERROR_TYPE_DEBUG, 2241 LOG(GNUNET_ERROR_TYPE_DEBUG,
2235 "sent op request without context message\n"); 2242 "sent op request without context message\n");
2236 2243
2237 op->state = state; 2244 op->state = state;
2238 initialize_key_to_element (op); 2245 initialize_key_to_element(op);
2239 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element); 2246 state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element);
2240 return state; 2247 return state;
2241} 2248}
2242 2249
@@ -2248,7 +2255,7 @@ union_evaluate (struct Operation *op,
2248 * @param op operation that will be accepted as a union operation 2255 * @param op operation that will be accepted as a union operation
2249 */ 2256 */
2250static struct OperationState * 2257static struct OperationState *
2251union_accept (struct Operation *op) 2258union_accept(struct Operation *op)
2252{ 2259{
2253 struct OperationState *state; 2260 struct OperationState *state;
2254 const struct StrataEstimator *se; 2261 const struct StrataEstimator *se;
@@ -2258,46 +2265,46 @@ union_accept (struct Operation *op)
2258 size_t len; 2265 size_t len;
2259 uint16_t type; 2266 uint16_t type;
2260 2267
2261 LOG (GNUNET_ERROR_TYPE_DEBUG, 2268 LOG(GNUNET_ERROR_TYPE_DEBUG,
2262 "accepting set union operation\n"); 2269 "accepting set union operation\n");
2263 GNUNET_STATISTICS_update (_GSS_statistics, 2270 GNUNET_STATISTICS_update(_GSS_statistics,
2264 "# of accepted union operations", 2271 "# of accepted union operations",
2265 1, 2272 1,
2266 GNUNET_NO); 2273 GNUNET_NO);
2267 GNUNET_STATISTICS_update (_GSS_statistics, 2274 GNUNET_STATISTICS_update(_GSS_statistics,
2268 "# of total union operations", 2275 "# of total union operations",
2269 1, 2276 1,
2270 GNUNET_NO); 2277 GNUNET_NO);
2271 2278
2272 state = GNUNET_new (struct OperationState); 2279 state = GNUNET_new(struct OperationState);
2273 state->se = strata_estimator_dup (op->set->state->se); 2280 state->se = strata_estimator_dup(op->set->state->se);
2274 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 2281 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create(32,
2275 GNUNET_NO); 2282 GNUNET_NO);
2276 state->salt_receive = state->salt_send = 42; // FIXME????? 2283 state->salt_receive = state->salt_send = 42; // FIXME?????
2277 op->state = state; 2284 op->state = state;
2278 initialize_key_to_element (op); 2285 initialize_key_to_element(op);
2279 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element); 2286 state->initial_size = GNUNET_CONTAINER_multihashmap32_size(state->key_to_element);
2280 2287
2281 /* kick off the operation */ 2288 /* kick off the operation */
2282 se = state->se; 2289 se = state->se;
2283 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); 2290 buf = GNUNET_malloc(se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2284 len = strata_estimator_write (se, 2291 len = strata_estimator_write(se,
2285 buf); 2292 buf);
2286 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) 2293 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2287 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; 2294 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2288 else 2295 else
2289 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; 2296 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2290 ev = GNUNET_MQ_msg_extra (strata_msg, 2297 ev = GNUNET_MQ_msg_extra(strata_msg,
2291 len, 2298 len,
2292 type); 2299 type);
2293 GNUNET_memcpy (&strata_msg[1], 2300 GNUNET_memcpy(&strata_msg[1],
2294 buf, 2301 buf,
2295 len); 2302 len);
2296 GNUNET_free (buf); 2303 GNUNET_free(buf);
2297 strata_msg->set_size 2304 strata_msg->set_size
2298 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements)); 2305 = GNUNET_htonll(GNUNET_CONTAINER_multihashmap_size(op->set->content->elements));
2299 GNUNET_MQ_send (op->mq, 2306 GNUNET_MQ_send(op->mq,
2300 ev); 2307 ev);
2301 state->phase = PHASE_EXPECT_IBF; 2308 state->phase = PHASE_EXPECT_IBF;
2302 return state; 2309 return state;
2303} 2310}
@@ -2312,22 +2319,22 @@ union_accept (struct Operation *op)
2312 * @return the newly created set, NULL on error 2319 * @return the newly created set, NULL on error
2313 */ 2320 */
2314static struct SetState * 2321static struct SetState *
2315union_set_create (void) 2322union_set_create(void)
2316{ 2323{
2317 struct SetState *set_state; 2324 struct SetState *set_state;
2318 2325
2319 LOG (GNUNET_ERROR_TYPE_DEBUG, 2326 LOG(GNUNET_ERROR_TYPE_DEBUG,
2320 "union set created\n"); 2327 "union set created\n");
2321 set_state = GNUNET_new (struct SetState); 2328 set_state = GNUNET_new(struct SetState);
2322 set_state->se = strata_estimator_create (SE_STRATA_COUNT, 2329 set_state->se = strata_estimator_create(SE_STRATA_COUNT,
2323 SE_IBF_SIZE, SE_IBF_HASH_NUM); 2330 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2324 if (NULL == set_state->se) 2331 if (NULL == set_state->se)
2325 { 2332 {
2326 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2333 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
2327 "Failed to allocate strata estimator\n"); 2334 "Failed to allocate strata estimator\n");
2328 GNUNET_free (set_state); 2335 GNUNET_free(set_state);
2329 return NULL; 2336 return NULL;
2330 } 2337 }
2331 return set_state; 2338 return set_state;
2332} 2339}
2333 2340
@@ -2339,11 +2346,11 @@ union_set_create (void)
2339 * @param ee the element to add to the set 2346 * @param ee the element to add to the set
2340 */ 2347 */
2341static void 2348static void
2342union_add (struct SetState *set_state, 2349union_add(struct SetState *set_state,
2343 struct ElementEntry *ee) 2350 struct ElementEntry *ee)
2344{ 2351{
2345 strata_estimator_insert (set_state->se, 2352 strata_estimator_insert(set_state->se,
2346 get_ibf_key (&ee->element_hash)); 2353 get_ibf_key(&ee->element_hash));
2347} 2354}
2348 2355
2349 2356
@@ -2355,11 +2362,11 @@ union_add (struct SetState *set_state,
2355 * @param ee set element to remove 2362 * @param ee set element to remove
2356 */ 2363 */
2357static void 2364static void
2358union_remove (struct SetState *set_state, 2365union_remove(struct SetState *set_state,
2359 struct ElementEntry *ee) 2366 struct ElementEntry *ee)
2360{ 2367{
2361 strata_estimator_remove (set_state->se, 2368 strata_estimator_remove(set_state->se,
2362 get_ibf_key (&ee->element_hash)); 2369 get_ibf_key(&ee->element_hash));
2363} 2370}
2364 2371
2365 2372
@@ -2369,14 +2376,14 @@ union_remove (struct SetState *set_state,
2369 * @param set_state the set to destroy 2376 * @param set_state the set to destroy
2370 */ 2377 */
2371static void 2378static void
2372union_set_destroy (struct SetState *set_state) 2379union_set_destroy(struct SetState *set_state)
2373{ 2380{
2374 if (NULL != set_state->se) 2381 if (NULL != set_state->se)
2375 { 2382 {
2376 strata_estimator_destroy (set_state->se); 2383 strata_estimator_destroy(set_state->se);
2377 set_state->se = NULL; 2384 set_state->se = NULL;
2378 } 2385 }
2379 GNUNET_free (set_state); 2386 GNUNET_free(set_state);
2380} 2387}
2381 2388
2382 2389
@@ -2387,14 +2394,14 @@ union_set_destroy (struct SetState *set_state)
2387 * @return a copy of the union-specific set state 2394 * @return a copy of the union-specific set state
2388 */ 2395 */
2389static struct SetState * 2396static struct SetState *
2390union_copy_state (struct SetState *state) 2397union_copy_state(struct SetState *state)
2391{ 2398{
2392 struct SetState *new_state; 2399 struct SetState *new_state;
2393 2400
2394 GNUNET_assert ( (NULL != state) && 2401 GNUNET_assert((NULL != state) &&
2395 (NULL != state->se) ); 2402 (NULL != state->se));
2396 new_state = GNUNET_new (struct SetState); 2403 new_state = GNUNET_new(struct SetState);
2397 new_state->se = strata_estimator_dup (state->se); 2404 new_state->se = strata_estimator_dup(state->se);
2398 2405
2399 return new_state; 2406 return new_state;
2400} 2407}
@@ -2406,11 +2413,11 @@ union_copy_state (struct SetState *state)
2406 * @param op operation that lost the channel 2413 * @param op operation that lost the channel
2407 */ 2414 */
2408static void 2415static void
2409union_channel_death (struct Operation *op) 2416union_channel_death(struct Operation *op)
2410{ 2417{
2411 send_client_done (op); 2418 send_client_done(op);
2412 _GSS_operation_destroy (op, 2419 _GSS_operation_destroy(op,
2413 GNUNET_YES); 2420 GNUNET_YES);
2414} 2421}
2415 2422
2416 2423
@@ -2421,7 +2428,7 @@ union_channel_death (struct Operation *op)
2421 * @return the operation specific VTable 2428 * @return the operation specific VTable
2422 */ 2429 */
2423const struct SetVT * 2430const struct SetVT *
2424_GSS_union_vt () 2431_GSS_union_vt()
2425{ 2432{
2426 static const struct SetVT union_vt = { 2433 static const struct SetVT union_vt = {
2427 .create = &union_set_create, 2434 .create = &union_set_create,