From ebc70e1bccd6c2f784df8630f1105a91bc7bfeed Mon Sep 17 00:00:00 2001 From: Elias Summermatter Date: Fri, 2 Apr 2021 15:46:25 +0200 Subject: SETU: Implement LSD0003 --- src/include/gnunet_protocols.h | 7 + src/include/gnunet_setu_service.h | 26 +- src/set/ibf.c | 2 +- src/set/ibf.h | 1 + src/setu/Makefile.am | 11 +- src/setu/gnunet-service-setu.c | 2081 ++++++++++++++++++++--- src/setu/gnunet-service-setu_protocol.h | 77 +- src/setu/gnunet-service-setu_strata_estimator.c | 362 +++- src/setu/gnunet-service-setu_strata_estimator.h | 54 +- src/setu/ibf.c | 294 +++- src/setu/ibf.h | 65 +- src/setu/perf_setu_api.c | 571 ++++--- src/setu/setu.h | 49 + src/setu/setu_api.c | 40 + src/setu/test_setu_api.c | 60 +- 15 files changed, 2921 insertions(+), 779 deletions(-) diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 715e94c6a..6b61dfc72 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -1784,6 +1784,13 @@ extern "C" { */ #define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 572 +/** + * Signals other peer that all elements are sent. + */ + +#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL 710 + + /******************************************************************************* * SETI message types diff --git a/src/include/gnunet_setu_service.h b/src/include/gnunet_setu_service.h index bacec9408..1d7e48402 100644 --- a/src/include/gnunet_setu_service.h +++ b/src/include/gnunet_setu_service.h @@ -163,7 +163,31 @@ enum GNUNET_SETU_OptionType /** * Notify client also if we are sending a value to the other peer. */ - GNUNET_SETU_OPTION_SYMMETRIC = 8 + GNUNET_SETU_OPTION_SYMMETRIC = 8, + + /** + * Byzantine upper bound. Is the maximal plausible number of elements + * a peer can have default max uint64 + */ + GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND = 16, + + /** + * Bandwidth latency tradeoff determines how much bytes a single RTT is + * worth, which is a performance setting + */ + GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF= 32, + + /** + * The factor determines the number of buckets an IBF has which is + * multiplied by the estimated setsize default: 2 + */ + GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR= 64, + + /** + * This setting determines to how many IBF buckets an single elements + * is mapped to. + */ + GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT= 128 }; diff --git a/src/set/ibf.c b/src/set/ibf.c index 1532afceb..0f7eb6a9f 100644 --- a/src/set/ibf.c +++ b/src/set/ibf.c @@ -294,7 +294,7 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, struct IBF_KeyHash *key_hash_dst; struct IBF_Count *count_dst; - GNUNET_assert (start + count <= ibf->size); + GNUNET_assert (start + count <= ibf->size); /* copy keys */ key_dst = (struct IBF_Key *) buf; diff --git a/src/set/ibf.h b/src/set/ibf.h index 7c2ab33b1..334a797ef 100644 --- a/src/set/ibf.h +++ b/src/set/ibf.h @@ -245,6 +245,7 @@ void ibf_destroy (struct InvertibleBloomFilter *ibf); + #if 0 /* keep Emacsens' auto-indent happy */ { #endif diff --git a/src/setu/Makefile.am b/src/setu/Makefile.am index 77d048add..6e2865d8c 100644 --- a/src/setu/Makefile.am +++ b/src/setu/Makefile.am @@ -7,6 +7,8 @@ libexecdir= $(pkglibdir)/libexec/ plugindir = $(libdir)/gnunet +PTHREAD = -lpthread + pkgcfg_DATA = \ setu.conf @@ -63,7 +65,8 @@ libgnunetsetu_la_SOURCES = \ setu_api.c setu.h libgnunetsetu_la_LIBADD = \ $(top_builddir)/src/util/libgnunetutil.la \ - $(LTLIBINTL) + $(LTLIBINTL) \ + $(PTHREAD) libgnunetsetu_la_LDFLAGS = \ $(GN_LIB_LDFLAGS) @@ -91,7 +94,8 @@ perf_setu_api_SOURCES = \ perf_setu_api_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/testing/libgnunettesting.la \ - libgnunetsetu.la + libgnunetsetu.la \ + $(PTHREAD) plugin_LTLIBRARIES = \ @@ -103,7 +107,8 @@ libgnunet_plugin_block_setu_test_la_LIBADD = \ $(top_builddir)/src/block/libgnunetblock.la \ $(top_builddir)/src/block/libgnunetblockgroup.la \ $(top_builddir)/src/util/libgnunetutil.la \ - $(LTLIBINTL) + $(LTLIBINTL) \ + $(PTHREAD) libgnunet_plugin_block_setu_test_la_LDFLAGS = \ $(GN_PLUGIN_LDFLAGS) diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c index bd1113f15..a51e92b71 100644 --- a/src/setu/gnunet-service-setu.c +++ b/src/setu/gnunet-service-setu.c @@ -22,6 +22,7 @@ * @brief set union operation * @author Florian Dold * @author Christian Grothoff + * @author Elias Summermatter */ #include "platform.h" #include "gnunet_util_lib.h" @@ -50,33 +51,61 @@ */ #define SE_STRATA_COUNT 32 + /** - * Size of the IBFs in the strata estimator. + * Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348 + * Based on the bsc thesis of Elias Summermatter (2021) */ -#define SE_IBF_SIZE 80 +#define SE_IBFS_TOTAL_SIZE 632 /** * The hash num parameter for the difference digests and strata estimators. */ -#define SE_IBF_HASH_NUM 4 +#define SE_IBF_HASH_NUM 3 /** * Number of buckets that can be transmitted in one message. */ -#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE) +#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE) /** - * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). + * The maximum size of an ibf we use is MAX_IBF_SIZE=2^20. * Choose this value so that computing the IBF is still cheaper * than transmitting all values. */ -#define MAX_IBF_ORDER (20) +#define MAX_IBF_SIZE 1048576 + + +/** + * Minimal size of an ibf + * Based on the bsc thesis of Elias Summermatter (2021) + */ +#define IBF_MIN_SIZE 37 + +/** + * AVG RTT for differential sync when k=2 and Factor = 2 + * Based on the bsc thesis of Elias Summermatter (2021) + */ +#define DIFFERENTIAL_RTT_MEAN 3.65145 + +/** + * Security level used for byzantine checks (2^80) + */ + +#define SECURITY_LEVEL 80 + +/** + * Is the estimated probabily for a new round this values + * is based on the bsc thesis of Elias Summermatter (2021) + */ + +#define PROBABILITY_FOR_NEW_ROUND 0.15 /** - * Number of buckets used in the ibf per estimated - * difference. + * Measure the performance in a csv */ -#define IBF_ALPHA 4 + +#define MEASURE_PERFORMANCE 0 /** @@ -146,6 +175,28 @@ enum UnionOperationPhase PHASE_FULL_RECEIVING }; +/** + * Different modes of operations + */ + +enum MODE_OF_OPERATION +{ + /** + * Mode just synchronizes the difference between sets + */ + DIFFERENTIAL_SYNC, + + /** + * Mode send full set sending local set first + */ + FULL_SYNC_LOCAL_SENDING_FIRST, + + /** + * Mode request full set from remote peer + */ + FULL_SYNC_REMOTE_SENDING_FIRST +}; + /** * Information about an element element in the set. All elements are @@ -277,7 +328,7 @@ struct Operation * Copy of the set's strata estimator at the time of * creation of this operation. */ - struct StrataEstimator *se; + struct MultiStrataEstimator *se; /** * The IBF we currently receive. @@ -320,7 +371,7 @@ struct Operation /** * Number of ibf buckets already received into the @a remote_ibf. */ - unsigned int ibf_buckets_received; + uint64_t ibf_buckets_received; /** * Salt that we're using for sending IBFs @@ -386,7 +437,7 @@ struct Operation * Lower bound for the set size, used only when * byzantine mode is enabled. */ - int byzantine_lower_bound; + uint64_t byzantine_lower_bound; /** * Unique request id for the request from a remote peer, sent to the @@ -401,21 +452,83 @@ struct Operation */ unsigned int generation_created; + + /** + * User defined Bandwidth Round Trips Tradeoff + */ + uint64_t rtt_bandwidth_tradeoff; + + + /** + * Number of Element per bucket in IBF + */ + uint8_t ibf_number_buckets_per_element; + + /** - * User defined Bandwidth Round Trips Tradeoff + * Set difference is multiplied with this factor + * to gennerate large enought IBF */ - double rtt_bandwidth_tradeoff; + uint8_t ibf_bucket_number_factor; /** - * Number of Element per bucket in IBF + * Defines which site a client is + * 0 = Initiating peer + * 1 = Receiving peer */ - unsigned int ibf_number_buckets_per_element; + uint8_t peer_site; + /** - * Number of buckets in IBF + * Local peer element count */ - unsigned ibf_bucket_number; + uint64_t local_element_count; + /** + * Mode of operation that was chosen by the algorithm + */ + uint8_t mode_of_operation; + + /** + * Hashmap to keep track of the send/received messages + */ + struct GNUNET_CONTAINER_MultiHashMap *message_control_flow; + + /** + * Hashmap to keep track of the send/received inquiries (ibf keys) + */ + struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent; + + + /** + * Total size of local set + */ + uint64_t total_elements_size_local; + + /** + * Limit of number of elements in set + */ + uint64_t byzantine_upper_bound; + + /** + * is the count of already passed differential sync iterations + */ + uint8_t differential_sync_iterations; + + /** + * Estimated or committed set difference at the start + */ + uint64_t remote_set_diff; + + /** + * Estimated or committed set difference at the start + */ + uint64_t local_set_diff; + + /** + * Boolean to enforce an active passive switch + */ + bool active_passive_switch_required; }; @@ -430,6 +543,16 @@ struct SetContent */ struct GNUNET_CONTAINER_MultiHashMap *elements; + /** + * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *` randomized. + */ + struct GNUNET_CONTAINER_MultiHashMap *elements_randomized; + + /** + * Salt to construct the randomized element map + */ + uint64_t elements_randomized_salt; + /** * Number of references to the content. */ @@ -478,7 +601,7 @@ struct Set * The strata estimator is only generated once for each set. The IBF keys * are derived from the element hashes with salt=0. */ - struct StrataEstimator *se; + struct MultiStrataEstimator *se; /** * Evaluate operations are held in a linked list. @@ -635,96 +758,679 @@ static int in_shutdown; */ static uint32_t suggest_id; +#if MEASURE_PERFORMANCE +/** + * Handles configuration file for setu performance test + * + */ +static const struct GNUNET_CONFIGURATION_Handle *setu_cfg; + + +/** + * Stores the performance data for induvidual message + */ + + +struct perf_num_send_received_msg +{ + uint64_t sent; + uint64_t sent_var_bytes; + uint64_t received; + uint64_t received_var_bytes; +}; + +/** + * Main struct to messure perfomance (data/rtts) + */ +struct per_store_struct +{ + struct perf_num_send_received_msg operation_request; + struct perf_num_send_received_msg se; + struct perf_num_send_received_msg request_full; + struct perf_num_send_received_msg element_full; + struct perf_num_send_received_msg full_done; + struct perf_num_send_received_msg ibf; + struct perf_num_send_received_msg inquery; + struct perf_num_send_received_msg element; + struct perf_num_send_received_msg demand; + struct perf_num_send_received_msg offer; + struct perf_num_send_received_msg done; + struct perf_num_send_received_msg over; + uint64_t se_diff; + uint64_t se_diff_remote; + uint64_t se_diff_local; + uint64_t active_passive_switches; + uint8_t mode_of_operation; +}; + +struct per_store_struct perf_store; +#endif /** - * Added Roundtripscounter + * Different states to control the messages flow in differential mode */ +enum MESSAGE_CONTROL_FLOW_STATE +{ + /** + * Initial message state + */ + MSG_CFS_UNINITIALIZED, + + /** + * Track that a message has been sent + */ + MSG_CFS_SENT, + + /** + * Track that receiving this message is expected + */ + MSG_CFS_EXPECTED, -struct perf_num_send_resived_msg { - int sent; - int sent_var_bytes; - int received; - int received_var_bytes; + /** + * Track that message has been recieved + */ + MSG_CFS_RECEIVED, }; +/** + * Message types to track in message control flow + */ -struct perf_rtt_struct -{ - struct perf_num_send_resived_msg operation_request; - struct perf_num_send_resived_msg se; - struct perf_num_send_resived_msg request_full; - struct perf_num_send_resived_msg element_full; - struct perf_num_send_resived_msg full_done; - struct perf_num_send_resived_msg ibf; - struct perf_num_send_resived_msg inquery; - struct perf_num_send_resived_msg element; - struct perf_num_send_resived_msg demand; - struct perf_num_send_resived_msg offer; - struct perf_num_send_resived_msg done; - struct perf_num_send_resived_msg over; +enum MESSAGE_TYPE +{ + /** + * Offer message type + */ + OFFER_MESSAGE, + /** + * Demand message type + */ + DEMAND_MESSAGE, + /** + * Elemente message type + */ + ELEMENT_MESSAGE, +}; + +/** + * Struct to tracked messages in message controll flow + */ + +struct messageControlFlowElement +{ + /** + * Track the message control state of the offer message + */ + enum MESSAGE_CONTROL_FLOW_STATE offer; + /** + * Track the message control state of the demand message + */ + enum MESSAGE_CONTROL_FLOW_STATE demand; + /** + * Track the message control state of the element message + */ + enum MESSAGE_CONTROL_FLOW_STATE element; }; -struct perf_rtt_struct perf_rtt; +#if MEASURE_PERFORMANCE +/** + * Loads different configuration to do perform perfomance tests + * @param op + */ +static void +load_config (struct Operation *op) +{ + + setu_cfg = GNUNET_CONFIGURATION_create (); + GNUNET_CONFIGURATION_load (setu_cfg,"perf_setu.conf"); + + + long long number; + float fl; + GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR", + &fl); + op->ibf_bucket_number_factor = fl; + + GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET", + &number); + op->ibf_number_buckets_per_element = number; + GNUNET_CONFIGURATION_get_value_number (setu_cfg,"PERFORMANCE", "TRADEOFF", + &number); + op->rtt_bandwidth_tradeoff = number; + + + GNUNET_CONFIGURATION_get_value_number (setu_cfg,"BOUNDARIES", "UPPER_ELEMENT", + &number); + op->byzantine_upper_bound = number; + + + op->peer_site = 0; +} + + +/** + * Function to calculate total bytes used for performance messurement + * @param size + * @param perf_num_send_received_msg + * @return bytes used + */ static int -sum_sent_received_bytes(int size, struct perf_num_send_resived_msg perf_rtt_struct) { - return (size * perf_rtt_struct.sent) + - (size * perf_rtt_struct.received) + - perf_rtt_struct.sent_var_bytes + - perf_rtt_struct.received_var_bytes; +sum_sent_received_bytes (uint64_t size, struct perf_num_send_received_msg + perf_num_send_received_msg) +{ + return (size * perf_num_send_received_msg.sent) + + (size * perf_num_send_received_msg.received) + + perf_num_send_received_msg.sent_var_bytes + + perf_num_send_received_msg.received_var_bytes; } -static float -calculate_perf_rtt() { - /** - * Calculate RTT of init phase normally always 1 - */ - float rtt = 1; - int bytes_transmitted = 0; - /** - * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending - */ - if (( perf_rtt.element_full.received != 0 ) || - ( perf_rtt.element_full.sent != 0) - ) rtt += 1; +/** + * Function that calculates the perfmance values and writes them down + */ +static void +calculate_perf_store () +{ - if (( perf_rtt.request_full.received != 0 ) || - ( perf_rtt.request_full.sent != 0) - ) rtt += 0.5; + /** + * Calculate RTT of init phase normally always 1 + */ + float rtt = 1; + int bytes_transmitted = 0; - /** - * In case of a differential sync 3 rtt's are needed. - * for every active/passive switch additional 3.5 rtt's are used - */ + /** + * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normaly 1 or 1.5 depending + */ + if ((perf_store.element_full.received != 0) || + (perf_store.element_full.sent != 0) + ) + rtt += 1; + + if ((perf_store.request_full.received != 0) || + (perf_store.request_full.sent != 0) + ) + rtt += 0.5; + + /** + * In case of a differential sync 3 rtt's are needed. + * for every active/passive switch additional 3.5 rtt's are used + */ + if ((perf_store.element.received != 0) || + (perf_store.element.sent != 0)) + { + int iterations = perf_store.active_passive_switches; + + if (iterations > 0) + rtt += iterations * 0.5; + rtt += 2.5; + } + + + /** + * Calculate data sended size + */ + bytes_transmitted += sum_sent_received_bytes (sizeof(struct + GNUNET_SETU_ResultMessage), + perf_store.request_full); + + bytes_transmitted += sum_sent_received_bytes (sizeof(struct + GNUNET_SETU_ElementMessage), + perf_store.element_full); + bytes_transmitted += sum_sent_received_bytes (sizeof(struct + GNUNET_SETU_ElementMessage), + perf_store.element); + // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request); + bytes_transmitted += sum_sent_received_bytes (sizeof(struct + StrataEstimatorMessage), + perf_store.se); + bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done); + bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage), + perf_store.ibf); + bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage), + perf_store.inquery); + bytes_transmitted += sum_sent_received_bytes (sizeof(struct + GNUNET_MessageHeader), + perf_store.demand); + bytes_transmitted += sum_sent_received_bytes (sizeof(struct + GNUNET_MessageHeader), + perf_store.offer); + bytes_transmitted += sum_sent_received_bytes (4, perf_store.done); + + /** + * Write IBF failure rate for different BUCKET_NUMBER_FACTOR + */ + float factor; + GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR", + &factor); + long long num_per_bucket; + GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET", + &num_per_bucket); + + + int decoded = 0; + if (perf_store.active_passive_switches == 0) + decoded = 1; + int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct + IBFMessage), + perf_store.ibf); + + FILE *out1 = fopen ("perf_data.csv", "a"); + fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor, + decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff, + bytes_transmitted, + perf_store.se_diff_local,perf_store.se_diff_remote, + perf_store.mode_of_operation); + fclose (out1); + +} + + +#endif +/** + * Function that chooses the optimal mode of operation depending on + * operation parameters. + * @param avg_element_size + * @param local_set_size + * @param remote_set_size + * @param est_set_diff_remote + * @param est_set_diff_local + * @param bandwith_latency_tradeoff + * @param ibf_bucket_number_factor + * @return calcuated mode of operation + */ +static uint8_t +estimate_best_mode_of_operation (uint64_t avg_element_size, + uint64_t local_set_size, + uint64_t remote_set_size, + uint64_t est_set_diff_remote, + uint64_t est_set_diff_local, + uint64_t bandwith_latency_tradeoff, + uint64_t ibf_bucket_number_factor) +{ + + /* + * In case of initial sync fall to predefined states + */ + + if (0 == local_set_size) + return FULL_SYNC_REMOTE_SENDING_FIRST; + if (0 == remote_set_size) + return FULL_SYNC_LOCAL_SENDING_FIRST; + + /* + * Calculate bytes for full Sync + */ + + uint8_t sizeof_full_done_header = 4; + uint8_t sizeof_done_header = 4; + uint8_t rtt_min_full = 2; + uint8_t sizeof_request_full = 4; + uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local); + + /* Estimate byte required if we send first */ + uint64_t total_elements_to_send_local_send_first = est_set_diff_remote + + local_set_size; + + uint64_t total_bytes_full_local_send_first = (avg_element_size + * + total_elements_to_send_local_send_first) \ + + ( + total_elements_to_send_local_send_first * sizeof(struct + GNUNET_SETU_ElementMessage)) \ + + (sizeof_full_done_header * 2) \ + + rtt_min_full + * bandwith_latency_tradeoff; + + /* Estimate bytes required if we request from remote peer */ + uint64_t total_elements_to_send_remote_send_first = est_set_diff_local + + remote_set_size; + + uint64_t total_bytes_full_remote_send_first = (avg_element_size + * + total_elements_to_send_remote_send_first) \ + + ( + total_elements_to_send_remote_send_first * sizeof(struct + GNUNET_SETU_ElementMessage)) \ + + (sizeof_full_done_header * 2) \ + + (rtt_min_full + 0.5) + * bandwith_latency_tradeoff \ + + sizeof_request_full; + + /* + * Calculate bytes for differential Sync + */ + + /* Estimate bytes required by IBF transmission*/ + + long double ibf_bucket_count = estimated_total_diff + * ibf_bucket_number_factor; + + if (ibf_bucket_count <= IBF_MIN_SIZE) + { + ibf_bucket_count = IBF_MIN_SIZE; + } + uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count) + / MAX_BUCKETS_PER_MESSAGE); + + uint64_t estimated_counter_size = ceil ( + MIN (2 * log2l ((float) local_set_size / ibf_bucket_count), log2l ( + local_set_size))); + + long double counter_bytes = (float) estimated_counter_size / 8; + + uint64_t ibf_bytes = ceil ((sizeof(struct IBFMessage) * ibf_message_count) + * 1.2 \ + + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \ + + (ibf_bucket_count * sizeof(struct IBF_KeyHash)) + * 1.2 \ + + (ibf_bucket_count * counter_bytes) * 1.2); + + /* Estimate full byte count for differential sync */ + uint64_t element_size = (avg_element_size + sizeof(struct + GNUNET_SETU_ElementMessage)) \ + * estimated_total_diff; + uint64_t done_size = sizeof_done_header; + uint64_t inquery_size = (sizeof(struct IBF_Key) + sizeof(struct + InquiryMessage)) + * estimated_total_diff; + uint64_t demand_size = + (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader)) + * estimated_total_diff; + uint64_t offer_size = (sizeof(struct GNUNET_HashCode) + sizeof(struct + GNUNET_MessageHeader)) + * estimated_total_diff; + + uint64_t total_bytes_diff = (element_size + done_size + inquery_size + + demand_size + offer_size + ibf_bytes) \ + + (DIFFERENTIAL_RTT_MEAN + * bandwith_latency_tradeoff); + + uint64_t full_min = MIN (total_bytes_full_local_send_first, + total_bytes_full_remote_send_first); + + /* Decide between full and differential sync */ + + if (full_min < total_bytes_diff) + { + /* Decide between sending all element first or receiving all elements */ + if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first) + { + return FULL_SYNC_LOCAL_SENDING_FIRST; + } + else + { + return FULL_SYNC_REMOTE_SENDING_FIRST; + } + } + else + { + return DIFFERENTIAL_SYNC; + } +} + + +/** + * Validates the if a message is received in a correct phase + * @param allowed_phases + * @param size_phases + * @param op + * @return GNUNET_YES if message permitted in phase and GNUNET_NO if not permitted in given + * phase + */ +static int +check_valid_phase (const uint8_t allowed_phases[], size_t size_phases, struct + Operation *op) +{ + /** + * Iterate over allowed phases + */ + for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++) + { + uint8_t phase = allowed_phases[phase_ctr]; + if (phase == op->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message received in valid phase\n"); + return GNUNET_YES; + } + } + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received message in invalid phase: %u\n", op->phase); + return GNUNET_NO; +} + + +/** + * Function to update, track and validate message received in differential + * sync. This function tracks states of messages and check it against different + * constraints as described in Summermatter's BSc Thesis (2021) + * @param hash_map: Hashmap to store message control flow + * @param new_mcfs: The new message control flow state an given message type should be set to + * @param hash_code: Hash code of the element + * @param mt: The message type for which the message control flow state should be set + * @return GNUNET_YES message is valid in message control flow GNUNET_NO when message is not valid + * at this point in message flow + */ +static int +update_message_control_flow (struct GNUNET_CONTAINER_MultiHashMap *hash_map, + enum MESSAGE_CONTROL_FLOW_STATE new_mcfs, + const struct GNUNET_HashCode *hash_code, + enum MESSAGE_TYPE mt) +{ + struct messageControlFlowElement *cfe = NULL; + enum MESSAGE_CONTROL_FLOW_STATE *mcfs; + + /** + * Check logic for forbidden messages + */ + + cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code); + if ((ELEMENT_MESSAGE == mt) && (cfe != NULL)) + { + if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received an element without sent offer!\n"); + return GNUNET_NO; + } + /* Check that only requested elements are received! */ + if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand != + MSG_CFS_SENT)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Received an element that was not demanded\n"); + return GNUNET_NO; + } + } + + /** + * In case the element hash is not in the hashmap create a new entry + */ + + if (NULL == cfe) + { + cfe = GNUNET_new (struct messageControlFlowElement); + if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code, + cfe, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) + { + GNUNET_free (cfe); + return GNUNET_SYSERR; + } + } + + /** + * Set state of message + */ + + if (OFFER_MESSAGE == mt) + { + mcfs = &cfe->offer; + } + else if (DEMAND_MESSAGE == mt) + { + mcfs = &cfe->demand; + } + else if (ELEMENT_MESSAGE == mt) + { + mcfs = &cfe->element; + } + else + { + return GNUNET_SYSERR; + } + + /** + * Check if state is allowed + */ - int iterations = perf_rtt.ibf.received; - if(iterations > 1) - rtt += (iterations - 1 ) * 0.5; - rtt += 3 * iterations; + if (new_mcfs <= *mcfs) + { + return GNUNET_NO; + } + + *mcfs = new_mcfs; + return GNUNET_YES; +} + + +/** + * Validate if a message in differential sync si already received before. + * @param hash_map + * @param hash_code + * @param mt + * @return GNUNET_YES when message is already in store if message is not in store return GNUNET_NO + */ +static int +is_message_in_message_control_flow (struct + GNUNET_CONTAINER_MultiHashMap *hash_map, + struct GNUNET_HashCode *hash_code, + enum MESSAGE_TYPE mt) +{ + struct messageControlFlowElement *cfe = NULL; + enum MESSAGE_CONTROL_FLOW_STATE *mcfs; + + cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code); + + /** + * Set state of message + */ + + if (cfe != NULL) + { + if (OFFER_MESSAGE == mt) + { + mcfs = &cfe->offer; + } + else if (DEMAND_MESSAGE == mt) + { + mcfs = &cfe->demand; + } + else if (ELEMENT_MESSAGE == mt) + { + mcfs = &cfe->element; + } + else + { + return GNUNET_SYSERR; + } /** - * Calculate data sended size + * Evaluate if set is in message */ - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL), perf_rtt.request_full); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT), perf_rtt.element_full); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS), perf_rtt.element); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_rtt.operation_request); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_SE), perf_rtt.se); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE), perf_rtt.full_done); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_IBF), perf_rtt.ibf); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY), perf_rtt.inquery); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND), perf_rtt.demand); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER), perf_rtt.offer); - bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DONE), perf_rtt.done); + if (*mcfs != MSG_CFS_UNINITIALIZED) + { + return GNUNET_YES; + } + } + return GNUNET_NO; +} - LOG(GNUNET_ERROR_TYPE_ERROR,"Bytes Transmitted: %d\n", bytes_transmitted); - LOG(GNUNET_ERROR_TYPE_ERROR,"Reached tradeoff bandwidth/rtt: %f\n", (bytes_transmitted / rtt )); +/** + * Iterator for determining if all demands have been + * satisfied + * + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert + * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) + */ +static int +determinate_done_message_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct messageControlFlowElement *mcfe = value; + + if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) )) + { + return GNUNET_YES; + } + return GNUNET_NO; +} + + +/** + * Iterator for determining average size + * + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert + * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) + */ +static int +determinate_avg_element_size_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct GNUNET_SETU_Element *element = value; + op->total_elements_size_local += element->size; + return GNUNET_YES; +} + + +/** + * Create randomized element hashmap for full sending + * + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert + * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) + */ +static int +create_randomized_element_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; - return rtt; + struct GNUNET_HashContext *hashed_key_context = + GNUNET_CRYPTO_hash_context_start (); + struct GNUNET_HashCode new_key; + + /** + * Hash element with new salt to randomize hashmap + */ + GNUNET_CRYPTO_hash_context_read (hashed_key_context, + &key, + sizeof(struct IBF_Key)); + GNUNET_CRYPTO_hash_context_read (hashed_key_context, + &op->set->content->elements_randomized_salt, + sizeof(uint32_t)); + GNUNET_CRYPTO_hash_context_finish (hashed_key_context, + &new_key); + GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized, + &new_key,value, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + return GNUNET_YES; } @@ -808,6 +1514,36 @@ send_client_done (void *cls) } +/** + * Check if all given byzantine parameters are in given boundaries + * @param op + * @return indicator if all given byzantine parameters are in given boundaries + */ + +static int +check_byzantine_bounds (struct Operation *op) +{ + if (op->byzantine != GNUNET_YES) + return GNUNET_OK; + + /** + * Check upper byzantine bounds + */ + if (op->remote_element_count + op->remote_set_diff > + op->byzantine_upper_bound) + return GNUNET_SYSERR; + if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound) + return GNUNET_SYSERR; + + /** + * Check lower byzantine bounds + */ + if (op->remote_element_count < op->byzantine_lower_bound) + return GNUNET_SYSERR; + return GNUNET_OK; +} + + /* FIXME: the destroy logic is a mess and should be cleaned up! */ /** @@ -976,6 +1712,101 @@ fail_union_operation (struct Operation *op) } +/** + * Function that checks if full sync is plausible + * @param initial_local_elements_in_set + * @param estimated_set_difference + * @param repeated_elements + * @param fresh_elements + * @param op + * @return GNUNET_OK if + */ + +static void +full_sync_plausibility_check (struct Operation *op) +{ + if (GNUNET_YES != op->byzantine) + return; + + int security_level_lb = -1 * SECURITY_LEVEL; + uint64_t duplicates = op->received_fresh - op->received_total; + + /* + * Protect full sync from receiving double element when in FULL SENDING + */ + if (PHASE_FULL_SENDING == op->phase) + { + if (duplicates > 0) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: Received duplicate element in full receiving " + "mode of operation this is not allowed! Duplicates: %lu\n", + duplicates); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + } + + /* + * Protect full sync with probabilistic algorithm + */ + if (PHASE_FULL_RECEIVING == op->phase) + { + if (0 == op->remote_set_diff) + op->remote_set_diff = 1; + + long double base = (1 - (long double) (op->remote_set_diff + / (long double) (op->initial_size + + op-> + remote_set_diff))); + long double exponent = (op->received_total - (op->received_fresh * ((long + double) + op-> + initial_size + / (long + double) + op-> + remote_set_diff))); + long double value = exponent * (log2l (base) / log2l (2)); + if ((value < security_level_lb) || (value > SECURITY_LEVEL) ) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving " + "to many duplicated full element : %LF\n", + value); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + } +} + + +/** + * Limit active passive switches in differential sync to configured security level + * @param op + */ +static void +check_max_differential_rounds (struct Operation *op) +{ + double probability = op->differential_sync_iterations * (log2l ( + PROBABILITY_FOR_NEW_ROUND) + / log2l (2)); + if ((-1 * SECURITY_LEVEL) > probability) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive " + "switches in differential sync: %u\n", + op->differential_sync_iterations); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } +} + + /** * Derive the IBF key from a hash code and * a salt. @@ -1004,12 +1835,12 @@ get_ibf_key (const struct GNUNET_HashCode *src) struct GetElementContext { /** - * FIXME. + * Gnunet hash code in context */ struct GNUNET_HashCode hash; /** - * FIXME. + * Pointer to the key enty */ struct KeyEntry *k; }; @@ -1122,7 +1953,7 @@ salt_key (const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out) { - int s = salt % 64; + int s = (salt * 7) % 64; uint64_t x = k_in->key_val; /* rotate ibf key */ @@ -1132,14 +1963,14 @@ salt_key (const struct IBF_Key *k_in, /** - * FIXME. + * Reverse modification done in the salt_key function */ static void unsalt_key (const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out) { - int s = salt % 64; + int s = (salt * 7) % 64; uint64_t x = k_in->key_val; x = (x << s) | (x >> (64 - s)); @@ -1258,7 +2089,9 @@ prepare_ibf (struct Operation *op, if (NULL != op->local_ibf) ibf_destroy (op->local_ibf); - op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); + // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); + op->local_ibf = ibf_create (size, + ((uint8_t) op->ibf_number_buckets_per_element)); if (NULL == op->local_ibf) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -1283,13 +2116,23 @@ prepare_ibf (struct Operation *op, */ static int send_ibf (struct Operation *op, - uint16_t ibf_order) + uint32_t ibf_size) { - unsigned int buckets_sent = 0; + uint64_t buckets_sent = 0; struct InvertibleBloomFilter *ibf; + op->differential_sync_iterations++; + + /** + * Enforce min size of IBF + */ + uint32_t ibf_min_size = IBF_MIN_SIZE; + if (ibf_size < ibf_min_size) + { + ibf_size = ibf_min_size; + } if (GNUNET_OK != - prepare_ibf (op, 1 << ibf_order)) + prepare_ibf (op, ibf_size)) { /* allocation failed */ return GNUNET_SYSERR; @@ -1297,45 +2140,48 @@ send_ibf (struct Operation *op, LOG (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", - 1 << ibf_order); + 1 << ibf_size); { char name[64]; - GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order); + GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_size); GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); } ibf = op->local_ibf; - while (buckets_sent < (1 << ibf_order)) + while (buckets_sent < ibf_size) { unsigned int buckets_in_message; struct GNUNET_MQ_Envelope *ev; struct IBFMessage *msg; - buckets_in_message = (1 << ibf_order) - buckets_sent; + buckets_in_message = ibf_size - buckets_sent; /* limit to maximum */ if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) buckets_in_message = MAX_BUCKETS_PER_MESSAGE; - perf_rtt.ibf.sent += 1; - perf_rtt.ibf.sent_var_bytes += ( buckets_in_message * IBF_BUCKET_SIZE ); +#if MEASURE_PERFORMANCE + perf_store.ibf.sent += 1; + perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE); +#endif ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, GNUNET_MESSAGE_TYPE_SETU_P2P_IBF); - msg->reserved1 = 0; - msg->reserved2 = 0; - msg->order = ibf_order; + msg->ibf_size = ibf_size; msg->offset = htonl (buckets_sent); msg->salt = htonl (op->salt_send); + msg->ibf_counter_bit_length = ibf_get_max_counter (ibf); + + ibf_write_slice (ibf, buckets_sent, - buckets_in_message, &msg[1]); + buckets_in_message, &msg[1], msg->ibf_counter_bit_length); buckets_sent += buckets_in_message; LOG (GNUNET_ERROR_TYPE_DEBUG, - "ibf chunk size %u, %u/%u sent\n", + "ibf chunk size %u, %lu/%u sent\n", buckets_in_message, buckets_sent, - 1 << ibf_order); + ibf_size); GNUNET_MQ_send (op->mq, ev); } @@ -1354,17 +2200,26 @@ send_ibf (struct Operation *op, * @return the required size of the ibf */ static unsigned int -get_order_from_difference (unsigned int diff) +get_size_from_difference (unsigned int diff, int number_buckets_per_element, + float ibf_bucket_number_factor) { - unsigned int ibf_order; + /** Make ibf estimation size odd reasoning can be found in BSc Thesis of + * Elias Summermatter (2021) in section 3.11 **/ + return (((int) (diff * ibf_bucket_number_factor)) | 1); + +} - ibf_order = 2; - while (((1 << ibf_order) < (IBF_ALPHA * diff) || - ((1 << ibf_order) < SE_IBF_HASH_NUM)) && - (ibf_order < MAX_IBF_ORDER)) - ibf_order++; - // add one for correction - return ibf_order + 1; + +static unsigned int +get_next_ibf_size (float ibf_bucket_number_factor, unsigned int + decoded_elements, unsigned int last_ibf_size) +{ + unsigned int next_size = (unsigned int) ((last_ibf_size * 2) + - (ibf_bucket_number_factor + * decoded_elements)); + /** Make ibf estimation size odd reasoning can be found in BSc Thesis of + * Elias Summermatter (2021) in section 3.11 **/ + return next_size | 1; } @@ -1391,8 +2246,10 @@ send_full_element_iterator (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending element %s\n", GNUNET_h2s (key)); - perf_rtt.element_full.received += 1; - perf_rtt.element_full.received_var_bytes += el->size; +#if MEASURE_PERFORMANCE + perf_store.element_full.received += 1; + perf_store.element_full.received_var_bytes += el->size; +#endif ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); @@ -1421,10 +2278,25 @@ send_full_set (struct Operation *op) "Dedicing to transmit the full set\n"); /* FIXME: use a more memory-friendly way of doing this with an iterator, just as we do in the non-full case! */ + + // Randomize Elements to send + op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create ( + 32,GNUNET_NO); + op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 ( + GNUNET_CRYPTO_QUALITY_NONCE, + UINT64_MAX); (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, - &send_full_element_iterator, + & + create_randomized_element_iterator, op); - perf_rtt.full_done.sent += 1; + + (void) GNUNET_CONTAINER_multihashmap_iterate ( + op->set->content->elements_randomized, + &send_full_element_iterator, + op); +#if MEASURE_PERFORMANCE + perf_store.full_done.sent += 1; +#endif ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); GNUNET_MQ_send (op->mq, ev); @@ -1454,7 +2326,7 @@ check_union_p2p_strata_estimator (void *cls, msg->header.type)); len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); if ((GNUNET_NO == is_compressed) && - (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) + (len != SE_STRATA_COUNT * SE_IBFS_TOTAL_SIZE * IBF_BUCKET_SIZE)) { GNUNET_break (0); return GNUNET_SYSERR; @@ -1473,14 +2345,44 @@ static void handle_union_p2p_strata_estimator (void *cls, const struct StrataEstimatorMessage *msg) { - perf_rtt.se.received += 1; - perf_rtt.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); +#if MEASURE_PERFORMANCE + perf_store.se.received += 1; + perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct + StrataEstimatorMessage); +#endif struct Operation *op = cls; - struct StrataEstimator *remote_se; + struct MultiStrataEstimator *remote_se; unsigned int diff; uint64_t other_size; size_t len; int is_compressed; + op->local_element_count = GNUNET_CONTAINER_multihashmap_size ( + op->set->content->elements); + // Setting peer site to receiving peer + op->peer_site = 1; + + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_EXPECT_SE}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } + + /** Only allow 1,2,4,8 SEs **/ + if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n", + msg->se_count); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons ( msg->header.type)); @@ -1490,8 +2392,20 @@ handle_union_p2p_strata_estimator (void *cls, GNUNET_NO); len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); other_size = GNUNET_ntohll (msg->set_size); + op->remote_element_count = other_size; + + if (op->byzantine_upper_bound < op->remote_element_count) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Exceeded configured upper bound <%lu> of element: %u\n", + op->byzantine_upper_bound, + op->remote_element_count); + fail_union_operation (op); + return; + } + remote_se = strata_estimator_create (SE_STRATA_COUNT, - SE_IBF_SIZE, + SE_IBFS_TOTAL_SIZE, SE_IBF_HASH_NUM); if (NULL == remote_se) { @@ -1503,6 +2417,8 @@ handle_union_p2p_strata_estimator (void *cls, strata_estimator_read (&msg[1], len, is_compressed, + msg->se_count, + SE_IBFS_TOTAL_SIZE, remote_se)) { /* decompression failed */ @@ -1511,11 +2427,76 @@ handle_union_p2p_strata_estimator (void *cls, return; } GNUNET_assert (NULL != op->se); - diff = strata_estimator_difference (remote_se, - op->se); + strata_estimator_difference (remote_se, + op->se); + + /* Calculate remote local diff */ + long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count; + long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count; + + /* Prevent estimations from overshooting max element */ + if (diff_remote + op->remote_element_count > op->byzantine_upper_bound) + diff_remote = op->byzantine_upper_bound - op->remote_element_count; + if (diff_local + op->local_element_count > op->byzantine_upper_bound) + diff_local = op->byzantine_upper_bound - op->local_element_count; + if ((diff_remote < 0) || (diff_local < 0)) + { + strata_estimator_destroy (remote_se); + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is " + "malicious: remote diff %ld, local diff: %ld\n", + diff_remote, diff_local); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } - if (diff > 200) - diff = diff * 3 / 2; + /* Make estimation more precise in initial sync cases */ + if (0 == op->remote_element_count) + { + diff_remote = 0; + diff_local = op->local_element_count; + } + if (0 == op->local_element_count) + { + diff_local = 0; + diff_remote = op->remote_element_count; + } + + diff = diff_remote + diff_local; + op->remote_set_diff = diff_remote; + + /** Calculate avg element size if not initial sync **/ + uint64_t avg_element_size = 0; + if (0 < op->local_element_count) + { + op->total_elements_size_local = 0; + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + & + determinate_avg_element_size_iterator, + op); + avg_element_size = op->total_elements_size_local / op->local_element_count; + } + + op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size, + GNUNET_CONTAINER_multihashmap_size ( + op->set->content-> + elements), + op-> + remote_element_count, + diff_remote, + diff_local, + op-> + rtt_bandwidth_tradeoff, + op-> + ibf_bucket_number_factor); + +#if MEASURE_PERFORMANCE + perf_store.se_diff_local = diff_local; + perf_store.se_diff_remote = diff_remote; + perf_store.se_diff = diff; + perf_store.mode_of_operation = op->mode_of_operation; +#endif strata_estimator_destroy (remote_se); strata_estimator_destroy (op->se); @@ -1523,7 +2504,8 @@ handle_union_p2p_strata_estimator (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", diff, - 1U << get_order_from_difference (diff)); + 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element, + op->ibf_bucket_number_factor)); { char *set_debug; @@ -1546,16 +2528,8 @@ handle_union_p2p_strata_estimator (void *cls, return; } - LOG (GNUNET_ERROR_TYPE_ERROR, - "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: %f\n", op->rtt_bandwidth_tradeoff); - - - /** - * Added rtt_bandwidth_tradeoff directly need future improvements - */ if ((GNUNET_YES == op->force_full) || - (diff > op->initial_size / 4) || - (0 == other_size)) + (op->mode_of_operation != DIFFERENTIAL_SYNC)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Deciding to go for full set transmission (diff=%d, own set=%llu)\n", @@ -1565,9 +2539,17 @@ handle_union_p2p_strata_estimator (void *cls, "# of full sends", 1, GNUNET_NO); - if ((op->initial_size <= other_size) || - (0 == other_size)) + if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation) { + struct TransmitFullMessage *signal_msg; + struct GNUNET_MQ_Envelope *ev; + ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage), + GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL); + signal_msg->remote_set_difference = htonl (diff_local); + signal_msg->remote_set_size = htonl (op->local_element_count); + signal_msg->local_set_difference = htonl (diff_remote); + GNUNET_MQ_send (op->mq, + ev); send_full_set (op); } else @@ -1577,9 +2559,15 @@ handle_union_p2p_strata_estimator (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Telling other peer that we expect its full set\n"); op->phase = PHASE_FULL_RECEIVING; - perf_rtt.request_full.sent += 1; - ev = GNUNET_MQ_msg_header ( - GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); +#if MEASURE_PERFORMANCE + perf_store.request_full.sent += 1; +#endif + struct TransmitFullMessage *signal_msg; + ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage), + GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL); + signal_msg->remote_set_difference = htonl (diff_local); + signal_msg->remote_set_size = htonl (op->local_element_count); + signal_msg->local_set_difference = htonl (diff_remote); GNUNET_MQ_send (op->mq, ev); } @@ -1592,7 +2580,9 @@ handle_union_p2p_strata_estimator (void *cls, GNUNET_NO); if (GNUNET_OK != send_ibf (op, - get_order_from_difference (diff))) + get_size_from_difference (diff, + op->ibf_number_buckets_per_element, + op->ibf_bucket_number_factor))) { /* Internal error, best we can do is shut the connection */ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -1625,15 +2615,64 @@ send_offers_iterator (void *cls, /* Detect 32-bit key collision for the 64-bit IBF keys. */ if (ke->ibf_key.key_val != sec->ibf_key.key_val) + { + op->active_passive_switch_required = true; return GNUNET_YES; + } - perf_rtt.offer.sent += 1; - perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); + /* Prevent implementation from sending a offer multiple times in case of roll switch */ + if (GNUNET_YES == + is_message_in_message_control_flow ( + op->message_control_flow, + &ke->element->element_hash, + OFFER_MESSAGE) + ) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Skipping already sent processed element offer!\n"); + return GNUNET_YES; + } + /* Save send offer message for message control */ + if (GNUNET_YES != + update_message_control_flow ( + op->message_control_flow, + MSG_CFS_SENT, + &ke->element->element_hash, + OFFER_MESSAGE) + ) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Double offer message sent found!\n"); + GNUNET_break (0); + fail_union_operation (op); + return GNUNET_NO; + } + ; + + /* Mark element to be expected to received */ + if (GNUNET_YES != + update_message_control_flow ( + op->message_control_flow, + MSG_CFS_EXPECTED, + &ke->element->element_hash, + DEMAND_MESSAGE) + ) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Double demand received found!\n"); + GNUNET_break (0); + fail_union_operation (op); + return GNUNET_NO; + } + ; +#if MEASURE_PERFORMANCE + perf_store.offer.sent += 1; + perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode); +#endif ev = GNUNET_MQ_msg_header_extra (mh, sizeof(struct GNUNET_HashCode), GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER); - GNUNET_assert (NULL != ev); *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1651,7 +2690,7 @@ send_offers_iterator (void *cls, * @param op union operation * @param ibf_key IBF key of interest */ -static void +void send_offers_for_key (struct Operation *op, struct IBF_Key ibf_key) { @@ -1694,6 +2733,7 @@ decode_and_send (struct Operation *op) /* allocation failed */ return GNUNET_SYSERR; } + diff_ibf = ibf_dup (op->local_ibf); ibf_subtract (diff_ibf, op->remote_ibf); @@ -1706,7 +2746,7 @@ decode_and_send (struct Operation *op) diff_ibf->size); num_decoded = 0; - key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ + key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ while (1) { @@ -1738,23 +2778,36 @@ decode_and_send (struct Operation *op) if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected)) { - int next_order; - next_order = 0; - while (1 << next_order < diff_ibf->size) - next_order++; - next_order++; - if (next_order <= MAX_IBF_ORDER) + uint32_t next_size; + /** Enforce odd ibf size **/ + + next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded, + diff_ibf->size); + /** Make ibf estimation size odd reasoning can be found in BSc Thesis of + * Elias Summermatter (2021) in section 3.11 **/ + uint32_t ibf_min_size = IBF_MIN_SIZE | 1; + + if (next_sizesalt_send++; +#if MEASURE_PERFORMANCE + perf_store.active_passive_switches += 1; +#endif + + op->salt_send = op->salt_receive++; + if (GNUNET_OK != - send_ibf (op, next_order)) + send_ibf (op, next_size)) { /* Internal error, best we can do is shut the connection */ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -1786,7 +2839,9 @@ decode_and_send (struct Operation *op) LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); - perf_rtt.done.sent += 1; +#if MEASURE_PERFORMANCE + perf_store.done.sent += 1; +#endif ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); GNUNET_MQ_send (op->mq, ev); /* We now wait until we get a DONE message back @@ -1797,7 +2852,6 @@ decode_and_send (struct Operation *op) if (1 == side) { struct IBF_Key unsalted_key; - unsalt_key (&key, op->salt_receive, &unsalted_key); @@ -1809,8 +2863,29 @@ decode_and_send (struct Operation *op) struct GNUNET_MQ_Envelope *ev; struct InquiryMessage *msg; - perf_rtt.inquery.sent += 1; - perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key); +#if MEASURE_PERFORMANCE + perf_store.inquery.sent += 1; + perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key); +#endif + + /** Add sent inquiries to hashmap for flow control **/ + struct GNUNET_HashContext *hashed_key_context = + GNUNET_CRYPTO_hash_context_start (); + struct GNUNET_HashCode *hashed_key = (struct + GNUNET_HashCode*) GNUNET_malloc ( + sizeof(struct GNUNET_HashCode)); + enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_SENT; + GNUNET_CRYPTO_hash_context_read (hashed_key_context, + &key, + sizeof(struct IBF_Key)); + GNUNET_CRYPTO_hash_context_finish (hashed_key_context, + hashed_key); + GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent, + hashed_key, + &mcfs, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE + ); + /* It may be nice to merge multiple requests, but with CADET's corking it is not worth * the effort additional complexity. */ ev = GNUNET_MQ_msg_extra (msg, @@ -1835,6 +2910,100 @@ decode_and_send (struct Operation *op) } +/** + * Check send full message received from other peer + * @param cls + * @param msg + * @return + */ + +static int +check_union_p2p_send_full (void *cls, + const struct TransmitFullMessage *msg) +{ + return GNUNET_OK; +} + + +/** + * Handle send full message received from other peer + * + * @param cls + * @param msg + */ +static void +handle_union_p2p_send_full (void *cls, + const struct TransmitFullMessage *msg) +{ + struct Operation *op = cls; + + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_EXPECT_IBF}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } + + /** write received values to operator**/ + op->remote_element_count = ntohl (msg->remote_set_size); + op->remote_set_diff = ntohl (msg->remote_set_difference); + op->local_set_diff = ntohl (msg->local_set_difference); + + /** Check byzantine limits **/ + if (check_byzantine_bounds (op) != GNUNET_OK) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine " + "criteria\n"); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + /** Calculate avg element size if not initial sync **/ + op->local_element_count = GNUNET_CONTAINER_multihashmap_size ( + op->set->content->elements); + uint64_t avg_element_size = 0; + if (0 < op->local_element_count) + { + op->total_elements_size_local = 0; + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + & + determinate_avg_element_size_iterator, + op); + avg_element_size = op->total_elements_size_local / op->local_element_count; + } + + /** Validate mode of operation **/ + int mode_of_operation = estimate_best_mode_of_operation (avg_element_size, + op-> + remote_element_count, + op-> + local_element_count, + op->local_set_diff, + op->remote_set_diff, + op-> + rtt_bandwidth_tradeoff, + op-> + ibf_bucket_number_factor); + if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been" + " : %d\n", mode_of_operation); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + op->phase = PHASE_FULL_RECEIVING; +} + + /** * Check an IBF message from a remote peer. * @@ -1872,7 +3041,8 @@ check_union_p2p_ibf (void *cls, GNUNET_break_op (0); return GNUNET_SYSERR; } - if (1 << msg->order != op->remote_ibf->size) + + if (msg->ibf_size != op->remote_ibf->size) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -1909,9 +3079,26 @@ handle_union_p2p_ibf (void *cls, { struct Operation *op = cls; unsigned int buckets_in_message; + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST, + PHASE_PASSIVE_DECODING}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } + op->differential_sync_iterations++; + check_max_differential_rounds (op); + op->active_passive_switch_required = false; - perf_rtt.ibf.received += 1; - perf_rtt.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); +#if MEASURE_PERFORMANCE + perf_store.ibf.received += 1; + perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg); +#endif buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; @@ -1922,8 +3109,10 @@ handle_union_p2p_ibf (void *cls, GNUNET_assert (NULL == op->remote_ibf); LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating new ibf of size %u\n", - 1 << msg->order); - op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); + ntohl (msg->ibf_size)); + // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); + op->remote_ibf = ibf_create (msg->ibf_size, + ((uint8_t) op->ibf_number_buckets_per_element)); op->salt_receive = ntohl (msg->salt); LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", @@ -1954,7 +3143,7 @@ handle_union_p2p_ibf (void *cls, ibf_read_slice (&msg[1], op->ibf_buckets_received, buckets_in_message, - op->remote_ibf); + op->remote_ibf, msg->ibf_counter_bit_length); op->ibf_buckets_received += buckets_in_message; if (op->ibf_buckets_received == op->remote_ibf->size) @@ -2030,18 +3219,24 @@ maybe_finish (struct Operation *op) num_demanded = GNUNET_CONTAINER_multihashmap_size ( op->demanded_hashes); - + int send_done = GNUNET_CONTAINER_multihashmap_iterate ( + op->message_control_flow, + & + determinate_done_message_iterator, + op); if (PHASE_FINISH_WAITING == op->phase) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "In PHASE_FINISH_WAITING, pending %u demands\n", - num_demanded); - if (0 == num_demanded) + "In PHASE_FINISH_WAITING, pending %u demands -> %d\n", + num_demanded, op->peer_site); + if (-1 != send_done) { struct GNUNET_MQ_Envelope *ev; op->phase = PHASE_FINISHED; - perf_rtt.done.sent += 1; +#if MEASURE_PERFORMANCE + perf_store.done.sent += 1; +#endif ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE); GNUNET_MQ_send (op->mq, ev); @@ -2052,9 +3247,9 @@ maybe_finish (struct Operation *op) if (PHASE_FINISH_CLOSING == op->phase) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "In PHASE_FINISH_CLOSING, pending %u demands\n", - num_demanded); - if (0 == num_demanded) + "In PHASE_FINISH_CLOSING, pending %u demands %d\n", + num_demanded, op->peer_site); + if (-1 != send_done) { op->phase = PHASE_FINISHED; send_client_done (op); @@ -2102,11 +3297,25 @@ handle_union_p2p_elements (void *cls, struct KeyEntry *ke; uint16_t element_size; + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING, + PHASE_FINISH_WAITING, PHASE_FINISH_CLOSING}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } element_size = ntohs (emsg->header.size) - sizeof(struct GNUNET_SETU_ElementMessage); - perf_rtt.element.received += 1; - perf_rtt.element.received_var_bytes += element_size; +#if MEASURE_PERFORMANCE + perf_store.element.received += 1; + perf_store.element.received_var_bytes += element_size; +#endif ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); GNUNET_memcpy (&ee[1], @@ -2129,6 +3338,21 @@ handle_union_p2p_elements (void *cls, return; } + if (GNUNET_OK != + update_message_control_flow ( + op->message_control_flow, + MSG_CFS_RECEIVED, + &ee->element_hash, + ELEMENT_MESSAGE) + ) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "An element has been received more than once!\n"); + GNUNET_break (0); + fail_union_operation (op); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "Got element (size %u, hash %s) from peer\n", (unsigned int) element_size, @@ -2217,33 +3441,25 @@ handle_union_p2p_full_element (void *cls, struct KeyEntry *ke; uint16_t element_size; - - - if(PHASE_EXPECT_IBF == op->phase) { - op->phase = PHASE_FULL_RECEIVING; - } - - - - /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */ - if ((PHASE_FULL_RECEIVING != op->phase) && - (PHASE_FULL_SENDING != op->phase)) + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Handle full element phase is %u\n", - (unsigned) op->phase); - GNUNET_break_op (0); - fail_union_operation (op); - return; + GNUNET_break (0); + fail_union_operation (op); + return; } - - element_size = ntohs (emsg->header.size) - sizeof(struct GNUNET_SETU_ElementMessage); - perf_rtt.element_full.received += 1; - perf_rtt.element_full.received_var_bytes += element_size; +#if MEASURE_PERFORMANCE + perf_store.element_full.received += 1; + perf_store.element_full.received_var_bytes += element_size; +#endif ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); GNUNET_memcpy (&ee[1], &emsg[1], element_size); @@ -2268,17 +3484,15 @@ handle_union_p2p_full_element (void *cls, GNUNET_NO); op->received_total++; - ke = op_get_element (op, &ee->element_hash); if (NULL != ke) { - /* Got repeated element. Should not happen since - * we track demands. */ GNUNET_STATISTICS_update (_GSS_statistics, "# repeated elements", 1, GNUNET_NO); + full_sync_plausibility_check (op); ke->received = GNUNET_YES; GNUNET_free (ee); } @@ -2294,15 +3508,15 @@ handle_union_p2p_full_element (void *cls, GNUNET_SETU_STATUS_ADD_LOCAL); } + if ((GNUNET_YES == op->byzantine) && - (op->received_total > 384 + op->received_fresh * 4) && - (op->received_fresh < op->received_total / 6)) + (op->received_total > op->remote_element_count) ) { /* The other peer gave us lots of old elements, there's something wrong. */ LOG (GNUNET_ERROR_TYPE_ERROR, - "Other peer sent only %llu/%llu fresh elements, failing operation\n", - (unsigned long long) op->received_fresh, - (unsigned long long) op->received_total); + "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n", + (unsigned long long) op->received_total, + (unsigned long long) op->remote_element_count); GNUNET_break_op (0); fail_union_operation (op); return; @@ -2356,18 +3570,50 @@ handle_union_p2p_inquiry (void *cls, const struct IBF_Key *ibf_key; unsigned int num_keys; - perf_rtt.inquery.received += 1; - perf_rtt.inquery.received_var_bytes += (ntohs (msg->header.size) - sizeof(struct InquiryMessage)); + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } + +#if MEASURE_PERFORMANCE + perf_store.inquery.received += 1; + perf_store.inquery.received_var_bytes += (ntohs (msg->header.size) + - sizeof(struct InquiryMessage)); +#endif LOG (GNUNET_ERROR_TYPE_DEBUG, "Received union inquiry\n"); num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) / sizeof(struct IBF_Key); ibf_key = (const struct IBF_Key *) &msg[1]; + + /** Add received inquiries to hashmap for flow control **/ + struct GNUNET_HashContext *hashed_key_context = + GNUNET_CRYPTO_hash_context_start (); + struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc ( + sizeof(struct GNUNET_HashCode));; + enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_RECEIVED; + GNUNET_CRYPTO_hash_context_read (hashed_key_context, + &ibf_key, + sizeof(struct IBF_Key)); + GNUNET_CRYPTO_hash_context_finish (hashed_key_context, + hashed_key); + GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent, + hashed_key, + &mcfs, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE + ); + while (0 != num_keys--) { struct IBF_Key unsalted_key; - unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); @@ -2402,7 +3648,9 @@ send_missing_full_elements_iter (void *cls, if (GNUNET_YES == ke->received) return GNUNET_YES; - perf_rtt.element_full.received += 1; +#if MEASURE_PERFORMANCE + perf_store.element_full.received += 1; +#endif ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT); @@ -2422,18 +3670,84 @@ send_missing_full_elements_iter (void *cls, * @param cls closure, a set union operation * @param mh the demand message */ +static int +check_union_p2p_request_full (void *cls, + const struct TransmitFullMessage *mh) +{ + return GNUNET_OK; +} + + static void handle_union_p2p_request_full (void *cls, - const struct GNUNET_MessageHeader *mh) + const struct TransmitFullMessage *msg) { struct Operation *op = cls; - perf_rtt.request_full.received += 1; + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_EXPECT_IBF}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } + + op->remote_element_count = ntohl (msg->remote_set_size); + op->remote_set_diff = ntohl (msg->remote_set_difference); + op->local_set_diff = ntohl (msg->local_set_difference); - LOG (GNUNET_ERROR_TYPE_DEBUG, + + if (check_byzantine_bounds (op) != GNUNET_OK) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine " + "criteria\n"); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + +#if MEASURE_PERFORMANCE + perf_store.request_full.received += 1; +#endif + + LOG (GNUNET_ERROR_TYPE_DEBUG, "Received request for full set transmission\n"); - if (PHASE_EXPECT_IBF != op->phase) + + /** Calculate avg element size if not initial sync **/ + op->local_element_count = GNUNET_CONTAINER_multihashmap_size ( + op->set->content->elements); + uint64_t avg_element_size = 0; + if (0 < op->local_element_count) + { + op->total_elements_size_local = 0; + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + & + determinate_avg_element_size_iterator, + op); + avg_element_size = op->total_elements_size_local / op->local_element_count; + } + + int mode_of_operation = estimate_best_mode_of_operation (avg_element_size, + op-> + remote_element_count, + op-> + local_element_count, + op->local_set_diff, + op->remote_set_diff, + op-> + rtt_bandwidth_tradeoff, + op-> + ibf_bucket_number_factor); + if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation) { + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been" + " : %d\n", mode_of_operation); GNUNET_break_op (0); fail_union_operation (op); return; @@ -2458,7 +3772,21 @@ handle_union_p2p_full_done (void *cls, { struct Operation *op = cls; - perf_rtt.full_done.received += 1; + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } + +#if MEASURE_PERFORMANCE + perf_store.full_done.received += 1; +#endif switch (op->phase) { @@ -2466,6 +3794,19 @@ handle_union_p2p_full_done (void *cls, { struct GNUNET_MQ_Envelope *ev; + if ((GNUNET_YES == op->byzantine) && + (op->received_total != op->remote_element_count) ) + { + /* The other peer gave not enough elements before sending full done, there's something wrong. */ + LOG (GNUNET_ERROR_TYPE_ERROR, + "Other peer sent only %llu/%llu fresh elements, failing operation\n", + (unsigned long long) op->received_total, + (unsigned long long) op->remote_element_count); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); @@ -2473,7 +3814,9 @@ handle_union_p2p_full_done (void *cls, GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element, &send_missing_full_elements_iter, op); - perf_rtt.full_done.sent += 1; +#if MEASURE_PERFORMANCE + perf_store.full_done.sent += 1; +#endif ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE); GNUNET_MQ_send (op->mq, ev); @@ -2552,8 +3895,23 @@ handle_union_p2p_demand (void *cls, unsigned int num_hashes; struct GNUNET_MQ_Envelope *ev; - perf_rtt.demand.received += 1; - perf_rtt.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING, + PHASE_FINISH_WAITING}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } +#if MEASURE_PERFORMANCE + perf_store.demand.received += 1; + perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct + GNUNET_MessageHeader)); +#endif num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) / sizeof(struct GNUNET_HashCode); @@ -2570,6 +3928,39 @@ handle_union_p2p_demand (void *cls, fail_union_operation (op); return; } + + /* Save send demand message for message control */ + if (GNUNET_YES != + update_message_control_flow ( + op->message_control_flow, + MSG_CFS_RECEIVED, + &ee->element_hash, + DEMAND_MESSAGE) + ) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Double demand message received found!\n"); + GNUNET_break (0); + fail_union_operation (op); + return; + } + ; + + /* Mark element to be expected to received */ + if (GNUNET_YES != + update_message_control_flow ( + op->message_control_flow, + MSG_CFS_SENT, + &ee->element_hash, + ELEMENT_MESSAGE) + ) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Double element message sent found!\n"); + GNUNET_break (0); + fail_union_operation (op); + return; + } if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) { /* Probably confused lazily copied sets. */ @@ -2577,8 +3968,10 @@ handle_union_p2p_demand (void *cls, fail_union_operation (op); return; } - perf_rtt.element.sent += 1; - perf_rtt.element.sent_var_bytes += ee->element.size; +#if MEASURE_PERFORMANCE + perf_store.element.sent += 1; + perf_store.element.sent_var_bytes += ee->element.size; +#endif ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS); @@ -2600,9 +3993,10 @@ handle_union_p2p_demand (void *cls, if (op->symmetric) send_client_element (op, &ee->element, - GNUNET_SET_STATUS_ADD_REMOTE); + GNUNET_SETU_STATUS_ADD_REMOTE); } GNUNET_CADET_receive_done (op->channel); + maybe_finish (op); } @@ -2653,9 +4047,23 @@ handle_union_p2p_offer (void *cls, struct Operation *op = cls; const struct GNUNET_HashCode *hash; unsigned int num_hashes; + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } - perf_rtt.offer.received += 1; - perf_rtt.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)); +#if MEASURE_PERFORMANCE + perf_store.offer.received += 1; + perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct + GNUNET_MessageHeader)); +#endif num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) / sizeof(struct GNUNET_HashCode); @@ -2693,11 +4101,68 @@ handle_union_p2p_offer (void *cls, "[OP %p] Requesting element (hash %s)\n", op, GNUNET_h2s (hash)); - perf_rtt.demand.sent += 1; - perf_rtt.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); +#if MEASURE_PERFORMANCE + perf_store.demand.sent += 1; + perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode); +#endif ev = GNUNET_MQ_msg_header_extra (demands, sizeof(struct GNUNET_HashCode), GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND); + /* Save send demand message for message control */ + if (GNUNET_YES != + update_message_control_flow ( + op->message_control_flow, + MSG_CFS_SENT, + hash, + DEMAND_MESSAGE) + ) + { + // GNUNET_free (ev); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Double demand message sent found!\n"); + GNUNET_break (0); + fail_union_operation (op); + return; + } + ; + + /* Mark offer as received received */ + if (GNUNET_YES != + update_message_control_flow ( + op->message_control_flow, + MSG_CFS_RECEIVED, + hash, + OFFER_MESSAGE) + ) + { + // GNUNET_free (ev); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Double offer message received found!\n"); + GNUNET_break (0); + fail_union_operation (op); + return; + } + ; + + /* Mark element to be expected to received */ + if (GNUNET_YES != + update_message_control_flow ( + op->message_control_flow, + MSG_CFS_EXPECTED, + hash, + ELEMENT_MESSAGE) + ) + { + // GNUNET_free (ev); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Element already expected!\n"); + GNUNET_break (0); + fail_union_operation (op); + return; + } + ; + + GNUNET_memcpy (&demands[1], hash, sizeof(struct GNUNET_HashCode)); @@ -2719,7 +4184,30 @@ handle_union_p2p_done (void *cls, { struct Operation *op = cls; - perf_rtt.done.received += 1; + /** + * Check that the message is received only in supported phase + */ + uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING}; + if (GNUNET_OK != + check_valid_phase (allowed_phases,sizeof(allowed_phases),op)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } + + if (op->active_passive_switch_required) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "PROTOCOL VIOLATION: Received done but role change is necessary\n"); + GNUNET_break (0); + fail_union_operation (op); + return; + } + +#if MEASURE_PERFORMANCE + perf_store.done.received += 1; +#endif switch (op->phase) { case PHASE_PASSIVE_DECODING: @@ -2728,26 +4216,26 @@ handle_union_p2p_done (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "got DONE (as passive partner), waiting for our demands to be satisfied\n"); /* The active peer is done sending offers - * and inquiries. This means that all - * our responses to that (demands and offers) - * must be in flight (queued or in mesh). - * - * We should notify the active peer once - * all our demands are satisfied, so that the active - * peer can quit if we gave it everything. - */GNUNET_CADET_receive_done (op->channel); + * and inquiries. This means that all + * our responses to that (demands and offers) + * must be in flight (queued or in mesh). + * + * We should notify the active peer once + * all our demands are satisfied, so that the active + * peer can quit if we gave it everything. + */GNUNET_CADET_receive_done (op->channel); maybe_finish (op); return; case PHASE_ACTIVE_DECODING: LOG (GNUNET_ERROR_TYPE_DEBUG, "got DONE (as active partner), waiting to finish\n"); /* All demands of the other peer are satisfied, - * and we processed all offers, thus we know - * exactly what our demands must be. - * - * We'll close the channel - * to the other peer once our demands are met. - */op->phase = PHASE_FINISH_CLOSING; + * and we processed all offers, thus we know + * exactly what our demands must be. + * + * We'll close the channel + * to the other peer once our demands are met. + */op->phase = PHASE_FINISH_CLOSING; GNUNET_CADET_receive_done (op->channel); maybe_finish (op); return; @@ -2769,7 +4257,9 @@ static void handle_union_p2p_over (void *cls, const struct GNUNET_MessageHeader *mh) { - perf_rtt.over.received += 1; +#if MEASURE_PERFORMANCE + perf_store.over.received += 1; +#endif send_client_done (cls); } @@ -2943,7 +4433,7 @@ check_incoming_msg (void *cls, struct Listener *listener = op->listener; const struct GNUNET_MessageHeader *nested_context; - /* double operation request */ + /* double operation request */ if (0 != op->suggest_id) { GNUNET_break_op (0); @@ -3053,10 +4543,10 @@ handle_client_create_set (void *cls, } set = GNUNET_new (struct Set); { - struct StrataEstimator *se; + struct MultiStrataEstimator *se; se = strata_estimator_create (SE_STRATA_COUNT, - SE_IBF_SIZE, + SE_IBFS_TOTAL_SIZE, SE_IBF_HASH_NUM); if (NULL == se) { @@ -3198,6 +4688,7 @@ channel_window_cb (void *cls, * @param cls client that sent the message * @param msg message sent by the client */ + static void handle_client_listen (void *cls, const struct GNUNET_SETU_ListenMessage *msg) @@ -3240,10 +4731,10 @@ handle_client_listen (void *cls, GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, struct GNUNET_MessageHeader, NULL), - GNUNET_MQ_hd_fixed_size (union_p2p_request_full, - GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, - struct GNUNET_MessageHeader, - NULL), + GNUNET_MQ_hd_var_size (union_p2p_request_full, + GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, + struct TransmitFullMessage, + NULL), GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SETU_P2P_SE, struct StrataEstimatorMessage, @@ -3256,6 +4747,10 @@ handle_client_listen (void *cls, GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, struct GNUNET_SETU_ElementMessage, NULL), + GNUNET_MQ_hd_var_size (union_p2p_send_full, + GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL, + struct TransmitFullMessage, + NULL), GNUNET_MQ_handler_end () }; struct Listener *listener; @@ -3451,6 +4946,7 @@ handle_client_evaluate (void *cls, { struct ClientState *cs = cls; struct Operation *op = GNUNET_new (struct Operation); + const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { GNUNET_MQ_hd_var_size (incoming_msg, GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, @@ -3488,10 +4984,10 @@ handle_client_evaluate (void *cls, GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, struct GNUNET_MessageHeader, op), - GNUNET_MQ_hd_fixed_size (union_p2p_request_full, - GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, - struct GNUNET_MessageHeader, - op), + GNUNET_MQ_hd_var_size (union_p2p_request_full, + GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, + struct TransmitFullMessage, + op), GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SETU_P2P_SE, struct StrataEstimatorMessage, @@ -3504,6 +5000,10 @@ handle_client_evaluate (void *cls, GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, struct GNUNET_SETU_ElementMessage, op), + GNUNET_MQ_hd_var_size (union_p2p_send_full, + GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL, + struct TransmitFullMessage, + NULL), GNUNET_MQ_handler_end () }; struct Set *set; @@ -3525,8 +5025,23 @@ handle_client_evaluate (void *cls, op->force_full = msg->force_full; op->force_delta = msg->force_delta; op->symmetric = msg->symmetric; + op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff; + op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor; + op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element; + op->byzantine_upper_bound = msg->byzantine_upper_bond; + op->active_passive_switch_required = false; context = GNUNET_MQ_extract_nested_mh (msg); + /* create hashmap for message control */ + op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32, + GNUNET_NO); + op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO); + +#if MEASURE_PERFORMANCE + /* load config */ + load_config (op); +#endif + /* Advance generation values, so that mutations won't interfer with the running operation. */ op->set = set; @@ -3550,7 +5065,9 @@ handle_client_evaluate (void *cls, struct GNUNET_MQ_Envelope *ev; struct OperationRequestMessage *msg; - perf_rtt.operation_request.sent += 1; +#if MEASURE_PERFORMANCE + perf_store.operation_request.sent += 1; +#endif ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, context); @@ -3567,7 +5084,11 @@ handle_client_evaluate (void *cls, op->se = strata_estimator_dup (op->set->se); /* we started the operation, thus we have to send the operation request */ op->phase = PHASE_EXPECT_SE; - op->salt_receive = op->salt_send = 42; // FIXME????? + + op->salt_receive = (op->peer_site + 1) % 2; + op->salt_send = op->peer_site; // FIXME????? + + LOG (GNUNET_ERROR_TYPE_DEBUG, "Initiating union operation evaluation\n"); GNUNET_STATISTICS_update (_GSS_statistics, @@ -3711,6 +5232,20 @@ handle_client_accept (void *cls, op->force_full = msg->force_full; op->force_delta = msg->force_delta; op->symmetric = msg->symmetric; + op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff; + op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor; + op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element; + op->byzantine_upper_bound = msg->byzantine_upper_bond; + op->active_passive_switch_required = false; + /* create hashmap for message control */ + op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32, + GNUNET_NO); + op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO); + +#if MEASURE_PERFORMANCE + /* load config */ + load_config (op); +#endif /* Advance generation values, so that future mutations do not interfer with the running operation. */ @@ -3729,7 +5264,7 @@ handle_client_accept (void *cls, 1, GNUNET_NO); { - const struct StrataEstimator *se; + struct MultiStrataEstimator *se; struct GNUNET_MQ_Envelope *ev; struct StrataEstimatorMessage *strata_msg; char *buf; @@ -3739,20 +5274,40 @@ handle_client_accept (void *cls, op->se = strata_estimator_dup (op->set->se); op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); - op->salt_receive = op->salt_send = 42; // FIXME????? + op->salt_receive = (op->peer_site + 1) % 2; + op->salt_send = op->peer_site; // FIXME????? initialize_key_to_element (op); op->initial_size = GNUNET_CONTAINER_multihashmap32_size ( op->key_to_element); /* kick off the operation */ se = op->se; - buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); + + uint8_t se_count = 1; + if (op->initial_size > 0) + { + op->total_elements_size_local = 0; + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + & + determinate_avg_element_size_iterator, + op); + se_count = determine_strata_count ( + op->total_elements_size_local / op->initial_size, + op->initial_size); + } + buf = GNUNET_malloc (se->stratas[0]->strata_count * IBF_BUCKET_SIZE + * ((SE_IBFS_TOTAL_SIZE / 8) * se_count)); len = strata_estimator_write (se, + SE_IBFS_TOTAL_SIZE, + se_count, buf); - perf_rtt.se.sent += 1; - perf_rtt.se.sent_var_bytes += len; +#if MEASURE_PERFORMANCE + perf_store.se.sent += 1; + perf_store.se.sent_var_bytes += len; +#endif - if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) + if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE + * SE_IBFS_TOTAL_SIZE) type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC; else type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE; @@ -3766,6 +5321,7 @@ handle_client_accept (void *cls, strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( op->set->content->elements)); + strata_msg->se_count = se_count; GNUNET_MQ_send (op->mq, ev); op->phase = PHASE_EXPECT_IBF; @@ -3800,8 +5356,9 @@ shutdown_task (void *cls) GNUNET_YES); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "RTT:%f\n", calculate_perf_rtt()); +#if MEASURE_PERFORMANCE + calculate_perf_store (); +#endif } diff --git a/src/setu/gnunet-service-setu_protocol.h b/src/setu/gnunet-service-setu_protocol.h index a2803ee47..d8f34f69c 100644 --- a/src/setu/gnunet-service-setu_protocol.h +++ b/src/setu/gnunet-service-setu_protocol.h @@ -39,11 +39,6 @@ struct OperationRequestMessage */ struct GNUNET_MessageHeader header; - /** - * Operation to request, values from `enum GNUNET_SET_OperationType` - */ - uint32_t operation GNUNET_PACKED; - /** * For Intersection: my element count */ @@ -72,20 +67,9 @@ struct IBFMessage struct GNUNET_MessageHeader header; /** - * Order of the whole ibf, where - * num_buckets = 2^order - */ - uint8_t order; - - /** - * Padding, must be 0. + * Size of the whole ibf (number of buckets) */ - uint8_t reserved1; - - /** - * Padding, must be 0. - */ - uint16_t reserved2 GNUNET_PACKED; + uint32_t ibf_size; /** * Offset of the strata in the rest of the message @@ -95,10 +79,22 @@ struct IBFMessage /** * Salt used when hashing elements for this IBF. */ - uint32_t salt GNUNET_PACKED; + uint16_t salt GNUNET_PACKED; + /** + * The bit lenght of the counter + */ + uint16_t ibf_counter_bit_length; /* rest: buckets */ }; +/** +estimate_best_mode_of_operation (uint64_t avg_element_size, +uint64_t local_set_size, + uint64_t remote_set_size, +uint64_t est_set_diff_remote, + uint64_t est_set_diff_local,) + **/ + struct InquiryMessage @@ -113,11 +109,6 @@ struct InquiryMessage */ uint32_t salt GNUNET_PACKED; - /** - * Reserved, set to 0. - */ - uint32_t reserved GNUNET_PACKED; - /* rest: inquiry IBF keys */ }; @@ -218,9 +209,47 @@ struct StrataEstimatorMessage */ struct GNUNET_MessageHeader header; + /** + * The number of ses transmitted + */ + uint8_t se_count; + + /** + * Size of the local set + */ uint64_t set_size; }; + +/** + * Message which signals to other peer that we are sending full set + * + */ +struct TransmitFullMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL + */ + struct GNUNET_MessageHeader header; + + /** + * Remote set difference calculated with strata estimator + */ + uint32_t remote_set_difference; + + /** + * Total remote set size + */ + uint32_t remote_set_size; + + /** + * Local set difference calculated with strata estimator + */ + uint32_t local_set_difference; + +}; + + GNUNET_NETWORK_STRUCT_END #endif diff --git a/src/setu/gnunet-service-setu_strata_estimator.c b/src/setu/gnunet-service-setu_strata_estimator.c index 7c9a4deb6..7981cc847 100644 --- a/src/setu/gnunet-service-setu_strata_estimator.c +++ b/src/setu/gnunet-service-setu_strata_estimator.c @@ -22,6 +22,7 @@ * @brief invertible bloom filter * @author Florian Dold * @author Christian Grothoff + * @author Elias Summermatter */ #include "platform.h" #include "gnunet_util_lib.h" @@ -29,6 +30,82 @@ #include "gnunet-service-setu_strata_estimator.h" +/** + * Should we try compressing the strata estimator? This will + * break compatibility with the 0.10.1-network. + */ +#define FAIL_10_1_COMPATIBILTIY 1 + +/** + * Number of strata estimators in memory NOT transmitted + */ + +#define MULTI_SE_BASE_COUNT 8 + +/** + * The avg size of 1 se + * Based on the bsc thesis of Elias Summermatter (2021) + */ + +#define AVG_BYTE_SIZE_SE 4221 + +/** + * Calculates the optimal number of strata Estimators to send + * @param avg_element_size + * @param element_count + * @return + */ +uint8_t +determine_strata_count (uint64_t avg_element_size, uint64_t element_count) +{ + uint64_t base_size = avg_element_size * element_count; + /* >67kb total size of elements in set */ + if (base_size < AVG_BYTE_SIZE_SE * 16) + return 1; + /* >270kb total size of elements in set */ + if (base_size < AVG_BYTE_SIZE_SE * 64) + return 2; + /* >1mb total size of elements in set */ + if (base_size < AVG_BYTE_SIZE_SE * 256) + return 4; + return 8; +} + + +/** + * Modify an IBF key @a k_in based on the @a salt, returning a + * salted key in @a k_out. + */ +static void +salt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = (salt * 7) % 64; + uint64_t x = k_in->key_val; + + /* rotate ibf key */ + x = (x >> s) | (x << (64 - s)); + k_out->key_val = x; +} + + +/** + * Reverse modification done in the salt_key function + */ +static void +unsalt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = (salt * 7) % 64; + uint64_t x = k_in->key_val; + + x = (x << s) | (x >> (64 - s)); + k_out->key_val = x; +} + + /** * Write the given strata estimator to the buffer. * @@ -37,21 +114,33 @@ * @return number of bytes written to @a buf */ size_t -strata_estimator_write (const struct StrataEstimator *se, +strata_estimator_write (struct MultiStrataEstimator *se, + uint16_t se_ibf_total_size, + uint8_t number_se_send, void *buf) { char *sbuf = buf; + unsigned int i; size_t osize; + uint64_t sbuf_offset = 0; + se->size = number_se_send; GNUNET_assert (NULL != se); - for (unsigned int i = 0; i < se->strata_count; i++) + for (uint8_t strata_ctr = 0; strata_ctr < number_se_send; strata_ctr++) { - ibf_write_slice (se->strata[i], - 0, - se->ibf_size, - &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]); + for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++) + { + ibf_write_slice (se->stratas[strata_ctr]->strata[i], + 0, + se->stratas[strata_ctr]->ibf_size, + &sbuf[sbuf_offset], + 8); + sbuf_offset += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE; + } } - osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; + osize = ((se_ibf_total_size / 8) * number_se_send) * IBF_BUCKET_SIZE + * se->stratas[0]->strata_count; +#if FAIL_10_1_COMPATIBILTIY { char *cbuf; size_t nsize; @@ -62,13 +151,12 @@ strata_estimator_write (const struct StrataEstimator *se, &cbuf, &nsize)) { - GNUNET_memcpy (buf, - cbuf, - nsize); + GNUNET_memcpy (buf, cbuf, nsize); osize = nsize; GNUNET_free (cbuf); } } +#endif return osize; } @@ -87,15 +175,19 @@ int strata_estimator_read (const void *buf, size_t buf_len, int is_compressed, - struct StrataEstimator *se) + uint8_t number_se_received, + uint16_t se_ibf_total_size, + struct MultiStrataEstimator *se) { + unsigned int i; size_t osize; char *dbuf; dbuf = NULL; if (GNUNET_YES == is_compressed) { - osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; + osize = ((se_ibf_total_size / 8) * number_se_received) * IBF_BUCKET_SIZE + * se->stratas[0]->strata_count; dbuf = GNUNET_decompress (buf, buf_len, osize); @@ -108,18 +200,25 @@ strata_estimator_read (const void *buf, buf_len = osize; } - if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE) + if (buf_len != se->stratas[0]->strata_count * ((se_ibf_total_size / 8) + * number_se_received) + * IBF_BUCKET_SIZE) { GNUNET_break (0); /* very odd error */ GNUNET_free (dbuf); return GNUNET_SYSERR; } - for (unsigned int i = 0; i < se->strata_count; i++) + for (uint8_t strata_ctr = 0; strata_ctr < number_se_received; strata_ctr++) { - ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); - buf += se->ibf_size * IBF_BUCKET_SIZE; + for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++) + { + ibf_read_slice (buf, 0, se->stratas[strata_ctr]->ibf_size, + se->stratas[strata_ctr]->strata[i], 8); + buf += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE; + } } + se->size = number_se_received; GNUNET_free (dbuf); return GNUNET_OK; } @@ -132,38 +231,61 @@ strata_estimator_read (const void *buf, * @param key key to add */ void -strata_estimator_insert (struct StrataEstimator *se, +strata_estimator_insert (struct MultiStrataEstimator *se, struct IBF_Key key) { - uint64_t v; - unsigned int i; - v = key.key_val; + /* count trailing '1'-bits of v */ - for (i = 0; v & 1; v >>= 1, i++) - /* empty */; - ibf_insert (se->strata[i], key); + for (int strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++) + { + unsigned int i; + uint64_t v; + + struct IBF_Key salted_key; + salt_key (&key, + strata_ctr * (64 / MULTI_SE_BASE_COUNT), + &salted_key); + v = salted_key.key_val; + for (i = 0; v & 1; v >>= 1, i++) + { + ibf_insert (se->stratas[strata_ctr]->strata[i], salted_key); + } + } + /* empty */; + } /** - * Remove a key from the strata estimator. + * Remove a key from the strata estimator. (NOT USED) * * @param se strata estimator to remove the key from * @param key key to remove */ void -strata_estimator_remove (struct StrataEstimator *se, +strata_estimator_remove (struct MultiStrataEstimator *se, struct IBF_Key key) { - uint64_t v; - unsigned int i; - v = key.key_val; /* count trailing '1'-bits of v */ - for (i = 0; v & 1; v >>= 1, i++) - /* empty */; - ibf_remove (se->strata[i], key); + for (int strata_ctr = 0; strata_ctr < se->size; strata_ctr++) + { + uint64_t v; + unsigned int i; + + struct IBF_Key unsalted_key; + unsalt_key (&key, + strata_ctr * (64 / MULTI_SE_BASE_COUNT), + &unsalted_key); + + v = unsalted_key.key_val; + for (i = 0; v & 1; v >>= 1, i++) + { + /* empty */; + ibf_remove (se->stratas[strata_ctr]->strata[i], unsalted_key); + } + } } @@ -175,29 +297,42 @@ strata_estimator_remove (struct StrataEstimator *se, * @param ibf_hashnum hashnum parameter of each ibf * @return a freshly allocated, empty strata estimator, NULL on error */ -struct StrataEstimator * +struct MultiStrataEstimator * strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum) { - struct StrataEstimator *se; - - se = GNUNET_new (struct StrataEstimator); - se->strata_count = strata_count; - se->ibf_size = ibf_size; - se->strata = GNUNET_new_array (strata_count, - struct InvertibleBloomFilter *); - for (unsigned int i = 0; i < strata_count; i++) + struct MultiStrataEstimator *se; + unsigned int i; + unsigned int j; + se = GNUNET_new (struct MultiStrataEstimator); + + se->size = MULTI_SE_BASE_COUNT; + se->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *); + + uint8_t ibf_prime_sizes[] = {79,79,79,79,79,79,79,79}; + + for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++) { - se->strata[i] = ibf_create (ibf_size, ibf_hashnum); - if (NULL == se->strata[i]) + se->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator); + se->stratas[strata_ctr]->strata_count = strata_count; + se->stratas[strata_ctr]->ibf_size = ibf_prime_sizes[strata_ctr]; + se->stratas[strata_ctr]->strata = GNUNET_new_array (strata_count * 4, + struct + InvertibleBloomFilter *); + for (i = 0; i < strata_count; i++) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to allocate memory for strata estimator\n"); - for (unsigned int j = 0; j < i; j++) - ibf_destroy (se->strata[i]); - GNUNET_free (se); - return NULL; + se->stratas[strata_ctr]->strata[i] = ibf_create ( + ibf_prime_sizes[strata_ctr], ibf_hashnum); + if (NULL == se->stratas[strata_ctr]->strata[i]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate memory for strata estimator\n"); + for (j = 0; j < i; j++) + ibf_destroy (se->stratas[strata_ctr]->strata[i]); + GNUNET_free (se); + return NULL; + } } } return se; @@ -213,46 +348,71 @@ strata_estimator_create (unsigned int strata_count, * @param se2 second strata estimator * @return the estimated difference */ -unsigned int -strata_estimator_difference (const struct StrataEstimator *se1, - const struct StrataEstimator *se2) +void +strata_estimator_difference (const struct MultiStrataEstimator *se1, + const struct MultiStrataEstimator *se2) { - unsigned int count; + int avg_local_diff = 0; + int avg_remote_diff = 0; + uint8_t number_of_estimators = se1->size; - GNUNET_assert (se1->strata_count == se2->strata_count); - count = 0; - for (int i = se1->strata_count - 1; i >= 0; i--) + for (uint8_t strata_ctr = 0; strata_ctr < number_of_estimators; strata_ctr++) { - struct InvertibleBloomFilter *diff; - /* number of keys decoded from the ibf */ - - /* FIXME: implement this without always allocating new IBFs */ - diff = ibf_dup (se1->strata[i]); - ibf_subtract (diff, - se2->strata[i]); - for (int ibf_count = 0; GNUNET_YES; ibf_count++) + GNUNET_assert (se1->stratas[strata_ctr]->strata_count == + se2->stratas[strata_ctr]->strata_count); + + + for (int i = se1->stratas[strata_ctr]->strata_count - 1; i >= 0; i--) { - int more; + struct InvertibleBloomFilter *diff; + /* number of keys decoded from the ibf */ - more = ibf_decode (diff, - NULL, - NULL); - if (GNUNET_NO == more) - { - count += ibf_count; - break; - } - /* Estimate if decoding fails or would not terminate */ - if ( (GNUNET_SYSERR == more) || - (ibf_count > diff->size) ) + /* FIXME: implement this without always allocating new IBFs */ + diff = ibf_dup (se1->stratas[strata_ctr]->strata[i]); + diff->local_decoded_count = 0; + diff->remote_decoded_count = 0; + + ibf_subtract (diff, se2->stratas[strata_ctr]->strata[i]); + + for (int ibf_count = 0; GNUNET_YES; ibf_count++) { - ibf_destroy (diff); - return count * (1 << (i + 1)); + int more; + + more = ibf_decode (diff, NULL, NULL); + if (GNUNET_NO == more) + { + se1->stratas[strata_ctr]->strata[0]->local_decoded_count += + diff->local_decoded_count; + se1->stratas[strata_ctr]->strata[0]->remote_decoded_count += + diff->remote_decoded_count; + break; + } + /* Estimate if decoding fails or would not terminate */ + if ((GNUNET_SYSERR == more) || (ibf_count > diff->size)) + { + se1->stratas[strata_ctr]->strata[0]->local_decoded_count = + se1->stratas[strata_ctr]->strata[0]->local_decoded_count * (1 << (i + + + 1)); + se1->stratas[strata_ctr]->strata[0]->remote_decoded_count = + se1->stratas[strata_ctr]->strata[0]->remote_decoded_count * (1 << (i + + + 1)); + ibf_destroy (diff); + goto break_all_counting_loops; + } } + ibf_destroy (diff); } - ibf_destroy (diff); +break_all_counting_loops:; + avg_local_diff += se1->stratas[strata_ctr]->strata[0]->local_decoded_count; + avg_remote_diff += + se1->stratas[strata_ctr]->strata[0]->remote_decoded_count; } - return count; + se1->stratas[0]->strata[0]->local_decoded_count = avg_local_diff + / number_of_estimators; + se1->stratas[0]->strata[0]->remote_decoded_count = avg_remote_diff + / number_of_estimators; } @@ -262,18 +422,28 @@ strata_estimator_difference (const struct StrataEstimator *se1, * @param se the strata estimator to copy * @return the copy */ -struct StrataEstimator * -strata_estimator_dup (struct StrataEstimator *se) +struct MultiStrataEstimator * +strata_estimator_dup (struct MultiStrataEstimator *se) { - struct StrataEstimator *c; - - c = GNUNET_new (struct StrataEstimator); - c->strata_count = se->strata_count; - c->ibf_size = se->ibf_size; - c->strata = GNUNET_new_array (se->strata_count, - struct InvertibleBloomFilter *); - for (unsigned int i = 0; i < se->strata_count; i++) - c->strata[i] = ibf_dup (se->strata[i]); + struct MultiStrataEstimator *c; + unsigned int i; + + c = GNUNET_new (struct MultiStrataEstimator); + c->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *); + for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++) + { + c->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator); + c->stratas[strata_ctr]->strata_count = + se->stratas[strata_ctr]->strata_count; + c->stratas[strata_ctr]->ibf_size = se->stratas[strata_ctr]->ibf_size; + c->stratas[strata_ctr]->strata = GNUNET_new_array ( + se->stratas[strata_ctr]->strata_count, + struct + InvertibleBloomFilter *); + for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++) + c->stratas[strata_ctr]->strata[i] = ibf_dup ( + se->stratas[strata_ctr]->strata[i]); + } return c; } @@ -284,10 +454,14 @@ strata_estimator_dup (struct StrataEstimator *se) * @param se strata estimator to destroy. */ void -strata_estimator_destroy (struct StrataEstimator *se) +strata_estimator_destroy (struct MultiStrataEstimator *se) { - for (unsigned int i = 0; i < se->strata_count; i++) - ibf_destroy (se->strata[i]); - GNUNET_free (se->strata); + unsigned int i; + for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++) + { + for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++) + ibf_destroy (se->stratas[strata_ctr]->strata[i]); + GNUNET_free (se->stratas[strata_ctr]->strata); + } GNUNET_free (se); } diff --git a/src/setu/gnunet-service-setu_strata_estimator.h b/src/setu/gnunet-service-setu_strata_estimator.h index afdbcdbbf..4871a7fcd 100644 --- a/src/setu/gnunet-service-setu_strata_estimator.h +++ b/src/setu/gnunet-service-setu_strata_estimator.h @@ -22,6 +22,7 @@ * @file set/gnunet-service-setu_strata_estimator.h * @brief estimator of set difference * @author Florian Dold + * @author Elias Summermatter */ #ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H @@ -61,6 +62,31 @@ struct StrataEstimator unsigned int ibf_size; }; +struct MultiStrataEstimator +{ + /** + * Array of strata estimators + */ + struct StrataEstimator **stratas; + + /** + * Number of strata estimators in struct + */ + uint8_t size; + +}; + +/** + * Deteminate how many strata estimators in the message are necessary + * @param avg_element_size + * @param element_count + * @return number of strata's + */ + +uint8_t +determine_strata_count (uint64_t avg_element_size, + uint64_t element_count); + /** * Write the given strata estimator to the buffer. @@ -70,7 +96,9 @@ struct StrataEstimator * @return number of bytes written to @a buf */ size_t -strata_estimator_write (const struct StrataEstimator *se, +strata_estimator_write (struct MultiStrataEstimator *se, + uint16_t se_ibf_total_size, + uint8_t number_se_send, void *buf); @@ -88,7 +116,9 @@ int strata_estimator_read (const void *buf, size_t buf_len, int is_compressed, - struct StrataEstimator *se); + uint8_t number_se_received, + uint16_t se_ibf_total_size, + struct MultiStrataEstimator *se); /** @@ -99,7 +129,7 @@ strata_estimator_read (const void *buf, * @param ibf_hashnum hashnum parameter of each ibf * @return a freshly allocated, empty strata estimator, NULL on error */ -struct StrataEstimator * +struct MultiStrataEstimator * strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum); @@ -111,11 +141,11 @@ strata_estimator_create (unsigned int strata_count, * * @param se1 first strata estimator * @param se2 second strata estimator - * @return abs(|se1| - |se2|) + * @return nothing */ -unsigned int -strata_estimator_difference (const struct StrataEstimator *se1, - const struct StrataEstimator *se2); +void +strata_estimator_difference (const struct MultiStrataEstimator *se1, + const struct MultiStrataEstimator *se2); /** @@ -125,7 +155,7 @@ strata_estimator_difference (const struct StrataEstimator *se1, * @param key key to add */ void -strata_estimator_insert (struct StrataEstimator *se, +strata_estimator_insert (struct MultiStrataEstimator *se, struct IBF_Key key); @@ -136,7 +166,7 @@ strata_estimator_insert (struct StrataEstimator *se, * @param key key to remove */ void -strata_estimator_remove (struct StrataEstimator *se, +strata_estimator_remove (struct MultiStrataEstimator *se, struct IBF_Key key); @@ -146,7 +176,7 @@ strata_estimator_remove (struct StrataEstimator *se, * @param se strata estimator to destroy. */ void -strata_estimator_destroy (struct StrataEstimator *se); +strata_estimator_destroy (struct MultiStrataEstimator *se); /** @@ -155,8 +185,8 @@ strata_estimator_destroy (struct StrataEstimator *se); * @param se the strata estimator to copy * @return the copy */ -struct StrataEstimator * -strata_estimator_dup (struct StrataEstimator *se); +struct MultiStrataEstimator * +strata_estimator_dup (struct MultiStrataEstimator *se); #if 0 /* keep Emacsens' auto-indent happy */ diff --git a/src/setu/ibf.c b/src/setu/ibf.c index 1beba9065..8f29adb62 100644 --- a/src/setu/ibf.c +++ b/src/setu/ibf.c @@ -20,11 +20,15 @@ /** * @file set/ibf.c - * @brief implementation of the invertible Bloom filter + * @brief implementation of the invertible bloom filter * @author Florian Dold + * @author Elias Summermatter */ #include "ibf.h" +#include "gnunet_util_lib.h" +#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__) + /** * Compute the key's hash from the key. @@ -58,11 +62,12 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst) { struct IBF_Key *p; + unsigned int i; const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode) / sizeof(struct IBF_Key); p = (struct IBF_Key *) dst; - for (unsigned int i = 0; i < keys_per_hashcode; i++) + for (i = 0; i < keys_per_hashcode; i++) *p++ = key; } @@ -75,14 +80,14 @@ ibf_hashcode_from_key (struct IBF_Key key, * @return the newly created invertible bloom filter, NULL on error */ struct InvertibleBloomFilter * -ibf_create (uint32_t size, - uint8_t hash_num) +ibf_create (uint32_t size, uint8_t hash_num) { struct InvertibleBloomFilter *ibf; GNUNET_assert (0 != size); + ibf = GNUNET_new (struct InvertibleBloomFilter); - ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t)); + ibf->count = GNUNET_malloc_large (size * sizeof(uint64_t)); if (NULL == ibf->count) { GNUNET_free (ibf); @@ -105,6 +110,7 @@ ibf_create (uint32_t size, } ibf->size = size; ibf->hash_num = hash_num; + return ibf; } @@ -121,8 +127,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf, uint32_t i; uint32_t bucket; - bucket = GNUNET_CRYPTO_crc32_n (&key, - sizeof (key)); + bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key); for (i = 0, filled = 0; filled < ibf->hash_num; i++) { uint64_t x; @@ -133,8 +138,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf, dst[filled++] = bucket % ibf->size; try_next: x = ((uint64_t) bucket << 32) | i; - bucket = GNUNET_CRYPTO_crc32_n (&x, - sizeof (x)); + bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x); } } @@ -170,13 +174,8 @@ ibf_insert (struct InvertibleBloomFilter *ibf, int buckets[ibf->hash_num]; GNUNET_assert (ibf->hash_num <= ibf->size); - ibf_get_indices (ibf, - key, - buckets); - ibf_insert_into (ibf, - key, - buckets, - 1); + ibf_get_indices (ibf, key, buckets); + ibf_insert_into (ibf, key, buckets, 1); } @@ -193,13 +192,8 @@ ibf_remove (struct InvertibleBloomFilter *ibf, int buckets[ibf->hash_num]; GNUNET_assert (ibf->hash_num <= ibf->size); - ibf_get_indices (ibf, - key, - buckets); - ibf_insert_into (ibf, - key, - buckets, - -1); + ibf_get_indices (ibf, key, buckets); + ibf_insert_into (ibf, key, buckets, -1); } @@ -244,6 +238,8 @@ ibf_decode (struct InvertibleBloomFilter *ibf, for (uint32_t i = 0; i < ibf->size; i++) { + int hit; + /* we can only decode from pure buckets */ if ( (1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val) ) @@ -257,30 +253,33 @@ ibf_decode (struct InvertibleBloomFilter *ibf, /* test if key in bucket hits its own location, * if not, the key hash was subject to collision */ - { - bool hit = false; + hit = GNUNET_NO; + ibf_get_indices (ibf, ibf->key_sum[i], buckets); + for (int j = 0; j < ibf->hash_num; j++) + if (buckets[j] == i) + hit = GNUNET_YES; - ibf_get_indices (ibf, - ibf->key_sum[i], - buckets); - for (int j = 0; j < ibf->hash_num; j++) - if (buckets[j] == i) - { - hit = true; - break; - } - if (! hit) - continue; + if (GNUNET_NO == hit) + continue; + + if (1 == ibf->count[i].count_val) + { + ibf->remote_decoded_count++; } + else + { + ibf->local_decoded_count++; + } + + if (NULL != ret_side) *ret_side = ibf->count[i].count_val; if (NULL != ret_id) *ret_id = ibf->key_sum[i]; /* insert on the opposite side, effectively removing the element */ - ibf_insert_into (ibf, - ibf->key_sum[i], buckets, - -ibf->count[i].count_val); + ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val); + return GNUNET_YES; } @@ -290,6 +289,26 @@ ibf_decode (struct InvertibleBloomFilter *ibf, } +/** + * Returns the minimal bytes needed to store the counter of the IBF + * + * @param ibf the IBF + */ +uint8_t +ibf_get_max_counter (struct InvertibleBloomFilter *ibf) +{ + long long max_counter = 0; + for (uint64_t i = 0; i < ibf->size; i++) + { + if (ibf->count[i].count_val > max_counter) + { + max_counter = ibf->count[i].count_val; + } + } + return 64 - __builtin_clzll (max_counter); +} + + /** * Write buckets from an ibf to a buffer. * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. @@ -298,16 +317,17 @@ ibf_decode (struct InvertibleBloomFilter *ibf, * @param start with which bucket to start * @param count how many buckets to write * @param buf buffer to write the data to + * @param max bit length of a counter for unpacking */ void ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, - uint32_t count, - void *buf) + uint64_t count, + void *buf, + uint8_t counter_max_length) { struct IBF_Key *key_dst; struct IBF_KeyHash *key_hash_dst; - struct IBF_Count *count_dst; GNUNET_assert (start + count <= ibf->size); @@ -315,19 +335,182 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, key_dst = (struct IBF_Key *) buf; GNUNET_memcpy (key_dst, ibf->key_sum + start, - count * sizeof *key_dst); + count * sizeof(*key_dst)); key_dst += count; /* copy key hashes */ key_hash_dst = (struct IBF_KeyHash *) key_dst; GNUNET_memcpy (key_hash_dst, ibf->key_hash_sum + start, - count * sizeof *key_hash_dst); + count * sizeof(*key_hash_dst)); key_hash_dst += count; - /* copy counts */ - count_dst = (struct IBF_Count *) key_hash_dst; - GNUNET_memcpy (count_dst, - ibf->count + start, - count * sizeof *count_dst); + + /* pack and copy counter */ + pack_counter (ibf, + start, + count, + (uint8_t *) key_hash_dst, + counter_max_length); + + +} + + +/** + * Packs the counter to transmit only the smallest possible amount of bytes and + * preventing overflow of the counter + * @param ibf the ibf to write + * @param start with which bucket to start + * @param count how many buckets to write + * @param buf buffer to write the data to + * @param max bit length of a counter for unpacking + */ + +void +pack_counter (const struct InvertibleBloomFilter *ibf, + uint32_t start, + uint64_t count, + uint8_t *buf, + uint8_t counter_max_length) +{ + uint8_t store_size = 0; + uint8_t store = 0; + uint16_t byte_ctr = 0; + + /** + * Iterate over IBF bucket + */ + for (uint64_t i = start; i< (count + start);) + { + uint64_t count_val_to_write = ibf->count[i].count_val; + uint8_t count_len_to_write = counter_max_length; + + /** + * Pack and compose counters to byte values + */ + while ((count_len_to_write + store_size) >= 8) + { + uint8_t bit_shift = 0; + + /** + * Shift bits if more than a byte has to be written + * or the store size is not empty + */ + if ((store_size > 0) || (count_len_to_write > 8)) + { + uint8_t bit_unused = 8 - store_size; + bit_shift = count_len_to_write - bit_unused; + store = store << bit_unused; + } + + buf[byte_ctr] = ((count_val_to_write >> bit_shift) | store) & 0xFF; + byte_ctr++; + count_len_to_write -= (8 - store_size); + count_val_to_write = count_val_to_write & ((1ULL << + count_len_to_write) - 1); + store = 0; + store_size = 0; + } + store = (store << count_len_to_write) | count_val_to_write; + store_size = store_size + count_len_to_write; + count_len_to_write = 0; + i++; + } + + /** + * Pack data left in story before finishing + */ + if (store_size > 0) + { + buf[byte_ctr] = store << (8 - store_size); + byte_ctr++; + } + +} + + +/** + * Unpacks the counter to transmit only the smallest possible amount of bytes and + * preventing overflow of the counter + * @param ibf the ibf to write + * @param start with which bucket to start + * @param count how many buckets to write + * @param buf buffer to write the data to + * @param max bit length of a counter for unpacking + */ + +void +unpack_counter (const struct InvertibleBloomFilter *ibf, + uint32_t start, + uint64_t count, + uint8_t *buf, + uint8_t counter_max_length) +{ + uint64_t ibf_counter_ctr = 0; + uint64_t store = 0; + uint64_t store_bit_ctr = 0; + uint64_t byte_ctr = 0; + + /** + * Iterate over received bytes + */ + while (true) + { + uint8_t byte_read = buf[byte_ctr]; + uint8_t bit_to_read_left = 8; + byte_ctr++; + + /** + * Pack data left in story before finishing + */ + while (bit_to_read_left >= 0) + { + /** + * Stop decoding when end is reached + */ + if (ibf_counter_ctr > (count - 1)) + return; + + /* + * Unpack the counter + */ + if ((store_bit_ctr + bit_to_read_left) >= counter_max_length) + { + uint8_t bytes_used = counter_max_length - store_bit_ctr; + if (store_bit_ctr > 0) + { + store = store << bytes_used; + } + + uint8_t bytes_to_shift = bit_to_read_left - bytes_used; + uint64_t counter_part = byte_read >> bytes_to_shift; + store = store | counter_part; + ibf->count[ibf_counter_ctr + start].count_val = store; + byte_read = byte_read & ((1 << bytes_to_shift) - 1); + bit_to_read_left -= bytes_used; + ibf_counter_ctr++; + store = 0; + store_bit_ctr = 0; + } + else + { + store_bit_ctr += bit_to_read_left; + if (0 == store) + { + store = byte_read; + } + else + { + store = store << bit_to_read_left; + store = store | byte_read; + } + break; + + } + + } + + } + } @@ -338,12 +521,14 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, * @param start which bucket to start at * @param count how many buckets to read * @param ibf the ibf to read from + * @param max bit length of a counter for unpacking */ void ibf_read_slice (const void *buf, uint32_t start, - uint32_t count, - struct InvertibleBloomFilter *ibf) + uint64_t count, + struct InvertibleBloomFilter *ibf, + uint8_t counter_max_length) { struct IBF_Key *key_src; struct IBF_KeyHash *key_hash_src; @@ -364,11 +549,10 @@ ibf_read_slice (const void *buf, key_hash_src, count * sizeof *key_hash_src); key_hash_src += count; - /* copy counts */ + + /* copy and unpack counts */ count_src = (struct IBF_Count *) key_hash_src; - GNUNET_memcpy (ibf->count + start, - count_src, - count * sizeof *count_src); + unpack_counter (ibf,start,count,(uint8_t *) count_src,counter_max_length); } diff --git a/src/setu/ibf.h b/src/setu/ibf.h index 7c2ab33b1..5628405dc 100644 --- a/src/setu/ibf.h +++ b/src/setu/ibf.h @@ -22,6 +22,7 @@ * @file set/ibf.h * @brief invertible bloom filter * @author Florian Dold + * @author Elias Summermatter */ #ifndef GNUNET_CONSENSUS_IBF_H @@ -62,7 +63,7 @@ struct IBF_KeyHash */ struct IBF_Count { - int8_t count_val; + int64_t count_val; }; @@ -92,6 +93,20 @@ struct InvertibleBloomFilter */ uint8_t hash_num; + /** + * If an IBF is decoded this count stores how many + * elements are on the local site. This is used + * to estimate the set difference on a site + */ + int local_decoded_count; + + /** + * If an IBF is decoded this count stores how many + * elements are on the remote site. This is used + * to estimate the set difference on a site + */ + int remote_decoded_count; + /** * Xor sums of the elements' keys, used to identify the elements. * Array of 'size' elements. @@ -125,8 +140,9 @@ struct InvertibleBloomFilter void ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, - uint32_t count, - void *buf); + uint64_t count, + void *buf, + uint8_t counter_max_length); /** @@ -140,8 +156,9 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, void ibf_read_slice (const void *buf, uint32_t start, - uint32_t count, - struct InvertibleBloomFilter *ibf); + uint64_t count, + struct InvertibleBloomFilter *ibf, + uint8_t counter_max_length); /** @@ -244,6 +261,44 @@ ibf_dup (const struct InvertibleBloomFilter *ibf); void ibf_destroy (struct InvertibleBloomFilter *ibf); +uint8_t +ibf_get_max_counter (struct InvertibleBloomFilter *ibf); + + +/** + * Packs the counter to transmit only the smallest possible amount of bytes and + * preventing overflow of the counter + * @param ibf the ibf to write + * @param start with which bucket to start + * @param count how many buckets to write + * @param buf buffer to write the data to + * @param max bit length of a counter for unpacking + */ + +void +pack_counter (const struct InvertibleBloomFilter *ibf, + uint32_t start, + uint64_t count, + uint8_t *buf, + uint8_t counter_max_length); + +/** + * Unpacks the counter to transmit only the smallest possible amount of bytes and + * preventing overflow of the counter + * @param ibf the ibf to write + * @param start with which bucket to start + * @param count how many buckets to write + * @param buf buffer to write the data to + * @param max bit length of a counter for unpacking + */ + +void +unpack_counter (const struct InvertibleBloomFilter *ibf, + uint32_t start, + uint64_t count, + uint8_t *buf, + uint8_t counter_max_length); + #if 0 /* keep Emacsens' auto-indent happy */ { diff --git a/src/setu/perf_setu_api.c b/src/setu/perf_setu_api.c index b273f9c71..af84994f8 100644 --- a/src/setu/perf_setu_api.c +++ b/src/setu/perf_setu_api.c @@ -22,11 +22,14 @@ * @file set/test_setu_api.c * @brief testcase for setu_api.c * @author Florian Dold + * @author Elias Summermatter */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_testing_lib.h" #include "gnunet_setu_service.h" +#include +#include static struct GNUNET_PeerIdentity local_id; @@ -50,6 +53,12 @@ static int ret; static struct GNUNET_SCHEDULER_Task *tt; +/** + * Handles configuration file for setu performance test + * + */ +static struct GNUNET_CONFIGURATION_Handle *setu_cfg; + static void result_cb_set1 (void *cls, @@ -57,44 +66,44 @@ result_cb_set1 (void *cls, uint64_t size, enum GNUNET_SETU_Status status) { - switch (status) + switch (status) + { + case GNUNET_SETU_STATUS_ADD_LOCAL: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n"); + break; + + case GNUNET_SETU_STATUS_FAILURE: + GNUNET_break (0); + oh1 = NULL; + fprintf (stderr, "set 1: received failure status!\n"); + ret = 1; + if (NULL != tt) + { + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + } + GNUNET_SCHEDULER_shutdown (); + break; + + case GNUNET_SETU_STATUS_DONE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n"); + oh1 = NULL; + if (NULL != set1) { - case GNUNET_SETU_STATUS_ADD_LOCAL: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n"); - break; - - case GNUNET_SETU_STATUS_FAILURE: - GNUNET_break (0); - oh1 = NULL; - fprintf (stderr, "set 1: received failure status!\n"); - ret = 1; - if (NULL != tt) - { - GNUNET_SCHEDULER_cancel (tt); - tt = NULL; - } - GNUNET_SCHEDULER_shutdown (); - break; - - case GNUNET_SETU_STATUS_DONE: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n"); - oh1 = NULL; - if (NULL != set1) - { - GNUNET_SETU_destroy (set1); - set1 = NULL; - } - if (NULL == set2) - { - GNUNET_SCHEDULER_cancel (tt); - tt = NULL; - GNUNET_SCHEDULER_shutdown (); - } - break; - - default: - GNUNET_assert (0); + GNUNET_SETU_destroy (set1); + set1 = NULL; } + if (NULL == set2) + { + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + GNUNET_SCHEDULER_shutdown (); + } + break; + + default: + GNUNET_assert (0); + } } @@ -104,36 +113,36 @@ result_cb_set2 (void *cls, uint64_t size, enum GNUNET_SETU_Status status) { - switch (status) + switch (status) + { + case GNUNET_SETU_STATUS_ADD_LOCAL: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n"); + break; + + case GNUNET_SETU_STATUS_FAILURE: + GNUNET_break (0); + oh2 = NULL; + fprintf (stderr, "set 2: received failure status\n"); + GNUNET_SCHEDULER_shutdown (); + ret = 1; + break; + + case GNUNET_SETU_STATUS_DONE: + oh2 = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n"); + GNUNET_SETU_destroy (set2); + set2 = NULL; + if (NULL == set1) { - case GNUNET_SETU_STATUS_ADD_LOCAL: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n"); - break; - - case GNUNET_SETU_STATUS_FAILURE: - GNUNET_break (0); - oh2 = NULL; - fprintf (stderr, "set 2: received failure status\n"); - GNUNET_SCHEDULER_shutdown (); - ret = 1; - break; - - case GNUNET_SETU_STATUS_DONE: - oh2 = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n"); - GNUNET_SETU_destroy (set2); - set2 = NULL; - if (NULL == set1) - { - GNUNET_SCHEDULER_cancel (tt); - tt = NULL; - GNUNET_SCHEDULER_shutdown (); - } - break; - - default: - GNUNET_assert (0); + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + GNUNET_SCHEDULER_shutdown (); } + break; + + default: + GNUNET_assert (0); + } } @@ -143,14 +152,14 @@ listen_cb (void *cls, const struct GNUNET_MessageHeader *context_msg, struct GNUNET_SETU_Request *request) { - GNUNET_assert (NULL != context_msg); - GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n"); - oh2 = GNUNET_SETU_accept (request, - (struct GNUNET_SETU_Option[]){ 0 }, - &result_cb_set2, - NULL); - GNUNET_SETU_commit (oh2, set2); + GNUNET_assert (NULL != context_msg); + GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n"); + oh2 = GNUNET_SETU_accept (request, + (struct GNUNET_SETU_Option[]){ 0 }, + &result_cb_set2, + NULL); + GNUNET_SETU_commit (oh2, set2); } @@ -162,122 +171,89 @@ listen_cb (void *cls, static void start (void *cls) { - struct GNUNET_MessageHeader context_msg; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n"); - context_msg.size = htons (sizeof context_msg); - context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); - listen_handle = GNUNET_SETU_listen (config, - &app_id, - &listen_cb, - NULL); - oh1 = GNUNET_SETU_prepare (&local_id, - &app_id, - &context_msg, - (struct GNUNET_SETU_Option[]){ 0 }, - &result_cb_set1, - NULL); - GNUNET_SETU_commit (oh1, set1); + struct GNUNET_MessageHeader context_msg; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n"); + context_msg.size = htons (sizeof context_msg); + context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); + listen_handle = GNUNET_SETU_listen (config, + &app_id, + &listen_cb, + NULL); + oh1 = GNUNET_SETU_prepare (&local_id, + &app_id, + &context_msg, + (struct GNUNET_SETU_Option[]){ 0 }, + &result_cb_set1, + NULL); + GNUNET_SETU_commit (oh1, set1); } -/** - * Initialize the second set, continue - * - * @param cls closure, unused - */ -static void -init_set2 (void *cls) -{ - struct GNUNET_SETU_Element element; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n"); - - element.element_type = 0; - element.data = "hello1"; - element.size = strlen (element.data); - GNUNET_SETU_add_element (set2, &element, NULL, NULL); - element.data = "quux"; - element.size = strlen (element.data); - GNUNET_SETU_add_element (set2, &element, NULL, NULL); - element.data = "baz"; - element.size = strlen (element.data); - GNUNET_SETU_add_element (set2, &element, &start, NULL); -} - /** * Generate random byte stream */ -unsigned char *gen_rdm_bytestream (size_t num_bytes) +unsigned char * +gen_rdm_bytestream (size_t num_bytes) { - unsigned char *stream = GNUNET_malloc (num_bytes); - GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes); - return stream; + unsigned char *stream = GNUNET_malloc (num_bytes); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes); + return stream; } + /** * Generate random sets */ static void -initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes) +initRandomSets (int overlap, int set1_size, int set2_size, int + element_size_in_bytes) { - struct GNUNET_SETU_Element element; - element.element_type = 0; - - // Add elements to both sets - for (int i = 0; i < overlap; i++) { - element.data = gen_rdm_bytestream(element_size_in_bytes); - element.size = element_size_in_bytes; - GNUNET_SETU_add_element (set1, &element, NULL, NULL); - GNUNET_SETU_add_element (set2, &element, NULL, NULL); - set1_size--; - set2_size--; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n"); - - // Add other elements to set 1 - while(set1_size>0) { - element.data = gen_rdm_bytestream(element_size_in_bytes); - element.size = element_size_in_bytes; - GNUNET_SETU_add_element (set1, &element, NULL, NULL); - set1_size--; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n"); - - // Add other elements to set 2 - while(set2_size > 0) { - element.data = gen_rdm_bytestream(element_size_in_bytes); - element.size = element_size_in_bytes; + struct GNUNET_SETU_Element element; + element.element_type = 0; + + // Add elements to both sets + for (int i = 0; i < overlap; i++) + { + element.data = gen_rdm_bytestream (element_size_in_bytes); + element.size = element_size_in_bytes; + GNUNET_SETU_add_element (set1, &element, NULL, NULL); + GNUNET_SETU_add_element (set2, &element, NULL, NULL); + set1_size--; + set2_size--; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n"); + + // Add other elements to set 1 + while (set1_size>0) + { + element.data = gen_rdm_bytestream (element_size_in_bytes); + element.size = element_size_in_bytes; + GNUNET_SETU_add_element (set1, &element, NULL, NULL); + set1_size--; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n"); - if(set2_size != 1) { - GNUNET_SETU_add_element (set2, &element,NULL, NULL); - } else { - GNUNET_SETU_add_element (set2, &element,&start, NULL); - } + // Add other elements to set 2 + while (set2_size > 0) + { + element.data = gen_rdm_bytestream (element_size_in_bytes); + element.size = element_size_in_bytes; - set2_size--; + if (set2_size != 1) + { + GNUNET_SETU_add_element (set2, &element,NULL, NULL); + } + else + { + GNUNET_SETU_add_element (set2, &element,&start, NULL); } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n"); -} - -/** - * Initialize the first set, continue. - */ -static void -init_set1 (void) -{ - struct GNUNET_SETU_Element element; - element.element_type = 0; - element.data = "hello"; - element.size = strlen (element.data); - GNUNET_SETU_add_element (set1, &element, NULL, NULL); - element.data = "bar"; - element.size = strlen (element.data); - GNUNET_SETU_add_element (set1, &element, &init_set2, NULL); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); + set2_size--; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n"); } @@ -289,10 +265,10 @@ init_set1 (void) static void timeout_fail (void *cls) { - tt = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n"); - GNUNET_SCHEDULER_shutdown (); - ret = 1; + tt = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n"); + GNUNET_SCHEDULER_shutdown (); + ret = 1; } @@ -304,36 +280,36 @@ timeout_fail (void *cls) static void do_shutdown (void *cls) { - if (NULL != tt) - { - GNUNET_SCHEDULER_cancel (tt); - tt = NULL; - } - if (NULL != oh1) - { - GNUNET_SETU_operation_cancel (oh1); - oh1 = NULL; - } - if (NULL != oh2) - { - GNUNET_SETU_operation_cancel (oh2); - oh2 = NULL; - } - if (NULL != set1) - { - GNUNET_SETU_destroy (set1); - set1 = NULL; - } - if (NULL != set2) - { - GNUNET_SETU_destroy (set2); - set2 = NULL; - } - if (NULL != listen_handle) - { - GNUNET_SETU_listen_cancel (listen_handle); - listen_handle = NULL; - } + if (NULL != tt) + { + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + } + if (NULL != oh1) + { + GNUNET_SETU_operation_cancel (oh1); + oh1 = NULL; + } + if (NULL != oh2) + { + GNUNET_SETU_operation_cancel (oh2); + oh2 = NULL; + } + if (NULL != set1) + { + GNUNET_SETU_destroy (set1); + set1 = NULL; + } + if (NULL != set2) + { + GNUNET_SETU_destroy (set2); + set2 = NULL; + } + if (NULL != listen_handle) + { + GNUNET_SETU_listen_cancel (listen_handle); + listen_handle = NULL; + } } @@ -350,79 +326,148 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_TESTING_Peer *peer) { - struct GNUNET_SETU_OperationHandle *my_oh; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Running preparatory tests\n"); - tt = GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), - &timeout_fail, - NULL); - GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); - - config = cfg; - GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg, - &local_id)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "my id (from CRYPTO): %s\n", - GNUNET_i2s (&local_id)); - GNUNET_TESTING_peer_get_identity (peer, - &local_id); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "my id (from TESTING): %s\n", - GNUNET_i2s (&local_id)); - set1 = GNUNET_SETU_create (cfg); - set2 = GNUNET_SETU_create (cfg); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Created sets %p and %p for union operation\n", - set1, - set2); - GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); - - /* test if canceling an uncommitted request works! */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Launching and instantly stopping set operation\n"); - my_oh = GNUNET_SETU_prepare (&local_id, - &app_id, - NULL, - (struct GNUNET_SETU_Option[]){ 0 }, - NULL, - NULL); - GNUNET_SETU_operation_cancel (my_oh); - - /* test the real set reconciliation */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Running real set-reconciliation\n"); - //init_set1 (); - // limit ~23800 element total - initRandomSets(50,100,100,128); + struct GNUNET_SETU_OperationHandle *my_oh; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Running preparatory tests\n"); + tt = GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), + &timeout_fail, + NULL); + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); + + config = cfg; + GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg, + &local_id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "my id (from CRYPTO): %s\n", + GNUNET_i2s (&local_id)); + GNUNET_TESTING_peer_get_identity (peer, + &local_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "my id (from TESTING): %s\n", + GNUNET_i2s (&local_id)); + set1 = GNUNET_SETU_create (cfg); + set2 = GNUNET_SETU_create (cfg); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Created sets %p and %p for union operation\n", + set1, + set2); + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); + + /* test if canceling an uncommited request works! */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Launching and instantly stopping set operation\n"); + my_oh = GNUNET_SETU_prepare (&local_id, + &app_id, + NULL, + (struct GNUNET_SETU_Option[]){ 0 }, + NULL, + NULL); + GNUNET_SETU_operation_cancel (my_oh); + + /* test the real set reconciliation */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Running real set-reconciliation\n"); + // init_set1 (); + // limit ~23800 element total + initRandomSets (490, 500,500,32); } -static void execute_perf() + +void +perf_thread () { - for( int repeat_ctr = 0; repeat_ctr<1; repeat_ctr++ ) { + GNUNET_TESTING_service_run ("perf_setu_api", + "arm", + "test_setu.conf", + &run, + NULL); + +} - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Executing perf round %d\n", repeat_ctr); - GNUNET_TESTING_service_run ("perf_setu_api", - "arm", - "test_setu.conf", - &run, - NULL); +static void +run_petf_thread (int total_runs) +{ + int core_count = get_nprocs_conf (); + pid_t child_pid, wpid; + int status = 0; + +// Father code (before child processes start) + for (int processed = 0; processed < total_runs;) + { + for (int id = 0; id < core_count; id++) + { + if (processed >= total_runs) + break; + + if ((child_pid = fork ()) == 0) + { + perf_thread (); + exit (0); + } + processed += 1; } - return 0; + while ((wpid = wait (&status)) > 0) + ; + + } } +static void +execute_perf () +{ + + /** + * Erase statfile + */ + remove ("perf_stats.csv"); + remove ("perf_failure_bucket_number_factor.csv"); + for (int out_out_ctr = 3; out_out_ctr <= 3; out_out_ctr++) + { + + for (int out_ctr = 20; out_ctr <= 20; out_ctr++) + { + float base = 0.1; + float x = out_ctr * base; + char factor[10]; + char *buffer = gcvt (x, 4, factor); + setu_cfg = GNUNET_CONFIGURATION_create (); + GNUNET_CONFIGURATION_set_value_string (setu_cfg, "IBF", + "BUCKET_NUMBER_FACTOR", + buffer); // Factor default=4 + GNUNET_CONFIGURATION_set_value_number (setu_cfg, "IBF", + "NUMBER_PER_BUCKET", 3); // K default=4 + GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE", + "TRADEOFF", "2"); // default=0.25 + GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE", + "MAX_SET_DIFF_FACTOR_DIFFERENTIAL", + "20000"); // default=0.25 + GNUNET_CONFIGURATION_set_value_number (setu_cfg, "BOUNDARIES", + "UPPER_ELEMENT", 5000); + + + if (GNUNET_OK != GNUNET_CONFIGURATION_write (setu_cfg, "perf_setu.conf")) + GNUNET_log ( + GNUNET_ERROR_TYPE_ERROR, + _ ("Failed to write subsystem default identifier map'.\n")); + run_petf_thread (100); + } + + } + return; +} + int main (int argc, char **argv) { - GNUNET_log_setup ("perf_setu_api", - "WARNING", - NULL); - execute_perf(); - return 0; + GNUNET_log_setup ("perf_setu_api", + "WARNING", + NULL); + execute_perf (); + return 0; } diff --git a/src/setu/setu.h b/src/setu/setu.h index 7c2a98a02..7b606f12c 100644 --- a/src/setu/setu.h +++ b/src/setu/setu.h @@ -122,6 +122,31 @@ struct GNUNET_SETU_AcceptMessage */ uint32_t byzantine_lower_bound; + + /** + * Upper bound for the set size, used only when + * byzantine mode is enabled. + */ + uint64_t byzantine_upper_bond; + + /** + * Bandwidth latency tradeoff determines how much bytes a single RTT is + * worth, which is a performance setting + */ + uint64_t bandwidth_latency_tradeoff; + + /** + * The factor determines the number of buckets an IBF has which is + * multiplied by the estimated setsize default: 2 + */ + uint64_t ibf_bucket_number_factor; + + /** + * This setting determines to how many IBF buckets an single elements + * is mapped to. + */ + uint64_t ibf_number_of_buckets_per_element; + }; @@ -226,6 +251,30 @@ struct GNUNET_SETU_EvaluateMessage */ uint32_t byzantine_lower_bound; + /** + * Upper bound for the set size, used only when + * byzantine mode is enabled. + */ + uint64_t byzantine_upper_bond; + + /** + * Bandwidth latency tradeoff determines how much bytes a single RTT is + * worth, which is a performance setting + */ + uint64_t bandwidth_latency_tradeoff; + + /** + * The factor determines the number of buckets an IBF has which is + * multiplied by the estimated setsize default: 2 + */ + uint64_t ibf_bucket_number_factor; + + /** + * This setting determines to how many IBF buckets an single elements + * is mapped to. + */ + uint64_t ibf_number_of_buckets_per_element; + /* rest: context message, that is, application-specific message to convince listener to pick up */ }; diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c index 0a09b18b2..faa57aaba 100644 --- a/src/setu/setu_api.c +++ b/src/setu/setu_api.c @@ -22,6 +22,7 @@ * @brief api for the set union service * @author Florian Dold * @author Christian Grothoff + * @author Elias Summermatter */ #include "platform.h" #include "gnunet_util_lib.h" @@ -526,6 +527,14 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer, context_msg); msg->app_id = *app_id; msg->target_peer = *other_peer; + + /* Set default values */ + msg->byzantine_upper_bond = UINT64_MAX; + msg->bandwidth_latency_tradeoff = 0; + msg->ibf_bucket_number_factor = 2; + msg->ibf_number_of_buckets_per_element = 3; + + for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) { switch (opt->type) @@ -534,6 +543,18 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer, msg->byzantine = GNUNET_YES; msg->byzantine_lower_bound = htonl (opt->v.num); break; + case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND: + msg->byzantine_upper_bond = htonl (opt->v.num); + break; + case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF: + msg->bandwidth_latency_tradeoff = htonl (opt->v.num); + break; + case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR: + msg->ibf_bucket_number_factor = htonl (opt->v.num); + break; + case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT: + msg->ibf_number_of_buckets_per_element = htonl (opt->v.num); + break; case GNUNET_SETU_OPTION_FORCE_FULL: msg->force_full = GNUNET_YES; break; @@ -788,6 +809,13 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request, mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETU_ACCEPT); msg->accept_reject_id = htonl (request->accept_id); + + /* Set default values */ + msg->byzantine_upper_bond = UINT64_MAX; + msg->bandwidth_latency_tradeoff = 0; + msg->ibf_bucket_number_factor = 2; + msg->ibf_number_of_buckets_per_element = 3; + for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) { switch (opt->type) @@ -796,6 +824,18 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request, msg->byzantine = GNUNET_YES; msg->byzantine_lower_bound = htonl (opt->v.num); break; + case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND: + msg->byzantine_upper_bond = htonl (opt->v.num); + break; + case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF: + msg->bandwidth_latency_tradeoff = htonl (opt->v.num); + break; + case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR: + msg->ibf_bucket_number_factor = htonl (opt->v.num); + break; + case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT: + msg->ibf_number_of_buckets_per_element = htonl (opt->v.num); + break; case GNUNET_SETU_OPTION_FORCE_FULL: msg->force_full = GNUNET_YES; break; diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c index 2fb7d015e..5a0c9d70d 100644 --- a/src/setu/test_setu_api.c +++ b/src/setu/test_setu_api.c @@ -204,62 +204,6 @@ init_set2 (void *cls) GNUNET_SETU_add_element (set2, &element, &start, NULL); } -/** - * Generate random byte stream - */ - -unsigned char *gen_rdm_bytestream (size_t num_bytes) -{ - unsigned char *stream = GNUNET_malloc (num_bytes); - GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes); - return stream; -} - -/** - * Generate random sets - */ - -static void -initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes) -{ - struct GNUNET_SETU_Element element; - element.element_type = 0; - - // Add elements to both sets - for (int i = 0; i < overlap; i++) { - element.data = gen_rdm_bytestream(element_size_in_bytes); - element.size = element_size_in_bytes; - GNUNET_SETU_add_element (set1, &element, NULL, NULL); - GNUNET_SETU_add_element (set2, &element, NULL, NULL); - set1_size--; - set2_size--; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n"); - - // Add other elements to set 1 - while(set1_size>0) { - element.data = gen_rdm_bytestream(element_size_in_bytes); - element.size = element_size_in_bytes; - GNUNET_SETU_add_element (set1, &element, NULL, NULL); - set1_size--; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n"); - - // Add other elements to set 2 - while(set2_size > 0) { - element.data = gen_rdm_bytestream(element_size_in_bytes); - element.size = element_size_in_bytes; - - if(set2_size != 1) { - GNUNET_SETU_add_element (set2, &element,NULL, NULL); - } else { - GNUNET_SETU_add_element (set2, &element,&start, NULL); - } - - set2_size--; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n"); -} /** * Initialize the first set, continue. @@ -392,9 +336,7 @@ run (void *cls, /* test the real set reconciliation */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Running real set-reconciliation\n"); - //init_set1 (); - initRandomSets(19500,20000,20000,4096); - //initRandomSets(19500,20000,20000,32); + init_set1 (); } -- cgit v1.2.3