diff options
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r-- | src/set/gnunet-service-set_union.c | 482 |
1 files changed, 440 insertions, 42 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index acaabd94a..f46713c31 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -85,6 +85,7 @@ enum UnionOperationPhase | |||
85 | * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. | 85 | * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. |
86 | * | 86 | * |
87 | * XXX: could use better wording. | 87 | * XXX: could use better wording. |
88 | * XXX: repurposed to also expect a "request full set" message, should be renamed | ||
88 | * | 89 | * |
89 | * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS | 90 | * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS |
90 | */ | 91 | */ |
@@ -115,14 +116,22 @@ enum UnionOperationPhase | |||
115 | * In the penultimate phase, | 116 | * In the penultimate phase, |
116 | * we wait until all our demands | 117 | * we wait until all our demands |
117 | * are satisfied. Then we send a done | 118 | * are satisfied. Then we send a done |
118 | * message, and wait for another done message.*/ | 119 | * message, and wait for another done message. |
120 | */ | ||
119 | PHASE_FINISH_WAITING, | 121 | PHASE_FINISH_WAITING, |
120 | 122 | ||
121 | /** | 123 | /** |
122 | * In the ultimate phase, we wait until | 124 | * In the ultimate phase, we wait until |
123 | * our demands are satisfied and then | 125 | * our demands are satisfied and then |
124 | * quit (sending another DONE message). */ | 126 | * quit (sending another DONE message). |
125 | PHASE_DONE | 127 | */ |
128 | PHASE_DONE, | ||
129 | |||
130 | /** | ||
131 | * After sending the full set, wait for responses with the elements | ||
132 | * that the local peer is missing. | ||
133 | */ | ||
134 | PHASE_FULL_SENDING, | ||
126 | }; | 135 | }; |
127 | 136 | ||
128 | 137 | ||
@@ -148,7 +157,7 @@ struct OperationState | |||
148 | struct InvertibleBloomFilter *local_ibf; | 157 | struct InvertibleBloomFilter *local_ibf; |
149 | 158 | ||
150 | /** | 159 | /** |
151 | * Maps IBF-Keys (specific to the current salt) to elements. | 160 | * Maps unsalted IBF-Keys to elements. |
152 | * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. | 161 | * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. |
153 | * Colliding IBF-Keys are linked. | 162 | * Colliding IBF-Keys are linked. |
154 | */ | 163 | */ |
@@ -183,6 +192,23 @@ struct OperationState | |||
183 | * Salt for the IBF we've received and that we're currently decoding. | 192 | * Salt for the IBF we've received and that we're currently decoding. |
184 | */ | 193 | */ |
185 | uint32_t salt_receive; | 194 | uint32_t salt_receive; |
195 | |||
196 | /** | ||
197 | * Number of elements we received from the other peer | ||
198 | * that were not in the local set yet. | ||
199 | */ | ||
200 | uint32_t received_fresh; | ||
201 | |||
202 | /** | ||
203 | * Total number of elements received from the other peer. | ||
204 | */ | ||
205 | uint32_t received_total; | ||
206 | |||
207 | /** | ||
208 | * Initial size of our set, just before | ||
209 | * the operation started. | ||
210 | */ | ||
211 | uint64_t initial_size; | ||
186 | }; | 212 | }; |
187 | 213 | ||
188 | 214 | ||
@@ -203,6 +229,14 @@ struct KeyEntry | |||
203 | * is #GNUNET_YES. | 229 | * is #GNUNET_YES. |
204 | */ | 230 | */ |
205 | struct ElementEntry *element; | 231 | struct ElementEntry *element; |
232 | |||
233 | /** | ||
234 | * Did we receive this element? | ||
235 | * Even if element->is_foreign is false, we might | ||
236 | * have received the element, so this indicates that | ||
237 | * the other peer has it. | ||
238 | */ | ||
239 | int received; | ||
206 | }; | 240 | }; |
207 | 241 | ||
208 | 242 | ||
@@ -362,6 +396,16 @@ get_ibf_key (const struct GNUNET_HashCode *src) | |||
362 | 396 | ||
363 | 397 | ||
364 | /** | 398 | /** |
399 | * Context for #op_get_element_iterator | ||
400 | */ | ||
401 | struct GetElementContext | ||
402 | { | ||
403 | struct GNUNET_HashCode hash; | ||
404 | struct KeyEntry *k; | ||
405 | }; | ||
406 | |||
407 | |||
408 | /** | ||
365 | * Iterator over the mapping from IBF keys to element entries. Checks if we | 409 | * Iterator over the mapping from IBF keys to element entries. Checks if we |
366 | * have an element with a given GNUNET_HashCode. | 410 | * have an element with a given GNUNET_HashCode. |
367 | * | 411 | * |
@@ -372,17 +416,20 @@ get_ibf_key (const struct GNUNET_HashCode *src) | |||
372 | * #GNUNET_NO if we've found the element. | 416 | * #GNUNET_NO if we've found the element. |
373 | */ | 417 | */ |
374 | static int | 418 | static int |
375 | op_has_element_iterator (void *cls, | 419 | op_get_element_iterator (void *cls, |
376 | uint32_t key, | 420 | uint32_t key, |
377 | void *value) | 421 | void *value) |
378 | { | 422 | { |
379 | struct GNUNET_HashCode *element_hash = cls; | 423 | struct GetElementContext *ctx = cls; |
380 | struct KeyEntry *k = value; | 424 | struct KeyEntry *k = value; |
381 | 425 | ||
382 | GNUNET_assert (NULL != k); | 426 | GNUNET_assert (NULL != k); |
383 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, | 427 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, |
384 | element_hash)) | 428 | &ctx->hash)) |
429 | { | ||
430 | ctx->k = k; | ||
385 | return GNUNET_NO; | 431 | return GNUNET_NO; |
432 | } | ||
386 | return GNUNET_YES; | 433 | return GNUNET_YES; |
387 | } | 434 | } |
388 | 435 | ||
@@ -395,23 +442,29 @@ op_has_element_iterator (void *cls, | |||
395 | * @param element_hash hash of the element to look for | 442 | * @param element_hash hash of the element to look for |
396 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise | 443 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise |
397 | */ | 444 | */ |
398 | static int | 445 | static struct KeyEntry * |
399 | op_has_element (struct Operation *op, | 446 | op_get_element (struct Operation *op, |
400 | const struct GNUNET_HashCode *element_hash) | 447 | const struct GNUNET_HashCode *element_hash) |
401 | { | 448 | { |
402 | int ret; | 449 | int ret; |
403 | struct IBF_Key ibf_key; | 450 | struct IBF_Key ibf_key; |
451 | struct GetElementContext ctx = {{{ 0 }} , 0}; | ||
452 | |||
453 | ctx.hash = *element_hash; | ||
404 | 454 | ||
405 | ibf_key = get_ibf_key (element_hash); | 455 | ibf_key = get_ibf_key (element_hash); |
406 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, | 456 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, |
407 | (uint32_t) ibf_key.key_val, | 457 | (uint32_t) ibf_key.key_val, |
408 | op_has_element_iterator, | 458 | op_get_element_iterator, |
409 | (void *) element_hash); | 459 | &ctx); |
410 | 460 | ||
411 | /* was the iteration aborted because we found the element? */ | 461 | /* was the iteration aborted because we found the element? */ |
412 | if (GNUNET_SYSERR == ret) | 462 | if (GNUNET_SYSERR == ret) |
413 | return GNUNET_YES; | 463 | { |
414 | return GNUNET_NO; | 464 | GNUNET_assert (NULL != ctx.k); |
465 | return ctx.k; | ||
466 | } | ||
467 | return NULL; | ||
415 | } | 468 | } |
416 | 469 | ||
417 | 470 | ||
@@ -427,10 +480,12 @@ op_has_element (struct Operation *op, | |||
427 | * | 480 | * |
428 | * @param op the union operation | 481 | * @param op the union operation |
429 | * @param ee the element entry | 482 | * @param ee the element entry |
483 | * @parem received was this element received from the remote peer? | ||
430 | */ | 484 | */ |
431 | static void | 485 | static void |
432 | op_register_element (struct Operation *op, | 486 | op_register_element (struct Operation *op, |
433 | struct ElementEntry *ee) | 487 | struct ElementEntry *ee, |
488 | int received) | ||
434 | { | 489 | { |
435 | struct IBF_Key ibf_key; | 490 | struct IBF_Key ibf_key; |
436 | struct KeyEntry *k; | 491 | struct KeyEntry *k; |
@@ -439,6 +494,7 @@ op_register_element (struct Operation *op, | |||
439 | k = GNUNET_new (struct KeyEntry); | 494 | k = GNUNET_new (struct KeyEntry); |
440 | k->element = ee; | 495 | k->element = ee; |
441 | k->ibf_key = ibf_key; | 496 | k->ibf_key = ibf_key; |
497 | k->received = received; | ||
442 | GNUNET_assert (GNUNET_OK == | 498 | GNUNET_assert (GNUNET_OK == |
443 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, | 499 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, |
444 | (uint32_t) ibf_key.key_val, | 500 | (uint32_t) ibf_key.key_val, |
@@ -524,12 +580,30 @@ init_key_to_element_iterator (void *cls, | |||
524 | 580 | ||
525 | GNUNET_assert (GNUNET_NO == ee->remote); | 581 | GNUNET_assert (GNUNET_NO == ee->remote); |
526 | 582 | ||
527 | op_register_element (op, ee); | 583 | op_register_element (op, ee, GNUNET_NO); |
528 | return GNUNET_YES; | 584 | return GNUNET_YES; |
529 | } | 585 | } |
530 | 586 | ||
531 | 587 | ||
532 | /** | 588 | /** |
589 | * Initialize the IBF key to element mapping local to this set | ||
590 | * operation. | ||
591 | * | ||
592 | * @param op the set union operation | ||
593 | */ | ||
594 | static void | ||
595 | initialize_key_to_element (struct Operation *op) | ||
596 | { | ||
597 | unsigned int len; | ||
598 | |||
599 | GNUNET_assert (NULL == op->state->key_to_element); | ||
600 | len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); | ||
601 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | ||
602 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); | ||
603 | } | ||
604 | |||
605 | |||
606 | /** | ||
533 | * Create an ibf with the operation's elements | 607 | * Create an ibf with the operation's elements |
534 | * of the specified size | 608 | * of the specified size |
535 | * | 609 | * |
@@ -541,15 +615,8 @@ static int | |||
541 | prepare_ibf (struct Operation *op, | 615 | prepare_ibf (struct Operation *op, |
542 | uint32_t size) | 616 | uint32_t size) |
543 | { | 617 | { |
544 | if (NULL == op->state->key_to_element) | 618 | GNUNET_assert (NULL != op->state->key_to_element); |
545 | { | ||
546 | unsigned int len; | ||
547 | 619 | ||
548 | len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); | ||
549 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | ||
550 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | ||
551 | init_key_to_element_iterator, op); | ||
552 | } | ||
553 | if (NULL != op->state->local_ibf) | 620 | if (NULL != op->state->local_ibf) |
554 | ibf_destroy (op->state->local_ibf); | 621 | ibf_destroy (op->state->local_ibf); |
555 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 622 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
@@ -648,7 +715,7 @@ send_strata_estimator (struct Operation *op) | |||
648 | { | 715 | { |
649 | const struct StrataEstimator *se = op->state->se; | 716 | const struct StrataEstimator *se = op->state->se; |
650 | struct GNUNET_MQ_Envelope *ev; | 717 | struct GNUNET_MQ_Envelope *ev; |
651 | struct GNUNET_MessageHeader *strata_msg; | 718 | struct StrataEstimatorMessage *strata_msg; |
652 | char *buf; | 719 | char *buf; |
653 | size_t len; | 720 | size_t len; |
654 | uint16_t type; | 721 | uint16_t type; |
@@ -660,13 +727,14 @@ send_strata_estimator (struct Operation *op) | |||
660 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; | 727 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; |
661 | else | 728 | else |
662 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; | 729 | type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; |
663 | ev = GNUNET_MQ_msg_header_extra (strata_msg, | 730 | ev = GNUNET_MQ_msg_extra (strata_msg, |
664 | len, | 731 | len, |
665 | type); | 732 | type); |
666 | GNUNET_memcpy (&strata_msg[1], | 733 | GNUNET_memcpy (&strata_msg[1], |
667 | buf, | 734 | buf, |
668 | len); | 735 | len); |
669 | GNUNET_free (buf); | 736 | GNUNET_free (buf); |
737 | strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); | ||
670 | GNUNET_MQ_send (op->mq, | 738 | GNUNET_MQ_send (op->mq, |
671 | ev); | 739 | ev); |
672 | op->state->phase = PHASE_EXPECT_IBF; | 740 | op->state->phase = PHASE_EXPECT_IBF; |
@@ -693,7 +761,51 @@ get_order_from_difference (unsigned int diff) | |||
693 | ibf_order++; | 761 | ibf_order++; |
694 | if (ibf_order > MAX_IBF_ORDER) | 762 | if (ibf_order > MAX_IBF_ORDER) |
695 | ibf_order = MAX_IBF_ORDER; | 763 | ibf_order = MAX_IBF_ORDER; |
696 | return ibf_order; | 764 | // add one for correction |
765 | return ibf_order + 1; | ||
766 | } | ||
767 | |||
768 | |||
769 | /** | ||
770 | * Send a set element. | ||
771 | * | ||
772 | * @param cls the union operation `struct Operation *` | ||
773 | * @param key unused | ||
774 | * @param value the `struct ElementEntry *` to insert | ||
775 | * into the key-to-element mapping | ||
776 | * @return #GNUNET_YES (to continue iterating) | ||
777 | */ | ||
778 | static int | ||
779 | send_element_iterator (void *cls, | ||
780 | const struct GNUNET_HashCode *key, | ||
781 | void *value) | ||
782 | { | ||
783 | struct Operation *op = cls; | ||
784 | struct GNUNET_SET_ElementMessage *emsg; | ||
785 | struct ElementEntry *ee = value; | ||
786 | struct GNUNET_SET_Element *el = &ee->element; | ||
787 | struct GNUNET_MQ_Envelope *ev; | ||
788 | |||
789 | |||
790 | ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | ||
791 | emsg->element_type = htons (el->element_type); | ||
792 | GNUNET_memcpy (&emsg[1], el->data, el->size); | ||
793 | GNUNET_MQ_send (op->mq, ev); | ||
794 | return GNUNET_YES; | ||
795 | } | ||
796 | |||
797 | |||
798 | static void | ||
799 | send_full_set (struct Operation *op) | ||
800 | { | ||
801 | struct GNUNET_MQ_Envelope *ev; | ||
802 | |||
803 | op->state->phase = PHASE_FULL_SENDING; | ||
804 | |||
805 | (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | ||
806 | &send_element_iterator, op); | ||
807 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | ||
808 | GNUNET_MQ_send (op->mq, ev); | ||
697 | } | 809 | } |
698 | 810 | ||
699 | 811 | ||
@@ -713,7 +825,9 @@ handle_p2p_strata_estimator (void *cls, | |||
713 | { | 825 | { |
714 | struct Operation *op = cls; | 826 | struct Operation *op = cls; |
715 | struct StrataEstimator *remote_se; | 827 | struct StrataEstimator *remote_se; |
716 | int diff; | 828 | struct StrataEstimatorMessage *msg = (void *) mh; |
829 | unsigned int diff; | ||
830 | uint64_t other_size; | ||
717 | size_t len; | 831 | size_t len; |
718 | 832 | ||
719 | GNUNET_STATISTICS_update (_GSS_statistics, | 833 | GNUNET_STATISTICS_update (_GSS_statistics, |
@@ -723,11 +837,11 @@ handle_p2p_strata_estimator (void *cls, | |||
723 | 837 | ||
724 | if (op->state->phase != PHASE_EXPECT_SE) | 838 | if (op->state->phase != PHASE_EXPECT_SE) |
725 | { | 839 | { |
726 | fail_union_operation (op); | ||
727 | GNUNET_break (0); | 840 | GNUNET_break (0); |
841 | fail_union_operation (op); | ||
728 | return GNUNET_SYSERR; | 842 | return GNUNET_SYSERR; |
729 | } | 843 | } |
730 | len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); | 844 | len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage); |
731 | if ( (GNUNET_NO == is_compressed) && | 845 | if ( (GNUNET_NO == is_compressed) && |
732 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) | 846 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) |
733 | { | 847 | { |
@@ -735,6 +849,7 @@ handle_p2p_strata_estimator (void *cls, | |||
735 | GNUNET_break (0); | 849 | GNUNET_break (0); |
736 | return GNUNET_SYSERR; | 850 | return GNUNET_SYSERR; |
737 | } | 851 | } |
852 | other_size = GNUNET_ntohll (msg->set_size); | ||
738 | remote_se = strata_estimator_create (SE_STRATA_COUNT, | 853 | remote_se = strata_estimator_create (SE_STRATA_COUNT, |
739 | SE_IBF_SIZE, | 854 | SE_IBF_SIZE, |
740 | SE_IBF_HASH_NUM); | 855 | SE_IBF_HASH_NUM); |
@@ -745,7 +860,7 @@ handle_p2p_strata_estimator (void *cls, | |||
745 | return GNUNET_SYSERR; | 860 | return GNUNET_SYSERR; |
746 | } | 861 | } |
747 | if (GNUNET_OK != | 862 | if (GNUNET_OK != |
748 | strata_estimator_read (&mh[1], | 863 | strata_estimator_read (&msg[1], |
749 | len, | 864 | len, |
750 | is_compressed, | 865 | is_compressed, |
751 | remote_se)) | 866 | remote_se)) |
@@ -758,6 +873,10 @@ handle_p2p_strata_estimator (void *cls, | |||
758 | GNUNET_assert (NULL != op->state->se); | 873 | GNUNET_assert (NULL != op->state->se); |
759 | diff = strata_estimator_difference (remote_se, | 874 | diff = strata_estimator_difference (remote_se, |
760 | op->state->se); | 875 | op->state->se); |
876 | |||
877 | if (diff > 200) | ||
878 | diff = diff * 3 / 2; | ||
879 | |||
761 | strata_estimator_destroy (remote_se); | 880 | strata_estimator_destroy (remote_se); |
762 | strata_estimator_destroy (op->state->se); | 881 | strata_estimator_destroy (op->state->se); |
763 | op->state->se = NULL; | 882 | op->state->se = NULL; |
@@ -765,16 +884,55 @@ handle_p2p_strata_estimator (void *cls, | |||
765 | "got se diff=%d, using ibf size %d\n", | 884 | "got se diff=%d, using ibf size %d\n", |
766 | diff, | 885 | diff, |
767 | 1<<get_order_from_difference (diff)); | 886 | 1<<get_order_from_difference (diff)); |
768 | if (GNUNET_OK != | 887 | |
769 | send_ibf (op, | 888 | if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound)) |
770 | get_order_from_difference (diff))) | ||
771 | { | 889 | { |
772 | /* Internal error, best we can do is shut the connection */ | 890 | GNUNET_break (0); |
773 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
774 | "Failed to send IBF, closing connection\n"); | ||
775 | fail_union_operation (op); | 891 | fail_union_operation (op); |
776 | return GNUNET_SYSERR; | 892 | return GNUNET_SYSERR; |
777 | } | 893 | } |
894 | |||
895 | |||
896 | if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4)) | ||
897 | { | ||
898 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
899 | "Sending full set (diff=%d, own set=%u)\n", | ||
900 | diff, | ||
901 | op->state->initial_size); | ||
902 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
903 | "# of full sends", | ||
904 | 1, | ||
905 | GNUNET_NO); | ||
906 | if (op->state->initial_size <= other_size) | ||
907 | { | ||
908 | send_full_set (op); | ||
909 | } | ||
910 | else | ||
911 | { | ||
912 | struct GNUNET_MQ_Envelope *ev; | ||
913 | op->state->phase = PHASE_EXPECT_IBF; | ||
914 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); | ||
915 | GNUNET_MQ_send (op->mq, ev); | ||
916 | } | ||
917 | } | ||
918 | else | ||
919 | { | ||
920 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
921 | "# of ibf sends", | ||
922 | 1, | ||
923 | GNUNET_NO); | ||
924 | if (GNUNET_OK != | ||
925 | send_ibf (op, | ||
926 | get_order_from_difference (diff))) | ||
927 | { | ||
928 | /* Internal error, best we can do is shut the connection */ | ||
929 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
930 | "Failed to send IBF, closing connection\n"); | ||
931 | fail_union_operation (op); | ||
932 | return GNUNET_SYSERR; | ||
933 | } | ||
934 | } | ||
935 | |||
778 | return GNUNET_OK; | 936 | return GNUNET_OK; |
779 | } | 937 | } |
780 | 938 | ||
@@ -1141,7 +1299,8 @@ send_client_element (struct Operation *op, | |||
1141 | } | 1299 | } |
1142 | rm->result_status = htons (status); | 1300 | rm->result_status = htons (status); |
1143 | rm->request_id = htonl (op->spec->client_request_id); | 1301 | rm->request_id = htonl (op->spec->client_request_id); |
1144 | rm->element_type = element->element_type; | 1302 | rm->element_type = htons (element->element_type); |
1303 | rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); | ||
1145 | GNUNET_memcpy (&rm[1], element->data, element->size); | 1304 | GNUNET_memcpy (&rm[1], element->data, element->size); |
1146 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | 1305 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
1147 | } | 1306 | } |
@@ -1164,6 +1323,7 @@ send_done_and_destroy (void *cls) | |||
1164 | rm->request_id = htonl (op->spec->client_request_id); | 1323 | rm->request_id = htonl (op->spec->client_request_id); |
1165 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 1324 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1166 | rm->element_type = htons (0); | 1325 | rm->element_type = htons (0); |
1326 | rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); | ||
1167 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | 1327 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
1168 | /* Will also call the union-specific cancel function. */ | 1328 | /* Will also call the union-specific cancel function. */ |
1169 | _GSS_operation_destroy (op, GNUNET_YES); | 1329 | _GSS_operation_destroy (op, GNUNET_YES); |
@@ -1210,6 +1370,8 @@ maybe_finish (struct Operation *op) | |||
1210 | 1370 | ||
1211 | /** | 1371 | /** |
1212 | * Handle an element message from a remote peer. | 1372 | * Handle an element message from a remote peer. |
1373 | * Sent by the other peer either because we decoded an IBF and placed a demand, | ||
1374 | * or because the other peer switched to full set transmission. | ||
1213 | * | 1375 | * |
1214 | * @param cls the union operation | 1376 | * @param cls the union operation |
1215 | * @param mh the message | 1377 | * @param mh the message |
@@ -1273,7 +1435,11 @@ handle_p2p_elements (void *cls, | |||
1273 | 1, | 1435 | 1, |
1274 | GNUNET_NO); | 1436 | GNUNET_NO); |
1275 | 1437 | ||
1276 | if (GNUNET_YES == op_has_element (op, &ee->element_hash)) | 1438 | op->state->received_total += 1; |
1439 | |||
1440 | struct KeyEntry *ke = op_get_element (op, &ee->element_hash); | ||
1441 | |||
1442 | if (NULL != ke) | ||
1277 | { | 1443 | { |
1278 | /* Got repeated element. Should not happen since | 1444 | /* Got repeated element. Should not happen since |
1279 | * we track demands. */ | 1445 | * we track demands. */ |
@@ -1281,13 +1447,15 @@ handle_p2p_elements (void *cls, | |||
1281 | "# repeated elements", | 1447 | "# repeated elements", |
1282 | 1, | 1448 | 1, |
1283 | GNUNET_NO); | 1449 | GNUNET_NO); |
1450 | ke->received = GNUNET_YES; | ||
1284 | GNUNET_free (ee); | 1451 | GNUNET_free (ee); |
1285 | } | 1452 | } |
1286 | else | 1453 | else |
1287 | { | 1454 | { |
1288 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1455 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1289 | "Registering new element from remote peer\n"); | 1456 | "Registering new element from remote peer\n"); |
1290 | op_register_element (op, ee); | 1457 | op->state->received_fresh += 1; |
1458 | op_register_element (op, ee, GNUNET_YES); | ||
1291 | /* only send results immediately if the client wants it */ | 1459 | /* only send results immediately if the client wants it */ |
1292 | switch (op->spec->result_mode) | 1460 | switch (op->spec->result_mode) |
1293 | { | 1461 | { |
@@ -1304,11 +1472,118 @@ handle_p2p_elements (void *cls, | |||
1304 | } | 1472 | } |
1305 | } | 1473 | } |
1306 | 1474 | ||
1475 | if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) | ||
1476 | { | ||
1477 | /* The other peer gave us lots of old elements, there's something wrong. */ | ||
1478 | GNUNET_break_op (0); | ||
1479 | fail_union_operation (op); | ||
1480 | return; | ||
1481 | } | ||
1482 | |||
1307 | maybe_finish (op); | 1483 | maybe_finish (op); |
1308 | } | 1484 | } |
1309 | 1485 | ||
1310 | 1486 | ||
1311 | /** | 1487 | /** |
1488 | * Handle an element message from a remote peer. | ||
1489 | * | ||
1490 | * @param cls the union operation | ||
1491 | * @param mh the message | ||
1492 | */ | ||
1493 | static void | ||
1494 | handle_p2p_full_element (void *cls, | ||
1495 | const struct GNUNET_MessageHeader *mh) | ||
1496 | { | ||
1497 | struct Operation *op = cls; | ||
1498 | struct ElementEntry *ee; | ||
1499 | const struct GNUNET_SET_ElementMessage *emsg; | ||
1500 | uint16_t element_size; | ||
1501 | |||
1502 | if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) | ||
1503 | { | ||
1504 | GNUNET_break_op (0); | ||
1505 | fail_union_operation (op); | ||
1506 | return; | ||
1507 | } | ||
1508 | |||
1509 | emsg = (const struct GNUNET_SET_ElementMessage *) mh; | ||
1510 | |||
1511 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); | ||
1512 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); | ||
1513 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); | ||
1514 | ee->element.size = element_size; | ||
1515 | ee->element.data = &ee[1]; | ||
1516 | ee->element.element_type = ntohs (emsg->element_type); | ||
1517 | ee->remote = GNUNET_YES; | ||
1518 | GNUNET_SET_element_hash (&ee->element, &ee->element_hash); | ||
1519 | |||
1520 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1521 | "Got element (full diff, size %u, hash %s) from peer\n", | ||
1522 | (unsigned int) element_size, | ||
1523 | GNUNET_h2s (&ee->element_hash)); | ||
1524 | |||
1525 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1526 | "# received elements", | ||
1527 | 1, | ||
1528 | GNUNET_NO); | ||
1529 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1530 | "# exchanged elements", | ||
1531 | 1, | ||
1532 | GNUNET_NO); | ||
1533 | |||
1534 | op->state->received_total += 1; | ||
1535 | |||
1536 | struct KeyEntry *ke = op_get_element (op, &ee->element_hash); | ||
1537 | |||
1538 | if (NULL != ke) | ||
1539 | { | ||
1540 | /* Got repeated element. Should not happen since | ||
1541 | * we track demands. */ | ||
1542 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1543 | "# repeated elements", | ||
1544 | 1, | ||
1545 | GNUNET_NO); | ||
1546 | ke->received = GNUNET_YES; | ||
1547 | GNUNET_free (ee); | ||
1548 | } | ||
1549 | else | ||
1550 | { | ||
1551 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1552 | "Registering new element from remote peer\n"); | ||
1553 | op->state->received_fresh += 1; | ||
1554 | op_register_element (op, ee, GNUNET_YES); | ||
1555 | /* only send results immediately if the client wants it */ | ||
1556 | switch (op->spec->result_mode) | ||
1557 | { | ||
1558 | case GNUNET_SET_RESULT_ADDED: | ||
1559 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); | ||
1560 | break; | ||
1561 | case GNUNET_SET_RESULT_SYMMETRIC: | ||
1562 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); | ||
1563 | break; | ||
1564 | default: | ||
1565 | /* Result mode not supported, should have been caught earlier. */ | ||
1566 | GNUNET_break (0); | ||
1567 | break; | ||
1568 | } | ||
1569 | } | ||
1570 | |||
1571 | if ( (GNUNET_YES == op->spec->byzantine) && | ||
1572 | (op->state->received_total > 384 + op->state->received_fresh * 4) && | ||
1573 | (op->state->received_fresh < op->state->received_total / 6) ) | ||
1574 | { | ||
1575 | /* The other peer gave us lots of old elements, there's something wrong. */ | ||
1576 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1577 | "Other peer sent only %llu/%llu fresh elements, failing operation\n", | ||
1578 | (unsigned long long) op->state->received_fresh, | ||
1579 | (unsigned long long) op->state->received_total); | ||
1580 | GNUNET_break_op (0); | ||
1581 | fail_union_operation (op); | ||
1582 | return; | ||
1583 | } | ||
1584 | } | ||
1585 | |||
1586 | /** | ||
1312 | * Send offers (for GNUNET_Hash-es) in response | 1587 | * Send offers (for GNUNET_Hash-es) in response |
1313 | * to inquiries (for IBF_Key-s). | 1588 | * to inquiries (for IBF_Key-s). |
1314 | * | 1589 | * |
@@ -1355,7 +1630,116 @@ handle_p2p_inquiry (void *cls, | |||
1355 | 1630 | ||
1356 | 1631 | ||
1357 | /** | 1632 | /** |
1358 | * FIXME | 1633 | * Iterator over hash map entries, called to |
1634 | * destroy the linked list of colliding ibf key entries. | ||
1635 | * | ||
1636 | * @param cls closure | ||
1637 | * @param key current key code | ||
1638 | * @param value value in the hash map | ||
1639 | * @return #GNUNET_YES if we should continue to iterate, | ||
1640 | * #GNUNET_NO if not. | ||
1641 | */ | ||
1642 | static int | ||
1643 | send_missing_elements_iter (void *cls, | ||
1644 | uint32_t key, | ||
1645 | void *value) | ||
1646 | { | ||
1647 | struct Operation *op = cls; | ||
1648 | struct KeyEntry *ke = value; | ||
1649 | struct GNUNET_MQ_Envelope *ev; | ||
1650 | struct GNUNET_SET_ElementMessage *emsg; | ||
1651 | struct ElementEntry *ee = ke->element; | ||
1652 | |||
1653 | if (GNUNET_YES == ke->received) | ||
1654 | return GNUNET_YES; | ||
1655 | |||
1656 | ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | ||
1657 | GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); | ||
1658 | emsg->reserved = htons (0); | ||
1659 | emsg->element_type = htons (ee->element.element_type); | ||
1660 | GNUNET_MQ_send (op->mq, ev); | ||
1661 | |||
1662 | return GNUNET_YES; | ||
1663 | } | ||
1664 | |||
1665 | |||
1666 | /** | ||
1667 | * Handle a | ||
1668 | * | ||
1669 | * @parem cls closure, a set union operation | ||
1670 | * @param mh the demand message | ||
1671 | */ | ||
1672 | static void | ||
1673 | handle_p2p_request_full (void *cls, | ||
1674 | const struct GNUNET_MessageHeader *mh) | ||
1675 | { | ||
1676 | struct Operation *op = cls; | ||
1677 | |||
1678 | if (PHASE_EXPECT_IBF != op->state->phase) | ||
1679 | { | ||
1680 | fail_union_operation (op); | ||
1681 | GNUNET_break_op (0); | ||
1682 | return; | ||
1683 | } | ||
1684 | |||
1685 | // FIXME: we need to check that our set is larger than the | ||
1686 | // byzantine_lower_bound by some threshold | ||
1687 | send_full_set (op); | ||
1688 | } | ||
1689 | |||
1690 | |||
1691 | /** | ||
1692 | * Handle a "full done" message. | ||
1693 | * | ||
1694 | * @parem cls closure, a set union operation | ||
1695 | * @param mh the demand message | ||
1696 | */ | ||
1697 | static void | ||
1698 | handle_p2p_full_done (void *cls, | ||
1699 | const struct GNUNET_MessageHeader *mh) | ||
1700 | { | ||
1701 | struct Operation *op = cls; | ||
1702 | |||
1703 | if (PHASE_EXPECT_IBF == op->state->phase) | ||
1704 | { | ||
1705 | struct GNUNET_MQ_Envelope *ev; | ||
1706 | |||
1707 | LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); | ||
1708 | |||
1709 | /* send all the elements that did not come from the remote peer */ | ||
1710 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, | ||
1711 | &send_missing_elements_iter, | ||
1712 | op); | ||
1713 | |||
1714 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | ||
1715 | GNUNET_MQ_send (op->mq, ev); | ||
1716 | op->state->phase = PHASE_DONE; | ||
1717 | |||
1718 | /* we now wait until the other peer shuts the tunnel down*/ | ||
1719 | } | ||
1720 | else if (PHASE_FULL_SENDING == op->state->phase) | ||
1721 | { | ||
1722 | LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); | ||
1723 | /* We sent the full set, and got the response for that. We're done. */ | ||
1724 | op->state->phase = PHASE_DONE; | ||
1725 | send_done_and_destroy (op); | ||
1726 | } | ||
1727 | else | ||
1728 | { | ||
1729 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase); | ||
1730 | GNUNET_break_op (0); | ||
1731 | fail_union_operation (op); | ||
1732 | return; | ||
1733 | } | ||
1734 | } | ||
1735 | |||
1736 | |||
1737 | /** | ||
1738 | * Handle a demand by the other peer for elements based on a list | ||
1739 | * of GNUNET_HashCode-s. | ||
1740 | * | ||
1741 | * @parem cls closure, a set union operation | ||
1742 | * @param mh the demand message | ||
1359 | */ | 1743 | */ |
1360 | static void | 1744 | static void |
1361 | handle_p2p_demand (void *cls, | 1745 | handle_p2p_demand (void *cls, |
@@ -1607,6 +1991,9 @@ union_evaluate (struct Operation *op, | |||
1607 | else | 1991 | else |
1608 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1992 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1609 | "sent op request without context message\n"); | 1993 | "sent op request without context message\n"); |
1994 | |||
1995 | initialize_key_to_element (op); | ||
1996 | op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); | ||
1610 | } | 1997 | } |
1611 | 1998 | ||
1612 | 1999 | ||
@@ -1636,6 +2023,8 @@ union_accept (struct Operation *op) | |||
1636 | op->state->se = strata_estimator_dup (op->spec->set->state->se); | 2023 | op->state->se = strata_estimator_dup (op->spec->set->state->se); |
1637 | op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); | 2024 | op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); |
1638 | op->state->salt_receive = op->state->salt_send = 42; | 2025 | op->state->salt_receive = op->state->salt_send = 42; |
2026 | initialize_key_to_element (op); | ||
2027 | op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); | ||
1639 | /* kick off the operation */ | 2028 | /* kick off the operation */ |
1640 | send_strata_estimator (op); | 2029 | send_strata_estimator (op); |
1641 | } | 2030 | } |
@@ -1743,6 +2132,9 @@ union_handle_p2p_message (struct Operation *op, | |||
1743 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | 2132 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: |
1744 | handle_p2p_elements (op, mh); | 2133 | handle_p2p_elements (op, mh); |
1745 | break; | 2134 | break; |
2135 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT: | ||
2136 | handle_p2p_full_element (op, mh); | ||
2137 | break; | ||
1746 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: | 2138 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: |
1747 | handle_p2p_inquiry (op, mh); | 2139 | handle_p2p_inquiry (op, mh); |
1748 | break; | 2140 | break; |
@@ -1755,6 +2147,12 @@ union_handle_p2p_message (struct Operation *op, | |||
1755 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: | 2147 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: |
1756 | handle_p2p_demand (op, mh); | 2148 | handle_p2p_demand (op, mh); |
1757 | break; | 2149 | break; |
2150 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE: | ||
2151 | handle_p2p_full_done (op, mh); | ||
2152 | break; | ||
2153 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL: | ||
2154 | handle_p2p_request_full (op, mh); | ||
2155 | break; | ||
1758 | default: | 2156 | default: |
1759 | /* Something wrong with cadet's message handlers? */ | 2157 | /* Something wrong with cadet's message handlers? */ |
1760 | GNUNET_assert (0); | 2158 | GNUNET_assert (0); |