aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c454
1 files changed, 414 insertions, 40 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index acaabd94a..ab39a2e8a 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -85,6 +85,7 @@ enum UnionOperationPhase
85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. 85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86 * 86 *
87 * XXX: could use better wording. 87 * XXX: could use better wording.
88 * XXX: repurposed to also expect a "request full set" message, should be renamed
88 * 89 *
89 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS 90 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
90 */ 91 */
@@ -115,14 +116,22 @@ enum UnionOperationPhase
115 * In the penultimate phase, 116 * In the penultimate phase,
116 * we wait until all our demands 117 * we wait until all our demands
117 * are satisfied. Then we send a done 118 * are satisfied. Then we send a done
118 * message, and wait for another done message.*/ 119 * message, and wait for another done message.
120 */
119 PHASE_FINISH_WAITING, 121 PHASE_FINISH_WAITING,
120 122
121 /** 123 /**
122 * In the ultimate phase, we wait until 124 * In the ultimate phase, we wait until
123 * our demands are satisfied and then 125 * our demands are satisfied and then
124 * quit (sending another DONE message). */ 126 * quit (sending another DONE message).
125 PHASE_DONE 127 */
128 PHASE_DONE,
129
130 /**
131 * After sending the full set, wait for responses with the elements
132 * that the local peer is missing.
133 */
134 PHASE_FULL_SENDING,
126}; 135};
127 136
128 137
@@ -148,7 +157,7 @@ struct OperationState
148 struct InvertibleBloomFilter *local_ibf; 157 struct InvertibleBloomFilter *local_ibf;
149 158
150 /** 159 /**
151 * Maps IBF-Keys (specific to the current salt) to elements. 160 * Maps unsalted IBF-Keys to elements.
152 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. 161 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
153 * Colliding IBF-Keys are linked. 162 * Colliding IBF-Keys are linked.
154 */ 163 */
@@ -183,6 +192,23 @@ struct OperationState
183 * Salt for the IBF we've received and that we're currently decoding. 192 * Salt for the IBF we've received and that we're currently decoding.
184 */ 193 */
185 uint32_t salt_receive; 194 uint32_t salt_receive;
195
196 /**
197 * Number of elements we received from the other peer
198 * that were not in the local set yet.
199 */
200 uint32_t received_fresh;
201
202 /**
203 * Total number of elements received from the other peer.
204 */
205 uint32_t received_total;
206
207 /**
208 * Initial size of our set, just before
209 * the operation started.
210 */
211 uint64_t initial_size;
186}; 212};
187 213
188 214
@@ -203,6 +229,14 @@ struct KeyEntry
203 * is #GNUNET_YES. 229 * is #GNUNET_YES.
204 */ 230 */
205 struct ElementEntry *element; 231 struct ElementEntry *element;
232
233 /**
234 * Did we receive this element?
235 * Even if element->is_foreign is false, we might
236 * have received the element, so this indicates that
237 * the other peer has it.
238 */
239 int received;
206}; 240};
207 241
208 242
@@ -362,6 +396,16 @@ get_ibf_key (const struct GNUNET_HashCode *src)
362 396
363 397
364/** 398/**
399 * Context for #op_get_element_iterator
400 */
401struct GetElementContext
402{
403 struct GNUNET_HashCode hash;
404 struct KeyEntry *k;
405};
406
407
408/**
365 * Iterator over the mapping from IBF keys to element entries. Checks if we 409 * Iterator over the mapping from IBF keys to element entries. Checks if we
366 * have an element with a given GNUNET_HashCode. 410 * have an element with a given GNUNET_HashCode.
367 * 411 *
@@ -372,17 +416,20 @@ get_ibf_key (const struct GNUNET_HashCode *src)
372 * #GNUNET_NO if we've found the element. 416 * #GNUNET_NO if we've found the element.
373 */ 417 */
374static int 418static int
375op_has_element_iterator (void *cls, 419op_get_element_iterator (void *cls,
376 uint32_t key, 420 uint32_t key,
377 void *value) 421 void *value)
378{ 422{
379 struct GNUNET_HashCode *element_hash = cls; 423 struct GetElementContext *ctx = cls;
380 struct KeyEntry *k = value; 424 struct KeyEntry *k = value;
381 425
382 GNUNET_assert (NULL != k); 426 GNUNET_assert (NULL != k);
383 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, 427 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
384 element_hash)) 428 &ctx->hash))
429 {
430 ctx->k = k;
385 return GNUNET_NO; 431 return GNUNET_NO;
432 }
386 return GNUNET_YES; 433 return GNUNET_YES;
387} 434}
388 435
@@ -395,23 +442,29 @@ op_has_element_iterator (void *cls,
395 * @param element_hash hash of the element to look for 442 * @param element_hash hash of the element to look for
396 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise 443 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
397 */ 444 */
398static int 445static struct KeyEntry *
399op_has_element (struct Operation *op, 446op_get_element (struct Operation *op,
400 const struct GNUNET_HashCode *element_hash) 447 const struct GNUNET_HashCode *element_hash)
401{ 448{
402 int ret; 449 int ret;
403 struct IBF_Key ibf_key; 450 struct IBF_Key ibf_key;
451 struct GetElementContext ctx = { 0 };
452
453 ctx.hash = *element_hash;
404 454
405 ibf_key = get_ibf_key (element_hash); 455 ibf_key = get_ibf_key (element_hash);
406 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, 456 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
407 (uint32_t) ibf_key.key_val, 457 (uint32_t) ibf_key.key_val,
408 op_has_element_iterator, 458 op_get_element_iterator,
409 (void *) element_hash); 459 &ctx);
410 460
411 /* was the iteration aborted because we found the element? */ 461 /* was the iteration aborted because we found the element? */
412 if (GNUNET_SYSERR == ret) 462 if (GNUNET_SYSERR == ret)
413 return GNUNET_YES; 463 {
414 return GNUNET_NO; 464 GNUNET_assert (NULL != ctx.k);
465 return ctx.k;
466 }
467 return NULL;
415} 468}
416 469
417 470
@@ -427,10 +480,12 @@ op_has_element (struct Operation *op,
427 * 480 *
428 * @param op the union operation 481 * @param op the union operation
429 * @param ee the element entry 482 * @param ee the element entry
483 * @parem received was this element received from the remote peer?
430 */ 484 */
431static void 485static void
432op_register_element (struct Operation *op, 486op_register_element (struct Operation *op,
433 struct ElementEntry *ee) 487 struct ElementEntry *ee,
488 int received)
434{ 489{
435 struct IBF_Key ibf_key; 490 struct IBF_Key ibf_key;
436 struct KeyEntry *k; 491 struct KeyEntry *k;
@@ -439,6 +494,7 @@ op_register_element (struct Operation *op,
439 k = GNUNET_new (struct KeyEntry); 494 k = GNUNET_new (struct KeyEntry);
440 k->element = ee; 495 k->element = ee;
441 k->ibf_key = ibf_key; 496 k->ibf_key = ibf_key;
497 k->received = received;
442 GNUNET_assert (GNUNET_OK == 498 GNUNET_assert (GNUNET_OK ==
443 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, 499 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
444 (uint32_t) ibf_key.key_val, 500 (uint32_t) ibf_key.key_val,
@@ -524,12 +580,30 @@ init_key_to_element_iterator (void *cls,
524 580
525 GNUNET_assert (GNUNET_NO == ee->remote); 581 GNUNET_assert (GNUNET_NO == ee->remote);
526 582
527 op_register_element (op, ee); 583 op_register_element (op, ee, GNUNET_NO);
528 return GNUNET_YES; 584 return GNUNET_YES;
529} 585}
530 586
531 587
532/** 588/**
589 * Initialize the IBF key to element mapping local to this set
590 * operation.
591 *
592 * @param op the set union operation
593 */
594static void
595initialize_key_to_element (struct Operation *op)
596{
597 unsigned int len;
598
599 GNUNET_assert (NULL == op->state->key_to_element);
600 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
603}
604
605
606/**
533 * Create an ibf with the operation's elements 607 * Create an ibf with the operation's elements
534 * of the specified size 608 * of the specified size
535 * 609 *
@@ -541,15 +615,8 @@ static int
541prepare_ibf (struct Operation *op, 615prepare_ibf (struct Operation *op,
542 uint32_t size) 616 uint32_t size)
543{ 617{
544 if (NULL == op->state->key_to_element) 618 GNUNET_assert (NULL != op->state->key_to_element);
545 {
546 unsigned int len;
547 619
548 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
549 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
550 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
551 init_key_to_element_iterator, op);
552 }
553 if (NULL != op->state->local_ibf) 620 if (NULL != op->state->local_ibf)
554 ibf_destroy (op->state->local_ibf); 621 ibf_destroy (op->state->local_ibf);
555 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 622 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
@@ -648,7 +715,7 @@ send_strata_estimator (struct Operation *op)
648{ 715{
649 const struct StrataEstimator *se = op->state->se; 716 const struct StrataEstimator *se = op->state->se;
650 struct GNUNET_MQ_Envelope *ev; 717 struct GNUNET_MQ_Envelope *ev;
651 struct GNUNET_MessageHeader *strata_msg; 718 struct StrataEstimatorMessage *strata_msg;
652 char *buf; 719 char *buf;
653 size_t len; 720 size_t len;
654 uint16_t type; 721 uint16_t type;
@@ -660,13 +727,14 @@ send_strata_estimator (struct Operation *op)
660 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; 727 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
661 else 728 else
662 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; 729 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
663 ev = GNUNET_MQ_msg_header_extra (strata_msg, 730 ev = GNUNET_MQ_msg_extra (strata_msg,
664 len, 731 len,
665 type); 732 type);
666 GNUNET_memcpy (&strata_msg[1], 733 GNUNET_memcpy (&strata_msg[1],
667 buf, 734 buf,
668 len); 735 len);
669 GNUNET_free (buf); 736 GNUNET_free (buf);
737 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
670 GNUNET_MQ_send (op->mq, 738 GNUNET_MQ_send (op->mq,
671 ev); 739 ev);
672 op->state->phase = PHASE_EXPECT_IBF; 740 op->state->phase = PHASE_EXPECT_IBF;
@@ -698,6 +766,47 @@ get_order_from_difference (unsigned int diff)
698 766
699 767
700/** 768/**
769 * Send a set element.
770 *
771 * @param cls the union operation `struct Operation *`
772 * @param key unused
773 * @param value the `struct ElementEntry *` to insert
774 * into the key-to-element mapping
775 * @return #GNUNET_YES (to continue iterating)
776 */
777static int
778send_element_iterator (void *cls,
779 const struct GNUNET_HashCode *key,
780 void *value)
781{
782 struct Operation *op = cls;
783 struct GNUNET_SET_ElementMessage *emsg;
784 struct GNUNET_SET_Element *el = value;
785 struct GNUNET_MQ_Envelope *ev;
786
787 ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
788 emsg->element_type = htonl (el->element_type);
789 GNUNET_memcpy (&emsg[1], el->data, el->size);
790 GNUNET_MQ_send (op->mq, ev);
791 return GNUNET_YES;
792}
793
794
795static void
796send_full_set (struct Operation *op)
797{
798 struct GNUNET_MQ_Envelope *ev;
799
800 op->state->phase = PHASE_FULL_SENDING;
801
802 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
803 &send_element_iterator, op);
804 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
805 GNUNET_MQ_send (op->mq, ev);
806}
807
808
809/**
701 * Handle a strata estimator from a remote peer 810 * Handle a strata estimator from a remote peer
702 * 811 *
703 * @param cls the union operation 812 * @param cls the union operation
@@ -713,7 +822,9 @@ handle_p2p_strata_estimator (void *cls,
713{ 822{
714 struct Operation *op = cls; 823 struct Operation *op = cls;
715 struct StrataEstimator *remote_se; 824 struct StrataEstimator *remote_se;
716 int diff; 825 struct StrataEstimatorMessage *msg = (void *) mh;
826 unsigned int diff;
827 uint64_t other_size;
717 size_t len; 828 size_t len;
718 829
719 GNUNET_STATISTICS_update (_GSS_statistics, 830 GNUNET_STATISTICS_update (_GSS_statistics,
@@ -723,11 +834,11 @@ handle_p2p_strata_estimator (void *cls,
723 834
724 if (op->state->phase != PHASE_EXPECT_SE) 835 if (op->state->phase != PHASE_EXPECT_SE)
725 { 836 {
726 fail_union_operation (op);
727 GNUNET_break (0); 837 GNUNET_break (0);
838 fail_union_operation (op);
728 return GNUNET_SYSERR; 839 return GNUNET_SYSERR;
729 } 840 }
730 len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); 841 len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
731 if ( (GNUNET_NO == is_compressed) && 842 if ( (GNUNET_NO == is_compressed) &&
732 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) 843 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
733 { 844 {
@@ -735,6 +846,7 @@ handle_p2p_strata_estimator (void *cls,
735 GNUNET_break (0); 846 GNUNET_break (0);
736 return GNUNET_SYSERR; 847 return GNUNET_SYSERR;
737 } 848 }
849 other_size = GNUNET_ntohll (msg->set_size);
738 remote_se = strata_estimator_create (SE_STRATA_COUNT, 850 remote_se = strata_estimator_create (SE_STRATA_COUNT,
739 SE_IBF_SIZE, 851 SE_IBF_SIZE,
740 SE_IBF_HASH_NUM); 852 SE_IBF_HASH_NUM);
@@ -745,7 +857,7 @@ handle_p2p_strata_estimator (void *cls,
745 return GNUNET_SYSERR; 857 return GNUNET_SYSERR;
746 } 858 }
747 if (GNUNET_OK != 859 if (GNUNET_OK !=
748 strata_estimator_read (&mh[1], 860 strata_estimator_read (&msg[1],
749 len, 861 len,
750 is_compressed, 862 is_compressed,
751 remote_se)) 863 remote_se))
@@ -765,16 +877,46 @@ handle_p2p_strata_estimator (void *cls,
765 "got se diff=%d, using ibf size %d\n", 877 "got se diff=%d, using ibf size %d\n",
766 diff, 878 diff,
767 1<<get_order_from_difference (diff)); 879 1<<get_order_from_difference (diff));
768 if (GNUNET_OK != 880
769 send_ibf (op, 881 if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
770 get_order_from_difference (diff)))
771 { 882 {
772 /* Internal error, best we can do is shut the connection */ 883 GNUNET_break (0);
773 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
774 "Failed to send IBF, closing connection\n");
775 fail_union_operation (op); 884 fail_union_operation (op);
776 return GNUNET_SYSERR; 885 return GNUNET_SYSERR;
777 } 886 }
887
888
889 if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
890 {
891 LOG (GNUNET_ERROR_TYPE_INFO,
892 "Sending full set (diff=%d, own set=%u)\n",
893 diff,
894 op->state->initial_size);
895 if (op->state->initial_size <= other_size)
896 {
897 send_full_set (op);
898 }
899 else
900 {
901 struct GNUNET_MQ_Envelope *ev;
902 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
903 GNUNET_MQ_send (op->mq, ev);
904 }
905 }
906 else
907 {
908 if (GNUNET_OK !=
909 send_ibf (op,
910 get_order_from_difference (diff)))
911 {
912 /* Internal error, best we can do is shut the connection */
913 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
914 "Failed to send IBF, closing connection\n");
915 fail_union_operation (op);
916 return GNUNET_SYSERR;
917 }
918 }
919
778 return GNUNET_OK; 920 return GNUNET_OK;
779} 921}
780 922
@@ -1210,6 +1352,8 @@ maybe_finish (struct Operation *op)
1210 1352
1211/** 1353/**
1212 * Handle an element message from a remote peer. 1354 * Handle an element message from a remote peer.
1355 * Sent by the other peer either because we decoded an IBF and placed a demand,
1356 * or because the other peer switched to full set transmission.
1213 * 1357 *
1214 * @param cls the union operation 1358 * @param cls the union operation
1215 * @param mh the message 1359 * @param mh the message
@@ -1273,7 +1417,11 @@ handle_p2p_elements (void *cls,
1273 1, 1417 1,
1274 GNUNET_NO); 1418 GNUNET_NO);
1275 1419
1276 if (GNUNET_YES == op_has_element (op, &ee->element_hash)) 1420 op->state->received_total += 1;
1421
1422 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1423
1424 if (NULL != ke)
1277 { 1425 {
1278 /* Got repeated element. Should not happen since 1426 /* Got repeated element. Should not happen since
1279 * we track demands. */ 1427 * we track demands. */
@@ -1281,13 +1429,15 @@ handle_p2p_elements (void *cls,
1281 "# repeated elements", 1429 "# repeated elements",
1282 1, 1430 1,
1283 GNUNET_NO); 1431 GNUNET_NO);
1432 ke->received = GNUNET_YES;
1284 GNUNET_free (ee); 1433 GNUNET_free (ee);
1285 } 1434 }
1286 else 1435 else
1287 { 1436 {
1288 LOG (GNUNET_ERROR_TYPE_DEBUG, 1437 LOG (GNUNET_ERROR_TYPE_DEBUG,
1289 "Registering new element from remote peer\n"); 1438 "Registering new element from remote peer\n");
1290 op_register_element (op, ee); 1439 op->state->received_fresh += 1;
1440 op_register_element (op, ee, GNUNET_YES);
1291 /* only send results immediately if the client wants it */ 1441 /* only send results immediately if the client wants it */
1292 switch (op->spec->result_mode) 1442 switch (op->spec->result_mode)
1293 { 1443 {
@@ -1304,11 +1454,112 @@ handle_p2p_elements (void *cls,
1304 } 1454 }
1305 } 1455 }
1306 1456
1457 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1458 {
1459 /* The other peer gave us lots of old elements, there's something wrong. */
1460 GNUNET_break_op (0);
1461 fail_union_operation (op);
1462 return;
1463 }
1464
1307 maybe_finish (op); 1465 maybe_finish (op);
1308} 1466}
1309 1467
1310 1468
1311/** 1469/**
1470 * Handle an element message from a remote peer.
1471 *
1472 * @param cls the union operation
1473 * @param mh the message
1474 */
1475static void
1476handle_p2p_full_element (void *cls,
1477 const struct GNUNET_MessageHeader *mh)
1478{
1479 struct Operation *op = cls;
1480 struct ElementEntry *ee;
1481 const struct GNUNET_SET_ElementMessage *emsg;
1482 uint16_t element_size;
1483
1484 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1485 {
1486 GNUNET_break_op (0);
1487 fail_union_operation (op);
1488 return;
1489 }
1490
1491 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1492
1493 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1494 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1495 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1496 ee->element.size = element_size;
1497 ee->element.data = &ee[1];
1498 ee->element.element_type = ntohs (emsg->element_type);
1499 ee->remote = GNUNET_YES;
1500 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1501
1502 LOG (GNUNET_ERROR_TYPE_DEBUG,
1503 "Got element (full diff, size %u, hash %s) from peer\n",
1504 (unsigned int) element_size,
1505 GNUNET_h2s (&ee->element_hash));
1506
1507 GNUNET_STATISTICS_update (_GSS_statistics,
1508 "# received elements",
1509 1,
1510 GNUNET_NO);
1511 GNUNET_STATISTICS_update (_GSS_statistics,
1512 "# exchanged elements",
1513 1,
1514 GNUNET_NO);
1515
1516 op->state->received_total += 1;
1517
1518 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1519
1520 if (NULL != ke)
1521 {
1522 /* Got repeated element. Should not happen since
1523 * we track demands. */
1524 GNUNET_STATISTICS_update (_GSS_statistics,
1525 "# repeated elements",
1526 1,
1527 GNUNET_NO);
1528 ke->received = GNUNET_YES;
1529 GNUNET_free (ee);
1530 }
1531 else
1532 {
1533 LOG (GNUNET_ERROR_TYPE_DEBUG,
1534 "Registering new element from remote peer\n");
1535 op->state->received_fresh += 1;
1536 op_register_element (op, ee, GNUNET_YES);
1537 /* only send results immediately if the client wants it */
1538 switch (op->spec->result_mode)
1539 {
1540 case GNUNET_SET_RESULT_ADDED:
1541 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1542 break;
1543 case GNUNET_SET_RESULT_SYMMETRIC:
1544 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1545 break;
1546 default:
1547 /* Result mode not supported, should have been caught earlier. */
1548 GNUNET_break (0);
1549 break;
1550 }
1551 }
1552
1553 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1554 {
1555 /* The other peer gave us lots of old elements, there's something wrong. */
1556 GNUNET_break_op (0);
1557 fail_union_operation (op);
1558 return;
1559 }
1560}
1561
1562/**
1312 * Send offers (for GNUNET_Hash-es) in response 1563 * Send offers (for GNUNET_Hash-es) in response
1313 * to inquiries (for IBF_Key-s). 1564 * to inquiries (for IBF_Key-s).
1314 * 1565 *
@@ -1355,7 +1606,116 @@ handle_p2p_inquiry (void *cls,
1355 1606
1356 1607
1357/** 1608/**
1358 * FIXME 1609 * Iterator over hash map entries, called to
1610 * destroy the linked list of colliding ibf key entries.
1611 *
1612 * @param cls closure
1613 * @param key current key code
1614 * @param value value in the hash map
1615 * @return #GNUNET_YES if we should continue to iterate,
1616 * #GNUNET_NO if not.
1617 */
1618static int
1619send_missing_elements_iter (void *cls,
1620 uint32_t key,
1621 void *value)
1622{
1623 struct Operation *op = cls;
1624 struct KeyEntry *ke = value;
1625 struct GNUNET_MQ_Envelope *ev;
1626 struct GNUNET_SET_ElementMessage *emsg;
1627 struct ElementEntry *ee = ke->element;
1628
1629 if (GNUNET_YES == ke->received)
1630 return GNUNET_YES;
1631
1632 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1633 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1634 emsg->reserved = htons (0);
1635 emsg->element_type = htons (ee->element.element_type);
1636 GNUNET_MQ_send (op->mq, ev);
1637
1638 return GNUNET_YES;
1639}
1640
1641
1642/**
1643 * Handle a
1644 *
1645 * @parem cls closure, a set union operation
1646 * @param mh the demand message
1647 */
1648static void
1649handle_p2p_request_full (void *cls,
1650 const struct GNUNET_MessageHeader *mh)
1651{
1652 struct Operation *op = cls;
1653
1654 if (PHASE_EXPECT_IBF != op->state->phase)
1655 {
1656 fail_union_operation (op);
1657 GNUNET_break_op (0);
1658 return;
1659 }
1660
1661 // FIXME: we need to check that our set is larger than the
1662 // byzantine_lower_bound by some threshold
1663 send_full_set (op);
1664}
1665
1666
1667/**
1668 * Handle a "full done" message.
1669 *
1670 * @parem cls closure, a set union operation
1671 * @param mh the demand message
1672 */
1673static void
1674handle_p2p_full_done (void *cls,
1675 const struct GNUNET_MessageHeader *mh)
1676{
1677 struct Operation *op = cls;
1678
1679 if (PHASE_EXPECT_IBF == op->state->phase)
1680 {
1681 struct GNUNET_MQ_Envelope *ev;
1682
1683 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1684
1685 /* send all the elements that did not come from the remote peer */
1686 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1687 &send_missing_elements_iter,
1688 op);
1689
1690 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1691 GNUNET_MQ_send (op->mq, ev);
1692 op->state->phase = PHASE_DONE;
1693
1694 /* we now wait until the other peer shuts the tunnel down*/
1695 }
1696 else if (PHASE_FULL_SENDING == op->state->phase)
1697 {
1698 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1699 /* We sent the full set, and got the response for that. We're done. */
1700 op->state->phase = PHASE_DONE;
1701 send_done_and_destroy (op);
1702 }
1703 else
1704 {
1705 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1706 GNUNET_break_op (0);
1707 fail_union_operation (op);
1708 return;
1709 }
1710}
1711
1712
1713/**
1714 * Handle a demand by the other peer for elements based on a list
1715 * of GNUNET_HashCode-s.
1716 *
1717 * @parem cls closure, a set union operation
1718 * @param mh the demand message
1359 */ 1719 */
1360static void 1720static void
1361handle_p2p_demand (void *cls, 1721handle_p2p_demand (void *cls,
@@ -1607,6 +1967,9 @@ union_evaluate (struct Operation *op,
1607 else 1967 else
1608 LOG (GNUNET_ERROR_TYPE_DEBUG, 1968 LOG (GNUNET_ERROR_TYPE_DEBUG,
1609 "sent op request without context message\n"); 1969 "sent op request without context message\n");
1970
1971 op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
1972 initialize_key_to_element (op);
1610} 1973}
1611 1974
1612 1975
@@ -1636,6 +1999,8 @@ union_accept (struct Operation *op)
1636 op->state->se = strata_estimator_dup (op->spec->set->state->se); 1999 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1637 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); 2000 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1638 op->state->salt_receive = op->state->salt_send = 42; 2001 op->state->salt_receive = op->state->salt_send = 42;
2002 op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
2003 initialize_key_to_element (op);
1639 /* kick off the operation */ 2004 /* kick off the operation */
1640 send_strata_estimator (op); 2005 send_strata_estimator (op);
1641} 2006}
@@ -1743,6 +2108,9 @@ union_handle_p2p_message (struct Operation *op,
1743 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: 2108 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1744 handle_p2p_elements (op, mh); 2109 handle_p2p_elements (op, mh);
1745 break; 2110 break;
2111 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2112 handle_p2p_full_element (op, mh);
2113 break;
1746 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: 2114 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1747 handle_p2p_inquiry (op, mh); 2115 handle_p2p_inquiry (op, mh);
1748 break; 2116 break;
@@ -1755,6 +2123,12 @@ union_handle_p2p_message (struct Operation *op,
1755 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: 2123 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1756 handle_p2p_demand (op, mh); 2124 handle_p2p_demand (op, mh);
1757 break; 2125 break;
2126 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2127 handle_p2p_full_done (op, mh);
2128 break;
2129 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2130 handle_p2p_request_full (op, mh);
2131 break;
1758 default: 2132 default:
1759 /* Something wrong with cadet's message handlers? */ 2133 /* Something wrong with cadet's message handlers? */
1760 GNUNET_assert (0); 2134 GNUNET_assert (0);