diff options
Diffstat (limited to 'src/setu/gnunet-service-setu.c')
-rw-r--r-- | src/setu/gnunet-service-setu.c | 2085 |
1 files changed, 1821 insertions, 264 deletions
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c index bd1113f15..339d347f8 100644 --- a/src/setu/gnunet-service-setu.c +++ b/src/setu/gnunet-service-setu.c | |||
@@ -22,6 +22,7 @@ | |||
22 | * @brief set union operation | 22 | * @brief set union operation |
23 | * @author Florian Dold | 23 | * @author Florian Dold |
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * @author Elias Summermatter | ||
25 | */ | 26 | */ |
26 | #include "platform.h" | 27 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
@@ -50,33 +51,61 @@ | |||
50 | */ | 51 | */ |
51 | #define SE_STRATA_COUNT 32 | 52 | #define SE_STRATA_COUNT 32 |
52 | 53 | ||
54 | |||
53 | /** | 55 | /** |
54 | * Size of the IBFs in the strata estimator. | 56 | * Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348 |
57 | * Based on the bsc thesis of Elias Summermatter (2021) | ||
55 | */ | 58 | */ |
56 | #define SE_IBF_SIZE 80 | 59 | #define SE_IBFS_TOTAL_SIZE 632 |
57 | 60 | ||
58 | /** | 61 | /** |
59 | * The hash num parameter for the difference digests and strata estimators. | 62 | * The hash num parameter for the difference digests and strata estimators. |
60 | */ | 63 | */ |
61 | #define SE_IBF_HASH_NUM 4 | 64 | #define SE_IBF_HASH_NUM 3 |
62 | 65 | ||
63 | /** | 66 | /** |
64 | * Number of buckets that can be transmitted in one message. | 67 | * Number of buckets that can be transmitted in one message. |
65 | */ | 68 | */ |
66 | #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE) | 69 | #define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE) |
67 | 70 | ||
68 | /** | 71 | /** |
69 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). | 72 | * The maximum size of an ibf we use is MAX_IBF_SIZE=2^20. |
70 | * Choose this value so that computing the IBF is still cheaper | 73 | * Choose this value so that computing the IBF is still cheaper |
71 | * than transmitting all values. | 74 | * than transmitting all values. |
72 | */ | 75 | */ |
73 | #define MAX_IBF_ORDER (20) | 76 | #define MAX_IBF_SIZE 1048576 |
77 | |||
78 | |||
79 | /** | ||
80 | * Minimal size of an ibf | ||
81 | * Based on the bsc thesis of Elias Summermatter (2021) | ||
82 | */ | ||
83 | #define IBF_MIN_SIZE 37 | ||
84 | |||
85 | /** | ||
86 | * AVG RTT for differential sync when k=2 and Factor = 2 | ||
87 | * Based on the bsc thesis of Elias Summermatter (2021) | ||
88 | */ | ||
89 | #define DIFFERENTIAL_RTT_MEAN 3.65145 | ||
90 | |||
91 | /** | ||
92 | * Security level used for byzantine checks (2^80) | ||
93 | */ | ||
94 | |||
95 | #define SECURITY_LEVEL 80 | ||
96 | |||
97 | /** | ||
98 | * Is the estimated probability for a new round this values | ||
99 | * is based on the bsc thesis of Elias Summermatter (2021) | ||
100 | */ | ||
101 | |||
102 | #define PROBABILITY_FOR_NEW_ROUND 0.15 | ||
74 | 103 | ||
75 | /** | 104 | /** |
76 | * Number of buckets used in the ibf per estimated | 105 | * Measure the performance in a csv |
77 | * difference. | ||
78 | */ | 106 | */ |
79 | #define IBF_ALPHA 4 | 107 | |
108 | #define MEASURE_PERFORMANCE 0 | ||
80 | 109 | ||
81 | 110 | ||
82 | /** | 111 | /** |
@@ -146,6 +175,28 @@ enum UnionOperationPhase | |||
146 | PHASE_FULL_RECEIVING | 175 | PHASE_FULL_RECEIVING |
147 | }; | 176 | }; |
148 | 177 | ||
178 | /** | ||
179 | * Different modes of operations | ||
180 | */ | ||
181 | |||
182 | enum MODE_OF_OPERATION | ||
183 | { | ||
184 | /** | ||
185 | * Mode just synchronizes the difference between sets | ||
186 | */ | ||
187 | DIFFERENTIAL_SYNC, | ||
188 | |||
189 | /** | ||
190 | * Mode send full set sending local set first | ||
191 | */ | ||
192 | FULL_SYNC_LOCAL_SENDING_FIRST, | ||
193 | |||
194 | /** | ||
195 | * Mode request full set from remote peer | ||
196 | */ | ||
197 | FULL_SYNC_REMOTE_SENDING_FIRST | ||
198 | }; | ||
199 | |||
149 | 200 | ||
150 | /** | 201 | /** |
151 | * Information about an element element in the set. All elements are | 202 | * Information about an element element in the set. All elements are |
@@ -277,7 +328,7 @@ struct Operation | |||
277 | * Copy of the set's strata estimator at the time of | 328 | * Copy of the set's strata estimator at the time of |
278 | * creation of this operation. | 329 | * creation of this operation. |
279 | */ | 330 | */ |
280 | struct StrataEstimator *se; | 331 | struct MultiStrataEstimator *se; |
281 | 332 | ||
282 | /** | 333 | /** |
283 | * The IBF we currently receive. | 334 | * The IBF we currently receive. |
@@ -320,7 +371,7 @@ struct Operation | |||
320 | /** | 371 | /** |
321 | * Number of ibf buckets already received into the @a remote_ibf. | 372 | * Number of ibf buckets already received into the @a remote_ibf. |
322 | */ | 373 | */ |
323 | unsigned int ibf_buckets_received; | 374 | uint64_t ibf_buckets_received; |
324 | 375 | ||
325 | /** | 376 | /** |
326 | * Salt that we're using for sending IBFs | 377 | * Salt that we're using for sending IBFs |
@@ -386,7 +437,7 @@ struct Operation | |||
386 | * Lower bound for the set size, used only when | 437 | * Lower bound for the set size, used only when |
387 | * byzantine mode is enabled. | 438 | * byzantine mode is enabled. |
388 | */ | 439 | */ |
389 | int byzantine_lower_bound; | 440 | uint64_t byzantine_lower_bound; |
390 | 441 | ||
391 | /** | 442 | /** |
392 | * Unique request id for the request from a remote peer, sent to the | 443 | * Unique request id for the request from a remote peer, sent to the |
@@ -401,21 +452,83 @@ struct Operation | |||
401 | */ | 452 | */ |
402 | unsigned int generation_created; | 453 | unsigned int generation_created; |
403 | 454 | ||
455 | |||
456 | /** | ||
457 | * User defined Bandwidth Round Trips Tradeoff | ||
458 | */ | ||
459 | uint64_t rtt_bandwidth_tradeoff; | ||
460 | |||
461 | |||
462 | /** | ||
463 | * Number of Element per bucket in IBF | ||
464 | */ | ||
465 | uint8_t ibf_number_buckets_per_element; | ||
466 | |||
467 | |||
468 | /** | ||
469 | * Set difference is multiplied with this factor | ||
470 | * to gennerate large enough IBF | ||
471 | */ | ||
472 | uint8_t ibf_bucket_number_factor; | ||
473 | |||
474 | /** | ||
475 | * Defines which site a client is | ||
476 | * 0 = Initiating peer | ||
477 | * 1 = Receiving peer | ||
478 | */ | ||
479 | uint8_t peer_site; | ||
480 | |||
481 | |||
404 | /** | 482 | /** |
405 | * User defined Bandwidth Round Trips Tradeoff | 483 | * Local peer element count |
406 | */ | 484 | */ |
407 | double rtt_bandwidth_tradeoff; | 485 | uint64_t local_element_count; |
408 | 486 | ||
409 | /** | 487 | /** |
410 | * Number of Element per bucket in IBF | 488 | * Mode of operation that was chosen by the algorithm |
411 | */ | 489 | */ |
412 | unsigned int ibf_number_buckets_per_element; | 490 | uint8_t mode_of_operation; |
413 | 491 | ||
414 | /** | 492 | /** |
415 | * Number of buckets in IBF | 493 | * Hashmap to keep track of the send/received messages |
416 | */ | 494 | */ |
417 | unsigned ibf_bucket_number; | 495 | struct GNUNET_CONTAINER_MultiHashMap *message_control_flow; |
418 | 496 | ||
497 | /** | ||
498 | * Hashmap to keep track of the send/received inquiries (ibf keys) | ||
499 | */ | ||
500 | struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent; | ||
501 | |||
502 | |||
503 | /** | ||
504 | * Total size of local set | ||
505 | */ | ||
506 | uint64_t total_elements_size_local; | ||
507 | |||
508 | /** | ||
509 | * Limit of number of elements in set | ||
510 | */ | ||
511 | uint64_t byzantine_upper_bound; | ||
512 | |||
513 | /** | ||
514 | * is the count of already passed differential sync iterations | ||
515 | */ | ||
516 | uint8_t differential_sync_iterations; | ||
517 | |||
518 | /** | ||
519 | * Estimated or committed set difference at the start | ||
520 | */ | ||
521 | uint64_t remote_set_diff; | ||
522 | |||
523 | /** | ||
524 | * Estimated or committed set difference at the start | ||
525 | */ | ||
526 | uint64_t local_set_diff; | ||
527 | |||
528 | /** | ||
529 | * Boolean to enforce an active passive switch | ||
530 | */ | ||
531 | bool active_passive_switch_required; | ||
419 | }; | 532 | }; |
420 | 533 | ||
421 | 534 | ||
@@ -431,6 +544,16 @@ struct SetContent | |||
431 | struct GNUNET_CONTAINER_MultiHashMap *elements; | 544 | struct GNUNET_CONTAINER_MultiHashMap *elements; |
432 | 545 | ||
433 | /** | 546 | /** |
547 | * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *` randomized. | ||
548 | */ | ||
549 | struct GNUNET_CONTAINER_MultiHashMap *elements_randomized; | ||
550 | |||
551 | /** | ||
552 | * Salt to construct the randomized element map | ||
553 | */ | ||
554 | uint64_t elements_randomized_salt; | ||
555 | |||
556 | /** | ||
434 | * Number of references to the content. | 557 | * Number of references to the content. |
435 | */ | 558 | */ |
436 | unsigned int refcount; | 559 | unsigned int refcount; |
@@ -478,7 +601,7 @@ struct Set | |||
478 | * The strata estimator is only generated once for each set. The IBF keys | 601 | * The strata estimator is only generated once for each set. The IBF keys |
479 | * are derived from the element hashes with salt=0. | 602 | * are derived from the element hashes with salt=0. |
480 | */ | 603 | */ |
481 | struct StrataEstimator *se; | 604 | struct MultiStrataEstimator *se; |
482 | 605 | ||
483 | /** | 606 | /** |
484 | * Evaluate operations are held in a linked list. | 607 | * Evaluate operations are held in a linked list. |
@@ -635,96 +758,687 @@ static int in_shutdown; | |||
635 | */ | 758 | */ |
636 | static uint32_t suggest_id; | 759 | static uint32_t suggest_id; |
637 | 760 | ||
761 | #if MEASURE_PERFORMANCE | ||
762 | /** | ||
763 | * Handles configuration file for setu performance test | ||
764 | * | ||
765 | */ | ||
766 | static const struct GNUNET_CONFIGURATION_Handle *setu_cfg; | ||
767 | |||
768 | |||
769 | /** | ||
770 | * Stores the performance data for induvidual message | ||
771 | */ | ||
772 | |||
773 | |||
774 | struct perf_num_send_received_msg | ||
775 | { | ||
776 | uint64_t sent; | ||
777 | uint64_t sent_var_bytes; | ||
778 | uint64_t received; | ||
779 | uint64_t received_var_bytes; | ||
780 | }; | ||
781 | |||
782 | /** | ||
783 | * Main struct to measure performance (data/rtts) | ||
784 | */ | ||
785 | struct per_store_struct | ||
786 | { | ||
787 | struct perf_num_send_received_msg operation_request; | ||
788 | struct perf_num_send_received_msg se; | ||
789 | struct perf_num_send_received_msg request_full; | ||
790 | struct perf_num_send_received_msg element_full; | ||
791 | struct perf_num_send_received_msg full_done; | ||
792 | struct perf_num_send_received_msg ibf; | ||
793 | struct perf_num_send_received_msg inquery; | ||
794 | struct perf_num_send_received_msg element; | ||
795 | struct perf_num_send_received_msg demand; | ||
796 | struct perf_num_send_received_msg offer; | ||
797 | struct perf_num_send_received_msg done; | ||
798 | struct perf_num_send_received_msg over; | ||
799 | uint64_t se_diff; | ||
800 | uint64_t se_diff_remote; | ||
801 | uint64_t se_diff_local; | ||
802 | uint64_t active_passive_switches; | ||
803 | uint8_t mode_of_operation; | ||
804 | }; | ||
805 | |||
806 | struct per_store_struct perf_store; | ||
807 | #endif | ||
808 | |||
809 | /** | ||
810 | * Different states to control the messages flow in differential mode | ||
811 | */ | ||
812 | |||
813 | enum MESSAGE_CONTROL_FLOW_STATE | ||
814 | { | ||
815 | /** | ||
816 | * Initial message state | ||
817 | */ | ||
818 | MSG_CFS_UNINITIALIZED, | ||
819 | |||
820 | /** | ||
821 | * Track that a message has been sent | ||
822 | */ | ||
823 | MSG_CFS_SENT, | ||
824 | |||
825 | /** | ||
826 | * Track that receiving this message is expected | ||
827 | */ | ||
828 | MSG_CFS_EXPECTED, | ||
829 | |||
830 | /** | ||
831 | * Track that message has been received | ||
832 | */ | ||
833 | MSG_CFS_RECEIVED, | ||
834 | }; | ||
638 | 835 | ||
639 | /** | 836 | /** |
640 | * Added Roundtripscounter | 837 | * Message types to track in message control flow |
641 | */ | 838 | */ |
642 | 839 | ||
840 | enum MESSAGE_TYPE | ||
841 | { | ||
842 | /** | ||
843 | * Offer message type | ||
844 | */ | ||
845 | OFFER_MESSAGE, | ||
643 | 846 | ||
644 | struct perf_num_send_resived_msg { | 847 | /** |
645 | int sent; | 848 | * Demand message type |
646 | int sent_var_bytes; | 849 | */ |
647 | int received; | 850 | DEMAND_MESSAGE, |
648 | int received_var_bytes; | 851 | |
852 | /** | ||
853 | * Element message type | ||
854 | */ | ||
855 | ELEMENT_MESSAGE, | ||
649 | }; | 856 | }; |
650 | 857 | ||
651 | 858 | ||
652 | struct perf_rtt_struct | 859 | /** |
653 | { | 860 | * Struct to tracked messages in message control flow |
654 | struct perf_num_send_resived_msg operation_request; | 861 | */ |
655 | struct perf_num_send_resived_msg se; | 862 | struct messageControlFlowElement |
656 | struct perf_num_send_resived_msg request_full; | 863 | { |
657 | struct perf_num_send_resived_msg element_full; | 864 | /** |
658 | struct perf_num_send_resived_msg full_done; | 865 | * Track the message control state of the offer message |
659 | struct perf_num_send_resived_msg ibf; | 866 | */ |
660 | struct perf_num_send_resived_msg inquery; | 867 | enum MESSAGE_CONTROL_FLOW_STATE offer; |
661 | struct perf_num_send_resived_msg element; | 868 | /** |
662 | struct perf_num_send_resived_msg demand; | 869 | * Track the message control state of the demand message |
663 | struct perf_num_send_resived_msg offer; | 870 | */ |
664 | struct perf_num_send_resived_msg done; | 871 | enum MESSAGE_CONTROL_FLOW_STATE demand; |
665 | struct perf_num_send_resived_msg over; | 872 | /** |
873 | * Track the message control state of the element message | ||
874 | */ | ||
875 | enum MESSAGE_CONTROL_FLOW_STATE element; | ||
666 | }; | 876 | }; |
667 | 877 | ||
668 | struct perf_rtt_struct perf_rtt; | ||
669 | 878 | ||
879 | #if MEASURE_PERFORMANCE | ||
670 | 880 | ||
881 | /** | ||
882 | * Loads different configuration to perform performance tests | ||
883 | * | ||
884 | * @param op operation handle | ||
885 | */ | ||
886 | static void | ||
887 | load_config (struct Operation *op) | ||
888 | { | ||
889 | long long number; | ||
890 | float fl; | ||
891 | |||
892 | setu_cfg = GNUNET_CONFIGURATION_create (); | ||
893 | GNUNET_CONFIGURATION_load (setu_cfg, | ||
894 | "perf_setu.conf"); | ||
895 | GNUNET_CONFIGURATION_get_value_float (setu_cfg, | ||
896 | "IBF", | ||
897 | "BUCKET_NUMBER_FACTOR", | ||
898 | &fl); | ||
899 | op->ibf_bucket_number_factor = fl; | ||
900 | GNUNET_CONFIGURATION_get_value_number (setu_cfg, | ||
901 | "IBF", | ||
902 | "NUMBER_PER_BUCKET", | ||
903 | &number); | ||
904 | op->ibf_number_buckets_per_element = number; | ||
905 | GNUNET_CONFIGURATION_get_value_number (setu_cfg, | ||
906 | "PERFORMANCE", | ||
907 | "TRADEOFF", | ||
908 | &number); | ||
909 | op->rtt_bandwidth_tradeoff = number; | ||
910 | GNUNET_CONFIGURATION_get_value_number (setu_cfg, | ||
911 | "BOUNDARIES", | ||
912 | "UPPER_ELEMENT", | ||
913 | &number); | ||
914 | op->byzantine_upper_bound = number; | ||
915 | op->peer_site = 0; | ||
916 | } | ||
917 | |||
918 | |||
919 | /** | ||
920 | * Function to calculate total bytes used for performance measurement | ||
921 | * @param size | ||
922 | * @param perf_num_send_received_msg | ||
923 | * @return bytes used | ||
924 | */ | ||
671 | static int | 925 | static int |
672 | sum_sent_received_bytes(int size, struct perf_num_send_resived_msg perf_rtt_struct) { | 926 | sum_sent_received_bytes (uint64_t size, |
673 | return (size * perf_rtt_struct.sent) + | 927 | struct perf_num_send_received_msg |
674 | (size * perf_rtt_struct.received) + | 928 | perf_num_send_received_msg) |
675 | perf_rtt_struct.sent_var_bytes + | 929 | { |
676 | perf_rtt_struct.received_var_bytes; | 930 | return (size * perf_num_send_received_msg.sent) |
931 | + (size * perf_num_send_received_msg.received) | ||
932 | + perf_num_send_received_msg.sent_var_bytes | ||
933 | + perf_num_send_received_msg.received_var_bytes; | ||
677 | } | 934 | } |
678 | 935 | ||
679 | static float | ||
680 | calculate_perf_rtt() { | ||
681 | /** | ||
682 | * Calculate RTT of init phase normally always 1 | ||
683 | */ | ||
684 | float rtt = 1; | ||
685 | int bytes_transmitted = 0; | ||
686 | 936 | ||
687 | /** | 937 | /** |
688 | * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending | 938 | * Function that calculates the perfmance values and writes them down |
689 | */ | 939 | */ |
690 | if (( perf_rtt.element_full.received != 0 ) || | 940 | static void |
691 | ( perf_rtt.element_full.sent != 0) | 941 | calculate_perf_store () |
692 | ) rtt += 1; | 942 | { |
693 | 943 | ||
694 | if (( perf_rtt.request_full.received != 0 ) || | 944 | /** |
695 | ( perf_rtt.request_full.sent != 0) | 945 | * Calculate RTT of init phase normally always 1 |
696 | ) rtt += 0.5; | 946 | */ |
947 | float rtt = 1; | ||
948 | int bytes_transmitted = 0; | ||
697 | 949 | ||
698 | /** | 950 | /** |
699 | * In case of a differential sync 3 rtt's are needed. | 951 | * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending |
700 | * for every active/passive switch additional 3.5 rtt's are used | 952 | */ |
701 | */ | 953 | if ((perf_store.element_full.received != 0) || |
954 | (perf_store.element_full.sent != 0) | ||
955 | ) | ||
956 | rtt += 1; | ||
957 | |||
958 | if ((perf_store.request_full.received != 0) || | ||
959 | (perf_store.request_full.sent != 0) | ||
960 | ) | ||
961 | rtt += 0.5; | ||
702 | 962 | ||
703 | int iterations = perf_rtt.ibf.received; | 963 | /** |
704 | if(iterations > 1) | 964 | * In case of a differential sync 3 rtt's are needed. |
705 | rtt += (iterations - 1 ) * 0.5; | 965 | * for every active/passive switch additional 3.5 rtt's are used |
706 | rtt += 3 * iterations; | 966 | */ |
967 | if ((perf_store.element.received != 0) || | ||
968 | (perf_store.element.sent != 0)) | ||
969 | { | ||
970 | int iterations = perf_store.active_passive_switches; | ||
971 | |||
972 | if (iterations > 0) | ||
973 | rtt += iterations * 0.5; | ||
974 | rtt += 2.5; | ||
975 | } | ||
976 | |||
977 | |||
978 | /** | ||
979 | * Calculate data sended size | ||
980 | */ | ||
981 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
982 | GNUNET_SETU_ResultMessage), | ||
983 | perf_store.request_full); | ||
984 | |||
985 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
986 | GNUNET_SETU_ElementMessage), | ||
987 | perf_store.element_full); | ||
988 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
989 | GNUNET_SETU_ElementMessage), | ||
990 | perf_store.element); | ||
991 | // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request); | ||
992 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
993 | StrataEstimatorMessage), | ||
994 | perf_store.se); | ||
995 | bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done); | ||
996 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage), | ||
997 | perf_store.ibf); | ||
998 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage), | ||
999 | perf_store.inquery); | ||
1000 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
1001 | GNUNET_MessageHeader), | ||
1002 | perf_store.demand); | ||
1003 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
1004 | GNUNET_MessageHeader), | ||
1005 | perf_store.offer); | ||
1006 | bytes_transmitted += sum_sent_received_bytes (4, perf_store.done); | ||
1007 | |||
1008 | /** | ||
1009 | * Write IBF failure rate for different BUCKET_NUMBER_FACTOR | ||
1010 | */ | ||
1011 | float factor; | ||
1012 | GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR", | ||
1013 | &factor); | ||
1014 | long long num_per_bucket; | ||
1015 | GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET", | ||
1016 | &num_per_bucket); | ||
1017 | |||
1018 | |||
1019 | int decoded = 0; | ||
1020 | if (perf_store.active_passive_switches == 0) | ||
1021 | decoded = 1; | ||
1022 | int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct | ||
1023 | IBFMessage), | ||
1024 | perf_store.ibf); | ||
1025 | |||
1026 | FILE *out1 = fopen ("perf_data.csv", "a"); | ||
1027 | fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor, | ||
1028 | decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff, | ||
1029 | bytes_transmitted, | ||
1030 | perf_store.se_diff_local,perf_store.se_diff_remote, | ||
1031 | perf_store.mode_of_operation); | ||
1032 | fclose (out1); | ||
1033 | |||
1034 | } | ||
1035 | |||
1036 | |||
1037 | #endif | ||
1038 | /** | ||
1039 | * Function that chooses the optimal mode of operation depending on | ||
1040 | * operation parameters. | ||
1041 | * @param avg_element_size | ||
1042 | * @param local_set_size | ||
1043 | * @param remote_set_size | ||
1044 | * @param est_set_diff_remote | ||
1045 | * @param est_set_diff_local | ||
1046 | * @param bandwith_latency_tradeoff | ||
1047 | * @param ibf_bucket_number_factor | ||
1048 | * @return calcuated mode of operation | ||
1049 | */ | ||
1050 | static uint8_t | ||
1051 | estimate_best_mode_of_operation (uint64_t avg_element_size, | ||
1052 | uint64_t local_set_size, | ||
1053 | uint64_t remote_set_size, | ||
1054 | uint64_t est_set_diff_remote, | ||
1055 | uint64_t est_set_diff_local, | ||
1056 | uint64_t bandwith_latency_tradeoff, | ||
1057 | uint64_t ibf_bucket_number_factor) | ||
1058 | { | ||
1059 | |||
1060 | /* | ||
1061 | * In case of initial sync fall to predefined states | ||
1062 | */ | ||
1063 | |||
1064 | if (0 == local_set_size) | ||
1065 | return FULL_SYNC_REMOTE_SENDING_FIRST; | ||
1066 | if (0 == remote_set_size) | ||
1067 | return FULL_SYNC_LOCAL_SENDING_FIRST; | ||
1068 | |||
1069 | /* | ||
1070 | * Calculate bytes for full Sync | ||
1071 | */ | ||
1072 | |||
1073 | uint8_t sizeof_full_done_header = 4; | ||
1074 | uint8_t sizeof_done_header = 4; | ||
1075 | uint8_t rtt_min_full = 2; | ||
1076 | uint8_t sizeof_request_full = 4; | ||
1077 | uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local); | ||
1078 | |||
1079 | /* Estimate byte required if we send first */ | ||
1080 | uint64_t total_elements_to_send_local_send_first = est_set_diff_remote | ||
1081 | + local_set_size; | ||
1082 | |||
1083 | uint64_t total_bytes_full_local_send_first = (avg_element_size | ||
1084 | * | ||
1085 | total_elements_to_send_local_send_first) \ | ||
1086 | + ( | ||
1087 | total_elements_to_send_local_send_first * sizeof(struct | ||
1088 | GNUNET_SETU_ElementMessage)) \ | ||
1089 | + (sizeof_full_done_header * 2) \ | ||
1090 | + rtt_min_full | ||
1091 | * bandwith_latency_tradeoff; | ||
1092 | |||
1093 | /* Estimate bytes required if we request from remote peer */ | ||
1094 | uint64_t total_elements_to_send_remote_send_first = est_set_diff_local | ||
1095 | + remote_set_size; | ||
1096 | |||
1097 | uint64_t total_bytes_full_remote_send_first = (avg_element_size | ||
1098 | * | ||
1099 | total_elements_to_send_remote_send_first) \ | ||
1100 | + ( | ||
1101 | total_elements_to_send_remote_send_first * sizeof(struct | ||
1102 | GNUNET_SETU_ElementMessage)) \ | ||
1103 | + (sizeof_full_done_header * 2) \ | ||
1104 | + (rtt_min_full + 0.5) | ||
1105 | * bandwith_latency_tradeoff \ | ||
1106 | + sizeof_request_full; | ||
1107 | |||
1108 | /* | ||
1109 | * Calculate bytes for differential Sync | ||
1110 | */ | ||
1111 | |||
1112 | /* Estimate bytes required by IBF transmission*/ | ||
1113 | |||
1114 | long double ibf_bucket_count = estimated_total_diff | ||
1115 | * ibf_bucket_number_factor; | ||
1116 | |||
1117 | if (ibf_bucket_count <= IBF_MIN_SIZE) | ||
1118 | { | ||
1119 | ibf_bucket_count = IBF_MIN_SIZE; | ||
1120 | } | ||
1121 | uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count) | ||
1122 | / ((float) MAX_BUCKETS_PER_MESSAGE)); | ||
1123 | |||
1124 | uint64_t estimated_counter_size = ceil ( | ||
1125 | MIN (2 * log2l (((float) local_set_size) | ||
1126 | / ((float) ibf_bucket_count)), | ||
1127 | log2l (local_set_size))); | ||
1128 | |||
1129 | long double counter_bytes = (float) estimated_counter_size / 8; | ||
1130 | |||
1131 | uint64_t ibf_bytes = ceil ((sizeof (struct IBFMessage) * ibf_message_count) | ||
1132 | * 1.2 \ | ||
1133 | + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \ | ||
1134 | + (ibf_bucket_count * sizeof(struct IBF_KeyHash)) | ||
1135 | * 1.2 \ | ||
1136 | + (ibf_bucket_count * counter_bytes) * 1.2); | ||
1137 | |||
1138 | /* Estimate full byte count for differential sync */ | ||
1139 | uint64_t element_size = (avg_element_size | ||
1140 | + sizeof (struct GNUNET_SETU_ElementMessage)) \ | ||
1141 | * estimated_total_diff; | ||
1142 | uint64_t done_size = sizeof_done_header; | ||
1143 | uint64_t inquery_size = (sizeof (struct IBF_Key) | ||
1144 | + sizeof (struct InquiryMessage)) | ||
1145 | * estimated_total_diff; | ||
1146 | uint64_t demand_size = | ||
1147 | (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader)) | ||
1148 | * estimated_total_diff; | ||
1149 | uint64_t offer_size = (sizeof (struct GNUNET_HashCode) | ||
1150 | + sizeof (struct GNUNET_MessageHeader)) | ||
1151 | * estimated_total_diff; | ||
1152 | |||
1153 | uint64_t total_bytes_diff = (element_size + done_size + inquery_size | ||
1154 | + demand_size + offer_size + ibf_bytes) \ | ||
1155 | + (DIFFERENTIAL_RTT_MEAN | ||
1156 | * bandwith_latency_tradeoff); | ||
1157 | |||
1158 | uint64_t full_min = MIN (total_bytes_full_local_send_first, | ||
1159 | total_bytes_full_remote_send_first); | ||
1160 | |||
1161 | /* Decide between full and differential sync */ | ||
1162 | |||
1163 | if (full_min < total_bytes_diff) | ||
1164 | { | ||
1165 | /* Decide between sending all element first or receiving all elements */ | ||
1166 | if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first) | ||
1167 | { | ||
1168 | return FULL_SYNC_LOCAL_SENDING_FIRST; | ||
1169 | } | ||
1170 | else | ||
1171 | { | ||
1172 | return FULL_SYNC_REMOTE_SENDING_FIRST; | ||
1173 | } | ||
1174 | } | ||
1175 | else | ||
1176 | { | ||
1177 | return DIFFERENTIAL_SYNC; | ||
1178 | } | ||
1179 | } | ||
1180 | |||
1181 | |||
1182 | /** | ||
1183 | * Validates the if a message is received in a correct phase | ||
1184 | * @param allowed_phases | ||
1185 | * @param size_phases | ||
1186 | * @param op | ||
1187 | * @return #GNUNET_YES if message permitted in phase and #GNUNET_NO if not permitted in given | ||
1188 | * phase | ||
1189 | */ | ||
1190 | static enum GNUNET_GenericReturnValue | ||
1191 | check_valid_phase (const uint8_t allowed_phases[], | ||
1192 | size_t size_phases, | ||
1193 | struct Operation *op) | ||
1194 | { | ||
1195 | /** | ||
1196 | * Iterate over allowed phases | ||
1197 | */ | ||
1198 | for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++) | ||
1199 | { | ||
1200 | uint8_t phase = allowed_phases[phase_ctr]; | ||
1201 | if (phase == op->phase) | ||
1202 | { | ||
1203 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1204 | "Message received in valid phase\n"); | ||
1205 | return GNUNET_YES; | ||
1206 | } | ||
1207 | } | ||
1208 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1209 | "Received message in invalid phase: %u\n", op->phase); | ||
1210 | return GNUNET_NO; | ||
1211 | } | ||
1212 | |||
1213 | |||
1214 | /** | ||
1215 | * Function to update, track and validate message received in differential | ||
1216 | * sync. This function tracks states of messages and check it against different | ||
1217 | * constraints as described in Summermatter's BSc Thesis (2021) | ||
1218 | * @param hash_map: Hashmap to store message control flow | ||
1219 | * @param new_mcfs: The new message control flow state an given message type should be set to | ||
1220 | * @param hash_code: Hash code of the element | ||
1221 | * @param mt: The message type for which the message control flow state should be set | ||
1222 | * @return GNUNET_YES message is valid in message control flow GNUNET_NO when message is not valid | ||
1223 | * at this point in message flow | ||
1224 | */ | ||
1225 | static int | ||
1226 | update_message_control_flow (struct GNUNET_CONTAINER_MultiHashMap *hash_map, | ||
1227 | enum MESSAGE_CONTROL_FLOW_STATE new_mcfs, | ||
1228 | const struct GNUNET_HashCode *hash_code, | ||
1229 | enum MESSAGE_TYPE mt) | ||
1230 | { | ||
1231 | struct messageControlFlowElement *cfe = NULL; | ||
1232 | enum MESSAGE_CONTROL_FLOW_STATE *mcfs; | ||
1233 | |||
1234 | /** | ||
1235 | * Check logic for forbidden messages | ||
1236 | */ | ||
1237 | |||
1238 | cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code); | ||
1239 | if ((ELEMENT_MESSAGE == mt) && (cfe != NULL)) | ||
1240 | { | ||
1241 | if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer)) | ||
1242 | { | ||
1243 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1244 | "Received an element without sent offer!\n"); | ||
1245 | return GNUNET_NO; | ||
1246 | } | ||
1247 | /* Check that only requested elements are received! */ | ||
1248 | if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand != | ||
1249 | MSG_CFS_SENT)) | ||
1250 | { | ||
1251 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1252 | "Received an element that was not demanded\n"); | ||
1253 | return GNUNET_NO; | ||
1254 | } | ||
1255 | } | ||
1256 | |||
1257 | /** | ||
1258 | * In case the element hash is not in the hashmap create a new entry | ||
1259 | */ | ||
1260 | |||
1261 | if (NULL == cfe) | ||
1262 | { | ||
1263 | cfe = GNUNET_new (struct messageControlFlowElement); | ||
1264 | if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code, | ||
1265 | cfe, | ||
1266 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) | ||
1267 | { | ||
1268 | GNUNET_free (cfe); | ||
1269 | return GNUNET_SYSERR; | ||
1270 | } | ||
1271 | } | ||
1272 | |||
1273 | /** | ||
1274 | * Set state of message | ||
1275 | */ | ||
1276 | |||
1277 | if (OFFER_MESSAGE == mt) | ||
1278 | { | ||
1279 | mcfs = &cfe->offer; | ||
1280 | } | ||
1281 | else if (DEMAND_MESSAGE == mt) | ||
1282 | { | ||
1283 | mcfs = &cfe->demand; | ||
1284 | } | ||
1285 | else if (ELEMENT_MESSAGE == mt) | ||
1286 | { | ||
1287 | mcfs = &cfe->element; | ||
1288 | } | ||
1289 | else | ||
1290 | { | ||
1291 | return GNUNET_SYSERR; | ||
1292 | } | ||
1293 | |||
1294 | /** | ||
1295 | * Check if state is allowed | ||
1296 | */ | ||
1297 | |||
1298 | if (new_mcfs <= *mcfs) | ||
1299 | { | ||
1300 | return GNUNET_NO; | ||
1301 | } | ||
1302 | |||
1303 | *mcfs = new_mcfs; | ||
1304 | return GNUNET_YES; | ||
1305 | } | ||
1306 | |||
1307 | |||
1308 | /** | ||
1309 | * Validate if a message in differential sync si already received before. | ||
1310 | * @param hash_map | ||
1311 | * @param hash_code | ||
1312 | * @param mt | ||
1313 | * @return GNUNET_YES when message is already in store if message is not in store return GNUNET_NO | ||
1314 | */ | ||
1315 | static int | ||
1316 | is_message_in_message_control_flow (struct | ||
1317 | GNUNET_CONTAINER_MultiHashMap *hash_map, | ||
1318 | struct GNUNET_HashCode *hash_code, | ||
1319 | enum MESSAGE_TYPE mt) | ||
1320 | { | ||
1321 | struct messageControlFlowElement *cfe = NULL; | ||
1322 | enum MESSAGE_CONTROL_FLOW_STATE *mcfs; | ||
1323 | |||
1324 | cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code); | ||
1325 | |||
1326 | /** | ||
1327 | * Set state of message | ||
1328 | */ | ||
1329 | |||
1330 | if (cfe != NULL) | ||
1331 | { | ||
1332 | if (OFFER_MESSAGE == mt) | ||
1333 | { | ||
1334 | mcfs = &cfe->offer; | ||
1335 | } | ||
1336 | else if (DEMAND_MESSAGE == mt) | ||
1337 | { | ||
1338 | mcfs = &cfe->demand; | ||
1339 | } | ||
1340 | else if (ELEMENT_MESSAGE == mt) | ||
1341 | { | ||
1342 | mcfs = &cfe->element; | ||
1343 | } | ||
1344 | else | ||
1345 | { | ||
1346 | return GNUNET_SYSERR; | ||
1347 | } | ||
707 | 1348 | ||
708 | /** | 1349 | /** |
709 | * Calculate data sended size | 1350 | * Evaluate if set is in message |
710 | */ | 1351 | */ |
711 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL), perf_rtt.request_full); | 1352 | if (*mcfs != MSG_CFS_UNINITIALIZED) |
712 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT), perf_rtt.element_full); | 1353 | { |
713 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS), perf_rtt.element); | 1354 | return GNUNET_YES; |
714 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_rtt.operation_request); | 1355 | } |
715 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_SE), perf_rtt.se); | 1356 | } |
716 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE), perf_rtt.full_done); | 1357 | return GNUNET_NO; |
717 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_IBF), perf_rtt.ibf); | 1358 | } |
718 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY), perf_rtt.inquery); | ||
719 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND), perf_rtt.demand); | ||
720 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER), perf_rtt.offer); | ||
721 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DONE), perf_rtt.done); | ||
722 | 1359 | ||
723 | LOG(GNUNET_ERROR_TYPE_ERROR,"Bytes Transmitted: %d\n", bytes_transmitted); | ||
724 | 1360 | ||
725 | LOG(GNUNET_ERROR_TYPE_ERROR,"Reached tradeoff bandwidth/rtt: %f\n", (bytes_transmitted / rtt )); | 1361 | /** |
1362 | * Iterator for determining if all demands have been | ||
1363 | * satisfied | ||
1364 | * | ||
1365 | * @param cls the union operation `struct Operation *` | ||
1366 | * @param key unused | ||
1367 | * @param value the `struct ElementEntry *` to insert | ||
1368 | * into the key-to-element mapping | ||
1369 | * @return #GNUNET_YES (to continue iterating) | ||
1370 | */ | ||
1371 | static int | ||
1372 | determinate_done_message_iterator (void *cls, | ||
1373 | const struct GNUNET_HashCode *key, | ||
1374 | void *value) | ||
1375 | { | ||
1376 | struct messageControlFlowElement *mcfe = value; | ||
726 | 1377 | ||
727 | return rtt; | 1378 | if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) )) |
1379 | { | ||
1380 | return GNUNET_YES; | ||
1381 | } | ||
1382 | return GNUNET_NO; | ||
1383 | } | ||
1384 | |||
1385 | |||
1386 | /** | ||
1387 | * Iterator for determining average size | ||
1388 | * | ||
1389 | * @param cls the union operation `struct Operation *` | ||
1390 | * @param key unused | ||
1391 | * @param value the `struct ElementEntry *` to insert | ||
1392 | * into the key-to-element mapping | ||
1393 | * @return #GNUNET_YES (to continue iterating) | ||
1394 | */ | ||
1395 | static int | ||
1396 | determinate_avg_element_size_iterator (void *cls, | ||
1397 | const struct GNUNET_HashCode *key, | ||
1398 | void *value) | ||
1399 | { | ||
1400 | struct Operation *op = cls; | ||
1401 | struct GNUNET_SETU_Element *element = value; | ||
1402 | op->total_elements_size_local += element->size; | ||
1403 | return GNUNET_YES; | ||
1404 | } | ||
1405 | |||
1406 | |||
1407 | /** | ||
1408 | * Create randomized element hashmap for full sending | ||
1409 | * | ||
1410 | * @param cls the union operation `struct Operation *` | ||
1411 | * @param key unused | ||
1412 | * @param value the `struct ElementEntry *` to insert | ||
1413 | * into the key-to-element mapping | ||
1414 | * @return #GNUNET_YES (to continue iterating) | ||
1415 | */ | ||
1416 | static int | ||
1417 | create_randomized_element_iterator (void *cls, | ||
1418 | const struct GNUNET_HashCode *key, | ||
1419 | void *value) | ||
1420 | { | ||
1421 | struct Operation *op = cls; | ||
1422 | |||
1423 | struct GNUNET_HashContext *hashed_key_context = | ||
1424 | GNUNET_CRYPTO_hash_context_start (); | ||
1425 | struct GNUNET_HashCode new_key; | ||
1426 | |||
1427 | /** | ||
1428 | * Hash element with new salt to randomize hashmap | ||
1429 | */ | ||
1430 | GNUNET_CRYPTO_hash_context_read (hashed_key_context, | ||
1431 | &key, | ||
1432 | sizeof(struct IBF_Key)); | ||
1433 | GNUNET_CRYPTO_hash_context_read (hashed_key_context, | ||
1434 | &op->set->content->elements_randomized_salt, | ||
1435 | sizeof(uint32_t)); | ||
1436 | GNUNET_CRYPTO_hash_context_finish (hashed_key_context, | ||
1437 | &new_key); | ||
1438 | GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized, | ||
1439 | &new_key,value, | ||
1440 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | ||
1441 | return GNUNET_YES; | ||
728 | } | 1442 | } |
729 | 1443 | ||
730 | 1444 | ||
@@ -808,6 +1522,36 @@ send_client_done (void *cls) | |||
808 | } | 1522 | } |
809 | 1523 | ||
810 | 1524 | ||
1525 | /** | ||
1526 | * Check if all given byzantine parameters are in given boundaries | ||
1527 | * @param op | ||
1528 | * @return indicator if all given byzantine parameters are in given boundaries | ||
1529 | */ | ||
1530 | |||
1531 | static int | ||
1532 | check_byzantine_bounds (struct Operation *op) | ||
1533 | { | ||
1534 | if (op->byzantine != GNUNET_YES) | ||
1535 | return GNUNET_OK; | ||
1536 | |||
1537 | /** | ||
1538 | * Check upper byzantine bounds | ||
1539 | */ | ||
1540 | if (op->remote_element_count + op->remote_set_diff > | ||
1541 | op->byzantine_upper_bound) | ||
1542 | return GNUNET_SYSERR; | ||
1543 | if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound) | ||
1544 | return GNUNET_SYSERR; | ||
1545 | |||
1546 | /** | ||
1547 | * Check lower byzantine bounds | ||
1548 | */ | ||
1549 | if (op->remote_element_count < op->byzantine_lower_bound) | ||
1550 | return GNUNET_SYSERR; | ||
1551 | return GNUNET_OK; | ||
1552 | } | ||
1553 | |||
1554 | |||
811 | /* FIXME: the destroy logic is a mess and should be cleaned up! */ | 1555 | /* FIXME: the destroy logic is a mess and should be cleaned up! */ |
812 | 1556 | ||
813 | /** | 1557 | /** |
@@ -977,6 +1721,101 @@ fail_union_operation (struct Operation *op) | |||
977 | 1721 | ||
978 | 1722 | ||
979 | /** | 1723 | /** |
1724 | * Function that checks if full sync is plausible | ||
1725 | * @param initial_local_elements_in_set | ||
1726 | * @param estimated_set_difference | ||
1727 | * @param repeated_elements | ||
1728 | * @param fresh_elements | ||
1729 | * @param op | ||
1730 | * @return GNUNET_OK if | ||
1731 | */ | ||
1732 | |||
1733 | static void | ||
1734 | full_sync_plausibility_check (struct Operation *op) | ||
1735 | { | ||
1736 | if (GNUNET_YES != op->byzantine) | ||
1737 | return; | ||
1738 | |||
1739 | int security_level_lb = -1 * SECURITY_LEVEL; | ||
1740 | uint64_t duplicates = op->received_fresh - op->received_total; | ||
1741 | |||
1742 | /* | ||
1743 | * Protect full sync from receiving double element when in FULL SENDING | ||
1744 | */ | ||
1745 | if (PHASE_FULL_SENDING == op->phase) | ||
1746 | { | ||
1747 | if (duplicates > 0) | ||
1748 | { | ||
1749 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1750 | "PROTOCOL VIOLATION: Received duplicate element in full receiving " | ||
1751 | "mode of operation this is not allowed! Duplicates: %llu\n", | ||
1752 | (unsigned long long) duplicates); | ||
1753 | GNUNET_break_op (0); | ||
1754 | fail_union_operation (op); | ||
1755 | return; | ||
1756 | } | ||
1757 | |||
1758 | } | ||
1759 | |||
1760 | /* | ||
1761 | * Protect full sync with probabilistic algorithm | ||
1762 | */ | ||
1763 | if (PHASE_FULL_RECEIVING == op->phase) | ||
1764 | { | ||
1765 | if (0 == op->remote_set_diff) | ||
1766 | op->remote_set_diff = 1; | ||
1767 | |||
1768 | long double base = (1 - (long double) (op->remote_set_diff | ||
1769 | / (long double) (op->initial_size | ||
1770 | + op-> | ||
1771 | remote_set_diff))); | ||
1772 | long double exponent = (op->received_total - (op->received_fresh * ((long | ||
1773 | double) | ||
1774 | op-> | ||
1775 | initial_size | ||
1776 | / (long | ||
1777 | double) | ||
1778 | op-> | ||
1779 | remote_set_diff))); | ||
1780 | long double value = exponent * (log2l (base) / log2l (2)); | ||
1781 | if ((value < security_level_lb) || (value > SECURITY_LEVEL) ) | ||
1782 | { | ||
1783 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1784 | "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving " | ||
1785 | "to many duplicated full element : %LF\n", | ||
1786 | value); | ||
1787 | GNUNET_break_op (0); | ||
1788 | fail_union_operation (op); | ||
1789 | return; | ||
1790 | } | ||
1791 | } | ||
1792 | } | ||
1793 | |||
1794 | |||
1795 | /** | ||
1796 | * Limit active passive switches in differential sync to configured security level | ||
1797 | * @param op | ||
1798 | */ | ||
1799 | static void | ||
1800 | check_max_differential_rounds (struct Operation *op) | ||
1801 | { | ||
1802 | double probability = op->differential_sync_iterations * (log2l ( | ||
1803 | PROBABILITY_FOR_NEW_ROUND) | ||
1804 | / log2l (2)); | ||
1805 | if ((-1 * SECURITY_LEVEL) > probability) | ||
1806 | { | ||
1807 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1808 | "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive " | ||
1809 | "switches in differential sync: %u\n", | ||
1810 | op->differential_sync_iterations); | ||
1811 | GNUNET_break_op (0); | ||
1812 | fail_union_operation (op); | ||
1813 | return; | ||
1814 | } | ||
1815 | } | ||
1816 | |||
1817 | |||
1818 | /** | ||
980 | * Derive the IBF key from a hash code and | 1819 | * Derive the IBF key from a hash code and |
981 | * a salt. | 1820 | * a salt. |
982 | * | 1821 | * |
@@ -1004,12 +1843,12 @@ get_ibf_key (const struct GNUNET_HashCode *src) | |||
1004 | struct GetElementContext | 1843 | struct GetElementContext |
1005 | { | 1844 | { |
1006 | /** | 1845 | /** |
1007 | * FIXME. | 1846 | * Gnunet hash code in context |
1008 | */ | 1847 | */ |
1009 | struct GNUNET_HashCode hash; | 1848 | struct GNUNET_HashCode hash; |
1010 | 1849 | ||
1011 | /** | 1850 | /** |
1012 | * FIXME. | 1851 | * Pointer to the key entry |
1013 | */ | 1852 | */ |
1014 | struct KeyEntry *k; | 1853 | struct KeyEntry *k; |
1015 | }; | 1854 | }; |
@@ -1122,7 +1961,7 @@ salt_key (const struct IBF_Key *k_in, | |||
1122 | uint32_t salt, | 1961 | uint32_t salt, |
1123 | struct IBF_Key *k_out) | 1962 | struct IBF_Key *k_out) |
1124 | { | 1963 | { |
1125 | int s = salt % 64; | 1964 | int s = (salt * 7) % 64; |
1126 | uint64_t x = k_in->key_val; | 1965 | uint64_t x = k_in->key_val; |
1127 | 1966 | ||
1128 | /* rotate ibf key */ | 1967 | /* rotate ibf key */ |
@@ -1132,14 +1971,14 @@ salt_key (const struct IBF_Key *k_in, | |||
1132 | 1971 | ||
1133 | 1972 | ||
1134 | /** | 1973 | /** |
1135 | * FIXME. | 1974 | * Reverse modification done in the salt_key function |
1136 | */ | 1975 | */ |
1137 | static void | 1976 | static void |
1138 | unsalt_key (const struct IBF_Key *k_in, | 1977 | unsalt_key (const struct IBF_Key *k_in, |
1139 | uint32_t salt, | 1978 | uint32_t salt, |
1140 | struct IBF_Key *k_out) | 1979 | struct IBF_Key *k_out) |
1141 | { | 1980 | { |
1142 | int s = salt % 64; | 1981 | int s = (salt * 7) % 64; |
1143 | uint64_t x = k_in->key_val; | 1982 | uint64_t x = k_in->key_val; |
1144 | 1983 | ||
1145 | x = (x << s) | (x >> (64 - s)); | 1984 | x = (x << s) | (x >> (64 - s)); |
@@ -1258,7 +2097,9 @@ prepare_ibf (struct Operation *op, | |||
1258 | 2097 | ||
1259 | if (NULL != op->local_ibf) | 2098 | if (NULL != op->local_ibf) |
1260 | ibf_destroy (op->local_ibf); | 2099 | ibf_destroy (op->local_ibf); |
1261 | op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 2100 | // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
2101 | op->local_ibf = ibf_create (size, | ||
2102 | ((uint8_t) op->ibf_number_buckets_per_element)); | ||
1262 | if (NULL == op->local_ibf) | 2103 | if (NULL == op->local_ibf) |
1263 | { | 2104 | { |
1264 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 2105 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -1283,13 +2124,23 @@ prepare_ibf (struct Operation *op, | |||
1283 | */ | 2124 | */ |
1284 | static int | 2125 | static int |
1285 | send_ibf (struct Operation *op, | 2126 | send_ibf (struct Operation *op, |
1286 | uint16_t ibf_order) | 2127 | uint32_t ibf_size) |
1287 | { | 2128 | { |
1288 | unsigned int buckets_sent = 0; | 2129 | uint64_t buckets_sent = 0; |
1289 | struct InvertibleBloomFilter *ibf; | 2130 | struct InvertibleBloomFilter *ibf; |
2131 | op->differential_sync_iterations++; | ||
1290 | 2132 | ||
2133 | /** | ||
2134 | * Enforce min size of IBF | ||
2135 | */ | ||
2136 | uint32_t ibf_min_size = IBF_MIN_SIZE; | ||
2137 | |||
2138 | if (ibf_size < ibf_min_size) | ||
2139 | { | ||
2140 | ibf_size = ibf_min_size; | ||
2141 | } | ||
1291 | if (GNUNET_OK != | 2142 | if (GNUNET_OK != |
1292 | prepare_ibf (op, 1 << ibf_order)) | 2143 | prepare_ibf (op, ibf_size)) |
1293 | { | 2144 | { |
1294 | /* allocation failed */ | 2145 | /* allocation failed */ |
1295 | return GNUNET_SYSERR; | 2146 | return GNUNET_SYSERR; |
@@ -1297,45 +2148,52 @@ send_ibf (struct Operation *op, | |||
1297 | 2148 | ||
1298 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2149 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1299 | "sending ibf of size %u\n", | 2150 | "sending ibf of size %u\n", |
1300 | 1 << ibf_order); | 2151 | (unsigned int) ibf_size); |
1301 | 2152 | ||
1302 | { | 2153 | { |
1303 | char name[64]; | 2154 | char name[64]; |
1304 | GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order); | 2155 | |
2156 | GNUNET_snprintf (name, | ||
2157 | sizeof(name), | ||
2158 | "# sent IBF (order %u)", | ||
2159 | ibf_size); | ||
1305 | GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); | 2160 | GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); |
1306 | } | 2161 | } |
1307 | 2162 | ||
1308 | ibf = op->local_ibf; | 2163 | ibf = op->local_ibf; |
1309 | 2164 | ||
1310 | while (buckets_sent < (1 << ibf_order)) | 2165 | while (buckets_sent < ibf_size) |
1311 | { | 2166 | { |
1312 | unsigned int buckets_in_message; | 2167 | unsigned int buckets_in_message; |
1313 | struct GNUNET_MQ_Envelope *ev; | 2168 | struct GNUNET_MQ_Envelope *ev; |
1314 | struct IBFMessage *msg; | 2169 | struct IBFMessage *msg; |
1315 | 2170 | ||
1316 | buckets_in_message = (1 << ibf_order) - buckets_sent; | 2171 | buckets_in_message = ibf_size - buckets_sent; |
1317 | /* limit to maximum */ | 2172 | /* limit to maximum */ |
1318 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) | 2173 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) |
1319 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; | 2174 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; |
1320 | 2175 | ||
1321 | perf_rtt.ibf.sent += 1; | 2176 | #if MEASURE_PERFORMANCE |
1322 | perf_rtt.ibf.sent_var_bytes += ( buckets_in_message * IBF_BUCKET_SIZE ); | 2177 | perf_store.ibf.sent += 1; |
2178 | perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE); | ||
2179 | #endif | ||
1323 | ev = GNUNET_MQ_msg_extra (msg, | 2180 | ev = GNUNET_MQ_msg_extra (msg, |
1324 | buckets_in_message * IBF_BUCKET_SIZE, | 2181 | buckets_in_message * IBF_BUCKET_SIZE, |
1325 | GNUNET_MESSAGE_TYPE_SETU_P2P_IBF); | 2182 | GNUNET_MESSAGE_TYPE_SETU_P2P_IBF); |
1326 | msg->reserved1 = 0; | 2183 | msg->ibf_size = ibf_size; |
1327 | msg->reserved2 = 0; | ||
1328 | msg->order = ibf_order; | ||
1329 | msg->offset = htonl (buckets_sent); | 2184 | msg->offset = htonl (buckets_sent); |
1330 | msg->salt = htonl (op->salt_send); | 2185 | msg->salt = htonl (op->salt_send); |
2186 | msg->ibf_counter_bit_length = ibf_get_max_counter (ibf); | ||
2187 | |||
2188 | |||
1331 | ibf_write_slice (ibf, buckets_sent, | 2189 | ibf_write_slice (ibf, buckets_sent, |
1332 | buckets_in_message, &msg[1]); | 2190 | buckets_in_message, &msg[1], msg->ibf_counter_bit_length); |
1333 | buckets_sent += buckets_in_message; | 2191 | buckets_sent += buckets_in_message; |
1334 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2192 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1335 | "ibf chunk size %u, %u/%u sent\n", | 2193 | "ibf chunk size %u, %llu/%u sent\n", |
1336 | buckets_in_message, | 2194 | (unsigned int) buckets_in_message, |
1337 | buckets_sent, | 2195 | (unsigned long long) buckets_sent, |
1338 | 1 << ibf_order); | 2196 | (unsigned int) ibf_size); |
1339 | GNUNET_MQ_send (op->mq, ev); | 2197 | GNUNET_MQ_send (op->mq, ev); |
1340 | } | 2198 | } |
1341 | 2199 | ||
@@ -1354,17 +2212,26 @@ send_ibf (struct Operation *op, | |||
1354 | * @return the required size of the ibf | 2212 | * @return the required size of the ibf |
1355 | */ | 2213 | */ |
1356 | static unsigned int | 2214 | static unsigned int |
1357 | get_order_from_difference (unsigned int diff) | 2215 | get_size_from_difference (unsigned int diff, int number_buckets_per_element, |
2216 | float ibf_bucket_number_factor) | ||
1358 | { | 2217 | { |
1359 | unsigned int ibf_order; | 2218 | /** Make ibf estimation size odd reasoning can be found in BSc Thesis of |
2219 | * Elias Summermatter (2021) in section 3.11 **/ | ||
2220 | return (((int) (diff * ibf_bucket_number_factor)) | 1); | ||
2221 | |||
2222 | } | ||
1360 | 2223 | ||
1361 | ibf_order = 2; | 2224 | |
1362 | while (((1 << ibf_order) < (IBF_ALPHA * diff) || | 2225 | static unsigned int |
1363 | ((1 << ibf_order) < SE_IBF_HASH_NUM)) && | 2226 | get_next_ibf_size (float ibf_bucket_number_factor, unsigned int |
1364 | (ibf_order < MAX_IBF_ORDER)) | 2227 | decoded_elements, unsigned int last_ibf_size) |
1365 | ibf_order++; | 2228 | { |
1366 | // add one for correction | 2229 | unsigned int next_size = (unsigned int) ((last_ibf_size * 2) |
1367 | return ibf_order + 1; | 2230 | - (ibf_bucket_number_factor |
2231 | * decoded_elements)); | ||
2232 | /** Make ibf estimation size odd reasoning can be found in BSc Thesis of | ||
2233 | * Elias Summermatter (2021) in section 3.11 **/ | ||
2234 | return next_size | 1; | ||
1368 | } | 2235 | } |
1369 | 2236 | ||
1370 | 2237 | ||
@@ -1391,8 +2258,10 @@ send_full_element_iterator (void *cls, | |||
1391 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2258 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1392 | "Sending element %s\n", | 2259 | "Sending element %s\n", |
1393 | GNUNET_h2s (key)); | 2260 | GNUNET_h2s (key)); |
1394 | perf_rtt.element_full.received += 1; | 2261 | #if MEASURE_PERFORMANCE |
1395 | perf_rtt.element_full.received_var_bytes += el->size; | 2262 | perf_store.element_full.received += 1; |
2263 | perf_store.element_full.received_var_bytes += el->size; | ||
2264 | #endif | ||
1396 | ev = GNUNET_MQ_msg_extra (emsg, | 2265 | ev = GNUNET_MQ_msg_extra (emsg, |
1397 | el->size, | 2266 | el->size, |
1398 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); | 2267 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); |
@@ -1421,10 +2290,25 @@ send_full_set (struct Operation *op) | |||
1421 | "Dedicing to transmit the full set\n"); | 2290 | "Dedicing to transmit the full set\n"); |
1422 | /* FIXME: use a more memory-friendly way of doing this with an | 2291 | /* FIXME: use a more memory-friendly way of doing this with an |
1423 | iterator, just as we do in the non-full case! */ | 2292 | iterator, just as we do in the non-full case! */ |
2293 | |||
2294 | // Randomize Elements to send | ||
2295 | op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create ( | ||
2296 | 32,GNUNET_NO); | ||
2297 | op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 ( | ||
2298 | GNUNET_CRYPTO_QUALITY_NONCE, | ||
2299 | UINT64_MAX); | ||
1424 | (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | 2300 | (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, |
1425 | &send_full_element_iterator, | 2301 | & |
2302 | create_randomized_element_iterator, | ||
1426 | op); | 2303 | op); |
1427 | perf_rtt.full_done.sent += 1; | 2304 | |
2305 | (void) GNUNET_CONTAINER_multihashmap_iterate ( | ||
2306 | op->set->content->elements_randomized, | ||
2307 | &send_full_element_iterator, | ||
2308 | op); | ||
2309 | #if MEASURE_PERFORMANCE | ||
2310 | perf_store.full_done.sent += 1; | ||
2311 | #endif | ||
1428 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); | 2312 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); |
1429 | GNUNET_MQ_send (op->mq, | 2313 | GNUNET_MQ_send (op->mq, |
1430 | ev); | 2314 | ev); |
@@ -1454,7 +2338,7 @@ check_union_p2p_strata_estimator (void *cls, | |||
1454 | msg->header.type)); | 2338 | msg->header.type)); |
1455 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); | 2339 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); |
1456 | if ((GNUNET_NO == is_compressed) && | 2340 | if ((GNUNET_NO == is_compressed) && |
1457 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) | 2341 | (len != SE_STRATA_COUNT * SE_IBFS_TOTAL_SIZE * IBF_BUCKET_SIZE)) |
1458 | { | 2342 | { |
1459 | GNUNET_break (0); | 2343 | GNUNET_break (0); |
1460 | return GNUNET_SYSERR; | 2344 | return GNUNET_SYSERR; |
@@ -1473,14 +2357,44 @@ static void | |||
1473 | handle_union_p2p_strata_estimator (void *cls, | 2357 | handle_union_p2p_strata_estimator (void *cls, |
1474 | const struct StrataEstimatorMessage *msg) | 2358 | const struct StrataEstimatorMessage *msg) |
1475 | { | 2359 | { |
1476 | perf_rtt.se.received += 1; | 2360 | #if MEASURE_PERFORMANCE |
1477 | perf_rtt.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); | 2361 | perf_store.se.received += 1; |
2362 | perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct | ||
2363 | StrataEstimatorMessage); | ||
2364 | #endif | ||
1478 | struct Operation *op = cls; | 2365 | struct Operation *op = cls; |
1479 | struct StrataEstimator *remote_se; | 2366 | struct MultiStrataEstimator *remote_se; |
1480 | unsigned int diff; | 2367 | unsigned int diff; |
1481 | uint64_t other_size; | 2368 | uint64_t other_size; |
1482 | size_t len; | 2369 | size_t len; |
1483 | int is_compressed; | 2370 | int is_compressed; |
2371 | op->local_element_count = GNUNET_CONTAINER_multihashmap_size ( | ||
2372 | op->set->content->elements); | ||
2373 | // Setting peer site to receiving peer | ||
2374 | op->peer_site = 1; | ||
2375 | |||
2376 | /** | ||
2377 | * Check that the message is received only in supported phase | ||
2378 | */ | ||
2379 | uint8_t allowed_phases[] = {PHASE_EXPECT_SE}; | ||
2380 | if (GNUNET_OK != | ||
2381 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
2382 | { | ||
2383 | GNUNET_break (0); | ||
2384 | fail_union_operation (op); | ||
2385 | return; | ||
2386 | } | ||
2387 | |||
2388 | /** Only allow 1,2,4,8 SEs **/ | ||
2389 | if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1)) | ||
2390 | { | ||
2391 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2392 | "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n", | ||
2393 | msg->se_count); | ||
2394 | GNUNET_break_op (0); | ||
2395 | fail_union_operation (op); | ||
2396 | return; | ||
2397 | } | ||
1484 | 2398 | ||
1485 | is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons ( | 2399 | is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons ( |
1486 | msg->header.type)); | 2400 | msg->header.type)); |
@@ -1490,8 +2404,20 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1490 | GNUNET_NO); | 2404 | GNUNET_NO); |
1491 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); | 2405 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); |
1492 | other_size = GNUNET_ntohll (msg->set_size); | 2406 | other_size = GNUNET_ntohll (msg->set_size); |
2407 | op->remote_element_count = other_size; | ||
2408 | |||
2409 | if (op->byzantine_upper_bound < op->remote_element_count) | ||
2410 | { | ||
2411 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2412 | "Exceeded configured upper bound <%lu> of element: %u\n", | ||
2413 | op->byzantine_upper_bound, | ||
2414 | op->remote_element_count); | ||
2415 | fail_union_operation (op); | ||
2416 | return; | ||
2417 | } | ||
2418 | |||
1493 | remote_se = strata_estimator_create (SE_STRATA_COUNT, | 2419 | remote_se = strata_estimator_create (SE_STRATA_COUNT, |
1494 | SE_IBF_SIZE, | 2420 | SE_IBFS_TOTAL_SIZE, |
1495 | SE_IBF_HASH_NUM); | 2421 | SE_IBF_HASH_NUM); |
1496 | if (NULL == remote_se) | 2422 | if (NULL == remote_se) |
1497 | { | 2423 | { |
@@ -1503,6 +2429,8 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1503 | strata_estimator_read (&msg[1], | 2429 | strata_estimator_read (&msg[1], |
1504 | len, | 2430 | len, |
1505 | is_compressed, | 2431 | is_compressed, |
2432 | msg->se_count, | ||
2433 | SE_IBFS_TOTAL_SIZE, | ||
1506 | remote_se)) | 2434 | remote_se)) |
1507 | { | 2435 | { |
1508 | /* decompression failed */ | 2436 | /* decompression failed */ |
@@ -1511,11 +2439,76 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1511 | return; | 2439 | return; |
1512 | } | 2440 | } |
1513 | GNUNET_assert (NULL != op->se); | 2441 | GNUNET_assert (NULL != op->se); |
1514 | diff = strata_estimator_difference (remote_se, | 2442 | strata_estimator_difference (remote_se, |
1515 | op->se); | 2443 | op->se); |
2444 | |||
2445 | /* Calculate remote local diff */ | ||
2446 | long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count; | ||
2447 | long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count; | ||
2448 | |||
2449 | /* Prevent estimations from overshooting max element */ | ||
2450 | if (diff_remote + op->remote_element_count > op->byzantine_upper_bound) | ||
2451 | diff_remote = op->byzantine_upper_bound - op->remote_element_count; | ||
2452 | if (diff_local + op->local_element_count > op->byzantine_upper_bound) | ||
2453 | diff_local = op->byzantine_upper_bound - op->local_element_count; | ||
2454 | if ((diff_remote < 0) || (diff_local < 0)) | ||
2455 | { | ||
2456 | strata_estimator_destroy (remote_se); | ||
2457 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2458 | "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is " | ||
2459 | "malicious: remote diff %ld, local diff: %ld\n", | ||
2460 | diff_remote, diff_local); | ||
2461 | GNUNET_break_op (0); | ||
2462 | fail_union_operation (op); | ||
2463 | return; | ||
2464 | } | ||
1516 | 2465 | ||
1517 | if (diff > 200) | 2466 | /* Make estimation more precise in initial sync cases */ |
1518 | diff = diff * 3 / 2; | 2467 | if (0 == op->remote_element_count) |
2468 | { | ||
2469 | diff_remote = 0; | ||
2470 | diff_local = op->local_element_count; | ||
2471 | } | ||
2472 | if (0 == op->local_element_count) | ||
2473 | { | ||
2474 | diff_local = 0; | ||
2475 | diff_remote = op->remote_element_count; | ||
2476 | } | ||
2477 | |||
2478 | diff = diff_remote + diff_local; | ||
2479 | op->remote_set_diff = diff_remote; | ||
2480 | |||
2481 | /** Calculate avg element size if not initial sync **/ | ||
2482 | uint64_t avg_element_size = 0; | ||
2483 | if (0 < op->local_element_count) | ||
2484 | { | ||
2485 | op->total_elements_size_local = 0; | ||
2486 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
2487 | & | ||
2488 | determinate_avg_element_size_iterator, | ||
2489 | op); | ||
2490 | avg_element_size = op->total_elements_size_local / op->local_element_count; | ||
2491 | } | ||
2492 | |||
2493 | op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size, | ||
2494 | GNUNET_CONTAINER_multihashmap_size ( | ||
2495 | op->set->content-> | ||
2496 | elements), | ||
2497 | op-> | ||
2498 | remote_element_count, | ||
2499 | diff_remote, | ||
2500 | diff_local, | ||
2501 | op-> | ||
2502 | rtt_bandwidth_tradeoff, | ||
2503 | op-> | ||
2504 | ibf_bucket_number_factor); | ||
2505 | |||
2506 | #if MEASURE_PERFORMANCE | ||
2507 | perf_store.se_diff_local = diff_local; | ||
2508 | perf_store.se_diff_remote = diff_remote; | ||
2509 | perf_store.se_diff = diff; | ||
2510 | perf_store.mode_of_operation = op->mode_of_operation; | ||
2511 | #endif | ||
1519 | 2512 | ||
1520 | strata_estimator_destroy (remote_se); | 2513 | strata_estimator_destroy (remote_se); |
1521 | strata_estimator_destroy (op->se); | 2514 | strata_estimator_destroy (op->se); |
@@ -1523,7 +2516,8 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1523 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2516 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1524 | "got se diff=%d, using ibf size %d\n", | 2517 | "got se diff=%d, using ibf size %d\n", |
1525 | diff, | 2518 | diff, |
1526 | 1U << get_order_from_difference (diff)); | 2519 | 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element, |
2520 | op->ibf_bucket_number_factor)); | ||
1527 | 2521 | ||
1528 | { | 2522 | { |
1529 | char *set_debug; | 2523 | char *set_debug; |
@@ -1546,16 +2540,8 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1546 | return; | 2540 | return; |
1547 | } | 2541 | } |
1548 | 2542 | ||
1549 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1550 | "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: %f\n", op->rtt_bandwidth_tradeoff); | ||
1551 | |||
1552 | |||
1553 | /** | ||
1554 | * Added rtt_bandwidth_tradeoff directly need future improvements | ||
1555 | */ | ||
1556 | if ((GNUNET_YES == op->force_full) || | 2543 | if ((GNUNET_YES == op->force_full) || |
1557 | (diff > op->initial_size / 4) || | 2544 | (op->mode_of_operation != DIFFERENTIAL_SYNC)) |
1558 | (0 == other_size)) | ||
1559 | { | 2545 | { |
1560 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2546 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1561 | "Deciding to go for full set transmission (diff=%d, own set=%llu)\n", | 2547 | "Deciding to go for full set transmission (diff=%d, own set=%llu)\n", |
@@ -1565,9 +2551,17 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1565 | "# of full sends", | 2551 | "# of full sends", |
1566 | 1, | 2552 | 1, |
1567 | GNUNET_NO); | 2553 | GNUNET_NO); |
1568 | if ((op->initial_size <= other_size) || | 2554 | if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation) |
1569 | (0 == other_size)) | ||
1570 | { | 2555 | { |
2556 | struct TransmitFullMessage *signal_msg; | ||
2557 | struct GNUNET_MQ_Envelope *ev; | ||
2558 | ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage), | ||
2559 | GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL); | ||
2560 | signal_msg->remote_set_difference = htonl (diff_local); | ||
2561 | signal_msg->remote_set_size = htonl (op->local_element_count); | ||
2562 | signal_msg->local_set_difference = htonl (diff_remote); | ||
2563 | GNUNET_MQ_send (op->mq, | ||
2564 | ev); | ||
1571 | send_full_set (op); | 2565 | send_full_set (op); |
1572 | } | 2566 | } |
1573 | else | 2567 | else |
@@ -1577,9 +2571,15 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1577 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2571 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1578 | "Telling other peer that we expect its full set\n"); | 2572 | "Telling other peer that we expect its full set\n"); |
1579 | op->phase = PHASE_FULL_RECEIVING; | 2573 | op->phase = PHASE_FULL_RECEIVING; |
1580 | perf_rtt.request_full.sent += 1; | 2574 | #if MEASURE_PERFORMANCE |
1581 | ev = GNUNET_MQ_msg_header ( | 2575 | perf_store.request_full.sent += 1; |
1582 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); | 2576 | #endif |
2577 | struct TransmitFullMessage *signal_msg; | ||
2578 | ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage), | ||
2579 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); | ||
2580 | signal_msg->remote_set_difference = htonl (diff_local); | ||
2581 | signal_msg->remote_set_size = htonl (op->local_element_count); | ||
2582 | signal_msg->local_set_difference = htonl (diff_remote); | ||
1583 | GNUNET_MQ_send (op->mq, | 2583 | GNUNET_MQ_send (op->mq, |
1584 | ev); | 2584 | ev); |
1585 | } | 2585 | } |
@@ -1592,7 +2592,9 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1592 | GNUNET_NO); | 2592 | GNUNET_NO); |
1593 | if (GNUNET_OK != | 2593 | if (GNUNET_OK != |
1594 | send_ibf (op, | 2594 | send_ibf (op, |
1595 | get_order_from_difference (diff))) | 2595 | get_size_from_difference (diff, |
2596 | op->ibf_number_buckets_per_element, | ||
2597 | op->ibf_bucket_number_factor))) | ||
1596 | { | 2598 | { |
1597 | /* Internal error, best we can do is shut the connection */ | 2599 | /* Internal error, best we can do is shut the connection */ |
1598 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 2600 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -1625,15 +2627,64 @@ send_offers_iterator (void *cls, | |||
1625 | 2627 | ||
1626 | /* Detect 32-bit key collision for the 64-bit IBF keys. */ | 2628 | /* Detect 32-bit key collision for the 64-bit IBF keys. */ |
1627 | if (ke->ibf_key.key_val != sec->ibf_key.key_val) | 2629 | if (ke->ibf_key.key_val != sec->ibf_key.key_val) |
2630 | { | ||
2631 | op->active_passive_switch_required = true; | ||
1628 | return GNUNET_YES; | 2632 | return GNUNET_YES; |
2633 | } | ||
1629 | 2634 | ||
1630 | perf_rtt.offer.sent += 1; | 2635 | /* Prevent implementation from sending a offer multiple times in case of roll switch */ |
1631 | perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); | 2636 | if (GNUNET_YES == |
2637 | is_message_in_message_control_flow ( | ||
2638 | op->message_control_flow, | ||
2639 | &ke->element->element_hash, | ||
2640 | OFFER_MESSAGE) | ||
2641 | ) | ||
2642 | { | ||
2643 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2644 | "Skipping already sent processed element offer!\n"); | ||
2645 | return GNUNET_YES; | ||
2646 | } | ||
1632 | 2647 | ||
2648 | /* Save send offer message for message control */ | ||
2649 | if (GNUNET_YES != | ||
2650 | update_message_control_flow ( | ||
2651 | op->message_control_flow, | ||
2652 | MSG_CFS_SENT, | ||
2653 | &ke->element->element_hash, | ||
2654 | OFFER_MESSAGE) | ||
2655 | ) | ||
2656 | { | ||
2657 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2658 | "Double offer message sent found!\n"); | ||
2659 | GNUNET_break (0); | ||
2660 | fail_union_operation (op); | ||
2661 | return GNUNET_NO; | ||
2662 | } | ||
2663 | ; | ||
2664 | |||
2665 | /* Mark element to be expected to received */ | ||
2666 | if (GNUNET_YES != | ||
2667 | update_message_control_flow ( | ||
2668 | op->message_control_flow, | ||
2669 | MSG_CFS_EXPECTED, | ||
2670 | &ke->element->element_hash, | ||
2671 | DEMAND_MESSAGE) | ||
2672 | ) | ||
2673 | { | ||
2674 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2675 | "Double demand received found!\n"); | ||
2676 | GNUNET_break (0); | ||
2677 | fail_union_operation (op); | ||
2678 | return GNUNET_NO; | ||
2679 | } | ||
2680 | ; | ||
2681 | #if MEASURE_PERFORMANCE | ||
2682 | perf_store.offer.sent += 1; | ||
2683 | perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); | ||
2684 | #endif | ||
1633 | ev = GNUNET_MQ_msg_header_extra (mh, | 2685 | ev = GNUNET_MQ_msg_header_extra (mh, |
1634 | sizeof(struct GNUNET_HashCode), | 2686 | sizeof(struct GNUNET_HashCode), |
1635 | GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER); | 2687 | GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER); |
1636 | |||
1637 | GNUNET_assert (NULL != ev); | 2688 | GNUNET_assert (NULL != ev); |
1638 | *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; | 2689 | *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; |
1639 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2690 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1651,7 +2702,7 @@ send_offers_iterator (void *cls, | |||
1651 | * @param op union operation | 2702 | * @param op union operation |
1652 | * @param ibf_key IBF key of interest | 2703 | * @param ibf_key IBF key of interest |
1653 | */ | 2704 | */ |
1654 | static void | 2705 | void |
1655 | send_offers_for_key (struct Operation *op, | 2706 | send_offers_for_key (struct Operation *op, |
1656 | struct IBF_Key ibf_key) | 2707 | struct IBF_Key ibf_key) |
1657 | { | 2708 | { |
@@ -1694,6 +2745,7 @@ decode_and_send (struct Operation *op) | |||
1694 | /* allocation failed */ | 2745 | /* allocation failed */ |
1695 | return GNUNET_SYSERR; | 2746 | return GNUNET_SYSERR; |
1696 | } | 2747 | } |
2748 | |||
1697 | diff_ibf = ibf_dup (op->local_ibf); | 2749 | diff_ibf = ibf_dup (op->local_ibf); |
1698 | ibf_subtract (diff_ibf, | 2750 | ibf_subtract (diff_ibf, |
1699 | op->remote_ibf); | 2751 | op->remote_ibf); |
@@ -1706,7 +2758,7 @@ decode_and_send (struct Operation *op) | |||
1706 | diff_ibf->size); | 2758 | diff_ibf->size); |
1707 | 2759 | ||
1708 | num_decoded = 0; | 2760 | num_decoded = 0; |
1709 | key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ | 2761 | key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ |
1710 | 2762 | ||
1711 | while (1) | 2763 | while (1) |
1712 | { | 2764 | { |
@@ -1738,23 +2790,36 @@ decode_and_send (struct Operation *op) | |||
1738 | if ((GNUNET_SYSERR == res) || | 2790 | if ((GNUNET_SYSERR == res) || |
1739 | (GNUNET_YES == cycle_detected)) | 2791 | (GNUNET_YES == cycle_detected)) |
1740 | { | 2792 | { |
1741 | int next_order; | 2793 | uint32_t next_size; |
1742 | next_order = 0; | 2794 | /** Enforce odd ibf size **/ |
1743 | while (1 << next_order < diff_ibf->size) | 2795 | |
1744 | next_order++; | 2796 | next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded, |
1745 | next_order++; | 2797 | diff_ibf->size); |
1746 | if (next_order <= MAX_IBF_ORDER) | 2798 | /** Make ibf estimation size odd reasoning can be found in BSc Thesis of |
2799 | * Elias Summermatter (2021) in section 3.11 **/ | ||
2800 | uint32_t ibf_min_size = IBF_MIN_SIZE | 1; | ||
2801 | |||
2802 | if (next_size<ibf_min_size) | ||
2803 | next_size = ibf_min_size; | ||
2804 | |||
2805 | |||
2806 | if (next_size <= MAX_IBF_SIZE) | ||
1747 | { | 2807 | { |
1748 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2808 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1749 | "decoding failed, sending larger ibf (size %u)\n", | 2809 | "decoding failed, sending larger ibf (size %u)\n", |
1750 | 1 << next_order); | 2810 | next_size); |
1751 | GNUNET_STATISTICS_update (_GSS_statistics, | 2811 | GNUNET_STATISTICS_update (_GSS_statistics, |
1752 | "# of IBF retries", | 2812 | "# of IBF retries", |
1753 | 1, | 2813 | 1, |
1754 | GNUNET_NO); | 2814 | GNUNET_NO); |
1755 | op->salt_send++; | 2815 | #if MEASURE_PERFORMANCE |
2816 | perf_store.active_passive_switches += 1; | ||
2817 | #endif | ||
2818 | |||
2819 | op->salt_send = op->salt_receive++; | ||
2820 | |||
1756 | if (GNUNET_OK != | 2821 | if (GNUNET_OK != |
1757 | send_ibf (op, next_order)) | 2822 | send_ibf (op, next_size)) |
1758 | { | 2823 | { |
1759 | /* Internal error, best we can do is shut the connection */ | 2824 | /* Internal error, best we can do is shut the connection */ |
1760 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 2825 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -1786,7 +2851,9 @@ decode_and_send (struct Operation *op) | |||
1786 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2851 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1787 | "transmitted all values, sending DONE\n"); | 2852 | "transmitted all values, sending DONE\n"); |
1788 | 2853 | ||
1789 | perf_rtt.done.sent += 1; | 2854 | #if MEASURE_PERFORMANCE |
2855 | perf_store.done.sent += 1; | ||
2856 | #endif | ||
1790 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); | 2857 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); |
1791 | GNUNET_MQ_send (op->mq, ev); | 2858 | GNUNET_MQ_send (op->mq, ev); |
1792 | /* We now wait until we get a DONE message back | 2859 | /* We now wait until we get a DONE message back |
@@ -1797,7 +2864,6 @@ decode_and_send (struct Operation *op) | |||
1797 | if (1 == side) | 2864 | if (1 == side) |
1798 | { | 2865 | { |
1799 | struct IBF_Key unsalted_key; | 2866 | struct IBF_Key unsalted_key; |
1800 | |||
1801 | unsalt_key (&key, | 2867 | unsalt_key (&key, |
1802 | op->salt_receive, | 2868 | op->salt_receive, |
1803 | &unsalted_key); | 2869 | &unsalted_key); |
@@ -1809,8 +2875,29 @@ decode_and_send (struct Operation *op) | |||
1809 | struct GNUNET_MQ_Envelope *ev; | 2875 | struct GNUNET_MQ_Envelope *ev; |
1810 | struct InquiryMessage *msg; | 2876 | struct InquiryMessage *msg; |
1811 | 2877 | ||
1812 | perf_rtt.inquery.sent += 1; | 2878 | #if MEASURE_PERFORMANCE |
1813 | perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key); | 2879 | perf_store.inquery.sent += 1; |
2880 | perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key); | ||
2881 | #endif | ||
2882 | |||
2883 | /** Add sent inquiries to hashmap for flow control **/ | ||
2884 | struct GNUNET_HashContext *hashed_key_context = | ||
2885 | GNUNET_CRYPTO_hash_context_start (); | ||
2886 | struct GNUNET_HashCode *hashed_key = (struct | ||
2887 | GNUNET_HashCode*) GNUNET_malloc ( | ||
2888 | sizeof(struct GNUNET_HashCode)); | ||
2889 | enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_SENT; | ||
2890 | GNUNET_CRYPTO_hash_context_read (hashed_key_context, | ||
2891 | &key, | ||
2892 | sizeof(struct IBF_Key)); | ||
2893 | GNUNET_CRYPTO_hash_context_finish (hashed_key_context, | ||
2894 | hashed_key); | ||
2895 | GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent, | ||
2896 | hashed_key, | ||
2897 | &mcfs, | ||
2898 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE | ||
2899 | ); | ||
2900 | |||
1814 | /* It may be nice to merge multiple requests, but with CADET's corking it is not worth | 2901 | /* It may be nice to merge multiple requests, but with CADET's corking it is not worth |
1815 | * the effort additional complexity. */ | 2902 | * the effort additional complexity. */ |
1816 | ev = GNUNET_MQ_msg_extra (msg, | 2903 | ev = GNUNET_MQ_msg_extra (msg, |
@@ -1836,6 +2923,100 @@ decode_and_send (struct Operation *op) | |||
1836 | 2923 | ||
1837 | 2924 | ||
1838 | /** | 2925 | /** |
2926 | * Check send full message received from other peer | ||
2927 | * @param cls | ||
2928 | * @param msg | ||
2929 | * @return | ||
2930 | */ | ||
2931 | |||
2932 | static int | ||
2933 | check_union_p2p_send_full (void *cls, | ||
2934 | const struct TransmitFullMessage *msg) | ||
2935 | { | ||
2936 | return GNUNET_OK; | ||
2937 | } | ||
2938 | |||
2939 | |||
2940 | /** | ||
2941 | * Handle send full message received from other peer | ||
2942 | * | ||
2943 | * @param cls | ||
2944 | * @param msg | ||
2945 | */ | ||
2946 | static void | ||
2947 | handle_union_p2p_send_full (void *cls, | ||
2948 | const struct TransmitFullMessage *msg) | ||
2949 | { | ||
2950 | struct Operation *op = cls; | ||
2951 | |||
2952 | /** | ||
2953 | * Check that the message is received only in supported phase | ||
2954 | */ | ||
2955 | uint8_t allowed_phases[] = {PHASE_EXPECT_IBF}; | ||
2956 | if (GNUNET_OK != | ||
2957 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
2958 | { | ||
2959 | GNUNET_break (0); | ||
2960 | fail_union_operation (op); | ||
2961 | return; | ||
2962 | } | ||
2963 | |||
2964 | /** write received values to operator**/ | ||
2965 | op->remote_element_count = ntohl (msg->remote_set_size); | ||
2966 | op->remote_set_diff = ntohl (msg->remote_set_difference); | ||
2967 | op->local_set_diff = ntohl (msg->local_set_difference); | ||
2968 | |||
2969 | /** Check byzantine limits **/ | ||
2970 | if (check_byzantine_bounds (op) != GNUNET_OK) | ||
2971 | { | ||
2972 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2973 | "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine " | ||
2974 | "criteria\n"); | ||
2975 | GNUNET_break_op (0); | ||
2976 | fail_union_operation (op); | ||
2977 | return; | ||
2978 | } | ||
2979 | |||
2980 | /** Calculate avg element size if not initial sync **/ | ||
2981 | op->local_element_count = GNUNET_CONTAINER_multihashmap_size ( | ||
2982 | op->set->content->elements); | ||
2983 | uint64_t avg_element_size = 0; | ||
2984 | if (0 < op->local_element_count) | ||
2985 | { | ||
2986 | op->total_elements_size_local = 0; | ||
2987 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
2988 | & | ||
2989 | determinate_avg_element_size_iterator, | ||
2990 | op); | ||
2991 | avg_element_size = op->total_elements_size_local / op->local_element_count; | ||
2992 | } | ||
2993 | |||
2994 | /** Validate mode of operation **/ | ||
2995 | int mode_of_operation = estimate_best_mode_of_operation (avg_element_size, | ||
2996 | op-> | ||
2997 | remote_element_count, | ||
2998 | op-> | ||
2999 | local_element_count, | ||
3000 | op->local_set_diff, | ||
3001 | op->remote_set_diff, | ||
3002 | op-> | ||
3003 | rtt_bandwidth_tradeoff, | ||
3004 | op-> | ||
3005 | ibf_bucket_number_factor); | ||
3006 | if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation) | ||
3007 | { | ||
3008 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3009 | "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been" | ||
3010 | " : %d\n", mode_of_operation); | ||
3011 | GNUNET_break_op (0); | ||
3012 | fail_union_operation (op); | ||
3013 | return; | ||
3014 | } | ||
3015 | op->phase = PHASE_FULL_RECEIVING; | ||
3016 | } | ||
3017 | |||
3018 | |||
3019 | /** | ||
1839 | * Check an IBF message from a remote peer. | 3020 | * Check an IBF message from a remote peer. |
1840 | * | 3021 | * |
1841 | * Reassemble the IBF from multiple pieces, and | 3022 | * Reassemble the IBF from multiple pieces, and |
@@ -1872,7 +3053,8 @@ check_union_p2p_ibf (void *cls, | |||
1872 | GNUNET_break_op (0); | 3053 | GNUNET_break_op (0); |
1873 | return GNUNET_SYSERR; | 3054 | return GNUNET_SYSERR; |
1874 | } | 3055 | } |
1875 | if (1 << msg->order != op->remote_ibf->size) | 3056 | |
3057 | if (msg->ibf_size != op->remote_ibf->size) | ||
1876 | { | 3058 | { |
1877 | GNUNET_break_op (0); | 3059 | GNUNET_break_op (0); |
1878 | return GNUNET_SYSERR; | 3060 | return GNUNET_SYSERR; |
@@ -1909,9 +3091,26 @@ handle_union_p2p_ibf (void *cls, | |||
1909 | { | 3091 | { |
1910 | struct Operation *op = cls; | 3092 | struct Operation *op = cls; |
1911 | unsigned int buckets_in_message; | 3093 | unsigned int buckets_in_message; |
3094 | /** | ||
3095 | * Check that the message is received only in supported phase | ||
3096 | */ | ||
3097 | uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST, | ||
3098 | PHASE_PASSIVE_DECODING}; | ||
3099 | if (GNUNET_OK != | ||
3100 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3101 | { | ||
3102 | GNUNET_break (0); | ||
3103 | fail_union_operation (op); | ||
3104 | return; | ||
3105 | } | ||
3106 | op->differential_sync_iterations++; | ||
3107 | check_max_differential_rounds (op); | ||
3108 | op->active_passive_switch_required = false; | ||
1912 | 3109 | ||
1913 | perf_rtt.ibf.received += 1; | 3110 | #if MEASURE_PERFORMANCE |
1914 | perf_rtt.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); | 3111 | perf_store.ibf.received += 1; |
3112 | perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); | ||
3113 | #endif | ||
1915 | 3114 | ||
1916 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) | 3115 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) |
1917 | / IBF_BUCKET_SIZE; | 3116 | / IBF_BUCKET_SIZE; |
@@ -1922,8 +3121,10 @@ handle_union_p2p_ibf (void *cls, | |||
1922 | GNUNET_assert (NULL == op->remote_ibf); | 3121 | GNUNET_assert (NULL == op->remote_ibf); |
1923 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3122 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1924 | "Creating new ibf of size %u\n", | 3123 | "Creating new ibf of size %u\n", |
1925 | 1 << msg->order); | 3124 | ntohl (msg->ibf_size)); |
1926 | op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); | 3125 | // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); |
3126 | op->remote_ibf = ibf_create (msg->ibf_size, | ||
3127 | ((uint8_t) op->ibf_number_buckets_per_element)); | ||
1927 | op->salt_receive = ntohl (msg->salt); | 3128 | op->salt_receive = ntohl (msg->salt); |
1928 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3129 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1929 | "Receiving new IBF with salt %u\n", | 3130 | "Receiving new IBF with salt %u\n", |
@@ -1954,7 +3155,7 @@ handle_union_p2p_ibf (void *cls, | |||
1954 | ibf_read_slice (&msg[1], | 3155 | ibf_read_slice (&msg[1], |
1955 | op->ibf_buckets_received, | 3156 | op->ibf_buckets_received, |
1956 | buckets_in_message, | 3157 | buckets_in_message, |
1957 | op->remote_ibf); | 3158 | op->remote_ibf, msg->ibf_counter_bit_length); |
1958 | op->ibf_buckets_received += buckets_in_message; | 3159 | op->ibf_buckets_received += buckets_in_message; |
1959 | 3160 | ||
1960 | if (op->ibf_buckets_received == op->remote_ibf->size) | 3161 | if (op->ibf_buckets_received == op->remote_ibf->size) |
@@ -2030,18 +3231,24 @@ maybe_finish (struct Operation *op) | |||
2030 | 3231 | ||
2031 | num_demanded = GNUNET_CONTAINER_multihashmap_size ( | 3232 | num_demanded = GNUNET_CONTAINER_multihashmap_size ( |
2032 | op->demanded_hashes); | 3233 | op->demanded_hashes); |
2033 | 3234 | int send_done = GNUNET_CONTAINER_multihashmap_iterate ( | |
3235 | op->message_control_flow, | ||
3236 | & | ||
3237 | determinate_done_message_iterator, | ||
3238 | op); | ||
2034 | if (PHASE_FINISH_WAITING == op->phase) | 3239 | if (PHASE_FINISH_WAITING == op->phase) |
2035 | { | 3240 | { |
2036 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3241 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2037 | "In PHASE_FINISH_WAITING, pending %u demands\n", | 3242 | "In PHASE_FINISH_WAITING, pending %u demands -> %d\n", |
2038 | num_demanded); | 3243 | num_demanded, op->peer_site); |
2039 | if (0 == num_demanded) | 3244 | if (-1 != send_done) |
2040 | { | 3245 | { |
2041 | struct GNUNET_MQ_Envelope *ev; | 3246 | struct GNUNET_MQ_Envelope *ev; |
2042 | 3247 | ||
2043 | op->phase = PHASE_FINISHED; | 3248 | op->phase = PHASE_FINISHED; |
2044 | perf_rtt.done.sent += 1; | 3249 | #if MEASURE_PERFORMANCE |
3250 | perf_store.done.sent += 1; | ||
3251 | #endif | ||
2045 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); | 3252 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); |
2046 | GNUNET_MQ_send (op->mq, | 3253 | GNUNET_MQ_send (op->mq, |
2047 | ev); | 3254 | ev); |
@@ -2052,9 +3259,9 @@ maybe_finish (struct Operation *op) | |||
2052 | if (PHASE_FINISH_CLOSING == op->phase) | 3259 | if (PHASE_FINISH_CLOSING == op->phase) |
2053 | { | 3260 | { |
2054 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3261 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2055 | "In PHASE_FINISH_CLOSING, pending %u demands\n", | 3262 | "In PHASE_FINISH_CLOSING, pending %u demands %d\n", |
2056 | num_demanded); | 3263 | num_demanded, op->peer_site); |
2057 | if (0 == num_demanded) | 3264 | if (-1 != send_done) |
2058 | { | 3265 | { |
2059 | op->phase = PHASE_FINISHED; | 3266 | op->phase = PHASE_FINISHED; |
2060 | send_client_done (op); | 3267 | send_client_done (op); |
@@ -2102,11 +3309,25 @@ handle_union_p2p_elements (void *cls, | |||
2102 | struct KeyEntry *ke; | 3309 | struct KeyEntry *ke; |
2103 | uint16_t element_size; | 3310 | uint16_t element_size; |
2104 | 3311 | ||
3312 | /** | ||
3313 | * Check that the message is received only in supported phase | ||
3314 | */ | ||
3315 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING, | ||
3316 | PHASE_FINISH_WAITING, PHASE_FINISH_CLOSING}; | ||
3317 | if (GNUNET_OK != | ||
3318 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3319 | { | ||
3320 | GNUNET_break (0); | ||
3321 | fail_union_operation (op); | ||
3322 | return; | ||
3323 | } | ||
2105 | 3324 | ||
2106 | element_size = ntohs (emsg->header.size) - sizeof(struct | 3325 | element_size = ntohs (emsg->header.size) - sizeof(struct |
2107 | GNUNET_SETU_ElementMessage); | 3326 | GNUNET_SETU_ElementMessage); |
2108 | perf_rtt.element.received += 1; | 3327 | #if MEASURE_PERFORMANCE |
2109 | perf_rtt.element.received_var_bytes += element_size; | 3328 | perf_store.element.received += 1; |
3329 | perf_store.element.received_var_bytes += element_size; | ||
3330 | #endif | ||
2110 | 3331 | ||
2111 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); | 3332 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); |
2112 | GNUNET_memcpy (&ee[1], | 3333 | GNUNET_memcpy (&ee[1], |
@@ -2129,6 +3350,21 @@ handle_union_p2p_elements (void *cls, | |||
2129 | return; | 3350 | return; |
2130 | } | 3351 | } |
2131 | 3352 | ||
3353 | if (GNUNET_OK != | ||
3354 | update_message_control_flow ( | ||
3355 | op->message_control_flow, | ||
3356 | MSG_CFS_RECEIVED, | ||
3357 | &ee->element_hash, | ||
3358 | ELEMENT_MESSAGE) | ||
3359 | ) | ||
3360 | { | ||
3361 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3362 | "An element has been received more than once!\n"); | ||
3363 | GNUNET_break (0); | ||
3364 | fail_union_operation (op); | ||
3365 | return; | ||
3366 | } | ||
3367 | |||
2132 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3368 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2133 | "Got element (size %u, hash %s) from peer\n", | 3369 | "Got element (size %u, hash %s) from peer\n", |
2134 | (unsigned int) element_size, | 3370 | (unsigned int) element_size, |
@@ -2217,33 +3453,25 @@ handle_union_p2p_full_element (void *cls, | |||
2217 | struct KeyEntry *ke; | 3453 | struct KeyEntry *ke; |
2218 | uint16_t element_size; | 3454 | uint16_t element_size; |
2219 | 3455 | ||
2220 | 3456 | /** | |
2221 | 3457 | * Check that the message is received only in supported phase | |
2222 | if(PHASE_EXPECT_IBF == op->phase) { | 3458 | */ |
2223 | op->phase = PHASE_FULL_RECEIVING; | 3459 | uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING}; |
2224 | } | 3460 | if (GNUNET_OK != |
2225 | 3461 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | |
2226 | |||
2227 | |||
2228 | /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */ | ||
2229 | if ((PHASE_FULL_RECEIVING != op->phase) && | ||
2230 | (PHASE_FULL_SENDING != op->phase)) | ||
2231 | { | 3462 | { |
2232 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 3463 | GNUNET_break (0); |
2233 | "Handle full element phase is %u\n", | 3464 | fail_union_operation (op); |
2234 | (unsigned) op->phase); | 3465 | return; |
2235 | GNUNET_break_op (0); | ||
2236 | fail_union_operation (op); | ||
2237 | return; | ||
2238 | } | 3466 | } |
2239 | 3467 | ||
2240 | |||
2241 | |||
2242 | element_size = ntohs (emsg->header.size) | 3468 | element_size = ntohs (emsg->header.size) |
2243 | - sizeof(struct GNUNET_SETU_ElementMessage); | 3469 | - sizeof(struct GNUNET_SETU_ElementMessage); |
2244 | 3470 | ||
2245 | perf_rtt.element_full.received += 1; | 3471 | #if MEASURE_PERFORMANCE |
2246 | perf_rtt.element_full.received_var_bytes += element_size; | 3472 | perf_store.element_full.received += 1; |
3473 | perf_store.element_full.received_var_bytes += element_size; | ||
3474 | #endif | ||
2247 | 3475 | ||
2248 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); | 3476 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); |
2249 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); | 3477 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); |
@@ -2268,17 +3496,15 @@ handle_union_p2p_full_element (void *cls, | |||
2268 | GNUNET_NO); | 3496 | GNUNET_NO); |
2269 | 3497 | ||
2270 | op->received_total++; | 3498 | op->received_total++; |
2271 | |||
2272 | ke = op_get_element (op, | 3499 | ke = op_get_element (op, |
2273 | &ee->element_hash); | 3500 | &ee->element_hash); |
2274 | if (NULL != ke) | 3501 | if (NULL != ke) |
2275 | { | 3502 | { |
2276 | /* Got repeated element. Should not happen since | ||
2277 | * we track demands. */ | ||
2278 | GNUNET_STATISTICS_update (_GSS_statistics, | 3503 | GNUNET_STATISTICS_update (_GSS_statistics, |
2279 | "# repeated elements", | 3504 | "# repeated elements", |
2280 | 1, | 3505 | 1, |
2281 | GNUNET_NO); | 3506 | GNUNET_NO); |
3507 | full_sync_plausibility_check (op); | ||
2282 | ke->received = GNUNET_YES; | 3508 | ke->received = GNUNET_YES; |
2283 | GNUNET_free (ee); | 3509 | GNUNET_free (ee); |
2284 | } | 3510 | } |
@@ -2294,15 +3520,15 @@ handle_union_p2p_full_element (void *cls, | |||
2294 | GNUNET_SETU_STATUS_ADD_LOCAL); | 3520 | GNUNET_SETU_STATUS_ADD_LOCAL); |
2295 | } | 3521 | } |
2296 | 3522 | ||
3523 | |||
2297 | if ((GNUNET_YES == op->byzantine) && | 3524 | if ((GNUNET_YES == op->byzantine) && |
2298 | (op->received_total > 384 + op->received_fresh * 4) && | 3525 | (op->received_total > op->remote_element_count) ) |
2299 | (op->received_fresh < op->received_total / 6)) | ||
2300 | { | 3526 | { |
2301 | /* The other peer gave us lots of old elements, there's something wrong. */ | 3527 | /* The other peer gave us lots of old elements, there's something wrong. */ |
2302 | LOG (GNUNET_ERROR_TYPE_ERROR, | 3528 | LOG (GNUNET_ERROR_TYPE_ERROR, |
2303 | "Other peer sent only %llu/%llu fresh elements, failing operation\n", | 3529 | "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n", |
2304 | (unsigned long long) op->received_fresh, | 3530 | (unsigned long long) op->received_total, |
2305 | (unsigned long long) op->received_total); | 3531 | (unsigned long long) op->remote_element_count); |
2306 | GNUNET_break_op (0); | 3532 | GNUNET_break_op (0); |
2307 | fail_union_operation (op); | 3533 | fail_union_operation (op); |
2308 | return; | 3534 | return; |
@@ -2356,18 +3582,50 @@ handle_union_p2p_inquiry (void *cls, | |||
2356 | const struct IBF_Key *ibf_key; | 3582 | const struct IBF_Key *ibf_key; |
2357 | unsigned int num_keys; | 3583 | unsigned int num_keys; |
2358 | 3584 | ||
2359 | perf_rtt.inquery.received += 1; | 3585 | /** |
2360 | perf_rtt.inquery.received_var_bytes += (ntohs (msg->header.size) - sizeof(struct InquiryMessage)); | 3586 | * Check that the message is received only in supported phase |
3587 | */ | ||
3588 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING}; | ||
3589 | if (GNUNET_OK != | ||
3590 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3591 | { | ||
3592 | GNUNET_break (0); | ||
3593 | fail_union_operation (op); | ||
3594 | return; | ||
3595 | } | ||
3596 | |||
3597 | #if MEASURE_PERFORMANCE | ||
3598 | perf_store.inquery.received += 1; | ||
3599 | perf_store.inquery.received_var_bytes += (ntohs (msg->header.size) | ||
3600 | - sizeof(struct InquiryMessage)); | ||
3601 | #endif | ||
2361 | 3602 | ||
2362 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3603 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2363 | "Received union inquiry\n"); | 3604 | "Received union inquiry\n"); |
2364 | num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) | 3605 | num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) |
2365 | / sizeof(struct IBF_Key); | 3606 | / sizeof(struct IBF_Key); |
2366 | ibf_key = (const struct IBF_Key *) &msg[1]; | 3607 | ibf_key = (const struct IBF_Key *) &msg[1]; |
3608 | |||
3609 | /** Add received inquiries to hashmap for flow control **/ | ||
3610 | struct GNUNET_HashContext *hashed_key_context = | ||
3611 | GNUNET_CRYPTO_hash_context_start (); | ||
3612 | struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc ( | ||
3613 | sizeof(struct GNUNET_HashCode));; | ||
3614 | enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_RECEIVED; | ||
3615 | GNUNET_CRYPTO_hash_context_read (hashed_key_context, | ||
3616 | &ibf_key, | ||
3617 | sizeof(struct IBF_Key)); | ||
3618 | GNUNET_CRYPTO_hash_context_finish (hashed_key_context, | ||
3619 | hashed_key); | ||
3620 | GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent, | ||
3621 | hashed_key, | ||
3622 | &mcfs, | ||
3623 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE | ||
3624 | ); | ||
3625 | |||
2367 | while (0 != num_keys--) | 3626 | while (0 != num_keys--) |
2368 | { | 3627 | { |
2369 | struct IBF_Key unsalted_key; | 3628 | struct IBF_Key unsalted_key; |
2370 | |||
2371 | unsalt_key (ibf_key, | 3629 | unsalt_key (ibf_key, |
2372 | ntohl (msg->salt), | 3630 | ntohl (msg->salt), |
2373 | &unsalted_key); | 3631 | &unsalted_key); |
@@ -2402,7 +3660,9 @@ send_missing_full_elements_iter (void *cls, | |||
2402 | 3660 | ||
2403 | if (GNUNET_YES == ke->received) | 3661 | if (GNUNET_YES == ke->received) |
2404 | return GNUNET_YES; | 3662 | return GNUNET_YES; |
2405 | perf_rtt.element_full.received += 1; | 3663 | #if MEASURE_PERFORMANCE |
3664 | perf_store.element_full.received += 1; | ||
3665 | #endif | ||
2406 | ev = GNUNET_MQ_msg_extra (emsg, | 3666 | ev = GNUNET_MQ_msg_extra (emsg, |
2407 | ee->element.size, | 3667 | ee->element.size, |
2408 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); | 3668 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); |
@@ -2422,18 +3682,84 @@ send_missing_full_elements_iter (void *cls, | |||
2422 | * @param cls closure, a set union operation | 3682 | * @param cls closure, a set union operation |
2423 | * @param mh the demand message | 3683 | * @param mh the demand message |
2424 | */ | 3684 | */ |
3685 | static int | ||
3686 | check_union_p2p_request_full (void *cls, | ||
3687 | const struct TransmitFullMessage *mh) | ||
3688 | { | ||
3689 | return GNUNET_OK; | ||
3690 | } | ||
3691 | |||
3692 | |||
2425 | static void | 3693 | static void |
2426 | handle_union_p2p_request_full (void *cls, | 3694 | handle_union_p2p_request_full (void *cls, |
2427 | const struct GNUNET_MessageHeader *mh) | 3695 | const struct TransmitFullMessage *msg) |
2428 | { | 3696 | { |
2429 | struct Operation *op = cls; | 3697 | struct Operation *op = cls; |
2430 | 3698 | ||
2431 | perf_rtt.request_full.received += 1; | 3699 | /** |
3700 | * Check that the message is received only in supported phase | ||
3701 | */ | ||
3702 | uint8_t allowed_phases[] = {PHASE_EXPECT_IBF}; | ||
3703 | if (GNUNET_OK != | ||
3704 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3705 | { | ||
3706 | GNUNET_break (0); | ||
3707 | fail_union_operation (op); | ||
3708 | return; | ||
3709 | } | ||
2432 | 3710 | ||
2433 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3711 | op->remote_element_count = ntohl (msg->remote_set_size); |
3712 | op->remote_set_diff = ntohl (msg->remote_set_difference); | ||
3713 | op->local_set_diff = ntohl (msg->local_set_difference); | ||
3714 | |||
3715 | |||
3716 | if (check_byzantine_bounds (op) != GNUNET_OK) | ||
3717 | { | ||
3718 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3719 | "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine " | ||
3720 | "criteria\n"); | ||
3721 | GNUNET_break_op (0); | ||
3722 | fail_union_operation (op); | ||
3723 | return; | ||
3724 | } | ||
3725 | |||
3726 | #if MEASURE_PERFORMANCE | ||
3727 | perf_store.request_full.received += 1; | ||
3728 | #endif | ||
3729 | |||
3730 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2434 | "Received request for full set transmission\n"); | 3731 | "Received request for full set transmission\n"); |
2435 | if (PHASE_EXPECT_IBF != op->phase) | 3732 | |
3733 | /** Calculate avg element size if not initial sync **/ | ||
3734 | op->local_element_count = GNUNET_CONTAINER_multihashmap_size ( | ||
3735 | op->set->content->elements); | ||
3736 | uint64_t avg_element_size = 0; | ||
3737 | if (0 < op->local_element_count) | ||
3738 | { | ||
3739 | op->total_elements_size_local = 0; | ||
3740 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
3741 | & | ||
3742 | determinate_avg_element_size_iterator, | ||
3743 | op); | ||
3744 | avg_element_size = op->total_elements_size_local / op->local_element_count; | ||
3745 | } | ||
3746 | |||
3747 | int mode_of_operation = estimate_best_mode_of_operation (avg_element_size, | ||
3748 | op-> | ||
3749 | remote_element_count, | ||
3750 | op-> | ||
3751 | local_element_count, | ||
3752 | op->local_set_diff, | ||
3753 | op->remote_set_diff, | ||
3754 | op-> | ||
3755 | rtt_bandwidth_tradeoff, | ||
3756 | op-> | ||
3757 | ibf_bucket_number_factor); | ||
3758 | if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation) | ||
2436 | { | 3759 | { |
3760 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3761 | "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been" | ||
3762 | " : %d\n", mode_of_operation); | ||
2437 | GNUNET_break_op (0); | 3763 | GNUNET_break_op (0); |
2438 | fail_union_operation (op); | 3764 | fail_union_operation (op); |
2439 | return; | 3765 | return; |
@@ -2458,7 +3784,21 @@ handle_union_p2p_full_done (void *cls, | |||
2458 | { | 3784 | { |
2459 | struct Operation *op = cls; | 3785 | struct Operation *op = cls; |
2460 | 3786 | ||
2461 | perf_rtt.full_done.received += 1; | 3787 | /** |
3788 | * Check that the message is received only in supported phase | ||
3789 | */ | ||
3790 | uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING}; | ||
3791 | if (GNUNET_OK != | ||
3792 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3793 | { | ||
3794 | GNUNET_break (0); | ||
3795 | fail_union_operation (op); | ||
3796 | return; | ||
3797 | } | ||
3798 | |||
3799 | #if MEASURE_PERFORMANCE | ||
3800 | perf_store.full_done.received += 1; | ||
3801 | #endif | ||
2462 | 3802 | ||
2463 | switch (op->phase) | 3803 | switch (op->phase) |
2464 | { | 3804 | { |
@@ -2466,6 +3806,19 @@ handle_union_p2p_full_done (void *cls, | |||
2466 | { | 3806 | { |
2467 | struct GNUNET_MQ_Envelope *ev; | 3807 | struct GNUNET_MQ_Envelope *ev; |
2468 | 3808 | ||
3809 | if ((GNUNET_YES == op->byzantine) && | ||
3810 | (op->received_total != op->remote_element_count) ) | ||
3811 | { | ||
3812 | /* The other peer gave not enough elements before sending full done, there's something wrong. */ | ||
3813 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3814 | "Other peer sent only %llu/%llu fresh elements, failing operation\n", | ||
3815 | (unsigned long long) op->received_total, | ||
3816 | (unsigned long long) op->remote_element_count); | ||
3817 | GNUNET_break_op (0); | ||
3818 | fail_union_operation (op); | ||
3819 | return; | ||
3820 | } | ||
3821 | |||
2469 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3822 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2470 | "got FULL DONE, sending elements that other peer is missing\n"); | 3823 | "got FULL DONE, sending elements that other peer is missing\n"); |
2471 | 3824 | ||
@@ -2473,7 +3826,9 @@ handle_union_p2p_full_done (void *cls, | |||
2473 | GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element, | 3826 | GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element, |
2474 | &send_missing_full_elements_iter, | 3827 | &send_missing_full_elements_iter, |
2475 | op); | 3828 | op); |
2476 | perf_rtt.full_done.sent += 1; | 3829 | #if MEASURE_PERFORMANCE |
3830 | perf_store.full_done.sent += 1; | ||
3831 | #endif | ||
2477 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); | 3832 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); |
2478 | GNUNET_MQ_send (op->mq, | 3833 | GNUNET_MQ_send (op->mq, |
2479 | ev); | 3834 | ev); |
@@ -2552,8 +3907,23 @@ handle_union_p2p_demand (void *cls, | |||
2552 | unsigned int num_hashes; | 3907 | unsigned int num_hashes; |
2553 | struct GNUNET_MQ_Envelope *ev; | 3908 | struct GNUNET_MQ_Envelope *ev; |
2554 | 3909 | ||
2555 | perf_rtt.demand.received += 1; | 3910 | /** |
2556 | perf_rtt.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); | 3911 | * Check that the message is received only in supported phase |
3912 | */ | ||
3913 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING, | ||
3914 | PHASE_FINISH_WAITING}; | ||
3915 | if (GNUNET_OK != | ||
3916 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3917 | { | ||
3918 | GNUNET_break (0); | ||
3919 | fail_union_operation (op); | ||
3920 | return; | ||
3921 | } | ||
3922 | #if MEASURE_PERFORMANCE | ||
3923 | perf_store.demand.received += 1; | ||
3924 | perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct | ||
3925 | GNUNET_MessageHeader)); | ||
3926 | #endif | ||
2557 | 3927 | ||
2558 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) | 3928 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) |
2559 | / sizeof(struct GNUNET_HashCode); | 3929 | / sizeof(struct GNUNET_HashCode); |
@@ -2570,6 +3940,39 @@ handle_union_p2p_demand (void *cls, | |||
2570 | fail_union_operation (op); | 3940 | fail_union_operation (op); |
2571 | return; | 3941 | return; |
2572 | } | 3942 | } |
3943 | |||
3944 | /* Save send demand message for message control */ | ||
3945 | if (GNUNET_YES != | ||
3946 | update_message_control_flow ( | ||
3947 | op->message_control_flow, | ||
3948 | MSG_CFS_RECEIVED, | ||
3949 | &ee->element_hash, | ||
3950 | DEMAND_MESSAGE) | ||
3951 | ) | ||
3952 | { | ||
3953 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3954 | "Double demand message received found!\n"); | ||
3955 | GNUNET_break (0); | ||
3956 | fail_union_operation (op); | ||
3957 | return; | ||
3958 | } | ||
3959 | ; | ||
3960 | |||
3961 | /* Mark element to be expected to received */ | ||
3962 | if (GNUNET_YES != | ||
3963 | update_message_control_flow ( | ||
3964 | op->message_control_flow, | ||
3965 | MSG_CFS_SENT, | ||
3966 | &ee->element_hash, | ||
3967 | ELEMENT_MESSAGE) | ||
3968 | ) | ||
3969 | { | ||
3970 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3971 | "Double element message sent found!\n"); | ||
3972 | GNUNET_break (0); | ||
3973 | fail_union_operation (op); | ||
3974 | return; | ||
3975 | } | ||
2573 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) | 3976 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) |
2574 | { | 3977 | { |
2575 | /* Probably confused lazily copied sets. */ | 3978 | /* Probably confused lazily copied sets. */ |
@@ -2577,8 +3980,10 @@ handle_union_p2p_demand (void *cls, | |||
2577 | fail_union_operation (op); | 3980 | fail_union_operation (op); |
2578 | return; | 3981 | return; |
2579 | } | 3982 | } |
2580 | perf_rtt.element.sent += 1; | 3983 | #if MEASURE_PERFORMANCE |
2581 | perf_rtt.element.sent_var_bytes += ee->element.size; | 3984 | perf_store.element.sent += 1; |
3985 | perf_store.element.sent_var_bytes += ee->element.size; | ||
3986 | #endif | ||
2582 | ev = GNUNET_MQ_msg_extra (emsg, | 3987 | ev = GNUNET_MQ_msg_extra (emsg, |
2583 | ee->element.size, | 3988 | ee->element.size, |
2584 | GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS); | 3989 | GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS); |
@@ -2600,9 +4005,10 @@ handle_union_p2p_demand (void *cls, | |||
2600 | if (op->symmetric) | 4005 | if (op->symmetric) |
2601 | send_client_element (op, | 4006 | send_client_element (op, |
2602 | &ee->element, | 4007 | &ee->element, |
2603 | GNUNET_SET_STATUS_ADD_REMOTE); | 4008 | GNUNET_SETU_STATUS_ADD_REMOTE); |
2604 | } | 4009 | } |
2605 | GNUNET_CADET_receive_done (op->channel); | 4010 | GNUNET_CADET_receive_done (op->channel); |
4011 | maybe_finish (op); | ||
2606 | } | 4012 | } |
2607 | 4013 | ||
2608 | 4014 | ||
@@ -2653,9 +4059,23 @@ handle_union_p2p_offer (void *cls, | |||
2653 | struct Operation *op = cls; | 4059 | struct Operation *op = cls; |
2654 | const struct GNUNET_HashCode *hash; | 4060 | const struct GNUNET_HashCode *hash; |
2655 | unsigned int num_hashes; | 4061 | unsigned int num_hashes; |
4062 | /** | ||
4063 | * Check that the message is received only in supported phase | ||
4064 | */ | ||
4065 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING}; | ||
4066 | if (GNUNET_OK != | ||
4067 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
4068 | { | ||
4069 | GNUNET_break (0); | ||
4070 | fail_union_operation (op); | ||
4071 | return; | ||
4072 | } | ||
2656 | 4073 | ||
2657 | perf_rtt.offer.received += 1; | 4074 | #if MEASURE_PERFORMANCE |
2658 | perf_rtt.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); | 4075 | perf_store.offer.received += 1; |
4076 | perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct | ||
4077 | GNUNET_MessageHeader)); | ||
4078 | #endif | ||
2659 | 4079 | ||
2660 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) | 4080 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) |
2661 | / sizeof(struct GNUNET_HashCode); | 4081 | / sizeof(struct GNUNET_HashCode); |
@@ -2693,8 +4113,53 @@ handle_union_p2p_offer (void *cls, | |||
2693 | "[OP %p] Requesting element (hash %s)\n", | 4113 | "[OP %p] Requesting element (hash %s)\n", |
2694 | op, GNUNET_h2s (hash)); | 4114 | op, GNUNET_h2s (hash)); |
2695 | 4115 | ||
2696 | perf_rtt.demand.sent += 1; | 4116 | #if MEASURE_PERFORMANCE |
2697 | perf_rtt.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); | 4117 | perf_store.demand.sent += 1; |
4118 | perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); | ||
4119 | #endif | ||
4120 | /* Save send demand message for message control */ | ||
4121 | if (GNUNET_YES != | ||
4122 | update_message_control_flow ( | ||
4123 | op->message_control_flow, | ||
4124 | MSG_CFS_SENT, | ||
4125 | hash, | ||
4126 | DEMAND_MESSAGE)) | ||
4127 | { | ||
4128 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
4129 | "Double demand message sent found!\n"); | ||
4130 | GNUNET_break (0); | ||
4131 | fail_union_operation (op); | ||
4132 | return; | ||
4133 | } | ||
4134 | |||
4135 | /* Mark offer as received received */ | ||
4136 | if (GNUNET_YES != | ||
4137 | update_message_control_flow ( | ||
4138 | op->message_control_flow, | ||
4139 | MSG_CFS_RECEIVED, | ||
4140 | hash, | ||
4141 | OFFER_MESSAGE)) | ||
4142 | { | ||
4143 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
4144 | "Double offer message received found!\n"); | ||
4145 | GNUNET_break (0); | ||
4146 | fail_union_operation (op); | ||
4147 | return; | ||
4148 | } | ||
4149 | /* Mark element to be expected to received */ | ||
4150 | if (GNUNET_YES != | ||
4151 | update_message_control_flow ( | ||
4152 | op->message_control_flow, | ||
4153 | MSG_CFS_EXPECTED, | ||
4154 | hash, | ||
4155 | ELEMENT_MESSAGE)) | ||
4156 | { | ||
4157 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
4158 | "Element already expected!\n"); | ||
4159 | GNUNET_break (0); | ||
4160 | fail_union_operation (op); | ||
4161 | return; | ||
4162 | } | ||
2698 | ev = GNUNET_MQ_msg_header_extra (demands, | 4163 | ev = GNUNET_MQ_msg_header_extra (demands, |
2699 | sizeof(struct GNUNET_HashCode), | 4164 | sizeof(struct GNUNET_HashCode), |
2700 | GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND); | 4165 | GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND); |
@@ -2719,7 +4184,30 @@ handle_union_p2p_done (void *cls, | |||
2719 | { | 4184 | { |
2720 | struct Operation *op = cls; | 4185 | struct Operation *op = cls; |
2721 | 4186 | ||
2722 | perf_rtt.done.received += 1; | 4187 | /** |
4188 | * Check that the message is received only in supported phase | ||
4189 | */ | ||
4190 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING}; | ||
4191 | if (GNUNET_OK != | ||
4192 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
4193 | { | ||
4194 | GNUNET_break (0); | ||
4195 | fail_union_operation (op); | ||
4196 | return; | ||
4197 | } | ||
4198 | |||
4199 | if (op->active_passive_switch_required) | ||
4200 | { | ||
4201 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
4202 | "PROTOCOL VIOLATION: Received done but role change is necessary\n"); | ||
4203 | GNUNET_break (0); | ||
4204 | fail_union_operation (op); | ||
4205 | return; | ||
4206 | } | ||
4207 | |||
4208 | #if MEASURE_PERFORMANCE | ||
4209 | perf_store.done.received += 1; | ||
4210 | #endif | ||
2723 | switch (op->phase) | 4211 | switch (op->phase) |
2724 | { | 4212 | { |
2725 | case PHASE_PASSIVE_DECODING: | 4213 | case PHASE_PASSIVE_DECODING: |
@@ -2728,26 +4216,26 @@ handle_union_p2p_done (void *cls, | |||
2728 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 4216 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2729 | "got DONE (as passive partner), waiting for our demands to be satisfied\n"); | 4217 | "got DONE (as passive partner), waiting for our demands to be satisfied\n"); |
2730 | /* The active peer is done sending offers | 4218 | /* The active peer is done sending offers |
2731 | * and inquiries. This means that all | 4219 | * and inquiries. This means that all |
2732 | * our responses to that (demands and offers) | 4220 | * our responses to that (demands and offers) |
2733 | * must be in flight (queued or in mesh). | 4221 | * must be in flight (queued or in mesh). |
2734 | * | 4222 | * |
2735 | * We should notify the active peer once | 4223 | * We should notify the active peer once |
2736 | * all our demands are satisfied, so that the active | 4224 | * all our demands are satisfied, so that the active |
2737 | * peer can quit if we gave it everything. | 4225 | * peer can quit if we gave it everything. |
2738 | */GNUNET_CADET_receive_done (op->channel); | 4226 | */GNUNET_CADET_receive_done (op->channel); |
2739 | maybe_finish (op); | 4227 | maybe_finish (op); |
2740 | return; | 4228 | return; |
2741 | case PHASE_ACTIVE_DECODING: | 4229 | case PHASE_ACTIVE_DECODING: |
2742 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 4230 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2743 | "got DONE (as active partner), waiting to finish\n"); | 4231 | "got DONE (as active partner), waiting to finish\n"); |
2744 | /* All demands of the other peer are satisfied, | 4232 | /* All demands of the other peer are satisfied, |
2745 | * and we processed all offers, thus we know | 4233 | * and we processed all offers, thus we know |
2746 | * exactly what our demands must be. | 4234 | * exactly what our demands must be. |
2747 | * | 4235 | * |
2748 | * We'll close the channel | 4236 | * We'll close the channel |
2749 | * to the other peer once our demands are met. | 4237 | * to the other peer once our demands are met. |
2750 | */op->phase = PHASE_FINISH_CLOSING; | 4238 | */op->phase = PHASE_FINISH_CLOSING; |
2751 | GNUNET_CADET_receive_done (op->channel); | 4239 | GNUNET_CADET_receive_done (op->channel); |
2752 | maybe_finish (op); | 4240 | maybe_finish (op); |
2753 | return; | 4241 | return; |
@@ -2769,7 +4257,9 @@ static void | |||
2769 | handle_union_p2p_over (void *cls, | 4257 | handle_union_p2p_over (void *cls, |
2770 | const struct GNUNET_MessageHeader *mh) | 4258 | const struct GNUNET_MessageHeader *mh) |
2771 | { | 4259 | { |
2772 | perf_rtt.over.received += 1; | 4260 | #if MEASURE_PERFORMANCE |
4261 | perf_store.over.received += 1; | ||
4262 | #endif | ||
2773 | send_client_done (cls); | 4263 | send_client_done (cls); |
2774 | } | 4264 | } |
2775 | 4265 | ||
@@ -2943,7 +4433,7 @@ check_incoming_msg (void *cls, | |||
2943 | struct Listener *listener = op->listener; | 4433 | struct Listener *listener = op->listener; |
2944 | const struct GNUNET_MessageHeader *nested_context; | 4434 | const struct GNUNET_MessageHeader *nested_context; |
2945 | 4435 | ||
2946 | /* double operation request */ | 4436 | /* double operation request */ |
2947 | if (0 != op->suggest_id) | 4437 | if (0 != op->suggest_id) |
2948 | { | 4438 | { |
2949 | GNUNET_break_op (0); | 4439 | GNUNET_break_op (0); |
@@ -3053,10 +4543,10 @@ handle_client_create_set (void *cls, | |||
3053 | } | 4543 | } |
3054 | set = GNUNET_new (struct Set); | 4544 | set = GNUNET_new (struct Set); |
3055 | { | 4545 | { |
3056 | struct StrataEstimator *se; | 4546 | struct MultiStrataEstimator *se; |
3057 | 4547 | ||
3058 | se = strata_estimator_create (SE_STRATA_COUNT, | 4548 | se = strata_estimator_create (SE_STRATA_COUNT, |
3059 | SE_IBF_SIZE, | 4549 | SE_IBFS_TOTAL_SIZE, |
3060 | SE_IBF_HASH_NUM); | 4550 | SE_IBF_HASH_NUM); |
3061 | if (NULL == se) | 4551 | if (NULL == se) |
3062 | { | 4552 | { |
@@ -3198,6 +4688,7 @@ channel_window_cb (void *cls, | |||
3198 | * @param cls client that sent the message | 4688 | * @param cls client that sent the message |
3199 | * @param msg message sent by the client | 4689 | * @param msg message sent by the client |
3200 | */ | 4690 | */ |
4691 | |||
3201 | static void | 4692 | static void |
3202 | handle_client_listen (void *cls, | 4693 | handle_client_listen (void *cls, |
3203 | const struct GNUNET_SETU_ListenMessage *msg) | 4694 | const struct GNUNET_SETU_ListenMessage *msg) |
@@ -3240,10 +4731,10 @@ handle_client_listen (void *cls, | |||
3240 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, | 4731 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, |
3241 | struct GNUNET_MessageHeader, | 4732 | struct GNUNET_MessageHeader, |
3242 | NULL), | 4733 | NULL), |
3243 | GNUNET_MQ_hd_fixed_size (union_p2p_request_full, | 4734 | GNUNET_MQ_hd_var_size (union_p2p_request_full, |
3244 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, | 4735 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, |
3245 | struct GNUNET_MessageHeader, | 4736 | struct TransmitFullMessage, |
3246 | NULL), | 4737 | NULL), |
3247 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, | 4738 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, |
3248 | GNUNET_MESSAGE_TYPE_SETU_P2P_SE, | 4739 | GNUNET_MESSAGE_TYPE_SETU_P2P_SE, |
3249 | struct StrataEstimatorMessage, | 4740 | struct StrataEstimatorMessage, |
@@ -3256,6 +4747,10 @@ handle_client_listen (void *cls, | |||
3256 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, | 4747 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, |
3257 | struct GNUNET_SETU_ElementMessage, | 4748 | struct GNUNET_SETU_ElementMessage, |
3258 | NULL), | 4749 | NULL), |
4750 | GNUNET_MQ_hd_var_size (union_p2p_send_full, | ||
4751 | GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL, | ||
4752 | struct TransmitFullMessage, | ||
4753 | NULL), | ||
3259 | GNUNET_MQ_handler_end () | 4754 | GNUNET_MQ_handler_end () |
3260 | }; | 4755 | }; |
3261 | struct Listener *listener; | 4756 | struct Listener *listener; |
@@ -3451,6 +4946,7 @@ handle_client_evaluate (void *cls, | |||
3451 | { | 4946 | { |
3452 | struct ClientState *cs = cls; | 4947 | struct ClientState *cs = cls; |
3453 | struct Operation *op = GNUNET_new (struct Operation); | 4948 | struct Operation *op = GNUNET_new (struct Operation); |
4949 | |||
3454 | const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { | 4950 | const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { |
3455 | GNUNET_MQ_hd_var_size (incoming_msg, | 4951 | GNUNET_MQ_hd_var_size (incoming_msg, |
3456 | GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, | 4952 | GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, |
@@ -3488,10 +4984,10 @@ handle_client_evaluate (void *cls, | |||
3488 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, | 4984 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, |
3489 | struct GNUNET_MessageHeader, | 4985 | struct GNUNET_MessageHeader, |
3490 | op), | 4986 | op), |
3491 | GNUNET_MQ_hd_fixed_size (union_p2p_request_full, | 4987 | GNUNET_MQ_hd_var_size (union_p2p_request_full, |
3492 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, | 4988 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, |
3493 | struct GNUNET_MessageHeader, | 4989 | struct TransmitFullMessage, |
3494 | op), | 4990 | op), |
3495 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, | 4991 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, |
3496 | GNUNET_MESSAGE_TYPE_SETU_P2P_SE, | 4992 | GNUNET_MESSAGE_TYPE_SETU_P2P_SE, |
3497 | struct StrataEstimatorMessage, | 4993 | struct StrataEstimatorMessage, |
@@ -3504,6 +5000,10 @@ handle_client_evaluate (void *cls, | |||
3504 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, | 5000 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, |
3505 | struct GNUNET_SETU_ElementMessage, | 5001 | struct GNUNET_SETU_ElementMessage, |
3506 | op), | 5002 | op), |
5003 | GNUNET_MQ_hd_var_size (union_p2p_send_full, | ||
5004 | GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL, | ||
5005 | struct TransmitFullMessage, | ||
5006 | NULL), | ||
3507 | GNUNET_MQ_handler_end () | 5007 | GNUNET_MQ_handler_end () |
3508 | }; | 5008 | }; |
3509 | struct Set *set; | 5009 | struct Set *set; |
@@ -3525,8 +5025,23 @@ handle_client_evaluate (void *cls, | |||
3525 | op->force_full = msg->force_full; | 5025 | op->force_full = msg->force_full; |
3526 | op->force_delta = msg->force_delta; | 5026 | op->force_delta = msg->force_delta; |
3527 | op->symmetric = msg->symmetric; | 5027 | op->symmetric = msg->symmetric; |
5028 | op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff; | ||
5029 | op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor; | ||
5030 | op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element; | ||
5031 | op->byzantine_upper_bound = msg->byzantine_upper_bond; | ||
5032 | op->active_passive_switch_required = false; | ||
3528 | context = GNUNET_MQ_extract_nested_mh (msg); | 5033 | context = GNUNET_MQ_extract_nested_mh (msg); |
3529 | 5034 | ||
5035 | /* create hashmap for message control */ | ||
5036 | op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32, | ||
5037 | GNUNET_NO); | ||
5038 | op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO); | ||
5039 | |||
5040 | #if MEASURE_PERFORMANCE | ||
5041 | /* load config */ | ||
5042 | load_config (op); | ||
5043 | #endif | ||
5044 | |||
3530 | /* Advance generation values, so that | 5045 | /* Advance generation values, so that |
3531 | mutations won't interfer with the running operation. */ | 5046 | mutations won't interfer with the running operation. */ |
3532 | op->set = set; | 5047 | op->set = set; |
@@ -3550,7 +5065,9 @@ handle_client_evaluate (void *cls, | |||
3550 | struct GNUNET_MQ_Envelope *ev; | 5065 | struct GNUNET_MQ_Envelope *ev; |
3551 | struct OperationRequestMessage *msg; | 5066 | struct OperationRequestMessage *msg; |
3552 | 5067 | ||
3553 | perf_rtt.operation_request.sent += 1; | 5068 | #if MEASURE_PERFORMANCE |
5069 | perf_store.operation_request.sent += 1; | ||
5070 | #endif | ||
3554 | ev = GNUNET_MQ_msg_nested_mh (msg, | 5071 | ev = GNUNET_MQ_msg_nested_mh (msg, |
3555 | GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, | 5072 | GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, |
3556 | context); | 5073 | context); |
@@ -3567,7 +5084,11 @@ handle_client_evaluate (void *cls, | |||
3567 | op->se = strata_estimator_dup (op->set->se); | 5084 | op->se = strata_estimator_dup (op->set->se); |
3568 | /* we started the operation, thus we have to send the operation request */ | 5085 | /* we started the operation, thus we have to send the operation request */ |
3569 | op->phase = PHASE_EXPECT_SE; | 5086 | op->phase = PHASE_EXPECT_SE; |
3570 | op->salt_receive = op->salt_send = 42; // FIXME????? | 5087 | |
5088 | op->salt_receive = (op->peer_site + 1) % 2; | ||
5089 | op->salt_send = op->peer_site; // FIXME????? | ||
5090 | |||
5091 | |||
3571 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 5092 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
3572 | "Initiating union operation evaluation\n"); | 5093 | "Initiating union operation evaluation\n"); |
3573 | GNUNET_STATISTICS_update (_GSS_statistics, | 5094 | GNUNET_STATISTICS_update (_GSS_statistics, |
@@ -3711,6 +5232,20 @@ handle_client_accept (void *cls, | |||
3711 | op->force_full = msg->force_full; | 5232 | op->force_full = msg->force_full; |
3712 | op->force_delta = msg->force_delta; | 5233 | op->force_delta = msg->force_delta; |
3713 | op->symmetric = msg->symmetric; | 5234 | op->symmetric = msg->symmetric; |
5235 | op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff; | ||
5236 | op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor; | ||
5237 | op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element; | ||
5238 | op->byzantine_upper_bound = msg->byzantine_upper_bond; | ||
5239 | op->active_passive_switch_required = false; | ||
5240 | /* create hashmap for message control */ | ||
5241 | op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32, | ||
5242 | GNUNET_NO); | ||
5243 | op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO); | ||
5244 | |||
5245 | #if MEASURE_PERFORMANCE | ||
5246 | /* load config */ | ||
5247 | load_config (op); | ||
5248 | #endif | ||
3714 | 5249 | ||
3715 | /* Advance generation values, so that future mutations do not | 5250 | /* Advance generation values, so that future mutations do not |
3716 | interfer with the running operation. */ | 5251 | interfer with the running operation. */ |
@@ -3729,7 +5264,7 @@ handle_client_accept (void *cls, | |||
3729 | 1, | 5264 | 1, |
3730 | GNUNET_NO); | 5265 | GNUNET_NO); |
3731 | { | 5266 | { |
3732 | const struct StrataEstimator *se; | 5267 | struct MultiStrataEstimator *se; |
3733 | struct GNUNET_MQ_Envelope *ev; | 5268 | struct GNUNET_MQ_Envelope *ev; |
3734 | struct StrataEstimatorMessage *strata_msg; | 5269 | struct StrataEstimatorMessage *strata_msg; |
3735 | char *buf; | 5270 | char *buf; |
@@ -3739,20 +5274,40 @@ handle_client_accept (void *cls, | |||
3739 | op->se = strata_estimator_dup (op->set->se); | 5274 | op->se = strata_estimator_dup (op->set->se); |
3740 | op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, | 5275 | op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, |
3741 | GNUNET_NO); | 5276 | GNUNET_NO); |
3742 | op->salt_receive = op->salt_send = 42; // FIXME????? | 5277 | op->salt_receive = (op->peer_site + 1) % 2; |
5278 | op->salt_send = op->peer_site; // FIXME????? | ||
3743 | initialize_key_to_element (op); | 5279 | initialize_key_to_element (op); |
3744 | op->initial_size = GNUNET_CONTAINER_multihashmap32_size ( | 5280 | op->initial_size = GNUNET_CONTAINER_multihashmap32_size ( |
3745 | op->key_to_element); | 5281 | op->key_to_element); |
3746 | 5282 | ||
3747 | /* kick off the operation */ | 5283 | /* kick off the operation */ |
3748 | se = op->se; | 5284 | se = op->se; |
3749 | buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); | 5285 | |
5286 | uint8_t se_count = 1; | ||
5287 | if (op->initial_size > 0) | ||
5288 | { | ||
5289 | op->total_elements_size_local = 0; | ||
5290 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
5291 | & | ||
5292 | determinate_avg_element_size_iterator, | ||
5293 | op); | ||
5294 | se_count = determine_strata_count ( | ||
5295 | op->total_elements_size_local / op->initial_size, | ||
5296 | op->initial_size); | ||
5297 | } | ||
5298 | buf = GNUNET_malloc (se->stratas[0]->strata_count * IBF_BUCKET_SIZE | ||
5299 | * ((SE_IBFS_TOTAL_SIZE / 8) * se_count)); | ||
3750 | len = strata_estimator_write (se, | 5300 | len = strata_estimator_write (se, |
5301 | SE_IBFS_TOTAL_SIZE, | ||
5302 | se_count, | ||
3751 | buf); | 5303 | buf); |
3752 | perf_rtt.se.sent += 1; | 5304 | #if MEASURE_PERFORMANCE |
3753 | perf_rtt.se.sent_var_bytes += len; | 5305 | perf_store.se.sent += 1; |
5306 | perf_store.se.sent_var_bytes += len; | ||
5307 | #endif | ||
3754 | 5308 | ||
3755 | if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) | 5309 | if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE |
5310 | * SE_IBFS_TOTAL_SIZE) | ||
3756 | type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC; | 5311 | type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC; |
3757 | else | 5312 | else |
3758 | type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE; | 5313 | type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE; |
@@ -3766,6 +5321,7 @@ handle_client_accept (void *cls, | |||
3766 | strata_msg->set_size | 5321 | strata_msg->set_size |
3767 | = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( | 5322 | = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( |
3768 | op->set->content->elements)); | 5323 | op->set->content->elements)); |
5324 | strata_msg->se_count = se_count; | ||
3769 | GNUNET_MQ_send (op->mq, | 5325 | GNUNET_MQ_send (op->mq, |
3770 | ev); | 5326 | ev); |
3771 | op->phase = PHASE_EXPECT_IBF; | 5327 | op->phase = PHASE_EXPECT_IBF; |
@@ -3800,8 +5356,9 @@ shutdown_task (void *cls) | |||
3800 | GNUNET_YES); | 5356 | GNUNET_YES); |
3801 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 5357 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3802 | "handled shutdown request\n"); | 5358 | "handled shutdown request\n"); |
3803 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 5359 | #if MEASURE_PERFORMANCE |
3804 | "RTT:%f\n", calculate_perf_rtt()); | 5360 | calculate_perf_store (); |
5361 | #endif | ||
3805 | } | 5362 | } |
3806 | 5363 | ||
3807 | 5364 | ||