summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorElias Summermatter <elias.summermatter@seccom.ch>2021-04-02 15:46:25 +0200
committerMartin Schanzenbach <mschanzenbach@posteo.de>2021-07-30 16:06:15 +0200
commitebc70e1bccd6c2f784df8630f1105a91bc7bfeed (patch)
treebc5963b5c3ad38176120a7b0223e68c0709b41f2 /src
parente8eb1ecc006ffd3d7aa99fc9d3e8d19eedd9d343 (diff)
SETU: Implement LSD0003
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_protocols.h7
-rw-r--r--src/include/gnunet_setu_service.h26
-rw-r--r--src/set/ibf.c2
-rw-r--r--src/set/ibf.h1
-rw-r--r--src/setu/Makefile.am11
-rw-r--r--src/setu/gnunet-service-setu.c2081
-rw-r--r--src/setu/gnunet-service-setu_protocol.h77
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.c362
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.h54
-rw-r--r--src/setu/ibf.c294
-rw-r--r--src/setu/ibf.h65
-rw-r--r--src/setu/perf_setu_api.c571
-rw-r--r--src/setu/setu.h49
-rw-r--r--src/setu/setu_api.c40
-rw-r--r--src/setu/test_setu_api.c60
15 files changed, 2921 insertions, 779 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 715e94c6a..6b61dfc72 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1784,6 +1784,13 @@ extern "C" {
*/
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 572
+/**
+ * Signals other peer that all elements are sent.
+ */
+
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL 710
+
+
/*******************************************************************************
* SETI message types
diff --git a/src/include/gnunet_setu_service.h b/src/include/gnunet_setu_service.h
index bacec9408..1d7e48402 100644
--- a/src/include/gnunet_setu_service.h
+++ b/src/include/gnunet_setu_service.h
@@ -163,7 +163,31 @@ enum GNUNET_SETU_OptionType
/**
* Notify client also if we are sending a value to the other peer.
*/
- GNUNET_SETU_OPTION_SYMMETRIC = 8
+ GNUNET_SETU_OPTION_SYMMETRIC = 8,
+
+ /**
+ * Byzantine upper bound. Is the maximal plausible number of elements
+ * a peer can have default max uint64
+ */
+ GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND = 16,
+
+ /**
+ * Bandwidth latency tradeoff determines how much bytes a single RTT is
+ * worth, which is a performance setting
+ */
+ GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF= 32,
+
+ /**
+ * The factor determines the number of buckets an IBF has which is
+ * multiplied by the estimated setsize default: 2
+ */
+ GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR= 64,
+
+ /**
+ * This setting determines to how many IBF buckets an single elements
+ * is mapped to.
+ */
+ GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT= 128
};
diff --git a/src/set/ibf.c b/src/set/ibf.c
index 1532afceb..0f7eb6a9f 100644
--- a/src/set/ibf.c
+++ b/src/set/ibf.c
@@ -294,7 +294,7 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start,
struct IBF_KeyHash *key_hash_dst;
struct IBF_Count *count_dst;
- GNUNET_assert (start + count <= ibf->size);
+ GNUNET_assert (start + count <= ibf->size);
/* copy keys */
key_dst = (struct IBF_Key *) buf;
diff --git a/src/set/ibf.h b/src/set/ibf.h
index 7c2ab33b1..334a797ef 100644
--- a/src/set/ibf.h
+++ b/src/set/ibf.h
@@ -245,6 +245,7 @@ void
ibf_destroy (struct InvertibleBloomFilter *ibf);
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
diff --git a/src/setu/Makefile.am b/src/setu/Makefile.am
index 77d048add..6e2865d8c 100644
--- a/src/setu/Makefile.am
+++ b/src/setu/Makefile.am
@@ -7,6 +7,8 @@ libexecdir= $(pkglibdir)/libexec/
plugindir = $(libdir)/gnunet
+PTHREAD = -lpthread
+
pkgcfg_DATA = \
setu.conf
@@ -63,7 +65,8 @@ libgnunetsetu_la_SOURCES = \
setu_api.c setu.h
libgnunetsetu_la_LIBADD = \
$(top_builddir)/src/util/libgnunetutil.la \
- $(LTLIBINTL)
+ $(LTLIBINTL) \
+ $(PTHREAD)
libgnunetsetu_la_LDFLAGS = \
$(GN_LIB_LDFLAGS)
@@ -91,7 +94,8 @@ perf_setu_api_SOURCES = \
perf_setu_api_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/testing/libgnunettesting.la \
- libgnunetsetu.la
+ libgnunetsetu.la \
+ $(PTHREAD)
plugin_LTLIBRARIES = \
@@ -103,7 +107,8 @@ libgnunet_plugin_block_setu_test_la_LIBADD = \
$(top_builddir)/src/block/libgnunetblock.la \
$(top_builddir)/src/block/libgnunetblockgroup.la \
$(top_builddir)/src/util/libgnunetutil.la \
- $(LTLIBINTL)
+ $(LTLIBINTL) \
+ $(PTHREAD)
libgnunet_plugin_block_setu_test_la_LDFLAGS = \
$(GN_PLUGIN_LDFLAGS)
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
index bd1113f15..a51e92b71 100644
--- a/src/setu/gnunet-service-setu.c
+++ b/src/setu/gnunet-service-setu.c
@@ -22,6 +22,7 @@
* @brief set union operation
* @author Florian Dold
* @author Christian Grothoff
+ * @author Elias Summermatter
*/
#include "platform.h"
#include "gnunet_util_lib.h"
@@ -50,33 +51,61 @@
*/
#define SE_STRATA_COUNT 32
+
/**
- * Size of the IBFs in the strata estimator.
+ * Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348
+ * Based on the bsc thesis of Elias Summermatter (2021)
*/
-#define SE_IBF_SIZE 80
+#define SE_IBFS_TOTAL_SIZE 632
/**
* The hash num parameter for the difference digests and strata estimators.
*/
-#define SE_IBF_HASH_NUM 4
+#define SE_IBF_HASH_NUM 3
/**
* Number of buckets that can be transmitted in one message.
*/
-#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
+#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
/**
- * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
+ * The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
* Choose this value so that computing the IBF is still cheaper
* than transmitting all values.
*/
-#define MAX_IBF_ORDER (20)
+#define MAX_IBF_SIZE 1048576
+
+
+/**
+ * Minimal size of an ibf
+ * Based on the bsc thesis of Elias Summermatter (2021)
+ */
+#define IBF_MIN_SIZE 37
+
+/**
+ * AVG RTT for differential sync when k=2 and Factor = 2
+ * Based on the bsc thesis of Elias Summermatter (2021)
+ */
+#define DIFFERENTIAL_RTT_MEAN 3.65145
+
+/**
+ * Security level used for byzantine checks (2^80)
+ */
+
+#define SECURITY_LEVEL 80
+
+/**
+ * Is the estimated probabily for a new round this values
+ * is based on the bsc thesis of Elias Summermatter (2021)
+ */
+
+#define PROBABILITY_FOR_NEW_ROUND 0.15
/**
- * Number of buckets used in the ibf per estimated
- * difference.
+ * Measure the performance in a csv
*/
-#define IBF_ALPHA 4
+
+#define MEASURE_PERFORMANCE 0
/**
@@ -146,6 +175,28 @@ enum UnionOperationPhase
PHASE_FULL_RECEIVING
};
+/**
+ * Different modes of operations
+ */
+
+enum MODE_OF_OPERATION
+{
+ /**
+ * Mode just synchronizes the difference between sets
+ */
+ DIFFERENTIAL_SYNC,
+
+ /**
+ * Mode send full set sending local set first
+ */
+ FULL_SYNC_LOCAL_SENDING_FIRST,
+
+ /**
+ * Mode request full set from remote peer
+ */
+ FULL_SYNC_REMOTE_SENDING_FIRST
+};
+
/**
* Information about an element element in the set. All elements are
@@ -277,7 +328,7 @@ struct Operation
* Copy of the set's strata estimator at the time of
* creation of this operation.
*/
- struct StrataEstimator *se;
+ struct MultiStrataEstimator *se;
/**
* The IBF we currently receive.
@@ -320,7 +371,7 @@ struct Operation
/**
* Number of ibf buckets already received into the @a remote_ibf.
*/
- unsigned int ibf_buckets_received;
+ uint64_t ibf_buckets_received;
/**
* Salt that we're using for sending IBFs
@@ -386,7 +437,7 @@ struct Operation
* Lower bound for the set size, used only when
* byzantine mode is enabled.
*/
- int byzantine_lower_bound;
+ uint64_t byzantine_lower_bound;
/**
* Unique request id for the request from a remote peer, sent to the
@@ -401,21 +452,83 @@ struct Operation
*/
unsigned int generation_created;
+
+ /**
+ * User defined Bandwidth Round Trips Tradeoff
+ */
+ uint64_t rtt_bandwidth_tradeoff;
+
+
/**
- * User defined Bandwidth Round Trips Tradeoff
+ * Number of Element per bucket in IBF
+ */
+ uint8_t ibf_number_buckets_per_element;
+
+
+ /**
+ * Set difference is multiplied with this factor
+ * to gennerate large enought IBF
*/
- double rtt_bandwidth_tradeoff;
+ uint8_t ibf_bucket_number_factor;
/**
- * Number of Element per bucket in IBF
+ * Defines which site a client is
+ * 0 = Initiating peer
+ * 1 = Receiving peer
*/
- unsigned int ibf_number_buckets_per_element;
+ uint8_t peer_site;
+
/**
- * Number of buckets in IBF
+ * Local peer element count
*/
- unsigned ibf_bucket_number;
+ uint64_t local_element_count;
+ /**
+ * Mode of operation that was chosen by the algorithm
+ */
+ uint8_t mode_of_operation;
+
+ /**
+ * Hashmap to keep track of the send/received messages
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *message_control_flow;
+
+ /**
+ * Hashmap to keep track of the send/received inquiries (ibf keys)
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent;
+
+
+ /**
+ * Total size of local set
+ */
+ uint64_t total_elements_size_local;
+
+ /**
+ * Limit of number of elements in set
+ */
+ uint64_t byzantine_upper_bound;
+
+ /**
+ * is the count of already passed differential sync iterations
+ */
+ uint8_t differential_sync_iterations;
+
+ /**
+ * Estimated or committed set difference at the start
+ */
+ uint64_t remote_set_diff;
+
+ /**
+ * Estimated or committed set difference at the start
+ */
+ uint64_t local_set_diff;
+
+ /**
+ * Boolean to enforce an active passive switch
+ */
+ bool active_passive_switch_required;
};
@@ -431,6 +544,16 @@ struct SetContent
struct GNUNET_CONTAINER_MultiHashMap *elements;
/**
+ * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *` randomized.
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *elements_randomized;
+
+ /**
+ * Salt to construct the randomized element map
+ */
+ uint64_t elements_randomized_salt;
+
+ /**
* Number of references to the content.
*/
unsigned int refcount;
@@ -478,7 +601,7 @@ struct Set
* The strata estimator is only generated once for each set. The IBF keys
* are derived from the element hashes with salt=0.
*/
- struct StrataEstimator *se;
+ struct MultiStrataEstimator *se;
/**
* Evaluate operations are held in a linked list.
@@ -635,96 +758,679 @@ static int in_shutdown;
*/
static uint32_t suggest_id;
+#if MEASURE_PERFORMANCE
+/**
+ * Handles configuration file for setu performance test
+ *
+ */
+static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
+
/**
- * Added Roundtripscounter
+ * Stores the performance data for induvidual message
*/
-struct perf_num_send_resived_msg {
- int sent;
- int sent_var_bytes;
- int received;
- int received_var_bytes;
+struct perf_num_send_received_msg
+{
+ uint64_t sent;
+ uint64_t sent_var_bytes;
+ uint64_t received;
+ uint64_t received_var_bytes;
};
+/**
+ * Main struct to messure perfomance (data/rtts)
+ */
+struct per_store_struct
+{
+ struct perf_num_send_received_msg operation_request;
+ struct perf_num_send_received_msg se;
+ struct perf_num_send_received_msg request_full;
+ struct perf_num_send_received_msg element_full;
+ struct perf_num_send_received_msg full_done;
+ struct perf_num_send_received_msg ibf;
+ struct perf_num_send_received_msg inquery;
+ struct perf_num_send_received_msg element;
+ struct perf_num_send_received_msg demand;
+ struct perf_num_send_received_msg offer;
+ struct perf_num_send_received_msg done;
+ struct perf_num_send_received_msg over;
+ uint64_t se_diff;
+ uint64_t se_diff_remote;
+ uint64_t se_diff_local;
+ uint64_t active_passive_switches;
+ uint8_t mode_of_operation;
+};
-struct perf_rtt_struct
-{
- struct perf_num_send_resived_msg operation_request;
- struct perf_num_send_resived_msg se;
- struct perf_num_send_resived_msg request_full;
- struct perf_num_send_resived_msg element_full;
- struct perf_num_send_resived_msg full_done;
- struct perf_num_send_resived_msg ibf;
- struct perf_num_send_resived_msg inquery;
- struct perf_num_send_resived_msg element;
- struct perf_num_send_resived_msg demand;
- struct perf_num_send_resived_msg offer;
- struct perf_num_send_resived_msg done;
- struct perf_num_send_resived_msg over;
+struct per_store_struct perf_store;
+#endif
+
+/**
+ * Different states to control the messages flow in differential mode
+ */
+
+enum MESSAGE_CONTROL_FLOW_STATE
+{
+ /**
+ * Initial message state
+ */
+ MSG_CFS_UNINITIALIZED,
+
+ /**
+ * Track that a message has been sent
+ */
+ MSG_CFS_SENT,
+
+ /**
+ * Track that receiving this message is expected
+ */
+ MSG_CFS_EXPECTED,
+
+ /**
+ * Track that message has been recieved
+ */
+ MSG_CFS_RECEIVED,
+};
+
+/**
+ * Message types to track in message control flow
+ */
+
+enum MESSAGE_TYPE
+{
+ /**
+ * Offer message type
+ */
+ OFFER_MESSAGE,
+ /**
+ * Demand message type
+ */
+ DEMAND_MESSAGE,
+ /**
+ * Elemente message type
+ */
+ ELEMENT_MESSAGE,
};
-struct perf_rtt_struct perf_rtt;
+/**
+ * Struct to tracked messages in message controll flow
+ */
+
+struct messageControlFlowElement
+{
+ /**
+ * Track the message control state of the offer message
+ */
+ enum MESSAGE_CONTROL_FLOW_STATE offer;
+ /**
+ * Track the message control state of the demand message
+ */
+ enum MESSAGE_CONTROL_FLOW_STATE demand;
+ /**
+ * Track the message control state of the element message
+ */
+ enum MESSAGE_CONTROL_FLOW_STATE element;
+};
+
+
+#if MEASURE_PERFORMANCE
+/**
+ * Loads different configuration to do perform perfomance tests
+ * @param op
+ */
+static void
+load_config (struct Operation *op)
+{
+
+ setu_cfg = GNUNET_CONFIGURATION_create ();
+ GNUNET_CONFIGURATION_load (setu_cfg,"perf_setu.conf");
+
+
+ long long number;
+ float fl;
+ GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
+ &fl);
+ op->ibf_bucket_number_factor = fl;
+
+ GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
+ &number);
+ op->ibf_number_buckets_per_element = number;
+
+ GNUNET_CONFIGURATION_get_value_number (setu_cfg,"PERFORMANCE", "TRADEOFF",
+ &number);
+ op->rtt_bandwidth_tradeoff = number;
+
+
+ GNUNET_CONFIGURATION_get_value_number (setu_cfg,"BOUNDARIES", "UPPER_ELEMENT",
+ &number);
+ op->byzantine_upper_bound = number;
+
+
+ op->peer_site = 0;
+}
+/**
+ * Function to calculate total bytes used for performance messurement
+ * @param size
+ * @param perf_num_send_received_msg
+ * @return bytes used
+ */
static int
-sum_sent_received_bytes(int size, struct perf_num_send_resived_msg perf_rtt_struct) {
- return (size * perf_rtt_struct.sent) +
- (size * perf_rtt_struct.received) +
- perf_rtt_struct.sent_var_bytes +
- perf_rtt_struct.received_var_bytes;
+sum_sent_received_bytes (uint64_t size, struct perf_num_send_received_msg
+ perf_num_send_received_msg)
+{
+ return (size * perf_num_send_received_msg.sent)
+ + (size * perf_num_send_received_msg.received)
+ + perf_num_send_received_msg.sent_var_bytes
+ + perf_num_send_received_msg.received_var_bytes;
}
-static float
-calculate_perf_rtt() {
- /**
- * Calculate RTT of init phase normally always 1
- */
- float rtt = 1;
- int bytes_transmitted = 0;
- /**
- * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending
- */
- if (( perf_rtt.element_full.received != 0 ) ||
- ( perf_rtt.element_full.sent != 0)
- ) rtt += 1;
+/**
+ * Function that calculates the perfmance values and writes them down
+ */
+static void
+calculate_perf_store ()
+{
- if (( perf_rtt.request_full.received != 0 ) ||
- ( perf_rtt.request_full.sent != 0)
- ) rtt += 0.5;
+ /**
+ * Calculate RTT of init phase normally always 1
+ */
+ float rtt = 1;
+ int bytes_transmitted = 0;
- /**
- * In case of a differential sync 3 rtt's are needed.
- * for every active/passive switch additional 3.5 rtt's are used
- */
+ /**
+ * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normaly 1 or 1.5 depending
+ */
+ if ((perf_store.element_full.received != 0) ||
+ (perf_store.element_full.sent != 0)
+ )
+ rtt += 1;
+
+ if ((perf_store.request_full.received != 0) ||
+ (perf_store.request_full.sent != 0)
+ )
+ rtt += 0.5;
+
+ /**
+ * In case of a differential sync 3 rtt's are needed.
+ * for every active/passive switch additional 3.5 rtt's are used
+ */
+ if ((perf_store.element.received != 0) ||
+ (perf_store.element.sent != 0))
+ {
+ int iterations = perf_store.active_passive_switches;
+
+ if (iterations > 0)
+ rtt += iterations * 0.5;
+ rtt += 2.5;
+ }
+
+
+ /**
+ * Calculate data sended size
+ */
+ bytes_transmitted += sum_sent_received_bytes (sizeof(struct
+ GNUNET_SETU_ResultMessage),
+ perf_store.request_full);
+
+ bytes_transmitted += sum_sent_received_bytes (sizeof(struct
+ GNUNET_SETU_ElementMessage),
+ perf_store.element_full);
+ bytes_transmitted += sum_sent_received_bytes (sizeof(struct
+ GNUNET_SETU_ElementMessage),
+ perf_store.element);
+ // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
+ bytes_transmitted += sum_sent_received_bytes (sizeof(struct
+ StrataEstimatorMessage),
+ perf_store.se);
+ bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
+ bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
+ perf_store.ibf);
+ bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
+ perf_store.inquery);
+ bytes_transmitted += sum_sent_received_bytes (sizeof(struct
+ GNUNET_MessageHeader),
+ perf_store.demand);
+ bytes_transmitted += sum_sent_received_bytes (sizeof(struct
+ GNUNET_MessageHeader),
+ perf_store.offer);
+ bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
+
+ /**
+ * Write IBF failure rate for different BUCKET_NUMBER_FACTOR
+ */
+ float factor;
+ GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
+ &factor);
+ long long num_per_bucket;
+ GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
+ &num_per_bucket);
+
+
+ int decoded = 0;
+ if (perf_store.active_passive_switches == 0)
+ decoded = 1;
+ int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
+ IBFMessage),
+ perf_store.ibf);
+
+ FILE *out1 = fopen ("perf_data.csv", "a");
+ fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
+ decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
+ bytes_transmitted,
+ perf_store.se_diff_local,perf_store.se_diff_remote,
+ perf_store.mode_of_operation);
+ fclose (out1);
+
+}
+
+
+#endif
+/**
+ * Function that chooses the optimal mode of operation depending on
+ * operation parameters.
+ * @param avg_element_size
+ * @param local_set_size
+ * @param remote_set_size
+ * @param est_set_diff_remote
+ * @param est_set_diff_local
+ * @param bandwith_latency_tradeoff
+ * @param ibf_bucket_number_factor
+ * @return calcuated mode of operation
+ */
+static uint8_t
+estimate_best_mode_of_operation (uint64_t avg_element_size,
+ uint64_t local_set_size,
+ uint64_t remote_set_size,
+ uint64_t est_set_diff_remote,
+ uint64_t est_set_diff_local,
+ uint64_t bandwith_latency_tradeoff,
+ uint64_t ibf_bucket_number_factor)
+{
+
+ /*
+ * In case of initial sync fall to predefined states
+ */
+
+ if (0 == local_set_size)
+ return FULL_SYNC_REMOTE_SENDING_FIRST;
+ if (0 == remote_set_size)
+ return FULL_SYNC_LOCAL_SENDING_FIRST;
+
+ /*
+ * Calculate bytes for full Sync
+ */
+
+ uint8_t sizeof_full_done_header = 4;
+ uint8_t sizeof_done_header = 4;
+ uint8_t rtt_min_full = 2;
+ uint8_t sizeof_request_full = 4;
+ uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
+
+ /* Estimate byte required if we send first */
+ uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
+ + local_set_size;
+
+ uint64_t total_bytes_full_local_send_first = (avg_element_size
+ *
+ total_elements_to_send_local_send_first) \
+ + (
+ total_elements_to_send_local_send_first * sizeof(struct
+ GNUNET_SETU_ElementMessage)) \
+ + (sizeof_full_done_header * 2) \
+ + rtt_min_full
+ * bandwith_latency_tradeoff;
+
+ /* Estimate bytes required if we request from remote peer */
+ uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
+ + remote_set_size;
+
+ uint64_t total_bytes_full_remote_send_first = (avg_element_size
+ *
+ total_elements_to_send_remote_send_first) \
+ + (
+ total_elements_to_send_remote_send_first * sizeof(struct
+ GNUNET_SETU_ElementMessage)) \
+ + (sizeof_full_done_header * 2) \
+ + (rtt_min_full + 0.5)
+ * bandwith_latency_tradeoff \
+ + sizeof_request_full;
+
+ /*
+ * Calculate bytes for differential Sync
+ */
+
+ /* Estimate bytes required by IBF transmission*/
+
+ long double ibf_bucket_count = estimated_total_diff
+ * ibf_bucket_number_factor;
+
+ if (ibf_bucket_count <= IBF_MIN_SIZE)
+ {
+ ibf_bucket_count = IBF_MIN_SIZE;
+ }
+ uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
+ / MAX_BUCKETS_PER_MESSAGE);
+
+ uint64_t estimated_counter_size = ceil (
+ MIN (2 * log2l ((float) local_set_size / ibf_bucket_count), log2l (
+ local_set_size)));
+
+ long double counter_bytes = (float) estimated_counter_size / 8;
+
+ uint64_t ibf_bytes = ceil ((sizeof(struct IBFMessage) * ibf_message_count)
+ * 1.2 \
+ + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
+ + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
+ * 1.2 \
+ + (ibf_bucket_count * counter_bytes) * 1.2);
+
+ /* Estimate full byte count for differential sync */
+ uint64_t element_size = (avg_element_size + sizeof(struct
+ GNUNET_SETU_ElementMessage)) \
+ * estimated_total_diff;
+ uint64_t done_size = sizeof_done_header;
+ uint64_t inquery_size = (sizeof(struct IBF_Key) + sizeof(struct
+ InquiryMessage))
+ * estimated_total_diff;
+ uint64_t demand_size =
+ (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
+ * estimated_total_diff;
+ uint64_t offer_size = (sizeof(struct GNUNET_HashCode) + sizeof(struct
+ GNUNET_MessageHeader))
+ * estimated_total_diff;
+
+ uint64_t total_bytes_diff = (element_size + done_size + inquery_size
+ + demand_size + offer_size + ibf_bytes) \
+ + (DIFFERENTIAL_RTT_MEAN
+ * bandwith_latency_tradeoff);
+
+ uint64_t full_min = MIN (total_bytes_full_local_send_first,
+ total_bytes_full_remote_send_first);
+
+ /* Decide between full and differential sync */
+
+ if (full_min < total_bytes_diff)
+ {
+ /* Decide between sending all element first or receiving all elements */
+ if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
+ {
+ return FULL_SYNC_LOCAL_SENDING_FIRST;
+ }
+ else
+ {
+ return FULL_SYNC_REMOTE_SENDING_FIRST;
+ }
+ }
+ else
+ {
+ return DIFFERENTIAL_SYNC;
+ }
+}
+
+
+/**
+ * Validates the if a message is received in a correct phase
+ * @param allowed_phases
+ * @param size_phases
+ * @param op
+ * @return GNUNET_YES if message permitted in phase and GNUNET_NO if not permitted in given
+ * phase
+ */
+static int
+check_valid_phase (const uint8_t allowed_phases[], size_t size_phases, struct
+ Operation *op)
+{
+ /**
+ * Iterate over allowed phases
+ */
+ for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
+ {
+ uint8_t phase = allowed_phases[phase_ctr];
+ if (phase == op->phase)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message received in valid phase\n");
+ return GNUNET_YES;
+ }
+ }
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received message in invalid phase: %u\n", op->phase);
+ return GNUNET_NO;
+}
+
+
+/**
+ * Function to update, track and validate message received in differential
+ * sync. This function tracks states of messages and check it against different
+ * constraints as described in Summermatter's BSc Thesis (2021)
+ * @param hash_map: Hashmap to store message control flow
+ * @param new_mcfs: The new message control flow state an given message type should be set to
+ * @param hash_code: Hash code of the element
+ * @param mt: The message type for which the message control flow state should be set
+ * @return GNUNET_YES message is valid in message control flow GNUNET_NO when message is not valid
+ * at this point in message flow
+ */
+static int
+update_message_control_flow (struct GNUNET_CONTAINER_MultiHashMap *hash_map,
+ enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
+ const struct GNUNET_HashCode *hash_code,
+ enum MESSAGE_TYPE mt)
+{
+ struct messageControlFlowElement *cfe = NULL;
+ enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
+
+ /**
+ * Check logic for forbidden messages
+ */
- int iterations = perf_rtt.ibf.received;
- if(iterations > 1)
- rtt += (iterations - 1 ) * 0.5;
- rtt += 3 * iterations;
+ cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
+ if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
+ {
+ if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received an element without sent offer!\n");
+ return GNUNET_NO;
+ }
+ /* Check that only requested elements are received! */
+ if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
+ MSG_CFS_SENT))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received an element that was not demanded\n");
+ return GNUNET_NO;
+ }
+ }
+
+ /**
+ * In case the element hash is not in the hashmap create a new entry
+ */
+
+ if (NULL == cfe)
+ {
+ cfe = GNUNET_new (struct messageControlFlowElement);
+ if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
+ cfe,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
+ {
+ GNUNET_free (cfe);
+ return GNUNET_SYSERR;
+ }
+ }
+
+ /**
+ * Set state of message
+ */
+
+ if (OFFER_MESSAGE == mt)
+ {
+ mcfs = &cfe->offer;
+ }
+ else if (DEMAND_MESSAGE == mt)
+ {
+ mcfs = &cfe->demand;
+ }
+ else if (ELEMENT_MESSAGE == mt)
+ {
+ mcfs = &cfe->element;
+ }
+ else
+ {
+ return GNUNET_SYSERR;
+ }
+
+ /**
+ * Check if state is allowed
+ */
+
+ if (new_mcfs <= *mcfs)
+ {
+ return GNUNET_NO;
+ }
+
+ *mcfs = new_mcfs;
+ return GNUNET_YES;
+}
+
+
+/**
+ * Validate if a message in differential sync si already received before.
+ * @param hash_map
+ * @param hash_code
+ * @param mt
+ * @return GNUNET_YES when message is already in store if message is not in store return GNUNET_NO
+ */
+static int
+is_message_in_message_control_flow (struct
+ GNUNET_CONTAINER_MultiHashMap *hash_map,
+ struct GNUNET_HashCode *hash_code,
+ enum MESSAGE_TYPE mt)
+{
+ struct messageControlFlowElement *cfe = NULL;
+ enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
+
+ cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
+
+ /**
+ * Set state of message
+ */
+
+ if (cfe != NULL)
+ {
+ if (OFFER_MESSAGE == mt)
+ {
+ mcfs = &cfe->offer;
+ }
+ else if (DEMAND_MESSAGE == mt)
+ {
+ mcfs = &cfe->demand;
+ }
+ else if (ELEMENT_MESSAGE == mt)
+ {
+ mcfs = &cfe->element;
+ }
+ else
+ {
+ return GNUNET_SYSERR;
+ }
/**
- * Calculate data sended size
+ * Evaluate if set is in message
*/
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL), perf_rtt.request_full);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT), perf_rtt.element_full);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS), perf_rtt.element);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_rtt.operation_request);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_SE), perf_rtt.se);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE), perf_rtt.full_done);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_IBF), perf_rtt.ibf);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY), perf_rtt.inquery);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND), perf_rtt.demand);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER), perf_rtt.offer);
- bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_DONE), perf_rtt.done);
+ if (*mcfs != MSG_CFS_UNINITIALIZED)
+ {
+ return GNUNET_YES;
+ }
+ }
+ return GNUNET_NO;
+}
+
+
+/**
+ * Iterator for determining if all demands have been
+ * satisfied
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ * into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+determinate_done_message_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct messageControlFlowElement *mcfe = value;
+
+ if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
+ {
+ return GNUNET_YES;
+ }
+ return GNUNET_NO;
+}
+
+
+/**
+ * Iterator for determining average size
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ * into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+determinate_avg_element_size_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct GNUNET_SETU_Element *element = value;
+ op->total_elements_size_local += element->size;
+ return GNUNET_YES;
+}
- LOG(GNUNET_ERROR_TYPE_ERROR,"Bytes Transmitted: %d\n", bytes_transmitted);
- LOG(GNUNET_ERROR_TYPE_ERROR,"Reached tradeoff bandwidth/rtt: %f\n", (bytes_transmitted / rtt ));
+/**
+ * Create randomized element hashmap for full sending
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ * into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+create_randomized_element_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct Operation *op = cls;
- return rtt;
+ struct GNUNET_HashContext *hashed_key_context =
+ GNUNET_CRYPTO_hash_context_start ();
+ struct GNUNET_HashCode new_key;
+
+ /**
+ * Hash element with new salt to randomize hashmap
+ */
+ GNUNET_CRYPTO_hash_context_read (hashed_key_context,
+ &key,
+ sizeof(struct IBF_Key));
+ GNUNET_CRYPTO_hash_context_read (hashed_key_context,
+ &op->set->content->elements_randomized_salt,
+ sizeof(uint32_t));
+ GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
+ &new_key);
+ GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
+ &new_key,value,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+ return GNUNET_YES;
}
@@ -808,6 +1514,36 @@ send_client_done (void *cls)
}
+/**
+ * Check if all given byzantine parameters are in given boundaries
+ * @param op
+ * @return indicator if all given byzantine parameters are in given boundaries
+ */
+
+static int
+check_byzantine_bounds (struct Operation *op)
+{
+ if (op->byzantine != GNUNET_YES)
+ return GNUNET_OK;
+
+ /**
+ * Check upper byzantine bounds
+ */
+ if (op->remote_element_count + op->remote_set_diff >
+ op->byzantine_upper_bound)
+ return GNUNET_SYSERR;
+ if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
+ return GNUNET_SYSERR;
+
+ /**
+ * Check lower byzantine bounds
+ */
+ if (op->remote_element_count < op->byzantine_lower_bound)
+ return GNUNET_SYSERR;
+ return GNUNET_OK;
+}
+
+
/* FIXME: the destroy logic is a mess and should be cleaned up! */
/**
@@ -977,6 +1713,101 @@ fail_union_operation (struct Operation *op)
/**
+ * Function that checks if full sync is plausible
+ * @param initial_local_elements_in_set
+ * @param estimated_set_difference
+ * @param repeated_elements
+ * @param fresh_elements
+ * @param op
+ * @return GNUNET_OK if
+ */
+
+static void
+full_sync_plausibility_check (struct Operation *op)
+{
+ if (GNUNET_YES != op->byzantine)
+ return;
+
+ int security_level_lb = -1 * SECURITY_LEVEL;
+ uint64_t duplicates = op->received_fresh - op->received_total;
+
+ /*
+ * Protect full sync from receiving double element when in FULL SENDING
+ */
+ if (PHASE_FULL_SENDING == op->phase)
+ {
+ if (duplicates > 0)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: Received duplicate element in full receiving "
+ "mode of operation this is not allowed! Duplicates: %lu\n",
+ duplicates);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ }
+
+ /*
+ * Protect full sync with probabilistic algorithm
+ */
+ if (PHASE_FULL_RECEIVING == op->phase)
+ {
+ if (0 == op->remote_set_diff)
+ op->remote_set_diff = 1;
+
+ long double base = (1 - (long double) (op->remote_set_diff
+ / (long double) (op->initial_size
+ + op->
+ remote_set_diff)));
+ long double exponent = (op->received_total - (op->received_fresh * ((long
+ double)
+ op->
+ initial_size
+ / (long
+ double)
+ op->
+ remote_set_diff)));
+ long double value = exponent * (log2l (base) / log2l (2));
+ if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
+ "to many duplicated full element : %LF\n",
+ value);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ }
+}
+
+
+/**
+ * Limit active passive switches in differential sync to configured security level
+ * @param op
+ */
+static void
+check_max_differential_rounds (struct Operation *op)
+{
+ double probability = op->differential_sync_iterations * (log2l (
+ PROBABILITY_FOR_NEW_ROUND)
+ / log2l (2));
+ if ((-1 * SECURITY_LEVEL) > probability)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
+ "switches in differential sync: %u\n",
+ op->differential_sync_iterations);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+}
+
+
+/**
* Derive the IBF key from a hash code and
* a salt.
*
@@ -1004,12 +1835,12 @@ get_ibf_key (const struct GNUNET_HashCode *src)
struct GetElementContext
{
/**
- * FIXME.
+ * Gnunet hash code in context
*/
struct GNUNET_HashCode hash;
/**
- * FIXME.
+ * Pointer to the key enty
*/
struct KeyEntry *k;
};
@@ -1122,7 +1953,7 @@ salt_key (const struct IBF_Key *k_in,
uint32_t salt,
struct IBF_Key *k_out)
{
- int s = salt % 64;
+ int s = (salt * 7) % 64;
uint64_t x = k_in->key_val;
/* rotate ibf key */
@@ -1132,14 +1963,14 @@ salt_key (const struct IBF_Key *k_in,
/**
- * FIXME.
+ * Reverse modification done in the salt_key function
*/
static void
unsalt_key (const struct IBF_Key *k_in,
uint32_t salt,
struct IBF_Key *k_out)
{
- int s = salt % 64;
+ int s = (salt * 7) % 64;
uint64_t x = k_in->key_val;
x = (x << s) | (x >> (64 - s));
@@ -1258,7 +2089,9 @@ prepare_ibf (struct Operation *op,
if (NULL != op->local_ibf)
ibf_destroy (op->local_ibf);
- op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
+ // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
+ op->local_ibf = ibf_create (size,
+ ((uint8_t) op->ibf_number_buckets_per_element));
if (NULL == op->local_ibf)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1283,13 +2116,23 @@ prepare_ibf (struct Operation *op,
*/
static int
send_ibf (struct Operation *op,
- uint16_t ibf_order)
+ uint32_t ibf_size)
{
- unsigned int buckets_sent = 0;
+ uint64_t buckets_sent = 0;
struct InvertibleBloomFilter *ibf;
+ op->differential_sync_iterations++;
+
+ /**
+ * Enforce min size of IBF
+ */
+ uint32_t ibf_min_size = IBF_MIN_SIZE;
+ if (ibf_size < ibf_min_size)
+ {
+ ibf_size = ibf_min_size;
+ }
if (GNUNET_OK !=
- prepare_ibf (op, 1 << ibf_order))
+ prepare_ibf (op, ibf_size))
{
/* allocation failed */
return GNUNET_SYSERR;
@@ -1297,45 +2140,48 @@ send_ibf (struct Operation *op,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"sending ibf of size %u\n",
- 1 << ibf_order);
+ 1 << ibf_size);
{
char name[64];
- GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
+ GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_size);
GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
}
ibf = op->local_ibf;
- while (buckets_sent < (1 << ibf_order))
+ while (buckets_sent < ibf_size)
{
unsigned int buckets_in_message;
struct GNUNET_MQ_Envelope *ev;
struct IBFMessage *msg;
- buckets_in_message = (1 << ibf_order) - buckets_sent;
+ buckets_in_message = ibf_size - buckets_sent;
/* limit to maximum */
if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
- perf_rtt.ibf.sent += 1;
- perf_rtt.ibf.sent_var_bytes += ( buckets_in_message * IBF_BUCKET_SIZE );
+#if MEASURE_PERFORMANCE
+ perf_store.ibf.sent += 1;
+ perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
+#endif
ev = GNUNET_MQ_msg_extra (msg,
buckets_in_message * IBF_BUCKET_SIZE,
GNUNET_MESSAGE_TYPE_SETU_P2P_IBF);
- msg->reserved1 = 0;
- msg->reserved2 = 0;
- msg->order = ibf_order;
+ msg->ibf_size = ibf_size;
msg->offset = htonl (buckets_sent);
msg->salt = htonl (op->salt_send);
+ msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
+
+
ibf_write_slice (ibf, buckets_sent,
- buckets_in_message, &msg[1]);
+ buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
buckets_sent += buckets_in_message;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "ibf chunk size %u, %u/%u sent\n",
+ "ibf chunk size %u, %lu/%u sent\n",
buckets_in_message,
buckets_sent,
- 1 << ibf_order);
+ ibf_size);
GNUNET_MQ_send (op->mq, ev);
}
@@ -1354,17 +2200,26 @@ send_ibf (struct Operation *op,
* @return the required size of the ibf
*/
static unsigned int
-get_order_from_difference (unsigned int diff)
+get_size_from_difference (unsigned int diff, int number_buckets_per_element,
+ float ibf_bucket_number_factor)
{
- unsigned int ibf_order;
+ /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
+ * Elias Summermatter (2021) in section 3.11 **/
+ return (((int) (diff * ibf_bucket_number_factor)) | 1);
- ibf_order = 2;
- while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
- ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
- (ibf_order < MAX_IBF_ORDER))
- ibf_order++;
- // add one for correction
- return ibf_order + 1;
+}
+
+
+static unsigned int
+get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
+ decoded_elements, unsigned int last_ibf_size)
+{
+ unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
+ - (ibf_bucket_number_factor
+ * decoded_elements));
+ /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
+ * Elias Summermatter (2021) in section 3.11 **/
+ return next_size | 1;
}
@@ -1391,8 +2246,10 @@ send_full_element_iterator (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sending element %s\n",
GNUNET_h2s (key));
- perf_rtt.element_full.received += 1;
- perf_rtt.element_full.received_var_bytes += el->size;
+#if MEASURE_PERFORMANCE
+ perf_store.element_full.received += 1;
+ perf_store.element_full.received_var_bytes += el->size;
+#endif
ev = GNUNET_MQ_msg_extra (emsg,
el->size,
GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -1421,10 +2278,25 @@ send_full_set (struct Operation *op)
"Dedicing to transmit the full set\n");
/* FIXME: use a more memory-friendly way of doing this with an
iterator, just as we do in the non-full case! */
+
+ // Randomize Elements to send
+ op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
+ 32,GNUNET_NO);
+ op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
+ GNUNET_CRYPTO_QUALITY_NONCE,
+ UINT64_MAX);
(void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
- &send_full_element_iterator,
+ &
+ create_randomized_element_iterator,
op);
- perf_rtt.full_done.sent += 1;
+
+ (void) GNUNET_CONTAINER_multihashmap_iterate (
+ op->set->content->elements_randomized,
+ &send_full_element_iterator,
+ op);
+#if MEASURE_PERFORMANCE
+ perf_store.full_done.sent += 1;
+#endif
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
GNUNET_MQ_send (op->mq,
ev);
@@ -1454,7 +2326,7 @@ check_union_p2p_strata_estimator (void *cls,
msg->header.type));
len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
if ((GNUNET_NO == is_compressed) &&
- (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
+ (len != SE_STRATA_COUNT * SE_IBFS_TOTAL_SIZE * IBF_BUCKET_SIZE))
{
GNUNET_break (0);
return GNUNET_SYSERR;
@@ -1473,14 +2345,44 @@ static void
handle_union_p2p_strata_estimator (void *cls,
const struct StrataEstimatorMessage *msg)
{
- perf_rtt.se.received += 1;
- perf_rtt.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
+#if MEASURE_PERFORMANCE
+ perf_store.se.received += 1;
+ perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
+ StrataEstimatorMessage);
+#endif
struct Operation *op = cls;
- struct StrataEstimator *remote_se;
+ struct MultiStrataEstimator *remote_se;
unsigned int diff;
uint64_t other_size;
size_t len;
int is_compressed;
+ op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
+ op->set->content->elements);
+ // Setting peer site to receiving peer
+ op->peer_site = 1;
+
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ /** Only allow 1,2,4,8 SEs **/
+ if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
+ msg->se_count);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
msg->header.type));
@@ -1490,8 +2392,20 @@ handle_union_p2p_strata_estimator (void *cls,
GNUNET_NO);
len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
other_size = GNUNET_ntohll (msg->set_size);
+ op->remote_element_count = other_size;
+
+ if (op->byzantine_upper_bound < op->remote_element_count)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Exceeded configured upper bound <%lu> of element: %u\n",
+ op->byzantine_upper_bound,
+ op->remote_element_count);
+ fail_union_operation (op);
+ return;
+ }
+
remote_se = strata_estimator_create (SE_STRATA_COUNT,
- SE_IBF_SIZE,
+ SE_IBFS_TOTAL_SIZE,
SE_IBF_HASH_NUM);
if (NULL == remote_se)
{
@@ -1503,6 +2417,8 @@ handle_union_p2p_strata_estimator (void *cls,
strata_estimator_read (&msg[1],
len,
is_compressed,
+ msg->se_count,
+ SE_IBFS_TOTAL_SIZE,
remote_se))
{
/* decompression failed */
@@ -1511,11 +2427,76 @@ handle_union_p2p_strata_estimator (void *cls,
return;
}
GNUNET_assert (NULL != op->se);
- diff = strata_estimator_difference (remote_se,
- op->se);
+ strata_estimator_difference (remote_se,
+ op->se);
+
+ /* Calculate remote local diff */
+ long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
+ long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
+
+ /* Prevent estimations from overshooting max element */
+ if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
+ diff_remote = op->byzantine_upper_bound - op->remote_element_count;
+ if (diff_local + op->local_element_count > op->byzantine_upper_bound)
+ diff_local = op->byzantine_upper_bound - op->local_element_count;
+ if ((diff_remote < 0) || (diff_local < 0))
+ {
+ strata_estimator_destroy (remote_se);
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
+ "malicious: remote diff %ld, local diff: %ld\n",
+ diff_remote, diff_local);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
- if (diff > 200)
- diff = diff * 3 / 2;
+ /* Make estimation more precise in initial sync cases */
+ if (0 == op->remote_element_count)
+ {
+ diff_remote = 0;
+ diff_local = op->local_element_count;
+ }
+ if (0 == op->local_element_count)
+ {
+ diff_local = 0;
+ diff_remote = op->remote_element_count;
+ }
+
+ diff = diff_remote + diff_local;
+ op->remote_set_diff = diff_remote;
+
+ /** Calculate avg element size if not initial sync **/
+ uint64_t avg_element_size = 0;
+ if (0 < op->local_element_count)
+ {
+ op->total_elements_size_local = 0;
+ GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
+ &
+ determinate_avg_element_size_iterator,
+ op);
+ avg_element_size = op->total_elements_size_local / op->local_element_count;
+ }
+
+ op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
+ GNUNET_CONTAINER_multihashmap_size (
+ op->set->content->
+ elements),
+ op->
+ remote_element_count,
+ diff_remote,
+ diff_local,
+ op->
+ rtt_bandwidth_tradeoff,
+ op->
+ ibf_bucket_number_factor);
+
+#if MEASURE_PERFORMANCE
+ perf_store.se_diff_local = diff_local;
+ perf_store.se_diff_remote = diff_remote;
+ perf_store.se_diff = diff;
+ perf_store.mode_of_operation = op->mode_of_operation;
+#endif
strata_estimator_destroy (remote_se);
strata_estimator_destroy (op->se);
@@ -1523,7 +2504,8 @@ handle_union_p2p_strata_estimator (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"got se diff=%d, using ibf size %d\n",
diff,
- 1U << get_order_from_difference (diff));
+ 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element,
+ op->ibf_bucket_number_factor));
{
char *set_debug;
@@ -1546,16 +2528,8 @@ handle_union_p2p_strata_estimator (void *cls,
return;
}
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: %f\n", op->rtt_bandwidth_tradeoff);
-
-
- /**
- * Added rtt_bandwidth_tradeoff directly need future improvements
- */
if ((GNUNET_YES == op->force_full) ||
- (diff > op->initial_size / 4) ||
- (0 == other_size))
+ (op->mode_of_operation != DIFFERENTIAL_SYNC))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
@@ -1565,9 +2539,17 @@ handle_union_p2p_strata_estimator (void *cls,
"# of full sends",
1,
GNUNET_NO);
- if ((op->initial_size <= other_size) ||
- (0 == other_size))
+ if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
{
+ struct TransmitFullMessage *signal_msg;
+ struct GNUNET_MQ_Envelope *ev;
+ ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
+ GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL);
+ signal_msg->remote_set_difference = htonl (diff_local);
+ signal_msg->remote_set_size = htonl (op->local_element_count);
+ signal_msg->local_set_difference = htonl (diff_remote);
+ GNUNET_MQ_send (op->mq,
+ ev);
send_full_set (op);
}
else
@@ -1577,9 +2559,15 @@ handle_union_p2p_strata_estimator (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Telling other peer that we expect its full set\n");
op->phase = PHASE_FULL_RECEIVING;
- perf_rtt.request_full.sent += 1;
- ev = GNUNET_MQ_msg_header (
- GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
+#if MEASURE_PERFORMANCE
+ perf_store.request_full.sent += 1;
+#endif
+ struct TransmitFullMessage *signal_msg;
+ ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
+ GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
+ signal_msg->remote_set_difference = htonl (diff_local);
+ signal_msg->remote_set_size = htonl (op->local_element_count);
+ signal_msg->local_set_difference = htonl (diff_remote);
GNUNET_MQ_send (op->mq,
ev);
}
@@ -1592,7 +2580,9 @@ handle_union_p2p_strata_estimator (void *cls,
GNUNET_NO);
if (GNUNET_OK !=
send_ibf (op,
- get_order_from_difference (diff)))
+ get_size_from_difference (diff,
+ op->ibf_number_buckets_per_element,
+ op->ibf_bucket_number_factor)))
{
/* Internal error, best we can do is shut the connection */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1625,15 +2615,64 @@ send_offers_iterator (void *cls,
/* Detect 32-bit key collision for the 64-bit IBF keys. */
if (ke->ibf_key.key_val != sec->ibf_key.key_val)
+ {
+ op->active_passive_switch_required = true;
return GNUNET_YES;
+ }
- perf_rtt.offer.sent += 1;
- perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
+ /* Prevent implementation from sending a offer multiple times in case of roll switch */
+ if (GNUNET_YES ==
+ is_message_in_message_control_flow (
+ op->message_control_flow,
+ &ke->element->element_hash,
+ OFFER_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Skipping already sent processed element offer!\n");
+ return GNUNET_YES;
+ }
+ /* Save send offer message for message control */
+ if (GNUNET_YES !=
+ update_message_control_flow (
+ op->message_control_flow,
+ MSG_CFS_SENT,
+ &ke->element->element_hash,
+ OFFER_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double offer message sent found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return GNUNET_NO;
+ }
+ ;
+
+ /* Mark element to be expected to received */
+ if (GNUNET_YES !=
+ update_message_control_flow (
+ op->message_control_flow,
+ MSG_CFS_EXPECTED,
+ &ke->element->element_hash,
+ DEMAND_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double demand received found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return GNUNET_NO;
+ }
+ ;
+#if MEASURE_PERFORMANCE
+ perf_store.offer.sent += 1;
+ perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
+#endif
ev = GNUNET_MQ_msg_header_extra (mh,
sizeof(struct GNUNET_HashCode),
GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER);
-
GNUNET_assert (NULL != ev);
*(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1651,7 +2690,7 @@ send_offers_iterator (void *cls,
* @param op union operation
* @param ibf_key IBF key of interest
*/
-static void
+void
send_offers_for_key (struct Operation *op,
struct IBF_Key ibf_key)
{
@@ -1694,6 +2733,7 @@ decode_and_send (struct Operation *op)
/* allocation failed */
return GNUNET_SYSERR;
}
+
diff_ibf = ibf_dup (op->local_ibf);
ibf_subtract (diff_ibf,
op->remote_ibf);
@@ -1706,7 +2746,7 @@ decode_and_send (struct Operation *op)
diff_ibf->size);
num_decoded = 0;
- key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
+ key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
while (1)
{
@@ -1738,23 +2778,36 @@ decode_and_send (struct Operation *op)
if ((GNUNET_SYSERR == res) ||
(GNUNET_YES == cycle_detected))
{
- int next_order;
- next_order = 0;
- while (1 << next_order < diff_ibf->size)
- next_order++;
- next_order++;
- if (next_order <= MAX_IBF_ORDER)
+ uint32_t next_size;
+ /** Enforce odd ibf size **/
+
+ next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
+ diff_ibf->size);
+ /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
+ * Elias Summermatter (2021) in section 3.11 **/
+ uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
+
+ if (next_size<ibf_min_size)
+ next_size = ibf_min_size;
+
+
+ if (next_size <= MAX_IBF_SIZE)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"decoding failed, sending larger ibf (size %u)\n",
- 1 << next_order);
+ next_size);
GNUNET_STATISTICS_update (_GSS_statistics,
"# of IBF retries",
1,
GNUNET_NO);
- op->salt_send++;
+#if MEASURE_PERFORMANCE
+ perf_store.active_passive_switches += 1;
+#endif
+
+ op->salt_send = op->salt_receive++;
+
if (GNUNET_OK !=
- send_ibf (op, next_order))
+ send_ibf (op, next_size))
{
/* Internal error, best we can do is shut the connection */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1786,7 +2839,9 @@ decode_and_send (struct Operation *op)
LOG (GNUNET_ERROR_TYPE_DEBUG,
"transmitted all values, sending DONE\n");
- perf_rtt.done.sent += 1;
+#if MEASURE_PERFORMANCE
+ perf_store.done.sent += 1;
+#endif
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
GNUNET_MQ_send (op->mq, ev);
/* We now wait until we get a DONE message back
@@ -1797,7 +2852,6 @@ decode_and_send (struct Operation *op)
if (1 == side)
{
struct IBF_Key unsalted_key;
-
unsalt_key (&key,
op->salt_receive,
&unsalted_key);
@@ -1809,8 +2863,29 @@ decode_and_send (struct Operation *op)
struct GNUNET_MQ_Envelope *ev;
struct InquiryMessage *msg;
- perf_rtt.inquery.sent += 1;
- perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key);
+#if MEASURE_PERFORMANCE
+ perf_store.inquery.sent += 1;
+ perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
+#endif
+
+ /** Add sent inquiries to hashmap for flow control **/
+ struct GNUNET_HashContext *hashed_key_context =
+ GNUNET_CRYPTO_hash_context_start ();
+ struct GNUNET_HashCode *hashed_key = (struct
+ GNUNET_HashCode*) GNUNET_malloc (
+ sizeof(struct GNUNET_HashCode));
+ enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_SENT;
+ GNUNET_CRYPTO_hash_context_read (hashed_key_context,
+ &key,
+ sizeof(struct IBF_Key));
+ GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
+ hashed_key);
+ GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
+ hashed_key,
+ &mcfs,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
+ );
+
/* It may be nice to merge multiple requests, but with CADET's corking it is not worth
* the effort additional complexity. */
ev = GNUNET_MQ_msg_extra (msg,
@@ -1836,6 +2911,100 @@ decode_and_send (struct Operation *op)
/**
+ * Check send full message received from other peer
+ * @param cls
+ * @param msg
+ * @return
+ */
+
+static int
+check_union_p2p_send_full (void *cls,
+ const struct TransmitFullMessage *msg)
+{
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle send full message received from other peer
+ *
+ * @param cls
+ * @param msg
+ */
+static void
+handle_union_p2p_send_full (void *cls,
+ const struct TransmitFullMessage *msg)
+{
+ struct Operation *op = cls;
+
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ /** write received values to operator**/
+ op->remote_element_count = ntohl (msg->remote_set_size);
+ op->remote_set_diff = ntohl (msg->remote_set_difference);
+ op->local_set_diff = ntohl (msg->local_set_difference);
+
+ /** Check byzantine limits **/
+ if (check_byzantine_bounds (op) != GNUNET_OK)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
+ "criteria\n");
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ /** Calculate avg element size if not initial sync **/
+ op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
+ op->set->content->elements);
+ uint64_t avg_element_size = 0;
+ if (0 < op->local_element_count)
+ {
+ op->total_elements_size_local = 0;
+ GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
+ &
+ determinate_avg_element_size_iterator,
+ op);
+ avg_element_size = op->total_elements_size_local / op->local_element_count;
+ }
+
+ /** Validate mode of operation **/
+ int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
+ op->
+ remote_element_count,
+ op->
+ local_element_count,
+ op->local_set_diff,
+ op->remote_set_diff,
+ op->
+ rtt_bandwidth_tradeoff,
+ op->
+ ibf_bucket_number_factor);
+ if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
+ " : %d\n", mode_of_operation);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ op->phase = PHASE_FULL_RECEIVING;
+}
+
+
+/**
* Check an IBF message from a remote peer.
*
* Reassemble the IBF from multiple pieces, and
@@ -1872,7 +3041,8 @@ check_union_p2p_ibf (void *cls,
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- if (1 << msg->order != op->remote_ibf->size)
+
+ if (msg->ibf_size != op->remote_ibf->size)
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
@@ -1909,9 +3079,26 @@ handle_union_p2p_ibf (void *cls,
{
struct Operation *op = cls;
unsigned int buckets_in_message;
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
+ PHASE_PASSIVE_DECODING};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+ op->differential_sync_iterations++;
+ check_max_differential_rounds (op);
+ op->active_passive_switch_required = false;
- perf_rtt.ibf.received += 1;
- perf_rtt.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
+#if MEASURE_PERFORMANCE
+ perf_store.ibf.received += 1;
+ perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
+#endif
buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
/ IBF_BUCKET_SIZE;
@@ -1922,8 +3109,10 @@ handle_union_p2p_ibf (void *cls,
GNUNET_assert (NULL == op->remote_ibf);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Creating new ibf of size %u\n",
- 1 << msg->order);
- op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
+ ntohl (msg->ibf_size));
+ // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
+ op->remote_ibf = ibf_create (msg->ibf_size,
+ ((uint8_t) op->ibf_number_buckets_per_element));
op->salt_receive = ntohl (msg->salt);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Receiving new IBF with salt %u\n",
@@ -1954,7 +3143,7 @@ handle_union_p2p_ibf (void *cls,
ibf_read_slice (&msg[1],
op->ibf_buckets_received,
buckets_in_message,
- op->remote_ibf);
+ op->remote_ibf, msg->ibf_counter_bit_length);
op->ibf_buckets_received += buckets_in_message;
if (op->ibf_buckets_received == op->remote_ibf->size)
@@ -2030,18 +3219,24 @@ maybe_finish (struct Operation *op)
num_demanded = GNUNET_CONTAINER_multihashmap_size (
op->demanded_hashes);
-
+ int send_done = GNUNET_CONTAINER_multihashmap_iterate (
+ op->message_control_flow,
+ &
+ determinate_done_message_iterator,
+ op);
if (PHASE_FINISH_WAITING == op->phase)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "In PHASE_FINISH_WAITING, pending %u demands\n",
- num_demanded);
- if (0 == num_demanded)
+ "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
+ num_demanded, op->peer_site);
+ if (-1 != send_done)
{
struct GNUNET_MQ_Envelope *ev;
op->phase = PHASE_FINISHED;
- perf_rtt.done.sent += 1;
+#if MEASURE_PERFORMANCE
+ perf_store.done.sent += 1;
+#endif
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
GNUNET_MQ_send (op->mq,
ev);
@@ -2052,9 +3247,9 @@ maybe_finish (struct Operation *op)
if (PHASE_FINISH_CLOSING == op->phase)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "In PHASE_FINISH_CLOSING, pending %u demands\n",
- num_demanded);
- if (0 == num_demanded)
+ "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
+ num_demanded, op->peer_site);
+ if (-1 != send_done)
{
op->phase = PHASE_FINISHED;
send_client_done (op);
@@ -2102,11 +3297,25 @@ handle_union_p2p_elements (void *cls,
struct KeyEntry *ke;
uint16_t element_size;
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
+ PHASE_FINISH_WAITING, PHASE_FINISH_CLOSING};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
element_size = ntohs (emsg->header.size) - sizeof(struct
GNUNET_SETU_ElementMessage);
- perf_rtt.element.received += 1;
- perf_rtt.element.received_var_bytes += element_size;
+#if MEASURE_PERFORMANCE
+ perf_store.element.received += 1;
+ perf_store.element.received_var_bytes += element_size;
+#endif
ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
GNUNET_memcpy (&ee[1],
@@ -2129,6 +3338,21 @@ handle_union_p2p_elements (void *cls,
return;
}
+ if (GNUNET_OK !=
+ update_message_control_flow (
+ op->message_control_flow,
+ MSG_CFS_RECEIVED,
+ &ee->element_hash,
+ ELEMENT_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "An element has been received more than once!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got element (size %u, hash %s) from peer\n",
(unsigned int) element_size,
@@ -2217,33 +3441,25 @@ handle_union_p2p_full_element (void *cls,
struct KeyEntry *ke;
uint16_t element_size;
-
-
- if(PHASE_EXPECT_IBF == op->phase) {
- op->phase = PHASE_FULL_RECEIVING;
- }
-
-
-
- /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */
- if ((PHASE_FULL_RECEIVING != op->phase) &&
- (PHASE_FULL_SENDING != op->phase))
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Handle full element phase is %u\n",
- (unsigned) op->phase);
- GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
}
-
-
element_size = ntohs (emsg->header.size)
- sizeof(struct GNUNET_SETU_ElementMessage);
- perf_rtt.element_full.received += 1;
- perf_rtt.element_full.received_var_bytes += element_size;
+#if MEASURE_PERFORMANCE
+ perf_store.element_full.received += 1;
+ perf_store.element_full.received_var_bytes += element_size;
+#endif
ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
GNUNET_memcpy (&ee[1], &emsg[1], element_size);
@@ -2268,17 +3484,15 @@ handle_union_p2p_full_element (void *cls,
GNUNET_NO);
op->received_total++;
-
ke = op_get_element (op,
&ee->element_hash);
if (NULL != ke)
{
- /* Got repeated element. Should not happen since
- * we track demands. */
GNUNET_STATISTICS_update (_GSS_statistics,
"# repeated elements",
1,
GNUNET_NO);
+ full_sync_plausibility_check (op);
ke->received = GNUNET_YES;
GNUNET_free (ee);
}
@@ -2294,15 +3508,15 @@ handle_union_p2p_full_element (void *cls,
GNUNET_SETU_STATUS_ADD_LOCAL);
}
+
if ((GNUNET_YES == op->byzantine) &&
- (op->received_total > 384 + op->received_fresh * 4) &&
- (op->received_fresh < op->received_total / 6))
+ (op->received_total > op->remote_element_count) )
{
/* The other peer gave us lots of old elements, there's something wrong. */
LOG (GNUNET_ERROR_TYPE_ERROR,
- "Other peer sent only %llu/%llu fresh elements, failing operation\n",
- (unsigned long long) op->received_fresh,
- (unsigned long long) op->received_total);
+ "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
+ (unsigned long long) op->received_total,
+ (unsigned long long) op->remote_element_count);
GNUNET_break_op (0);
fail_union_operation (op);
return;
@@ -2356,18 +3570,50 @@ handle_union_p2p_inquiry (void *cls,
const struct IBF_Key *ibf_key;
unsigned int num_keys;
- perf_rtt.inquery.received += 1;
- perf_rtt.inquery.received_var_bytes += (ntohs (msg->header.size) - sizeof(struct InquiryMessage));
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+
+#if MEASURE_PERFORMANCE
+ perf_store.inquery.received += 1;
+ perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
+ - sizeof(struct InquiryMessage));
+#endif
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received union inquiry\n");
num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
/ sizeof(struct IBF_Key);
ibf_key = (const struct IBF_Key *) &msg[1];
+
+ /** Add received inquiries to hashmap for flow control **/
+ struct GNUNET_HashContext *hashed_key_context =
+ GNUNET_CRYPTO_hash_context_start ();
+ struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
+ sizeof(struct GNUNET_HashCode));;
+ enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_RECEIVED;
+ GNUNET_CRYPTO_hash_context_read (hashed_key_context,
+ &ibf_key,
+ sizeof(struct IBF_Key));
+ GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
+ hashed_key);
+ GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
+ hashed_key,
+ &mcfs,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
+ );
+
while (0 != num_keys--)
{
struct IBF_Key unsalted_key;
-
unsalt_key (ibf_key,
ntohl (msg->salt),
&unsalted_key);
@@ -2402,7 +3648,9 @@ send_missing_full_elements_iter (void *cls,
if (GNUNET_YES == ke->received)
return GNUNET_YES;
- perf_rtt.element_full.received += 1;
+#if MEASURE_PERFORMANCE
+ perf_store.element_full.received += 1;
+#endif
ev = GNUNET_MQ_msg_extra (emsg,
ee->element.size,
GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
@@ -2422,18 +3670,84 @@ send_missing_full_elements_iter (void *cls,
* @param cls closure, a set union operation
* @param mh the demand message
*/
+static int
+check_union_p2p_request_full (void *cls,
+ const struct TransmitFullMessage *mh)
+{
+ return GNUNET_OK;
+}
+
+
static void
handle_union_p2p_request_full (void *cls,
- const struct GNUNET_MessageHeader *mh)
+ const struct TransmitFullMessage *msg)
{
struct Operation *op = cls;
- perf_rtt.request_full.received += 1;
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
+ op->remote_element_count = ntohl (msg->remote_set_size);
+ op->remote_set_diff = ntohl (msg->remote_set_difference);
+ op->local_set_diff = ntohl (msg->local_set_difference);
+
+
+ if (check_byzantine_bounds (op) != GNUNET_OK)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
+ "criteria\n");
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
+#if MEASURE_PERFORMANCE
+ perf_store.request_full.received += 1;
+#endif
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received request for full set transmission\n");
- if (PHASE_EXPECT_IBF != op->phase)
+
+ /** Calculate avg element size if not initial sync **/
+ op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
+ op->set->content->elements);
+ uint64_t avg_element_size = 0;
+ if (0 < op->local_element_count)
+ {
+ op->total_elements_size_local = 0;
+ GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
+ &
+ determinate_avg_element_size_iterator,
+ op);
+ avg_element_size = op->total_elements_size_local / op->local_element_count;
+ }
+
+ int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
+ op->
+ remote_element_count,
+ op->
+ local_element_count,
+ op->local_set_diff,
+ op->remote_set_diff,
+ op->
+ rtt_bandwidth_tradeoff,
+ op->
+ ibf_bucket_number_factor);
+ if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation)
{
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
+ " : %d\n", mode_of_operation);
GNUNET_break_op (0);
fail_union_operation (op);
return;
@@ -2458,7 +3772,21 @@ handle_union_p2p_full_done (void *cls,
{
struct Operation *op = cls;
- perf_rtt.full_done.received += 1;
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+
+#if MEASURE_PERFORMANCE
+ perf_store.full_done.received += 1;
+#endif
switch (op->phase)
{
@@ -2466,6 +3794,19 @@ handle_union_p2p_full_done (void *cls,
{
struct GNUNET_MQ_Envelope *ev;
+ if ((GNUNET_YES == op->byzantine) &&
+ (op->received_total != op->remote_element_count) )
+ {
+ /* The other peer gave not enough elements before sending full done, there's something wrong. */
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Other peer sent only %llu/%llu fresh elements, failing operation\n",
+ (unsigned long long) op->received_total,
+ (unsigned long long) op->remote_element_count);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
LOG (GNUNET_ERROR_TYPE_DEBUG,
"got FULL DONE, sending elements that other peer is missing\n");
@@ -2473,7 +3814,9 @@ handle_union_p2p_full_done (void *cls,
GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
&send_missing_full_elements_iter,
op);
- perf_rtt.full_done.sent += 1;
+#if MEASURE_PERFORMANCE
+ perf_store.full_done.sent += 1;
+#endif
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
GNUNET_MQ_send (op->mq,
ev);
@@ -2552,8 +3895,23 @@ handle_union_p2p_demand (void *cls,
unsigned int num_hashes;
struct GNUNET_MQ_Envelope *ev;
- perf_rtt.demand.received += 1;
- perf_rtt.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader));
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
+ PHASE_FINISH_WAITING};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+#if MEASURE_PERFORMANCE
+ perf_store.demand.received += 1;
+ perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
+ GNUNET_MessageHeader));
+#endif
num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
/ sizeof(struct GNUNET_HashCode);
@@ -2570,6 +3928,39 @@ handle_union_p2p_demand (void *cls,
fail_union_operation (op);
return;
}
+
+ /* Save send demand message for message control */
+ if (GNUNET_YES !=
+ update_message_control_flow (
+ op->message_control_flow,
+ MSG_CFS_RECEIVED,
+ &ee->element_hash,
+ DEMAND_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double demand message received found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+ ;
+
+ /* Mark element to be expected to received */
+ if (GNUNET_YES !=
+ update_message_control_flow (
+ op->message_control_flow,
+ MSG_CFS_SENT,
+ &ee->element_hash,
+ ELEMENT_MESSAGE)
+ )
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double element message sent found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
{
/* Probably confused lazily copied sets. */
@@ -2577,8 +3968,10 @@ handle_union_p2p_demand (void *cls,
fail_union_operation (op);
return;
}
- perf_rtt.element.sent += 1;
- perf_rtt.element.sent_var_bytes += ee->element.size;
+#if MEASURE_PERFORMANCE
+ perf_store.element.sent += 1;
+ perf_store.element.sent_var_bytes += ee->element.size;
+#endif
ev = GNUNET_MQ_msg_extra (emsg,
ee->element.size,
GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS);
@@ -2600,9 +3993,10 @@ handle_union_p2p_demand (void *cls,
if (op->symmetric)
send_client_element (op,
&ee->element,
- GNUNET_SET_STATUS_ADD_REMOTE);
+ GNUNET_SETU_STATUS_ADD_REMOTE);
}
GNUNET_CADET_receive_done (op->channel);
+ maybe_finish (op);
}
@@ -2653,9 +4047,23 @@ handle_union_p2p_offer (void *cls,
struct Operation *op = cls;
const struct GNUNET_HashCode *hash;
unsigned int num_hashes;
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
- perf_rtt.offer.received += 1;
- perf_rtt.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader));
+#if MEASURE_PERFORMANCE
+ perf_store.offer.received += 1;
+ perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
+ GNUNET_MessageHeader));
+#endif
num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
/ sizeof(struct GNUNET_HashCode);
@@ -2693,11 +4101,68 @@ handle_union_p2p_offer (void *cls,
"[OP %p] Requesting element (hash %s)\n",
op, GNUNET_h2s (hash));
- perf_rtt.demand.sent += 1;
- perf_rtt.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
+#if MEASURE_PERFORMANCE
+ perf_store.demand.sent += 1;
+ perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
+#endif
ev = GNUNET_MQ_msg_header_extra (demands,
sizeof(struct GNUNET_HashCode),
GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
+ /* Save send demand message for message control */
+ if (GNUNET_YES !=
+ update_message_control_flow (
+ op->message_control_flow,
+ MSG_CFS_SENT,
+ hash,
+ DEMAND_MESSAGE)
+ )
+ {
+ // GNUNET_free (ev);
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double demand message sent found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+ ;
+
+ /* Mark offer as received received */
+ if (GNUNET_YES !=
+ update_message_control_flow (
+ op->message_control_flow,
+ MSG_CFS_RECEIVED,
+ hash,
+ OFFER_MESSAGE)
+ )
+ {
+ // GNUNET_free (ev);
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Double offer message received found!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+ ;
+
+ /* Mark element to be expected to received */
+ if (GNUNET_YES !=
+ update_message_control_flow (
+ op->message_control_flow,
+ MSG_CFS_EXPECTED,
+ hash,
+ ELEMENT_MESSAGE)
+ )
+ {
+ // GNUNET_free (ev);
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Element already expected!\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+ ;
+
+
GNUNET_memcpy (&demands[1],
hash,
sizeof(struct GNUNET_HashCode));
@@ -2719,7 +4184,30 @@ handle_union_p2p_done (void *cls,
{
struct Operation *op = cls;
- perf_rtt.done.received += 1;
+ /**
+ * Check that the message is received only in supported phase
+ */
+ uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
+ if (GNUNET_OK !=
+ check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ if (op->active_passive_switch_required)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "PROTOCOL VIOLATION: Received done but role change is necessary\n");
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+
+#if MEASURE_PERFORMANCE
+ perf_store.done.received += 1;
+#endif
switch (op->phase)
{
case PHASE_PASSIVE_DECODING:
@@ -2728,26 +4216,26 @@ handle_union_p2p_done (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"got DONE (as passive partner), waiting for our demands to be satisfied\n");
/* The active peer is done sending offers
- * and inquiries. This means that all
- * our responses to that (demands and offers)
- * must be in flight (queued or in mesh).
- *
- * We should notify the active peer once
- * all our demands are satisfied, so that the active
- * peer can quit if we gave it everything.
- */GNUNET_CADET_receive_done (op->channel);
+ * and inquiries. This means that all
+ * our responses to that (demands and offers)
+ * must be in flight (queued or in mesh).
+ *
+ * We should notify the active peer once
+ * all our demands are satisfied, so that the active
+ * peer can quit if we gave it everything.
+ */GNUNET_CADET_receive_done (op->channel);
maybe_finish (op);
return;
case PHASE_ACTIVE_DECODING:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"got DONE (as active partner), waiting to finish\n");
/* All demands of the other peer are satisfied,
- * and we processed all offers, thus we know
- * exactly what our demands must be.
- *
- * We'll close the channel
- * to the other peer once our demands are met.
- */op->phase = PHASE_FINISH_CLOSING;
+ * and we processed all offers, thus we know
+ * exactly what our demands must be.
+ *
+ * We'll close the channel
+ * to the other peer once our demands are met.
+ */op->phase = PHASE_FINISH_CLOSING;
GNUNET_CADET_receive_done (op->channel);
maybe_finish (op);
return;
@@ -2769,7 +4257,9 @@ static void
handle_union_p2p_over (void *cls,
const struct GNUNET_MessageHeader *mh)
{
- perf_rtt.over.received += 1;
+#if MEASURE_PERFORMANCE
+ perf_store.over.received += 1;
+#endif
send_client_done (cls);
}
@@ -2943,7 +4433,7 @@ check_incoming_msg (void *cls,
struct Listener *listener = op->listener;
const struct GNUNET_MessageHeader *nested_context;
- /* double operation request */
+ /* double operation request */
if (0 != op->suggest_id)
{
GNUNET_break_op (0);
@@ -3053,10 +4543,10 @@ handle_client_create_set (void *cls,
}
set = GNUNET_new (struct Set);
{
- struct StrataEstimator *se;
+ struct MultiStrataEstimator *se;
se = strata_estimator_create (SE_STRATA_COUNT,
- SE_IBF_SIZE,
+ SE_IBFS_TOTAL_SIZE,
SE_IBF_HASH_NUM);
if (NULL == se)
{
@@ -3198,6 +4688,7 @@ channel_window_cb (void *cls,
* @param cls client that sent the message
* @param msg message sent by the client
*/
+
static void
handle_client_listen (void *cls,
const struct GNUNET_SETU_ListenMessage *msg)
@@ -3240,10 +4731,10 @@ handle_client_listen (void *cls,
GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
struct GNUNET_MessageHeader,
NULL),
- GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
- GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
- struct GNUNET_MessageHeader,
- NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_request_full,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
+ struct TransmitFullMessage,
+ NULL),
GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
struct StrataEstimatorMessage,
@@ -3256,6 +4747,10 @@ handle_client_listen (void *cls,
GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
struct GNUNET_SETU_ElementMessage,
NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_send_full,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
+ struct TransmitFullMessage,
+ NULL),
GNUNET_MQ_handler_end ()
};
struct Listener *listener;
@@ -3451,6 +4946,7 @@ handle_client_evaluate (void *cls,
{
struct ClientState *cs = cls;
struct Operation *op = GNUNET_new (struct Operation);
+
const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
GNUNET_MQ_hd_var_size (incoming_msg,
GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
@@ -3488,10 +4984,10 @@ handle_client_evaluate (void *cls,
GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
struct GNUNET_MessageHeader,
op),
- GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
- GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
- struct GNUNET_MessageHeader,
- op),
+ GNUNET_MQ_hd_var_size (union_p2p_request_full,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
+ struct TransmitFullMessage,
+ op),
GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
struct StrataEstimatorMessage,
@@ -3504,6 +5000,10 @@ handle_client_evaluate (void *cls,
GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
struct GNUNET_SETU_ElementMessage,
op),
+ GNUNET_MQ_hd_var_size (union_p2p_send_full,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
+ struct TransmitFullMessage,
+ NULL),
GNUNET_MQ_handler_end ()
};
struct Set *set;
@@ -3525,8 +5025,23 @@ handle_client_evaluate (void *cls,
op->force_full = msg->force_full;
op->force_delta = msg->force_delta;
op->symmetric = msg->symmetric;
+ op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
+ op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
+ op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
+ op->byzantine_upper_bound = msg->byzantine_upper_bond;
+ op->active_passive_switch_required = false;
context = GNUNET_MQ_extract_nested_mh (msg);
+ /* create hashmap for message control */
+ op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
+ GNUNET_NO);
+ op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
+
+#if MEASURE_PERFORMANCE
+ /* load config */
+ load_config (op);
+#endif
+
/* Advance generation values, so that
mutations won't interfer with the running operation. */
op->set = set;
@@ -3550,7 +5065,9 @@ handle_client_evaluate (void *cls,
struct GNUNET_MQ_Envelope *ev;
struct OperationRequestMessage *msg;
- perf_rtt.operation_request.sent += 1;
+#if MEASURE_PERFORMANCE
+ perf_store.operation_request.sent += 1;
+#endif
ev = GNUNET_MQ_msg_nested_mh (msg,
GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
context);
@@ -3567,7 +5084,11 @@ handle_client_evaluate (void *cls,
op->se = strata_estimator_dup (op->set->se);
/* we started the operation, thus we have to send the operation request */
op->phase = PHASE_EXPECT_SE;
- op->salt_receive = op->salt_send = 42; // FIXME?????
+
+ op->salt_receive = (op->peer_site + 1) % 2;
+ op->salt_send = op->peer_site; // FIXME?????
+
+
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Initiating union operation evaluation\n");
GNUNET_STATISTICS_update (_GSS_statistics,
@@ -3711,6 +5232,20 @@ handle_client_accept (void *cls,
op->force_full = msg->force_full;
op->force_delta = msg->force_delta;
op->symmetric = msg->symmetric;
+ op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
+ op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
+ op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
+ op->byzantine_upper_bound = msg->byzantine_upper_bond;
+ op->active_passive_switch_required = false;
+ /* create hashmap for message control */
+ op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
+ GNUNET_NO);
+ op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
+
+#if MEASURE_PERFORMANCE
+ /* load config */
+ load_config (op);
+#endif
/* Advance generation values, so that future mutations do not
interfer with the running operation. */
@@ -3729,7 +5264,7 @@ handle_client_accept (void *cls,
1,
GNUNET_NO);
{
- const struct StrataEstimator *se;
+ struct MultiStrataEstimator *se;
struct GNUNET_MQ_Envelope *ev;
struct StrataEstimatorMessage *strata_msg;
char *buf;
@@ -3739,20 +5274,40 @@ handle_client_accept (void *cls,
op->se = strata_estimator_dup (op->set->se);
op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
GNUNET_NO);
- op->salt_receive = op->salt_send = 42; // FIXME?????
+ op->salt_receive = (op->peer_site + 1) % 2;
+ op->salt_send = op->peer_site; // FIXME?????
initialize_key_to_element (op);
op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
op->key_to_element);
/* kick off the operation */
se = op->se;
- buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
+
+ uint8_t se_count = 1;
+ if (op->initial_size > 0)
+ {
+ op->total_elements_size_local = 0;
+ GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
+ &
+ determinate_avg_element_size_iterator,
+ op);
+ se_count = determine_strata_count (
+ op->total_elements_size_local / op->initial_size,
+ op->initial_size);
+ }
+ buf = GNUNET_malloc (se->stratas[0]->strata_count * IBF_BUCKET_SIZE
+ * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
len = strata_estimator_write (se,
+ SE_IBFS_TOTAL_SIZE,
+ se_count,
buf);
- perf_rtt.se.sent += 1;
- perf_rtt.se.sent_var_bytes += len;
+#if MEASURE_PERFORMANCE
+ perf_store.se.sent += 1;
+ perf_store.se.sent_var_bytes += len;
+#endif
- if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
+ if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
+ * SE_IBFS_TOTAL_SIZE)
type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC;
else
type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE;
@@ -3766,6 +5321,7 @@ handle_client_accept (void *cls,
strata_msg->set_size
= GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
op->set->content->elements));
+ strata_msg->se_count = se_count;
GNUNET_MQ_send (op->mq,
ev);
op->phase = PHASE_EXPECT_IBF;
@@ -3800,8 +5356,9 @@ shutdown_task (void *cls)
GNUNET_YES);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"handled shutdown request\n");
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "RTT:%f\n", calculate_perf_rtt());
+#if MEASURE_PERFORMANCE
+ calculate_perf_store ();
+#endif
}
diff --git a/src/setu/gnunet-service-setu_protocol.h b/src/setu/gnunet-service-setu_protocol.h
index a2803ee47..d8f34f69c 100644
--- a/src/setu/gnunet-service-setu_protocol.h
+++ b/src/setu/gnunet-service-setu_protocol.h
@@ -40,11 +40,6 @@ struct OperationRequestMessage
struct GNUNET_MessageHeader header;
/**
- * Operation to request, values from `enum GNUNET_SET_OperationType`
- */
- uint32_t operation GNUNET_PACKED;
-
- /**
* For Intersection: my element count
*/
uint32_t element_count GNUNET_PACKED;
@@ -72,20 +67,9 @@ struct IBFMessage
struct GNUNET_MessageHeader header;
/**
- * Order of the whole ibf, where
- * num_buckets = 2^order
- */
- uint8_t order;
-
- /**
- * Padding, must be 0.
+ * Size of the whole ibf (number of buckets)
*/
- uint8_t reserved1;
-
- /**
- * Padding, must be 0.
- */
- uint16_t reserved2 GNUNET_PACKED;
+ uint32_t ibf_size;
/**
* Offset of the strata in the rest of the message
@@ -95,10 +79,22 @@ struct IBFMessage
/**
* Salt used when hashing elements for this IBF.
*/
- uint32_t salt GNUNET_PACKED;
+ uint16_t salt GNUNET_PACKED;
+ /**
+ * The bit lenght of the counter
+ */
+ uint16_t ibf_counter_bit_length;
/* rest: buckets */
};
+/**
+estimate_best_mode_of_operation (uint64_t avg_element_size,
+uint64_t local_set_size,
+ uint64_t remote_set_size,
+uint64_t est_set_diff_remote,
+ uint64_t est_set_diff_local,)
+ **/
+
struct InquiryMessage
@@ -113,11 +109,6 @@ struct InquiryMessage
*/
uint32_t salt GNUNET_PACKED;
- /**
- * Reserved, set to 0.
- */
- uint32_t reserved GNUNET_PACKED;
-
/* rest: inquiry IBF keys */
};
@@ -218,9 +209,47 @@ struct StrataEstimatorMessage
*/
struct GNUNET_MessageHeader header;
+ /**
+ * The number of ses transmitted
+ */
+ uint8_t se_count;
+
+ /**
+ * Size of the local set
+ */
uint64_t set_size;
};
+
+/**
+ * Message which signals to other peer that we are sending full set
+ *
+ */
+struct TransmitFullMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Remote set difference calculated with strata estimator
+ */
+ uint32_t remote_set_difference;
+
+ /**
+ * Total remote set size
+ */
+ uint32_t remote_set_size;
+
+ /**
+ * Local set difference calculated with strata estimator
+ */
+ uint32_t local_set_difference;
+
+};
+
+
GNUNET_NETWORK_STRUCT_END
#endif
diff --git a/src/setu/gnunet-service-setu_strata_estimator.c b/src/setu/gnunet-service-setu_strata_estimator.c
index 7c9a4deb6..7981cc847 100644
--- a/src/setu/gnunet-service-setu_strata_estimator.c
+++ b/src/setu/gnunet-service-setu_strata_estimator.c
@@ -22,6 +22,7 @@
* @brief invertible bloom filter
* @author Florian Dold
* @author Christian Grothoff
+ * @author Elias Summermatter
*/
#include "platform.h"
#include "gnunet_util_lib.h"
@@ -30,6 +31,82 @@
/**
+ * Should we try compressing the strata estimator? This will
+ * break compatibility with the 0.10.1-network.
+ */
+#define FAIL_10_1_COMPATIBILTIY 1
+
+/**
+ * Number of strata estimators in memory NOT transmitted
+ */
+
+#define MULTI_SE_BASE_COUNT 8
+
+/**
+ * The avg size of 1 se
+ * Based on the bsc thesis of Elias Summermatter (2021)
+ */
+
+#define AVG_BYTE_SIZE_SE 4221
+
+/**
+ * Calculates the optimal number of strata Estimators to send
+ * @param avg_element_size
+ * @param element_count
+ * @return
+ */
+uint8_t
+determine_strata_count (uint64_t avg_element_size, uint64_t element_count)
+{
+ uint64_t base_size = avg_element_size * element_count;
+ /* >67kb total size of elements in set */
+ if (base_size < AVG_BYTE_SIZE_SE * 16)
+ return 1;
+ /* >270kb total size of elements in set */
+ if (base_size < AVG_BYTE_SIZE_SE * 64)
+ return 2;
+ /* >1mb total size of elements in set */
+ if (base_size < AVG_BYTE_SIZE_SE * 256)
+ return 4;
+ return 8;
+}
+
+
+/**
+ * Modify an IBF key @a k_in based on the @a salt, returning a
+ * salted key in @a k_out.
+ */
+static void
+salt_key (const struct IBF_Key *k_in,
+ uint32_t salt,
+ struct IBF_Key *k_out)
+{
+ int s = (salt * 7) % 64;
+ uint64_t x = k_in->key_val;
+
+ /* rotate ibf key */
+ x = (x >> s) | (x << (64 - s));
+ k_out->key_val = x;
+}
+
+
+/**
+ * Reverse modification done in the salt_key function
+ */
+static void
+unsalt_key (const struct IBF_Key *k_in,
+ uint32_t salt,
+ struct IBF_Key *k_out)
+{
+ int s = (salt * 7) % 64;
+ uint64_t x = k_in->key_val;
+
+ x = (x << s) | (x >> (64 - s));
+ k_out->key_val = x;
+}
+
+
+/**
* Write the given strata estimator to the buffer.
*
* @param se strata estimator to serialize
@@ -37,21 +114,33 @@
* @return number of bytes written to @a buf
*/
size_t
-strata_estimator_write (const struct StrataEstimator *se,
+strata_estimator_write (struct MultiStrataEstimator *se,
+ uint16_t se_ibf_total_size,
+ uint8_t number_se_send,
void *buf)
{
char *sbuf = buf;
+ unsigned int i;
size_t osize;
+ uint64_t sbuf_offset = 0;
+ se->size = number_se_send;
GNUNET_assert (NULL != se);
- for (unsigned int i = 0; i < se->strata_count; i++)
+ for (uint8_t strata_ctr = 0; strata_ctr < number_se_send; strata_ctr++)
{
- ibf_write_slice (se->strata[i],
- 0,
- se->ibf_size,
- &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]);
+ for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
+ {
+ ibf_write_slice (se->stratas[strata_ctr]->strata[i],
+ 0,
+ se->stratas[strata_ctr]->ibf_size,
+ &sbuf[sbuf_offset],
+ 8);
+ sbuf_offset += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE;
+ }
}
- osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
+ osize = ((se_ibf_total_size / 8) * number_se_send) * IBF_BUCKET_SIZE
+ * se->stratas[0]->strata_count;
+#if FAIL_10_1_COMPATIBILTIY
{
char *cbuf;
size_t nsize;
@@ -62,13 +151,12 @@ strata_estimator_write (const struct StrataEstimator *se,
&cbuf,
&nsize))
{
- GNUNET_memcpy (buf,
- cbuf,
- nsize);
+ GNUNET_memcpy (buf, cbuf, nsize);
osize = nsize;
GNUNET_free (cbuf);
}
}
+#endif
return osize;
}
@@ -87,15 +175,19 @@ int
strata_estimator_read (const void *buf,
size_t buf_len,
int is_compressed,
- struct StrataEstimator *se)
+ uint8_t number_se_received,
+ uint16_t se_ibf_total_size,
+ struct MultiStrataEstimator *se)
{
+ unsigned int i;
size_t osize;
char *dbuf;
dbuf = NULL;
if (GNUNET_YES == is_compressed)
{
- osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
+ osize = ((se_ibf_total_size / 8) * number_se_received) * IBF_BUCKET_SIZE
+ * se->stratas[0]->strata_count;
dbuf = GNUNET_decompress (buf,
buf_len,
osize);
@@ -108,18 +200,25 @@ strata_estimator_read (const void *buf,
buf_len = osize;
}
- if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE)
+ if (buf_len != se->stratas[0]->strata_count * ((se_ibf_total_size / 8)
+ * number_se_received)
+ * IBF_BUCKET_SIZE)
{
GNUNET_break (0); /* very odd error */
GNUNET_free (dbuf);
return GNUNET_SYSERR;
}
- for (unsigned int i = 0; i < se->strata_count; i++)
+ for (uint8_t strata_ctr = 0; strata_ctr < number_se_received; strata_ctr++)
{
- ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]);
- buf += se->ibf_size * IBF_BUCKET_SIZE;
+ for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
+ {
+ ibf_read_slice (buf, 0, se->stratas[strata_ctr]->ibf_size,
+ se->stratas[strata_ctr]->strata[i], 8);
+ buf += se->stratas[strata_ctr]->ibf_size * IBF_BUCKET_SIZE;
+ }
}
+ se->size = number_se_received;
GNUNET_free (dbuf);
return GNUNET_OK;
}
@@ -132,38 +231,61 @@ strata_estimator_read (const void *buf,
* @param key key to add
*/
void
-strata_estimator_insert (struct StrataEstimator *se,
+strata_estimator_insert (struct MultiStrataEstimator *se,
struct IBF_Key key)
{
- uint64_t v;
- unsigned int i;
- v = key.key_val;
+
/* count trailing '1'-bits of v */
- for (i = 0; v & 1; v >>= 1, i++)
- /* empty */;
- ibf_insert (se->strata[i], key);
+ for (int strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
+ {
+ unsigned int i;
+ uint64_t v;
+
+ struct IBF_Key salted_key;
+ salt_key (&key,
+ strata_ctr * (64 / MULTI_SE_BASE_COUNT),
+ &salted_key);
+ v = salted_key.key_val;
+ for (i = 0; v & 1; v >>= 1, i++)
+ {
+ ibf_insert (se->stratas[strata_ctr]->strata[i], salted_key);
+ }
+ }
+ /* empty */;
+
}
/**
- * Remove a key from the strata estimator.
+ * Remove a key from the strata estimator. (NOT USED)
*
* @param se strata estimator to remove the key from
* @param key key to remove
*/
void
-strata_estimator_remove (struct StrataEstimator *se,
+strata_estimator_remove (struct MultiStrataEstimator *se,
struct IBF_Key key)
{
- uint64_t v;
- unsigned int i;
- v = key.key_val;
/* count trailing '1'-bits of v */
- for (i = 0; v & 1; v >>= 1, i++)
- /* empty */;
- ibf_remove (se->strata[i], key);
+ for (int strata_ctr = 0; strata_ctr < se->size; strata_ctr++)
+ {
+ uint64_t v;
+ unsigned int i;
+
+ struct IBF_Key unsalted_key;
+ unsalt_key (&key,
+ strata_ctr * (64 / MULTI_SE_BASE_COUNT),
+ &unsalted_key);
+
+ v = unsalted_key.key_val;
+ for (i = 0; v & 1; v >>= 1, i++)
+ {
+ /* empty */;
+ ibf_remove (se->stratas[strata_ctr]->strata[i], unsalted_key);
+ }
+ }
}
@@ -175,29 +297,42 @@ strata_estimator_remove (struct StrataEstimator *se,
* @param ibf_hashnum hashnum parameter of each ibf
* @return a freshly allocated, empty strata estimator, NULL on error
*/
-struct StrataEstimator *
+struct MultiStrataEstimator *
strata_estimator_create (unsigned int strata_count,
uint32_t ibf_size,
uint8_t ibf_hashnum)
{
- struct StrataEstimator *se;
-
- se = GNUNET_new (struct StrataEstimator);
- se->strata_count = strata_count;
- se->ibf_size = ibf_size;
- se->strata = GNUNET_new_array (strata_count,
- struct InvertibleBloomFilter *);
- for (unsigned int i = 0; i < strata_count; i++)
+ struct MultiStrataEstimator *se;
+ unsigned int i;
+ unsigned int j;
+ se = GNUNET_new (struct MultiStrataEstimator);
+
+ se->size = MULTI_SE_BASE_COUNT;
+ se->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *);
+
+ uint8_t ibf_prime_sizes[] = {79,79,79,79,79,79,79,79};
+
+ for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
{
- se->strata[i] = ibf_create (ibf_size, ibf_hashnum);
- if (NULL == se->strata[i])
+ se->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator);
+ se->stratas[strata_ctr]->strata_count = strata_count;
+ se->stratas[strata_ctr]->ibf_size = ibf_prime_sizes[strata_ctr];
+ se->stratas[strata_ctr]->strata = GNUNET_new_array (strata_count * 4,
+ struct
+ InvertibleBloomFilter *);
+ for (i = 0; i < strata_count; i++)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to allocate memory for strata estimator\n");
- for (unsigned int j = 0; j < i; j++)
- ibf_destroy (se->strata[i]);
- GNUNET_free (se);
- return NULL;
+ se->stratas[strata_ctr]->strata[i] = ibf_create (
+ ibf_prime_sizes[strata_ctr], ibf_hashnum);
+ if (NULL == se->stratas[strata_ctr]->strata[i])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to allocate memory for strata estimator\n");
+ for (j = 0; j < i; j++)
+ ibf_destroy (se->stratas[strata_ctr]->strata[i]);
+ GNUNET_free (se);
+ return NULL;
+ }
}
}
return se;
@@ -213,46 +348,71 @@ strata_estimator_create (unsigned int strata_count,
* @param se2 second strata estimator
* @return the estimated difference
*/
-unsigned int
-strata_estimator_difference (const struct StrataEstimator *se1,
- const struct StrataEstimator *se2)
+void
+strata_estimator_difference (const struct MultiStrataEstimator *se1,
+ const struct MultiStrataEstimator *se2)
{
- unsigned int count;
+ int avg_local_diff = 0;
+ int avg_remote_diff = 0;
+ uint8_t number_of_estimators = se1->size;
- GNUNET_assert (se1->strata_count == se2->strata_count);
- count = 0;
- for (int i = se1->strata_count - 1; i >= 0; i--)
+ for (uint8_t strata_ctr = 0; strata_ctr < number_of_estimators; strata_ctr++)
{
- struct InvertibleBloomFilter *diff;
- /* number of keys decoded from the ibf */
-
- /* FIXME: implement this without always allocating new IBFs */
- diff = ibf_dup (se1->strata[i]);
- ibf_subtract (diff,
- se2->strata[i]);
- for (int ibf_count = 0; GNUNET_YES; ibf_count++)
+ GNUNET_assert (se1->stratas[strata_ctr]->strata_count ==
+ se2->stratas[strata_ctr]->strata_count);
+
+
+ for (int i = se1->stratas[strata_ctr]->strata_count - 1; i >= 0; i--)
{
- int more;
+ struct InvertibleBloomFilter *diff;
+ /* number of keys decoded from the ibf */
- more = ibf_decode (diff,
- NULL,
- NULL);
- if (GNUNET_NO == more)
- {
- count += ibf_count;
- break;
- }
- /* Estimate if decoding fails or would not terminate */
- if ( (GNUNET_SYSERR == more) ||
- (ibf_count > diff->size) )
+ /* FIXME: implement this without always allocating new IBFs */
+ diff = ibf_dup (se1->stratas[strata_ctr]->strata[i]);
+ diff->local_decoded_count = 0;
+ diff->remote_decoded_count = 0;
+
+ ibf_subtract (diff, se2->stratas[strata_ctr]->strata[i]);
+
+ for (int ibf_count = 0; GNUNET_YES; ibf_count++)
{
- ibf_destroy (diff);
- return count * (1 << (i + 1));
+ int more;
+
+ more = ibf_decode (diff, NULL, NULL);
+ if (GNUNET_NO == more)
+ {
+ se1->stratas[strata_ctr]->strata[0]->local_decoded_count +=
+ diff->local_decoded_count;
+ se1->stratas[strata_ctr]->strata[0]->remote_decoded_count +=
+ diff->remote_decoded_count;
+ break;
+ }
+ /* Estimate if decoding fails or would not terminate */
+ if ((GNUNET_SYSERR == more) || (ibf_count > diff->size))
+ {
+ se1->stratas[strata_ctr]->strata[0]->local_decoded_count =
+ se1->stratas[strata_ctr]->strata[0]->local_decoded_count * (1 << (i
+ +
+ 1));
+ se1->stratas[strata_ctr]->strata[0]->remote_decoded_count =
+ se1->stratas[strata_ctr]->strata[0]->remote_decoded_count * (1 << (i
+ +
+ 1));
+ ibf_destroy (diff);
+ goto break_all_counting_loops;
+ }
}
+ ibf_destroy (diff);
}
- ibf_destroy (diff);
+break_all_counting_loops:;
+ avg_local_diff += se1->stratas[strata_ctr]->strata[0]->local_decoded_count;
+ avg_remote_diff +=
+ se1->stratas[strata_ctr]->strata[0]->remote_decoded_count;
}
- return count;
+ se1->stratas[0]->strata[0]->local_decoded_count = avg_local_diff
+ / number_of_estimators;
+ se1->stratas[0]->strata[0]->remote_decoded_count = avg_remote_diff
+ / number_of_estimators;
}
@@ -262,18 +422,28 @@ strata_estimator_difference (const struct StrataEstimator *se1,
* @param se the strata estimator to copy
* @return the copy
*/
-struct StrataEstimator *
-strata_estimator_dup (struct StrataEstimator *se)
+struct MultiStrataEstimator *
+strata_estimator_dup (struct MultiStrataEstimator *se)
{
- struct StrataEstimator *c;
-
- c = GNUNET_new (struct StrataEstimator);
- c->strata_count = se->strata_count;
- c->ibf_size = se->ibf_size;
- c->strata = GNUNET_new_array (se->strata_count,
- struct InvertibleBloomFilter *);
- for (unsigned int i = 0; i < se->strata_count; i++)
- c->strata[i] = ibf_dup (se->strata[i]);
+ struct MultiStrataEstimator *c;
+ unsigned int i;
+
+ c = GNUNET_new (struct MultiStrataEstimator);
+ c->stratas = GNUNET_new_array (MULTI_SE_BASE_COUNT,struct StrataEstimator *);
+ for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
+ {
+ c->stratas[strata_ctr] = GNUNET_new (struct StrataEstimator);
+ c->stratas[strata_ctr]->strata_count =
+ se->stratas[strata_ctr]->strata_count;
+ c->stratas[strata_ctr]->ibf_size = se->stratas[strata_ctr]->ibf_size;
+ c->stratas[strata_ctr]->strata = GNUNET_new_array (
+ se->stratas[strata_ctr]->strata_count,
+ struct
+ InvertibleBloomFilter *);
+ for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
+ c->stratas[strata_ctr]->strata[i] = ibf_dup (
+ se->stratas[strata_ctr]->strata[i]);
+ }
return c;
}
@@ -284,10 +454,14 @@ strata_estimator_dup (struct StrataEstimator *se)
* @param se strata estimator to destroy.
*/
void
-strata_estimator_destroy (struct StrataEstimator *se)
+strata_estimator_destroy (struct MultiStrataEstimator *se)
{
- for (unsigned int i = 0; i < se->strata_count; i++)
- ibf_destroy (se->strata[i]);
- GNUNET_free (se->strata);
+ unsigned int i;
+ for (uint8_t strata_ctr = 0; strata_ctr < MULTI_SE_BASE_COUNT; strata_ctr++)
+ {
+ for (i = 0; i < se->stratas[strata_ctr]->strata_count; i++)
+ ibf_destroy (se->stratas[strata_ctr]->strata[i]);
+ GNUNET_free (se->stratas[strata_ctr]->strata);
+ }
GNUNET_free (se);
}
diff --git a/src/setu/gnunet-service-setu_strata_estimator.h b/src/setu/gnunet-service-setu_strata_estimator.h
index afdbcdbbf..4871a7fcd 100644
--- a/src/setu/gnunet-service-setu_strata_estimator.h
+++ b/src/setu/gnunet-service-setu_strata_estimator.h
@@ -22,6 +22,7 @@
* @file set/gnunet-service-setu_strata_estimator.h
* @brief estimator of set difference
* @author Florian Dold
+ * @author Elias Summermatter
*/
#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
@@ -61,6 +62,31 @@ struct StrataEstimator
unsigned int ibf_size;
};
+struct MultiStrataEstimator
+{
+ /**
+ * Array of strata estimators
+ */
+ struct StrataEstimator **stratas;
+
+ /**
+ * Number of strata estimators in struct
+ */
+ uint8_t size;
+
+};
+
+/**
+ * Deteminate how many strata estimators in the message are necessary
+ * @param avg_element_size
+ * @param element_count
+ * @return number of strata's
+ */
+
+uint8_t
+determine_strata_count (uint64_t avg_element_size,
+ uint64_t element_count);
+
/**
* Write the given strata estimator to the buffer.
@@ -70,7 +96,9 @@ struct StrataEstimator
* @return number of bytes written to @a buf
*/
size_t
-strata_estimator_write (const struct StrataEstimator *se,
+strata_estimator_write (struct MultiStrataEstimator *se,
+ uint16_t se_ibf_total_size,
+ uint8_t number_se_send,
void *buf);
@@ -88,7 +116,9 @@ int
strata_estimator_read (const void *buf,
size_t buf_len,
int is_compressed,
- struct StrataEstimator *se);
+ uint8_t number_se_received,
+ uint16_t se_ibf_total_size,
+ struct MultiStrataEstimator *se);
/**
@@ -99,7 +129,7 @@ strata_estimator_read (const void *buf,
* @param ibf_hashnum hashnum parameter of each ibf
* @return a freshly allocated, empty strata estimator, NULL on error
*/
-struct StrataEstimator *
+struct MultiStrataEstimator *
strata_estimator_create (unsigned int strata_count,
uint32_t ibf_size,
uint8_t ibf_hashnum);
@@ -111,11 +141,11 @@ strata_estimator_create (unsigned int strata_count,
*
* @param se1 first strata estimator
* @param se2 second strata estimator
- * @return abs(|se1| - |se2|)
+ * @return nothing
*/
-unsigned int
-strata_estimator_difference (const struct StrataEstimator *se1,
- const struct StrataEstimator *se2);
+void
+strata_estimator_difference (const struct MultiStrataEstimator *se1,
+ const struct MultiStrataEstimator *se2);
/**
@@ -125,7 +155,7 @@ strata_estimator_difference (const struct StrataEstimator *se1,
* @param key key to add
*/
void
-strata_estimator_insert (struct StrataEstimator *se,
+strata_estimator_insert (struct MultiStrataEstimator *se,
struct IBF_Key key);
@@ -136,7 +166,7 @@ strata_estimator_insert (struct StrataEstimator *se,
* @param key key to remove
*/
void
-strata_estimator_remove (struct StrataEstimator *se,
+strata_estimator_remove (struct MultiStrataEstimator *se,
struct IBF_Key key);
@@ -146,7 +176,7 @@ strata_estimator_remove (struct StrataEstimator *se,
* @param se strata estimator to destroy.
*/
void
-strata_estimator_destroy (struct StrataEstimator *se);
+strata_estimator_destroy (struct MultiStrataEstimator *se);
/**
@@ -155,8 +185,8 @@ strata_estimator_destroy (struct StrataEstimator *se);
* @param se the strata estimator to copy
* @return the copy
*/
-struct StrataEstimator *
-strata_estimator_dup (struct StrataEstimator *se);
+struct MultiStrataEstimator *
+strata_estimator_dup (struct MultiStrataEstimator *se);
#if 0 /* keep Emacsens' auto-indent happy */
diff --git a/src/setu/ibf.c b/src/setu/ibf.c
index 1beba9065..8f29adb62 100644
--- a/src/setu/ibf.c
+++ b/src/setu/ibf.c
@@ -20,11 +20,15 @@
/**
* @file set/ibf.c
- * @brief implementation of the invertible Bloom filter
+ * @brief implementation of the invertible bloom filter
* @author Florian Dold
+ * @author Elias Summermatter
*/
#include "ibf.h"
+#include "gnunet_util_lib.h"
+#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
+
/**
* Compute the key's hash from the key.
@@ -58,11 +62,12 @@ ibf_hashcode_from_key (struct IBF_Key key,
struct GNUNET_HashCode *dst)
{
struct IBF_Key *p;
+ unsigned int i;
const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode)
/ sizeof(struct IBF_Key);
p = (struct IBF_Key *) dst;
- for (unsigned int i = 0; i < keys_per_hashcode; i++)
+ for (i = 0; i < keys_per_hashcode; i++)
*p++ = key;
}
@@ -75,14 +80,14 @@ ibf_hashcode_from_key (struct IBF_Key key,
* @return the newly created invertible bloom filter, NULL on error
*/
struct InvertibleBloomFilter *
-ibf_create (uint32_t size,
- uint8_t hash_num)
+ibf_create (uint32_t size, uint8_t hash_num)
{
struct InvertibleBloomFilter *ibf;
GNUNET_assert (0 != size);
+
ibf = GNUNET_new (struct InvertibleBloomFilter);
- ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t));
+ ibf->count = GNUNET_malloc_large (size * sizeof(uint64_t));
if (NULL == ibf->count)
{
GNUNET_free (ibf);
@@ -105,6 +110,7 @@ ibf_create (uint32_t size,
}
ibf->size = size;
ibf->hash_num = hash_num;
+
return ibf;
}
@@ -121,8 +127,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf,
uint32_t i;
uint32_t bucket;
- bucket = GNUNET_CRYPTO_crc32_n (&key,
- sizeof (key));
+ bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key);
for (i = 0, filled = 0; filled < ibf->hash_num; i++)
{
uint64_t x;
@@ -133,8 +138,7 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf,
dst[filled++] = bucket % ibf->size;
try_next:
x = ((uint64_t) bucket << 32) | i;
- bucket = GNUNET_CRYPTO_crc32_n (&x,
- sizeof (x));
+ bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
}
}
@@ -170,13 +174,8 @@ ibf_insert (struct InvertibleBloomFilter *ibf,
int buckets[ibf->hash_num];
GNUNET_assert (ibf->hash_num <= ibf->size);
- ibf_get_indices (ibf,
- key,
- buckets);
- ibf_insert_into (ibf,
- key,
- buckets,
- 1);
+ ibf_get_indices (ibf, key, buckets);
+ ibf_insert_into (ibf, key, buckets, 1);
}
@@ -193,13 +192,8 @@ ibf_remove (struct InvertibleBloomFilter *ibf,
int buckets[ibf->hash_num];
GNUNET_assert (ibf->hash_num <= ibf->size);
- ibf_get_indices (ibf,
- key,
- buckets);
- ibf_insert_into (ibf,
- key,
- buckets,
- -1);
+ ibf_get_indices (ibf, key, buckets);
+ ibf_insert_into (ibf, key, buckets, -1);
}
@@ -244,6 +238,8 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
for (uint32_t i = 0; i < ibf->size; i++)
{
+ int hit;
+
/* we can only decode from pure buckets */
if ( (1 != ibf->count[i].count_val) &&
(-1 != ibf->count[i].count_val) )
@@ -257,30 +253,33 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
/* test if key in bucket hits its own location,
* if not, the key hash was subject to collision */
- {
- bool hit = false;
+ hit = GNUNET_NO;
+ ibf_get_indices (ibf, ibf->key_sum[i], buckets);
+ for (int j = 0; j < ibf->hash_num; j++)
+ if (buckets[j] == i)
+ hit = GNUNET_YES;
- ibf_get_indices (ibf,
- ibf->key_sum[i],
- buckets);
- for (int j = 0; j < ibf->hash_num; j++)
- if (buckets[j] == i)
- {
- hit = true;
- break;
- }
- if (! hit)
- continue;
+ if (GNUNET_NO == hit)
+ continue;
+
+ if (1 == ibf->count[i].count_val)
+ {
+ ibf->remote_decoded_count++;
}
+ else
+ {
+ ibf->local_decoded_count++;
+ }
+
+
if (NULL != ret_side)
*ret_side = ibf->count[i].count_val;
if (NULL != ret_id)
*ret_id = ibf->key_sum[i];
/* insert on the opposite side, effectively removing the element */
- ibf_insert_into (ibf,
- ibf->key_sum[i], buckets,
- -ibf->count[i].count_val);
+ ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val);
+
return GNUNET_YES;
}
@@ -291,6 +290,26 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
/**
+ * Returns the minimal bytes needed to store the counter of the IBF
+ *
+ * @param ibf the IBF
+ */
+uint8_t
+ibf_get_max_counter (struct InvertibleBloomFilter *ibf)
+{
+ long long max_counter = 0;
+ for (uint64_t i = 0; i < ibf->size; i++)
+ {
+ if (ibf->count[i].count_val > max_counter)
+ {
+ max_counter = ibf->count[i].count_val;
+ }
+ }
+ return 64 - __builtin_clzll (max_counter);
+}
+
+
+/**
* Write buckets from an ibf to a buffer.
* Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
*
@@ -298,16 +317,17 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
* @param start with which bucket to start
* @param count how many buckets to write
* @param buf buffer to write the data to
+ * @param max bit length of a counter for unpacking
*/
void
ibf_write_slice (const struct InvertibleBloomFilter *ibf,
uint32_t start,
- uint32_t count,
- void *buf)
+ uint64_t count,
+ void *buf,
+ uint8_t counter_max_length)
{
struct IBF_Key *key_dst;
struct IBF_KeyHash *key_hash_dst;
- struct IBF_Count *count_dst;
GNUNET_assert (start + count <= ibf->size);
@@ -315,19 +335,182 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf,
key_dst = (struct IBF_Key *) buf;
GNUNET_memcpy (key_dst,
ibf->key_sum + start,
- count * sizeof *key_dst);
+ count * sizeof(*key_dst));
key_dst += count;
/* copy key hashes */
key_hash_dst = (struct IBF_KeyHash *) key_dst;
GNUNET_memcpy (key_hash_dst,
ibf->key_hash_sum + start,
- count * sizeof *key_hash_dst);
+ count * sizeof(*key_hash_dst));
key_hash_dst += count;
- /* copy counts */
- count_dst = (struct IBF_Count *) key_hash_dst;
- GNUNET_memcpy (count_dst,
- ibf->count + start,
- count * sizeof *count_dst);
+
+ /* pack and copy counter */
+ pack_counter (ibf,
+ start,
+ count,
+ (uint8_t *) key_hash_dst,
+ counter_max_length);
+
+
+}
+
+
+/**
+ * Packs the counter to transmit only the smallest possible amount of bytes and
+ * preventing overflow of the counter
+ * @param ibf the ibf to write
+ * @param start with which bucket to start
+ * @param count how many buckets to write
+ * @param buf buffer to write the data to
+ * @param max bit length of a counter for unpacking
+ */
+
+void
+pack_counter (const struct InvertibleBloomFilter *ibf,
+ uint32_t start,
+ uint64_t count,
+ uint8_t *buf,
+ uint8_t counter_max_length)
+{
+ uint8_t store_size = 0;
+ uint8_t store = 0;
+ uint16_t byte_ctr = 0;
+
+ /**
+ * Iterate over IBF bucket
+ */
+ for (uint64_t i = start; i< (count + start);)
+ {
+ uint64_t count_val_to_write = ibf->count[i].count_val;
+ uint8_t count_len_to_write = counter_max_length;
+
+ /**
+ * Pack and compose counters to byte values
+ */
+ while ((count_len_to_write + store_size) >= 8)
+ {
+ uint8_t bit_shift = 0;
+
+ /**
+ * Shift bits if more than a byte has to be written
+ * or the store size is not empty
+ */
+ if ((store_size > 0) || (count_len_to_write > 8))
+ {
+ uint8_t bit_unused = 8 - store_size;
+ bit_shift = count_len_to_write - bit_unused;
+ store = store << bit_unused;
+ }
+
+ buf[byte_ctr] = ((count_val_to_write >> bit_shift) | store) & 0xFF;
+ byte_ctr++;
+ count_len_to_write -= (8 - store_size);
+ count_val_to_write = count_val_to_write & ((1ULL <<
+ count_len_to_write) - 1);
+ store = 0;
+ store_size = 0;
+ }
+ store = (store << count_len_to_write) | count_val_to_write;
+ store_size = store_size + count_len_to_write;
+ count_len_to_write = 0;
+ i++;
+ }
+
+ /**
+ * Pack data left in story before finishing
+ */
+ if (store_size > 0)
+ {
+ buf[byte_ctr] = store << (8 - store_size);
+ byte_ctr++;
+ }
+
+}
+
+
+/**
+ * Unpacks the counter to transmit only the smallest possible amount of bytes and
+ * preventing overflow of the counter
+ * @param ibf the ibf to write
+ * @param start with which bucket to start
+ * @param count how many buckets to write
+ * @param buf buffer to write the data to
+ * @param max bit length of a counter for unpacking
+ */
+
+void
+unpack_counter (const struct InvertibleBloomFilter *ibf,
+ uint32_t start,
+ uint64_t count,
+ uint8_t *buf,
+ uint8_t counter_max_length)
+{
+ uint64_t ibf_counter_ctr = 0;
+ uint64_t store = 0;
+ uint64_t store_bit_ctr = 0;
+ uint64_t byte_ctr = 0;
+
+ /**
+ * Iterate over received bytes
+ */
+ while (true)
+ {
+ uint8_t byte_read = buf[byte_ctr];
+ uint8_t bit_to_read_left = 8;
+ byte_ctr++;
+
+ /**
+ * Pack data left in story before finishing
+ */
+ while (bit_to_read_left >= 0)
+ {
+ /**
+ * Stop decoding when end is reached
+ */
+ if (ibf_counter_ctr > (count - 1))
+ return;
+
+ /*
+ * Unpack the counter
+ */
+ if ((store_bit_ctr + bit_to_read_left) >= counter_max_length)
+ {
+ uint8_t bytes_used = counter_max_length - store_bit_ctr;
+ if (store_bit_ctr > 0)
+ {
+ store = store << bytes_used;
+ }
+
+ uint8_t bytes_to_shift = bit_to_read_left - bytes_used;
+ uint64_t counter_part = byte_read >> bytes_to_shift;
+ store = store | counter_part;
+ ibf->count[ibf_counter_ctr + start].count_val = store;
+ byte_read = byte_read & ((1 << bytes_to_shift) - 1);
+ bit_to_read_left -= bytes_used;
+ ibf_counter_ctr++;
+ store = 0;
+ store_bit_ctr = 0;
+ }
+ else
+ {
+ store_bit_ctr += bit_to_read_left;
+ if (0 == store)
+ {
+ store = byte_read;
+ }
+ else
+ {
+ store = store << bit_to_read_left;
+ store = store | byte_read;
+ }
+ break;
+
+ }
+
+ }
+
+ }
+
}
@@ -338,12 +521,14 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf,
* @param start which bucket to start at
* @param count how many buckets to read
* @param ibf the ibf to read from
+ * @param max bit length of a counter for unpacking
*/
void
ibf_read_slice (const void *buf,
uint32_t start,
- uint32_t count,
- struct InvertibleBloomFilter *ibf)
+ uint64_t count,
+ struct InvertibleBloomFilter *ibf,
+ uint8_t counter_max_length)
{
struct IBF_Key *key_src;
struct IBF_KeyHash *key_hash_src;
@@ -364,11 +549,10 @@ ibf_read_slice (const void *buf,
key_hash_src,
count * sizeof *key_hash_src);
key_hash_src += count;
- /* copy counts */
+
+ /* copy and unpack counts */
count_src = (struct IBF_Count *) key_hash_src;
- GNUNET_memcpy (ibf->count + start,
- count_src,
- count * sizeof *count_src);
+ unpack_counter (ibf,start,count,(uint8_t *) count_src,counter_max_length);
}
diff --git a/src/setu/ibf.h b/src/setu/ibf.h
index 7c2ab33b1..5628405dc 100644
--- a/src/setu/ibf.h
+++ b/src/setu/ibf.h
@@ -22,6 +22,7 @@
* @file set/ibf.h
* @brief invertible bloom filter
* @author Florian Dold
+ * @author Elias Summermatter
*/
#ifndef GNUNET_CONSENSUS_IBF_H
@@ -62,7 +63,7 @@ struct IBF_KeyHash
*/
struct IBF_Count
{
- int8_t count_val;
+ int64_t count_val;
};
@@ -93,6 +94,20 @@ struct InvertibleBloomFilter
uint8_t hash_num;
/**
+ * If an IBF is decoded this count stores how many
+ * elements are on the local site. This is used
+ * to estimate the set difference on a site
+ */
+ int local_decoded_count;
+
+ /**
+ * If an IBF is decoded this count stores how many
+ * elements are on the remote site. This is used
+ * to estimate the set difference on a site
+ */
+ int remote_decoded_count;
+
+ /**
* Xor sums of the elements' keys, used to identify the elements.
* Array of 'size' elements.
*/
@@ -125,8 +140,9 @@ struct InvertibleBloomFilter
void
ibf_write_slice (const struct InvertibleBloomFilter *ibf,
uint32_t start,
- uint32_t count,
- void *buf);
+ uint64_t count,
+ void *buf,
+ uint8_t counter_max_length);
/**
@@ -140,8 +156,9 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf,
void
ibf_read_slice (const void *buf,
uint32_t start,
- uint32_t count,
- struct InvertibleBloomFilter *ibf);
+ uint64_t count,
+ struct InvertibleBloomFilter *ibf,
+ uint8_t counter_max_length);
/**
@@ -244,6 +261,44 @@ ibf_dup (const struct InvertibleBloomFilter *ibf);
void
ibf_destroy (struct InvertibleBloomFilter *ibf);
+uint8_t
+ibf_get_max_counter (struct InvertibleBloomFilter *ibf);
+
+
+/**
+ * Packs the counter to transmit only the smallest possible amount of bytes and
+ * preventing overflow of the counter
+ * @param ibf the ibf to write
+ * @param start with which bucket to start
+ * @param count how many buckets to write
+ * @param buf buffer to write the data to
+ * @param max bit length of a counter for unpacking
+ */
+
+void
+pack_counter (const struct InvertibleBloomFilter *ibf,
+ uint32_t start,
+ uint64_t count,
+ uint8_t *buf,
+ uint8_t counter_max_length);
+
+/**
+ * Unpacks the counter to transmit only the smallest possible amount of bytes and
+ * preventing overflow of the counter
+ * @param ibf the ibf to write
+ * @param start with which bucket to start
+ * @param count how many buckets to write
+ * @param buf buffer to write the data to
+ * @param max bit length of a counter for unpacking
+ */
+
+void
+unpack_counter (const struct InvertibleBloomFilter *ibf,
+ uint32_t start,
+ uint64_t count,
+ uint8_t *buf,
+ uint8_t counter_max_length);
+
#if 0 /* keep Emacsens' auto-indent happy */
{
diff --git a/src/setu/perf_setu_api.c b/src/setu/perf_setu_api.c
index b273f9c71..af84994f8 100644
--- a/src/setu/perf_setu_api.c
+++ b/src/setu/perf_setu_api.c
@@ -22,11 +22,14 @@
* @file set/test_setu_api.c
* @brief testcase for setu_api.c
* @author Florian Dold
+ * @author Elias Summermatter
*/
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_testing_lib.h"
#include "gnunet_setu_service.h"
+#include <sys/sysinfo.h>
+#include <pthread.h>
static struct GNUNET_PeerIdentity local_id;
@@ -50,6 +53,12 @@ static int ret;
static struct GNUNET_SCHEDULER_Task *tt;
+/**
+ * Handles configuration file for setu performance test
+ *
+ */
+static struct GNUNET_CONFIGURATION_Handle *setu_cfg;
+
static void
result_cb_set1 (void *cls,
@@ -57,44 +66,44 @@ result_cb_set1 (void *cls,
uint64_t size,
enum GNUNET_SETU_Status status)
{
- switch (status)
+ switch (status)
+ {
+ case GNUNET_SETU_STATUS_ADD_LOCAL:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n");
+ break;
+
+ case GNUNET_SETU_STATUS_FAILURE:
+ GNUNET_break (0);
+ oh1 = NULL;
+ fprintf (stderr, "set 1: received failure status!\n");
+ ret = 1;
+ if (NULL != tt)
+ {
+ GNUNET_SCHEDULER_cancel (tt);
+ tt = NULL;
+ }
+ GNUNET_SCHEDULER_shutdown ();
+ break;
+
+ case GNUNET_SETU_STATUS_DONE:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
+ oh1 = NULL;
+ if (NULL != set1)
{
- case GNUNET_SETU_STATUS_ADD_LOCAL:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n");
- break;
-
- case GNUNET_SETU_STATUS_FAILURE:
- GNUNET_break (0);
- oh1 = NULL;
- fprintf (stderr, "set 1: received failure status!\n");
- ret = 1;
- if (NULL != tt)
- {
- GNUNET_SCHEDULER_cancel (tt);
- tt = NULL;
- }
- GNUNET_SCHEDULER_shutdown ();
- break;
-
- case GNUNET_SETU_STATUS_DONE:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
- oh1 = NULL;
- if (NULL != set1)
- {
- GNUNET_SETU_destroy (set1);
- set1 = NULL;
- }
- if (NULL == set2)
- {
- GNUNET_SCHEDULER_cancel (tt);
- tt = NULL;
- GNUNET_SCHEDULER_shutdown ();
- }
- break;
-
- default:
- GNUNET_assert (0);
+ GNUNET_SETU_destroy (set1);
+ set1 = NULL;
}
+ if (NULL == set2)
+ {
+ GNUNET_SCHEDULER_cancel (tt);
+ tt = NULL;
+ GNUNET_SCHEDULER_shutdown ();
+ }
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
}
@@ -104,36 +113,36 @@ result_cb_set2 (void *cls,
uint64_t size,
enum GNUNET_SETU_Status status)
{
- switch (status)
+ switch (status)
+ {
+ case GNUNET_SETU_STATUS_ADD_LOCAL:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n");
+ break;
+
+ case GNUNET_SETU_STATUS_FAILURE:
+ GNUNET_break (0);
+ oh2 = NULL;
+ fprintf (stderr, "set 2: received failure status\n");
+ GNUNET_SCHEDULER_shutdown ();
+ ret = 1;
+ break;
+
+ case GNUNET_SETU_STATUS_DONE:
+ oh2 = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
+ GNUNET_SETU_destroy (set2);
+ set2 = NULL;
+ if (NULL == set1)
{
- case GNUNET_SETU_STATUS_ADD_LOCAL:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n");
- break;
-
- case GNUNET_SETU_STATUS_FAILURE:
- GNUNET_break (0);
- oh2 = NULL;
- fprintf (stderr, "set 2: received failure status\n");
- GNUNET_SCHEDULER_shutdown ();
- ret = 1;
- break;
-
- case GNUNET_SETU_STATUS_DONE:
- oh2 = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
- GNUNET_SETU_destroy (set2);
- set2 = NULL;
- if (NULL == set1)
- {
- GNUNET_SCHEDULER_cancel (tt);
- tt = NULL;
- GNUNET_SCHEDULER_shutdown ();
- }
- break;
-
- default:
- GNUNET_assert (0);
+ GNUNET_SCHEDULER_cancel (tt);
+ tt = NULL;
+ GNUNET_SCHEDULER_shutdown ();
}
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
}
@@ -143,14 +152,14 @@ listen_cb (void *cls,
const struct GNUNET_MessageHeader *context_msg,
struct GNUNET_SETU_Request *request)
{
- GNUNET_assert (NULL != context_msg);
- GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n");
- oh2 = GNUNET_SETU_accept (request,
- (struct GNUNET_SETU_Option[]){ 0 },
- &result_cb_set2,
- NULL);
- GNUNET_SETU_commit (oh2, set2);
+ GNUNET_assert (NULL != context_msg);
+ GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n");
+ oh2 = GNUNET_SETU_accept (request,
+ (struct GNUNET_SETU_Option[]){ 0 },
+ &result_cb_set2,
+ NULL);
+ GNUNET_SETU_commit (oh2, set2);
}
@@ -162,122 +171,89 @@ listen_cb (void *cls,
static void
start (void *cls)
{
- struct GNUNET_MessageHeader context_msg;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n");
- context_msg.size = htons (sizeof context_msg);
- context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
- listen_handle = GNUNET_SETU_listen (config,
- &app_id,
- &listen_cb,
- NULL);
- oh1 = GNUNET_SETU_prepare (&local_id,
- &app_id,
- &context_msg,
- (struct GNUNET_SETU_Option[]){ 0 },
- &result_cb_set1,
- NULL);
- GNUNET_SETU_commit (oh1, set1);
+ struct GNUNET_MessageHeader context_msg;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n");
+ context_msg.size = htons (sizeof context_msg);
+ context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
+ listen_handle = GNUNET_SETU_listen (config,
+ &app_id,
+ &listen_cb,
+ NULL);
+ oh1 = GNUNET_SETU_prepare (&local_id,
+ &app_id,
+ &context_msg,
+ (struct GNUNET_SETU_Option[]){ 0 },
+ &result_cb_set1,
+ NULL);
+ GNUNET_SETU_commit (oh1, set1);
}
/**
- * Initialize the second set, continue
- *
- * @param cls closure, unused
- */
-static void
-init_set2 (void *cls)
-{
- struct GNUNET_SETU_Element element;
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
-
- element.element_type = 0;
- element.data = "hello1";
- element.size = strlen (element.data);
- GNUNET_SETU_add_element (set2, &element, NULL, NULL);
- element.data = "quux";
- element.size = strlen (element.data);
- GNUNET_SETU_add_element (set2, &element, NULL, NULL);
- element.data = "baz";
- element.size = strlen (element.data);
- GNUNET_SETU_add_element (set2, &element, &start, NULL);
-}
-
-/**
* Generate random byte stream
*/
-unsigned char *gen_rdm_bytestream (size_t num_bytes)
+unsigned char *
+gen_rdm_bytestream (size_t num_bytes)
{
- unsigned char *stream = GNUNET_malloc (num_bytes);
- GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes);
- return stream;
+ unsigned char *stream = GNUNET_malloc (num_bytes);
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes);
+ return stream;
}
+
/**
* Generate random sets
*/
static void
-initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes)
+initRandomSets (int overlap, int set1_size, int set2_size, int
+ element_size_in_bytes)
{
- struct GNUNET_SETU_Element element;
- element.element_type = 0;
-
- // Add elements to both sets
- for (int i = 0; i < overlap; i++) {
- element.data = gen_rdm_bytestream(element_size_in_bytes);
- element.size = element_size_in_bytes;
- GNUNET_SETU_add_element (set1, &element, NULL, NULL);
- GNUNET_SETU_add_element (set2, &element, NULL, NULL);
- set1_size--;
- set2_size--;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n");
-
- // Add other elements to set 1
- while(set1_size>0) {
- element.data = gen_rdm_bytestream(element_size_in_bytes);
- element.size = element_size_in_bytes;
- GNUNET_SETU_add_element (set1, &element, NULL, NULL);
- set1_size--;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n");
-
- // Add other elements to set 2
- while(set2_size > 0) {
- element.data = gen_rdm_bytestream(element_size_in_bytes);
- element.size = element_size_in_bytes;
+ struct GNUNET_SETU_Element element;
+ element.element_type = 0;
+
+ // Add elements to both sets
+ for (int i = 0; i < overlap; i++)
+ {
+ element.data = gen_rdm_bytestream (element_size_in_bytes);
+ element.size = element_size_in_bytes;
+ GNUNET_SETU_add_element (set1, &element, NULL, NULL);
+ GNUNET_SETU_add_element (set2, &element, NULL, NULL);
+ set1_size--;
+ set2_size--;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n");
+
+ // Add other elements to set 1
+ while (set1_size>0)
+ {
+ element.data = gen_rdm_bytestream (element_size_in_bytes);
+ element.size = element_size_in_bytes;
+ GNUNET_SETU_add_element (set1, &element, NULL, NULL);
+ set1_size--;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n");
- if(set2_size != 1) {
- GNUNET_SETU_add_element (set2, &element,NULL, NULL);
- } else {
- GNUNET_SETU_add_element (set2, &element,&start, NULL);
- }
+ // Add other elements to set 2
+ while (set2_size > 0)
+ {
+ element.data = gen_rdm_bytestream (element_size_in_bytes);
+ element.size = element_size_in_bytes;
- set2_size--;
+ if (set2_size != 1)
+ {
+ GNUNET_SETU_add_element (set2, &element,NULL, NULL);
+ }
+ else
+ {
+ GNUNET_SETU_add_element (set2, &element,&start, NULL);
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n");
-}
-
-/**
- * Initialize the first set, continue.
- */
-static void
-init_set1 (void)
-{
- struct GNUNET_SETU_Element element;
- element.element_type = 0;
- element.data = "hello";
- element.size = strlen (element.data);
- GNUNET_SETU_add_element (set1, &element, NULL, NULL);
- element.data = "bar";
- element.size = strlen (element.data);
- GNUNET_SETU_add_element (set1, &element, &init_set2, NULL);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
+ set2_size--;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n");
}
@@ -289,10 +265,10 @@ init_set1 (void)
static void
timeout_fail (void *cls)
{
- tt = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n");
- GNUNET_SCHEDULER_shutdown ();
- ret = 1;
+ tt = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n");
+ GNUNET_SCHEDULER_shutdown ();
+ ret = 1;
}
@@ -304,36 +280,36 @@ timeout_fail (void *cls)
static void
do_shutdown (void *cls)
{
- if (NULL != tt)
- {
- GNUNET_SCHEDULER_cancel (tt);
- tt = NULL;
- }
- if (NULL != oh1)
- {
- GNUNET_SETU_operation_cancel (oh1);
- oh1 = NULL;
- }
- if (NULL != oh2)
- {
- GNUNET_SETU_operation_cancel (oh2);
- oh2 = NULL;
- }
- if (NULL != set1)
- {
- GNUNET_SETU_destroy (set1);
- set1 = NULL;
- }
- if (NULL != set2)
- {
- GNUNET_SETU_destroy (set2);
- set2 = NULL;
- }
- if (NULL != listen_handle)
- {
- GNUNET_SETU_listen_cancel (listen_handle);
- listen_handle = NULL;
- }
+ if (NULL != tt)
+ {
+ GNUNET_SCHEDULER_cancel (tt);
+ tt = NULL;
+ }
+ if (NULL != oh1)
+ {
+ GNUNET_SETU_operation_cancel (oh1);
+ oh1 = NULL;
+ }
+ if (NULL != oh2)
+ {
+ GNUNET_SETU_operation_cancel (oh2);
+ oh2 = NULL;
+ }
+ if (NULL != set1)
+ {
+ GNUNET_SETU_destroy (set1);
+ set1 = NULL;
+ }
+ if (NULL != set2)
+ {
+ GNUNET_SETU_destroy (set2);
+ set2 = NULL;
+ }
+ if (NULL != listen_handle)
+ {
+ GNUNET_SETU_listen_cancel (listen_handle);
+ listen_handle = NULL;
+ }
}
@@ -350,79 +326,148 @@ run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg,
struct GNUNET_TESTING_Peer *peer)
{
- struct GNUNET_SETU_OperationHandle *my_oh;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Running preparatory tests\n");
- tt = GNUNET_SCHEDULER_add_delayed (
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
- &timeout_fail,
- NULL);
- GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
-
- config = cfg;
- GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg,
- &local_id));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "my id (from CRYPTO): %s\n",
- GNUNET_i2s (&local_id));
- GNUNET_TESTING_peer_get_identity (peer,
- &local_id);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "my id (from TESTING): %s\n",
- GNUNET_i2s (&local_id));
- set1 = GNUNET_SETU_create (cfg);
- set2 = GNUNET_SETU_create (cfg);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Created sets %p and %p for union operation\n",
- set1,
- set2);
- GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
-
- /* test if canceling an uncommitted request works! */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Launching and instantly stopping set operation\n");
- my_oh = GNUNET_SETU_prepare (&local_id,
- &app_id,
- NULL,
- (struct GNUNET_SETU_Option[]){ 0 },
- NULL,
- NULL);
- GNUNET_SETU_operation_cancel (my_oh);
-
- /* test the real set reconciliation */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Running real set-reconciliation\n");
- //init_set1 ();
- // limit ~23800 element total
- initRandomSets(50,100,100,128);
+ struct GNUNET_SETU_OperationHandle *my_oh;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Running preparatory tests\n");
+ tt = GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
+ &timeout_fail,
+ NULL);
+ GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
+
+ config = cfg;
+ GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg,
+ &local_id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "my id (from CRYPTO): %s\n",
+ GNUNET_i2s (&local_id));
+ GNUNET_TESTING_peer_get_identity (peer,
+ &local_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "my id (from TESTING): %s\n",
+ GNUNET_i2s (&local_id));
+ set1 = GNUNET_SETU_create (cfg);
+ set2 = GNUNET_SETU_create (cfg);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Created sets %p and %p for union operation\n",
+ set1,
+ set2);
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
+
+ /* test if canceling an uncommited request works! */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Launching and instantly stopping set operation\n");
+ my_oh = GNUNET_SETU_prepare (&local_id,
+ &app_id,
+ NULL,
+ (struct GNUNET_SETU_Option[]){ 0 },
+ NULL,
+ NULL);
+ GNUNET_SETU_operation_cancel (my_oh);
+
+ /* test the real set reconciliation */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Running real set-reconciliation\n");
+ // init_set1 ();
+ // limit ~23800 element total
+ initRandomSets (490, 500,500,32);
}
-static void execute_perf()
+
+void
+perf_thread ()
{
- for( int repeat_ctr = 0; repeat_ctr<1; repeat_ctr++ ) {
+ GNUNET_TESTING_service_run ("perf_setu_api",
+ "arm",
+ "test_setu.conf",
+ &run,
+ NULL);
+
+}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Executing perf round %d\n", repeat_ctr);
- GNUNET_TESTING_service_run ("perf_setu_api",
- "arm",
- "test_setu.conf",
- &run,
- NULL);
+static void
+run_petf_thread (int total_runs)
+{
+ int core_count = get_nprocs_conf ();
+ pid_t child_pid, wpid;
+ int status = 0;
+
+// Father code (before child processes start)
+ for (int processed = 0; processed < total_runs;)
+ {
+ for (int id = 0; id < core_count; id++)
+ {
+ if (processed >= total_runs)
+ break;
+
+ if ((child_pid = fork ()) == 0)
+ {
+ perf_thread ();
+ exit (0);
+ }
+ processed += 1;
}
- return 0;
+ while ((wpid = wait (&status)) > 0)
+ ;
+
+ }
}
+static void
+execute_perf ()
+{
+
+ /**
+ * Erase statfile
+ */
+ remove ("perf_stats.csv");
+ remove ("perf_failure_bucket_number_factor.csv");
+ for (int out_out_ctr = 3; out_out_ctr <= 3; out_out_ctr++)
+ {
+
+ for (int out_ctr = 20; out_ctr <= 20; out_ctr++)
+ {
+ float base = 0.1;
+ float x = out_ctr * base;
+ char factor[10];
+ char *buffer = gcvt (x, 4, factor);
+ setu_cfg = GNUNET_CONFIGURATION_create ();
+ GNUNET_CONFIGURATION_set_value_string (setu_cfg, "IBF",
+ "BUCKET_NUMBER_FACTOR",
+ buffer); // Factor default=4
+ GNUNET_CONFIGURATION_set_value_number (setu_cfg, "IBF",
+ "NUMBER_PER_BUCKET", 3); // K default=4
+ GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE",
+ "TRADEOFF", "2"); // default=0.25
+ GNUNET_CONFIGURATION_set_value_string (setu_cfg, "PERFORMANCE",
+ "MAX_SET_DIFF_FACTOR_DIFFERENTIAL",
+ "20000"); // default=0.25
+ GNUNET_CONFIGURATION_set_value_number (setu_cfg, "BOUNDARIES",
+ "UPPER_ELEMENT", 5000);
+
+
+ if (GNUNET_OK != GNUNET_CONFIGURATION_write (setu_cfg, "perf_setu.conf"))
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_ERROR,
+ _ ("Failed to write subsystem default identifier map'.\n"));
+ run_petf_thread (100);
+ }
+
+ }
+ return;
+}
+
int
main (int argc, char **argv)
{
- GNUNET_log_setup ("perf_setu_api",
- "WARNING",
- NULL);
- execute_perf();
- return 0;
+ GNUNET_log_setup ("perf_setu_api",
+ "WARNING",
+ NULL);
+ execute_perf ();
+ return 0;
}
diff --git a/src/setu/setu.h b/src/setu/setu.h
index 7c2a98a02..7b606f12c 100644
--- a/src/setu/setu.h
+++ b/src/setu/setu.h
@@ -122,6 +122,31 @@ struct GNUNET_SETU_AcceptMessage
*/
uint32_t byzantine_lower_bound;
+
+ /**
+ * Upper bound for the set size, used only when
+ * byzantine mode is enabled.
+ */
+ uint64_t byzantine_upper_bond;
+
+ /**
+ * Bandwidth latency tradeoff determines how much bytes a single RTT is
+ * worth, which is a performance setting
+ */
+ uint64_t bandwidth_latency_tradeoff;
+
+ /**
+ * The factor determines the number of buckets an IBF has which is
+ * multiplied by the estimated setsize default: 2
+ */
+ uint64_t ibf_bucket_number_factor;
+
+ /**
+ * This setting determines to how many IBF buckets an single elements
+ * is mapped to.
+ */
+ uint64_t ibf_number_of_buckets_per_element;
+
};
@@ -226,6 +251,30 @@ struct GNUNET_SETU_EvaluateMessage
*/
uint32_t byzantine_lower_bound;
+ /**
+ * Upper bound for the set size, used only when
+ * byzantine mode is enabled.
+ */
+ uint64_t byzantine_upper_bond;
+
+ /**
+ * Bandwidth latency tradeoff determines how much bytes a single RTT is
+ * worth, which is a performance setting
+ */
+ uint64_t bandwidth_latency_tradeoff;
+
+ /**
+ * The factor determines the number of buckets an IBF has which is
+ * multiplied by the estimated setsize default: 2
+ */
+ uint64_t ibf_bucket_number_factor;
+
+ /**
+ * This setting determines to how many IBF buckets an single elements
+ * is mapped to.
+ */
+ uint64_t ibf_number_of_buckets_per_element;
+
/* rest: context message, that is, application-specific
message to convince listener to pick up */
};
diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c
index 0a09b18b2..faa57aaba 100644
--- a/src/setu/setu_api.c
+++ b/src/setu/setu_api.c
@@ -22,6 +22,7 @@
* @brief api for the set union service
* @author Florian Dold
* @author Christian Grothoff
+ * @author Elias Summermatter
*/
#include "platform.h"
#include "gnunet_util_lib.h"
@@ -526,6 +527,14 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
context_msg);
msg->app_id = *app_id;
msg->target_peer = *other_peer;
+
+ /* Set default values */
+ msg->byzantine_upper_bond = UINT64_MAX;
+ msg->bandwidth_latency_tradeoff = 0;
+ msg->ibf_bucket_number_factor = 2;
+ msg->ibf_number_of_buckets_per_element = 3;
+
+
for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
{
switch (opt->type)
@@ -534,6 +543,18 @@ GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
msg->byzantine = GNUNET_YES;
msg->byzantine_lower_bound = htonl (opt->v.num);
break;
+ case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND:
+ msg->byzantine_upper_bond = htonl (opt->v.num);
+ break;
+ case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF:
+ msg->bandwidth_latency_tradeoff = htonl (opt->v.num);
+ break;
+ case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR:
+ msg->ibf_bucket_number_factor = htonl (opt->v.num);
+ break;
+ case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT:
+ msg->ibf_number_of_buckets_per_element = htonl (opt->v.num);
+ break;
case GNUNET_SETU_OPTION_FORCE_FULL:
msg->force_full = GNUNET_YES;
break;
@@ -788,6 +809,13 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
mqm = GNUNET_MQ_msg (msg,
GNUNET_MESSAGE_TYPE_SETU_ACCEPT);
msg->accept_reject_id = htonl (request->accept_id);
+
+ /* Set default values */
+ msg->byzantine_upper_bond = UINT64_MAX;
+ msg->bandwidth_latency_tradeoff = 0;
+ msg->ibf_bucket_number_factor = 2;
+ msg->ibf_number_of_buckets_per_element = 3;
+
for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
{
switch (opt->type)
@@ -796,6 +824,18 @@ GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
msg->byzantine = GNUNET_YES;
msg->byzantine_lower_bound = htonl (opt->v.num);
break;
+ case GNUNET_SETU_OPTION_CUSTOM_BYZANTINE_UPPER_BOUND:
+ msg->byzantine_upper_bond = htonl (opt->v.num);
+ break;
+ case GNUNET_SETU_OPTION_CUSTOM_BANDWIDTH_LATENCY_TRADEOFF:
+ msg->bandwidth_latency_tradeoff = htonl (opt->v.num);
+ break;
+ case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKET_NUMBER_FACTOR:
+ msg->ibf_bucket_number_factor = htonl (opt->v.num);
+ break;
+ case GNUNET_SETU_OPTION_CUSTOM_IBF_BUCKETS_PER_ELEMENT:
+ msg->ibf_number_of_buckets_per_element = htonl (opt->v.num);
+ break;
case GNUNET_SETU_OPTION_FORCE_FULL:
msg->force_full = GNUNET_YES;
break;
diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c
index 2fb7d015e..5a0c9d70d 100644
--- a/src/setu/test_setu_api.c
+++ b/src/setu/test_setu_api.c
@@ -204,62 +204,6 @@ init_set2 (void *cls)
GNUNET_SETU_add_element (set2, &element, &start, NULL);
}
-/**
- * Generate random byte stream
- */
-
-unsigned char *gen_rdm_bytestream (size_t num_bytes)
-{
- unsigned char *stream = GNUNET_malloc (num_bytes);
- GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_WEAK, stream, num_bytes);
- return stream;
-}
-
-/**
- * Generate random sets
- */
-
-static void
-initRandomSets(int overlap, int set1_size, int set2_size, int element_size_in_bytes)
-{
- struct GNUNET_SETU_Element element;
- element.element_type = 0;
-
- // Add elements to both sets
- for (int i = 0; i < overlap; i++) {
- element.data = gen_rdm_bytestream(element_size_in_bytes);
- element.size = element_size_in_bytes;
- GNUNET_SETU_add_element (set1, &element, NULL, NULL);
- GNUNET_SETU_add_element (set2, &element, NULL, NULL);
- set1_size--;
- set2_size--;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in both sets\n");
-
- // Add other elements to set 1
- while(set1_size>0) {
- element.data = gen_rdm_bytestream(element_size_in_bytes);
- element.size = element_size_in_bytes;
- GNUNET_SETU_add_element (set1, &element, NULL, NULL);
- set1_size--;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set1\n");
-
- // Add other elements to set 2
- while(set2_size > 0) {
- element.data = gen_rdm_bytestream(element_size_in_bytes);
- element.size = element_size_in_bytes;
-
- if(set2_size != 1) {
- GNUNET_SETU_add_element (set2, &element,NULL, NULL);
- } else {
- GNUNET_SETU_add_element (set2, &element,&start, NULL);
- }
-
- set2_size--;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized elements in set2\n");
-}
/**
* Initialize the first set, continue.
@@ -392,9 +336,7 @@ run (void *cls,
/* test the real set reconciliation */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Running real set-reconciliation\n");
- //init_set1 ();
- initRandomSets(19500,20000,20000,4096);
- //initRandomSets(19500,20000,20000,32);
+ init_set1 ();
}