aboutsummaryrefslogtreecommitdiff
path: root/src/setu/gnunet-service-setu.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/setu/gnunet-service-setu.c')
-rw-r--r--src/setu/gnunet-service-setu.c2085
1 files changed, 1821 insertions, 264 deletions
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
index bd1113f15..339d347f8 100644
--- a/src/setu/gnunet-service-setu.c
+++ b/src/setu/gnunet-service-setu.c
@@ -22,6 +22,7 @@
22 * @brief set union operation 22 * @brief set union operation
23 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
@@ -50,33 +51,61 @@
50 */ 51 */
51#define SE_STRATA_COUNT 32 52#define SE_STRATA_COUNT 32
52 53
54
53/** 55/**
54 * Size of the IBFs in the strata estimator. 56 * Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348
57 * Based on the bsc thesis of Elias Summermatter (2021)
55 */ 58 */
56#define SE_IBF_SIZE 80 59#define SE_IBFS_TOTAL_SIZE 632
57 60
58/** 61/**
59 * The hash num parameter for the difference digests and strata estimators. 62 * The hash num parameter for the difference digests and strata estimators.
60 */ 63 */
61#define SE_IBF_HASH_NUM 4 64#define SE_IBF_HASH_NUM 3
62 65
63/** 66/**
64 * Number of buckets that can be transmitted in one message. 67 * Number of buckets that can be transmitted in one message.
65 */ 68 */
66#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE) 69#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
67 70
68/** 71/**
69 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). 72 * The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
70 * Choose this value so that computing the IBF is still cheaper 73 * Choose this value so that computing the IBF is still cheaper
71 * than transmitting all values. 74 * than transmitting all values.
72 */ 75 */
73#define MAX_IBF_ORDER (20) 76#define MAX_IBF_SIZE 1048576
77
78
79/**
80 * Minimal size of an ibf
81 * Based on the bsc thesis of Elias Summermatter (2021)
82 */
83#define IBF_MIN_SIZE 37
84
85/**
86 * AVG RTT for differential sync when k=2 and Factor = 2
87 * Based on the bsc thesis of Elias Summermatter (2021)
88 */
89#define DIFFERENTIAL_RTT_MEAN 3.65145
90
91/**
92 * Security level used for byzantine checks (2^80)
93 */
94
95#define SECURITY_LEVEL 80
96
97/**
98 * Is the estimated probability for a new round this values
99 * is based on the bsc thesis of Elias Summermatter (2021)
100 */
101
102#define PROBABILITY_FOR_NEW_ROUND 0.15
74 103
75/** 104/**
76 * Number of buckets used in the ibf per estimated 105 * Measure the performance in a csv
77 * difference.
78 */ 106 */
79#define IBF_ALPHA 4 107
108#define MEASURE_PERFORMANCE 0
80 109
81 110
82/** 111/**
@@ -146,6 +175,28 @@ enum UnionOperationPhase
146 PHASE_FULL_RECEIVING 175 PHASE_FULL_RECEIVING
147}; 176};
148 177
178/**
179 * Different modes of operations
180 */
181
182enum MODE_OF_OPERATION
183{
184 /**
185 * Mode just synchronizes the difference between sets
186 */
187 DIFFERENTIAL_SYNC,
188
189 /**
190 * Mode send full set sending local set first
191 */
192 FULL_SYNC_LOCAL_SENDING_FIRST,
193
194 /**
195 * Mode request full set from remote peer
196 */
197 FULL_SYNC_REMOTE_SENDING_FIRST
198};
199
149 200
150/** 201/**
151 * Information about an element element in the set. All elements are 202 * Information about an element element in the set. All elements are
@@ -277,7 +328,7 @@ struct Operation
277 * Copy of the set's strata estimator at the time of 328 * Copy of the set's strata estimator at the time of
278 * creation of this operation. 329 * creation of this operation.
279 */ 330 */
280 struct StrataEstimator *se; 331 struct MultiStrataEstimator *se;
281 332
282 /** 333 /**
283 * The IBF we currently receive. 334 * The IBF we currently receive.
@@ -320,7 +371,7 @@ struct Operation
320 /** 371 /**
321 * Number of ibf buckets already received into the @a remote_ibf. 372 * Number of ibf buckets already received into the @a remote_ibf.
322 */ 373 */
323 unsigned int ibf_buckets_received; 374 uint64_t ibf_buckets_received;
324 375
325 /** 376 /**
326 * Salt that we're using for sending IBFs 377 * Salt that we're using for sending IBFs
@@ -386,7 +437,7 @@ struct Operation
386 * Lower bound for the set size, used only when 437 * Lower bound for the set size, used only when
387 * byzantine mode is enabled. 438 * byzantine mode is enabled.
388 */ 439 */
389 int byzantine_lower_bound; 440 uint64_t byzantine_lower_bound;
390 441
391 /** 442 /**
392 * Unique request id for the request from a remote peer, sent to the 443 * Unique request id for the request from a remote peer, sent to the
@@ -401,21 +452,83 @@ struct Operation
401 */ 452 */
402 unsigned int generation_created; 453 unsigned int generation_created;
403 454
455
456 /**
457 * User defined Bandwidth Round Trips Tradeoff
458 */
459 uint64_t rtt_bandwidth_tradeoff;
460
461
462 /**
463 * Number of Element per bucket in IBF
464 */
465 uint8_t ibf_number_buckets_per_element;
466
467
468 /**
469 * Set difference is multiplied with this factor
470 * to gennerate large enough IBF
471 */
472 uint8_t ibf_bucket_number_factor;
473
474 /**
475 * Defines which site a client is
476 * 0 = Initiating peer
477 * 1 = Receiving peer
478 */
479 uint8_t peer_site;
480
481
404 /** 482 /**
405 * User defined Bandwidth Round Trips Tradeoff 483 * Local peer element count
406 */ 484 */
407 double rtt_bandwidth_tradeoff; 485 uint64_t local_element_count;
408 486
409 /** 487 /**
410 * Number of Element per bucket in IBF 488 * Mode of operation that was chosen by the algorithm
411 */ 489 */
412 unsigned int ibf_number_buckets_per_element; 490 uint8_t mode_of_operation;
413 491
414 /** 492 /**
415 * Number of buckets in IBF 493 * Hashmap to keep track of the send/received messages
416 */ 494 */
417 unsigned ibf_bucket_number; 495 struct GNUNET_CONTAINER_MultiHashMap *message_control_flow;
418 496
497 /**
498 * Hashmap to keep track of the send/received inquiries (ibf keys)
499 */
500 struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent;
501
502
503 /**
504 * Total size of local set
505 */
506 uint64_t total_elements_size_local;
507
508 /**
509 * Limit of number of elements in set
510 */
511 uint64_t byzantine_upper_bound;
512
513 /**
514 * is the count of already passed differential sync iterations
515 */
516 uint8_t differential_sync_iterations;
517
518 /**
519 * Estimated or committed set difference at the start
520 */
521 uint64_t remote_set_diff;
522
523 /**
524 * Estimated or committed set difference at the start
525 */
526 uint64_t local_set_diff;
527
528 /**
529 * Boolean to enforce an active passive switch
530 */
531 bool active_passive_switch_required;
419}; 532};
420 533
421 534
@@ -431,6 +544,16 @@ struct SetContent
431 struct GNUNET_CONTAINER_MultiHashMap *elements; 544 struct GNUNET_CONTAINER_MultiHashMap *elements;
432 545
433 /** 546 /**
547 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *` randomized.
548 */
549 struct GNUNET_CONTAINER_MultiHashMap *elements_randomized;
550
551 /**
552 * Salt to construct the randomized element map
553 */
554 uint64_t elements_randomized_salt;
555
556 /**
434 * Number of references to the content. 557 * Number of references to the content.
435 */ 558 */
436 unsigned int refcount; 559 unsigned int refcount;
@@ -478,7 +601,7 @@ struct Set
478 * The strata estimator is only generated once for each set. The IBF keys 601 * The strata estimator is only generated once for each set. The IBF keys
479 * are derived from the element hashes with salt=0. 602 * are derived from the element hashes with salt=0.
480 */ 603 */
481 struct StrataEstimator *se; 604 struct MultiStrataEstimator *se;
482 605
483 /** 606 /**
484 * Evaluate operations are held in a linked list. 607 * Evaluate operations are held in a linked list.
@@ -635,96 +758,687 @@ static int in_shutdown;
635 */ 758 */
636static uint32_t suggest_id; 759static uint32_t suggest_id;
637 760
761#if MEASURE_PERFORMANCE
762/**
763 * Handles configuration file for setu performance test
764 *
765 */
766static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
767
768
769/**
770 * Stores the performance data for induvidual message
771 */
772
773
774struct perf_num_send_received_msg
775{
776 uint64_t sent;
777 uint64_t sent_var_bytes;
778 uint64_t received;
779 uint64_t received_var_bytes;
780};
781
782/**
783 * Main struct to measure performance (data/rtts)
784 */
785struct per_store_struct
786{
787 struct perf_num_send_received_msg operation_request;
788 struct perf_num_send_received_msg se;
789 struct perf_num_send_received_msg request_full;
790 struct perf_num_send_received_msg element_full;
791 struct perf_num_send_received_msg full_done;
792 struct perf_num_send_received_msg ibf;
793 struct perf_num_send_received_msg inquery;
794 struct perf_num_send_received_msg element;
795 struct perf_num_send_received_msg demand;
796 struct perf_num_send_received_msg offer;
797 struct perf_num_send_received_msg done;
798 struct perf_num_send_received_msg over;
799 uint64_t se_diff;
800 uint64_t se_diff_remote;
801 uint64_t se_diff_local;
802 uint64_t active_passive_switches;
803 uint8_t mode_of_operation;
804};
805
806struct per_store_struct perf_store;
807#endif
808
809/**
810 * Different states to control the messages flow in differential mode
811 */
812
813enum MESSAGE_CONTROL_FLOW_STATE
814{
815 /**
816 * Initial message state
817 */
818 MSG_CFS_UNINITIALIZED,
819
820 /**
821 * Track that a message has been sent
822 */
823 MSG_CFS_SENT,
824
825 /**
826 * Track that receiving this message is expected
827 */
828 MSG_CFS_EXPECTED,
829
830 /**
831 * Track that message has been received
832 */
833 MSG_CFS_RECEIVED,
834};
638 835
639/** 836/**
640 * Added Roundtripscounter 837 * Message types to track in message control flow
641 */ 838 */
642 839
840enum MESSAGE_TYPE
841{
842 /**
843 * Offer message type
844 */
845 OFFER_MESSAGE,
643 846
644struct perf_num_send_resived_msg { 847 /**
645 int sent; 848 * Demand message type
646 int sent_var_bytes; 849 */
647 int received; 850 DEMAND_MESSAGE,
648 int received_var_bytes; 851
852 /**
853 * Element message type
854 */
855 ELEMENT_MESSAGE,
649}; 856};
650 857
651 858
652struct perf_rtt_struct 859/**
653{ 860 * Struct to tracked messages in message control flow
654 struct perf_num_send_resived_msg operation_request; 861 */
655 struct perf_num_send_resived_msg se; 862struct messageControlFlowElement
656 struct perf_num_send_resived_msg request_full; 863{
657 struct perf_num_send_resived_msg element_full; 864 /**
658 struct perf_num_send_resived_msg full_done; 865 * Track the message control state of the offer message
659 struct perf_num_send_resived_msg ibf; 866 */
660 struct perf_num_send_resived_msg inquery; 867 enum MESSAGE_CONTROL_FLOW_STATE offer;
661 struct perf_num_send_resived_msg element; 868 /**
662 struct perf_num_send_resived_msg demand; 869 * Track the message control state of the demand message
663 struct perf_num_send_resived_msg offer; 870 */
664 struct perf_num_send_resived_msg done; 871 enum MESSAGE_CONTROL_FLOW_STATE demand;
665 struct perf_num_send_resived_msg over; 872 /**
873 * Track the message control state of the element message
874 */
875 enum MESSAGE_CONTROL_FLOW_STATE element;
666}; 876};
667 877
668struct perf_rtt_struct perf_rtt;
669 878
879#if MEASURE_PERFORMANCE
670 880
881/**
882 * Loads different configuration to perform performance tests
883 *
884 * @param op operation handle
885 */
886static void
887load_config (struct Operation *op)
888{
889 long long number;
890 float fl;
891
892 setu_cfg = GNUNET_CONFIGURATION_create ();
893 GNUNET_CONFIGURATION_load (setu_cfg,
894 "perf_setu.conf");
895 GNUNET_CONFIGURATION_get_value_float (setu_cfg,
896 "IBF",
897 "BUCKET_NUMBER_FACTOR",
898 &fl);
899 op->ibf_bucket_number_factor = fl;
900 GNUNET_CONFIGURATION_get_value_number (setu_cfg,
901 "IBF",
902 "NUMBER_PER_BUCKET",
903 &number);
904 op->ibf_number_buckets_per_element = number;
905 GNUNET_CONFIGURATION_get_value_number (setu_cfg,
906 "PERFORMANCE",
907 "TRADEOFF",
908 &number);
909 op->rtt_bandwidth_tradeoff = number;
910 GNUNET_CONFIGURATION_get_value_number (setu_cfg,
911 "BOUNDARIES",
912 "UPPER_ELEMENT",
913 &number);
914 op->byzantine_upper_bound = number;
915 op->peer_site = 0;
916}
917
918
919/**
920 * Function to calculate total bytes used for performance measurement
921 * @param size
922 * @param perf_num_send_received_msg
923 * @return bytes used
924 */
671static int 925static int
672sum_sent_received_bytes(int size, struct perf_num_send_resived_msg perf_rtt_struct) { 926sum_sent_received_bytes (uint64_t size,
673 return (size * perf_rtt_struct.sent) + 927 struct perf_num_send_received_msg
674 (size * perf_rtt_struct.received) + 928 perf_num_send_received_msg)
675 perf_rtt_struct.sent_var_bytes + 929{
676 perf_rtt_struct.received_var_bytes; 930 return (size * perf_num_send_received_msg.sent)
931 + (size * perf_num_send_received_msg.received)
932 + perf_num_send_received_msg.sent_var_bytes
933 + perf_num_send_received_msg.received_var_bytes;
677} 934}
678 935
679static float
680calculate_perf_rtt() {
681 /**
682 * Calculate RTT of init phase normally always 1
683 */
684 float rtt = 1;
685 int bytes_transmitted = 0;
686 936
687 /** 937/**
688 * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending 938 * Function that calculates the perfmance values and writes them down
689 */ 939 */
690 if (( perf_rtt.element_full.received != 0 ) || 940static void
691 ( perf_rtt.element_full.sent != 0) 941calculate_perf_store ()
692 ) rtt += 1; 942{
693 943
694 if (( perf_rtt.request_full.received != 0 ) || 944 /**
695 ( perf_rtt.request_full.sent != 0) 945 * Calculate RTT of init phase normally always 1
696 ) rtt += 0.5; 946 */
947 float rtt = 1;
948 int bytes_transmitted = 0;
697 949
698 /** 950 /**
699 * In case of a differential sync 3 rtt's are needed. 951 * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending
700 * for every active/passive switch additional 3.5 rtt's are used 952 */
701 */ 953 if ((perf_store.element_full.received != 0) ||
954 (perf_store.element_full.sent != 0)
955 )
956 rtt += 1;
957
958 if ((perf_store.request_full.received != 0) ||
959 (perf_store.request_full.sent != 0)
960 )
961 rtt += 0.5;
702 962
703 int iterations = perf_rtt.ibf.received; 963 /**
704 if(iterations > 1) 964 * In case of a differential sync 3 rtt's are needed.
705 rtt += (iterations - 1 ) * 0.5; 965 * for every active/passive switch additional 3.5 rtt's are used
706 rtt += 3 * iterations; 966 */
967 if ((perf_store.element.received != 0) ||
968 (perf_store.element.sent != 0))
969 {
970 int iterations = perf_store.active_passive_switches;
971
972 if (iterations > 0)
973 rtt += iterations * 0.5;
974 rtt += 2.5;
975 }
976
977
978 /**
979 * Calculate data sended size
980 */
981 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
982 GNUNET_SETU_ResultMessage),
983 perf_store.request_full);
984
985 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
986 GNUNET_SETU_ElementMessage),
987 perf_store.element_full);
988 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
989 GNUNET_SETU_ElementMessage),
990 perf_store.element);
991 // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
992 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
993 StrataEstimatorMessage),
994 perf_store.se);
995 bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
996 bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
997 perf_store.ibf);
998 bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
999 perf_store.inquery);
1000 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1001 GNUNET_MessageHeader),
1002 perf_store.demand);
1003 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1004 GNUNET_MessageHeader),
1005 perf_store.offer);
1006 bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1007
1008 /**
1009 * Write IBF failure rate for different BUCKET_NUMBER_FACTOR
1010 */
1011 float factor;
1012 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1013 &factor);
1014 long long num_per_bucket;
1015 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1016 &num_per_bucket);
1017
1018
1019 int decoded = 0;
1020 if (perf_store.active_passive_switches == 0)
1021 decoded = 1;
1022 int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1023 IBFMessage),
1024 perf_store.ibf);
1025
1026 FILE *out1 = fopen ("perf_data.csv", "a");
1027 fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1028 decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1029 bytes_transmitted,
1030 perf_store.se_diff_local,perf_store.se_diff_remote,
1031 perf_store.mode_of_operation);
1032 fclose (out1);
1033
1034}
1035
1036
1037#endif
1038/**
1039 * Function that chooses the optimal mode of operation depending on
1040 * operation parameters.
1041 * @param avg_element_size
1042 * @param local_set_size
1043 * @param remote_set_size
1044 * @param est_set_diff_remote
1045 * @param est_set_diff_local
1046 * @param bandwith_latency_tradeoff
1047 * @param ibf_bucket_number_factor
1048 * @return calcuated mode of operation
1049 */
1050static uint8_t
1051estimate_best_mode_of_operation (uint64_t avg_element_size,
1052 uint64_t local_set_size,
1053 uint64_t remote_set_size,
1054 uint64_t est_set_diff_remote,
1055 uint64_t est_set_diff_local,
1056 uint64_t bandwith_latency_tradeoff,
1057 uint64_t ibf_bucket_number_factor)
1058{
1059
1060 /*
1061 * In case of initial sync fall to predefined states
1062 */
1063
1064 if (0 == local_set_size)
1065 return FULL_SYNC_REMOTE_SENDING_FIRST;
1066 if (0 == remote_set_size)
1067 return FULL_SYNC_LOCAL_SENDING_FIRST;
1068
1069 /*
1070 * Calculate bytes for full Sync
1071 */
1072
1073 uint8_t sizeof_full_done_header = 4;
1074 uint8_t sizeof_done_header = 4;
1075 uint8_t rtt_min_full = 2;
1076 uint8_t sizeof_request_full = 4;
1077 uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1078
1079 /* Estimate byte required if we send first */
1080 uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1081 + local_set_size;
1082
1083 uint64_t total_bytes_full_local_send_first = (avg_element_size
1084 *
1085 total_elements_to_send_local_send_first) \
1086 + (
1087 total_elements_to_send_local_send_first * sizeof(struct
1088 GNUNET_SETU_ElementMessage)) \
1089 + (sizeof_full_done_header * 2) \
1090 + rtt_min_full
1091 * bandwith_latency_tradeoff;
1092
1093 /* Estimate bytes required if we request from remote peer */
1094 uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1095 + remote_set_size;
1096
1097 uint64_t total_bytes_full_remote_send_first = (avg_element_size
1098 *
1099 total_elements_to_send_remote_send_first) \
1100 + (
1101 total_elements_to_send_remote_send_first * sizeof(struct
1102 GNUNET_SETU_ElementMessage)) \
1103 + (sizeof_full_done_header * 2) \
1104 + (rtt_min_full + 0.5)
1105 * bandwith_latency_tradeoff \
1106 + sizeof_request_full;
1107
1108 /*
1109 * Calculate bytes for differential Sync
1110 */
1111
1112 /* Estimate bytes required by IBF transmission*/
1113
1114 long double ibf_bucket_count = estimated_total_diff
1115 * ibf_bucket_number_factor;
1116
1117 if (ibf_bucket_count <= IBF_MIN_SIZE)
1118 {
1119 ibf_bucket_count = IBF_MIN_SIZE;
1120 }
1121 uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1122 / ((float) MAX_BUCKETS_PER_MESSAGE));
1123
1124 uint64_t estimated_counter_size = ceil (
1125 MIN (2 * log2l (((float) local_set_size)
1126 / ((float) ibf_bucket_count)),
1127 log2l (local_set_size)));
1128
1129 long double counter_bytes = (float) estimated_counter_size / 8;
1130
1131 uint64_t ibf_bytes = ceil ((sizeof (struct IBFMessage) * ibf_message_count)
1132 * 1.2 \
1133 + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
1134 + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
1135 * 1.2 \
1136 + (ibf_bucket_count * counter_bytes) * 1.2);
1137
1138 /* Estimate full byte count for differential sync */
1139 uint64_t element_size = (avg_element_size
1140 + sizeof (struct GNUNET_SETU_ElementMessage)) \
1141 * estimated_total_diff;
1142 uint64_t done_size = sizeof_done_header;
1143 uint64_t inquery_size = (sizeof (struct IBF_Key)
1144 + sizeof (struct InquiryMessage))
1145 * estimated_total_diff;
1146 uint64_t demand_size =
1147 (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1148 * estimated_total_diff;
1149 uint64_t offer_size = (sizeof (struct GNUNET_HashCode)
1150 + sizeof (struct GNUNET_MessageHeader))
1151 * estimated_total_diff;
1152
1153 uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1154 + demand_size + offer_size + ibf_bytes) \
1155 + (DIFFERENTIAL_RTT_MEAN
1156 * bandwith_latency_tradeoff);
1157
1158 uint64_t full_min = MIN (total_bytes_full_local_send_first,
1159 total_bytes_full_remote_send_first);
1160
1161 /* Decide between full and differential sync */
1162
1163 if (full_min < total_bytes_diff)
1164 {
1165 /* Decide between sending all element first or receiving all elements */
1166 if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
1167 {
1168 return FULL_SYNC_LOCAL_SENDING_FIRST;
1169 }
1170 else
1171 {
1172 return FULL_SYNC_REMOTE_SENDING_FIRST;
1173 }
1174 }
1175 else
1176 {
1177 return DIFFERENTIAL_SYNC;
1178 }
1179}
1180
1181
1182/**
1183 * Validates the if a message is received in a correct phase
1184 * @param allowed_phases
1185 * @param size_phases
1186 * @param op
1187 * @return #GNUNET_YES if message permitted in phase and #GNUNET_NO if not permitted in given
1188 * phase
1189 */
1190static enum GNUNET_GenericReturnValue
1191check_valid_phase (const uint8_t allowed_phases[],
1192 size_t size_phases,
1193 struct Operation *op)
1194{
1195 /**
1196 * Iterate over allowed phases
1197 */
1198 for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1199 {
1200 uint8_t phase = allowed_phases[phase_ctr];
1201 if (phase == op->phase)
1202 {
1203 LOG (GNUNET_ERROR_TYPE_DEBUG,
1204 "Message received in valid phase\n");
1205 return GNUNET_YES;
1206 }
1207 }
1208 LOG (GNUNET_ERROR_TYPE_ERROR,
1209 "Received message in invalid phase: %u\n", op->phase);
1210 return GNUNET_NO;
1211}
1212
1213
1214/**
1215 * Function to update, track and validate message received in differential
1216 * sync. This function tracks states of messages and check it against different
1217 * constraints as described in Summermatter's BSc Thesis (2021)
1218 * @param hash_map: Hashmap to store message control flow
1219 * @param new_mcfs: The new message control flow state an given message type should be set to
1220 * @param hash_code: Hash code of the element
1221 * @param mt: The message type for which the message control flow state should be set
1222 * @return GNUNET_YES message is valid in message control flow GNUNET_NO when message is not valid
1223 * at this point in message flow
1224 */
1225static int
1226update_message_control_flow (struct GNUNET_CONTAINER_MultiHashMap *hash_map,
1227 enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1228 const struct GNUNET_HashCode *hash_code,
1229 enum MESSAGE_TYPE mt)
1230{
1231 struct messageControlFlowElement *cfe = NULL;
1232 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1233
1234 /**
1235 * Check logic for forbidden messages
1236 */
1237
1238 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1239 if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
1240 {
1241 if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1242 {
1243 LOG (GNUNET_ERROR_TYPE_ERROR,
1244 "Received an element without sent offer!\n");
1245 return GNUNET_NO;
1246 }
1247 /* Check that only requested elements are received! */
1248 if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1249 MSG_CFS_SENT))
1250 {
1251 LOG (GNUNET_ERROR_TYPE_ERROR,
1252 "Received an element that was not demanded\n");
1253 return GNUNET_NO;
1254 }
1255 }
1256
1257 /**
1258 * In case the element hash is not in the hashmap create a new entry
1259 */
1260
1261 if (NULL == cfe)
1262 {
1263 cfe = GNUNET_new (struct messageControlFlowElement);
1264 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1265 cfe,
1266 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1267 {
1268 GNUNET_free (cfe);
1269 return GNUNET_SYSERR;
1270 }
1271 }
1272
1273 /**
1274 * Set state of message
1275 */
1276
1277 if (OFFER_MESSAGE == mt)
1278 {
1279 mcfs = &cfe->offer;
1280 }
1281 else if (DEMAND_MESSAGE == mt)
1282 {
1283 mcfs = &cfe->demand;
1284 }
1285 else if (ELEMENT_MESSAGE == mt)
1286 {
1287 mcfs = &cfe->element;
1288 }
1289 else
1290 {
1291 return GNUNET_SYSERR;
1292 }
1293
1294 /**
1295 * Check if state is allowed
1296 */
1297
1298 if (new_mcfs <= *mcfs)
1299 {
1300 return GNUNET_NO;
1301 }
1302
1303 *mcfs = new_mcfs;
1304 return GNUNET_YES;
1305}
1306
1307
1308/**
1309 * Validate if a message in differential sync si already received before.
1310 * @param hash_map
1311 * @param hash_code
1312 * @param mt
1313 * @return GNUNET_YES when message is already in store if message is not in store return GNUNET_NO
1314 */
1315static int
1316is_message_in_message_control_flow (struct
1317 GNUNET_CONTAINER_MultiHashMap *hash_map,
1318 struct GNUNET_HashCode *hash_code,
1319 enum MESSAGE_TYPE mt)
1320{
1321 struct messageControlFlowElement *cfe = NULL;
1322 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1323
1324 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1325
1326 /**
1327 * Set state of message
1328 */
1329
1330 if (cfe != NULL)
1331 {
1332 if (OFFER_MESSAGE == mt)
1333 {
1334 mcfs = &cfe->offer;
1335 }
1336 else if (DEMAND_MESSAGE == mt)
1337 {
1338 mcfs = &cfe->demand;
1339 }
1340 else if (ELEMENT_MESSAGE == mt)
1341 {
1342 mcfs = &cfe->element;
1343 }
1344 else
1345 {
1346 return GNUNET_SYSERR;
1347 }
707 1348
708 /** 1349 /**
709 * Calculate data sended size 1350 * Evaluate if set is in message
710 */ 1351 */
711 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL), perf_rtt.request_full); 1352 if (*mcfs != MSG_CFS_UNINITIALIZED)
712 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT), perf_rtt.element_full); 1353 {
713 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS), perf_rtt.element); 1354 return GNUNET_YES;
714 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_rtt.operation_request); 1355 }
715 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_SE), perf_rtt.se); 1356 }
716 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE), perf_rtt.full_done); 1357 return GNUNET_NO;
717 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_IBF), perf_rtt.ibf); 1358}
718 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY), perf_rtt.inquery);
719 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND), perf_rtt.demand);
720 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER), perf_rtt.offer);
721 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DONE), perf_rtt.done);
722 1359
723 LOG(GNUNET_ERROR_TYPE_ERROR,"Bytes Transmitted: %d\n", bytes_transmitted);
724 1360
725 LOG(GNUNET_ERROR_TYPE_ERROR,"Reached tradeoff bandwidth/rtt: %f\n", (bytes_transmitted / rtt )); 1361/**
1362 * Iterator for determining if all demands have been
1363 * satisfied
1364 *
1365 * @param cls the union operation `struct Operation *`
1366 * @param key unused
1367 * @param value the `struct ElementEntry *` to insert
1368 * into the key-to-element mapping
1369 * @return #GNUNET_YES (to continue iterating)
1370 */
1371static int
1372determinate_done_message_iterator (void *cls,
1373 const struct GNUNET_HashCode *key,
1374 void *value)
1375{
1376 struct messageControlFlowElement *mcfe = value;
726 1377
727 return rtt; 1378 if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1379 {
1380 return GNUNET_YES;
1381 }
1382 return GNUNET_NO;
1383}
1384
1385
1386/**
1387 * Iterator for determining average size
1388 *
1389 * @param cls the union operation `struct Operation *`
1390 * @param key unused
1391 * @param value the `struct ElementEntry *` to insert
1392 * into the key-to-element mapping
1393 * @return #GNUNET_YES (to continue iterating)
1394 */
1395static int
1396determinate_avg_element_size_iterator (void *cls,
1397 const struct GNUNET_HashCode *key,
1398 void *value)
1399{
1400 struct Operation *op = cls;
1401 struct GNUNET_SETU_Element *element = value;
1402 op->total_elements_size_local += element->size;
1403 return GNUNET_YES;
1404}
1405
1406
1407/**
1408 * Create randomized element hashmap for full sending
1409 *
1410 * @param cls the union operation `struct Operation *`
1411 * @param key unused
1412 * @param value the `struct ElementEntry *` to insert
1413 * into the key-to-element mapping
1414 * @return #GNUNET_YES (to continue iterating)
1415 */
1416static int
1417create_randomized_element_iterator (void *cls,
1418 const struct GNUNET_HashCode *key,
1419 void *value)
1420{
1421 struct Operation *op = cls;
1422
1423 struct GNUNET_HashContext *hashed_key_context =
1424 GNUNET_CRYPTO_hash_context_start ();
1425 struct GNUNET_HashCode new_key;
1426
1427 /**
1428 * Hash element with new salt to randomize hashmap
1429 */
1430 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1431 &key,
1432 sizeof(struct IBF_Key));
1433 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1434 &op->set->content->elements_randomized_salt,
1435 sizeof(uint32_t));
1436 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1437 &new_key);
1438 GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1439 &new_key,value,
1440 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1441 return GNUNET_YES;
728} 1442}
729 1443
730 1444
@@ -808,6 +1522,36 @@ send_client_done (void *cls)
808} 1522}
809 1523
810 1524
1525/**
1526 * Check if all given byzantine parameters are in given boundaries
1527 * @param op
1528 * @return indicator if all given byzantine parameters are in given boundaries
1529 */
1530
1531static int
1532check_byzantine_bounds (struct Operation *op)
1533{
1534 if (op->byzantine != GNUNET_YES)
1535 return GNUNET_OK;
1536
1537 /**
1538 * Check upper byzantine bounds
1539 */
1540 if (op->remote_element_count + op->remote_set_diff >
1541 op->byzantine_upper_bound)
1542 return GNUNET_SYSERR;
1543 if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1544 return GNUNET_SYSERR;
1545
1546 /**
1547 * Check lower byzantine bounds
1548 */
1549 if (op->remote_element_count < op->byzantine_lower_bound)
1550 return GNUNET_SYSERR;
1551 return GNUNET_OK;
1552}
1553
1554
811/* FIXME: the destroy logic is a mess and should be cleaned up! */ 1555/* FIXME: the destroy logic is a mess and should be cleaned up! */
812 1556
813/** 1557/**
@@ -977,6 +1721,101 @@ fail_union_operation (struct Operation *op)
977 1721
978 1722
979/** 1723/**
1724 * Function that checks if full sync is plausible
1725 * @param initial_local_elements_in_set
1726 * @param estimated_set_difference
1727 * @param repeated_elements
1728 * @param fresh_elements
1729 * @param op
1730 * @return GNUNET_OK if
1731 */
1732
1733static void
1734full_sync_plausibility_check (struct Operation *op)
1735{
1736 if (GNUNET_YES != op->byzantine)
1737 return;
1738
1739 int security_level_lb = -1 * SECURITY_LEVEL;
1740 uint64_t duplicates = op->received_fresh - op->received_total;
1741
1742 /*
1743 * Protect full sync from receiving double element when in FULL SENDING
1744 */
1745 if (PHASE_FULL_SENDING == op->phase)
1746 {
1747 if (duplicates > 0)
1748 {
1749 LOG (GNUNET_ERROR_TYPE_ERROR,
1750 "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1751 "mode of operation this is not allowed! Duplicates: %llu\n",
1752 (unsigned long long) duplicates);
1753 GNUNET_break_op (0);
1754 fail_union_operation (op);
1755 return;
1756 }
1757
1758 }
1759
1760 /*
1761 * Protect full sync with probabilistic algorithm
1762 */
1763 if (PHASE_FULL_RECEIVING == op->phase)
1764 {
1765 if (0 == op->remote_set_diff)
1766 op->remote_set_diff = 1;
1767
1768 long double base = (1 - (long double) (op->remote_set_diff
1769 / (long double) (op->initial_size
1770 + op->
1771 remote_set_diff)));
1772 long double exponent = (op->received_total - (op->received_fresh * ((long
1773 double)
1774 op->
1775 initial_size
1776 / (long
1777 double)
1778 op->
1779 remote_set_diff)));
1780 long double value = exponent * (log2l (base) / log2l (2));
1781 if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1782 {
1783 LOG (GNUNET_ERROR_TYPE_ERROR,
1784 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1785 "to many duplicated full element : %LF\n",
1786 value);
1787 GNUNET_break_op (0);
1788 fail_union_operation (op);
1789 return;
1790 }
1791 }
1792}
1793
1794
1795/**
1796 * Limit active passive switches in differential sync to configured security level
1797 * @param op
1798 */
1799static void
1800check_max_differential_rounds (struct Operation *op)
1801{
1802 double probability = op->differential_sync_iterations * (log2l (
1803 PROBABILITY_FOR_NEW_ROUND)
1804 / log2l (2));
1805 if ((-1 * SECURITY_LEVEL) > probability)
1806 {
1807 LOG (GNUNET_ERROR_TYPE_ERROR,
1808 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1809 "switches in differential sync: %u\n",
1810 op->differential_sync_iterations);
1811 GNUNET_break_op (0);
1812 fail_union_operation (op);
1813 return;
1814 }
1815}
1816
1817
1818/**
980 * Derive the IBF key from a hash code and 1819 * Derive the IBF key from a hash code and
981 * a salt. 1820 * a salt.
982 * 1821 *
@@ -1004,12 +1843,12 @@ get_ibf_key (const struct GNUNET_HashCode *src)
1004struct GetElementContext 1843struct GetElementContext
1005{ 1844{
1006 /** 1845 /**
1007 * FIXME. 1846 * Gnunet hash code in context
1008 */ 1847 */
1009 struct GNUNET_HashCode hash; 1848 struct GNUNET_HashCode hash;
1010 1849
1011 /** 1850 /**
1012 * FIXME. 1851 * Pointer to the key entry
1013 */ 1852 */
1014 struct KeyEntry *k; 1853 struct KeyEntry *k;
1015}; 1854};
@@ -1122,7 +1961,7 @@ salt_key (const struct IBF_Key *k_in,
1122 uint32_t salt, 1961 uint32_t salt,
1123 struct IBF_Key *k_out) 1962 struct IBF_Key *k_out)
1124{ 1963{
1125 int s = salt % 64; 1964 int s = (salt * 7) % 64;
1126 uint64_t x = k_in->key_val; 1965 uint64_t x = k_in->key_val;
1127 1966
1128 /* rotate ibf key */ 1967 /* rotate ibf key */
@@ -1132,14 +1971,14 @@ salt_key (const struct IBF_Key *k_in,
1132 1971
1133 1972
1134/** 1973/**
1135 * FIXME. 1974 * Reverse modification done in the salt_key function
1136 */ 1975 */
1137static void 1976static void
1138unsalt_key (const struct IBF_Key *k_in, 1977unsalt_key (const struct IBF_Key *k_in,
1139 uint32_t salt, 1978 uint32_t salt,
1140 struct IBF_Key *k_out) 1979 struct IBF_Key *k_out)
1141{ 1980{
1142 int s = salt % 64; 1981 int s = (salt * 7) % 64;
1143 uint64_t x = k_in->key_val; 1982 uint64_t x = k_in->key_val;
1144 1983
1145 x = (x << s) | (x >> (64 - s)); 1984 x = (x << s) | (x >> (64 - s));
@@ -1258,7 +2097,9 @@ prepare_ibf (struct Operation *op,
1258 2097
1259 if (NULL != op->local_ibf) 2098 if (NULL != op->local_ibf)
1260 ibf_destroy (op->local_ibf); 2099 ibf_destroy (op->local_ibf);
1261 op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 2100 // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2101 op->local_ibf = ibf_create (size,
2102 ((uint8_t) op->ibf_number_buckets_per_element));
1262 if (NULL == op->local_ibf) 2103 if (NULL == op->local_ibf)
1263 { 2104 {
1264 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2105 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1283,13 +2124,23 @@ prepare_ibf (struct Operation *op,
1283 */ 2124 */
1284static int 2125static int
1285send_ibf (struct Operation *op, 2126send_ibf (struct Operation *op,
1286 uint16_t ibf_order) 2127 uint32_t ibf_size)
1287{ 2128{
1288 unsigned int buckets_sent = 0; 2129 uint64_t buckets_sent = 0;
1289 struct InvertibleBloomFilter *ibf; 2130 struct InvertibleBloomFilter *ibf;
2131 op->differential_sync_iterations++;
1290 2132
2133 /**
2134 * Enforce min size of IBF
2135 */
2136 uint32_t ibf_min_size = IBF_MIN_SIZE;
2137
2138 if (ibf_size < ibf_min_size)
2139 {
2140 ibf_size = ibf_min_size;
2141 }
1291 if (GNUNET_OK != 2142 if (GNUNET_OK !=
1292 prepare_ibf (op, 1 << ibf_order)) 2143 prepare_ibf (op, ibf_size))
1293 { 2144 {
1294 /* allocation failed */ 2145 /* allocation failed */
1295 return GNUNET_SYSERR; 2146 return GNUNET_SYSERR;
@@ -1297,45 +2148,52 @@ send_ibf (struct Operation *op,
1297 2148
1298 LOG (GNUNET_ERROR_TYPE_DEBUG, 2149 LOG (GNUNET_ERROR_TYPE_DEBUG,
1299 "sending ibf of size %u\n", 2150 "sending ibf of size %u\n",
1300 1 << ibf_order); 2151 (unsigned int) ibf_size);
1301 2152
1302 { 2153 {
1303 char name[64]; 2154 char name[64];
1304 GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order); 2155
2156 GNUNET_snprintf (name,
2157 sizeof(name),
2158 "# sent IBF (order %u)",
2159 ibf_size);
1305 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); 2160 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
1306 } 2161 }
1307 2162
1308 ibf = op->local_ibf; 2163 ibf = op->local_ibf;
1309 2164
1310 while (buckets_sent < (1 << ibf_order)) 2165 while (buckets_sent < ibf_size)
1311 { 2166 {
1312 unsigned int buckets_in_message; 2167 unsigned int buckets_in_message;
1313 struct GNUNET_MQ_Envelope *ev; 2168 struct GNUNET_MQ_Envelope *ev;
1314 struct IBFMessage *msg; 2169 struct IBFMessage *msg;
1315 2170
1316 buckets_in_message = (1 << ibf_order) - buckets_sent; 2171 buckets_in_message = ibf_size - buckets_sent;
1317 /* limit to maximum */ 2172 /* limit to maximum */
1318 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) 2173 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
1319 buckets_in_message = MAX_BUCKETS_PER_MESSAGE; 2174 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
1320 2175
1321 perf_rtt.ibf.sent += 1; 2176#if MEASURE_PERFORMANCE
1322 perf_rtt.ibf.sent_var_bytes += ( buckets_in_message * IBF_BUCKET_SIZE ); 2177 perf_store.ibf.sent += 1;
2178 perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2179#endif
1323 ev = GNUNET_MQ_msg_extra (msg, 2180 ev = GNUNET_MQ_msg_extra (msg,
1324 buckets_in_message * IBF_BUCKET_SIZE, 2181 buckets_in_message * IBF_BUCKET_SIZE,
1325 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF); 2182 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF);
1326 msg->reserved1 = 0; 2183 msg->ibf_size = ibf_size;
1327 msg->reserved2 = 0;
1328 msg->order = ibf_order;
1329 msg->offset = htonl (buckets_sent); 2184 msg->offset = htonl (buckets_sent);
1330 msg->salt = htonl (op->salt_send); 2185 msg->salt = htonl (op->salt_send);
2186 msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2187
2188
1331 ibf_write_slice (ibf, buckets_sent, 2189 ibf_write_slice (ibf, buckets_sent,
1332 buckets_in_message, &msg[1]); 2190 buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
1333 buckets_sent += buckets_in_message; 2191 buckets_sent += buckets_in_message;
1334 LOG (GNUNET_ERROR_TYPE_DEBUG, 2192 LOG (GNUNET_ERROR_TYPE_DEBUG,
1335 "ibf chunk size %u, %u/%u sent\n", 2193 "ibf chunk size %u, %llu/%u sent\n",
1336 buckets_in_message, 2194 (unsigned int) buckets_in_message,
1337 buckets_sent, 2195 (unsigned long long) buckets_sent,
1338 1 << ibf_order); 2196 (unsigned int) ibf_size);
1339 GNUNET_MQ_send (op->mq, ev); 2197 GNUNET_MQ_send (op->mq, ev);
1340 } 2198 }
1341 2199
@@ -1354,17 +2212,26 @@ send_ibf (struct Operation *op,
1354 * @return the required size of the ibf 2212 * @return the required size of the ibf
1355 */ 2213 */
1356static unsigned int 2214static unsigned int
1357get_order_from_difference (unsigned int diff) 2215get_size_from_difference (unsigned int diff, int number_buckets_per_element,
2216 float ibf_bucket_number_factor)
1358{ 2217{
1359 unsigned int ibf_order; 2218 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2219 * Elias Summermatter (2021) in section 3.11 **/
2220 return (((int) (diff * ibf_bucket_number_factor)) | 1);
2221
2222}
1360 2223
1361 ibf_order = 2; 2224
1362 while (((1 << ibf_order) < (IBF_ALPHA * diff) || 2225static unsigned int
1363 ((1 << ibf_order) < SE_IBF_HASH_NUM)) && 2226get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
1364 (ibf_order < MAX_IBF_ORDER)) 2227 decoded_elements, unsigned int last_ibf_size)
1365 ibf_order++; 2228{
1366 // add one for correction 2229 unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
1367 return ibf_order + 1; 2230 - (ibf_bucket_number_factor
2231 * decoded_elements));
2232 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2233 * Elias Summermatter (2021) in section 3.11 **/
2234 return next_size | 1;
1368} 2235}
1369 2236
1370 2237
@@ -1391,8 +2258,10 @@ send_full_element_iterator (void *cls,
1391 LOG (GNUNET_ERROR_TYPE_DEBUG, 2258 LOG (GNUNET_ERROR_TYPE_DEBUG,
1392 "Sending element %s\n", 2259 "Sending element %s\n",
1393 GNUNET_h2s (key)); 2260 GNUNET_h2s (key));
1394 perf_rtt.element_full.received += 1; 2261#if MEASURE_PERFORMANCE
1395 perf_rtt.element_full.received_var_bytes += el->size; 2262 perf_store.element_full.received += 1;
2263 perf_store.element_full.received_var_bytes += el->size;
2264#endif
1396 ev = GNUNET_MQ_msg_extra (emsg, 2265 ev = GNUNET_MQ_msg_extra (emsg,
1397 el->size, 2266 el->size,
1398 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); 2267 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -1421,10 +2290,25 @@ send_full_set (struct Operation *op)
1421 "Dedicing to transmit the full set\n"); 2290 "Dedicing to transmit the full set\n");
1422 /* FIXME: use a more memory-friendly way of doing this with an 2291 /* FIXME: use a more memory-friendly way of doing this with an
1423 iterator, just as we do in the non-full case! */ 2292 iterator, just as we do in the non-full case! */
2293
2294 // Randomize Elements to send
2295 op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2296 32,GNUNET_NO);
2297 op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2298 GNUNET_CRYPTO_QUALITY_NONCE,
2299 UINT64_MAX);
1424 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, 2300 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1425 &send_full_element_iterator, 2301 &
2302 create_randomized_element_iterator,
1426 op); 2303 op);
1427 perf_rtt.full_done.sent += 1; 2304
2305 (void) GNUNET_CONTAINER_multihashmap_iterate (
2306 op->set->content->elements_randomized,
2307 &send_full_element_iterator,
2308 op);
2309#if MEASURE_PERFORMANCE
2310 perf_store.full_done.sent += 1;
2311#endif
1428 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); 2312 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
1429 GNUNET_MQ_send (op->mq, 2313 GNUNET_MQ_send (op->mq,
1430 ev); 2314 ev);
@@ -1454,7 +2338,7 @@ check_union_p2p_strata_estimator (void *cls,
1454 msg->header.type)); 2338 msg->header.type));
1455 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2339 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1456 if ((GNUNET_NO == is_compressed) && 2340 if ((GNUNET_NO == is_compressed) &&
1457 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) 2341 (len != SE_STRATA_COUNT * SE_IBFS_TOTAL_SIZE * IBF_BUCKET_SIZE))
1458 { 2342 {
1459 GNUNET_break (0); 2343 GNUNET_break (0);
1460 return GNUNET_SYSERR; 2344 return GNUNET_SYSERR;
@@ -1473,14 +2357,44 @@ static void
1473handle_union_p2p_strata_estimator (void *cls, 2357handle_union_p2p_strata_estimator (void *cls,
1474 const struct StrataEstimatorMessage *msg) 2358 const struct StrataEstimatorMessage *msg)
1475{ 2359{
1476 perf_rtt.se.received += 1; 2360#if MEASURE_PERFORMANCE
1477 perf_rtt.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2361 perf_store.se.received += 1;
2362 perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2363 StrataEstimatorMessage);
2364#endif
1478 struct Operation *op = cls; 2365 struct Operation *op = cls;
1479 struct StrataEstimator *remote_se; 2366 struct MultiStrataEstimator *remote_se;
1480 unsigned int diff; 2367 unsigned int diff;
1481 uint64_t other_size; 2368 uint64_t other_size;
1482 size_t len; 2369 size_t len;
1483 int is_compressed; 2370 int is_compressed;
2371 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2372 op->set->content->elements);
2373 // Setting peer site to receiving peer
2374 op->peer_site = 1;
2375
2376 /**
2377 * Check that the message is received only in supported phase
2378 */
2379 uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2380 if (GNUNET_OK !=
2381 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2382 {
2383 GNUNET_break (0);
2384 fail_union_operation (op);
2385 return;
2386 }
2387
2388 /** Only allow 1,2,4,8 SEs **/
2389 if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2390 {
2391 LOG (GNUNET_ERROR_TYPE_ERROR,
2392 "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2393 msg->se_count);
2394 GNUNET_break_op (0);
2395 fail_union_operation (op);
2396 return;
2397 }
1484 2398
1485 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons ( 2399 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1486 msg->header.type)); 2400 msg->header.type));
@@ -1490,8 +2404,20 @@ handle_union_p2p_strata_estimator (void *cls,
1490 GNUNET_NO); 2404 GNUNET_NO);
1491 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2405 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1492 other_size = GNUNET_ntohll (msg->set_size); 2406 other_size = GNUNET_ntohll (msg->set_size);
2407 op->remote_element_count = other_size;
2408
2409 if (op->byzantine_upper_bound < op->remote_element_count)
2410 {
2411 LOG (GNUNET_ERROR_TYPE_ERROR,
2412 "Exceeded configured upper bound <%lu> of element: %u\n",
2413 op->byzantine_upper_bound,
2414 op->remote_element_count);
2415 fail_union_operation (op);
2416 return;
2417 }
2418
1493 remote_se = strata_estimator_create (SE_STRATA_COUNT, 2419 remote_se = strata_estimator_create (SE_STRATA_COUNT,
1494 SE_IBF_SIZE, 2420 SE_IBFS_TOTAL_SIZE,
1495 SE_IBF_HASH_NUM); 2421 SE_IBF_HASH_NUM);
1496 if (NULL == remote_se) 2422 if (NULL == remote_se)
1497 { 2423 {
@@ -1503,6 +2429,8 @@ handle_union_p2p_strata_estimator (void *cls,
1503 strata_estimator_read (&msg[1], 2429 strata_estimator_read (&msg[1],
1504 len, 2430 len,
1505 is_compressed, 2431 is_compressed,
2432 msg->se_count,
2433 SE_IBFS_TOTAL_SIZE,
1506 remote_se)) 2434 remote_se))
1507 { 2435 {
1508 /* decompression failed */ 2436 /* decompression failed */
@@ -1511,11 +2439,76 @@ handle_union_p2p_strata_estimator (void *cls,
1511 return; 2439 return;
1512 } 2440 }
1513 GNUNET_assert (NULL != op->se); 2441 GNUNET_assert (NULL != op->se);
1514 diff = strata_estimator_difference (remote_se, 2442 strata_estimator_difference (remote_se,
1515 op->se); 2443 op->se);
2444
2445 /* Calculate remote local diff */
2446 long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2447 long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2448
2449 /* Prevent estimations from overshooting max element */
2450 if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2451 diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2452 if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2453 diff_local = op->byzantine_upper_bound - op->local_element_count;
2454 if ((diff_remote < 0) || (diff_local < 0))
2455 {
2456 strata_estimator_destroy (remote_se);
2457 LOG (GNUNET_ERROR_TYPE_ERROR,
2458 "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2459 "malicious: remote diff %ld, local diff: %ld\n",
2460 diff_remote, diff_local);
2461 GNUNET_break_op (0);
2462 fail_union_operation (op);
2463 return;
2464 }
1516 2465
1517 if (diff > 200) 2466 /* Make estimation more precise in initial sync cases */
1518 diff = diff * 3 / 2; 2467 if (0 == op->remote_element_count)
2468 {
2469 diff_remote = 0;
2470 diff_local = op->local_element_count;
2471 }
2472 if (0 == op->local_element_count)
2473 {
2474 diff_local = 0;
2475 diff_remote = op->remote_element_count;
2476 }
2477
2478 diff = diff_remote + diff_local;
2479 op->remote_set_diff = diff_remote;
2480
2481 /** Calculate avg element size if not initial sync **/
2482 uint64_t avg_element_size = 0;
2483 if (0 < op->local_element_count)
2484 {
2485 op->total_elements_size_local = 0;
2486 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2487 &
2488 determinate_avg_element_size_iterator,
2489 op);
2490 avg_element_size = op->total_elements_size_local / op->local_element_count;
2491 }
2492
2493 op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2494 GNUNET_CONTAINER_multihashmap_size (
2495 op->set->content->
2496 elements),
2497 op->
2498 remote_element_count,
2499 diff_remote,
2500 diff_local,
2501 op->
2502 rtt_bandwidth_tradeoff,
2503 op->
2504 ibf_bucket_number_factor);
2505
2506#if MEASURE_PERFORMANCE
2507 perf_store.se_diff_local = diff_local;
2508 perf_store.se_diff_remote = diff_remote;
2509 perf_store.se_diff = diff;
2510 perf_store.mode_of_operation = op->mode_of_operation;
2511#endif
1519 2512
1520 strata_estimator_destroy (remote_se); 2513 strata_estimator_destroy (remote_se);
1521 strata_estimator_destroy (op->se); 2514 strata_estimator_destroy (op->se);
@@ -1523,7 +2516,8 @@ handle_union_p2p_strata_estimator (void *cls,
1523 LOG (GNUNET_ERROR_TYPE_DEBUG, 2516 LOG (GNUNET_ERROR_TYPE_DEBUG,
1524 "got se diff=%d, using ibf size %d\n", 2517 "got se diff=%d, using ibf size %d\n",
1525 diff, 2518 diff,
1526 1U << get_order_from_difference (diff)); 2519 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element,
2520 op->ibf_bucket_number_factor));
1527 2521
1528 { 2522 {
1529 char *set_debug; 2523 char *set_debug;
@@ -1546,16 +2540,8 @@ handle_union_p2p_strata_estimator (void *cls,
1546 return; 2540 return;
1547 } 2541 }
1548 2542
1549 LOG (GNUNET_ERROR_TYPE_ERROR,
1550 "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: %f\n", op->rtt_bandwidth_tradeoff);
1551
1552
1553 /**
1554 * Added rtt_bandwidth_tradeoff directly need future improvements
1555 */
1556 if ((GNUNET_YES == op->force_full) || 2543 if ((GNUNET_YES == op->force_full) ||
1557 (diff > op->initial_size / 4) || 2544 (op->mode_of_operation != DIFFERENTIAL_SYNC))
1558 (0 == other_size))
1559 { 2545 {
1560 LOG (GNUNET_ERROR_TYPE_DEBUG, 2546 LOG (GNUNET_ERROR_TYPE_DEBUG,
1561 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n", 2547 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
@@ -1565,9 +2551,17 @@ handle_union_p2p_strata_estimator (void *cls,
1565 "# of full sends", 2551 "# of full sends",
1566 1, 2552 1,
1567 GNUNET_NO); 2553 GNUNET_NO);
1568 if ((op->initial_size <= other_size) || 2554 if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
1569 (0 == other_size))
1570 { 2555 {
2556 struct TransmitFullMessage *signal_msg;
2557 struct GNUNET_MQ_Envelope *ev;
2558 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2559 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL);
2560 signal_msg->remote_set_difference = htonl (diff_local);
2561 signal_msg->remote_set_size = htonl (op->local_element_count);
2562 signal_msg->local_set_difference = htonl (diff_remote);
2563 GNUNET_MQ_send (op->mq,
2564 ev);
1571 send_full_set (op); 2565 send_full_set (op);
1572 } 2566 }
1573 else 2567 else
@@ -1577,9 +2571,15 @@ handle_union_p2p_strata_estimator (void *cls,
1577 LOG (GNUNET_ERROR_TYPE_DEBUG, 2571 LOG (GNUNET_ERROR_TYPE_DEBUG,
1578 "Telling other peer that we expect its full set\n"); 2572 "Telling other peer that we expect its full set\n");
1579 op->phase = PHASE_FULL_RECEIVING; 2573 op->phase = PHASE_FULL_RECEIVING;
1580 perf_rtt.request_full.sent += 1; 2574#if MEASURE_PERFORMANCE
1581 ev = GNUNET_MQ_msg_header ( 2575 perf_store.request_full.sent += 1;
1582 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); 2576#endif
2577 struct TransmitFullMessage *signal_msg;
2578 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2579 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
2580 signal_msg->remote_set_difference = htonl (diff_local);
2581 signal_msg->remote_set_size = htonl (op->local_element_count);
2582 signal_msg->local_set_difference = htonl (diff_remote);
1583 GNUNET_MQ_send (op->mq, 2583 GNUNET_MQ_send (op->mq,
1584 ev); 2584 ev);
1585 } 2585 }
@@ -1592,7 +2592,9 @@ handle_union_p2p_strata_estimator (void *cls,
1592 GNUNET_NO); 2592 GNUNET_NO);
1593 if (GNUNET_OK != 2593 if (GNUNET_OK !=
1594 send_ibf (op, 2594 send_ibf (op,
1595 get_order_from_difference (diff))) 2595 get_size_from_difference (diff,
2596 op->ibf_number_buckets_per_element,
2597 op->ibf_bucket_number_factor)))
1596 { 2598 {
1597 /* Internal error, best we can do is shut the connection */ 2599 /* Internal error, best we can do is shut the connection */
1598 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2600 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1625,15 +2627,64 @@ send_offers_iterator (void *cls,
1625 2627
1626 /* Detect 32-bit key collision for the 64-bit IBF keys. */ 2628 /* Detect 32-bit key collision for the 64-bit IBF keys. */
1627 if (ke->ibf_key.key_val != sec->ibf_key.key_val) 2629 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2630 {
2631 op->active_passive_switch_required = true;
1628 return GNUNET_YES; 2632 return GNUNET_YES;
2633 }
1629 2634
1630 perf_rtt.offer.sent += 1; 2635 /* Prevent implementation from sending a offer multiple times in case of roll switch */
1631 perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); 2636 if (GNUNET_YES ==
2637 is_message_in_message_control_flow (
2638 op->message_control_flow,
2639 &ke->element->element_hash,
2640 OFFER_MESSAGE)
2641 )
2642 {
2643 LOG (GNUNET_ERROR_TYPE_DEBUG,
2644 "Skipping already sent processed element offer!\n");
2645 return GNUNET_YES;
2646 }
1632 2647
2648 /* Save send offer message for message control */
2649 if (GNUNET_YES !=
2650 update_message_control_flow (
2651 op->message_control_flow,
2652 MSG_CFS_SENT,
2653 &ke->element->element_hash,
2654 OFFER_MESSAGE)
2655 )
2656 {
2657 LOG (GNUNET_ERROR_TYPE_ERROR,
2658 "Double offer message sent found!\n");
2659 GNUNET_break (0);
2660 fail_union_operation (op);
2661 return GNUNET_NO;
2662 }
2663 ;
2664
2665 /* Mark element to be expected to received */
2666 if (GNUNET_YES !=
2667 update_message_control_flow (
2668 op->message_control_flow,
2669 MSG_CFS_EXPECTED,
2670 &ke->element->element_hash,
2671 DEMAND_MESSAGE)
2672 )
2673 {
2674 LOG (GNUNET_ERROR_TYPE_ERROR,
2675 "Double demand received found!\n");
2676 GNUNET_break (0);
2677 fail_union_operation (op);
2678 return GNUNET_NO;
2679 }
2680 ;
2681#if MEASURE_PERFORMANCE
2682 perf_store.offer.sent += 1;
2683 perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2684#endif
1633 ev = GNUNET_MQ_msg_header_extra (mh, 2685 ev = GNUNET_MQ_msg_header_extra (mh,
1634 sizeof(struct GNUNET_HashCode), 2686 sizeof(struct GNUNET_HashCode),
1635 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER); 2687 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER);
1636
1637 GNUNET_assert (NULL != ev); 2688 GNUNET_assert (NULL != ev);
1638 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; 2689 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1639 LOG (GNUNET_ERROR_TYPE_DEBUG, 2690 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1651,7 +2702,7 @@ send_offers_iterator (void *cls,
1651 * @param op union operation 2702 * @param op union operation
1652 * @param ibf_key IBF key of interest 2703 * @param ibf_key IBF key of interest
1653 */ 2704 */
1654static void 2705void
1655send_offers_for_key (struct Operation *op, 2706send_offers_for_key (struct Operation *op,
1656 struct IBF_Key ibf_key) 2707 struct IBF_Key ibf_key)
1657{ 2708{
@@ -1694,6 +2745,7 @@ decode_and_send (struct Operation *op)
1694 /* allocation failed */ 2745 /* allocation failed */
1695 return GNUNET_SYSERR; 2746 return GNUNET_SYSERR;
1696 } 2747 }
2748
1697 diff_ibf = ibf_dup (op->local_ibf); 2749 diff_ibf = ibf_dup (op->local_ibf);
1698 ibf_subtract (diff_ibf, 2750 ibf_subtract (diff_ibf,
1699 op->remote_ibf); 2751 op->remote_ibf);
@@ -1706,7 +2758,7 @@ decode_and_send (struct Operation *op)
1706 diff_ibf->size); 2758 diff_ibf->size);
1707 2759
1708 num_decoded = 0; 2760 num_decoded = 0;
1709 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ 2761 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1710 2762
1711 while (1) 2763 while (1)
1712 { 2764 {
@@ -1738,23 +2790,36 @@ decode_and_send (struct Operation *op)
1738 if ((GNUNET_SYSERR == res) || 2790 if ((GNUNET_SYSERR == res) ||
1739 (GNUNET_YES == cycle_detected)) 2791 (GNUNET_YES == cycle_detected))
1740 { 2792 {
1741 int next_order; 2793 uint32_t next_size;
1742 next_order = 0; 2794 /** Enforce odd ibf size **/
1743 while (1 << next_order < diff_ibf->size) 2795
1744 next_order++; 2796 next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
1745 next_order++; 2797 diff_ibf->size);
1746 if (next_order <= MAX_IBF_ORDER) 2798 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2799 * Elias Summermatter (2021) in section 3.11 **/
2800 uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2801
2802 if (next_size<ibf_min_size)
2803 next_size = ibf_min_size;
2804
2805
2806 if (next_size <= MAX_IBF_SIZE)
1747 { 2807 {
1748 LOG (GNUNET_ERROR_TYPE_DEBUG, 2808 LOG (GNUNET_ERROR_TYPE_DEBUG,
1749 "decoding failed, sending larger ibf (size %u)\n", 2809 "decoding failed, sending larger ibf (size %u)\n",
1750 1 << next_order); 2810 next_size);
1751 GNUNET_STATISTICS_update (_GSS_statistics, 2811 GNUNET_STATISTICS_update (_GSS_statistics,
1752 "# of IBF retries", 2812 "# of IBF retries",
1753 1, 2813 1,
1754 GNUNET_NO); 2814 GNUNET_NO);
1755 op->salt_send++; 2815#if MEASURE_PERFORMANCE
2816 perf_store.active_passive_switches += 1;
2817#endif
2818
2819 op->salt_send = op->salt_receive++;
2820
1756 if (GNUNET_OK != 2821 if (GNUNET_OK !=
1757 send_ibf (op, next_order)) 2822 send_ibf (op, next_size))
1758 { 2823 {
1759 /* Internal error, best we can do is shut the connection */ 2824 /* Internal error, best we can do is shut the connection */
1760 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2825 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1786,7 +2851,9 @@ decode_and_send (struct Operation *op)
1786 LOG (GNUNET_ERROR_TYPE_DEBUG, 2851 LOG (GNUNET_ERROR_TYPE_DEBUG,
1787 "transmitted all values, sending DONE\n"); 2852 "transmitted all values, sending DONE\n");
1788 2853
1789 perf_rtt.done.sent += 1; 2854#if MEASURE_PERFORMANCE
2855 perf_store.done.sent += 1;
2856#endif
1790 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); 2857 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
1791 GNUNET_MQ_send (op->mq, ev); 2858 GNUNET_MQ_send (op->mq, ev);
1792 /* We now wait until we get a DONE message back 2859 /* We now wait until we get a DONE message back
@@ -1797,7 +2864,6 @@ decode_and_send (struct Operation *op)
1797 if (1 == side) 2864 if (1 == side)
1798 { 2865 {
1799 struct IBF_Key unsalted_key; 2866 struct IBF_Key unsalted_key;
1800
1801 unsalt_key (&key, 2867 unsalt_key (&key,
1802 op->salt_receive, 2868 op->salt_receive,
1803 &unsalted_key); 2869 &unsalted_key);
@@ -1809,8 +2875,29 @@ decode_and_send (struct Operation *op)
1809 struct GNUNET_MQ_Envelope *ev; 2875 struct GNUNET_MQ_Envelope *ev;
1810 struct InquiryMessage *msg; 2876 struct InquiryMessage *msg;
1811 2877
1812 perf_rtt.inquery.sent += 1; 2878#if MEASURE_PERFORMANCE
1813 perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key); 2879 perf_store.inquery.sent += 1;
2880 perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2881#endif
2882
2883 /** Add sent inquiries to hashmap for flow control **/
2884 struct GNUNET_HashContext *hashed_key_context =
2885 GNUNET_CRYPTO_hash_context_start ();
2886 struct GNUNET_HashCode *hashed_key = (struct
2887 GNUNET_HashCode*) GNUNET_malloc (
2888 sizeof(struct GNUNET_HashCode));
2889 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_SENT;
2890 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2891 &key,
2892 sizeof(struct IBF_Key));
2893 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2894 hashed_key);
2895 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2896 hashed_key,
2897 &mcfs,
2898 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
2899 );
2900
1814 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth 2901 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1815 * the effort additional complexity. */ 2902 * the effort additional complexity. */
1816 ev = GNUNET_MQ_msg_extra (msg, 2903 ev = GNUNET_MQ_msg_extra (msg,
@@ -1836,6 +2923,100 @@ decode_and_send (struct Operation *op)
1836 2923
1837 2924
1838/** 2925/**
2926 * Check send full message received from other peer
2927 * @param cls
2928 * @param msg
2929 * @return
2930 */
2931
2932static int
2933check_union_p2p_send_full (void *cls,
2934 const struct TransmitFullMessage *msg)
2935{
2936 return GNUNET_OK;
2937}
2938
2939
2940/**
2941 * Handle send full message received from other peer
2942 *
2943 * @param cls
2944 * @param msg
2945 */
2946static void
2947handle_union_p2p_send_full (void *cls,
2948 const struct TransmitFullMessage *msg)
2949{
2950 struct Operation *op = cls;
2951
2952 /**
2953 * Check that the message is received only in supported phase
2954 */
2955 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2956 if (GNUNET_OK !=
2957 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2958 {
2959 GNUNET_break (0);
2960 fail_union_operation (op);
2961 return;
2962 }
2963
2964 /** write received values to operator**/
2965 op->remote_element_count = ntohl (msg->remote_set_size);
2966 op->remote_set_diff = ntohl (msg->remote_set_difference);
2967 op->local_set_diff = ntohl (msg->local_set_difference);
2968
2969 /** Check byzantine limits **/
2970 if (check_byzantine_bounds (op) != GNUNET_OK)
2971 {
2972 LOG (GNUNET_ERROR_TYPE_ERROR,
2973 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
2974 "criteria\n");
2975 GNUNET_break_op (0);
2976 fail_union_operation (op);
2977 return;
2978 }
2979
2980 /** Calculate avg element size if not initial sync **/
2981 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2982 op->set->content->elements);
2983 uint64_t avg_element_size = 0;
2984 if (0 < op->local_element_count)
2985 {
2986 op->total_elements_size_local = 0;
2987 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2988 &
2989 determinate_avg_element_size_iterator,
2990 op);
2991 avg_element_size = op->total_elements_size_local / op->local_element_count;
2992 }
2993
2994 /** Validate mode of operation **/
2995 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2996 op->
2997 remote_element_count,
2998 op->
2999 local_element_count,
3000 op->local_set_diff,
3001 op->remote_set_diff,
3002 op->
3003 rtt_bandwidth_tradeoff,
3004 op->
3005 ibf_bucket_number_factor);
3006 if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation)
3007 {
3008 LOG (GNUNET_ERROR_TYPE_ERROR,
3009 "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
3010 " : %d\n", mode_of_operation);
3011 GNUNET_break_op (0);
3012 fail_union_operation (op);
3013 return;
3014 }
3015 op->phase = PHASE_FULL_RECEIVING;
3016}
3017
3018
3019/**
1839 * Check an IBF message from a remote peer. 3020 * Check an IBF message from a remote peer.
1840 * 3021 *
1841 * Reassemble the IBF from multiple pieces, and 3022 * Reassemble the IBF from multiple pieces, and
@@ -1872,7 +3053,8 @@ check_union_p2p_ibf (void *cls,
1872 GNUNET_break_op (0); 3053 GNUNET_break_op (0);
1873 return GNUNET_SYSERR; 3054 return GNUNET_SYSERR;
1874 } 3055 }
1875 if (1 << msg->order != op->remote_ibf->size) 3056
3057 if (msg->ibf_size != op->remote_ibf->size)
1876 { 3058 {
1877 GNUNET_break_op (0); 3059 GNUNET_break_op (0);
1878 return GNUNET_SYSERR; 3060 return GNUNET_SYSERR;
@@ -1909,9 +3091,26 @@ handle_union_p2p_ibf (void *cls,
1909{ 3091{
1910 struct Operation *op = cls; 3092 struct Operation *op = cls;
1911 unsigned int buckets_in_message; 3093 unsigned int buckets_in_message;
3094 /**
3095 * Check that the message is received only in supported phase
3096 */
3097 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3098 PHASE_PASSIVE_DECODING};
3099 if (GNUNET_OK !=
3100 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3101 {
3102 GNUNET_break (0);
3103 fail_union_operation (op);
3104 return;
3105 }
3106 op->differential_sync_iterations++;
3107 check_max_differential_rounds (op);
3108 op->active_passive_switch_required = false;
1912 3109
1913 perf_rtt.ibf.received += 1; 3110#if MEASURE_PERFORMANCE
1914 perf_rtt.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); 3111 perf_store.ibf.received += 1;
3112 perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3113#endif
1915 3114
1916 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) 3115 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1917 / IBF_BUCKET_SIZE; 3116 / IBF_BUCKET_SIZE;
@@ -1922,8 +3121,10 @@ handle_union_p2p_ibf (void *cls,
1922 GNUNET_assert (NULL == op->remote_ibf); 3121 GNUNET_assert (NULL == op->remote_ibf);
1923 LOG (GNUNET_ERROR_TYPE_DEBUG, 3122 LOG (GNUNET_ERROR_TYPE_DEBUG,
1924 "Creating new ibf of size %u\n", 3123 "Creating new ibf of size %u\n",
1925 1 << msg->order); 3124 ntohl (msg->ibf_size));
1926 op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); 3125 // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
3126 op->remote_ibf = ibf_create (msg->ibf_size,
3127 ((uint8_t) op->ibf_number_buckets_per_element));
1927 op->salt_receive = ntohl (msg->salt); 3128 op->salt_receive = ntohl (msg->salt);
1928 LOG (GNUNET_ERROR_TYPE_DEBUG, 3129 LOG (GNUNET_ERROR_TYPE_DEBUG,
1929 "Receiving new IBF with salt %u\n", 3130 "Receiving new IBF with salt %u\n",
@@ -1954,7 +3155,7 @@ handle_union_p2p_ibf (void *cls,
1954 ibf_read_slice (&msg[1], 3155 ibf_read_slice (&msg[1],
1955 op->ibf_buckets_received, 3156 op->ibf_buckets_received,
1956 buckets_in_message, 3157 buckets_in_message,
1957 op->remote_ibf); 3158 op->remote_ibf, msg->ibf_counter_bit_length);
1958 op->ibf_buckets_received += buckets_in_message; 3159 op->ibf_buckets_received += buckets_in_message;
1959 3160
1960 if (op->ibf_buckets_received == op->remote_ibf->size) 3161 if (op->ibf_buckets_received == op->remote_ibf->size)
@@ -2030,18 +3231,24 @@ maybe_finish (struct Operation *op)
2030 3231
2031 num_demanded = GNUNET_CONTAINER_multihashmap_size ( 3232 num_demanded = GNUNET_CONTAINER_multihashmap_size (
2032 op->demanded_hashes); 3233 op->demanded_hashes);
2033 3234 int send_done = GNUNET_CONTAINER_multihashmap_iterate (
3235 op->message_control_flow,
3236 &
3237 determinate_done_message_iterator,
3238 op);
2034 if (PHASE_FINISH_WAITING == op->phase) 3239 if (PHASE_FINISH_WAITING == op->phase)
2035 { 3240 {
2036 LOG (GNUNET_ERROR_TYPE_DEBUG, 3241 LOG (GNUNET_ERROR_TYPE_DEBUG,
2037 "In PHASE_FINISH_WAITING, pending %u demands\n", 3242 "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
2038 num_demanded); 3243 num_demanded, op->peer_site);
2039 if (0 == num_demanded) 3244 if (-1 != send_done)
2040 { 3245 {
2041 struct GNUNET_MQ_Envelope *ev; 3246 struct GNUNET_MQ_Envelope *ev;
2042 3247
2043 op->phase = PHASE_FINISHED; 3248 op->phase = PHASE_FINISHED;
2044 perf_rtt.done.sent += 1; 3249#if MEASURE_PERFORMANCE
3250 perf_store.done.sent += 1;
3251#endif
2045 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); 3252 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
2046 GNUNET_MQ_send (op->mq, 3253 GNUNET_MQ_send (op->mq,
2047 ev); 3254 ev);
@@ -2052,9 +3259,9 @@ maybe_finish (struct Operation *op)
2052 if (PHASE_FINISH_CLOSING == op->phase) 3259 if (PHASE_FINISH_CLOSING == op->phase)
2053 { 3260 {
2054 LOG (GNUNET_ERROR_TYPE_DEBUG, 3261 LOG (GNUNET_ERROR_TYPE_DEBUG,
2055 "In PHASE_FINISH_CLOSING, pending %u demands\n", 3262 "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
2056 num_demanded); 3263 num_demanded, op->peer_site);
2057 if (0 == num_demanded) 3264 if (-1 != send_done)
2058 { 3265 {
2059 op->phase = PHASE_FINISHED; 3266 op->phase = PHASE_FINISHED;
2060 send_client_done (op); 3267 send_client_done (op);
@@ -2102,11 +3309,25 @@ handle_union_p2p_elements (void *cls,
2102 struct KeyEntry *ke; 3309 struct KeyEntry *ke;
2103 uint16_t element_size; 3310 uint16_t element_size;
2104 3311
3312 /**
3313 * Check that the message is received only in supported phase
3314 */
3315 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3316 PHASE_FINISH_WAITING, PHASE_FINISH_CLOSING};
3317 if (GNUNET_OK !=
3318 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3319 {
3320 GNUNET_break (0);
3321 fail_union_operation (op);
3322 return;
3323 }
2105 3324
2106 element_size = ntohs (emsg->header.size) - sizeof(struct 3325 element_size = ntohs (emsg->header.size) - sizeof(struct
2107 GNUNET_SETU_ElementMessage); 3326 GNUNET_SETU_ElementMessage);
2108 perf_rtt.element.received += 1; 3327#if MEASURE_PERFORMANCE
2109 perf_rtt.element.received_var_bytes += element_size; 3328 perf_store.element.received += 1;
3329 perf_store.element.received_var_bytes += element_size;
3330#endif
2110 3331
2111 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); 3332 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2112 GNUNET_memcpy (&ee[1], 3333 GNUNET_memcpy (&ee[1],
@@ -2129,6 +3350,21 @@ handle_union_p2p_elements (void *cls,
2129 return; 3350 return;
2130 } 3351 }
2131 3352
3353 if (GNUNET_OK !=
3354 update_message_control_flow (
3355 op->message_control_flow,
3356 MSG_CFS_RECEIVED,
3357 &ee->element_hash,
3358 ELEMENT_MESSAGE)
3359 )
3360 {
3361 LOG (GNUNET_ERROR_TYPE_ERROR,
3362 "An element has been received more than once!\n");
3363 GNUNET_break (0);
3364 fail_union_operation (op);
3365 return;
3366 }
3367
2132 LOG (GNUNET_ERROR_TYPE_DEBUG, 3368 LOG (GNUNET_ERROR_TYPE_DEBUG,
2133 "Got element (size %u, hash %s) from peer\n", 3369 "Got element (size %u, hash %s) from peer\n",
2134 (unsigned int) element_size, 3370 (unsigned int) element_size,
@@ -2217,33 +3453,25 @@ handle_union_p2p_full_element (void *cls,
2217 struct KeyEntry *ke; 3453 struct KeyEntry *ke;
2218 uint16_t element_size; 3454 uint16_t element_size;
2219 3455
2220 3456 /**
2221 3457 * Check that the message is received only in supported phase
2222 if(PHASE_EXPECT_IBF == op->phase) { 3458 */
2223 op->phase = PHASE_FULL_RECEIVING; 3459 uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
2224 } 3460 if (GNUNET_OK !=
2225 3461 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2226
2227
2228 /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */
2229 if ((PHASE_FULL_RECEIVING != op->phase) &&
2230 (PHASE_FULL_SENDING != op->phase))
2231 { 3462 {
2232 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 3463 GNUNET_break (0);
2233 "Handle full element phase is %u\n", 3464 fail_union_operation (op);
2234 (unsigned) op->phase); 3465 return;
2235 GNUNET_break_op (0);
2236 fail_union_operation (op);
2237 return;
2238 } 3466 }
2239 3467
2240
2241
2242 element_size = ntohs (emsg->header.size) 3468 element_size = ntohs (emsg->header.size)
2243 - sizeof(struct GNUNET_SETU_ElementMessage); 3469 - sizeof(struct GNUNET_SETU_ElementMessage);
2244 3470
2245 perf_rtt.element_full.received += 1; 3471#if MEASURE_PERFORMANCE
2246 perf_rtt.element_full.received_var_bytes += element_size; 3472 perf_store.element_full.received += 1;
3473 perf_store.element_full.received_var_bytes += element_size;
3474#endif
2247 3475
2248 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); 3476 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2249 GNUNET_memcpy (&ee[1], &emsg[1], element_size); 3477 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
@@ -2268,17 +3496,15 @@ handle_union_p2p_full_element (void *cls,
2268 GNUNET_NO); 3496 GNUNET_NO);
2269 3497
2270 op->received_total++; 3498 op->received_total++;
2271
2272 ke = op_get_element (op, 3499 ke = op_get_element (op,
2273 &ee->element_hash); 3500 &ee->element_hash);
2274 if (NULL != ke) 3501 if (NULL != ke)
2275 { 3502 {
2276 /* Got repeated element. Should not happen since
2277 * we track demands. */
2278 GNUNET_STATISTICS_update (_GSS_statistics, 3503 GNUNET_STATISTICS_update (_GSS_statistics,
2279 "# repeated elements", 3504 "# repeated elements",
2280 1, 3505 1,
2281 GNUNET_NO); 3506 GNUNET_NO);
3507 full_sync_plausibility_check (op);
2282 ke->received = GNUNET_YES; 3508 ke->received = GNUNET_YES;
2283 GNUNET_free (ee); 3509 GNUNET_free (ee);
2284 } 3510 }
@@ -2294,15 +3520,15 @@ handle_union_p2p_full_element (void *cls,
2294 GNUNET_SETU_STATUS_ADD_LOCAL); 3520 GNUNET_SETU_STATUS_ADD_LOCAL);
2295 } 3521 }
2296 3522
3523
2297 if ((GNUNET_YES == op->byzantine) && 3524 if ((GNUNET_YES == op->byzantine) &&
2298 (op->received_total > 384 + op->received_fresh * 4) && 3525 (op->received_total > op->remote_element_count) )
2299 (op->received_fresh < op->received_total / 6))
2300 { 3526 {
2301 /* The other peer gave us lots of old elements, there's something wrong. */ 3527 /* The other peer gave us lots of old elements, there's something wrong. */
2302 LOG (GNUNET_ERROR_TYPE_ERROR, 3528 LOG (GNUNET_ERROR_TYPE_ERROR,
2303 "Other peer sent only %llu/%llu fresh elements, failing operation\n", 3529 "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
2304 (unsigned long long) op->received_fresh, 3530 (unsigned long long) op->received_total,
2305 (unsigned long long) op->received_total); 3531 (unsigned long long) op->remote_element_count);
2306 GNUNET_break_op (0); 3532 GNUNET_break_op (0);
2307 fail_union_operation (op); 3533 fail_union_operation (op);
2308 return; 3534 return;
@@ -2356,18 +3582,50 @@ handle_union_p2p_inquiry (void *cls,
2356 const struct IBF_Key *ibf_key; 3582 const struct IBF_Key *ibf_key;
2357 unsigned int num_keys; 3583 unsigned int num_keys;
2358 3584
2359 perf_rtt.inquery.received += 1; 3585 /**
2360 perf_rtt.inquery.received_var_bytes += (ntohs (msg->header.size) - sizeof(struct InquiryMessage)); 3586 * Check that the message is received only in supported phase
3587 */
3588 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3589 if (GNUNET_OK !=
3590 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3591 {
3592 GNUNET_break (0);
3593 fail_union_operation (op);
3594 return;
3595 }
3596
3597#if MEASURE_PERFORMANCE
3598 perf_store.inquery.received += 1;
3599 perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3600 - sizeof(struct InquiryMessage));
3601#endif
2361 3602
2362 LOG (GNUNET_ERROR_TYPE_DEBUG, 3603 LOG (GNUNET_ERROR_TYPE_DEBUG,
2363 "Received union inquiry\n"); 3604 "Received union inquiry\n");
2364 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) 3605 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2365 / sizeof(struct IBF_Key); 3606 / sizeof(struct IBF_Key);
2366 ibf_key = (const struct IBF_Key *) &msg[1]; 3607 ibf_key = (const struct IBF_Key *) &msg[1];
3608
3609 /** Add received inquiries to hashmap for flow control **/
3610 struct GNUNET_HashContext *hashed_key_context =
3611 GNUNET_CRYPTO_hash_context_start ();
3612 struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
3613 sizeof(struct GNUNET_HashCode));;
3614 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_RECEIVED;
3615 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3616 &ibf_key,
3617 sizeof(struct IBF_Key));
3618 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3619 hashed_key);
3620 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3621 hashed_key,
3622 &mcfs,
3623 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
3624 );
3625
2367 while (0 != num_keys--) 3626 while (0 != num_keys--)
2368 { 3627 {
2369 struct IBF_Key unsalted_key; 3628 struct IBF_Key unsalted_key;
2370
2371 unsalt_key (ibf_key, 3629 unsalt_key (ibf_key,
2372 ntohl (msg->salt), 3630 ntohl (msg->salt),
2373 &unsalted_key); 3631 &unsalted_key);
@@ -2402,7 +3660,9 @@ send_missing_full_elements_iter (void *cls,
2402 3660
2403 if (GNUNET_YES == ke->received) 3661 if (GNUNET_YES == ke->received)
2404 return GNUNET_YES; 3662 return GNUNET_YES;
2405 perf_rtt.element_full.received += 1; 3663#if MEASURE_PERFORMANCE
3664 perf_store.element_full.received += 1;
3665#endif
2406 ev = GNUNET_MQ_msg_extra (emsg, 3666 ev = GNUNET_MQ_msg_extra (emsg,
2407 ee->element.size, 3667 ee->element.size,
2408 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); 3668 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -2422,18 +3682,84 @@ send_missing_full_elements_iter (void *cls,
2422 * @param cls closure, a set union operation 3682 * @param cls closure, a set union operation
2423 * @param mh the demand message 3683 * @param mh the demand message
2424 */ 3684 */
3685static int
3686check_union_p2p_request_full (void *cls,
3687 const struct TransmitFullMessage *mh)
3688{
3689 return GNUNET_OK;
3690}
3691
3692
2425static void 3693static void
2426handle_union_p2p_request_full (void *cls, 3694handle_union_p2p_request_full (void *cls,
2427 const struct GNUNET_MessageHeader *mh) 3695 const struct TransmitFullMessage *msg)
2428{ 3696{
2429 struct Operation *op = cls; 3697 struct Operation *op = cls;
2430 3698
2431 perf_rtt.request_full.received += 1; 3699 /**
3700 * Check that the message is received only in supported phase
3701 */
3702 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3703 if (GNUNET_OK !=
3704 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3705 {
3706 GNUNET_break (0);
3707 fail_union_operation (op);
3708 return;
3709 }
2432 3710
2433 LOG (GNUNET_ERROR_TYPE_DEBUG, 3711 op->remote_element_count = ntohl (msg->remote_set_size);
3712 op->remote_set_diff = ntohl (msg->remote_set_difference);
3713 op->local_set_diff = ntohl (msg->local_set_difference);
3714
3715
3716 if (check_byzantine_bounds (op) != GNUNET_OK)
3717 {
3718 LOG (GNUNET_ERROR_TYPE_ERROR,
3719 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3720 "criteria\n");
3721 GNUNET_break_op (0);
3722 fail_union_operation (op);
3723 return;
3724 }
3725
3726#if MEASURE_PERFORMANCE
3727 perf_store.request_full.received += 1;
3728#endif
3729
3730 LOG (GNUNET_ERROR_TYPE_DEBUG,
2434 "Received request for full set transmission\n"); 3731 "Received request for full set transmission\n");
2435 if (PHASE_EXPECT_IBF != op->phase) 3732
3733 /** Calculate avg element size if not initial sync **/
3734 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3735 op->set->content->elements);
3736 uint64_t avg_element_size = 0;
3737 if (0 < op->local_element_count)
3738 {
3739 op->total_elements_size_local = 0;
3740 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3741 &
3742 determinate_avg_element_size_iterator,
3743 op);
3744 avg_element_size = op->total_elements_size_local / op->local_element_count;
3745 }
3746
3747 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3748 op->
3749 remote_element_count,
3750 op->
3751 local_element_count,
3752 op->local_set_diff,
3753 op->remote_set_diff,
3754 op->
3755 rtt_bandwidth_tradeoff,
3756 op->
3757 ibf_bucket_number_factor);
3758 if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation)
2436 { 3759 {
3760 LOG (GNUNET_ERROR_TYPE_ERROR,
3761 "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3762 " : %d\n", mode_of_operation);
2437 GNUNET_break_op (0); 3763 GNUNET_break_op (0);
2438 fail_union_operation (op); 3764 fail_union_operation (op);
2439 return; 3765 return;
@@ -2458,7 +3784,21 @@ handle_union_p2p_full_done (void *cls,
2458{ 3784{
2459 struct Operation *op = cls; 3785 struct Operation *op = cls;
2460 3786
2461 perf_rtt.full_done.received += 1; 3787 /**
3788 * Check that the message is received only in supported phase
3789 */
3790 uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3791 if (GNUNET_OK !=
3792 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3793 {
3794 GNUNET_break (0);
3795 fail_union_operation (op);
3796 return;
3797 }
3798
3799#if MEASURE_PERFORMANCE
3800 perf_store.full_done.received += 1;
3801#endif
2462 3802
2463 switch (op->phase) 3803 switch (op->phase)
2464 { 3804 {
@@ -2466,6 +3806,19 @@ handle_union_p2p_full_done (void *cls,
2466 { 3806 {
2467 struct GNUNET_MQ_Envelope *ev; 3807 struct GNUNET_MQ_Envelope *ev;
2468 3808
3809 if ((GNUNET_YES == op->byzantine) &&
3810 (op->received_total != op->remote_element_count) )
3811 {
3812 /* The other peer gave not enough elements before sending full done, there's something wrong. */
3813 LOG (GNUNET_ERROR_TYPE_ERROR,
3814 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3815 (unsigned long long) op->received_total,
3816 (unsigned long long) op->remote_element_count);
3817 GNUNET_break_op (0);
3818 fail_union_operation (op);
3819 return;
3820 }
3821
2469 LOG (GNUNET_ERROR_TYPE_DEBUG, 3822 LOG (GNUNET_ERROR_TYPE_DEBUG,
2470 "got FULL DONE, sending elements that other peer is missing\n"); 3823 "got FULL DONE, sending elements that other peer is missing\n");
2471 3824
@@ -2473,7 +3826,9 @@ handle_union_p2p_full_done (void *cls,
2473 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element, 3826 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
2474 &send_missing_full_elements_iter, 3827 &send_missing_full_elements_iter,
2475 op); 3828 op);
2476 perf_rtt.full_done.sent += 1; 3829#if MEASURE_PERFORMANCE
3830 perf_store.full_done.sent += 1;
3831#endif
2477 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); 3832 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
2478 GNUNET_MQ_send (op->mq, 3833 GNUNET_MQ_send (op->mq,
2479 ev); 3834 ev);
@@ -2552,8 +3907,23 @@ handle_union_p2p_demand (void *cls,
2552 unsigned int num_hashes; 3907 unsigned int num_hashes;
2553 struct GNUNET_MQ_Envelope *ev; 3908 struct GNUNET_MQ_Envelope *ev;
2554 3909
2555 perf_rtt.demand.received += 1; 3910 /**
2556 perf_rtt.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); 3911 * Check that the message is received only in supported phase
3912 */
3913 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3914 PHASE_FINISH_WAITING};
3915 if (GNUNET_OK !=
3916 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3917 {
3918 GNUNET_break (0);
3919 fail_union_operation (op);
3920 return;
3921 }
3922#if MEASURE_PERFORMANCE
3923 perf_store.demand.received += 1;
3924 perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3925 GNUNET_MessageHeader));
3926#endif
2557 3927
2558 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) 3928 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2559 / sizeof(struct GNUNET_HashCode); 3929 / sizeof(struct GNUNET_HashCode);
@@ -2570,6 +3940,39 @@ handle_union_p2p_demand (void *cls,
2570 fail_union_operation (op); 3940 fail_union_operation (op);
2571 return; 3941 return;
2572 } 3942 }
3943
3944 /* Save send demand message for message control */
3945 if (GNUNET_YES !=
3946 update_message_control_flow (
3947 op->message_control_flow,
3948 MSG_CFS_RECEIVED,
3949 &ee->element_hash,
3950 DEMAND_MESSAGE)
3951 )
3952 {
3953 LOG (GNUNET_ERROR_TYPE_ERROR,
3954 "Double demand message received found!\n");
3955 GNUNET_break (0);
3956 fail_union_operation (op);
3957 return;
3958 }
3959 ;
3960
3961 /* Mark element to be expected to received */
3962 if (GNUNET_YES !=
3963 update_message_control_flow (
3964 op->message_control_flow,
3965 MSG_CFS_SENT,
3966 &ee->element_hash,
3967 ELEMENT_MESSAGE)
3968 )
3969 {
3970 LOG (GNUNET_ERROR_TYPE_ERROR,
3971 "Double element message sent found!\n");
3972 GNUNET_break (0);
3973 fail_union_operation (op);
3974 return;
3975 }
2573 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) 3976 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2574 { 3977 {
2575 /* Probably confused lazily copied sets. */ 3978 /* Probably confused lazily copied sets. */
@@ -2577,8 +3980,10 @@ handle_union_p2p_demand (void *cls,
2577 fail_union_operation (op); 3980 fail_union_operation (op);
2578 return; 3981 return;
2579 } 3982 }
2580 perf_rtt.element.sent += 1; 3983#if MEASURE_PERFORMANCE
2581 perf_rtt.element.sent_var_bytes += ee->element.size; 3984 perf_store.element.sent += 1;
3985 perf_store.element.sent_var_bytes += ee->element.size;
3986#endif
2582 ev = GNUNET_MQ_msg_extra (emsg, 3987 ev = GNUNET_MQ_msg_extra (emsg,
2583 ee->element.size, 3988 ee->element.size,
2584 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS); 3989 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS);
@@ -2600,9 +4005,10 @@ handle_union_p2p_demand (void *cls,
2600 if (op->symmetric) 4005 if (op->symmetric)
2601 send_client_element (op, 4006 send_client_element (op,
2602 &ee->element, 4007 &ee->element,
2603 GNUNET_SET_STATUS_ADD_REMOTE); 4008 GNUNET_SETU_STATUS_ADD_REMOTE);
2604 } 4009 }
2605 GNUNET_CADET_receive_done (op->channel); 4010 GNUNET_CADET_receive_done (op->channel);
4011 maybe_finish (op);
2606} 4012}
2607 4013
2608 4014
@@ -2653,9 +4059,23 @@ handle_union_p2p_offer (void *cls,
2653 struct Operation *op = cls; 4059 struct Operation *op = cls;
2654 const struct GNUNET_HashCode *hash; 4060 const struct GNUNET_HashCode *hash;
2655 unsigned int num_hashes; 4061 unsigned int num_hashes;
4062 /**
4063 * Check that the message is received only in supported phase
4064 */
4065 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4066 if (GNUNET_OK !=
4067 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4068 {
4069 GNUNET_break (0);
4070 fail_union_operation (op);
4071 return;
4072 }
2656 4073
2657 perf_rtt.offer.received += 1; 4074#if MEASURE_PERFORMANCE
2658 perf_rtt.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); 4075 perf_store.offer.received += 1;
4076 perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4077 GNUNET_MessageHeader));
4078#endif
2659 4079
2660 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) 4080 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2661 / sizeof(struct GNUNET_HashCode); 4081 / sizeof(struct GNUNET_HashCode);
@@ -2693,8 +4113,53 @@ handle_union_p2p_offer (void *cls,
2693 "[OP %p] Requesting element (hash %s)\n", 4113 "[OP %p] Requesting element (hash %s)\n",
2694 op, GNUNET_h2s (hash)); 4114 op, GNUNET_h2s (hash));
2695 4115
2696 perf_rtt.demand.sent += 1; 4116#if MEASURE_PERFORMANCE
2697 perf_rtt.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); 4117 perf_store.demand.sent += 1;
4118 perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4119#endif
4120 /* Save send demand message for message control */
4121 if (GNUNET_YES !=
4122 update_message_control_flow (
4123 op->message_control_flow,
4124 MSG_CFS_SENT,
4125 hash,
4126 DEMAND_MESSAGE))
4127 {
4128 LOG (GNUNET_ERROR_TYPE_ERROR,
4129 "Double demand message sent found!\n");
4130 GNUNET_break (0);
4131 fail_union_operation (op);
4132 return;
4133 }
4134
4135 /* Mark offer as received received */
4136 if (GNUNET_YES !=
4137 update_message_control_flow (
4138 op->message_control_flow,
4139 MSG_CFS_RECEIVED,
4140 hash,
4141 OFFER_MESSAGE))
4142 {
4143 LOG (GNUNET_ERROR_TYPE_ERROR,
4144 "Double offer message received found!\n");
4145 GNUNET_break (0);
4146 fail_union_operation (op);
4147 return;
4148 }
4149 /* Mark element to be expected to received */
4150 if (GNUNET_YES !=
4151 update_message_control_flow (
4152 op->message_control_flow,
4153 MSG_CFS_EXPECTED,
4154 hash,
4155 ELEMENT_MESSAGE))
4156 {
4157 LOG (GNUNET_ERROR_TYPE_ERROR,
4158 "Element already expected!\n");
4159 GNUNET_break (0);
4160 fail_union_operation (op);
4161 return;
4162 }
2698 ev = GNUNET_MQ_msg_header_extra (demands, 4163 ev = GNUNET_MQ_msg_header_extra (demands,
2699 sizeof(struct GNUNET_HashCode), 4164 sizeof(struct GNUNET_HashCode),
2700 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND); 4165 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
@@ -2719,7 +4184,30 @@ handle_union_p2p_done (void *cls,
2719{ 4184{
2720 struct Operation *op = cls; 4185 struct Operation *op = cls;
2721 4186
2722 perf_rtt.done.received += 1; 4187 /**
4188 * Check that the message is received only in supported phase
4189 */
4190 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4191 if (GNUNET_OK !=
4192 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4193 {
4194 GNUNET_break (0);
4195 fail_union_operation (op);
4196 return;
4197 }
4198
4199 if (op->active_passive_switch_required)
4200 {
4201 LOG (GNUNET_ERROR_TYPE_ERROR,
4202 "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4203 GNUNET_break (0);
4204 fail_union_operation (op);
4205 return;
4206 }
4207
4208#if MEASURE_PERFORMANCE
4209 perf_store.done.received += 1;
4210#endif
2723 switch (op->phase) 4211 switch (op->phase)
2724 { 4212 {
2725 case PHASE_PASSIVE_DECODING: 4213 case PHASE_PASSIVE_DECODING:
@@ -2728,26 +4216,26 @@ handle_union_p2p_done (void *cls,
2728 LOG (GNUNET_ERROR_TYPE_DEBUG, 4216 LOG (GNUNET_ERROR_TYPE_DEBUG,
2729 "got DONE (as passive partner), waiting for our demands to be satisfied\n"); 4217 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2730 /* The active peer is done sending offers 4218 /* The active peer is done sending offers
2731 * and inquiries. This means that all 4219 * and inquiries. This means that all
2732 * our responses to that (demands and offers) 4220 * our responses to that (demands and offers)
2733 * must be in flight (queued or in mesh). 4221 * must be in flight (queued or in mesh).
2734 * 4222 *
2735 * We should notify the active peer once 4223 * We should notify the active peer once
2736 * all our demands are satisfied, so that the active 4224 * all our demands are satisfied, so that the active
2737 * peer can quit if we gave it everything. 4225 * peer can quit if we gave it everything.
2738 */GNUNET_CADET_receive_done (op->channel); 4226 */GNUNET_CADET_receive_done (op->channel);
2739 maybe_finish (op); 4227 maybe_finish (op);
2740 return; 4228 return;
2741 case PHASE_ACTIVE_DECODING: 4229 case PHASE_ACTIVE_DECODING:
2742 LOG (GNUNET_ERROR_TYPE_DEBUG, 4230 LOG (GNUNET_ERROR_TYPE_DEBUG,
2743 "got DONE (as active partner), waiting to finish\n"); 4231 "got DONE (as active partner), waiting to finish\n");
2744 /* All demands of the other peer are satisfied, 4232 /* All demands of the other peer are satisfied,
2745 * and we processed all offers, thus we know 4233 * and we processed all offers, thus we know
2746 * exactly what our demands must be. 4234 * exactly what our demands must be.
2747 * 4235 *
2748 * We'll close the channel 4236 * We'll close the channel
2749 * to the other peer once our demands are met. 4237 * to the other peer once our demands are met.
2750 */op->phase = PHASE_FINISH_CLOSING; 4238 */op->phase = PHASE_FINISH_CLOSING;
2751 GNUNET_CADET_receive_done (op->channel); 4239 GNUNET_CADET_receive_done (op->channel);
2752 maybe_finish (op); 4240 maybe_finish (op);
2753 return; 4241 return;
@@ -2769,7 +4257,9 @@ static void
2769handle_union_p2p_over (void *cls, 4257handle_union_p2p_over (void *cls,
2770 const struct GNUNET_MessageHeader *mh) 4258 const struct GNUNET_MessageHeader *mh)
2771{ 4259{
2772 perf_rtt.over.received += 1; 4260#if MEASURE_PERFORMANCE
4261 perf_store.over.received += 1;
4262#endif
2773 send_client_done (cls); 4263 send_client_done (cls);
2774} 4264}
2775 4265
@@ -2943,7 +4433,7 @@ check_incoming_msg (void *cls,
2943 struct Listener *listener = op->listener; 4433 struct Listener *listener = op->listener;
2944 const struct GNUNET_MessageHeader *nested_context; 4434 const struct GNUNET_MessageHeader *nested_context;
2945 4435
2946 /* double operation request */ 4436 /* double operation request */
2947 if (0 != op->suggest_id) 4437 if (0 != op->suggest_id)
2948 { 4438 {
2949 GNUNET_break_op (0); 4439 GNUNET_break_op (0);
@@ -3053,10 +4543,10 @@ handle_client_create_set (void *cls,
3053 } 4543 }
3054 set = GNUNET_new (struct Set); 4544 set = GNUNET_new (struct Set);
3055 { 4545 {
3056 struct StrataEstimator *se; 4546 struct MultiStrataEstimator *se;
3057 4547
3058 se = strata_estimator_create (SE_STRATA_COUNT, 4548 se = strata_estimator_create (SE_STRATA_COUNT,
3059 SE_IBF_SIZE, 4549 SE_IBFS_TOTAL_SIZE,
3060 SE_IBF_HASH_NUM); 4550 SE_IBF_HASH_NUM);
3061 if (NULL == se) 4551 if (NULL == se)
3062 { 4552 {
@@ -3198,6 +4688,7 @@ channel_window_cb (void *cls,
3198 * @param cls client that sent the message 4688 * @param cls client that sent the message
3199 * @param msg message sent by the client 4689 * @param msg message sent by the client
3200 */ 4690 */
4691
3201static void 4692static void
3202handle_client_listen (void *cls, 4693handle_client_listen (void *cls,
3203 const struct GNUNET_SETU_ListenMessage *msg) 4694 const struct GNUNET_SETU_ListenMessage *msg)
@@ -3240,10 +4731,10 @@ handle_client_listen (void *cls,
3240 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, 4731 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3241 struct GNUNET_MessageHeader, 4732 struct GNUNET_MessageHeader,
3242 NULL), 4733 NULL),
3243 GNUNET_MQ_hd_fixed_size (union_p2p_request_full, 4734 GNUNET_MQ_hd_var_size (union_p2p_request_full,
3244 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, 4735 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3245 struct GNUNET_MessageHeader, 4736 struct TransmitFullMessage,
3246 NULL), 4737 NULL),
3247 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, 4738 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3248 GNUNET_MESSAGE_TYPE_SETU_P2P_SE, 4739 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3249 struct StrataEstimatorMessage, 4740 struct StrataEstimatorMessage,
@@ -3256,6 +4747,10 @@ handle_client_listen (void *cls,
3256 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, 4747 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3257 struct GNUNET_SETU_ElementMessage, 4748 struct GNUNET_SETU_ElementMessage,
3258 NULL), 4749 NULL),
4750 GNUNET_MQ_hd_var_size (union_p2p_send_full,
4751 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
4752 struct TransmitFullMessage,
4753 NULL),
3259 GNUNET_MQ_handler_end () 4754 GNUNET_MQ_handler_end ()
3260 }; 4755 };
3261 struct Listener *listener; 4756 struct Listener *listener;
@@ -3451,6 +4946,7 @@ handle_client_evaluate (void *cls,
3451{ 4946{
3452 struct ClientState *cs = cls; 4947 struct ClientState *cs = cls;
3453 struct Operation *op = GNUNET_new (struct Operation); 4948 struct Operation *op = GNUNET_new (struct Operation);
4949
3454 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 4950 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3455 GNUNET_MQ_hd_var_size (incoming_msg, 4951 GNUNET_MQ_hd_var_size (incoming_msg,
3456 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, 4952 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
@@ -3488,10 +4984,10 @@ handle_client_evaluate (void *cls,
3488 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, 4984 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3489 struct GNUNET_MessageHeader, 4985 struct GNUNET_MessageHeader,
3490 op), 4986 op),
3491 GNUNET_MQ_hd_fixed_size (union_p2p_request_full, 4987 GNUNET_MQ_hd_var_size (union_p2p_request_full,
3492 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, 4988 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3493 struct GNUNET_MessageHeader, 4989 struct TransmitFullMessage,
3494 op), 4990 op),
3495 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, 4991 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3496 GNUNET_MESSAGE_TYPE_SETU_P2P_SE, 4992 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3497 struct StrataEstimatorMessage, 4993 struct StrataEstimatorMessage,
@@ -3504,6 +5000,10 @@ handle_client_evaluate (void *cls,
3504 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, 5000 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3505 struct GNUNET_SETU_ElementMessage, 5001 struct GNUNET_SETU_ElementMessage,
3506 op), 5002 op),
5003 GNUNET_MQ_hd_var_size (union_p2p_send_full,
5004 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
5005 struct TransmitFullMessage,
5006 NULL),
3507 GNUNET_MQ_handler_end () 5007 GNUNET_MQ_handler_end ()
3508 }; 5008 };
3509 struct Set *set; 5009 struct Set *set;
@@ -3525,8 +5025,23 @@ handle_client_evaluate (void *cls,
3525 op->force_full = msg->force_full; 5025 op->force_full = msg->force_full;
3526 op->force_delta = msg->force_delta; 5026 op->force_delta = msg->force_delta;
3527 op->symmetric = msg->symmetric; 5027 op->symmetric = msg->symmetric;
5028 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5029 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5030 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5031 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5032 op->active_passive_switch_required = false;
3528 context = GNUNET_MQ_extract_nested_mh (msg); 5033 context = GNUNET_MQ_extract_nested_mh (msg);
3529 5034
5035 /* create hashmap for message control */
5036 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5037 GNUNET_NO);
5038 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5039
5040#if MEASURE_PERFORMANCE
5041 /* load config */
5042 load_config (op);
5043#endif
5044
3530 /* Advance generation values, so that 5045 /* Advance generation values, so that
3531 mutations won't interfer with the running operation. */ 5046 mutations won't interfer with the running operation. */
3532 op->set = set; 5047 op->set = set;
@@ -3550,7 +5065,9 @@ handle_client_evaluate (void *cls,
3550 struct GNUNET_MQ_Envelope *ev; 5065 struct GNUNET_MQ_Envelope *ev;
3551 struct OperationRequestMessage *msg; 5066 struct OperationRequestMessage *msg;
3552 5067
3553 perf_rtt.operation_request.sent += 1; 5068#if MEASURE_PERFORMANCE
5069 perf_store.operation_request.sent += 1;
5070#endif
3554 ev = GNUNET_MQ_msg_nested_mh (msg, 5071 ev = GNUNET_MQ_msg_nested_mh (msg,
3555 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, 5072 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3556 context); 5073 context);
@@ -3567,7 +5084,11 @@ handle_client_evaluate (void *cls,
3567 op->se = strata_estimator_dup (op->set->se); 5084 op->se = strata_estimator_dup (op->set->se);
3568 /* we started the operation, thus we have to send the operation request */ 5085 /* we started the operation, thus we have to send the operation request */
3569 op->phase = PHASE_EXPECT_SE; 5086 op->phase = PHASE_EXPECT_SE;
3570 op->salt_receive = op->salt_send = 42; // FIXME????? 5087
5088 op->salt_receive = (op->peer_site + 1) % 2;
5089 op->salt_send = op->peer_site; // FIXME?????
5090
5091
3571 LOG (GNUNET_ERROR_TYPE_DEBUG, 5092 LOG (GNUNET_ERROR_TYPE_DEBUG,
3572 "Initiating union operation evaluation\n"); 5093 "Initiating union operation evaluation\n");
3573 GNUNET_STATISTICS_update (_GSS_statistics, 5094 GNUNET_STATISTICS_update (_GSS_statistics,
@@ -3711,6 +5232,20 @@ handle_client_accept (void *cls,
3711 op->force_full = msg->force_full; 5232 op->force_full = msg->force_full;
3712 op->force_delta = msg->force_delta; 5233 op->force_delta = msg->force_delta;
3713 op->symmetric = msg->symmetric; 5234 op->symmetric = msg->symmetric;
5235 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5236 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5237 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5238 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5239 op->active_passive_switch_required = false;
5240 /* create hashmap for message control */
5241 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5242 GNUNET_NO);
5243 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5244
5245#if MEASURE_PERFORMANCE
5246 /* load config */
5247 load_config (op);
5248#endif
3714 5249
3715 /* Advance generation values, so that future mutations do not 5250 /* Advance generation values, so that future mutations do not
3716 interfer with the running operation. */ 5251 interfer with the running operation. */
@@ -3729,7 +5264,7 @@ handle_client_accept (void *cls,
3729 1, 5264 1,
3730 GNUNET_NO); 5265 GNUNET_NO);
3731 { 5266 {
3732 const struct StrataEstimator *se; 5267 struct MultiStrataEstimator *se;
3733 struct GNUNET_MQ_Envelope *ev; 5268 struct GNUNET_MQ_Envelope *ev;
3734 struct StrataEstimatorMessage *strata_msg; 5269 struct StrataEstimatorMessage *strata_msg;
3735 char *buf; 5270 char *buf;
@@ -3739,20 +5274,40 @@ handle_client_accept (void *cls,
3739 op->se = strata_estimator_dup (op->set->se); 5274 op->se = strata_estimator_dup (op->set->se);
3740 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 5275 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3741 GNUNET_NO); 5276 GNUNET_NO);
3742 op->salt_receive = op->salt_send = 42; // FIXME????? 5277 op->salt_receive = (op->peer_site + 1) % 2;
5278 op->salt_send = op->peer_site; // FIXME?????
3743 initialize_key_to_element (op); 5279 initialize_key_to_element (op);
3744 op->initial_size = GNUNET_CONTAINER_multihashmap32_size ( 5280 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3745 op->key_to_element); 5281 op->key_to_element);
3746 5282
3747 /* kick off the operation */ 5283 /* kick off the operation */
3748 se = op->se; 5284 se = op->se;
3749 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); 5285
5286 uint8_t se_count = 1;
5287 if (op->initial_size > 0)
5288 {
5289 op->total_elements_size_local = 0;
5290 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5291 &
5292 determinate_avg_element_size_iterator,
5293 op);
5294 se_count = determine_strata_count (
5295 op->total_elements_size_local / op->initial_size,
5296 op->initial_size);
5297 }
5298 buf = GNUNET_malloc (se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5299 * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
3750 len = strata_estimator_write (se, 5300 len = strata_estimator_write (se,
5301 SE_IBFS_TOTAL_SIZE,
5302 se_count,
3751 buf); 5303 buf);
3752 perf_rtt.se.sent += 1; 5304#if MEASURE_PERFORMANCE
3753 perf_rtt.se.sent_var_bytes += len; 5305 perf_store.se.sent += 1;
5306 perf_store.se.sent_var_bytes += len;
5307#endif
3754 5308
3755 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) 5309 if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5310 * SE_IBFS_TOTAL_SIZE)
3756 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC; 5311 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC;
3757 else 5312 else
3758 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE; 5313 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE;
@@ -3766,6 +5321,7 @@ handle_client_accept (void *cls,
3766 strata_msg->set_size 5321 strata_msg->set_size
3767 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( 5322 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
3768 op->set->content->elements)); 5323 op->set->content->elements));
5324 strata_msg->se_count = se_count;
3769 GNUNET_MQ_send (op->mq, 5325 GNUNET_MQ_send (op->mq,
3770 ev); 5326 ev);
3771 op->phase = PHASE_EXPECT_IBF; 5327 op->phase = PHASE_EXPECT_IBF;
@@ -3800,8 +5356,9 @@ shutdown_task (void *cls)
3800 GNUNET_YES); 5356 GNUNET_YES);
3801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5357 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3802 "handled shutdown request\n"); 5358 "handled shutdown request\n");
3803 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 5359#if MEASURE_PERFORMANCE
3804 "RTT:%f\n", calculate_perf_rtt()); 5360 calculate_perf_store ();
5361#endif
3805} 5362}
3806 5363
3807 5364