diff options
-rw-r--r-- | src/include/gnunet_protocols.h | 7 | ||||
-rw-r--r-- | src/include/gnunet_setu_service.h | 26 | ||||
-rw-r--r-- | src/set/ibf.c | 2 | ||||
-rw-r--r-- | src/set/ibf.h | 1 | ||||
-rw-r--r-- | src/setu/Makefile.am | 11 | ||||
-rw-r--r-- | src/setu/gnunet-service-setu.c | 2081 | ||||
-rw-r--r-- | src/setu/gnunet-service-setu_protocol.h | 77 | ||||
-rw-r--r-- | src/setu/gnunet-service-setu_strata_estimator.c | 362 | ||||
-rw-r--r-- | src/setu/gnunet-service-setu_strata_estimator.h | 54 | ||||
-rw-r--r-- | src/setu/ibf.c | 294 | ||||
-rw-r--r-- | src/setu/ibf.h | 65 | ||||
-rw-r--r-- | src/setu/perf_setu_api.c | 571 | ||||
-rw-r--r-- | src/setu/setu.h | 49 | ||||
-rw-r--r-- | src/setu/setu_api.c | 40 | ||||
-rw-r--r-- | src/setu/test_setu_api.c | 60 |
15 files changed, 2921 insertions, 779 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 715e94c6a..6b61dfc72 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -1784,6 +1784,13 @@ extern "C" { | |||
1784 | */ | 1784 | */ |
1785 | #define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 572 | 1785 | #define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 572 |
1786 | 1786 | ||
1787 | /** | ||
1788 | * Signals other peer that all elements are sent. | ||
1789 | */ | ||
1790 | |||
1791 | #define GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL 710 | ||
1792 | |||
1793 | |||
1787 | 1794 | ||
1788 | /******************************************************************************* | 1795 | /******************************************************************************* |
1789 | * SETI message types | 1796 | * SETI message types |
diff --git a/src/include/gnunet_setu_service.h b/src/include/gnunet_setu_service.h index bacec9408..1d7e48402 100644 --- a/src/include/gnunet_setu_service.h +++ b/src/include/gnunet_setu_service.h | |||
@@ -163,7 +163,31 @@ enum GNUNET_SETU_OptionType | |||
163 | /** | 163 | /** |
164 | * Notify client also if we are sending a value to the other peer. | 164 | * Notify client also if we are sending a value to the other peer. |
165 | */ | 165 | */ |
166 | GNUNET_SETU_OPTION_SYMMETRIC = 8 | 166 | GNUNET_SETU_OPTION_SYMMETRIC = 8, |
167 | |||
168 | /** | ||
169 | * Byzantine upper bound. Is the maximal plausible number of elements | ||
170 | * a peer can have default max uint64 | ||
171 | */ | ||
172 | GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND = 16, | ||
173 | |||
174 | /** | ||
175 | * Bandwidth latency tradeoff determines how much bytes a single RTT is | ||
176 | * worth, which is a performance setting | ||
177 | */ | ||
178 | GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF= 32, | ||
179 | |||
180 | /** | ||
181 | * The factor determines the number of buckets an IBF has which is | ||
182 | * multiplied by the estimated setsize default: 2 | ||
183 | */ | ||
184 | GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR= 64, | ||
185 | |||
186 | /** | ||
187 | * This setting determines to how many IBF buckets an single elements | ||
188 | * is mapped to. | ||
189 | */ | ||
190 | GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT= 128 | ||
167 | }; | 191 | }; |
168 | 192 | ||
169 | 193 | ||
diff --git a/src/set/ibf.c b/src/set/ibf.c index 1532afceb..0f7eb6a9f 100644 --- a/src/set/ibf.c +++ b/src/set/ibf.c | |||
@@ -294,7 +294,7 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, | |||
294 | struct IBF_KeyHash *key_hash_dst; | 294 | struct IBF_KeyHash *key_hash_dst; |
295 | struct IBF_Count *count_dst; | 295 | struct IBF_Count *count_dst; |
296 | 296 | ||
297 | GNUNET_assert (start + count <= ibf->size); | 297 | GNUNET_assert (start + count <= ibf->size); |
298 | 298 | ||
299 | /* copy keys */ | 299 | /* copy keys */ |
300 | key_dst = (struct IBF_Key *) buf; | 300 | key_dst = (struct IBF_Key *) buf; |
diff --git a/src/set/ibf.h b/src/set/ibf.h index 7c2ab33b1..334a797ef 100644 --- a/src/set/ibf.h +++ b/src/set/ibf.h | |||
@@ -245,6 +245,7 @@ void | |||
245 | ibf_destroy (struct InvertibleBloomFilter *ibf); | 245 | ibf_destroy (struct InvertibleBloomFilter *ibf); |
246 | 246 | ||
247 | 247 | ||
248 | |||
248 | #if 0 /* keep Emacsens' auto-indent happy */ | 249 | #if 0 /* keep Emacsens' auto-indent happy */ |
249 | { | 250 | { |
250 | #endif | 251 | #endif |
diff --git a/src/setu/Makefile.am b/src/setu/Makefile.am index 77d048add..6e2865d8c 100644 --- a/src/setu/Makefile.am +++ b/src/setu/Makefile.am | |||
@@ -7,6 +7,8 @@ libexecdir= $(pkglibdir)/libexec/ | |||
7 | 7 | ||
8 | plugindir = $(libdir)/gnunet | 8 | plugindir = $(libdir)/gnunet |
9 | 9 | ||
10 | PTHREAD = -lpthread | ||
11 | |||
10 | pkgcfg_DATA = \ | 12 | pkgcfg_DATA = \ |
11 | setu.conf | 13 | setu.conf |
12 | 14 | ||
@@ -63,7 +65,8 @@ libgnunetsetu_la_SOURCES = \ | |||
63 | setu_api.c setu.h | 65 | setu_api.c setu.h |
64 | libgnunetsetu_la_LIBADD = \ | 66 | libgnunetsetu_la_LIBADD = \ |
65 | $(top_builddir)/src/util/libgnunetutil.la \ | 67 | $(top_builddir)/src/util/libgnunetutil.la \ |
66 | $(LTLIBINTL) | 68 | $(LTLIBINTL) \ |
69 | $(PTHREAD) | ||
67 | libgnunetsetu_la_LDFLAGS = \ | 70 | libgnunetsetu_la_LDFLAGS = \ |
68 | $(GN_LIB_LDFLAGS) | 71 | $(GN_LIB_LDFLAGS) |
69 | 72 | ||
@@ -91,7 +94,8 @@ perf_setu_api_SOURCES = \ | |||
91 | perf_setu_api_LDADD = \ | 94 | perf_setu_api_LDADD = \ |
92 | $(top_builddir)/src/util/libgnunetutil.la \ | 95 | $(top_builddir)/src/util/libgnunetutil.la \ |
93 | $(top_builddir)/src/testing/libgnunettesting.la \ | 96 | $(top_builddir)/src/testing/libgnunettesting.la \ |
94 | libgnunetsetu.la | 97 | libgnunetsetu.la \ |
98 | $(PTHREAD) | ||
95 | 99 | ||
96 | 100 | ||
97 | plugin_LTLIBRARIES = \ | 101 | plugin_LTLIBRARIES = \ |
@@ -103,7 +107,8 @@ libgnunet_plugin_block_setu_test_la_LIBADD = \ | |||
103 | $(top_builddir)/src/block/libgnunetblock.la \ | 107 | $(top_builddir)/src/block/libgnunetblock.la \ |
104 | $(top_builddir)/src/block/libgnunetblockgroup.la \ | 108 | $(top_builddir)/src/block/libgnunetblockgroup.la \ |
105 | $(top_builddir)/src/util/libgnunetutil.la \ | 109 | $(top_builddir)/src/util/libgnunetutil.la \ |
106 | $(LTLIBINTL) | 110 | $(LTLIBINTL) \ |
111 | $(PTHREAD) | ||
107 | libgnunet_plugin_block_setu_test_la_LDFLAGS = \ | 112 | libgnunet_plugin_block_setu_test_la_LDFLAGS = \ |
108 | $(GN_PLUGIN_LDFLAGS) | 113 | $(GN_PLUGIN_LDFLAGS) |
109 | 114 | ||
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c index bd1113f15..a51e92b71 100644 --- a/src/setu/gnunet-service-setu.c +++ b/src/setu/gnunet-service-setu.c | |||
@@ -22,6 +22,7 @@ | |||
22 | * @brief set union operation | 22 | * @brief set union operation |
23 | * @author Florian Dold | 23 | * @author Florian Dold |
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * @author Elias Summermatter | ||
25 | */ | 26 | */ |
26 | #include "platform.h" | 27 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
@@ -50,33 +51,61 @@ | |||
50 | */ | 51 | */ |
51 | #define SE_STRATA_COUNT 32 | 52 | #define SE_STRATA_COUNT 32 |
52 | 53 | ||
54 | |||
53 | /** | 55 | /** |
54 | * Size of the IBFs in the strata estimator. | 56 | * Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348 |
57 | * Based on the bsc thesis of Elias Summermatter (2021) | ||
55 | */ | 58 | */ |
56 | #define SE_IBF_SIZE 80 | 59 | #define SE_IBFS_TOTAL_SIZE 632 |
57 | 60 | ||
58 | /** | 61 | /** |
59 | * The hash num parameter for the difference digests and strata estimators. | 62 | * The hash num parameter for the difference digests and strata estimators. |
60 | */ | 63 | */ |
61 | #define SE_IBF_HASH_NUM 4 | 64 | #define SE_IBF_HASH_NUM 3 |
62 | 65 | ||
63 | /** | 66 | /** |
64 | * Number of buckets that can be transmitted in one message. | 67 | * Number of buckets that can be transmitted in one message. |
65 | */ | 68 | */ |
66 | #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE) | 69 | #define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE) |
67 | 70 | ||
68 | /** | 71 | /** |
69 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). | 72 | * The maximum size of an ibf we use is MAX_IBF_SIZE=2^20. |
70 | * Choose this value so that computing the IBF is still cheaper | 73 | * Choose this value so that computing the IBF is still cheaper |
71 | * than transmitting all values. | 74 | * than transmitting all values. |
72 | */ | 75 | */ |
73 | #define MAX_IBF_ORDER (20) | 76 | #define MAX_IBF_SIZE 1048576 |
77 | |||
78 | |||
79 | /** | ||
80 | * Minimal size of an ibf | ||
81 | * Based on the bsc thesis of Elias Summermatter (2021) | ||
82 | */ | ||
83 | #define IBF_MIN_SIZE 37 | ||
84 | |||
85 | /** | ||
86 | * AVG RTT for differential sync when k=2 and Factor = 2 | ||
87 | * Based on the bsc thesis of Elias Summermatter (2021) | ||
88 | */ | ||
89 | #define DIFFERENTIAL_RTT_MEAN 3.65145 | ||
90 | |||
91 | /** | ||
92 | * Security level used for byzantine checks (2^80) | ||
93 | */ | ||
94 | |||
95 | #define SECURITY_LEVEL 80 | ||
96 | |||
97 | /** | ||
98 | * Is the estimated probabily for a new round this values | ||
99 | * is based on the bsc thesis of Elias Summermatter (2021) | ||
100 | */ | ||
101 | |||
102 | #define PROBABILITY_FOR_NEW_ROUND 0.15 | ||
74 | 103 | ||
75 | /** | 104 | /** |
76 | * Number of buckets used in the ibf per estimated | 105 | * Measure the performance in a csv |
77 | * difference. | ||
78 | */ | 106 | */ |
79 | #define IBF_ALPHA 4 | 107 | |
108 | #define MEASURE_PERFORMANCE 0 | ||
80 | 109 | ||
81 | 110 | ||
82 | /** | 111 | /** |
@@ -146,6 +175,28 @@ enum UnionOperationPhase | |||
146 | PHASE_FULL_RECEIVING | 175 | PHASE_FULL_RECEIVING |
147 | }; | 176 | }; |
148 | 177 | ||
178 | /** | ||
179 | * Different modes of operations | ||
180 | */ | ||
181 | |||
182 | enum MODE_OF_OPERATION | ||
183 | { | ||
184 | /** | ||
185 | * Mode just synchronizes the difference between sets | ||
186 | */ | ||
187 | DIFFERENTIAL_SYNC, | ||
188 | |||
189 | /** | ||
190 | * Mode send full set sending local set first | ||
191 | */ | ||
192 | FULL_SYNC_LOCAL_SENDING_FIRST, | ||
193 | |||
194 | /** | ||
195 | * Mode request full set from remote peer | ||
196 | */ | ||
197 | FULL_SYNC_REMOTE_SENDING_FIRST | ||
198 | }; | ||
199 | |||
149 | 200 | ||
150 | /** | 201 | /** |
151 | * Information about an element element in the set. All elements are | 202 | * Information about an element element in the set. All elements are |
@@ -277,7 +328,7 @@ struct Operation | |||
277 | * Copy of the set's strata estimator at the time of | 328 | * Copy of the set's strata estimator at the time of |
278 | * creation of this operation. | 329 | * creation of this operation. |
279 | */ | 330 | */ |
280 | struct StrataEstimator *se; | 331 | struct MultiStrataEstimator *se; |
281 | 332 | ||
282 | /** | 333 | /** |
283 | * The IBF we currently receive. | 334 | * The IBF we currently receive. |
@@ -320,7 +371,7 @@ struct Operation | |||
320 | /** | 371 | /** |
321 | * Number of ibf buckets already received into the @a remote_ibf. | 372 | * Number of ibf buckets already received into the @a remote_ibf. |
322 | */ | 373 | */ |
323 | unsigned int ibf_buckets_received; | 374 | uint64_t ibf_buckets_received; |
324 | 375 | ||
325 | /** | 376 | /** |
326 | * Salt that we're using for sending IBFs | 377 | * Salt that we're using for sending IBFs |
@@ -386,7 +437,7 @@ struct Operation | |||
386 | * Lower bound for the set size, used only when | 437 | * Lower bound for the set size, used only when |
387 | * byzantine mode is enabled. | 438 | * byzantine mode is enabled. |
388 | */ | 439 | */ |
389 | int byzantine_lower_bound; | 440 | uint64_t byzantine_lower_bound; |
390 | 441 | ||
391 | /** | 442 | /** |
392 | * Unique request id for the request from a remote peer, sent to the | 443 | * Unique request id for the request from a remote peer, sent to the |
@@ -401,21 +452,83 @@ struct Operation | |||
401 | */ | 452 | */ |
402 | unsigned int generation_created; | 453 | unsigned int generation_created; |
403 | 454 | ||
455 | |||
456 | /** | ||
457 | * User defined Bandwidth Round Trips Tradeoff | ||
458 | */ | ||
459 | uint64_t rtt_bandwidth_tradeoff; | ||
460 | |||
461 | |||
404 | /** | 462 | /** |
405 | * User defined Bandwidth Round Trips Tradeoff | 463 | * Number of Element per bucket in IBF |
464 | */ | ||
465 | uint8_t ibf_number_buckets_per_element; | ||
466 | |||
467 | |||
468 | /** | ||
469 | * Set difference is multiplied with this factor | ||
470 | * to gennerate large enought IBF | ||
406 | */ | 471 | */ |
407 | double rtt_bandwidth_tradeoff; | 472 | uint8_t ibf_bucket_number_factor; |
408 | 473 | ||
409 | /** | 474 | /** |
410 | * Number of Element per bucket in IBF | 475 | * Defines which site a client is |
476 | * 0 = Initiating peer | ||
477 | * 1 = Receiving peer | ||
411 | */ | 478 | */ |
412 | unsigned int ibf_number_buckets_per_element; | 479 | uint8_t peer_site; |
480 | |||
413 | 481 | ||
414 | /** | 482 | /** |
415 | * Number of buckets in IBF | 483 | * Local peer element count |
416 | */ | 484 | */ |
417 | unsigned ibf_bucket_number; | 485 | uint64_t local_element_count; |
418 | 486 | ||
487 | /** | ||
488 | * Mode of operation that was chosen by the algorithm | ||
489 | */ | ||
490 | uint8_t mode_of_operation; | ||
491 | |||
492 | /** | ||
493 | * Hashmap to keep track of the send/received messages | ||
494 | */ | ||
495 | struct GNUNET_CONTAINER_MultiHashMap *message_control_flow; | ||
496 | |||
497 | /** | ||
498 | * Hashmap to keep track of the send/received inquiries (ibf keys) | ||
499 | */ | ||
500 | struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent; | ||
501 | |||
502 | |||
503 | /** | ||
504 | * Total size of local set | ||
505 | */ | ||
506 | uint64_t total_elements_size_local; | ||
507 | |||
508 | /** | ||
509 | * Limit of number of elements in set | ||
510 | */ | ||
511 | uint64_t byzantine_upper_bound; | ||
512 | |||
513 | /** | ||
514 | * is the count of already passed differential sync iterations | ||
515 | */ | ||
516 | uint8_t differential_sync_iterations; | ||
517 | |||
518 | /** | ||
519 | * Estimated or committed set difference at the start | ||
520 | */ | ||
521 | uint64_t remote_set_diff; | ||
522 | |||
523 | /** | ||
524 | * Estimated or committed set difference at the start | ||
525 | */ | ||
526 | uint64_t local_set_diff; | ||
527 | |||
528 | /** | ||
529 | * Boolean to enforce an active passive switch | ||
530 | */ | ||
531 | bool active_passive_switch_required; | ||
419 | }; | 532 | }; |
420 | 533 | ||
421 | 534 | ||
@@ -431,6 +544,16 @@ struct SetContent | |||
431 | struct GNUNET_CONTAINER_MultiHashMap *elements; | 544 | struct GNUNET_CONTAINER_MultiHashMap *elements; |
432 | 545 | ||
433 | /** | 546 | /** |
547 | * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *` randomized. | ||
548 | */ | ||
549 | struct GNUNET_CONTAINER_MultiHashMap *elements_randomized; | ||
550 | |||
551 | /** | ||
552 | * Salt to construct the randomized element map | ||
553 | */ | ||
554 | uint64_t elements_randomized_salt; | ||
555 | |||
556 | /** | ||
434 | * Number of references to the content. | 557 | * Number of references to the content. |
435 | */ | 558 | */ |
436 | unsigned int refcount; | 559 | unsigned int refcount; |
@@ -478,7 +601,7 @@ struct Set | |||
478 | * The strata estimator is only generated once for each set. The IBF keys | 601 | * The strata estimator is only generated once for each set. The IBF keys |
479 | * are derived from the element hashes with salt=0. | 602 | * are derived from the element hashes with salt=0. |
480 | */ | 603 | */ |
481 | struct StrataEstimator *se; | 604 | struct MultiStrataEstimator *se; |
482 | 605 | ||
483 | /** | 606 | /** |
484 | * Evaluate operations are held in a linked list. | 607 | * Evaluate operations are held in a linked list. |
@@ -635,96 +758,679 @@ static int in_shutdown; | |||
635 | */ | 758 | */ |
636 | static uint32_t suggest_id; | 759 | static uint32_t suggest_id; |
637 | 760 | ||
761 | #if MEASURE_PERFORMANCE | ||
762 | /** | ||
763 | * Handles configuration file for setu performance test | ||
764 | * | ||
765 | */ | ||
766 | static const struct GNUNET_CONFIGURATION_Handle *setu_cfg; | ||
767 | |||
638 | 768 | ||
639 | /** | 769 | /** |
640 | * Added Roundtripscounter | 770 | * Stores the performance data for induvidual message |
641 | */ | 771 | */ |
642 | 772 | ||
643 | 773 | ||
644 | struct perf_num_send_resived_msg { | 774 | struct perf_num_send_received_msg |
645 | int sent; | 775 | { |
646 | int sent_var_bytes; | 776 | uint64_t sent; |
647 | int received; | 777 | uint64_t sent_var_bytes; |
648 | int received_var_bytes; | 778 | uint64_t received; |
779 | uint64_t received_var_bytes; | ||
649 | }; | 780 | }; |
650 | 781 | ||
782 | /** | ||
783 | * Main struct to messure perfomance (data/rtts) | ||
784 | */ | ||
785 | struct per_store_struct | ||
786 | { | ||
787 | struct perf_num_send_received_msg operation_request; | ||
788 | struct perf_num_send_received_msg se; | ||
789 | struct perf_num_send_received_msg request_full; | ||
790 | struct perf_num_send_received_msg element_full; | ||
791 | struct perf_num_send_received_msg full_done; | ||
792 | struct perf_num_send_received_msg ibf; | ||
793 | struct perf_num_send_received_msg inquery; | ||
794 | struct perf_num_send_received_msg element; | ||
795 | struct perf_num_send_received_msg demand; | ||
796 | struct perf_num_send_received_msg offer; | ||
797 | struct perf_num_send_received_msg done; | ||
798 | struct perf_num_send_received_msg over; | ||
799 | uint64_t se_diff; | ||
800 | uint64_t se_diff_remote; | ||
801 | uint64_t se_diff_local; | ||
802 | uint64_t active_passive_switches; | ||
803 | uint8_t mode_of_operation; | ||
804 | }; | ||
651 | 805 | ||
652 | struct perf_rtt_struct | 806 | struct per_store_struct perf_store; |
653 | { | 807 | #endif |
654 | struct perf_num_send_resived_msg operation_request; | 808 | |
655 | struct perf_num_send_resived_msg se; | 809 | /** |
656 | struct perf_num_send_resived_msg request_full; | 810 | * Different states to control the messages flow in differential mode |
657 | struct perf_num_send_resived_msg element_full; | 811 | */ |
658 | struct perf_num_send_resived_msg full_done; | 812 | |
659 | struct perf_num_send_resived_msg ibf; | 813 | enum MESSAGE_CONTROL_FLOW_STATE |
660 | struct perf_num_send_resived_msg inquery; | 814 | { |
661 | struct perf_num_send_resived_msg element; | 815 | /** |
662 | struct perf_num_send_resived_msg demand; | 816 | * Initial message state |
663 | struct perf_num_send_resived_msg offer; | 817 | */ |
664 | struct perf_num_send_resived_msg done; | 818 | MSG_CFS_UNINITIALIZED, |
665 | struct perf_num_send_resived_msg over; | 819 | |
820 | /** | ||
821 | * Track that a message has been sent | ||
822 | */ | ||
823 | MSG_CFS_SENT, | ||
824 | |||
825 | /** | ||
826 | * Track that receiving this message is expected | ||
827 | */ | ||
828 | MSG_CFS_EXPECTED, | ||
829 | |||
830 | /** | ||
831 | * Track that message has been recieved | ||
832 | */ | ||
833 | MSG_CFS_RECEIVED, | ||
834 | }; | ||
835 | |||
836 | /** | ||
837 | * Message types to track in message control flow | ||
838 | */ | ||
839 | |||
840 | enum MESSAGE_TYPE | ||
841 | { | ||
842 | /** | ||
843 | * Offer message type | ||
844 | */ | ||
845 | OFFER_MESSAGE, | ||
846 | /** | ||
847 | * Demand message type | ||
848 | */ | ||
849 | DEMAND_MESSAGE, | ||
850 | /** | ||
851 | * Elemente message type | ||
852 | */ | ||
853 | ELEMENT_MESSAGE, | ||
666 | }; | 854 | }; |
667 | 855 | ||
668 | struct perf_rtt_struct perf_rtt; | 856 | /** |
857 | * Struct to tracked messages in message controll flow | ||
858 | */ | ||
859 | |||
860 | struct messageControlFlowElement | ||
861 | { | ||
862 | /** | ||
863 | * Track the message control state of the offer message | ||
864 | */ | ||
865 | enum MESSAGE_CONTROL_FLOW_STATE offer; | ||
866 | /** | ||
867 | * Track the message control state of the demand message | ||
868 | */ | ||
869 | enum MESSAGE_CONTROL_FLOW_STATE demand; | ||
870 | /** | ||
871 | * Track the message control state of the element message | ||
872 | */ | ||
873 | enum MESSAGE_CONTROL_FLOW_STATE element; | ||
874 | }; | ||
875 | |||
876 | |||
877 | #if MEASURE_PERFORMANCE | ||
878 | /** | ||
879 | * Loads different configuration to do perform perfomance tests | ||
880 | * @param op | ||
881 | */ | ||
882 | static void | ||
883 | load_config (struct Operation *op) | ||
884 | { | ||
885 | |||
886 | setu_cfg = GNUNET_CONFIGURATION_create (); | ||
887 | GNUNET_CONFIGURATION_load (setu_cfg,"perf_setu.conf"); | ||
888 | |||
889 | |||
890 | long long number; | ||
891 | float fl; | ||
892 | GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR", | ||
893 | &fl); | ||
894 | op->ibf_bucket_number_factor = fl; | ||
895 | |||
896 | GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET", | ||
897 | &number); | ||
898 | op->ibf_number_buckets_per_element = number; | ||
899 | |||
900 | GNUNET_CONFIGURATION_get_value_number (setu_cfg,"PERFORMANCE", "TRADEOFF", | ||
901 | &number); | ||
902 | op->rtt_bandwidth_tradeoff = number; | ||
903 | |||
904 | |||
905 | GNUNET_CONFIGURATION_get_value_number (setu_cfg,"BOUNDARIES", "UPPER_ELEMENT", | ||
906 | &number); | ||
907 | op->byzantine_upper_bound = number; | ||
908 | |||
909 | |||
910 | op->peer_site = 0; | ||
911 | } | ||
669 | 912 | ||
670 | 913 | ||
914 | /** | ||
915 | * Function to calculate total bytes used for performance messurement | ||
916 | * @param size | ||
917 | * @param perf_num_send_received_msg | ||
918 | * @return bytes used | ||
919 | */ | ||
671 | static int | 920 | static int |
672 | sum_sent_received_bytes(int size, struct perf_num_send_resived_msg perf_rtt_struct) { | 921 | sum_sent_received_bytes (uint64_t size, struct perf_num_send_received_msg |
673 | return (size * perf_rtt_struct.sent) + | 922 | perf_num_send_received_msg) |
674 | (size * perf_rtt_struct.received) + | 923 | { |
675 | perf_rtt_struct.sent_var_bytes + | 924 | return (size * perf_num_send_received_msg.sent) |
676 | perf_rtt_struct.received_var_bytes; | 925 | + (size * perf_num_send_received_msg.received) |
926 | + perf_num_send_received_msg.sent_var_bytes | ||
927 | + perf_num_send_received_msg.received_var_bytes; | ||
677 | } | 928 | } |
678 | 929 | ||
679 | static float | ||
680 | calculate_perf_rtt() { | ||
681 | /** | ||
682 | * Calculate RTT of init phase normally always 1 | ||
683 | */ | ||
684 | float rtt = 1; | ||
685 | int bytes_transmitted = 0; | ||
686 | 930 | ||
687 | /** | 931 | /** |
688 | * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending | 932 | * Function that calculates the perfmance values and writes them down |
689 | */ | 933 | */ |
690 | if (( perf_rtt.element_full.received != 0 ) || | 934 | static void |
691 | ( perf_rtt.element_full.sent != 0) | 935 | calculate_perf_store () |
692 | ) rtt += 1; | 936 | { |
693 | 937 | ||
694 | if (( perf_rtt.request_full.received != 0 ) || | 938 | /** |
695 | ( perf_rtt.request_full.sent != 0) | 939 | * Calculate RTT of init phase normally always 1 |
696 | ) rtt += 0.5; | 940 | */ |
941 | float rtt = 1; | ||
942 | int bytes_transmitted = 0; | ||
697 | 943 | ||
698 | /** | 944 | /** |
699 | * In case of a differential sync 3 rtt's are needed. | 945 | * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normaly 1 or 1.5 depending |
700 | * for every active/passive switch additional 3.5 rtt's are used | 946 | */ |
701 | */ | 947 | if ((perf_store.element_full.received != 0) || |
948 | (perf_store.element_full.sent != 0) | ||
949 | ) | ||
950 | rtt += 1; | ||
951 | |||
952 | if ((perf_store.request_full.received != 0) || | ||
953 | (perf_store.request_full.sent != 0) | ||
954 | ) | ||
955 | rtt += 0.5; | ||
956 | |||
957 | /** | ||
958 | * In case of a differential sync 3 rtt's are needed. | ||
959 | * for every active/passive switch additional 3.5 rtt's are used | ||
960 | */ | ||
961 | if ((perf_store.element.received != 0) || | ||
962 | (perf_store.element.sent != 0)) | ||
963 | { | ||
964 | int iterations = perf_store.active_passive_switches; | ||
965 | |||
966 | if (iterations > 0) | ||
967 | rtt += iterations * 0.5; | ||
968 | rtt += 2.5; | ||
969 | } | ||
970 | |||
971 | |||
972 | /** | ||
973 | * Calculate data sended size | ||
974 | */ | ||
975 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
976 | GNUNET_SETU_ResultMessage), | ||
977 | perf_store.request_full); | ||
978 | |||
979 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
980 | GNUNET_SETU_ElementMessage), | ||
981 | perf_store.element_full); | ||
982 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
983 | GNUNET_SETU_ElementMessage), | ||
984 | perf_store.element); | ||
985 | // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request); | ||
986 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
987 | StrataEstimatorMessage), | ||
988 | perf_store.se); | ||
989 | bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done); | ||
990 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage), | ||
991 | perf_store.ibf); | ||
992 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage), | ||
993 | perf_store.inquery); | ||
994 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
995 | GNUNET_MessageHeader), | ||
996 | perf_store.demand); | ||
997 | bytes_transmitted += sum_sent_received_bytes (sizeof(struct | ||
998 | GNUNET_MessageHeader), | ||
999 | perf_store.offer); | ||
1000 | bytes_transmitted += sum_sent_received_bytes (4, perf_store.done); | ||
1001 | |||
1002 | /** | ||
1003 | * Write IBF failure rate for different BUCKET_NUMBER_FACTOR | ||
1004 | */ | ||
1005 | float factor; | ||
1006 | GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR", | ||
1007 | &factor); | ||
1008 | long long num_per_bucket; | ||
1009 | GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET", | ||
1010 | &num_per_bucket); | ||
1011 | |||
1012 | |||
1013 | int decoded = 0; | ||
1014 | if (perf_store.active_passive_switches == 0) | ||
1015 | decoded = 1; | ||
1016 | int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct | ||
1017 | IBFMessage), | ||
1018 | perf_store.ibf); | ||
1019 | |||
1020 | FILE *out1 = fopen ("perf_data.csv", "a"); | ||
1021 | fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor, | ||
1022 | decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff, | ||
1023 | bytes_transmitted, | ||
1024 | perf_store.se_diff_local,perf_store.se_diff_remote, | ||
1025 | perf_store.mode_of_operation); | ||
1026 | fclose (out1); | ||
1027 | |||
1028 | } | ||
1029 | |||
1030 | |||
1031 | #endif | ||
1032 | /** | ||
1033 | * Function that chooses the optimal mode of operation depending on | ||
1034 | * operation parameters. | ||
1035 | * @param avg_element_size | ||
1036 | * @param local_set_size | ||
1037 | * @param remote_set_size | ||
1038 | * @param est_set_diff_remote | ||
1039 | * @param est_set_diff_local | ||
1040 | * @param bandwith_latency_tradeoff | ||
1041 | * @param ibf_bucket_number_factor | ||
1042 | * @return calcuated mode of operation | ||
1043 | */ | ||
1044 | static uint8_t | ||
1045 | estimate_best_mode_of_operation (uint64_t avg_element_size, | ||
1046 | uint64_t local_set_size, | ||
1047 | uint64_t remote_set_size, | ||
1048 | uint64_t est_set_diff_remote, | ||
1049 | uint64_t est_set_diff_local, | ||
1050 | uint64_t bandwith_latency_tradeoff, | ||
1051 | uint64_t ibf_bucket_number_factor) | ||
1052 | { | ||
1053 | |||
1054 | /* | ||
1055 | * In case of initial sync fall to predefined states | ||
1056 | */ | ||
1057 | |||
1058 | if (0 == local_set_size) | ||
1059 | return FULL_SYNC_REMOTE_SENDING_FIRST; | ||
1060 | if (0 == remote_set_size) | ||
1061 | return FULL_SYNC_LOCAL_SENDING_FIRST; | ||
1062 | |||
1063 | /* | ||
1064 | * Calculate bytes for full Sync | ||
1065 | */ | ||
1066 | |||
1067 | uint8_t sizeof_full_done_header = 4; | ||
1068 | uint8_t sizeof_done_header = 4; | ||
1069 | uint8_t rtt_min_full = 2; | ||
1070 | uint8_t sizeof_request_full = 4; | ||
1071 | uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local); | ||
1072 | |||
1073 | /* Estimate byte required if we send first */ | ||
1074 | uint64_t total_elements_to_send_local_send_first = est_set_diff_remote | ||
1075 | + local_set_size; | ||
1076 | |||
1077 | uint64_t total_bytes_full_local_send_first = (avg_element_size | ||
1078 | * | ||
1079 | total_elements_to_send_local_send_first) \ | ||
1080 | + ( | ||
1081 | total_elements_to_send_local_send_first * sizeof(struct | ||
1082 | GNUNET_SETU_ElementMessage)) \ | ||
1083 | + (sizeof_full_done_header * 2) \ | ||
1084 | + rtt_min_full | ||
1085 | * bandwith_latency_tradeoff; | ||
1086 | |||
1087 | /* Estimate bytes required if we request from remote peer */ | ||
1088 | uint64_t total_elements_to_send_remote_send_first = est_set_diff_local | ||
1089 | + remote_set_size; | ||
1090 | |||
1091 | uint64_t total_bytes_full_remote_send_first = (avg_element_size | ||
1092 | * | ||
1093 | total_elements_to_send_remote_send_first) \ | ||
1094 | + ( | ||
1095 | total_elements_to_send_remote_send_first * sizeof(struct | ||
1096 | GNUNET_SETU_ElementMessage)) \ | ||
1097 | + (sizeof_full_done_header * 2) \ | ||
1098 | + (rtt_min_full + 0.5) | ||
1099 | * bandwith_latency_tradeoff \ | ||
1100 | + sizeof_request_full; | ||
1101 | |||
1102 | /* | ||
1103 | * Calculate bytes for differential Sync | ||
1104 | */ | ||
1105 | |||
1106 | /* Estimate bytes required by IBF transmission*/ | ||
1107 | |||
1108 | long double ibf_bucket_count = estimated_total_diff | ||
1109 | * ibf_bucket_number_factor; | ||
1110 | |||
1111 | if (ibf_bucket_count <= IBF_MIN_SIZE) | ||
1112 | { | ||
1113 | ibf_bucket_count = IBF_MIN_SIZE; | ||
1114 | } | ||
1115 | uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count) | ||
1116 | / MAX_BUCKETS_PER_MESSAGE); | ||
1117 | |||
1118 | uint64_t estimated_counter_size = ceil ( | ||
1119 | MIN (2 * log2l ((float) local_set_size / ibf_bucket_count), log2l ( | ||
1120 | local_set_size))); | ||
1121 | |||
1122 | long double counter_bytes = (float) estimated_counter_size / 8; | ||
1123 | |||
1124 | uint64_t ibf_bytes = ceil ((sizeof(struct IBFMessage) * ibf_message_count) | ||
1125 | * 1.2 \ | ||
1126 | + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \ | ||
1127 | + (ibf_bucket_count * sizeof(struct IBF_KeyHash)) | ||
1128 | * 1.2 \ | ||
1129 | + (ibf_bucket_count * counter_bytes) * 1.2); | ||
1130 | |||
1131 | /* Estimate full byte count for differential sync */ | ||
1132 | uint64_t element_size = (avg_element_size + sizeof(struct | ||
1133 | GNUNET_SETU_ElementMessage)) \ | ||
1134 | * estimated_total_diff; | ||
1135 | uint64_t done_size = sizeof_done_header; | ||
1136 | uint64_t inquery_size = (sizeof(struct IBF_Key) + sizeof(struct | ||
1137 | InquiryMessage)) | ||
1138 | * estimated_total_diff; | ||
1139 | uint64_t demand_size = | ||
1140 | (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader)) | ||
1141 | * estimated_total_diff; | ||
1142 | uint64_t offer_size = (sizeof(struct GNUNET_HashCode) + sizeof(struct | ||
1143 | GNUNET_MessageHeader)) | ||
1144 | * estimated_total_diff; | ||
1145 | |||
1146 | uint64_t total_bytes_diff = (element_size + done_size + inquery_size | ||
1147 | + demand_size + offer_size + ibf_bytes) \ | ||
1148 | + (DIFFERENTIAL_RTT_MEAN | ||
1149 | * bandwith_latency_tradeoff); | ||
1150 | |||
1151 | uint64_t full_min = MIN (total_bytes_full_local_send_first, | ||
1152 | total_bytes_full_remote_send_first); | ||
1153 | |||
1154 | /* Decide between full and differential sync */ | ||
1155 | |||
1156 | if (full_min < total_bytes_diff) | ||
1157 | { | ||
1158 | /* Decide between sending all element first or receiving all elements */ | ||
1159 | if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first) | ||
1160 | { | ||
1161 | return FULL_SYNC_LOCAL_SENDING_FIRST; | ||
1162 | } | ||
1163 | else | ||
1164 | { | ||
1165 | return FULL_SYNC_REMOTE_SENDING_FIRST; | ||
1166 | } | ||
1167 | } | ||
1168 | else | ||
1169 | { | ||
1170 | return DIFFERENTIAL_SYNC; | ||
1171 | } | ||
1172 | } | ||
1173 | |||
1174 | |||
1175 | /** | ||
1176 | * Validates the if a message is received in a correct phase | ||
1177 | * @param allowed_phases | ||
1178 | * @param size_phases | ||
1179 | * @param op | ||
1180 | * @return GNUNET_YES if message permitted in phase and GNUNET_NO if not permitted in given | ||
1181 | * phase | ||
1182 | */ | ||
1183 | static int | ||
1184 | check_valid_phase (const uint8_t allowed_phases[], size_t size_phases, struct | ||
1185 | Operation *op) | ||
1186 | { | ||
1187 | /** | ||
1188 | * Iterate over allowed phases | ||
1189 | */ | ||
1190 | for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++) | ||
1191 | { | ||
1192 | uint8_t phase = allowed_phases[phase_ctr]; | ||
1193 | if (phase == op->phase) | ||
1194 | { | ||
1195 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1196 | "Message received in valid phase\n"); | ||
1197 | return GNUNET_YES; | ||
1198 | } | ||
1199 | } | ||
1200 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1201 | "Received message in invalid phase: %u\n", op->phase); | ||
1202 | return GNUNET_NO; | ||
1203 | } | ||
1204 | |||
1205 | |||
1206 | /** | ||
1207 | * Function to update, track and validate message received in differential | ||
1208 | * sync. This function tracks states of messages and check it against different | ||
1209 | * constraints as described in Summermatter's BSc Thesis (2021) | ||
1210 | * @param hash_map: Hashmap to store message control flow | ||
1211 | * @param new_mcfs: The new message control flow state an given message type should be set to | ||
1212 | * @param hash_code: Hash code of the element | ||
1213 | * @param mt: The message type for which the message control flow state should be set | ||
1214 | * @return GNUNET_YES message is valid in message control flow GNUNET_NO when message is not valid | ||
1215 | * at this point in message flow | ||
1216 | */ | ||
1217 | static int | ||
1218 | update_message_control_flow (struct GNUNET_CONTAINER_MultiHashMap *hash_map, | ||
1219 | enum MESSAGE_CONTROL_FLOW_STATE new_mcfs, | ||
1220 | const struct GNUNET_HashCode *hash_code, | ||
1221 | enum MESSAGE_TYPE mt) | ||
1222 | { | ||
1223 | struct messageControlFlowElement *cfe = NULL; | ||
1224 | enum MESSAGE_CONTROL_FLOW_STATE *mcfs; | ||
1225 | |||
1226 | /** | ||
1227 | * Check logic for forbidden messages | ||
1228 | */ | ||
702 | 1229 | ||
703 | int iterations = perf_rtt.ibf.received; | 1230 | cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code); |
704 | if(iterations > 1) | 1231 | if ((ELEMENT_MESSAGE == mt) && (cfe != NULL)) |
705 | rtt += (iterations - 1 ) * 0.5; | 1232 | { |
706 | rtt += 3 * iterations; | 1233 | if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer)) |
1234 | { | ||
1235 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1236 | "Received an element without sent offer!\n"); | ||
1237 | return GNUNET_NO; | ||
1238 | } | ||
1239 | /* Check that only requested elements are received! */ | ||
1240 | if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand != | ||
1241 | MSG_CFS_SENT)) | ||
1242 | { | ||
1243 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1244 | "Received an element that was not demanded\n"); | ||
1245 | return GNUNET_NO; | ||
1246 | } | ||
1247 | } | ||
1248 | |||
1249 | /** | ||
1250 | * In case the element hash is not in the hashmap create a new entry | ||
1251 | */ | ||
1252 | |||
1253 | if (NULL == cfe) | ||
1254 | { | ||
1255 | cfe = GNUNET_new (struct messageControlFlowElement); | ||
1256 | if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code, | ||
1257 | cfe, | ||
1258 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) | ||
1259 | { | ||
1260 | GNUNET_free (cfe); | ||
1261 | return GNUNET_SYSERR; | ||
1262 | } | ||
1263 | } | ||
1264 | |||
1265 | /** | ||
1266 | * Set state of message | ||
1267 | */ | ||
1268 | |||
1269 | if (OFFER_MESSAGE == mt) | ||
1270 | { | ||
1271 | mcfs = &cfe->offer; | ||
1272 | } | ||
1273 | else if (DEMAND_MESSAGE == mt) | ||
1274 | { | ||
1275 | mcfs = &cfe->demand; | ||
1276 | } | ||
1277 | else if (ELEMENT_MESSAGE == mt) | ||
1278 | { | ||
1279 | mcfs = &cfe->element; | ||
1280 | } | ||
1281 | else | ||
1282 | { | ||
1283 | return GNUNET_SYSERR; | ||
1284 | } | ||
1285 | |||
1286 | /** | ||
1287 | * Check if state is allowed | ||
1288 | */ | ||
1289 | |||
1290 | if (new_mcfs <= *mcfs) | ||
1291 | { | ||
1292 | return GNUNET_NO; | ||
1293 | } | ||
1294 | |||
1295 | *mcfs = new_mcfs; | ||
1296 | return GNUNET_YES; | ||
1297 | } | ||
1298 | |||
1299 | |||
1300 | /** | ||
1301 | * Validate if a message in differential sync si already received before. | ||
1302 | * @param hash_map | ||
1303 | * @param hash_code | ||
1304 | * @param mt | ||
1305 | * @return GNUNET_YES when message is already in store if message is not in store return GNUNET_NO | ||
1306 | */ | ||
1307 | static int | ||
1308 | is_message_in_message_control_flow (struct | ||
1309 | GNUNET_CONTAINER_MultiHashMap *hash_map, | ||
1310 | struct GNUNET_HashCode *hash_code, | ||
1311 | enum MESSAGE_TYPE mt) | ||
1312 | { | ||
1313 | struct messageControlFlowElement *cfe = NULL; | ||
1314 | enum MESSAGE_CONTROL_FLOW_STATE *mcfs; | ||
1315 | |||
1316 | cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code); | ||
1317 | |||
1318 | /** | ||
1319 | * Set state of message | ||
1320 | */ | ||
1321 | |||
1322 | if (cfe != NULL) | ||
1323 | { | ||
1324 | if (OFFER_MESSAGE == mt) | ||
1325 | { | ||
1326 | mcfs = &cfe->offer; | ||
1327 | } | ||
1328 | else if (DEMAND_MESSAGE == mt) | ||
1329 | { | ||
1330 | mcfs = &cfe->demand; | ||
1331 | } | ||
1332 | else if (ELEMENT_MESSAGE == mt) | ||
1333 | { | ||
1334 | mcfs = &cfe->element; | ||
1335 | } | ||
1336 | else | ||
1337 | { | ||
1338 | return GNUNET_SYSERR; | ||
1339 | } | ||
707 | 1340 | ||
708 | /** | 1341 | /** |
709 | * Calculate data sended size | 1342 | * Evaluate if set is in message |
710 | */ | 1343 | */ |
711 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL), perf_rtt.request_full); | 1344 | if (*mcfs != MSG_CFS_UNINITIALIZED) |
712 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT), perf_rtt.element_full); | 1345 | { |
713 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS), perf_rtt.element); | 1346 | return GNUNET_YES; |
714 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_rtt.operation_request); | 1347 | } |
715 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_SE), perf_rtt.se); | 1348 | } |
716 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE), perf_rtt.full_done); | 1349 | return GNUNET_NO; |
717 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_IBF), perf_rtt.ibf); | 1350 | } |
718 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY), perf_rtt.inquery); | 1351 | |
719 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND), perf_rtt.demand); | 1352 | |
720 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER), perf_rtt.offer); | 1353 | /** |
721 | bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DONE), perf_rtt.done); | 1354 | * Iterator for determining if all demands have been |
1355 | * satisfied | ||
1356 | * | ||
1357 | * @param cls the union operation `struct Operation *` | ||
1358 | * @param key unused | ||
1359 | * @param value the `struct ElementEntry *` to insert | ||
1360 | * into the key-to-element mapping | ||
1361 | * @return #GNUNET_YES (to continue iterating) | ||
1362 | */ | ||
1363 | static int | ||
1364 | determinate_done_message_iterator (void *cls, | ||
1365 | const struct GNUNET_HashCode *key, | ||
1366 | void *value) | ||
1367 | { | ||
1368 | struct messageControlFlowElement *mcfe = value; | ||
1369 | |||
1370 | if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) )) | ||
1371 | { | ||
1372 | return GNUNET_YES; | ||
1373 | } | ||
1374 | return GNUNET_NO; | ||
1375 | } | ||
1376 | |||
1377 | |||
1378 | /** | ||
1379 | * Iterator for determining average size | ||
1380 | * | ||
1381 | * @param cls the union operation `struct Operation *` | ||
1382 | * @param key unused | ||
1383 | * @param value the `struct ElementEntry *` to insert | ||
1384 | * into the key-to-element mapping | ||
1385 | * @return #GNUNET_YES (to continue iterating) | ||
1386 | */ | ||
1387 | static int | ||
1388 | determinate_avg_element_size_iterator (void *cls, | ||
1389 | const struct GNUNET_HashCode *key, | ||
1390 | void *value) | ||
1391 | { | ||
1392 | struct Operation *op = cls; | ||
1393 | struct GNUNET_SETU_Element *element = value; | ||
1394 | op->total_elements_size_local += element->size; | ||
1395 | return GNUNET_YES; | ||
1396 | } | ||
722 | 1397 | ||
723 | LOG(GNUNET_ERROR_TYPE_ERROR,"Bytes Transmitted: %d\n", bytes_transmitted); | ||
724 | 1398 | ||
725 | LOG(GNUNET_ERROR_TYPE_ERROR,"Reached tradeoff bandwidth/rtt: %f\n", (bytes_transmitted / rtt )); | 1399 | /** |
1400 | * Create randomized element hashmap for full sending | ||
1401 | * | ||
1402 | * @param cls the union operation `struct Operation *` | ||
1403 | * @param key unused | ||
1404 | * @param value the `struct ElementEntry *` to insert | ||
1405 | * into the key-to-element mapping | ||
1406 | * @return #GNUNET_YES (to continue iterating) | ||
1407 | */ | ||
1408 | static int | ||
1409 | create_randomized_element_iterator (void *cls, | ||
1410 | const struct GNUNET_HashCode *key, | ||
1411 | void *value) | ||
1412 | { | ||
1413 | struct Operation *op = cls; | ||
726 | 1414 | ||
727 | return rtt; | 1415 | struct GNUNET_HashContext *hashed_key_context = |
1416 | GNUNET_CRYPTO_hash_context_start (); | ||
1417 | struct GNUNET_HashCode new_key; | ||
1418 | |||
1419 | /** | ||
1420 | * Hash element with new salt to randomize hashmap | ||
1421 | */ | ||
1422 | GNUNET_CRYPTO_hash_context_read (hashed_key_context, | ||
1423 | &key, | ||
1424 | sizeof(struct IBF_Key)); | ||
1425 | GNUNET_CRYPTO_hash_context_read (hashed_key_context, | ||
1426 | &op->set->content->elements_randomized_salt, | ||
1427 | sizeof(uint32_t)); | ||
1428 | GNUNET_CRYPTO_hash_context_finish (hashed_key_context, | ||
1429 | &new_key); | ||
1430 | GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized, | ||
1431 | &new_key,value, | ||
1432 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | ||
1433 | return GNUNET_YES; | ||
728 | } | 1434 | } |
729 | 1435 | ||
730 | 1436 | ||
@@ -808,6 +1514,36 @@ send_client_done (void *cls) | |||
808 | } | 1514 | } |
809 | 1515 | ||
810 | 1516 | ||
1517 | /** | ||
1518 | * Check if all given byzantine parameters are in given boundaries | ||
1519 | * @param op | ||
1520 | * @return indicator if all given byzantine parameters are in given boundaries | ||
1521 | */ | ||
1522 | |||
1523 | static int | ||
1524 | check_byzantine_bounds (struct Operation *op) | ||
1525 | { | ||
1526 | if (op->byzantine != GNUNET_YES) | ||
1527 | return GNUNET_OK; | ||
1528 | |||
1529 | /** | ||
1530 | * Check upper byzantine bounds | ||
1531 | */ | ||
1532 | if (op->remote_element_count + op->remote_set_diff > | ||
1533 | op->byzantine_upper_bound) | ||
1534 | return GNUNET_SYSERR; | ||
1535 | if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound) | ||
1536 | return GNUNET_SYSERR; | ||
1537 | |||
1538 | /** | ||
1539 | * Check lower byzantine bounds | ||
1540 | */ | ||
1541 | if (op->remote_element_count < op->byzantine_lower_bound) | ||
1542 | return GNUNET_SYSERR; | ||
1543 | return GNUNET_OK; | ||
1544 | } | ||
1545 | |||
1546 | |||
811 | /* FIXME: the destroy logic is a mess and should be cleaned up! */ | 1547 | /* FIXME: the destroy logic is a mess and should be cleaned up! */ |
812 | 1548 | ||
813 | /** | 1549 | /** |
@@ -977,6 +1713,101 @@ fail_union_operation (struct Operation *op) | |||
977 | 1713 | ||
978 | 1714 | ||
979 | /** | 1715 | /** |
1716 | * Function that checks if full sync is plausible | ||
1717 | * @param initial_local_elements_in_set | ||
1718 | * @param estimated_set_difference | ||
1719 | * @param repeated_elements | ||
1720 | * @param fresh_elements | ||
1721 | * @param op | ||
1722 | * @return GNUNET_OK if | ||
1723 | */ | ||
1724 | |||
1725 | static void | ||
1726 | full_sync_plausibility_check (struct Operation *op) | ||
1727 | { | ||
1728 | if (GNUNET_YES != op->byzantine) | ||
1729 | return; | ||
1730 | |||
1731 | int security_level_lb = -1 * SECURITY_LEVEL; | ||
1732 | uint64_t duplicates = op->received_fresh - op->received_total; | ||
1733 | |||
1734 | /* | ||
1735 | * Protect full sync from receiving double element when in FULL SENDING | ||
1736 | */ | ||
1737 | if (PHASE_FULL_SENDING == op->phase) | ||
1738 | { | ||
1739 | if (duplicates > 0) | ||
1740 | { | ||
1741 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1742 | "PROTOCOL VIOLATION: Received duplicate element in full receiving " | ||
1743 | "mode of operation this is not allowed! Duplicates: %lu\n", | ||
1744 | duplicates); | ||
1745 | GNUNET_break_op (0); | ||
1746 | fail_union_operation (op); | ||
1747 | return; | ||
1748 | } | ||
1749 | |||
1750 | } | ||
1751 | |||
1752 | /* | ||
1753 | * Protect full sync with probabilistic algorithm | ||
1754 | */ | ||
1755 | if (PHASE_FULL_RECEIVING == op->phase) | ||
1756 | { | ||
1757 | if (0 == op->remote_set_diff) | ||
1758 | op->remote_set_diff = 1; | ||
1759 | |||
1760 | long double base = (1 - (long double) (op->remote_set_diff | ||
1761 | / (long double) (op->initial_size | ||
1762 | + op-> | ||
1763 | remote_set_diff))); | ||
1764 | long double exponent = (op->received_total - (op->received_fresh * ((long | ||
1765 | double) | ||
1766 | op-> | ||
1767 | initial_size | ||
1768 | / (long | ||
1769 | double) | ||
1770 | op-> | ||
1771 | remote_set_diff))); | ||
1772 | long double value = exponent * (log2l (base) / log2l (2)); | ||
1773 | if ((value < security_level_lb) || (value > SECURITY_LEVEL) ) | ||
1774 | { | ||
1775 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1776 | "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving " | ||
1777 | "to many duplicated full element : %LF\n", | ||
1778 | value); | ||
1779 | GNUNET_break_op (0); | ||
1780 | fail_union_operation (op); | ||
1781 | return; | ||
1782 | } | ||
1783 | } | ||
1784 | } | ||
1785 | |||
1786 | |||
1787 | /** | ||
1788 | * Limit active passive switches in differential sync to configured security level | ||
1789 | * @param op | ||
1790 | */ | ||
1791 | static void | ||
1792 | check_max_differential_rounds (struct Operation *op) | ||
1793 | { | ||
1794 | double probability = op->differential_sync_iterations * (log2l ( | ||
1795 | PROBABILITY_FOR_NEW_ROUND) | ||
1796 | / log2l (2)); | ||
1797 | if ((-1 * SECURITY_LEVEL) > probability) | ||
1798 | { | ||
1799 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1800 | "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive " | ||
1801 | "switches in differential sync: %u\n", | ||
1802 | op->differential_sync_iterations); | ||
1803 | GNUNET_break_op (0); | ||
1804 | fail_union_operation (op); | ||
1805 | return; | ||
1806 | } | ||
1807 | } | ||
1808 | |||
1809 | |||
1810 | /** | ||
980 | * Derive the IBF key from a hash code and | 1811 | * Derive the IBF key from a hash code and |
981 | * a salt. | 1812 | * a salt. |
982 | * | 1813 | * |
@@ -1004,12 +1835,12 @@ get_ibf_key (const struct GNUNET_HashCode *src) | |||
1004 | struct GetElementContext | 1835 | struct GetElementContext |
1005 | { | 1836 | { |
1006 | /** | 1837 | /** |
1007 | * FIXME. | 1838 | * Gnunet hash code in context |
1008 | */ | 1839 | */ |
1009 | struct GNUNET_HashCode hash; | 1840 | struct GNUNET_HashCode hash; |
1010 | 1841 | ||
1011 | /** | 1842 | /** |
1012 | * FIXME. | 1843 | * Pointer to the key enty |
1013 | */ | 1844 | */ |
1014 | struct KeyEntry *k; | 1845 | struct KeyEntry *k; |
1015 | }; | 1846 | }; |
@@ -1122,7 +1953,7 @@ salt_key (const struct IBF_Key *k_in, | |||
1122 | uint32_t salt, | 1953 | uint32_t salt, |
1123 | struct IBF_Key *k_out) | 1954 | struct IBF_Key *k_out) |
1124 | { | 1955 | { |
1125 | int s = salt % 64; | 1956 | int s = (salt * 7) % 64; |
1126 | uint64_t x = k_in->key_val; | 1957 | uint64_t x = k_in->key_val; |
1127 | 1958 | ||
1128 | /* rotate ibf key */ | 1959 | /* rotate ibf key */ |
@@ -1132,14 +1963,14 @@ salt_key (const struct IBF_Key *k_in, | |||
1132 | 1963 | ||
1133 | 1964 | ||
1134 | /** | 1965 | /** |
1135 | * FIXME. | 1966 | * Reverse modification done in the salt_key function |
1136 | */ | 1967 | */ |
1137 | static void | 1968 | static void |
1138 | unsalt_key (const struct IBF_Key *k_in, | 1969 | unsalt_key (const struct IBF_Key *k_in, |
1139 | uint32_t salt, | 1970 | uint32_t salt, |
1140 | struct IBF_Key *k_out) | 1971 | struct IBF_Key *k_out) |
1141 | { | 1972 | { |
1142 | int s = salt % 64; | 1973 | int s = (salt * 7) % 64; |
1143 | uint64_t x = k_in->key_val; | 1974 | uint64_t x = k_in->key_val; |
1144 | 1975 | ||
1145 | x = (x << s) | (x >> (64 - s)); | 1976 | x = (x << s) | (x >> (64 - s)); |
@@ -1258,7 +2089,9 @@ prepare_ibf (struct Operation *op, | |||
1258 | 2089 | ||
1259 | if (NULL != op->local_ibf) | 2090 | if (NULL != op->local_ibf) |
1260 | ibf_destroy (op->local_ibf); | 2091 | ibf_destroy (op->local_ibf); |
1261 | op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 2092 | // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
2093 | op->local_ibf = ibf_create (size, | ||
2094 | ((uint8_t) op->ibf_number_buckets_per_element)); | ||
1262 | if (NULL == op->local_ibf) | 2095 | if (NULL == op->local_ibf) |
1263 | { | 2096 | { |
1264 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 2097 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -1283,13 +2116,23 @@ prepare_ibf (struct Operation *op, | |||
1283 | */ | 2116 | */ |
1284 | static int | 2117 | static int |
1285 | send_ibf (struct Operation *op, | 2118 | send_ibf (struct Operation *op, |
1286 | uint16_t ibf_order) | 2119 | uint32_t ibf_size) |
1287 | { | 2120 | { |
1288 | unsigned int buckets_sent = 0; | 2121 | uint64_t buckets_sent = 0; |
1289 | struct InvertibleBloomFilter *ibf; | 2122 | struct InvertibleBloomFilter *ibf; |
2123 | op->differential_sync_iterations++; | ||
2124 | |||
2125 | /** | ||
2126 | * Enforce min size of IBF | ||
2127 | */ | ||
2128 | uint32_t ibf_min_size = IBF_MIN_SIZE; | ||
1290 | 2129 | ||
2130 | if (ibf_size < ibf_min_size) | ||
2131 | { | ||
2132 | ibf_size = ibf_min_size; | ||
2133 | } | ||
1291 | if (GNUNET_OK != | 2134 | if (GNUNET_OK != |
1292 | prepare_ibf (op, 1 << ibf_order)) | 2135 | prepare_ibf (op, ibf_size)) |
1293 | { | 2136 | { |
1294 | /* allocation failed */ | 2137 | /* allocation failed */ |
1295 | return GNUNET_SYSERR; | 2138 | return GNUNET_SYSERR; |
@@ -1297,45 +2140,48 @@ send_ibf (struct Operation *op, | |||
1297 | 2140 | ||
1298 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2141 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1299 | "sending ibf of size %u\n", | 2142 | "sending ibf of size %u\n", |
1300 | 1 << ibf_order); | 2143 | 1 << ibf_size); |
1301 | 2144 | ||
1302 | { | 2145 | { |
1303 | char name[64]; | 2146 | char name[64]; |
1304 | GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order); | 2147 | GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_size); |
1305 | GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); | 2148 | GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); |
1306 | } | 2149 | } |
1307 | 2150 | ||
1308 | ibf = op->local_ibf; | 2151 | ibf = op->local_ibf; |
1309 | 2152 | ||
1310 | while (buckets_sent < (1 << ibf_order)) | 2153 | while (buckets_sent < ibf_size) |
1311 | { | 2154 | { |
1312 | unsigned int buckets_in_message; | 2155 | unsigned int buckets_in_message; |
1313 | struct GNUNET_MQ_Envelope *ev; | 2156 | struct GNUNET_MQ_Envelope *ev; |
1314 | struct IBFMessage *msg; | 2157 | struct IBFMessage *msg; |
1315 | 2158 | ||
1316 | buckets_in_message = (1 << ibf_order) - buckets_sent; | 2159 | buckets_in_message = ibf_size - buckets_sent; |
1317 | /* limit to maximum */ | 2160 | /* limit to maximum */ |
1318 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) | 2161 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) |
1319 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; | 2162 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; |
1320 | 2163 | ||
1321 | perf_rtt.ibf.sent += 1; | 2164 | #if MEASURE_PERFORMANCE |
1322 | perf_rtt.ibf.sent_var_bytes += ( buckets_in_message * IBF_BUCKET_SIZE ); | 2165 | perf_store.ibf.sent += 1; |
2166 | perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE); | ||
2167 | #endif | ||
1323 | ev = GNUNET_MQ_msg_extra (msg, | 2168 | ev = GNUNET_MQ_msg_extra (msg, |
1324 | buckets_in_message * IBF_BUCKET_SIZE, | 2169 | buckets_in_message * IBF_BUCKET_SIZE, |
1325 | GNUNET_MESSAGE_TYPE_SETU_P2P_IBF); | 2170 | GNUNET_MESSAGE_TYPE_SETU_P2P_IBF); |
1326 | msg->reserved1 = 0; | 2171 | msg->ibf_size = ibf_size; |
1327 | msg->reserved2 = 0; | ||
1328 | msg->order = ibf_order; | ||
1329 | msg->offset = htonl (buckets_sent); | 2172 | msg->offset = htonl (buckets_sent); |
1330 | msg->salt = htonl (op->salt_send); | 2173 | msg->salt = htonl (op->salt_send); |
2174 | msg->ibf_counter_bit_length = ibf_get_max_counter (ibf); | ||
2175 | |||
2176 | |||
1331 | ibf_write_slice (ibf, buckets_sent, | 2177 | ibf_write_slice (ibf, buckets_sent, |
1332 | buckets_in_message, &msg[1]); | 2178 | buckets_in_message, &msg[1], msg->ibf_counter_bit_length); |
1333 | buckets_sent += buckets_in_message; | 2179 | buckets_sent += buckets_in_message; |
1334 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2180 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1335 | "ibf chunk size %u, %u/%u sent\n", | 2181 | "ibf chunk size %u, %lu/%u sent\n", |
1336 | buckets_in_message, | 2182 | buckets_in_message, |
1337 | buckets_sent, | 2183 | buckets_sent, |
1338 | 1 << ibf_order); | 2184 | ibf_size); |
1339 | GNUNET_MQ_send (op->mq, ev); | 2185 | GNUNET_MQ_send (op->mq, ev); |
1340 | } | 2186 | } |
1341 | 2187 | ||
@@ -1354,17 +2200,26 @@ send_ibf (struct Operation *op, | |||
1354 | * @return the required size of the ibf | 2200 | * @return the required size of the ibf |
1355 | */ | 2201 | */ |
1356 | static unsigned int | 2202 | static unsigned int |
1357 | get_order_from_difference (unsigned int diff) | 2203 | get_size_from_difference (unsigned int diff, int number_buckets_per_element, |
2204 | float ibf_bucket_number_factor) | ||
1358 | { | 2205 | { |
1359 | unsigned int ibf_order; | 2206 | /** Make ibf estimation size odd reasoning can be found in BSc Thesis of |
2207 | * Elias Summermatter (2021) in section 3.11 **/ | ||
2208 | return (((int) (diff * ibf_bucket_number_factor)) | 1); | ||
1360 | 2209 | ||
1361 | ibf_order = 2; | 2210 | } |
1362 | while (((1 << ibf_order) < (IBF_ALPHA * diff) || | 2211 | |
1363 | ((1 << ibf_order) < SE_IBF_HASH_NUM)) && | 2212 | |
1364 | (ibf_order < MAX_IBF_ORDER)) | 2213 | static unsigned int |
1365 | ibf_order++; | 2214 | get_next_ibf_size (float ibf_bucket_number_factor, unsigned int |
1366 | // add one for correction | 2215 | decoded_elements, unsigned int last_ibf_size) |
1367 | return ibf_order + 1; | 2216 | { |
2217 | unsigned int next_size = (unsigned int) ((last_ibf_size * 2) | ||
2218 | - (ibf_bucket_number_factor | ||
2219 | * decoded_elements)); | ||
2220 | /** Make ibf estimation size odd reasoning can be found in BSc Thesis of | ||
2221 | * Elias Summermatter (2021) in section 3.11 **/ | ||
2222 | return next_size | 1; | ||
1368 | } | 2223 | } |
1369 | 2224 | ||
1370 | 2225 | ||
@@ -1391,8 +2246,10 @@ send_full_element_iterator (void *cls, | |||
1391 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2246 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1392 | "Sending element %s\n", | 2247 | "Sending element %s\n", |
1393 | GNUNET_h2s (key)); | 2248 | GNUNET_h2s (key)); |
1394 | perf_rtt.element_full.received += 1; | 2249 | #if MEASURE_PERFORMANCE |
1395 | perf_rtt.element_full.received_var_bytes += el->size; | 2250 | perf_store.element_full.received += 1; |
2251 | perf_store.element_full.received_var_bytes += el->size; | ||
2252 | #endif | ||
1396 | ev = GNUNET_MQ_msg_extra (emsg, | 2253 | ev = GNUNET_MQ_msg_extra (emsg, |
1397 | el->size, | 2254 | el->size, |
1398 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); | 2255 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); |
@@ -1421,10 +2278,25 @@ send_full_set (struct Operation *op) | |||
1421 | "Dedicing to transmit the full set\n"); | 2278 | "Dedicing to transmit the full set\n"); |
1422 | /* FIXME: use a more memory-friendly way of doing this with an | 2279 | /* FIXME: use a more memory-friendly way of doing this with an |
1423 | iterator, just as we do in the non-full case! */ | 2280 | iterator, just as we do in the non-full case! */ |
2281 | |||
2282 | // Randomize Elements to send | ||
2283 | op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create ( | ||
2284 | 32,GNUNET_NO); | ||
2285 | op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 ( | ||
2286 | GNUNET_CRYPTO_QUALITY_NONCE, | ||
2287 | UINT64_MAX); | ||
1424 | (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | 2288 | (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, |
1425 | &send_full_element_iterator, | 2289 | & |
2290 | create_randomized_element_iterator, | ||
1426 | op); | 2291 | op); |
1427 | perf_rtt.full_done.sent += 1; | 2292 | |
2293 | (void) GNUNET_CONTAINER_multihashmap_iterate ( | ||
2294 | op->set->content->elements_randomized, | ||
2295 | &send_full_element_iterator, | ||
2296 | op); | ||
2297 | #if MEASURE_PERFORMANCE | ||
2298 | perf_store.full_done.sent += 1; | ||
2299 | #endif | ||
1428 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); | 2300 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); |
1429 | GNUNET_MQ_send (op->mq, | 2301 | GNUNET_MQ_send (op->mq, |
1430 | ev); | 2302 | ev); |
@@ -1454,7 +2326,7 @@ check_union_p2p_strata_estimator (void *cls, | |||
1454 | msg->header.type)); | 2326 | msg->header.type)); |
1455 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); | 2327 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); |
1456 | if ((GNUNET_NO == is_compressed) && | 2328 | if ((GNUNET_NO == is_compressed) && |
1457 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) | 2329 | (len != SE_STRATA_COUNT * SE_IBFS_TOTAL_SIZE * IBF_BUCKET_SIZE)) |
1458 | { | 2330 | { |
1459 | GNUNET_break (0); | 2331 | GNUNET_break (0); |
1460 | return GNUNET_SYSERR; | 2332 | return GNUNET_SYSERR; |
@@ -1473,14 +2345,44 @@ static void | |||
1473 | handle_union_p2p_strata_estimator (void *cls, | 2345 | handle_union_p2p_strata_estimator (void *cls, |
1474 | const struct StrataEstimatorMessage *msg) | 2346 | const struct StrataEstimatorMessage *msg) |
1475 | { | 2347 | { |
1476 | perf_rtt.se.received += 1; | 2348 | #if MEASURE_PERFORMANCE |
1477 | perf_rtt.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); | 2349 | perf_store.se.received += 1; |
2350 | perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct | ||
2351 | StrataEstimatorMessage); | ||
2352 | #endif | ||
1478 | struct Operation *op = cls; | 2353 | struct Operation *op = cls; |
1479 | struct StrataEstimator *remote_se; | 2354 | struct MultiStrataEstimator *remote_se; |
1480 | unsigned int diff; | 2355 | unsigned int diff; |
1481 | uint64_t other_size; | 2356 | uint64_t other_size; |
1482 | size_t len; | 2357 | size_t len; |
1483 | int is_compressed; | 2358 | int is_compressed; |
2359 | op->local_element_count = GNUNET_CONTAINER_multihashmap_size ( | ||
2360 | op->set->content->elements); | ||
2361 | // Setting peer site to receiving peer | ||
2362 | op->peer_site = 1; | ||
2363 | |||
2364 | /** | ||
2365 | * Check that the message is received only in supported phase | ||
2366 | */ | ||
2367 | uint8_t allowed_phases[] = {PHASE_EXPECT_SE}; | ||
2368 | if (GNUNET_OK != | ||
2369 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
2370 | { | ||
2371 | GNUNET_break (0); | ||
2372 | fail_union_operation (op); | ||
2373 | return; | ||
2374 | } | ||
2375 | |||
2376 | /** Only allow 1,2,4,8 SEs **/ | ||
2377 | if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1)) | ||
2378 | { | ||
2379 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2380 | "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n", | ||
2381 | msg->se_count); | ||
2382 | GNUNET_break_op (0); | ||
2383 | fail_union_operation (op); | ||
2384 | return; | ||
2385 | } | ||
1484 | 2386 | ||
1485 | is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons ( | 2387 | is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons ( |
1486 | msg->header.type)); | 2388 | msg->header.type)); |
@@ -1490,8 +2392,20 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1490 | GNUNET_NO); | 2392 | GNUNET_NO); |
1491 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); | 2393 | len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); |
1492 | other_size = GNUNET_ntohll (msg->set_size); | 2394 | other_size = GNUNET_ntohll (msg->set_size); |
2395 | op->remote_element_count = other_size; | ||
2396 | |||
2397 | if (op->byzantine_upper_bound < op->remote_element_count) | ||
2398 | { | ||
2399 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2400 | "Exceeded configured upper bound <%lu> of element: %u\n", | ||
2401 | op->byzantine_upper_bound, | ||
2402 | op->remote_element_count); | ||
2403 | fail_union_operation (op); | ||
2404 | return; | ||
2405 | } | ||
2406 | |||
1493 | remote_se = strata_estimator_create (SE_STRATA_COUNT, | 2407 | remote_se = strata_estimator_create (SE_STRATA_COUNT, |
1494 | SE_IBF_SIZE, | 2408 | SE_IBFS_TOTAL_SIZE, |
1495 | SE_IBF_HASH_NUM); | 2409 | SE_IBF_HASH_NUM); |
1496 | if (NULL == remote_se) | 2410 | if (NULL == remote_se) |
1497 | { | 2411 | { |
@@ -1503,6 +2417,8 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1503 | strata_estimator_read (&msg[1], | 2417 | strata_estimator_read (&msg[1], |
1504 | len, | 2418 | len, |
1505 | is_compressed, | 2419 | is_compressed, |
2420 | msg->se_count, | ||
2421 | SE_IBFS_TOTAL_SIZE, | ||
1506 | remote_se)) | 2422 | remote_se)) |
1507 | { | 2423 | { |
1508 | /* decompression failed */ | 2424 | /* decompression failed */ |
@@ -1511,11 +2427,76 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1511 | return; | 2427 | return; |
1512 | } | 2428 | } |
1513 | GNUNET_assert (NULL != op->se); | 2429 | GNUNET_assert (NULL != op->se); |
1514 | diff = strata_estimator_difference (remote_se, | 2430 | strata_estimator_difference (remote_se, |
1515 | op->se); | 2431 | op->se); |
2432 | |||
2433 | /* Calculate remote local diff */ | ||
2434 | long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count; | ||
2435 | long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count; | ||
2436 | |||
2437 | /* Prevent estimations from overshooting max element */ | ||
2438 | if (diff_remote + op->remote_element_count > op->byzantine_upper_bound) | ||
2439 | diff_remote = op->byzantine_upper_bound - op->remote_element_count; | ||
2440 | if (diff_local + op->local_element_count > op->byzantine_upper_bound) | ||
2441 | diff_local = op->byzantine_upper_bound - op->local_element_count; | ||
2442 | if ((diff_remote < 0) || (diff_local < 0)) | ||
2443 | { | ||
2444 | strata_estimator_destroy (remote_se); | ||
2445 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2446 | "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is " | ||
2447 | "malicious: remote diff %ld, local diff: %ld\n", | ||
2448 | diff_remote, diff_local); | ||
2449 | GNUNET_break_op (0); | ||
2450 | fail_union_operation (op); | ||
2451 | return; | ||
2452 | } | ||
1516 | 2453 | ||
1517 | if (diff > 200) | 2454 | /* Make estimation more precise in initial sync cases */ |
1518 | diff = diff * 3 / 2; | 2455 | if (0 == op->remote_element_count) |
2456 | { | ||
2457 | diff_remote = 0; | ||
2458 | diff_local = op->local_element_count; | ||
2459 | } | ||
2460 | if (0 == op->local_element_count) | ||
2461 | { | ||
2462 | diff_local = 0; | ||
2463 | diff_remote = op->remote_element_count; | ||
2464 | } | ||
2465 | |||
2466 | diff = diff_remote + diff_local; | ||
2467 | op->remote_set_diff = diff_remote; | ||
2468 | |||
2469 | /** Calculate avg element size if not initial sync **/ | ||
2470 | uint64_t avg_element_size = 0; | ||
2471 | if (0 < op->local_element_count) | ||
2472 | { | ||
2473 | op->total_elements_size_local = 0; | ||
2474 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
2475 | & | ||
2476 | determinate_avg_element_size_iterator, | ||
2477 | op); | ||
2478 | avg_element_size = op->total_elements_size_local / op->local_element_count; | ||
2479 | } | ||
2480 | |||
2481 | op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size, | ||
2482 | GNUNET_CONTAINER_multihashmap_size ( | ||
2483 | op->set->content-> | ||
2484 | elements), | ||
2485 | op-> | ||
2486 | remote_element_count, | ||
2487 | diff_remote, | ||
2488 | diff_local, | ||
2489 | op-> | ||
2490 | rtt_bandwidth_tradeoff, | ||
2491 | op-> | ||
2492 | ibf_bucket_number_factor); | ||
2493 | |||
2494 | #if MEASURE_PERFORMANCE | ||
2495 | perf_store.se_diff_local = diff_local; | ||
2496 | perf_store.se_diff_remote = diff_remote; | ||
2497 | perf_store.se_diff = diff; | ||
2498 | perf_store.mode_of_operation = op->mode_of_operation; | ||
2499 | #endif | ||
1519 | 2500 | ||
1520 | strata_estimator_destroy (remote_se); | 2501 | strata_estimator_destroy (remote_se); |
1521 | strata_estimator_destroy (op->se); | 2502 | strata_estimator_destroy (op->se); |
@@ -1523,7 +2504,8 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1523 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2504 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1524 | "got se diff=%d, using ibf size %d\n", | 2505 | "got se diff=%d, using ibf size %d\n", |
1525 | diff, | 2506 | diff, |
1526 | 1U << get_order_from_difference (diff)); | 2507 | 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element, |
2508 | op->ibf_bucket_number_factor)); | ||
1527 | 2509 | ||
1528 | { | 2510 | { |
1529 | char *set_debug; | 2511 | char *set_debug; |
@@ -1546,16 +2528,8 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1546 | return; | 2528 | return; |
1547 | } | 2529 | } |
1548 | 2530 | ||
1549 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1550 | "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: %f\n", op->rtt_bandwidth_tradeoff); | ||
1551 | |||
1552 | |||
1553 | /** | ||
1554 | * Added rtt_bandwidth_tradeoff directly need future improvements | ||
1555 | */ | ||
1556 | if ((GNUNET_YES == op->force_full) || | 2531 | if ((GNUNET_YES == op->force_full) || |
1557 | (diff > op->initial_size / 4) || | 2532 | (op->mode_of_operation != DIFFERENTIAL_SYNC)) |
1558 | (0 == other_size)) | ||
1559 | { | 2533 | { |
1560 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2534 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1561 | "Deciding to go for full set transmission (diff=%d, own set=%llu)\n", | 2535 | "Deciding to go for full set transmission (diff=%d, own set=%llu)\n", |
@@ -1565,9 +2539,17 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1565 | "# of full sends", | 2539 | "# of full sends", |
1566 | 1, | 2540 | 1, |
1567 | GNUNET_NO); | 2541 | GNUNET_NO); |
1568 | if ((op->initial_size <= other_size) || | 2542 | if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation) |
1569 | (0 == other_size)) | ||
1570 | { | 2543 | { |
2544 | struct TransmitFullMessage *signal_msg; | ||
2545 | struct GNUNET_MQ_Envelope *ev; | ||
2546 | ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage), | ||
2547 | GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL); | ||
2548 | signal_msg->remote_set_difference = htonl (diff_local); | ||
2549 | signal_msg->remote_set_size = htonl (op->local_element_count); | ||
2550 | signal_msg->local_set_difference = htonl (diff_remote); | ||
2551 | GNUNET_MQ_send (op->mq, | ||
2552 | ev); | ||
1571 | send_full_set (op); | 2553 | send_full_set (op); |
1572 | } | 2554 | } |
1573 | else | 2555 | else |
@@ -1577,9 +2559,15 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1577 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2559 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1578 | "Telling other peer that we expect its full set\n"); | 2560 | "Telling other peer that we expect its full set\n"); |
1579 | op->phase = PHASE_FULL_RECEIVING; | 2561 | op->phase = PHASE_FULL_RECEIVING; |
1580 | perf_rtt.request_full.sent += 1; | 2562 | #if MEASURE_PERFORMANCE |
1581 | ev = GNUNET_MQ_msg_header ( | 2563 | perf_store.request_full.sent += 1; |
1582 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); | 2564 | #endif |
2565 | struct TransmitFullMessage *signal_msg; | ||
2566 | ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage), | ||
2567 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); | ||
2568 | signal_msg->remote_set_difference = htonl (diff_local); | ||
2569 | signal_msg->remote_set_size = htonl (op->local_element_count); | ||
2570 | signal_msg->local_set_difference = htonl (diff_remote); | ||
1583 | GNUNET_MQ_send (op->mq, | 2571 | GNUNET_MQ_send (op->mq, |
1584 | ev); | 2572 | ev); |
1585 | } | 2573 | } |
@@ -1592,7 +2580,9 @@ handle_union_p2p_strata_estimator (void *cls, | |||
1592 | GNUNET_NO); | 2580 | GNUNET_NO); |
1593 | if (GNUNET_OK != | 2581 | if (GNUNET_OK != |
1594 | send_ibf (op, | 2582 | send_ibf (op, |
1595 | get_order_from_difference (diff))) | 2583 | get_size_from_difference (diff, |
2584 | op->ibf_number_buckets_per_element, | ||
2585 | op->ibf_bucket_number_factor))) | ||
1596 | { | 2586 | { |
1597 | /* Internal error, best we can do is shut the connection */ | 2587 | /* Internal error, best we can do is shut the connection */ |
1598 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 2588 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -1625,15 +2615,64 @@ send_offers_iterator (void *cls, | |||
1625 | 2615 | ||
1626 | /* Detect 32-bit key collision for the 64-bit IBF keys. */ | 2616 | /* Detect 32-bit key collision for the 64-bit IBF keys. */ |
1627 | if (ke->ibf_key.key_val != sec->ibf_key.key_val) | 2617 | if (ke->ibf_key.key_val != sec->ibf_key.key_val) |
2618 | { | ||
2619 | op->active_passive_switch_required = true; | ||
1628 | return GNUNET_YES; | 2620 | return GNUNET_YES; |
2621 | } | ||
1629 | 2622 | ||
1630 | perf_rtt.offer.sent += 1; | 2623 | /* Prevent implementation from sending a offer multiple times in case of roll switch */ |
1631 | perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); | 2624 | if (GNUNET_YES == |
2625 | is_message_in_message_control_flow ( | ||
2626 | op->message_control_flow, | ||
2627 | &ke->element->element_hash, | ||
2628 | OFFER_MESSAGE) | ||
2629 | ) | ||
2630 | { | ||
2631 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2632 | "Skipping already sent processed element offer!\n"); | ||
2633 | return GNUNET_YES; | ||
2634 | } | ||
1632 | 2635 | ||
2636 | /* Save send offer message for message control */ | ||
2637 | if (GNUNET_YES != | ||
2638 | update_message_control_flow ( | ||
2639 | op->message_control_flow, | ||
2640 | MSG_CFS_SENT, | ||
2641 | &ke->element->element_hash, | ||
2642 | OFFER_MESSAGE) | ||
2643 | ) | ||
2644 | { | ||
2645 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2646 | "Double offer message sent found!\n"); | ||
2647 | GNUNET_break (0); | ||
2648 | fail_union_operation (op); | ||
2649 | return GNUNET_NO; | ||
2650 | } | ||
2651 | ; | ||
2652 | |||
2653 | /* Mark element to be expected to received */ | ||
2654 | if (GNUNET_YES != | ||
2655 | update_message_control_flow ( | ||
2656 | op->message_control_flow, | ||
2657 | MSG_CFS_EXPECTED, | ||
2658 | &ke->element->element_hash, | ||
2659 | DEMAND_MESSAGE) | ||
2660 | ) | ||
2661 | { | ||
2662 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2663 | "Double demand received found!\n"); | ||
2664 | GNUNET_break (0); | ||
2665 | fail_union_operation (op); | ||
2666 | return GNUNET_NO; | ||
2667 | } | ||
2668 | ; | ||
2669 | #if MEASURE_PERFORMANCE | ||
2670 | perf_store.offer.sent += 1; | ||
2671 | perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); | ||
2672 | #endif | ||
1633 | ev = GNUNET_MQ_msg_header_extra (mh, | 2673 | ev = GNUNET_MQ_msg_header_extra (mh, |
1634 | sizeof(struct GNUNET_HashCode), | 2674 | sizeof(struct GNUNET_HashCode), |
1635 | GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER); | 2675 | GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER); |
1636 | |||
1637 | GNUNET_assert (NULL != ev); | 2676 | GNUNET_assert (NULL != ev); |
1638 | *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; | 2677 | *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; |
1639 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2678 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1651,7 +2690,7 @@ send_offers_iterator (void *cls, | |||
1651 | * @param op union operation | 2690 | * @param op union operation |
1652 | * @param ibf_key IBF key of interest | 2691 | * @param ibf_key IBF key of interest |
1653 | */ | 2692 | */ |
1654 | static void | 2693 | void |
1655 | send_offers_for_key (struct Operation *op, | 2694 | send_offers_for_key (struct Operation *op, |
1656 | struct IBF_Key ibf_key) | 2695 | struct IBF_Key ibf_key) |
1657 | { | 2696 | { |
@@ -1694,6 +2733,7 @@ decode_and_send (struct Operation *op) | |||
1694 | /* allocation failed */ | 2733 | /* allocation failed */ |
1695 | return GNUNET_SYSERR; | 2734 | return GNUNET_SYSERR; |
1696 | } | 2735 | } |
2736 | |||
1697 | diff_ibf = ibf_dup (op->local_ibf); | 2737 | diff_ibf = ibf_dup (op->local_ibf); |
1698 | ibf_subtract (diff_ibf, | 2738 | ibf_subtract (diff_ibf, |
1699 | op->remote_ibf); | 2739 | op->remote_ibf); |
@@ -1706,7 +2746,7 @@ decode_and_send (struct Operation *op) | |||
1706 | diff_ibf->size); | 2746 | diff_ibf->size); |
1707 | 2747 | ||
1708 | num_decoded = 0; | 2748 | num_decoded = 0; |
1709 | key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ | 2749 | key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ |
1710 | 2750 | ||
1711 | while (1) | 2751 | while (1) |
1712 | { | 2752 | { |
@@ -1738,23 +2778,36 @@ decode_and_send (struct Operation *op) | |||
1738 | if ((GNUNET_SYSERR == res) || | 2778 | if ((GNUNET_SYSERR == res) || |
1739 | (GNUNET_YES == cycle_detected)) | 2779 | (GNUNET_YES == cycle_detected)) |
1740 | { | 2780 | { |
1741 | int next_order; | 2781 | uint32_t next_size; |
1742 | next_order = 0; | 2782 | /** Enforce odd ibf size **/ |
1743 | while (1 << next_order < diff_ibf->size) | 2783 | |
1744 | next_order++; | 2784 | next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded, |
1745 | next_order++; | 2785 | diff_ibf->size); |
1746 | if (next_order <= MAX_IBF_ORDER) | 2786 | /** Make ibf estimation size odd reasoning can be found in BSc Thesis of |
2787 | * Elias Summermatter (2021) in section 3.11 **/ | ||
2788 | uint32_t ibf_min_size = IBF_MIN_SIZE | 1; | ||
2789 | |||
2790 | if (next_size<ibf_min_size) | ||
2791 | next_size = ibf_min_size; | ||
2792 | |||
2793 | |||
2794 | if (next_size <= MAX_IBF_SIZE) | ||
1747 | { | 2795 | { |
1748 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2796 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1749 | "decoding failed, sending larger ibf (size %u)\n", | 2797 | "decoding failed, sending larger ibf (size %u)\n", |
1750 | 1 << next_order); | 2798 | next_size); |
1751 | GNUNET_STATISTICS_update (_GSS_statistics, | 2799 | GNUNET_STATISTICS_update (_GSS_statistics, |
1752 | "# of IBF retries", | 2800 | "# of IBF retries", |
1753 | 1, | 2801 | 1, |
1754 | GNUNET_NO); | 2802 | GNUNET_NO); |
1755 | op->salt_send++; | 2803 | #if MEASURE_PERFORMANCE |
2804 | perf_store.active_passive_switches += 1; | ||
2805 | #endif | ||
2806 | |||
2807 | op->salt_send = op->salt_receive++; | ||
2808 | |||
1756 | if (GNUNET_OK != | 2809 | if (GNUNET_OK != |
1757 | send_ibf (op, next_order)) | 2810 | send_ibf (op, next_size)) |
1758 | { | 2811 | { |
1759 | /* Internal error, best we can do is shut the connection */ | 2812 | /* Internal error, best we can do is shut the connection */ |
1760 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 2813 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -1786,7 +2839,9 @@ decode_and_send (struct Operation *op) | |||
1786 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2839 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1787 | "transmitted all values, sending DONE\n"); | 2840 | "transmitted all values, sending DONE\n"); |
1788 | 2841 | ||
1789 | perf_rtt.done.sent += 1; | 2842 | #if MEASURE_PERFORMANCE |
2843 | perf_store.done.sent += 1; | ||
2844 | #endif | ||
1790 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); | 2845 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); |
1791 | GNUNET_MQ_send (op->mq, ev); | 2846 | GNUNET_MQ_send (op->mq, ev); |
1792 | /* We now wait until we get a DONE message back | 2847 | /* We now wait until we get a DONE message back |
@@ -1797,7 +2852,6 @@ decode_and_send (struct Operation *op) | |||
1797 | if (1 == side) | 2852 | if (1 == side) |
1798 | { | 2853 | { |
1799 | struct IBF_Key unsalted_key; | 2854 | struct IBF_Key unsalted_key; |
1800 | |||
1801 | unsalt_key (&key, | 2855 | unsalt_key (&key, |
1802 | op->salt_receive, | 2856 | op->salt_receive, |
1803 | &unsalted_key); | 2857 | &unsalted_key); |
@@ -1809,8 +2863,29 @@ decode_and_send (struct Operation *op) | |||
1809 | struct GNUNET_MQ_Envelope *ev; | 2863 | struct GNUNET_MQ_Envelope *ev; |
1810 | struct InquiryMessage *msg; | 2864 | struct InquiryMessage *msg; |
1811 | 2865 | ||
1812 | perf_rtt.inquery.sent += 1; | 2866 | #if MEASURE_PERFORMANCE |
1813 | perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key); | 2867 | perf_store.inquery.sent += 1; |
2868 | perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key); | ||
2869 | #endif | ||
2870 | |||
2871 | /** Add sent inquiries to hashmap for flow control **/ | ||
2872 | struct GNUNET_HashContext *hashed_key_context = | ||
2873 | GNUNET_CRYPTO_hash_context_start (); | ||
2874 | struct GNUNET_HashCode *hashed_key = (struct | ||
2875 | GNUNET_HashCode*) GNUNET_malloc ( | ||
2876 | sizeof(struct GNUNET_HashCode)); | ||
2877 | enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_SENT; | ||
2878 | GNUNET_CRYPTO_hash_context_read (hashed_key_context, | ||
2879 | &key, | ||
2880 | sizeof(struct IBF_Key)); | ||
2881 | GNUNET_CRYPTO_hash_context_finish (hashed_key_context, | ||
2882 | hashed_key); | ||
2883 | GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent, | ||
2884 | hashed_key, | ||
2885 | &mcfs, | ||
2886 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE | ||
2887 | ); | ||
2888 | |||
1814 | /* It may be nice to merge multiple requests, but with CADET's corking it is not worth | 2889 | /* It may be nice to merge multiple requests, but with CADET's corking it is not worth |
1815 | * the effort additional complexity. */ | 2890 | * the effort additional complexity. */ |
1816 | ev = GNUNET_MQ_msg_extra (msg, | 2891 | ev = GNUNET_MQ_msg_extra (msg, |
@@ -1836,6 +2911,100 @@ decode_and_send (struct Operation *op) | |||
1836 | 2911 | ||
1837 | 2912 | ||
1838 | /** | 2913 | /** |
2914 | * Check send full message received from other peer | ||
2915 | * @param cls | ||
2916 | * @param msg | ||
2917 | * @return | ||
2918 | */ | ||
2919 | |||
2920 | static int | ||
2921 | check_union_p2p_send_full (void *cls, | ||
2922 | const struct TransmitFullMessage *msg) | ||
2923 | { | ||
2924 | return GNUNET_OK; | ||
2925 | } | ||
2926 | |||
2927 | |||
2928 | /** | ||
2929 | * Handle send full message received from other peer | ||
2930 | * | ||
2931 | * @param cls | ||
2932 | * @param msg | ||
2933 | */ | ||
2934 | static void | ||
2935 | handle_union_p2p_send_full (void *cls, | ||
2936 | const struct TransmitFullMessage *msg) | ||
2937 | { | ||
2938 | struct Operation *op = cls; | ||
2939 | |||
2940 | /** | ||
2941 | * Check that the message is received only in supported phase | ||
2942 | */ | ||
2943 | uint8_t allowed_phases[] = {PHASE_EXPECT_IBF}; | ||
2944 | if (GNUNET_OK != | ||
2945 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
2946 | { | ||
2947 | GNUNET_break (0); | ||
2948 | fail_union_operation (op); | ||
2949 | return; | ||
2950 | } | ||
2951 | |||
2952 | /** write received values to operator**/ | ||
2953 | op->remote_element_count = ntohl (msg->remote_set_size); | ||
2954 | op->remote_set_diff = ntohl (msg->remote_set_difference); | ||
2955 | op->local_set_diff = ntohl (msg->local_set_difference); | ||
2956 | |||
2957 | /** Check byzantine limits **/ | ||
2958 | if (check_byzantine_bounds (op) != GNUNET_OK) | ||
2959 | { | ||
2960 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2961 | "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine " | ||
2962 | "criteria\n"); | ||
2963 | GNUNET_break_op (0); | ||
2964 | fail_union_operation (op); | ||
2965 | return; | ||
2966 | } | ||
2967 | |||
2968 | /** Calculate avg element size if not initial sync **/ | ||
2969 | op->local_element_count = GNUNET_CONTAINER_multihashmap_size ( | ||
2970 | op->set->content->elements); | ||
2971 | uint64_t avg_element_size = 0; | ||
2972 | if (0 < op->local_element_count) | ||
2973 | { | ||
2974 | op->total_elements_size_local = 0; | ||
2975 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
2976 | & | ||
2977 | determinate_avg_element_size_iterator, | ||
2978 | op); | ||
2979 | avg_element_size = op->total_elements_size_local / op->local_element_count; | ||
2980 | } | ||
2981 | |||
2982 | /** Validate mode of operation **/ | ||
2983 | int mode_of_operation = estimate_best_mode_of_operation (avg_element_size, | ||
2984 | op-> | ||
2985 | remote_element_count, | ||
2986 | op-> | ||
2987 | local_element_count, | ||
2988 | op->local_set_diff, | ||
2989 | op->remote_set_diff, | ||
2990 | op-> | ||
2991 | rtt_bandwidth_tradeoff, | ||
2992 | op-> | ||
2993 | ibf_bucket_number_factor); | ||
2994 | if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation) | ||
2995 | { | ||
2996 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
2997 | "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been" | ||
2998 | " : %d\n", mode_of_operation); | ||
2999 | GNUNET_break_op (0); | ||
3000 | fail_union_operation (op); | ||
3001 | return; | ||
3002 | } | ||
3003 | op->phase = PHASE_FULL_RECEIVING; | ||
3004 | } | ||
3005 | |||
3006 | |||
3007 | /** | ||
1839 | * Check an IBF message from a remote peer. | 3008 | * Check an IBF message from a remote peer. |
1840 | * | 3009 | * |
1841 | * Reassemble the IBF from multiple pieces, and | 3010 | * Reassemble the IBF from multiple pieces, and |
@@ -1872,7 +3041,8 @@ check_union_p2p_ibf (void *cls, | |||
1872 | GNUNET_break_op (0); | 3041 | GNUNET_break_op (0); |
1873 | return GNUNET_SYSERR; | 3042 | return GNUNET_SYSERR; |
1874 | } | 3043 | } |
1875 | if (1 << msg->order != op->remote_ibf->size) | 3044 | |
3045 | if (msg->ibf_size != op->remote_ibf->size) | ||
1876 | { | 3046 | { |
1877 | GNUNET_break_op (0); | 3047 | GNUNET_break_op (0); |
1878 | return GNUNET_SYSERR; | 3048 | return GNUNET_SYSERR; |
@@ -1909,9 +3079,26 @@ handle_union_p2p_ibf (void *cls, | |||
1909 | { | 3079 | { |
1910 | struct Operation *op = cls; | 3080 | struct Operation *op = cls; |
1911 | unsigned int buckets_in_message; | 3081 | unsigned int buckets_in_message; |
3082 | /** | ||
3083 | * Check that the message is received only in supported phase | ||
3084 | */ | ||
3085 | uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST, | ||
3086 | PHASE_PASSIVE_DECODING}; | ||
3087 | if (GNUNET_OK != | ||
3088 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3089 | { | ||
3090 | GNUNET_break (0); | ||
3091 | fail_union_operation (op); | ||
3092 | return; | ||
3093 | } | ||
3094 | op->differential_sync_iterations++; | ||
3095 | check_max_differential_rounds (op); | ||
3096 | op->active_passive_switch_required = false; | ||
1912 | 3097 | ||
1913 | perf_rtt.ibf.received += 1; | 3098 | #if MEASURE_PERFORMANCE |
1914 | perf_rtt.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); | 3099 | perf_store.ibf.received += 1; |
3100 | perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); | ||
3101 | #endif | ||
1915 | 3102 | ||
1916 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) | 3103 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) |
1917 | / IBF_BUCKET_SIZE; | 3104 | / IBF_BUCKET_SIZE; |
@@ -1922,8 +3109,10 @@ handle_union_p2p_ibf (void *cls, | |||
1922 | GNUNET_assert (NULL == op->remote_ibf); | 3109 | GNUNET_assert (NULL == op->remote_ibf); |
1923 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3110 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1924 | "Creating new ibf of size %u\n", | 3111 | "Creating new ibf of size %u\n", |
1925 | 1 << msg->order); | 3112 | ntohl (msg->ibf_size)); |
1926 | op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); | 3113 | // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); |
3114 | op->remote_ibf = ibf_create (msg->ibf_size, | ||
3115 | ((uint8_t) op->ibf_number_buckets_per_element)); | ||
1927 | op->salt_receive = ntohl (msg->salt); | 3116 | op->salt_receive = ntohl (msg->salt); |
1928 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3117 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1929 | "Receiving new IBF with salt %u\n", | 3118 | "Receiving new IBF with salt %u\n", |
@@ -1954,7 +3143,7 @@ handle_union_p2p_ibf (void *cls, | |||
1954 | ibf_read_slice (&msg[1], | 3143 | ibf_read_slice (&msg[1], |
1955 | op->ibf_buckets_received, | 3144 | op->ibf_buckets_received, |
1956 | buckets_in_message, | 3145 | buckets_in_message, |
1957 | op->remote_ibf); | 3146 | op->remote_ibf, msg->ibf_counter_bit_length); |
1958 | op->ibf_buckets_received += buckets_in_message; | 3147 | op->ibf_buckets_received += buckets_in_message; |
1959 | 3148 | ||
1960 | if (op->ibf_buckets_received == op->remote_ibf->size) | 3149 | if (op->ibf_buckets_received == op->remote_ibf->size) |
@@ -2030,18 +3219,24 @@ maybe_finish (struct Operation *op) | |||
2030 | 3219 | ||
2031 | num_demanded = GNUNET_CONTAINER_multihashmap_size ( | 3220 | num_demanded = GNUNET_CONTAINER_multihashmap_size ( |
2032 | op->demanded_hashes); | 3221 | op->demanded_hashes); |
2033 | 3222 | int send_done = GNUNET_CONTAINER_multihashmap_iterate ( | |
3223 | op->message_control_flow, | ||
3224 | & | ||
3225 | determinate_done_message_iterator, | ||
3226 | op); | ||
2034 | if (PHASE_FINISH_WAITING == op->phase) | 3227 | if (PHASE_FINISH_WAITING == op->phase) |
2035 | { | 3228 | { |
2036 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3229 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2037 | "In PHASE_FINISH_WAITING, pending %u demands\n", | 3230 | "In PHASE_FINISH_WAITING, pending %u demands -> %d\n", |
2038 | num_demanded); | 3231 | num_demanded, op->peer_site); |
2039 | if (0 == num_demanded) | 3232 | if (-1 != send_done) |
2040 | { | 3233 | { |
2041 | struct GNUNET_MQ_Envelope *ev; | 3234 | struct GNUNET_MQ_Envelope *ev; |
2042 | 3235 | ||
2043 | op->phase = PHASE_FINISHED; | 3236 | op->phase = PHASE_FINISHED; |
2044 | perf_rtt.done.sent += 1; | 3237 | #if MEASURE_PERFORMANCE |
3238 | perf_store.done.sent += 1; | ||
3239 | #endif | ||
2045 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); | 3240 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); |
2046 | GNUNET_MQ_send (op->mq, | 3241 | GNUNET_MQ_send (op->mq, |
2047 | ev); | 3242 | ev); |
@@ -2052,9 +3247,9 @@ maybe_finish (struct Operation *op) | |||
2052 | if (PHASE_FINISH_CLOSING == op->phase) | 3247 | if (PHASE_FINISH_CLOSING == op->phase) |
2053 | { | 3248 | { |
2054 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3249 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2055 | "In PHASE_FINISH_CLOSING, pending %u demands\n", | 3250 | "In PHASE_FINISH_CLOSING, pending %u demands %d\n", |
2056 | num_demanded); | 3251 | num_demanded, op->peer_site); |
2057 | if (0 == num_demanded) | 3252 | if (-1 != send_done) |
2058 | { | 3253 | { |
2059 | op->phase = PHASE_FINISHED; | 3254 | op->phase = PHASE_FINISHED; |
2060 | send_client_done (op); | 3255 | send_client_done (op); |
@@ -2102,11 +3297,25 @@ handle_union_p2p_elements (void *cls, | |||
2102 | struct KeyEntry *ke; | 3297 | struct KeyEntry *ke; |
2103 | uint16_t element_size; | 3298 | uint16_t element_size; |
2104 | 3299 | ||
3300 | /** | ||
3301 | * Check that the message is received only in supported phase | ||
3302 | */ | ||
3303 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING, | ||
3304 | PHASE_FINISH_WAITING, PHASE_FINISH_CLOSING}; | ||
3305 | if (GNUNET_OK != | ||
3306 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3307 | { | ||
3308 | GNUNET_break (0); | ||
3309 | fail_union_operation (op); | ||
3310 | return; | ||
3311 | } | ||
2105 | 3312 | ||
2106 | element_size = ntohs (emsg->header.size) - sizeof(struct | 3313 | element_size = ntohs (emsg->header.size) - sizeof(struct |
2107 | GNUNET_SETU_ElementMessage); | 3314 | GNUNET_SETU_ElementMessage); |
2108 | perf_rtt.element.received += 1; | 3315 | #if MEASURE_PERFORMANCE |
2109 | perf_rtt.element.received_var_bytes += element_size; | 3316 | perf_store.element.received += 1; |
3317 | perf_store.element.received_var_bytes += element_size; | ||
3318 | #endif | ||
2110 | 3319 | ||
2111 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); | 3320 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); |
2112 | GNUNET_memcpy (&ee[1], | 3321 | GNUNET_memcpy (&ee[1], |
@@ -2129,6 +3338,21 @@ handle_union_p2p_elements (void *cls, | |||
2129 | return; | 3338 | return; |
2130 | } | 3339 | } |
2131 | 3340 | ||
3341 | if (GNUNET_OK != | ||
3342 | update_message_control_flow ( | ||
3343 | op->message_control_flow, | ||
3344 | MSG_CFS_RECEIVED, | ||
3345 | &ee->element_hash, | ||
3346 | ELEMENT_MESSAGE) | ||
3347 | ) | ||
3348 | { | ||
3349 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3350 | "An element has been received more than once!\n"); | ||
3351 | GNUNET_break (0); | ||
3352 | fail_union_operation (op); | ||
3353 | return; | ||
3354 | } | ||
3355 | |||
2132 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3356 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2133 | "Got element (size %u, hash %s) from peer\n", | 3357 | "Got element (size %u, hash %s) from peer\n", |
2134 | (unsigned int) element_size, | 3358 | (unsigned int) element_size, |
@@ -2217,33 +3441,25 @@ handle_union_p2p_full_element (void *cls, | |||
2217 | struct KeyEntry *ke; | 3441 | struct KeyEntry *ke; |
2218 | uint16_t element_size; | 3442 | uint16_t element_size; |
2219 | 3443 | ||
2220 | 3444 | /** | |
2221 | 3445 | * Check that the message is received only in supported phase | |
2222 | if(PHASE_EXPECT_IBF == op->phase) { | 3446 | */ |
2223 | op->phase = PHASE_FULL_RECEIVING; | 3447 | uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING}; |
2224 | } | 3448 | if (GNUNET_OK != |
2225 | 3449 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | |
2226 | |||
2227 | |||
2228 | /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */ | ||
2229 | if ((PHASE_FULL_RECEIVING != op->phase) && | ||
2230 | (PHASE_FULL_SENDING != op->phase)) | ||
2231 | { | 3450 | { |
2232 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 3451 | GNUNET_break (0); |
2233 | "Handle full element phase is %u\n", | 3452 | fail_union_operation (op); |
2234 | (unsigned) op->phase); | 3453 | return; |
2235 | GNUNET_break_op (0); | ||
2236 | fail_union_operation (op); | ||
2237 | return; | ||
2238 | } | 3454 | } |
2239 | 3455 | ||
2240 | |||
2241 | |||
2242 | element_size = ntohs (emsg->header.size) | 3456 | element_size = ntohs (emsg->header.size) |
2243 | - sizeof(struct GNUNET_SETU_ElementMessage); | 3457 | - sizeof(struct GNUNET_SETU_ElementMessage); |
2244 | 3458 | ||
2245 | perf_rtt.element_full.received += 1; | 3459 | #if MEASURE_PERFORMANCE |
2246 | perf_rtt.element_full.received_var_bytes += element_size; | 3460 | perf_store.element_full.received += 1; |
3461 | perf_store.element_full.received_var_bytes += element_size; | ||
3462 | #endif | ||
2247 | 3463 | ||
2248 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); | 3464 | ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); |
2249 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); | 3465 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); |
@@ -2268,17 +3484,15 @@ handle_union_p2p_full_element (void *cls, | |||
2268 | GNUNET_NO); | 3484 | GNUNET_NO); |
2269 | 3485 | ||
2270 | op->received_total++; | 3486 | op->received_total++; |
2271 | |||
2272 | ke = op_get_element (op, | 3487 | ke = op_get_element (op, |
2273 | &ee->element_hash); | 3488 | &ee->element_hash); |
2274 | if (NULL != ke) | 3489 | if (NULL != ke) |
2275 | { | 3490 | { |
2276 | /* Got repeated element. Should not happen since | ||
2277 | * we track demands. */ | ||
2278 | GNUNET_STATISTICS_update (_GSS_statistics, | 3491 | GNUNET_STATISTICS_update (_GSS_statistics, |
2279 | "# repeated elements", | 3492 | "# repeated elements", |
2280 | 1, | 3493 | 1, |
2281 | GNUNET_NO); | 3494 | GNUNET_NO); |
3495 | full_sync_plausibility_check (op); | ||
2282 | ke->received = GNUNET_YES; | 3496 | ke->received = GNUNET_YES; |
2283 | GNUNET_free (ee); | 3497 | GNUNET_free (ee); |
2284 | } | 3498 | } |
@@ -2294,15 +3508,15 @@ handle_union_p2p_full_element (void *cls, | |||
2294 | GNUNET_SETU_STATUS_ADD_LOCAL); | 3508 | GNUNET_SETU_STATUS_ADD_LOCAL); |
2295 | } | 3509 | } |
2296 | 3510 | ||
3511 | |||
2297 | if ((GNUNET_YES == op->byzantine) && | 3512 | if ((GNUNET_YES == op->byzantine) && |
2298 | (op->received_total > 384 + op->received_fresh * 4) && | 3513 | (op->received_total > op->remote_element_count) ) |
2299 | (op->received_fresh < op->received_total / 6)) | ||
2300 | { | 3514 | { |
2301 | /* The other peer gave us lots of old elements, there's something wrong. */ | 3515 | /* The other peer gave us lots of old elements, there's something wrong. */ |
2302 | LOG (GNUNET_ERROR_TYPE_ERROR, | 3516 | LOG (GNUNET_ERROR_TYPE_ERROR, |
2303 | "Other peer sent only %llu/%llu fresh elements, failing operation\n", | 3517 | "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n", |
2304 | (unsigned long long) op->received_fresh, | 3518 | (unsigned long long) op->received_total, |
2305 | (unsigned long long) op->received_total); | 3519 | (unsigned long long) op->remote_element_count); |
2306 | GNUNET_break_op (0); | 3520 | GNUNET_break_op (0); |
2307 | fail_union_operation (op); | 3521 | fail_union_operation (op); |
2308 | return; | 3522 | return; |
@@ -2356,18 +3570,50 @@ handle_union_p2p_inquiry (void *cls, | |||
2356 | const struct IBF_Key *ibf_key; | 3570 | const struct IBF_Key *ibf_key; |
2357 | unsigned int num_keys; | 3571 | unsigned int num_keys; |
2358 | 3572 | ||
2359 | perf_rtt.inquery.received += 1; | 3573 | /** |
2360 | perf_rtt.inquery.received_var_bytes += (ntohs (msg->header.size) - sizeof(struct InquiryMessage)); | 3574 | * Check that the message is received only in supported phase |
3575 | */ | ||
3576 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING}; | ||
3577 | if (GNUNET_OK != | ||
3578 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3579 | { | ||
3580 | GNUNET_break (0); | ||
3581 | fail_union_operation (op); | ||
3582 | return; | ||
3583 | } | ||
3584 | |||
3585 | #if MEASURE_PERFORMANCE | ||
3586 | perf_store.inquery.received += 1; | ||
3587 | perf_store.inquery.received_var_bytes += (ntohs (msg->header.size) | ||
3588 | - sizeof(struct InquiryMessage)); | ||
3589 | #endif | ||
2361 | 3590 | ||
2362 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3591 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2363 | "Received union inquiry\n"); | 3592 | "Received union inquiry\n"); |
2364 | num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) | 3593 | num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) |
2365 | / sizeof(struct IBF_Key); | 3594 | / sizeof(struct IBF_Key); |
2366 | ibf_key = (const struct IBF_Key *) &msg[1]; | 3595 | ibf_key = (const struct IBF_Key *) &msg[1]; |
3596 | |||
3597 | /** Add received inquiries to hashmap for flow control **/ | ||
3598 | struct GNUNET_HashContext *hashed_key_context = | ||
3599 | GNUNET_CRYPTO_hash_context_start (); | ||
3600 | struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc ( | ||
3601 | sizeof(struct GNUNET_HashCode));; | ||
3602 | enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_RECEIVED; | ||
3603 | GNUNET_CRYPTO_hash_context_read (hashed_key_context, | ||
3604 | &ibf_key, | ||
3605 | sizeof(struct IBF_Key)); | ||
3606 | GNUNET_CRYPTO_hash_context_finish (hashed_key_context, | ||
3607 | hashed_key); | ||
3608 | GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent, | ||
3609 | hashed_key, | ||
3610 | &mcfs, | ||
3611 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE | ||
3612 | ); | ||
3613 | |||
2367 | while (0 != num_keys--) | 3614 | while (0 != num_keys--) |
2368 | { | 3615 | { |
2369 | struct IBF_Key unsalted_key; | 3616 | struct IBF_Key unsalted_key; |
2370 | |||
2371 | unsalt_key (ibf_key, | 3617 | unsalt_key (ibf_key, |
2372 | ntohl (msg->salt), | 3618 | ntohl (msg->salt), |
2373 | &unsalted_key); | 3619 | &unsalted_key); |
@@ -2402,7 +3648,9 @@ send_missing_full_elements_iter (void *cls, | |||
2402 | 3648 | ||
2403 | if (GNUNET_YES == ke->received) | 3649 | if (GNUNET_YES == ke->received) |
2404 | return GNUNET_YES; | 3650 | return GNUNET_YES; |
2405 | perf_rtt.element_full.received += 1; | 3651 | #if MEASURE_PERFORMANCE |
3652 | perf_store.element_full.received += 1; | ||
3653 | #endif | ||
2406 | ev = GNUNET_MQ_msg_extra (emsg, | 3654 | ev = GNUNET_MQ_msg_extra (emsg, |
2407 | ee->element.size, | 3655 | ee->element.size, |
2408 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); | 3656 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); |
@@ -2422,18 +3670,84 @@ send_missing_full_elements_iter (void *cls, | |||
2422 | * @param cls closure, a set union operation | 3670 | * @param cls closure, a set union operation |
2423 | * @param mh the demand message | 3671 | * @param mh the demand message |
2424 | */ | 3672 | */ |
3673 | static int | ||
3674 | check_union_p2p_request_full (void *cls, | ||
3675 | const struct TransmitFullMessage *mh) | ||
3676 | { | ||
3677 | return GNUNET_OK; | ||
3678 | } | ||
3679 | |||
3680 | |||
2425 | static void | 3681 | static void |
2426 | handle_union_p2p_request_full (void *cls, | 3682 | handle_union_p2p_request_full (void *cls, |
2427 | const struct GNUNET_MessageHeader *mh) | 3683 | const struct TransmitFullMessage *msg) |
2428 | { | 3684 | { |
2429 | struct Operation *op = cls; | 3685 | struct Operation *op = cls; |
2430 | 3686 | ||
2431 | perf_rtt.request_full.received += 1; | 3687 | /** |
3688 | * Check that the message is received only in supported phase | ||
3689 | */ | ||
3690 | uint8_t allowed_phases[] = {PHASE_EXPECT_IBF}; | ||
3691 | if (GNUNET_OK != | ||
3692 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3693 | { | ||
3694 | GNUNET_break (0); | ||
3695 | fail_union_operation (op); | ||
3696 | return; | ||
3697 | } | ||
2432 | 3698 | ||
2433 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3699 | op->remote_element_count = ntohl (msg->remote_set_size); |
3700 | op->remote_set_diff = ntohl (msg->remote_set_difference); | ||
3701 | op->local_set_diff = ntohl (msg->local_set_difference); | ||
3702 | |||
3703 | |||
3704 | if (check_byzantine_bounds (op) != GNUNET_OK) | ||
3705 | { | ||
3706 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3707 | "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine " | ||
3708 | "criteria\n"); | ||
3709 | GNUNET_break_op (0); | ||
3710 | fail_union_operation (op); | ||
3711 | return; | ||
3712 | } | ||
3713 | |||
3714 | #if MEASURE_PERFORMANCE | ||
3715 | perf_store.request_full.received += 1; | ||
3716 | #endif | ||
3717 | |||
3718 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2434 | "Received request for full set transmission\n"); | 3719 | "Received request for full set transmission\n"); |
2435 | if (PHASE_EXPECT_IBF != op->phase) | 3720 | |
3721 | /** Calculate avg element size if not initial sync **/ | ||
3722 | op->local_element_count = GNUNET_CONTAINER_multihashmap_size ( | ||
3723 | op->set->content->elements); | ||
3724 | uint64_t avg_element_size = 0; | ||
3725 | if (0 < op->local_element_count) | ||
3726 | { | ||
3727 | op->total_elements_size_local = 0; | ||
3728 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
3729 | & | ||
3730 | determinate_avg_element_size_iterator, | ||
3731 | op); | ||
3732 | avg_element_size = op->total_elements_size_local / op->local_element_count; | ||
3733 | } | ||
3734 | |||
3735 | int mode_of_operation = estimate_best_mode_of_operation (avg_element_size, | ||
3736 | op-> | ||
3737 | remote_element_count, | ||
3738 | op-> | ||
3739 | local_element_count, | ||
3740 | op->local_set_diff, | ||
3741 | op->remote_set_diff, | ||
3742 | op-> | ||
3743 | rtt_bandwidth_tradeoff, | ||
3744 | op-> | ||
3745 | ibf_bucket_number_factor); | ||
3746 | if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation) | ||
2436 | { | 3747 | { |
3748 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3749 | "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been" | ||
3750 | " : %d\n", mode_of_operation); | ||
2437 | GNUNET_break_op (0); | 3751 | GNUNET_break_op (0); |
2438 | fail_union_operation (op); | 3752 | fail_union_operation (op); |
2439 | return; | 3753 | return; |
@@ -2458,7 +3772,21 @@ handle_union_p2p_full_done (void *cls, | |||
2458 | { | 3772 | { |
2459 | struct Operation *op = cls; | 3773 | struct Operation *op = cls; |
2460 | 3774 | ||
2461 | perf_rtt.full_done.received += 1; | 3775 | /** |
3776 | * Check that the message is received only in supported phase | ||
3777 | */ | ||
3778 | uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING}; | ||
3779 | if (GNUNET_OK != | ||
3780 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3781 | { | ||
3782 | GNUNET_break (0); | ||
3783 | fail_union_operation (op); | ||
3784 | return; | ||
3785 | } | ||
3786 | |||
3787 | #if MEASURE_PERFORMANCE | ||
3788 | perf_store.full_done.received += 1; | ||
3789 | #endif | ||
2462 | 3790 | ||
2463 | switch (op->phase) | 3791 | switch (op->phase) |
2464 | { | 3792 | { |
@@ -2466,6 +3794,19 @@ handle_union_p2p_full_done (void *cls, | |||
2466 | { | 3794 | { |
2467 | struct GNUNET_MQ_Envelope *ev; | 3795 | struct GNUNET_MQ_Envelope *ev; |
2468 | 3796 | ||
3797 | if ((GNUNET_YES == op->byzantine) && | ||
3798 | (op->received_total != op->remote_element_count) ) | ||
3799 | { | ||
3800 | /* The other peer gave not enough elements before sending full done, there's something wrong. */ | ||
3801 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3802 | "Other peer sent only %llu/%llu fresh elements, failing operation\n", | ||
3803 | (unsigned long long) op->received_total, | ||
3804 | (unsigned long long) op->remote_element_count); | ||
3805 | GNUNET_break_op (0); | ||
3806 | fail_union_operation (op); | ||
3807 | return; | ||
3808 | } | ||
3809 | |||
2469 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 3810 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2470 | "got FULL DONE, sending elements that other peer is missing\n"); | 3811 | "got FULL DONE, sending elements that other peer is missing\n"); |
2471 | 3812 | ||
@@ -2473,7 +3814,9 @@ handle_union_p2p_full_done (void *cls, | |||
2473 | GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element, | 3814 | GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element, |
2474 | &send_missing_full_elements_iter, | 3815 | &send_missing_full_elements_iter, |
2475 | op); | 3816 | op); |
2476 | perf_rtt.full_done.sent += 1; | 3817 | #if MEASURE_PERFORMANCE |
3818 | perf_store.full_done.sent += 1; | ||
3819 | #endif | ||
2477 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); | 3820 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); |
2478 | GNUNET_MQ_send (op->mq, | 3821 | GNUNET_MQ_send (op->mq, |
2479 | ev); | 3822 | ev); |
@@ -2552,8 +3895,23 @@ handle_union_p2p_demand (void *cls, | |||
2552 | unsigned int num_hashes; | 3895 | unsigned int num_hashes; |
2553 | struct GNUNET_MQ_Envelope *ev; | 3896 | struct GNUNET_MQ_Envelope *ev; |
2554 | 3897 | ||
2555 | perf_rtt.demand.received += 1; | 3898 | /** |
2556 | perf_rtt.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); | 3899 | * Check that the message is received only in supported phase |
3900 | */ | ||
3901 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING, | ||
3902 | PHASE_FINISH_WAITING}; | ||
3903 | if (GNUNET_OK != | ||
3904 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
3905 | { | ||
3906 | GNUNET_break (0); | ||
3907 | fail_union_operation (op); | ||
3908 | return; | ||
3909 | } | ||
3910 | #if MEASURE_PERFORMANCE | ||
3911 | perf_store.demand.received += 1; | ||
3912 | perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct | ||
3913 | GNUNET_MessageHeader)); | ||
3914 | #endif | ||
2557 | 3915 | ||
2558 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) | 3916 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) |
2559 | / sizeof(struct GNUNET_HashCode); | 3917 | / sizeof(struct GNUNET_HashCode); |
@@ -2570,6 +3928,39 @@ handle_union_p2p_demand (void *cls, | |||
2570 | fail_union_operation (op); | 3928 | fail_union_operation (op); |
2571 | return; | 3929 | return; |
2572 | } | 3930 | } |
3931 | |||
3932 | /* Save send demand message for message control */ | ||
3933 | if (GNUNET_YES != | ||
3934 | update_message_control_flow ( | ||
3935 | op->message_control_flow, | ||
3936 | MSG_CFS_RECEIVED, | ||
3937 | &ee->element_hash, | ||
3938 | DEMAND_MESSAGE) | ||
3939 | ) | ||
3940 | { | ||
3941 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3942 | "Double demand message received found!\n"); | ||
3943 | GNUNET_break (0); | ||
3944 | fail_union_operation (op); | ||
3945 | return; | ||
3946 | } | ||
3947 | ; | ||
3948 | |||
3949 | /* Mark element to be expected to received */ | ||
3950 | if (GNUNET_YES != | ||
3951 | update_message_control_flow ( | ||
3952 | op->message_control_flow, | ||
3953 | MSG_CFS_SENT, | ||
3954 | &ee->element_hash, | ||
3955 | ELEMENT_MESSAGE) | ||
3956 | ) | ||
3957 | { | ||
3958 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
3959 | "Double element message sent found!\n"); | ||
3960 | GNUNET_break (0); | ||
3961 | fail_union_operation (op); | ||
3962 | return; | ||
3963 | } | ||
2573 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) | 3964 | if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) |
2574 | { | 3965 | { |
2575 | /* Probably confused lazily copied sets. */ | 3966 | /* Probably confused lazily copied sets. */ |
@@ -2577,8 +3968,10 @@ handle_union_p2p_demand (void *cls, | |||
2577 | fail_union_operation (op); | 3968 | fail_union_operation (op); |
2578 | return; | 3969 | return; |
2579 | } | 3970 | } |
2580 | perf_rtt.element.sent += 1; | 3971 | #if MEASURE_PERFORMANCE |
2581 | perf_rtt.element.sent_var_bytes += ee->element.size; | 3972 | perf_store.element.sent += 1; |
3973 | perf_store.element.sent_var_bytes += ee->element.size; | ||
3974 | #endif | ||
2582 | ev = GNUNET_MQ_msg_extra (emsg, | 3975 | ev = GNUNET_MQ_msg_extra (emsg, |
2583 | ee->element.size, | 3976 | ee->element.size, |
2584 | GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS); | 3977 | GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS); |
@@ -2600,9 +3993,10 @@ handle_union_p2p_demand (void *cls, | |||
2600 | if (op->symmetric) | 3993 | if (op->symmetric) |
2601 | send_client_element (op, | 3994 | send_client_element (op, |
2602 | &ee->element, | 3995 | &ee->element, |
2603 | GNUNET_SET_STATUS_ADD_REMOTE); | 3996 | GNUNET_SETU_STATUS_ADD_REMOTE); |
2604 | } | 3997 | } |
2605 | GNUNET_CADET_receive_done (op->channel); | 3998 | GNUNET_CADET_receive_done (op->channel); |
3999 | maybe_finish (op); | ||
2606 | } | 4000 | } |
2607 | 4001 | ||
2608 | 4002 | ||
@@ -2653,9 +4047,23 @@ handle_union_p2p_offer (void *cls, | |||
2653 | struct Operation *op = cls; | 4047 | struct Operation *op = cls; |
2654 | const struct GNUNET_HashCode *hash; | 4048 | const struct GNUNET_HashCode *hash; |
2655 | unsigned int num_hashes; | 4049 | unsigned int num_hashes; |
4050 | /** | ||
4051 | * Check that the message is received only in supported phase | ||
4052 | */ | ||
4053 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING}; | ||
4054 | if (GNUNET_OK != | ||
4055 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
4056 | { | ||
4057 | GNUNET_break (0); | ||
4058 | fail_union_operation (op); | ||
4059 | return; | ||
4060 | } | ||
2656 | 4061 | ||
2657 | perf_rtt.offer.received += 1; | 4062 | #if MEASURE_PERFORMANCE |
2658 | perf_rtt.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); | 4063 | perf_store.offer.received += 1; |
4064 | perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct | ||
4065 | GNUNET_MessageHeader)); | ||
4066 | #endif | ||
2659 | 4067 | ||
2660 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) | 4068 | num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) |
2661 | / sizeof(struct GNUNET_HashCode); | 4069 | / sizeof(struct GNUNET_HashCode); |
@@ -2693,11 +4101,68 @@ handle_union_p2p_offer (void *cls, | |||
2693 | "[OP %p] Requesting element (hash %s)\n", | 4101 | "[OP %p] Requesting element (hash %s)\n", |
2694 | op, GNUNET_h2s (hash)); | 4102 | op, GNUNET_h2s (hash)); |
2695 | 4103 | ||
2696 | perf_rtt.demand.sent += 1; | 4104 | #if MEASURE_PERFORMANCE |
2697 | perf_rtt.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); | 4105 | perf_store.demand.sent += 1; |
4106 | perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); | ||
4107 | #endif | ||
2698 | ev = GNUNET_MQ_msg_header_extra (demands, | 4108 | ev = GNUNET_MQ_msg_header_extra (demands, |
2699 | sizeof(struct GNUNET_HashCode), | 4109 | sizeof(struct GNUNET_HashCode), |
2700 | GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND); | 4110 | GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND); |
4111 | /* Save send demand message for message control */ | ||
4112 | if (GNUNET_YES != | ||
4113 | update_message_control_flow ( | ||
4114 | op->message_control_flow, | ||
4115 | MSG_CFS_SENT, | ||
4116 | hash, | ||
4117 | DEMAND_MESSAGE) | ||
4118 | ) | ||
4119 | { | ||
4120 | // GNUNET_free (ev); | ||
4121 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
4122 | "Double demand message sent found!\n"); | ||
4123 | GNUNET_break (0); | ||
4124 | fail_union_operation (op); | ||
4125 | return; | ||
4126 | } | ||
4127 | ; | ||
4128 | |||
4129 | /* Mark offer as received received */ | ||
4130 | if (GNUNET_YES != | ||
4131 | update_message_control_flow ( | ||
4132 | op->message_control_flow, | ||
4133 | MSG_CFS_RECEIVED, | ||
4134 | hash, | ||
4135 | OFFER_MESSAGE) | ||
4136 | ) | ||
4137 | { | ||
4138 | // GNUNET_free (ev); | ||
4139 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
4140 | "Double offer message received found!\n"); | ||
4141 | GNUNET_break (0); | ||
4142 | fail_union_operation (op); | ||
4143 | return; | ||
4144 | } | ||
4145 | ; | ||
4146 | |||
4147 | /* Mark element to be expected to received */ | ||
4148 | if (GNUNET_YES != | ||
4149 | update_message_control_flow ( | ||
4150 | op->message_control_flow, | ||
4151 | MSG_CFS_EXPECTED, | ||
4152 | hash, | ||
4153 | ELEMENT_MESSAGE) | ||
4154 | ) | ||
4155 | { | ||
4156 | // GNUNET_free (ev); | ||
4157 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
4158 | "Element already expected!\n"); | ||
4159 | GNUNET_break (0); | ||
4160 | fail_union_operation (op); | ||
4161 | return; | ||
4162 | } | ||
4163 | ; | ||
4164 | |||
4165 | |||
2701 | GNUNET_memcpy (&demands[1], | 4166 | GNUNET_memcpy (&demands[1], |
2702 | hash, | 4167 | hash, |
2703 | sizeof(struct GNUNET_HashCode)); | 4168 | sizeof(struct GNUNET_HashCode)); |
@@ -2719,7 +4184,30 @@ handle_union_p2p_done (void *cls, | |||
2719 | { | 4184 | { |
2720 | struct Operation *op = cls; | 4185 | struct Operation *op = cls; |
2721 | 4186 | ||
2722 | perf_rtt.done.received += 1; | 4187 | /** |
4188 | * Check that the message is received only in supported phase | ||
4189 | */ | ||
4190 | uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING}; | ||
4191 | if (GNUNET_OK != | ||
4192 | check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) | ||
4193 | { | ||
4194 | GNUNET_break (0); | ||
4195 | fail_union_operation (op); | ||
4196 | return; | ||
4197 | } | ||
4198 | |||
4199 | if (op->active_passive_switch_required) | ||
4200 | { | ||
4201 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
4202 | "PROTOCOL VIOLATION: Received done but role change is necessary\n"); | ||
4203 | GNUNET_break (0); | ||
4204 | fail_union_operation (op); | ||
4205 | return; | ||
4206 | } | ||
4207 | |||
4208 | #if MEASURE_PERFORMANCE | ||
4209 | perf_store.done.received += 1; | ||
4210 | #endif | ||
2723 | switch (op->phase) | 4211 | switch (op->phase) |
2724 | { | 4212 | { |
2725 | case PHASE_PASSIVE_DECODING: | 4213 | case PHASE_PASSIVE_DECODING: |
@@ -2728,26 +4216,26 @@ handle_union_p2p_done (void *cls, | |||
2728 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 4216 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2729 | "got DONE (as passive partner), waiting for our demands to be satisfied\n"); | 4217 | "got DONE (as passive partner), waiting for our demands to be satisfied\n"); |
2730 | /* The active peer is done sending offers | 4218 | /* The active peer is done sending offers |
2731 | * and inquiries. This means that all | 4219 | * and inquiries. This means that all |
2732 | * our responses to that (demands and offers) | 4220 | * our responses to that (demands and offers) |
2733 | * must be in flight (queued or in mesh). | 4221 | * must be in flight (queued or in mesh). |
2734 | * | 4222 | * |
2735 | * We should notify the active peer once | 4223 | * We should notify the active peer once |
2736 | * all our demands are satisfied, so that the active | 4224 | * all our demands are satisfied, so that the active |
2737 | * peer can quit if we gave it everything. | 4225 | * peer can quit if we gave it everything. |
2738 | */GNUNET_CADET_receive_done (op->channel); | 4226 | */GNUNET_CADET_receive_done (op->channel); |
2739 | maybe_finish (op); | 4227 | maybe_finish (op); |
2740 | return; | 4228 | return; |
2741 | case PHASE_ACTIVE_DECODING: | 4229 | case PHASE_ACTIVE_DECODING: |
2742 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 4230 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
2743 | "got DONE (as active partner), waiting to finish\n"); | 4231 | "got DONE (as active partner), waiting to finish\n"); |
2744 | /* All demands of the other peer are satisfied, | 4232 | /* All demands of the other peer are satisfied, |
2745 | * and we processed all offers, thus we know | 4233 | * and we processed all offers, thus we know |
2746 | * exactly what our demands must be. | 4234 | * exactly what our demands must be. |
2747 | * | 4235 | * |
2748 | * We'll close the channel | 4236 | * We'll close the channel |
2749 | * to the other peer once our demands are met. | 4237 | * to the other peer once our demands are met. |
2750 | */op->phase = PHASE_FINISH_CLOSING; | 4238 | */op->phase = PHASE_FINISH_CLOSING; |
2751 | GNUNET_CADET_receive_done (op->channel); | 4239 | GNUNET_CADET_receive_done (op->channel); |
2752 | maybe_finish (op); | 4240 | maybe_finish (op); |
2753 | return; | 4241 | return; |
@@ -2769,7 +4257,9 @@ static void | |||
2769 | handle_union_p2p_over (void *cls, | 4257 | handle_union_p2p_over (void *cls, |
2770 | const struct GNUNET_MessageHeader *mh) | 4258 | const struct GNUNET_MessageHeader *mh) |
2771 | { | 4259 | { |
2772 | perf_rtt.over.received += 1; | 4260 | #if MEASURE_PERFORMANCE |
4261 | perf_store.over.received += 1; | ||
4262 | #endif | ||
2773 | send_client_done (cls); | 4263 | send_client_done (cls); |
2774 | } | 4264 | } |
2775 | 4265 | ||
@@ -2943,7 +4433,7 @@ check_incoming_msg (void *cls, | |||
2943 | struct Listener *listener = op->listener; | 4433 | struct Listener *listener = op->listener; |
2944 | const struct GNUNET_MessageHeader *nested_context; | 4434 | const struct GNUNET_MessageHeader *nested_context; |
2945 | 4435 | ||
2946 | /* double operation request */ | 4436 | /* double operation request */ |
2947 | if (0 != op->suggest_id) | 4437 | if (0 != op->suggest_id) |
2948 | { | 4438 | { |
2949 | GNUNET_break_op (0); | 4439 | GNUNET_break_op (0); |
@@ -3053,10 +4543,10 @@ handle_client_create_set (void *cls, | |||
3053 | } | 4543 | } |
3054 | set = GNUNET_new (struct Set); | 4544 | set = GNUNET_new (struct Set); |
3055 | { | 4545 | { |
3056 | struct StrataEstimator *se; | 4546 | struct MultiStrataEstimator *se; |
3057 | 4547 | ||
3058 | se = strata_estimator_create (SE_STRATA_COUNT, | 4548 | se = strata_estimator_create (SE_STRATA_COUNT, |
3059 | SE_IBF_SIZE, | 4549 | SE_IBFS_TOTAL_SIZE, |
3060 | SE_IBF_HASH_NUM); | 4550 | SE_IBF_HASH_NUM); |
3061 | if (NULL == se) | 4551 | if (NULL == se) |
3062 | { | 4552 | { |
@@ -3198,6 +4688,7 @@ channel_window_cb (void *cls, | |||
3198 | * @param cls client that sent the message | 4688 | * @param cls client that sent the message |
3199 | * @param msg message sent by the client | 4689 | * @param msg message sent by the client |
3200 | */ | 4690 | */ |
4691 | |||
3201 | static void | 4692 | static void |
3202 | handle_client_listen (void *cls, | 4693 | handle_client_listen (void *cls, |
3203 | const struct GNUNET_SETU_ListenMessage *msg) | 4694 | const struct GNUNET_SETU_ListenMessage *msg) |
@@ -3240,10 +4731,10 @@ handle_client_listen (void *cls, | |||
3240 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, | 4731 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, |
3241 | struct GNUNET_MessageHeader, | 4732 | struct GNUNET_MessageHeader, |
3242 | NULL), | 4733 | NULL), |
3243 | GNUNET_MQ_hd_fixed_size (union_p2p_request_full, | 4734 | GNUNET_MQ_hd_var_size (union_p2p_request_full, |
3244 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, | 4735 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, |
3245 | struct GNUNET_MessageHeader, | 4736 | struct TransmitFullMessage, |
3246 | NULL), | 4737 | NULL), |
3247 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, | 4738 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, |
3248 | GNUNET_MESSAGE_TYPE_SETU_P2P_SE, | 4739 | GNUNET_MESSAGE_TYPE_SETU_P2P_SE, |
3249 | struct StrataEstimatorMessage, | 4740 | struct StrataEstimatorMessage, |
@@ -3256,6 +4747,10 @@ handle_client_listen (void *cls, | |||
3256 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, | 4747 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, |
3257 | struct GNUNET_SETU_ElementMessage, | 4748 | struct GNUNET_SETU_ElementMessage, |
3258 | NULL), | 4749 | NULL), |
4750 | GNUNET_MQ_hd_var_size (union_p2p_send_full, | ||
4751 | GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL, | ||
4752 | struct TransmitFullMessage, | ||
4753 | NULL), | ||
3259 | GNUNET_MQ_handler_end () | 4754 | GNUNET_MQ_handler_end () |
3260 | }; | 4755 | }; |
3261 | struct Listener *listener; | 4756 | struct Listener *listener; |
@@ -3451,6 +4946,7 @@ handle_client_evaluate (void *cls, | |||
3451 | { | 4946 | { |
3452 | struct ClientState *cs = cls; | 4947 | struct ClientState *cs = cls; |
3453 | struct Operation *op = GNUNET_new (struct Operation); | 4948 | struct Operation *op = GNUNET_new (struct Operation); |
4949 | |||
3454 | const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { | 4950 | const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { |
3455 | GNUNET_MQ_hd_var_size (incoming_msg, | 4951 | GNUNET_MQ_hd_var_size (incoming_msg, |
3456 | GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, | 4952 | GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, |
@@ -3488,10 +4984,10 @@ handle_client_evaluate (void *cls, | |||
3488 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, | 4984 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, |
3489 | struct GNUNET_MessageHeader, | 4985 | struct GNUNET_MessageHeader, |
3490 | op), | 4986 | op), |
3491 | GNUNET_MQ_hd_fixed_size (union_p2p_request_full, | 4987 | GNUNET_MQ_hd_var_size (union_p2p_request_full, |
3492 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, | 4988 | GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, |
3493 | struct GNUNET_MessageHeader, | 4989 | struct TransmitFullMessage, |
3494 | op), | 4990 | op), |
3495 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, | 4991 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, |
3496 | GNUNET_MESSAGE_TYPE_SETU_P2P_SE, | 4992 | GNUNET_MESSAGE_TYPE_SETU_P2P_SE, |
3497 | struct StrataEstimatorMessage, | 4993 | struct StrataEstimatorMessage, |
@@ -3504,6 +5000,10 @@ handle_client_evaluate (void *cls, | |||
3504 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, | 5000 | GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, |
3505 | struct GNUNET_SETU_ElementMessage, | 5001 | struct GNUNET_SETU_ElementMessage, |
3506 | op), | 5002 | op), |
5003 | GNUNET_MQ_hd_var_size (union_p2p_send_full, | ||
5004 | GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL, | ||
5005 | struct TransmitFullMessage, | ||
5006 | NULL), | ||
3507 | GNUNET_MQ_handler_end () | 5007 | GNUNET_MQ_handler_end () |
3508 | }; | 5008 | }; |
3509 | struct Set *set; | 5009 | struct Set *set; |
@@ -3525,8 +5025,23 @@ handle_client_evaluate (void *cls, | |||
3525 | op->force_full = msg->force_full; | 5025 | op->force_full = msg->force_full; |
3526 | op->force_delta = msg->force_delta; | 5026 | op->force_delta = msg->force_delta; |
3527 | op->symmetric = msg->symmetric; | 5027 | op->symmetric = msg->symmetric; |
5028 | op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff; | ||
5029 | op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor; | ||
5030 | op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element; | ||
5031 | op->byzantine_upper_bound = msg->byzantine_upper_bond; | ||
5032 | op->active_passive_switch_required = false; | ||
3528 | context = GNUNET_MQ_extract_nested_mh (msg); | 5033 | context = GNUNET_MQ_extract_nested_mh (msg); |
3529 | 5034 | ||
5035 | /* create hashmap for message control */ | ||
5036 | op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32, | ||
5037 | GNUNET_NO); | ||
5038 | op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO); | ||
5039 | |||
5040 | #if MEASURE_PERFORMANCE | ||
5041 | /* load config */ | ||
5042 | load_config (op); | ||
5043 | #endif | ||
5044 | |||
3530 | /* Advance generation values, so that | 5045 | /* Advance generation values, so that |
3531 | mutations won't interfer with the running operation. */ | 5046 | mutations won't interfer with the running operation. */ |
3532 | op->set = set; | 5047 | op->set = set; |
@@ -3550,7 +5065,9 @@ handle_client_evaluate (void *cls, | |||
3550 | struct GNUNET_MQ_Envelope *ev; | 5065 | struct GNUNET_MQ_Envelope *ev; |
3551 | struct OperationRequestMessage *msg; | 5066 | struct OperationRequestMessage *msg; |
3552 | 5067 | ||
3553 | perf_rtt.operation_request.sent += 1; | 5068 | #if MEASURE_PERFORMANCE |
5069 | perf_store.operation_request.sent += 1; | ||
5070 | #endif | ||
3554 | ev = GNUNET_MQ_msg_nested_mh (msg, | 5071 | ev = GNUNET_MQ_msg_nested_mh (msg, |
3555 | GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, | 5072 | GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, |
3556 | context); | 5073 | context); |
@@ -3567,7 +5084,11 @@ handle_client_evaluate (void *cls, | |||
3567 | op->se = strata_estimator_dup (op->set->se); | 5084 | op->se = strata_estimator_dup (op->set->se); |
3568 | /* we started the operation, thus we have to send the operation request */ | 5085 | /* we started the operation, thus we have to send the operation request */ |
3569 | op->phase = PHASE_EXPECT_SE; | 5086 | op->phase = PHASE_EXPECT_SE; |
3570 | op->salt_receive = op->salt_send = 42; // FIXME????? | 5087 | |
5088 | op->salt_receive = (op->peer_site + 1) % 2; | ||
5089 | op->salt_send = op->peer_site; // FIXME????? | ||
5090 | |||
5091 | |||
3571 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 5092 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
3572 | "Initiating union operation evaluation\n"); | 5093 | "Initiating union operation evaluation\n"); |
3573 | GNUNET_STATISTICS_update (_GSS_statistics, | 5094 | GNUNET_STATISTICS_update (_GSS_statistics, |
@@ -3711,6 +5232,20 @@ handle_client_accept (void *cls, | |||
3711 | op->force_full = msg->force_full; | 5232 | op->force_full = msg->force_full; |
3712 | op->force_delta = msg->force_delta; | 5233 | op->force_delta = msg->force_delta; |
3713 | op->symmetric = msg->symmetric; | 5234 | op->symmetric = msg->symmetric; |
5235 | op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff; | ||
5236 | op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor; | ||
5237 | op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element; | ||
5238 | op->byzantine_upper_bound = msg->byzantine_upper_bond; | ||
5239 | op->active_passive_switch_required = false; | ||
5240 | /* create hashmap for message control */ | ||
5241 | op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32, | ||
5242 | GNUNET_NO); | ||
5243 | op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO); | ||
5244 | |||
5245 | #if MEASURE_PERFORMANCE | ||
5246 | /* load config */ | ||
5247 | load_config (op); | ||
5248 | #endif | ||
3714 | 5249 | ||
3715 | /* Advance generation values, so that future mutations do not | 5250 | /* Advance generation values, so that future mutations do not |
3716 | interfer with the running operation. */ | 5251 | interfer with the running operation. */ |
@@ -3729,7 +5264,7 @@ handle_client_accept (void *cls, | |||
3729 | 1, | 5264 | 1, |
3730 | GNUNET_NO); | 5265 | GNUNET_NO); |
3731 | { | 5266 | { |
3732 | const struct StrataEstimator *se; | 5267 | struct MultiStrataEstimator *se; |
3733 | struct GNUNET_MQ_Envelope *ev; | 5268 | struct GNUNET_MQ_Envelope *ev; |
3734 | struct StrataEstimatorMessage *strata_msg; | 5269 | struct StrataEstimatorMessage *strata_msg; |
3735 | char *buf; | 5270 | char *buf; |
@@ -3739,20 +5274,40 @@ handle_client_accept (void *cls, | |||
3739 | op->se = strata_estimator_dup (op->set->se); | 5274 | op->se = strata_estimator_dup (op->set->se); |
3740 | op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, | 5275 | op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, |
3741 | GNUNET_NO); | 5276 | GNUNET_NO); |
3742 | op->salt_receive = op->salt_send = 42; // FIXME????? | 5277 | op->salt_receive = (op->peer_site + 1) % 2; |
5278 | op->salt_send = op->peer_site; // FIXME????? | ||
3743 | initialize_key_to_element (op); | 5279 | initialize_key_to_element (op); |
3744 | op->initial_size = GNUNET_CONTAINER_multihashmap32_size ( | 5280 | op->initial_size = GNUNET_CONTAINER_multihashmap32_size ( |
3745 | op->key_to_element); | 5281 | op->key_to_element); |
3746 | 5282 | ||
3747 | /* kick off the operation */ | 5283 | /* kick off the operation */ |
3748 | se = op->se; | 5284 | se = op->se; |
3749 | buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); | 5285 | |
5286 | uint8_t se_count = 1; | ||
5287 | if (op->initial_size > 0) | ||
5288 | { | ||
5289 | op->total_elements_size_local = 0; | ||
5290 | GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, | ||
5291 | & | ||
5292 | determinate_avg_element_size_iterator, | ||
5293 | op); | ||
5294 | se_count = determine_strata_count ( | ||
5295 | op->total_elements_size_local / op->initial_size, | ||
5296 | op->initial_size); | ||
5297 | } | ||
5298 | buf = GNUNET_malloc (se->stratas[0]->strata_count * IBF_BUCKET_SIZE | ||
5299 | * ((SE_IBFS_TOTAL_SIZE / 8) * se_count)); | ||
3750 | len = strata_estimator_write (se, | 5300 | len = strata_estimator_write (se, |
5301 | SE_IBFS_TOTAL_SIZE, | ||
5302 | se_count, | ||
3751 | buf); | 5303 | buf); |
3752 | perf_rtt.se.sent += 1; | 5304 | #if MEASURE_PERFORMANCE |
3753 | perf_rtt.se.sent_var_bytes += len; | 5305 | perf_store.se.sent += 1; |
5306 | perf_store.se.sent_var_bytes += len; | ||
5307 | #endif | ||
3754 | 5308 | ||
3755 | if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) | 5309 | if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE |
5310 | * SE_IBFS_TOTAL_SIZE) | ||
3756 | type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC; | 5311 | type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC; |
3757 | else | 5312 | else |
3758 | type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE; | 5313 | type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE; |
@@ -3766,6 +5321,7 @@ handle_client_accept (void *cls, | |||
3766 | strata_msg->set_size | 5321 | strata_msg->set_size |
3767 | = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( | 5322 | = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( |
3768 | op->set->content->elements)); | 5323 | op->set->content->elements)); |
5324 | strata_msg->se_count = se_count; | ||
3769 | GNUNET_MQ_send (op->mq, | 5325 | GNUNET_MQ_send (op->mq, |
3770 | ev); | 5326 | ev); |
3771 | op->phase = PHASE_EXPECT_IBF; | 5327 | op->phase = PHASE_EXPECT_IBF; |
@@ -3800,8 +5356,9 @@ shutdown_task (void *cls) | |||
3800 | GNUNET_YES); | 5356 | GNUNET_YES); |
3801 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 5357 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3802 | "handled shutdown request\n"); | 5358 | "handled shutdown request\n"); |
3803 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 5359 | #if MEASURE_PERFORMANCE |
3804 | "RTT:%f\n", calculate_perf_rtt()); | 5360 | calculate_perf_store (); |
5361 | #endif | ||
3805 | } | 5362 | } |
3806 | 5363 | ||
3807 | 5364 | ||
diff --git a/src/setu/gnunet-service-setu_protocol.h b/src/setu/gnunet-service-setu_protocol.h index a2803ee47..d8f34f69c 100644 --- a/src/setu/gnunet-service-setu_protocol.h +++ b/src/setu/gnunet-service-setu_protocol.h | |||
@@ -40,11 +40,6 @@ struct OperationRequestMessage | |||
40 | struct GNUNET_MessageHeader header; | 40 | struct GNUNET_MessageHeader header; |
41 | 41 | ||
42 | /** | 42 | /** |
43 | * Operation to request, values from `enum GNUNET_SET_OperationType` | ||
44 | */ | ||
45 | uint32_t operation GNUNET_PACKED; | ||
46 | |||
47 | /** | ||
48 | * For Intersection: my element count | 43 | * For Intersection: my element count |
49 | */ | 44 | */ |
50 | uint32_t element_count GNUNET_PACKED; | 45 | uint32_t element_count GNUNET_PACKED; |
@@ -72,20 +67,9 @@ struct IBFMessage | |||
72 | struct GNUNET_MessageHeader header; | 67 | struct GNUNET_MessageHeader header; |
73 | 68 | ||
74 | /** | 69 | /** |
75 | * Order of the whole ibf, where | 70 | * Size of the whole ibf (number of buckets) |
76 | * num_buckets = 2^order | ||
77 | */ | ||
78 | uint8_t order; | ||
79 | |||
80 | /** | ||
81 | * Padding, must be 0. | ||
82 | */ | 71 | */ |
83 | uint8_t reserved1; | 72 | uint32_t ibf_size; |
84 | |||
85 | /** | ||
86 | * Padding, must be 0. | ||
87 | */ | ||
88 | uint16_t reserved2 GNUNET_PACKED; | ||
89 | 73 | ||
90 | /** | 74 | /** |
91 | * Offset of the strata in the rest of the message | 75 | * Offset of the strata in the rest of the message |
@@ -95,10 +79,22 @@ struct IBFMessage | |||
95 | /** | 79 | /** |
96 | * Salt used when hashing elements for this IBF. | 80 | * Salt used when hashing elements for this IBF. |
97 | */ | 81 | */ |
98 | uint32_t salt GNUNET_PACKED; | 82 | uint16_t salt GNUNET_PACKED; |
99 | 83 | ||
84 | /** | ||
85 | * The bit lenght of the counter | ||
86 | */ | ||
87 | uint16_t ibf_counter_bit_length; | ||
100 | /* rest: buckets */ | 88 | /* rest: buckets */ |
101 | }; | 89 | }; |
90 | /** | ||
91 | estimate_best_mode_of_operation (uint64_t avg_element_size, | ||
92 | uint64_t local_set_size, | ||
93 | uint64_t remote_set_size, | ||
94 | uint64_t est_set_diff_remote, | ||
95 | uint64_t est_set_diff_local,) | ||
96 | **/ | ||
97 | |||
102 | 98 | ||
103 | 99 | ||
104 | struct InquiryMessage | 100 | struct InquiryMessage |
@@ -113,11 +109,6 @@ struct InquiryMessage | |||
113 | */ | 109 | */ |
114 | uint32_t salt GNUNET_PACKED; | 110 | uint32_t salt GNUNET_PACKED; |
115 | 111 | ||
116 | /** | ||
117 | * Reserved, set to 0. | ||
118 | */ | ||
119 | uint32_t reserved GNUNET_PACKED; | ||
120 | |||
121 | /* rest: inquiry IBF keys */ | 112 | /* rest: inquiry IBF keys */ |
122 | }; | 113 | }; |
123 | 114 | ||
@@ -218,9 +209,47 @@ struct StrataEstimatorMessage | |||
218 | */ | 209 | */ |
219 | struct GNUNET_MessageHeader header; | 210 | struct GNUNET_MessageHeader header; |
220 | 211 | ||
212 | /** | ||
213 | * The number of ses transmitted | ||
214 | */ | ||
215 | uint8_t se_count; | ||
216 | |||
217 | /** | ||
218 | * Size of the local set | ||
219 | */ | ||
221 | uint64_t set_size; | 220 | uint64_t set_size; |
222 | }; | 221 | }; |
223 | 222 | ||
223 | |||
224 | /** | ||
225 | * Message which signals to other peer that we are sending full set | ||
226 | * | ||
227 | */ | ||
228 | struct TransmitFullMessage | ||
229 | { | ||
230 | /** | ||
231 | * Type: #GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL | ||
232 | */ | ||
233 | struct GNUNET_MessageHeader header; | ||
234 | |||
235 | /** | ||
236 | * Remote set difference calculated with strata estimator | ||
237 | */ | ||
238 | uint32_t remote_set_difference; | ||
239 | |||
240 | /** | ||
241 | * Total remote set size | ||
242 | */ | ||
243 | uint32_t remote_set_size; | ||
244 | |||
245 | /** | ||
246 | * Local set difference calculated with strata estimator | ||
247 | */ | ||
248 | uint32_t local_set_difference; | ||
249 | |||
250 | }; | ||
251 | |||
252 | |||
224 | GNUNET_NETWORK_STRUCT_END | 253 | GNUNET_NETWORK_STRUCT_END |
225 | 254 | ||
226 | #endif | 255 | #endif |
diff --git a/src/setu/gnunet-service-setu_strata_estimator.c b/src/setu/gnunet-service-setu_strata_estimator.c index 7c9a4deb6..7981cc847 100644 --- a/src/setu/gnunet-service-setu_strata_estimator.c +++ b/src/setu/gnunet-service-setu_strata_estimator.c | |||
@@ -22,6 +22,7 @@ | |||
22 | * @brief invertible bloom filter | 22 | * @brief invertible bloom filter |
23 | * @author Florian Dold | 23 | * @author Florian Dold |
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * @author Elias Summermatter | ||
25 | */ | 26 | */ |
26 | #include "platform.h" | 27 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
@@ -30,6 +31,82 @@ | |||
30 | 31 | ||
31 | 32 | ||
32 | /** | 33 | /** |
34 | * Should we try compressing the strata estimator? This will | ||
35 | * break compatibility with the 0.10.1-network. | ||
36 | */ | ||
37 | #define FAIL_10_1_COMPATIBILTIY 1 | ||
38 | |||
39 | /** | ||
40 | * Number of strata estimators in memory NOT transmitted | ||
41 | */ | ||
42 | |||
43 | #define MULTI_SE_BASE_COUNT 8 | ||
44 | |||
45 | /** | ||
46 | * The avg size of 1 se | ||
47 | * Based on the bsc thesis of Elias Summermatter (2021) | ||
48 | */ | ||
49 | |||
50 | #define AVG_BYTE_SIZE_SE 4221 | ||
51 | |||
52 | /** | ||
53 | * Calculates the optimal number of strata Estimators to send | ||
54 | * @param avg_element_size | ||
55 | * @param element_count | ||
56 | * @return | ||
57 | */ | ||
58 | uint8_t | ||
59 | determine_strata_count (uint64_t avg_element_size, uint64_t element_count) | ||
60 | { | ||
61 | uint64_t base_size = avg_element_size * element_count; | ||
62 | /* >67kb total size of elements in set */ | ||
63 | if (base_size < AVG_BYTE_SIZE_SE * 16) | ||
64 | return 1; | ||
65 | /* >270kb total size of elements in set */ | ||
66 | if (base_size < AVG_BYTE_SIZE_SE * 64) | ||
67 | return 2; | ||
68 | /* >1mb total size of elements in set */ | ||
69 | if (base_size < AVG_BYTE_SIZE_SE * 256) | ||
70 | return 4; | ||
71 | return 8; | ||
72 | } | ||
73 | |||
74 | |||
75 | /** | ||
76 | * Modify an IBF key @a k_in based on the @a salt, returning a | ||
77 | * salted key in @a k_out. | ||
78 | */ | ||
79 | static void | ||
80 | salt_key (const struct IBF_Key *k_in, | ||
81 | uint32_t salt, | ||
82 | struct IBF_Key *k_out) | ||
83 | { | ||
84 | int s = (salt * 7) % 64; | ||
85 | uint64_t x = k_in->key_val; | ||
86 | |||
87 | /* rotate ibf key */ | ||
88 | x = (x >> s) | (x << (64 - s)); | ||
89 | k_out->key_val = x; | ||
90 | } | ||
91 | |||
92 | |||
93 | /** | ||
94 | * Reverse modification done in the salt_key function | ||
95 | */ | ||
96 | static void | ||
97 | unsalt_key (const struct IBF_Key *k_in, | ||
98 | uint32_t salt, | ||
99 | struct IBF_Key *k_out) | ||
100 | { | ||
101 | int s = (salt * 7) % 64; | ||
102 | uint64_t x = k_in->key_val; | ||
103 | |||
104 | x = (x << s) | (x >> (64 - s)); | ||
105 | k_out->key_val = x; | ||
106 | } | ||
107 | |||
108 | |||
109 | /** | ||
33 | * Write the given strata estimator to the buffer. | 110 | * Write the given strata estimator to the buffer. |
34 | * | 111 | * |
35 | * @param se strata estimator to serialize | 112 | * @param se strata estimator to serialize |
@@ -37,21 +114,33 @@ | |||
37 | * @return number of bytes written to @a buf | 114 | * @return number of bytes written to @a buf |
38 | */ | 115 | */ |
39 | size_t | 116 | size_t |
40 | strata_estimator_write (const struct StrataEstimator *se, | 117 | strata_estimator_write (struct MultiStrataEstimator *se, |
118 | uint16_t se_ibf_total_size, | ||
119 | uint8_t number_se_send, | ||
41 | void *buf) | 120 | void *buf) |
42 | { | 121 | { |
43 | char *sbuf = buf; | 122 | char *sbuf = buf; |
123 | unsigned int i; | ||
44 | size_t osize; | 124 | size_t osize; |
125 | uint64_t sbuf_offset = 0; | ||
126 | se->size = number_se_send; | ||
45 | 127 | ||
46 | GNUNET_assert (NULL != se); | 128 | GNUNET_assert (NULL != se); |
47 | for (unsigned int i = 0; i < se->strata_count; i++) | 129 | for (uint8_t strata_ctr = 0; strata_ctr < number_se_send; strata_ctr++) |
48 | { | 130 | { |
49 | ibf_write_slice (se->strata[i], | 131 | for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++) |
50 | 0, | 132 | { |
51 | se->ibf_size, | 133 | ibf_write_slice (se->stratas[strata_ctr]->strata[i], |
52 | &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]); | 134 | 0, |
135 | se->stratas[strata_ctr]->ibf_size, | ||
136 | &sbuf[sbuf_offset], | ||
137 | 8); | ||
138 | sbuf_offset += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE; | ||
139 | } | ||
53 | } | 140 | } |
54 | osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; | 141 | osize = ((se_ibf_total_size / 8) * number_se_send) * IBF_BUCKET_SIZE |
142 | * se->stratas[0]->strata_count; | ||
143 | #if FAIL_10_1_COMPATIBILTIY | ||
55 | { | 144 | { |
56 | char *cbuf; | 145 | char *cbuf; |
57 | size_t nsize; | 146 | size_t nsize; |
@@ -62,13 +151,12 @@ strata_estimator_write (const struct StrataEstimator *se, | |||
62 | &cbuf, | 151 | &cbuf, |
63 | &nsize)) | 152 | &nsize)) |
64 | { | 153 | { |
65 | GNUNET_memcpy (buf, | 154 | GNUNET_memcpy (buf, cbuf, nsize); |
66 | cbuf, | ||
67 | nsize); | ||
68 | osize = nsize; | 155 | osize = nsize; |
69 | GNUNET_free (cbuf); | 156 | GNUNET_free (cbuf); |
70 | } | 157 | } |
71 | } | 158 | } |
159 | #endif | ||
72 | return osize; | 160 | return osize; |
73 | } | 161 | } |
74 | 162 | ||
@@ -87,15 +175,19 @@ int | |||
87 | strata_estimator_read (const void *buf, | 175 | strata_estimator_read (const void *buf, |
88 | size_t buf_len, | 176 | size_t buf_len, |
89 | int is_compressed, | 177 | int is_compressed, |
90 | struct StrataEstimator *se) | 178 | uint8_t number_se_received, |
179 | uint16_t se_ibf_total_size, | ||
180 | struct MultiStrataEstimator *se) | ||
91 | { | 181 | { |
182 | unsigned int i; | ||
92 | size_t osize; | 183 | size_t osize; |
93 | char *dbuf; | 184 | char *dbuf; |
94 | 185 | ||
95 | dbuf = NULL; | 186 | dbuf = NULL; |
96 | if (GNUNET_YES == is_compressed) | 187 | if (GNUNET_YES == is_compressed) |
97 | { | 188 | { |
98 | osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; | 189 | osize = ((se_ibf_total_size / 8) * number_se_received) * IBF_BUCKET_SIZE |
190 | * se->stratas[0]->strata_count; | ||
99 | dbuf = GNUNET_decompress (buf, | 191 | dbuf = GNUNET_decompress (buf, |
100 | buf_len, | 192 | buf_len, |
101 | osize); | 193 | osize); |
@@ -108,18 +200,25 @@ strata_estimator_read (const void *buf, | |||
108 | buf_len = osize; | 200 | buf_len = osize; |
109 | } | 201 | } |
110 | 202 | ||
111 | if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE) | 203 | if (buf_len != se->stratas[0]->strata_count * ((se_ibf_total_size / 8) |
204 | * number_se_received) | ||
205 | * IBF_BUCKET_SIZE) | ||
112 | { | 206 | { |
113 | GNUNET_break (0); /* very odd error */ | 207 | GNUNET_break (0); /* very odd error */ |
114 | GNUNET_free (dbuf); | 208 | GNUNET_free (dbuf); |
115 | return GNUNET_SYSERR; | 209 | return GNUNET_SYSERR; |
116 | } | 210 | } |
117 | 211 | ||
118 | for (unsigned int i = 0; i < se->strata_count; i++) | 212 | for (uint8_t strata_ctr = 0; strata_ctr < number_se_received; strata_ctr++) |
119 | { | 213 | { |
120 | ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); | 214 | for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++) |
121 | buf += se->ibf_size * IBF_BUCKET_SIZE; | 215 | { |
216 | ibf_read_slice (buf, 0, se->stratas[strata_ctr]->ibf_size, | ||
217 | se->stratas[strata_ctr]->strata[i], 8); | ||
218 | buf += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE; | ||
219 | } | ||
122 | } | 220 | } |
221 | se->size = number_se_received; | ||
123 | GNUNET_free (dbuf); | 222 | GNUNET_free (dbuf); |
124 | return GNUNET_OK; | 223 | return GNUNET_OK; |
125 | } | 224 | } |
@@ -132,38 +231,61 @@ strata_estimator_read (const void *buf, | |||
132 | * @param key key to add | 231 | * @param key key to add |
133 | */ | 232 | */ |
134 | void | 233 | void |
135 | strata_estimator_insert (struct StrataEstimator *se, | 234 | strata_estimator_insert (struct MultiStrataEstimator *se, |
136 | struct IBF_Key key) | 235 | struct IBF_Key key) |
137 | { | 236 | { |
138 | uint64_t v; | ||
139 | unsigned int i; | ||
140 | 237 | ||
141 | v = key.key_val; | 238 | |
142 | /* count trailing '1'-bits of v */ | 239 | /* count trailing '1'-bits of v */ |
143 | for (i = 0; v & 1; v >>= 1, i++) | 240 | for (int strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++) |
144 | /* empty */; | 241 | { |
145 | ibf_insert (se->strata[i], key); | 242 | unsigned int i; |
243 | uint64_t v; | ||
244 | |||
245 | struct IBF_Key salted_key; | ||
246 | salt_key (&key, | ||
247 | strata_ctr * (64 / MULTI_SE_BASE_COUNT), | ||
248 | &salted_key); | ||
249 | v = salted_key.key_val; | ||
250 | for (i = 0; v & 1; v >>= 1, i++) | ||
251 | { | ||
252 | ibf_insert (se->stratas[strata_ctr]->strata[i], salted_key); | ||
253 | } | ||
254 | } | ||
255 | /* empty */; | ||
256 | |||
146 | } | 257 | } |
147 | 258 | ||
148 | 259 | ||
149 | /** | 260 | /** |
150 | * Remove a key from the strata estimator. | 261 | * Remove a key from the strata estimator. (NOT USED) |
151 | * | 262 | * |
152 | * @param se strata estimator to remove the key from | 263 | * @param se strata estimator to remove the key from |
153 | * @param key key to remove | 264 | * @param key key to remove |
154 | */ | 265 | */ |
155 | void | 266 | void |
156 | strata_estimator_remove (struct StrataEstimator *se, | 267 | strata_estimator_remove (struct MultiStrataEstimator *se, |
157 | struct IBF_Key key) | 268 | struct IBF_Key key) |
158 | { | 269 | { |
159 | uint64_t v; | ||
160 | unsigned int i; | ||
161 | 270 | ||
162 | v = key.key_val; | ||
163 | /* count trailing '1'-bits of v */ | 271 | /* count trailing '1'-bits of v */ |
164 | for (i = 0; v & 1; v >>= 1, i++) | 272 | for (int strata_ctr = 0; strata_ctr < se->size; strata_ctr++) |
165 | /* empty */; | 273 | { |
166 | ibf_remove (se->strata[i], key); | 274 | uint64_t v; |
275 | unsigned int i; | ||
276 | |||
277 | struct IBF_Key unsalted_key; | ||
278 | unsalt_key (&key, | ||
279 | strata_ctr * (64 / MULTI_SE_BASE_COUNT), | ||
280 | &unsalted_key); | ||
281 | |||
282 | v = unsalted_key.key_val; | ||
283 | for (i = 0; v & 1; v >>= 1, i++) | ||
284 | { | ||
285 | /* empty */; | ||
286 | ibf_remove (se->stratas[strata_ctr]->strata[i], unsalted_key); | ||
287 | } | ||
288 | } | ||
167 | } | 289 | } |
168 | 290 | ||
169 | 291 | ||
@@ -175,29 +297,42 @@ strata_estimator_remove (struct StrataEstimator *se, | |||
175 | * @param ibf_hashnum hashnum parameter of each ibf | 297 | * @param ibf_hashnum hashnum parameter of each ibf |
176 | * @return a freshly allocated, empty strata estimator, NULL on error | 298 | * @return a freshly allocated, empty strata estimator, NULL on error |
177 | */ | 299 | */ |
178 | struct StrataEstimator * | 300 | struct MultiStrataEstimator * |
179 | strata_estimator_create (unsigned int strata_count, | 301 | strata_estimator_create (unsigned int strata_count, |
180 | uint32_t ibf_size, | 302 | uint32_t ibf_size, |
181 | uint8_t ibf_hashnum) | 303 | uint8_t ibf_hashnum) |
182 | { | 304 | { |
183 | struct StrataEstimator *se; | 305 | struct MultiStrataEstimator *se; |
184 | 306 | unsigned int i; | |
185 | se = GNUNET_new (struct StrataEstimator); | 307 | unsigned int j; |
186 | se->strata_count = strata_count; | 308 | se = GNUNET_new (struct MultiStrataEstimator); |
187 | se->ibf_size = ibf_size; | 309 | |
188 | se->strata = GNUNET_new_array (strata_count, | 310 | se->size = MULTI_SE_BASE_COUNT; |
189 | struct InvertibleBloomFilter *); | 311 | se->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *); |
190 | for (unsigned int i = 0; i < strata_count; i++) | 312 | |
313 | uint8_t ibf_prime_sizes[] = {79,79,79,79,79,79,79,79}; | ||
314 | |||
315 | for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++) | ||
191 | { | 316 | { |
192 | se->strata[i] = ibf_create (ibf_size, ibf_hashnum); | 317 | se->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator); |
193 | if (NULL == se->strata[i]) | 318 | se->stratas[strata_ctr]->strata_count = strata_count; |
319 | se->stratas[strata_ctr]->ibf_size = ibf_prime_sizes[strata_ctr]; | ||
320 | se->stratas[strata_ctr]->strata = GNUNET_new_array (strata_count * 4, | ||
321 | struct | ||
322 | InvertibleBloomFilter *); | ||
323 | for (i = 0; i < strata_count; i++) | ||
194 | { | 324 | { |
195 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 325 | se->stratas[strata_ctr]->strata[i] = ibf_create ( |
196 | "Failed to allocate memory for strata estimator\n"); | 326 | ibf_prime_sizes[strata_ctr], ibf_hashnum); |
197 | for (unsigned int j = 0; j < i; j++) | 327 | if (NULL == se->stratas[strata_ctr]->strata[i]) |
198 | ibf_destroy (se->strata[i]); | 328 | { |
199 | GNUNET_free (se); | 329 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
200 | return NULL; | 330 | "Failed to allocate memory for strata estimator\n"); |
331 | for (j = 0; j < i; j++) | ||
332 | ibf_destroy (se->stratas[strata_ctr]->strata[i]); | ||
333 | GNUNET_free (se); | ||
334 | return NULL; | ||
335 | } | ||
201 | } | 336 | } |
202 | } | 337 | } |
203 | return se; | 338 | return se; |
@@ -213,46 +348,71 @@ strata_estimator_create (unsigned int strata_count, | |||
213 | * @param se2 second strata estimator | 348 | * @param se2 second strata estimator |
214 | * @return the estimated difference | 349 | * @return the estimated difference |
215 | */ | 350 | */ |
216 | unsigned int | 351 | void |
217 | strata_estimator_difference (const struct StrataEstimator *se1, | 352 | strata_estimator_difference (const struct MultiStrataEstimator *se1, |
218 | const struct StrataEstimator *se2) | 353 | const struct MultiStrataEstimator *se2) |
219 | { | 354 | { |
220 | unsigned int count; | 355 | int avg_local_diff = 0; |
356 | int avg_remote_diff = 0; | ||
357 | uint8_t number_of_estimators = se1->size; | ||
221 | 358 | ||
222 | GNUNET_assert (se1->strata_count == se2->strata_count); | 359 | for (uint8_t strata_ctr = 0; strata_ctr < number_of_estimators; strata_ctr++) |
223 | count = 0; | ||
224 | for (int i = se1->strata_count - 1; i >= 0; i--) | ||
225 | { | 360 | { |
226 | struct InvertibleBloomFilter *diff; | 361 | GNUNET_assert (se1->stratas[strata_ctr]->strata_count == |
227 | /* number of keys decoded from the ibf */ | 362 | se2->stratas[strata_ctr]->strata_count); |
228 | 363 | ||
229 | /* FIXME: implement this without always allocating new IBFs */ | 364 | |
230 | diff = ibf_dup (se1->strata[i]); | 365 | for (int i = se1->stratas[strata_ctr]->strata_count - 1; i >= 0; i--) |
231 | ibf_subtract (diff, | ||
232 | se2->strata[i]); | ||
233 | for (int ibf_count = 0; GNUNET_YES; ibf_count++) | ||
234 | { | 366 | { |
235 | int more; | 367 | struct InvertibleBloomFilter *diff; |
368 | /* number of keys decoded from the ibf */ | ||
236 | 369 | ||
237 | more = ibf_decode (diff, | 370 | /* FIXME: implement this without always allocating new IBFs */ |
238 | NULL, | 371 | diff = ibf_dup (se1->stratas[strata_ctr]->strata[i]); |
239 | NULL); | 372 | diff->local_decoded_count = 0; |
240 | if (GNUNET_NO == more) | 373 | diff->remote_decoded_count = 0; |
241 | { | 374 | |
242 | count += ibf_count; | 375 | ibf_subtract (diff, se2->stratas[strata_ctr]->strata[i]); |
243 | break; | 376 | |
244 | } | 377 | for (int ibf_count = 0; GNUNET_YES; ibf_count++) |
245 | /* Estimate if decoding fails or would not terminate */ | ||
246 | if ( (GNUNET_SYSERR == more) || | ||
247 | (ibf_count > diff->size) ) | ||
248 | { | 378 | { |
249 | ibf_destroy (diff); | 379 | int more; |
250 | return count * (1 << (i + 1)); | 380 | |
381 | more = ibf_decode (diff, NULL, NULL); | ||
382 | if (GNUNET_NO == more) | ||
383 | { | ||
384 | se1->stratas[strata_ctr]->strata[0]->local_decoded_count += | ||
385 | diff->local_decoded_count; | ||
386 | se1->stratas[strata_ctr]->strata[0]->remote_decoded_count += | ||
387 | diff->remote_decoded_count; | ||
388 | break; | ||
389 | } | ||
390 | /* Estimate if decoding fails or would not terminate */ | ||
391 | if ((GNUNET_SYSERR == more) || (ibf_count > diff->size)) | ||
392 | { | ||
393 | se1->stratas[strata_ctr]->strata[0]->local_decoded_count = | ||
394 | se1->stratas[strata_ctr]->strata[0]->local_decoded_count * (1 << (i | ||
395 | + | ||
396 | 1)); | ||
397 | se1->stratas[strata_ctr]->strata[0]->remote_decoded_count = | ||
398 | se1->stratas[strata_ctr]->strata[0]->remote_decoded_count * (1 << (i | ||
399 | + | ||
400 | 1)); | ||
401 | ibf_destroy (diff); | ||
402 | goto break_all_counting_loops; | ||
403 | } | ||
251 | } | 404 | } |
405 | ibf_destroy (diff); | ||
252 | } | 406 | } |
253 | ibf_destroy (diff); | 407 | break_all_counting_loops:; |
408 | avg_local_diff += se1->stratas[strata_ctr]->strata[0]->local_decoded_count; | ||
409 | avg_remote_diff += | ||
410 | se1->stratas[strata_ctr]->strata[0]->remote_decoded_count; | ||
254 | } | 411 | } |
255 | return count; | 412 | se1->stratas[0]->strata[0]->local_decoded_count = avg_local_diff |
413 | / number_of_estimators; | ||
414 | se1->stratas[0]->strata[0]->remote_decoded_count = avg_remote_diff | ||
415 | / number_of_estimators; | ||
256 | } | 416 | } |
257 | 417 | ||
258 | 418 | ||
@@ -262,18 +422,28 @@ strata_estimator_difference (const struct StrataEstimator *se1, | |||
262 | * @param se the strata estimator to copy | 422 | * @param se the strata estimator to copy |
263 | * @return the copy | 423 | * @return the copy |
264 | */ | 424 | */ |
265 | struct StrataEstimator * | 425 | struct MultiStrataEstimator * |
266 | strata_estimator_dup (struct StrataEstimator *se) | 426 | strata_estimator_dup (struct MultiStrataEstimator *se) |
267 | { | 427 | { |
268 | struct StrataEstimator *c; | 428 | struct MultiStrataEstimator *c; |
269 | 429 | unsigned int i; | |
270 | c = GNUNET_new (struct StrataEstimator); | 430 | |
271 | c->strata_count = se->strata_count; | 431 | c = GNUNET_new (struct MultiStrataEstimator); |
272 | c->ibf_size = se->ibf_size; | 432 | c->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *); |
273 | c->strata = GNUNET_new_array (se->strata_count, | 433 | for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++) |
274 | struct InvertibleBloomFilter *); | 434 | { |
275 | for (unsigned int i = 0; i < se->strata_count; i++) | 435 | c->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator); |
276 | c->strata[i] = ibf_dup (se->strata[i]); | 436 | c->stratas[strata_ctr]->strata_count = |
437 | se->stratas[strata_ctr]->strata_count; | ||
438 | c->stratas[strata_ctr]->ibf_size = se->stratas[strata_ctr]->ibf_size; | ||
439 | c->stratas[strata_ctr]->strata = GNUNET_new_array ( | ||
440 | se->stratas[strata_ctr]->strata_count, | ||
441 | struct | ||
442 | InvertibleBloomFilter *); | ||
443 | for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++) | ||
444 | c->stratas[strata_ctr]->strata[i] = ibf_dup ( | ||
445 | se->stratas[strata_ctr]->strata[i]); | ||
446 | } | ||
277 | return c; | 447 | return c; |
278 | } | 448 | } |
279 | 449 | ||
@@ -284,10 +454,14 @@ strata_estimator_dup (struct StrataEstimator *se) | |||
284 | * @param se strata estimator to destroy. | 454 | * @param se strata estimator to destroy. |
285 | */ | 455 | */ |
286 | void | 456 | void |
287 | strata_estimator_destroy (struct StrataEstimator *se) | 457 | strata_estimator_destroy (struct MultiStrataEstimator *se) |
288 | { | 458 | { |
289 | for (unsigned int i = 0; i < se->strata_count; i++) | 459 | unsigned int i; |
290 | ibf_destroy (se->strata[i]); | 460 | for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++) |
291 | GNUNET_free (se->strata); | 461 | { |
462 | for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++) | ||
463 | ibf_destroy (se->stratas[strata_ctr]->strata[i]); | ||
464 | GNUNET_free (se->stratas[strata_ctr]->strata); | ||
465 | } | ||
292 | GNUNET_free (se); | 466 | GNUNET_free (se); |
293 | } | 467 | } |
diff --git a/src/setu/gnunet-service-setu_strata_estimator.h b/src/setu/gnunet-service-setu_strata_estimator.h index afdbcdbbf..4871a7fcd 100644 --- a/src/setu/gnunet-service-setu_strata_estimator.h +++ b/src/setu/gnunet-service-setu_strata_estimator.h | |||
@@ -22,6 +22,7 @@ | |||
22 | * @file set/gnunet-service-setu_strata_estimator.h | 22 | * @file set/gnunet-service-setu_strata_estimator.h |
23 | * @brief estimator of set difference | 23 | * @brief estimator of set difference |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | * @author Elias Summermatter | ||
25 | */ | 26 | */ |
26 | 27 | ||
27 | #ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H | 28 | #ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H |
@@ -61,6 +62,31 @@ struct StrataEstimator | |||
61 | unsigned int ibf_size; | 62 | unsigned int ibf_size; |
62 | }; | 63 | }; |
63 | 64 | ||
65 | struct MultiStrataEstimator | ||
66 | { | ||
67 | /** | ||
68 | * Array of strata estimators | ||
69 | */ | ||
70 | struct StrataEstimator **stratas; | ||
71 | |||
72 | /** | ||
73 | * Number of strata estimators in struct | ||
74 | */ | ||
75 | uint8_t size; | ||
76 | |||
77 | }; | ||
78 | |||
79 | /** | ||
80 | * Deteminate how many strata estimators in the message are necessary | ||
81 | * @param avg_element_size | ||
82 | * @param element_count | ||
83 | * @return number of strata's | ||
84 | */ | ||
85 | |||
86 | uint8_t | ||
87 | determine_strata_count (uint64_t avg_element_size, | ||
88 | uint64_t element_count); | ||
89 | |||
64 | 90 | ||
65 | /** | 91 | /** |
66 | * Write the given strata estimator to the buffer. | 92 | * Write the given strata estimator to the buffer. |
@@ -70,7 +96,9 @@ struct StrataEstimator | |||
70 | * @return number of bytes written to @a buf | 96 | * @return number of bytes written to @a buf |
71 | */ | 97 | */ |
72 | size_t | 98 | size_t |
73 | strata_estimator_write (const struct StrataEstimator *se, | 99 | strata_estimator_write (struct MultiStrataEstimator *se, |
100 | uint16_t se_ibf_total_size, | ||
101 | uint8_t number_se_send, | ||
74 | void *buf); | 102 | void *buf); |
75 | 103 | ||
76 | 104 | ||
@@ -88,7 +116,9 @@ int | |||
88 | strata_estimator_read (const void *buf, | 116 | strata_estimator_read (const void *buf, |
89 | size_t buf_len, | 117 | size_t buf_len, |
90 | int is_compressed, | 118 | int is_compressed, |
91 | struct StrataEstimator *se); | 119 | uint8_t number_se_received, |
120 | uint16_t se_ibf_total_size, | ||
121 | struct MultiStrataEstimator *se); | ||
92 | 122 | ||
93 | 123 | ||
94 | /** | 124 | /** |
@@ -99,7 +129,7 @@ strata_estimator_read (const void *buf, | |||
99 | * @param ibf_hashnum hashnum parameter of each ibf | 129 | * @param ibf_hashnum hashnum parameter of each ibf |
100 | * @return a freshly allocated, empty strata estimator, NULL on error | 130 | * @return a freshly allocated, empty strata estimator, NULL on error |
101 | */ | 131 | */ |
102 | struct StrataEstimator * | 132 | struct MultiStrataEstimator * |
103 | strata_estimator_create (unsigned int strata_count, | 133 | strata_estimator_create (unsigned int strata_count, |
104 | uint32_t ibf_size, | 134 | uint32_t ibf_size, |
105 | uint8_t ibf_hashnum); | 135 | uint8_t ibf_hashnum); |
@@ -111,11 +141,11 @@ strata_estimator_create (unsigned int strata_count, | |||
111 | * | 141 | * |
112 | * @param se1 first strata estimator | 142 | * @param se1 first strata estimator |
113 | * @param se2 second strata estimator | 143 | * @param se2 second strata estimator |
114 | * @return abs(|se1| - |se2|) | 144 | * @return nothing |
115 | */ | 145 | */ |
116 | unsigned int | 146 | void |
117 | strata_estimator_difference (const struct StrataEstimator *se1, | 147 | strata_estimator_difference (const struct MultiStrataEstimator *se1, |
118 | const struct StrataEstimator *se2); | 148 | const struct MultiStrataEstimator *se2); |
119 | 149 | ||
120 | 150 | ||
121 | /** | 151 | /** |
@@ -125,7 +155,7 @@ strata_estimator_difference (const struct StrataEstimator *se1, | |||
125 | * @param key key to add | 155 | * @param key key to add |
126 | */ | 156 | */ |
127 | void | 157 | void |
128 | strata_estimator_insert (struct StrataEstimator *se, | 158 | strata_estimator_insert (struct MultiStrataEstimator *se, |
129 | struct IBF_Key key); | 159 | struct IBF_Key key); |
130 | 160 | ||
131 | 161 | ||
@@ -136,7 +166,7 @@ strata_estimator_insert (struct StrataEstimator *se, | |||
136 | * @param key key to remove | 166 | * @param key key to remove |
137 | */ | 167 | */ |
138 | void | 168 | void |
139 | strata_estimator_remove (struct StrataEstimator *se, | 169 | strata_estimator_remove (struct MultiStrataEstimator *se, |
140 | struct IBF_Key key); | 170 | struct IBF_Key key); |
141 | 171 | ||
142 | 172 | ||
@@ -146,7 +176,7 @@ strata_estimator_remove (struct StrataEstimator *se, | |||
146 | * @param se strata estimator to destroy. | 176 | * @param se strata estimator to destroy. |
147 | */ | 177 | */ |
148 | void | 178 | void |
149 | strata_estimator_destroy (struct StrataEstimator *se); | 179 | strata_estimator_destroy (struct MultiStrataEstimator *se); |
150 | 180 | ||
151 | 181 | ||
152 | /** | 182 | /** |
@@ -155,8 +185,8 @@ strata_estimator_destroy (struct StrataEstimator *se); | |||
155 | * @param se the strata estimator to copy | 185 | * @param se the strata estimator to copy |
156 | * @return the copy | 186 | * @return the copy |
157 | */ | 187 | */ |
158 | struct StrataEstimator * | 188 | struct MultiStrataEstimator * |
159 | strata_estimator_dup (struct StrataEstimator *se); | 189 | strata_estimator_dup (struct MultiStrataEstimator *se); |
160 | 190 | ||
161 | 191 | ||
162 | #if 0 /* keep Emacsens' auto-indent happy */ | 192 | #if 0 /* keep Emacsens' auto-indent happy */ |
diff --git a/src/setu/ibf.c b/src/setu/ibf.c index 1beba9065..8f29adb62 100644 --- a/src/setu/ibf.c +++ b/src/setu/ibf.c | |||
@@ -20,11 +20,15 @@ | |||
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file set/ibf.c | 22 | * @file set/ibf.c |
23 | * @brief implementation of the invertible Bloom filter | 23 | * @brief implementation of the invertible bloom filter |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | * @author Elias Summermatter | ||
25 | */ | 26 | */ |
26 | 27 | ||
27 | #include "ibf.h" | 28 | #include "ibf.h" |
29 | #include "gnunet_util_lib.h" | ||
30 | #define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__) | ||
31 | |||
28 | 32 | ||
29 | /** | 33 | /** |
30 | * Compute the key's hash from the key. | 34 | * Compute the key's hash from the key. |
@@ -58,11 +62,12 @@ ibf_hashcode_from_key (struct IBF_Key key, | |||
58 | struct GNUNET_HashCode *dst) | 62 | struct GNUNET_HashCode *dst) |
59 | { | 63 | { |
60 | struct IBF_Key *p; | 64 | struct IBF_Key *p; |
65 | unsigned int i; | ||
61 | const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode) | 66 | const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode) |
62 | / sizeof(struct IBF_Key); | 67 | / sizeof(struct IBF_Key); |
63 | 68 | ||
64 | p = (struct IBF_Key *) dst; | 69 | p = (struct IBF_Key *) dst; |
65 | for (unsigned int i = 0; i < keys_per_hashcode; i++) | 70 | for (i = 0; i < keys_per_hashcode; i++) |
66 | *p++ = key; | 71 | *p++ = key; |
67 | } | 72 | } |
68 | 73 | ||
@@ -75,14 +80,14 @@ ibf_hashcode_from_key (struct IBF_Key key, | |||
75 | * @return the newly created invertible bloom filter, NULL on error | 80 | * @return the newly created invertible bloom filter, NULL on error |
76 | */ | 81 | */ |
77 | struct InvertibleBloomFilter * | 82 | struct InvertibleBloomFilter * |
78 | ibf_create (uint32_t size, | 83 | ibf_create (uint32_t size, uint8_t hash_num) |
79 | uint8_t hash_num) | ||
80 | { | 84 | { |
81 | struct InvertibleBloomFilter *ibf; | 85 | struct InvertibleBloomFilter *ibf; |
82 | 86 | ||
83 | GNUNET_assert (0 != size); | 87 | GNUNET_assert (0 != size); |
88 | |||
84 | ibf = GNUNET_new (struct InvertibleBloomFilter); | 89 | ibf = GNUNET_new (struct InvertibleBloomFilter); |
85 | ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t)); | 90 | ibf->count = GNUNET_malloc_large (size * sizeof(uint64_t)); |
86 | if (NULL == ibf->count) | 91 | if (NULL == ibf->count) |
87 | { | 92 | { |
88 | GNUNET_free (ibf); | 93 | GNUNET_free (ibf); |
@@ -105,6 +110,7 @@ ibf_create (uint32_t size, | |||
105 | } | 110 | } |
106 | ibf->size = size; | 111 | ibf->size = size; |
107 | ibf->hash_num = hash_num; | 112 | ibf->hash_num = hash_num; |
113 | |||
108 | return ibf; | 114 | return ibf; |
109 | } | 115 | } |
110 | 116 | ||
@@ -121,8 +127,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf, | |||
121 | uint32_t i; | 127 | uint32_t i; |
122 | uint32_t bucket; | 128 | uint32_t bucket; |
123 | 129 | ||
124 | bucket = GNUNET_CRYPTO_crc32_n (&key, | 130 | bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key); |
125 | sizeof (key)); | ||
126 | for (i = 0, filled = 0; filled < ibf->hash_num; i++) | 131 | for (i = 0, filled = 0; filled < ibf->hash_num; i++) |
127 | { | 132 | { |
128 | uint64_t x; | 133 | uint64_t x; |
@@ -133,8 +138,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf, | |||
133 | dst[filled++] = bucket % ibf->size; | 138 | dst[filled++] = bucket % ibf->size; |
134 | try_next: | 139 | try_next: |
135 | x = ((uint64_t) bucket << 32) | i; | 140 | x = ((uint64_t) bucket << 32) | i; |
136 | bucket = GNUNET_CRYPTO_crc32_n (&x, | 141 | bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x); |
137 | sizeof (x)); | ||
138 | } | 142 | } |
139 | } | 143 | } |
140 | 144 | ||
@@ -170,13 +174,8 @@ ibf_insert (struct InvertibleBloomFilter *ibf, | |||
170 | int buckets[ibf->hash_num]; | 174 | int buckets[ibf->hash_num]; |
171 | 175 | ||
172 | GNUNET_assert (ibf->hash_num <= ibf->size); | 176 | GNUNET_assert (ibf->hash_num <= ibf->size); |
173 | ibf_get_indices (ibf, | 177 | ibf_get_indices (ibf, key, buckets); |
174 | key, | 178 | ibf_insert_into (ibf, key, buckets, 1); |
175 | buckets); | ||
176 | ibf_insert_into (ibf, | ||
177 | key, | ||
178 | buckets, | ||
179 | 1); | ||
180 | } | 179 | } |
181 | 180 | ||
182 | 181 | ||
@@ -193,13 +192,8 @@ ibf_remove (struct InvertibleBloomFilter *ibf, | |||
193 | int buckets[ibf->hash_num]; | 192 | int buckets[ibf->hash_num]; |
194 | 193 | ||
195 | GNUNET_assert (ibf->hash_num <= ibf->size); | 194 | GNUNET_assert (ibf->hash_num <= ibf->size); |
196 | ibf_get_indices (ibf, | 195 | ibf_get_indices (ibf, key, buckets); |
197 | key, | 196 | ibf_insert_into (ibf, key, buckets, -1); |
198 | buckets); | ||
199 | ibf_insert_into (ibf, | ||
200 | key, | ||
201 | buckets, | ||
202 | -1); | ||
203 | } | 197 | } |
204 | 198 | ||
205 | 199 | ||
@@ -244,6 +238,8 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
244 | 238 | ||
245 | for (uint32_t i = 0; i < ibf->size; i++) | 239 | for (uint32_t i = 0; i < ibf->size; i++) |
246 | { | 240 | { |
241 | int hit; | ||
242 | |||
247 | /* we can only decode from pure buckets */ | 243 | /* we can only decode from pure buckets */ |
248 | if ( (1 != ibf->count[i].count_val) && | 244 | if ( (1 != ibf->count[i].count_val) && |
249 | (-1 != ibf->count[i].count_val) ) | 245 | (-1 != ibf->count[i].count_val) ) |
@@ -257,30 +253,33 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
257 | 253 | ||
258 | /* test if key in bucket hits its own location, | 254 | /* test if key in bucket hits its own location, |
259 | * if not, the key hash was subject to collision */ | 255 | * if not, the key hash was subject to collision */ |
260 | { | 256 | hit = GNUNET_NO; |
261 | bool hit = false; | 257 | ibf_get_indices (ibf, ibf->key_sum[i], buckets); |
258 | for (int j = 0; j < ibf->hash_num; j++) | ||
259 | if (buckets[j] == i) | ||
260 | hit = GNUNET_YES; | ||
262 | 261 | ||
263 | ibf_get_indices (ibf, | 262 | if (GNUNET_NO == hit) |
264 | ibf->key_sum[i], | 263 | continue; |
265 | buckets); | 264 | |
266 | for (int j = 0; j < ibf->hash_num; j++) | 265 | if (1 == ibf->count[i].count_val) |
267 | if (buckets[j] == i) | 266 | { |
268 | { | 267 | ibf->remote_decoded_count++; |
269 | hit = true; | ||
270 | break; | ||
271 | } | ||
272 | if (! hit) | ||
273 | continue; | ||
274 | } | 268 | } |
269 | else | ||
270 | { | ||
271 | ibf->local_decoded_count++; | ||
272 | } | ||
273 | |||
274 | |||
275 | if (NULL != ret_side) | 275 | if (NULL != ret_side) |
276 | *ret_side = ibf->count[i].count_val; | 276 | *ret_side = ibf->count[i].count_val; |
277 | if (NULL != ret_id) | 277 | if (NULL != ret_id) |
278 | *ret_id = ibf->key_sum[i]; | 278 | *ret_id = ibf->key_sum[i]; |
279 | 279 | ||
280 | /* insert on the opposite side, effectively removing the element */ | 280 | /* insert on the opposite side, effectively removing the element */ |
281 | ibf_insert_into (ibf, | 281 | ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val); |
282 | ibf->key_sum[i], buckets, | 282 | |
283 | -ibf->count[i].count_val); | ||
284 | return GNUNET_YES; | 283 | return GNUNET_YES; |
285 | } | 284 | } |
286 | 285 | ||
@@ -291,6 +290,26 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
291 | 290 | ||
292 | 291 | ||
293 | /** | 292 | /** |
293 | * Returns the minimal bytes needed to store the counter of the IBF | ||
294 | * | ||
295 | * @param ibf the IBF | ||
296 | */ | ||
297 | uint8_t | ||
298 | ibf_get_max_counter (struct InvertibleBloomFilter *ibf) | ||
299 | { | ||
300 | long long max_counter = 0; | ||
301 | for (uint64_t i = 0; i < ibf->size; i++) | ||
302 | { | ||
303 | if (ibf->count[i].count_val > max_counter) | ||
304 | { | ||
305 | max_counter = ibf->count[i].count_val; | ||
306 | } | ||
307 | } | ||
308 | return 64 - __builtin_clzll (max_counter); | ||
309 | } | ||
310 | |||
311 | |||
312 | /** | ||
294 | * Write buckets from an ibf to a buffer. | 313 | * Write buckets from an ibf to a buffer. |
295 | * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. | 314 | * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. |
296 | * | 315 | * |
@@ -298,16 +317,17 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
298 | * @param start with which bucket to start | 317 | * @param start with which bucket to start |
299 | * @param count how many buckets to write | 318 | * @param count how many buckets to write |
300 | * @param buf buffer to write the data to | 319 | * @param buf buffer to write the data to |
320 | * @param max bit length of a counter for unpacking | ||
301 | */ | 321 | */ |
302 | void | 322 | void |
303 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, | 323 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, |
304 | uint32_t start, | 324 | uint32_t start, |
305 | uint32_t count, | 325 | uint64_t count, |
306 | void *buf) | 326 | void *buf, |
327 | uint8_t counter_max_length) | ||
307 | { | 328 | { |
308 | struct IBF_Key *key_dst; | 329 | struct IBF_Key *key_dst; |
309 | struct IBF_KeyHash *key_hash_dst; | 330 | struct IBF_KeyHash *key_hash_dst; |
310 | struct IBF_Count *count_dst; | ||
311 | 331 | ||
312 | GNUNET_assert (start + count <= ibf->size); | 332 | GNUNET_assert (start + count <= ibf->size); |
313 | 333 | ||
@@ -315,19 +335,182 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, | |||
315 | key_dst = (struct IBF_Key *) buf; | 335 | key_dst = (struct IBF_Key *) buf; |
316 | GNUNET_memcpy (key_dst, | 336 | GNUNET_memcpy (key_dst, |
317 | ibf->key_sum + start, | 337 | ibf->key_sum + start, |
318 | count * sizeof *key_dst); | 338 | count * sizeof(*key_dst)); |
319 | key_dst += count; | 339 | key_dst += count; |
320 | /* copy key hashes */ | 340 | /* copy key hashes */ |
321 | key_hash_dst = (struct IBF_KeyHash *) key_dst; | 341 | key_hash_dst = (struct IBF_KeyHash *) key_dst; |
322 | GNUNET_memcpy (key_hash_dst, | 342 | GNUNET_memcpy (key_hash_dst, |
323 | ibf->key_hash_sum + start, | 343 | ibf->key_hash_sum + start, |
324 | count * sizeof *key_hash_dst); | 344 | count * sizeof(*key_hash_dst)); |
325 | key_hash_dst += count; | 345 | key_hash_dst += count; |
326 | /* copy counts */ | 346 | |
327 | count_dst = (struct IBF_Count *) key_hash_dst; | 347 | /* pack and copy counter */ |
328 | GNUNET_memcpy (count_dst, | 348 | pack_counter (ibf, |
329 | ibf->count + start, | 349 | start, |
330 | count * sizeof *count_dst); | 350 | count, |
351 | (uint8_t *) key_hash_dst, | ||
352 | counter_max_length); | ||
353 | |||
354 | |||
355 | } | ||
356 | |||
357 | |||
358 | /** | ||
359 | * Packs the counter to transmit only the smallest possible amount of bytes and | ||
360 | * preventing overflow of the counter | ||
361 | * @param ibf the ibf to write | ||
362 | * @param start with which bucket to start | ||
363 | * @param count how many buckets to write | ||
364 | * @param buf buffer to write the data to | ||
365 | * @param max bit length of a counter for unpacking | ||
366 | */ | ||
367 | |||
368 | void | ||
369 | pack_counter (const struct InvertibleBloomFilter *ibf, | ||
370 | uint32_t start, | ||
371 | uint64_t count, | ||
372 | uint8_t *buf, | ||
373 | uint8_t counter_max_length) | ||
374 | { | ||
375 | uint8_t store_size = 0; | ||
376 | uint8_t store = 0; | ||
377 | uint16_t byte_ctr = 0; | ||
378 | |||
379 | /** | ||
380 | * Iterate over IBF bucket | ||
381 | */ | ||
382 | for (uint64_t i = start; i< (count + start);) | ||
383 | { | ||
384 | uint64_t count_val_to_write = ibf->count[i].count_val; | ||
385 | uint8_t count_len_to_write = counter_max_length; | ||
386 | |||
387 | /** | ||
388 | * Pack and compose counters to byte values | ||
389 | */ | ||
390 | while ((count_len_to_write + store_size) >= 8) | ||
391 | { | ||
392 | uint8_t bit_shift = 0; | ||
393 | |||
394 | /** | ||
395 | * Shift bits if more than a byte has to be written | ||
396 | * or the store size is not empty | ||
397 | */ | ||
398 | if ((store_size > 0) || (count_len_to_write > 8)) | ||
399 | { | ||
400 | uint8_t bit_unused = 8 - store_size; | ||
401 | bit_shift = count_len_to_write - bit_unused; | ||
402 | store = store << bit_unused; | ||
403 | } | ||
404 | |||
405 | buf[byte_ctr] = ((count_val_to_write >> bit_shift) | store) & 0xFF; | ||
406 | byte_ctr++; | ||
407 | count_len_to_write -= (8 - store_size); | ||
408 | count_val_to_write = count_val_to_write & ((1ULL << | ||
409 | count_len_to_write) - 1); | ||
410 | store = 0; | ||
411 | store_size = 0; | ||
412 | } | ||
413 | store = (store << count_len_to_write) | count_val_to_write; | ||
414 | store_size = store_size + count_len_to_write; | ||
415 | count_len_to_write = 0; | ||
416 | i++; | ||
417 | } | ||
418 | |||
419 | /** | ||
420 | * Pack data left in story before finishing | ||
421 | */ | ||
422 | if (store_size > 0) | ||
423 | { | ||
424 | buf[byte_ctr] = store << (8 - store_size); | ||
425 | byte_ctr++; | ||
426 | } | ||
427 | |||
428 | } | ||
429 | |||
430 | |||
431 | /** | ||
432 | * Unpacks the counter to transmit only the smallest possible amount of bytes and | ||
433 | * preventing overflow of the counter | ||
434 | * @param ibf the ibf to write | ||
435 | * @param start with which bucket to start | ||
436 | * @param count how many buckets to write | ||
437 | * @param buf buffer to write the data to | ||
438 | * @param max bit length of a counter for unpacking | ||
439 | */ | ||
440 | |||
441 | void | ||
442 | unpack_counter (const struct InvertibleBloomFilter *ibf, | ||
443 | uint32_t start, | ||
444 | uint64_t count, | ||
445 | uint8_t *buf, | ||
446 | uint8_t counter_max_length) | ||
447 | { | ||
448 | uint64_t ibf_counter_ctr = 0; | ||
449 | uint64_t store = 0; | ||
450 | uint64_t store_bit_ctr = 0; | ||
451 | uint64_t byte_ctr = 0; | ||
452 | |||
453 | /** | ||
454 | * Iterate over received bytes | ||
455 | */ | ||
456 | while (true) | ||
457 | { | ||
458 | uint8_t byte_read = buf[byte_ctr]; | ||
459 | uint8_t bit_to_read_left = 8; | ||
460 | byte_ctr++; | ||
461 | |||
462 | /** | ||
463 | * Pack data left in story before finishing | ||
464 | */ | ||
465 | while (bit_to_read_left >= 0) | ||
466 | { | ||
467 | /** | ||
468 | * Stop decoding when end is reached | ||
469 | */ | ||
470 | if (ibf_counter_ctr > (count - 1)) | ||
471 | return; | ||
472 | |||
473 | /* | ||
474 | * Unpack the counter | ||
475 | */ | ||
476 | if ((store_bit_ctr + bit_to_read_left) >= counter_max_length) | ||
477 | { | ||
478 | uint8_t bytes_used = counter_max_length - store_bit_ctr; | ||
479 | if (store_bit_ctr > 0) | ||
480 | { | ||
481 | store = store << bytes_used; | ||
482 | } | ||
483 | |||
484 | uint8_t bytes_to_shift = bit_to_read_left - bytes_used; | ||
485 | uint64_t counter_part = byte_read >> bytes_to_shift; | ||
486 | store = store | counter_part; | ||
487 | ibf->count[ibf_counter_ctr + start].count_val = store; | ||
488 | byte_read = byte_read & ((1 << bytes_to_shift) - 1); | ||
489 | bit_to_read_left -= bytes_used; | ||
490 | ibf_counter_ctr++; | ||
491 | store = 0; | ||
492 | store_bit_ctr = 0; | ||
493 | } | ||
494 | else | ||
495 | { | ||
496 | store_bit_ctr += bit_to_read_left; | ||
497 | if (0 == store) | ||
498 | { | ||
499 | store = byte_read; | ||
500 | } | ||
501 | else | ||
502 | { | ||
503 | store = store << bit_to_read_left; | ||
504 | store = store | byte_read; | ||
505 | } | ||
506 | break; | ||
507 | |||
508 | } | ||
509 | |||
510 | } | ||
511 | |||
512 | } | ||
513 | |||
331 | } | 514 | } |
332 | 515 | ||
333 | 516 | ||
@@ -338,12 +521,14 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, | |||
338 | * @param start which bucket to start at | 521 | * @param start which bucket to start at |
339 | * @param count how many buckets to read | 522 | * @param count how many buckets to read |
340 | * @param ibf the ibf to read from | 523 | * @param ibf the ibf to read from |
524 | * @param max bit length of a counter for unpacking | ||
341 | */ | 525 | */ |
342 | void | 526 | void |
343 | ibf_read_slice (const void *buf, | 527 | ibf_read_slice (const void *buf, |
344 | uint32_t start, | 528 | uint32_t start, |
345 | uint32_t count, | 529 | uint64_t count, |
346 | struct InvertibleBloomFilter *ibf) | 530 | struct InvertibleBloomFilter *ibf, |
531 | uint8_t counter_max_length) | ||
347 | { | 532 | { |
348 | struct IBF_Key *key_src; | 533 | struct IBF_Key *key_src; |
349 | struct IBF_KeyHash *key_hash_src; | 534 | struct IBF_KeyHash *key_hash_src; |
@@ -364,11 +549,10 @@ ibf_read_slice (const void *buf, | |||
364 | key_hash_src, | 549 | key_hash_src, |
365 | count * sizeof *key_hash_src); | 550 | count * sizeof *key_hash_src); |
366 | key_hash_src += count; | 551 | key_hash_src += count; |
367 | /* copy counts */ | 552 | |
553 | /* copy and unpack counts */ | ||
368 | count_src = (struct IBF_Count *) key_hash_src; | 554 | count_src = (struct IBF_Count *) key_hash_src; |
369 | GNUNET_memcpy (ibf->count + start, | 555 | unpack_counter (ibf,start,count,(uint8_t *) count_src,counter_max_length); |
370 | count_src, | ||
371 | count * sizeof *count_src); | ||
372 | } | 556 | } |
373 | 557 | ||
374 | 558 | ||
diff --git a/src/setu/ibf.h b/src/setu/ibf.h index 7c2ab33b1..5628405dc 100644 --- a/src/setu/ibf.h +++ b/src/setu/ibf.h | |||
@@ -22,6 +22,7 @@ | |||
22 | * @file set/ibf.h | 22 | * @file set/ibf.h |
23 | * @brief invertible bloom filter | 23 | * @brief invertible bloom filter |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | * @author Elias Summermatter | ||
25 | */ | 26 | */ |
26 | 27 | ||
27 | #ifndef GNUNET_CONSENSUS_IBF_H | 28 | #ifndef GNUNET_CONSENSUS_IBF_H |
@@ -62,7 +63,7 @@ struct IBF_KeyHash | |||
62 | */ | 63 | */ |
63 | struct IBF_Count | 64 | struct IBF_Count |
64 | { | 65 | { |
65 | int8_t count_val; | 66 | int64_t count_val; |
66 | }; | 67 | }; |
67 | 68 | ||
68 | 69 | ||
@@ -93,6 +94,20 @@ struct InvertibleBloomFilter | |||
93 | uint8_t hash_num; | 94 | uint8_t hash_num; |
94 | 95 | ||
95 | /** | 96 | /** |
97 | * If an IBF is decoded this count stores how many | ||
98 | * elements are on the local site. This is used | ||
99 | * to estimate the set difference on a site | ||
100 | */ | ||
101 | int local_decoded_count; | ||
102 | |||
103 | /** | ||
104 | * If an IBF is decoded this count stores how many | ||
105 | * elements are on the remote site. This is used | ||
106 | * to estimate the set difference on a site | ||
107 | */ | ||
108 | int remote_decoded_count; | ||
109 | |||
110 | /** | ||
96 | * Xor sums of the elements' keys, used to identify the elements. | 111 | * Xor sums of the elements' keys, used to identify the elements. |
97 | * Array of 'size' elements. | 112 | * Array of 'size' elements. |
98 | */ | 113 | */ |
@@ -125,8 +140,9 @@ struct InvertibleBloomFilter | |||
125 | void | 140 | void |
126 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, | 141 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, |
127 | uint32_t start, | 142 | uint32_t start, |
128 | uint32_t count, | 143 | uint64_t count, |
129 | void *buf); | 144 | void *buf, |
145 | uint8_t counter_max_length); | ||
130 | 146 | ||
131 | 147 | ||
132 | /** | 148 | /** |
@@ -140,8 +156,9 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, | |||
140 | void | 156 | void |
141 | ibf_read_slice (const void *buf, | 157 | ibf_read_slice (const void *buf, |
142 | uint32_t start, | 158 | uint32_t start, |
143 | uint32_t count, | 159 | uint64_t count, |
144 | struct InvertibleBloomFilter *ibf); | 160 | struct InvertibleBloomFilter *ibf, |
161 | uint8_t counter_max_length); | ||
145 | 162 | ||
146 | 163 | ||
147 | /** | 164 | /** |
@@ -244,6 +261,44 @@ ibf_dup (const struct InvertibleBloomFilter *ibf); | |||
244 | void | 261 | void |
245 | ibf_destroy (struct InvertibleBloomFilter *ibf); | 262 | ibf_destroy (struct InvertibleBloomFilter *ibf); |
246 | 263 | ||
264 | uint8_t | ||
265 | ibf_get_max_counter (struct InvertibleBloomFilter *ibf); | ||
266 | |||
267 | |||
268 | /** | ||
269 | * Packs the counter to transmit only the smallest possible amount of bytes and | ||
270 | * preventing overflow of the counter | ||
271 | * @param ibf the ibf to write | ||
272 | * @param start with which bucket to start | ||
273 | * @param count how many buckets to write | ||
274 | * @param buf buffer to write the data to | ||
275 | * @param max bit length of a counter for unpacking | ||
276 | */ | ||
277 | |||
278 | void | ||
279 | pack_counter (const struct InvertibleBloomFilter *ibf, | ||
280 | uint32_t start, | ||
281 | uint64_t count, | ||
282 | uint8_t *buf, | ||
283 | uint8_t counter_max_length); | ||
284 | |||
285 | /** | ||
286 | * Unpacks the counter to transmit only the smallest possible amount of bytes and | ||
287 | * preventing overflow of the counter | ||
288 | * @param ibf the ibf to write | ||
289 | * @param start with which bucket to start | ||
290 | * @param count how many buckets to write | ||
291 | * @param buf buffer to write the data to | ||
292 | * @param max bit length of a counter for unpacking | ||
293 | */ | ||
294 | |||
295 | void | ||
296 | unpack_counter (const struct InvertibleBloomFilter *ibf, | ||
297 | uint32_t start, | ||
298 | uint64_t count, | ||
299 | uint8_t *buf, | ||
300 | uint8_t counter_max_length); | ||
301 | |||
247 | 302 | ||
248 | #if 0 /* keep Emacsens' auto-indent happy */ | 303 | #if 0 /* keep Emacsens' auto-indent happy */ |
249 | { | 304 | { |
diff --git a/src/setu/perf_setu_api.c b/src/setu/perf_setu_api.c index b273f9c71..af84994f8 100644 --- a/src/setu/perf_setu_api.c +++ b/src/setu/perf_setu_api.c | |||
@@ -22,11 +22,14 @@ | |||
22 | * @file set/test_setu_api.c | 22 | * @file set/test_setu_api.c |
23 | * @brief testcase for setu_api.c | 23 | * @brief testcase for setu_api.c |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | * @author Elias Summermatter | ||
25 | */ | 26 | */ |
26 | #include "platform.h" | 27 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
28 | #include "gnunet_testing_lib.h" | 29 | #include "gnunet_testing_lib.h" |
29 | #include "gnunet_setu_service.h" | 30 | #include "gnunet_setu_service.h" |
31 | #include <sys/sysinfo.h> | ||
32 | #include <pthread.h> | ||
30 | 33 | ||
31 | 34 | ||
32 | static struct GNUNET_PeerIdentity local_id; | 35 | static struct GNUNET_PeerIdentity local_id; |
@@ -50,6 +53,12 @@ static int ret; | |||
50 | static struct GNUNET_SCHEDULER_Task *tt; | 53 | static struct GNUNET_SCHEDULER_Task *tt; |
51 | 54 | ||
52 | 55 | ||
56 | /** | ||
57 | * Handles configuration file for setu performance test | ||
58 | * | ||
59 | */ | ||
60 | static struct GNUNET_CONFIGURATION_Handle *setu_cfg; | ||
61 | |||
53 | 62 | ||
54 | static void | 63 | static void |
55 | result_cb_set1 (void *cls, | 64 | result_cb_set1 (void *cls, |
@@ -57,44 +66,44 @@ result_cb_set1 (void *cls, | |||
57 | uint64_t size, | 66 | uint64_t size, |
58 | enum GNUNET_SETU_Status status) | 67 | enum GNUNET_SETU_Status status) |
59 | { | 68 | { |
60 | switch (status) | 69 | switch (status) |
70 | { | ||
71 | case GNUNET_SETU_STATUS_ADD_LOCAL: | ||
72 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n"); | ||
73 | break; | ||
74 | |||
75 | case GNUNET_SETU_STATUS_FAILURE: | ||
76 | GNUNET_break (0); | ||
77 | oh1 = NULL; | ||
78 | fprintf (stderr, "set 1: received failure status!\n"); | ||
79 | ret = 1; | ||
80 | if (NULL != tt) | ||
81 | { | ||
82 | GNUNET_SCHEDULER_cancel (tt); | ||
83 | tt = NULL; | ||
84 | } | ||
85 | GNUNET_SCHEDULER_shutdown (); | ||
86 | break; | ||
87 | |||
88 | case GNUNET_SETU_STATUS_DONE: | ||
89 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n"); | ||
90 | oh1 = NULL; | ||
91 | if (NULL != set1) | ||
61 | { | 92 | { |
62 | case GNUNET_SETU_STATUS_ADD_LOCAL: | 93 | GNUNET_SETU_destroy (set1); |
63 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n"); | 94 | set1 = NULL; |
64 | break; | ||
65 | |||
66 | case GNUNET_SETU_STATUS_FAILURE: | ||
67 | GNUNET_break (0); | ||
68 | oh1 = NULL; | ||
69 | fprintf (stderr, "set 1: received failure status!\n"); | ||
70 | ret = 1; | ||
71 | if (NULL != tt) | ||
72 | { | ||
73 | GNUNET_SCHEDULER_cancel (tt); | ||
74 | tt = NULL; | ||
75 | } | ||
76 | GNUNET_SCHEDULER_shutdown (); | ||
77 | break; | ||
78 | |||
79 | case GNUNET_SETU_STATUS_DONE: | ||
80 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n"); | ||
81 | oh1 = NULL; | ||
82 | if (NULL != set1) | ||
83 | { | ||
84 | GNUNET_SETU_destroy (set1); | ||
85 | set1 = NULL; | ||
86 | } | ||
87 | if (NULL == set2) | ||
88 | { | ||
89 | GNUNET_SCHEDULER_cancel (tt); | ||
90 | tt = NULL; | ||
91 | GNUNET_SCHEDULER_shutdown (); | ||
92 | } | ||
93 | break; | ||
94 | |||
95 | default: | ||
96 | GNUNET_assert (0); | ||
97 | } | 95 | } |
96 | if (NULL == set2) | ||
97 | { | ||
98 | GNUNET_SCHEDULER_cancel (tt); | ||
99 | tt = NULL; | ||
100 | GNUNET_SCHEDULER_shutdown (); | ||
101 | } | ||
102 | break; | ||
103 | |||
104 | default: | ||
105 | GNUNET_assert (0); | ||
106 | } | ||
98 | } | 107 | } |
99 | 108 | ||
100 | 109 | ||
@@ -104,36 +113,36 @@ result_cb_set2 (void *cls, | |||
104 | uint64_t size, | 113 | uint64_t size, |
105 | enum GNUNET_SETU_Status status) | 114 | enum GNUNET_SETU_Status status) |
106 | { | 115 | { |
107 | switch (status) | 116 | switch (status) |
117 | { | ||
118 | case GNUNET_SETU_STATUS_ADD_LOCAL: | ||
119 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n"); | ||
120 | break; | ||
121 | |||
122 | case GNUNET_SETU_STATUS_FAILURE: | ||
123 | GNUNET_break (0); | ||
124 | oh2 = NULL; | ||
125 | fprintf (stderr, "set 2: received failure status\n"); | ||
126 | GNUNET_SCHEDULER_shutdown (); | ||
127 | ret = 1; | ||
128 | break; | ||
129 | |||
130 | case GNUNET_SETU_STATUS_DONE: | ||
131 | oh2 = NULL; | ||
132 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n"); | ||
133 | GNUNET_SETU_destroy (set2); | ||
134 | set2 = NULL; | ||
135 | if (NULL == set1) | ||
108 | { | 136 | { |
109 | case GNUNET_SETU_STATUS_ADD_LOCAL: | 137 | GNUNET_SCHEDULER_cancel (tt); |
110 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n"); | 138 | tt = NULL; |
111 | break; | 139 | GNUNET_SCHEDULER_shutdown (); |
112 | |||
113 | case GNUNET_SETU_STATUS_FAILURE: | ||
114 | GNUNET_break (0); | ||
115 | oh2 = NULL; | ||
116 | fprintf (stderr, "set 2: received failure status\n"); | ||
117 | GNUNET_SCHEDULER_shutdown (); | ||
118 | ret = 1; | ||
119 | break; | ||
120 | |||
121 | case GNUNET_SETU_STATUS_DONE: | ||
122 | oh2 = NULL; | ||
123 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n"); | ||
124 | GNUNET_SETU_destroy (set2); | ||
125 | set2 = NULL; | ||
126 | if (NULL == set1) | ||
127 | { | ||
128 | GNUNET_SCHEDULER_cancel (tt); | ||
129 | tt = NULL; | ||
130 | GNUNET_SCHEDULER_shutdown (); | ||
131 | } | ||
132 | break; | ||
133 | |||
134 | default: | ||
135 | GNUNET_assert (0); | ||
136 | } | 140 | } |
141 | break; | ||
142 | |||
143 | default: | ||
144 | GNUNET_assert (0); | ||
145 | } | ||
137 | } | 146 | } |
138 | 147 | ||
139 | 148 | ||
@@ -143,14 +152,14 @@ listen_cb (void *cls, | |||
143 | const struct GNUNET_MessageHeader *context_msg, | 152 | const struct GNUNET_MessageHeader *context_msg, |
144 | struct GNUNET_SETU_Request *request) | 153 | struct GNUNET_SETU_Request *request) |
145 | { | 154 | { |
146 | GNUNET_assert (NULL != context_msg); | 155 | GNUNET_assert (NULL != context_msg); |
147 | GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); | 156 | GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); |
148 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n"); | 157 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n"); |
149 | oh2 = GNUNET_SETU_accept (request, | 158 | oh2 = GNUNET_SETU_accept (request, |
150 | (struct GNUNET_SETU_Option[]){ 0 }, | 159 | (struct GNUNET_SETU_Option[]){ 0 }, |
151 | &result_cb_set2, | 160 | &result_cb_set2, |
152 | NULL); | 161 | NULL); |
153 | GNUNET_SETU_commit (oh2, set2); | 162 | GNUNET_SETU_commit (oh2, set2); |
154 | } | 163 | } |
155 | 164 | ||
156 | 165 | ||
@@ -162,122 +171,89 @@ listen_cb (void *cls, | |||
162 | static void | 171 | static void |
163 | start (void *cls) | 172 | start (void *cls) |
164 | { | 173 | { |
165 | struct GNUNET_MessageHeader context_msg; | 174 | struct GNUNET_MessageHeader context_msg; |
166 | 175 | ||
167 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n"); | 176 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n"); |
168 | context_msg.size = htons (sizeof context_msg); | 177 | context_msg.size = htons (sizeof context_msg); |
169 | context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); | 178 | context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); |
170 | listen_handle = GNUNET_SETU_listen (config, | 179 | listen_handle = GNUNET_SETU_listen (config, |
171 | &app_id, | 180 | &app_id, |
172 | &listen_cb, | 181 | &listen_cb, |
173 | NULL); | 182 | NULL); |
174 | oh1 = GNUNET_SETU_prepare (&local_id, | 183 | oh1 = GNUNET_SETU_prepare (&local_id, |
175 | &app_id, | 184 | &app_id, |
176 | &context_msg, | 185 | &context_msg, |
177 | (struct GNUNET_SETU_Option[]){ 0 }, | 186 | (struct GNUNET_SETU_Option[]){ 0 }, |
178 | &result_cb_set1, | 187 | &result_cb_set1, |
179 | NULL); | 188 | NULL); |
180 | GNUNET_SETU_commit (oh1, set1); | 189 | GNUNET_SETU_commit (oh1, set1); |
181 | } | 190 | } |
182 | 191 | ||
183 | 192 | ||
184 | /** | 193 | /** |
185 | * Initialize the second set, continue | ||
186 | * | ||
187 | * @param cls closure, unused | ||
188 | */ | ||
189 | static void | ||
190 | init_set2 (void *cls) | ||
191 | { | ||
192 | struct GNUNET_SETU_Element element; | ||
193 | |||
194 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n"); | ||
195 | |||
196 | element.element_type = 0; | ||
197 | element.data = "hello1"; | ||
198 | element.size = strlen (element.data); | ||
199 | GNUNET_SETU_add_element (set2, &element, NULL, NULL); | ||
200 | element.data = "quux"; | ||
201 | element.size = strlen (element.data); | ||
202 | GNUNET_SETU_add_element (set2, &element, NULL, NULL); | ||
203 | element.data = "baz"; | ||
204 | element.size = strlen (element.data); | ||
205 | GNUNET_SETU_add_element (set2, &element, &start, NULL); | ||
206 | } | ||
207 | |||
208 | /** | ||
209 | * Generate random byte stream | 194 | * Generate random byte stream |
210 | */ | 195 | */ |
211 | 196 | ||
212 | unsigned char *gen_rdm_bytestream (size_t num_bytes) | 197 | unsigned char * |
198 | gen_rdm_bytestream (size_t num_bytes) | ||
213 | { | 199 | { |
214 | unsigned char *stream = GNUNET_malloc (num_bytes); | 200 | unsigned char *stream = GNUNET_malloc (num_bytes); |
215 | GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes); | 201 | GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes); |
216 | return stream; | 202 | return stream; |
217 | } | 203 | } |
218 | 204 | ||
205 | |||
219 | /** | 206 | /** |
220 | * Generate random sets | 207 | * Generate random sets |
221 | */ | 208 | */ |
222 | 209 | ||
223 | static void | 210 | static void |
224 | initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes) | 211 | initRandomSets (int overlap, int set1_size, int set2_size, int |
212 | element_size_in_bytes) | ||
225 | { | 213 | { |
226 | struct GNUNET_SETU_Element element; | 214 | struct GNUNET_SETU_Element element; |
227 | element.element_type = 0; | 215 | element.element_type = 0; |
228 | 216 | ||
229 | // Add elements to both sets | 217 | // Add elements to both sets |
230 | for (int i = 0; i < overlap; i++) { | 218 | for (int i = 0; i < overlap; i++) |
231 | element.data = gen_rdm_bytestream(element_size_in_bytes); | 219 | { |
232 | element.size = element_size_in_bytes; | 220 | element.data = gen_rdm_bytestream (element_size_in_bytes); |
233 | GNUNET_SETU_add_element (set1, &element, NULL, NULL); | 221 | element.size = element_size_in_bytes; |
234 | GNUNET_SETU_add_element (set2, &element, NULL, NULL); | 222 | GNUNET_SETU_add_element (set1, &element, NULL, NULL); |
235 | set1_size--; | 223 | GNUNET_SETU_add_element (set2, &element, NULL, NULL); |
236 | set2_size--; | 224 | set1_size--; |
237 | } | 225 | set2_size--; |
238 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n"); | 226 | } |
239 | 227 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n"); | |
240 | // Add other elements to set 1 | 228 | |
241 | while(set1_size>0) { | 229 | // Add other elements to set 1 |
242 | element.data = gen_rdm_bytestream(element_size_in_bytes); | 230 | while (set1_size>0) |
243 | element.size = element_size_in_bytes; | 231 | { |
244 | GNUNET_SETU_add_element (set1, &element, NULL, NULL); | 232 | element.data = gen_rdm_bytestream (element_size_in_bytes); |
245 | set1_size--; | 233 | element.size = element_size_in_bytes; |
246 | } | 234 | GNUNET_SETU_add_element (set1, &element, NULL, NULL); |
247 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n"); | 235 | set1_size--; |
248 | 236 | } | |
249 | // Add other elements to set 2 | 237 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n"); |
250 | while(set2_size > 0) { | ||
251 | element.data = gen_rdm_bytestream(element_size_in_bytes); | ||
252 | element.size = element_size_in_bytes; | ||
253 | 238 | ||
254 | if(set2_size != 1) { | 239 | // Add other elements to set 2 |
255 | GNUNET_SETU_add_element (set2, &element,NULL, NULL); | 240 | while (set2_size > 0) |
256 | } else { | 241 | { |
257 | GNUNET_SETU_add_element (set2, &element,&start, NULL); | 242 | element.data = gen_rdm_bytestream (element_size_in_bytes); |
258 | } | 243 | element.size = element_size_in_bytes; |
259 | 244 | ||
260 | set2_size--; | 245 | if (set2_size != 1) |
246 | { | ||
247 | GNUNET_SETU_add_element (set2, &element,NULL, NULL); | ||
248 | } | ||
249 | else | ||
250 | { | ||
251 | GNUNET_SETU_add_element (set2, &element,&start, NULL); | ||
261 | } | 252 | } |
262 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n"); | ||
263 | } | ||
264 | |||
265 | /** | ||
266 | * Initialize the first set, continue. | ||
267 | */ | ||
268 | static void | ||
269 | init_set1 (void) | ||
270 | { | ||
271 | struct GNUNET_SETU_Element element; | ||
272 | 253 | ||
273 | element.element_type = 0; | 254 | set2_size--; |
274 | element.data = "hello"; | 255 | } |
275 | element.size = strlen (element.data); | 256 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n"); |
276 | GNUNET_SETU_add_element (set1, &element, NULL, NULL); | ||
277 | element.data = "bar"; | ||
278 | element.size = strlen (element.data); | ||
279 | GNUNET_SETU_add_element (set1, &element, &init_set2, NULL); | ||
280 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); | ||
281 | } | 257 | } |
282 | 258 | ||
283 | 259 | ||
@@ -289,10 +265,10 @@ init_set1 (void) | |||
289 | static void | 265 | static void |
290 | timeout_fail (void *cls) | 266 | timeout_fail (void *cls) |
291 | { | 267 | { |
292 | tt = NULL; | 268 | tt = NULL; |
293 | GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n"); | 269 | GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n"); |
294 | GNUNET_SCHEDULER_shutdown (); | 270 | GNUNET_SCHEDULER_shutdown (); |
295 | ret = 1; | 271 | ret = 1; |
296 | } | 272 | } |
297 | 273 | ||
298 | 274 | ||
@@ -304,36 +280,36 @@ timeout_fail (void *cls) | |||
304 | static void | 280 | static void |
305 | do_shutdown (void *cls) | 281 | do_shutdown (void *cls) |
306 | { | 282 | { |
307 | if (NULL != tt) | 283 | if (NULL != tt) |
308 | { | 284 | { |
309 | GNUNET_SCHEDULER_cancel (tt); | 285 | GNUNET_SCHEDULER_cancel (tt); |
310 | tt = NULL; | 286 | tt = NULL; |
311 | } | 287 | } |
312 | if (NULL != oh1) | 288 | if (NULL != oh1) |
313 | { | 289 | { |
314 | GNUNET_SETU_operation_cancel (oh1); | 290 | GNUNET_SETU_operation_cancel (oh1); |
315 | oh1 = NULL; | 291 | oh1 = NULL; |
316 | } | 292 | } |
317 | if (NULL != oh2) | 293 | if (NULL != oh2) |
318 | { | 294 | { |
319 | GNUNET_SETU_operation_cancel (oh2); | 295 | GNUNET_SETU_operation_cancel (oh2); |
320 | oh2 = NULL; | 296 | oh2 = NULL; |
321 | } | 297 | } |
322 | if (NULL != set1) | 298 | if (NULL != set1) |
323 | { | 299 | { |
324 | GNUNET_SETU_destroy (set1); | 300 | GNUNET_SETU_destroy (set1); |
325 | set1 = NULL; | 301 | set1 = NULL; |
326 | } | 302 | } |
327 | if (NULL != set2) | 303 | if (NULL != set2) |
328 | { | 304 | { |
329 | GNUNET_SETU_destroy (set2); | 305 | GNUNET_SETU_destroy (set2); |
330 | set2 = NULL; | 306 | set2 = NULL; |
331 | } | 307 | } |
332 | if (NULL != listen_handle) | 308 | if (NULL != listen_handle) |
333 | { | 309 | { |
334 | GNUNET_SETU_listen_cancel (listen_handle); | 310 | GNUNET_SETU_listen_cancel (listen_handle); |
335 | listen_handle = NULL; | 311 | listen_handle = NULL; |
336 | } | 312 | } |
337 | } | 313 | } |
338 | 314 | ||
339 | 315 | ||
@@ -350,79 +326,148 @@ run (void *cls, | |||
350 | const struct GNUNET_CONFIGURATION_Handle *cfg, | 326 | const struct GNUNET_CONFIGURATION_Handle *cfg, |
351 | struct GNUNET_TESTING_Peer *peer) | 327 | struct GNUNET_TESTING_Peer *peer) |
352 | { | 328 | { |
353 | struct GNUNET_SETU_OperationHandle *my_oh; | 329 | struct GNUNET_SETU_OperationHandle *my_oh; |
354 | 330 | ||
355 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 331 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
356 | "Running preparatory tests\n"); | 332 | "Running preparatory tests\n"); |
357 | tt = GNUNET_SCHEDULER_add_delayed ( | 333 | tt = GNUNET_SCHEDULER_add_delayed ( |
358 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), | 334 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), |
359 | &timeout_fail, | 335 | &timeout_fail, |
360 | NULL); | 336 | NULL); |
361 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); | 337 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); |
362 | 338 | ||
363 | config = cfg; | 339 | config = cfg; |
364 | GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg, | 340 | GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg, |
365 | &local_id)); | 341 | &local_id)); |
366 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 342 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
367 | "my id (from CRYPTO): %s\n", | 343 | "my id (from CRYPTO): %s\n", |
368 | GNUNET_i2s (&local_id)); | 344 | GNUNET_i2s (&local_id)); |
369 | GNUNET_TESTING_peer_get_identity (peer, | 345 | GNUNET_TESTING_peer_get_identity (peer, |
370 | &local_id); | 346 | &local_id); |
371 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 347 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
372 | "my id (from TESTING): %s\n", | 348 | "my id (from TESTING): %s\n", |
373 | GNUNET_i2s (&local_id)); | 349 | GNUNET_i2s (&local_id)); |
374 | set1 = GNUNET_SETU_create (cfg); | 350 | set1 = GNUNET_SETU_create (cfg); |
375 | set2 = GNUNET_SETU_create (cfg); | 351 | set2 = GNUNET_SETU_create (cfg); |
376 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 352 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
377 | "Created sets %p and %p for union operation\n", | 353 | "Created sets %p and %p for union operation\n", |
378 | set1, | 354 | set1, |
379 | set2); | 355 | set2); |
380 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); | 356 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); |
381 | 357 | ||
382 | /* test if canceling an uncommitted request works! */ | 358 | /* test if canceling an uncommited request works! */ |
383 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 359 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
384 | "Launching and instantly stopping set operation\n"); | 360 | "Launching and instantly stopping set operation\n"); |
385 | my_oh = GNUNET_SETU_prepare (&local_id, | 361 | my_oh = GNUNET_SETU_prepare (&local_id, |
386 | &app_id, | 362 | &app_id, |
387 | NULL, | 363 | NULL, |
388 | (struct GNUNET_SETU_Option[]){ 0 }, | 364 | (struct GNUNET_SETU_Option[]){ 0 }, |
389 | NULL, | 365 | NULL, |
390 | NULL); | 366 | NULL); |
391 | GNUNET_SETU_operation_cancel (my_oh); | 367 | GNUNET_SETU_operation_cancel (my_oh); |
392 | 368 | ||
393 | /* test the real set reconciliation */ | 369 | /* test the real set reconciliation */ |
394 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 370 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
395 | "Running real set-reconciliation\n"); | 371 | "Running real set-reconciliation\n"); |
396 | //init_set1 (); | 372 | // init_set1 (); |
397 | // limit ~23800 element total | 373 | // limit ~23800 element total |
398 | initRandomSets(50,100,100,128); | 374 | initRandomSets (490, 500,500,32); |
399 | } | 375 | } |
400 | 376 | ||
401 | static void execute_perf() | 377 | |
378 | void | ||
379 | perf_thread () | ||
402 | { | 380 | { |
403 | for( int repeat_ctr = 0; repeat_ctr<1; repeat_ctr++ ) { | 381 | GNUNET_TESTING_service_run ("perf_setu_api", |
382 | "arm", | ||
383 | "test_setu.conf", | ||
384 | &run, | ||
385 | NULL); | ||
386 | |||
387 | } | ||
404 | 388 | ||
405 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
406 | "Executing perf round %d\n", repeat_ctr); | ||
407 | 389 | ||
408 | GNUNET_TESTING_service_run ("perf_setu_api", | 390 | static void |
409 | "arm", | 391 | run_petf_thread (int total_runs) |
410 | "test_setu.conf", | 392 | { |
411 | &run, | 393 | int core_count = get_nprocs_conf (); |
412 | NULL); | 394 | pid_t child_pid, wpid; |
395 | int status = 0; | ||
396 | |||
397 | // Father code (before child processes start) | ||
398 | for (int processed = 0; processed < total_runs;) | ||
399 | { | ||
400 | for (int id = 0; id < core_count; id++) | ||
401 | { | ||
402 | if (processed >= total_runs) | ||
403 | break; | ||
404 | |||
405 | if ((child_pid = fork ()) == 0) | ||
406 | { | ||
407 | perf_thread (); | ||
408 | exit (0); | ||
409 | } | ||
410 | processed += 1; | ||
413 | } | 411 | } |
414 | return 0; | 412 | while ((wpid = wait (&status)) > 0) |
413 | ; | ||
414 | |||
415 | } | ||
415 | } | 416 | } |
416 | 417 | ||
417 | 418 | ||
419 | static void | ||
420 | execute_perf () | ||
421 | { | ||
422 | |||
423 | /** | ||
424 | * Erase statfile | ||
425 | */ | ||
426 | remove ("perf_stats.csv"); | ||
427 | remove ("perf_failure_bucket_number_factor.csv"); | ||
428 | for (int out_out_ctr = 3; out_out_ctr <= 3; out_out_ctr++) | ||
429 | { | ||
430 | |||
431 | for (int out_ctr = 20; out_ctr <= 20; out_ctr++) | ||
432 | { | ||
433 | float base = 0.1; | ||
434 | float x = out_ctr * base; | ||
435 | char factor[10]; | ||
436 | char *buffer = gcvt (x, 4, factor); | ||
437 | setu_cfg = GNUNET_CONFIGURATION_create (); | ||
438 | GNUNET_CONFIGURATION_set_value_string (setu_cfg, "IBF", | ||
439 | "BUCKET_NUMBER_FACTOR", | ||
440 | buffer); // Factor default=4 | ||
441 | GNUNET_CONFIGURATION_set_value_number (setu_cfg, "IBF", | ||
442 | "NUMBER_PER_BUCKET", 3); // K default=4 | ||
443 | GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE", | ||
444 | "TRADEOFF", "2"); // default=0.25 | ||
445 | GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE", | ||
446 | "MAX_SET_DIFF_FACTOR_DIFFERENTIAL", | ||
447 | "20000"); // default=0.25 | ||
448 | GNUNET_CONFIGURATION_set_value_number (setu_cfg, "BOUNDARIES", | ||
449 | "UPPER_ELEMENT", 5000); | ||
450 | |||
451 | |||
452 | if (GNUNET_OK != GNUNET_CONFIGURATION_write (setu_cfg, "perf_setu.conf")) | ||
453 | GNUNET_log ( | ||
454 | GNUNET_ERROR_TYPE_ERROR, | ||
455 | _ ("Failed to write subsystem default identifier map'.\n")); | ||
456 | run_petf_thread (100); | ||
457 | } | ||
458 | |||
459 | } | ||
460 | return; | ||
461 | } | ||
462 | |||
418 | 463 | ||
419 | int | 464 | int |
420 | main (int argc, char **argv) | 465 | main (int argc, char **argv) |
421 | { | 466 | { |
422 | GNUNET_log_setup ("perf_setu_api", | ||
423 | "WARNING", | ||
424 | NULL); | ||
425 | 467 | ||
426 | execute_perf(); | 468 | GNUNET_log_setup ("perf_setu_api", |
427 | return 0; | 469 | "WARNING", |
470 | NULL); | ||
471 | execute_perf (); | ||
472 | return 0; | ||
428 | } | 473 | } |
diff --git a/src/setu/setu.h b/src/setu/setu.h index 7c2a98a02..7b606f12c 100644 --- a/src/setu/setu.h +++ b/src/setu/setu.h | |||
@@ -122,6 +122,31 @@ struct GNUNET_SETU_AcceptMessage | |||
122 | */ | 122 | */ |
123 | uint32_t byzantine_lower_bound; | 123 | uint32_t byzantine_lower_bound; |
124 | 124 | ||
125 | |||
126 | /** | ||
127 | * Upper bound for the set size, used only when | ||
128 | * byzantine mode is enabled. | ||
129 | */ | ||
130 | uint64_t byzantine_upper_bond; | ||
131 | |||
132 | /** | ||
133 | * Bandwidth latency tradeoff determines how much bytes a single RTT is | ||
134 | * worth, which is a performance setting | ||
135 | */ | ||
136 | uint64_t bandwidth_latency_tradeoff; | ||
137 | |||
138 | /** | ||
139 | * The factor determines the number of buckets an IBF has which is | ||
140 | * multiplied by the estimated setsize default: 2 | ||
141 | */ | ||
142 | uint64_t ibf_bucket_number_factor; | ||
143 | |||
144 | /** | ||
145 | * This setting determines to how many IBF buckets an single elements | ||
146 | * is mapped to. | ||
147 | */ | ||
148 | uint64_t ibf_number_of_buckets_per_element; | ||
149 | |||
125 | }; | 150 | }; |
126 | 151 | ||
127 | 152 | ||
@@ -226,6 +251,30 @@ struct GNUNET_SETU_EvaluateMessage | |||
226 | */ | 251 | */ |
227 | uint32_t byzantine_lower_bound; | 252 | uint32_t byzantine_lower_bound; |
228 | 253 | ||
254 | /** | ||
255 | * Upper bound for the set size, used only when | ||
256 | * byzantine mode is enabled. | ||
257 | */ | ||
258 | uint64_t byzantine_upper_bond; | ||
259 | |||
260 | /** | ||
261 | * Bandwidth latency tradeoff determines how much bytes a single RTT is | ||
262 | * worth, which is a performance setting | ||
263 | */ | ||
264 | uint64_t bandwidth_latency_tradeoff; | ||
265 | |||
266 | /** | ||
267 | * The factor determines the number of buckets an IBF has which is | ||
268 | * multiplied by the estimated setsize default: 2 | ||
269 | */ | ||
270 | uint64_t ibf_bucket_number_factor; | ||
271 | |||
272 | /** | ||
273 | * This setting determines to how many IBF buckets an single elements | ||
274 | * is mapped to. | ||
275 | */ | ||
276 | uint64_t ibf_number_of_buckets_per_element; | ||
277 | |||
229 | /* rest: context message, that is, application-specific | 278 | /* rest: context message, that is, application-specific |
230 | message to convince listener to pick up */ | 279 | message to convince listener to pick up */ |
231 | }; | 280 | }; |
diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c index 0a09b18b2..faa57aaba 100644 --- a/src/setu/setu_api.c +++ b/src/setu/setu_api.c | |||
@@ -22,6 +22,7 @@ | |||
22 | * @brief api for the set union service | 22 | * @brief api for the set union service |
23 | * @author Florian Dold | 23 | * @author Florian Dold |
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * @author Elias Summermatter | ||
25 | */ | 26 | */ |
26 | #include "platform.h" | 27 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
@@ -526,6 +527,14 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer, | |||
526 | context_msg); | 527 | context_msg); |
527 | msg->app_id = *app_id; | 528 | msg->app_id = *app_id; |
528 | msg->target_peer = *other_peer; | 529 | msg->target_peer = *other_peer; |
530 | |||
531 | /* Set default values */ | ||
532 | msg->byzantine_upper_bond = UINT64_MAX; | ||
533 | msg->bandwidth_latency_tradeoff = 0; | ||
534 | msg->ibf_bucket_number_factor = 2; | ||
535 | msg->ibf_number_of_buckets_per_element = 3; | ||
536 | |||
537 | |||
529 | for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) | 538 | for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) |
530 | { | 539 | { |
531 | switch (opt->type) | 540 | switch (opt->type) |
@@ -534,6 +543,18 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer, | |||
534 | msg->byzantine = GNUNET_YES; | 543 | msg->byzantine = GNUNET_YES; |
535 | msg->byzantine_lower_bound = htonl (opt->v.num); | 544 | msg->byzantine_lower_bound = htonl (opt->v.num); |
536 | break; | 545 | break; |
546 | case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND: | ||
547 | msg->byzantine_upper_bond = htonl (opt->v.num); | ||
548 | break; | ||
549 | case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF: | ||
550 | msg->bandwidth_latency_tradeoff = htonl (opt->v.num); | ||
551 | break; | ||
552 | case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR: | ||
553 | msg->ibf_bucket_number_factor = htonl (opt->v.num); | ||
554 | break; | ||
555 | case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT: | ||
556 | msg->ibf_number_of_buckets_per_element = htonl (opt->v.num); | ||
557 | break; | ||
537 | case GNUNET_SETU_OPTION_FORCE_FULL: | 558 | case GNUNET_SETU_OPTION_FORCE_FULL: |
538 | msg->force_full = GNUNET_YES; | 559 | msg->force_full = GNUNET_YES; |
539 | break; | 560 | break; |
@@ -788,6 +809,13 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request, | |||
788 | mqm = GNUNET_MQ_msg (msg, | 809 | mqm = GNUNET_MQ_msg (msg, |
789 | GNUNET_MESSAGE_TYPE_SETU_ACCEPT); | 810 | GNUNET_MESSAGE_TYPE_SETU_ACCEPT); |
790 | msg->accept_reject_id = htonl (request->accept_id); | 811 | msg->accept_reject_id = htonl (request->accept_id); |
812 | |||
813 | /* Set default values */ | ||
814 | msg->byzantine_upper_bond = UINT64_MAX; | ||
815 | msg->bandwidth_latency_tradeoff = 0; | ||
816 | msg->ibf_bucket_number_factor = 2; | ||
817 | msg->ibf_number_of_buckets_per_element = 3; | ||
818 | |||
791 | for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) | 819 | for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) |
792 | { | 820 | { |
793 | switch (opt->type) | 821 | switch (opt->type) |
@@ -796,6 +824,18 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request, | |||
796 | msg->byzantine = GNUNET_YES; | 824 | msg->byzantine = GNUNET_YES; |
797 | msg->byzantine_lower_bound = htonl (opt->v.num); | 825 | msg->byzantine_lower_bound = htonl (opt->v.num); |
798 | break; | 826 | break; |
827 | case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND: | ||
828 | msg->byzantine_upper_bond = htonl (opt->v.num); | ||
829 | break; | ||
830 | case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF: | ||
831 | msg->bandwidth_latency_tradeoff = htonl (opt->v.num); | ||
832 | break; | ||
833 | case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR: | ||
834 | msg->ibf_bucket_number_factor = htonl (opt->v.num); | ||
835 | break; | ||
836 | case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT: | ||
837 | msg->ibf_number_of_buckets_per_element = htonl (opt->v.num); | ||
838 | break; | ||
799 | case GNUNET_SETU_OPTION_FORCE_FULL: | 839 | case GNUNET_SETU_OPTION_FORCE_FULL: |
800 | msg->force_full = GNUNET_YES; | 840 | msg->force_full = GNUNET_YES; |
801 | break; | 841 | break; |
diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c index 2fb7d015e..5a0c9d70d 100644 --- a/src/setu/test_setu_api.c +++ b/src/setu/test_setu_api.c | |||
@@ -204,62 +204,6 @@ init_set2 (void *cls) | |||
204 | GNUNET_SETU_add_element (set2, &element, &start, NULL); | 204 | GNUNET_SETU_add_element (set2, &element, &start, NULL); |
205 | } | 205 | } |
206 | 206 | ||
207 | /** | ||
208 | * Generate random byte stream | ||
209 | */ | ||
210 | |||
211 | unsigned char *gen_rdm_bytestream (size_t num_bytes) | ||
212 | { | ||
213 | unsigned char *stream = GNUNET_malloc (num_bytes); | ||
214 | GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes); | ||
215 | return stream; | ||
216 | } | ||
217 | |||
218 | /** | ||
219 | * Generate random sets | ||
220 | */ | ||
221 | |||
222 | static void | ||
223 | initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes) | ||
224 | { | ||
225 | struct GNUNET_SETU_Element element; | ||
226 | element.element_type = 0; | ||
227 | |||
228 | // Add elements to both sets | ||
229 | for (int i = 0; i < overlap; i++) { | ||
230 | element.data = gen_rdm_bytestream(element_size_in_bytes); | ||
231 | element.size = element_size_in_bytes; | ||
232 | GNUNET_SETU_add_element (set1, &element, NULL, NULL); | ||
233 | GNUNET_SETU_add_element (set2, &element, NULL, NULL); | ||
234 | set1_size--; | ||
235 | set2_size--; | ||
236 | } | ||
237 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n"); | ||
238 | |||
239 | // Add other elements to set 1 | ||
240 | while(set1_size>0) { | ||
241 | element.data = gen_rdm_bytestream(element_size_in_bytes); | ||
242 | element.size = element_size_in_bytes; | ||
243 | GNUNET_SETU_add_element (set1, &element, NULL, NULL); | ||
244 | set1_size--; | ||
245 | } | ||
246 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n"); | ||
247 | |||
248 | // Add other elements to set 2 | ||
249 | while(set2_size > 0) { | ||
250 | element.data = gen_rdm_bytestream(element_size_in_bytes); | ||
251 | element.size = element_size_in_bytes; | ||
252 | |||
253 | if(set2_size != 1) { | ||
254 | GNUNET_SETU_add_element (set2, &element,NULL, NULL); | ||
255 | } else { | ||
256 | GNUNET_SETU_add_element (set2, &element,&start, NULL); | ||
257 | } | ||
258 | |||
259 | set2_size--; | ||
260 | } | ||
261 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n"); | ||
262 | } | ||
263 | 207 | ||
264 | /** | 208 | /** |
265 | * Initialize the first set, continue. | 209 | * Initialize the first set, continue. |
@@ -392,9 +336,7 @@ run (void *cls, | |||
392 | /* test the real set reconciliation */ | 336 | /* test the real set reconciliation */ |
393 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 337 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
394 | "Running real set-reconciliation\n"); | 338 | "Running real set-reconciliation\n"); |
395 | //init_set1 (); | 339 | init_set1 (); |
396 | initRandomSets(19500,20000,20000,4096); | ||
397 | //initRandomSets(19500,20000,20000,32); | ||
398 | } | 340 | } |
399 | 341 | ||
400 | 342 | ||