aboutsummaryrefslogtreecommitdiff
path: root/src/setu/gnunet-service-setu.c
diff options
context:
space:
mode:
authorElias Summermatter <elias.summermatter@seccom.ch>2021-04-02 15:46:25 +0200
committerMartin Schanzenbach <mschanzenbach@posteo.de>2021-07-30 16:06:15 +0200
commitebc70e1bccd6c2f784df8630f1105a91bc7bfeed (patch)
treebc5963b5c3ad38176120a7b0223e68c0709b41f2 /src/setu/gnunet-service-setu.c
parente8eb1ecc006ffd3d7aa99fc9d3e8d19eedd9d343 (diff)
downloadgnunet-ebc70e1bccd6c2f784df8630f1105a91bc7bfeed.tar.gz
gnunet-ebc70e1bccd6c2f784df8630f1105a91bc7bfeed.zip
SETU: Implement LSD0003
Diffstat (limited to 'src/setu/gnunet-service-setu.c')
-rw-r--r--src/setu/gnunet-service-setu.c2081
1 files changed, 1819 insertions, 262 deletions
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
index bd1113f15..a51e92b71 100644
--- a/src/setu/gnunet-service-setu.c
+++ b/src/setu/gnunet-service-setu.c
@@ -22,6 +22,7 @@
22 * @brief set union operation 22 * @brief set union operation
23 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
@@ -50,33 +51,61 @@
50 */ 51 */
51#define SE_STRATA_COUNT 32 52#define SE_STRATA_COUNT 32
52 53
54
53/** 55/**
54 * Size of the IBFs in the strata estimator. 56 * Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348
57 * Based on the bsc thesis of Elias Summermatter (2021)
55 */ 58 */
56#define SE_IBF_SIZE 80 59#define SE_IBFS_TOTAL_SIZE 632
57 60
58/** 61/**
59 * The hash num parameter for the difference digests and strata estimators. 62 * The hash num parameter for the difference digests and strata estimators.
60 */ 63 */
61#define SE_IBF_HASH_NUM 4 64#define SE_IBF_HASH_NUM 3
62 65
63/** 66/**
64 * Number of buckets that can be transmitted in one message. 67 * Number of buckets that can be transmitted in one message.
65 */ 68 */
66#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE) 69#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
67 70
68/** 71/**
69 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). 72 * The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
70 * Choose this value so that computing the IBF is still cheaper 73 * Choose this value so that computing the IBF is still cheaper
71 * than transmitting all values. 74 * than transmitting all values.
72 */ 75 */
73#define MAX_IBF_ORDER (20) 76#define MAX_IBF_SIZE 1048576
77
78
79/**
80 * Minimal size of an ibf
81 * Based on the bsc thesis of Elias Summermatter (2021)
82 */
83#define IBF_MIN_SIZE 37
84
85/**
86 * AVG RTT for differential sync when k=2 and Factor = 2
87 * Based on the bsc thesis of Elias Summermatter (2021)
88 */
89#define DIFFERENTIAL_RTT_MEAN 3.65145
90
91/**
92 * Security level used for byzantine checks (2^80)
93 */
94
95#define SECURITY_LEVEL 80
96
97/**
98 * Is the estimated probabily for a new round this values
99 * is based on the bsc thesis of Elias Summermatter (2021)
100 */
101
102#define PROBABILITY_FOR_NEW_ROUND 0.15
74 103
75/** 104/**
76 * Number of buckets used in the ibf per estimated 105 * Measure the performance in a csv
77 * difference.
78 */ 106 */
79#define IBF_ALPHA 4 107
108#define MEASURE_PERFORMANCE 0
80 109
81 110
82/** 111/**
@@ -146,6 +175,28 @@ enum UnionOperationPhase
146 PHASE_FULL_RECEIVING 175 PHASE_FULL_RECEIVING
147}; 176};
148 177
178/**
179 * Different modes of operations
180 */
181
182enum MODE_OF_OPERATION
183{
184 /**
185 * Mode just synchronizes the difference between sets
186 */
187 DIFFERENTIAL_SYNC,
188
189 /**
190 * Mode send full set sending local set first
191 */
192 FULL_SYNC_LOCAL_SENDING_FIRST,
193
194 /**
195 * Mode request full set from remote peer
196 */
197 FULL_SYNC_REMOTE_SENDING_FIRST
198};
199
149 200
150/** 201/**
151 * Information about an element element in the set. All elements are 202 * Information about an element element in the set. All elements are
@@ -277,7 +328,7 @@ struct Operation
277 * Copy of the set's strata estimator at the time of 328 * Copy of the set's strata estimator at the time of
278 * creation of this operation. 329 * creation of this operation.
279 */ 330 */
280 struct StrataEstimator *se; 331 struct MultiStrataEstimator *se;
281 332
282 /** 333 /**
283 * The IBF we currently receive. 334 * The IBF we currently receive.
@@ -320,7 +371,7 @@ struct Operation
320 /** 371 /**
321 * Number of ibf buckets already received into the @a remote_ibf. 372 * Number of ibf buckets already received into the @a remote_ibf.
322 */ 373 */
323 unsigned int ibf_buckets_received; 374 uint64_t ibf_buckets_received;
324 375
325 /** 376 /**
326 * Salt that we're using for sending IBFs 377 * Salt that we're using for sending IBFs
@@ -386,7 +437,7 @@ struct Operation
386 * Lower bound for the set size, used only when 437 * Lower bound for the set size, used only when
387 * byzantine mode is enabled. 438 * byzantine mode is enabled.
388 */ 439 */
389 int byzantine_lower_bound; 440 uint64_t byzantine_lower_bound;
390 441
391 /** 442 /**
392 * Unique request id for the request from a remote peer, sent to the 443 * Unique request id for the request from a remote peer, sent to the
@@ -401,21 +452,83 @@ struct Operation
401 */ 452 */
402 unsigned int generation_created; 453 unsigned int generation_created;
403 454
455
456 /**
457 * User defined Bandwidth Round Trips Tradeoff
458 */
459 uint64_t rtt_bandwidth_tradeoff;
460
461
404 /** 462 /**
405 * User defined Bandwidth Round Trips Tradeoff 463 * Number of Element per bucket in IBF
464 */
465 uint8_t ibf_number_buckets_per_element;
466
467
468 /**
469 * Set difference is multiplied with this factor
470 * to gennerate large enought IBF
406 */ 471 */
407 double rtt_bandwidth_tradeoff; 472 uint8_t ibf_bucket_number_factor;
408 473
409 /** 474 /**
410 * Number of Element per bucket in IBF 475 * Defines which site a client is
476 * 0 = Initiating peer
477 * 1 = Receiving peer
411 */ 478 */
412 unsigned int ibf_number_buckets_per_element; 479 uint8_t peer_site;
480
413 481
414 /** 482 /**
415 * Number of buckets in IBF 483 * Local peer element count
416 */ 484 */
417 unsigned ibf_bucket_number; 485 uint64_t local_element_count;
418 486
487 /**
488 * Mode of operation that was chosen by the algorithm
489 */
490 uint8_t mode_of_operation;
491
492 /**
493 * Hashmap to keep track of the send/received messages
494 */
495 struct GNUNET_CONTAINER_MultiHashMap *message_control_flow;
496
497 /**
498 * Hashmap to keep track of the send/received inquiries (ibf keys)
499 */
500 struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent;
501
502
503 /**
504 * Total size of local set
505 */
506 uint64_t total_elements_size_local;
507
508 /**
509 * Limit of number of elements in set
510 */
511 uint64_t byzantine_upper_bound;
512
513 /**
514 * is the count of already passed differential sync iterations
515 */
516 uint8_t differential_sync_iterations;
517
518 /**
519 * Estimated or committed set difference at the start
520 */
521 uint64_t remote_set_diff;
522
523 /**
524 * Estimated or committed set difference at the start
525 */
526 uint64_t local_set_diff;
527
528 /**
529 * Boolean to enforce an active passive switch
530 */
531 bool active_passive_switch_required;
419}; 532};
420 533
421 534
@@ -431,6 +544,16 @@ struct SetContent
431 struct GNUNET_CONTAINER_MultiHashMap *elements; 544 struct GNUNET_CONTAINER_MultiHashMap *elements;
432 545
433 /** 546 /**
547 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *` randomized.
548 */
549 struct GNUNET_CONTAINER_MultiHashMap *elements_randomized;
550
551 /**
552 * Salt to construct the randomized element map
553 */
554 uint64_t elements_randomized_salt;
555
556 /**
434 * Number of references to the content. 557 * Number of references to the content.
435 */ 558 */
436 unsigned int refcount; 559 unsigned int refcount;
@@ -478,7 +601,7 @@ struct Set
478 * The strata estimator is only generated once for each set. The IBF keys 601 * The strata estimator is only generated once for each set. The IBF keys
479 * are derived from the element hashes with salt=0. 602 * are derived from the element hashes with salt=0.
480 */ 603 */
481 struct StrataEstimator *se; 604 struct MultiStrataEstimator *se;
482 605
483 /** 606 /**
484 * Evaluate operations are held in a linked list. 607 * Evaluate operations are held in a linked list.
@@ -635,96 +758,679 @@ static int in_shutdown;
635 */ 758 */
636static uint32_t suggest_id; 759static uint32_t suggest_id;
637 760
761#if MEASURE_PERFORMANCE
762/**
763 * Handles configuration file for setu performance test
764 *
765 */
766static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
767
638 768
639/** 769/**
640 * Added Roundtripscounter 770 * Stores the performance data for induvidual message
641 */ 771 */
642 772
643 773
644struct perf_num_send_resived_msg { 774struct perf_num_send_received_msg
645 int sent; 775{
646 int sent_var_bytes; 776 uint64_t sent;
647 int received; 777 uint64_t sent_var_bytes;
648 int received_var_bytes; 778 uint64_t received;
779 uint64_t received_var_bytes;
649}; 780};
650 781
782/**
783 * Main struct to messure perfomance (data/rtts)
784 */
785struct per_store_struct
786{
787 struct perf_num_send_received_msg operation_request;
788 struct perf_num_send_received_msg se;
789 struct perf_num_send_received_msg request_full;
790 struct perf_num_send_received_msg element_full;
791 struct perf_num_send_received_msg full_done;
792 struct perf_num_send_received_msg ibf;
793 struct perf_num_send_received_msg inquery;
794 struct perf_num_send_received_msg element;
795 struct perf_num_send_received_msg demand;
796 struct perf_num_send_received_msg offer;
797 struct perf_num_send_received_msg done;
798 struct perf_num_send_received_msg over;
799 uint64_t se_diff;
800 uint64_t se_diff_remote;
801 uint64_t se_diff_local;
802 uint64_t active_passive_switches;
803 uint8_t mode_of_operation;
804};
651 805
652struct perf_rtt_struct 806struct per_store_struct perf_store;
653{ 807#endif
654 struct perf_num_send_resived_msg operation_request; 808
655 struct perf_num_send_resived_msg se; 809/**
656 struct perf_num_send_resived_msg request_full; 810 * Different states to control the messages flow in differential mode
657 struct perf_num_send_resived_msg element_full; 811 */
658 struct perf_num_send_resived_msg full_done; 812
659 struct perf_num_send_resived_msg ibf; 813enum MESSAGE_CONTROL_FLOW_STATE
660 struct perf_num_send_resived_msg inquery; 814{
661 struct perf_num_send_resived_msg element; 815 /**
662 struct perf_num_send_resived_msg demand; 816 * Initial message state
663 struct perf_num_send_resived_msg offer; 817 */
664 struct perf_num_send_resived_msg done; 818 MSG_CFS_UNINITIALIZED,
665 struct perf_num_send_resived_msg over; 819
820 /**
821 * Track that a message has been sent
822 */
823 MSG_CFS_SENT,
824
825 /**
826 * Track that receiving this message is expected
827 */
828 MSG_CFS_EXPECTED,
829
830 /**
831 * Track that message has been recieved
832 */
833 MSG_CFS_RECEIVED,
834};
835
836/**
837 * Message types to track in message control flow
838 */
839
840enum MESSAGE_TYPE
841{
842 /**
843 * Offer message type
844 */
845 OFFER_MESSAGE,
846 /**
847 * Demand message type
848 */
849 DEMAND_MESSAGE,
850 /**
851 * Elemente message type
852 */
853 ELEMENT_MESSAGE,
666}; 854};
667 855
668struct perf_rtt_struct perf_rtt; 856/**
857 * Struct to tracked messages in message controll flow
858 */
859
860struct messageControlFlowElement
861{
862 /**
863 * Track the message control state of the offer message
864 */
865 enum MESSAGE_CONTROL_FLOW_STATE offer;
866 /**
867 * Track the message control state of the demand message
868 */
869 enum MESSAGE_CONTROL_FLOW_STATE demand;
870 /**
871 * Track the message control state of the element message
872 */
873 enum MESSAGE_CONTROL_FLOW_STATE element;
874};
875
876
877#if MEASURE_PERFORMANCE
878/**
879 * Loads different configuration to do perform perfomance tests
880 * @param op
881 */
882static void
883load_config (struct Operation *op)
884{
885
886 setu_cfg = GNUNET_CONFIGURATION_create ();
887 GNUNET_CONFIGURATION_load (setu_cfg,"perf_setu.conf");
888
889
890 long long number;
891 float fl;
892 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
893 &fl);
894 op->ibf_bucket_number_factor = fl;
895
896 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
897 &number);
898 op->ibf_number_buckets_per_element = number;
899
900 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"PERFORMANCE", "TRADEOFF",
901 &number);
902 op->rtt_bandwidth_tradeoff = number;
903
904
905 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"BOUNDARIES", "UPPER_ELEMENT",
906 &number);
907 op->byzantine_upper_bound = number;
908
909
910 op->peer_site = 0;
911}
669 912
670 913
914/**
915 * Function to calculate total bytes used for performance messurement
916 * @param size
917 * @param perf_num_send_received_msg
918 * @return bytes used
919 */
671static int 920static int
672sum_sent_received_bytes(int size, struct perf_num_send_resived_msg perf_rtt_struct) { 921sum_sent_received_bytes (uint64_t size, struct perf_num_send_received_msg
673 return (size * perf_rtt_struct.sent) + 922 perf_num_send_received_msg)
674 (size * perf_rtt_struct.received) + 923{
675 perf_rtt_struct.sent_var_bytes + 924 return (size * perf_num_send_received_msg.sent)
676 perf_rtt_struct.received_var_bytes; 925 + (size * perf_num_send_received_msg.received)
926 + perf_num_send_received_msg.sent_var_bytes
927 + perf_num_send_received_msg.received_var_bytes;
677} 928}
678 929
679static float
680calculate_perf_rtt() {
681 /**
682 * Calculate RTT of init phase normally always 1
683 */
684 float rtt = 1;
685 int bytes_transmitted = 0;
686 930
687 /** 931/**
688 * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending 932 * Function that calculates the perfmance values and writes them down
689 */ 933 */
690 if (( perf_rtt.element_full.received != 0 ) || 934static void
691 ( perf_rtt.element_full.sent != 0) 935calculate_perf_store ()
692 ) rtt += 1; 936{
693 937
694 if (( perf_rtt.request_full.received != 0 ) || 938 /**
695 ( perf_rtt.request_full.sent != 0) 939 * Calculate RTT of init phase normally always 1
696 ) rtt += 0.5; 940 */
941 float rtt = 1;
942 int bytes_transmitted = 0;
697 943
698 /** 944 /**
699 * In case of a differential sync 3 rtt's are needed. 945 * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normaly 1 or 1.5 depending
700 * for every active/passive switch additional 3.5 rtt's are used 946 */
701 */ 947 if ((perf_store.element_full.received != 0) ||
948 (perf_store.element_full.sent != 0)
949 )
950 rtt += 1;
951
952 if ((perf_store.request_full.received != 0) ||
953 (perf_store.request_full.sent != 0)
954 )
955 rtt += 0.5;
956
957 /**
958 * In case of a differential sync 3 rtt's are needed.
959 * for every active/passive switch additional 3.5 rtt's are used
960 */
961 if ((perf_store.element.received != 0) ||
962 (perf_store.element.sent != 0))
963 {
964 int iterations = perf_store.active_passive_switches;
965
966 if (iterations > 0)
967 rtt += iterations * 0.5;
968 rtt += 2.5;
969 }
970
971
972 /**
973 * Calculate data sended size
974 */
975 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
976 GNUNET_SETU_ResultMessage),
977 perf_store.request_full);
978
979 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
980 GNUNET_SETU_ElementMessage),
981 perf_store.element_full);
982 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
983 GNUNET_SETU_ElementMessage),
984 perf_store.element);
985 // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
986 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
987 StrataEstimatorMessage),
988 perf_store.se);
989 bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
990 bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
991 perf_store.ibf);
992 bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
993 perf_store.inquery);
994 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
995 GNUNET_MessageHeader),
996 perf_store.demand);
997 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
998 GNUNET_MessageHeader),
999 perf_store.offer);
1000 bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1001
1002 /**
1003 * Write IBF failure rate for different BUCKET_NUMBER_FACTOR
1004 */
1005 float factor;
1006 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1007 &factor);
1008 long long num_per_bucket;
1009 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1010 &num_per_bucket);
1011
1012
1013 int decoded = 0;
1014 if (perf_store.active_passive_switches == 0)
1015 decoded = 1;
1016 int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1017 IBFMessage),
1018 perf_store.ibf);
1019
1020 FILE *out1 = fopen ("perf_data.csv", "a");
1021 fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1022 decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1023 bytes_transmitted,
1024 perf_store.se_diff_local,perf_store.se_diff_remote,
1025 perf_store.mode_of_operation);
1026 fclose (out1);
1027
1028}
1029
1030
1031#endif
1032/**
1033 * Function that chooses the optimal mode of operation depending on
1034 * operation parameters.
1035 * @param avg_element_size
1036 * @param local_set_size
1037 * @param remote_set_size
1038 * @param est_set_diff_remote
1039 * @param est_set_diff_local
1040 * @param bandwith_latency_tradeoff
1041 * @param ibf_bucket_number_factor
1042 * @return calcuated mode of operation
1043 */
1044static uint8_t
1045estimate_best_mode_of_operation (uint64_t avg_element_size,
1046 uint64_t local_set_size,
1047 uint64_t remote_set_size,
1048 uint64_t est_set_diff_remote,
1049 uint64_t est_set_diff_local,
1050 uint64_t bandwith_latency_tradeoff,
1051 uint64_t ibf_bucket_number_factor)
1052{
1053
1054 /*
1055 * In case of initial sync fall to predefined states
1056 */
1057
1058 if (0 == local_set_size)
1059 return FULL_SYNC_REMOTE_SENDING_FIRST;
1060 if (0 == remote_set_size)
1061 return FULL_SYNC_LOCAL_SENDING_FIRST;
1062
1063 /*
1064 * Calculate bytes for full Sync
1065 */
1066
1067 uint8_t sizeof_full_done_header = 4;
1068 uint8_t sizeof_done_header = 4;
1069 uint8_t rtt_min_full = 2;
1070 uint8_t sizeof_request_full = 4;
1071 uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1072
1073 /* Estimate byte required if we send first */
1074 uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1075 + local_set_size;
1076
1077 uint64_t total_bytes_full_local_send_first = (avg_element_size
1078 *
1079 total_elements_to_send_local_send_first) \
1080 + (
1081 total_elements_to_send_local_send_first * sizeof(struct
1082 GNUNET_SETU_ElementMessage)) \
1083 + (sizeof_full_done_header * 2) \
1084 + rtt_min_full
1085 * bandwith_latency_tradeoff;
1086
1087 /* Estimate bytes required if we request from remote peer */
1088 uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1089 + remote_set_size;
1090
1091 uint64_t total_bytes_full_remote_send_first = (avg_element_size
1092 *
1093 total_elements_to_send_remote_send_first) \
1094 + (
1095 total_elements_to_send_remote_send_first * sizeof(struct
1096 GNUNET_SETU_ElementMessage)) \
1097 + (sizeof_full_done_header * 2) \
1098 + (rtt_min_full + 0.5)
1099 * bandwith_latency_tradeoff \
1100 + sizeof_request_full;
1101
1102 /*
1103 * Calculate bytes for differential Sync
1104 */
1105
1106 /* Estimate bytes required by IBF transmission*/
1107
1108 long double ibf_bucket_count = estimated_total_diff
1109 * ibf_bucket_number_factor;
1110
1111 if (ibf_bucket_count <= IBF_MIN_SIZE)
1112 {
1113 ibf_bucket_count = IBF_MIN_SIZE;
1114 }
1115 uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1116 / MAX_BUCKETS_PER_MESSAGE);
1117
1118 uint64_t estimated_counter_size = ceil (
1119 MIN (2 * log2l ((float) local_set_size / ibf_bucket_count), log2l (
1120 local_set_size)));
1121
1122 long double counter_bytes = (float) estimated_counter_size / 8;
1123
1124 uint64_t ibf_bytes = ceil ((sizeof(struct IBFMessage) * ibf_message_count)
1125 * 1.2 \
1126 + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
1127 + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
1128 * 1.2 \
1129 + (ibf_bucket_count * counter_bytes) * 1.2);
1130
1131 /* Estimate full byte count for differential sync */
1132 uint64_t element_size = (avg_element_size + sizeof(struct
1133 GNUNET_SETU_ElementMessage)) \
1134 * estimated_total_diff;
1135 uint64_t done_size = sizeof_done_header;
1136 uint64_t inquery_size = (sizeof(struct IBF_Key) + sizeof(struct
1137 InquiryMessage))
1138 * estimated_total_diff;
1139 uint64_t demand_size =
1140 (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1141 * estimated_total_diff;
1142 uint64_t offer_size = (sizeof(struct GNUNET_HashCode) + sizeof(struct
1143 GNUNET_MessageHeader))
1144 * estimated_total_diff;
1145
1146 uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1147 + demand_size + offer_size + ibf_bytes) \
1148 + (DIFFERENTIAL_RTT_MEAN
1149 * bandwith_latency_tradeoff);
1150
1151 uint64_t full_min = MIN (total_bytes_full_local_send_first,
1152 total_bytes_full_remote_send_first);
1153
1154 /* Decide between full and differential sync */
1155
1156 if (full_min < total_bytes_diff)
1157 {
1158 /* Decide between sending all element first or receiving all elements */
1159 if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
1160 {
1161 return FULL_SYNC_LOCAL_SENDING_FIRST;
1162 }
1163 else
1164 {
1165 return FULL_SYNC_REMOTE_SENDING_FIRST;
1166 }
1167 }
1168 else
1169 {
1170 return DIFFERENTIAL_SYNC;
1171 }
1172}
1173
1174
1175/**
1176 * Validates the if a message is received in a correct phase
1177 * @param allowed_phases
1178 * @param size_phases
1179 * @param op
1180 * @return GNUNET_YES if message permitted in phase and GNUNET_NO if not permitted in given
1181 * phase
1182 */
1183static int
1184check_valid_phase (const uint8_t allowed_phases[], size_t size_phases, struct
1185 Operation *op)
1186{
1187 /**
1188 * Iterate over allowed phases
1189 */
1190 for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1191 {
1192 uint8_t phase = allowed_phases[phase_ctr];
1193 if (phase == op->phase)
1194 {
1195 LOG (GNUNET_ERROR_TYPE_DEBUG,
1196 "Message received in valid phase\n");
1197 return GNUNET_YES;
1198 }
1199 }
1200 LOG (GNUNET_ERROR_TYPE_ERROR,
1201 "Received message in invalid phase: %u\n", op->phase);
1202 return GNUNET_NO;
1203}
1204
1205
1206/**
1207 * Function to update, track and validate message received in differential
1208 * sync. This function tracks states of messages and check it against different
1209 * constraints as described in Summermatter's BSc Thesis (2021)
1210 * @param hash_map: Hashmap to store message control flow
1211 * @param new_mcfs: The new message control flow state an given message type should be set to
1212 * @param hash_code: Hash code of the element
1213 * @param mt: The message type for which the message control flow state should be set
1214 * @return GNUNET_YES message is valid in message control flow GNUNET_NO when message is not valid
1215 * at this point in message flow
1216 */
1217static int
1218update_message_control_flow (struct GNUNET_CONTAINER_MultiHashMap *hash_map,
1219 enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1220 const struct GNUNET_HashCode *hash_code,
1221 enum MESSAGE_TYPE mt)
1222{
1223 struct messageControlFlowElement *cfe = NULL;
1224 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1225
1226 /**
1227 * Check logic for forbidden messages
1228 */
702 1229
703 int iterations = perf_rtt.ibf.received; 1230 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
704 if(iterations > 1) 1231 if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
705 rtt += (iterations - 1 ) * 0.5; 1232 {
706 rtt += 3 * iterations; 1233 if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1234 {
1235 LOG (GNUNET_ERROR_TYPE_ERROR,
1236 "Received an element without sent offer!\n");
1237 return GNUNET_NO;
1238 }
1239 /* Check that only requested elements are received! */
1240 if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1241 MSG_CFS_SENT))
1242 {
1243 LOG (GNUNET_ERROR_TYPE_ERROR,
1244 "Received an element that was not demanded\n");
1245 return GNUNET_NO;
1246 }
1247 }
1248
1249 /**
1250 * In case the element hash is not in the hashmap create a new entry
1251 */
1252
1253 if (NULL == cfe)
1254 {
1255 cfe = GNUNET_new (struct messageControlFlowElement);
1256 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1257 cfe,
1258 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1259 {
1260 GNUNET_free (cfe);
1261 return GNUNET_SYSERR;
1262 }
1263 }
1264
1265 /**
1266 * Set state of message
1267 */
1268
1269 if (OFFER_MESSAGE == mt)
1270 {
1271 mcfs = &cfe->offer;
1272 }
1273 else if (DEMAND_MESSAGE == mt)
1274 {
1275 mcfs = &cfe->demand;
1276 }
1277 else if (ELEMENT_MESSAGE == mt)
1278 {
1279 mcfs = &cfe->element;
1280 }
1281 else
1282 {
1283 return GNUNET_SYSERR;
1284 }
1285
1286 /**
1287 * Check if state is allowed
1288 */
1289
1290 if (new_mcfs <= *mcfs)
1291 {
1292 return GNUNET_NO;
1293 }
1294
1295 *mcfs = new_mcfs;
1296 return GNUNET_YES;
1297}
1298
1299
1300/**
1301 * Validate if a message in differential sync si already received before.
1302 * @param hash_map
1303 * @param hash_code
1304 * @param mt
1305 * @return GNUNET_YES when message is already in store if message is not in store return GNUNET_NO
1306 */
1307static int
1308is_message_in_message_control_flow (struct
1309 GNUNET_CONTAINER_MultiHashMap *hash_map,
1310 struct GNUNET_HashCode *hash_code,
1311 enum MESSAGE_TYPE mt)
1312{
1313 struct messageControlFlowElement *cfe = NULL;
1314 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1315
1316 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1317
1318 /**
1319 * Set state of message
1320 */
1321
1322 if (cfe != NULL)
1323 {
1324 if (OFFER_MESSAGE == mt)
1325 {
1326 mcfs = &cfe->offer;
1327 }
1328 else if (DEMAND_MESSAGE == mt)
1329 {
1330 mcfs = &cfe->demand;
1331 }
1332 else if (ELEMENT_MESSAGE == mt)
1333 {
1334 mcfs = &cfe->element;
1335 }
1336 else
1337 {
1338 return GNUNET_SYSERR;
1339 }
707 1340
708 /** 1341 /**
709 * Calculate data sended size 1342 * Evaluate if set is in message
710 */ 1343 */
711 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL), perf_rtt.request_full); 1344 if (*mcfs != MSG_CFS_UNINITIALIZED)
712 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT), perf_rtt.element_full); 1345 {
713 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS), perf_rtt.element); 1346 return GNUNET_YES;
714 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_rtt.operation_request); 1347 }
715 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_SE), perf_rtt.se); 1348 }
716 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE), perf_rtt.full_done); 1349 return GNUNET_NO;
717 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_IBF), perf_rtt.ibf); 1350}
718 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY), perf_rtt.inquery); 1351
719 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND), perf_rtt.demand); 1352
720 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER), perf_rtt.offer); 1353/**
721 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DONE), perf_rtt.done); 1354 * Iterator for determining if all demands have been
1355 * satisfied
1356 *
1357 * @param cls the union operation `struct Operation *`
1358 * @param key unused
1359 * @param value the `struct ElementEntry *` to insert
1360 * into the key-to-element mapping
1361 * @return #GNUNET_YES (to continue iterating)
1362 */
1363static int
1364determinate_done_message_iterator (void *cls,
1365 const struct GNUNET_HashCode *key,
1366 void *value)
1367{
1368 struct messageControlFlowElement *mcfe = value;
1369
1370 if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1371 {
1372 return GNUNET_YES;
1373 }
1374 return GNUNET_NO;
1375}
1376
1377
1378/**
1379 * Iterator for determining average size
1380 *
1381 * @param cls the union operation `struct Operation *`
1382 * @param key unused
1383 * @param value the `struct ElementEntry *` to insert
1384 * into the key-to-element mapping
1385 * @return #GNUNET_YES (to continue iterating)
1386 */
1387static int
1388determinate_avg_element_size_iterator (void *cls,
1389 const struct GNUNET_HashCode *key,
1390 void *value)
1391{
1392 struct Operation *op = cls;
1393 struct GNUNET_SETU_Element *element = value;
1394 op->total_elements_size_local += element->size;
1395 return GNUNET_YES;
1396}
722 1397
723 LOG(GNUNET_ERROR_TYPE_ERROR,"Bytes Transmitted: %d\n", bytes_transmitted);
724 1398
725 LOG(GNUNET_ERROR_TYPE_ERROR,"Reached tradeoff bandwidth/rtt: %f\n", (bytes_transmitted / rtt )); 1399/**
1400 * Create randomized element hashmap for full sending
1401 *
1402 * @param cls the union operation `struct Operation *`
1403 * @param key unused
1404 * @param value the `struct ElementEntry *` to insert
1405 * into the key-to-element mapping
1406 * @return #GNUNET_YES (to continue iterating)
1407 */
1408static int
1409create_randomized_element_iterator (void *cls,
1410 const struct GNUNET_HashCode *key,
1411 void *value)
1412{
1413 struct Operation *op = cls;
726 1414
727 return rtt; 1415 struct GNUNET_HashContext *hashed_key_context =
1416 GNUNET_CRYPTO_hash_context_start ();
1417 struct GNUNET_HashCode new_key;
1418
1419 /**
1420 * Hash element with new salt to randomize hashmap
1421 */
1422 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1423 &key,
1424 sizeof(struct IBF_Key));
1425 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1426 &op->set->content->elements_randomized_salt,
1427 sizeof(uint32_t));
1428 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1429 &new_key);
1430 GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1431 &new_key,value,
1432 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1433 return GNUNET_YES;
728} 1434}
729 1435
730 1436
@@ -808,6 +1514,36 @@ send_client_done (void *cls)
808} 1514}
809 1515
810 1516
1517/**
1518 * Check if all given byzantine parameters are in given boundaries
1519 * @param op
1520 * @return indicator if all given byzantine parameters are in given boundaries
1521 */
1522
1523static int
1524check_byzantine_bounds (struct Operation *op)
1525{
1526 if (op->byzantine != GNUNET_YES)
1527 return GNUNET_OK;
1528
1529 /**
1530 * Check upper byzantine bounds
1531 */
1532 if (op->remote_element_count + op->remote_set_diff >
1533 op->byzantine_upper_bound)
1534 return GNUNET_SYSERR;
1535 if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1536 return GNUNET_SYSERR;
1537
1538 /**
1539 * Check lower byzantine bounds
1540 */
1541 if (op->remote_element_count < op->byzantine_lower_bound)
1542 return GNUNET_SYSERR;
1543 return GNUNET_OK;
1544}
1545
1546
811/* FIXME: the destroy logic is a mess and should be cleaned up! */ 1547/* FIXME: the destroy logic is a mess and should be cleaned up! */
812 1548
813/** 1549/**
@@ -977,6 +1713,101 @@ fail_union_operation (struct Operation *op)
977 1713
978 1714
979/** 1715/**
1716 * Function that checks if full sync is plausible
1717 * @param initial_local_elements_in_set
1718 * @param estimated_set_difference
1719 * @param repeated_elements
1720 * @param fresh_elements
1721 * @param op
1722 * @return GNUNET_OK if
1723 */
1724
1725static void
1726full_sync_plausibility_check (struct Operation *op)
1727{
1728 if (GNUNET_YES != op->byzantine)
1729 return;
1730
1731 int security_level_lb = -1 * SECURITY_LEVEL;
1732 uint64_t duplicates = op->received_fresh - op->received_total;
1733
1734 /*
1735 * Protect full sync from receiving double element when in FULL SENDING
1736 */
1737 if (PHASE_FULL_SENDING == op->phase)
1738 {
1739 if (duplicates > 0)
1740 {
1741 LOG (GNUNET_ERROR_TYPE_ERROR,
1742 "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1743 "mode of operation this is not allowed! Duplicates: %lu\n",
1744 duplicates);
1745 GNUNET_break_op (0);
1746 fail_union_operation (op);
1747 return;
1748 }
1749
1750 }
1751
1752 /*
1753 * Protect full sync with probabilistic algorithm
1754 */
1755 if (PHASE_FULL_RECEIVING == op->phase)
1756 {
1757 if (0 == op->remote_set_diff)
1758 op->remote_set_diff = 1;
1759
1760 long double base = (1 - (long double) (op->remote_set_diff
1761 / (long double) (op->initial_size
1762 + op->
1763 remote_set_diff)));
1764 long double exponent = (op->received_total - (op->received_fresh * ((long
1765 double)
1766 op->
1767 initial_size
1768 / (long
1769 double)
1770 op->
1771 remote_set_diff)));
1772 long double value = exponent * (log2l (base) / log2l (2));
1773 if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1774 {
1775 LOG (GNUNET_ERROR_TYPE_ERROR,
1776 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1777 "to many duplicated full element : %LF\n",
1778 value);
1779 GNUNET_break_op (0);
1780 fail_union_operation (op);
1781 return;
1782 }
1783 }
1784}
1785
1786
1787/**
1788 * Limit active passive switches in differential sync to configured security level
1789 * @param op
1790 */
1791static void
1792check_max_differential_rounds (struct Operation *op)
1793{
1794 double probability = op->differential_sync_iterations * (log2l (
1795 PROBABILITY_FOR_NEW_ROUND)
1796 / log2l (2));
1797 if ((-1 * SECURITY_LEVEL) > probability)
1798 {
1799 LOG (GNUNET_ERROR_TYPE_ERROR,
1800 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1801 "switches in differential sync: %u\n",
1802 op->differential_sync_iterations);
1803 GNUNET_break_op (0);
1804 fail_union_operation (op);
1805 return;
1806 }
1807}
1808
1809
1810/**
980 * Derive the IBF key from a hash code and 1811 * Derive the IBF key from a hash code and
981 * a salt. 1812 * a salt.
982 * 1813 *
@@ -1004,12 +1835,12 @@ get_ibf_key (const struct GNUNET_HashCode *src)
1004struct GetElementContext 1835struct GetElementContext
1005{ 1836{
1006 /** 1837 /**
1007 * FIXME. 1838 * Gnunet hash code in context
1008 */ 1839 */
1009 struct GNUNET_HashCode hash; 1840 struct GNUNET_HashCode hash;
1010 1841
1011 /** 1842 /**
1012 * FIXME. 1843 * Pointer to the key enty
1013 */ 1844 */
1014 struct KeyEntry *k; 1845 struct KeyEntry *k;
1015}; 1846};
@@ -1122,7 +1953,7 @@ salt_key (const struct IBF_Key *k_in,
1122 uint32_t salt, 1953 uint32_t salt,
1123 struct IBF_Key *k_out) 1954 struct IBF_Key *k_out)
1124{ 1955{
1125 int s = salt % 64; 1956 int s = (salt * 7) % 64;
1126 uint64_t x = k_in->key_val; 1957 uint64_t x = k_in->key_val;
1127 1958
1128 /* rotate ibf key */ 1959 /* rotate ibf key */
@@ -1132,14 +1963,14 @@ salt_key (const struct IBF_Key *k_in,
1132 1963
1133 1964
1134/** 1965/**
1135 * FIXME. 1966 * Reverse modification done in the salt_key function
1136 */ 1967 */
1137static void 1968static void
1138unsalt_key (const struct IBF_Key *k_in, 1969unsalt_key (const struct IBF_Key *k_in,
1139 uint32_t salt, 1970 uint32_t salt,
1140 struct IBF_Key *k_out) 1971 struct IBF_Key *k_out)
1141{ 1972{
1142 int s = salt % 64; 1973 int s = (salt * 7) % 64;
1143 uint64_t x = k_in->key_val; 1974 uint64_t x = k_in->key_val;
1144 1975
1145 x = (x << s) | (x >> (64 - s)); 1976 x = (x << s) | (x >> (64 - s));
@@ -1258,7 +2089,9 @@ prepare_ibf (struct Operation *op,
1258 2089
1259 if (NULL != op->local_ibf) 2090 if (NULL != op->local_ibf)
1260 ibf_destroy (op->local_ibf); 2091 ibf_destroy (op->local_ibf);
1261 op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 2092 // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2093 op->local_ibf = ibf_create (size,
2094 ((uint8_t) op->ibf_number_buckets_per_element));
1262 if (NULL == op->local_ibf) 2095 if (NULL == op->local_ibf)
1263 { 2096 {
1264 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2097 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1283,13 +2116,23 @@ prepare_ibf (struct Operation *op,
1283 */ 2116 */
1284static int 2117static int
1285send_ibf (struct Operation *op, 2118send_ibf (struct Operation *op,
1286 uint16_t ibf_order) 2119 uint32_t ibf_size)
1287{ 2120{
1288 unsigned int buckets_sent = 0; 2121 uint64_t buckets_sent = 0;
1289 struct InvertibleBloomFilter *ibf; 2122 struct InvertibleBloomFilter *ibf;
2123 op->differential_sync_iterations++;
2124
2125 /**
2126 * Enforce min size of IBF
2127 */
2128 uint32_t ibf_min_size = IBF_MIN_SIZE;
1290 2129
2130 if (ibf_size < ibf_min_size)
2131 {
2132 ibf_size = ibf_min_size;
2133 }
1291 if (GNUNET_OK != 2134 if (GNUNET_OK !=
1292 prepare_ibf (op, 1 << ibf_order)) 2135 prepare_ibf (op, ibf_size))
1293 { 2136 {
1294 /* allocation failed */ 2137 /* allocation failed */
1295 return GNUNET_SYSERR; 2138 return GNUNET_SYSERR;
@@ -1297,45 +2140,48 @@ send_ibf (struct Operation *op,
1297 2140
1298 LOG (GNUNET_ERROR_TYPE_DEBUG, 2141 LOG (GNUNET_ERROR_TYPE_DEBUG,
1299 "sending ibf of size %u\n", 2142 "sending ibf of size %u\n",
1300 1 << ibf_order); 2143 1 << ibf_size);
1301 2144
1302 { 2145 {
1303 char name[64]; 2146 char name[64];
1304 GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order); 2147 GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_size);
1305 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); 2148 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
1306 } 2149 }
1307 2150
1308 ibf = op->local_ibf; 2151 ibf = op->local_ibf;
1309 2152
1310 while (buckets_sent < (1 << ibf_order)) 2153 while (buckets_sent < ibf_size)
1311 { 2154 {
1312 unsigned int buckets_in_message; 2155 unsigned int buckets_in_message;
1313 struct GNUNET_MQ_Envelope *ev; 2156 struct GNUNET_MQ_Envelope *ev;
1314 struct IBFMessage *msg; 2157 struct IBFMessage *msg;
1315 2158
1316 buckets_in_message = (1 << ibf_order) - buckets_sent; 2159 buckets_in_message = ibf_size - buckets_sent;
1317 /* limit to maximum */ 2160 /* limit to maximum */
1318 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) 2161 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
1319 buckets_in_message = MAX_BUCKETS_PER_MESSAGE; 2162 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
1320 2163
1321 perf_rtt.ibf.sent += 1; 2164#if MEASURE_PERFORMANCE
1322 perf_rtt.ibf.sent_var_bytes += ( buckets_in_message * IBF_BUCKET_SIZE ); 2165 perf_store.ibf.sent += 1;
2166 perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2167#endif
1323 ev = GNUNET_MQ_msg_extra (msg, 2168 ev = GNUNET_MQ_msg_extra (msg,
1324 buckets_in_message * IBF_BUCKET_SIZE, 2169 buckets_in_message * IBF_BUCKET_SIZE,
1325 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF); 2170 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF);
1326 msg->reserved1 = 0; 2171 msg->ibf_size = ibf_size;
1327 msg->reserved2 = 0;
1328 msg->order = ibf_order;
1329 msg->offset = htonl (buckets_sent); 2172 msg->offset = htonl (buckets_sent);
1330 msg->salt = htonl (op->salt_send); 2173 msg->salt = htonl (op->salt_send);
2174 msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2175
2176
1331 ibf_write_slice (ibf, buckets_sent, 2177 ibf_write_slice (ibf, buckets_sent,
1332 buckets_in_message, &msg[1]); 2178 buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
1333 buckets_sent += buckets_in_message; 2179 buckets_sent += buckets_in_message;
1334 LOG (GNUNET_ERROR_TYPE_DEBUG, 2180 LOG (GNUNET_ERROR_TYPE_DEBUG,
1335 "ibf chunk size %u, %u/%u sent\n", 2181 "ibf chunk size %u, %lu/%u sent\n",
1336 buckets_in_message, 2182 buckets_in_message,
1337 buckets_sent, 2183 buckets_sent,
1338 1 << ibf_order); 2184 ibf_size);
1339 GNUNET_MQ_send (op->mq, ev); 2185 GNUNET_MQ_send (op->mq, ev);
1340 } 2186 }
1341 2187
@@ -1354,17 +2200,26 @@ send_ibf (struct Operation *op,
1354 * @return the required size of the ibf 2200 * @return the required size of the ibf
1355 */ 2201 */
1356static unsigned int 2202static unsigned int
1357get_order_from_difference (unsigned int diff) 2203get_size_from_difference (unsigned int diff, int number_buckets_per_element,
2204 float ibf_bucket_number_factor)
1358{ 2205{
1359 unsigned int ibf_order; 2206 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2207 * Elias Summermatter (2021) in section 3.11 **/
2208 return (((int) (diff * ibf_bucket_number_factor)) | 1);
1360 2209
1361 ibf_order = 2; 2210}
1362 while (((1 << ibf_order) < (IBF_ALPHA * diff) || 2211
1363 ((1 << ibf_order) < SE_IBF_HASH_NUM)) && 2212
1364 (ibf_order < MAX_IBF_ORDER)) 2213static unsigned int
1365 ibf_order++; 2214get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
1366 // add one for correction 2215 decoded_elements, unsigned int last_ibf_size)
1367 return ibf_order + 1; 2216{
2217 unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
2218 - (ibf_bucket_number_factor
2219 * decoded_elements));
2220 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2221 * Elias Summermatter (2021) in section 3.11 **/
2222 return next_size | 1;
1368} 2223}
1369 2224
1370 2225
@@ -1391,8 +2246,10 @@ send_full_element_iterator (void *cls,
1391 LOG (GNUNET_ERROR_TYPE_DEBUG, 2246 LOG (GNUNET_ERROR_TYPE_DEBUG,
1392 "Sending element %s\n", 2247 "Sending element %s\n",
1393 GNUNET_h2s (key)); 2248 GNUNET_h2s (key));
1394 perf_rtt.element_full.received += 1; 2249#if MEASURE_PERFORMANCE
1395 perf_rtt.element_full.received_var_bytes += el->size; 2250 perf_store.element_full.received += 1;
2251 perf_store.element_full.received_var_bytes += el->size;
2252#endif
1396 ev = GNUNET_MQ_msg_extra (emsg, 2253 ev = GNUNET_MQ_msg_extra (emsg,
1397 el->size, 2254 el->size,
1398 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); 2255 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -1421,10 +2278,25 @@ send_full_set (struct Operation *op)
1421 "Dedicing to transmit the full set\n"); 2278 "Dedicing to transmit the full set\n");
1422 /* FIXME: use a more memory-friendly way of doing this with an 2279 /* FIXME: use a more memory-friendly way of doing this with an
1423 iterator, just as we do in the non-full case! */ 2280 iterator, just as we do in the non-full case! */
2281
2282 // Randomize Elements to send
2283 op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2284 32,GNUNET_NO);
2285 op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2286 GNUNET_CRYPTO_QUALITY_NONCE,
2287 UINT64_MAX);
1424 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, 2288 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1425 &send_full_element_iterator, 2289 &
2290 create_randomized_element_iterator,
1426 op); 2291 op);
1427 perf_rtt.full_done.sent += 1; 2292
2293 (void) GNUNET_CONTAINER_multihashmap_iterate (
2294 op->set->content->elements_randomized,
2295 &send_full_element_iterator,
2296 op);
2297#if MEASURE_PERFORMANCE
2298 perf_store.full_done.sent += 1;
2299#endif
1428 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); 2300 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
1429 GNUNET_MQ_send (op->mq, 2301 GNUNET_MQ_send (op->mq,
1430 ev); 2302 ev);
@@ -1454,7 +2326,7 @@ check_union_p2p_strata_estimator (void *cls,
1454 msg->header.type)); 2326 msg->header.type));
1455 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2327 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1456 if ((GNUNET_NO == is_compressed) && 2328 if ((GNUNET_NO == is_compressed) &&
1457 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) 2329 (len != SE_STRATA_COUNT * SE_IBFS_TOTAL_SIZE * IBF_BUCKET_SIZE))
1458 { 2330 {
1459 GNUNET_break (0); 2331 GNUNET_break (0);
1460 return GNUNET_SYSERR; 2332 return GNUNET_SYSERR;
@@ -1473,14 +2345,44 @@ static void
1473handle_union_p2p_strata_estimator (void *cls, 2345handle_union_p2p_strata_estimator (void *cls,
1474 const struct StrataEstimatorMessage *msg) 2346 const struct StrataEstimatorMessage *msg)
1475{ 2347{
1476 perf_rtt.se.received += 1; 2348#if MEASURE_PERFORMANCE
1477 perf_rtt.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2349 perf_store.se.received += 1;
2350 perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2351 StrataEstimatorMessage);
2352#endif
1478 struct Operation *op = cls; 2353 struct Operation *op = cls;
1479 struct StrataEstimator *remote_se; 2354 struct MultiStrataEstimator *remote_se;
1480 unsigned int diff; 2355 unsigned int diff;
1481 uint64_t other_size; 2356 uint64_t other_size;
1482 size_t len; 2357 size_t len;
1483 int is_compressed; 2358 int is_compressed;
2359 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2360 op->set->content->elements);
2361 // Setting peer site to receiving peer
2362 op->peer_site = 1;
2363
2364 /**
2365 * Check that the message is received only in supported phase
2366 */
2367 uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2368 if (GNUNET_OK !=
2369 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2370 {
2371 GNUNET_break (0);
2372 fail_union_operation (op);
2373 return;
2374 }
2375
2376 /** Only allow 1,2,4,8 SEs **/
2377 if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2378 {
2379 LOG (GNUNET_ERROR_TYPE_ERROR,
2380 "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2381 msg->se_count);
2382 GNUNET_break_op (0);
2383 fail_union_operation (op);
2384 return;
2385 }
1484 2386
1485 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons ( 2387 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1486 msg->header.type)); 2388 msg->header.type));
@@ -1490,8 +2392,20 @@ handle_union_p2p_strata_estimator (void *cls,
1490 GNUNET_NO); 2392 GNUNET_NO);
1491 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2393 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1492 other_size = GNUNET_ntohll (msg->set_size); 2394 other_size = GNUNET_ntohll (msg->set_size);
2395 op->remote_element_count = other_size;
2396
2397 if (op->byzantine_upper_bound < op->remote_element_count)
2398 {
2399 LOG (GNUNET_ERROR_TYPE_ERROR,
2400 "Exceeded configured upper bound <%lu> of element: %u\n",
2401 op->byzantine_upper_bound,
2402 op->remote_element_count);
2403 fail_union_operation (op);
2404 return;
2405 }
2406
1493 remote_se = strata_estimator_create (SE_STRATA_COUNT, 2407 remote_se = strata_estimator_create (SE_STRATA_COUNT,
1494 SE_IBF_SIZE, 2408 SE_IBFS_TOTAL_SIZE,
1495 SE_IBF_HASH_NUM); 2409 SE_IBF_HASH_NUM);
1496 if (NULL == remote_se) 2410 if (NULL == remote_se)
1497 { 2411 {
@@ -1503,6 +2417,8 @@ handle_union_p2p_strata_estimator (void *cls,
1503 strata_estimator_read (&msg[1], 2417 strata_estimator_read (&msg[1],
1504 len, 2418 len,
1505 is_compressed, 2419 is_compressed,
2420 msg->se_count,
2421 SE_IBFS_TOTAL_SIZE,
1506 remote_se)) 2422 remote_se))
1507 { 2423 {
1508 /* decompression failed */ 2424 /* decompression failed */
@@ -1511,11 +2427,76 @@ handle_union_p2p_strata_estimator (void *cls,
1511 return; 2427 return;
1512 } 2428 }
1513 GNUNET_assert (NULL != op->se); 2429 GNUNET_assert (NULL != op->se);
1514 diff = strata_estimator_difference (remote_se, 2430 strata_estimator_difference (remote_se,
1515 op->se); 2431 op->se);
2432
2433 /* Calculate remote local diff */
2434 long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2435 long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2436
2437 /* Prevent estimations from overshooting max element */
2438 if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2439 diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2440 if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2441 diff_local = op->byzantine_upper_bound - op->local_element_count;
2442 if ((diff_remote < 0) || (diff_local < 0))
2443 {
2444 strata_estimator_destroy (remote_se);
2445 LOG (GNUNET_ERROR_TYPE_ERROR,
2446 "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2447 "malicious: remote diff %ld, local diff: %ld\n",
2448 diff_remote, diff_local);
2449 GNUNET_break_op (0);
2450 fail_union_operation (op);
2451 return;
2452 }
1516 2453
1517 if (diff > 200) 2454 /* Make estimation more precise in initial sync cases */
1518 diff = diff * 3 / 2; 2455 if (0 == op->remote_element_count)
2456 {
2457 diff_remote = 0;
2458 diff_local = op->local_element_count;
2459 }
2460 if (0 == op->local_element_count)
2461 {
2462 diff_local = 0;
2463 diff_remote = op->remote_element_count;
2464 }
2465
2466 diff = diff_remote + diff_local;
2467 op->remote_set_diff = diff_remote;
2468
2469 /** Calculate avg element size if not initial sync **/
2470 uint64_t avg_element_size = 0;
2471 if (0 < op->local_element_count)
2472 {
2473 op->total_elements_size_local = 0;
2474 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2475 &
2476 determinate_avg_element_size_iterator,
2477 op);
2478 avg_element_size = op->total_elements_size_local / op->local_element_count;
2479 }
2480
2481 op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2482 GNUNET_CONTAINER_multihashmap_size (
2483 op->set->content->
2484 elements),
2485 op->
2486 remote_element_count,
2487 diff_remote,
2488 diff_local,
2489 op->
2490 rtt_bandwidth_tradeoff,
2491 op->
2492 ibf_bucket_number_factor);
2493
2494#if MEASURE_PERFORMANCE
2495 perf_store.se_diff_local = diff_local;
2496 perf_store.se_diff_remote = diff_remote;
2497 perf_store.se_diff = diff;
2498 perf_store.mode_of_operation = op->mode_of_operation;
2499#endif
1519 2500
1520 strata_estimator_destroy (remote_se); 2501 strata_estimator_destroy (remote_se);
1521 strata_estimator_destroy (op->se); 2502 strata_estimator_destroy (op->se);
@@ -1523,7 +2504,8 @@ handle_union_p2p_strata_estimator (void *cls,
1523 LOG (GNUNET_ERROR_TYPE_DEBUG, 2504 LOG (GNUNET_ERROR_TYPE_DEBUG,
1524 "got se diff=%d, using ibf size %d\n", 2505 "got se diff=%d, using ibf size %d\n",
1525 diff, 2506 diff,
1526 1U << get_order_from_difference (diff)); 2507 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element,
2508 op->ibf_bucket_number_factor));
1527 2509
1528 { 2510 {
1529 char *set_debug; 2511 char *set_debug;
@@ -1546,16 +2528,8 @@ handle_union_p2p_strata_estimator (void *cls,
1546 return; 2528 return;
1547 } 2529 }
1548 2530
1549 LOG (GNUNET_ERROR_TYPE_ERROR,
1550 "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: %f\n", op->rtt_bandwidth_tradeoff);
1551
1552
1553 /**
1554 * Added rtt_bandwidth_tradeoff directly need future improvements
1555 */
1556 if ((GNUNET_YES == op->force_full) || 2531 if ((GNUNET_YES == op->force_full) ||
1557 (diff > op->initial_size / 4) || 2532 (op->mode_of_operation != DIFFERENTIAL_SYNC))
1558 (0 == other_size))
1559 { 2533 {
1560 LOG (GNUNET_ERROR_TYPE_DEBUG, 2534 LOG (GNUNET_ERROR_TYPE_DEBUG,
1561 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n", 2535 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
@@ -1565,9 +2539,17 @@ handle_union_p2p_strata_estimator (void *cls,
1565 "# of full sends", 2539 "# of full sends",
1566 1, 2540 1,
1567 GNUNET_NO); 2541 GNUNET_NO);
1568 if ((op->initial_size <= other_size) || 2542 if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
1569 (0 == other_size))
1570 { 2543 {
2544 struct TransmitFullMessage *signal_msg;
2545 struct GNUNET_MQ_Envelope *ev;
2546 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2547 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL);
2548 signal_msg->remote_set_difference = htonl (diff_local);
2549 signal_msg->remote_set_size = htonl (op->local_element_count);
2550 signal_msg->local_set_difference = htonl (diff_remote);
2551 GNUNET_MQ_send (op->mq,
2552 ev);
1571 send_full_set (op); 2553 send_full_set (op);
1572 } 2554 }
1573 else 2555 else
@@ -1577,9 +2559,15 @@ handle_union_p2p_strata_estimator (void *cls,
1577 LOG (GNUNET_ERROR_TYPE_DEBUG, 2559 LOG (GNUNET_ERROR_TYPE_DEBUG,
1578 "Telling other peer that we expect its full set\n"); 2560 "Telling other peer that we expect its full set\n");
1579 op->phase = PHASE_FULL_RECEIVING; 2561 op->phase = PHASE_FULL_RECEIVING;
1580 perf_rtt.request_full.sent += 1; 2562#if MEASURE_PERFORMANCE
1581 ev = GNUNET_MQ_msg_header ( 2563 perf_store.request_full.sent += 1;
1582 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); 2564#endif
2565 struct TransmitFullMessage *signal_msg;
2566 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2567 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
2568 signal_msg->remote_set_difference = htonl (diff_local);
2569 signal_msg->remote_set_size = htonl (op->local_element_count);
2570 signal_msg->local_set_difference = htonl (diff_remote);
1583 GNUNET_MQ_send (op->mq, 2571 GNUNET_MQ_send (op->mq,
1584 ev); 2572 ev);
1585 } 2573 }
@@ -1592,7 +2580,9 @@ handle_union_p2p_strata_estimator (void *cls,
1592 GNUNET_NO); 2580 GNUNET_NO);
1593 if (GNUNET_OK != 2581 if (GNUNET_OK !=
1594 send_ibf (op, 2582 send_ibf (op,
1595 get_order_from_difference (diff))) 2583 get_size_from_difference (diff,
2584 op->ibf_number_buckets_per_element,
2585 op->ibf_bucket_number_factor)))
1596 { 2586 {
1597 /* Internal error, best we can do is shut the connection */ 2587 /* Internal error, best we can do is shut the connection */
1598 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2588 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1625,15 +2615,64 @@ send_offers_iterator (void *cls,
1625 2615
1626 /* Detect 32-bit key collision for the 64-bit IBF keys. */ 2616 /* Detect 32-bit key collision for the 64-bit IBF keys. */
1627 if (ke->ibf_key.key_val != sec->ibf_key.key_val) 2617 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2618 {
2619 op->active_passive_switch_required = true;
1628 return GNUNET_YES; 2620 return GNUNET_YES;
2621 }
1629 2622
1630 perf_rtt.offer.sent += 1; 2623 /* Prevent implementation from sending a offer multiple times in case of roll switch */
1631 perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); 2624 if (GNUNET_YES ==
2625 is_message_in_message_control_flow (
2626 op->message_control_flow,
2627 &ke->element->element_hash,
2628 OFFER_MESSAGE)
2629 )
2630 {
2631 LOG (GNUNET_ERROR_TYPE_DEBUG,
2632 "Skipping already sent processed element offer!\n");
2633 return GNUNET_YES;
2634 }
1632 2635
2636 /* Save send offer message for message control */
2637 if (GNUNET_YES !=
2638 update_message_control_flow (
2639 op->message_control_flow,
2640 MSG_CFS_SENT,
2641 &ke->element->element_hash,
2642 OFFER_MESSAGE)
2643 )
2644 {
2645 LOG (GNUNET_ERROR_TYPE_ERROR,
2646 "Double offer message sent found!\n");
2647 GNUNET_break (0);
2648 fail_union_operation (op);
2649 return GNUNET_NO;
2650 }
2651 ;
2652
2653 /* Mark element to be expected to received */
2654 if (GNUNET_YES !=
2655 update_message_control_flow (
2656 op->message_control_flow,
2657 MSG_CFS_EXPECTED,
2658 &ke->element->element_hash,
2659 DEMAND_MESSAGE)
2660 )
2661 {
2662 LOG (GNUNET_ERROR_TYPE_ERROR,
2663 "Double demand received found!\n");
2664 GNUNET_break (0);
2665 fail_union_operation (op);
2666 return GNUNET_NO;
2667 }
2668 ;
2669#if MEASURE_PERFORMANCE
2670 perf_store.offer.sent += 1;
2671 perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2672#endif
1633 ev = GNUNET_MQ_msg_header_extra (mh, 2673 ev = GNUNET_MQ_msg_header_extra (mh,
1634 sizeof(struct GNUNET_HashCode), 2674 sizeof(struct GNUNET_HashCode),
1635 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER); 2675 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER);
1636
1637 GNUNET_assert (NULL != ev); 2676 GNUNET_assert (NULL != ev);
1638 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; 2677 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1639 LOG (GNUNET_ERROR_TYPE_DEBUG, 2678 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1651,7 +2690,7 @@ send_offers_iterator (void *cls,
1651 * @param op union operation 2690 * @param op union operation
1652 * @param ibf_key IBF key of interest 2691 * @param ibf_key IBF key of interest
1653 */ 2692 */
1654static void 2693void
1655send_offers_for_key (struct Operation *op, 2694send_offers_for_key (struct Operation *op,
1656 struct IBF_Key ibf_key) 2695 struct IBF_Key ibf_key)
1657{ 2696{
@@ -1694,6 +2733,7 @@ decode_and_send (struct Operation *op)
1694 /* allocation failed */ 2733 /* allocation failed */
1695 return GNUNET_SYSERR; 2734 return GNUNET_SYSERR;
1696 } 2735 }
2736
1697 diff_ibf = ibf_dup (op->local_ibf); 2737 diff_ibf = ibf_dup (op->local_ibf);
1698 ibf_subtract (diff_ibf, 2738 ibf_subtract (diff_ibf,
1699 op->remote_ibf); 2739 op->remote_ibf);
@@ -1706,7 +2746,7 @@ decode_and_send (struct Operation *op)
1706 diff_ibf->size); 2746 diff_ibf->size);
1707 2747
1708 num_decoded = 0; 2748 num_decoded = 0;
1709 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ 2749 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1710 2750
1711 while (1) 2751 while (1)
1712 { 2752 {
@@ -1738,23 +2778,36 @@ decode_and_send (struct Operation *op)
1738 if ((GNUNET_SYSERR == res) || 2778 if ((GNUNET_SYSERR == res) ||
1739 (GNUNET_YES == cycle_detected)) 2779 (GNUNET_YES == cycle_detected))
1740 { 2780 {
1741 int next_order; 2781 uint32_t next_size;
1742 next_order = 0; 2782 /** Enforce odd ibf size **/
1743 while (1 << next_order < diff_ibf->size) 2783
1744 next_order++; 2784 next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
1745 next_order++; 2785 diff_ibf->size);
1746 if (next_order <= MAX_IBF_ORDER) 2786 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2787 * Elias Summermatter (2021) in section 3.11 **/
2788 uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2789
2790 if (next_size<ibf_min_size)
2791 next_size = ibf_min_size;
2792
2793
2794 if (next_size <= MAX_IBF_SIZE)
1747 { 2795 {
1748 LOG (GNUNET_ERROR_TYPE_DEBUG, 2796 LOG (GNUNET_ERROR_TYPE_DEBUG,
1749 "decoding failed, sending larger ibf (size %u)\n", 2797 "decoding failed, sending larger ibf (size %u)\n",
1750 1 << next_order); 2798 next_size);
1751 GNUNET_STATISTICS_update (_GSS_statistics, 2799 GNUNET_STATISTICS_update (_GSS_statistics,
1752 "# of IBF retries", 2800 "# of IBF retries",
1753 1, 2801 1,
1754 GNUNET_NO); 2802 GNUNET_NO);
1755 op->salt_send++; 2803#if MEASURE_PERFORMANCE
2804 perf_store.active_passive_switches += 1;
2805#endif
2806
2807 op->salt_send = op->salt_receive++;
2808
1756 if (GNUNET_OK != 2809 if (GNUNET_OK !=
1757 send_ibf (op, next_order)) 2810 send_ibf (op, next_size))
1758 { 2811 {
1759 /* Internal error, best we can do is shut the connection */ 2812 /* Internal error, best we can do is shut the connection */
1760 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2813 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1786,7 +2839,9 @@ decode_and_send (struct Operation *op)
1786 LOG (GNUNET_ERROR_TYPE_DEBUG, 2839 LOG (GNUNET_ERROR_TYPE_DEBUG,
1787 "transmitted all values, sending DONE\n"); 2840 "transmitted all values, sending DONE\n");
1788 2841
1789 perf_rtt.done.sent += 1; 2842#if MEASURE_PERFORMANCE
2843 perf_store.done.sent += 1;
2844#endif
1790 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); 2845 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
1791 GNUNET_MQ_send (op->mq, ev); 2846 GNUNET_MQ_send (op->mq, ev);
1792 /* We now wait until we get a DONE message back 2847 /* We now wait until we get a DONE message back
@@ -1797,7 +2852,6 @@ decode_and_send (struct Operation *op)
1797 if (1 == side) 2852 if (1 == side)
1798 { 2853 {
1799 struct IBF_Key unsalted_key; 2854 struct IBF_Key unsalted_key;
1800
1801 unsalt_key (&key, 2855 unsalt_key (&key,
1802 op->salt_receive, 2856 op->salt_receive,
1803 &unsalted_key); 2857 &unsalted_key);
@@ -1809,8 +2863,29 @@ decode_and_send (struct Operation *op)
1809 struct GNUNET_MQ_Envelope *ev; 2863 struct GNUNET_MQ_Envelope *ev;
1810 struct InquiryMessage *msg; 2864 struct InquiryMessage *msg;
1811 2865
1812 perf_rtt.inquery.sent += 1; 2866#if MEASURE_PERFORMANCE
1813 perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key); 2867 perf_store.inquery.sent += 1;
2868 perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2869#endif
2870
2871 /** Add sent inquiries to hashmap for flow control **/
2872 struct GNUNET_HashContext *hashed_key_context =
2873 GNUNET_CRYPTO_hash_context_start ();
2874 struct GNUNET_HashCode *hashed_key = (struct
2875 GNUNET_HashCode*) GNUNET_malloc (
2876 sizeof(struct GNUNET_HashCode));
2877 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_SENT;
2878 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2879 &key,
2880 sizeof(struct IBF_Key));
2881 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2882 hashed_key);
2883 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2884 hashed_key,
2885 &mcfs,
2886 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
2887 );
2888
1814 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth 2889 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1815 * the effort additional complexity. */ 2890 * the effort additional complexity. */
1816 ev = GNUNET_MQ_msg_extra (msg, 2891 ev = GNUNET_MQ_msg_extra (msg,
@@ -1836,6 +2911,100 @@ decode_and_send (struct Operation *op)
1836 2911
1837 2912
1838/** 2913/**
2914 * Check send full message received from other peer
2915 * @param cls
2916 * @param msg
2917 * @return
2918 */
2919
2920static int
2921check_union_p2p_send_full (void *cls,
2922 const struct TransmitFullMessage *msg)
2923{
2924 return GNUNET_OK;
2925}
2926
2927
2928/**
2929 * Handle send full message received from other peer
2930 *
2931 * @param cls
2932 * @param msg
2933 */
2934static void
2935handle_union_p2p_send_full (void *cls,
2936 const struct TransmitFullMessage *msg)
2937{
2938 struct Operation *op = cls;
2939
2940 /**
2941 * Check that the message is received only in supported phase
2942 */
2943 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2944 if (GNUNET_OK !=
2945 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2946 {
2947 GNUNET_break (0);
2948 fail_union_operation (op);
2949 return;
2950 }
2951
2952 /** write received values to operator**/
2953 op->remote_element_count = ntohl (msg->remote_set_size);
2954 op->remote_set_diff = ntohl (msg->remote_set_difference);
2955 op->local_set_diff = ntohl (msg->local_set_difference);
2956
2957 /** Check byzantine limits **/
2958 if (check_byzantine_bounds (op) != GNUNET_OK)
2959 {
2960 LOG (GNUNET_ERROR_TYPE_ERROR,
2961 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
2962 "criteria\n");
2963 GNUNET_break_op (0);
2964 fail_union_operation (op);
2965 return;
2966 }
2967
2968 /** Calculate avg element size if not initial sync **/
2969 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2970 op->set->content->elements);
2971 uint64_t avg_element_size = 0;
2972 if (0 < op->local_element_count)
2973 {
2974 op->total_elements_size_local = 0;
2975 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2976 &
2977 determinate_avg_element_size_iterator,
2978 op);
2979 avg_element_size = op->total_elements_size_local / op->local_element_count;
2980 }
2981
2982 /** Validate mode of operation **/
2983 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2984 op->
2985 remote_element_count,
2986 op->
2987 local_element_count,
2988 op->local_set_diff,
2989 op->remote_set_diff,
2990 op->
2991 rtt_bandwidth_tradeoff,
2992 op->
2993 ibf_bucket_number_factor);
2994 if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation)
2995 {
2996 LOG (GNUNET_ERROR_TYPE_ERROR,
2997 "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
2998 " : %d\n", mode_of_operation);
2999 GNUNET_break_op (0);
3000 fail_union_operation (op);
3001 return;
3002 }
3003 op->phase = PHASE_FULL_RECEIVING;
3004}
3005
3006
3007/**
1839 * Check an IBF message from a remote peer. 3008 * Check an IBF message from a remote peer.
1840 * 3009 *
1841 * Reassemble the IBF from multiple pieces, and 3010 * Reassemble the IBF from multiple pieces, and
@@ -1872,7 +3041,8 @@ check_union_p2p_ibf (void *cls,
1872 GNUNET_break_op (0); 3041 GNUNET_break_op (0);
1873 return GNUNET_SYSERR; 3042 return GNUNET_SYSERR;
1874 } 3043 }
1875 if (1 << msg->order != op->remote_ibf->size) 3044
3045 if (msg->ibf_size != op->remote_ibf->size)
1876 { 3046 {
1877 GNUNET_break_op (0); 3047 GNUNET_break_op (0);
1878 return GNUNET_SYSERR; 3048 return GNUNET_SYSERR;
@@ -1909,9 +3079,26 @@ handle_union_p2p_ibf (void *cls,
1909{ 3079{
1910 struct Operation *op = cls; 3080 struct Operation *op = cls;
1911 unsigned int buckets_in_message; 3081 unsigned int buckets_in_message;
3082 /**
3083 * Check that the message is received only in supported phase
3084 */
3085 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3086 PHASE_PASSIVE_DECODING};
3087 if (GNUNET_OK !=
3088 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3089 {
3090 GNUNET_break (0);
3091 fail_union_operation (op);
3092 return;
3093 }
3094 op->differential_sync_iterations++;
3095 check_max_differential_rounds (op);
3096 op->active_passive_switch_required = false;
1912 3097
1913 perf_rtt.ibf.received += 1; 3098#if MEASURE_PERFORMANCE
1914 perf_rtt.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); 3099 perf_store.ibf.received += 1;
3100 perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3101#endif
1915 3102
1916 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) 3103 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1917 / IBF_BUCKET_SIZE; 3104 / IBF_BUCKET_SIZE;
@@ -1922,8 +3109,10 @@ handle_union_p2p_ibf (void *cls,
1922 GNUNET_assert (NULL == op->remote_ibf); 3109 GNUNET_assert (NULL == op->remote_ibf);
1923 LOG (GNUNET_ERROR_TYPE_DEBUG, 3110 LOG (GNUNET_ERROR_TYPE_DEBUG,
1924 "Creating new ibf of size %u\n", 3111 "Creating new ibf of size %u\n",
1925 1 << msg->order); 3112 ntohl (msg->ibf_size));
1926 op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); 3113 // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
3114 op->remote_ibf = ibf_create (msg->ibf_size,
3115 ((uint8_t) op->ibf_number_buckets_per_element));
1927 op->salt_receive = ntohl (msg->salt); 3116 op->salt_receive = ntohl (msg->salt);
1928 LOG (GNUNET_ERROR_TYPE_DEBUG, 3117 LOG (GNUNET_ERROR_TYPE_DEBUG,
1929 "Receiving new IBF with salt %u\n", 3118 "Receiving new IBF with salt %u\n",
@@ -1954,7 +3143,7 @@ handle_union_p2p_ibf (void *cls,
1954 ibf_read_slice (&msg[1], 3143 ibf_read_slice (&msg[1],
1955 op->ibf_buckets_received, 3144 op->ibf_buckets_received,
1956 buckets_in_message, 3145 buckets_in_message,
1957 op->remote_ibf); 3146 op->remote_ibf, msg->ibf_counter_bit_length);
1958 op->ibf_buckets_received += buckets_in_message; 3147 op->ibf_buckets_received += buckets_in_message;
1959 3148
1960 if (op->ibf_buckets_received == op->remote_ibf->size) 3149 if (op->ibf_buckets_received == op->remote_ibf->size)
@@ -2030,18 +3219,24 @@ maybe_finish (struct Operation *op)
2030 3219
2031 num_demanded = GNUNET_CONTAINER_multihashmap_size ( 3220 num_demanded = GNUNET_CONTAINER_multihashmap_size (
2032 op->demanded_hashes); 3221 op->demanded_hashes);
2033 3222 int send_done = GNUNET_CONTAINER_multihashmap_iterate (
3223 op->message_control_flow,
3224 &
3225 determinate_done_message_iterator,
3226 op);
2034 if (PHASE_FINISH_WAITING == op->phase) 3227 if (PHASE_FINISH_WAITING == op->phase)
2035 { 3228 {
2036 LOG (GNUNET_ERROR_TYPE_DEBUG, 3229 LOG (GNUNET_ERROR_TYPE_DEBUG,
2037 "In PHASE_FINISH_WAITING, pending %u demands\n", 3230 "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
2038 num_demanded); 3231 num_demanded, op->peer_site);
2039 if (0 == num_demanded) 3232 if (-1 != send_done)
2040 { 3233 {
2041 struct GNUNET_MQ_Envelope *ev; 3234 struct GNUNET_MQ_Envelope *ev;
2042 3235
2043 op->phase = PHASE_FINISHED; 3236 op->phase = PHASE_FINISHED;
2044 perf_rtt.done.sent += 1; 3237#if MEASURE_PERFORMANCE
3238 perf_store.done.sent += 1;
3239#endif
2045 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); 3240 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
2046 GNUNET_MQ_send (op->mq, 3241 GNUNET_MQ_send (op->mq,
2047 ev); 3242 ev);
@@ -2052,9 +3247,9 @@ maybe_finish (struct Operation *op)
2052 if (PHASE_FINISH_CLOSING == op->phase) 3247 if (PHASE_FINISH_CLOSING == op->phase)
2053 { 3248 {
2054 LOG (GNUNET_ERROR_TYPE_DEBUG, 3249 LOG (GNUNET_ERROR_TYPE_DEBUG,
2055 "In PHASE_FINISH_CLOSING, pending %u demands\n", 3250 "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
2056 num_demanded); 3251 num_demanded, op->peer_site);
2057 if (0 == num_demanded) 3252 if (-1 != send_done)
2058 { 3253 {
2059 op->phase = PHASE_FINISHED; 3254 op->phase = PHASE_FINISHED;
2060 send_client_done (op); 3255 send_client_done (op);
@@ -2102,11 +3297,25 @@ handle_union_p2p_elements (void *cls,
2102 struct KeyEntry *ke; 3297 struct KeyEntry *ke;
2103 uint16_t element_size; 3298 uint16_t element_size;
2104 3299
3300 /**
3301 * Check that the message is received only in supported phase
3302 */
3303 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3304 PHASE_FINISH_WAITING, PHASE_FINISH_CLOSING};
3305 if (GNUNET_OK !=
3306 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3307 {
3308 GNUNET_break (0);
3309 fail_union_operation (op);
3310 return;
3311 }
2105 3312
2106 element_size = ntohs (emsg->header.size) - sizeof(struct 3313 element_size = ntohs (emsg->header.size) - sizeof(struct
2107 GNUNET_SETU_ElementMessage); 3314 GNUNET_SETU_ElementMessage);
2108 perf_rtt.element.received += 1; 3315#if MEASURE_PERFORMANCE
2109 perf_rtt.element.received_var_bytes += element_size; 3316 perf_store.element.received += 1;
3317 perf_store.element.received_var_bytes += element_size;
3318#endif
2110 3319
2111 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); 3320 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2112 GNUNET_memcpy (&ee[1], 3321 GNUNET_memcpy (&ee[1],
@@ -2129,6 +3338,21 @@ handle_union_p2p_elements (void *cls,
2129 return; 3338 return;
2130 } 3339 }
2131 3340
3341 if (GNUNET_OK !=
3342 update_message_control_flow (
3343 op->message_control_flow,
3344 MSG_CFS_RECEIVED,
3345 &ee->element_hash,
3346 ELEMENT_MESSAGE)
3347 )
3348 {
3349 LOG (GNUNET_ERROR_TYPE_ERROR,
3350 "An element has been received more than once!\n");
3351 GNUNET_break (0);
3352 fail_union_operation (op);
3353 return;
3354 }
3355
2132 LOG (GNUNET_ERROR_TYPE_DEBUG, 3356 LOG (GNUNET_ERROR_TYPE_DEBUG,
2133 "Got element (size %u, hash %s) from peer\n", 3357 "Got element (size %u, hash %s) from peer\n",
2134 (unsigned int) element_size, 3358 (unsigned int) element_size,
@@ -2217,33 +3441,25 @@ handle_union_p2p_full_element (void *cls,
2217 struct KeyEntry *ke; 3441 struct KeyEntry *ke;
2218 uint16_t element_size; 3442 uint16_t element_size;
2219 3443
2220 3444 /**
2221 3445 * Check that the message is received only in supported phase
2222 if(PHASE_EXPECT_IBF == op->phase) { 3446 */
2223 op->phase = PHASE_FULL_RECEIVING; 3447 uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
2224 } 3448 if (GNUNET_OK !=
2225 3449 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2226
2227
2228 /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */
2229 if ((PHASE_FULL_RECEIVING != op->phase) &&
2230 (PHASE_FULL_SENDING != op->phase))
2231 { 3450 {
2232 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 3451 GNUNET_break (0);
2233 "Handle full element phase is %u\n", 3452 fail_union_operation (op);
2234 (unsigned) op->phase); 3453 return;
2235 GNUNET_break_op (0);
2236 fail_union_operation (op);
2237 return;
2238 } 3454 }
2239 3455
2240
2241
2242 element_size = ntohs (emsg->header.size) 3456 element_size = ntohs (emsg->header.size)
2243 - sizeof(struct GNUNET_SETU_ElementMessage); 3457 - sizeof(struct GNUNET_SETU_ElementMessage);
2244 3458
2245 perf_rtt.element_full.received += 1; 3459#if MEASURE_PERFORMANCE
2246 perf_rtt.element_full.received_var_bytes += element_size; 3460 perf_store.element_full.received += 1;
3461 perf_store.element_full.received_var_bytes += element_size;
3462#endif
2247 3463
2248 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); 3464 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2249 GNUNET_memcpy (&ee[1], &emsg[1], element_size); 3465 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
@@ -2268,17 +3484,15 @@ handle_union_p2p_full_element (void *cls,
2268 GNUNET_NO); 3484 GNUNET_NO);
2269 3485
2270 op->received_total++; 3486 op->received_total++;
2271
2272 ke = op_get_element (op, 3487 ke = op_get_element (op,
2273 &ee->element_hash); 3488 &ee->element_hash);
2274 if (NULL != ke) 3489 if (NULL != ke)
2275 { 3490 {
2276 /* Got repeated element. Should not happen since
2277 * we track demands. */
2278 GNUNET_STATISTICS_update (_GSS_statistics, 3491 GNUNET_STATISTICS_update (_GSS_statistics,
2279 "# repeated elements", 3492 "# repeated elements",
2280 1, 3493 1,
2281 GNUNET_NO); 3494 GNUNET_NO);
3495 full_sync_plausibility_check (op);
2282 ke->received = GNUNET_YES; 3496 ke->received = GNUNET_YES;
2283 GNUNET_free (ee); 3497 GNUNET_free (ee);
2284 } 3498 }
@@ -2294,15 +3508,15 @@ handle_union_p2p_full_element (void *cls,
2294 GNUNET_SETU_STATUS_ADD_LOCAL); 3508 GNUNET_SETU_STATUS_ADD_LOCAL);
2295 } 3509 }
2296 3510
3511
2297 if ((GNUNET_YES == op->byzantine) && 3512 if ((GNUNET_YES == op->byzantine) &&
2298 (op->received_total > 384 + op->received_fresh * 4) && 3513 (op->received_total > op->remote_element_count) )
2299 (op->received_fresh < op->received_total / 6))
2300 { 3514 {
2301 /* The other peer gave us lots of old elements, there's something wrong. */ 3515 /* The other peer gave us lots of old elements, there's something wrong. */
2302 LOG (GNUNET_ERROR_TYPE_ERROR, 3516 LOG (GNUNET_ERROR_TYPE_ERROR,
2303 "Other peer sent only %llu/%llu fresh elements, failing operation\n", 3517 "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
2304 (unsigned long long) op->received_fresh, 3518 (unsigned long long) op->received_total,
2305 (unsigned long long) op->received_total); 3519 (unsigned long long) op->remote_element_count);
2306 GNUNET_break_op (0); 3520 GNUNET_break_op (0);
2307 fail_union_operation (op); 3521 fail_union_operation (op);
2308 return; 3522 return;
@@ -2356,18 +3570,50 @@ handle_union_p2p_inquiry (void *cls,
2356 const struct IBF_Key *ibf_key; 3570 const struct IBF_Key *ibf_key;
2357 unsigned int num_keys; 3571 unsigned int num_keys;
2358 3572
2359 perf_rtt.inquery.received += 1; 3573 /**
2360 perf_rtt.inquery.received_var_bytes += (ntohs (msg->header.size) - sizeof(struct InquiryMessage)); 3574 * Check that the message is received only in supported phase
3575 */
3576 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3577 if (GNUNET_OK !=
3578 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3579 {
3580 GNUNET_break (0);
3581 fail_union_operation (op);
3582 return;
3583 }
3584
3585#if MEASURE_PERFORMANCE
3586 perf_store.inquery.received += 1;
3587 perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3588 - sizeof(struct InquiryMessage));
3589#endif
2361 3590
2362 LOG (GNUNET_ERROR_TYPE_DEBUG, 3591 LOG (GNUNET_ERROR_TYPE_DEBUG,
2363 "Received union inquiry\n"); 3592 "Received union inquiry\n");
2364 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) 3593 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2365 / sizeof(struct IBF_Key); 3594 / sizeof(struct IBF_Key);
2366 ibf_key = (const struct IBF_Key *) &msg[1]; 3595 ibf_key = (const struct IBF_Key *) &msg[1];
3596
3597 /** Add received inquiries to hashmap for flow control **/
3598 struct GNUNET_HashContext *hashed_key_context =
3599 GNUNET_CRYPTO_hash_context_start ();
3600 struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
3601 sizeof(struct GNUNET_HashCode));;
3602 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_RECEIVED;
3603 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3604 &ibf_key,
3605 sizeof(struct IBF_Key));
3606 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3607 hashed_key);
3608 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3609 hashed_key,
3610 &mcfs,
3611 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
3612 );
3613
2367 while (0 != num_keys--) 3614 while (0 != num_keys--)
2368 { 3615 {
2369 struct IBF_Key unsalted_key; 3616 struct IBF_Key unsalted_key;
2370
2371 unsalt_key (ibf_key, 3617 unsalt_key (ibf_key,
2372 ntohl (msg->salt), 3618 ntohl (msg->salt),
2373 &unsalted_key); 3619 &unsalted_key);
@@ -2402,7 +3648,9 @@ send_missing_full_elements_iter (void *cls,
2402 3648
2403 if (GNUNET_YES == ke->received) 3649 if (GNUNET_YES == ke->received)
2404 return GNUNET_YES; 3650 return GNUNET_YES;
2405 perf_rtt.element_full.received += 1; 3651#if MEASURE_PERFORMANCE
3652 perf_store.element_full.received += 1;
3653#endif
2406 ev = GNUNET_MQ_msg_extra (emsg, 3654 ev = GNUNET_MQ_msg_extra (emsg,
2407 ee->element.size, 3655 ee->element.size,
2408 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); 3656 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -2422,18 +3670,84 @@ send_missing_full_elements_iter (void *cls,
2422 * @param cls closure, a set union operation 3670 * @param cls closure, a set union operation
2423 * @param mh the demand message 3671 * @param mh the demand message
2424 */ 3672 */
3673static int
3674check_union_p2p_request_full (void *cls,
3675 const struct TransmitFullMessage *mh)
3676{
3677 return GNUNET_OK;
3678}
3679
3680
2425static void 3681static void
2426handle_union_p2p_request_full (void *cls, 3682handle_union_p2p_request_full (void *cls,
2427 const struct GNUNET_MessageHeader *mh) 3683 const struct TransmitFullMessage *msg)
2428{ 3684{
2429 struct Operation *op = cls; 3685 struct Operation *op = cls;
2430 3686
2431 perf_rtt.request_full.received += 1; 3687 /**
3688 * Check that the message is received only in supported phase
3689 */
3690 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3691 if (GNUNET_OK !=
3692 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3693 {
3694 GNUNET_break (0);
3695 fail_union_operation (op);
3696 return;
3697 }
2432 3698
2433 LOG (GNUNET_ERROR_TYPE_DEBUG, 3699 op->remote_element_count = ntohl (msg->remote_set_size);
3700 op->remote_set_diff = ntohl (msg->remote_set_difference);
3701 op->local_set_diff = ntohl (msg->local_set_difference);
3702
3703
3704 if (check_byzantine_bounds (op) != GNUNET_OK)
3705 {
3706 LOG (GNUNET_ERROR_TYPE_ERROR,
3707 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3708 "criteria\n");
3709 GNUNET_break_op (0);
3710 fail_union_operation (op);
3711 return;
3712 }
3713
3714#if MEASURE_PERFORMANCE
3715 perf_store.request_full.received += 1;
3716#endif
3717
3718 LOG (GNUNET_ERROR_TYPE_DEBUG,
2434 "Received request for full set transmission\n"); 3719 "Received request for full set transmission\n");
2435 if (PHASE_EXPECT_IBF != op->phase) 3720
3721 /** Calculate avg element size if not initial sync **/
3722 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3723 op->set->content->elements);
3724 uint64_t avg_element_size = 0;
3725 if (0 < op->local_element_count)
3726 {
3727 op->total_elements_size_local = 0;
3728 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3729 &
3730 determinate_avg_element_size_iterator,
3731 op);
3732 avg_element_size = op->total_elements_size_local / op->local_element_count;
3733 }
3734
3735 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3736 op->
3737 remote_element_count,
3738 op->
3739 local_element_count,
3740 op->local_set_diff,
3741 op->remote_set_diff,
3742 op->
3743 rtt_bandwidth_tradeoff,
3744 op->
3745 ibf_bucket_number_factor);
3746 if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation)
2436 { 3747 {
3748 LOG (GNUNET_ERROR_TYPE_ERROR,
3749 "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3750 " : %d\n", mode_of_operation);
2437 GNUNET_break_op (0); 3751 GNUNET_break_op (0);
2438 fail_union_operation (op); 3752 fail_union_operation (op);
2439 return; 3753 return;
@@ -2458,7 +3772,21 @@ handle_union_p2p_full_done (void *cls,
2458{ 3772{
2459 struct Operation *op = cls; 3773 struct Operation *op = cls;
2460 3774
2461 perf_rtt.full_done.received += 1; 3775 /**
3776 * Check that the message is received only in supported phase
3777 */
3778 uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3779 if (GNUNET_OK !=
3780 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3781 {
3782 GNUNET_break (0);
3783 fail_union_operation (op);
3784 return;
3785 }
3786
3787#if MEASURE_PERFORMANCE
3788 perf_store.full_done.received += 1;
3789#endif
2462 3790
2463 switch (op->phase) 3791 switch (op->phase)
2464 { 3792 {
@@ -2466,6 +3794,19 @@ handle_union_p2p_full_done (void *cls,
2466 { 3794 {
2467 struct GNUNET_MQ_Envelope *ev; 3795 struct GNUNET_MQ_Envelope *ev;
2468 3796
3797 if ((GNUNET_YES == op->byzantine) &&
3798 (op->received_total != op->remote_element_count) )
3799 {
3800 /* The other peer gave not enough elements before sending full done, there's something wrong. */
3801 LOG (GNUNET_ERROR_TYPE_ERROR,
3802 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3803 (unsigned long long) op->received_total,
3804 (unsigned long long) op->remote_element_count);
3805 GNUNET_break_op (0);
3806 fail_union_operation (op);
3807 return;
3808 }
3809
2469 LOG (GNUNET_ERROR_TYPE_DEBUG, 3810 LOG (GNUNET_ERROR_TYPE_DEBUG,
2470 "got FULL DONE, sending elements that other peer is missing\n"); 3811 "got FULL DONE, sending elements that other peer is missing\n");
2471 3812
@@ -2473,7 +3814,9 @@ handle_union_p2p_full_done (void *cls,
2473 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element, 3814 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
2474 &send_missing_full_elements_iter, 3815 &send_missing_full_elements_iter,
2475 op); 3816 op);
2476 perf_rtt.full_done.sent += 1; 3817#if MEASURE_PERFORMANCE
3818 perf_store.full_done.sent += 1;
3819#endif
2477 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); 3820 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
2478 GNUNET_MQ_send (op->mq, 3821 GNUNET_MQ_send (op->mq,
2479 ev); 3822 ev);
@@ -2552,8 +3895,23 @@ handle_union_p2p_demand (void *cls,
2552 unsigned int num_hashes; 3895 unsigned int num_hashes;
2553 struct GNUNET_MQ_Envelope *ev; 3896 struct GNUNET_MQ_Envelope *ev;
2554 3897
2555 perf_rtt.demand.received += 1; 3898 /**
2556 perf_rtt.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); 3899 * Check that the message is received only in supported phase
3900 */
3901 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3902 PHASE_FINISH_WAITING};
3903 if (GNUNET_OK !=
3904 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3905 {
3906 GNUNET_break (0);
3907 fail_union_operation (op);
3908 return;
3909 }
3910#if MEASURE_PERFORMANCE
3911 perf_store.demand.received += 1;
3912 perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3913 GNUNET_MessageHeader));
3914#endif
2557 3915
2558 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) 3916 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2559 / sizeof(struct GNUNET_HashCode); 3917 / sizeof(struct GNUNET_HashCode);
@@ -2570,6 +3928,39 @@ handle_union_p2p_demand (void *cls,
2570 fail_union_operation (op); 3928 fail_union_operation (op);
2571 return; 3929 return;
2572 } 3930 }
3931
3932 /* Save send demand message for message control */
3933 if (GNUNET_YES !=
3934 update_message_control_flow (
3935 op->message_control_flow,
3936 MSG_CFS_RECEIVED,
3937 &ee->element_hash,
3938 DEMAND_MESSAGE)
3939 )
3940 {
3941 LOG (GNUNET_ERROR_TYPE_ERROR,
3942 "Double demand message received found!\n");
3943 GNUNET_break (0);
3944 fail_union_operation (op);
3945 return;
3946 }
3947 ;
3948
3949 /* Mark element to be expected to received */
3950 if (GNUNET_YES !=
3951 update_message_control_flow (
3952 op->message_control_flow,
3953 MSG_CFS_SENT,
3954 &ee->element_hash,
3955 ELEMENT_MESSAGE)
3956 )
3957 {
3958 LOG (GNUNET_ERROR_TYPE_ERROR,
3959 "Double element message sent found!\n");
3960 GNUNET_break (0);
3961 fail_union_operation (op);
3962 return;
3963 }
2573 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) 3964 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2574 { 3965 {
2575 /* Probably confused lazily copied sets. */ 3966 /* Probably confused lazily copied sets. */
@@ -2577,8 +3968,10 @@ handle_union_p2p_demand (void *cls,
2577 fail_union_operation (op); 3968 fail_union_operation (op);
2578 return; 3969 return;
2579 } 3970 }
2580 perf_rtt.element.sent += 1; 3971#if MEASURE_PERFORMANCE
2581 perf_rtt.element.sent_var_bytes += ee->element.size; 3972 perf_store.element.sent += 1;
3973 perf_store.element.sent_var_bytes += ee->element.size;
3974#endif
2582 ev = GNUNET_MQ_msg_extra (emsg, 3975 ev = GNUNET_MQ_msg_extra (emsg,
2583 ee->element.size, 3976 ee->element.size,
2584 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS); 3977 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS);
@@ -2600,9 +3993,10 @@ handle_union_p2p_demand (void *cls,
2600 if (op->symmetric) 3993 if (op->symmetric)
2601 send_client_element (op, 3994 send_client_element (op,
2602 &ee->element, 3995 &ee->element,
2603 GNUNET_SET_STATUS_ADD_REMOTE); 3996 GNUNET_SETU_STATUS_ADD_REMOTE);
2604 } 3997 }
2605 GNUNET_CADET_receive_done (op->channel); 3998 GNUNET_CADET_receive_done (op->channel);
3999 maybe_finish (op);
2606} 4000}
2607 4001
2608 4002
@@ -2653,9 +4047,23 @@ handle_union_p2p_offer (void *cls,
2653 struct Operation *op = cls; 4047 struct Operation *op = cls;
2654 const struct GNUNET_HashCode *hash; 4048 const struct GNUNET_HashCode *hash;
2655 unsigned int num_hashes; 4049 unsigned int num_hashes;
4050 /**
4051 * Check that the message is received only in supported phase
4052 */
4053 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4054 if (GNUNET_OK !=
4055 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4056 {
4057 GNUNET_break (0);
4058 fail_union_operation (op);
4059 return;
4060 }
2656 4061
2657 perf_rtt.offer.received += 1; 4062#if MEASURE_PERFORMANCE
2658 perf_rtt.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); 4063 perf_store.offer.received += 1;
4064 perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4065 GNUNET_MessageHeader));
4066#endif
2659 4067
2660 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) 4068 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2661 / sizeof(struct GNUNET_HashCode); 4069 / sizeof(struct GNUNET_HashCode);
@@ -2693,11 +4101,68 @@ handle_union_p2p_offer (void *cls,
2693 "[OP %p] Requesting element (hash %s)\n", 4101 "[OP %p] Requesting element (hash %s)\n",
2694 op, GNUNET_h2s (hash)); 4102 op, GNUNET_h2s (hash));
2695 4103
2696 perf_rtt.demand.sent += 1; 4104#if MEASURE_PERFORMANCE
2697 perf_rtt.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); 4105 perf_store.demand.sent += 1;
4106 perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4107#endif
2698 ev = GNUNET_MQ_msg_header_extra (demands, 4108 ev = GNUNET_MQ_msg_header_extra (demands,
2699 sizeof(struct GNUNET_HashCode), 4109 sizeof(struct GNUNET_HashCode),
2700 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND); 4110 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
4111 /* Save send demand message for message control */
4112 if (GNUNET_YES !=
4113 update_message_control_flow (
4114 op->message_control_flow,
4115 MSG_CFS_SENT,
4116 hash,
4117 DEMAND_MESSAGE)
4118 )
4119 {
4120 // GNUNET_free (ev);
4121 LOG (GNUNET_ERROR_TYPE_ERROR,
4122 "Double demand message sent found!\n");
4123 GNUNET_break (0);
4124 fail_union_operation (op);
4125 return;
4126 }
4127 ;
4128
4129 /* Mark offer as received received */
4130 if (GNUNET_YES !=
4131 update_message_control_flow (
4132 op->message_control_flow,
4133 MSG_CFS_RECEIVED,
4134 hash,
4135 OFFER_MESSAGE)
4136 )
4137 {
4138 // GNUNET_free (ev);
4139 LOG (GNUNET_ERROR_TYPE_ERROR,
4140 "Double offer message received found!\n");
4141 GNUNET_break (0);
4142 fail_union_operation (op);
4143 return;
4144 }
4145 ;
4146
4147 /* Mark element to be expected to received */
4148 if (GNUNET_YES !=
4149 update_message_control_flow (
4150 op->message_control_flow,
4151 MSG_CFS_EXPECTED,
4152 hash,
4153 ELEMENT_MESSAGE)
4154 )
4155 {
4156 // GNUNET_free (ev);
4157 LOG (GNUNET_ERROR_TYPE_ERROR,
4158 "Element already expected!\n");
4159 GNUNET_break (0);
4160 fail_union_operation (op);
4161 return;
4162 }
4163 ;
4164
4165
2701 GNUNET_memcpy (&demands[1], 4166 GNUNET_memcpy (&demands[1],
2702 hash, 4167 hash,
2703 sizeof(struct GNUNET_HashCode)); 4168 sizeof(struct GNUNET_HashCode));
@@ -2719,7 +4184,30 @@ handle_union_p2p_done (void *cls,
2719{ 4184{
2720 struct Operation *op = cls; 4185 struct Operation *op = cls;
2721 4186
2722 perf_rtt.done.received += 1; 4187 /**
4188 * Check that the message is received only in supported phase
4189 */
4190 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4191 if (GNUNET_OK !=
4192 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4193 {
4194 GNUNET_break (0);
4195 fail_union_operation (op);
4196 return;
4197 }
4198
4199 if (op->active_passive_switch_required)
4200 {
4201 LOG (GNUNET_ERROR_TYPE_ERROR,
4202 "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4203 GNUNET_break (0);
4204 fail_union_operation (op);
4205 return;
4206 }
4207
4208#if MEASURE_PERFORMANCE
4209 perf_store.done.received += 1;
4210#endif
2723 switch (op->phase) 4211 switch (op->phase)
2724 { 4212 {
2725 case PHASE_PASSIVE_DECODING: 4213 case PHASE_PASSIVE_DECODING:
@@ -2728,26 +4216,26 @@ handle_union_p2p_done (void *cls,
2728 LOG (GNUNET_ERROR_TYPE_DEBUG, 4216 LOG (GNUNET_ERROR_TYPE_DEBUG,
2729 "got DONE (as passive partner), waiting for our demands to be satisfied\n"); 4217 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2730 /* The active peer is done sending offers 4218 /* The active peer is done sending offers
2731 * and inquiries. This means that all 4219 * and inquiries. This means that all
2732 * our responses to that (demands and offers) 4220 * our responses to that (demands and offers)
2733 * must be in flight (queued or in mesh). 4221 * must be in flight (queued or in mesh).
2734 * 4222 *
2735 * We should notify the active peer once 4223 * We should notify the active peer once
2736 * all our demands are satisfied, so that the active 4224 * all our demands are satisfied, so that the active
2737 * peer can quit if we gave it everything. 4225 * peer can quit if we gave it everything.
2738 */GNUNET_CADET_receive_done (op->channel); 4226 */GNUNET_CADET_receive_done (op->channel);
2739 maybe_finish (op); 4227 maybe_finish (op);
2740 return; 4228 return;
2741 case PHASE_ACTIVE_DECODING: 4229 case PHASE_ACTIVE_DECODING:
2742 LOG (GNUNET_ERROR_TYPE_DEBUG, 4230 LOG (GNUNET_ERROR_TYPE_DEBUG,
2743 "got DONE (as active partner), waiting to finish\n"); 4231 "got DONE (as active partner), waiting to finish\n");
2744 /* All demands of the other peer are satisfied, 4232 /* All demands of the other peer are satisfied,
2745 * and we processed all offers, thus we know 4233 * and we processed all offers, thus we know
2746 * exactly what our demands must be. 4234 * exactly what our demands must be.
2747 * 4235 *
2748 * We'll close the channel 4236 * We'll close the channel
2749 * to the other peer once our demands are met. 4237 * to the other peer once our demands are met.
2750 */op->phase = PHASE_FINISH_CLOSING; 4238 */op->phase = PHASE_FINISH_CLOSING;
2751 GNUNET_CADET_receive_done (op->channel); 4239 GNUNET_CADET_receive_done (op->channel);
2752 maybe_finish (op); 4240 maybe_finish (op);
2753 return; 4241 return;
@@ -2769,7 +4257,9 @@ static void
2769handle_union_p2p_over (void *cls, 4257handle_union_p2p_over (void *cls,
2770 const struct GNUNET_MessageHeader *mh) 4258 const struct GNUNET_MessageHeader *mh)
2771{ 4259{
2772 perf_rtt.over.received += 1; 4260#if MEASURE_PERFORMANCE
4261 perf_store.over.received += 1;
4262#endif
2773 send_client_done (cls); 4263 send_client_done (cls);
2774} 4264}
2775 4265
@@ -2943,7 +4433,7 @@ check_incoming_msg (void *cls,
2943 struct Listener *listener = op->listener; 4433 struct Listener *listener = op->listener;
2944 const struct GNUNET_MessageHeader *nested_context; 4434 const struct GNUNET_MessageHeader *nested_context;
2945 4435
2946 /* double operation request */ 4436 /* double operation request */
2947 if (0 != op->suggest_id) 4437 if (0 != op->suggest_id)
2948 { 4438 {
2949 GNUNET_break_op (0); 4439 GNUNET_break_op (0);
@@ -3053,10 +4543,10 @@ handle_client_create_set (void *cls,
3053 } 4543 }
3054 set = GNUNET_new (struct Set); 4544 set = GNUNET_new (struct Set);
3055 { 4545 {
3056 struct StrataEstimator *se; 4546 struct MultiStrataEstimator *se;
3057 4547
3058 se = strata_estimator_create (SE_STRATA_COUNT, 4548 se = strata_estimator_create (SE_STRATA_COUNT,
3059 SE_IBF_SIZE, 4549 SE_IBFS_TOTAL_SIZE,
3060 SE_IBF_HASH_NUM); 4550 SE_IBF_HASH_NUM);
3061 if (NULL == se) 4551 if (NULL == se)
3062 { 4552 {
@@ -3198,6 +4688,7 @@ channel_window_cb (void *cls,
3198 * @param cls client that sent the message 4688 * @param cls client that sent the message
3199 * @param msg message sent by the client 4689 * @param msg message sent by the client
3200 */ 4690 */
4691
3201static void 4692static void
3202handle_client_listen (void *cls, 4693handle_client_listen (void *cls,
3203 const struct GNUNET_SETU_ListenMessage *msg) 4694 const struct GNUNET_SETU_ListenMessage *msg)
@@ -3240,10 +4731,10 @@ handle_client_listen (void *cls,
3240 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, 4731 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3241 struct GNUNET_MessageHeader, 4732 struct GNUNET_MessageHeader,
3242 NULL), 4733 NULL),
3243 GNUNET_MQ_hd_fixed_size (union_p2p_request_full, 4734 GNUNET_MQ_hd_var_size (union_p2p_request_full,
3244 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, 4735 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3245 struct GNUNET_MessageHeader, 4736 struct TransmitFullMessage,
3246 NULL), 4737 NULL),
3247 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, 4738 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3248 GNUNET_MESSAGE_TYPE_SETU_P2P_SE, 4739 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3249 struct StrataEstimatorMessage, 4740 struct StrataEstimatorMessage,
@@ -3256,6 +4747,10 @@ handle_client_listen (void *cls,
3256 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, 4747 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3257 struct GNUNET_SETU_ElementMessage, 4748 struct GNUNET_SETU_ElementMessage,
3258 NULL), 4749 NULL),
4750 GNUNET_MQ_hd_var_size (union_p2p_send_full,
4751 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
4752 struct TransmitFullMessage,
4753 NULL),
3259 GNUNET_MQ_handler_end () 4754 GNUNET_MQ_handler_end ()
3260 }; 4755 };
3261 struct Listener *listener; 4756 struct Listener *listener;
@@ -3451,6 +4946,7 @@ handle_client_evaluate (void *cls,
3451{ 4946{
3452 struct ClientState *cs = cls; 4947 struct ClientState *cs = cls;
3453 struct Operation *op = GNUNET_new (struct Operation); 4948 struct Operation *op = GNUNET_new (struct Operation);
4949
3454 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 4950 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3455 GNUNET_MQ_hd_var_size (incoming_msg, 4951 GNUNET_MQ_hd_var_size (incoming_msg,
3456 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, 4952 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
@@ -3488,10 +4984,10 @@ handle_client_evaluate (void *cls,
3488 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, 4984 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3489 struct GNUNET_MessageHeader, 4985 struct GNUNET_MessageHeader,
3490 op), 4986 op),
3491 GNUNET_MQ_hd_fixed_size (union_p2p_request_full, 4987 GNUNET_MQ_hd_var_size (union_p2p_request_full,
3492 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, 4988 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3493 struct GNUNET_MessageHeader, 4989 struct TransmitFullMessage,
3494 op), 4990 op),
3495 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, 4991 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3496 GNUNET_MESSAGE_TYPE_SETU_P2P_SE, 4992 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3497 struct StrataEstimatorMessage, 4993 struct StrataEstimatorMessage,
@@ -3504,6 +5000,10 @@ handle_client_evaluate (void *cls,
3504 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, 5000 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3505 struct GNUNET_SETU_ElementMessage, 5001 struct GNUNET_SETU_ElementMessage,
3506 op), 5002 op),
5003 GNUNET_MQ_hd_var_size (union_p2p_send_full,
5004 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
5005 struct TransmitFullMessage,
5006 NULL),
3507 GNUNET_MQ_handler_end () 5007 GNUNET_MQ_handler_end ()
3508 }; 5008 };
3509 struct Set *set; 5009 struct Set *set;
@@ -3525,8 +5025,23 @@ handle_client_evaluate (void *cls,
3525 op->force_full = msg->force_full; 5025 op->force_full = msg->force_full;
3526 op->force_delta = msg->force_delta; 5026 op->force_delta = msg->force_delta;
3527 op->symmetric = msg->symmetric; 5027 op->symmetric = msg->symmetric;
5028 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5029 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5030 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5031 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5032 op->active_passive_switch_required = false;
3528 context = GNUNET_MQ_extract_nested_mh (msg); 5033 context = GNUNET_MQ_extract_nested_mh (msg);
3529 5034
5035 /* create hashmap for message control */
5036 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5037 GNUNET_NO);
5038 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5039
5040#if MEASURE_PERFORMANCE
5041 /* load config */
5042 load_config (op);
5043#endif
5044
3530 /* Advance generation values, so that 5045 /* Advance generation values, so that
3531 mutations won't interfer with the running operation. */ 5046 mutations won't interfer with the running operation. */
3532 op->set = set; 5047 op->set = set;
@@ -3550,7 +5065,9 @@ handle_client_evaluate (void *cls,
3550 struct GNUNET_MQ_Envelope *ev; 5065 struct GNUNET_MQ_Envelope *ev;
3551 struct OperationRequestMessage *msg; 5066 struct OperationRequestMessage *msg;
3552 5067
3553 perf_rtt.operation_request.sent += 1; 5068#if MEASURE_PERFORMANCE
5069 perf_store.operation_request.sent += 1;
5070#endif
3554 ev = GNUNET_MQ_msg_nested_mh (msg, 5071 ev = GNUNET_MQ_msg_nested_mh (msg,
3555 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, 5072 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3556 context); 5073 context);
@@ -3567,7 +5084,11 @@ handle_client_evaluate (void *cls,
3567 op->se = strata_estimator_dup (op->set->se); 5084 op->se = strata_estimator_dup (op->set->se);
3568 /* we started the operation, thus we have to send the operation request */ 5085 /* we started the operation, thus we have to send the operation request */
3569 op->phase = PHASE_EXPECT_SE; 5086 op->phase = PHASE_EXPECT_SE;
3570 op->salt_receive = op->salt_send = 42; // FIXME????? 5087
5088 op->salt_receive = (op->peer_site + 1) % 2;
5089 op->salt_send = op->peer_site; // FIXME?????
5090
5091
3571 LOG (GNUNET_ERROR_TYPE_DEBUG, 5092 LOG (GNUNET_ERROR_TYPE_DEBUG,
3572 "Initiating union operation evaluation\n"); 5093 "Initiating union operation evaluation\n");
3573 GNUNET_STATISTICS_update (_GSS_statistics, 5094 GNUNET_STATISTICS_update (_GSS_statistics,
@@ -3711,6 +5232,20 @@ handle_client_accept (void *cls,
3711 op->force_full = msg->force_full; 5232 op->force_full = msg->force_full;
3712 op->force_delta = msg->force_delta; 5233 op->force_delta = msg->force_delta;
3713 op->symmetric = msg->symmetric; 5234 op->symmetric = msg->symmetric;
5235 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5236 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5237 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5238 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5239 op->active_passive_switch_required = false;
5240 /* create hashmap for message control */
5241 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5242 GNUNET_NO);
5243 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5244
5245#if MEASURE_PERFORMANCE
5246 /* load config */
5247 load_config (op);
5248#endif
3714 5249
3715 /* Advance generation values, so that future mutations do not 5250 /* Advance generation values, so that future mutations do not
3716 interfer with the running operation. */ 5251 interfer with the running operation. */
@@ -3729,7 +5264,7 @@ handle_client_accept (void *cls,
3729 1, 5264 1,
3730 GNUNET_NO); 5265 GNUNET_NO);
3731 { 5266 {
3732 const struct StrataEstimator *se; 5267 struct MultiStrataEstimator *se;
3733 struct GNUNET_MQ_Envelope *ev; 5268 struct GNUNET_MQ_Envelope *ev;
3734 struct StrataEstimatorMessage *strata_msg; 5269 struct StrataEstimatorMessage *strata_msg;
3735 char *buf; 5270 char *buf;
@@ -3739,20 +5274,40 @@ handle_client_accept (void *cls,
3739 op->se = strata_estimator_dup (op->set->se); 5274 op->se = strata_estimator_dup (op->set->se);
3740 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 5275 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3741 GNUNET_NO); 5276 GNUNET_NO);
3742 op->salt_receive = op->salt_send = 42; // FIXME????? 5277 op->salt_receive = (op->peer_site + 1) % 2;
5278 op->salt_send = op->peer_site; // FIXME?????
3743 initialize_key_to_element (op); 5279 initialize_key_to_element (op);
3744 op->initial_size = GNUNET_CONTAINER_multihashmap32_size ( 5280 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3745 op->key_to_element); 5281 op->key_to_element);
3746 5282
3747 /* kick off the operation */ 5283 /* kick off the operation */
3748 se = op->se; 5284 se = op->se;
3749 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); 5285
5286 uint8_t se_count = 1;
5287 if (op->initial_size > 0)
5288 {
5289 op->total_elements_size_local = 0;
5290 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5291 &
5292 determinate_avg_element_size_iterator,
5293 op);
5294 se_count = determine_strata_count (
5295 op->total_elements_size_local / op->initial_size,
5296 op->initial_size);
5297 }
5298 buf = GNUNET_malloc (se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5299 * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
3750 len = strata_estimator_write (se, 5300 len = strata_estimator_write (se,
5301 SE_IBFS_TOTAL_SIZE,
5302 se_count,
3751 buf); 5303 buf);
3752 perf_rtt.se.sent += 1; 5304#if MEASURE_PERFORMANCE
3753 perf_rtt.se.sent_var_bytes += len; 5305 perf_store.se.sent += 1;
5306 perf_store.se.sent_var_bytes += len;
5307#endif
3754 5308
3755 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) 5309 if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5310 * SE_IBFS_TOTAL_SIZE)
3756 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC; 5311 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC;
3757 else 5312 else
3758 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE; 5313 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE;
@@ -3766,6 +5321,7 @@ handle_client_accept (void *cls,
3766 strata_msg->set_size 5321 strata_msg->set_size
3767 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( 5322 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
3768 op->set->content->elements)); 5323 op->set->content->elements));
5324 strata_msg->se_count = se_count;
3769 GNUNET_MQ_send (op->mq, 5325 GNUNET_MQ_send (op->mq,
3770 ev); 5326 ev);
3771 op->phase = PHASE_EXPECT_IBF; 5327 op->phase = PHASE_EXPECT_IBF;
@@ -3800,8 +5356,9 @@ shutdown_task (void *cls)
3800 GNUNET_YES); 5356 GNUNET_YES);
3801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5357 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3802 "handled shutdown request\n"); 5358 "handled shutdown request\n");
3803 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 5359#if MEASURE_PERFORMANCE
3804 "RTT:%f\n", calculate_perf_rtt()); 5360 calculate_perf_store ();
5361#endif
3805} 5362}
3806 5363
3807 5364