aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_protocols.h7
-rw-r--r--src/include/gnunet_setu_service.h26
-rw-r--r--src/set/ibf.c2
-rw-r--r--src/set/ibf.h1
-rw-r--r--src/setu/Makefile.am11
-rw-r--r--src/setu/gnunet-service-setu.c2081
-rw-r--r--src/setu/gnunet-service-setu_protocol.h77
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.c362
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.h54
-rw-r--r--src/setu/ibf.c294
-rw-r--r--src/setu/ibf.h65
-rw-r--r--src/setu/perf_setu_api.c571
-rw-r--r--src/setu/setu.h49
-rw-r--r--src/setu/setu_api.c40
-rw-r--r--src/setu/test_setu_api.c60
15 files changed, 2921 insertions, 779 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 715e94c6a..6b61dfc72 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1784,6 +1784,13 @@ extern "C" {
1784 */ 1784 */
1785#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 572 1785#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 572
1786 1786
1787/**
1788 * Signals other peer that all elements are sent.
1789 */
1790
1791#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL 710
1792
1793
1787 1794
1788/******************************************************************************* 1795/*******************************************************************************
1789 * SETI message types 1796 * SETI message types
diff --git a/src/include/gnunet_setu_service.h b/src/include/gnunet_setu_service.h
index bacec9408..1d7e48402 100644
--- a/src/include/gnunet_setu_service.h
+++ b/src/include/gnunet_setu_service.h
@@ -163,7 +163,31 @@ enum GNUNET_SETU_OptionType
163 /** 163 /**
164 * Notify client also if we are sending a value to the other peer. 164 * Notify client also if we are sending a value to the other peer.
165 */ 165 */
166 GNUNET_SETU_OPTION_SYMMETRIC = 8 166 GNUNET_SETU_OPTION_SYMMETRIC = 8,
167
168 /**
169 * Byzantine upper bound. Is the maximal plausible number of elements
170 * a peer can have default max uint64
171 */
172 GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND = 16,
173
174 /**
175 * Bandwidth latency tradeoff determines how much bytes a single RTT is
176 * worth, which is a performance setting
177 */
178 GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF= 32,
179
180 /**
181 * The factor determines the number of buckets an IBF has which is
182 * multiplied by the estimated setsize default: 2
183 */
184 GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR= 64,
185
186 /**
187 * This setting determines to how many IBF buckets an single elements
188 * is mapped to.
189 */
190 GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT= 128
167}; 191};
168 192
169 193
diff --git a/src/set/ibf.c b/src/set/ibf.c
index 1532afceb..0f7eb6a9f 100644
--- a/src/set/ibf.c
+++ b/src/set/ibf.c
@@ -294,7 +294,7 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start,
294 struct IBF_KeyHash *key_hash_dst; 294 struct IBF_KeyHash *key_hash_dst;
295 struct IBF_Count *count_dst; 295 struct IBF_Count *count_dst;
296 296
297 GNUNET_assert (start + count <= ibf->size); 297 GNUNET_assert (start + count <= ibf->size);
298 298
299 /* copy keys */ 299 /* copy keys */
300 key_dst = (struct IBF_Key *) buf; 300 key_dst = (struct IBF_Key *) buf;
diff --git a/src/set/ibf.h b/src/set/ibf.h
index 7c2ab33b1..334a797ef 100644
--- a/src/set/ibf.h
+++ b/src/set/ibf.h
@@ -245,6 +245,7 @@ void
245ibf_destroy (struct InvertibleBloomFilter *ibf); 245ibf_destroy (struct InvertibleBloomFilter *ibf);
246 246
247 247
248
248#if 0 /* keep Emacsens' auto-indent happy */ 249#if 0 /* keep Emacsens' auto-indent happy */
249{ 250{
250#endif 251#endif
diff --git a/src/setu/Makefile.am b/src/setu/Makefile.am
index 77d048add..6e2865d8c 100644
--- a/src/setu/Makefile.am
+++ b/src/setu/Makefile.am
@@ -7,6 +7,8 @@ libexecdir= $(pkglibdir)/libexec/
7 7
8plugindir = $(libdir)/gnunet 8plugindir = $(libdir)/gnunet
9 9
10PTHREAD = -lpthread
11
10pkgcfg_DATA = \ 12pkgcfg_DATA = \
11 setu.conf 13 setu.conf
12 14
@@ -63,7 +65,8 @@ libgnunetsetu_la_SOURCES = \
63 setu_api.c setu.h 65 setu_api.c setu.h
64libgnunetsetu_la_LIBADD = \ 66libgnunetsetu_la_LIBADD = \
65 $(top_builddir)/src/util/libgnunetutil.la \ 67 $(top_builddir)/src/util/libgnunetutil.la \
66 $(LTLIBINTL) 68 $(LTLIBINTL) \
69 $(PTHREAD)
67libgnunetsetu_la_LDFLAGS = \ 70libgnunetsetu_la_LDFLAGS = \
68 $(GN_LIB_LDFLAGS) 71 $(GN_LIB_LDFLAGS)
69 72
@@ -91,7 +94,8 @@ perf_setu_api_SOURCES = \
91perf_setu_api_LDADD = \ 94perf_setu_api_LDADD = \
92 $(top_builddir)/src/util/libgnunetutil.la \ 95 $(top_builddir)/src/util/libgnunetutil.la \
93 $(top_builddir)/src/testing/libgnunettesting.la \ 96 $(top_builddir)/src/testing/libgnunettesting.la \
94 libgnunetsetu.la 97 libgnunetsetu.la \
98 $(PTHREAD)
95 99
96 100
97plugin_LTLIBRARIES = \ 101plugin_LTLIBRARIES = \
@@ -103,7 +107,8 @@ libgnunet_plugin_block_setu_test_la_LIBADD = \
103 $(top_builddir)/src/block/libgnunetblock.la \ 107 $(top_builddir)/src/block/libgnunetblock.la \
104 $(top_builddir)/src/block/libgnunetblockgroup.la \ 108 $(top_builddir)/src/block/libgnunetblockgroup.la \
105 $(top_builddir)/src/util/libgnunetutil.la \ 109 $(top_builddir)/src/util/libgnunetutil.la \
106 $(LTLIBINTL) 110 $(LTLIBINTL) \
111 $(PTHREAD)
107libgnunet_plugin_block_setu_test_la_LDFLAGS = \ 112libgnunet_plugin_block_setu_test_la_LDFLAGS = \
108 $(GN_PLUGIN_LDFLAGS) 113 $(GN_PLUGIN_LDFLAGS)
109 114
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
index bd1113f15..a51e92b71 100644
--- a/src/setu/gnunet-service-setu.c
+++ b/src/setu/gnunet-service-setu.c
@@ -22,6 +22,7 @@
22 * @brief set union operation 22 * @brief set union operation
23 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
@@ -50,33 +51,61 @@
50 */ 51 */
51#define SE_STRATA_COUNT 32 52#define SE_STRATA_COUNT 32
52 53
54
53/** 55/**
54 * Size of the IBFs in the strata estimator. 56 * Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348
57 * Based on the bsc thesis of Elias Summermatter (2021)
55 */ 58 */
56#define SE_IBF_SIZE 80 59#define SE_IBFS_TOTAL_SIZE 632
57 60
58/** 61/**
59 * The hash num parameter for the difference digests and strata estimators. 62 * The hash num parameter for the difference digests and strata estimators.
60 */ 63 */
61#define SE_IBF_HASH_NUM 4 64#define SE_IBF_HASH_NUM 3
62 65
63/** 66/**
64 * Number of buckets that can be transmitted in one message. 67 * Number of buckets that can be transmitted in one message.
65 */ 68 */
66#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE) 69#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
67 70
68/** 71/**
69 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). 72 * The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
70 * Choose this value so that computing the IBF is still cheaper 73 * Choose this value so that computing the IBF is still cheaper
71 * than transmitting all values. 74 * than transmitting all values.
72 */ 75 */
73#define MAX_IBF_ORDER (20) 76#define MAX_IBF_SIZE 1048576
77
78
79/**
80 * Minimal size of an ibf
81 * Based on the bsc thesis of Elias Summermatter (2021)
82 */
83#define IBF_MIN_SIZE 37
84
85/**
86 * AVG RTT for differential sync when k=2 and Factor = 2
87 * Based on the bsc thesis of Elias Summermatter (2021)
88 */
89#define DIFFERENTIAL_RTT_MEAN 3.65145
90
91/**
92 * Security level used for byzantine checks (2^80)
93 */
94
95#define SECURITY_LEVEL 80
96
97/**
98 * Is the estimated probabily for a new round this values
99 * is based on the bsc thesis of Elias Summermatter (2021)
100 */
101
102#define PROBABILITY_FOR_NEW_ROUND 0.15
74 103
75/** 104/**
76 * Number of buckets used in the ibf per estimated 105 * Measure the performance in a csv
77 * difference.
78 */ 106 */
79#define IBF_ALPHA 4 107
108#define MEASURE_PERFORMANCE 0
80 109
81 110
82/** 111/**
@@ -146,6 +175,28 @@ enum UnionOperationPhase
146 PHASE_FULL_RECEIVING 175 PHASE_FULL_RECEIVING
147}; 176};
148 177
178/**
179 * Different modes of operations
180 */
181
182enum MODE_OF_OPERATION
183{
184 /**
185 * Mode just synchronizes the difference between sets
186 */
187 DIFFERENTIAL_SYNC,
188
189 /**
190 * Mode send full set sending local set first
191 */
192 FULL_SYNC_LOCAL_SENDING_FIRST,
193
194 /**
195 * Mode request full set from remote peer
196 */
197 FULL_SYNC_REMOTE_SENDING_FIRST
198};
199
149 200
150/** 201/**
151 * Information about an element element in the set. All elements are 202 * Information about an element element in the set. All elements are
@@ -277,7 +328,7 @@ struct Operation
277 * Copy of the set's strata estimator at the time of 328 * Copy of the set's strata estimator at the time of
278 * creation of this operation. 329 * creation of this operation.
279 */ 330 */
280 struct StrataEstimator *se; 331 struct MultiStrataEstimator *se;
281 332
282 /** 333 /**
283 * The IBF we currently receive. 334 * The IBF we currently receive.
@@ -320,7 +371,7 @@ struct Operation
320 /** 371 /**
321 * Number of ibf buckets already received into the @a remote_ibf. 372 * Number of ibf buckets already received into the @a remote_ibf.
322 */ 373 */
323 unsigned int ibf_buckets_received; 374 uint64_t ibf_buckets_received;
324 375
325 /** 376 /**
326 * Salt that we're using for sending IBFs 377 * Salt that we're using for sending IBFs
@@ -386,7 +437,7 @@ struct Operation
386 * Lower bound for the set size, used only when 437 * Lower bound for the set size, used only when
387 * byzantine mode is enabled. 438 * byzantine mode is enabled.
388 */ 439 */
389 int byzantine_lower_bound; 440 uint64_t byzantine_lower_bound;
390 441
391 /** 442 /**
392 * Unique request id for the request from a remote peer, sent to the 443 * Unique request id for the request from a remote peer, sent to the
@@ -401,21 +452,83 @@ struct Operation
401 */ 452 */
402 unsigned int generation_created; 453 unsigned int generation_created;
403 454
455
456 /**
457 * User defined Bandwidth Round Trips Tradeoff
458 */
459 uint64_t rtt_bandwidth_tradeoff;
460
461
404 /** 462 /**
405 * User defined Bandwidth Round Trips Tradeoff 463 * Number of Element per bucket in IBF
464 */
465 uint8_t ibf_number_buckets_per_element;
466
467
468 /**
469 * Set difference is multiplied with this factor
470 * to gennerate large enought IBF
406 */ 471 */
407 double rtt_bandwidth_tradeoff; 472 uint8_t ibf_bucket_number_factor;
408 473
409 /** 474 /**
410 * Number of Element per bucket in IBF 475 * Defines which site a client is
476 * 0 = Initiating peer
477 * 1 = Receiving peer
411 */ 478 */
412 unsigned int ibf_number_buckets_per_element; 479 uint8_t peer_site;
480
413 481
414 /** 482 /**
415 * Number of buckets in IBF 483 * Local peer element count
416 */ 484 */
417 unsigned ibf_bucket_number; 485 uint64_t local_element_count;
418 486
487 /**
488 * Mode of operation that was chosen by the algorithm
489 */
490 uint8_t mode_of_operation;
491
492 /**
493 * Hashmap to keep track of the send/received messages
494 */
495 struct GNUNET_CONTAINER_MultiHashMap *message_control_flow;
496
497 /**
498 * Hashmap to keep track of the send/received inquiries (ibf keys)
499 */
500 struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent;
501
502
503 /**
504 * Total size of local set
505 */
506 uint64_t total_elements_size_local;
507
508 /**
509 * Limit of number of elements in set
510 */
511 uint64_t byzantine_upper_bound;
512
513 /**
514 * is the count of already passed differential sync iterations
515 */
516 uint8_t differential_sync_iterations;
517
518 /**
519 * Estimated or committed set difference at the start
520 */
521 uint64_t remote_set_diff;
522
523 /**
524 * Estimated or committed set difference at the start
525 */
526 uint64_t local_set_diff;
527
528 /**
529 * Boolean to enforce an active passive switch
530 */
531 bool active_passive_switch_required;
419}; 532};
420 533
421 534
@@ -431,6 +544,16 @@ struct SetContent
431 struct GNUNET_CONTAINER_MultiHashMap *elements; 544 struct GNUNET_CONTAINER_MultiHashMap *elements;
432 545
433 /** 546 /**
547 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *` randomized.
548 */
549 struct GNUNET_CONTAINER_MultiHashMap *elements_randomized;
550
551 /**
552 * Salt to construct the randomized element map
553 */
554 uint64_t elements_randomized_salt;
555
556 /**
434 * Number of references to the content. 557 * Number of references to the content.
435 */ 558 */
436 unsigned int refcount; 559 unsigned int refcount;
@@ -478,7 +601,7 @@ struct Set
478 * The strata estimator is only generated once for each set. The IBF keys 601 * The strata estimator is only generated once for each set. The IBF keys
479 * are derived from the element hashes with salt=0. 602 * are derived from the element hashes with salt=0.
480 */ 603 */
481 struct StrataEstimator *se; 604 struct MultiStrataEstimator *se;
482 605
483 /** 606 /**
484 * Evaluate operations are held in a linked list. 607 * Evaluate operations are held in a linked list.
@@ -635,96 +758,679 @@ static int in_shutdown;
635 */ 758 */
636static uint32_t suggest_id; 759static uint32_t suggest_id;
637 760
761#if MEASURE_PERFORMANCE
762/**
763 * Handles configuration file for setu performance test
764 *
765 */
766static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
767
638 768
639/** 769/**
640 * Added Roundtripscounter 770 * Stores the performance data for induvidual message
641 */ 771 */
642 772
643 773
644struct perf_num_send_resived_msg { 774struct perf_num_send_received_msg
645 int sent; 775{
646 int sent_var_bytes; 776 uint64_t sent;
647 int received; 777 uint64_t sent_var_bytes;
648 int received_var_bytes; 778 uint64_t received;
779 uint64_t received_var_bytes;
649}; 780};
650 781
782/**
783 * Main struct to messure perfomance (data/rtts)
784 */
785struct per_store_struct
786{
787 struct perf_num_send_received_msg operation_request;
788 struct perf_num_send_received_msg se;
789 struct perf_num_send_received_msg request_full;
790 struct perf_num_send_received_msg element_full;
791 struct perf_num_send_received_msg full_done;
792 struct perf_num_send_received_msg ibf;
793 struct perf_num_send_received_msg inquery;
794 struct perf_num_send_received_msg element;
795 struct perf_num_send_received_msg demand;
796 struct perf_num_send_received_msg offer;
797 struct perf_num_send_received_msg done;
798 struct perf_num_send_received_msg over;
799 uint64_t se_diff;
800 uint64_t se_diff_remote;
801 uint64_t se_diff_local;
802 uint64_t active_passive_switches;
803 uint8_t mode_of_operation;
804};
651 805
652struct perf_rtt_struct 806struct per_store_struct perf_store;
653{ 807#endif
654 struct perf_num_send_resived_msg operation_request; 808
655 struct perf_num_send_resived_msg se; 809/**
656 struct perf_num_send_resived_msg request_full; 810 * Different states to control the messages flow in differential mode
657 struct perf_num_send_resived_msg element_full; 811 */
658 struct perf_num_send_resived_msg full_done; 812
659 struct perf_num_send_resived_msg ibf; 813enum MESSAGE_CONTROL_FLOW_STATE
660 struct perf_num_send_resived_msg inquery; 814{
661 struct perf_num_send_resived_msg element; 815 /**
662 struct perf_num_send_resived_msg demand; 816 * Initial message state
663 struct perf_num_send_resived_msg offer; 817 */
664 struct perf_num_send_resived_msg done; 818 MSG_CFS_UNINITIALIZED,
665 struct perf_num_send_resived_msg over; 819
820 /**
821 * Track that a message has been sent
822 */
823 MSG_CFS_SENT,
824
825 /**
826 * Track that receiving this message is expected
827 */
828 MSG_CFS_EXPECTED,
829
830 /**
831 * Track that message has been recieved
832 */
833 MSG_CFS_RECEIVED,
834};
835
836/**
837 * Message types to track in message control flow
838 */
839
840enum MESSAGE_TYPE
841{
842 /**
843 * Offer message type
844 */
845 OFFER_MESSAGE,
846 /**
847 * Demand message type
848 */
849 DEMAND_MESSAGE,
850 /**
851 * Elemente message type
852 */
853 ELEMENT_MESSAGE,
666}; 854};
667 855
668struct perf_rtt_struct perf_rtt; 856/**
857 * Struct to tracked messages in message controll flow
858 */
859
860struct messageControlFlowElement
861{
862 /**
863 * Track the message control state of the offer message
864 */
865 enum MESSAGE_CONTROL_FLOW_STATE offer;
866 /**
867 * Track the message control state of the demand message
868 */
869 enum MESSAGE_CONTROL_FLOW_STATE demand;
870 /**
871 * Track the message control state of the element message
872 */
873 enum MESSAGE_CONTROL_FLOW_STATE element;
874};
875
876
877#if MEASURE_PERFORMANCE
878/**
879 * Loads different configuration to do perform perfomance tests
880 * @param op
881 */
882static void
883load_config (struct Operation *op)
884{
885
886 setu_cfg = GNUNET_CONFIGURATION_create ();
887 GNUNET_CONFIGURATION_load (setu_cfg,"perf_setu.conf");
888
889
890 long long number;
891 float fl;
892 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
893 &fl);
894 op->ibf_bucket_number_factor = fl;
895
896 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
897 &number);
898 op->ibf_number_buckets_per_element = number;
899
900 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"PERFORMANCE", "TRADEOFF",
901 &number);
902 op->rtt_bandwidth_tradeoff = number;
903
904
905 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"BOUNDARIES", "UPPER_ELEMENT",
906 &number);
907 op->byzantine_upper_bound = number;
908
909
910 op->peer_site = 0;
911}
669 912
670 913
914/**
915 * Function to calculate total bytes used for performance messurement
916 * @param size
917 * @param perf_num_send_received_msg
918 * @return bytes used
919 */
671static int 920static int
672sum_sent_received_bytes(int size, struct perf_num_send_resived_msg perf_rtt_struct) { 921sum_sent_received_bytes (uint64_t size, struct perf_num_send_received_msg
673 return (size * perf_rtt_struct.sent) + 922 perf_num_send_received_msg)
674 (size * perf_rtt_struct.received) + 923{
675 perf_rtt_struct.sent_var_bytes + 924 return (size * perf_num_send_received_msg.sent)
676 perf_rtt_struct.received_var_bytes; 925 + (size * perf_num_send_received_msg.received)
926 + perf_num_send_received_msg.sent_var_bytes
927 + perf_num_send_received_msg.received_var_bytes;
677} 928}
678 929
679static float
680calculate_perf_rtt() {
681 /**
682 * Calculate RTT of init phase normally always 1
683 */
684 float rtt = 1;
685 int bytes_transmitted = 0;
686 930
687 /** 931/**
688 * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending 932 * Function that calculates the perfmance values and writes them down
689 */ 933 */
690 if (( perf_rtt.element_full.received != 0 ) || 934static void
691 ( perf_rtt.element_full.sent != 0) 935calculate_perf_store ()
692 ) rtt += 1; 936{
693 937
694 if (( perf_rtt.request_full.received != 0 ) || 938 /**
695 ( perf_rtt.request_full.sent != 0) 939 * Calculate RTT of init phase normally always 1
696 ) rtt += 0.5; 940 */
941 float rtt = 1;
942 int bytes_transmitted = 0;
697 943
698 /** 944 /**
699 * In case of a differential sync 3 rtt's are needed. 945 * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normaly 1 or 1.5 depending
700 * for every active/passive switch additional 3.5 rtt's are used 946 */
701 */ 947 if ((perf_store.element_full.received != 0) ||
948 (perf_store.element_full.sent != 0)
949 )
950 rtt += 1;
951
952 if ((perf_store.request_full.received != 0) ||
953 (perf_store.request_full.sent != 0)
954 )
955 rtt += 0.5;
956
957 /**
958 * In case of a differential sync 3 rtt's are needed.
959 * for every active/passive switch additional 3.5 rtt's are used
960 */
961 if ((perf_store.element.received != 0) ||
962 (perf_store.element.sent != 0))
963 {
964 int iterations = perf_store.active_passive_switches;
965
966 if (iterations > 0)
967 rtt += iterations * 0.5;
968 rtt += 2.5;
969 }
970
971
972 /**
973 * Calculate data sended size
974 */
975 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
976 GNUNET_SETU_ResultMessage),
977 perf_store.request_full);
978
979 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
980 GNUNET_SETU_ElementMessage),
981 perf_store.element_full);
982 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
983 GNUNET_SETU_ElementMessage),
984 perf_store.element);
985 // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
986 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
987 StrataEstimatorMessage),
988 perf_store.se);
989 bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
990 bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
991 perf_store.ibf);
992 bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
993 perf_store.inquery);
994 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
995 GNUNET_MessageHeader),
996 perf_store.demand);
997 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
998 GNUNET_MessageHeader),
999 perf_store.offer);
1000 bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1001
1002 /**
1003 * Write IBF failure rate for different BUCKET_NUMBER_FACTOR
1004 */
1005 float factor;
1006 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1007 &factor);
1008 long long num_per_bucket;
1009 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1010 &num_per_bucket);
1011
1012
1013 int decoded = 0;
1014 if (perf_store.active_passive_switches == 0)
1015 decoded = 1;
1016 int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1017 IBFMessage),
1018 perf_store.ibf);
1019
1020 FILE *out1 = fopen ("perf_data.csv", "a");
1021 fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1022 decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1023 bytes_transmitted,
1024 perf_store.se_diff_local,perf_store.se_diff_remote,
1025 perf_store.mode_of_operation);
1026 fclose (out1);
1027
1028}
1029
1030
1031#endif
1032/**
1033 * Function that chooses the optimal mode of operation depending on
1034 * operation parameters.
1035 * @param avg_element_size
1036 * @param local_set_size
1037 * @param remote_set_size
1038 * @param est_set_diff_remote
1039 * @param est_set_diff_local
1040 * @param bandwith_latency_tradeoff
1041 * @param ibf_bucket_number_factor
1042 * @return calcuated mode of operation
1043 */
1044static uint8_t
1045estimate_best_mode_of_operation (uint64_t avg_element_size,
1046 uint64_t local_set_size,
1047 uint64_t remote_set_size,
1048 uint64_t est_set_diff_remote,
1049 uint64_t est_set_diff_local,
1050 uint64_t bandwith_latency_tradeoff,
1051 uint64_t ibf_bucket_number_factor)
1052{
1053
1054 /*
1055 * In case of initial sync fall to predefined states
1056 */
1057
1058 if (0 == local_set_size)
1059 return FULL_SYNC_REMOTE_SENDING_FIRST;
1060 if (0 == remote_set_size)
1061 return FULL_SYNC_LOCAL_SENDING_FIRST;
1062
1063 /*
1064 * Calculate bytes for full Sync
1065 */
1066
1067 uint8_t sizeof_full_done_header = 4;
1068 uint8_t sizeof_done_header = 4;
1069 uint8_t rtt_min_full = 2;
1070 uint8_t sizeof_request_full = 4;
1071 uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1072
1073 /* Estimate byte required if we send first */
1074 uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1075 + local_set_size;
1076
1077 uint64_t total_bytes_full_local_send_first = (avg_element_size
1078 *
1079 total_elements_to_send_local_send_first) \
1080 + (
1081 total_elements_to_send_local_send_first * sizeof(struct
1082 GNUNET_SETU_ElementMessage)) \
1083 + (sizeof_full_done_header * 2) \
1084 + rtt_min_full
1085 * bandwith_latency_tradeoff;
1086
1087 /* Estimate bytes required if we request from remote peer */
1088 uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1089 + remote_set_size;
1090
1091 uint64_t total_bytes_full_remote_send_first = (avg_element_size
1092 *
1093 total_elements_to_send_remote_send_first) \
1094 + (
1095 total_elements_to_send_remote_send_first * sizeof(struct
1096 GNUNET_SETU_ElementMessage)) \
1097 + (sizeof_full_done_header * 2) \
1098 + (rtt_min_full + 0.5)
1099 * bandwith_latency_tradeoff \
1100 + sizeof_request_full;
1101
1102 /*
1103 * Calculate bytes for differential Sync
1104 */
1105
1106 /* Estimate bytes required by IBF transmission*/
1107
1108 long double ibf_bucket_count = estimated_total_diff
1109 * ibf_bucket_number_factor;
1110
1111 if (ibf_bucket_count <= IBF_MIN_SIZE)
1112 {
1113 ibf_bucket_count = IBF_MIN_SIZE;
1114 }
1115 uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1116 / MAX_BUCKETS_PER_MESSAGE);
1117
1118 uint64_t estimated_counter_size = ceil (
1119 MIN (2 * log2l ((float) local_set_size / ibf_bucket_count), log2l (
1120 local_set_size)));
1121
1122 long double counter_bytes = (float) estimated_counter_size / 8;
1123
1124 uint64_t ibf_bytes = ceil ((sizeof(struct IBFMessage) * ibf_message_count)
1125 * 1.2 \
1126 + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
1127 + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
1128 * 1.2 \
1129 + (ibf_bucket_count * counter_bytes) * 1.2);
1130
1131 /* Estimate full byte count for differential sync */
1132 uint64_t element_size = (avg_element_size + sizeof(struct
1133 GNUNET_SETU_ElementMessage)) \
1134 * estimated_total_diff;
1135 uint64_t done_size = sizeof_done_header;
1136 uint64_t inquery_size = (sizeof(struct IBF_Key) + sizeof(struct
1137 InquiryMessage))
1138 * estimated_total_diff;
1139 uint64_t demand_size =
1140 (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1141 * estimated_total_diff;
1142 uint64_t offer_size = (sizeof(struct GNUNET_HashCode) + sizeof(struct
1143 GNUNET_MessageHeader))
1144 * estimated_total_diff;
1145
1146 uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1147 + demand_size + offer_size + ibf_bytes) \
1148 + (DIFFERENTIAL_RTT_MEAN
1149 * bandwith_latency_tradeoff);
1150
1151 uint64_t full_min = MIN (total_bytes_full_local_send_first,
1152 total_bytes_full_remote_send_first);
1153
1154 /* Decide between full and differential sync */
1155
1156 if (full_min < total_bytes_diff)
1157 {
1158 /* Decide between sending all element first or receiving all elements */
1159 if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
1160 {
1161 return FULL_SYNC_LOCAL_SENDING_FIRST;
1162 }
1163 else
1164 {
1165 return FULL_SYNC_REMOTE_SENDING_FIRST;
1166 }
1167 }
1168 else
1169 {
1170 return DIFFERENTIAL_SYNC;
1171 }
1172}
1173
1174
1175/**
1176 * Validates the if a message is received in a correct phase
1177 * @param allowed_phases
1178 * @param size_phases
1179 * @param op
1180 * @return GNUNET_YES if message permitted in phase and GNUNET_NO if not permitted in given
1181 * phase
1182 */
1183static int
1184check_valid_phase (const uint8_t allowed_phases[], size_t size_phases, struct
1185 Operation *op)
1186{
1187 /**
1188 * Iterate over allowed phases
1189 */
1190 for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1191 {
1192 uint8_t phase = allowed_phases[phase_ctr];
1193 if (phase == op->phase)
1194 {
1195 LOG (GNUNET_ERROR_TYPE_DEBUG,
1196 "Message received in valid phase\n");
1197 return GNUNET_YES;
1198 }
1199 }
1200 LOG (GNUNET_ERROR_TYPE_ERROR,
1201 "Received message in invalid phase: %u\n", op->phase);
1202 return GNUNET_NO;
1203}
1204
1205
1206/**
1207 * Function to update, track and validate message received in differential
1208 * sync. This function tracks states of messages and check it against different
1209 * constraints as described in Summermatter's BSc Thesis (2021)
1210 * @param hash_map: Hashmap to store message control flow
1211 * @param new_mcfs: The new message control flow state an given message type should be set to
1212 * @param hash_code: Hash code of the element
1213 * @param mt: The message type for which the message control flow state should be set
1214 * @return GNUNET_YES message is valid in message control flow GNUNET_NO when message is not valid
1215 * at this point in message flow
1216 */
1217static int
1218update_message_control_flow (struct GNUNET_CONTAINER_MultiHashMap *hash_map,
1219 enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1220 const struct GNUNET_HashCode *hash_code,
1221 enum MESSAGE_TYPE mt)
1222{
1223 struct messageControlFlowElement *cfe = NULL;
1224 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1225
1226 /**
1227 * Check logic for forbidden messages
1228 */
702 1229
703 int iterations = perf_rtt.ibf.received; 1230 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
704 if(iterations > 1) 1231 if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
705 rtt += (iterations - 1 ) * 0.5; 1232 {
706 rtt += 3 * iterations; 1233 if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1234 {
1235 LOG (GNUNET_ERROR_TYPE_ERROR,
1236 "Received an element without sent offer!\n");
1237 return GNUNET_NO;
1238 }
1239 /* Check that only requested elements are received! */
1240 if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1241 MSG_CFS_SENT))
1242 {
1243 LOG (GNUNET_ERROR_TYPE_ERROR,
1244 "Received an element that was not demanded\n");
1245 return GNUNET_NO;
1246 }
1247 }
1248
1249 /**
1250 * In case the element hash is not in the hashmap create a new entry
1251 */
1252
1253 if (NULL == cfe)
1254 {
1255 cfe = GNUNET_new (struct messageControlFlowElement);
1256 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1257 cfe,
1258 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1259 {
1260 GNUNET_free (cfe);
1261 return GNUNET_SYSERR;
1262 }
1263 }
1264
1265 /**
1266 * Set state of message
1267 */
1268
1269 if (OFFER_MESSAGE == mt)
1270 {
1271 mcfs = &cfe->offer;
1272 }
1273 else if (DEMAND_MESSAGE == mt)
1274 {
1275 mcfs = &cfe->demand;
1276 }
1277 else if (ELEMENT_MESSAGE == mt)
1278 {
1279 mcfs = &cfe->element;
1280 }
1281 else
1282 {
1283 return GNUNET_SYSERR;
1284 }
1285
1286 /**
1287 * Check if state is allowed
1288 */
1289
1290 if (new_mcfs <= *mcfs)
1291 {
1292 return GNUNET_NO;
1293 }
1294
1295 *mcfs = new_mcfs;
1296 return GNUNET_YES;
1297}
1298
1299
1300/**
1301 * Validate if a message in differential sync si already received before.
1302 * @param hash_map
1303 * @param hash_code
1304 * @param mt
1305 * @return GNUNET_YES when message is already in store if message is not in store return GNUNET_NO
1306 */
1307static int
1308is_message_in_message_control_flow (struct
1309 GNUNET_CONTAINER_MultiHashMap *hash_map,
1310 struct GNUNET_HashCode *hash_code,
1311 enum MESSAGE_TYPE mt)
1312{
1313 struct messageControlFlowElement *cfe = NULL;
1314 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1315
1316 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1317
1318 /**
1319 * Set state of message
1320 */
1321
1322 if (cfe != NULL)
1323 {
1324 if (OFFER_MESSAGE == mt)
1325 {
1326 mcfs = &cfe->offer;
1327 }
1328 else if (DEMAND_MESSAGE == mt)
1329 {
1330 mcfs = &cfe->demand;
1331 }
1332 else if (ELEMENT_MESSAGE == mt)
1333 {
1334 mcfs = &cfe->element;
1335 }
1336 else
1337 {
1338 return GNUNET_SYSERR;
1339 }
707 1340
708 /** 1341 /**
709 * Calculate data sended size 1342 * Evaluate if set is in message
710 */ 1343 */
711 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL), perf_rtt.request_full); 1344 if (*mcfs != MSG_CFS_UNINITIALIZED)
712 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT), perf_rtt.element_full); 1345 {
713 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS), perf_rtt.element); 1346 return GNUNET_YES;
714 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_rtt.operation_request); 1347 }
715 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_SE), perf_rtt.se); 1348 }
716 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE), perf_rtt.full_done); 1349 return GNUNET_NO;
717 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_IBF), perf_rtt.ibf); 1350}
718 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY), perf_rtt.inquery); 1351
719 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND), perf_rtt.demand); 1352
720 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER), perf_rtt.offer); 1353/**
721 bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DONE), perf_rtt.done); 1354 * Iterator for determining if all demands have been
1355 * satisfied
1356 *
1357 * @param cls the union operation `struct Operation *`
1358 * @param key unused
1359 * @param value the `struct ElementEntry *` to insert
1360 * into the key-to-element mapping
1361 * @return #GNUNET_YES (to continue iterating)
1362 */
1363static int
1364determinate_done_message_iterator (void *cls,
1365 const struct GNUNET_HashCode *key,
1366 void *value)
1367{
1368 struct messageControlFlowElement *mcfe = value;
1369
1370 if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1371 {
1372 return GNUNET_YES;
1373 }
1374 return GNUNET_NO;
1375}
1376
1377
1378/**
1379 * Iterator for determining average size
1380 *
1381 * @param cls the union operation `struct Operation *`
1382 * @param key unused
1383 * @param value the `struct ElementEntry *` to insert
1384 * into the key-to-element mapping
1385 * @return #GNUNET_YES (to continue iterating)
1386 */
1387static int
1388determinate_avg_element_size_iterator (void *cls,
1389 const struct GNUNET_HashCode *key,
1390 void *value)
1391{
1392 struct Operation *op = cls;
1393 struct GNUNET_SETU_Element *element = value;
1394 op->total_elements_size_local += element->size;
1395 return GNUNET_YES;
1396}
722 1397
723 LOG(GNUNET_ERROR_TYPE_ERROR,"Bytes Transmitted: %d\n", bytes_transmitted);
724 1398
725 LOG(GNUNET_ERROR_TYPE_ERROR,"Reached tradeoff bandwidth/rtt: %f\n", (bytes_transmitted / rtt )); 1399/**
1400 * Create randomized element hashmap for full sending
1401 *
1402 * @param cls the union operation `struct Operation *`
1403 * @param key unused
1404 * @param value the `struct ElementEntry *` to insert
1405 * into the key-to-element mapping
1406 * @return #GNUNET_YES (to continue iterating)
1407 */
1408static int
1409create_randomized_element_iterator (void *cls,
1410 const struct GNUNET_HashCode *key,
1411 void *value)
1412{
1413 struct Operation *op = cls;
726 1414
727 return rtt; 1415 struct GNUNET_HashContext *hashed_key_context =
1416 GNUNET_CRYPTO_hash_context_start ();
1417 struct GNUNET_HashCode new_key;
1418
1419 /**
1420 * Hash element with new salt to randomize hashmap
1421 */
1422 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1423 &key,
1424 sizeof(struct IBF_Key));
1425 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1426 &op->set->content->elements_randomized_salt,
1427 sizeof(uint32_t));
1428 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1429 &new_key);
1430 GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1431 &new_key,value,
1432 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1433 return GNUNET_YES;
728} 1434}
729 1435
730 1436
@@ -808,6 +1514,36 @@ send_client_done (void *cls)
808} 1514}
809 1515
810 1516
1517/**
1518 * Check if all given byzantine parameters are in given boundaries
1519 * @param op
1520 * @return indicator if all given byzantine parameters are in given boundaries
1521 */
1522
1523static int
1524check_byzantine_bounds (struct Operation *op)
1525{
1526 if (op->byzantine != GNUNET_YES)
1527 return GNUNET_OK;
1528
1529 /**
1530 * Check upper byzantine bounds
1531 */
1532 if (op->remote_element_count + op->remote_set_diff >
1533 op->byzantine_upper_bound)
1534 return GNUNET_SYSERR;
1535 if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1536 return GNUNET_SYSERR;
1537
1538 /**
1539 * Check lower byzantine bounds
1540 */
1541 if (op->remote_element_count < op->byzantine_lower_bound)
1542 return GNUNET_SYSERR;
1543 return GNUNET_OK;
1544}
1545
1546
811/* FIXME: the destroy logic is a mess and should be cleaned up! */ 1547/* FIXME: the destroy logic is a mess and should be cleaned up! */
812 1548
813/** 1549/**
@@ -977,6 +1713,101 @@ fail_union_operation (struct Operation *op)
977 1713
978 1714
979/** 1715/**
1716 * Function that checks if full sync is plausible
1717 * @param initial_local_elements_in_set
1718 * @param estimated_set_difference
1719 * @param repeated_elements
1720 * @param fresh_elements
1721 * @param op
1722 * @return GNUNET_OK if
1723 */
1724
1725static void
1726full_sync_plausibility_check (struct Operation *op)
1727{
1728 if (GNUNET_YES != op->byzantine)
1729 return;
1730
1731 int security_level_lb = -1 * SECURITY_LEVEL;
1732 uint64_t duplicates = op->received_fresh - op->received_total;
1733
1734 /*
1735 * Protect full sync from receiving double element when in FULL SENDING
1736 */
1737 if (PHASE_FULL_SENDING == op->phase)
1738 {
1739 if (duplicates > 0)
1740 {
1741 LOG (GNUNET_ERROR_TYPE_ERROR,
1742 "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1743 "mode of operation this is not allowed! Duplicates: %lu\n",
1744 duplicates);
1745 GNUNET_break_op (0);
1746 fail_union_operation (op);
1747 return;
1748 }
1749
1750 }
1751
1752 /*
1753 * Protect full sync with probabilistic algorithm
1754 */
1755 if (PHASE_FULL_RECEIVING == op->phase)
1756 {
1757 if (0 == op->remote_set_diff)
1758 op->remote_set_diff = 1;
1759
1760 long double base = (1 - (long double) (op->remote_set_diff
1761 / (long double) (op->initial_size
1762 + op->
1763 remote_set_diff)));
1764 long double exponent = (op->received_total - (op->received_fresh * ((long
1765 double)
1766 op->
1767 initial_size
1768 / (long
1769 double)
1770 op->
1771 remote_set_diff)));
1772 long double value = exponent * (log2l (base) / log2l (2));
1773 if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1774 {
1775 LOG (GNUNET_ERROR_TYPE_ERROR,
1776 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1777 "to many duplicated full element : %LF\n",
1778 value);
1779 GNUNET_break_op (0);
1780 fail_union_operation (op);
1781 return;
1782 }
1783 }
1784}
1785
1786
1787/**
1788 * Limit active passive switches in differential sync to configured security level
1789 * @param op
1790 */
1791static void
1792check_max_differential_rounds (struct Operation *op)
1793{
1794 double probability = op->differential_sync_iterations * (log2l (
1795 PROBABILITY_FOR_NEW_ROUND)
1796 / log2l (2));
1797 if ((-1 * SECURITY_LEVEL) > probability)
1798 {
1799 LOG (GNUNET_ERROR_TYPE_ERROR,
1800 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1801 "switches in differential sync: %u\n",
1802 op->differential_sync_iterations);
1803 GNUNET_break_op (0);
1804 fail_union_operation (op);
1805 return;
1806 }
1807}
1808
1809
1810/**
980 * Derive the IBF key from a hash code and 1811 * Derive the IBF key from a hash code and
981 * a salt. 1812 * a salt.
982 * 1813 *
@@ -1004,12 +1835,12 @@ get_ibf_key (const struct GNUNET_HashCode *src)
1004struct GetElementContext 1835struct GetElementContext
1005{ 1836{
1006 /** 1837 /**
1007 * FIXME. 1838 * Gnunet hash code in context
1008 */ 1839 */
1009 struct GNUNET_HashCode hash; 1840 struct GNUNET_HashCode hash;
1010 1841
1011 /** 1842 /**
1012 * FIXME. 1843 * Pointer to the key enty
1013 */ 1844 */
1014 struct KeyEntry *k; 1845 struct KeyEntry *k;
1015}; 1846};
@@ -1122,7 +1953,7 @@ salt_key (const struct IBF_Key *k_in,
1122 uint32_t salt, 1953 uint32_t salt,
1123 struct IBF_Key *k_out) 1954 struct IBF_Key *k_out)
1124{ 1955{
1125 int s = salt % 64; 1956 int s = (salt * 7) % 64;
1126 uint64_t x = k_in->key_val; 1957 uint64_t x = k_in->key_val;
1127 1958
1128 /* rotate ibf key */ 1959 /* rotate ibf key */
@@ -1132,14 +1963,14 @@ salt_key (const struct IBF_Key *k_in,
1132 1963
1133 1964
1134/** 1965/**
1135 * FIXME. 1966 * Reverse modification done in the salt_key function
1136 */ 1967 */
1137static void 1968static void
1138unsalt_key (const struct IBF_Key *k_in, 1969unsalt_key (const struct IBF_Key *k_in,
1139 uint32_t salt, 1970 uint32_t salt,
1140 struct IBF_Key *k_out) 1971 struct IBF_Key *k_out)
1141{ 1972{
1142 int s = salt % 64; 1973 int s = (salt * 7) % 64;
1143 uint64_t x = k_in->key_val; 1974 uint64_t x = k_in->key_val;
1144 1975
1145 x = (x << s) | (x >> (64 - s)); 1976 x = (x << s) | (x >> (64 - s));
@@ -1258,7 +2089,9 @@ prepare_ibf (struct Operation *op,
1258 2089
1259 if (NULL != op->local_ibf) 2090 if (NULL != op->local_ibf)
1260 ibf_destroy (op->local_ibf); 2091 ibf_destroy (op->local_ibf);
1261 op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 2092 // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2093 op->local_ibf = ibf_create (size,
2094 ((uint8_t) op->ibf_number_buckets_per_element));
1262 if (NULL == op->local_ibf) 2095 if (NULL == op->local_ibf)
1263 { 2096 {
1264 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2097 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1283,13 +2116,23 @@ prepare_ibf (struct Operation *op,
1283 */ 2116 */
1284static int 2117static int
1285send_ibf (struct Operation *op, 2118send_ibf (struct Operation *op,
1286 uint16_t ibf_order) 2119 uint32_t ibf_size)
1287{ 2120{
1288 unsigned int buckets_sent = 0; 2121 uint64_t buckets_sent = 0;
1289 struct InvertibleBloomFilter *ibf; 2122 struct InvertibleBloomFilter *ibf;
2123 op->differential_sync_iterations++;
2124
2125 /**
2126 * Enforce min size of IBF
2127 */
2128 uint32_t ibf_min_size = IBF_MIN_SIZE;
1290 2129
2130 if (ibf_size < ibf_min_size)
2131 {
2132 ibf_size = ibf_min_size;
2133 }
1291 if (GNUNET_OK != 2134 if (GNUNET_OK !=
1292 prepare_ibf (op, 1 << ibf_order)) 2135 prepare_ibf (op, ibf_size))
1293 { 2136 {
1294 /* allocation failed */ 2137 /* allocation failed */
1295 return GNUNET_SYSERR; 2138 return GNUNET_SYSERR;
@@ -1297,45 +2140,48 @@ send_ibf (struct Operation *op,
1297 2140
1298 LOG (GNUNET_ERROR_TYPE_DEBUG, 2141 LOG (GNUNET_ERROR_TYPE_DEBUG,
1299 "sending ibf of size %u\n", 2142 "sending ibf of size %u\n",
1300 1 << ibf_order); 2143 1 << ibf_size);
1301 2144
1302 { 2145 {
1303 char name[64]; 2146 char name[64];
1304 GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order); 2147 GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_size);
1305 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); 2148 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
1306 } 2149 }
1307 2150
1308 ibf = op->local_ibf; 2151 ibf = op->local_ibf;
1309 2152
1310 while (buckets_sent < (1 << ibf_order)) 2153 while (buckets_sent < ibf_size)
1311 { 2154 {
1312 unsigned int buckets_in_message; 2155 unsigned int buckets_in_message;
1313 struct GNUNET_MQ_Envelope *ev; 2156 struct GNUNET_MQ_Envelope *ev;
1314 struct IBFMessage *msg; 2157 struct IBFMessage *msg;
1315 2158
1316 buckets_in_message = (1 << ibf_order) - buckets_sent; 2159 buckets_in_message = ibf_size - buckets_sent;
1317 /* limit to maximum */ 2160 /* limit to maximum */
1318 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) 2161 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
1319 buckets_in_message = MAX_BUCKETS_PER_MESSAGE; 2162 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
1320 2163
1321 perf_rtt.ibf.sent += 1; 2164#if MEASURE_PERFORMANCE
1322 perf_rtt.ibf.sent_var_bytes += ( buckets_in_message * IBF_BUCKET_SIZE ); 2165 perf_store.ibf.sent += 1;
2166 perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2167#endif
1323 ev = GNUNET_MQ_msg_extra (msg, 2168 ev = GNUNET_MQ_msg_extra (msg,
1324 buckets_in_message * IBF_BUCKET_SIZE, 2169 buckets_in_message * IBF_BUCKET_SIZE,
1325 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF); 2170 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF);
1326 msg->reserved1 = 0; 2171 msg->ibf_size = ibf_size;
1327 msg->reserved2 = 0;
1328 msg->order = ibf_order;
1329 msg->offset = htonl (buckets_sent); 2172 msg->offset = htonl (buckets_sent);
1330 msg->salt = htonl (op->salt_send); 2173 msg->salt = htonl (op->salt_send);
2174 msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2175
2176
1331 ibf_write_slice (ibf, buckets_sent, 2177 ibf_write_slice (ibf, buckets_sent,
1332 buckets_in_message, &msg[1]); 2178 buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
1333 buckets_sent += buckets_in_message; 2179 buckets_sent += buckets_in_message;
1334 LOG (GNUNET_ERROR_TYPE_DEBUG, 2180 LOG (GNUNET_ERROR_TYPE_DEBUG,
1335 "ibf chunk size %u, %u/%u sent\n", 2181 "ibf chunk size %u, %lu/%u sent\n",
1336 buckets_in_message, 2182 buckets_in_message,
1337 buckets_sent, 2183 buckets_sent,
1338 1 << ibf_order); 2184 ibf_size);
1339 GNUNET_MQ_send (op->mq, ev); 2185 GNUNET_MQ_send (op->mq, ev);
1340 } 2186 }
1341 2187
@@ -1354,17 +2200,26 @@ send_ibf (struct Operation *op,
1354 * @return the required size of the ibf 2200 * @return the required size of the ibf
1355 */ 2201 */
1356static unsigned int 2202static unsigned int
1357get_order_from_difference (unsigned int diff) 2203get_size_from_difference (unsigned int diff, int number_buckets_per_element,
2204 float ibf_bucket_number_factor)
1358{ 2205{
1359 unsigned int ibf_order; 2206 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2207 * Elias Summermatter (2021) in section 3.11 **/
2208 return (((int) (diff * ibf_bucket_number_factor)) | 1);
1360 2209
1361 ibf_order = 2; 2210}
1362 while (((1 << ibf_order) < (IBF_ALPHA * diff) || 2211
1363 ((1 << ibf_order) < SE_IBF_HASH_NUM)) && 2212
1364 (ibf_order < MAX_IBF_ORDER)) 2213static unsigned int
1365 ibf_order++; 2214get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
1366 // add one for correction 2215 decoded_elements, unsigned int last_ibf_size)
1367 return ibf_order + 1; 2216{
2217 unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
2218 - (ibf_bucket_number_factor
2219 * decoded_elements));
2220 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2221 * Elias Summermatter (2021) in section 3.11 **/
2222 return next_size | 1;
1368} 2223}
1369 2224
1370 2225
@@ -1391,8 +2246,10 @@ send_full_element_iterator (void *cls,
1391 LOG (GNUNET_ERROR_TYPE_DEBUG, 2246 LOG (GNUNET_ERROR_TYPE_DEBUG,
1392 "Sending element %s\n", 2247 "Sending element %s\n",
1393 GNUNET_h2s (key)); 2248 GNUNET_h2s (key));
1394 perf_rtt.element_full.received += 1; 2249#if MEASURE_PERFORMANCE
1395 perf_rtt.element_full.received_var_bytes += el->size; 2250 perf_store.element_full.received += 1;
2251 perf_store.element_full.received_var_bytes += el->size;
2252#endif
1396 ev = GNUNET_MQ_msg_extra (emsg, 2253 ev = GNUNET_MQ_msg_extra (emsg,
1397 el->size, 2254 el->size,
1398 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); 2255 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -1421,10 +2278,25 @@ send_full_set (struct Operation *op)
1421 "Dedicing to transmit the full set\n"); 2278 "Dedicing to transmit the full set\n");
1422 /* FIXME: use a more memory-friendly way of doing this with an 2279 /* FIXME: use a more memory-friendly way of doing this with an
1423 iterator, just as we do in the non-full case! */ 2280 iterator, just as we do in the non-full case! */
2281
2282 // Randomize Elements to send
2283 op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2284 32,GNUNET_NO);
2285 op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2286 GNUNET_CRYPTO_QUALITY_NONCE,
2287 UINT64_MAX);
1424 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, 2288 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1425 &send_full_element_iterator, 2289 &
2290 create_randomized_element_iterator,
1426 op); 2291 op);
1427 perf_rtt.full_done.sent += 1; 2292
2293 (void) GNUNET_CONTAINER_multihashmap_iterate (
2294 op->set->content->elements_randomized,
2295 &send_full_element_iterator,
2296 op);
2297#if MEASURE_PERFORMANCE
2298 perf_store.full_done.sent += 1;
2299#endif
1428 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); 2300 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
1429 GNUNET_MQ_send (op->mq, 2301 GNUNET_MQ_send (op->mq,
1430 ev); 2302 ev);
@@ -1454,7 +2326,7 @@ check_union_p2p_strata_estimator (void *cls,
1454 msg->header.type)); 2326 msg->header.type));
1455 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2327 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1456 if ((GNUNET_NO == is_compressed) && 2328 if ((GNUNET_NO == is_compressed) &&
1457 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) 2329 (len != SE_STRATA_COUNT * SE_IBFS_TOTAL_SIZE * IBF_BUCKET_SIZE))
1458 { 2330 {
1459 GNUNET_break (0); 2331 GNUNET_break (0);
1460 return GNUNET_SYSERR; 2332 return GNUNET_SYSERR;
@@ -1473,14 +2345,44 @@ static void
1473handle_union_p2p_strata_estimator (void *cls, 2345handle_union_p2p_strata_estimator (void *cls,
1474 const struct StrataEstimatorMessage *msg) 2346 const struct StrataEstimatorMessage *msg)
1475{ 2347{
1476 perf_rtt.se.received += 1; 2348#if MEASURE_PERFORMANCE
1477 perf_rtt.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2349 perf_store.se.received += 1;
2350 perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2351 StrataEstimatorMessage);
2352#endif
1478 struct Operation *op = cls; 2353 struct Operation *op = cls;
1479 struct StrataEstimator *remote_se; 2354 struct MultiStrataEstimator *remote_se;
1480 unsigned int diff; 2355 unsigned int diff;
1481 uint64_t other_size; 2356 uint64_t other_size;
1482 size_t len; 2357 size_t len;
1483 int is_compressed; 2358 int is_compressed;
2359 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2360 op->set->content->elements);
2361 // Setting peer site to receiving peer
2362 op->peer_site = 1;
2363
2364 /**
2365 * Check that the message is received only in supported phase
2366 */
2367 uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2368 if (GNUNET_OK !=
2369 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2370 {
2371 GNUNET_break (0);
2372 fail_union_operation (op);
2373 return;
2374 }
2375
2376 /** Only allow 1,2,4,8 SEs **/
2377 if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2378 {
2379 LOG (GNUNET_ERROR_TYPE_ERROR,
2380 "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2381 msg->se_count);
2382 GNUNET_break_op (0);
2383 fail_union_operation (op);
2384 return;
2385 }
1484 2386
1485 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons ( 2387 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1486 msg->header.type)); 2388 msg->header.type));
@@ -1490,8 +2392,20 @@ handle_union_p2p_strata_estimator (void *cls,
1490 GNUNET_NO); 2392 GNUNET_NO);
1491 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); 2393 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1492 other_size = GNUNET_ntohll (msg->set_size); 2394 other_size = GNUNET_ntohll (msg->set_size);
2395 op->remote_element_count = other_size;
2396
2397 if (op->byzantine_upper_bound < op->remote_element_count)
2398 {
2399 LOG (GNUNET_ERROR_TYPE_ERROR,
2400 "Exceeded configured upper bound <%lu> of element: %u\n",
2401 op->byzantine_upper_bound,
2402 op->remote_element_count);
2403 fail_union_operation (op);
2404 return;
2405 }
2406
1493 remote_se = strata_estimator_create (SE_STRATA_COUNT, 2407 remote_se = strata_estimator_create (SE_STRATA_COUNT,
1494 SE_IBF_SIZE, 2408 SE_IBFS_TOTAL_SIZE,
1495 SE_IBF_HASH_NUM); 2409 SE_IBF_HASH_NUM);
1496 if (NULL == remote_se) 2410 if (NULL == remote_se)
1497 { 2411 {
@@ -1503,6 +2417,8 @@ handle_union_p2p_strata_estimator (void *cls,
1503 strata_estimator_read (&msg[1], 2417 strata_estimator_read (&msg[1],
1504 len, 2418 len,
1505 is_compressed, 2419 is_compressed,
2420 msg->se_count,
2421 SE_IBFS_TOTAL_SIZE,
1506 remote_se)) 2422 remote_se))
1507 { 2423 {
1508 /* decompression failed */ 2424 /* decompression failed */
@@ -1511,11 +2427,76 @@ handle_union_p2p_strata_estimator (void *cls,
1511 return; 2427 return;
1512 } 2428 }
1513 GNUNET_assert (NULL != op->se); 2429 GNUNET_assert (NULL != op->se);
1514 diff = strata_estimator_difference (remote_se, 2430 strata_estimator_difference (remote_se,
1515 op->se); 2431 op->se);
2432
2433 /* Calculate remote local diff */
2434 long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2435 long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2436
2437 /* Prevent estimations from overshooting max element */
2438 if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2439 diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2440 if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2441 diff_local = op->byzantine_upper_bound - op->local_element_count;
2442 if ((diff_remote < 0) || (diff_local < 0))
2443 {
2444 strata_estimator_destroy (remote_se);
2445 LOG (GNUNET_ERROR_TYPE_ERROR,
2446 "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2447 "malicious: remote diff %ld, local diff: %ld\n",
2448 diff_remote, diff_local);
2449 GNUNET_break_op (0);
2450 fail_union_operation (op);
2451 return;
2452 }
1516 2453
1517 if (diff > 200) 2454 /* Make estimation more precise in initial sync cases */
1518 diff = diff * 3 / 2; 2455 if (0 == op->remote_element_count)
2456 {
2457 diff_remote = 0;
2458 diff_local = op->local_element_count;
2459 }
2460 if (0 == op->local_element_count)
2461 {
2462 diff_local = 0;
2463 diff_remote = op->remote_element_count;
2464 }
2465
2466 diff = diff_remote + diff_local;
2467 op->remote_set_diff = diff_remote;
2468
2469 /** Calculate avg element size if not initial sync **/
2470 uint64_t avg_element_size = 0;
2471 if (0 < op->local_element_count)
2472 {
2473 op->total_elements_size_local = 0;
2474 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2475 &
2476 determinate_avg_element_size_iterator,
2477 op);
2478 avg_element_size = op->total_elements_size_local / op->local_element_count;
2479 }
2480
2481 op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2482 GNUNET_CONTAINER_multihashmap_size (
2483 op->set->content->
2484 elements),
2485 op->
2486 remote_element_count,
2487 diff_remote,
2488 diff_local,
2489 op->
2490 rtt_bandwidth_tradeoff,
2491 op->
2492 ibf_bucket_number_factor);
2493
2494#if MEASURE_PERFORMANCE
2495 perf_store.se_diff_local = diff_local;
2496 perf_store.se_diff_remote = diff_remote;
2497 perf_store.se_diff = diff;
2498 perf_store.mode_of_operation = op->mode_of_operation;
2499#endif
1519 2500
1520 strata_estimator_destroy (remote_se); 2501 strata_estimator_destroy (remote_se);
1521 strata_estimator_destroy (op->se); 2502 strata_estimator_destroy (op->se);
@@ -1523,7 +2504,8 @@ handle_union_p2p_strata_estimator (void *cls,
1523 LOG (GNUNET_ERROR_TYPE_DEBUG, 2504 LOG (GNUNET_ERROR_TYPE_DEBUG,
1524 "got se diff=%d, using ibf size %d\n", 2505 "got se diff=%d, using ibf size %d\n",
1525 diff, 2506 diff,
1526 1U << get_order_from_difference (diff)); 2507 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element,
2508 op->ibf_bucket_number_factor));
1527 2509
1528 { 2510 {
1529 char *set_debug; 2511 char *set_debug;
@@ -1546,16 +2528,8 @@ handle_union_p2p_strata_estimator (void *cls,
1546 return; 2528 return;
1547 } 2529 }
1548 2530
1549 LOG (GNUNET_ERROR_TYPE_ERROR,
1550 "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: %f\n", op->rtt_bandwidth_tradeoff);
1551
1552
1553 /**
1554 * Added rtt_bandwidth_tradeoff directly need future improvements
1555 */
1556 if ((GNUNET_YES == op->force_full) || 2531 if ((GNUNET_YES == op->force_full) ||
1557 (diff > op->initial_size / 4) || 2532 (op->mode_of_operation != DIFFERENTIAL_SYNC))
1558 (0 == other_size))
1559 { 2533 {
1560 LOG (GNUNET_ERROR_TYPE_DEBUG, 2534 LOG (GNUNET_ERROR_TYPE_DEBUG,
1561 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n", 2535 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
@@ -1565,9 +2539,17 @@ handle_union_p2p_strata_estimator (void *cls,
1565 "# of full sends", 2539 "# of full sends",
1566 1, 2540 1,
1567 GNUNET_NO); 2541 GNUNET_NO);
1568 if ((op->initial_size <= other_size) || 2542 if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
1569 (0 == other_size))
1570 { 2543 {
2544 struct TransmitFullMessage *signal_msg;
2545 struct GNUNET_MQ_Envelope *ev;
2546 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2547 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL);
2548 signal_msg->remote_set_difference = htonl (diff_local);
2549 signal_msg->remote_set_size = htonl (op->local_element_count);
2550 signal_msg->local_set_difference = htonl (diff_remote);
2551 GNUNET_MQ_send (op->mq,
2552 ev);
1571 send_full_set (op); 2553 send_full_set (op);
1572 } 2554 }
1573 else 2555 else
@@ -1577,9 +2559,15 @@ handle_union_p2p_strata_estimator (void *cls,
1577 LOG (GNUNET_ERROR_TYPE_DEBUG, 2559 LOG (GNUNET_ERROR_TYPE_DEBUG,
1578 "Telling other peer that we expect its full set\n"); 2560 "Telling other peer that we expect its full set\n");
1579 op->phase = PHASE_FULL_RECEIVING; 2561 op->phase = PHASE_FULL_RECEIVING;
1580 perf_rtt.request_full.sent += 1; 2562#if MEASURE_PERFORMANCE
1581 ev = GNUNET_MQ_msg_header ( 2563 perf_store.request_full.sent += 1;
1582 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); 2564#endif
2565 struct TransmitFullMessage *signal_msg;
2566 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2567 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
2568 signal_msg->remote_set_difference = htonl (diff_local);
2569 signal_msg->remote_set_size = htonl (op->local_element_count);
2570 signal_msg->local_set_difference = htonl (diff_remote);
1583 GNUNET_MQ_send (op->mq, 2571 GNUNET_MQ_send (op->mq,
1584 ev); 2572 ev);
1585 } 2573 }
@@ -1592,7 +2580,9 @@ handle_union_p2p_strata_estimator (void *cls,
1592 GNUNET_NO); 2580 GNUNET_NO);
1593 if (GNUNET_OK != 2581 if (GNUNET_OK !=
1594 send_ibf (op, 2582 send_ibf (op,
1595 get_order_from_difference (diff))) 2583 get_size_from_difference (diff,
2584 op->ibf_number_buckets_per_element,
2585 op->ibf_bucket_number_factor)))
1596 { 2586 {
1597 /* Internal error, best we can do is shut the connection */ 2587 /* Internal error, best we can do is shut the connection */
1598 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2588 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1625,15 +2615,64 @@ send_offers_iterator (void *cls,
1625 2615
1626 /* Detect 32-bit key collision for the 64-bit IBF keys. */ 2616 /* Detect 32-bit key collision for the 64-bit IBF keys. */
1627 if (ke->ibf_key.key_val != sec->ibf_key.key_val) 2617 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2618 {
2619 op->active_passive_switch_required = true;
1628 return GNUNET_YES; 2620 return GNUNET_YES;
2621 }
1629 2622
1630 perf_rtt.offer.sent += 1; 2623 /* Prevent implementation from sending a offer multiple times in case of roll switch */
1631 perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); 2624 if (GNUNET_YES ==
2625 is_message_in_message_control_flow (
2626 op->message_control_flow,
2627 &ke->element->element_hash,
2628 OFFER_MESSAGE)
2629 )
2630 {
2631 LOG (GNUNET_ERROR_TYPE_DEBUG,
2632 "Skipping already sent processed element offer!\n");
2633 return GNUNET_YES;
2634 }
1632 2635
2636 /* Save send offer message for message control */
2637 if (GNUNET_YES !=
2638 update_message_control_flow (
2639 op->message_control_flow,
2640 MSG_CFS_SENT,
2641 &ke->element->element_hash,
2642 OFFER_MESSAGE)
2643 )
2644 {
2645 LOG (GNUNET_ERROR_TYPE_ERROR,
2646 "Double offer message sent found!\n");
2647 GNUNET_break (0);
2648 fail_union_operation (op);
2649 return GNUNET_NO;
2650 }
2651 ;
2652
2653 /* Mark element to be expected to received */
2654 if (GNUNET_YES !=
2655 update_message_control_flow (
2656 op->message_control_flow,
2657 MSG_CFS_EXPECTED,
2658 &ke->element->element_hash,
2659 DEMAND_MESSAGE)
2660 )
2661 {
2662 LOG (GNUNET_ERROR_TYPE_ERROR,
2663 "Double demand received found!\n");
2664 GNUNET_break (0);
2665 fail_union_operation (op);
2666 return GNUNET_NO;
2667 }
2668 ;
2669#if MEASURE_PERFORMANCE
2670 perf_store.offer.sent += 1;
2671 perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2672#endif
1633 ev = GNUNET_MQ_msg_header_extra (mh, 2673 ev = GNUNET_MQ_msg_header_extra (mh,
1634 sizeof(struct GNUNET_HashCode), 2674 sizeof(struct GNUNET_HashCode),
1635 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER); 2675 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER);
1636
1637 GNUNET_assert (NULL != ev); 2676 GNUNET_assert (NULL != ev);
1638 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; 2677 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1639 LOG (GNUNET_ERROR_TYPE_DEBUG, 2678 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1651,7 +2690,7 @@ send_offers_iterator (void *cls,
1651 * @param op union operation 2690 * @param op union operation
1652 * @param ibf_key IBF key of interest 2691 * @param ibf_key IBF key of interest
1653 */ 2692 */
1654static void 2693void
1655send_offers_for_key (struct Operation *op, 2694send_offers_for_key (struct Operation *op,
1656 struct IBF_Key ibf_key) 2695 struct IBF_Key ibf_key)
1657{ 2696{
@@ -1694,6 +2733,7 @@ decode_and_send (struct Operation *op)
1694 /* allocation failed */ 2733 /* allocation failed */
1695 return GNUNET_SYSERR; 2734 return GNUNET_SYSERR;
1696 } 2735 }
2736
1697 diff_ibf = ibf_dup (op->local_ibf); 2737 diff_ibf = ibf_dup (op->local_ibf);
1698 ibf_subtract (diff_ibf, 2738 ibf_subtract (diff_ibf,
1699 op->remote_ibf); 2739 op->remote_ibf);
@@ -1706,7 +2746,7 @@ decode_and_send (struct Operation *op)
1706 diff_ibf->size); 2746 diff_ibf->size);
1707 2747
1708 num_decoded = 0; 2748 num_decoded = 0;
1709 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ 2749 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1710 2750
1711 while (1) 2751 while (1)
1712 { 2752 {
@@ -1738,23 +2778,36 @@ decode_and_send (struct Operation *op)
1738 if ((GNUNET_SYSERR == res) || 2778 if ((GNUNET_SYSERR == res) ||
1739 (GNUNET_YES == cycle_detected)) 2779 (GNUNET_YES == cycle_detected))
1740 { 2780 {
1741 int next_order; 2781 uint32_t next_size;
1742 next_order = 0; 2782 /** Enforce odd ibf size **/
1743 while (1 << next_order < diff_ibf->size) 2783
1744 next_order++; 2784 next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
1745 next_order++; 2785 diff_ibf->size);
1746 if (next_order <= MAX_IBF_ORDER) 2786 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2787 * Elias Summermatter (2021) in section 3.11 **/
2788 uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2789
2790 if (next_size<ibf_min_size)
2791 next_size = ibf_min_size;
2792
2793
2794 if (next_size <= MAX_IBF_SIZE)
1747 { 2795 {
1748 LOG (GNUNET_ERROR_TYPE_DEBUG, 2796 LOG (GNUNET_ERROR_TYPE_DEBUG,
1749 "decoding failed, sending larger ibf (size %u)\n", 2797 "decoding failed, sending larger ibf (size %u)\n",
1750 1 << next_order); 2798 next_size);
1751 GNUNET_STATISTICS_update (_GSS_statistics, 2799 GNUNET_STATISTICS_update (_GSS_statistics,
1752 "# of IBF retries", 2800 "# of IBF retries",
1753 1, 2801 1,
1754 GNUNET_NO); 2802 GNUNET_NO);
1755 op->salt_send++; 2803#if MEASURE_PERFORMANCE
2804 perf_store.active_passive_switches += 1;
2805#endif
2806
2807 op->salt_send = op->salt_receive++;
2808
1756 if (GNUNET_OK != 2809 if (GNUNET_OK !=
1757 send_ibf (op, next_order)) 2810 send_ibf (op, next_size))
1758 { 2811 {
1759 /* Internal error, best we can do is shut the connection */ 2812 /* Internal error, best we can do is shut the connection */
1760 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2813 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1786,7 +2839,9 @@ decode_and_send (struct Operation *op)
1786 LOG (GNUNET_ERROR_TYPE_DEBUG, 2839 LOG (GNUNET_ERROR_TYPE_DEBUG,
1787 "transmitted all values, sending DONE\n"); 2840 "transmitted all values, sending DONE\n");
1788 2841
1789 perf_rtt.done.sent += 1; 2842#if MEASURE_PERFORMANCE
2843 perf_store.done.sent += 1;
2844#endif
1790 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); 2845 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
1791 GNUNET_MQ_send (op->mq, ev); 2846 GNUNET_MQ_send (op->mq, ev);
1792 /* We now wait until we get a DONE message back 2847 /* We now wait until we get a DONE message back
@@ -1797,7 +2852,6 @@ decode_and_send (struct Operation *op)
1797 if (1 == side) 2852 if (1 == side)
1798 { 2853 {
1799 struct IBF_Key unsalted_key; 2854 struct IBF_Key unsalted_key;
1800
1801 unsalt_key (&key, 2855 unsalt_key (&key,
1802 op->salt_receive, 2856 op->salt_receive,
1803 &unsalted_key); 2857 &unsalted_key);
@@ -1809,8 +2863,29 @@ decode_and_send (struct Operation *op)
1809 struct GNUNET_MQ_Envelope *ev; 2863 struct GNUNET_MQ_Envelope *ev;
1810 struct InquiryMessage *msg; 2864 struct InquiryMessage *msg;
1811 2865
1812 perf_rtt.inquery.sent += 1; 2866#if MEASURE_PERFORMANCE
1813 perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key); 2867 perf_store.inquery.sent += 1;
2868 perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2869#endif
2870
2871 /** Add sent inquiries to hashmap for flow control **/
2872 struct GNUNET_HashContext *hashed_key_context =
2873 GNUNET_CRYPTO_hash_context_start ();
2874 struct GNUNET_HashCode *hashed_key = (struct
2875 GNUNET_HashCode*) GNUNET_malloc (
2876 sizeof(struct GNUNET_HashCode));
2877 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_SENT;
2878 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2879 &key,
2880 sizeof(struct IBF_Key));
2881 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2882 hashed_key);
2883 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2884 hashed_key,
2885 &mcfs,
2886 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
2887 );
2888
1814 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth 2889 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1815 * the effort additional complexity. */ 2890 * the effort additional complexity. */
1816 ev = GNUNET_MQ_msg_extra (msg, 2891 ev = GNUNET_MQ_msg_extra (msg,
@@ -1836,6 +2911,100 @@ decode_and_send (struct Operation *op)
1836 2911
1837 2912
1838/** 2913/**
2914 * Check send full message received from other peer
2915 * @param cls
2916 * @param msg
2917 * @return
2918 */
2919
2920static int
2921check_union_p2p_send_full (void *cls,
2922 const struct TransmitFullMessage *msg)
2923{
2924 return GNUNET_OK;
2925}
2926
2927
2928/**
2929 * Handle send full message received from other peer
2930 *
2931 * @param cls
2932 * @param msg
2933 */
2934static void
2935handle_union_p2p_send_full (void *cls,
2936 const struct TransmitFullMessage *msg)
2937{
2938 struct Operation *op = cls;
2939
2940 /**
2941 * Check that the message is received only in supported phase
2942 */
2943 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2944 if (GNUNET_OK !=
2945 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2946 {
2947 GNUNET_break (0);
2948 fail_union_operation (op);
2949 return;
2950 }
2951
2952 /** write received values to operator**/
2953 op->remote_element_count = ntohl (msg->remote_set_size);
2954 op->remote_set_diff = ntohl (msg->remote_set_difference);
2955 op->local_set_diff = ntohl (msg->local_set_difference);
2956
2957 /** Check byzantine limits **/
2958 if (check_byzantine_bounds (op) != GNUNET_OK)
2959 {
2960 LOG (GNUNET_ERROR_TYPE_ERROR,
2961 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
2962 "criteria\n");
2963 GNUNET_break_op (0);
2964 fail_union_operation (op);
2965 return;
2966 }
2967
2968 /** Calculate avg element size if not initial sync **/
2969 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2970 op->set->content->elements);
2971 uint64_t avg_element_size = 0;
2972 if (0 < op->local_element_count)
2973 {
2974 op->total_elements_size_local = 0;
2975 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2976 &
2977 determinate_avg_element_size_iterator,
2978 op);
2979 avg_element_size = op->total_elements_size_local / op->local_element_count;
2980 }
2981
2982 /** Validate mode of operation **/
2983 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2984 op->
2985 remote_element_count,
2986 op->
2987 local_element_count,
2988 op->local_set_diff,
2989 op->remote_set_diff,
2990 op->
2991 rtt_bandwidth_tradeoff,
2992 op->
2993 ibf_bucket_number_factor);
2994 if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation)
2995 {
2996 LOG (GNUNET_ERROR_TYPE_ERROR,
2997 "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
2998 " : %d\n", mode_of_operation);
2999 GNUNET_break_op (0);
3000 fail_union_operation (op);
3001 return;
3002 }
3003 op->phase = PHASE_FULL_RECEIVING;
3004}
3005
3006
3007/**
1839 * Check an IBF message from a remote peer. 3008 * Check an IBF message from a remote peer.
1840 * 3009 *
1841 * Reassemble the IBF from multiple pieces, and 3010 * Reassemble the IBF from multiple pieces, and
@@ -1872,7 +3041,8 @@ check_union_p2p_ibf (void *cls,
1872 GNUNET_break_op (0); 3041 GNUNET_break_op (0);
1873 return GNUNET_SYSERR; 3042 return GNUNET_SYSERR;
1874 } 3043 }
1875 if (1 << msg->order != op->remote_ibf->size) 3044
3045 if (msg->ibf_size != op->remote_ibf->size)
1876 { 3046 {
1877 GNUNET_break_op (0); 3047 GNUNET_break_op (0);
1878 return GNUNET_SYSERR; 3048 return GNUNET_SYSERR;
@@ -1909,9 +3079,26 @@ handle_union_p2p_ibf (void *cls,
1909{ 3079{
1910 struct Operation *op = cls; 3080 struct Operation *op = cls;
1911 unsigned int buckets_in_message; 3081 unsigned int buckets_in_message;
3082 /**
3083 * Check that the message is received only in supported phase
3084 */
3085 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3086 PHASE_PASSIVE_DECODING};
3087 if (GNUNET_OK !=
3088 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3089 {
3090 GNUNET_break (0);
3091 fail_union_operation (op);
3092 return;
3093 }
3094 op->differential_sync_iterations++;
3095 check_max_differential_rounds (op);
3096 op->active_passive_switch_required = false;
1912 3097
1913 perf_rtt.ibf.received += 1; 3098#if MEASURE_PERFORMANCE
1914 perf_rtt.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); 3099 perf_store.ibf.received += 1;
3100 perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3101#endif
1915 3102
1916 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) 3103 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1917 / IBF_BUCKET_SIZE; 3104 / IBF_BUCKET_SIZE;
@@ -1922,8 +3109,10 @@ handle_union_p2p_ibf (void *cls,
1922 GNUNET_assert (NULL == op->remote_ibf); 3109 GNUNET_assert (NULL == op->remote_ibf);
1923 LOG (GNUNET_ERROR_TYPE_DEBUG, 3110 LOG (GNUNET_ERROR_TYPE_DEBUG,
1924 "Creating new ibf of size %u\n", 3111 "Creating new ibf of size %u\n",
1925 1 << msg->order); 3112 ntohl (msg->ibf_size));
1926 op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); 3113 // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
3114 op->remote_ibf = ibf_create (msg->ibf_size,
3115 ((uint8_t) op->ibf_number_buckets_per_element));
1927 op->salt_receive = ntohl (msg->salt); 3116 op->salt_receive = ntohl (msg->salt);
1928 LOG (GNUNET_ERROR_TYPE_DEBUG, 3117 LOG (GNUNET_ERROR_TYPE_DEBUG,
1929 "Receiving new IBF with salt %u\n", 3118 "Receiving new IBF with salt %u\n",
@@ -1954,7 +3143,7 @@ handle_union_p2p_ibf (void *cls,
1954 ibf_read_slice (&msg[1], 3143 ibf_read_slice (&msg[1],
1955 op->ibf_buckets_received, 3144 op->ibf_buckets_received,
1956 buckets_in_message, 3145 buckets_in_message,
1957 op->remote_ibf); 3146 op->remote_ibf, msg->ibf_counter_bit_length);
1958 op->ibf_buckets_received += buckets_in_message; 3147 op->ibf_buckets_received += buckets_in_message;
1959 3148
1960 if (op->ibf_buckets_received == op->remote_ibf->size) 3149 if (op->ibf_buckets_received == op->remote_ibf->size)
@@ -2030,18 +3219,24 @@ maybe_finish (struct Operation *op)
2030 3219
2031 num_demanded = GNUNET_CONTAINER_multihashmap_size ( 3220 num_demanded = GNUNET_CONTAINER_multihashmap_size (
2032 op->demanded_hashes); 3221 op->demanded_hashes);
2033 3222 int send_done = GNUNET_CONTAINER_multihashmap_iterate (
3223 op->message_control_flow,
3224 &
3225 determinate_done_message_iterator,
3226 op);
2034 if (PHASE_FINISH_WAITING == op->phase) 3227 if (PHASE_FINISH_WAITING == op->phase)
2035 { 3228 {
2036 LOG (GNUNET_ERROR_TYPE_DEBUG, 3229 LOG (GNUNET_ERROR_TYPE_DEBUG,
2037 "In PHASE_FINISH_WAITING, pending %u demands\n", 3230 "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
2038 num_demanded); 3231 num_demanded, op->peer_site);
2039 if (0 == num_demanded) 3232 if (-1 != send_done)
2040 { 3233 {
2041 struct GNUNET_MQ_Envelope *ev; 3234 struct GNUNET_MQ_Envelope *ev;
2042 3235
2043 op->phase = PHASE_FINISHED; 3236 op->phase = PHASE_FINISHED;
2044 perf_rtt.done.sent += 1; 3237#if MEASURE_PERFORMANCE
3238 perf_store.done.sent += 1;
3239#endif
2045 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); 3240 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
2046 GNUNET_MQ_send (op->mq, 3241 GNUNET_MQ_send (op->mq,
2047 ev); 3242 ev);
@@ -2052,9 +3247,9 @@ maybe_finish (struct Operation *op)
2052 if (PHASE_FINISH_CLOSING == op->phase) 3247 if (PHASE_FINISH_CLOSING == op->phase)
2053 { 3248 {
2054 LOG (GNUNET_ERROR_TYPE_DEBUG, 3249 LOG (GNUNET_ERROR_TYPE_DEBUG,
2055 "In PHASE_FINISH_CLOSING, pending %u demands\n", 3250 "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
2056 num_demanded); 3251 num_demanded, op->peer_site);
2057 if (0 == num_demanded) 3252 if (-1 != send_done)
2058 { 3253 {
2059 op->phase = PHASE_FINISHED; 3254 op->phase = PHASE_FINISHED;
2060 send_client_done (op); 3255 send_client_done (op);
@@ -2102,11 +3297,25 @@ handle_union_p2p_elements (void *cls,
2102 struct KeyEntry *ke; 3297 struct KeyEntry *ke;
2103 uint16_t element_size; 3298 uint16_t element_size;
2104 3299
3300 /**
3301 * Check that the message is received only in supported phase
3302 */
3303 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3304 PHASE_FINISH_WAITING, PHASE_FINISH_CLOSING};
3305 if (GNUNET_OK !=
3306 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3307 {
3308 GNUNET_break (0);
3309 fail_union_operation (op);
3310 return;
3311 }
2105 3312
2106 element_size = ntohs (emsg->header.size) - sizeof(struct 3313 element_size = ntohs (emsg->header.size) - sizeof(struct
2107 GNUNET_SETU_ElementMessage); 3314 GNUNET_SETU_ElementMessage);
2108 perf_rtt.element.received += 1; 3315#if MEASURE_PERFORMANCE
2109 perf_rtt.element.received_var_bytes += element_size; 3316 perf_store.element.received += 1;
3317 perf_store.element.received_var_bytes += element_size;
3318#endif
2110 3319
2111 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); 3320 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2112 GNUNET_memcpy (&ee[1], 3321 GNUNET_memcpy (&ee[1],
@@ -2129,6 +3338,21 @@ handle_union_p2p_elements (void *cls,
2129 return; 3338 return;
2130 } 3339 }
2131 3340
3341 if (GNUNET_OK !=
3342 update_message_control_flow (
3343 op->message_control_flow,
3344 MSG_CFS_RECEIVED,
3345 &ee->element_hash,
3346 ELEMENT_MESSAGE)
3347 )
3348 {
3349 LOG (GNUNET_ERROR_TYPE_ERROR,
3350 "An element has been received more than once!\n");
3351 GNUNET_break (0);
3352 fail_union_operation (op);
3353 return;
3354 }
3355
2132 LOG (GNUNET_ERROR_TYPE_DEBUG, 3356 LOG (GNUNET_ERROR_TYPE_DEBUG,
2133 "Got element (size %u, hash %s) from peer\n", 3357 "Got element (size %u, hash %s) from peer\n",
2134 (unsigned int) element_size, 3358 (unsigned int) element_size,
@@ -2217,33 +3441,25 @@ handle_union_p2p_full_element (void *cls,
2217 struct KeyEntry *ke; 3441 struct KeyEntry *ke;
2218 uint16_t element_size; 3442 uint16_t element_size;
2219 3443
2220 3444 /**
2221 3445 * Check that the message is received only in supported phase
2222 if(PHASE_EXPECT_IBF == op->phase) { 3446 */
2223 op->phase = PHASE_FULL_RECEIVING; 3447 uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
2224 } 3448 if (GNUNET_OK !=
2225 3449 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2226
2227
2228 /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */
2229 if ((PHASE_FULL_RECEIVING != op->phase) &&
2230 (PHASE_FULL_SENDING != op->phase))
2231 { 3450 {
2232 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 3451 GNUNET_break (0);
2233 "Handle full element phase is %u\n", 3452 fail_union_operation (op);
2234 (unsigned) op->phase); 3453 return;
2235 GNUNET_break_op (0);
2236 fail_union_operation (op);
2237 return;
2238 } 3454 }
2239 3455
2240
2241
2242 element_size = ntohs (emsg->header.size) 3456 element_size = ntohs (emsg->header.size)
2243 - sizeof(struct GNUNET_SETU_ElementMessage); 3457 - sizeof(struct GNUNET_SETU_ElementMessage);
2244 3458
2245 perf_rtt.element_full.received += 1; 3459#if MEASURE_PERFORMANCE
2246 perf_rtt.element_full.received_var_bytes += element_size; 3460 perf_store.element_full.received += 1;
3461 perf_store.element_full.received_var_bytes += element_size;
3462#endif
2247 3463
2248 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); 3464 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2249 GNUNET_memcpy (&ee[1], &emsg[1], element_size); 3465 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
@@ -2268,17 +3484,15 @@ handle_union_p2p_full_element (void *cls,
2268 GNUNET_NO); 3484 GNUNET_NO);
2269 3485
2270 op->received_total++; 3486 op->received_total++;
2271
2272 ke = op_get_element (op, 3487 ke = op_get_element (op,
2273 &ee->element_hash); 3488 &ee->element_hash);
2274 if (NULL != ke) 3489 if (NULL != ke)
2275 { 3490 {
2276 /* Got repeated element. Should not happen since
2277 * we track demands. */
2278 GNUNET_STATISTICS_update (_GSS_statistics, 3491 GNUNET_STATISTICS_update (_GSS_statistics,
2279 "# repeated elements", 3492 "# repeated elements",
2280 1, 3493 1,
2281 GNUNET_NO); 3494 GNUNET_NO);
3495 full_sync_plausibility_check (op);
2282 ke->received = GNUNET_YES; 3496 ke->received = GNUNET_YES;
2283 GNUNET_free (ee); 3497 GNUNET_free (ee);
2284 } 3498 }
@@ -2294,15 +3508,15 @@ handle_union_p2p_full_element (void *cls,
2294 GNUNET_SETU_STATUS_ADD_LOCAL); 3508 GNUNET_SETU_STATUS_ADD_LOCAL);
2295 } 3509 }
2296 3510
3511
2297 if ((GNUNET_YES == op->byzantine) && 3512 if ((GNUNET_YES == op->byzantine) &&
2298 (op->received_total > 384 + op->received_fresh * 4) && 3513 (op->received_total > op->remote_element_count) )
2299 (op->received_fresh < op->received_total / 6))
2300 { 3514 {
2301 /* The other peer gave us lots of old elements, there's something wrong. */ 3515 /* The other peer gave us lots of old elements, there's something wrong. */
2302 LOG (GNUNET_ERROR_TYPE_ERROR, 3516 LOG (GNUNET_ERROR_TYPE_ERROR,
2303 "Other peer sent only %llu/%llu fresh elements, failing operation\n", 3517 "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
2304 (unsigned long long) op->received_fresh, 3518 (unsigned long long) op->received_total,
2305 (unsigned long long) op->received_total); 3519 (unsigned long long) op->remote_element_count);
2306 GNUNET_break_op (0); 3520 GNUNET_break_op (0);
2307 fail_union_operation (op); 3521 fail_union_operation (op);
2308 return; 3522 return;
@@ -2356,18 +3570,50 @@ handle_union_p2p_inquiry (void *cls,
2356 const struct IBF_Key *ibf_key; 3570 const struct IBF_Key *ibf_key;
2357 unsigned int num_keys; 3571 unsigned int num_keys;
2358 3572
2359 perf_rtt.inquery.received += 1; 3573 /**
2360 perf_rtt.inquery.received_var_bytes += (ntohs (msg->header.size) - sizeof(struct InquiryMessage)); 3574 * Check that the message is received only in supported phase
3575 */
3576 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3577 if (GNUNET_OK !=
3578 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3579 {
3580 GNUNET_break (0);
3581 fail_union_operation (op);
3582 return;
3583 }
3584
3585#if MEASURE_PERFORMANCE
3586 perf_store.inquery.received += 1;
3587 perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3588 - sizeof(struct InquiryMessage));
3589#endif
2361 3590
2362 LOG (GNUNET_ERROR_TYPE_DEBUG, 3591 LOG (GNUNET_ERROR_TYPE_DEBUG,
2363 "Received union inquiry\n"); 3592 "Received union inquiry\n");
2364 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) 3593 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2365 / sizeof(struct IBF_Key); 3594 / sizeof(struct IBF_Key);
2366 ibf_key = (const struct IBF_Key *) &msg[1]; 3595 ibf_key = (const struct IBF_Key *) &msg[1];
3596
3597 /** Add received inquiries to hashmap for flow control **/
3598 struct GNUNET_HashContext *hashed_key_context =
3599 GNUNET_CRYPTO_hash_context_start ();
3600 struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
3601 sizeof(struct GNUNET_HashCode));;
3602 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_RECEIVED;
3603 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3604 &ibf_key,
3605 sizeof(struct IBF_Key));
3606 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3607 hashed_key);
3608 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3609 hashed_key,
3610 &mcfs,
3611 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
3612 );
3613
2367 while (0 != num_keys--) 3614 while (0 != num_keys--)
2368 { 3615 {
2369 struct IBF_Key unsalted_key; 3616 struct IBF_Key unsalted_key;
2370
2371 unsalt_key (ibf_key, 3617 unsalt_key (ibf_key,
2372 ntohl (msg->salt), 3618 ntohl (msg->salt),
2373 &unsalted_key); 3619 &unsalted_key);
@@ -2402,7 +3648,9 @@ send_missing_full_elements_iter (void *cls,
2402 3648
2403 if (GNUNET_YES == ke->received) 3649 if (GNUNET_YES == ke->received)
2404 return GNUNET_YES; 3650 return GNUNET_YES;
2405 perf_rtt.element_full.received += 1; 3651#if MEASURE_PERFORMANCE
3652 perf_store.element_full.received += 1;
3653#endif
2406 ev = GNUNET_MQ_msg_extra (emsg, 3654 ev = GNUNET_MQ_msg_extra (emsg,
2407 ee->element.size, 3655 ee->element.size,
2408 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); 3656 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -2422,18 +3670,84 @@ send_missing_full_elements_iter (void *cls,
2422 * @param cls closure, a set union operation 3670 * @param cls closure, a set union operation
2423 * @param mh the demand message 3671 * @param mh the demand message
2424 */ 3672 */
3673static int
3674check_union_p2p_request_full (void *cls,
3675 const struct TransmitFullMessage *mh)
3676{
3677 return GNUNET_OK;
3678}
3679
3680
2425static void 3681static void
2426handle_union_p2p_request_full (void *cls, 3682handle_union_p2p_request_full (void *cls,
2427 const struct GNUNET_MessageHeader *mh) 3683 const struct TransmitFullMessage *msg)
2428{ 3684{
2429 struct Operation *op = cls; 3685 struct Operation *op = cls;
2430 3686
2431 perf_rtt.request_full.received += 1; 3687 /**
3688 * Check that the message is received only in supported phase
3689 */
3690 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3691 if (GNUNET_OK !=
3692 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3693 {
3694 GNUNET_break (0);
3695 fail_union_operation (op);
3696 return;
3697 }
2432 3698
2433 LOG (GNUNET_ERROR_TYPE_DEBUG, 3699 op->remote_element_count = ntohl (msg->remote_set_size);
3700 op->remote_set_diff = ntohl (msg->remote_set_difference);
3701 op->local_set_diff = ntohl (msg->local_set_difference);
3702
3703
3704 if (check_byzantine_bounds (op) != GNUNET_OK)
3705 {
3706 LOG (GNUNET_ERROR_TYPE_ERROR,
3707 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3708 "criteria\n");
3709 GNUNET_break_op (0);
3710 fail_union_operation (op);
3711 return;
3712 }
3713
3714#if MEASURE_PERFORMANCE
3715 perf_store.request_full.received += 1;
3716#endif
3717
3718 LOG (GNUNET_ERROR_TYPE_DEBUG,
2434 "Received request for full set transmission\n"); 3719 "Received request for full set transmission\n");
2435 if (PHASE_EXPECT_IBF != op->phase) 3720
3721 /** Calculate avg element size if not initial sync **/
3722 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3723 op->set->content->elements);
3724 uint64_t avg_element_size = 0;
3725 if (0 < op->local_element_count)
3726 {
3727 op->total_elements_size_local = 0;
3728 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3729 &
3730 determinate_avg_element_size_iterator,
3731 op);
3732 avg_element_size = op->total_elements_size_local / op->local_element_count;
3733 }
3734
3735 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3736 op->
3737 remote_element_count,
3738 op->
3739 local_element_count,
3740 op->local_set_diff,
3741 op->remote_set_diff,
3742 op->
3743 rtt_bandwidth_tradeoff,
3744 op->
3745 ibf_bucket_number_factor);
3746 if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation)
2436 { 3747 {
3748 LOG (GNUNET_ERROR_TYPE_ERROR,
3749 "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3750 " : %d\n", mode_of_operation);
2437 GNUNET_break_op (0); 3751 GNUNET_break_op (0);
2438 fail_union_operation (op); 3752 fail_union_operation (op);
2439 return; 3753 return;
@@ -2458,7 +3772,21 @@ handle_union_p2p_full_done (void *cls,
2458{ 3772{
2459 struct Operation *op = cls; 3773 struct Operation *op = cls;
2460 3774
2461 perf_rtt.full_done.received += 1; 3775 /**
3776 * Check that the message is received only in supported phase
3777 */
3778 uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3779 if (GNUNET_OK !=
3780 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3781 {
3782 GNUNET_break (0);
3783 fail_union_operation (op);
3784 return;
3785 }
3786
3787#if MEASURE_PERFORMANCE
3788 perf_store.full_done.received += 1;
3789#endif
2462 3790
2463 switch (op->phase) 3791 switch (op->phase)
2464 { 3792 {
@@ -2466,6 +3794,19 @@ handle_union_p2p_full_done (void *cls,
2466 { 3794 {
2467 struct GNUNET_MQ_Envelope *ev; 3795 struct GNUNET_MQ_Envelope *ev;
2468 3796
3797 if ((GNUNET_YES == op->byzantine) &&
3798 (op->received_total != op->remote_element_count) )
3799 {
3800 /* The other peer gave not enough elements before sending full done, there's something wrong. */
3801 LOG (GNUNET_ERROR_TYPE_ERROR,
3802 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3803 (unsigned long long) op->received_total,
3804 (unsigned long long) op->remote_element_count);
3805 GNUNET_break_op (0);
3806 fail_union_operation (op);
3807 return;
3808 }
3809
2469 LOG (GNUNET_ERROR_TYPE_DEBUG, 3810 LOG (GNUNET_ERROR_TYPE_DEBUG,
2470 "got FULL DONE, sending elements that other peer is missing\n"); 3811 "got FULL DONE, sending elements that other peer is missing\n");
2471 3812
@@ -2473,7 +3814,9 @@ handle_union_p2p_full_done (void *cls,
2473 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element, 3814 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
2474 &send_missing_full_elements_iter, 3815 &send_missing_full_elements_iter,
2475 op); 3816 op);
2476 perf_rtt.full_done.sent += 1; 3817#if MEASURE_PERFORMANCE
3818 perf_store.full_done.sent += 1;
3819#endif
2477 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); 3820 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
2478 GNUNET_MQ_send (op->mq, 3821 GNUNET_MQ_send (op->mq,
2479 ev); 3822 ev);
@@ -2552,8 +3895,23 @@ handle_union_p2p_demand (void *cls,
2552 unsigned int num_hashes; 3895 unsigned int num_hashes;
2553 struct GNUNET_MQ_Envelope *ev; 3896 struct GNUNET_MQ_Envelope *ev;
2554 3897
2555 perf_rtt.demand.received += 1; 3898 /**
2556 perf_rtt.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); 3899 * Check that the message is received only in supported phase
3900 */
3901 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3902 PHASE_FINISH_WAITING};
3903 if (GNUNET_OK !=
3904 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3905 {
3906 GNUNET_break (0);
3907 fail_union_operation (op);
3908 return;
3909 }
3910#if MEASURE_PERFORMANCE
3911 perf_store.demand.received += 1;
3912 perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3913 GNUNET_MessageHeader));
3914#endif
2557 3915
2558 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) 3916 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2559 / sizeof(struct GNUNET_HashCode); 3917 / sizeof(struct GNUNET_HashCode);
@@ -2570,6 +3928,39 @@ handle_union_p2p_demand (void *cls,
2570 fail_union_operation (op); 3928 fail_union_operation (op);
2571 return; 3929 return;
2572 } 3930 }
3931
3932 /* Save send demand message for message control */
3933 if (GNUNET_YES !=
3934 update_message_control_flow (
3935 op->message_control_flow,
3936 MSG_CFS_RECEIVED,
3937 &ee->element_hash,
3938 DEMAND_MESSAGE)
3939 )
3940 {
3941 LOG (GNUNET_ERROR_TYPE_ERROR,
3942 "Double demand message received found!\n");
3943 GNUNET_break (0);
3944 fail_union_operation (op);
3945 return;
3946 }
3947 ;
3948
3949 /* Mark element to be expected to received */
3950 if (GNUNET_YES !=
3951 update_message_control_flow (
3952 op->message_control_flow,
3953 MSG_CFS_SENT,
3954 &ee->element_hash,
3955 ELEMENT_MESSAGE)
3956 )
3957 {
3958 LOG (GNUNET_ERROR_TYPE_ERROR,
3959 "Double element message sent found!\n");
3960 GNUNET_break (0);
3961 fail_union_operation (op);
3962 return;
3963 }
2573 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) 3964 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2574 { 3965 {
2575 /* Probably confused lazily copied sets. */ 3966 /* Probably confused lazily copied sets. */
@@ -2577,8 +3968,10 @@ handle_union_p2p_demand (void *cls,
2577 fail_union_operation (op); 3968 fail_union_operation (op);
2578 return; 3969 return;
2579 } 3970 }
2580 perf_rtt.element.sent += 1; 3971#if MEASURE_PERFORMANCE
2581 perf_rtt.element.sent_var_bytes += ee->element.size; 3972 perf_store.element.sent += 1;
3973 perf_store.element.sent_var_bytes += ee->element.size;
3974#endif
2582 ev = GNUNET_MQ_msg_extra (emsg, 3975 ev = GNUNET_MQ_msg_extra (emsg,
2583 ee->element.size, 3976 ee->element.size,
2584 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS); 3977 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS);
@@ -2600,9 +3993,10 @@ handle_union_p2p_demand (void *cls,
2600 if (op->symmetric) 3993 if (op->symmetric)
2601 send_client_element (op, 3994 send_client_element (op,
2602 &ee->element, 3995 &ee->element,
2603 GNUNET_SET_STATUS_ADD_REMOTE); 3996 GNUNET_SETU_STATUS_ADD_REMOTE);
2604 } 3997 }
2605 GNUNET_CADET_receive_done (op->channel); 3998 GNUNET_CADET_receive_done (op->channel);
3999 maybe_finish (op);
2606} 4000}
2607 4001
2608 4002
@@ -2653,9 +4047,23 @@ handle_union_p2p_offer (void *cls,
2653 struct Operation *op = cls; 4047 struct Operation *op = cls;
2654 const struct GNUNET_HashCode *hash; 4048 const struct GNUNET_HashCode *hash;
2655 unsigned int num_hashes; 4049 unsigned int num_hashes;
4050 /**
4051 * Check that the message is received only in supported phase
4052 */
4053 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4054 if (GNUNET_OK !=
4055 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4056 {
4057 GNUNET_break (0);
4058 fail_union_operation (op);
4059 return;
4060 }
2656 4061
2657 perf_rtt.offer.received += 1; 4062#if MEASURE_PERFORMANCE
2658 perf_rtt.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); 4063 perf_store.offer.received += 1;
4064 perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4065 GNUNET_MessageHeader));
4066#endif
2659 4067
2660 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) 4068 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2661 / sizeof(struct GNUNET_HashCode); 4069 / sizeof(struct GNUNET_HashCode);
@@ -2693,11 +4101,68 @@ handle_union_p2p_offer (void *cls,
2693 "[OP %p] Requesting element (hash %s)\n", 4101 "[OP %p] Requesting element (hash %s)\n",
2694 op, GNUNET_h2s (hash)); 4102 op, GNUNET_h2s (hash));
2695 4103
2696 perf_rtt.demand.sent += 1; 4104#if MEASURE_PERFORMANCE
2697 perf_rtt.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); 4105 perf_store.demand.sent += 1;
4106 perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4107#endif
2698 ev = GNUNET_MQ_msg_header_extra (demands, 4108 ev = GNUNET_MQ_msg_header_extra (demands,
2699 sizeof(struct GNUNET_HashCode), 4109 sizeof(struct GNUNET_HashCode),
2700 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND); 4110 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
4111 /* Save send demand message for message control */
4112 if (GNUNET_YES !=
4113 update_message_control_flow (
4114 op->message_control_flow,
4115 MSG_CFS_SENT,
4116 hash,
4117 DEMAND_MESSAGE)
4118 )
4119 {
4120 // GNUNET_free (ev);
4121 LOG (GNUNET_ERROR_TYPE_ERROR,
4122 "Double demand message sent found!\n");
4123 GNUNET_break (0);
4124 fail_union_operation (op);
4125 return;
4126 }
4127 ;
4128
4129 /* Mark offer as received received */
4130 if (GNUNET_YES !=
4131 update_message_control_flow (
4132 op->message_control_flow,
4133 MSG_CFS_RECEIVED,
4134 hash,
4135 OFFER_MESSAGE)
4136 )
4137 {
4138 // GNUNET_free (ev);
4139 LOG (GNUNET_ERROR_TYPE_ERROR,
4140 "Double offer message received found!\n");
4141 GNUNET_break (0);
4142 fail_union_operation (op);
4143 return;
4144 }
4145 ;
4146
4147 /* Mark element to be expected to received */
4148 if (GNUNET_YES !=
4149 update_message_control_flow (
4150 op->message_control_flow,
4151 MSG_CFS_EXPECTED,
4152 hash,
4153 ELEMENT_MESSAGE)
4154 )
4155 {
4156 // GNUNET_free (ev);
4157 LOG (GNUNET_ERROR_TYPE_ERROR,
4158 "Element already expected!\n");
4159 GNUNET_break (0);
4160 fail_union_operation (op);
4161 return;
4162 }
4163 ;
4164
4165
2701 GNUNET_memcpy (&demands[1], 4166 GNUNET_memcpy (&demands[1],
2702 hash, 4167 hash,
2703 sizeof(struct GNUNET_HashCode)); 4168 sizeof(struct GNUNET_HashCode));
@@ -2719,7 +4184,30 @@ handle_union_p2p_done (void *cls,
2719{ 4184{
2720 struct Operation *op = cls; 4185 struct Operation *op = cls;
2721 4186
2722 perf_rtt.done.received += 1; 4187 /**
4188 * Check that the message is received only in supported phase
4189 */
4190 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4191 if (GNUNET_OK !=
4192 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4193 {
4194 GNUNET_break (0);
4195 fail_union_operation (op);
4196 return;
4197 }
4198
4199 if (op->active_passive_switch_required)
4200 {
4201 LOG (GNUNET_ERROR_TYPE_ERROR,
4202 "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4203 GNUNET_break (0);
4204 fail_union_operation (op);
4205 return;
4206 }
4207
4208#if MEASURE_PERFORMANCE
4209 perf_store.done.received += 1;
4210#endif
2723 switch (op->phase) 4211 switch (op->phase)
2724 { 4212 {
2725 case PHASE_PASSIVE_DECODING: 4213 case PHASE_PASSIVE_DECODING:
@@ -2728,26 +4216,26 @@ handle_union_p2p_done (void *cls,
2728 LOG (GNUNET_ERROR_TYPE_DEBUG, 4216 LOG (GNUNET_ERROR_TYPE_DEBUG,
2729 "got DONE (as passive partner), waiting for our demands to be satisfied\n"); 4217 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2730 /* The active peer is done sending offers 4218 /* The active peer is done sending offers
2731 * and inquiries. This means that all 4219 * and inquiries. This means that all
2732 * our responses to that (demands and offers) 4220 * our responses to that (demands and offers)
2733 * must be in flight (queued or in mesh). 4221 * must be in flight (queued or in mesh).
2734 * 4222 *
2735 * We should notify the active peer once 4223 * We should notify the active peer once
2736 * all our demands are satisfied, so that the active 4224 * all our demands are satisfied, so that the active
2737 * peer can quit if we gave it everything. 4225 * peer can quit if we gave it everything.
2738 */GNUNET_CADET_receive_done (op->channel); 4226 */GNUNET_CADET_receive_done (op->channel);
2739 maybe_finish (op); 4227 maybe_finish (op);
2740 return; 4228 return;
2741 case PHASE_ACTIVE_DECODING: 4229 case PHASE_ACTIVE_DECODING:
2742 LOG (GNUNET_ERROR_TYPE_DEBUG, 4230 LOG (GNUNET_ERROR_TYPE_DEBUG,
2743 "got DONE (as active partner), waiting to finish\n"); 4231 "got DONE (as active partner), waiting to finish\n");
2744 /* All demands of the other peer are satisfied, 4232 /* All demands of the other peer are satisfied,
2745 * and we processed all offers, thus we know 4233 * and we processed all offers, thus we know
2746 * exactly what our demands must be. 4234 * exactly what our demands must be.
2747 * 4235 *
2748 * We'll close the channel 4236 * We'll close the channel
2749 * to the other peer once our demands are met. 4237 * to the other peer once our demands are met.
2750 */op->phase = PHASE_FINISH_CLOSING; 4238 */op->phase = PHASE_FINISH_CLOSING;
2751 GNUNET_CADET_receive_done (op->channel); 4239 GNUNET_CADET_receive_done (op->channel);
2752 maybe_finish (op); 4240 maybe_finish (op);
2753 return; 4241 return;
@@ -2769,7 +4257,9 @@ static void
2769handle_union_p2p_over (void *cls, 4257handle_union_p2p_over (void *cls,
2770 const struct GNUNET_MessageHeader *mh) 4258 const struct GNUNET_MessageHeader *mh)
2771{ 4259{
2772 perf_rtt.over.received += 1; 4260#if MEASURE_PERFORMANCE
4261 perf_store.over.received += 1;
4262#endif
2773 send_client_done (cls); 4263 send_client_done (cls);
2774} 4264}
2775 4265
@@ -2943,7 +4433,7 @@ check_incoming_msg (void *cls,
2943 struct Listener *listener = op->listener; 4433 struct Listener *listener = op->listener;
2944 const struct GNUNET_MessageHeader *nested_context; 4434 const struct GNUNET_MessageHeader *nested_context;
2945 4435
2946 /* double operation request */ 4436 /* double operation request */
2947 if (0 != op->suggest_id) 4437 if (0 != op->suggest_id)
2948 { 4438 {
2949 GNUNET_break_op (0); 4439 GNUNET_break_op (0);
@@ -3053,10 +4543,10 @@ handle_client_create_set (void *cls,
3053 } 4543 }
3054 set = GNUNET_new (struct Set); 4544 set = GNUNET_new (struct Set);
3055 { 4545 {
3056 struct StrataEstimator *se; 4546 struct MultiStrataEstimator *se;
3057 4547
3058 se = strata_estimator_create (SE_STRATA_COUNT, 4548 se = strata_estimator_create (SE_STRATA_COUNT,
3059 SE_IBF_SIZE, 4549 SE_IBFS_TOTAL_SIZE,
3060 SE_IBF_HASH_NUM); 4550 SE_IBF_HASH_NUM);
3061 if (NULL == se) 4551 if (NULL == se)
3062 { 4552 {
@@ -3198,6 +4688,7 @@ channel_window_cb (void *cls,
3198 * @param cls client that sent the message 4688 * @param cls client that sent the message
3199 * @param msg message sent by the client 4689 * @param msg message sent by the client
3200 */ 4690 */
4691
3201static void 4692static void
3202handle_client_listen (void *cls, 4693handle_client_listen (void *cls,
3203 const struct GNUNET_SETU_ListenMessage *msg) 4694 const struct GNUNET_SETU_ListenMessage *msg)
@@ -3240,10 +4731,10 @@ handle_client_listen (void *cls,
3240 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, 4731 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3241 struct GNUNET_MessageHeader, 4732 struct GNUNET_MessageHeader,
3242 NULL), 4733 NULL),
3243 GNUNET_MQ_hd_fixed_size (union_p2p_request_full, 4734 GNUNET_MQ_hd_var_size (union_p2p_request_full,
3244 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, 4735 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3245 struct GNUNET_MessageHeader, 4736 struct TransmitFullMessage,
3246 NULL), 4737 NULL),
3247 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, 4738 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3248 GNUNET_MESSAGE_TYPE_SETU_P2P_SE, 4739 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3249 struct StrataEstimatorMessage, 4740 struct StrataEstimatorMessage,
@@ -3256,6 +4747,10 @@ handle_client_listen (void *cls,
3256 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, 4747 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3257 struct GNUNET_SETU_ElementMessage, 4748 struct GNUNET_SETU_ElementMessage,
3258 NULL), 4749 NULL),
4750 GNUNET_MQ_hd_var_size (union_p2p_send_full,
4751 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
4752 struct TransmitFullMessage,
4753 NULL),
3259 GNUNET_MQ_handler_end () 4754 GNUNET_MQ_handler_end ()
3260 }; 4755 };
3261 struct Listener *listener; 4756 struct Listener *listener;
@@ -3451,6 +4946,7 @@ handle_client_evaluate (void *cls,
3451{ 4946{
3452 struct ClientState *cs = cls; 4947 struct ClientState *cs = cls;
3453 struct Operation *op = GNUNET_new (struct Operation); 4948 struct Operation *op = GNUNET_new (struct Operation);
4949
3454 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 4950 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3455 GNUNET_MQ_hd_var_size (incoming_msg, 4951 GNUNET_MQ_hd_var_size (incoming_msg,
3456 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, 4952 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
@@ -3488,10 +4984,10 @@ handle_client_evaluate (void *cls,
3488 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, 4984 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3489 struct GNUNET_MessageHeader, 4985 struct GNUNET_MessageHeader,
3490 op), 4986 op),
3491 GNUNET_MQ_hd_fixed_size (union_p2p_request_full, 4987 GNUNET_MQ_hd_var_size (union_p2p_request_full,
3492 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, 4988 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3493 struct GNUNET_MessageHeader, 4989 struct TransmitFullMessage,
3494 op), 4990 op),
3495 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, 4991 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3496 GNUNET_MESSAGE_TYPE_SETU_P2P_SE, 4992 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3497 struct StrataEstimatorMessage, 4993 struct StrataEstimatorMessage,
@@ -3504,6 +5000,10 @@ handle_client_evaluate (void *cls,
3504 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, 5000 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3505 struct GNUNET_SETU_ElementMessage, 5001 struct GNUNET_SETU_ElementMessage,
3506 op), 5002 op),
5003 GNUNET_MQ_hd_var_size (union_p2p_send_full,
5004 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
5005 struct TransmitFullMessage,
5006 NULL),
3507 GNUNET_MQ_handler_end () 5007 GNUNET_MQ_handler_end ()
3508 }; 5008 };
3509 struct Set *set; 5009 struct Set *set;
@@ -3525,8 +5025,23 @@ handle_client_evaluate (void *cls,
3525 op->force_full = msg->force_full; 5025 op->force_full = msg->force_full;
3526 op->force_delta = msg->force_delta; 5026 op->force_delta = msg->force_delta;
3527 op->symmetric = msg->symmetric; 5027 op->symmetric = msg->symmetric;
5028 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5029 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5030 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5031 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5032 op->active_passive_switch_required = false;
3528 context = GNUNET_MQ_extract_nested_mh (msg); 5033 context = GNUNET_MQ_extract_nested_mh (msg);
3529 5034
5035 /* create hashmap for message control */
5036 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5037 GNUNET_NO);
5038 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5039
5040#if MEASURE_PERFORMANCE
5041 /* load config */
5042 load_config (op);
5043#endif
5044
3530 /* Advance generation values, so that 5045 /* Advance generation values, so that
3531 mutations won't interfer with the running operation. */ 5046 mutations won't interfer with the running operation. */
3532 op->set = set; 5047 op->set = set;
@@ -3550,7 +5065,9 @@ handle_client_evaluate (void *cls,
3550 struct GNUNET_MQ_Envelope *ev; 5065 struct GNUNET_MQ_Envelope *ev;
3551 struct OperationRequestMessage *msg; 5066 struct OperationRequestMessage *msg;
3552 5067
3553 perf_rtt.operation_request.sent += 1; 5068#if MEASURE_PERFORMANCE
5069 perf_store.operation_request.sent += 1;
5070#endif
3554 ev = GNUNET_MQ_msg_nested_mh (msg, 5071 ev = GNUNET_MQ_msg_nested_mh (msg,
3555 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, 5072 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3556 context); 5073 context);
@@ -3567,7 +5084,11 @@ handle_client_evaluate (void *cls,
3567 op->se = strata_estimator_dup (op->set->se); 5084 op->se = strata_estimator_dup (op->set->se);
3568 /* we started the operation, thus we have to send the operation request */ 5085 /* we started the operation, thus we have to send the operation request */
3569 op->phase = PHASE_EXPECT_SE; 5086 op->phase = PHASE_EXPECT_SE;
3570 op->salt_receive = op->salt_send = 42; // FIXME????? 5087
5088 op->salt_receive = (op->peer_site + 1) % 2;
5089 op->salt_send = op->peer_site; // FIXME?????
5090
5091
3571 LOG (GNUNET_ERROR_TYPE_DEBUG, 5092 LOG (GNUNET_ERROR_TYPE_DEBUG,
3572 "Initiating union operation evaluation\n"); 5093 "Initiating union operation evaluation\n");
3573 GNUNET_STATISTICS_update (_GSS_statistics, 5094 GNUNET_STATISTICS_update (_GSS_statistics,
@@ -3711,6 +5232,20 @@ handle_client_accept (void *cls,
3711 op->force_full = msg->force_full; 5232 op->force_full = msg->force_full;
3712 op->force_delta = msg->force_delta; 5233 op->force_delta = msg->force_delta;
3713 op->symmetric = msg->symmetric; 5234 op->symmetric = msg->symmetric;
5235 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5236 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5237 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5238 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5239 op->active_passive_switch_required = false;
5240 /* create hashmap for message control */
5241 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5242 GNUNET_NO);
5243 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5244
5245#if MEASURE_PERFORMANCE
5246 /* load config */
5247 load_config (op);
5248#endif
3714 5249
3715 /* Advance generation values, so that future mutations do not 5250 /* Advance generation values, so that future mutations do not
3716 interfer with the running operation. */ 5251 interfer with the running operation. */
@@ -3729,7 +5264,7 @@ handle_client_accept (void *cls,
3729 1, 5264 1,
3730 GNUNET_NO); 5265 GNUNET_NO);
3731 { 5266 {
3732 const struct StrataEstimator *se; 5267 struct MultiStrataEstimator *se;
3733 struct GNUNET_MQ_Envelope *ev; 5268 struct GNUNET_MQ_Envelope *ev;
3734 struct StrataEstimatorMessage *strata_msg; 5269 struct StrataEstimatorMessage *strata_msg;
3735 char *buf; 5270 char *buf;
@@ -3739,20 +5274,40 @@ handle_client_accept (void *cls,
3739 op->se = strata_estimator_dup (op->set->se); 5274 op->se = strata_estimator_dup (op->set->se);
3740 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 5275 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3741 GNUNET_NO); 5276 GNUNET_NO);
3742 op->salt_receive = op->salt_send = 42; // FIXME????? 5277 op->salt_receive = (op->peer_site + 1) % 2;
5278 op->salt_send = op->peer_site; // FIXME?????
3743 initialize_key_to_element (op); 5279 initialize_key_to_element (op);
3744 op->initial_size = GNUNET_CONTAINER_multihashmap32_size ( 5280 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3745 op->key_to_element); 5281 op->key_to_element);
3746 5282
3747 /* kick off the operation */ 5283 /* kick off the operation */
3748 se = op->se; 5284 se = op->se;
3749 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); 5285
5286 uint8_t se_count = 1;
5287 if (op->initial_size > 0)
5288 {
5289 op->total_elements_size_local = 0;
5290 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5291 &
5292 determinate_avg_element_size_iterator,
5293 op);
5294 se_count = determine_strata_count (
5295 op->total_elements_size_local / op->initial_size,
5296 op->initial_size);
5297 }
5298 buf = GNUNET_malloc (se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5299 * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
3750 len = strata_estimator_write (se, 5300 len = strata_estimator_write (se,
5301 SE_IBFS_TOTAL_SIZE,
5302 se_count,
3751 buf); 5303 buf);
3752 perf_rtt.se.sent += 1; 5304#if MEASURE_PERFORMANCE
3753 perf_rtt.se.sent_var_bytes += len; 5305 perf_store.se.sent += 1;
5306 perf_store.se.sent_var_bytes += len;
5307#endif
3754 5308
3755 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) 5309 if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5310 * SE_IBFS_TOTAL_SIZE)
3756 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC; 5311 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC;
3757 else 5312 else
3758 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE; 5313 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE;
@@ -3766,6 +5321,7 @@ handle_client_accept (void *cls,
3766 strata_msg->set_size 5321 strata_msg->set_size
3767 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( 5322 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
3768 op->set->content->elements)); 5323 op->set->content->elements));
5324 strata_msg->se_count = se_count;
3769 GNUNET_MQ_send (op->mq, 5325 GNUNET_MQ_send (op->mq,
3770 ev); 5326 ev);
3771 op->phase = PHASE_EXPECT_IBF; 5327 op->phase = PHASE_EXPECT_IBF;
@@ -3800,8 +5356,9 @@ shutdown_task (void *cls)
3800 GNUNET_YES); 5356 GNUNET_YES);
3801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5357 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3802 "handled shutdown request\n"); 5358 "handled shutdown request\n");
3803 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 5359#if MEASURE_PERFORMANCE
3804 "RTT:%f\n", calculate_perf_rtt()); 5360 calculate_perf_store ();
5361#endif
3805} 5362}
3806 5363
3807 5364
diff --git a/src/setu/gnunet-service-setu_protocol.h b/src/setu/gnunet-service-setu_protocol.h
index a2803ee47..d8f34f69c 100644
--- a/src/setu/gnunet-service-setu_protocol.h
+++ b/src/setu/gnunet-service-setu_protocol.h
@@ -40,11 +40,6 @@ struct OperationRequestMessage
40 struct GNUNET_MessageHeader header; 40 struct GNUNET_MessageHeader header;
41 41
42 /** 42 /**
43 * Operation to request, values from `enum GNUNET_SET_OperationType`
44 */
45 uint32_t operation GNUNET_PACKED;
46
47 /**
48 * For Intersection: my element count 43 * For Intersection: my element count
49 */ 44 */
50 uint32_t element_count GNUNET_PACKED; 45 uint32_t element_count GNUNET_PACKED;
@@ -72,20 +67,9 @@ struct IBFMessage
72 struct GNUNET_MessageHeader header; 67 struct GNUNET_MessageHeader header;
73 68
74 /** 69 /**
75 * Order of the whole ibf, where 70 * Size of the whole ibf (number of buckets)
76 * num_buckets = 2^order
77 */
78 uint8_t order;
79
80 /**
81 * Padding, must be 0.
82 */ 71 */
83 uint8_t reserved1; 72 uint32_t ibf_size;
84
85 /**
86 * Padding, must be 0.
87 */
88 uint16_t reserved2 GNUNET_PACKED;
89 73
90 /** 74 /**
91 * Offset of the strata in the rest of the message 75 * Offset of the strata in the rest of the message
@@ -95,10 +79,22 @@ struct IBFMessage
95 /** 79 /**
96 * Salt used when hashing elements for this IBF. 80 * Salt used when hashing elements for this IBF.
97 */ 81 */
98 uint32_t salt GNUNET_PACKED; 82 uint16_t salt GNUNET_PACKED;
99 83
84 /**
85 * The bit lenght of the counter
86 */
87 uint16_t ibf_counter_bit_length;
100 /* rest: buckets */ 88 /* rest: buckets */
101}; 89};
90/**
91estimate_best_mode_of_operation (uint64_t avg_element_size,
92uint64_t local_set_size,
93 uint64_t remote_set_size,
94uint64_t est_set_diff_remote,
95 uint64_t est_set_diff_local,)
96 **/
97
102 98
103 99
104struct InquiryMessage 100struct InquiryMessage
@@ -113,11 +109,6 @@ struct InquiryMessage
113 */ 109 */
114 uint32_t salt GNUNET_PACKED; 110 uint32_t salt GNUNET_PACKED;
115 111
116 /**
117 * Reserved, set to 0.
118 */
119 uint32_t reserved GNUNET_PACKED;
120
121 /* rest: inquiry IBF keys */ 112 /* rest: inquiry IBF keys */
122}; 113};
123 114
@@ -218,9 +209,47 @@ struct StrataEstimatorMessage
218 */ 209 */
219 struct GNUNET_MessageHeader header; 210 struct GNUNET_MessageHeader header;
220 211
212 /**
213 * The number of ses transmitted
214 */
215 uint8_t se_count;
216
217 /**
218 * Size of the local set
219 */
221 uint64_t set_size; 220 uint64_t set_size;
222}; 221};
223 222
223
224/**
225 * Message which signals to other peer that we are sending full set
226 *
227 */
228struct TransmitFullMessage
229{
230 /**
231 * Type: #GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL
232 */
233 struct GNUNET_MessageHeader header;
234
235 /**
236 * Remote set difference calculated with strata estimator
237 */
238 uint32_t remote_set_difference;
239
240 /**
241 * Total remote set size
242 */
243 uint32_t remote_set_size;
244
245 /**
246 * Local set difference calculated with strata estimator
247 */
248 uint32_t local_set_difference;
249
250};
251
252
224GNUNET_NETWORK_STRUCT_END 253GNUNET_NETWORK_STRUCT_END
225 254
226#endif 255#endif
diff --git a/src/setu/gnunet-service-setu_strata_estimator.c b/src/setu/gnunet-service-setu_strata_estimator.c
index 7c9a4deb6..7981cc847 100644
--- a/src/setu/gnunet-service-setu_strata_estimator.c
+++ b/src/setu/gnunet-service-setu_strata_estimator.c
@@ -22,6 +22,7 @@
22 * @brief invertible bloom filter 22 * @brief invertible bloom filter
23 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
@@ -30,6 +31,82 @@
30 31
31 32
32/** 33/**
34 * Should we try compressing the strata estimator? This will
35 * break compatibility with the 0.10.1-network.
36 */
37#define FAIL_10_1_COMPATIBILTIY 1
38
39/**
40 * Number of strata estimators in memory NOT transmitted
41 */
42
43#define MULTI_SE_BASE_COUNT 8
44
45/**
46 * The avg size of 1 se
47 * Based on the bsc thesis of Elias Summermatter (2021)
48 */
49
50#define AVG_BYTE_SIZE_SE 4221
51
52/**
53 * Calculates the optimal number of strata Estimators to send
54 * @param avg_element_size
55 * @param element_count
56 * @return
57 */
58uint8_t
59determine_strata_count (uint64_t avg_element_size, uint64_t element_count)
60{
61 uint64_t base_size = avg_element_size * element_count;
62 /* >67kb total size of elements in set */
63 if (base_size < AVG_BYTE_SIZE_SE * 16)
64 return 1;
65 /* >270kb total size of elements in set */
66 if (base_size < AVG_BYTE_SIZE_SE * 64)
67 return 2;
68 /* >1mb total size of elements in set */
69 if (base_size < AVG_BYTE_SIZE_SE * 256)
70 return 4;
71 return 8;
72}
73
74
75/**
76 * Modify an IBF key @a k_in based on the @a salt, returning a
77 * salted key in @a k_out.
78 */
79static void
80salt_key (const struct IBF_Key *k_in,
81 uint32_t salt,
82 struct IBF_Key *k_out)
83{
84 int s = (salt * 7) % 64;
85 uint64_t x = k_in->key_val;
86
87 /* rotate ibf key */
88 x = (x >> s) | (x << (64 - s));
89 k_out->key_val = x;
90}
91
92
93/**
94 * Reverse modification done in the salt_key function
95 */
96static void
97unsalt_key (const struct IBF_Key *k_in,
98 uint32_t salt,
99 struct IBF_Key *k_out)
100{
101 int s = (salt * 7) % 64;
102 uint64_t x = k_in->key_val;
103
104 x = (x << s) | (x >> (64 - s));
105 k_out->key_val = x;
106}
107
108
109/**
33 * Write the given strata estimator to the buffer. 110 * Write the given strata estimator to the buffer.
34 * 111 *
35 * @param se strata estimator to serialize 112 * @param se strata estimator to serialize
@@ -37,21 +114,33 @@
37 * @return number of bytes written to @a buf 114 * @return number of bytes written to @a buf
38 */ 115 */
39size_t 116size_t
40strata_estimator_write (const struct StrataEstimator *se, 117strata_estimator_write (struct MultiStrataEstimator *se,
118 uint16_t se_ibf_total_size,
119 uint8_t number_se_send,
41 void *buf) 120 void *buf)
42{ 121{
43 char *sbuf = buf; 122 char *sbuf = buf;
123 unsigned int i;
44 size_t osize; 124 size_t osize;
125 uint64_t sbuf_offset = 0;
126 se->size = number_se_send;
45 127
46 GNUNET_assert (NULL != se); 128 GNUNET_assert (NULL != se);
47 for (unsigned int i = 0; i < se->strata_count; i++) 129 for (uint8_t strata_ctr = 0; strata_ctr < number_se_send; strata_ctr++)
48 { 130 {
49 ibf_write_slice (se->strata[i], 131 for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
50 0, 132 {
51 se->ibf_size, 133 ibf_write_slice (se->stratas[strata_ctr]->strata[i],
52 &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]); 134 0,
135 se->stratas[strata_ctr]->ibf_size,
136 &sbuf[sbuf_offset],
137 8);
138 sbuf_offset += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE;
139 }
53 } 140 }
54 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; 141 osize = ((se_ibf_total_size / 8) * number_se_send) * IBF_BUCKET_SIZE
142 * se->stratas[0]->strata_count;
143#if FAIL_10_1_COMPATIBILTIY
55 { 144 {
56 char *cbuf; 145 char *cbuf;
57 size_t nsize; 146 size_t nsize;
@@ -62,13 +151,12 @@ strata_estimator_write (const struct StrataEstimator *se,
62 &cbuf, 151 &cbuf,
63 &nsize)) 152 &nsize))
64 { 153 {
65 GNUNET_memcpy (buf, 154 GNUNET_memcpy (buf, cbuf, nsize);
66 cbuf,
67 nsize);
68 osize = nsize; 155 osize = nsize;
69 GNUNET_free (cbuf); 156 GNUNET_free (cbuf);
70 } 157 }
71 } 158 }
159#endif
72 return osize; 160 return osize;
73} 161}
74 162
@@ -87,15 +175,19 @@ int
87strata_estimator_read (const void *buf, 175strata_estimator_read (const void *buf,
88 size_t buf_len, 176 size_t buf_len,
89 int is_compressed, 177 int is_compressed,
90 struct StrataEstimator *se) 178 uint8_t number_se_received,
179 uint16_t se_ibf_total_size,
180 struct MultiStrataEstimator *se)
91{ 181{
182 unsigned int i;
92 size_t osize; 183 size_t osize;
93 char *dbuf; 184 char *dbuf;
94 185
95 dbuf = NULL; 186 dbuf = NULL;
96 if (GNUNET_YES == is_compressed) 187 if (GNUNET_YES == is_compressed)
97 { 188 {
98 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; 189 osize = ((se_ibf_total_size / 8) * number_se_received) * IBF_BUCKET_SIZE
190 * se->stratas[0]->strata_count;
99 dbuf = GNUNET_decompress (buf, 191 dbuf = GNUNET_decompress (buf,
100 buf_len, 192 buf_len,
101 osize); 193 osize);
@@ -108,18 +200,25 @@ strata_estimator_read (const void *buf,
108 buf_len = osize; 200 buf_len = osize;
109 } 201 }
110 202
111 if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE) 203 if (buf_len != se->stratas[0]->strata_count * ((se_ibf_total_size / 8)
204 * number_se_received)
205 * IBF_BUCKET_SIZE)
112 { 206 {
113 GNUNET_break (0); /* very odd error */ 207 GNUNET_break (0); /* very odd error */
114 GNUNET_free (dbuf); 208 GNUNET_free (dbuf);
115 return GNUNET_SYSERR; 209 return GNUNET_SYSERR;
116 } 210 }
117 211
118 for (unsigned int i = 0; i < se->strata_count; i++) 212 for (uint8_t strata_ctr = 0; strata_ctr < number_se_received; strata_ctr++)
119 { 213 {
120 ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); 214 for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
121 buf += se->ibf_size * IBF_BUCKET_SIZE; 215 {
216 ibf_read_slice (buf, 0, se->stratas[strata_ctr]->ibf_size,
217 se->stratas[strata_ctr]->strata[i], 8);
218 buf += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE;
219 }
122 } 220 }
221 se->size = number_se_received;
123 GNUNET_free (dbuf); 222 GNUNET_free (dbuf);
124 return GNUNET_OK; 223 return GNUNET_OK;
125} 224}
@@ -132,38 +231,61 @@ strata_estimator_read (const void *buf,
132 * @param key key to add 231 * @param key key to add
133 */ 232 */
134void 233void
135strata_estimator_insert (struct StrataEstimator *se, 234strata_estimator_insert (struct MultiStrataEstimator *se,
136 struct IBF_Key key) 235 struct IBF_Key key)
137{ 236{
138 uint64_t v;
139 unsigned int i;
140 237
141 v = key.key_val; 238
142 /* count trailing '1'-bits of v */ 239 /* count trailing '1'-bits of v */
143 for (i = 0; v & 1; v >>= 1, i++) 240 for (int strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
144 /* empty */; 241 {
145 ibf_insert (se->strata[i], key); 242 unsigned int i;
243 uint64_t v;
244
245 struct IBF_Key salted_key;
246 salt_key (&key,
247 strata_ctr * (64 / MULTI_SE_BASE_COUNT),
248 &salted_key);
249 v = salted_key.key_val;
250 for (i = 0; v & 1; v >>= 1, i++)
251 {
252 ibf_insert (se->stratas[strata_ctr]->strata[i], salted_key);
253 }
254 }
255 /* empty */;
256
146} 257}
147 258
148 259
149/** 260/**
150 * Remove a key from the strata estimator. 261 * Remove a key from the strata estimator. (NOT USED)
151 * 262 *
152 * @param se strata estimator to remove the key from 263 * @param se strata estimator to remove the key from
153 * @param key key to remove 264 * @param key key to remove
154 */ 265 */
155void 266void
156strata_estimator_remove (struct StrataEstimator *se, 267strata_estimator_remove (struct MultiStrataEstimator *se,
157 struct IBF_Key key) 268 struct IBF_Key key)
158{ 269{
159 uint64_t v;
160 unsigned int i;
161 270
162 v = key.key_val;
163 /* count trailing '1'-bits of v */ 271 /* count trailing '1'-bits of v */
164 for (i = 0; v & 1; v >>= 1, i++) 272 for (int strata_ctr = 0; strata_ctr < se->size; strata_ctr++)
165 /* empty */; 273 {
166 ibf_remove (se->strata[i], key); 274 uint64_t v;
275 unsigned int i;
276
277 struct IBF_Key unsalted_key;
278 unsalt_key (&key,
279 strata_ctr * (64 / MULTI_SE_BASE_COUNT),
280 &unsalted_key);
281
282 v = unsalted_key.key_val;
283 for (i = 0; v & 1; v >>= 1, i++)
284 {
285 /* empty */;
286 ibf_remove (se->stratas[strata_ctr]->strata[i], unsalted_key);
287 }
288 }
167} 289}
168 290
169 291
@@ -175,29 +297,42 @@ strata_estimator_remove (struct StrataEstimator *se,
175 * @param ibf_hashnum hashnum parameter of each ibf 297 * @param ibf_hashnum hashnum parameter of each ibf
176 * @return a freshly allocated, empty strata estimator, NULL on error 298 * @return a freshly allocated, empty strata estimator, NULL on error
177 */ 299 */
178struct StrataEstimator * 300struct MultiStrataEstimator *
179strata_estimator_create (unsigned int strata_count, 301strata_estimator_create (unsigned int strata_count,
180 uint32_t ibf_size, 302 uint32_t ibf_size,
181 uint8_t ibf_hashnum) 303 uint8_t ibf_hashnum)
182{ 304{
183 struct StrataEstimator *se; 305 struct MultiStrataEstimator *se;
184 306 unsigned int i;
185 se = GNUNET_new (struct StrataEstimator); 307 unsigned int j;
186 se->strata_count = strata_count; 308 se = GNUNET_new (struct MultiStrataEstimator);
187 se->ibf_size = ibf_size; 309
188 se->strata = GNUNET_new_array (strata_count, 310 se->size = MULTI_SE_BASE_COUNT;
189 struct InvertibleBloomFilter *); 311 se->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *);
190 for (unsigned int i = 0; i < strata_count; i++) 312
313 uint8_t ibf_prime_sizes[] = {79,79,79,79,79,79,79,79};
314
315 for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
191 { 316 {
192 se->strata[i] = ibf_create (ibf_size, ibf_hashnum); 317 se->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator);
193 if (NULL == se->strata[i]) 318 se->stratas[strata_ctr]->strata_count = strata_count;
319 se->stratas[strata_ctr]->ibf_size = ibf_prime_sizes[strata_ctr];
320 se->stratas[strata_ctr]->strata = GNUNET_new_array (strata_count * 4,
321 struct
322 InvertibleBloomFilter *);
323 for (i = 0; i < strata_count; i++)
194 { 324 {
195 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 325 se->stratas[strata_ctr]->strata[i] = ibf_create (
196 "Failed to allocate memory for strata estimator\n"); 326 ibf_prime_sizes[strata_ctr], ibf_hashnum);
197 for (unsigned int j = 0; j < i; j++) 327 if (NULL == se->stratas[strata_ctr]->strata[i])
198 ibf_destroy (se->strata[i]); 328 {
199 GNUNET_free (se); 329 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
200 return NULL; 330 "Failed to allocate memory for strata estimator\n");
331 for (j = 0; j < i; j++)
332 ibf_destroy (se->stratas[strata_ctr]->strata[i]);
333 GNUNET_free (se);
334 return NULL;
335 }
201 } 336 }
202 } 337 }
203 return se; 338 return se;
@@ -213,46 +348,71 @@ strata_estimator_create (unsigned int strata_count,
213 * @param se2 second strata estimator 348 * @param se2 second strata estimator
214 * @return the estimated difference 349 * @return the estimated difference
215 */ 350 */
216unsigned int 351void
217strata_estimator_difference (const struct StrataEstimator *se1, 352strata_estimator_difference (const struct MultiStrataEstimator *se1,
218 const struct StrataEstimator *se2) 353 const struct MultiStrataEstimator *se2)
219{ 354{
220 unsigned int count; 355 int avg_local_diff = 0;
356 int avg_remote_diff = 0;
357 uint8_t number_of_estimators = se1->size;
221 358
222 GNUNET_assert (se1->strata_count == se2->strata_count); 359 for (uint8_t strata_ctr = 0; strata_ctr < number_of_estimators; strata_ctr++)
223 count = 0;
224 for (int i = se1->strata_count - 1; i >= 0; i--)
225 { 360 {
226 struct InvertibleBloomFilter *diff; 361 GNUNET_assert (se1->stratas[strata_ctr]->strata_count ==
227 /* number of keys decoded from the ibf */ 362 se2->stratas[strata_ctr]->strata_count);
228 363
229 /* FIXME: implement this without always allocating new IBFs */ 364
230 diff = ibf_dup (se1->strata[i]); 365 for (int i = se1->stratas[strata_ctr]->strata_count - 1; i >= 0; i--)
231 ibf_subtract (diff,
232 se2->strata[i]);
233 for (int ibf_count = 0; GNUNET_YES; ibf_count++)
234 { 366 {
235 int more; 367 struct InvertibleBloomFilter *diff;
368 /* number of keys decoded from the ibf */
236 369
237 more = ibf_decode (diff, 370 /* FIXME: implement this without always allocating new IBFs */
238 NULL, 371 diff = ibf_dup (se1->stratas[strata_ctr]->strata[i]);
239 NULL); 372 diff->local_decoded_count = 0;
240 if (GNUNET_NO == more) 373 diff->remote_decoded_count = 0;
241 { 374
242 count += ibf_count; 375 ibf_subtract (diff, se2->stratas[strata_ctr]->strata[i]);
243 break; 376
244 } 377 for (int ibf_count = 0; GNUNET_YES; ibf_count++)
245 /* Estimate if decoding fails or would not terminate */
246 if ( (GNUNET_SYSERR == more) ||
247 (ibf_count > diff->size) )
248 { 378 {
249 ibf_destroy (diff); 379 int more;
250 return count * (1 << (i + 1)); 380
381 more = ibf_decode (diff, NULL, NULL);
382 if (GNUNET_NO == more)
383 {
384 se1->stratas[strata_ctr]->strata[0]->local_decoded_count +=
385 diff->local_decoded_count;
386 se1->stratas[strata_ctr]->strata[0]->remote_decoded_count +=
387 diff->remote_decoded_count;
388 break;
389 }
390 /* Estimate if decoding fails or would not terminate */
391 if ((GNUNET_SYSERR == more) || (ibf_count > diff->size))
392 {
393 se1->stratas[strata_ctr]->strata[0]->local_decoded_count =
394 se1->stratas[strata_ctr]->strata[0]->local_decoded_count * (1 << (i
395 +
396 1));
397 se1->stratas[strata_ctr]->strata[0]->remote_decoded_count =
398 se1->stratas[strata_ctr]->strata[0]->remote_decoded_count * (1 << (i
399 +
400 1));
401 ibf_destroy (diff);
402 goto break_all_counting_loops;
403 }
251 } 404 }
405 ibf_destroy (diff);
252 } 406 }
253 ibf_destroy (diff); 407break_all_counting_loops:;
408 avg_local_diff += se1->stratas[strata_ctr]->strata[0]->local_decoded_count;
409 avg_remote_diff +=
410 se1->stratas[strata_ctr]->strata[0]->remote_decoded_count;
254 } 411 }
255 return count; 412 se1->stratas[0]->strata[0]->local_decoded_count = avg_local_diff
413 / number_of_estimators;
414 se1->stratas[0]->strata[0]->remote_decoded_count = avg_remote_diff
415 / number_of_estimators;
256} 416}
257 417
258 418
@@ -262,18 +422,28 @@ strata_estimator_difference (const struct StrataEstimator *se1,
262 * @param se the strata estimator to copy 422 * @param se the strata estimator to copy
263 * @return the copy 423 * @return the copy
264 */ 424 */
265struct StrataEstimator * 425struct MultiStrataEstimator *
266strata_estimator_dup (struct StrataEstimator *se) 426strata_estimator_dup (struct MultiStrataEstimator *se)
267{ 427{
268 struct StrataEstimator *c; 428 struct MultiStrataEstimator *c;
269 429 unsigned int i;
270 c = GNUNET_new (struct StrataEstimator); 430
271 c->strata_count = se->strata_count; 431 c = GNUNET_new (struct MultiStrataEstimator);
272 c->ibf_size = se->ibf_size; 432 c->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *);
273 c->strata = GNUNET_new_array (se->strata_count, 433 for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
274 struct InvertibleBloomFilter *); 434 {
275 for (unsigned int i = 0; i < se->strata_count; i++) 435 c->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator);
276 c->strata[i] = ibf_dup (se->strata[i]); 436 c->stratas[strata_ctr]->strata_count =
437 se->stratas[strata_ctr]->strata_count;
438 c->stratas[strata_ctr]->ibf_size = se->stratas[strata_ctr]->ibf_size;
439 c->stratas[strata_ctr]->strata = GNUNET_new_array (
440 se->stratas[strata_ctr]->strata_count,
441 struct
442 InvertibleBloomFilter *);
443 for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
444 c->stratas[strata_ctr]->strata[i] = ibf_dup (
445 se->stratas[strata_ctr]->strata[i]);
446 }
277 return c; 447 return c;
278} 448}
279 449
@@ -284,10 +454,14 @@ strata_estimator_dup (struct StrataEstimator *se)
284 * @param se strata estimator to destroy. 454 * @param se strata estimator to destroy.
285 */ 455 */
286void 456void
287strata_estimator_destroy (struct StrataEstimator *se) 457strata_estimator_destroy (struct MultiStrataEstimator *se)
288{ 458{
289 for (unsigned int i = 0; i < se->strata_count; i++) 459 unsigned int i;
290 ibf_destroy (se->strata[i]); 460 for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
291 GNUNET_free (se->strata); 461 {
462 for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
463 ibf_destroy (se->stratas[strata_ctr]->strata[i]);
464 GNUNET_free (se->stratas[strata_ctr]->strata);
465 }
292 GNUNET_free (se); 466 GNUNET_free (se);
293} 467}
diff --git a/src/setu/gnunet-service-setu_strata_estimator.h b/src/setu/gnunet-service-setu_strata_estimator.h
index afdbcdbbf..4871a7fcd 100644
--- a/src/setu/gnunet-service-setu_strata_estimator.h
+++ b/src/setu/gnunet-service-setu_strata_estimator.h
@@ -22,6 +22,7 @@
22 * @file set/gnunet-service-setu_strata_estimator.h 22 * @file set/gnunet-service-setu_strata_estimator.h
23 * @brief estimator of set difference 23 * @brief estimator of set difference
24 * @author Florian Dold 24 * @author Florian Dold
25 * @author Elias Summermatter
25 */ 26 */
26 27
27#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H 28#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
@@ -61,6 +62,31 @@ struct StrataEstimator
61 unsigned int ibf_size; 62 unsigned int ibf_size;
62}; 63};
63 64
65struct MultiStrataEstimator
66{
67 /**
68 * Array of strata estimators
69 */
70 struct StrataEstimator **stratas;
71
72 /**
73 * Number of strata estimators in struct
74 */
75 uint8_t size;
76
77};
78
79/**
80 * Deteminate how many strata estimators in the message are necessary
81 * @param avg_element_size
82 * @param element_count
83 * @return number of strata's
84 */
85
86uint8_t
87determine_strata_count (uint64_t avg_element_size,
88 uint64_t element_count);
89
64 90
65/** 91/**
66 * Write the given strata estimator to the buffer. 92 * Write the given strata estimator to the buffer.
@@ -70,7 +96,9 @@ struct StrataEstimator
70 * @return number of bytes written to @a buf 96 * @return number of bytes written to @a buf
71 */ 97 */
72size_t 98size_t
73strata_estimator_write (const struct StrataEstimator *se, 99strata_estimator_write (struct MultiStrataEstimator *se,
100 uint16_t se_ibf_total_size,
101 uint8_t number_se_send,
74 void *buf); 102 void *buf);
75 103
76 104
@@ -88,7 +116,9 @@ int
88strata_estimator_read (const void *buf, 116strata_estimator_read (const void *buf,
89 size_t buf_len, 117 size_t buf_len,
90 int is_compressed, 118 int is_compressed,
91 struct StrataEstimator *se); 119 uint8_t number_se_received,
120 uint16_t se_ibf_total_size,
121 struct MultiStrataEstimator *se);
92 122
93 123
94/** 124/**
@@ -99,7 +129,7 @@ strata_estimator_read (const void *buf,
99 * @param ibf_hashnum hashnum parameter of each ibf 129 * @param ibf_hashnum hashnum parameter of each ibf
100 * @return a freshly allocated, empty strata estimator, NULL on error 130 * @return a freshly allocated, empty strata estimator, NULL on error
101 */ 131 */
102struct StrataEstimator * 132struct MultiStrataEstimator *
103strata_estimator_create (unsigned int strata_count, 133strata_estimator_create (unsigned int strata_count,
104 uint32_t ibf_size, 134 uint32_t ibf_size,
105 uint8_t ibf_hashnum); 135 uint8_t ibf_hashnum);
@@ -111,11 +141,11 @@ strata_estimator_create (unsigned int strata_count,
111 * 141 *
112 * @param se1 first strata estimator 142 * @param se1 first strata estimator
113 * @param se2 second strata estimator 143 * @param se2 second strata estimator
114 * @return abs(|se1| - |se2|) 144 * @return nothing
115 */ 145 */
116unsigned int 146void
117strata_estimator_difference (const struct StrataEstimator *se1, 147strata_estimator_difference (const struct MultiStrataEstimator *se1,
118 const struct StrataEstimator *se2); 148 const struct MultiStrataEstimator *se2);
119 149
120 150
121/** 151/**
@@ -125,7 +155,7 @@ strata_estimator_difference (const struct StrataEstimator *se1,
125 * @param key key to add 155 * @param key key to add
126 */ 156 */
127void 157void
128strata_estimator_insert (struct StrataEstimator *se, 158strata_estimator_insert (struct MultiStrataEstimator *se,
129 struct IBF_Key key); 159 struct IBF_Key key);
130 160
131 161
@@ -136,7 +166,7 @@ strata_estimator_insert (struct StrataEstimator *se,
136 * @param key key to remove 166 * @param key key to remove
137 */ 167 */
138void 168void
139strata_estimator_remove (struct StrataEstimator *se, 169strata_estimator_remove (struct MultiStrataEstimator *se,
140 struct IBF_Key key); 170 struct IBF_Key key);
141 171
142 172
@@ -146,7 +176,7 @@ strata_estimator_remove (struct StrataEstimator *se,
146 * @param se strata estimator to destroy. 176 * @param se strata estimator to destroy.
147 */ 177 */
148void 178void
149strata_estimator_destroy (struct StrataEstimator *se); 179strata_estimator_destroy (struct MultiStrataEstimator *se);
150 180
151 181
152/** 182/**
@@ -155,8 +185,8 @@ strata_estimator_destroy (struct StrataEstimator *se);
155 * @param se the strata estimator to copy 185 * @param se the strata estimator to copy
156 * @return the copy 186 * @return the copy
157 */ 187 */
158struct StrataEstimator * 188struct MultiStrataEstimator *
159strata_estimator_dup (struct StrataEstimator *se); 189strata_estimator_dup (struct MultiStrataEstimator *se);
160 190
161 191
162#if 0 /* keep Emacsens' auto-indent happy */ 192#if 0 /* keep Emacsens' auto-indent happy */
diff --git a/src/setu/ibf.c b/src/setu/ibf.c
index 1beba9065..8f29adb62 100644
--- a/src/setu/ibf.c
+++ b/src/setu/ibf.c
@@ -20,11 +20,15 @@
20 20
21/** 21/**
22 * @file set/ibf.c 22 * @file set/ibf.c
23 * @brief implementation of the invertible Bloom filter 23 * @brief implementation of the invertible bloom filter
24 * @author Florian Dold 24 * @author Florian Dold
25 * @author Elias Summermatter
25 */ 26 */
26 27
27#include "ibf.h" 28#include "ibf.h"
29#include "gnunet_util_lib.h"
30#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
31
28 32
29/** 33/**
30 * Compute the key's hash from the key. 34 * Compute the key's hash from the key.
@@ -58,11 +62,12 @@ ibf_hashcode_from_key (struct IBF_Key key,
58 struct GNUNET_HashCode *dst) 62 struct GNUNET_HashCode *dst)
59{ 63{
60 struct IBF_Key *p; 64 struct IBF_Key *p;
65 unsigned int i;
61 const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode) 66 const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode)
62 / sizeof(struct IBF_Key); 67 / sizeof(struct IBF_Key);
63 68
64 p = (struct IBF_Key *) dst; 69 p = (struct IBF_Key *) dst;
65 for (unsigned int i = 0; i < keys_per_hashcode; i++) 70 for (i = 0; i < keys_per_hashcode; i++)
66 *p++ = key; 71 *p++ = key;
67} 72}
68 73
@@ -75,14 +80,14 @@ ibf_hashcode_from_key (struct IBF_Key key,
75 * @return the newly created invertible bloom filter, NULL on error 80 * @return the newly created invertible bloom filter, NULL on error
76 */ 81 */
77struct InvertibleBloomFilter * 82struct InvertibleBloomFilter *
78ibf_create (uint32_t size, 83ibf_create (uint32_t size, uint8_t hash_num)
79 uint8_t hash_num)
80{ 84{
81 struct InvertibleBloomFilter *ibf; 85 struct InvertibleBloomFilter *ibf;
82 86
83 GNUNET_assert (0 != size); 87 GNUNET_assert (0 != size);
88
84 ibf = GNUNET_new (struct InvertibleBloomFilter); 89 ibf = GNUNET_new (struct InvertibleBloomFilter);
85 ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t)); 90 ibf->count = GNUNET_malloc_large (size * sizeof(uint64_t));
86 if (NULL == ibf->count) 91 if (NULL == ibf->count)
87 { 92 {
88 GNUNET_free (ibf); 93 GNUNET_free (ibf);
@@ -105,6 +110,7 @@ ibf_create (uint32_t size,
105 } 110 }
106 ibf->size = size; 111 ibf->size = size;
107 ibf->hash_num = hash_num; 112 ibf->hash_num = hash_num;
113
108 return ibf; 114 return ibf;
109} 115}
110 116
@@ -121,8 +127,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf,
121 uint32_t i; 127 uint32_t i;
122 uint32_t bucket; 128 uint32_t bucket;
123 129
124 bucket = GNUNET_CRYPTO_crc32_n (&key, 130 bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key);
125 sizeof (key));
126 for (i = 0, filled = 0; filled < ibf->hash_num; i++) 131 for (i = 0, filled = 0; filled < ibf->hash_num; i++)
127 { 132 {
128 uint64_t x; 133 uint64_t x;
@@ -133,8 +138,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf,
133 dst[filled++] = bucket % ibf->size; 138 dst[filled++] = bucket % ibf->size;
134try_next: 139try_next:
135 x = ((uint64_t) bucket << 32) | i; 140 x = ((uint64_t) bucket << 32) | i;
136 bucket = GNUNET_CRYPTO_crc32_n (&x, 141 bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
137 sizeof (x));
138 } 142 }
139} 143}
140 144
@@ -170,13 +174,8 @@ ibf_insert (struct InvertibleBloomFilter *ibf,
170 int buckets[ibf->hash_num]; 174 int buckets[ibf->hash_num];
171 175
172 GNUNET_assert (ibf->hash_num <= ibf->size); 176 GNUNET_assert (ibf->hash_num <= ibf->size);
173 ibf_get_indices (ibf, 177 ibf_get_indices (ibf, key, buckets);
174 key, 178 ibf_insert_into (ibf, key, buckets, 1);
175 buckets);
176 ibf_insert_into (ibf,
177 key,
178 buckets,
179 1);
180} 179}
181 180
182 181
@@ -193,13 +192,8 @@ ibf_remove (struct InvertibleBloomFilter *ibf,
193 int buckets[ibf->hash_num]; 192 int buckets[ibf->hash_num];
194 193
195 GNUNET_assert (ibf->hash_num <= ibf->size); 194 GNUNET_assert (ibf->hash_num <= ibf->size);
196 ibf_get_indices (ibf, 195 ibf_get_indices (ibf, key, buckets);
197 key, 196 ibf_insert_into (ibf, key, buckets, -1);
198 buckets);
199 ibf_insert_into (ibf,
200 key,
201 buckets,
202 -1);
203} 197}
204 198
205 199
@@ -244,6 +238,8 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
244 238
245 for (uint32_t i = 0; i < ibf->size; i++) 239 for (uint32_t i = 0; i < ibf->size; i++)
246 { 240 {
241 int hit;
242
247 /* we can only decode from pure buckets */ 243 /* we can only decode from pure buckets */
248 if ( (1 != ibf->count[i].count_val) && 244 if ( (1 != ibf->count[i].count_val) &&
249 (-1 != ibf->count[i].count_val) ) 245 (-1 != ibf->count[i].count_val) )
@@ -257,30 +253,33 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
257 253
258 /* test if key in bucket hits its own location, 254 /* test if key in bucket hits its own location,
259 * if not, the key hash was subject to collision */ 255 * if not, the key hash was subject to collision */
260 { 256 hit = GNUNET_NO;
261 bool hit = false; 257 ibf_get_indices (ibf, ibf->key_sum[i], buckets);
258 for (int j = 0; j < ibf->hash_num; j++)
259 if (buckets[j] == i)
260 hit = GNUNET_YES;
262 261
263 ibf_get_indices (ibf, 262 if (GNUNET_NO == hit)
264 ibf->key_sum[i], 263 continue;
265 buckets); 264
266 for (int j = 0; j < ibf->hash_num; j++) 265 if (1 == ibf->count[i].count_val)
267 if (buckets[j] == i) 266 {
268 { 267 ibf->remote_decoded_count++;
269 hit = true;
270 break;
271 }
272 if (! hit)
273 continue;
274 } 268 }
269 else
270 {
271 ibf->local_decoded_count++;
272 }
273
274
275 if (NULL != ret_side) 275 if (NULL != ret_side)
276 *ret_side = ibf->count[i].count_val; 276 *ret_side = ibf->count[i].count_val;
277 if (NULL != ret_id) 277 if (NULL != ret_id)
278 *ret_id = ibf->key_sum[i]; 278 *ret_id = ibf->key_sum[i];
279 279
280 /* insert on the opposite side, effectively removing the element */ 280 /* insert on the opposite side, effectively removing the element */
281 ibf_insert_into (ibf, 281 ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val);
282 ibf->key_sum[i], buckets, 282
283 -ibf->count[i].count_val);
284 return GNUNET_YES; 283 return GNUNET_YES;
285 } 284 }
286 285
@@ -291,6 +290,26 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
291 290
292 291
293/** 292/**
293 * Returns the minimal bytes needed to store the counter of the IBF
294 *
295 * @param ibf the IBF
296 */
297uint8_t
298ibf_get_max_counter (struct InvertibleBloomFilter *ibf)
299{
300 long long max_counter = 0;
301 for (uint64_t i = 0; i < ibf->size; i++)
302 {
303 if (ibf->count[i].count_val > max_counter)
304 {
305 max_counter = ibf->count[i].count_val;
306 }
307 }
308 return 64 - __builtin_clzll (max_counter);
309}
310
311
312/**
294 * Write buckets from an ibf to a buffer. 313 * Write buckets from an ibf to a buffer.
295 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. 314 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
296 * 315 *
@@ -298,16 +317,17 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
298 * @param start with which bucket to start 317 * @param start with which bucket to start
299 * @param count how many buckets to write 318 * @param count how many buckets to write
300 * @param buf buffer to write the data to 319 * @param buf buffer to write the data to
320 * @param max bit length of a counter for unpacking
301 */ 321 */
302void 322void
303ibf_write_slice (const struct InvertibleBloomFilter *ibf, 323ibf_write_slice (const struct InvertibleBloomFilter *ibf,
304 uint32_t start, 324 uint32_t start,
305 uint32_t count, 325 uint64_t count,
306 void *buf) 326 void *buf,
327 uint8_t counter_max_length)
307{ 328{
308 struct IBF_Key *key_dst; 329 struct IBF_Key *key_dst;
309 struct IBF_KeyHash *key_hash_dst; 330 struct IBF_KeyHash *key_hash_dst;
310 struct IBF_Count *count_dst;
311 331
312 GNUNET_assert (start + count <= ibf->size); 332 GNUNET_assert (start + count <= ibf->size);
313 333
@@ -315,19 +335,182 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf,
315 key_dst = (struct IBF_Key *) buf; 335 key_dst = (struct IBF_Key *) buf;
316 GNUNET_memcpy (key_dst, 336 GNUNET_memcpy (key_dst,
317 ibf->key_sum + start, 337 ibf->key_sum + start,
318 count * sizeof *key_dst); 338 count * sizeof(*key_dst));
319 key_dst += count; 339 key_dst += count;
320 /* copy key hashes */ 340 /* copy key hashes */
321 key_hash_dst = (struct IBF_KeyHash *) key_dst; 341 key_hash_dst = (struct IBF_KeyHash *) key_dst;
322 GNUNET_memcpy (key_hash_dst, 342 GNUNET_memcpy (key_hash_dst,
323 ibf->key_hash_sum + start, 343 ibf->key_hash_sum + start,
324 count * sizeof *key_hash_dst); 344 count * sizeof(*key_hash_dst));
325 key_hash_dst += count; 345 key_hash_dst += count;
326 /* copy counts */ 346
327 count_dst = (struct IBF_Count *) key_hash_dst; 347 /* pack and copy counter */
328 GNUNET_memcpy (count_dst, 348 pack_counter (ibf,
329 ibf->count + start, 349 start,
330 count * sizeof *count_dst); 350 count,
351 (uint8_t *) key_hash_dst,
352 counter_max_length);
353
354
355}
356
357
358/**
359 * Packs the counter to transmit only the smallest possible amount of bytes and
360 * preventing overflow of the counter
361 * @param ibf the ibf to write
362 * @param start with which bucket to start
363 * @param count how many buckets to write
364 * @param buf buffer to write the data to
365 * @param max bit length of a counter for unpacking
366 */
367
368void
369pack_counter (const struct InvertibleBloomFilter *ibf,
370 uint32_t start,
371 uint64_t count,
372 uint8_t *buf,
373 uint8_t counter_max_length)
374{
375 uint8_t store_size = 0;
376 uint8_t store = 0;
377 uint16_t byte_ctr = 0;
378
379 /**
380 * Iterate over IBF bucket
381 */
382 for (uint64_t i = start; i< (count + start);)
383 {
384 uint64_t count_val_to_write = ibf->count[i].count_val;
385 uint8_t count_len_to_write = counter_max_length;
386
387 /**
388 * Pack and compose counters to byte values
389 */
390 while ((count_len_to_write + store_size) >= 8)
391 {
392 uint8_t bit_shift = 0;
393
394 /**
395 * Shift bits if more than a byte has to be written
396 * or the store size is not empty
397 */
398 if ((store_size > 0) || (count_len_to_write > 8))
399 {
400 uint8_t bit_unused = 8 - store_size;
401 bit_shift = count_len_to_write - bit_unused;
402 store = store << bit_unused;
403 }
404
405 buf[byte_ctr] = ((count_val_to_write >> bit_shift) | store) & 0xFF;
406 byte_ctr++;
407 count_len_to_write -= (8 - store_size);
408 count_val_to_write = count_val_to_write & ((1ULL <<
409 count_len_to_write) - 1);
410 store = 0;
411 store_size = 0;
412 }
413 store = (store << count_len_to_write) | count_val_to_write;
414 store_size = store_size + count_len_to_write;
415 count_len_to_write = 0;
416 i++;
417 }
418
419 /**
420 * Pack data left in story before finishing
421 */
422 if (store_size > 0)
423 {
424 buf[byte_ctr] = store << (8 - store_size);
425 byte_ctr++;
426 }
427
428}
429
430
431/**
432 * Unpacks the counter to transmit only the smallest possible amount of bytes and
433 * preventing overflow of the counter
434 * @param ibf the ibf to write
435 * @param start with which bucket to start
436 * @param count how many buckets to write
437 * @param buf buffer to write the data to
438 * @param max bit length of a counter for unpacking
439 */
440
441void
442unpack_counter (const struct InvertibleBloomFilter *ibf,
443 uint32_t start,
444 uint64_t count,
445 uint8_t *buf,
446 uint8_t counter_max_length)
447{
448 uint64_t ibf_counter_ctr = 0;
449 uint64_t store = 0;
450 uint64_t store_bit_ctr = 0;
451 uint64_t byte_ctr = 0;
452
453 /**
454 * Iterate over received bytes
455 */
456 while (true)
457 {
458 uint8_t byte_read = buf[byte_ctr];
459 uint8_t bit_to_read_left = 8;
460 byte_ctr++;
461
462 /**
463 * Pack data left in story before finishing
464 */
465 while (bit_to_read_left >= 0)
466 {
467 /**
468 * Stop decoding when end is reached
469 */
470 if (ibf_counter_ctr > (count - 1))
471 return;
472
473 /*
474 * Unpack the counter
475 */
476 if ((store_bit_ctr + bit_to_read_left) >= counter_max_length)
477 {
478 uint8_t bytes_used = counter_max_length - store_bit_ctr;
479 if (store_bit_ctr > 0)
480 {
481 store = store << bytes_used;
482 }
483
484 uint8_t bytes_to_shift = bit_to_read_left - bytes_used;
485 uint64_t counter_part = byte_read >> bytes_to_shift;
486 store = store | counter_part;
487 ibf->count[ibf_counter_ctr + start].count_val = store;
488 byte_read = byte_read & ((1 << bytes_to_shift) - 1);
489 bit_to_read_left -= bytes_used;
490 ibf_counter_ctr++;
491 store = 0;
492 store_bit_ctr = 0;
493 }
494 else
495 {
496 store_bit_ctr += bit_to_read_left;
497 if (0 == store)
498 {
499 store = byte_read;
500 }
501 else
502 {
503 store = store << bit_to_read_left;
504 store = store | byte_read;
505 }
506 break;
507
508 }
509
510 }
511
512 }
513
331} 514}
332 515
333 516
@@ -338,12 +521,14 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf,
338 * @param start which bucket to start at 521 * @param start which bucket to start at
339 * @param count how many buckets to read 522 * @param count how many buckets to read
340 * @param ibf the ibf to read from 523 * @param ibf the ibf to read from
524 * @param max bit length of a counter for unpacking
341 */ 525 */
342void 526void
343ibf_read_slice (const void *buf, 527ibf_read_slice (const void *buf,
344 uint32_t start, 528 uint32_t start,
345 uint32_t count, 529 uint64_t count,
346 struct InvertibleBloomFilter *ibf) 530 struct InvertibleBloomFilter *ibf,
531 uint8_t counter_max_length)
347{ 532{
348 struct IBF_Key *key_src; 533 struct IBF_Key *key_src;
349 struct IBF_KeyHash *key_hash_src; 534 struct IBF_KeyHash *key_hash_src;
@@ -364,11 +549,10 @@ ibf_read_slice (const void *buf,
364 key_hash_src, 549 key_hash_src,
365 count * sizeof *key_hash_src); 550 count * sizeof *key_hash_src);
366 key_hash_src += count; 551 key_hash_src += count;
367 /* copy counts */ 552
553 /* copy and unpack counts */
368 count_src = (struct IBF_Count *) key_hash_src; 554 count_src = (struct IBF_Count *) key_hash_src;
369 GNUNET_memcpy (ibf->count + start, 555 unpack_counter (ibf,start,count,(uint8_t *) count_src,counter_max_length);
370 count_src,
371 count * sizeof *count_src);
372} 556}
373 557
374 558
diff --git a/src/setu/ibf.h b/src/setu/ibf.h
index 7c2ab33b1..5628405dc 100644
--- a/src/setu/ibf.h
+++ b/src/setu/ibf.h
@@ -22,6 +22,7 @@
22 * @file set/ibf.h 22 * @file set/ibf.h
23 * @brief invertible bloom filter 23 * @brief invertible bloom filter
24 * @author Florian Dold 24 * @author Florian Dold
25 * @author Elias Summermatter
25 */ 26 */
26 27
27#ifndef GNUNET_CONSENSUS_IBF_H 28#ifndef GNUNET_CONSENSUS_IBF_H
@@ -62,7 +63,7 @@ struct IBF_KeyHash
62 */ 63 */
63struct IBF_Count 64struct IBF_Count
64{ 65{
65 int8_t count_val; 66 int64_t count_val;
66}; 67};
67 68
68 69
@@ -93,6 +94,20 @@ struct InvertibleBloomFilter
93 uint8_t hash_num; 94 uint8_t hash_num;
94 95
95 /** 96 /**
97 * If an IBF is decoded this count stores how many
98 * elements are on the local site. This is used
99 * to estimate the set difference on a site
100 */
101 int local_decoded_count;
102
103 /**
104 * If an IBF is decoded this count stores how many
105 * elements are on the remote site. This is used
106 * to estimate the set difference on a site
107 */
108 int remote_decoded_count;
109
110 /**
96 * Xor sums of the elements' keys, used to identify the elements. 111 * Xor sums of the elements' keys, used to identify the elements.
97 * Array of 'size' elements. 112 * Array of 'size' elements.
98 */ 113 */
@@ -125,8 +140,9 @@ struct InvertibleBloomFilter
125void 140void
126ibf_write_slice (const struct InvertibleBloomFilter *ibf, 141ibf_write_slice (const struct InvertibleBloomFilter *ibf,
127 uint32_t start, 142 uint32_t start,
128 uint32_t count, 143 uint64_t count,
129 void *buf); 144 void *buf,
145 uint8_t counter_max_length);
130 146
131 147
132/** 148/**
@@ -140,8 +156,9 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf,
140void 156void
141ibf_read_slice (const void *buf, 157ibf_read_slice (const void *buf,
142 uint32_t start, 158 uint32_t start,
143 uint32_t count, 159 uint64_t count,
144 struct InvertibleBloomFilter *ibf); 160 struct InvertibleBloomFilter *ibf,
161 uint8_t counter_max_length);
145 162
146 163
147/** 164/**
@@ -244,6 +261,44 @@ ibf_dup (const struct InvertibleBloomFilter *ibf);
244void 261void
245ibf_destroy (struct InvertibleBloomFilter *ibf); 262ibf_destroy (struct InvertibleBloomFilter *ibf);
246 263
264uint8_t
265ibf_get_max_counter (struct InvertibleBloomFilter *ibf);
266
267
268/**
269 * Packs the counter to transmit only the smallest possible amount of bytes and
270 * preventing overflow of the counter
271 * @param ibf the ibf to write
272 * @param start with which bucket to start
273 * @param count how many buckets to write
274 * @param buf buffer to write the data to
275 * @param max bit length of a counter for unpacking
276 */
277
278void
279pack_counter (const struct InvertibleBloomFilter *ibf,
280 uint32_t start,
281 uint64_t count,
282 uint8_t *buf,
283 uint8_t counter_max_length);
284
285/**
286 * Unpacks the counter to transmit only the smallest possible amount of bytes and
287 * preventing overflow of the counter
288 * @param ibf the ibf to write
289 * @param start with which bucket to start
290 * @param count how many buckets to write
291 * @param buf buffer to write the data to
292 * @param max bit length of a counter for unpacking
293 */
294
295void
296unpack_counter (const struct InvertibleBloomFilter *ibf,
297 uint32_t start,
298 uint64_t count,
299 uint8_t *buf,
300 uint8_t counter_max_length);
301
247 302
248#if 0 /* keep Emacsens' auto-indent happy */ 303#if 0 /* keep Emacsens' auto-indent happy */
249{ 304{
diff --git a/src/setu/perf_setu_api.c b/src/setu/perf_setu_api.c
index b273f9c71..af84994f8 100644
--- a/src/setu/perf_setu_api.c
+++ b/src/setu/perf_setu_api.c
@@ -22,11 +22,14 @@
22 * @file set/test_setu_api.c 22 * @file set/test_setu_api.c
23 * @brief testcase for setu_api.c 23 * @brief testcase for setu_api.c
24 * @author Florian Dold 24 * @author Florian Dold
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
28#include "gnunet_testing_lib.h" 29#include "gnunet_testing_lib.h"
29#include "gnunet_setu_service.h" 30#include "gnunet_setu_service.h"
31#include <sys/sysinfo.h>
32#include <pthread.h>
30 33
31 34
32static struct GNUNET_PeerIdentity local_id; 35static struct GNUNET_PeerIdentity local_id;
@@ -50,6 +53,12 @@ static int ret;
50static struct GNUNET_SCHEDULER_Task *tt; 53static struct GNUNET_SCHEDULER_Task *tt;
51 54
52 55
56/**
57 * Handles configuration file for setu performance test
58 *
59 */
60static struct GNUNET_CONFIGURATION_Handle *setu_cfg;
61
53 62
54static void 63static void
55result_cb_set1 (void *cls, 64result_cb_set1 (void *cls,
@@ -57,44 +66,44 @@ result_cb_set1 (void *cls,
57 uint64_t size, 66 uint64_t size,
58 enum GNUNET_SETU_Status status) 67 enum GNUNET_SETU_Status status)
59{ 68{
60 switch (status) 69 switch (status)
70 {
71 case GNUNET_SETU_STATUS_ADD_LOCAL:
72 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n");
73 break;
74
75 case GNUNET_SETU_STATUS_FAILURE:
76 GNUNET_break (0);
77 oh1 = NULL;
78 fprintf (stderr, "set 1: received failure status!\n");
79 ret = 1;
80 if (NULL != tt)
81 {
82 GNUNET_SCHEDULER_cancel (tt);
83 tt = NULL;
84 }
85 GNUNET_SCHEDULER_shutdown ();
86 break;
87
88 case GNUNET_SETU_STATUS_DONE:
89 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
90 oh1 = NULL;
91 if (NULL != set1)
61 { 92 {
62 case GNUNET_SETU_STATUS_ADD_LOCAL: 93 GNUNET_SETU_destroy (set1);
63 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n"); 94 set1 = NULL;
64 break;
65
66 case GNUNET_SETU_STATUS_FAILURE:
67 GNUNET_break (0);
68 oh1 = NULL;
69 fprintf (stderr, "set 1: received failure status!\n");
70 ret = 1;
71 if (NULL != tt)
72 {
73 GNUNET_SCHEDULER_cancel (tt);
74 tt = NULL;
75 }
76 GNUNET_SCHEDULER_shutdown ();
77 break;
78
79 case GNUNET_SETU_STATUS_DONE:
80 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
81 oh1 = NULL;
82 if (NULL != set1)
83 {
84 GNUNET_SETU_destroy (set1);
85 set1 = NULL;
86 }
87 if (NULL == set2)
88 {
89 GNUNET_SCHEDULER_cancel (tt);
90 tt = NULL;
91 GNUNET_SCHEDULER_shutdown ();
92 }
93 break;
94
95 default:
96 GNUNET_assert (0);
97 } 95 }
96 if (NULL == set2)
97 {
98 GNUNET_SCHEDULER_cancel (tt);
99 tt = NULL;
100 GNUNET_SCHEDULER_shutdown ();
101 }
102 break;
103
104 default:
105 GNUNET_assert (0);
106 }
98} 107}
99 108
100 109
@@ -104,36 +113,36 @@ result_cb_set2 (void *cls,
104 uint64_t size, 113 uint64_t size,
105 enum GNUNET_SETU_Status status) 114 enum GNUNET_SETU_Status status)
106{ 115{
107 switch (status) 116 switch (status)
117 {
118 case GNUNET_SETU_STATUS_ADD_LOCAL:
119 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n");
120 break;
121
122 case GNUNET_SETU_STATUS_FAILURE:
123 GNUNET_break (0);
124 oh2 = NULL;
125 fprintf (stderr, "set 2: received failure status\n");
126 GNUNET_SCHEDULER_shutdown ();
127 ret = 1;
128 break;
129
130 case GNUNET_SETU_STATUS_DONE:
131 oh2 = NULL;
132 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
133 GNUNET_SETU_destroy (set2);
134 set2 = NULL;
135 if (NULL == set1)
108 { 136 {
109 case GNUNET_SETU_STATUS_ADD_LOCAL: 137 GNUNET_SCHEDULER_cancel (tt);
110 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n"); 138 tt = NULL;
111 break; 139 GNUNET_SCHEDULER_shutdown ();
112
113 case GNUNET_SETU_STATUS_FAILURE:
114 GNUNET_break (0);
115 oh2 = NULL;
116 fprintf (stderr, "set 2: received failure status\n");
117 GNUNET_SCHEDULER_shutdown ();
118 ret = 1;
119 break;
120
121 case GNUNET_SETU_STATUS_DONE:
122 oh2 = NULL;
123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
124 GNUNET_SETU_destroy (set2);
125 set2 = NULL;
126 if (NULL == set1)
127 {
128 GNUNET_SCHEDULER_cancel (tt);
129 tt = NULL;
130 GNUNET_SCHEDULER_shutdown ();
131 }
132 break;
133
134 default:
135 GNUNET_assert (0);
136 } 140 }
141 break;
142
143 default:
144 GNUNET_assert (0);
145 }
137} 146}
138 147
139 148
@@ -143,14 +152,14 @@ listen_cb (void *cls,
143 const struct GNUNET_MessageHeader *context_msg, 152 const struct GNUNET_MessageHeader *context_msg,
144 struct GNUNET_SETU_Request *request) 153 struct GNUNET_SETU_Request *request)
145{ 154{
146 GNUNET_assert (NULL != context_msg); 155 GNUNET_assert (NULL != context_msg);
147 GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); 156 GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n"); 157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n");
149 oh2 = GNUNET_SETU_accept (request, 158 oh2 = GNUNET_SETU_accept (request,
150 (struct GNUNET_SETU_Option[]){ 0 }, 159 (struct GNUNET_SETU_Option[]){ 0 },
151 &result_cb_set2, 160 &result_cb_set2,
152 NULL); 161 NULL);
153 GNUNET_SETU_commit (oh2, set2); 162 GNUNET_SETU_commit (oh2, set2);
154} 163}
155 164
156 165
@@ -162,122 +171,89 @@ listen_cb (void *cls,
162static void 171static void
163start (void *cls) 172start (void *cls)
164{ 173{
165 struct GNUNET_MessageHeader context_msg; 174 struct GNUNET_MessageHeader context_msg;
166 175
167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n"); 176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n");
168 context_msg.size = htons (sizeof context_msg); 177 context_msg.size = htons (sizeof context_msg);
169 context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); 178 context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
170 listen_handle = GNUNET_SETU_listen (config, 179 listen_handle = GNUNET_SETU_listen (config,
171 &app_id, 180 &app_id,
172 &listen_cb, 181 &listen_cb,
173 NULL); 182 NULL);
174 oh1 = GNUNET_SETU_prepare (&local_id, 183 oh1 = GNUNET_SETU_prepare (&local_id,
175 &app_id, 184 &app_id,
176 &context_msg, 185 &context_msg,
177 (struct GNUNET_SETU_Option[]){ 0 }, 186 (struct GNUNET_SETU_Option[]){ 0 },
178 &result_cb_set1, 187 &result_cb_set1,
179 NULL); 188 NULL);
180 GNUNET_SETU_commit (oh1, set1); 189 GNUNET_SETU_commit (oh1, set1);
181} 190}
182 191
183 192
184/** 193/**
185 * Initialize the second set, continue
186 *
187 * @param cls closure, unused
188 */
189static void
190init_set2 (void *cls)
191{
192 struct GNUNET_SETU_Element element;
193
194 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
195
196 element.element_type = 0;
197 element.data = "hello1";
198 element.size = strlen (element.data);
199 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
200 element.data = "quux";
201 element.size = strlen (element.data);
202 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
203 element.data = "baz";
204 element.size = strlen (element.data);
205 GNUNET_SETU_add_element (set2, &element, &start, NULL);
206}
207
208/**
209 * Generate random byte stream 194 * Generate random byte stream
210 */ 195 */
211 196
212unsigned char *gen_rdm_bytestream (size_t num_bytes) 197unsigned char *
198gen_rdm_bytestream (size_t num_bytes)
213{ 199{
214 unsigned char *stream = GNUNET_malloc (num_bytes); 200 unsigned char *stream = GNUNET_malloc (num_bytes);
215 GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes); 201 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes);
216 return stream; 202 return stream;
217} 203}
218 204
205
219/** 206/**
220 * Generate random sets 207 * Generate random sets
221 */ 208 */
222 209
223static void 210static void
224initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes) 211initRandomSets (int overlap, int set1_size, int set2_size, int
212 element_size_in_bytes)
225{ 213{
226 struct GNUNET_SETU_Element element; 214 struct GNUNET_SETU_Element element;
227 element.element_type = 0; 215 element.element_type = 0;
228 216
229 // Add elements to both sets 217 // Add elements to both sets
230 for (int i = 0; i < overlap; i++) { 218 for (int i = 0; i < overlap; i++)
231 element.data = gen_rdm_bytestream(element_size_in_bytes); 219 {
232 element.size = element_size_in_bytes; 220 element.data = gen_rdm_bytestream (element_size_in_bytes);
233 GNUNET_SETU_add_element (set1, &element, NULL, NULL); 221 element.size = element_size_in_bytes;
234 GNUNET_SETU_add_element (set2, &element, NULL, NULL); 222 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
235 set1_size--; 223 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
236 set2_size--; 224 set1_size--;
237 } 225 set2_size--;
238 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n"); 226 }
239 227 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n");
240 // Add other elements to set 1 228
241 while(set1_size>0) { 229 // Add other elements to set 1
242 element.data = gen_rdm_bytestream(element_size_in_bytes); 230 while (set1_size>0)
243 element.size = element_size_in_bytes; 231 {
244 GNUNET_SETU_add_element (set1, &element, NULL, NULL); 232 element.data = gen_rdm_bytestream (element_size_in_bytes);
245 set1_size--; 233 element.size = element_size_in_bytes;
246 } 234 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
247 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n"); 235 set1_size--;
248 236 }
249 // Add other elements to set 2 237 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n");
250 while(set2_size > 0) {
251 element.data = gen_rdm_bytestream(element_size_in_bytes);
252 element.size = element_size_in_bytes;
253 238
254 if(set2_size != 1) { 239 // Add other elements to set 2
255 GNUNET_SETU_add_element (set2, &element,NULL, NULL); 240 while (set2_size > 0)
256 } else { 241 {
257 GNUNET_SETU_add_element (set2, &element,&start, NULL); 242 element.data = gen_rdm_bytestream (element_size_in_bytes);
258 } 243 element.size = element_size_in_bytes;
259 244
260 set2_size--; 245 if (set2_size != 1)
246 {
247 GNUNET_SETU_add_element (set2, &element,NULL, NULL);
248 }
249 else
250 {
251 GNUNET_SETU_add_element (set2, &element,&start, NULL);
261 } 252 }
262 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n");
263}
264
265/**
266 * Initialize the first set, continue.
267 */
268static void
269init_set1 (void)
270{
271 struct GNUNET_SETU_Element element;
272 253
273 element.element_type = 0; 254 set2_size--;
274 element.data = "hello"; 255 }
275 element.size = strlen (element.data); 256 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n");
276 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
277 element.data = "bar";
278 element.size = strlen (element.data);
279 GNUNET_SETU_add_element (set1, &element, &init_set2, NULL);
280 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
281} 257}
282 258
283 259
@@ -289,10 +265,10 @@ init_set1 (void)
289static void 265static void
290timeout_fail (void *cls) 266timeout_fail (void *cls)
291{ 267{
292 tt = NULL; 268 tt = NULL;
293 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n"); 269 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n");
294 GNUNET_SCHEDULER_shutdown (); 270 GNUNET_SCHEDULER_shutdown ();
295 ret = 1; 271 ret = 1;
296} 272}
297 273
298 274
@@ -304,36 +280,36 @@ timeout_fail (void *cls)
304static void 280static void
305do_shutdown (void *cls) 281do_shutdown (void *cls)
306{ 282{
307 if (NULL != tt) 283 if (NULL != tt)
308 { 284 {
309 GNUNET_SCHEDULER_cancel (tt); 285 GNUNET_SCHEDULER_cancel (tt);
310 tt = NULL; 286 tt = NULL;
311 } 287 }
312 if (NULL != oh1) 288 if (NULL != oh1)
313 { 289 {
314 GNUNET_SETU_operation_cancel (oh1); 290 GNUNET_SETU_operation_cancel (oh1);
315 oh1 = NULL; 291 oh1 = NULL;
316 } 292 }
317 if (NULL != oh2) 293 if (NULL != oh2)
318 { 294 {
319 GNUNET_SETU_operation_cancel (oh2); 295 GNUNET_SETU_operation_cancel (oh2);
320 oh2 = NULL; 296 oh2 = NULL;
321 } 297 }
322 if (NULL != set1) 298 if (NULL != set1)
323 { 299 {
324 GNUNET_SETU_destroy (set1); 300 GNUNET_SETU_destroy (set1);
325 set1 = NULL; 301 set1 = NULL;
326 } 302 }
327 if (NULL != set2) 303 if (NULL != set2)
328 { 304 {
329 GNUNET_SETU_destroy (set2); 305 GNUNET_SETU_destroy (set2);
330 set2 = NULL; 306 set2 = NULL;
331 } 307 }
332 if (NULL != listen_handle) 308 if (NULL != listen_handle)
333 { 309 {
334 GNUNET_SETU_listen_cancel (listen_handle); 310 GNUNET_SETU_listen_cancel (listen_handle);
335 listen_handle = NULL; 311 listen_handle = NULL;
336 } 312 }
337} 313}
338 314
339 315
@@ -350,79 +326,148 @@ run (void *cls,
350 const struct GNUNET_CONFIGURATION_Handle *cfg, 326 const struct GNUNET_CONFIGURATION_Handle *cfg,
351 struct GNUNET_TESTING_Peer *peer) 327 struct GNUNET_TESTING_Peer *peer)
352{ 328{
353 struct GNUNET_SETU_OperationHandle *my_oh; 329 struct GNUNET_SETU_OperationHandle *my_oh;
354 330
355 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
356 "Running preparatory tests\n"); 332 "Running preparatory tests\n");
357 tt = GNUNET_SCHEDULER_add_delayed ( 333 tt = GNUNET_SCHEDULER_add_delayed (
358 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), 334 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
359 &timeout_fail, 335 &timeout_fail,
360 NULL); 336 NULL);
361 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); 337 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
362 338
363 config = cfg; 339 config = cfg;
364 GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg, 340 GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg,
365 &local_id)); 341 &local_id));
366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 342 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
367 "my id (from CRYPTO): %s\n", 343 "my id (from CRYPTO): %s\n",
368 GNUNET_i2s (&local_id)); 344 GNUNET_i2s (&local_id));
369 GNUNET_TESTING_peer_get_identity (peer, 345 GNUNET_TESTING_peer_get_identity (peer,
370 &local_id); 346 &local_id);
371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
372 "my id (from TESTING): %s\n", 348 "my id (from TESTING): %s\n",
373 GNUNET_i2s (&local_id)); 349 GNUNET_i2s (&local_id));
374 set1 = GNUNET_SETU_create (cfg); 350 set1 = GNUNET_SETU_create (cfg);
375 set2 = GNUNET_SETU_create (cfg); 351 set2 = GNUNET_SETU_create (cfg);
376 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
377 "Created sets %p and %p for union operation\n", 353 "Created sets %p and %p for union operation\n",
378 set1, 354 set1,
379 set2); 355 set2);
380 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); 356 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
381 357
382 /* test if canceling an uncommitted request works! */ 358 /* test if canceling an uncommited request works! */
383 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384 "Launching and instantly stopping set operation\n"); 360 "Launching and instantly stopping set operation\n");
385 my_oh = GNUNET_SETU_prepare (&local_id, 361 my_oh = GNUNET_SETU_prepare (&local_id,
386 &app_id, 362 &app_id,
387 NULL, 363 NULL,
388 (struct GNUNET_SETU_Option[]){ 0 }, 364 (struct GNUNET_SETU_Option[]){ 0 },
389 NULL, 365 NULL,
390 NULL); 366 NULL);
391 GNUNET_SETU_operation_cancel (my_oh); 367 GNUNET_SETU_operation_cancel (my_oh);
392 368
393 /* test the real set reconciliation */ 369 /* test the real set reconciliation */
394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
395 "Running real set-reconciliation\n"); 371 "Running real set-reconciliation\n");
396 //init_set1 (); 372 // init_set1 ();
397 // limit ~23800 element total 373 // limit ~23800 element total
398 initRandomSets(50,100,100,128); 374 initRandomSets (490, 500,500,32);
399} 375}
400 376
401static void execute_perf() 377
378void
379perf_thread ()
402{ 380{
403 for( int repeat_ctr = 0; repeat_ctr<1; repeat_ctr++ ) { 381 GNUNET_TESTING_service_run ("perf_setu_api",
382 "arm",
383 "test_setu.conf",
384 &run,
385 NULL);
386
387}
404 388
405 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
406 "Executing perf round %d\n", repeat_ctr);
407 389
408 GNUNET_TESTING_service_run ("perf_setu_api", 390static void
409 "arm", 391run_petf_thread (int total_runs)
410 "test_setu.conf", 392{
411 &run, 393 int core_count = get_nprocs_conf ();
412 NULL); 394 pid_t child_pid, wpid;
395 int status = 0;
396
397// Father code (before child processes start)
398 for (int processed = 0; processed < total_runs;)
399 {
400 for (int id = 0; id < core_count; id++)
401 {
402 if (processed >= total_runs)
403 break;
404
405 if ((child_pid = fork ()) == 0)
406 {
407 perf_thread ();
408 exit (0);
409 }
410 processed += 1;
413 } 411 }
414 return 0; 412 while ((wpid = wait (&status)) > 0)
413 ;
414
415 }
415} 416}
416 417
417 418
419static void
420execute_perf ()
421{
422
423 /**
424 * Erase statfile
425 */
426 remove ("perf_stats.csv");
427 remove ("perf_failure_bucket_number_factor.csv");
428 for (int out_out_ctr = 3; out_out_ctr <= 3; out_out_ctr++)
429 {
430
431 for (int out_ctr = 20; out_ctr <= 20; out_ctr++)
432 {
433 float base = 0.1;
434 float x = out_ctr * base;
435 char factor[10];
436 char *buffer = gcvt (x, 4, factor);
437 setu_cfg = GNUNET_CONFIGURATION_create ();
438 GNUNET_CONFIGURATION_set_value_string (setu_cfg, "IBF",
439 "BUCKET_NUMBER_FACTOR",
440 buffer); // Factor default=4
441 GNUNET_CONFIGURATION_set_value_number (setu_cfg, "IBF",
442 "NUMBER_PER_BUCKET", 3); // K default=4
443 GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE",
444 "TRADEOFF", "2"); // default=0.25
445 GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE",
446 "MAX_SET_DIFF_FACTOR_DIFFERENTIAL",
447 "20000"); // default=0.25
448 GNUNET_CONFIGURATION_set_value_number (setu_cfg, "BOUNDARIES",
449 "UPPER_ELEMENT", 5000);
450
451
452 if (GNUNET_OK != GNUNET_CONFIGURATION_write (setu_cfg, "perf_setu.conf"))
453 GNUNET_log (
454 GNUNET_ERROR_TYPE_ERROR,
455 _ ("Failed to write subsystem default identifier map'.\n"));
456 run_petf_thread (100);
457 }
458
459 }
460 return;
461}
462
418 463
419int 464int
420main (int argc, char **argv) 465main (int argc, char **argv)
421{ 466{
422 GNUNET_log_setup ("perf_setu_api",
423 "WARNING",
424 NULL);
425 467
426 execute_perf(); 468 GNUNET_log_setup ("perf_setu_api",
427 return 0; 469 "WARNING",
470 NULL);
471 execute_perf ();
472 return 0;
428} 473}
diff --git a/src/setu/setu.h b/src/setu/setu.h
index 7c2a98a02..7b606f12c 100644
--- a/src/setu/setu.h
+++ b/src/setu/setu.h
@@ -122,6 +122,31 @@ struct GNUNET_SETU_AcceptMessage
122 */ 122 */
123 uint32_t byzantine_lower_bound; 123 uint32_t byzantine_lower_bound;
124 124
125
126 /**
127 * Upper bound for the set size, used only when
128 * byzantine mode is enabled.
129 */
130 uint64_t byzantine_upper_bond;
131
132 /**
133 * Bandwidth latency tradeoff determines how much bytes a single RTT is
134 * worth, which is a performance setting
135 */
136 uint64_t bandwidth_latency_tradeoff;
137
138 /**
139 * The factor determines the number of buckets an IBF has which is
140 * multiplied by the estimated setsize default: 2
141 */
142 uint64_t ibf_bucket_number_factor;
143
144 /**
145 * This setting determines to how many IBF buckets an single elements
146 * is mapped to.
147 */
148 uint64_t ibf_number_of_buckets_per_element;
149
125}; 150};
126 151
127 152
@@ -226,6 +251,30 @@ struct GNUNET_SETU_EvaluateMessage
226 */ 251 */
227 uint32_t byzantine_lower_bound; 252 uint32_t byzantine_lower_bound;
228 253
254 /**
255 * Upper bound for the set size, used only when
256 * byzantine mode is enabled.
257 */
258 uint64_t byzantine_upper_bond;
259
260 /**
261 * Bandwidth latency tradeoff determines how much bytes a single RTT is
262 * worth, which is a performance setting
263 */
264 uint64_t bandwidth_latency_tradeoff;
265
266 /**
267 * The factor determines the number of buckets an IBF has which is
268 * multiplied by the estimated setsize default: 2
269 */
270 uint64_t ibf_bucket_number_factor;
271
272 /**
273 * This setting determines to how many IBF buckets an single elements
274 * is mapped to.
275 */
276 uint64_t ibf_number_of_buckets_per_element;
277
229 /* rest: context message, that is, application-specific 278 /* rest: context message, that is, application-specific
230 message to convince listener to pick up */ 279 message to convince listener to pick up */
231}; 280};
diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c
index 0a09b18b2..faa57aaba 100644
--- a/src/setu/setu_api.c
+++ b/src/setu/setu_api.c
@@ -22,6 +22,7 @@
22 * @brief api for the set union service 22 * @brief api for the set union service
23 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Elias Summermatter
25 */ 26 */
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
@@ -526,6 +527,14 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
526 context_msg); 527 context_msg);
527 msg->app_id = *app_id; 528 msg->app_id = *app_id;
528 msg->target_peer = *other_peer; 529 msg->target_peer = *other_peer;
530
531 /* Set default values */
532 msg->byzantine_upper_bond = UINT64_MAX;
533 msg->bandwidth_latency_tradeoff = 0;
534 msg->ibf_bucket_number_factor = 2;
535 msg->ibf_number_of_buckets_per_element = 3;
536
537
529 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) 538 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
530 { 539 {
531 switch (opt->type) 540 switch (opt->type)
@@ -534,6 +543,18 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
534 msg->byzantine = GNUNET_YES; 543 msg->byzantine = GNUNET_YES;
535 msg->byzantine_lower_bound = htonl (opt->v.num); 544 msg->byzantine_lower_bound = htonl (opt->v.num);
536 break; 545 break;
546 case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND:
547 msg->byzantine_upper_bond = htonl (opt->v.num);
548 break;
549 case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF:
550 msg->bandwidth_latency_tradeoff = htonl (opt->v.num);
551 break;
552 case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR:
553 msg->ibf_bucket_number_factor = htonl (opt->v.num);
554 break;
555 case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT:
556 msg->ibf_number_of_buckets_per_element = htonl (opt->v.num);
557 break;
537 case GNUNET_SETU_OPTION_FORCE_FULL: 558 case GNUNET_SETU_OPTION_FORCE_FULL:
538 msg->force_full = GNUNET_YES; 559 msg->force_full = GNUNET_YES;
539 break; 560 break;
@@ -788,6 +809,13 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
788 mqm = GNUNET_MQ_msg (msg, 809 mqm = GNUNET_MQ_msg (msg,
789 GNUNET_MESSAGE_TYPE_SETU_ACCEPT); 810 GNUNET_MESSAGE_TYPE_SETU_ACCEPT);
790 msg->accept_reject_id = htonl (request->accept_id); 811 msg->accept_reject_id = htonl (request->accept_id);
812
813 /* Set default values */
814 msg->byzantine_upper_bond = UINT64_MAX;
815 msg->bandwidth_latency_tradeoff = 0;
816 msg->ibf_bucket_number_factor = 2;
817 msg->ibf_number_of_buckets_per_element = 3;
818
791 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) 819 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
792 { 820 {
793 switch (opt->type) 821 switch (opt->type)
@@ -796,6 +824,18 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
796 msg->byzantine = GNUNET_YES; 824 msg->byzantine = GNUNET_YES;
797 msg->byzantine_lower_bound = htonl (opt->v.num); 825 msg->byzantine_lower_bound = htonl (opt->v.num);
798 break; 826 break;
827 case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND:
828 msg->byzantine_upper_bond = htonl (opt->v.num);
829 break;
830 case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF:
831 msg->bandwidth_latency_tradeoff = htonl (opt->v.num);
832 break;
833 case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR:
834 msg->ibf_bucket_number_factor = htonl (opt->v.num);
835 break;
836 case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT:
837 msg->ibf_number_of_buckets_per_element = htonl (opt->v.num);
838 break;
799 case GNUNET_SETU_OPTION_FORCE_FULL: 839 case GNUNET_SETU_OPTION_FORCE_FULL:
800 msg->force_full = GNUNET_YES; 840 msg->force_full = GNUNET_YES;
801 break; 841 break;
diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c
index 2fb7d015e..5a0c9d70d 100644
--- a/src/setu/test_setu_api.c
+++ b/src/setu/test_setu_api.c
@@ -204,62 +204,6 @@ init_set2 (void *cls)
204 GNUNET_SETU_add_element (set2, &element, &start, NULL); 204 GNUNET_SETU_add_element (set2, &element, &start, NULL);
205} 205}
206 206
207/**
208 * Generate random byte stream
209 */
210
211unsigned char *gen_rdm_bytestream (size_t num_bytes)
212{
213 unsigned char *stream = GNUNET_malloc (num_bytes);
214 GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes);
215 return stream;
216}
217
218/**
219 * Generate random sets
220 */
221
222static void
223initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes)
224{
225 struct GNUNET_SETU_Element element;
226 element.element_type = 0;
227
228 // Add elements to both sets
229 for (int i = 0; i < overlap; i++) {
230 element.data = gen_rdm_bytestream(element_size_in_bytes);
231 element.size = element_size_in_bytes;
232 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
233 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
234 set1_size--;
235 set2_size--;
236 }
237 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n");
238
239 // Add other elements to set 1
240 while(set1_size>0) {
241 element.data = gen_rdm_bytestream(element_size_in_bytes);
242 element.size = element_size_in_bytes;
243 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
244 set1_size--;
245 }
246 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n");
247
248 // Add other elements to set 2
249 while(set2_size > 0) {
250 element.data = gen_rdm_bytestream(element_size_in_bytes);
251 element.size = element_size_in_bytes;
252
253 if(set2_size != 1) {
254 GNUNET_SETU_add_element (set2, &element,NULL, NULL);
255 } else {
256 GNUNET_SETU_add_element (set2, &element,&start, NULL);
257 }
258
259 set2_size--;
260 }
261 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n");
262}
263 207
264/** 208/**
265 * Initialize the first set, continue. 209 * Initialize the first set, continue.
@@ -392,9 +336,7 @@ run (void *cls,
392 /* test the real set reconciliation */ 336 /* test the real set reconciliation */
393 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 337 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
394 "Running real set-reconciliation\n"); 338 "Running real set-reconciliation\n");
395 //init_set1 (); 339 init_set1 ();
396 initRandomSets(19500,20000,20000,4096);
397 //initRandomSets(19500,20000,20000,32);
398} 340}
399 341
400 342