diff options
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r-- | src/set/gnunet-service-set_union.c | 454 |
1 files changed, 414 insertions, 40 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index acaabd94a..ab39a2e8a 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -85,6 +85,7 @@ enum UnionOperationPhase | |||
85 | * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. | 85 | * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. |
86 | * | 86 | * |
87 | * XXX: could use better wording. | 87 | * XXX: could use better wording. |
88 | * XXX: repurposed to also expect a "request full set" message, should be renamed | ||
88 | * | 89 | * |
89 | * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS | 90 | * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS |
90 | */ | 91 | */ |
@@ -115,14 +116,22 @@ enum UnionOperationPhase | |||
115 | * In the penultimate phase, | 116 | * In the penultimate phase, |
116 | * we wait until all our demands | 117 | * we wait until all our demands |
117 | * are satisfied. Then we send a done | 118 | * are satisfied. Then we send a done |
118 | * message, and wait for another done message.*/ | 119 | * message, and wait for another done message. |
120 | */ | ||
119 | PHASE_FINISH_WAITING, | 121 | PHASE_FINISH_WAITING, |
120 | 122 | ||
121 | /** | 123 | /** |
122 | * In the ultimate phase, we wait until | 124 | * In the ultimate phase, we wait until |
123 | * our demands are satisfied and then | 125 | * our demands are satisfied and then |
124 | * quit (sending another DONE message). */ | 126 | * quit (sending another DONE message). |
125 | PHASE_DONE | 127 | */ |
128 | PHASE_DONE, | ||
129 | |||
130 | /** | ||
131 | * After sending the full set, wait for responses with the elements | ||
132 | * that the local peer is missing. | ||
133 | */ | ||
134 | PHASE_FULL_SENDING, | ||
126 | }; | 135 | }; |
127 | 136 | ||
128 | 137 | ||
@@ -148,7 +157,7 @@ struct OperationState | |||
148 | struct InvertibleBloomFilter *local_ibf; | 157 | struct InvertibleBloomFilter *local_ibf; |
149 | 158 | ||
150 | /** | 159 | /** |
151 | * Maps IBF-Keys (specific to the current salt) to elements. | 160 | * Maps unsalted IBF-Keys to elements. |
152 | * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. | 161 | * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. |
153 | * Colliding IBF-Keys are linked. | 162 | * Colliding IBF-Keys are linked. |
154 | */ | 163 | */ |
@@ -183,6 +192,23 @@ struct OperationState | |||
183 | * Salt for the IBF we've received and that we're currently decoding. | 192 | * Salt for the IBF we've received and that we're currently decoding. |
184 | */ | 193 | */ |
185 | uint32_t salt_receive; | 194 | uint32_t salt_receive; |
195 | |||
196 | /** | ||
197 | * Number of elements we received from the other peer | ||
198 | * that were not in the local set yet. | ||
199 | */ | ||
200 | uint32_t received_fresh; | ||
201 | |||
202 | /** | ||
203 | * Total number of elements received from the other peer. | ||
204 | */ | ||
205 | uint32_t received_total; | ||
206 | |||
207 | /** | ||
208 | * Initial size of our set, just before | ||
209 | * the operation started. | ||
210 | */ | ||
211 | uint64_t initial_size; | ||
186 | }; | 212 | }; |
187 | 213 | ||
188 | 214 | ||
@@ -203,6 +229,14 @@ struct KeyEntry | |||
203 | * is #GNUNET_YES. | 229 | * is #GNUNET_YES. |
204 | */ | 230 | */ |
205 | struct ElementEntry *element; | 231 | struct ElementEntry *element; |
232 | |||
233 | /** | ||
234 | * Did we receive this element? | ||
235 | * Even if element->is_foreign is false, we might | ||
236 | * have received the element, so this indicates that | ||
237 | * the other peer has it. | ||
238 | */ | ||
239 | int received; | ||
206 | }; | 240 | }; |
207 | 241 | ||
208 | 242 | ||
@@ -362,6 +396,16 @@ get_ibf_key (const struct GNUNET_HashCode *src) | |||
362 | 396 | ||
363 | 397 | ||
364 | /** | 398 | /** |
399 | * Context for #op_get_element_iterator | ||
400 | */ | ||
401 | struct GetElementContext | ||
402 | { | ||
403 | struct GNUNET_HashCode hash; | ||
404 | struct KeyEntry *k; | ||
405 | }; | ||
406 | |||
407 | |||
408 | /** | ||
365 | * Iterator over the mapping from IBF keys to element entries. Checks if we | 409 | * Iterator over the mapping from IBF keys to element entries. Checks if we |
366 | * have an element with a given GNUNET_HashCode. | 410 | * have an element with a given GNUNET_HashCode. |
367 | * | 411 | * |
@@ -372,17 +416,20 @@ get_ibf_key (const struct GNUNET_HashCode *src) | |||
372 | * #GNUNET_NO if we've found the element. | 416 | * #GNUNET_NO if we've found the element. |
373 | */ | 417 | */ |
374 | static int | 418 | static int |
375 | op_has_element_iterator (void *cls, | 419 | op_get_element_iterator (void *cls, |
376 | uint32_t key, | 420 | uint32_t key, |
377 | void *value) | 421 | void *value) |
378 | { | 422 | { |
379 | struct GNUNET_HashCode *element_hash = cls; | 423 | struct GetElementContext *ctx = cls; |
380 | struct KeyEntry *k = value; | 424 | struct KeyEntry *k = value; |
381 | 425 | ||
382 | GNUNET_assert (NULL != k); | 426 | GNUNET_assert (NULL != k); |
383 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, | 427 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, |
384 | element_hash)) | 428 | &ctx->hash)) |
429 | { | ||
430 | ctx->k = k; | ||
385 | return GNUNET_NO; | 431 | return GNUNET_NO; |
432 | } | ||
386 | return GNUNET_YES; | 433 | return GNUNET_YES; |
387 | } | 434 | } |
388 | 435 | ||
@@ -395,23 +442,29 @@ op_has_element_iterator (void *cls, | |||
395 | * @param element_hash hash of the element to look for | 442 | * @param element_hash hash of the element to look for |
396 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise | 443 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise |
397 | */ | 444 | */ |
398 | static int | 445 | static struct KeyEntry * |
399 | op_has_element (struct Operation *op, | 446 | op_get_element (struct Operation *op, |
400 | const struct GNUNET_HashCode *element_hash) | 447 | const struct GNUNET_HashCode *element_hash) |
401 | { | 448 | { |
402 | int ret; | 449 | int ret; |
403 | struct IBF_Key ibf_key; | 450 | struct IBF_Key ibf_key; |
451 | struct GetElementContext ctx = { 0 }; | ||
452 | |||
453 | ctx.hash = *element_hash; | ||
404 | 454 | ||
405 | ibf_key = get_ibf_key (element_hash); | 455 | ibf_key = get_ibf_key (element_hash); |
406 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, | 456 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, |
407 | (uint32_t) ibf_key.key_val, | 457 | (uint32_t) ibf_key.key_val, |
408 | op_has_element_iterator, | 458 | op_get_element_iterator, |
409 | (void *) element_hash); | 459 | &ctx); |
410 | 460 | ||
411 | /* was the iteration aborted because we found the element? */ | 461 | /* was the iteration aborted because we found the element? */ |
412 | if (GNUNET_SYSERR == ret) | 462 | if (GNUNET_SYSERR == ret) |
413 | return GNUNET_YES; | 463 | { |
414 | return GNUNET_NO; | 464 | GNUNET_assert (NULL != ctx.k); |
465 | return ctx.k; | ||
466 | } | ||
467 | return NULL; | ||
415 | } | 468 | } |
416 | 469 | ||
417 | 470 | ||
@@ -427,10 +480,12 @@ op_has_element (struct Operation *op, | |||
427 | * | 480 | * |
428 | * @param op the union operation | 481 | * @param op the union operation |
429 | * @param ee the element entry | 482 | * @param ee the element entry |
483 | * @parem received was this element received from the remote peer? | ||
430 | */ | 484 | */ |
431 | static void | 485 | static void |
432 | op_register_element (struct Operation *op, | 486 | op_register_element (struct Operation *op, |
433 | struct ElementEntry *ee) | 487 | struct ElementEntry *ee, |
488 | int received) | ||
434 | { | 489 | { |
435 | struct IBF_Key ibf_key; | 490 | struct IBF_Key ibf_key; |
436 | struct KeyEntry *k; | 491 | struct KeyEntry *k; |
@@ -439,6 +494,7 @@ op_register_element (struct Operation *op, | |||
439 | k = GNUNET_new (struct KeyEntry); | 494 | k = GNUNET_new (struct KeyEntry); |
440 | k->element = ee; | 495 | k->element = ee; |
441 | k->ibf_key = ibf_key; | 496 | k->ibf_key = ibf_key; |
497 | k->received = received; | ||
442 | GNUNET_assert (GNUNET_OK == | 498 | GNUNET_assert (GNUNET_OK == |
443 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, | 499 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, |
444 | (uint32_t) ibf_key.key_val, | 500 | (uint32_t) ibf_key.key_val, |
@@ -524,12 +580,30 @@ init_key_to_element_iterator (void *cls, | |||
524 | 580 | ||
525 | GNUNET_assert (GNUNET_NO == ee->remote); | 581 | GNUNET_assert (GNUNET_NO == ee->remote); |
526 | 582 | ||
527 | op_register_element (op, ee); | 583 | op_register_element (op, ee, GNUNET_NO); |
528 | return GNUNET_YES; | 584 | return GNUNET_YES; |
529 | } | 585 | } |
530 | 586 | ||
531 | 587 | ||
532 | /** | 588 | /** |
589 | * Initialize the IBF key to element mapping local to this set | ||
590 | * operation. | ||
591 | * | ||
592 | * @param op the set union operation | ||
593 | */ | ||
594 | static void | ||
595 | initialize_key_to_element (struct Operation *op) | ||
596 | { | ||
597 | unsigned int len; | ||
598 | |||
599 | GNUNET_assert (NULL == op->state->key_to_element); | ||
600 | len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); | ||
601 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | ||
602 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); | ||
603 | } | ||
604 | |||
605 | |||
606 | /** | ||
533 | * Create an ibf with the operation's elements | 607 | * Create an ibf with the operation's elements |
534 | * of the specified size | 608 | * of the specified size |
535 | * | 609 | * |
@@ -541,15 +615,8 @@ static int | |||
541 | prepare_ibf (struct Operation *op, | 615 | prepare_ibf (struct Operation *op, |
542 | uint32_t size) | 616 | uint32_t size) |
543 | { | 617 | { |
544 | if (NULL == op->state->key_to_element) | 618 | GNUNET_assert (NULL != op->state->key_to_element); |
545 | { | ||
546 | unsigned int len; | ||
547 | 619 | ||
548 | len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); | ||
549 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | ||
550 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | ||
551 | init_key_to_element_iterator, op); | ||
552 | } | ||
553 | if (NULL != op->state->local_ibf) | 620 | if (NULL != op->state->local_ibf) |
554 | ibf_destroy (op->state->local_ibf); | 621 | ibf_destroy (op->state->local_ibf); |
555 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 622 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
@@ -648,7 +715,7 @@ send_strata_estimator (struct Operation *op) | |||
648 | { | 715 | { |
649 | const struct StrataEstimator *se = op->state->se; | 716 | const struct StrataEstimator *se = op->state->se; |
650 | struct GNUNET_MQ_Envelope *ev; | 717 | struct GNUNET_MQ_Envelope *ev; |
651 | struct GNUNET_MessageHeader *strata_msg; | 718 | struct StrataEstimatorMessage *strata_msg; |
652 | char *buf; | 719 | char *buf; |
653 | size_t len; | 720 | size_t len; |
654 | uint16_t type; | 721 | uint16_t type; |
@@ -660,13 +727,14 @@ send_strata_estimator (struct Operation *op) | |||
660 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; | 727 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; |
661 | else | 728 | else |
662 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; | 729 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; |
663 | ev = GNUNET_MQ_msg_header_extra (strata_msg, | 730 | ev = GNUNET_MQ_msg_extra (strata_msg, |
664 | len, | 731 | len, |
665 | type); | 732 | type); |
666 | GNUNET_memcpy (&strata_msg[1], | 733 | GNUNET_memcpy (&strata_msg[1], |
667 | buf, | 734 | buf, |
668 | len); | 735 | len); |
669 | GNUNET_free (buf); | 736 | GNUNET_free (buf); |
737 | strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); | ||
670 | GNUNET_MQ_send (op->mq, | 738 | GNUNET_MQ_send (op->mq, |
671 | ev); | 739 | ev); |
672 | op->state->phase = PHASE_EXPECT_IBF; | 740 | op->state->phase = PHASE_EXPECT_IBF; |
@@ -698,6 +766,47 @@ get_order_from_difference (unsigned int diff) | |||
698 | 766 | ||
699 | 767 | ||
700 | /** | 768 | /** |
769 | * Send a set element. | ||
770 | * | ||
771 | * @param cls the union operation `struct Operation *` | ||
772 | * @param key unused | ||
773 | * @param value the `struct ElementEntry *` to insert | ||
774 | * into the key-to-element mapping | ||
775 | * @return #GNUNET_YES (to continue iterating) | ||
776 | */ | ||
777 | static int | ||
778 | send_element_iterator (void *cls, | ||
779 | const struct GNUNET_HashCode *key, | ||
780 | void *value) | ||
781 | { | ||
782 | struct Operation *op = cls; | ||
783 | struct GNUNET_SET_ElementMessage *emsg; | ||
784 | struct GNUNET_SET_Element *el = value; | ||
785 | struct GNUNET_MQ_Envelope *ev; | ||
786 | |||
787 | ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | ||
788 | emsg->element_type = htonl (el->element_type); | ||
789 | GNUNET_memcpy (&emsg[1], el->data, el->size); | ||
790 | GNUNET_MQ_send (op->mq, ev); | ||
791 | return GNUNET_YES; | ||
792 | } | ||
793 | |||
794 | |||
795 | static void | ||
796 | send_full_set (struct Operation *op) | ||
797 | { | ||
798 | struct GNUNET_MQ_Envelope *ev; | ||
799 | |||
800 | op->state->phase = PHASE_FULL_SENDING; | ||
801 | |||
802 | (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | ||
803 | &send_element_iterator, op); | ||
804 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | ||
805 | GNUNET_MQ_send (op->mq, ev); | ||
806 | } | ||
807 | |||
808 | |||
809 | /** | ||
701 | * Handle a strata estimator from a remote peer | 810 | * Handle a strata estimator from a remote peer |
702 | * | 811 | * |
703 | * @param cls the union operation | 812 | * @param cls the union operation |
@@ -713,7 +822,9 @@ handle_p2p_strata_estimator (void *cls, | |||
713 | { | 822 | { |
714 | struct Operation *op = cls; | 823 | struct Operation *op = cls; |
715 | struct StrataEstimator *remote_se; | 824 | struct StrataEstimator *remote_se; |
716 | int diff; | 825 | struct StrataEstimatorMessage *msg = (void *) mh; |
826 | unsigned int diff; | ||
827 | uint64_t other_size; | ||
717 | size_t len; | 828 | size_t len; |
718 | 829 | ||
719 | GNUNET_STATISTICS_update (_GSS_statistics, | 830 | GNUNET_STATISTICS_update (_GSS_statistics, |
@@ -723,11 +834,11 @@ handle_p2p_strata_estimator (void *cls, | |||
723 | 834 | ||
724 | if (op->state->phase != PHASE_EXPECT_SE) | 835 | if (op->state->phase != PHASE_EXPECT_SE) |
725 | { | 836 | { |
726 | fail_union_operation (op); | ||
727 | GNUNET_break (0); | 837 | GNUNET_break (0); |
838 | fail_union_operation (op); | ||
728 | return GNUNET_SYSERR; | 839 | return GNUNET_SYSERR; |
729 | } | 840 | } |
730 | len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); | 841 | len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage); |
731 | if ( (GNUNET_NO == is_compressed) && | 842 | if ( (GNUNET_NO == is_compressed) && |
732 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) | 843 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) |
733 | { | 844 | { |
@@ -735,6 +846,7 @@ handle_p2p_strata_estimator (void *cls, | |||
735 | GNUNET_break (0); | 846 | GNUNET_break (0); |
736 | return GNUNET_SYSERR; | 847 | return GNUNET_SYSERR; |
737 | } | 848 | } |
849 | other_size = GNUNET_ntohll (msg->set_size); | ||
738 | remote_se = strata_estimator_create (SE_STRATA_COUNT, | 850 | remote_se = strata_estimator_create (SE_STRATA_COUNT, |
739 | SE_IBF_SIZE, | 851 | SE_IBF_SIZE, |
740 | SE_IBF_HASH_NUM); | 852 | SE_IBF_HASH_NUM); |
@@ -745,7 +857,7 @@ handle_p2p_strata_estimator (void *cls, | |||
745 | return GNUNET_SYSERR; | 857 | return GNUNET_SYSERR; |
746 | } | 858 | } |
747 | if (GNUNET_OK != | 859 | if (GNUNET_OK != |
748 | strata_estimator_read (&mh[1], | 860 | strata_estimator_read (&msg[1], |
749 | len, | 861 | len, |
750 | is_compressed, | 862 | is_compressed, |
751 | remote_se)) | 863 | remote_se)) |
@@ -765,16 +877,46 @@ handle_p2p_strata_estimator (void *cls, | |||
765 | "got se diff=%d, using ibf size %d\n", | 877 | "got se diff=%d, using ibf size %d\n", |
766 | diff, | 878 | diff, |
767 | 1<<get_order_from_difference (diff)); | 879 | 1<<get_order_from_difference (diff)); |
768 | if (GNUNET_OK != | 880 | |
769 | send_ibf (op, | 881 | if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound)) |
770 | get_order_from_difference (diff))) | ||
771 | { | 882 | { |
772 | /* Internal error, best we can do is shut the connection */ | 883 | GNUNET_break (0); |
773 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
774 | "Failed to send IBF, closing connection\n"); | ||
775 | fail_union_operation (op); | 884 | fail_union_operation (op); |
776 | return GNUNET_SYSERR; | 885 | return GNUNET_SYSERR; |
777 | } | 886 | } |
887 | |||
888 | |||
889 | if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2)) | ||
890 | { | ||
891 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
892 | "Sending full set (diff=%d, own set=%u)\n", | ||
893 | diff, | ||
894 | op->state->initial_size); | ||
895 | if (op->state->initial_size <= other_size) | ||
896 | { | ||
897 | send_full_set (op); | ||
898 | } | ||
899 | else | ||
900 | { | ||
901 | struct GNUNET_MQ_Envelope *ev; | ||
902 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); | ||
903 | GNUNET_MQ_send (op->mq, ev); | ||
904 | } | ||
905 | } | ||
906 | else | ||
907 | { | ||
908 | if (GNUNET_OK != | ||
909 | send_ibf (op, | ||
910 | get_order_from_difference (diff))) | ||
911 | { | ||
912 | /* Internal error, best we can do is shut the connection */ | ||
913 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
914 | "Failed to send IBF, closing connection\n"); | ||
915 | fail_union_operation (op); | ||
916 | return GNUNET_SYSERR; | ||
917 | } | ||
918 | } | ||
919 | |||
778 | return GNUNET_OK; | 920 | return GNUNET_OK; |
779 | } | 921 | } |
780 | 922 | ||
@@ -1210,6 +1352,8 @@ maybe_finish (struct Operation *op) | |||
1210 | 1352 | ||
1211 | /** | 1353 | /** |
1212 | * Handle an element message from a remote peer. | 1354 | * Handle an element message from a remote peer. |
1355 | * Sent by the other peer either because we decoded an IBF and placed a demand, | ||
1356 | * or because the other peer switched to full set transmission. | ||
1213 | * | 1357 | * |
1214 | * @param cls the union operation | 1358 | * @param cls the union operation |
1215 | * @param mh the message | 1359 | * @param mh the message |
@@ -1273,7 +1417,11 @@ handle_p2p_elements (void *cls, | |||
1273 | 1, | 1417 | 1, |
1274 | GNUNET_NO); | 1418 | GNUNET_NO); |
1275 | 1419 | ||
1276 | if (GNUNET_YES == op_has_element (op, &ee->element_hash)) | 1420 | op->state->received_total += 1; |
1421 | |||
1422 | struct KeyEntry *ke = op_get_element (op, &ee->element_hash); | ||
1423 | |||
1424 | if (NULL != ke) | ||
1277 | { | 1425 | { |
1278 | /* Got repeated element. Should not happen since | 1426 | /* Got repeated element. Should not happen since |
1279 | * we track demands. */ | 1427 | * we track demands. */ |
@@ -1281,13 +1429,15 @@ handle_p2p_elements (void *cls, | |||
1281 | "# repeated elements", | 1429 | "# repeated elements", |
1282 | 1, | 1430 | 1, |
1283 | GNUNET_NO); | 1431 | GNUNET_NO); |
1432 | ke->received = GNUNET_YES; | ||
1284 | GNUNET_free (ee); | 1433 | GNUNET_free (ee); |
1285 | } | 1434 | } |
1286 | else | 1435 | else |
1287 | { | 1436 | { |
1288 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1437 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1289 | "Registering new element from remote peer\n"); | 1438 | "Registering new element from remote peer\n"); |
1290 | op_register_element (op, ee); | 1439 | op->state->received_fresh += 1; |
1440 | op_register_element (op, ee, GNUNET_YES); | ||
1291 | /* only send results immediately if the client wants it */ | 1441 | /* only send results immediately if the client wants it */ |
1292 | switch (op->spec->result_mode) | 1442 | switch (op->spec->result_mode) |
1293 | { | 1443 | { |
@@ -1304,11 +1454,112 @@ handle_p2p_elements (void *cls, | |||
1304 | } | 1454 | } |
1305 | } | 1455 | } |
1306 | 1456 | ||
1457 | if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) | ||
1458 | { | ||
1459 | /* The other peer gave us lots of old elements, there's something wrong. */ | ||
1460 | GNUNET_break_op (0); | ||
1461 | fail_union_operation (op); | ||
1462 | return; | ||
1463 | } | ||
1464 | |||
1307 | maybe_finish (op); | 1465 | maybe_finish (op); |
1308 | } | 1466 | } |
1309 | 1467 | ||
1310 | 1468 | ||
1311 | /** | 1469 | /** |
1470 | * Handle an element message from a remote peer. | ||
1471 | * | ||
1472 | * @param cls the union operation | ||
1473 | * @param mh the message | ||
1474 | */ | ||
1475 | static void | ||
1476 | handle_p2p_full_element (void *cls, | ||
1477 | const struct GNUNET_MessageHeader *mh) | ||
1478 | { | ||
1479 | struct Operation *op = cls; | ||
1480 | struct ElementEntry *ee; | ||
1481 | const struct GNUNET_SET_ElementMessage *emsg; | ||
1482 | uint16_t element_size; | ||
1483 | |||
1484 | if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) | ||
1485 | { | ||
1486 | GNUNET_break_op (0); | ||
1487 | fail_union_operation (op); | ||
1488 | return; | ||
1489 | } | ||
1490 | |||
1491 | emsg = (const struct GNUNET_SET_ElementMessage *) mh; | ||
1492 | |||
1493 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); | ||
1494 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); | ||
1495 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); | ||
1496 | ee->element.size = element_size; | ||
1497 | ee->element.data = &ee[1]; | ||
1498 | ee->element.element_type = ntohs (emsg->element_type); | ||
1499 | ee->remote = GNUNET_YES; | ||
1500 | GNUNET_SET_element_hash (&ee->element, &ee->element_hash); | ||
1501 | |||
1502 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1503 | "Got element (full diff, size %u, hash %s) from peer\n", | ||
1504 | (unsigned int) element_size, | ||
1505 | GNUNET_h2s (&ee->element_hash)); | ||
1506 | |||
1507 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1508 | "# received elements", | ||
1509 | 1, | ||
1510 | GNUNET_NO); | ||
1511 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1512 | "# exchanged elements", | ||
1513 | 1, | ||
1514 | GNUNET_NO); | ||
1515 | |||
1516 | op->state->received_total += 1; | ||
1517 | |||
1518 | struct KeyEntry *ke = op_get_element (op, &ee->element_hash); | ||
1519 | |||
1520 | if (NULL != ke) | ||
1521 | { | ||
1522 | /* Got repeated element. Should not happen since | ||
1523 | * we track demands. */ | ||
1524 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1525 | "# repeated elements", | ||
1526 | 1, | ||
1527 | GNUNET_NO); | ||
1528 | ke->received = GNUNET_YES; | ||
1529 | GNUNET_free (ee); | ||
1530 | } | ||
1531 | else | ||
1532 | { | ||
1533 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1534 | "Registering new element from remote peer\n"); | ||
1535 | op->state->received_fresh += 1; | ||
1536 | op_register_element (op, ee, GNUNET_YES); | ||
1537 | /* only send results immediately if the client wants it */ | ||
1538 | switch (op->spec->result_mode) | ||
1539 | { | ||
1540 | case GNUNET_SET_RESULT_ADDED: | ||
1541 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); | ||
1542 | break; | ||
1543 | case GNUNET_SET_RESULT_SYMMETRIC: | ||
1544 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); | ||
1545 | break; | ||
1546 | default: | ||
1547 | /* Result mode not supported, should have been caught earlier. */ | ||
1548 | GNUNET_break (0); | ||
1549 | break; | ||
1550 | } | ||
1551 | } | ||
1552 | |||
1553 | if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) | ||
1554 | { | ||
1555 | /* The other peer gave us lots of old elements, there's something wrong. */ | ||
1556 | GNUNET_break_op (0); | ||
1557 | fail_union_operation (op); | ||
1558 | return; | ||
1559 | } | ||
1560 | } | ||
1561 | |||
1562 | /** | ||
1312 | * Send offers (for GNUNET_Hash-es) in response | 1563 | * Send offers (for GNUNET_Hash-es) in response |
1313 | * to inquiries (for IBF_Key-s). | 1564 | * to inquiries (for IBF_Key-s). |
1314 | * | 1565 | * |
@@ -1355,7 +1606,116 @@ handle_p2p_inquiry (void *cls, | |||
1355 | 1606 | ||
1356 | 1607 | ||
1357 | /** | 1608 | /** |
1358 | * FIXME | 1609 | * Iterator over hash map entries, called to |
1610 | * destroy the linked list of colliding ibf key entries. | ||
1611 | * | ||
1612 | * @param cls closure | ||
1613 | * @param key current key code | ||
1614 | * @param value value in the hash map | ||
1615 | * @return #GNUNET_YES if we should continue to iterate, | ||
1616 | * #GNUNET_NO if not. | ||
1617 | */ | ||
1618 | static int | ||
1619 | send_missing_elements_iter (void *cls, | ||
1620 | uint32_t key, | ||
1621 | void *value) | ||
1622 | { | ||
1623 | struct Operation *op = cls; | ||
1624 | struct KeyEntry *ke = value; | ||
1625 | struct GNUNET_MQ_Envelope *ev; | ||
1626 | struct GNUNET_SET_ElementMessage *emsg; | ||
1627 | struct ElementEntry *ee = ke->element; | ||
1628 | |||
1629 | if (GNUNET_YES == ke->received) | ||
1630 | return GNUNET_YES; | ||
1631 | |||
1632 | ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | ||
1633 | GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); | ||
1634 | emsg->reserved = htons (0); | ||
1635 | emsg->element_type = htons (ee->element.element_type); | ||
1636 | GNUNET_MQ_send (op->mq, ev); | ||
1637 | |||
1638 | return GNUNET_YES; | ||
1639 | } | ||
1640 | |||
1641 | |||
1642 | /** | ||
1643 | * Handle a | ||
1644 | * | ||
1645 | * @parem cls closure, a set union operation | ||
1646 | * @param mh the demand message | ||
1647 | */ | ||
1648 | static void | ||
1649 | handle_p2p_request_full (void *cls, | ||
1650 | const struct GNUNET_MessageHeader *mh) | ||
1651 | { | ||
1652 | struct Operation *op = cls; | ||
1653 | |||
1654 | if (PHASE_EXPECT_IBF != op->state->phase) | ||
1655 | { | ||
1656 | fail_union_operation (op); | ||
1657 | GNUNET_break_op (0); | ||
1658 | return; | ||
1659 | } | ||
1660 | |||
1661 | // FIXME: we need to check that our set is larger than the | ||
1662 | // byzantine_lower_bound by some threshold | ||
1663 | send_full_set (op); | ||
1664 | } | ||
1665 | |||
1666 | |||
1667 | /** | ||
1668 | * Handle a "full done" message. | ||
1669 | * | ||
1670 | * @parem cls closure, a set union operation | ||
1671 | * @param mh the demand message | ||
1672 | */ | ||
1673 | static void | ||
1674 | handle_p2p_full_done (void *cls, | ||
1675 | const struct GNUNET_MessageHeader *mh) | ||
1676 | { | ||
1677 | struct Operation *op = cls; | ||
1678 | |||
1679 | if (PHASE_EXPECT_IBF == op->state->phase) | ||
1680 | { | ||
1681 | struct GNUNET_MQ_Envelope *ev; | ||
1682 | |||
1683 | LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); | ||
1684 | |||
1685 | /* send all the elements that did not come from the remote peer */ | ||
1686 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, | ||
1687 | &send_missing_elements_iter, | ||
1688 | op); | ||
1689 | |||
1690 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | ||
1691 | GNUNET_MQ_send (op->mq, ev); | ||
1692 | op->state->phase = PHASE_DONE; | ||
1693 | |||
1694 | /* we now wait until the other peer shuts the tunnel down*/ | ||
1695 | } | ||
1696 | else if (PHASE_FULL_SENDING == op->state->phase) | ||
1697 | { | ||
1698 | LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); | ||
1699 | /* We sent the full set, and got the response for that. We're done. */ | ||
1700 | op->state->phase = PHASE_DONE; | ||
1701 | send_done_and_destroy (op); | ||
1702 | } | ||
1703 | else | ||
1704 | { | ||
1705 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase); | ||
1706 | GNUNET_break_op (0); | ||
1707 | fail_union_operation (op); | ||
1708 | return; | ||
1709 | } | ||
1710 | } | ||
1711 | |||
1712 | |||
1713 | /** | ||
1714 | * Handle a demand by the other peer for elements based on a list | ||
1715 | * of GNUNET_HashCode-s. | ||
1716 | * | ||
1717 | * @parem cls closure, a set union operation | ||
1718 | * @param mh the demand message | ||
1359 | */ | 1719 | */ |
1360 | static void | 1720 | static void |
1361 | handle_p2p_demand (void *cls, | 1721 | handle_p2p_demand (void *cls, |
@@ -1607,6 +1967,9 @@ union_evaluate (struct Operation *op, | |||
1607 | else | 1967 | else |
1608 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1968 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1609 | "sent op request without context message\n"); | 1969 | "sent op request without context message\n"); |
1970 | |||
1971 | op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); | ||
1972 | initialize_key_to_element (op); | ||
1610 | } | 1973 | } |
1611 | 1974 | ||
1612 | 1975 | ||
@@ -1636,6 +1999,8 @@ union_accept (struct Operation *op) | |||
1636 | op->state->se = strata_estimator_dup (op->spec->set->state->se); | 1999 | op->state->se = strata_estimator_dup (op->spec->set->state->se); |
1637 | op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); | 2000 | op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); |
1638 | op->state->salt_receive = op->state->salt_send = 42; | 2001 | op->state->salt_receive = op->state->salt_send = 42; |
2002 | op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); | ||
2003 | initialize_key_to_element (op); | ||
1639 | /* kick off the operation */ | 2004 | /* kick off the operation */ |
1640 | send_strata_estimator (op); | 2005 | send_strata_estimator (op); |
1641 | } | 2006 | } |
@@ -1743,6 +2108,9 @@ union_handle_p2p_message (struct Operation *op, | |||
1743 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | 2108 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: |
1744 | handle_p2p_elements (op, mh); | 2109 | handle_p2p_elements (op, mh); |
1745 | break; | 2110 | break; |
2111 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT: | ||
2112 | handle_p2p_full_element (op, mh); | ||
2113 | break; | ||
1746 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: | 2114 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: |
1747 | handle_p2p_inquiry (op, mh); | 2115 | handle_p2p_inquiry (op, mh); |
1748 | break; | 2116 | break; |
@@ -1755,6 +2123,12 @@ union_handle_p2p_message (struct Operation *op, | |||
1755 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: | 2123 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: |
1756 | handle_p2p_demand (op, mh); | 2124 | handle_p2p_demand (op, mh); |
1757 | break; | 2125 | break; |
2126 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE: | ||
2127 | handle_p2p_full_done (op, mh); | ||
2128 | break; | ||
2129 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL: | ||
2130 | handle_p2p_request_full (op, mh); | ||
2131 | break; | ||
1758 | default: | 2132 | default: |
1759 | /* Something wrong with cadet's message handlers? */ | 2133 | /* Something wrong with cadet's message handlers? */ |
1760 | GNUNET_assert (0); | 2134 | GNUNET_assert (0); |