aboutsummaryrefslogtreecommitdiff
path: root/src/setu
diff options
context:
space:
mode:
Diffstat (limited to 'src/setu')
-rw-r--r--src/setu/gnunet-service-setu.c2085
-rw-r--r--src/setu/gnunet-service-setu_protocol.h78
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.c362
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.h54
-rw-r--r--src/setu/ibf.c290
-rw-r--r--src/setu/ibf.h65
-rw-r--r--src/setu/perf_setu_api.c571
-rw-r--r--src/setu/setu.h49
-rw-r--r--src/setu/setu_api.c40
-rw-r--r--src/setu/test_setu_api.c60
10 files changed, 2878 insertions, 776 deletions
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
index bd1113f15..339d347f8 100644
--- a/src/setu/gnunet-service-setu.c
+++ b/src/setu/gnunet-service-setu.c
@@ -22,6 +22,7 @@
22 * @brief set union operation 22 * @brief set union operation
23 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
@@ -50,33 +51,61 @@
50 */ 51 */
51#define SE_STRATA_COUNT 32 52#define SE_STRATA_COUNT 32
52 53
54
53/** 55/**
54 * Size of the IBFs in the strata estimator. 56 * Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348
57 * Based on the bsc thesis of Elias Summermatter (2021)
55 */ 58 */
56#define SE_IBF_SIZE 80 59#define SE_IBFS_TOTAL_SIZE 632
57 60
58/** 61/**
59 * The hash num parameter for the difference digests and strata estimators. 62 * The hash num parameter for the difference digests and strata estimators.
60 */ 63 */
61#define SE_IBF_HASH_NUM 4 64#define SE_IBF_HASH_NUM 3
62 65
63/** 66/**
64 * Number of buckets that can be transmitted in one message. 67 * Number of buckets that can be transmitted in one message.
65 */ 68 */
66#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE) 69#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
67 70
68/** 71/**
69 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). 72 * The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
70 * Choose this value so that computing the IBF is still cheaper 73 * Choose this value so that computing the IBF is still cheaper
71 * than transmitting all values. 74 * than transmitting all values.
72 */ 75 */
73#define MAX_IBF_ORDER (20) 76#define MAX_IBF_SIZE 1048576
77
78
79/**
80 * Minimal size of an ibf
81 * Based on the bsc thesis of Elias Summermatter (2021)
82 */
83#define IBF_MIN_SIZE 37
84
85/**
86 * AVG RTT for differential sync when k=2 and Factor = 2
87 * Based on the bsc thesis of Elias Summermatter (2021)
88 */
89#define DIFFERENTIAL_RTT_MEAN 3.65145
90
91/**
92 * Security level used for byzantine checks (2^80)
93 */
94
95#define SECURITY_LEVEL 80
96
97/**
98 * Is the estimated probability for a new round this values
99 * is based on the bsc thesis of Elias Summermatter (2021)
100 */
101
102#define PROBABILITY_FOR_NEW_ROUND 0.15
74 103
75/** 104/**
76 * Number of buckets used in the ibf per estimated 105 * Measure the performance in a csv
77 * difference.
78 */ 106 */
79#define IBF_ALPHA 4 107
108#define MEASURE_PERFORMANCE 0
80 109
81 110
82/** 111/**
@@ -146,6 +175,28 @@ enum UnionOperationPhase
146 PHASE_FULL_RECEIVING 175 PHASE_FULL_RECEIVING
147}; 176};
148 177
178/**
179 * Different modes of operations
180 */
181
182enum MODE_OF_OPERATION
183{
184 /**
185 * Mode just synchronizes the difference between sets
186 */
187 DIFFERENTIAL_SYNC,
188
189 /**
190 * Mode send full set sending local set first
191 */
192 FULL_SYNC_LOCAL_SENDING_FIRST,
193
194 /**
195 * Mode request full set from remote peer
196 */
197 FULL_SYNC_REMOTE_SENDING_FIRST
198};
199
149 200
150/** 201/**
151 * Information about an element element in the set. All elements are 202 * Information about an element element in the set. All elements are
@@ -277,7 +328,7 @@ struct Operation
277 * Copy of the set's strata estimator at the time of 328 * Copy of the set's strata estimator at the time of
278 * creation of this operation. 329 * creation of this operation.
279 */ 330 */
280 struct StrataEstimator *se; 331 struct MultiStrataEstimator *se;
281 332
282 /** 333 /**
283 * The IBF we currently receive. 334 * The IBF we currently receive.
@@ -320,7 +371,7 @@ struct Operation
320 /** 371 /**
321 * Number of ibf buckets already received into the @a remote_ibf. 372 * Number of ibf buckets already received into the @a remote_ibf.
322 */ 373 */
323 unsigned int ibf_buckets_received; 374 uint64_t ibf_buckets_received;
324 375
325 /** 376 /**
326 * Salt that we're using for sending IBFs 377 * Salt that we're using for sending IBFs
@@ -386,7 +437,7 @@ struct Operation
386 * Lower bound for the set size, used only when 437 * Lower bound for the set size, used only when
387 * byzantine mode is enabled. 438 * byzantine mode is enabled.
388 */ 439 */
389 int byzantine_lower_bound; 440 uint64_t byzantine_lower_bound;
390 441
391 /** 442 /**
392 * Unique request id for the request from a remote peer, sent to the 443 * Unique request id for the request from a remote peer, sent to the
@@ -401,21 +452,83 @@ struct Operation
401 */ 452 */
402 unsigned int generation_created; 453 unsigned int generation_created;
403 454
455
456 /**
457 * User defined Bandwidth Round Trips Tradeoff
458 */
459 uint64_t rtt_bandwidth_tradeoff;
460
461
462 /**
463 * Number of Element per bucket in IBF
464 */
465 uint8_t ibf_number_buckets_per_element;
466
467
468 /**
469 * Set difference is multiplied with this factor
470 * to gennerate large enough IBF
471 */
472 uint8_t ibf_bucket_number_factor;
473
474 /**
475 * Defines which site a client is
476 * 0 = Initiating peer
477 * 1 = Receiving peer
478 */
479 uint8_t peer_site;
480
481
404 /** 482 /**
405 * User defined Bandwidth Round Trips Tradeoff 483 * Local peer element count
406 */ 484 */
407 double rtt_bandwidth_tradeoff; 485 uint64_t local_element_count;
408 486
409 /** 487 /**
410 * Number of Element per bucket in IBF 488 * Mode of operation that was chosen by the algorithm
411 */ 489 */
412 unsigned int ibf_number_buckets_per_element; 490 uint8_t mode_of_operation;
413 491
414 /** 492 /**
415 * Number of buckets in IBF 493 * Hashmap to keep track of the send/received messages
416 */ 494 */
417 unsigned ibf_bucket_number; 495 struct GNUNET_CONTAINER_MultiHashMap *message_control_flow;
418 496
497 /**
498 * Hashmap to keep track of the send/received inquiries (ibf keys)
499 */
500 struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent;
501
502
503 /**
504 * Total size of local set
505 */
506 uint64_t total_elements_size_local;
507
508 /**
509 * Limit of number of elements in set
510 */
511 uint64_t byzantine_upper_bound;
512
513 /**
514 * is the count of already passed differential sync iterations
515 */
516 uint8_t differential_sync_iterations;
517
518 /**
519 * Estimated or committed set difference at the start
520 */
521 uint64_t remote_set_diff;
522
523 /**
524 * Estimated or committed set difference at the start
525 */
526 uint64_t local_set_diff;
527
528 /**
529 * Boolean to enforce an active passive switch
530 */
531 bool active_passive_switch_required;
419}; 532};
420 533
421 534
@@ -431,6 +544,16 @@ struct SetContent
431 struct GNUNET_CONTAINER_MultiHashMap *elements; 544 struct GNUNET_CONTAINER_MultiHashMap *elements;
432 545
433 /** 546 /**
547 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *` randomized.
548 */
549 struct GNUNET_CONTAINER_MultiHashMap *elements_randomized;
550
551 /**
552 * Salt to construct the randomized element map
553 */
554 uint64_t elements_randomized_salt;
555
556 /**
434 * Number of references to the content. 557 * Number of references to the content.
435 */ 558 */
436 unsigned int refcount; 559 unsigned int refcount;
@@ -478,7 +601,7 @@ struct Set
478 * The strata estimator is only generated once for each set. The IBF keys 601 * The strata estimator is only generated once for each set. The IBF keys
479 * are derived from the element hashes with salt=0. 602 * are derived from the element hashes with salt=0.
480 */ 603 */
481 struct StrataEstimator *se; 604 struct MultiStrataEstimator *se;
482 605
483 /** 606 /**
484 * Evaluate operations are held in a linked list. 607 * Evaluate operations are held in a linked list.
@@ -635,96 +758,687 @@ static int in_shutdown;
635 */ 758 */
636static uint32_t suggest_id; 759static uint32_t suggest_id;
637 760
761#if MEASURE_PERFORMANCE
762/**
763 * Handles configuration file for setu performance test
764 *
765 */
766static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
767
768
769/**
770 * Stores the performance data for induvidual message
771 */
772
773
774struct perf_num_send_received_msg
775{
776 uint64_t sent;
777 uint64_t sent_var_bytes;
778 uint64_t received;
779 uint64_t received_var_bytes;
780};
781
782/**
783 * Main struct to measure performance (data/rtts)
784 */
785struct per_store_struct
786{
787 struct perf_num_send_received_msg operation_request;
788 struct perf_num_send_received_msg se;
789 struct perf_num_send_received_msg request_full;
790 struct perf_num_send_received_msg element_full;
791 struct perf_num_send_received_msg full_done;
792 struct perf_num_send_received_msg ibf;
793 struct perf_num_send_received_msg inquery;
794 struct perf_num_send_received_msg element;
795 struct perf_num_send_received_msg demand;
796 struct perf_num_send_received_msg offer;
797 struct perf_num_send_received_msg done;
798 struct perf_num_send_received_msg over;
799 uint64_t se_diff;
800 uint64_t se_diff_remote;
801 uint64_t se_diff_local;
802 uint64_t active_passive_switches;
803 uint8_t mode_of_operation;
804};
805
806struct per_store_struct perf_store;
807#endif
808
809/**
810 * Different states to control the messages flow in differential mode
811 */
812
813enum MESSAGE_CONTROL_FLOW_STATE
814{
815 /**
816 * Initial message state
817 */
818 MSG_CFS_UNINITIALIZED,
819
820 /**
821 * Track that a message has been sent
822 */
823 MSG_CFS_SENT,
824
825 /**
826 * Track that receiving this message is expected
827 */
828 MSG_CFS_EXPECTED,
829
830 /**
831 * Track that message has been received
832 */
833 MSG_CFS_RECEIVED,
834};
638 835
639/** 836/**
640 * Added Roundtripscounter 837 * Message types to track in message control flow
641 */ 838 */
642 839
840enum MESSAGE_TYPE
841{
842 /**
843 * Offer message type
844 */
845 OFFER_MESSAGE,
643 846
644struct perf_num_send_resived_msg { 847 /**
645 int sent; 848 * Demand message type
646 int sent_var_bytes; 849 */
647 int received; 850 DEMAND_MESSAGE,
648 int received_var_bytes; 851
852 /**
853 * Element message type
854 */
855 ELEMENT_MESSAGE,
649}; 856};
650 857
651 858
652struct perf_rtt_struct 859/**
653{ 860 * Struct to tracked messages in message control flow
654 struct perf_num_send_resived_msg operation_request; 861 */
655 struct perf_num_send_resived_msg se; 862struct messageControlFlowElement
656 struct perf_num_send_resived_msg request_full; 863{
657 struct perf_num_send_resived_msg element_full; 864 /**
658 struct perf_num_send_resived_msg full_done; 865 * Track the message control state of the offer message
659 struct perf_num_send_resived_msg ibf; 866 */
660 struct perf_num_send_resived_msg inquery; 867 enum MESSAGE_CONTROL_FLOW_STATE offer;
661 struct perf_num_send_resived_msg element; 868 /**
662 struct perf_num_send_resived_msg demand; 869 * Track the message control state of the demand message
663 struct perf_num_send_resived_msg offer; 870 */
664 struct perf_num_send_resived_msg done; 871 enum MESSAGE_CONTROL_FLOW_STATE demand;
665 struct perf_num_send_resived_msg over; 872 /**
873 * Track the message control state of the element message
874 */
875 enum MESSAGE_CONTROL_FLOW_STATE element;
666}; 876};
667 877
668struct perf_rtt_struct perf_rtt;
669 878
879#if MEASURE_PERFORMANCE
670 880
881/**
882 * Loads different configuration to perform performance tests
883 *
884 * @param op operation handle
885 */
886static void
887load_config (struct Operation *op)
888{
889 long long number;
890 float fl;
891
892 setu_cfg = GNUNET_CONFIGURATION_create ();
893 GNUNET_CONFIGURATION_load (setu_cfg,
894 "perf_setu.conf");
895 GNUNET_CONFIGURATION_get_value_float (setu_cfg,
896 "IBF",
897 "BUCKET_NUMBER_FACTOR",
898 &fl);
899 op->ibf_bucket_number_factor = fl;
900 GNUNET_CONFIGURATION_get_value_number (setu_cfg,
901 "IBF",
902 "NUMBER_PER_BUCKET",
903 &number);
904 op->ibf_number_buckets_per_element = number;
905 GNUNET_CONFIGURATION_get_value_number (setu_cfg,
906 "PERFORMANCE",
907 "TRADEOFF",
908 &number);
909 op->rtt_bandwidth_tradeoff = number;
910 GNUNET_CONFIGURATION_get_value_number (setu_cfg,
911 "BOUNDARIES",
912 "UPPER_ELEMENT",
913 &number);
914 op->byzantine_upper_bound = number;
915 op->peer_site = 0;
916}
917
918
919/**
920 * Function to calculate total bytes used for performance measurement
921 * @param size
922 * @param perf_num_send_received_msg
923 * @return bytes used
924 */
671static int 925static int
672sum_sent_received_bytes(int size, struct perf_num_send_resived_msg perf_rtt_struct) { 926sum_sent_received_bytes (uint64_t size,
673 return (size * perf_rtt_struct.sent) + 927 struct perf_num_send_received_msg
674 (size * perf_rtt_struct.received) + 928 perf_num_send_received_msg)
675 perf_rtt_struct.sent_var_bytes + 929{
676 perf_rtt_struct.received_var_bytes; 930 return (size * perf_num_send_received_msg.sent)
931 + (size * perf_num_send_received_msg.received)
932 + perf_num_send_received_msg.sent_var_bytes
933 + perf_num_send_received_msg.received_var_bytes;
677} 934}
678 935
679static float
680calculate_perf_rtt() {
681 /**
682 * Calculate RTT of init phase normally always 1
683 */
684 float rtt = 1;
685 int bytes_transmitted = 0;
686 936
687 /** 937/**
688 * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending 938 * Function that calculates the perfmance values and writes them down
689 */ 939 */
690 if (( perf_rtt.element_full.received != 0 ) || 940static void
691 ( perf_rtt.element_full.sent != 0) 941calculate_perf_store ()
692 ) rtt += 1; 942{
693 943
694 if (( perf_rtt.request_full.received != 0 ) || 944 /**
695 ( perf_rtt.request_full.sent != 0) 945 * Calculate RTT of init phase normally always 1
696 ) rtt += 0.5; 946 */
947 float rtt = 1;
948 int bytes_transmitted = 0;
697 949
698 /** 950 /**
699 * In case of a differential sync 3 rtt's are needed. 951 * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending
700 * for every active/passive switch additional 3.5 rtt's are used 952 */
701 */ 953 if ((perf_store.element_full.received != 0) ||
954 (perf_store.element_full.sent != 0)
955 )
956 rtt += 1;
957
958 if ((perf_store.request_full.received != 0) ||
959 (perf_store.request_full.sent != 0)
960 )
961 rtt += 0.5;
702 962
703 int iterations = perf_rtt.ibf.received; 963 /**
704 if(iterations > 1) 964 * In case of a differential sync 3 rtt's are needed.
705 rtt += (iterations - 1 ) * 0.5; 965 * for every active/passive switch additional 3.5 rtt's are used
706 rtt += 3 * iterations; 966 */
967 if ((perf_store.element.received != 0) ||
968 (perf_store.element.sent != 0))
969 {
970 int iterations = perf_store.active_passive_switches;
971
972 if (iterations > 0)
973 rtt += iterations * 0.5;
974 rtt += 2.5;
975 }
976
977
978 /**
979 * Calculate data sended size
980 */
981 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
982 GNUNET_SETU_ResultMessage),
983 perf_store.request_full);
984
985 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
986 GNUNET_SETU_ElementMessage),
987 perf_store.element_full);
988 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
989 GNUNET_SETU_ElementMessage),
990 perf_store.element);
991 // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
992 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
993 StrataEstimatorMessage),
994 perf_store.se);
995 bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
996 bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
997 perf_store.ibf);
998 bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
999 perf_store.inquery);
1000 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1001 GNUNET_MessageHeader),
1002 perf_store.demand);
1003 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1004 GNUNET_MessageHeader),
1005 perf_store.offer);
1006 bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1007
1008 /**
1009 * Write IBF failure rate for different BUCKET_NUMBER_FACTOR
1010 */
1011 float factor;
1012 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1013 &factor);
1014 long long num_per_bucket;
1015 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1016 &num_per_bucket);
1017
1018
1019 int decoded = 0;
1020 if (perf_store.active_passive_switches == 0)
1021 decoded = 1;
1022 int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1023 IBFMessage),
1024 perf_store.ibf);
1025
1026 FILE *out1 = fopen ("perf_data.csv", "a");
1027 fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1028 decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1029 bytes_transmitted,
1030 perf_store.se_diff_local,perf_store.se_diff_remote,
1031 perf_store.mode_of_operation);
1032 fclose (out1);
1033
1034}
1035
1036
1037#endif
1038/**
1039 * Function that chooses the optimal mode of operation depending on
1040 * operation parameters.
1041 * @param avg_element_size
1042 * @param local_set_size
1043 * @param remote_set_size
1044 * @param est_set_diff_remote
1045 * @param est_set_diff_local
1046 * @param bandwith_latency_tradeoff
1047 * @param ibf_bucket_number_factor
1048 * @return calcuated mode of operation
1049 */
1050static uint8_t
1051estimate_best_mode_of_operation (uint64_t avg_element_size,
1052 uint64_t local_set_size,
1053 uint64_t remote_set_size,
1054 uint64_t est_set_diff_remote,
1055 uint64_t est_set_diff_local,
1056 uint64_t bandwith_latency_tradeoff,
1057 uint64_t ibf_bucket_number_factor)
1058{
1059
1060 /*
1061 * In case of initial sync fall to predefined states
1062 */
1063
1064 if (0 == local_set_size)
1065 return FULL_SYNC_REMOTE_SENDING_FIRST;
1066 if (0 == remote_set_size)
1067 return FULL_SYNC_LOCAL_SENDING_FIRST;
1068
1069 /*
1070 * Calculate bytes for full Sync
1071 */
1072
1073 uint8_t sizeof_full_done_header = 4;
1074 uint8_t sizeof_done_header = 4;
1075 uint8_t rtt_min_full = 2;
1076 uint8_t sizeof_request_full = 4;
1077 uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1078
1079 /* Estimate byte required if we send first */
1080 uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1081 + local_set_size;
1082
1083 uint64_t total_bytes_full_local_send_first = (avg_element_size
1084 *
1085 total_elements_to_send_local_send_first) \
1086 + (
1087 total_elements_to_send_local_send_first * sizeof(struct
1088 GNUNET_SETU_ElementMessage)) \
1089 + (sizeof_full_done_header * 2) \
1090 + rtt_min_full
1091 * bandwith_latency_tradeoff;
1092
1093 /* Estimate bytes required if we request from remote peer */
1094 uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1095 + remote_set_size;
1096
1097 uint64_t total_bytes_full_remote_send_first = (avg_element_size
1098 *
1099 total_elements_to_send_remote_send_first) \
1100 + (
1101 total_elements_to_send_remote_send_first * sizeof(struct
1102 GNUNET_SETU_ElementMessage)) \
1103 + (sizeof_full_done_header * 2) \
1104 + (rtt_min_full + 0.5)
1105 * bandwith_latency_tradeoff \
1106 + sizeof_request_full;
1107
1108 /*
1109 * Calculate bytes for differential Sync
1110 */
1111
1112 /* Estimate bytes required by IBF transmission*/
1113
1114 long double ibf_bucket_count = estimated_total_diff
1115 * ibf_bucket_number_factor;
1116
1117 if (ibf_bucket_count <= IBF_MIN_SIZE)
1118 {
1119 ibf_bucket_count = IBF_MIN_SIZE;
1120 }
1121 uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1122 / ((float) MAX_BUCKETS_PER_MESSAGE));
1123
1124 uint64_t estimated_counter_size = ceil (
1125 MIN (2 * log2l (((float) local_set_size)
1126 / ((float) ibf_bucket_count)),
1127 log2l (local_set_size)));
1128
1129 long double counter_bytes = (float) estimated_counter_size / 8;
1130
1131 uint64_t ibf_bytes = ceil ((sizeof (struct IBFMessage) * ibf_message_count)
1132 * 1.2 \
1133 + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
1134 + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
1135 * 1.2 \
1136 + (ibf_bucket_count * counter_bytes) * 1.2);
1137
1138 /* Estimate full byte count for differential sync */
1139 uint64_t element_size = (avg_element_size
1140 + sizeof (struct GNUNET_SETU_ElementMessage)) \
1141 * estimated_total_diff;
1142 uint64_t done_size = sizeof_done_header;
1143 uint64_t inquery_size = (sizeof (struct IBF_Key)
1144 + sizeof (struct InquiryMessage))
1145 * estimated_total_diff;
1146 uint64_t demand_size =
1147 (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1148 * estimated_total_diff;
1149 uint64_t offer_size = (sizeof (struct GNUNET_HashCode)
1150 + sizeof (struct GNUNET_MessageHeader))
1151 * estimated_total_diff;
1152
1153 uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1154 + demand_size + offer_size + ibf_bytes) \
1155 + (DIFFERENTIAL_RTT_MEAN
1156 * bandwith_latency_tradeoff);
1157
1158 uint64_t full_min = MIN (total_bytes_full_local_send_first,
1159 total_bytes_full_remote_send_first);
1160
1161 /* Decide between full and differential sync */
1162
1163 if (full_min < total_bytes_diff)
1164 {
1165 /* Decide between sending all element first or receiving all elements */
1166 if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
1167 {
1168 return FULL_SYNC_LOCAL_SENDING_FIRST;
1169 }
1170 else
1171 {
1172 return FULL_SYNC_REMOTE_SENDING_FIRST;
1173 }
1174 }
1175 else
1176 {
1177 return DIFFERENTIAL_SYNC;
1178 }
1179}
1180
1181
1182/**
1183 * Validates the if a message is received in a correct phase
1184 * @param allowed_phases
1185 * @param size_phases
1186 * @param op
1187 * @return #GNUNET_YES if message permitted in phase and #GNUNET_NO if not permitted in given
1188 * phase
1189 */
1190static enum GNUNET_GenericReturnValue
1191check_valid_phase (const uint8_t allowed_phases[],
1192 size_t size_phases,
1193 struct Operation *op)
1194{
1195 /**
1196 * Iterate over allowed phases
1197 */
1198 for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1199 {
1200 uint8_t phase = allowed_phases[phase_ctr];
1201 if (phase == op->phase)
1202 {
1203 LOG (GNUNET_ERROR_TYPE_DEBUG,
1204 "Message received in valid phase\n");
1205 return GNUNET_YES;
1206 }
1207 }
1208 LOG (GNUNET_ERROR_TYPE_ERROR,
1209 "Received message in invalid phase: %u\n", op->phase);
1210 return GNUNET_NO;
1211}
1212
1213
1214/**
1215 * Function to update, track and validate message received in differential
1216 * sync. This function tracks states of messages and check it against different
1217 * constraints as described in Summermatter's BSc Thesis (2021)
1218 * @param hash_map: Hashmap to store message control flow
1219 * @param new_mcfs: The new message control flow state an given message type should be set to
1220 * @param hash_code: Hash code of the element
1221 * @param mt: The message type for which the message control flow state should be set
1222 * @return GNUNET_YES message is valid in message control flow GNUNET_NO when message is not valid
1223 * at this point in message flow
1224 */
1225static int
1226update_message_control_flow (struct GNUNET_CONTAINER_MultiHashMap *hash_map,
1227 enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1228 const struct GNUNET_HashCode *hash_code,
1229 enum MESSAGE_TYPE mt)
1230{
1231 struct messageControlFlowElement *cfe = NULL;
1232 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1233
1234 /**
1235 * Check logic for forbidden messages
1236 */
1237
1238 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1239 if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
1240 {
1241 if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1242 {
1243 LOG (GNUNET_ERROR_TYPE_ERROR,
1244 "Received an element without sent offer!\n");
1245 return GNUNET_NO;
1246 }
1247 /* Check that only requested elements are received! */
1248 if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1249 MSG_CFS_SENT))
1250 {
1251 LOG (GNUNET_ERROR_TYPE_ERROR,
1252 "Received an element that was not demanded\n");
1253 return GNUNET_NO;
1254 }
1255 }
1256
1257 /**
1258 * In case the element hash is not in the hashmap create a new entry
1259 */
1260
1261 if (NULL == cfe)
1262 {
1263 cfe = GNUNET_new (struct messageControlFlowElement);
1264 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1265 cfe,
1266 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1267 {
1268 GNUNET_free (cfe);
1269 return GNUNET_SYSERR;
1270 }
1271 }
1272
1273 /**
1274 * Set state of message
1275 */
1276
1277 if (OFFER_MESSAGE == mt)
1278 {
1279 mcfs = &cfe->offer;
1280 }
1281 else if (DEMAND_MESSAGE == mt)
1282 {
1283 mcfs = &cfe->demand;
1284 }
1285 else if (ELEMENT_MESSAGE == mt)
1286 {
1287 mcfs = &cfe->element;
1288 }
1289 else
1290 {
1291 return GNUNET_SYSERR;
1292 }
1293
1294 /**
1295 * Check if state is allowed
1296 */
1297
1298 if (new_mcfs <= *mcfs)
1299 {
1300 return GNUNET_NO;
1301 }
1302
1303 *mcfs = new_mcfs;
1304 return GNUNET_YES;
1305}
1306
1307
1308/**
1309 * Validate if a message in differential sync si already received before.
1310 * @param hash_map
1311 * @param hash_code
1312 * @param mt
1313 * @return GNUNET_YES when message is already in store if message is not in store return GNUNET_NO
1314 */
1315static int
1316is_message_in_message_control_flow (struct
1317 GNUNET_CONTAINER_MultiHashMap *hash_map,
1318 struct GNUNET_HashCode *hash_code,
1319 enum MESSAGE_TYPE mt)
1320{
1321 struct messageControlFlowElement *cfe = NULL;
1322 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1323
1324 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1325
1326 /**
1327 * Set state of message
1328 */
1329
1330 if (cfe != NULL)
1331 {
1332 if (OFFER_MESSAGE == mt)
1333 {
1334 mcfs = &cfe->offer;
1335 }
1336 else if (DEMAND_MESSAGE == mt)
1337 {
1338 mcfs = &cfe->demand;
1339 }
1340 else if (ELEMENT_MESSAGE == mt)
1341 {
1342 mcfs = &cfe->element;
1343 }
1344 else
1345 {
1346 return GNUNET_SYSERR;
1347 }
707 1348
708 /** 1349 /**
709 * Calculate data sended size 1350 * Evaluate if set is in message
710 */ 1351 */
711 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL), perf_rtt.request_full); 1352 if (*mcfs != MSG_CFS_UNINITIALIZED)
712 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT), perf_rtt.element_full); 1353 {
713 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS), perf_rtt.element); 1354 return GNUNET_YES;
714 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_rtt.operation_request); 1355 }
715 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_SE), perf_rtt.se); 1356 }
716 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE), perf_rtt.full_done); 1357 return GNUNET_NO;
717 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_IBF), perf_rtt.ibf); 1358}
718 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY), perf_rtt.inquery);
719 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND), perf_rtt.demand);
720 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER), perf_rtt.offer);
721 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DONE), perf_rtt.done);
722 1359
723 LOG(GNUNET_ERROR_TYPE_ERROR,"Bytes Transmitted: %d\n", bytes_transmitted);
724 1360
725 LOG(GNUNET_ERROR_TYPE_ERROR,"Reached tradeoff bandwidth/rtt: %f\n", (bytes_transmitted / rtt )); 1361/**
1362 * Iterator for determining if all demands have been
1363 * satisfied
1364 *
1365 * @param cls the union operation `struct Operation *`
1366 * @param key unused
1367 * @param value the `struct ElementEntry *` to insert
1368 * into the key-to-element mapping
1369 * @return #GNUNET_YES (to continue iterating)
1370 */
1371static int
1372determinate_done_message_iterator (void *cls,
1373 const struct GNUNET_HashCode *key,
1374 void *value)
1375{
1376 struct messageControlFlowElement *mcfe = value;
726 1377
727 return rtt; 1378 if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1379 {
1380 return GNUNET_YES;
1381 }
1382 return GNUNET_NO;
1383}
1384
1385
1386/**
1387 * Iterator for determining average size
1388 *
1389 * @param cls the union operation `struct Operation *`
1390 * @param key unused
1391 * @param value the `struct ElementEntry *` to insert
1392 * into the key-to-element mapping
1393 * @return #GNUNET_YES (to continue iterating)
1394 */
1395static int
1396determinate_avg_element_size_iterator (void *cls,
1397 const struct GNUNET_HashCode *key,
1398 void *value)
1399{
1400 struct Operation *op = cls;
1401 struct GNUNET_SETU_Element *element = value;
1402 op->total_elements_size_local += element->size;
1403 return GNUNET_YES;
1404}
1405
1406
1407/**
1408 * Create randomized element hashmap for full sending
1409 *
1410 * @param cls the union operation `struct Operation *`
1411 * @param key unused
1412 * @param value the `struct ElementEntry *` to insert
1413 * into the key-to-element mapping
1414 * @return #GNUNET_YES (to continue iterating)
1415 */
1416static int
1417create_randomized_element_iterator (void *cls,
1418 const struct GNUNET_HashCode *key,
1419 void *value)
1420{
1421 struct Operation *op = cls;
1422
1423 struct GNUNET_HashContext *hashed_key_context =
1424 GNUNET_CRYPTO_hash_context_start ();
1425 struct GNUNET_HashCode new_key;
1426
1427 /**
1428 * Hash element with new salt to randomize hashmap
1429 */
1430 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1431 &key,
1432 sizeof(struct IBF_Key));
1433 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1434 &op->set->content->elements_randomized_salt,
1435 sizeof(uint32_t));
1436 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1437 &new_key);
1438 GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1439 &new_key,value,
1440 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1441 return GNUNET_YES;
728} 1442}
729 1443
730 1444
@@ -808,6 +1522,36 @@ send_client_done (void *cls)
808} 1522}
809 1523
810 1524
1525/**
1526 * Check if all given byzantine parameters are in given boundaries
1527 * @param op
1528 * @return indicator if all given byzantine parameters are in given boundaries
1529 */
1530
1531static int
1532check_byzantine_bounds (struct Operation *op)
1533{
1534 if (op->byzantine != GNUNET_YES)
1535 return GNUNET_OK;
1536
1537 /**
1538 * Check upper byzantine bounds
1539 */
1540 if (op->remote_element_count + op->remote_set_diff >
1541 op->byzantine_upper_bound)
1542 return GNUNET_SYSERR;
1543 if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1544 return GNUNET_SYSERR;
1545
1546 /**
1547 * Check lower byzantine bounds
1548 */
1549 if (op->remote_element_count < op->byzantine_lower_bound)
1550 return GNUNET_SYSERR;
1551 return GNUNET_OK;
1552}
1553
1554
811/* FIXME: the destroy logic is a mess and should be cleaned up! */ 1555/* FIXME: the destroy logic is a mess and should be cleaned up! */
812 1556
813/** 1557/**
@@ -977,6 +1721,101 @@ fail_union_operation (struct Operation *op)
977 1721
978 1722
979/** 1723/**
1724 * Function that checks if full sync is plausible
1725 * @param initial_local_elements_in_set
1726 * @param estimated_set_difference
1727 * @param repeated_elements
1728 * @param fresh_elements
1729 * @param op
1730 * @return GNUNET_OK if
1731 */
1732
1733static void
1734full_sync_plausibility_check (struct Operation *op)
1735{
1736 if (GNUNET_YES != op->byzantine)
1737 return;
1738
1739 int security_level_lb = -1 * SECURITY_LEVEL;
1740 uint64_t duplicates = op->received_fresh - op->received_total;
1741
1742 /*
1743 * Protect full sync from receiving double element when in FULL SENDING
1744 */
1745 if (PHASE_FULL_SENDING == op->phase)
1746 {
1747 if (duplicates > 0)
1748 {
1749 LOG (GNUNET_ERROR_TYPE_ERROR,
1750 "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1751 "mode of operation this is not allowed! Duplicates: %llu\n",
1752 (unsigned long long) duplicates);
1753 GNUNET_break_op (0);
1754 fail_union_operation (op);
1755 return;
1756 }
1757
1758 }
1759
1760 /*
1761 * Protect full sync with probabilistic algorithm
1762 */
1763 if (PHASE_FULL_RECEIVING == op->phase)
1764 {
1765 if (0 == op->remote_set_diff)
1766 op->remote_set_diff = 1;
1767
1768 long double base = (1 - (long double) (op->remote_set_diff
1769 / (long double) (op->initial_size
1770 + op->
1771 remote_set_diff)));
1772 long double exponent = (op->received_total - (op->received_fresh * ((long
1773 double)
1774 op->
1775 initial_size
1776 / (long
1777 double)
1778 op->
1779 remote_set_diff)));
1780 long double value = exponent * (log2l (base) / log2l (2));
1781 if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1782 {
1783 LOG (GNUNET_ERROR_TYPE_ERROR,
1784 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1785 "to many duplicated full element : %LF\n",
1786 value);
1787 GNUNET_break_op (0);
1788 fail_union_operation (op);
1789 return;
1790 }
1791 }
1792}
1793
1794
1795/**
1796 * Limit active passive switches in differential sync to configured security level
1797 * @param op
1798 */
1799static void
1800check_max_differential_rounds (struct Operation *op)
1801{
1802 double probability = op->differential_sync_iterations * (log2l (
1803 PROBABILITY_FOR_NEW_ROUND)
1804 / log2l (2));
1805 if ((-1 * SECURITY_LEVEL) > probability)
1806 {
1807 LOG (GNUNET_ERROR_TYPE_ERROR,
1808 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1809 "switches in differential sync: %u\n",
1810 op->differential_sync_iterations);
1811 GNUNET_break_op (0);
1812 fail_union_operation (op);
1813 return;
1814 }
1815}
1816
1817
1818/**
980 * Derive the IBF key from a hash code and 1819 * Derive the IBF key from a hash code and
981 * a salt. 1820 * a salt.
982 * 1821 *
@@ -1004,12 +1843,12 @@ get_ibf_key (const struct GNUNET_HashCode *src)
1004struct GetElementContext 1843struct GetElementContext
1005{ 1844{
1006 /** 1845 /**
1007 * FIXME. 1846 * Gnunet hash code in context
1008 */ 1847 */
1009 struct GNUNET_HashCode hash; 1848 struct GNUNET_HashCode hash;
1010 1849
1011 /** 1850 /**
1012 * FIXME. 1851 * Pointer to the key entry
1013 */ 1852 */
1014 struct KeyEntry *k; 1853 struct KeyEntry *k;
1015}; 1854};
@@ -1122,7 +1961,7 @@ salt_key (const struct IBF_Key *k_in,
1122 uint32_t salt, 1961 uint32_t salt,
1123 struct IBF_Key *k_out) 1962 struct IBF_Key *k_out)
1124{ 1963{
1125 int s = salt % 64; 1964 int s = (salt * 7) % 64;
1126 uint64_t x = k_in->key_val; 1965 uint64_t x = k_in->key_val;
1127 1966
1128 /* rotate ibf key */ 1967 /* rotate ibf key */
@@ -1132,14 +1971,14 @@ salt_key (const struct IBF_Key *k_in,
1132 1971
1133 1972
1134/** 1973/**
1135 * FIXME. 1974 * Reverse modification done in the salt_key function
1136 */ 1975 */
1137static void 1976static void
1138unsalt_key (const struct IBF_Key *k_in, 1977unsalt_key (const struct IBF_Key *k_in,
1139 uint32_t salt, 1978 uint32_t salt,
1140 struct IBF_Key *k_out) 1979 struct IBF_Key *k_out)
1141{ 1980{
1142 int s = salt % 64; 1981 int s = (salt * 7) % 64;
1143 uint64_t x = k_in->key_val; 1982 uint64_t x = k_in->key_val;
1144 1983
1145 x = (x << s) | (x >> (64 - s)); 1984 x = (x << s) | (x >> (64 - s));
@@ -1258,7 +2097,9 @@ prepare_ibf (struct Operation *op,
1258 2097
1259 if (NULL != op->local_ibf) 2098 if (NULL != op->local_ibf)
1260 ibf_destroy (op->local_ibf); 2099 ibf_destroy (op->local_ibf);
1261 op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 2100 // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2101 op->local_ibf = ibf_create (size,
2102 ((uint8_t) op->ibf_number_buckets_per_element));
1262 if (NULL == op->local_ibf) 2103 if (NULL == op->local_ibf)
1263 { 2104 {
1264 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2105 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1283,13 +2124,23 @@ prepare_ibf (struct Operation *op,
1283 */ 2124 */
1284static int 2125static int
1285send_ibf (struct Operation *op, 2126send_ibf (struct Operation *op,
1286 uint16_t ibf_order) 2127 uint32_t ibf_size)
1287{ 2128{
1288 unsigned int buckets_sent = 0; 2129 uint64_t buckets_sent = 0;
1289 struct InvertibleBloomFilter *ibf; 2130 struct InvertibleBloomFilter *ibf;
2131 op->differential_sync_iterations++;
1290 2132
2133 /**
2134 * Enforce min size of IBF
2135 */
2136 uint32_t ibf_min_size = IBF_MIN_SIZE;
2137
2138 if (ibf_size < ibf_min_size)
2139 {
2140 ibf_size = ibf_min_size;
2141 }
1291 if (GNUNET_OK != 2142 if (GNUNET_OK !=
1292 prepare_ibf (op, 1 << ibf_order)) 2143 prepare_ibf (op, ibf_size))
1293 { 2144 {
1294 /* allocation failed */ 2145 /* allocation failed */
1295 return GNUNET_SYSERR; 2146 return GNUNET_SYSERR;
@@ -1297,45 +2148,52 @@ send_ibf (struct Operation *op,
1297 2148
1298 LOG (GNUNET_ERROR_TYPE_DEBUG, 2149 LOG (GNUNET_ERROR_TYPE_DEBUG,
1299 "sending ibf of size %u\n", 2150 "sending ibf of size %u\n",
1300 1 << ibf_order); 2151 (unsigned int) ibf_size);
1301 2152
1302 { 2153 {
1303 char name[64]; 2154 char name[64];
1304 GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order); 2155
2156 GNUNET_snprintf (name,
2157 sizeof(name),
2158 "# sent IBF (order %u)",
2159 ibf_size);
1305 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); 2160 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
1306 } 2161 }
1307 2162
1308 ibf = op->local_ibf; 2163 ibf = op->local_ibf;
1309 2164
1310 while (buckets_sent < (1 << ibf_order)) 2165 while (buckets_sent < ibf_size)
1311 { 2166 {
1312 unsigned int buckets_in_message; 2167 unsigned int buckets_in_message;
1313 struct GNUNET_MQ_Envelope *ev; 2168 struct GNUNET_MQ_Envelope *ev;
1314 struct IBFMessage *msg; 2169 struct IBFMessage *msg;
1315 2170
1316 buckets_in_message = (1 << ibf_order) - buckets_sent; 2171 buckets_in_message = ibf_size - buckets_sent;
1317 /* limit to maximum */ 2172 /* limit to maximum */
1318 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) 2173 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
1319 buckets_in_message = MAX_BUCKETS_PER_MESSAGE; 2174 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
1320 2175
1321 perf_rtt.ibf.sent += 1; 2176#if MEASURE_PERFORMANCE
1322 perf_rtt.ibf.sent_var_bytes += ( buckets_in_message * IBF_BUCKET_SIZE ); 2177 perf_store.ibf.sent += 1;
2178 perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2179#endif
1323 ev = GNUNET_MQ_msg_extra (msg, 2180 ev = GNUNET_MQ_msg_extra (msg,
1324 buckets_in_message * IBF_BUCKET_SIZE, 2181 buckets_in_message * IBF_BUCKET_SIZE,
1325 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF); 2182 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF);
1326 msg->reserved1 = 0; 2183 msg->ibf_size = ibf_size;
1327 msg->reserved2 = 0;
1328 msg->order = ibf_order;
1329 msg->offset = htonl (buckets_sent); 2184 msg->offset = htonl (buckets_sent);
1330 msg->salt = htonl (op->salt_send); 2185 msg->salt = htonl (op->salt_send);
2186 msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2187
2188
1331 ibf_write_slice (ibf, buckets_sent, 2189 ibf_write_slice (ibf, buckets_sent,
1332 buckets_in_message, &msg[1]); 2190 buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
1333 buckets_sent += buckets_in_message; 2191 buckets_sent += buckets_in_message;
1334 LOG (GNUNET_ERROR_TYPE_DEBUG, 2192 LOG (GNUNET_ERROR_TYPE_DEBUG,
1335 "ibf chunk size %u, %u/%u sent\n", 2193 "ibf chunk size %u, %llu/%u sent\n",
1336 buckets_in_message, 2194 (unsigned int) buckets_in_message,
1337 buckets_sent, 2195 (unsigned long long) buckets_sent,
1338 1 << ibf_order); 2196 (unsigned int) ibf_size);
1339 GNUNET_MQ_send (op->mq, ev); 2197 GNUNET_MQ_send (op->mq, ev);
1340 } 2198 }
1341 2199
@@ -1354,17 +2212,26 @@ send_ibf (struct Operation *op,
1354 * @return the required size of the ibf 2212 * @return the required size of the ibf
1355 */ 2213 */
1356static unsigned int 2214static unsigned int
1357get_order_from_difference (unsigned int diff) 2215get_size_from_difference (unsigned int diff, int number_buckets_per_element,
2216 float ibf_bucket_number_factor)
1358{ 2217{
1359 unsigned int ibf_order; 2218 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2219 * Elias Summermatter (2021) in section 3.11 **/
2220 return (((int) (diff * ibf_bucket_number_factor)) | 1);
2221
2222}
1360 2223
1361 ibf_order = 2; 2224
1362 while (((1 << ibf_order) < (IBF_ALPHA * diff) || 2225static unsigned int
1363 ((1 << ibf_order) < SE_IBF_HASH_NUM)) && 2226get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
1364 (ibf_order < MAX_IBF_ORDER)) 2227 decoded_elements, unsigned int last_ibf_size)
1365 ibf_order++; 2228{
1366 // add one for correction 2229 unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
1367 return ibf_order + 1; 2230 - (ibf_bucket_number_factor
2231 * decoded_elements));
2232 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2233 * Elias Summermatter (2021) in section 3.11 **/
2234 return next_size | 1;
1368} 2235}
1369 2236
1370 2237
@@ -1391,8 +2258,10 @@ send_full_element_iterator (void *cls,
1391 LOG (GNUNET_ERROR_TYPE_DEBUG, 2258 LOG (GNUNET_ERROR_TYPE_DEBUG,
1392 "Sending element %s\n", 2259 "Sending element %s\n",
1393 GNUNET_h2s (key)); 2260 GNUNET_h2s (key));
1394 perf_rtt.element_full.received += 1; 2261#if MEASURE_PERFORMANCE
1395 perf_rtt.element_full.received_var_bytes += el->size; 2262 perf_store.element_full.received += 1;
2263 perf_store.element_full.received_var_bytes += el->size;
2264#endif
1396 ev = GNUNET_MQ_msg_extra (emsg, 2265 ev = GNUNET_MQ_msg_extra (emsg,
1397 el->size, 2266 el->size,
1398 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); 2267 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -1421,10 +2290,25 @@ send_full_set (struct Operation *op)
1421 "Dedicing to transmit the full set\n"); 2290 "Dedicing to transmit the full set\n");
1422 /* FIXME: use a more memory-friendly way of doing this with an 2291 /* FIXME: use a more memory-friendly way of doing this with an
1423 iterator, just as we do in the non-full case! */ 2292 iterator, just as we do in the non-full case! */
2293
2294 // Randomize Elements to send
2295 op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2296 32,GNUNET_NO);
2297 op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2298 GNUNET_CRYPTO_QUALITY_NONCE,
2299 UINT64_MAX);
1424 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, 2300 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1425 &send_full_element_iterator, 2301 &
2302 create_randomized_element_iterator,
1426 op); 2303 op);
1427 perf_rtt.full_done.sent += 1; 2304
2305 (void) GNUNET_CONTAINER_multihashmap_iterate (
2306 op->set->content->elements_randomized,
2307 &send_full_element_iterator,
2308 op);
2309#if MEASURE_PERFORMANCE
2310 perf_store.full_done.sent += 1;
2311#endif
1428 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); 2312 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
1429 GNUNET_MQ_send (op->mq, 2313 GNUNET_MQ_send (op->mq,
1430 ev); 2314 ev);
@@ -1454,7 +2338,7 @@ check_union_p2p_strata_estimator (void *cls,
1454 msg->header.type)); 2338 msg->header.type));
1455 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2339 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1456 if ((GNUNET_NO == is_compressed) && 2340 if ((GNUNET_NO == is_compressed) &&
1457 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) 2341 (len != SE_STRATA_COUNT * SE_IBFS_TOTAL_SIZE * IBF_BUCKET_SIZE))
1458 { 2342 {
1459 GNUNET_break (0); 2343 GNUNET_break (0);
1460 return GNUNET_SYSERR; 2344 return GNUNET_SYSERR;
@@ -1473,14 +2357,44 @@ static void
1473handle_union_p2p_strata_estimator (void *cls, 2357handle_union_p2p_strata_estimator (void *cls,
1474 const struct StrataEstimatorMessage *msg) 2358 const struct StrataEstimatorMessage *msg)
1475{ 2359{
1476 perf_rtt.se.received += 1; 2360#if MEASURE_PERFORMANCE
1477 perf_rtt.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2361 perf_store.se.received += 1;
2362 perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2363 StrataEstimatorMessage);
2364#endif
1478 struct Operation *op = cls; 2365 struct Operation *op = cls;
1479 struct StrataEstimator *remote_se; 2366 struct MultiStrataEstimator *remote_se;
1480 unsigned int diff; 2367 unsigned int diff;
1481 uint64_t other_size; 2368 uint64_t other_size;
1482 size_t len; 2369 size_t len;
1483 int is_compressed; 2370 int is_compressed;
2371 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2372 op->set->content->elements);
2373 // Setting peer site to receiving peer
2374 op->peer_site = 1;
2375
2376 /**
2377 * Check that the message is received only in supported phase
2378 */
2379 uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2380 if (GNUNET_OK !=
2381 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2382 {
2383 GNUNET_break (0);
2384 fail_union_operation (op);
2385 return;
2386 }
2387
2388 /** Only allow 1,2,4,8 SEs **/
2389 if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2390 {
2391 LOG (GNUNET_ERROR_TYPE_ERROR,
2392 "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2393 msg->se_count);
2394 GNUNET_break_op (0);
2395 fail_union_operation (op);
2396 return;
2397 }
1484 2398
1485 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons ( 2399 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1486 msg->header.type)); 2400 msg->header.type));
@@ -1490,8 +2404,20 @@ handle_union_p2p_strata_estimator (void *cls,
1490 GNUNET_NO); 2404 GNUNET_NO);
1491 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2405 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1492 other_size = GNUNET_ntohll (msg->set_size); 2406 other_size = GNUNET_ntohll (msg->set_size);
2407 op->remote_element_count = other_size;
2408
2409 if (op->byzantine_upper_bound < op->remote_element_count)
2410 {
2411 LOG (GNUNET_ERROR_TYPE_ERROR,
2412 "Exceeded configured upper bound <%lu> of element: %u\n",
2413 op->byzantine_upper_bound,
2414 op->remote_element_count);
2415 fail_union_operation (op);
2416 return;
2417 }
2418
1493 remote_se = strata_estimator_create (SE_STRATA_COUNT, 2419 remote_se = strata_estimator_create (SE_STRATA_COUNT,
1494 SE_IBF_SIZE, 2420 SE_IBFS_TOTAL_SIZE,
1495 SE_IBF_HASH_NUM); 2421 SE_IBF_HASH_NUM);
1496 if (NULL == remote_se) 2422 if (NULL == remote_se)
1497 { 2423 {
@@ -1503,6 +2429,8 @@ handle_union_p2p_strata_estimator (void *cls,
1503 strata_estimator_read (&msg[1], 2429 strata_estimator_read (&msg[1],
1504 len, 2430 len,
1505 is_compressed, 2431 is_compressed,
2432 msg->se_count,
2433 SE_IBFS_TOTAL_SIZE,
1506 remote_se)) 2434 remote_se))
1507 { 2435 {
1508 /* decompression failed */ 2436 /* decompression failed */
@@ -1511,11 +2439,76 @@ handle_union_p2p_strata_estimator (void *cls,
1511 return; 2439 return;
1512 } 2440 }
1513 GNUNET_assert (NULL != op->se); 2441 GNUNET_assert (NULL != op->se);
1514 diff = strata_estimator_difference (remote_se, 2442 strata_estimator_difference (remote_se,
1515 op->se); 2443 op->se);
2444
2445 /* Calculate remote local diff */
2446 long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2447 long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2448
2449 /* Prevent estimations from overshooting max element */
2450 if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2451 diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2452 if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2453 diff_local = op->byzantine_upper_bound - op->local_element_count;
2454 if ((diff_remote < 0) || (diff_local < 0))
2455 {
2456 strata_estimator_destroy (remote_se);
2457 LOG (GNUNET_ERROR_TYPE_ERROR,
2458 "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2459 "malicious: remote diff %ld, local diff: %ld\n",
2460 diff_remote, diff_local);
2461 GNUNET_break_op (0);
2462 fail_union_operation (op);
2463 return;
2464 }
1516 2465
1517 if (diff > 200) 2466 /* Make estimation more precise in initial sync cases */
1518 diff = diff * 3 / 2; 2467 if (0 == op->remote_element_count)
2468 {
2469 diff_remote = 0;
2470 diff_local = op->local_element_count;
2471 }
2472 if (0 == op->local_element_count)
2473 {
2474 diff_local = 0;
2475 diff_remote = op->remote_element_count;
2476 }
2477
2478 diff = diff_remote + diff_local;
2479 op->remote_set_diff = diff_remote;
2480
2481 /** Calculate avg element size if not initial sync **/
2482 uint64_t avg_element_size = 0;
2483 if (0 < op->local_element_count)
2484 {
2485 op->total_elements_size_local = 0;
2486 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2487 &
2488 determinate_avg_element_size_iterator,
2489 op);
2490 avg_element_size = op->total_elements_size_local / op->local_element_count;
2491 }
2492
2493 op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2494 GNUNET_CONTAINER_multihashmap_size (
2495 op->set->content->
2496 elements),
2497 op->
2498 remote_element_count,
2499 diff_remote,
2500 diff_local,
2501 op->
2502 rtt_bandwidth_tradeoff,
2503 op->
2504 ibf_bucket_number_factor);
2505
2506#if MEASURE_PERFORMANCE
2507 perf_store.se_diff_local = diff_local;
2508 perf_store.se_diff_remote = diff_remote;
2509 perf_store.se_diff = diff;
2510 perf_store.mode_of_operation = op->mode_of_operation;
2511#endif
1519 2512
1520 strata_estimator_destroy (remote_se); 2513 strata_estimator_destroy (remote_se);
1521 strata_estimator_destroy (op->se); 2514 strata_estimator_destroy (op->se);
@@ -1523,7 +2516,8 @@ handle_union_p2p_strata_estimator (void *cls,
1523 LOG (GNUNET_ERROR_TYPE_DEBUG, 2516 LOG (GNUNET_ERROR_TYPE_DEBUG,
1524 "got se diff=%d, using ibf size %d\n", 2517 "got se diff=%d, using ibf size %d\n",
1525 diff, 2518 diff,
1526 1U << get_order_from_difference (diff)); 2519 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element,
2520 op->ibf_bucket_number_factor));
1527 2521
1528 { 2522 {
1529 char *set_debug; 2523 char *set_debug;
@@ -1546,16 +2540,8 @@ handle_union_p2p_strata_estimator (void *cls,
1546 return; 2540 return;
1547 } 2541 }
1548 2542
1549 LOG (GNUNET_ERROR_TYPE_ERROR,
1550 "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: %f\n", op->rtt_bandwidth_tradeoff);
1551
1552
1553 /**
1554 * Added rtt_bandwidth_tradeoff directly need future improvements
1555 */
1556 if ((GNUNET_YES == op->force_full) || 2543 if ((GNUNET_YES == op->force_full) ||
1557 (diff > op->initial_size / 4) || 2544 (op->mode_of_operation != DIFFERENTIAL_SYNC))
1558 (0 == other_size))
1559 { 2545 {
1560 LOG (GNUNET_ERROR_TYPE_DEBUG, 2546 LOG (GNUNET_ERROR_TYPE_DEBUG,
1561 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n", 2547 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
@@ -1565,9 +2551,17 @@ handle_union_p2p_strata_estimator (void *cls,
1565 "# of full sends", 2551 "# of full sends",
1566 1, 2552 1,
1567 GNUNET_NO); 2553 GNUNET_NO);
1568 if ((op->initial_size <= other_size) || 2554 if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
1569 (0 == other_size))
1570 { 2555 {
2556 struct TransmitFullMessage *signal_msg;
2557 struct GNUNET_MQ_Envelope *ev;
2558 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2559 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL);
2560 signal_msg->remote_set_difference = htonl (diff_local);
2561 signal_msg->remote_set_size = htonl (op->local_element_count);
2562 signal_msg->local_set_difference = htonl (diff_remote);
2563 GNUNET_MQ_send (op->mq,
2564 ev);
1571 send_full_set (op); 2565 send_full_set (op);
1572 } 2566 }
1573 else 2567 else
@@ -1577,9 +2571,15 @@ handle_union_p2p_strata_estimator (void *cls,
1577 LOG (GNUNET_ERROR_TYPE_DEBUG, 2571 LOG (GNUNET_ERROR_TYPE_DEBUG,
1578 "Telling other peer that we expect its full set\n"); 2572 "Telling other peer that we expect its full set\n");
1579 op->phase = PHASE_FULL_RECEIVING; 2573 op->phase = PHASE_FULL_RECEIVING;
1580 perf_rtt.request_full.sent += 1; 2574#if MEASURE_PERFORMANCE
1581 ev = GNUNET_MQ_msg_header ( 2575 perf_store.request_full.sent += 1;
1582 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); 2576#endif
2577 struct TransmitFullMessage *signal_msg;
2578 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2579 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
2580 signal_msg->remote_set_difference = htonl (diff_local);
2581 signal_msg->remote_set_size = htonl (op->local_element_count);
2582 signal_msg->local_set_difference = htonl (diff_remote);
1583 GNUNET_MQ_send (op->mq, 2583 GNUNET_MQ_send (op->mq,
1584 ev); 2584 ev);
1585 } 2585 }
@@ -1592,7 +2592,9 @@ handle_union_p2p_strata_estimator (void *cls,
1592 GNUNET_NO); 2592 GNUNET_NO);
1593 if (GNUNET_OK != 2593 if (GNUNET_OK !=
1594 send_ibf (op, 2594 send_ibf (op,
1595 get_order_from_difference (diff))) 2595 get_size_from_difference (diff,
2596 op->ibf_number_buckets_per_element,
2597 op->ibf_bucket_number_factor)))
1596 { 2598 {
1597 /* Internal error, best we can do is shut the connection */ 2599 /* Internal error, best we can do is shut the connection */
1598 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2600 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1625,15 +2627,64 @@ send_offers_iterator (void *cls,
1625 2627
1626 /* Detect 32-bit key collision for the 64-bit IBF keys. */ 2628 /* Detect 32-bit key collision for the 64-bit IBF keys. */
1627 if (ke->ibf_key.key_val != sec->ibf_key.key_val) 2629 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2630 {
2631 op->active_passive_switch_required = true;
1628 return GNUNET_YES; 2632 return GNUNET_YES;
2633 }
1629 2634
1630 perf_rtt.offer.sent += 1; 2635 /* Prevent implementation from sending a offer multiple times in case of roll switch */
1631 perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); 2636 if (GNUNET_YES ==
2637 is_message_in_message_control_flow (
2638 op->message_control_flow,
2639 &ke->element->element_hash,
2640 OFFER_MESSAGE)
2641 )
2642 {
2643 LOG (GNUNET_ERROR_TYPE_DEBUG,
2644 "Skipping already sent processed element offer!\n");
2645 return GNUNET_YES;
2646 }
1632 2647
2648 /* Save send offer message for message control */
2649 if (GNUNET_YES !=
2650 update_message_control_flow (
2651 op->message_control_flow,
2652 MSG_CFS_SENT,
2653 &ke->element->element_hash,
2654 OFFER_MESSAGE)
2655 )
2656 {
2657 LOG (GNUNET_ERROR_TYPE_ERROR,
2658 "Double offer message sent found!\n");
2659 GNUNET_break (0);
2660 fail_union_operation (op);
2661 return GNUNET_NO;
2662 }
2663 ;
2664
2665 /* Mark element to be expected to received */
2666 if (GNUNET_YES !=
2667 update_message_control_flow (
2668 op->message_control_flow,
2669 MSG_CFS_EXPECTED,
2670 &ke->element->element_hash,
2671 DEMAND_MESSAGE)
2672 )
2673 {
2674 LOG (GNUNET_ERROR_TYPE_ERROR,
2675 "Double demand received found!\n");
2676 GNUNET_break (0);
2677 fail_union_operation (op);
2678 return GNUNET_NO;
2679 }
2680 ;
2681#if MEASURE_PERFORMANCE
2682 perf_store.offer.sent += 1;
2683 perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2684#endif
1633 ev = GNUNET_MQ_msg_header_extra (mh, 2685 ev = GNUNET_MQ_msg_header_extra (mh,
1634 sizeof(struct GNUNET_HashCode), 2686 sizeof(struct GNUNET_HashCode),
1635 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER); 2687 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER);
1636
1637 GNUNET_assert (NULL != ev); 2688 GNUNET_assert (NULL != ev);
1638 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; 2689 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1639 LOG (GNUNET_ERROR_TYPE_DEBUG, 2690 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1651,7 +2702,7 @@ send_offers_iterator (void *cls,
1651 * @param op union operation 2702 * @param op union operation
1652 * @param ibf_key IBF key of interest 2703 * @param ibf_key IBF key of interest
1653 */ 2704 */
1654static void 2705void
1655send_offers_for_key (struct Operation *op, 2706send_offers_for_key (struct Operation *op,
1656 struct IBF_Key ibf_key) 2707 struct IBF_Key ibf_key)
1657{ 2708{
@@ -1694,6 +2745,7 @@ decode_and_send (struct Operation *op)
1694 /* allocation failed */ 2745 /* allocation failed */
1695 return GNUNET_SYSERR; 2746 return GNUNET_SYSERR;
1696 } 2747 }
2748
1697 diff_ibf = ibf_dup (op->local_ibf); 2749 diff_ibf = ibf_dup (op->local_ibf);
1698 ibf_subtract (diff_ibf, 2750 ibf_subtract (diff_ibf,
1699 op->remote_ibf); 2751 op->remote_ibf);
@@ -1706,7 +2758,7 @@ decode_and_send (struct Operation *op)
1706 diff_ibf->size); 2758 diff_ibf->size);
1707 2759
1708 num_decoded = 0; 2760 num_decoded = 0;
1709 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ 2761 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1710 2762
1711 while (1) 2763 while (1)
1712 { 2764 {
@@ -1738,23 +2790,36 @@ decode_and_send (struct Operation *op)
1738 if ((GNUNET_SYSERR == res) || 2790 if ((GNUNET_SYSERR == res) ||
1739 (GNUNET_YES == cycle_detected)) 2791 (GNUNET_YES == cycle_detected))
1740 { 2792 {
1741 int next_order; 2793 uint32_t next_size;
1742 next_order = 0; 2794 /** Enforce odd ibf size **/
1743 while (1 << next_order < diff_ibf->size) 2795
1744 next_order++; 2796 next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
1745 next_order++; 2797 diff_ibf->size);
1746 if (next_order <= MAX_IBF_ORDER) 2798 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2799 * Elias Summermatter (2021) in section 3.11 **/
2800 uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2801
2802 if (next_size<ibf_min_size)
2803 next_size = ibf_min_size;
2804
2805
2806 if (next_size <= MAX_IBF_SIZE)
1747 { 2807 {
1748 LOG (GNUNET_ERROR_TYPE_DEBUG, 2808 LOG (GNUNET_ERROR_TYPE_DEBUG,
1749 "decoding failed, sending larger ibf (size %u)\n", 2809 "decoding failed, sending larger ibf (size %u)\n",
1750 1 << next_order); 2810 next_size);
1751 GNUNET_STATISTICS_update (_GSS_statistics, 2811 GNUNET_STATISTICS_update (_GSS_statistics,
1752 "# of IBF retries", 2812 "# of IBF retries",
1753 1, 2813 1,
1754 GNUNET_NO); 2814 GNUNET_NO);
1755 op->salt_send++; 2815#if MEASURE_PERFORMANCE
2816 perf_store.active_passive_switches += 1;
2817#endif
2818
2819 op->salt_send = op->salt_receive++;
2820
1756 if (GNUNET_OK != 2821 if (GNUNET_OK !=
1757 send_ibf (op, next_order)) 2822 send_ibf (op, next_size))
1758 { 2823 {
1759 /* Internal error, best we can do is shut the connection */ 2824 /* Internal error, best we can do is shut the connection */
1760 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2825 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1786,7 +2851,9 @@ decode_and_send (struct Operation *op)
1786 LOG (GNUNET_ERROR_TYPE_DEBUG, 2851 LOG (GNUNET_ERROR_TYPE_DEBUG,
1787 "transmitted all values, sending DONE\n"); 2852 "transmitted all values, sending DONE\n");
1788 2853
1789 perf_rtt.done.sent += 1; 2854#if MEASURE_PERFORMANCE
2855 perf_store.done.sent += 1;
2856#endif
1790 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); 2857 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
1791 GNUNET_MQ_send (op->mq, ev); 2858 GNUNET_MQ_send (op->mq, ev);
1792 /* We now wait until we get a DONE message back 2859 /* We now wait until we get a DONE message back
@@ -1797,7 +2864,6 @@ decode_and_send (struct Operation *op)
1797 if (1 == side) 2864 if (1 == side)
1798 { 2865 {
1799 struct IBF_Key unsalted_key; 2866 struct IBF_Key unsalted_key;
1800
1801 unsalt_key (&key, 2867 unsalt_key (&key,
1802 op->salt_receive, 2868 op->salt_receive,
1803 &unsalted_key); 2869 &unsalted_key);
@@ -1809,8 +2875,29 @@ decode_and_send (struct Operation *op)
1809 struct GNUNET_MQ_Envelope *ev; 2875 struct GNUNET_MQ_Envelope *ev;
1810 struct InquiryMessage *msg; 2876 struct InquiryMessage *msg;
1811 2877
1812 perf_rtt.inquery.sent += 1; 2878#if MEASURE_PERFORMANCE
1813 perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key); 2879 perf_store.inquery.sent += 1;
2880 perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2881#endif
2882
2883 /** Add sent inquiries to hashmap for flow control **/
2884 struct GNUNET_HashContext *hashed_key_context =
2885 GNUNET_CRYPTO_hash_context_start ();
2886 struct GNUNET_HashCode *hashed_key = (struct
2887 GNUNET_HashCode*) GNUNET_malloc (
2888 sizeof(struct GNUNET_HashCode));
2889 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_SENT;
2890 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2891 &key,
2892 sizeof(struct IBF_Key));
2893 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2894 hashed_key);
2895 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2896 hashed_key,
2897 &mcfs,
2898 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
2899 );
2900
1814 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth 2901 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1815 * the effort additional complexity. */ 2902 * the effort additional complexity. */
1816 ev = GNUNET_MQ_msg_extra (msg, 2903 ev = GNUNET_MQ_msg_extra (msg,
@@ -1836,6 +2923,100 @@ decode_and_send (struct Operation *op)
1836 2923
1837 2924
1838/** 2925/**
2926 * Check send full message received from other peer
2927 * @param cls
2928 * @param msg
2929 * @return
2930 */
2931
2932static int
2933check_union_p2p_send_full (void *cls,
2934 const struct TransmitFullMessage *msg)
2935{
2936 return GNUNET_OK;
2937}
2938
2939
2940/**
2941 * Handle send full message received from other peer
2942 *
2943 * @param cls
2944 * @param msg
2945 */
2946static void
2947handle_union_p2p_send_full (void *cls,
2948 const struct TransmitFullMessage *msg)
2949{
2950 struct Operation *op = cls;
2951
2952 /**
2953 * Check that the message is received only in supported phase
2954 */
2955 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2956 if (GNUNET_OK !=
2957 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2958 {
2959 GNUNET_break (0);
2960 fail_union_operation (op);
2961 return;
2962 }
2963
2964 /** write received values to operator**/
2965 op->remote_element_count = ntohl (msg->remote_set_size);
2966 op->remote_set_diff = ntohl (msg->remote_set_difference);
2967 op->local_set_diff = ntohl (msg->local_set_difference);
2968
2969 /** Check byzantine limits **/
2970 if (check_byzantine_bounds (op) != GNUNET_OK)
2971 {
2972 LOG (GNUNET_ERROR_TYPE_ERROR,
2973 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
2974 "criteria\n");
2975 GNUNET_break_op (0);
2976 fail_union_operation (op);
2977 return;
2978 }
2979
2980 /** Calculate avg element size if not initial sync **/
2981 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2982 op->set->content->elements);
2983 uint64_t avg_element_size = 0;
2984 if (0 < op->local_element_count)
2985 {
2986 op->total_elements_size_local = 0;
2987 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2988 &
2989 determinate_avg_element_size_iterator,
2990 op);
2991 avg_element_size = op->total_elements_size_local / op->local_element_count;
2992 }
2993
2994 /** Validate mode of operation **/
2995 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2996 op->
2997 remote_element_count,
2998 op->
2999 local_element_count,
3000 op->local_set_diff,
3001 op->remote_set_diff,
3002 op->
3003 rtt_bandwidth_tradeoff,
3004 op->
3005 ibf_bucket_number_factor);
3006 if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation)
3007 {
3008 LOG (GNUNET_ERROR_TYPE_ERROR,
3009 "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
3010 " : %d\n", mode_of_operation);
3011 GNUNET_break_op (0);
3012 fail_union_operation (op);
3013 return;
3014 }
3015 op->phase = PHASE_FULL_RECEIVING;
3016}
3017
3018
3019/**
1839 * Check an IBF message from a remote peer. 3020 * Check an IBF message from a remote peer.
1840 * 3021 *
1841 * Reassemble the IBF from multiple pieces, and 3022 * Reassemble the IBF from multiple pieces, and
@@ -1872,7 +3053,8 @@ check_union_p2p_ibf (void *cls,
1872 GNUNET_break_op (0); 3053 GNUNET_break_op (0);
1873 return GNUNET_SYSERR; 3054 return GNUNET_SYSERR;
1874 } 3055 }
1875 if (1 << msg->order != op->remote_ibf->size) 3056
3057 if (msg->ibf_size != op->remote_ibf->size)
1876 { 3058 {
1877 GNUNET_break_op (0); 3059 GNUNET_break_op (0);
1878 return GNUNET_SYSERR; 3060 return GNUNET_SYSERR;
@@ -1909,9 +3091,26 @@ handle_union_p2p_ibf (void *cls,
1909{ 3091{
1910 struct Operation *op = cls; 3092 struct Operation *op = cls;
1911 unsigned int buckets_in_message; 3093 unsigned int buckets_in_message;
3094 /**
3095 * Check that the message is received only in supported phase
3096 */
3097 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3098 PHASE_PASSIVE_DECODING};
3099 if (GNUNET_OK !=
3100 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3101 {
3102 GNUNET_break (0);
3103 fail_union_operation (op);
3104 return;
3105 }
3106 op->differential_sync_iterations++;
3107 check_max_differential_rounds (op);
3108 op->active_passive_switch_required = false;
1912 3109
1913 perf_rtt.ibf.received += 1; 3110#if MEASURE_PERFORMANCE
1914 perf_rtt.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); 3111 perf_store.ibf.received += 1;
3112 perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3113#endif
1915 3114
1916 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) 3115 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1917 / IBF_BUCKET_SIZE; 3116 / IBF_BUCKET_SIZE;
@@ -1922,8 +3121,10 @@ handle_union_p2p_ibf (void *cls,
1922 GNUNET_assert (NULL == op->remote_ibf); 3121 GNUNET_assert (NULL == op->remote_ibf);
1923 LOG (GNUNET_ERROR_TYPE_DEBUG, 3122 LOG (GNUNET_ERROR_TYPE_DEBUG,
1924 "Creating new ibf of size %u\n", 3123 "Creating new ibf of size %u\n",
1925 1 << msg->order); 3124 ntohl (msg->ibf_size));
1926 op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); 3125 // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
3126 op->remote_ibf = ibf_create (msg->ibf_size,
3127 ((uint8_t) op->ibf_number_buckets_per_element));
1927 op->salt_receive = ntohl (msg->salt); 3128 op->salt_receive = ntohl (msg->salt);
1928 LOG (GNUNET_ERROR_TYPE_DEBUG, 3129 LOG (GNUNET_ERROR_TYPE_DEBUG,
1929 "Receiving new IBF with salt %u\n", 3130 "Receiving new IBF with salt %u\n",
@@ -1954,7 +3155,7 @@ handle_union_p2p_ibf (void *cls,
1954 ibf_read_slice (&msg[1], 3155 ibf_read_slice (&msg[1],
1955 op->ibf_buckets_received, 3156 op->ibf_buckets_received,
1956 buckets_in_message, 3157 buckets_in_message,
1957 op->remote_ibf); 3158 op->remote_ibf, msg->ibf_counter_bit_length);
1958 op->ibf_buckets_received += buckets_in_message; 3159 op->ibf_buckets_received += buckets_in_message;
1959 3160
1960 if (op->ibf_buckets_received == op->remote_ibf->size) 3161 if (op->ibf_buckets_received == op->remote_ibf->size)
@@ -2030,18 +3231,24 @@ maybe_finish (struct Operation *op)
2030 3231
2031 num_demanded = GNUNET_CONTAINER_multihashmap_size ( 3232 num_demanded = GNUNET_CONTAINER_multihashmap_size (
2032 op->demanded_hashes); 3233 op->demanded_hashes);
2033 3234 int send_done = GNUNET_CONTAINER_multihashmap_iterate (
3235 op->message_control_flow,
3236 &
3237 determinate_done_message_iterator,
3238 op);
2034 if (PHASE_FINISH_WAITING == op->phase) 3239 if (PHASE_FINISH_WAITING == op->phase)
2035 { 3240 {
2036 LOG (GNUNET_ERROR_TYPE_DEBUG, 3241 LOG (GNUNET_ERROR_TYPE_DEBUG,
2037 "In PHASE_FINISH_WAITING, pending %u demands\n", 3242 "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
2038 num_demanded); 3243 num_demanded, op->peer_site);
2039 if (0 == num_demanded) 3244 if (-1 != send_done)
2040 { 3245 {
2041 struct GNUNET_MQ_Envelope *ev; 3246 struct GNUNET_MQ_Envelope *ev;
2042 3247
2043 op->phase = PHASE_FINISHED; 3248 op->phase = PHASE_FINISHED;
2044 perf_rtt.done.sent += 1; 3249#if MEASURE_PERFORMANCE
3250 perf_store.done.sent += 1;
3251#endif
2045 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); 3252 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
2046 GNUNET_MQ_send (op->mq, 3253 GNUNET_MQ_send (op->mq,
2047 ev); 3254 ev);
@@ -2052,9 +3259,9 @@ maybe_finish (struct Operation *op)
2052 if (PHASE_FINISH_CLOSING == op->phase) 3259 if (PHASE_FINISH_CLOSING == op->phase)
2053 { 3260 {
2054 LOG (GNUNET_ERROR_TYPE_DEBUG, 3261 LOG (GNUNET_ERROR_TYPE_DEBUG,
2055 "In PHASE_FINISH_CLOSING, pending %u demands\n", 3262 "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
2056 num_demanded); 3263 num_demanded, op->peer_site);
2057 if (0 == num_demanded) 3264 if (-1 != send_done)
2058 { 3265 {
2059 op->phase = PHASE_FINISHED; 3266 op->phase = PHASE_FINISHED;
2060 send_client_done (op); 3267 send_client_done (op);
@@ -2102,11 +3309,25 @@ handle_union_p2p_elements (void *cls,
2102 struct KeyEntry *ke; 3309 struct KeyEntry *ke;
2103 uint16_t element_size; 3310 uint16_t element_size;
2104 3311
3312 /**
3313 * Check that the message is received only in supported phase
3314 */
3315 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3316 PHASE_FINISH_WAITING, PHASE_FINISH_CLOSING};
3317 if (GNUNET_OK !=
3318 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3319 {
3320 GNUNET_break (0);
3321 fail_union_operation (op);
3322 return;
3323 }
2105 3324
2106 element_size = ntohs (emsg->header.size) - sizeof(struct 3325 element_size = ntohs (emsg->header.size) - sizeof(struct
2107 GNUNET_SETU_ElementMessage); 3326 GNUNET_SETU_ElementMessage);
2108 perf_rtt.element.received += 1; 3327#if MEASURE_PERFORMANCE
2109 perf_rtt.element.received_var_bytes += element_size; 3328 perf_store.element.received += 1;
3329 perf_store.element.received_var_bytes += element_size;
3330#endif
2110 3331
2111 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); 3332 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2112 GNUNET_memcpy (&ee[1], 3333 GNUNET_memcpy (&ee[1],
@@ -2129,6 +3350,21 @@ handle_union_p2p_elements (void *cls,
2129 return; 3350 return;
2130 } 3351 }
2131 3352
3353 if (GNUNET_OK !=
3354 update_message_control_flow (
3355 op->message_control_flow,
3356 MSG_CFS_RECEIVED,
3357 &ee->element_hash,
3358 ELEMENT_MESSAGE)
3359 )
3360 {
3361 LOG (GNUNET_ERROR_TYPE_ERROR,
3362 "An element has been received more than once!\n");
3363 GNUNET_break (0);
3364 fail_union_operation (op);
3365 return;
3366 }
3367
2132 LOG (GNUNET_ERROR_TYPE_DEBUG, 3368 LOG (GNUNET_ERROR_TYPE_DEBUG,
2133 "Got element (size %u, hash %s) from peer\n", 3369 "Got element (size %u, hash %s) from peer\n",
2134 (unsigned int) element_size, 3370 (unsigned int) element_size,
@@ -2217,33 +3453,25 @@ handle_union_p2p_full_element (void *cls,
2217 struct KeyEntry *ke; 3453 struct KeyEntry *ke;
2218 uint16_t element_size; 3454 uint16_t element_size;
2219 3455
2220 3456 /**
2221 3457 * Check that the message is received only in supported phase
2222 if(PHASE_EXPECT_IBF == op->phase) { 3458 */
2223 op->phase = PHASE_FULL_RECEIVING; 3459 uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
2224 } 3460 if (GNUNET_OK !=
2225 3461 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2226
2227
2228 /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */
2229 if ((PHASE_FULL_RECEIVING != op->phase) &&
2230 (PHASE_FULL_SENDING != op->phase))
2231 { 3462 {
2232 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 3463 GNUNET_break (0);
2233 "Handle full element phase is %u\n", 3464 fail_union_operation (op);
2234 (unsigned) op->phase); 3465 return;
2235 GNUNET_break_op (0);
2236 fail_union_operation (op);
2237 return;
2238 } 3466 }
2239 3467
2240
2241
2242 element_size = ntohs (emsg->header.size) 3468 element_size = ntohs (emsg->header.size)
2243 - sizeof(struct GNUNET_SETU_ElementMessage); 3469 - sizeof(struct GNUNET_SETU_ElementMessage);
2244 3470
2245 perf_rtt.element_full.received += 1; 3471#if MEASURE_PERFORMANCE
2246 perf_rtt.element_full.received_var_bytes += element_size; 3472 perf_store.element_full.received += 1;
3473 perf_store.element_full.received_var_bytes += element_size;
3474#endif
2247 3475
2248 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); 3476 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2249 GNUNET_memcpy (&ee[1], &emsg[1], element_size); 3477 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
@@ -2268,17 +3496,15 @@ handle_union_p2p_full_element (void *cls,
2268 GNUNET_NO); 3496 GNUNET_NO);
2269 3497
2270 op->received_total++; 3498 op->received_total++;
2271
2272 ke = op_get_element (op, 3499 ke = op_get_element (op,
2273 &ee->element_hash); 3500 &ee->element_hash);
2274 if (NULL != ke) 3501 if (NULL != ke)
2275 { 3502 {
2276 /* Got repeated element. Should not happen since
2277 * we track demands. */
2278 GNUNET_STATISTICS_update (_GSS_statistics, 3503 GNUNET_STATISTICS_update (_GSS_statistics,
2279 "# repeated elements", 3504 "# repeated elements",
2280 1, 3505 1,
2281 GNUNET_NO); 3506 GNUNET_NO);
3507 full_sync_plausibility_check (op);
2282 ke->received = GNUNET_YES; 3508 ke->received = GNUNET_YES;
2283 GNUNET_free (ee); 3509 GNUNET_free (ee);
2284 } 3510 }
@@ -2294,15 +3520,15 @@ handle_union_p2p_full_element (void *cls,
2294 GNUNET_SETU_STATUS_ADD_LOCAL); 3520 GNUNET_SETU_STATUS_ADD_LOCAL);
2295 } 3521 }
2296 3522
3523
2297 if ((GNUNET_YES == op->byzantine) && 3524 if ((GNUNET_YES == op->byzantine) &&
2298 (op->received_total > 384 + op->received_fresh * 4) && 3525 (op->received_total > op->remote_element_count) )
2299 (op->received_fresh < op->received_total / 6))
2300 { 3526 {
2301 /* The other peer gave us lots of old elements, there's something wrong. */ 3527 /* The other peer gave us lots of old elements, there's something wrong. */
2302 LOG (GNUNET_ERROR_TYPE_ERROR, 3528 LOG (GNUNET_ERROR_TYPE_ERROR,
2303 "Other peer sent only %llu/%llu fresh elements, failing operation\n", 3529 "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
2304 (unsigned long long) op->received_fresh, 3530 (unsigned long long) op->received_total,
2305 (unsigned long long) op->received_total); 3531 (unsigned long long) op->remote_element_count);
2306 GNUNET_break_op (0); 3532 GNUNET_break_op (0);
2307 fail_union_operation (op); 3533 fail_union_operation (op);
2308 return; 3534 return;
@@ -2356,18 +3582,50 @@ handle_union_p2p_inquiry (void *cls,
2356 const struct IBF_Key *ibf_key; 3582 const struct IBF_Key *ibf_key;
2357 unsigned int num_keys; 3583 unsigned int num_keys;
2358 3584
2359 perf_rtt.inquery.received += 1; 3585 /**
2360 perf_rtt.inquery.received_var_bytes += (ntohs (msg->header.size) - sizeof(struct InquiryMessage)); 3586 * Check that the message is received only in supported phase
3587 */
3588 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3589 if (GNUNET_OK !=
3590 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3591 {
3592 GNUNET_break (0);
3593 fail_union_operation (op);
3594 return;
3595 }
3596
3597#if MEASURE_PERFORMANCE
3598 perf_store.inquery.received += 1;
3599 perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3600 - sizeof(struct InquiryMessage));
3601#endif
2361 3602
2362 LOG (GNUNET_ERROR_TYPE_DEBUG, 3603 LOG (GNUNET_ERROR_TYPE_DEBUG,
2363 "Received union inquiry\n"); 3604 "Received union inquiry\n");
2364 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) 3605 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2365 / sizeof(struct IBF_Key); 3606 / sizeof(struct IBF_Key);
2366 ibf_key = (const struct IBF_Key *) &msg[1]; 3607 ibf_key = (const struct IBF_Key *) &msg[1];
3608
3609 /** Add received inquiries to hashmap for flow control **/
3610 struct GNUNET_HashContext *hashed_key_context =
3611 GNUNET_CRYPTO_hash_context_start ();
3612 struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
3613 sizeof(struct GNUNET_HashCode));;
3614 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_RECEIVED;
3615 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3616 &ibf_key,
3617 sizeof(struct IBF_Key));
3618 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3619 hashed_key);
3620 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3621 hashed_key,
3622 &mcfs,
3623 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
3624 );
3625
2367 while (0 != num_keys--) 3626 while (0 != num_keys--)
2368 { 3627 {
2369 struct IBF_Key unsalted_key; 3628 struct IBF_Key unsalted_key;
2370
2371 unsalt_key (ibf_key, 3629 unsalt_key (ibf_key,
2372 ntohl (msg->salt), 3630 ntohl (msg->salt),
2373 &unsalted_key); 3631 &unsalted_key);
@@ -2402,7 +3660,9 @@ send_missing_full_elements_iter (void *cls,
2402 3660
2403 if (GNUNET_YES == ke->received) 3661 if (GNUNET_YES == ke->received)
2404 return GNUNET_YES; 3662 return GNUNET_YES;
2405 perf_rtt.element_full.received += 1; 3663#if MEASURE_PERFORMANCE
3664 perf_store.element_full.received += 1;
3665#endif
2406 ev = GNUNET_MQ_msg_extra (emsg, 3666 ev = GNUNET_MQ_msg_extra (emsg,
2407 ee->element.size, 3667 ee->element.size,
2408 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); 3668 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -2422,18 +3682,84 @@ send_missing_full_elements_iter (void *cls,
2422 * @param cls closure, a set union operation 3682 * @param cls closure, a set union operation
2423 * @param mh the demand message 3683 * @param mh the demand message
2424 */ 3684 */
3685static int
3686check_union_p2p_request_full (void *cls,
3687 const struct TransmitFullMessage *mh)
3688{
3689 return GNUNET_OK;
3690}
3691
3692
2425static void 3693static void
2426handle_union_p2p_request_full (void *cls, 3694handle_union_p2p_request_full (void *cls,
2427 const struct GNUNET_MessageHeader *mh) 3695 const struct TransmitFullMessage *msg)
2428{ 3696{
2429 struct Operation *op = cls; 3697 struct Operation *op = cls;
2430 3698
2431 perf_rtt.request_full.received += 1; 3699 /**
3700 * Check that the message is received only in supported phase
3701 */
3702 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3703 if (GNUNET_OK !=
3704 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3705 {
3706 GNUNET_break (0);
3707 fail_union_operation (op);
3708 return;
3709 }
2432 3710
2433 LOG (GNUNET_ERROR_TYPE_DEBUG, 3711 op->remote_element_count = ntohl (msg->remote_set_size);
3712 op->remote_set_diff = ntohl (msg->remote_set_difference);
3713 op->local_set_diff = ntohl (msg->local_set_difference);
3714
3715
3716 if (check_byzantine_bounds (op) != GNUNET_OK)
3717 {
3718 LOG (GNUNET_ERROR_TYPE_ERROR,
3719 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3720 "criteria\n");
3721 GNUNET_break_op (0);
3722 fail_union_operation (op);
3723 return;
3724 }
3725
3726#if MEASURE_PERFORMANCE
3727 perf_store.request_full.received += 1;
3728#endif
3729
3730 LOG (GNUNET_ERROR_TYPE_DEBUG,
2434 "Received request for full set transmission\n"); 3731 "Received request for full set transmission\n");
2435 if (PHASE_EXPECT_IBF != op->phase) 3732
3733 /** Calculate avg element size if not initial sync **/
3734 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3735 op->set->content->elements);
3736 uint64_t avg_element_size = 0;
3737 if (0 < op->local_element_count)
3738 {
3739 op->total_elements_size_local = 0;
3740 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3741 &
3742 determinate_avg_element_size_iterator,
3743 op);
3744 avg_element_size = op->total_elements_size_local / op->local_element_count;
3745 }
3746
3747 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3748 op->
3749 remote_element_count,
3750 op->
3751 local_element_count,
3752 op->local_set_diff,
3753 op->remote_set_diff,
3754 op->
3755 rtt_bandwidth_tradeoff,
3756 op->
3757 ibf_bucket_number_factor);
3758 if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation)
2436 { 3759 {
3760 LOG (GNUNET_ERROR_TYPE_ERROR,
3761 "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3762 " : %d\n", mode_of_operation);
2437 GNUNET_break_op (0); 3763 GNUNET_break_op (0);
2438 fail_union_operation (op); 3764 fail_union_operation (op);
2439 return; 3765 return;
@@ -2458,7 +3784,21 @@ handle_union_p2p_full_done (void *cls,
2458{ 3784{
2459 struct Operation *op = cls; 3785 struct Operation *op = cls;
2460 3786
2461 perf_rtt.full_done.received += 1; 3787 /**
3788 * Check that the message is received only in supported phase
3789 */
3790 uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3791 if (GNUNET_OK !=
3792 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3793 {
3794 GNUNET_break (0);
3795 fail_union_operation (op);
3796 return;
3797 }
3798
3799#if MEASURE_PERFORMANCE
3800 perf_store.full_done.received += 1;
3801#endif
2462 3802
2463 switch (op->phase) 3803 switch (op->phase)
2464 { 3804 {
@@ -2466,6 +3806,19 @@ handle_union_p2p_full_done (void *cls,
2466 { 3806 {
2467 struct GNUNET_MQ_Envelope *ev; 3807 struct GNUNET_MQ_Envelope *ev;
2468 3808
3809 if ((GNUNET_YES == op->byzantine) &&
3810 (op->received_total != op->remote_element_count) )
3811 {
3812 /* The other peer gave not enough elements before sending full done, there's something wrong. */
3813 LOG (GNUNET_ERROR_TYPE_ERROR,
3814 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3815 (unsigned long long) op->received_total,
3816 (unsigned long long) op->remote_element_count);
3817 GNUNET_break_op (0);
3818 fail_union_operation (op);
3819 return;
3820 }
3821
2469 LOG (GNUNET_ERROR_TYPE_DEBUG, 3822 LOG (GNUNET_ERROR_TYPE_DEBUG,
2470 "got FULL DONE, sending elements that other peer is missing\n"); 3823 "got FULL DONE, sending elements that other peer is missing\n");
2471 3824
@@ -2473,7 +3826,9 @@ handle_union_p2p_full_done (void *cls,
2473 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element, 3826 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
2474 &send_missing_full_elements_iter, 3827 &send_missing_full_elements_iter,
2475 op); 3828 op);
2476 perf_rtt.full_done.sent += 1; 3829#if MEASURE_PERFORMANCE
3830 perf_store.full_done.sent += 1;
3831#endif
2477 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); 3832 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
2478 GNUNET_MQ_send (op->mq, 3833 GNUNET_MQ_send (op->mq,
2479 ev); 3834 ev);
@@ -2552,8 +3907,23 @@ handle_union_p2p_demand (void *cls,
2552 unsigned int num_hashes; 3907 unsigned int num_hashes;
2553 struct GNUNET_MQ_Envelope *ev; 3908 struct GNUNET_MQ_Envelope *ev;
2554 3909
2555 perf_rtt.demand.received += 1; 3910 /**
2556 perf_rtt.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); 3911 * Check that the message is received only in supported phase
3912 */
3913 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3914 PHASE_FINISH_WAITING};
3915 if (GNUNET_OK !=
3916 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3917 {
3918 GNUNET_break (0);
3919 fail_union_operation (op);
3920 return;
3921 }
3922#if MEASURE_PERFORMANCE
3923 perf_store.demand.received += 1;
3924 perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3925 GNUNET_MessageHeader));
3926#endif
2557 3927
2558 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) 3928 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2559 / sizeof(struct GNUNET_HashCode); 3929 / sizeof(struct GNUNET_HashCode);
@@ -2570,6 +3940,39 @@ handle_union_p2p_demand (void *cls,
2570 fail_union_operation (op); 3940 fail_union_operation (op);
2571 return; 3941 return;
2572 } 3942 }
3943
3944 /* Save send demand message for message control */
3945 if (GNUNET_YES !=
3946 update_message_control_flow (
3947 op->message_control_flow,
3948 MSG_CFS_RECEIVED,
3949 &ee->element_hash,
3950 DEMAND_MESSAGE)
3951 )
3952 {
3953 LOG (GNUNET_ERROR_TYPE_ERROR,
3954 "Double demand message received found!\n");
3955 GNUNET_break (0);
3956 fail_union_operation (op);
3957 return;
3958 }
3959 ;
3960
3961 /* Mark element to be expected to received */
3962 if (GNUNET_YES !=
3963 update_message_control_flow (
3964 op->message_control_flow,
3965 MSG_CFS_SENT,
3966 &ee->element_hash,
3967 ELEMENT_MESSAGE)
3968 )
3969 {
3970 LOG (GNUNET_ERROR_TYPE_ERROR,
3971 "Double element message sent found!\n");
3972 GNUNET_break (0);
3973 fail_union_operation (op);
3974 return;
3975 }
2573 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) 3976 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2574 { 3977 {
2575 /* Probably confused lazily copied sets. */ 3978 /* Probably confused lazily copied sets. */
@@ -2577,8 +3980,10 @@ handle_union_p2p_demand (void *cls,
2577 fail_union_operation (op); 3980 fail_union_operation (op);
2578 return; 3981 return;
2579 } 3982 }
2580 perf_rtt.element.sent += 1; 3983#if MEASURE_PERFORMANCE
2581 perf_rtt.element.sent_var_bytes += ee->element.size; 3984 perf_store.element.sent += 1;
3985 perf_store.element.sent_var_bytes += ee->element.size;
3986#endif
2582 ev = GNUNET_MQ_msg_extra (emsg, 3987 ev = GNUNET_MQ_msg_extra (emsg,
2583 ee->element.size, 3988 ee->element.size,
2584 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS); 3989 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS);
@@ -2600,9 +4005,10 @@ handle_union_p2p_demand (void *cls,
2600 if (op->symmetric) 4005 if (op->symmetric)
2601 send_client_element (op, 4006 send_client_element (op,
2602 &ee->element, 4007 &ee->element,
2603 GNUNET_SET_STATUS_ADD_REMOTE); 4008 GNUNET_SETU_STATUS_ADD_REMOTE);
2604 } 4009 }
2605 GNUNET_CADET_receive_done (op->channel); 4010 GNUNET_CADET_receive_done (op->channel);
4011 maybe_finish (op);
2606} 4012}
2607 4013
2608 4014
@@ -2653,9 +4059,23 @@ handle_union_p2p_offer (void *cls,
2653 struct Operation *op = cls; 4059 struct Operation *op = cls;
2654 const struct GNUNET_HashCode *hash; 4060 const struct GNUNET_HashCode *hash;
2655 unsigned int num_hashes; 4061 unsigned int num_hashes;
4062 /**
4063 * Check that the message is received only in supported phase
4064 */
4065 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4066 if (GNUNET_OK !=
4067 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4068 {
4069 GNUNET_break (0);
4070 fail_union_operation (op);
4071 return;
4072 }
2656 4073
2657 perf_rtt.offer.received += 1; 4074#if MEASURE_PERFORMANCE
2658 perf_rtt.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); 4075 perf_store.offer.received += 1;
4076 perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4077 GNUNET_MessageHeader));
4078#endif
2659 4079
2660 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) 4080 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2661 / sizeof(struct GNUNET_HashCode); 4081 / sizeof(struct GNUNET_HashCode);
@@ -2693,8 +4113,53 @@ handle_union_p2p_offer (void *cls,
2693 "[OP %p] Requesting element (hash %s)\n", 4113 "[OP %p] Requesting element (hash %s)\n",
2694 op, GNUNET_h2s (hash)); 4114 op, GNUNET_h2s (hash));
2695 4115
2696 perf_rtt.demand.sent += 1; 4116#if MEASURE_PERFORMANCE
2697 perf_rtt.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); 4117 perf_store.demand.sent += 1;
4118 perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4119#endif
4120 /* Save send demand message for message control */
4121 if (GNUNET_YES !=
4122 update_message_control_flow (
4123 op->message_control_flow,
4124 MSG_CFS_SENT,
4125 hash,
4126 DEMAND_MESSAGE))
4127 {
4128 LOG (GNUNET_ERROR_TYPE_ERROR,
4129 "Double demand message sent found!\n");
4130 GNUNET_break (0);
4131 fail_union_operation (op);
4132 return;
4133 }
4134
4135 /* Mark offer as received received */
4136 if (GNUNET_YES !=
4137 update_message_control_flow (
4138 op->message_control_flow,
4139 MSG_CFS_RECEIVED,
4140 hash,
4141 OFFER_MESSAGE))
4142 {
4143 LOG (GNUNET_ERROR_TYPE_ERROR,
4144 "Double offer message received found!\n");
4145 GNUNET_break (0);
4146 fail_union_operation (op);
4147 return;
4148 }
4149 /* Mark element to be expected to received */
4150 if (GNUNET_YES !=
4151 update_message_control_flow (
4152 op->message_control_flow,
4153 MSG_CFS_EXPECTED,
4154 hash,
4155 ELEMENT_MESSAGE))
4156 {
4157 LOG (GNUNET_ERROR_TYPE_ERROR,
4158 "Element already expected!\n");
4159 GNUNET_break (0);
4160 fail_union_operation (op);
4161 return;
4162 }
2698 ev = GNUNET_MQ_msg_header_extra (demands, 4163 ev = GNUNET_MQ_msg_header_extra (demands,
2699 sizeof(struct GNUNET_HashCode), 4164 sizeof(struct GNUNET_HashCode),
2700 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND); 4165 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
@@ -2719,7 +4184,30 @@ handle_union_p2p_done (void *cls,
2719{ 4184{
2720 struct Operation *op = cls; 4185 struct Operation *op = cls;
2721 4186
2722 perf_rtt.done.received += 1; 4187 /**
4188 * Check that the message is received only in supported phase
4189 */
4190 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4191 if (GNUNET_OK !=
4192 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4193 {
4194 GNUNET_break (0);
4195 fail_union_operation (op);
4196 return;
4197 }
4198
4199 if (op->active_passive_switch_required)
4200 {
4201 LOG (GNUNET_ERROR_TYPE_ERROR,
4202 "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4203 GNUNET_break (0);
4204 fail_union_operation (op);
4205 return;
4206 }
4207
4208#if MEASURE_PERFORMANCE
4209 perf_store.done.received += 1;
4210#endif
2723 switch (op->phase) 4211 switch (op->phase)
2724 { 4212 {
2725 case PHASE_PASSIVE_DECODING: 4213 case PHASE_PASSIVE_DECODING:
@@ -2728,26 +4216,26 @@ handle_union_p2p_done (void *cls,
2728 LOG (GNUNET_ERROR_TYPE_DEBUG, 4216 LOG (GNUNET_ERROR_TYPE_DEBUG,
2729 "got DONE (as passive partner), waiting for our demands to be satisfied\n"); 4217 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2730 /* The active peer is done sending offers 4218 /* The active peer is done sending offers
2731 * and inquiries. This means that all 4219 * and inquiries. This means that all
2732 * our responses to that (demands and offers) 4220 * our responses to that (demands and offers)
2733 * must be in flight (queued or in mesh). 4221 * must be in flight (queued or in mesh).
2734 * 4222 *
2735 * We should notify the active peer once 4223 * We should notify the active peer once
2736 * all our demands are satisfied, so that the active 4224 * all our demands are satisfied, so that the active
2737 * peer can quit if we gave it everything. 4225 * peer can quit if we gave it everything.
2738 */GNUNET_CADET_receive_done (op->channel); 4226 */GNUNET_CADET_receive_done (op->channel);
2739 maybe_finish (op); 4227 maybe_finish (op);
2740 return; 4228 return;
2741 case PHASE_ACTIVE_DECODING: 4229 case PHASE_ACTIVE_DECODING:
2742 LOG (GNUNET_ERROR_TYPE_DEBUG, 4230 LOG (GNUNET_ERROR_TYPE_DEBUG,
2743 "got DONE (as active partner), waiting to finish\n"); 4231 "got DONE (as active partner), waiting to finish\n");
2744 /* All demands of the other peer are satisfied, 4232 /* All demands of the other peer are satisfied,
2745 * and we processed all offers, thus we know 4233 * and we processed all offers, thus we know
2746 * exactly what our demands must be. 4234 * exactly what our demands must be.
2747 * 4235 *
2748 * We'll close the channel 4236 * We'll close the channel
2749 * to the other peer once our demands are met. 4237 * to the other peer once our demands are met.
2750 */op->phase = PHASE_FINISH_CLOSING; 4238 */op->phase = PHASE_FINISH_CLOSING;
2751 GNUNET_CADET_receive_done (op->channel); 4239 GNUNET_CADET_receive_done (op->channel);
2752 maybe_finish (op); 4240 maybe_finish (op);
2753 return; 4241 return;
@@ -2769,7 +4257,9 @@ static void
2769handle_union_p2p_over (void *cls, 4257handle_union_p2p_over (void *cls,
2770 const struct GNUNET_MessageHeader *mh) 4258 const struct GNUNET_MessageHeader *mh)
2771{ 4259{
2772 perf_rtt.over.received += 1; 4260#if MEASURE_PERFORMANCE
4261 perf_store.over.received += 1;
4262#endif
2773 send_client_done (cls); 4263 send_client_done (cls);
2774} 4264}
2775 4265
@@ -2943,7 +4433,7 @@ check_incoming_msg (void *cls,
2943 struct Listener *listener = op->listener; 4433 struct Listener *listener = op->listener;
2944 const struct GNUNET_MessageHeader *nested_context; 4434 const struct GNUNET_MessageHeader *nested_context;
2945 4435
2946 /* double operation request */ 4436 /* double operation request */
2947 if (0 != op->suggest_id) 4437 if (0 != op->suggest_id)
2948 { 4438 {
2949 GNUNET_break_op (0); 4439 GNUNET_break_op (0);
@@ -3053,10 +4543,10 @@ handle_client_create_set (void *cls,
3053 } 4543 }
3054 set = GNUNET_new (struct Set); 4544 set = GNUNET_new (struct Set);
3055 { 4545 {
3056 struct StrataEstimator *se; 4546 struct MultiStrataEstimator *se;
3057 4547
3058 se = strata_estimator_create (SE_STRATA_COUNT, 4548 se = strata_estimator_create (SE_STRATA_COUNT,
3059 SE_IBF_SIZE, 4549 SE_IBFS_TOTAL_SIZE,
3060 SE_IBF_HASH_NUM); 4550 SE_IBF_HASH_NUM);
3061 if (NULL == se) 4551 if (NULL == se)
3062 { 4552 {
@@ -3198,6 +4688,7 @@ channel_window_cb (void *cls,
3198 * @param cls client that sent the message 4688 * @param cls client that sent the message
3199 * @param msg message sent by the client 4689 * @param msg message sent by the client
3200 */ 4690 */
4691
3201static void 4692static void
3202handle_client_listen (void *cls, 4693handle_client_listen (void *cls,
3203 const struct GNUNET_SETU_ListenMessage *msg) 4694 const struct GNUNET_SETU_ListenMessage *msg)
@@ -3240,10 +4731,10 @@ handle_client_listen (void *cls,
3240 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, 4731 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3241 struct GNUNET_MessageHeader, 4732 struct GNUNET_MessageHeader,
3242 NULL), 4733 NULL),
3243 GNUNET_MQ_hd_fixed_size (union_p2p_request_full, 4734 GNUNET_MQ_hd_var_size (union_p2p_request_full,
3244 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, 4735 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3245 struct GNUNET_MessageHeader, 4736 struct TransmitFullMessage,
3246 NULL), 4737 NULL),
3247 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, 4738 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3248 GNUNET_MESSAGE_TYPE_SETU_P2P_SE, 4739 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3249 struct StrataEstimatorMessage, 4740 struct StrataEstimatorMessage,
@@ -3256,6 +4747,10 @@ handle_client_listen (void *cls,
3256 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, 4747 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3257 struct GNUNET_SETU_ElementMessage, 4748 struct GNUNET_SETU_ElementMessage,
3258 NULL), 4749 NULL),
4750 GNUNET_MQ_hd_var_size (union_p2p_send_full,
4751 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
4752 struct TransmitFullMessage,
4753 NULL),
3259 GNUNET_MQ_handler_end () 4754 GNUNET_MQ_handler_end ()
3260 }; 4755 };
3261 struct Listener *listener; 4756 struct Listener *listener;
@@ -3451,6 +4946,7 @@ handle_client_evaluate (void *cls,
3451{ 4946{
3452 struct ClientState *cs = cls; 4947 struct ClientState *cs = cls;
3453 struct Operation *op = GNUNET_new (struct Operation); 4948 struct Operation *op = GNUNET_new (struct Operation);
4949
3454 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 4950 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3455 GNUNET_MQ_hd_var_size (incoming_msg, 4951 GNUNET_MQ_hd_var_size (incoming_msg,
3456 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, 4952 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
@@ -3488,10 +4984,10 @@ handle_client_evaluate (void *cls,
3488 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, 4984 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3489 struct GNUNET_MessageHeader, 4985 struct GNUNET_MessageHeader,
3490 op), 4986 op),
3491 GNUNET_MQ_hd_fixed_size (union_p2p_request_full, 4987 GNUNET_MQ_hd_var_size (union_p2p_request_full,
3492 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, 4988 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3493 struct GNUNET_MessageHeader, 4989 struct TransmitFullMessage,
3494 op), 4990 op),
3495 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, 4991 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3496 GNUNET_MESSAGE_TYPE_SETU_P2P_SE, 4992 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3497 struct StrataEstimatorMessage, 4993 struct StrataEstimatorMessage,
@@ -3504,6 +5000,10 @@ handle_client_evaluate (void *cls,
3504 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, 5000 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3505 struct GNUNET_SETU_ElementMessage, 5001 struct GNUNET_SETU_ElementMessage,
3506 op), 5002 op),
5003 GNUNET_MQ_hd_var_size (union_p2p_send_full,
5004 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
5005 struct TransmitFullMessage,
5006 NULL),
3507 GNUNET_MQ_handler_end () 5007 GNUNET_MQ_handler_end ()
3508 }; 5008 };
3509 struct Set *set; 5009 struct Set *set;
@@ -3525,8 +5025,23 @@ handle_client_evaluate (void *cls,
3525 op->force_full = msg->force_full; 5025 op->force_full = msg->force_full;
3526 op->force_delta = msg->force_delta; 5026 op->force_delta = msg->force_delta;
3527 op->symmetric = msg->symmetric; 5027 op->symmetric = msg->symmetric;
5028 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5029 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5030 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5031 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5032 op->active_passive_switch_required = false;
3528 context = GNUNET_MQ_extract_nested_mh (msg); 5033 context = GNUNET_MQ_extract_nested_mh (msg);
3529 5034
5035 /* create hashmap for message control */
5036 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5037 GNUNET_NO);
5038 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5039
5040#if MEASURE_PERFORMANCE
5041 /* load config */
5042 load_config (op);
5043#endif
5044
3530 /* Advance generation values, so that 5045 /* Advance generation values, so that
3531 mutations won't interfer with the running operation. */ 5046 mutations won't interfer with the running operation. */
3532 op->set = set; 5047 op->set = set;
@@ -3550,7 +5065,9 @@ handle_client_evaluate (void *cls,
3550 struct GNUNET_MQ_Envelope *ev; 5065 struct GNUNET_MQ_Envelope *ev;
3551 struct OperationRequestMessage *msg; 5066 struct OperationRequestMessage *msg;
3552 5067
3553 perf_rtt.operation_request.sent += 1; 5068#if MEASURE_PERFORMANCE
5069 perf_store.operation_request.sent += 1;
5070#endif
3554 ev = GNUNET_MQ_msg_nested_mh (msg, 5071 ev = GNUNET_MQ_msg_nested_mh (msg,
3555 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, 5072 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3556 context); 5073 context);
@@ -3567,7 +5084,11 @@ handle_client_evaluate (void *cls,
3567 op->se = strata_estimator_dup (op->set->se); 5084 op->se = strata_estimator_dup (op->set->se);
3568 /* we started the operation, thus we have to send the operation request */ 5085 /* we started the operation, thus we have to send the operation request */
3569 op->phase = PHASE_EXPECT_SE; 5086 op->phase = PHASE_EXPECT_SE;
3570 op->salt_receive = op->salt_send = 42; // FIXME????? 5087
5088 op->salt_receive = (op->peer_site + 1) % 2;
5089 op->salt_send = op->peer_site; // FIXME?????
5090
5091
3571 LOG (GNUNET_ERROR_TYPE_DEBUG, 5092 LOG (GNUNET_ERROR_TYPE_DEBUG,
3572 "Initiating union operation evaluation\n"); 5093 "Initiating union operation evaluation\n");
3573 GNUNET_STATISTICS_update (_GSS_statistics, 5094 GNUNET_STATISTICS_update (_GSS_statistics,
@@ -3711,6 +5232,20 @@ handle_client_accept (void *cls,
3711 op->force_full = msg->force_full; 5232 op->force_full = msg->force_full;
3712 op->force_delta = msg->force_delta; 5233 op->force_delta = msg->force_delta;
3713 op->symmetric = msg->symmetric; 5234 op->symmetric = msg->symmetric;
5235 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5236 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5237 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5238 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5239 op->active_passive_switch_required = false;
5240 /* create hashmap for message control */
5241 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5242 GNUNET_NO);
5243 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5244
5245#if MEASURE_PERFORMANCE
5246 /* load config */
5247 load_config (op);
5248#endif
3714 5249
3715 /* Advance generation values, so that future mutations do not 5250 /* Advance generation values, so that future mutations do not
3716 interfer with the running operation. */ 5251 interfer with the running operation. */
@@ -3729,7 +5264,7 @@ handle_client_accept (void *cls,
3729 1, 5264 1,
3730 GNUNET_NO); 5265 GNUNET_NO);
3731 { 5266 {
3732 const struct StrataEstimator *se; 5267 struct MultiStrataEstimator *se;
3733 struct GNUNET_MQ_Envelope *ev; 5268 struct GNUNET_MQ_Envelope *ev;
3734 struct StrataEstimatorMessage *strata_msg; 5269 struct StrataEstimatorMessage *strata_msg;
3735 char *buf; 5270 char *buf;
@@ -3739,20 +5274,40 @@ handle_client_accept (void *cls,
3739 op->se = strata_estimator_dup (op->set->se); 5274 op->se = strata_estimator_dup (op->set->se);
3740 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 5275 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3741 GNUNET_NO); 5276 GNUNET_NO);
3742 op->salt_receive = op->salt_send = 42; // FIXME????? 5277 op->salt_receive = (op->peer_site + 1) % 2;
5278 op->salt_send = op->peer_site; // FIXME?????
3743 initialize_key_to_element (op); 5279 initialize_key_to_element (op);
3744 op->initial_size = GNUNET_CONTAINER_multihashmap32_size ( 5280 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3745 op->key_to_element); 5281 op->key_to_element);
3746 5282
3747 /* kick off the operation */ 5283 /* kick off the operation */
3748 se = op->se; 5284 se = op->se;
3749 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); 5285
5286 uint8_t se_count = 1;
5287 if (op->initial_size > 0)
5288 {
5289 op->total_elements_size_local = 0;
5290 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5291 &
5292 determinate_avg_element_size_iterator,
5293 op);
5294 se_count = determine_strata_count (
5295 op->total_elements_size_local / op->initial_size,
5296 op->initial_size);
5297 }
5298 buf = GNUNET_malloc (se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5299 * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
3750 len = strata_estimator_write (se, 5300 len = strata_estimator_write (se,
5301 SE_IBFS_TOTAL_SIZE,
5302 se_count,
3751 buf); 5303 buf);
3752 perf_rtt.se.sent += 1; 5304#if MEASURE_PERFORMANCE
3753 perf_rtt.se.sent_var_bytes += len; 5305 perf_store.se.sent += 1;
5306 perf_store.se.sent_var_bytes += len;
5307#endif
3754 5308
3755 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) 5309 if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5310 * SE_IBFS_TOTAL_SIZE)
3756 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC; 5311 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC;
3757 else 5312 else
3758 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE; 5313 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE;
@@ -3766,6 +5321,7 @@ handle_client_accept (void *cls,
3766 strata_msg->set_size 5321 strata_msg->set_size
3767 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( 5322 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
3768 op->set->content->elements)); 5323 op->set->content->elements));
5324 strata_msg->se_count = se_count;
3769 GNUNET_MQ_send (op->mq, 5325 GNUNET_MQ_send (op->mq,
3770 ev); 5326 ev);
3771 op->phase = PHASE_EXPECT_IBF; 5327 op->phase = PHASE_EXPECT_IBF;
@@ -3800,8 +5356,9 @@ shutdown_task (void *cls)
3800 GNUNET_YES); 5356 GNUNET_YES);
3801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5357 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3802 "handled shutdown request\n"); 5358 "handled shutdown request\n");
3803 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 5359#if MEASURE_PERFORMANCE
3804 "RTT:%f\n", calculate_perf_rtt()); 5360 calculate_perf_store ();
5361#endif
3805} 5362}
3806 5363
3807 5364
diff --git a/src/setu/gnunet-service-setu_protocol.h b/src/setu/gnunet-service-setu_protocol.h
index a2803ee47..c2a166e60 100644
--- a/src/setu/gnunet-service-setu_protocol.h
+++ b/src/setu/gnunet-service-setu_protocol.h
@@ -40,11 +40,6 @@ struct OperationRequestMessage
40 struct GNUNET_MessageHeader header; 40 struct GNUNET_MessageHeader header;
41 41
42 /** 42 /**
43 * Operation to request, values from `enum GNUNET_SET_OperationType`
44 */
45 uint32_t operation GNUNET_PACKED;
46
47 /**
48 * For Intersection: my element count 43 * For Intersection: my element count
49 */ 44 */
50 uint32_t element_count GNUNET_PACKED; 45 uint32_t element_count GNUNET_PACKED;
@@ -72,20 +67,9 @@ struct IBFMessage
72 struct GNUNET_MessageHeader header; 67 struct GNUNET_MessageHeader header;
73 68
74 /** 69 /**
75 * Order of the whole ibf, where 70 * Size of the whole ibf (number of buckets)
76 * num_buckets = 2^order
77 */
78 uint8_t order;
79
80 /**
81 * Padding, must be 0.
82 */ 71 */
83 uint8_t reserved1; 72 uint32_t ibf_size;
84
85 /**
86 * Padding, must be 0.
87 */
88 uint16_t reserved2 GNUNET_PACKED;
89 73
90 /** 74 /**
91 * Offset of the strata in the rest of the message 75 * Offset of the strata in the rest of the message
@@ -95,12 +79,25 @@ struct IBFMessage
95 /** 79 /**
96 * Salt used when hashing elements for this IBF. 80 * Salt used when hashing elements for this IBF.
97 */ 81 */
98 uint32_t salt GNUNET_PACKED; 82 uint16_t salt GNUNET_PACKED;
99 83
84 /**
85 * The bit length of the counter
86 */
87 uint16_t ibf_counter_bit_length;
100 /* rest: buckets */ 88 /* rest: buckets */
101}; 89};
102 90
103 91
92/**
93estimate_best_mode_of_operation (uint64_t avg_element_size,
94uint64_t local_set_size,
95 uint64_t remote_set_size,
96uint64_t est_set_diff_remote,
97 uint64_t est_set_diff_local,)
98 **/
99
100
104struct InquiryMessage 101struct InquiryMessage
105{ 102{
106 /** 103 /**
@@ -113,11 +110,6 @@ struct InquiryMessage
113 */ 110 */
114 uint32_t salt GNUNET_PACKED; 111 uint32_t salt GNUNET_PACKED;
115 112
116 /**
117 * Reserved, set to 0.
118 */
119 uint32_t reserved GNUNET_PACKED;
120
121 /* rest: inquiry IBF keys */ 113 /* rest: inquiry IBF keys */
122}; 114};
123 115
@@ -218,9 +210,47 @@ struct StrataEstimatorMessage
218 */ 210 */
219 struct GNUNET_MessageHeader header; 211 struct GNUNET_MessageHeader header;
220 212
213 /**
214 * The number of ses transmitted
215 */
216 uint8_t se_count;
217
218 /**
219 * Size of the local set
220 */
221 uint64_t set_size; 221 uint64_t set_size;
222}; 222};
223 223
224
225/**
226 * Message which signals to other peer that we are sending full set
227 *
228 */
229struct TransmitFullMessage
230{
231 /**
232 * Type: #GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL
233 */
234 struct GNUNET_MessageHeader header;
235
236 /**
237 * Remote set difference calculated with strata estimator
238 */
239 uint32_t remote_set_difference;
240
241 /**
242 * Total remote set size
243 */
244 uint32_t remote_set_size;
245
246 /**
247 * Local set difference calculated with strata estimator
248 */
249 uint32_t local_set_difference;
250
251};
252
253
224GNUNET_NETWORK_STRUCT_END 254GNUNET_NETWORK_STRUCT_END
225 255
226#endif 256#endif
diff --git a/src/setu/gnunet-service-setu_strata_estimator.c b/src/setu/gnunet-service-setu_strata_estimator.c
index 7c9a4deb6..7981cc847 100644
--- a/src/setu/gnunet-service-setu_strata_estimator.c
+++ b/src/setu/gnunet-service-setu_strata_estimator.c
@@ -22,6 +22,7 @@
22 * @brief invertible bloom filter 22 * @brief invertible bloom filter
23 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
@@ -30,6 +31,82 @@
30 31
31 32
32/** 33/**
34 * Should we try compressing the strata estimator? This will
35 * break compatibility with the 0.10.1-network.
36 */
37#define FAIL_10_1_COMPATIBILTIY 1
38
39/**
40 * Number of strata estimators in memory NOT transmitted
41 */
42
43#define MULTI_SE_BASE_COUNT 8
44
45/**
46 * The avg size of 1 se
47 * Based on the bsc thesis of Elias Summermatter (2021)
48 */
49
50#define AVG_BYTE_SIZE_SE 4221
51
52/**
53 * Calculates the optimal number of strata Estimators to send
54 * @param avg_element_size
55 * @param element_count
56 * @return
57 */
58uint8_t
59determine_strata_count (uint64_t avg_element_size, uint64_t element_count)
60{
61 uint64_t base_size = avg_element_size * element_count;
62 /* >67kb total size of elements in set */
63 if (base_size < AVG_BYTE_SIZE_SE * 16)
64 return 1;
65 /* >270kb total size of elements in set */
66 if (base_size < AVG_BYTE_SIZE_SE * 64)
67 return 2;
68 /* >1mb total size of elements in set */
69 if (base_size < AVG_BYTE_SIZE_SE * 256)
70 return 4;
71 return 8;
72}
73
74
75/**
76 * Modify an IBF key @a k_in based on the @a salt, returning a
77 * salted key in @a k_out.
78 */
79static void
80salt_key (const struct IBF_Key *k_in,
81 uint32_t salt,
82 struct IBF_Key *k_out)
83{
84 int s = (salt * 7) % 64;
85 uint64_t x = k_in->key_val;
86
87 /* rotate ibf key */
88 x = (x >> s) | (x << (64 - s));
89 k_out->key_val = x;
90}
91
92
93/**
94 * Reverse modification done in the salt_key function
95 */
96static void
97unsalt_key (const struct IBF_Key *k_in,
98 uint32_t salt,
99 struct IBF_Key *k_out)
100{
101 int s = (salt * 7) % 64;
102 uint64_t x = k_in->key_val;
103
104 x = (x << s) | (x >> (64 - s));
105 k_out->key_val = x;
106}
107
108
109/**
33 * Write the given strata estimator to the buffer. 110 * Write the given strata estimator to the buffer.
34 * 111 *
35 * @param se strata estimator to serialize 112 * @param se strata estimator to serialize
@@ -37,21 +114,33 @@
37 * @return number of bytes written to @a buf 114 * @return number of bytes written to @a buf
38 */ 115 */
39size_t 116size_t
40strata_estimator_write (const struct StrataEstimator *se, 117strata_estimator_write (struct MultiStrataEstimator *se,
118 uint16_t se_ibf_total_size,
119 uint8_t number_se_send,
41 void *buf) 120 void *buf)
42{ 121{
43 char *sbuf = buf; 122 char *sbuf = buf;
123 unsigned int i;
44 size_t osize; 124 size_t osize;
125 uint64_t sbuf_offset = 0;
126 se->size = number_se_send;
45 127
46 GNUNET_assert (NULL != se); 128 GNUNET_assert (NULL != se);
47 for (unsigned int i = 0; i < se->strata_count; i++) 129 for (uint8_t strata_ctr = 0; strata_ctr < number_se_send; strata_ctr++)
48 { 130 {
49 ibf_write_slice (se->strata[i], 131 for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
50 0, 132 {
51 se->ibf_size, 133 ibf_write_slice (se->stratas[strata_ctr]->strata[i],
52 &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]); 134 0,
135 se->stratas[strata_ctr]->ibf_size,
136 &sbuf[sbuf_offset],
137 8);
138 sbuf_offset += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE;
139 }
53 } 140 }
54 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; 141 osize = ((se_ibf_total_size / 8) * number_se_send) * IBF_BUCKET_SIZE
142 * se->stratas[0]->strata_count;
143#if FAIL_10_1_COMPATIBILTIY
55 { 144 {
56 char *cbuf; 145 char *cbuf;
57 size_t nsize; 146 size_t nsize;
@@ -62,13 +151,12 @@ strata_estimator_write (const struct StrataEstimator *se,
62 &cbuf, 151 &cbuf,
63 &nsize)) 152 &nsize))
64 { 153 {
65 GNUNET_memcpy (buf, 154 GNUNET_memcpy (buf, cbuf, nsize);
66 cbuf,
67 nsize);
68 osize = nsize; 155 osize = nsize;
69 GNUNET_free (cbuf); 156 GNUNET_free (cbuf);
70 } 157 }
71 } 158 }
159#endif
72 return osize; 160 return osize;
73} 161}
74 162
@@ -87,15 +175,19 @@ int
87strata_estimator_read (const void *buf, 175strata_estimator_read (const void *buf,
88 size_t buf_len, 176 size_t buf_len,
89 int is_compressed, 177 int is_compressed,
90 struct StrataEstimator *se) 178 uint8_t number_se_received,
179 uint16_t se_ibf_total_size,
180 struct MultiStrataEstimator *se)
91{ 181{
182 unsigned int i;
92 size_t osize; 183 size_t osize;
93 char *dbuf; 184 char *dbuf;
94 185
95 dbuf = NULL; 186 dbuf = NULL;
96 if (GNUNET_YES == is_compressed) 187 if (GNUNET_YES == is_compressed)
97 { 188 {
98 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; 189 osize = ((se_ibf_total_size / 8) * number_se_received) * IBF_BUCKET_SIZE
190 * se->stratas[0]->strata_count;
99 dbuf = GNUNET_decompress (buf, 191 dbuf = GNUNET_decompress (buf,
100 buf_len, 192 buf_len,
101 osize); 193 osize);
@@ -108,18 +200,25 @@ strata_estimator_read (const void *buf,
108 buf_len = osize; 200 buf_len = osize;
109 } 201 }
110 202
111 if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE) 203 if (buf_len != se->stratas[0]->strata_count * ((se_ibf_total_size / 8)
204 * number_se_received)
205 * IBF_BUCKET_SIZE)
112 { 206 {
113 GNUNET_break (0); /* very odd error */ 207 GNUNET_break (0); /* very odd error */
114 GNUNET_free (dbuf); 208 GNUNET_free (dbuf);
115 return GNUNET_SYSERR; 209 return GNUNET_SYSERR;
116 } 210 }
117 211
118 for (unsigned int i = 0; i < se->strata_count; i++) 212 for (uint8_t strata_ctr = 0; strata_ctr < number_se_received; strata_ctr++)
119 { 213 {
120 ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); 214 for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
121 buf += se->ibf_size * IBF_BUCKET_SIZE; 215 {
216 ibf_read_slice (buf, 0, se->stratas[strata_ctr]->ibf_size,
217 se->stratas[strata_ctr]->strata[i], 8);
218 buf += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE;
219 }
122 } 220 }
221 se->size = number_se_received;
123 GNUNET_free (dbuf); 222 GNUNET_free (dbuf);
124 return GNUNET_OK; 223 return GNUNET_OK;
125} 224}
@@ -132,38 +231,61 @@ strata_estimator_read (const void *buf,
132 * @param key key to add 231 * @param key key to add
133 */ 232 */
134void 233void
135strata_estimator_insert (struct StrataEstimator *se, 234strata_estimator_insert (struct MultiStrataEstimator *se,
136 struct IBF_Key key) 235 struct IBF_Key key)
137{ 236{
138 uint64_t v;
139 unsigned int i;
140 237
141 v = key.key_val; 238
142 /* count trailing '1'-bits of v */ 239 /* count trailing '1'-bits of v */
143 for (i = 0; v & 1; v >>= 1, i++) 240 for (int strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
144 /* empty */; 241 {
145 ibf_insert (se->strata[i], key); 242 unsigned int i;
243 uint64_t v;
244
245 struct IBF_Key salted_key;
246 salt_key (&key,
247 strata_ctr * (64 / MULTI_SE_BASE_COUNT),
248 &salted_key);
249 v = salted_key.key_val;
250 for (i = 0; v & 1; v >>= 1, i++)
251 {
252 ibf_insert (se->stratas[strata_ctr]->strata[i], salted_key);
253 }
254 }
255 /* empty */;
256
146} 257}
147 258
148 259
149/** 260/**
150 * Remove a key from the strata estimator. 261 * Remove a key from the strata estimator. (NOT USED)
151 * 262 *
152 * @param se strata estimator to remove the key from 263 * @param se strata estimator to remove the key from
153 * @param key key to remove 264 * @param key key to remove
154 */ 265 */
155void 266void
156strata_estimator_remove (struct StrataEstimator *se, 267strata_estimator_remove (struct MultiStrataEstimator *se,
157 struct IBF_Key key) 268 struct IBF_Key key)
158{ 269{
159 uint64_t v;
160 unsigned int i;
161 270
162 v = key.key_val;
163 /* count trailing '1'-bits of v */ 271 /* count trailing '1'-bits of v */
164 for (i = 0; v & 1; v >>= 1, i++) 272 for (int strata_ctr = 0; strata_ctr < se->size; strata_ctr++)
165 /* empty */; 273 {
166 ibf_remove (se->strata[i], key); 274 uint64_t v;
275 unsigned int i;
276
277 struct IBF_Key unsalted_key;
278 unsalt_key (&key,
279 strata_ctr * (64 / MULTI_SE_BASE_COUNT),
280 &unsalted_key);
281
282 v = unsalted_key.key_val;
283 for (i = 0; v & 1; v >>= 1, i++)
284 {
285 /* empty */;
286 ibf_remove (se->stratas[strata_ctr]->strata[i], unsalted_key);
287 }
288 }
167} 289}
168 290
169 291
@@ -175,29 +297,42 @@ strata_estimator_remove (struct StrataEstimator *se,
175 * @param ibf_hashnum hashnum parameter of each ibf 297 * @param ibf_hashnum hashnum parameter of each ibf
176 * @return a freshly allocated, empty strata estimator, NULL on error 298 * @return a freshly allocated, empty strata estimator, NULL on error
177 */ 299 */
178struct StrataEstimator * 300struct MultiStrataEstimator *
179strata_estimator_create (unsigned int strata_count, 301strata_estimator_create (unsigned int strata_count,
180 uint32_t ibf_size, 302 uint32_t ibf_size,
181 uint8_t ibf_hashnum) 303 uint8_t ibf_hashnum)
182{ 304{
183 struct StrataEstimator *se; 305 struct MultiStrataEstimator *se;
184 306 unsigned int i;
185 se = GNUNET_new (struct StrataEstimator); 307 unsigned int j;
186 se->strata_count = strata_count; 308 se = GNUNET_new (struct MultiStrataEstimator);
187 se->ibf_size = ibf_size; 309
188 se->strata = GNUNET_new_array (strata_count, 310 se->size = MULTI_SE_BASE_COUNT;
189 struct InvertibleBloomFilter *); 311 se->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *);
190 for (unsigned int i = 0; i < strata_count; i++) 312
313 uint8_t ibf_prime_sizes[] = {79,79,79,79,79,79,79,79};
314
315 for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
191 { 316 {
192 se->strata[i] = ibf_create (ibf_size, ibf_hashnum); 317 se->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator);
193 if (NULL == se->strata[i]) 318 se->stratas[strata_ctr]->strata_count = strata_count;
319 se->stratas[strata_ctr]->ibf_size = ibf_prime_sizes[strata_ctr];
320 se->stratas[strata_ctr]->strata = GNUNET_new_array (strata_count * 4,
321 struct
322 InvertibleBloomFilter *);
323 for (i = 0; i < strata_count; i++)
194 { 324 {
195 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 325 se->stratas[strata_ctr]->strata[i] = ibf_create (
196 "Failed to allocate memory for strata estimator\n"); 326 ibf_prime_sizes[strata_ctr], ibf_hashnum);
197 for (unsigned int j = 0; j < i; j++) 327 if (NULL == se->stratas[strata_ctr]->strata[i])
198 ibf_destroy (se->strata[i]); 328 {
199 GNUNET_free (se); 329 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
200 return NULL; 330 "Failed to allocate memory for strata estimator\n");
331 for (j = 0; j < i; j++)
332 ibf_destroy (se->stratas[strata_ctr]->strata[i]);
333 GNUNET_free (se);
334 return NULL;
335 }
201 } 336 }
202 } 337 }
203 return se; 338 return se;
@@ -213,46 +348,71 @@ strata_estimator_create (unsigned int strata_count,
213 * @param se2 second strata estimator 348 * @param se2 second strata estimator
214 * @return the estimated difference 349 * @return the estimated difference
215 */ 350 */
216unsigned int 351void
217strata_estimator_difference (const struct StrataEstimator *se1, 352strata_estimator_difference (const struct MultiStrataEstimator *se1,
218 const struct StrataEstimator *se2) 353 const struct MultiStrataEstimator *se2)
219{ 354{
220 unsigned int count; 355 int avg_local_diff = 0;
356 int avg_remote_diff = 0;
357 uint8_t number_of_estimators = se1->size;
221 358
222 GNUNET_assert (se1->strata_count == se2->strata_count); 359 for (uint8_t strata_ctr = 0; strata_ctr < number_of_estimators; strata_ctr++)
223 count = 0;
224 for (int i = se1->strata_count - 1; i >= 0; i--)
225 { 360 {
226 struct InvertibleBloomFilter *diff; 361 GNUNET_assert (se1->stratas[strata_ctr]->strata_count ==
227 /* number of keys decoded from the ibf */ 362 se2->stratas[strata_ctr]->strata_count);
228 363
229 /* FIXME: implement this without always allocating new IBFs */ 364
230 diff = ibf_dup (se1->strata[i]); 365 for (int i = se1->stratas[strata_ctr]->strata_count - 1; i >= 0; i--)
231 ibf_subtract (diff,
232 se2->strata[i]);
233 for (int ibf_count = 0; GNUNET_YES; ibf_count++)
234 { 366 {
235 int more; 367 struct InvertibleBloomFilter *diff;
368 /* number of keys decoded from the ibf */
236 369
237 more = ibf_decode (diff, 370 /* FIXME: implement this without always allocating new IBFs */
238 NULL, 371 diff = ibf_dup (se1->stratas[strata_ctr]->strata[i]);
239 NULL); 372 diff->local_decoded_count = 0;
240 if (GNUNET_NO == more) 373 diff->remote_decoded_count = 0;
241 { 374
242 count += ibf_count; 375 ibf_subtract (diff, se2->stratas[strata_ctr]->strata[i]);
243 break; 376
244 } 377 for (int ibf_count = 0; GNUNET_YES; ibf_count++)
245 /* Estimate if decoding fails or would not terminate */
246 if ( (GNUNET_SYSERR == more) ||
247 (ibf_count > diff->size) )
248 { 378 {
249 ibf_destroy (diff); 379 int more;
250 return count * (1 << (i + 1)); 380
381 more = ibf_decode (diff, NULL, NULL);
382 if (GNUNET_NO == more)
383 {
384 se1->stratas[strata_ctr]->strata[0]->local_decoded_count +=
385 diff->local_decoded_count;
386 se1->stratas[strata_ctr]->strata[0]->remote_decoded_count +=
387 diff->remote_decoded_count;
388 break;
389 }
390 /* Estimate if decoding fails or would not terminate */
391 if ((GNUNET_SYSERR == more) || (ibf_count > diff->size))
392 {
393 se1->stratas[strata_ctr]->strata[0]->local_decoded_count =
394 se1->stratas[strata_ctr]->strata[0]->local_decoded_count * (1 << (i
395 +
396 1));
397 se1->stratas[strata_ctr]->strata[0]->remote_decoded_count =
398 se1->stratas[strata_ctr]->strata[0]->remote_decoded_count * (1 << (i
399 +
400 1));
401 ibf_destroy (diff);
402 goto break_all_counting_loops;
403 }
251 } 404 }
405 ibf_destroy (diff);
252 } 406 }
253 ibf_destroy (diff); 407break_all_counting_loops:;
408 avg_local_diff += se1->stratas[strata_ctr]->strata[0]->local_decoded_count;
409 avg_remote_diff +=
410 se1->stratas[strata_ctr]->strata[0]->remote_decoded_count;
254 } 411 }
255 return count; 412 se1->stratas[0]->strata[0]->local_decoded_count = avg_local_diff
413 / number_of_estimators;
414 se1->stratas[0]->strata[0]->remote_decoded_count = avg_remote_diff
415 / number_of_estimators;
256} 416}
257 417
258 418
@@ -262,18 +422,28 @@ strata_estimator_difference (const struct StrataEstimator *se1,
262 * @param se the strata estimator to copy 422 * @param se the strata estimator to copy
263 * @return the copy 423 * @return the copy
264 */ 424 */
265struct StrataEstimator * 425struct MultiStrataEstimator *
266strata_estimator_dup (struct StrataEstimator *se) 426strata_estimator_dup (struct MultiStrataEstimator *se)
267{ 427{
268 struct StrataEstimator *c; 428 struct MultiStrataEstimator *c;
269 429 unsigned int i;
270 c = GNUNET_new (struct StrataEstimator); 430
271 c->strata_count = se->strata_count; 431 c = GNUNET_new (struct MultiStrataEstimator);
272 c->ibf_size = se->ibf_size; 432 c->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *);
273 c->strata = GNUNET_new_array (se->strata_count, 433 for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
274 struct InvertibleBloomFilter *); 434 {
275 for (unsigned int i = 0; i < se->strata_count; i++) 435 c->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator);
276 c->strata[i] = ibf_dup (se->strata[i]); 436 c->stratas[strata_ctr]->strata_count =
437 se->stratas[strata_ctr]->strata_count;
438 c->stratas[strata_ctr]->ibf_size = se->stratas[strata_ctr]->ibf_size;
439 c->stratas[strata_ctr]->strata = GNUNET_new_array (
440 se->stratas[strata_ctr]->strata_count,
441 struct
442 InvertibleBloomFilter *);
443 for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
444 c->stratas[strata_ctr]->strata[i] = ibf_dup (
445 se->stratas[strata_ctr]->strata[i]);
446 }
277 return c; 447 return c;
278} 448}
279 449
@@ -284,10 +454,14 @@ strata_estimator_dup (struct StrataEstimator *se)
284 * @param se strata estimator to destroy. 454 * @param se strata estimator to destroy.
285 */ 455 */
286void 456void
287strata_estimator_destroy (struct StrataEstimator *se) 457strata_estimator_destroy (struct MultiStrataEstimator *se)
288{ 458{
289 for (unsigned int i = 0; i < se->strata_count; i++) 459 unsigned int i;
290 ibf_destroy (se->strata[i]); 460 for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
291 GNUNET_free (se->strata); 461 {
462 for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
463 ibf_destroy (se->stratas[strata_ctr]->strata[i]);
464 GNUNET_free (se->stratas[strata_ctr]->strata);
465 }
292 GNUNET_free (se); 466 GNUNET_free (se);
293} 467}
diff --git a/src/setu/gnunet-service-setu_strata_estimator.h b/src/setu/gnunet-service-setu_strata_estimator.h
index afdbcdbbf..4871a7fcd 100644
--- a/src/setu/gnunet-service-setu_strata_estimator.h
+++ b/src/setu/gnunet-service-setu_strata_estimator.h
@@ -22,6 +22,7 @@
22 * @file set/gnunet-service-setu_strata_estimator.h 22 * @file set/gnunet-service-setu_strata_estimator.h
23 * @brief estimator of set difference 23 * @brief estimator of set difference
24 * @author Florian Dold 24 * @author Florian Dold
25 * @author Elias Summermatter
25 */ 26 */
26 27
27#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H 28#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
@@ -61,6 +62,31 @@ struct StrataEstimator
61 unsigned int ibf_size; 62 unsigned int ibf_size;
62}; 63};
63 64
65struct MultiStrataEstimator
66{
67 /**
68 * Array of strata estimators
69 */
70 struct StrataEstimator **stratas;
71
72 /**
73 * Number of strata estimators in struct
74 */
75 uint8_t size;
76
77};
78
79/**
80 * Deteminate how many strata estimators in the message are necessary
81 * @param avg_element_size
82 * @param element_count
83 * @return number of strata's
84 */
85
86uint8_t
87determine_strata_count (uint64_t avg_element_size,
88 uint64_t element_count);
89
64 90
65/** 91/**
66 * Write the given strata estimator to the buffer. 92 * Write the given strata estimator to the buffer.
@@ -70,7 +96,9 @@ struct StrataEstimator
70 * @return number of bytes written to @a buf 96 * @return number of bytes written to @a buf
71 */ 97 */
72size_t 98size_t
73strata_estimator_write (const struct StrataEstimator *se, 99strata_estimator_write (struct MultiStrataEstimator *se,
100 uint16_t se_ibf_total_size,
101 uint8_t number_se_send,
74 void *buf); 102 void *buf);
75 103
76 104
@@ -88,7 +116,9 @@ int
88strata_estimator_read (const void *buf, 116strata_estimator_read (const void *buf,
89 size_t buf_len, 117 size_t buf_len,
90 int is_compressed, 118 int is_compressed,
91 struct StrataEstimator *se); 119 uint8_t number_se_received,
120 uint16_t se_ibf_total_size,
121 struct MultiStrataEstimator *se);
92 122
93 123
94/** 124/**
@@ -99,7 +129,7 @@ strata_estimator_read (const void *buf,
99 * @param ibf_hashnum hashnum parameter of each ibf 129 * @param ibf_hashnum hashnum parameter of each ibf
100 * @return a freshly allocated, empty strata estimator, NULL on error 130 * @return a freshly allocated, empty strata estimator, NULL on error
101 */ 131 */
102struct StrataEstimator * 132struct MultiStrataEstimator *
103strata_estimator_create (unsigned int strata_count, 133strata_estimator_create (unsigned int strata_count,
104 uint32_t ibf_size, 134 uint32_t ibf_size,
105 uint8_t ibf_hashnum); 135 uint8_t ibf_hashnum);
@@ -111,11 +141,11 @@ strata_estimator_create (unsigned int strata_count,
111 * 141 *
112 * @param se1 first strata estimator 142 * @param se1 first strata estimator
113 * @param se2 second strata estimator 143 * @param se2 second strata estimator
114 * @return abs(|se1| - |se2|) 144 * @return nothing
115 */ 145 */
116unsigned int 146void
117strata_estimator_difference (const struct StrataEstimator *se1, 147strata_estimator_difference (const struct MultiStrataEstimator *se1,
118 const struct StrataEstimator *se2); 148 const struct MultiStrataEstimator *se2);
119 149
120 150
121/** 151/**
@@ -125,7 +155,7 @@ strata_estimator_difference (const struct StrataEstimator *se1,
125 * @param key key to add 155 * @param key key to add
126 */ 156 */
127void 157void
128strata_estimator_insert (struct StrataEstimator *se, 158strata_estimator_insert (struct MultiStrataEstimator *se,
129 struct IBF_Key key); 159 struct IBF_Key key);
130 160
131 161
@@ -136,7 +166,7 @@ strata_estimator_insert (struct StrataEstimator *se,
136 * @param key key to remove 166 * @param key key to remove
137 */ 167 */
138void 168void
139strata_estimator_remove (struct StrataEstimator *se, 169strata_estimator_remove (struct MultiStrataEstimator *se,
140 struct IBF_Key key); 170 struct IBF_Key key);
141 171
142 172
@@ -146,7 +176,7 @@ strata_estimator_remove (struct StrataEstimator *se,
146 * @param se strata estimator to destroy. 176 * @param se strata estimator to destroy.
147 */ 177 */
148void 178void
149strata_estimator_destroy (struct StrataEstimator *se); 179strata_estimator_destroy (struct MultiStrataEstimator *se);
150 180
151 181
152/** 182/**
@@ -155,8 +185,8 @@ strata_estimator_destroy (struct StrataEstimator *se);
155 * @param se the strata estimator to copy 185 * @param se the strata estimator to copy
156 * @return the copy 186 * @return the copy
157 */ 187 */
158struct StrataEstimator * 188struct MultiStrataEstimator *
159strata_estimator_dup (struct StrataEstimator *se); 189strata_estimator_dup (struct MultiStrataEstimator *se);
160 190
161 191
162#if 0 /* keep Emacsens' auto-indent happy */ 192#if 0 /* keep Emacsens' auto-indent happy */
diff --git a/src/setu/ibf.c b/src/setu/ibf.c
index 1beba9065..dbd23c320 100644
--- a/src/setu/ibf.c
+++ b/src/setu/ibf.c
@@ -20,11 +20,15 @@
20 20
21/** 21/**
22 * @file set/ibf.c 22 * @file set/ibf.c
23 * @brief implementation of the invertible Bloom filter 23 * @brief implementation of the invertible bloom filter
24 * @author Florian Dold 24 * @author Florian Dold
25 * @author Elias Summermatter
25 */ 26 */
26 27
27#include "ibf.h" 28#include "ibf.h"
29#include "gnunet_util_lib.h"
30#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
31
28 32
29/** 33/**
30 * Compute the key's hash from the key. 34 * Compute the key's hash from the key.
@@ -58,11 +62,12 @@ ibf_hashcode_from_key (struct IBF_Key key,
58 struct GNUNET_HashCode *dst) 62 struct GNUNET_HashCode *dst)
59{ 63{
60 struct IBF_Key *p; 64 struct IBF_Key *p;
65 unsigned int i;
61 const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode) 66 const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode)
62 / sizeof(struct IBF_Key); 67 / sizeof(struct IBF_Key);
63 68
64 p = (struct IBF_Key *) dst; 69 p = (struct IBF_Key *) dst;
65 for (unsigned int i = 0; i < keys_per_hashcode; i++) 70 for (i = 0; i < keys_per_hashcode; i++)
66 *p++ = key; 71 *p++ = key;
67} 72}
68 73
@@ -75,14 +80,14 @@ ibf_hashcode_from_key (struct IBF_Key key,
75 * @return the newly created invertible bloom filter, NULL on error 80 * @return the newly created invertible bloom filter, NULL on error
76 */ 81 */
77struct InvertibleBloomFilter * 82struct InvertibleBloomFilter *
78ibf_create (uint32_t size, 83ibf_create (uint32_t size, uint8_t hash_num)
79 uint8_t hash_num)
80{ 84{
81 struct InvertibleBloomFilter *ibf; 85 struct InvertibleBloomFilter *ibf;
82 86
83 GNUNET_assert (0 != size); 87 GNUNET_assert (0 != size);
88
84 ibf = GNUNET_new (struct InvertibleBloomFilter); 89 ibf = GNUNET_new (struct InvertibleBloomFilter);
85 ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t)); 90 ibf->count = GNUNET_malloc_large (size * sizeof(uint64_t));
86 if (NULL == ibf->count) 91 if (NULL == ibf->count)
87 { 92 {
88 GNUNET_free (ibf); 93 GNUNET_free (ibf);
@@ -105,6 +110,7 @@ ibf_create (uint32_t size,
105 } 110 }
106 ibf->size = size; 111 ibf->size = size;
107 ibf->hash_num = hash_num; 112 ibf->hash_num = hash_num;
113
108 return ibf; 114 return ibf;
109} 115}
110 116
@@ -121,8 +127,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf,
121 uint32_t i; 127 uint32_t i;
122 uint32_t bucket; 128 uint32_t bucket;
123 129
124 bucket = GNUNET_CRYPTO_crc32_n (&key, 130 bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key);
125 sizeof (key));
126 for (i = 0, filled = 0; filled < ibf->hash_num; i++) 131 for (i = 0, filled = 0; filled < ibf->hash_num; i++)
127 { 132 {
128 uint64_t x; 133 uint64_t x;
@@ -133,8 +138,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf,
133 dst[filled++] = bucket % ibf->size; 138 dst[filled++] = bucket % ibf->size;
134try_next: 139try_next:
135 x = ((uint64_t) bucket << 32) | i; 140 x = ((uint64_t) bucket << 32) | i;
136 bucket = GNUNET_CRYPTO_crc32_n (&x, 141 bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
137 sizeof (x));
138 } 142 }
139} 143}
140 144
@@ -170,13 +174,8 @@ ibf_insert (struct InvertibleBloomFilter *ibf,
170 int buckets[ibf->hash_num]; 174 int buckets[ibf->hash_num];
171 175
172 GNUNET_assert (ibf->hash_num <= ibf->size); 176 GNUNET_assert (ibf->hash_num <= ibf->size);
173 ibf_get_indices (ibf, 177 ibf_get_indices (ibf, key, buckets);
174 key, 178 ibf_insert_into (ibf, key, buckets, 1);
175 buckets);
176 ibf_insert_into (ibf,
177 key,
178 buckets,
179 1);
180} 179}
181 180
182 181
@@ -193,13 +192,8 @@ ibf_remove (struct InvertibleBloomFilter *ibf,
193 int buckets[ibf->hash_num]; 192 int buckets[ibf->hash_num];
194 193
195 GNUNET_assert (ibf->hash_num <= ibf->size); 194 GNUNET_assert (ibf->hash_num <= ibf->size);
196 ibf_get_indices (ibf, 195 ibf_get_indices (ibf, key, buckets);
197 key, 196 ibf_insert_into (ibf, key, buckets, -1);
198 buckets);
199 ibf_insert_into (ibf,
200 key,
201 buckets,
202 -1);
203} 197}
204 198
205 199
@@ -244,6 +238,8 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
244 238
245 for (uint32_t i = 0; i < ibf->size; i++) 239 for (uint32_t i = 0; i < ibf->size; i++)
246 { 240 {
241 int hit;
242
247 /* we can only decode from pure buckets */ 243 /* we can only decode from pure buckets */
248 if ( (1 != ibf->count[i].count_val) && 244 if ( (1 != ibf->count[i].count_val) &&
249 (-1 != ibf->count[i].count_val) ) 245 (-1 != ibf->count[i].count_val) )
@@ -257,30 +253,33 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
257 253
258 /* test if key in bucket hits its own location, 254 /* test if key in bucket hits its own location,
259 * if not, the key hash was subject to collision */ 255 * if not, the key hash was subject to collision */
260 { 256 hit = GNUNET_NO;
261 bool hit = false; 257 ibf_get_indices (ibf, ibf->key_sum[i], buckets);
258 for (int j = 0; j < ibf->hash_num; j++)
259 if (buckets[j] == i)
260 hit = GNUNET_YES;
262 261
263 ibf_get_indices (ibf, 262 if (GNUNET_NO == hit)
264 ibf->key_sum[i], 263 continue;
265 buckets); 264
266 for (int j = 0; j < ibf->hash_num; j++) 265 if (1 == ibf->count[i].count_val)
267 if (buckets[j] == i) 266 {
268 { 267 ibf->remote_decoded_count++;
269 hit = true;
270 break;
271 }
272 if (! hit)
273 continue;
274 } 268 }
269 else
270 {
271 ibf->local_decoded_count++;
272 }
273
274
275 if (NULL != ret_side) 275 if (NULL != ret_side)
276 *ret_side = ibf->count[i].count_val; 276 *ret_side = ibf->count[i].count_val;
277 if (NULL != ret_id) 277 if (NULL != ret_id)
278 *ret_id = ibf->key_sum[i]; 278 *ret_id = ibf->key_sum[i];
279 279
280 /* insert on the opposite side, effectively removing the element */ 280 /* insert on the opposite side, effectively removing the element */
281 ibf_insert_into (ibf, 281 ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val);
282 ibf->key_sum[i], buckets, 282
283 -ibf->count[i].count_val);
284 return GNUNET_YES; 283 return GNUNET_YES;
285 } 284 }
286 285
@@ -291,6 +290,26 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
291 290
292 291
293/** 292/**
293 * Returns the minimal bytes needed to store the counter of the IBF
294 *
295 * @param ibf the IBF
296 */
297uint8_t
298ibf_get_max_counter (struct InvertibleBloomFilter *ibf)
299{
300 long long max_counter = 0;
301 for (uint64_t i = 0; i < ibf->size; i++)
302 {
303 if (ibf->count[i].count_val > max_counter)
304 {
305 max_counter = ibf->count[i].count_val;
306 }
307 }
308 return 64 - __builtin_clzll (max_counter);
309}
310
311
312/**
294 * Write buckets from an ibf to a buffer. 313 * Write buckets from an ibf to a buffer.
295 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. 314 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
296 * 315 *
@@ -298,16 +317,17 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
298 * @param start with which bucket to start 317 * @param start with which bucket to start
299 * @param count how many buckets to write 318 * @param count how many buckets to write
300 * @param buf buffer to write the data to 319 * @param buf buffer to write the data to
320 * @param max bit length of a counter for unpacking
301 */ 321 */
302void 322void
303ibf_write_slice (const struct InvertibleBloomFilter *ibf, 323ibf_write_slice (const struct InvertibleBloomFilter *ibf,
304 uint32_t start, 324 uint32_t start,
305 uint32_t count, 325 uint64_t count,
306 void *buf) 326 void *buf,
327 uint8_t counter_max_length)
307{ 328{
308 struct IBF_Key *key_dst; 329 struct IBF_Key *key_dst;
309 struct IBF_KeyHash *key_hash_dst; 330 struct IBF_KeyHash *key_hash_dst;
310 struct IBF_Count *count_dst;
311 331
312 GNUNET_assert (start + count <= ibf->size); 332 GNUNET_assert (start + count <= ibf->size);
313 333
@@ -315,19 +335,178 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf,
315 key_dst = (struct IBF_Key *) buf; 335 key_dst = (struct IBF_Key *) buf;
316 GNUNET_memcpy (key_dst, 336 GNUNET_memcpy (key_dst,
317 ibf->key_sum + start, 337 ibf->key_sum + start,
318 count * sizeof *key_dst); 338 count * sizeof(*key_dst));
319 key_dst += count; 339 key_dst += count;
320 /* copy key hashes */ 340 /* copy key hashes */
321 key_hash_dst = (struct IBF_KeyHash *) key_dst; 341 key_hash_dst = (struct IBF_KeyHash *) key_dst;
322 GNUNET_memcpy (key_hash_dst, 342 GNUNET_memcpy (key_hash_dst,
323 ibf->key_hash_sum + start, 343 ibf->key_hash_sum + start,
324 count * sizeof *key_hash_dst); 344 count * sizeof(*key_hash_dst));
325 key_hash_dst += count; 345 key_hash_dst += count;
326 /* copy counts */ 346
327 count_dst = (struct IBF_Count *) key_hash_dst; 347 /* pack and copy counter */
328 GNUNET_memcpy (count_dst, 348 pack_counter (ibf,
329 ibf->count + start, 349 start,
330 count * sizeof *count_dst); 350 count,
351 (uint8_t *) key_hash_dst,
352 counter_max_length);
353
354
355}
356
357
358/**
359 * Packs the counter to transmit only the smallest possible amount of bytes and
360 * preventing overflow of the counter
361 * @param ibf the ibf to write
362 * @param start with which bucket to start
363 * @param count how many buckets to write
364 * @param buf buffer to write the data to
365 * @param max bit length of a counter for unpacking
366 */
367
368void
369pack_counter (const struct InvertibleBloomFilter *ibf,
370 uint32_t start,
371 uint64_t count,
372 uint8_t *buf,
373 uint8_t counter_max_length)
374{
375 uint8_t store_size = 0;
376 uint8_t store = 0;
377 uint16_t byte_ctr = 0;
378
379 /**
380 * Iterate over IBF bucket
381 */
382 for (uint64_t i = start; i< (count + start);)
383 {
384 uint64_t count_val_to_write = ibf->count[i].count_val;
385 uint8_t count_len_to_write = counter_max_length;
386
387 /**
388 * Pack and compose counters to byte values
389 */
390 while ((count_len_to_write + store_size) >= 8)
391 {
392 uint8_t bit_shift = 0;
393
394 /**
395 * Shift bits if more than a byte has to be written
396 * or the store size is not empty
397 */
398 if ((store_size > 0) || (count_len_to_write > 8))
399 {
400 uint8_t bit_unused = 8 - store_size;
401 bit_shift = count_len_to_write - bit_unused;
402 store = store << bit_unused;
403 }
404
405 buf[byte_ctr] = ((count_val_to_write >> bit_shift) | store) & 0xFF;
406 byte_ctr++;
407 count_len_to_write -= (8 - store_size);
408 count_val_to_write = count_val_to_write & ((1ULL <<
409 count_len_to_write) - 1);
410 store = 0;
411 store_size = 0;
412 }
413 store = (store << count_len_to_write) | count_val_to_write;
414 store_size = store_size + count_len_to_write;
415 count_len_to_write = 0;
416 i++;
417 }
418
419 /**
420 * Pack data left in story before finishing
421 */
422 if (store_size > 0)
423 {
424 buf[byte_ctr] = store << (8 - store_size);
425 byte_ctr++;
426 }
427
428}
429
430
431/**
432 * Unpacks the counter to transmit only the smallest possible amount of bytes and
433 * preventing overflow of the counter
434 * @param ibf the ibf to write
435 * @param start with which bucket to start
436 * @param count how many buckets to write
437 * @param buf buffer to write the data to
438 * @param max bit length of a counter for unpacking
439 */
440
441void
442unpack_counter (const struct InvertibleBloomFilter *ibf,
443 uint32_t start,
444 uint64_t count,
445 uint8_t *buf,
446 uint8_t counter_max_length)
447{
448 uint64_t ibf_counter_ctr = 0;
449 uint64_t store = 0;
450 uint64_t store_bit_ctr = 0;
451 uint64_t byte_ctr = 0;
452
453 /**
454 * Iterate over received bytes
455 */
456 while (true)
457 {
458 uint8_t byte_read = buf[byte_ctr];
459 uint8_t bit_to_read_left = 8;
460 byte_ctr++;
461
462 /**
463 * Pack data left in story before finishing
464 */
465 while (true)
466 {
467 /**
468 * Stop decoding when end is reached
469 */
470 if (ibf_counter_ctr > (count - 1))
471 return;
472
473 /*
474 * Unpack the counter
475 */
476 if ((store_bit_ctr + bit_to_read_left) >= counter_max_length)
477 {
478 uint8_t bytes_used = counter_max_length - store_bit_ctr;
479 if (store_bit_ctr > 0)
480 {
481 store = store << bytes_used;
482 }
483
484 uint8_t bytes_to_shift = bit_to_read_left - bytes_used;
485 uint64_t counter_part = byte_read >> bytes_to_shift;
486 store = store | counter_part;
487 ibf->count[ibf_counter_ctr + start].count_val = store;
488 byte_read = byte_read & ((1 << bytes_to_shift) - 1);
489 bit_to_read_left -= bytes_used;
490 ibf_counter_ctr++;
491 store = 0;
492 store_bit_ctr = 0;
493 }
494 else
495 {
496 store_bit_ctr += bit_to_read_left;
497 if (0 == store)
498 {
499 store = byte_read;
500 }
501 else
502 {
503 store = store << bit_to_read_left;
504 store = store | byte_read;
505 }
506 break;
507 }
508 }
509 }
331} 510}
332 511
333 512
@@ -338,12 +517,14 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf,
338 * @param start which bucket to start at 517 * @param start which bucket to start at
339 * @param count how many buckets to read 518 * @param count how many buckets to read
340 * @param ibf the ibf to read from 519 * @param ibf the ibf to read from
520 * @param max bit length of a counter for unpacking
341 */ 521 */
342void 522void
343ibf_read_slice (const void *buf, 523ibf_read_slice (const void *buf,
344 uint32_t start, 524 uint32_t start,
345 uint32_t count, 525 uint64_t count,
346 struct InvertibleBloomFilter *ibf) 526 struct InvertibleBloomFilter *ibf,
527 uint8_t counter_max_length)
347{ 528{
348 struct IBF_Key *key_src; 529 struct IBF_Key *key_src;
349 struct IBF_KeyHash *key_hash_src; 530 struct IBF_KeyHash *key_hash_src;
@@ -364,11 +545,10 @@ ibf_read_slice (const void *buf,
364 key_hash_src, 545 key_hash_src,
365 count * sizeof *key_hash_src); 546 count * sizeof *key_hash_src);
366 key_hash_src += count; 547 key_hash_src += count;
367 /* copy counts */ 548
549 /* copy and unpack counts */
368 count_src = (struct IBF_Count *) key_hash_src; 550 count_src = (struct IBF_Count *) key_hash_src;
369 GNUNET_memcpy (ibf->count + start, 551 unpack_counter (ibf,start,count,(uint8_t *) count_src,counter_max_length);
370 count_src,
371 count * sizeof *count_src);
372} 552}
373 553
374 554
diff --git a/src/setu/ibf.h b/src/setu/ibf.h
index 7c2ab33b1..5628405dc 100644
--- a/src/setu/ibf.h
+++ b/src/setu/ibf.h
@@ -22,6 +22,7 @@
22 * @file set/ibf.h 22 * @file set/ibf.h
23 * @brief invertible bloom filter 23 * @brief invertible bloom filter
24 * @author Florian Dold 24 * @author Florian Dold
25 * @author Elias Summermatter
25 */ 26 */
26 27
27#ifndef GNUNET_CONSENSUS_IBF_H 28#ifndef GNUNET_CONSENSUS_IBF_H
@@ -62,7 +63,7 @@ struct IBF_KeyHash
62 */ 63 */
63struct IBF_Count 64struct IBF_Count
64{ 65{
65 int8_t count_val; 66 int64_t count_val;
66}; 67};
67 68
68 69
@@ -93,6 +94,20 @@ struct InvertibleBloomFilter
93 uint8_t hash_num; 94 uint8_t hash_num;
94 95
95 /** 96 /**
97 * If an IBF is decoded this count stores how many
98 * elements are on the local site. This is used
99 * to estimate the set difference on a site
100 */
101 int local_decoded_count;
102
103 /**
104 * If an IBF is decoded this count stores how many
105 * elements are on the remote site. This is used
106 * to estimate the set difference on a site
107 */
108 int remote_decoded_count;
109
110 /**
96 * Xor sums of the elements' keys, used to identify the elements. 111 * Xor sums of the elements' keys, used to identify the elements.
97 * Array of 'size' elements. 112 * Array of 'size' elements.
98 */ 113 */
@@ -125,8 +140,9 @@ struct InvertibleBloomFilter
125void 140void
126ibf_write_slice (const struct InvertibleBloomFilter *ibf, 141ibf_write_slice (const struct InvertibleBloomFilter *ibf,
127 uint32_t start, 142 uint32_t start,
128 uint32_t count, 143 uint64_t count,
129 void *buf); 144 void *buf,
145 uint8_t counter_max_length);
130 146
131 147
132/** 148/**
@@ -140,8 +156,9 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf,
140void 156void
141ibf_read_slice (const void *buf, 157ibf_read_slice (const void *buf,
142 uint32_t start, 158 uint32_t start,
143 uint32_t count, 159 uint64_t count,
144 struct InvertibleBloomFilter *ibf); 160 struct InvertibleBloomFilter *ibf,
161 uint8_t counter_max_length);
145 162
146 163
147/** 164/**
@@ -244,6 +261,44 @@ ibf_dup (const struct InvertibleBloomFilter *ibf);
244void 261void
245ibf_destroy (struct InvertibleBloomFilter *ibf); 262ibf_destroy (struct InvertibleBloomFilter *ibf);
246 263
264uint8_t
265ibf_get_max_counter (struct InvertibleBloomFilter *ibf);
266
267
268/**
269 * Packs the counter to transmit only the smallest possible amount of bytes and
270 * preventing overflow of the counter
271 * @param ibf the ibf to write
272 * @param start with which bucket to start
273 * @param count how many buckets to write
274 * @param buf buffer to write the data to
275 * @param max bit length of a counter for unpacking
276 */
277
278void
279pack_counter (const struct InvertibleBloomFilter *ibf,
280 uint32_t start,
281 uint64_t count,
282 uint8_t *buf,
283 uint8_t counter_max_length);
284
285/**
286 * Unpacks the counter to transmit only the smallest possible amount of bytes and
287 * preventing overflow of the counter
288 * @param ibf the ibf to write
289 * @param start with which bucket to start
290 * @param count how many buckets to write
291 * @param buf buffer to write the data to
292 * @param max bit length of a counter for unpacking
293 */
294
295void
296unpack_counter (const struct InvertibleBloomFilter *ibf,
297 uint32_t start,
298 uint64_t count,
299 uint8_t *buf,
300 uint8_t counter_max_length);
301
247 302
248#if 0 /* keep Emacsens' auto-indent happy */ 303#if 0 /* keep Emacsens' auto-indent happy */
249{ 304{
diff --git a/src/setu/perf_setu_api.c b/src/setu/perf_setu_api.c
index b273f9c71..7f4d64f74 100644
--- a/src/setu/perf_setu_api.c
+++ b/src/setu/perf_setu_api.c
@@ -22,11 +22,14 @@
22 * @file set/test_setu_api.c 22 * @file set/test_setu_api.c
23 * @brief testcase for setu_api.c 23 * @brief testcase for setu_api.c
24 * @author Florian Dold 24 * @author Florian Dold
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
28#include "gnunet_testing_lib.h" 29#include "gnunet_testing_lib.h"
29#include "gnunet_setu_service.h" 30#include "gnunet_setu_service.h"
31#include <sys/sysinfo.h>
32#include <pthread.h>
30 33
31 34
32static struct GNUNET_PeerIdentity local_id; 35static struct GNUNET_PeerIdentity local_id;
@@ -50,6 +53,12 @@ static int ret;
50static struct GNUNET_SCHEDULER_Task *tt; 53static struct GNUNET_SCHEDULER_Task *tt;
51 54
52 55
56/**
57 * Handles configuration file for setu performance test
58 *
59 */
60static struct GNUNET_CONFIGURATION_Handle *setu_cfg;
61
53 62
54static void 63static void
55result_cb_set1 (void *cls, 64result_cb_set1 (void *cls,
@@ -57,44 +66,44 @@ result_cb_set1 (void *cls,
57 uint64_t size, 66 uint64_t size,
58 enum GNUNET_SETU_Status status) 67 enum GNUNET_SETU_Status status)
59{ 68{
60 switch (status) 69 switch (status)
70 {
71 case GNUNET_SETU_STATUS_ADD_LOCAL:
72 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n");
73 break;
74
75 case GNUNET_SETU_STATUS_FAILURE:
76 GNUNET_break (0);
77 oh1 = NULL;
78 fprintf (stderr, "set 1: received failure status!\n");
79 ret = 1;
80 if (NULL != tt)
81 {
82 GNUNET_SCHEDULER_cancel (tt);
83 tt = NULL;
84 }
85 GNUNET_SCHEDULER_shutdown ();
86 break;
87
88 case GNUNET_SETU_STATUS_DONE:
89 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
90 oh1 = NULL;
91 if (NULL != set1)
61 { 92 {
62 case GNUNET_SETU_STATUS_ADD_LOCAL: 93 GNUNET_SETU_destroy (set1);
63 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n"); 94 set1 = NULL;
64 break;
65
66 case GNUNET_SETU_STATUS_FAILURE:
67 GNUNET_break (0);
68 oh1 = NULL;
69 fprintf (stderr, "set 1: received failure status!\n");
70 ret = 1;
71 if (NULL != tt)
72 {
73 GNUNET_SCHEDULER_cancel (tt);
74 tt = NULL;
75 }
76 GNUNET_SCHEDULER_shutdown ();
77 break;
78
79 case GNUNET_SETU_STATUS_DONE:
80 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
81 oh1 = NULL;
82 if (NULL != set1)
83 {
84 GNUNET_SETU_destroy (set1);
85 set1 = NULL;
86 }
87 if (NULL == set2)
88 {
89 GNUNET_SCHEDULER_cancel (tt);
90 tt = NULL;
91 GNUNET_SCHEDULER_shutdown ();
92 }
93 break;
94
95 default:
96 GNUNET_assert (0);
97 } 95 }
96 if (NULL == set2)
97 {
98 GNUNET_SCHEDULER_cancel (tt);
99 tt = NULL;
100 GNUNET_SCHEDULER_shutdown ();
101 }
102 break;
103
104 default:
105 GNUNET_assert (0);
106 }
98} 107}
99 108
100 109
@@ -104,36 +113,36 @@ result_cb_set2 (void *cls,
104 uint64_t size, 113 uint64_t size,
105 enum GNUNET_SETU_Status status) 114 enum GNUNET_SETU_Status status)
106{ 115{
107 switch (status) 116 switch (status)
117 {
118 case GNUNET_SETU_STATUS_ADD_LOCAL:
119 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n");
120 break;
121
122 case GNUNET_SETU_STATUS_FAILURE:
123 GNUNET_break (0);
124 oh2 = NULL;
125 fprintf (stderr, "set 2: received failure status\n");
126 GNUNET_SCHEDULER_shutdown ();
127 ret = 1;
128 break;
129
130 case GNUNET_SETU_STATUS_DONE:
131 oh2 = NULL;
132 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
133 GNUNET_SETU_destroy (set2);
134 set2 = NULL;
135 if (NULL == set1)
108 { 136 {
109 case GNUNET_SETU_STATUS_ADD_LOCAL: 137 GNUNET_SCHEDULER_cancel (tt);
110 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n"); 138 tt = NULL;
111 break; 139 GNUNET_SCHEDULER_shutdown ();
112
113 case GNUNET_SETU_STATUS_FAILURE:
114 GNUNET_break (0);
115 oh2 = NULL;
116 fprintf (stderr, "set 2: received failure status\n");
117 GNUNET_SCHEDULER_shutdown ();
118 ret = 1;
119 break;
120
121 case GNUNET_SETU_STATUS_DONE:
122 oh2 = NULL;
123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
124 GNUNET_SETU_destroy (set2);
125 set2 = NULL;
126 if (NULL == set1)
127 {
128 GNUNET_SCHEDULER_cancel (tt);
129 tt = NULL;
130 GNUNET_SCHEDULER_shutdown ();
131 }
132 break;
133
134 default:
135 GNUNET_assert (0);
136 } 140 }
141 break;
142
143 default:
144 GNUNET_assert (0);
145 }
137} 146}
138 147
139 148
@@ -143,14 +152,14 @@ listen_cb (void *cls,
143 const struct GNUNET_MessageHeader *context_msg, 152 const struct GNUNET_MessageHeader *context_msg,
144 struct GNUNET_SETU_Request *request) 153 struct GNUNET_SETU_Request *request)
145{ 154{
146 GNUNET_assert (NULL != context_msg); 155 GNUNET_assert (NULL != context_msg);
147 GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); 156 GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n"); 157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n");
149 oh2 = GNUNET_SETU_accept (request, 158 oh2 = GNUNET_SETU_accept (request,
150 (struct GNUNET_SETU_Option[]){ 0 }, 159 (struct GNUNET_SETU_Option[]){ 0 },
151 &result_cb_set2, 160 &result_cb_set2,
152 NULL); 161 NULL);
153 GNUNET_SETU_commit (oh2, set2); 162 GNUNET_SETU_commit (oh2, set2);
154} 163}
155 164
156 165
@@ -162,122 +171,89 @@ listen_cb (void *cls,
162static void 171static void
163start (void *cls) 172start (void *cls)
164{ 173{
165 struct GNUNET_MessageHeader context_msg; 174 struct GNUNET_MessageHeader context_msg;
166 175
167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n"); 176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n");
168 context_msg.size = htons (sizeof context_msg); 177 context_msg.size = htons (sizeof context_msg);
169 context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); 178 context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
170 listen_handle = GNUNET_SETU_listen (config, 179 listen_handle = GNUNET_SETU_listen (config,
171 &app_id, 180 &app_id,
172 &listen_cb, 181 &listen_cb,
173 NULL); 182 NULL);
174 oh1 = GNUNET_SETU_prepare (&local_id, 183 oh1 = GNUNET_SETU_prepare (&local_id,
175 &app_id, 184 &app_id,
176 &context_msg, 185 &context_msg,
177 (struct GNUNET_SETU_Option[]){ 0 }, 186 (struct GNUNET_SETU_Option[]){ 0 },
178 &result_cb_set1, 187 &result_cb_set1,
179 NULL); 188 NULL);
180 GNUNET_SETU_commit (oh1, set1); 189 GNUNET_SETU_commit (oh1, set1);
181} 190}
182 191
183 192
184/** 193/**
185 * Initialize the second set, continue
186 *
187 * @param cls closure, unused
188 */
189static void
190init_set2 (void *cls)
191{
192 struct GNUNET_SETU_Element element;
193
194 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
195
196 element.element_type = 0;
197 element.data = "hello1";
198 element.size = strlen (element.data);
199 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
200 element.data = "quux";
201 element.size = strlen (element.data);
202 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
203 element.data = "baz";
204 element.size = strlen (element.data);
205 GNUNET_SETU_add_element (set2, &element, &start, NULL);
206}
207
208/**
209 * Generate random byte stream 194 * Generate random byte stream
210 */ 195 */
211 196
212unsigned char *gen_rdm_bytestream (size_t num_bytes) 197unsigned char *
198gen_rdm_bytestream (size_t num_bytes)
213{ 199{
214 unsigned char *stream = GNUNET_malloc (num_bytes); 200 unsigned char *stream = GNUNET_malloc (num_bytes);
215 GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes); 201 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes);
216 return stream; 202 return stream;
217} 203}
218 204
205
219/** 206/**
220 * Generate random sets 207 * Generate random sets
221 */ 208 */
222 209
223static void 210static void
224initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes) 211initRandomSets (int overlap, int set1_size, int set2_size, int
212 element_size_in_bytes)
225{ 213{
226 struct GNUNET_SETU_Element element; 214 struct GNUNET_SETU_Element element;
227 element.element_type = 0; 215 element.element_type = 0;
228 216
229 // Add elements to both sets 217 // Add elements to both sets
230 for (int i = 0; i < overlap; i++) { 218 for (int i = 0; i < overlap; i++)
231 element.data = gen_rdm_bytestream(element_size_in_bytes); 219 {
232 element.size = element_size_in_bytes; 220 element.data = gen_rdm_bytestream (element_size_in_bytes);
233 GNUNET_SETU_add_element (set1, &element, NULL, NULL); 221 element.size = element_size_in_bytes;
234 GNUNET_SETU_add_element (set2, &element, NULL, NULL); 222 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
235 set1_size--; 223 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
236 set2_size--; 224 set1_size--;
237 } 225 set2_size--;
238 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n"); 226 }
239 227 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n");
240 // Add other elements to set 1 228
241 while(set1_size>0) { 229 // Add other elements to set 1
242 element.data = gen_rdm_bytestream(element_size_in_bytes); 230 while (set1_size>0)
243 element.size = element_size_in_bytes; 231 {
244 GNUNET_SETU_add_element (set1, &element, NULL, NULL); 232 element.data = gen_rdm_bytestream (element_size_in_bytes);
245 set1_size--; 233 element.size = element_size_in_bytes;
246 } 234 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
247 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n"); 235 set1_size--;
248 236 }
249 // Add other elements to set 2 237 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n");
250 while(set2_size > 0) {
251 element.data = gen_rdm_bytestream(element_size_in_bytes);
252 element.size = element_size_in_bytes;
253 238
254 if(set2_size != 1) { 239 // Add other elements to set 2
255 GNUNET_SETU_add_element (set2, &element,NULL, NULL); 240 while (set2_size > 0)
256 } else { 241 {
257 GNUNET_SETU_add_element (set2, &element,&start, NULL); 242 element.data = gen_rdm_bytestream (element_size_in_bytes);
258 } 243 element.size = element_size_in_bytes;
259 244
260 set2_size--; 245 if (set2_size != 1)
246 {
247 GNUNET_SETU_add_element (set2, &element,NULL, NULL);
248 }
249 else
250 {
251 GNUNET_SETU_add_element (set2, &element,&start, NULL);
261 } 252 }
262 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n");
263}
264
265/**
266 * Initialize the first set, continue.
267 */
268static void
269init_set1 (void)
270{
271 struct GNUNET_SETU_Element element;
272 253
273 element.element_type = 0; 254 set2_size--;
274 element.data = "hello"; 255 }
275 element.size = strlen (element.data); 256 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n");
276 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
277 element.data = "bar";
278 element.size = strlen (element.data);
279 GNUNET_SETU_add_element (set1, &element, &init_set2, NULL);
280 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
281} 257}
282 258
283 259
@@ -289,10 +265,10 @@ init_set1 (void)
289static void 265static void
290timeout_fail (void *cls) 266timeout_fail (void *cls)
291{ 267{
292 tt = NULL; 268 tt = NULL;
293 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n"); 269 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n");
294 GNUNET_SCHEDULER_shutdown (); 270 GNUNET_SCHEDULER_shutdown ();
295 ret = 1; 271 ret = 1;
296} 272}
297 273
298 274
@@ -304,36 +280,36 @@ timeout_fail (void *cls)
304static void 280static void
305do_shutdown (void *cls) 281do_shutdown (void *cls)
306{ 282{
307 if (NULL != tt) 283 if (NULL != tt)
308 { 284 {
309 GNUNET_SCHEDULER_cancel (tt); 285 GNUNET_SCHEDULER_cancel (tt);
310 tt = NULL; 286 tt = NULL;
311 } 287 }
312 if (NULL != oh1) 288 if (NULL != oh1)
313 { 289 {
314 GNUNET_SETU_operation_cancel (oh1); 290 GNUNET_SETU_operation_cancel (oh1);
315 oh1 = NULL; 291 oh1 = NULL;
316 } 292 }
317 if (NULL != oh2) 293 if (NULL != oh2)
318 { 294 {
319 GNUNET_SETU_operation_cancel (oh2); 295 GNUNET_SETU_operation_cancel (oh2);
320 oh2 = NULL; 296 oh2 = NULL;
321 } 297 }
322 if (NULL != set1) 298 if (NULL != set1)
323 { 299 {
324 GNUNET_SETU_destroy (set1); 300 GNUNET_SETU_destroy (set1);
325 set1 = NULL; 301 set1 = NULL;
326 } 302 }
327 if (NULL != set2) 303 if (NULL != set2)
328 { 304 {
329 GNUNET_SETU_destroy (set2); 305 GNUNET_SETU_destroy (set2);
330 set2 = NULL; 306 set2 = NULL;
331 } 307 }
332 if (NULL != listen_handle) 308 if (NULL != listen_handle)
333 { 309 {
334 GNUNET_SETU_listen_cancel (listen_handle); 310 GNUNET_SETU_listen_cancel (listen_handle);
335 listen_handle = NULL; 311 listen_handle = NULL;
336 } 312 }
337} 313}
338 314
339 315
@@ -350,79 +326,148 @@ run (void *cls,
350 const struct GNUNET_CONFIGURATION_Handle *cfg, 326 const struct GNUNET_CONFIGURATION_Handle *cfg,
351 struct GNUNET_TESTING_Peer *peer) 327 struct GNUNET_TESTING_Peer *peer)
352{ 328{
353 struct GNUNET_SETU_OperationHandle *my_oh; 329 struct GNUNET_SETU_OperationHandle *my_oh;
354 330
355 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
356 "Running preparatory tests\n"); 332 "Running preparatory tests\n");
357 tt = GNUNET_SCHEDULER_add_delayed ( 333 tt = GNUNET_SCHEDULER_add_delayed (
358 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), 334 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
359 &timeout_fail, 335 &timeout_fail,
360 NULL); 336 NULL);
361 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); 337 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
362 338
363 config = cfg; 339 config = cfg;
364 GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg, 340 GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg,
365 &local_id)); 341 &local_id));
366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 342 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367 "my id (from CRYPTO): %s\n", 343 "my id (from CRYPTO): %s\n",
368 GNUNET_i2s (&local_id)); 344 GNUNET_i2s (&local_id));
369 GNUNET_TESTING_peer_get_identity (peer, 345 GNUNET_TESTING_peer_get_identity (peer,
370 &local_id); 346 &local_id);
371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
372 "my id (from TESTING): %s\n", 348 "my id (from TESTING): %s\n",
373 GNUNET_i2s (&local_id)); 349 GNUNET_i2s (&local_id));
374 set1 = GNUNET_SETU_create (cfg); 350 set1 = GNUNET_SETU_create (cfg);
375 set2 = GNUNET_SETU_create (cfg); 351 set2 = GNUNET_SETU_create (cfg);
376 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
377 "Created sets %p and %p for union operation\n", 353 "Created sets %p and %p for union operation\n",
378 set1, 354 set1,
379 set2); 355 set2);
380 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); 356 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
381 357
382 /* test if canceling an uncommitted request works! */ 358 /* test if canceling an uncommitted request works! */
383 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384 "Launching and instantly stopping set operation\n"); 360 "Launching and instantly stopping set operation\n");
385 my_oh = GNUNET_SETU_prepare (&local_id, 361 my_oh = GNUNET_SETU_prepare (&local_id,
386 &app_id, 362 &app_id,
387 NULL, 363 NULL,
388 (struct GNUNET_SETU_Option[]){ 0 }, 364 (struct GNUNET_SETU_Option[]){ 0 },
389 NULL, 365 NULL,
390 NULL); 366 NULL);
391 GNUNET_SETU_operation_cancel (my_oh); 367 GNUNET_SETU_operation_cancel (my_oh);
392 368
393 /* test the real set reconciliation */ 369 /* test the real set reconciliation */
394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395 "Running real set-reconciliation\n"); 371 "Running real set-reconciliation\n");
396 //init_set1 (); 372 // init_set1 ();
397 // limit ~23800 element total 373 // limit ~23800 element total
398 initRandomSets(50,100,100,128); 374 initRandomSets (490, 500,500,32);
399} 375}
400 376
401static void execute_perf() 377
378void
379perf_thread ()
402{ 380{
403 for( int repeat_ctr = 0; repeat_ctr<1; repeat_ctr++ ) { 381 GNUNET_TESTING_service_run ("perf_setu_api",
382 "arm",
383 "test_setu.conf",
384 &run,
385 NULL);
386
387}
404 388
405 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
406 "Executing perf round %d\n", repeat_ctr);
407 389
408 GNUNET_TESTING_service_run ("perf_setu_api", 390static void
409 "arm", 391run_petf_thread (int total_runs)
410 "test_setu.conf", 392{
411 &run, 393 int core_count = get_nprocs_conf ();
412 NULL); 394 pid_t child_pid, wpid;
395 int status = 0;
396
397// Father code (before child processes start)
398 for (int processed = 0; processed < total_runs;)
399 {
400 for (int id = 0; id < core_count; id++)
401 {
402 if (processed >= total_runs)
403 break;
404
405 if ((child_pid = fork ()) == 0)
406 {
407 perf_thread ();
408 exit (0);
409 }
410 processed += 1;
413 } 411 }
414 return 0; 412 while ((wpid = wait (&status)) > 0)
413 ;
414
415 }
415} 416}
416 417
417 418
419static void
420execute_perf ()
421{
422
423 /**
424 * Erase statfile
425 */
426 remove ("perf_stats.csv");
427 remove ("perf_failure_bucket_number_factor.csv");
428 for (int out_out_ctr = 3; out_out_ctr <= 3; out_out_ctr++)
429 {
430
431 for (int out_ctr = 20; out_ctr <= 20; out_ctr++)
432 {
433 float base = 0.1;
434 float x = out_ctr * base;
435 char factor[10];
436 char *buffer = gcvt (x, 4, factor);
437 setu_cfg = GNUNET_CONFIGURATION_create ();
438 GNUNET_CONFIGURATION_set_value_string (setu_cfg, "IBF",
439 "BUCKET_NUMBER_FACTOR",
440 buffer); // Factor default=4
441 GNUNET_CONFIGURATION_set_value_number (setu_cfg, "IBF",
442 "NUMBER_PER_BUCKET", 3); // K default=4
443 GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE",
444 "TRADEOFF", "2"); // default=0.25
445 GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE",
446 "MAX_SET_DIFF_FACTOR_DIFFERENTIAL",
447 "20000"); // default=0.25
448 GNUNET_CONFIGURATION_set_value_number (setu_cfg, "BOUNDARIES",
449 "UPPER_ELEMENT", 5000);
450
451
452 if (GNUNET_OK != GNUNET_CONFIGURATION_write (setu_cfg, "perf_setu.conf"))
453 GNUNET_log (
454 GNUNET_ERROR_TYPE_ERROR,
455 _ ("Failed to write subsystem default identifier map'.\n"));
456 run_petf_thread (100);
457 }
458
459 }
460 return;
461}
462
418 463
419int 464int
420main (int argc, char **argv) 465main (int argc, char **argv)
421{ 466{
422 GNUNET_log_setup ("perf_setu_api",
423 "WARNING",
424 NULL);
425 467
426 execute_perf(); 468 GNUNET_log_setup ("perf_setu_api",
427 return 0; 469 "WARNING",
470 NULL);
471 execute_perf ();
472 return 0;
428} 473}
diff --git a/src/setu/setu.h b/src/setu/setu.h
index 7c2a98a02..7b606f12c 100644
--- a/src/setu/setu.h
+++ b/src/setu/setu.h
@@ -122,6 +122,31 @@ struct GNUNET_SETU_AcceptMessage
122 */ 122 */
123 uint32_t byzantine_lower_bound; 123 uint32_t byzantine_lower_bound;
124 124
125
126 /**
127 * Upper bound for the set size, used only when
128 * byzantine mode is enabled.
129 */
130 uint64_t byzantine_upper_bond;
131
132 /**
133 * Bandwidth latency tradeoff determines how much bytes a single RTT is
134 * worth, which is a performance setting
135 */
136 uint64_t bandwidth_latency_tradeoff;
137
138 /**
139 * The factor determines the number of buckets an IBF has which is
140 * multiplied by the estimated setsize default: 2
141 */
142 uint64_t ibf_bucket_number_factor;
143
144 /**
145 * This setting determines to how many IBF buckets an single elements
146 * is mapped to.
147 */
148 uint64_t ibf_number_of_buckets_per_element;
149
125}; 150};
126 151
127 152
@@ -226,6 +251,30 @@ struct GNUNET_SETU_EvaluateMessage
226 */ 251 */
227 uint32_t byzantine_lower_bound; 252 uint32_t byzantine_lower_bound;
228 253
254 /**
255 * Upper bound for the set size, used only when
256 * byzantine mode is enabled.
257 */
258 uint64_t byzantine_upper_bond;
259
260 /**
261 * Bandwidth latency tradeoff determines how much bytes a single RTT is
262 * worth, which is a performance setting
263 */
264 uint64_t bandwidth_latency_tradeoff;
265
266 /**
267 * The factor determines the number of buckets an IBF has which is
268 * multiplied by the estimated setsize default: 2
269 */
270 uint64_t ibf_bucket_number_factor;
271
272 /**
273 * This setting determines to how many IBF buckets an single elements
274 * is mapped to.
275 */
276 uint64_t ibf_number_of_buckets_per_element;
277
229 /* rest: context message, that is, application-specific 278 /* rest: context message, that is, application-specific
230 message to convince listener to pick up */ 279 message to convince listener to pick up */
231}; 280};
diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c
index 0a09b18b2..faa57aaba 100644
--- a/src/setu/setu_api.c
+++ b/src/setu/setu_api.c
@@ -22,6 +22,7 @@
22 * @brief api for the set union service 22 * @brief api for the set union service
23 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
@@ -526,6 +527,14 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
526 context_msg); 527 context_msg);
527 msg->app_id = *app_id; 528 msg->app_id = *app_id;
528 msg->target_peer = *other_peer; 529 msg->target_peer = *other_peer;
530
531 /* Set default values */
532 msg->byzantine_upper_bond = UINT64_MAX;
533 msg->bandwidth_latency_tradeoff = 0;
534 msg->ibf_bucket_number_factor = 2;
535 msg->ibf_number_of_buckets_per_element = 3;
536
537
529 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) 538 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
530 { 539 {
531 switch (opt->type) 540 switch (opt->type)
@@ -534,6 +543,18 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
534 msg->byzantine = GNUNET_YES; 543 msg->byzantine = GNUNET_YES;
535 msg->byzantine_lower_bound = htonl (opt->v.num); 544 msg->byzantine_lower_bound = htonl (opt->v.num);
536 break; 545 break;
546 case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND:
547 msg->byzantine_upper_bond = htonl (opt->v.num);
548 break;
549 case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF:
550 msg->bandwidth_latency_tradeoff = htonl (opt->v.num);
551 break;
552 case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR:
553 msg->ibf_bucket_number_factor = htonl (opt->v.num);
554 break;
555 case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT:
556 msg->ibf_number_of_buckets_per_element = htonl (opt->v.num);
557 break;
537 case GNUNET_SETU_OPTION_FORCE_FULL: 558 case GNUNET_SETU_OPTION_FORCE_FULL:
538 msg->force_full = GNUNET_YES; 559 msg->force_full = GNUNET_YES;
539 break; 560 break;
@@ -788,6 +809,13 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
788 mqm = GNUNET_MQ_msg (msg, 809 mqm = GNUNET_MQ_msg (msg,
789 GNUNET_MESSAGE_TYPE_SETU_ACCEPT); 810 GNUNET_MESSAGE_TYPE_SETU_ACCEPT);
790 msg->accept_reject_id = htonl (request->accept_id); 811 msg->accept_reject_id = htonl (request->accept_id);
812
813 /* Set default values */
814 msg->byzantine_upper_bond = UINT64_MAX;
815 msg->bandwidth_latency_tradeoff = 0;
816 msg->ibf_bucket_number_factor = 2;
817 msg->ibf_number_of_buckets_per_element = 3;
818
791 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) 819 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
792 { 820 {
793 switch (opt->type) 821 switch (opt->type)
@@ -796,6 +824,18 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
796 msg->byzantine = GNUNET_YES; 824 msg->byzantine = GNUNET_YES;
797 msg->byzantine_lower_bound = htonl (opt->v.num); 825 msg->byzantine_lower_bound = htonl (opt->v.num);
798 break; 826 break;
827 case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND:
828 msg->byzantine_upper_bond = htonl (opt->v.num);
829 break;
830 case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF:
831 msg->bandwidth_latency_tradeoff = htonl (opt->v.num);
832 break;
833 case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR:
834 msg->ibf_bucket_number_factor = htonl (opt->v.num);
835 break;
836 case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT:
837 msg->ibf_number_of_buckets_per_element = htonl (opt->v.num);
838 break;
799 case GNUNET_SETU_OPTION_FORCE_FULL: 839 case GNUNET_SETU_OPTION_FORCE_FULL:
800 msg->force_full = GNUNET_YES; 840 msg->force_full = GNUNET_YES;
801 break; 841 break;
diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c
index 2fb7d015e..5a0c9d70d 100644
--- a/src/setu/test_setu_api.c
+++ b/src/setu/test_setu_api.c
@@ -204,62 +204,6 @@ init_set2 (void *cls)
204 GNUNET_SETU_add_element (set2, &element, &start, NULL); 204 GNUNET_SETU_add_element (set2, &element, &start, NULL);
205} 205}
206 206
207/**
208 * Generate random byte stream
209 */
210
211unsigned char *gen_rdm_bytestream (size_t num_bytes)
212{
213 unsigned char *stream = GNUNET_malloc (num_bytes);
214 GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes);
215 return stream;
216}
217
218/**
219 * Generate random sets
220 */
221
222static void
223initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes)
224{
225 struct GNUNET_SETU_Element element;
226 element.element_type = 0;
227
228 // Add elements to both sets
229 for (int i = 0; i < overlap; i++) {
230 element.data = gen_rdm_bytestream(element_size_in_bytes);
231 element.size = element_size_in_bytes;
232 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
233 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
234 set1_size--;
235 set2_size--;
236 }
237 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n");
238
239 // Add other elements to set 1
240 while(set1_size>0) {
241 element.data = gen_rdm_bytestream(element_size_in_bytes);
242 element.size = element_size_in_bytes;
243 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
244 set1_size--;
245 }
246 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n");
247
248 // Add other elements to set 2
249 while(set2_size > 0) {
250 element.data = gen_rdm_bytestream(element_size_in_bytes);
251 element.size = element_size_in_bytes;
252
253 if(set2_size != 1) {
254 GNUNET_SETU_add_element (set2, &element,NULL, NULL);
255 } else {
256 GNUNET_SETU_add_element (set2, &element,&start, NULL);
257 }
258
259 set2_size--;
260 }
261 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n");
262}
263 207
264/** 208/**
265 * Initialize the first set, continue. 209 * Initialize the first set, continue.
@@ -392,9 +336,7 @@ run (void *cls,
392 /* test the real set reconciliation */ 336 /* test the real set reconciliation */
393 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 337 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
394 "Running real set-reconciliation\n"); 338 "Running real set-reconciliation\n");
395 //init_set1 (); 339 init_set1 ();
396 initRandomSets(19500,20000,20000,4096);
397 //initRandomSets(19500,20000,20000,32);
398} 340}
399 341
400 342