From be0475f2a583d203465d3091ff933806a5ace613 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 16 Aug 2020 20:46:39 +0200 Subject: split of set union from set service (preliminary) --- configure.ac | 2 + po/POTFILES.in | 19 + src/include/gnunet_protocols.h | 132 + src/setu/Makefile.am | 102 + src/setu/gnunet-service-setu.c | 3482 +++++++++++++++++++++++ src/setu/gnunet-service-setu.h | 393 +++ src/setu/gnunet-service-setu_protocol.h | 226 ++ src/setu/gnunet-service-setu_strata_estimator.c | 303 ++ src/setu/gnunet-service-setu_strata_estimator.h | 169 ++ src/setu/gnunet-setu-ibf-profiler.c | 308 ++ src/setu/gnunet-setu-profiler.c | 499 ++++ src/setu/ibf.c | 409 +++ src/setu/ibf.h | 255 ++ src/setu/ibf_sim.c | 142 + src/setu/plugin_block_setu_test.c | 123 + src/setu/setu.h | 304 ++ src/setu/setu_api.c | 872 ++++++ src/setu/test_setu_api.c | 360 +++ 18 files changed, 8100 insertions(+) create mode 100644 src/setu/Makefile.am create mode 100644 src/setu/gnunet-service-setu.c create mode 100644 src/setu/gnunet-service-setu.h create mode 100644 src/setu/gnunet-service-setu_protocol.h create mode 100644 src/setu/gnunet-service-setu_strata_estimator.c create mode 100644 src/setu/gnunet-service-setu_strata_estimator.h create mode 100644 src/setu/gnunet-setu-ibf-profiler.c create mode 100644 src/setu/gnunet-setu-profiler.c create mode 100644 src/setu/ibf.c create mode 100644 src/setu/ibf.h create mode 100644 src/setu/ibf_sim.c create mode 100644 src/setu/plugin_block_setu_test.c create mode 100644 src/setu/setu.h create mode 100644 src/setu/setu_api.c create mode 100644 src/setu/test_setu_api.c diff --git a/configure.ac b/configure.ac index e492242a6..bb205220c 100644 --- a/configure.ac +++ b/configure.ac @@ -1937,6 +1937,8 @@ src/scalarproduct/Makefile src/scalarproduct/scalarproduct.conf src/set/Makefile src/set/set.conf +src/setu/Makefile +src/setu/setu.conf src/sq/Makefile src/statistics/Makefile src/statistics/statistics.conf diff --git a/po/POTFILES.in b/po/POTFILES.in index 1f5cc81c3..c5503c343 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -333,6 +333,25 @@ src/set/ibf.c src/set/ibf_sim.c src/set/plugin_block_set_test.c src/set/set_api.c +src/seti/gnunet-service-set.c +src/seti/gnunet-service-set_intersection.c +src/seti/gnunet-service-set_union.c +src/seti/gnunet-service-set_union_strata_estimator.c +src/seti/gnunet-set-ibf-profiler.c +src/seti/gnunet-set-profiler.c +src/seti/ibf.c +src/seti/ibf_sim.c +src/seti/plugin_block_set_test.c +src/seti/set_api.c +src/setu/gnunet-service-set_union.c +src/setu/gnunet-service-set_union_strata_estimator.c +src/setu/gnunet-service-setu.c +src/setu/gnunet-setu-ibf-profiler.c +src/setu/gnunet-setu-profiler.c +src/setu/ibf.c +src/setu/ibf_sim.c +src/setu/plugin_block_setu_test.c +src/setu/setu_api.c src/sq/sq.c src/sq/sq_exec.c src/sq/sq_prepare.c diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 5af58664f..c3fcde0b9 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -1658,10 +1658,142 @@ extern "C" { #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT 547 +/******************************************************************************* + * SETU message types + ******************************************************************************/ + + +/** + * Cancel a set operation + */ +#define GNUNET_MESSAGE_TYPE_SETU_CANCEL 550 + +/** + * Add element to set + */ +#define GNUNET_MESSAGE_TYPE_SETU_ADD 551 + +/** + * Create a new local set + */ +#define GNUNET_MESSAGE_TYPE_SETU_CREATE 552 + +/** + * Handle result message from operation + */ +#define GNUNET_MESSAGE_TYPE_SETU_RESULT 553 + +/** + * Evaluate a set operation + */ +#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE 554 + +/** + * Listen for operation requests + */ +#define GNUNET_MESSAGE_TYPE_SETU_LISTEN 555 + +/** + * Reject a set request. + */ +#define GNUNET_MESSAGE_TYPE_SETU_REJECT 556 + +/** + * Accept an incoming set request + */ +#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT 557 + +/** + * Notify the client of an incoming request from a remote peer + */ +#define GNUNET_MESSAGE_TYPE_SETU_REQUEST 558 + + +/** + * Demand the whole element from the other + * peer, given only the hash code. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL 565 + +/** + * Demand the whole element from the other + * peer, given only the hash code. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND 566 + +/** + * Tell the other peer to send us a list of + * hashes that match an IBF key. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY 567 + +/** + * Tell the other peer which hashes match a + * given IBF key. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER 568 + +/** + * Request a set union operation from a remote peer. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST 581 + +/** + * Strata estimator. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE 582 + +/** + * Invertible bloom filter. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF 583 + +/** + * Actual set elements. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS 584 + +/** + * Requests for the elements with the given hashes. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENT_REQUESTS 585 + +/** + * Set operation is done. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE 586 + +/** + * Compressed strata estimator. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC 590 + +/** + * Request all missing elements from the other peer, + * based on their sets and the elements we previously sent + * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE 597 + +/** + * Send a set element, not as response to a demand but because + * we're sending the full set. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT 598 + +/** + * Request all missing elements from the other peer, + * based on their sets and the elements we previously sent + * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS. + */ +#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 599 + + /******************************************************************************* * SET message types ******************************************************************************/ + /** * Demand the whole element from the other * peer, given only the hash code. diff --git a/src/setu/Makefile.am b/src/setu/Makefile.am new file mode 100644 index 000000000..b37ceba51 --- /dev/null +++ b/src/setu/Makefile.am @@ -0,0 +1,102 @@ +# This Makefile.am is in the public domain +AM_CPPFLAGS = -I$(top_srcdir)/src/include + +pkgcfgdir= $(pkgdatadir)/config.d/ + +libexecdir= $(pkglibdir)/libexec/ + +plugindir = $(libdir)/gnunet + +pkgcfg_DATA = \ + setu.conf + +if USE_COVERAGE + AM_CFLAGS = -fprofile-arcs -ftest-coverage +endif + +if HAVE_TESTING +bin_PROGRAMS = \ + gnunet-setu-profiler + +noinst_PROGRAMS = \ + gnunet-setu-ibf-profiler +endif + +libexec_PROGRAMS = \ + gnunet-service-setu + +lib_LTLIBRARIES = \ + libgnunetsetu.la + +gnunet_setu_profiler_SOURCES = \ + gnunet-setu-profiler.c +gnunet_setu_profiler_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + libgnunetsetu.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ + $(GN_LIBINTL) + + +gnunet_setu_ibf_profiler_SOURCES = \ + gnunet-setu-ibf-profiler.c \ + ibf.c +gnunet_setu_ibf_profiler_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(GN_LIBINTL) + +gnunet_service_setu_SOURCES = \ + gnunet-service-setu.c gnunet-service-setu.h \ + ibf.c ibf.h \ + gnunet-service-setu_strata_estimator.c gnunet-service-setu_strata_estimator.h \ + gnunet-service-setu_protocol.h +gnunet_service_setu_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + $(top_builddir)/src/core/libgnunetcore.la \ + $(top_builddir)/src/cadet/libgnunetcadet.la \ + $(top_builddir)/src/block/libgnunetblock.la \ + libgnunetsetu.la \ + $(GN_LIBINTL) + +libgnunetsetu_la_SOURCES = \ + setu_api.c setu.h +libgnunetsetu_la_LIBADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(LTLIBINTL) +libgnunetsetu_la_LDFLAGS = \ + $(GN_LIB_LDFLAGS) + +if HAVE_TESTING +check_PROGRAMS = \ + test_setu_api +endif + +if ENABLE_TEST_RUN +AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME; +TESTS = $(check_PROGRAMS) +endif + +test_setu_api_SOURCES = \ + test_setu_api.c +test_setu_api_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ + libgnunetsetu.la + +plugin_LTLIBRARIES = \ + libgnunet_plugin_block_setu_test.la + +libgnunet_plugin_block_setu_test_la_SOURCES = \ + plugin_block_setu_test.c +libgnunet_plugin_block_setu_test_la_LIBADD = \ + $(top_builddir)/src/block/libgnunetblock.la \ + $(top_builddir)/src/block/libgnunetblockgroup.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(LTLIBINTL) +libgnunet_plugin_block_setu_test_la_LDFLAGS = \ + $(GN_PLUGIN_LDFLAGS) + + +EXTRA_DIST = \ + test_setu.conf diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c new file mode 100644 index 000000000..88edc622f --- /dev/null +++ b/src/setu/gnunet-service-setu.c @@ -0,0 +1,3482 @@ +/* + This file is part of GNUnet + Copyright (C) 2013-2017, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file setu/gnunet-service-setu.c + * @brief set union operation + * @author Florian Dold + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" +#include "gnunet-service-setu.h" +#include "ibf.h" +#include "gnunet-service-setu_strata_estimator.h" +#include "gnunet-service-setu_protocol.h" +#include "gnunet_statistics_service.h" +#include + + +#define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__) + +/** + * How long do we hold on to an incoming channel if there is + * no local listener before giving up? + */ +#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES + +/** + * Number of IBFs in a strata estimator. + */ +#define SE_STRATA_COUNT 32 + +/** + * Size of the IBFs in the strata estimator. + */ +#define SE_IBF_SIZE 80 + +/** + * The hash num parameter for the difference digests and strata estimators. + */ +#define SE_IBF_HASH_NUM 4 + +/** + * Number of buckets that can be transmitted in one message. + */ +#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE) + +/** + * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). + * Choose this value so that computing the IBF is still cheaper + * than transmitting all values. + */ +#define MAX_IBF_ORDER (20) + +/** + * Number of buckets used in the ibf per estimated + * difference. + */ +#define IBF_ALPHA 4 + + +/** + * Current phase we are in for a union operation. + */ +enum UnionOperationPhase +{ + /** + * We sent the request message, and expect a strata estimator. + */ + PHASE_EXPECT_SE, + + /** + * We sent the strata estimator, and expect an IBF. This phase is entered once + * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. + * + * XXX: could use better wording. + * XXX: repurposed to also expect a "request full set" message, should be renamed + * + * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS + */ + PHASE_EXPECT_IBF, + + /** + * Continuation for multi part IBFs. + */ + PHASE_EXPECT_IBF_CONT, + + /** + * We are decoding an IBF. + */ + PHASE_INVENTORY_ACTIVE, + + /** + * The other peer is decoding the IBF we just sent. + */ + PHASE_INVENTORY_PASSIVE, + + /** + * The protocol is almost finished, but we still have to flush our message + * queue and/or expect some elements. + */ + PHASE_FINISH_CLOSING, + + /** + * In the penultimate phase, + * we wait until all our demands + * are satisfied. Then we send a done + * message, and wait for another done message. + */ + PHASE_FINISH_WAITING, + + /** + * In the ultimate phase, we wait until + * our demands are satisfied and then + * quit (sending another DONE message). + */ + PHASE_DONE, + + /** + * After sending the full set, wait for responses with the elements + * that the local peer is missing. + */ + PHASE_FULL_SENDING, +}; + + +/** + * State of an evaluate operation with another peer. + */ +struct OperationState +{ + /** + * Copy of the set's strata estimator at the time of + * creation of this operation. + */ + struct StrataEstimator *se; + + /** + * The IBF we currently receive. + */ + struct InvertibleBloomFilter *remote_ibf; + + /** + * The IBF with the local set's element. + */ + struct InvertibleBloomFilter *local_ibf; + + /** + * Maps unsalted IBF-Keys to elements. + * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. + * Colliding IBF-Keys are linked. + */ + struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; + + /** + * Current state of the operation. + */ + enum UnionOperationPhase phase; + + /** + * Did we send the client that we are done? + */ + int client_done_sent; + + /** + * Number of ibf buckets already received into the @a remote_ibf. + */ + unsigned int ibf_buckets_received; + + /** + * Hashes for elements that we have demanded from the other peer. + */ + struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes; + + /** + * Salt that we're using for sending IBFs + */ + uint32_t salt_send; + + /** + * Salt for the IBF we've received and that we're currently decoding. + */ + uint32_t salt_receive; + + /** + * Number of elements we received from the other peer + * that were not in the local set yet. + */ + uint32_t received_fresh; + + /** + * Total number of elements received from the other peer. + */ + uint32_t received_total; + + /** + * Initial size of our set, just before + * the operation started. + */ + uint64_t initial_size; +}; + + +/** + * The key entry is used to associate an ibf key with an element. + */ +struct KeyEntry +{ + /** + * IBF key for the entry, derived from the current salt. + */ + struct IBF_Key ibf_key; + + /** + * The actual element associated with the key. + * + * Only owned by the union operation if element->operation + * is #GNUNET_YES. + */ + struct ElementEntry *element; + + /** + * Did we receive this element? + * Even if element->is_foreign is false, we might + * have received the element, so this indicates that + * the other peer has it. + */ + int received; +}; + + +/** + * Used as a closure for sending elements + * with a specific IBF key. + */ +struct SendElementClosure +{ + /** + * The IBF key whose matching elements should be + * sent. + */ + struct IBF_Key ibf_key; + + /** + * Operation for which the elements + * should be sent. + */ + struct Operation *op; +}; + + +/** + * Extra state required for efficient set union. + */ +struct SetState +{ + /** + * The strata estimator is only generated once for + * each set. + * The IBF keys are derived from the element hashes with + * salt=0. + */ + struct StrataEstimator *se; +}; + + +/** + * A listener is inhabited by a client, and waits for evaluation + * requests from remote peers. + */ +struct Listener +{ + /** + * Listeners are held in a doubly linked list. + */ + struct Listener *next; + + /** + * Listeners are held in a doubly linked list. + */ + struct Listener *prev; + + /** + * Head of DLL of operations this listener is responsible for. + * Once the client has accepted/declined the operation, the + * operation is moved to the respective set's operation DLLS. + */ + struct Operation *op_head; + + /** + * Tail of DLL of operations this listener is responsible for. + * Once the client has accepted/declined the operation, the + * operation is moved to the respective set's operation DLLS. + */ + struct Operation *op_tail; + + /** + * Client that owns the listener. + * Only one client may own a listener. + */ + struct ClientState *cs; + + /** + * The port we are listening on with CADET. + */ + struct GNUNET_CADET_Port *open_port; + + /** + * Application ID for the operation, used to distinguish + * multiple operations of the same type with the same peer. + */ + struct GNUNET_HashCode app_id; + +}; + + +/** + * Handle to the cadet service, used to listen for and connect to + * remote peers. + */ +static struct GNUNET_CADET_Handle *cadet; + +/** + * Statistics handle. + */ +struct GNUNET_STATISTICS_Handle *_GSS_statistics; + +/** + * Listeners are held in a doubly linked list. + */ +static struct Listener *listener_head; + +/** + * Listeners are held in a doubly linked list. + */ +static struct Listener *listener_tail; + +/** + * Number of active clients. + */ +static unsigned int num_clients; + +/** + * Are we in shutdown? if #GNUNET_YES and the number of clients + * drops to zero, disconnect from CADET. + */ +static int in_shutdown; + +/** + * Counter for allocating unique IDs for clients, used to identify + * incoming operation requests from remote peers, that the client can + * choose to accept or refuse. 0 must not be used (reserved for + * uninitialized). + */ +static uint32_t suggest_id; + + +/** + * Iterator over hash map entries, called to + * destroy the linked list of colliding ibf key entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. + */ +static int +destroy_key_to_element_iter (void *cls, + uint32_t key, + void *value) +{ + struct KeyEntry *k = value; + + GNUNET_assert (NULL != k); + if (GNUNET_YES == k->element->remote) + { + GNUNET_free (k->element); + k->element = NULL; + } + GNUNET_free (k); + return GNUNET_YES; +} + + +/** + * Destroy the union operation. Only things specific to the union + * operation are destroyed. + * + * @param op union operation to destroy + */ +static void +union_op_cancel (struct Operation *op) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op\n"); + /* check if the op was canceled twice */ + GNUNET_assert (NULL != op->state); + if (NULL != op->state->remote_ibf) + { + ibf_destroy (op->state->remote_ibf); + op->state->remote_ibf = NULL; + } + if (NULL != op->state->demanded_hashes) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); + op->state->demanded_hashes = NULL; + } + if (NULL != op->state->local_ibf) + { + ibf_destroy (op->state->local_ibf); + op->state->local_ibf = NULL; + } + if (NULL != op->state->se) + { + strata_estimator_destroy (op->state->se); + op->state->se = NULL; + } + if (NULL != op->state->key_to_element) + { + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &destroy_key_to_element_iter, + NULL); + GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); + op->state->key_to_element = NULL; + } + GNUNET_free (op->state); + op->state = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op done\n"); +} + + +/** + * Inform the client that the union operation has failed, + * and proceed to destroy the evaluate operation. + * + * @param op the union operation to fail + */ +static void +fail_union_operation (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SETU_ResultMessage *msg; + + LOG (GNUNET_ERROR_TYPE_WARNING, + "union operation failed\n"); + ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); + msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE); + msg->request_id = htonl (op->client_request_id); + msg->element_type = htons (0); + GNUNET_MQ_send (op->set->cs->mq, + ev); + _GSS_operation_destroy (op); +} + + +/** + * Derive the IBF key from a hash code and + * a salt. + * + * @param src the hash code + * @return the derived IBF key + */ +static struct IBF_Key +get_ibf_key (const struct GNUNET_HashCode *src) +{ + struct IBF_Key key; + uint16_t salt = 0; + + GNUNET_assert (GNUNET_OK == + GNUNET_CRYPTO_kdf (&key, sizeof(key), + src, sizeof *src, + &salt, sizeof(salt), + NULL, 0)); + return key; +} + + +/** + * Context for #op_get_element_iterator + */ +struct GetElementContext +{ + /** + * FIXME. + */ + struct GNUNET_HashCode hash; + + /** + * FIXME. + */ + struct KeyEntry *k; +}; + + +/** + * Iterator over the mapping from IBF keys to element entries. Checks if we + * have an element with a given GNUNET_HashCode. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should search further, + * #GNUNET_NO if we've found the element. + */ +static int +op_get_element_iterator (void *cls, + uint32_t key, + void *value) +{ + struct GetElementContext *ctx = cls; + struct KeyEntry *k = value; + + GNUNET_assert (NULL != k); + if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, + &ctx->hash)) + { + ctx->k = k; + return GNUNET_NO; + } + return GNUNET_YES; +} + + +/** + * Determine whether the given element is already in the operation's element + * set. + * + * @param op operation that should be tested for 'element_hash' + * @param element_hash hash of the element to look for + * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise + */ +static struct KeyEntry * +op_get_element (struct Operation *op, + const struct GNUNET_HashCode *element_hash) +{ + int ret; + struct IBF_Key ibf_key; + struct GetElementContext ctx = { { { 0 } }, 0 }; + + ctx.hash = *element_hash; + + ibf_key = get_ibf_key (element_hash); + ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, + (uint32_t) ibf_key.key_val, + op_get_element_iterator, + &ctx); + + /* was the iteration aborted because we found the element? */ + if (GNUNET_SYSERR == ret) + { + GNUNET_assert (NULL != ctx.k); + return ctx.k; + } + return NULL; +} + + +/** + * Insert an element into the union operation's + * key-to-element mapping. Takes ownership of 'ee'. + * Note that this does not insert the element in the set, + * only in the operation's key-element mapping. + * This is done to speed up re-tried operations, if some elements + * were transmitted, and then the IBF fails to decode. + * + * XXX: clarify ownership, doesn't sound right. + * + * @param op the union operation + * @param ee the element entry + * @parem received was this element received from the remote peer? + */ +static void +op_register_element (struct Operation *op, + struct ElementEntry *ee, + int received) +{ + struct IBF_Key ibf_key; + struct KeyEntry *k; + + ibf_key = get_ibf_key (&ee->element_hash); + k = GNUNET_new (struct KeyEntry); + k->element = ee; + k->ibf_key = ibf_key; + k->received = received; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, + (uint32_t) ibf_key.key_val, + k, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); +} + + +/** + * FIXME. + */ +static void +salt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + + /* rotate ibf key */ + x = (x >> s) | (x << (64 - s)); + k_out->key_val = x; +} + + +/** + * FIXME. + */ +static void +unsalt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + + x = (x << s) | (x >> (64 - s)); + k_out->key_val = x; +} + + +/** + * Insert a key into an ibf. + * + * @param cls the ibf + * @param key unused + * @param value the key entry to get the key from + */ +static int +prepare_ibf_iterator (void *cls, + uint32_t key, + void *value) +{ + struct Operation *op = cls; + struct KeyEntry *ke = value; + struct IBF_Key salted_key; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] inserting %lx (hash %s) into ibf\n", + (void *) op, + (unsigned long) ke->ibf_key.key_val, + GNUNET_h2s (&ke->element->element_hash)); + salt_key (&ke->ibf_key, + op->state->salt_send, + &salted_key); + ibf_insert (op->state->local_ibf, salted_key); + return GNUNET_YES; +} + + +/** + * Iterator for initializing the + * key-to-element mapping of a union operation + * + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert + * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) + */ +static int +init_key_to_element_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + + /* make sure that the element belongs to the set at the time + * of creating the operation */ + if (GNUNET_NO == + _GSS_is_element_of_operation (ee, + op)) + return GNUNET_YES; + GNUNET_assert (GNUNET_NO == ee->remote); + op_register_element (op, + ee, + GNUNET_NO); + return GNUNET_YES; +} + + +/** + * Initialize the IBF key to element mapping local to this set + * operation. + * + * @param op the set union operation + */ +static void +initialize_key_to_element (struct Operation *op) +{ + unsigned int len; + + GNUNET_assert (NULL == op->state->key_to_element); + len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements); + op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + &init_key_to_element_iterator, + op); +} + + +/** + * Create an ibf with the operation's elements + * of the specified size + * + * @param op the union operation + * @param size size of the ibf to create + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure + */ +static int +prepare_ibf (struct Operation *op, + uint32_t size) +{ + GNUNET_assert (NULL != op->state->key_to_element); + + if (NULL != op->state->local_ibf) + ibf_destroy (op->state->local_ibf); + op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); + if (NULL == op->state->local_ibf) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate local IBF\n"); + return GNUNET_SYSERR; + } + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &prepare_ibf_iterator, + op); + return GNUNET_OK; +} + + +/** + * Send an ibf of appropriate size. + * + * Fragments the IBF into multiple messages if necessary. + * + * @param op the union operation + * @param ibf_order order of the ibf to send, size=2^order + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure + */ +static int +send_ibf (struct Operation *op, + uint16_t ibf_order) +{ + unsigned int buckets_sent = 0; + struct InvertibleBloomFilter *ibf; + + if (GNUNET_OK != + prepare_ibf (op, 1 << ibf_order)) + { + /* allocation failed */ + return GNUNET_SYSERR; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending ibf of size %u\n", + 1 << ibf_order); + + { + char name[64] = { 0 }; + snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order); + GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO); + } + + ibf = op->state->local_ibf; + + while (buckets_sent < (1 << ibf_order)) + { + unsigned int buckets_in_message; + struct GNUNET_MQ_Envelope *ev; + struct IBFMessage *msg; + + buckets_in_message = (1 << ibf_order) - buckets_sent; + /* limit to maximum */ + if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) + buckets_in_message = MAX_BUCKETS_PER_MESSAGE; + + ev = GNUNET_MQ_msg_extra (msg, + buckets_in_message * IBF_BUCKET_SIZE, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); + msg->reserved1 = 0; + msg->reserved2 = 0; + msg->order = ibf_order; + msg->offset = htonl (buckets_sent); + msg->salt = htonl (op->state->salt_send); + ibf_write_slice (ibf, buckets_sent, + buckets_in_message, &msg[1]); + buckets_sent += buckets_in_message; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "ibf chunk size %u, %u/%u sent\n", + buckets_in_message, + buckets_sent, + 1 << ibf_order); + GNUNET_MQ_send (op->mq, ev); + } + + /* The other peer must decode the IBF, so + * we're passive. */ + op->state->phase = PHASE_INVENTORY_PASSIVE; + return GNUNET_OK; +} + + +/** + * Compute the necessary order of an ibf + * from the size of the symmetric set difference. + * + * @param diff the difference + * @return the required size of the ibf + */ +static unsigned int +get_order_from_difference (unsigned int diff) +{ + unsigned int ibf_order; + + ibf_order = 2; + while (((1 << ibf_order) < (IBF_ALPHA * diff) || + ((1 << ibf_order) < SE_IBF_HASH_NUM)) && + (ibf_order < MAX_IBF_ORDER)) + ibf_order++; + // add one for correction + return ibf_order + 1; +} + + +/** + * Send a set element. + * + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert + * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) + */ +static int +send_full_element_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct GNUNET_SETU_ElementMessage *emsg; + struct ElementEntry *ee = value; + struct GNUNET_SETU_Element *el = &ee->element; + struct GNUNET_MQ_Envelope *ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending element %s\n", + GNUNET_h2s (key)); + ev = GNUNET_MQ_msg_extra (emsg, + el->size, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + emsg->element_type = htons (el->element_type); + GNUNET_memcpy (&emsg[1], + el->data, + el->size); + GNUNET_MQ_send (op->mq, + ev); + return GNUNET_YES; +} + + +/** + * Switch to full set transmission for @a op. + * + * @param op operation to switch to full set transmission. + */ +static void +send_full_set (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + + op->state->phase = PHASE_FULL_SENDING; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Dedicing to transmit the full set\n"); + /* FIXME: use a more memory-friendly way of doing this with an + iterator, just as we do in the non-full case! */ + (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + &send_full_element_iterator, + op); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, + ev); +} + + +/** + * Handle a strata estimator from a remote peer + * + * @param cls the union operation + * @param msg the message + */ +static int +check_union_p2p_strata_estimator (void *cls, + const struct StrataEstimatorMessage *msg) +{ + struct Operation *op = cls; + int is_compressed; + size_t len; + + if (op->state->phase != PHASE_EXPECT_SE) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons ( + msg->header.type)); + len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); + if ((GNUNET_NO == is_compressed) && + (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle a strata estimator from a remote peer + * + * @param cls the union operation + * @param msg the message + */ +static void +handle_union_p2p_strata_estimator (void *cls, + const struct StrataEstimatorMessage *msg) +{ + struct Operation *op = cls; + struct StrataEstimator *remote_se; + unsigned int diff; + uint64_t other_size; + size_t len; + int is_compressed; + + is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons ( + msg->header.type)); + GNUNET_STATISTICS_update (_GSS_statistics, + "# bytes of SE received", + ntohs (msg->header.size), + GNUNET_NO); + len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage); + other_size = GNUNET_ntohll (msg->set_size); + remote_se = strata_estimator_create (SE_STRATA_COUNT, + SE_IBF_SIZE, + SE_IBF_HASH_NUM); + if (NULL == remote_se) + { + /* insufficient resources, fail */ + fail_union_operation (op); + return; + } + if (GNUNET_OK != + strata_estimator_read (&msg[1], + len, + is_compressed, + remote_se)) + { + /* decompression failed */ + strata_estimator_destroy (remote_se); + fail_union_operation (op); + return; + } + GNUNET_assert (NULL != op->state->se); + diff = strata_estimator_difference (remote_se, + op->state->se); + + if (diff > 200) + diff = diff * 3 / 2; + + strata_estimator_destroy (remote_se); + strata_estimator_destroy (op->state->se); + op->state->se = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got se diff=%d, using ibf size %d\n", + diff, + 1U << get_order_from_difference (diff)); + + { + char *set_debug; + + set_debug = getenv ("GNUNET_SETU_BENCHMARK"); + if ((NULL != set_debug) && + (0 == strcmp (set_debug, "1"))) + { + FILE *f = fopen ("set.log", "a"); + fprintf (f, "%llu\n", (unsigned long long) diff); + fclose (f); + } + } + + if ((GNUNET_YES == op->byzantine) && + (other_size < op->byzantine_lower_bound)) + { + GNUNET_break (0); + fail_union_operation (op); + return; + } + + if ((GNUNET_YES == op->force_full) || + (diff > op->state->initial_size / 4) || + (0 == other_size)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Deciding to go for full set transmission (diff=%d, own set=%u)\n", + diff, + op->state->initial_size); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of full sends", + 1, + GNUNET_NO); + if ((op->state->initial_size <= other_size) || + (0 == other_size)) + { + send_full_set (op); + } + else + { + struct GNUNET_MQ_Envelope *ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Telling other peer that we expect its full set\n"); + op->state->phase = PHASE_EXPECT_IBF; + ev = GNUNET_MQ_msg_header ( + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); + GNUNET_MQ_send (op->mq, + ev); + } + } + else + { + GNUNET_STATISTICS_update (_GSS_statistics, + "# of ibf sends", + 1, + GNUNET_NO); + if (GNUNET_OK != + send_ibf (op, + get_order_from_difference (diff))) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + return; + } + } + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Iterator to send elements to a remote peer + * + * @param cls closure with the element key and the union operation + * @param key ignored + * @param value the key entry + */ +static int +send_offers_iterator (void *cls, + uint32_t key, + void *value) +{ + struct SendElementClosure *sec = cls; + struct Operation *op = sec->op; + struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_MessageHeader *mh; + + /* Detect 32-bit key collision for the 64-bit IBF keys. */ + if (ke->ibf_key.key_val != sec->ibf_key.key_val) + return GNUNET_YES; + + ev = GNUNET_MQ_msg_header_extra (mh, + sizeof(struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); + + GNUNET_assert (NULL != ev); + *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] sending element offer (%s) to peer\n", + (void *) op, + GNUNET_h2s (&ke->element->element_hash)); + GNUNET_MQ_send (op->mq, ev); + return GNUNET_YES; +} + + +/** + * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key. + * + * @param op union operation + * @param ibf_key IBF key of interest + */ +static void +send_offers_for_key (struct Operation *op, + struct IBF_Key ibf_key) +{ + struct SendElementClosure send_cls; + + send_cls.ibf_key = ibf_key; + send_cls.op = op; + (void) GNUNET_CONTAINER_multihashmap32_get_multiple ( + op->state->key_to_element, + (uint32_t) ibf_key. + key_val, + &send_offers_iterator, + &send_cls); +} + + +/** + * Decode which elements are missing on each side, and + * send the appropriate offers and inquiries. + * + * @param op union operation + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure + */ +static int +decode_and_send (struct Operation *op) +{ + struct IBF_Key key; + struct IBF_Key last_key; + int side; + unsigned int num_decoded; + struct InvertibleBloomFilter *diff_ibf; + + GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); + + if (GNUNET_OK != + prepare_ibf (op, + op->state->remote_ibf->size)) + { + GNUNET_break (0); + /* allocation failed */ + return GNUNET_SYSERR; + } + diff_ibf = ibf_dup (op->state->local_ibf); + ibf_subtract (diff_ibf, + op->state->remote_ibf); + + ibf_destroy (op->state->remote_ibf); + op->state->remote_ibf = NULL; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoding IBF (size=%u)\n", + diff_ibf->size); + + num_decoded = 0; + key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ + + while (1) + { + int res; + int cycle_detected = GNUNET_NO; + + last_key = key; + + res = ibf_decode (diff_ibf, &side, &key); + if (res == GNUNET_OK) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoded ibf key %lx\n", + (unsigned long) key.key_val); + num_decoded += 1; + if ((num_decoded > diff_ibf->size) || + ((num_decoded > 1) && + (last_key.key_val == key.key_val))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "detected cyclic ibf (decoded %u/%u)\n", + num_decoded, + diff_ibf->size); + cycle_detected = GNUNET_YES; + } + } + if ((GNUNET_SYSERR == res) || + (GNUNET_YES == cycle_detected)) + { + int next_order; + next_order = 0; + while (1 << next_order < diff_ibf->size) + next_order++; + next_order++; + if (next_order <= MAX_IBF_ORDER) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoding failed, sending larger ibf (size %u)\n", + 1 << next_order); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of IBF retries", + 1, + GNUNET_NO); + op->state->salt_send++; + if (GNUNET_OK != + send_ibf (op, next_order)) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; + } + } + else + { + GNUNET_STATISTICS_update (_GSS_statistics, + "# of failed union operations (too large)", + 1, + GNUNET_NO); + // XXX: Send the whole set, element-by-element + LOG (GNUNET_ERROR_TYPE_ERROR, + "set union failed: reached ibf limit\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; + } + break; + } + if (GNUNET_NO == res) + { + struct GNUNET_MQ_Envelope *ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "transmitted all values, sending DONE\n"); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); + GNUNET_MQ_send (op->mq, ev); + /* We now wait until we get a DONE message back + * and then wait for our MQ to be flushed and all our + * demands be delivered. */ + break; + } + if (1 == side) + { + struct IBF_Key unsalted_key; + + unsalt_key (&key, + op->state->salt_receive, + &unsalted_key); + send_offers_for_key (op, + unsalted_key); + } + else if (-1 == side) + { + struct GNUNET_MQ_Envelope *ev; + struct InquiryMessage *msg; + + /* It may be nice to merge multiple requests, but with CADET's corking it is not worth + * the effort additional complexity. */ + ev = GNUNET_MQ_msg_extra (msg, + sizeof(struct IBF_Key), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); + msg->salt = htonl (op->state->salt_receive); + GNUNET_memcpy (&msg[1], + &key, + sizeof(struct IBF_Key)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element inquiry for IBF key %lx\n", + (unsigned long) key.key_val); + GNUNET_MQ_send (op->mq, ev); + } + else + { + GNUNET_assert (0); + } + } + ibf_destroy (diff_ibf); + return GNUNET_OK; +} + + +/** + * Check an IBF message from a remote peer. + * + * Reassemble the IBF from multiple pieces, and + * process the whole IBF once possible. + * + * @param cls the union operation + * @param msg the header of the message + * @return #GNUNET_OK if @a msg is well-formed + */ +static int +check_union_p2p_ibf (void *cls, + const struct IBFMessage *msg) +{ + struct Operation *op = cls; + unsigned int buckets_in_message; + + buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) + / IBF_BUCKET_SIZE; + if (0 == buckets_in_message) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message + * IBF_BUCKET_SIZE) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (op->state->phase == PHASE_EXPECT_IBF_CONT) + { + if (ntohl (msg->offset) != op->state->ibf_buckets_received) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (1 << msg->order != op->state->remote_ibf->size) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (ntohl (msg->salt) != op->state->salt_receive) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + } + else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && + (op->state->phase != PHASE_EXPECT_IBF)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Handle an IBF message from a remote peer. + * + * Reassemble the IBF from multiple pieces, and + * process the whole IBF once possible. + * + * @param cls the union operation + * @param msg the header of the message + */ +static void +handle_union_p2p_ibf (void *cls, + const struct IBFMessage *msg) +{ + struct Operation *op = cls; + unsigned int buckets_in_message; + + buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) + / IBF_BUCKET_SIZE; + if ((op->state->phase == PHASE_INVENTORY_PASSIVE) || + (op->state->phase == PHASE_EXPECT_IBF)) + { + op->state->phase = PHASE_EXPECT_IBF_CONT; + GNUNET_assert (NULL == op->state->remote_ibf); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new ibf of size %u\n", + 1 << msg->order); + op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM); + op->state->salt_receive = ntohl (msg->salt); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving new IBF with salt %u\n", + op->state->salt_receive); + if (NULL == op->state->remote_ibf) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to parse remote IBF, closing connection\n"); + fail_union_operation (op); + return; + } + op->state->ibf_buckets_received = 0; + if (0 != ntohl (msg->offset)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + } + else + { + GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received more of IBF\n"); + } + GNUNET_assert (NULL != op->state->remote_ibf); + + ibf_read_slice (&msg[1], + op->state->ibf_buckets_received, + buckets_in_message, + op->state->remote_ibf); + op->state->ibf_buckets_received += buckets_in_message; + + if (op->state->ibf_buckets_received == op->state->remote_ibf->size) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "received full ibf\n"); + op->state->phase = PHASE_INVENTORY_ACTIVE; + if (GNUNET_OK != + decode_and_send (op)) + { + /* Internal error, best we can do is shut down */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to decode IBF, closing connection\n"); + fail_union_operation (op); + return; + } + } + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Send a result message to the client indicating + * that there is a new element. + * + * @param op union operation + * @param element element to send + * @param status status to send with the new element + */ +static void +send_client_element (struct Operation *op, + const struct GNUNET_SETU_Element *element, + int status) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SETU_ResultMessage *rm; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element (size %u) to client\n", + element->size); + GNUNET_assert (0 != op->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, + element->size, + GNUNET_MESSAGE_TYPE_SET_RESULT); + if (NULL == ev) + { + GNUNET_MQ_discard (ev); + GNUNET_break (0); + return; + } + rm->result_status = htons (status); + rm->request_id = htonl (op->client_request_id); + rm->element_type = htons (element->element_type); + rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size ( + op->state->key_to_element)); + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_send (op->set->cs->mq, + ev); +} + + +/** + * Signal to the client that the operation has finished and + * destroy the operation. + * + * @param cls operation to destroy + */ +static void +send_client_done (void *cls) +{ + struct Operation *op = cls; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SETU_ResultMessage *rm; + + if (GNUNET_YES == op->state->client_done_sent) + { + return; + } + + if (PHASE_DONE != op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Union operation failed\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# Union operations failed", + 1, + GNUNET_NO); + ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE); + rm->request_id = htonl (op->client_request_id); + rm->element_type = htons (0); + GNUNET_MQ_send (op->set->cs->mq, + ev); + return; + } + + op->state->client_done_sent = GNUNET_YES; + + GNUNET_STATISTICS_update (_GSS_statistics, + "# Union operations succeeded", + 1, + GNUNET_NO); + LOG (GNUNET_ERROR_TYPE_INFO, + "Signalling client that union operation is done\n"); + ev = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->request_id = htonl (op->client_request_id); + rm->result_status = htons (GNUNET_SETU_STATUS_DONE); + rm->element_type = htons (0); + rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size ( + op->state->key_to_element)); + GNUNET_MQ_send (op->set->cs->mq, + ev); +} + + +/** + * Tests if the operation is finished, and if so notify. + * + * @param op operation to check + */ +static void +maybe_finish (struct Operation *op) +{ + unsigned int num_demanded; + + num_demanded = GNUNET_CONTAINER_multihashmap_size ( + op->state->demanded_hashes); + + if (PHASE_FINISH_WAITING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_WAITING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) + { + struct GNUNET_MQ_Envelope *ev; + + op->state->phase = PHASE_DONE; + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); + GNUNET_MQ_send (op->mq, + ev); + /* We now wait until the other peer sends P2P_OVER + * after it got all elements from us. */ + } + } + if (PHASE_FINISH_CLOSING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_CLOSING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) + { + op->state->phase = PHASE_DONE; + send_client_done (op); + _GSS_operation_destroy2 (op); + } + } +} + + +/** + * Check an element message from a remote peer. + * + * @param cls the union operation + * @param emsg the message + */ +static int +check_union_p2p_elements (void *cls, + const struct GNUNET_SETU_ElementMessage *emsg) +{ + struct Operation *op = cls; + + if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle an element message from a remote peer. + * Sent by the other peer either because we decoded an IBF and placed a demand, + * or because the other peer switched to full set transmission. + * + * @param cls the union operation + * @param emsg the message + */ +static void +handle_union_p2p_elements (void *cls, + const struct GNUNET_SETU_ElementMessage *emsg) +{ + struct Operation *op = cls; + struct ElementEntry *ee; + struct KeyEntry *ke; + uint16_t element_size; + + element_size = ntohs (emsg->header.size) - sizeof(struct + GNUNET_SETU_ElementMessage); + ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); + GNUNET_memcpy (&ee[1], + &emsg[1], + element_size); + ee->element.size = element_size; + ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); + ee->remote = GNUNET_YES; + GNUNET_SETU_element_hash (&ee->element, + &ee->element_hash); + if (GNUNET_NO == + GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, + &ee->element_hash, + NULL)) + { + /* We got something we didn't demand, since it's not in our map. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + op->state->received_total++; + + ke = op_get_element (op, &ee->element_hash); + if (NULL != ke) + { + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + ke->received = GNUNET_YES; + GNUNET_free (ee); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op->state->received_fresh++; + op_register_element (op, ee, GNUNET_YES); + /* only send results immediately if the client wants it */ + send_client_element (op, + &ee->element, + GNUNET_SETU_STATUS_ADD_LOCAL); + } + + if ((op->state->received_total > 8) && + (op->state->received_fresh < op->state->received_total / 3)) + { + /* The other peer gave us lots of old elements, there's something wrong. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + GNUNET_CADET_receive_done (op->channel); + maybe_finish (op); +} + + +/** + * Check a full element message from a remote peer. + * + * @param cls the union operation + * @param emsg the message + */ +static int +check_union_p2p_full_element (void *cls, + const struct GNUNET_SETU_ElementMessage *emsg) +{ + struct Operation *op = cls; + + (void) op; + // FIXME: check that we expect full elements here? + return GNUNET_OK; +} + + +/** + * Handle an element message from a remote peer. + * + * @param cls the union operation + * @param emsg the message + */ +static void +handle_union_p2p_full_element (void *cls, + const struct GNUNET_SETU_ElementMessage *emsg) +{ + struct Operation *op = cls; + struct ElementEntry *ee; + struct KeyEntry *ke; + uint16_t element_size; + + element_size = ntohs (emsg->header.size) + - sizeof(struct GNUNET_SETU_ElementMessage); + ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); + ee->element.size = element_size; + ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); + ee->remote = GNUNET_YES; + GNUNET_SETU_element_hash (&ee->element, &ee->element_hash); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (full diff, size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + op->state->received_total++; + + ke = op_get_element (op, &ee->element_hash); + if (NULL != ke) + { + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + ke->received = GNUNET_YES; + GNUNET_free (ee); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op->state->received_fresh++; + op_register_element (op, ee, GNUNET_YES); + /* only send results immediately if the client wants it */ + send_client_element (op, + &ee->element, + GNUNET_SETU_STATUS_ADD_LOCAL); + } + + if ((GNUNET_YES == op->byzantine) && + (op->state->received_total > 384 + op->state->received_fresh * 4) && + (op->state->received_fresh < op->state->received_total / 6)) + { + /* The other peer gave us lots of old elements, there's something wrong. */ + LOG (GNUNET_ERROR_TYPE_ERROR, + "Other peer sent only %llu/%llu fresh elements, failing operation\n", + (unsigned long long) op->state->received_fresh, + (unsigned long long) op->state->received_total); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Send offers (for GNUNET_Hash-es) in response + * to inquiries (for IBF_Key-s). + * + * @param cls the union operation + * @param msg the message + */ +static int +check_union_p2p_inquiry (void *cls, + const struct InquiryMessage *msg) +{ + struct Operation *op = cls; + unsigned int num_keys; + + if (op->state->phase != PHASE_INVENTORY_PASSIVE) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) + / sizeof(struct IBF_Key); + if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage)) + != num_keys * sizeof(struct IBF_Key)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Send offers (for GNUNET_Hash-es) in response + * to inquiries (for IBF_Key-s). + * + * @param cls the union operation + * @param msg the message + */ +static void +handle_union_p2p_inquiry (void *cls, + const struct InquiryMessage *msg) +{ + struct Operation *op = cls; + const struct IBF_Key *ibf_key; + unsigned int num_keys; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received union inquiry\n"); + num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage)) + / sizeof(struct IBF_Key); + ibf_key = (const struct IBF_Key *) &msg[1]; + while (0 != num_keys--) + { + struct IBF_Key unsalted_key; + + unsalt_key (ibf_key, + ntohl (msg->salt), + &unsalted_key); + send_offers_for_key (op, + unsalted_key); + ibf_key++; + } + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Iterator over hash map entries, called to + * destroy the linked list of colliding ibf key entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. + */ +static int +send_missing_full_elements_iter (void *cls, + uint32_t key, + void *value) +{ + struct Operation *op = cls; + struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SETU_ElementMessage *emsg; + struct ElementEntry *ee = ke->element; + + if (GNUNET_YES == ke->received) + return GNUNET_YES; + ev = GNUNET_MQ_msg_extra (emsg, + ee->element.size, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + GNUNET_memcpy (&emsg[1], + ee->element.data, + ee->element.size); + emsg->element_type = htons (ee->element.element_type); + GNUNET_MQ_send (op->mq, + ev); + return GNUNET_YES; +} + + +/** + * Handle a request for full set transmission. + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_union_p2p_request_full (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received request for full set transmission\n"); + if (PHASE_EXPECT_IBF != op->state->phase) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + // FIXME: we need to check that our set is larger than the + // byzantine_lower_bound by some threshold + send_full_set (op); + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Handle a "full done" message. + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_union_p2p_full_done (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + switch (op->state->phase) + { + case PHASE_EXPECT_IBF: + { + struct GNUNET_MQ_Envelope *ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got FULL DONE, sending elements that other peer is missing\n"); + + /* send all the elements that did not come from the remote peer */ + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &send_missing_full_elements_iter, + op); + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, + ev); + op->state->phase = PHASE_DONE; + /* we now wait until the other peer sends us the OVER message*/ + } + break; + + case PHASE_FULL_SENDING: + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got FULL DONE, finishing\n"); + /* We sent the full set, and got the response for that. We're done. */ + op->state->phase = PHASE_DONE; + GNUNET_CADET_receive_done (op->channel); + send_client_done (op); + _GSS_operation_destroy2 (op); + return; + } + break; + + default: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Handle full done phase is %u\n", + (unsigned) op->state->phase); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Check a demand by the other peer for elements based on a list + * of `struct GNUNET_HashCode`s. + * + * @parem cls closure, a set union operation + * @param mh the demand message + * @return #GNUNET_OK if @a mh is well-formed + */ +static int +check_union_p2p_demand (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + unsigned int num_hashes; + + (void) op; + num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) + / sizeof(struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) + != num_hashes * sizeof(struct GNUNET_HashCode)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle a demand by the other peer for elements based on a list + * of `struct GNUNET_HashCode`s. + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_union_p2p_demand (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + struct ElementEntry *ee; + struct GNUNET_SETU_ElementMessage *emsg; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; + struct GNUNET_MQ_Envelope *ev; + + num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) + / sizeof(struct GNUNET_HashCode); + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) + { + ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements, + hash); + if (NULL == ee) + { + /* Demand for non-existing element. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + { + /* Probably confused lazily copied sets. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, + GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); + emsg->reserved = htons (0); + emsg->element_type = htons (ee->element.element_type); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", + (void *) op, + (unsigned int) ee->element.size, + GNUNET_h2s (&ee->element_hash)); + GNUNET_MQ_send (op->mq, ev); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + } + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Check offer (of `struct GNUNET_HashCode`s). + * + * @param cls the union operation + * @param mh the message + * @return #GNUNET_OK if @a mh is well-formed + */ +static int +check_union_p2p_offer (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + unsigned int num_hashes; + + /* look up elements and send them */ + if ((op->state->phase != PHASE_INVENTORY_PASSIVE) && + (op->state->phase != PHASE_INVENTORY_ACTIVE)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) + / sizeof(struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) != + num_hashes * sizeof(struct GNUNET_HashCode)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle offers (of `struct GNUNET_HashCode`s) and + * respond with demands (of `struct GNUNET_HashCode`s). + * + * @param cls the union operation + * @param mh the message + */ +static void +handle_union_p2p_offer (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; + + num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) + / sizeof(struct GNUNET_HashCode); + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) + { + struct ElementEntry *ee; + struct GNUNET_MessageHeader *demands; + struct GNUNET_MQ_Envelope *ev; + + ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements, + hash); + if (NULL != ee) + if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) + continue; + + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, + hash)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Skipped sending duplicate demand\n"); + continue; + } + + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put ( + op->state->demanded_hashes, + hash, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Requesting element (hash %s)\n", + (void *) op, GNUNET_h2s (hash)); + ev = GNUNET_MQ_msg_header_extra (demands, + sizeof(struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); + GNUNET_memcpy (&demands[1], + hash, + sizeof(struct GNUNET_HashCode)); + GNUNET_MQ_send (op->mq, ev); + } + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Handle a done message from a remote peer + * + * @param cls the union operation + * @param mh the message + */ +static void +handle_union_p2p_done (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + switch (op->state->phase) + { + case PHASE_INVENTORY_PASSIVE: + /* We got all requests, but still have to send our elements in response. */ + op->state->phase = PHASE_FINISH_WAITING; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as passive partner), waiting for our demands to be satisfied\n"); + /* The active peer is done sending offers + * and inquiries. This means that all + * our responses to that (demands and offers) + * must be in flight (queued or in mesh). + * + * We should notify the active peer once + * all our demands are satisfied, so that the active + * peer can quit if we gave it everything. + */GNUNET_CADET_receive_done (op->channel); + maybe_finish (op); + return; + + case PHASE_INVENTORY_ACTIVE: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as active partner), waiting to finish\n"); + /* All demands of the other peer are satisfied, + * and we processed all offers, thus we know + * exactly what our demands must be. + * + * We'll close the channel + * to the other peer once our demands are met. + */op->state->phase = PHASE_FINISH_CLOSING; + GNUNET_CADET_receive_done (op->channel); + maybe_finish (op); + return; + + default: + GNUNET_break_op (0); + fail_union_operation (op); + return; + } +} + + +/** + * Handle a over message from a remote peer + * + * @param cls the union operation + * @param mh the message + */ +static void +handle_union_p2p_over (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + send_client_done (cls); +} + + +/** + * Initiate operation to evaluate a set union with a remote peer. + * + * @param op operation to perform (to be initialized) + * @param opaque_context message to be transmitted to the listener + * to convince it to accept, may be NULL + */ +static struct OperationState * +union_evaluate (struct Operation *op, + const struct GNUNET_MessageHeader *opaque_context) +{ + struct OperationState *state; + struct GNUNET_MQ_Envelope *ev; + struct OperationRequestMessage *msg; + + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + opaque_context); + if (NULL == ev) + { + /* the context message is too large */ + GNUNET_break (0); + return NULL; + } + state = GNUNET_new (struct OperationState); + state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, + GNUNET_NO); + /* copy the current generation's strata estimator for this operation */ + state->se = strata_estimator_dup (op->set->state->se); + /* we started the operation, thus we have to send the operation request */ + state->phase = PHASE_EXPECT_SE; + state->salt_receive = state->salt_send = 42; // FIXME????? + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Initiating union operation evaluation\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of initiated union operations", + 1, + GNUNET_NO); + GNUNET_MQ_send (op->mq, + ev); + + if (NULL != opaque_context) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request with context message\n"); + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request without context message\n"); + + op->state = state; + initialize_key_to_element (op); + state->initial_size = GNUNET_CONTAINER_multihashmap32_size ( + state->key_to_element); + return state; +} + + +/** + * Get the incoming socket associated with the given id. + * + * @param listener the listener to look in + * @param id id to look for + * @return the incoming socket associated with the id, + * or NULL if there is none + */ +static struct Operation * +get_incoming (uint32_t id) +{ + for (struct Listener *listener = listener_head; + NULL != listener; + listener = listener->next) + { + for (struct Operation *op = listener->op_head; + NULL != op; + op = op->next) + if (op->suggest_id == id) + return op; + } + return NULL; +} + + +/** + * Destroy an incoming request from a remote peer + * + * @param op remote request to destroy + */ +static void +incoming_destroy (struct Operation *op) +{ + struct Listener *listener; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying incoming operation %p\n", + op); + if (NULL != (listener = op->listener)) + { + GNUNET_CONTAINER_DLL_remove (listener->op_head, + listener->op_tail, + op); + op->listener = NULL; + } + if (NULL != op->timeout_task) + { + GNUNET_SCHEDULER_cancel (op->timeout_task); + op->timeout_task = NULL; + } + _GSS_operation_destroy2 (op); +} + + +/** + * Is element @a ee part of the set used by @a op? + * + * @param ee element to test + * @param op operation the defines the set and its generation + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ +int +_GSS_is_element_of_operation (struct ElementEntry *ee, + struct Operation *op) +{ + return ee->generation >= op->generation_created; +} + + +/** + * Destroy the given operation. Used for any operation where both + * peers were known and that thus actually had a vt and channel. Must + * not be used for operations where 'listener' is still set and we do + * not know the other peer. + * + * Call the implementation-specific cancel function of the operation. + * Disconnects from the remote peer. Does not disconnect the client, + * as there may be multiple operations per set. + * + * @param op operation to destroy + */ +void +_GSS_operation_destroy (struct Operation *op) +{ + struct Set *set = op->set; + struct GNUNET_CADET_Channel *channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying operation %p\n", op); + GNUNET_assert (NULL == op->listener); + if (NULL != op->state) + { + union_op_cancel (op); + op->state = NULL; + } + if (NULL != set) + { + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + op); + op->set = NULL; + } + if (NULL != op->context_msg) + { + GNUNET_free (op->context_msg); + op->context_msg = NULL; + } + if (NULL != (channel = op->channel)) + { + /* This will free op; called conditionally as this helper function + is also called from within the channel disconnect handler. */ + op->channel = NULL; + GNUNET_CADET_channel_destroy (channel); + } + /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, + * there was a channel end handler that will free 'op' on the call stack. */ +} + + +/** + * Callback called when a client connects to the service. + * + * @param cls closure for the service + * @param c the new client that connected to the service + * @param mq the message queue used to send messages to the client + * @return @a `struct ClientState` + */ +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *c, + struct GNUNET_MQ_Handle *mq) +{ + struct ClientState *cs; + + num_clients++; + cs = GNUNET_new (struct ClientState); + cs->client = c; + cs->mq = mq; + return cs; +} + + +/** + * Iterator over hash map entries to free element entries. + * + * @param cls closure + * @param key current key code + * @param value a `struct ElementEntry *` to be free'd + * @return #GNUNET_YES (continue to iterate) + */ +static int +destroy_elements_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct ElementEntry *ee = value; + + GNUNET_free (ee); + return GNUNET_YES; +} + + +/** + * Clean up after a client has disconnected + * + * @param cls closure, unused + * @param client the client to clean up after + * @param internal_cls the `struct ClientState` + */ +static void +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + void *internal_cls) +{ + struct ClientState *cs = internal_cls; + struct Operation *op; + struct Listener *listener; + struct Set *set; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client disconnected, cleaning up\n"); + if (NULL != (set = cs->set)) + { + struct SetContent *content = set->content; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying client's set\n"); + /* Destroy pending set operations */ + while (NULL != set->ops_head) + _GSS_operation_destroy (set->ops_head); + + /* Destroy operation-specific state */ + GNUNET_assert (NULL != set->state); + if (NULL != set->state->se) + { + strata_estimator_destroy (set->state->se); + set->state->se = NULL; + } + GNUNET_free (set->state); + + /* free set content (or at least decrement RC) */ + set->content = NULL; + GNUNET_assert (0 != content->refcount); + content->refcount--; + if (0 == content->refcount) + { + GNUNET_assert (NULL != content->elements); + GNUNET_CONTAINER_multihashmap_iterate (content->elements, + &destroy_elements_iterator, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (content->elements); + content->elements = NULL; + GNUNET_free (content); + } + GNUNET_free (set); + } + + if (NULL != (listener = cs->listener)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying client's listener\n"); + GNUNET_CADET_close_port (listener->open_port); + listener->open_port = NULL; + while (NULL != (op = listener->op_head)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Destroying incoming operation `%u' from peer `%s'\n", + (unsigned int) op->client_request_id, + GNUNET_i2s (&op->peer)); + incoming_destroy (op); + } + GNUNET_CONTAINER_DLL_remove (listener_head, + listener_tail, + listener); + GNUNET_free (listener); + } + GNUNET_free (cs); + num_clients--; + if ( (GNUNET_YES == in_shutdown) && + (0 == num_clients) ) + { + if (NULL != cadet) + { + GNUNET_CADET_disconnect (cadet); + cadet = NULL; + } + } +} + + +/** + * Check a request for a set operation from another peer. + * + * @param cls the operation state + * @param msg the received message + * @return #GNUNET_OK if the channel should be kept alive, + * #GNUNET_SYSERR to destroy the channel + */ +static int +check_incoming_msg (void *cls, + const struct OperationRequestMessage *msg) +{ + struct Operation *op = cls; + struct Listener *listener = op->listener; + const struct GNUNET_MessageHeader *nested_context; + + /* double operation request */ + if (0 != op->suggest_id) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + /* This should be equivalent to the previous condition, but can't hurt to check twice */ + if (NULL == listener) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + nested_context = GNUNET_MQ_extract_nested_mh (msg); + if ((NULL != nested_context) && + (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle a request for a set operation from another peer. Checks if we + * have a listener waiting for such a request (and in that case initiates + * asking the listener about accepting the connection). If no listener + * is waiting, we queue the operation request in hope that a listener + * shows up soon (before timeout). + * + * This msg is expected as the first and only msg handled through the + * non-operation bound virtual table, acceptance of this operation replaces + * our virtual table and subsequent msgs would be routed differently (as + * we then know what type of operation this is). + * + * @param cls the operation state + * @param msg the received message + * @return #GNUNET_OK if the channel should be kept alive, + * #GNUNET_SYSERR to destroy the channel + */ +static void +handle_incoming_msg (void *cls, + const struct OperationRequestMessage *msg) +{ + struct Operation *op = cls; + struct Listener *listener = op->listener; + const struct GNUNET_MessageHeader *nested_context; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_SETU_RequestMessage *cmsg; + + nested_context = GNUNET_MQ_extract_nested_mh (msg); + /* Make a copy of the nested_context (application-specific context + information that is opaque to set) so we can pass it to the + listener later on */ + if (NULL != nested_context) + op->context_msg = GNUNET_copy_message (nested_context); + op->remote_element_count = ntohl (msg->element_count); + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "Received P2P operation request (port %s) for active listener\n", + GNUNET_h2s (&op->listener->app_id)); + GNUNET_assert (0 == op->suggest_id); + if (0 == suggest_id) + suggest_id++; + op->suggest_id = suggest_id++; + GNUNET_assert (NULL != op->timeout_task); + GNUNET_SCHEDULER_cancel (op->timeout_task); + op->timeout_task = NULL; + env = GNUNET_MQ_msg_nested_mh (cmsg, + GNUNET_MESSAGE_TYPE_SETU_REQUEST, + op->context_msg); + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "Suggesting incoming request with accept id %u to listener %p of client %p\n", + op->suggest_id, + listener, + listener->cs); + cmsg->accept_id = htonl (op->suggest_id); + cmsg->peer_id = op->peer; + GNUNET_MQ_send (listener->cs->mq, + env); + /* NOTE: GNUNET_CADET_receive_done() will be called in + #handle_client_accept() */ +} + + +/** + * Called when a client wants to create a new set. This is typically + * the first request from a client, and includes the type of set + * operation to be performed. + * + * @param cls client that sent the message + * @param m message sent by the client + */ +static void +handle_client_create_set (void *cls, + const struct GNUNET_SETU_CreateMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client created new set for union operation\n"); + if (NULL != cs->set) + { + /* There can only be one set per client */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + set = GNUNET_new (struct Set); + { + struct SetState *set_state; + + set_state = GNUNET_new (struct SetState); // FIXME: avoid this malloc, merge structs! + set_state->se = strata_estimator_create (SE_STRATA_COUNT, + SE_IBF_SIZE, SE_IBF_HASH_NUM); + if (NULL == set_state->se) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate strata estimator\n"); + GNUNET_free (set); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + set->state = set_state; + } + set->content = GNUNET_new (struct SetContent); + set->content->refcount = 1; + set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, + GNUNET_YES); + set->cs = cs; + cs->set = set; + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Timeout happens iff: + * - we suggested an operation to our listener, + * but did not receive a response in time + * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST + * + * @param cls channel context + * @param tc context information (why was this task triggered now) + */ +static void +incoming_timeout_cb (void *cls) +{ + struct Operation *op = cls; + + op->timeout_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Remote peer's incoming request timed out\n"); + incoming_destroy (op); +} + + +/** + * Method called whenever another peer has added us to a channel the + * other peer initiated. Only called (once) upon reception of data + * from a channel we listen on. + * + * The channel context represents the operation itself and gets added + * to a DLL, from where it gets looked up when our local listener + * client responds to a proposed/suggested operation or connects and + * associates with this operation. + * + * @param cls closure + * @param channel new handle to the channel + * @param source peer that started the channel + * @return initial channel context for the channel + * returns NULL on error + */ +static void * +channel_new_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *source) +{ + struct Listener *listener = cls; + struct Operation *op; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New incoming channel\n"); + op = GNUNET_new (struct Operation); + op->listener = listener; + op->peer = *source; + op->channel = channel; + op->mq = GNUNET_CADET_get_mq (op->channel); + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); + op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, + &incoming_timeout_cb, + op); + GNUNET_CONTAINER_DLL_insert (listener->op_head, + listener->op_tail, + op); + return op; +} + + +/** + * Function called whenever a channel is destroyed. Should clean up + * any associated state. It must NOT call + * GNUNET_CADET_channel_destroy() on the channel. + * + * The peer_disconnect function is part of a a virtual table set initially either + * when a peer creates a new channel with us, or once we create + * a new channel ourselves (evaluate). + * + * Once we know the exact type of operation (union/intersection), the vt is + * replaced with an operation specific instance (_GSS_[op]_vt). + * + * @param channel_ctx place where local state associated + * with the channel is stored + * @param channel connection to the other end (henceforth invalid) + */ +static void +channel_end_cb (void *channel_ctx, + const struct GNUNET_CADET_Channel *channel) +{ + struct Operation *op = channel_ctx; + + op->channel = NULL; + _GSS_operation_destroy2 (op); +} + + +/** + * This function probably should not exist + * and be replaced by inlining more specific + * logic in the various places where it is called. + */ +void +_GSS_operation_destroy2 (struct Operation *op) +{ + struct GNUNET_CADET_Channel *channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "channel_end_cb called\n"); + if (NULL != (channel = op->channel)) + { + /* This will free op; called conditionally as this helper function + is also called from within the channel disconnect handler. */ + op->channel = NULL; + GNUNET_CADET_channel_destroy (channel); + } + if (NULL != op->listener) + { + incoming_destroy (op); + return; + } + if (NULL != op->set) + send_client_done (op); + _GSS_operation_destroy (op); + GNUNET_free (op); +} + + +/** + * Function called whenever an MQ-channel's transmission window size changes. + * + * The first callback in an outgoing channel will be with a non-zero value + * and will mean the channel is connected to the destination. + * + * For an incoming channel it will be called immediately after the + * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. + * + * @param cls Channel closure. + * @param channel Connection to the other end (henceforth invalid). + * @param window_size New window size. If the is more messages than buffer size + * this value will be negative.. + */ +static void +channel_window_cb (void *cls, + const struct GNUNET_CADET_Channel *channel, + int window_size) +{ + /* FIXME: not implemented, we could do flow control here... */ +} + + +/** + * Called when a client wants to create a new listener. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_listen (void *cls, + const struct GNUNET_SETU_ListenMessage *msg) +{ + struct ClientState *cs = cls; + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (incoming_msg, + GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST, + struct OperationRequestMessage, + NULL), + GNUNET_MQ_hd_var_size (union_p2p_ibf, + GNUNET_MESSAGE_TYPE_SETU_P2P_IBF, + struct IBFMessage, + NULL), + GNUNET_MQ_hd_var_size (union_p2p_elements, + GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS, + struct GNUNET_SETU_ElementMessage, + NULL), + GNUNET_MQ_hd_var_size (union_p2p_offer, + GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (union_p2p_inquiry, + GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY, + struct InquiryMessage, + NULL), + GNUNET_MQ_hd_var_size (union_p2p_demand, + GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (union_p2p_done, + GNUNET_MESSAGE_TYPE_SETU_P2P_DONE, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (union_p2p_over, + GNUNET_MESSAGE_TYPE_SETU_P2P_OVER, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (union_p2p_full_done, + GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (union_p2p_request_full, + GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, + GNUNET_MESSAGE_TYPE_SETU_P2P_SE, + struct StrataEstimatorMessage, + NULL), + GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, + GNUNET_MESSAGE_TYPE_SETU_P2P_SEC, + struct StrataEstimatorMessage, + NULL), + GNUNET_MQ_hd_var_size (union_p2p_full_element, + GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT, + struct GNUNET_SETU_ElementMessage, + NULL), + GNUNET_MQ_handler_end () + }; + struct Listener *listener; + + if (NULL != cs->listener) + { + /* max. one active listener per client! */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + listener = GNUNET_new (struct Listener); + listener->cs = cs; + cs->listener = listener; + listener->app_id = msg->app_id; + GNUNET_CONTAINER_DLL_insert (listener_head, + listener_tail, + listener); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New listener created (port %s)\n", + GNUNET_h2s (&listener->app_id)); + listener->open_port = GNUNET_CADET_open_port (cadet, + &msg->app_id, + &channel_new_cb, + listener, + &channel_window_cb, + &channel_end_cb, + cadet_handlers); + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Called when the listening client rejects an operation + * request by another peer. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_reject (void *cls, + const struct GNUNET_SETU_RejectMessage *msg) +{ + struct ClientState *cs = cls; + struct Operation *op; + + op = get_incoming (ntohl (msg->accept_reject_id)); + if (NULL == op) + { + /* no matching incoming operation for this reject; + could be that the other peer already disconnected... */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Client rejected unknown operation %u\n", + (unsigned int) ntohl (msg->accept_reject_id)); + GNUNET_SERVICE_client_continue (cs->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer request (app %s) rejected by client\n", + GNUNET_h2s (&cs->listener->app_id)); + _GSS_operation_destroy2 (op); + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Called when a client wants to add or remove an element to a set it inhabits. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static int +check_client_set_add (void *cls, + const struct GNUNET_SETU_ElementMessage *msg) +{ + /* NOTE: Technically, we should probably check with the + block library whether the element we are given is well-formed */ + return GNUNET_OK; +} + + +/** + * Called when a client wants to add or remove an element to a set it inhabits. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_set_add (void *cls, + const struct GNUNET_SETU_ElementMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + struct GNUNET_SETU_Element el; + struct ElementEntry *ee; + struct GNUNET_HashCode hash; + + if (NULL == (set = cs->set)) + { + /* client without a set requested an operation */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_SERVICE_client_continue (cs->client); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n"); + GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type)); + el.size = ntohs (msg->header.size) - sizeof(*msg); + el.data = &msg[1]; + el.element_type = ntohs (msg->element_type); + GNUNET_SETU_element_hash (&el, + &hash); + ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, + &hash); + if (NULL == ee) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserts element %s of size %u\n", + GNUNET_h2s (&hash), + el.size); + ee = GNUNET_malloc (el.size + sizeof(*ee)); + ee->element.size = el.size; + GNUNET_memcpy (&ee[1], el.data, el.size); + ee->element.data = &ee[1]; + ee->element.element_type = el.element_type; + ee->remote = GNUNET_NO; + ee->generation = set->current_generation; + ee->element_hash = hash; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put ( + set->content->elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserted element %s of size %u twice (ignored)\n", + GNUNET_h2s (&hash), + el.size); + /* same element inserted twice */ + return; + } + strata_estimator_insert (set->state->se, + get_ibf_key (&ee->element_hash)); +} + + +/** + * Advance the current generation of a set, + * adding exclusion ranges if necessary. + * + * @param set the set where we want to advance the generation + */ +static void +advance_generation (struct Set *set) +{ + set->content->latest_generation++; + set->current_generation++; +} + + +/** + * Called when a client wants to initiate a set operation with another + * peer. Initiates the CADET connection to the listener and sends the + * request. + * + * @param cls client that sent the message + * @param msg message sent by the client + * @return #GNUNET_OK if the message is well-formed + */ +static int +check_client_evaluate (void *cls, + const struct GNUNET_SETU_EvaluateMessage *msg) +{ + /* FIXME: suboptimal, even if the context below could be NULL, + there are malformed messages this does not check for... */ + return GNUNET_OK; +} + + +/** + * Called when a client wants to initiate a set operation with another + * peer. Initiates the CADET connection to the listener and sends the + * request. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_evaluate (void *cls, + const struct GNUNET_SETU_EvaluateMessage *msg) +{ + struct ClientState *cs = cls; + struct Operation *op = GNUNET_new (struct Operation); + const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (incoming_msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + struct OperationRequestMessage, + op), + GNUNET_MQ_hd_var_size (union_p2p_ibf, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, + struct IBFMessage, + op), + GNUNET_MQ_hd_var_size (union_p2p_elements, + GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, + struct GNUNET_SETU_ElementMessage, + op), + GNUNET_MQ_hd_var_size (union_p2p_offer, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (union_p2p_inquiry, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, + struct InquiryMessage, + op), + GNUNET_MQ_hd_var_size (union_p2p_demand, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_fixed_size (union_p2p_done, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_fixed_size (union_p2p_over, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_fixed_size (union_p2p_full_done, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_fixed_size (union_p2p_request_full, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, + struct StrataEstimatorMessage, + op), + GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, + struct StrataEstimatorMessage, + op), + GNUNET_MQ_hd_var_size (union_p2p_full_element, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, + struct GNUNET_SETU_ElementMessage, + op), + GNUNET_MQ_handler_end () + }; + struct Set *set; + const struct GNUNET_MessageHeader *context; + + if (NULL == (set = cs->set)) + { + GNUNET_break (0); + GNUNET_free (op); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); + op->peer = msg->target_peer; + op->client_request_id = ntohl (msg->request_id); + op->byzantine = msg->byzantine; + op->byzantine_lower_bound = msg->byzantine_lower_bound; + op->force_full = msg->force_full; + op->force_delta = msg->force_delta; + context = GNUNET_MQ_extract_nested_mh (msg); + + /* Advance generation values, so that + mutations won't interfer with the running operation. */ + op->set = set; + op->generation_created = set->current_generation; + advance_generation (set); + GNUNET_CONTAINER_DLL_insert (set->ops_head, + set->ops_tail, + op); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating new CADET channel to port %s for set union\n", + GNUNET_h2s (&msg->app_id)); + op->channel = GNUNET_CADET_channel_create (cadet, + op, + &msg->target_peer, + &msg->app_id, + &channel_window_cb, + &channel_end_cb, + cadet_handlers); + op->mq = GNUNET_CADET_get_mq (op->channel); + op->state = union_evaluate (op, context); + if (NULL == op->state) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Handle a request from the client to cancel a running set operation. + * + * @param cls the client + * @param msg the message + */ +static void +handle_client_cancel (void *cls, + const struct GNUNET_SETU_CancelMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + struct Operation *op; + int found; + + if (NULL == (set = cs->set)) + { + /* client without a set requested an operation */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + found = GNUNET_NO; + for (op = set->ops_head; NULL != op; op = op->next) + { + if (op->client_request_id == ntohl (msg->request_id)) + { + found = GNUNET_YES; + break; + } + } + if (GNUNET_NO == found) + { + /* It may happen that the operation was already destroyed due to + * the other peer disconnecting. The client may not know about this + * yet and try to cancel the (just barely non-existent) operation. + * So this is not a hard error. + */// + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Client canceled non-existent op %u\n", + (uint32_t) ntohl (msg->request_id)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client requested cancel for op %u\n", + (uint32_t) ntohl (msg->request_id)); + _GSS_operation_destroy (op); + } + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Handle a request from the client to accept a set operation that + * came from a remote peer. We forward the accept to the associated + * operation for handling + * + * @param cls the client + * @param msg the message + */ +static void +handle_client_accept (void *cls, + const struct GNUNET_SETU_AcceptMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + struct Operation *op; + struct GNUNET_SETU_ResultMessage *result_message; + struct GNUNET_MQ_Envelope *ev; + struct Listener *listener; + + if (NULL == (set = cs->set)) + { + /* client without a set requested to accept */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + op = get_incoming (ntohl (msg->accept_reject_id)); + if (NULL == op) + { + /* It is not an error if the set op does not exist -- it may + * have been destroyed when the partner peer disconnected. */ + GNUNET_log ( + GNUNET_ERROR_TYPE_INFO, + "Client %p accepted request %u of listener %p that is no longer active\n", + cs, + ntohl (msg->accept_reject_id), + cs->listener); + ev = GNUNET_MQ_msg (result_message, + GNUNET_MESSAGE_TYPE_SET_RESULT); + result_message->request_id = msg->request_id; + result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE); + GNUNET_MQ_send (set->cs->mq, ev); + GNUNET_SERVICE_client_continue (cs->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client accepting request %u\n", + (uint32_t) ntohl (msg->accept_reject_id)); + listener = op->listener; + op->listener = NULL; + GNUNET_CONTAINER_DLL_remove (listener->op_head, + listener->op_tail, + op); + op->set = set; + GNUNET_CONTAINER_DLL_insert (set->ops_head, + set->ops_tail, + op); + op->client_request_id = ntohl (msg->request_id); + op->byzantine = msg->byzantine; + op->byzantine_lower_bound = msg->byzantine_lower_bound; + op->force_full = msg->force_full; + op->force_delta = msg->force_delta; + + /* Advance generation values, so that future mutations do not + interfer with the running operation. */ + op->generation_created = set->current_generation; + advance_generation (set); + GNUNET_assert (NULL == op->state); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "accepting set union operation\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of accepted union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + { + struct OperationState *state; + const struct StrataEstimator *se; + struct GNUNET_MQ_Envelope *ev; + struct StrataEstimatorMessage *strata_msg; + char *buf; + size_t len; + uint16_t type; + + state = GNUNET_new (struct OperationState); // FIXME: merge with 'op' to avoid malloc! + state->se = strata_estimator_dup (op->set->state->se); + state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, + GNUNET_NO); + state->salt_receive = state->salt_send = 42; // FIXME????? + op->state = state; + initialize_key_to_element (op); + state->initial_size = GNUNET_CONTAINER_multihashmap32_size ( + state->key_to_element); + + /* kick off the operation */ + se = state->se; + buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); + len = strata_estimator_write (se, + buf); + if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; + else + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; + ev = GNUNET_MQ_msg_extra (strata_msg, + len, + type); + GNUNET_memcpy (&strata_msg[1], + buf, + len); + GNUNET_free (buf); + strata_msg->set_size + = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size ( + op->set->content->elements)); + GNUNET_MQ_send (op->mq, + ev); + state->phase = PHASE_EXPECT_IBF; + + op->state = state; + } + /* Now allow CADET to continue, as we did not do this in + #handle_incoming_msg (as we wanted to first see if the + local client would accept the request). */ + GNUNET_CADET_receive_done (op->channel); + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Called to clean up, after a shutdown has been requested. + * + * @param cls closure, NULL + */ +static void +shutdown_task (void *cls) +{ + /* Delay actual shutdown to allow service to disconnect clients */ + in_shutdown = GNUNET_YES; + if (0 == num_clients) + { + if (NULL != cadet) + { + GNUNET_CADET_disconnect (cadet); + cadet = NULL; + } + } + GNUNET_STATISTICS_destroy (_GSS_statistics, + GNUNET_YES); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "handled shutdown request\n"); +} + + +/** + * Function called by the service's run + * method to run service-specific setup code. + * + * @param cls closure + * @param cfg configuration to use + * @param service the initialized service + */ +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_SERVICE_Handle *service) +{ + /* FIXME: need to modify SERVICE (!) API to allow + us to run a shutdown task *after* clients were + forcefully disconnected! */ + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); + _GSS_statistics = GNUNET_STATISTICS_create ("setu", cfg); + cadet = GNUNET_CADET_connect (cfg); + if (NULL == cadet) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _ ("Could not connect to CADET service\n")); + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + +/** + * Define "main" method using service macro. + */ +GNUNET_SERVICE_MAIN ( + "set", + GNUNET_SERVICE_OPTION_NONE, + &run, + &client_connect_cb, + &client_disconnect_cb, + NULL, + GNUNET_MQ_hd_fixed_size (client_accept, + GNUNET_MESSAGE_TYPE_SETU_ACCEPT, + struct GNUNET_SETU_AcceptMessage, + NULL), + GNUNET_MQ_hd_var_size (client_set_add, + GNUNET_MESSAGE_TYPE_SETU_ADD, + struct GNUNET_SETU_ElementMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_create_set, + GNUNET_MESSAGE_TYPE_SETU_CREATE, + struct GNUNET_SETU_CreateMessage, + NULL), + GNUNET_MQ_hd_var_size (client_evaluate, + GNUNET_MESSAGE_TYPE_SETU_EVALUATE, + struct GNUNET_SETU_EvaluateMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_listen, + GNUNET_MESSAGE_TYPE_SETU_LISTEN, + struct GNUNET_SETU_ListenMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_reject, + GNUNET_MESSAGE_TYPE_SETU_REJECT, + struct GNUNET_SETU_RejectMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_cancel, + GNUNET_MESSAGE_TYPE_SETU_CANCEL, + struct GNUNET_SETU_CancelMessage, + NULL), + GNUNET_MQ_handler_end ()); + + +/* end of gnunet-service-setu.c */ diff --git a/src/setu/gnunet-service-setu.h b/src/setu/gnunet-service-setu.h new file mode 100644 index 000000000..eb6b7a8e5 --- /dev/null +++ b/src/setu/gnunet-service-setu.h @@ -0,0 +1,393 @@ +/* + This file is part of GNUnet + Copyright (C) 2013-2017 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file set/gnunet-service-setu.h + * @brief common components for the implementation the different set operations + * @author Florian Dold + * @author Christian Grothoff + */ +#ifndef GNUNET_SERVICE_SETU_H_PRIVATE +#define GNUNET_SERVICE_SETU_H_PRIVATE + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_applications.h" +#include "gnunet_core_service.h" +#include "gnunet_cadet_service.h" +#include "gnunet_setu_service.h" +#include "setu.h" + + +/** + * Implementation-specific set state. Used as opaque pointer, and + * specified further in the respective implementation. + */ +struct SetState; + +/** + * Implementation-specific set operation. Used as opaque pointer, and + * specified further in the respective implementation. + */ +struct OperationState; + +/** + * A set that supports a specific operation with other peers. + */ +struct Set; + +/** + * Information about an element element in the set. All elements are + * stored in a hash-table from their hash-code to their 'struct + * Element', so that the remove and add operations are reasonably + * fast. + */ +struct ElementEntry; + +/** + * Operation context used to execute a set operation. + */ +struct Operation; + + +/** + * Information about an element element in the set. All elements are + * stored in a hash-table from their hash-code to their `struct + * Element`, so that the remove and add operations are reasonably + * fast. + */ +struct ElementEntry +{ + /** + * The actual element. The data for the element + * should be allocated at the end of this struct. + */ + struct GNUNET_SETU_Element element; + + /** + * Hash of the element. For set union: Will be used to derive the + * different IBF keys for different salts. + */ + struct GNUNET_HashCode element_hash; + + /** + * First generation that includes this element. + */ + unsigned int generation; + + /** + * #GNUNET_YES if the element is a remote element, and does not belong + * to the operation's set. + */ + int remote; +}; + + +/** + * A listener is inhabited by a client, and waits for evaluation + * requests from remote peers. + */ +struct Listener; + + +/** + * State we keep per client. + */ +struct ClientState +{ + /** + * Set, if associated with the client, otherwise NULL. + */ + struct Set *set; + + /** + * Listener, if associated with the client, otherwise NULL. + */ + struct Listener *listener; + + /** + * Client handle. + */ + struct GNUNET_SERVICE_Client *client; + + /** + * Message queue. + */ + struct GNUNET_MQ_Handle *mq; +}; + + +/** + * Operation context used to execute a set operation. + */ +struct Operation +{ + /** + * Kept in a DLL of the listener, if @e listener is non-NULL. + */ + struct Operation *next; + + /** + * Kept in a DLL of the listener, if @e listener is non-NULL. + */ + struct Operation *prev; + + /** + * Channel to the peer. + */ + struct GNUNET_CADET_Channel *channel; + + /** + * Port this operation runs on. + */ + struct Listener *listener; + + /** + * Message queue for the channel. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Context message, may be NULL. + */ + struct GNUNET_MessageHeader *context_msg; + + /** + * Set associated with the operation, NULL until the spec has been + * associated with a set. + */ + struct Set *set; + + /** + * Operation-specific operation state. Note that the exact + * type depends on this being a union or intersection operation + * (and thus on @e vt). + */ + struct OperationState *state; + + /** + * The identity of the requesting peer. Needs to + * be stored here as the op spec might not have been created yet. + */ + struct GNUNET_PeerIdentity peer; + + /** + * Timeout task, if the incoming peer has not been accepted + * after the timeout, it will be disconnected. + */ + struct GNUNET_SCHEDULER_Task *timeout_task; + + /** + * Salt to use for the operation. + */ + uint32_t salt; + + /** + * Remote peers element count + */ + uint32_t remote_element_count; + + /** + * ID used to identify an operation between service and client + */ + uint32_t client_request_id; + + /** + * Always use delta operation instead of sending full sets, + * even it it's less efficient. + */ + int force_delta; + + /** + * Always send full sets, even if delta operations would + * be more efficient. + */ + int force_full; + + /** + * #GNUNET_YES to fail operations where Byzantine faults + * are suspected + */ + int byzantine; + + /** + * Lower bound for the set size, used only when + * byzantine mode is enabled. + */ + int byzantine_lower_bound; + + /** + * Unique request id for the request from a remote peer, sent to the + * client, which will accept or reject the request. Set to '0' iff + * the request has not been suggested yet. + */ + uint32_t suggest_id; + + /** + * Generation in which the operation handle + * was created. + */ + unsigned int generation_created; +}; + + +/** + * SetContent stores the actual set elements, which may be shared by + * multiple generations derived from one set. + */ +struct SetContent +{ + /** + * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`. + */ + struct GNUNET_CONTAINER_MultiHashMap *elements; + + /** + * Number of references to the content. + */ + unsigned int refcount; + + /** + * FIXME: document! + */ + unsigned int latest_generation; + + /** + * Number of concurrently active iterators. + */ + int iterator_count; +}; + + +struct GenerationRange +{ + /** + * First generation that is excluded. + */ + unsigned int start; + + /** + * Generation after the last excluded generation. + */ + unsigned int end; +}; + + +/** + * A set that supports a specific operation with other peers. + */ +struct Set +{ + /** + * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`). + */ + struct Set *next; + + /** + * Sets are held in a doubly linked list. + */ + struct Set *prev; + + /** + * Client that owns the set. Only one client may own a set, + * and there can only be one set per client. + */ + struct ClientState *cs; + + /** + * Content, possibly shared by multiple sets, + * and thus reference counted. + */ + struct SetContent *content; + + /** + * Implementation-specific state. + */ + struct SetState *state; + + /** + * Evaluate operations are held in a linked list. + */ + struct Operation *ops_head; + + /** + * Evaluate operations are held in a linked list. + */ + struct Operation *ops_tail; + + /** + * Current generation, that is, number of previously executed + * operations and lazy copies on the underlying set content. + */ + unsigned int current_generation; + +}; + + +extern struct GNUNET_STATISTICS_Handle *_GSS_statistics; + + +/** + * Destroy the given operation. Used for any operation where both + * peers were known and that thus actually had a vt and channel. Must + * not be used for operations where 'listener' is still set and we do + * not know the other peer. + * + * Call the implementation-specific cancel function of the operation. + * Disconnects from the remote peer. Does not disconnect the client, + * as there may be multiple operations per set. + * + * @param op operation to destroy + */ +void +_GSS_operation_destroy (struct Operation *op); + + +/** + * This function probably should not exist + * and be replaced by inlining more specific + * logic in the various places where it is called. + */ +void +_GSS_operation_destroy2 (struct Operation *op); + + +/** + * Get the table with implementing functions for set union. + * + * @return the operation specific VTable + */ +const struct SetVT * +_GSS_union_vt (void); + + +/** + * Is element @a ee part of the set used by @a op? + * + * @param ee element to test + * @param op operation the defines the set and its generation + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ +int +_GSS_is_element_of_operation (struct ElementEntry *ee, + struct Operation *op); + + +#endif diff --git a/src/setu/gnunet-service-setu_protocol.h b/src/setu/gnunet-service-setu_protocol.h new file mode 100644 index 000000000..a2803ee47 --- /dev/null +++ b/src/setu/gnunet-service-setu_protocol.h @@ -0,0 +1,226 @@ +/* + This file is part of GNUnet. + Copyright (C) 2013, 2014 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @author Florian Dold + * @author Christian Grothoff + * @file set/gnunet-service-set_protocol.h + * @brief Peer-to-Peer messages for gnunet set + */ +#ifndef SET_PROTOCOL_H +#define SET_PROTOCOL_H + +#include "platform.h" +#include "gnunet_common.h" + + +GNUNET_NETWORK_STRUCT_BEGIN + +struct OperationRequestMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST + */ + struct GNUNET_MessageHeader header; + + /** + * Operation to request, values from `enum GNUNET_SET_OperationType` + */ + uint32_t operation GNUNET_PACKED; + + /** + * For Intersection: my element count + */ + uint32_t element_count GNUNET_PACKED; + + /** + * Application-specific identifier of the request. + */ + struct GNUNET_HashCode app_idX; + + /* rest: optional message */ +}; + + +/** + * Message containing buckets of an invertible bloom filter. + * + * If an IBF has too many buckets for an IBF message, + * it is split into multiple messages. + */ +struct IBFMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF + */ + struct GNUNET_MessageHeader header; + + /** + * Order of the whole ibf, where + * num_buckets = 2^order + */ + uint8_t order; + + /** + * Padding, must be 0. + */ + uint8_t reserved1; + + /** + * Padding, must be 0. + */ + uint16_t reserved2 GNUNET_PACKED; + + /** + * Offset of the strata in the rest of the message + */ + uint32_t offset GNUNET_PACKED; + + /** + * Salt used when hashing elements for this IBF. + */ + uint32_t salt GNUNET_PACKED; + + /* rest: buckets */ +}; + + +struct InquiryMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF + */ + struct GNUNET_MessageHeader header; + + /** + * Salt used when hashing elements for this inquiry. + */ + uint32_t salt GNUNET_PACKED; + + /** + * Reserved, set to 0. + */ + uint32_t reserved GNUNET_PACKED; + + /* rest: inquiry IBF keys */ +}; + + +/** + * During intersection, the first (and possibly second) message + * send it the number of elements in the set, to allow the peers + * to decide who should start with the Bloom filter. + */ +struct IntersectionElementInfoMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO + */ + struct GNUNET_MessageHeader header; + + /** + * mutator used with this bloomfilter. + */ + uint32_t sender_element_count GNUNET_PACKED; +}; + + +/** + * Bloom filter messages exchanged for set intersection calculation. + */ +struct BFMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF + */ + struct GNUNET_MessageHeader header; + + /** + * Number of elements the sender still has in the set. + */ + uint32_t sender_element_count GNUNET_PACKED; + + /** + * XOR of all hashes over all elements remaining in the set. + * Used to determine termination. + */ + struct GNUNET_HashCode element_xor_hash; + + /** + * Mutator used with this bloomfilter. + */ + uint32_t sender_mutator GNUNET_PACKED; + + /** + * Total length of the bloomfilter data. + */ + uint32_t bloomfilter_total_length GNUNET_PACKED; + + /** + * Number of bits (k-value) used in encoding the bloomfilter. + */ + uint32_t bits_per_element GNUNET_PACKED; + + /** + * rest: the sender's bloomfilter + */ +}; + + +/** + * Last message, send to confirm the final set. Contains the element + * count as it is possible that the peer determined that we were done + * by getting the empty set, which in that case also needs to be + * communicated. + */ +struct IntersectionDoneMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE + */ + struct GNUNET_MessageHeader header; + + /** + * Final number of elements in intersection. + */ + uint32_t final_element_count GNUNET_PACKED; + + /** + * XOR of all hashes over all elements remaining in the set. + */ + struct GNUNET_HashCode element_xor_hash; +}; + + +/** + * Strata estimator together with the peer's overall set size. + */ +struct StrataEstimatorMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C) + */ + struct GNUNET_MessageHeader header; + + uint64_t set_size; +}; + +GNUNET_NETWORK_STRUCT_END + +#endif diff --git a/src/setu/gnunet-service-setu_strata_estimator.c b/src/setu/gnunet-service-setu_strata_estimator.c new file mode 100644 index 000000000..0fa6a6f17 --- /dev/null +++ b/src/setu/gnunet-service-setu_strata_estimator.c @@ -0,0 +1,303 @@ +/* + This file is part of GNUnet + Copyright (C) 2012 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file set/gnunet-service-setu_strata_estimator.c + * @brief invertible bloom filter + * @author Florian Dold + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "ibf.h" +#include "gnunet-service-setu_strata_estimator.h" + + +/** + * Should we try compressing the strata estimator? This will + * break compatibility with the 0.10.1-network. + */ +#define FAIL_10_1_COMPATIBILTIY 1 + + +/** + * Write the given strata estimator to the buffer. + * + * @param se strata estimator to serialize + * @param[out] buf buffer to write to, must be of appropriate size + * @return number of bytes written to @a buf + */ +size_t +strata_estimator_write (const struct StrataEstimator *se, + void *buf) +{ + char *sbuf = buf; + unsigned int i; + size_t osize; + + GNUNET_assert (NULL != se); + for (i = 0; i < se->strata_count; i++) + { + ibf_write_slice (se->strata[i], + 0, + se->ibf_size, + &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]); + } + osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; +#if FAIL_10_1_COMPATIBILTIY + { + char *cbuf; + size_t nsize; + + if (GNUNET_YES == + GNUNET_try_compression (buf, + osize, + &cbuf, + &nsize)) + { + GNUNET_memcpy (buf, cbuf, nsize); + osize = nsize; + GNUNET_free (cbuf); + } + } +#endif + return osize; +} + + +/** + * Read strata from the buffer into the given strata + * estimator. The strata estimator must already be allocated. + * + * @param buf buffer to read from + * @param buf_len number of bytes in @a buf + * @param is_compressed is the data compressed? + * @param[out] se strata estimator to write to + * @return #GNUNET_OK on success + */ +int +strata_estimator_read (const void *buf, + size_t buf_len, + int is_compressed, + struct StrataEstimator *se) +{ + unsigned int i; + size_t osize; + char *dbuf; + + dbuf = NULL; + if (GNUNET_YES == is_compressed) + { + osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count; + dbuf = GNUNET_decompress (buf, + buf_len, + osize); + if (NULL == dbuf) + { + GNUNET_break_op (0); /* bad compressed input data */ + return GNUNET_SYSERR; + } + buf = dbuf; + buf_len = osize; + } + + if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE) + { + GNUNET_break (0); /* very odd error */ + GNUNET_free (dbuf); + return GNUNET_SYSERR; + } + + for (i = 0; i < se->strata_count; i++) + { + ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); + buf += se->ibf_size * IBF_BUCKET_SIZE; + } + GNUNET_free (dbuf); + return GNUNET_OK; +} + + +/** + * Add a key to the strata estimator. + * + * @param se strata estimator to add the key to + * @param key key to add + */ +void +strata_estimator_insert (struct StrataEstimator *se, + struct IBF_Key key) +{ + uint64_t v; + unsigned int i; + + v = key.key_val; + /* count trailing '1'-bits of v */ + for (i = 0; v & 1; v >>= 1, i++) + /* empty */; + ibf_insert (se->strata[i], key); +} + + +/** + * Remove a key from the strata estimator. + * + * @param se strata estimator to remove the key from + * @param key key to remove + */ +void +strata_estimator_remove (struct StrataEstimator *se, + struct IBF_Key key) +{ + uint64_t v; + unsigned int i; + + v = key.key_val; + /* count trailing '1'-bits of v */ + for (i = 0; v & 1; v >>= 1, i++) + /* empty */; + ibf_remove (se->strata[i], key); +} + + +/** + * Create a new strata estimator with the given parameters. + * + * @param strata_count number of stratas, that is, number of ibfs in the estimator + * @param ibf_size size of each ibf stratum + * @param ibf_hashnum hashnum parameter of each ibf + * @return a freshly allocated, empty strata estimator, NULL on error + */ +struct StrataEstimator * +strata_estimator_create (unsigned int strata_count, + uint32_t ibf_size, + uint8_t ibf_hashnum) +{ + struct StrataEstimator *se; + unsigned int i; + unsigned int j; + + se = GNUNET_new (struct StrataEstimator); + se->strata_count = strata_count; + se->ibf_size = ibf_size; + se->strata = GNUNET_new_array (strata_count, + struct InvertibleBloomFilter *); + for (i = 0; i < strata_count; i++) + { + se->strata[i] = ibf_create (ibf_size, ibf_hashnum); + if (NULL == se->strata[i]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate memory for strata estimator\n"); + for (j = 0; j < i; j++) + ibf_destroy (se->strata[i]); + GNUNET_free (se); + return NULL; + } + } + return se; +} + + +/** + * Estimate set difference with two strata estimators, + * i.e. arrays of IBFs. + * Does not not modify its arguments. + * + * @param se1 first strata estimator + * @param se2 second strata estimator + * @return the estimated difference + */ +unsigned int +strata_estimator_difference (const struct StrataEstimator *se1, + const struct StrataEstimator *se2) +{ + unsigned int count; + + GNUNET_assert (se1->strata_count == se2->strata_count); + count = 0; + for (int i = se1->strata_count - 1; i >= 0; i--) + { + struct InvertibleBloomFilter *diff; + /* number of keys decoded from the ibf */ + + /* FIXME: implement this without always allocating new IBFs */ + diff = ibf_dup (se1->strata[i]); + ibf_subtract (diff, se2->strata[i]); + for (int ibf_count = 0; GNUNET_YES; ibf_count++) + { + int more; + + more = ibf_decode (diff, NULL, NULL); + if (GNUNET_NO == more) + { + count += ibf_count; + break; + } + /* Estimate if decoding fails or would not terminate */ + if ((GNUNET_SYSERR == more) || (ibf_count > diff->size)) + { + ibf_destroy (diff); + return count * (1 << (i + 1)); + } + } + ibf_destroy (diff); + } + return count; +} + + +/** + * Make a copy of a strata estimator. + * + * @param se the strata estimator to copy + * @return the copy + */ +struct StrataEstimator * +strata_estimator_dup (struct StrataEstimator *se) +{ + struct StrataEstimator *c; + unsigned int i; + + c = GNUNET_new (struct StrataEstimator); + c->strata_count = se->strata_count; + c->ibf_size = se->ibf_size; + c->strata = GNUNET_new_array (se->strata_count, + struct InvertibleBloomFilter *); + for (i = 0; i < se->strata_count; i++) + c->strata[i] = ibf_dup (se->strata[i]); + return c; +} + + +/** + * Destroy a strata estimator, free all of its resources. + * + * @param se strata estimator to destroy. + */ +void +strata_estimator_destroy (struct StrataEstimator *se) +{ + unsigned int i; + + for (i = 0; i < se->strata_count; i++) + ibf_destroy (se->strata[i]); + GNUNET_free (se->strata); + GNUNET_free (se); +} diff --git a/src/setu/gnunet-service-setu_strata_estimator.h b/src/setu/gnunet-service-setu_strata_estimator.h new file mode 100644 index 000000000..afdbcdbbf --- /dev/null +++ b/src/setu/gnunet-service-setu_strata_estimator.h @@ -0,0 +1,169 @@ +/* + This file is part of GNUnet + Copyright (C) 2012 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/gnunet-service-setu_strata_estimator.h + * @brief estimator of set difference + * @author Florian Dold + */ + +#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H +#define GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H + +#include "platform.h" +#include "gnunet_common.h" +#include "gnunet_util_lib.h" + +#ifdef __cplusplus +extern "C" +{ +#if 0 /* keep Emacsens' auto-indent happy */ +} +#endif +#endif + + +/** + * A handle to a strata estimator. + */ +struct StrataEstimator +{ + /** + * The IBFs of this strata estimator. + */ + struct InvertibleBloomFilter **strata; + + /** + * Size of the IBF array in @e strata + */ + unsigned int strata_count; + + /** + * Size of each IBF stratum (in bytes) + */ + unsigned int ibf_size; +}; + + +/** + * Write the given strata estimator to the buffer. + * + * @param se strata estimator to serialize + * @param[out] buf buffer to write to, must be of appropriate size + * @return number of bytes written to @a buf + */ +size_t +strata_estimator_write (const struct StrataEstimator *se, + void *buf); + + +/** + * Read strata from the buffer into the given strata + * estimator. The strata estimator must already be allocated. + * + * @param buf buffer to read from + * @param buf_len number of bytes in @a buf + * @param is_compressed is the data compressed? + * @param[out] se strata estimator to write to + * @return #GNUNET_OK on success + */ +int +strata_estimator_read (const void *buf, + size_t buf_len, + int is_compressed, + struct StrataEstimator *se); + + +/** + * Create a new strata estimator with the given parameters. + * + * @param strata_count number of stratas, that is, number of ibfs in the estimator + * @param ibf_size size of each ibf stratum + * @param ibf_hashnum hashnum parameter of each ibf + * @return a freshly allocated, empty strata estimator, NULL on error + */ +struct StrataEstimator * +strata_estimator_create (unsigned int strata_count, + uint32_t ibf_size, + uint8_t ibf_hashnum); + + +/** + * Get an estimation of the symmetric difference of the elements + * contained in both strata estimators. + * + * @param se1 first strata estimator + * @param se2 second strata estimator + * @return abs(|se1| - |se2|) + */ +unsigned int +strata_estimator_difference (const struct StrataEstimator *se1, + const struct StrataEstimator *se2); + + +/** + * Add a key to the strata estimator. + * + * @param se strata estimator to add the key to + * @param key key to add + */ +void +strata_estimator_insert (struct StrataEstimator *se, + struct IBF_Key key); + + +/** + * Remove a key from the strata estimator. + * + * @param se strata estimator to remove the key from + * @param key key to remove + */ +void +strata_estimator_remove (struct StrataEstimator *se, + struct IBF_Key key); + + +/** + * Destroy a strata estimator, free all of its resources. + * + * @param se strata estimator to destroy. + */ +void +strata_estimator_destroy (struct StrataEstimator *se); + + +/** + * Make a copy of a strata estimator. + * + * @param se the strata estimator to copy + * @return the copy + */ +struct StrataEstimator * +strata_estimator_dup (struct StrataEstimator *se); + + +#if 0 /* keep Emacsens' auto-indent happy */ +{ +#endif +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/setu/gnunet-setu-ibf-profiler.c b/src/setu/gnunet-setu-ibf-profiler.c new file mode 100644 index 000000000..944b63d30 --- /dev/null +++ b/src/setu/gnunet-setu-ibf-profiler.c @@ -0,0 +1,308 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/gnunet-set-ibf-profiler.c + * @brief tool for profiling the invertible bloom filter implementation + * @author Florian Dold + */ + +#include "platform.h" +#include "gnunet_util_lib.h" + +#include "ibf.h" + +static unsigned int asize = 10; +static unsigned int bsize = 10; +static unsigned int csize = 10; +static unsigned int hash_num = 4; +static unsigned int ibf_size = 80; + +/* FIXME: add parameter for this */ +static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK; + +static struct GNUNET_CONTAINER_MultiHashMap *set_a; +static struct GNUNET_CONTAINER_MultiHashMap *set_b; +/* common elements in a and b */ +static struct GNUNET_CONTAINER_MultiHashMap *set_c; + +static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode; + +static struct InvertibleBloomFilter *ibf_a; +static struct InvertibleBloomFilter *ibf_b; + + +static void +register_hashcode (struct GNUNET_HashCode *hash) +{ + struct GNUNET_HashCode replicated; + struct IBF_Key key; + + key = ibf_key_from_hashcode (hash); + ibf_hashcode_from_key (key, &replicated); + (void) GNUNET_CONTAINER_multihashmap_put ( + key_to_hashcode, + &replicated, + GNUNET_memdup (hash, sizeof *hash), + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); +} + + +static void +iter_hashcodes (struct IBF_Key key, + GNUNET_CONTAINER_MulitHashMapIteratorCallback iter, + void *cls) +{ + struct GNUNET_HashCode replicated; + + ibf_hashcode_from_key (key, &replicated); + GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode, + &replicated, + iter, + cls); +} + + +static int +insert_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) +{ + struct InvertibleBloomFilter *ibf = cls; + + ibf_insert (ibf, ibf_key_from_hashcode (key)); + return GNUNET_YES; +} + + +static int +remove_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) +{ + struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls; + + /* if remove fails, there just was a collision with another key */ + (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL); + return GNUNET_YES; +} + + +static void +run (void *cls, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_HashCode id; + struct IBF_Key ibf_key; + int i; + int side; + int res; + struct GNUNET_TIME_Absolute start_time; + struct GNUNET_TIME_Relative delta_time; + + set_a = + GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + csize)), + GNUNET_NO); + set_b = + GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + csize)), + GNUNET_NO); + set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize), + GNUNET_NO); + + key_to_hashcode = + GNUNET_CONTAINER_multihashmap_create (((asize + bsize + csize == 0) + ? 1 + : (asize + bsize + csize)), + GNUNET_NO); + + printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n", + hash_num, + ibf_size, + asize, + bsize, + csize); + + i = 0; + while (i < asize) + { + GNUNET_CRYPTO_hash_create_random (random_quality, &id); + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) + continue; + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put ( + set_a, + &id, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + register_hashcode (&id); + i++; + } + i = 0; + while (i < bsize) + { + GNUNET_CRYPTO_hash_create_random (random_quality, &id); + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) + continue; + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id)) + continue; + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put ( + set_b, + &id, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + register_hashcode (&id); + i++; + } + i = 0; + while (i < csize) + { + GNUNET_CRYPTO_hash_create_random (random_quality, &id); + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) + continue; + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id)) + continue; + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id)) + continue; + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put ( + set_c, + &id, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + register_hashcode (&id); + i++; + } + + ibf_a = ibf_create (ibf_size, hash_num); + ibf_b = ibf_create (ibf_size, hash_num); + if ((NULL == ibf_a) || (NULL == ibf_b)) + { + /* insufficient memory */ + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + } + + + printf ("generated sets\n"); + + start_time = GNUNET_TIME_absolute_get (); + + GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a); + GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b); + GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a); + GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b); + + delta_time = GNUNET_TIME_absolute_get_duration (start_time); + + printf ("encoded in: %s\n", + GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO)); + + ibf_subtract (ibf_a, ibf_b); + + + start_time = GNUNET_TIME_absolute_get (); + + for (i = 0; i <= asize + bsize; i++) + { + res = ibf_decode (ibf_a, &side, &ibf_key); + if (GNUNET_SYSERR == res) + { + printf ("decode failed, %u/%u elements left\n", + GNUNET_CONTAINER_multihashmap_size (set_a) + + GNUNET_CONTAINER_multihashmap_size (set_b), + asize + bsize); + return; + } + if (GNUNET_NO == res) + { + if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) && + (0 == GNUNET_CONTAINER_multihashmap_size (set_a))) + { + delta_time = GNUNET_TIME_absolute_get_duration (start_time); + printf ("decoded successfully in: %s\n", + GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO)); + } + else + { + printf ("decode missed elements (should never happen)\n"); + } + return; + } + + if (side == 1) + iter_hashcodes (ibf_key, remove_iterator, set_a); + if (side == -1) + iter_hashcodes (ibf_key, remove_iterator, set_b); + } + printf ("cyclic IBF, %u/%u elements left\n", + GNUNET_CONTAINER_multihashmap_size (set_a) + + GNUNET_CONTAINER_multihashmap_size (set_b), + asize + bsize); +} + + +int +main (int argc, char **argv) +{ + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_uint ('A', + "asize", + NULL, + gettext_noop ("number of element in set A-B"), + &asize), + + GNUNET_GETOPT_option_uint ('B', + "bsize", + NULL, + gettext_noop ("number of element in set B-A"), + &bsize), + + GNUNET_GETOPT_option_uint ('C', + "csize", + NULL, + gettext_noop ( + "number of common elements in A and B"), + &csize), + + GNUNET_GETOPT_option_uint ('k', + "hash-num", + NULL, + gettext_noop ("hash num"), + &hash_num), + + GNUNET_GETOPT_option_uint ('s', + "ibf-size", + NULL, + gettext_noop ("ibf size"), + &ibf_size), + + GNUNET_GETOPT_OPTION_END + }; + + GNUNET_PROGRAM_run2 (argc, + argv, + "gnunet-consensus-ibf", + "help", + options, + &run, + NULL, + GNUNET_YES); + return 0; +} diff --git a/src/setu/gnunet-setu-profiler.c b/src/setu/gnunet-setu-profiler.c new file mode 100644 index 000000000..8d6a2dc8c --- /dev/null +++ b/src/setu/gnunet-setu-profiler.c @@ -0,0 +1,499 @@ +/* + This file is part of GNUnet + Copyright (C) 2013 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file setu/gnunet-setu-profiler.c + * @brief profiling tool for set + * @author Florian Dold + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" +#include "gnunet_setu_service.h" +#include "gnunet_testbed_service.h" + + +static int ret; + +static unsigned int num_a = 5; +static unsigned int num_b = 5; +static unsigned int num_c = 20; + +static char *op_str = "union"; + +const static struct GNUNET_CONFIGURATION_Handle *config; + +struct SetInfo +{ + char *id; + struct GNUNET_SETU_Handle *set; + struct GNUNET_SETU_OperationHandle *oh; + struct GNUNET_CONTAINER_MultiHashMap *sent; + struct GNUNET_CONTAINER_MultiHashMap *received; + int done; +} info1, info2; + +static struct GNUNET_CONTAINER_MultiHashMap *common_sent; + +static struct GNUNET_HashCode app_id; + +static struct GNUNET_PeerIdentity local_peer; + +static struct GNUNET_SETU_ListenHandle *set_listener; + +static int byzantine; +static unsigned int force_delta; +static unsigned int force_full; +static unsigned int element_size = 32; + +/** + * Handle to the statistics service. + */ +static struct GNUNET_STATISTICS_Handle *statistics; + +/** + * The profiler will write statistics + * for all peers to the file with this name. + */ +static char *statistics_filename; + +/** + * The profiler will write statistics + * for all peers to this file. + */ +static FILE *statistics_file; + + +static int +map_remove_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_CONTAINER_MultiHashMap *m = cls; + int ret; + + GNUNET_assert (NULL != key); + + ret = GNUNET_CONTAINER_multihashmap_remove_all (m, key); + if (GNUNET_OK != ret) + printf ("spurious element\n"); + return GNUNET_YES; +} + + +/** + * Callback function to process statistic values. + * + * @param cls closure + * @param subsystem name of subsystem that created the statistic + * @param name the name of the datum + * @param value the current value + * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not + * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + */ +static int +statistics_result (void *cls, + const char *subsystem, + const char *name, + uint64_t value, + int is_persistent) +{ + if (NULL != statistics_file) + { + fprintf (statistics_file, "%s\t%s\t%lu\n", subsystem, name, (unsigned + long) value); + } + return GNUNET_OK; +} + + +static void +statistics_done (void *cls, + int success) +{ + GNUNET_assert (GNUNET_YES == success); + if (NULL != statistics_file) + fclose (statistics_file); + GNUNET_SCHEDULER_shutdown (); +} + + +static void +check_all_done (void) +{ + if ((info1.done == GNUNET_NO) || (info2.done == GNUNET_NO)) + return; + + GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator, + info2.sent); + GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator, + info1.sent); + + printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size ( + info1.sent)); + printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size ( + info2.sent)); + + if (NULL == statistics_filename) + { + GNUNET_SCHEDULER_shutdown (); + return; + } + + statistics_file = fopen (statistics_filename, "w"); + GNUNET_STATISTICS_get (statistics, NULL, NULL, + &statistics_done, + &statistics_result, NULL); +} + + +static void +set_result_cb (void *cls, + const struct GNUNET_SETU_Element *element, + uint64_t current_size, + enum GNUNET_SETU_Status status) +{ + struct SetInfo *info = cls; + + GNUNET_assert (GNUNET_NO == info->done); + switch (status) + { + case GNUNET_SETU_STATUS_DONE: + info->done = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s done\n", info->id); + check_all_done (); + info->oh = NULL; + return; + + case GNUNET_SETU_STATUS_FAILURE: + info->oh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n"); + GNUNET_SCHEDULER_shutdown (); + return; + + case GNUNET_SETU_STATUS_ADD_LOCAL: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: local element\n", info->id); + break; + default: + GNUNET_assert (0); + } + + if (element->size != element_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "wrong element size: %u, expected %u\n", + element->size, + (unsigned int) sizeof(struct GNUNET_HashCode)); + GNUNET_assert (0); + } + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: got element (%s)\n", + info->id, GNUNET_h2s (element->data)); + GNUNET_assert (NULL != element->data); + struct GNUNET_HashCode data_hash; + GNUNET_CRYPTO_hash (element->data, element_size, &data_hash); + GNUNET_CONTAINER_multihashmap_put (info->received, + &data_hash, NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); +} + + +static void +set_listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SETU_Request *request) +{ + /* max. 2 options plus terminator */ + struct GNUNET_SETU_Option opts[3] = { { 0 } }; + unsigned int n_opts = 0; + + if (NULL == request) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "listener failed\n"); + return; + } + GNUNET_assert (NULL == info2.oh); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "set listen cb called\n"); + if (byzantine) + { + opts[n_opts++] = (struct GNUNET_SETU_Option) { .type = + GNUNET_SETU_OPTION_BYZANTINE }; + } + GNUNET_assert (! (force_full && force_delta)); + if (force_full) + { + opts[n_opts++] = (struct GNUNET_SETU_Option) { .type = + GNUNET_SETU_OPTION_FORCE_FULL }; + } + if (force_delta) + { + opts[n_opts++] = (struct GNUNET_SETU_Option) { .type = + GNUNET_SETU_OPTION_FORCE_DELTA }; + } + + opts[n_opts].type = 0; + info2.oh = GNUNET_SETU_accept (request, + opts, + set_result_cb, &info2); + GNUNET_SETU_commit (info2.oh, info2.set); +} + + +static int +set_insert_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_SETU_Handle *set = cls; + struct GNUNET_SETU_Element el; + + el.element_type = 0; + el.data = value; + el.size = element_size; + GNUNET_SETU_add_element (set, &el, NULL, NULL); + return GNUNET_YES; +} + + +static void +handle_shutdown (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shutting down set profiler\n"); + if (NULL != set_listener) + { + GNUNET_SETU_listen_cancel (set_listener); + set_listener = NULL; + } + if (NULL != info1.oh) + { + GNUNET_SETU_operation_cancel (info1.oh); + info1.oh = NULL; + } + if (NULL != info2.oh) + { + GNUNET_SETU_operation_cancel (info2.oh); + info2.oh = NULL; + } + if (NULL != info1.set) + { + GNUNET_SETU_destroy (info1.set); + info1.set = NULL; + } + if (NULL != info2.set) + { + GNUNET_SETU_destroy (info2.set); + info2.set = NULL; + } + GNUNET_STATISTICS_destroy (statistics, GNUNET_NO); +} + + +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_TESTING_Peer *peer) +{ + unsigned int i; + struct GNUNET_HashCode hash; + /* max. 2 options plus terminator */ + struct GNUNET_SETU_Option opts[3] = { { 0 } }; + unsigned int n_opts = 0; + + config = cfg; + + GNUNET_assert (element_size > 0); + + if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &local_peer)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n"); + ret = 0; + return; + } + + statistics = GNUNET_STATISTICS_create ("set-profiler", cfg); + + GNUNET_SCHEDULER_add_shutdown (&handle_shutdown, NULL); + + info1.id = "a"; + info2.id = "b"; + + info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO); + info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO); + common_sent = GNUNET_CONTAINER_multihashmap_create (num_c + 1, GNUNET_NO); + + info1.received = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO); + info2.received = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO); + + for (i = 0; i < num_a; i++) + { + char *data = GNUNET_malloc (element_size); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size); + GNUNET_CRYPTO_hash (data, element_size, &hash); + GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + for (i = 0; i < num_b; i++) + { + char *data = GNUNET_malloc (element_size); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size); + GNUNET_CRYPTO_hash (data, element_size, &hash); + GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + for (i = 0; i < num_c; i++) + { + char *data = GNUNET_malloc (element_size); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size); + GNUNET_CRYPTO_hash (data, element_size, &hash); + GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id); + + /* FIXME: also implement intersection etc. */ + info1.set = GNUNET_SETU_create (config); + info2.set = GNUNET_SETU_create (config); + + GNUNET_CONTAINER_multihashmap_iterate (info1.sent, set_insert_iterator, + info1.set); + GNUNET_CONTAINER_multihashmap_iterate (info2.sent, set_insert_iterator, + info2.set); + GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator, + info1.set); + GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator, + info2.set); + + set_listener = GNUNET_SETU_listen (config, + &app_id, + &set_listen_cb, + NULL); + + + if (byzantine) + { + opts[n_opts++] = (struct GNUNET_SETU_Option) { .type = + GNUNET_SETU_OPTION_BYZANTINE }; + } + GNUNET_assert (! (force_full && force_delta)); + if (force_full) + { + opts[n_opts++] = (struct GNUNET_SETU_Option) { .type = + GNUNET_SETU_OPTION_FORCE_FULL }; + } + if (force_delta) + { + opts[n_opts++] = (struct GNUNET_SETU_Option) { .type = + GNUNET_SETU_OPTION_FORCE_DELTA }; + } + + opts[n_opts].type = 0; + + info1.oh = GNUNET_SETU_prepare (&local_peer, &app_id, NULL, + opts, + set_result_cb, &info1); + GNUNET_SETU_commit (info1.oh, info1.set); + GNUNET_SETU_destroy (info1.set); + info1.set = NULL; +} + + +static void +pre_run (void *cls, char *const *args, const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + if (0 != GNUNET_TESTING_peer_run ("set-profiler", + cfgfile, + &run, NULL)) + ret = 2; +} + + +int +main (int argc, char **argv) +{ + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_uint ('A', + "num-first", + NULL, + gettext_noop ("number of values"), + &num_a), + + GNUNET_GETOPT_option_uint ('B', + "num-second", + NULL, + gettext_noop ("number of values"), + &num_b), + + GNUNET_GETOPT_option_flag ('b', + "byzantine", + gettext_noop ("use byzantine mode"), + &byzantine), + + GNUNET_GETOPT_option_uint ('f', + "force-full", + NULL, + gettext_noop ("force sending full set"), + &force_full), + + GNUNET_GETOPT_option_uint ('d', + "force-delta", + NULL, + gettext_noop ("number delta operation"), + &force_delta), + + GNUNET_GETOPT_option_uint ('C', + "num-common", + NULL, + gettext_noop ("number of values"), + &num_c), + + GNUNET_GETOPT_option_string ('x', + "operation", + NULL, + gettext_noop ("operation to execute"), + &op_str), + + GNUNET_GETOPT_option_uint ('w', + "element-size", + NULL, + gettext_noop ("element size"), + &element_size), + + GNUNET_GETOPT_option_filename ('s', + "statistics", + "FILENAME", + gettext_noop ("write statistics to file"), + &statistics_filename), + + GNUNET_GETOPT_OPTION_END + }; + + GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set-profiler", + "help", + options, &pre_run, NULL, GNUNET_YES); + return ret; +} diff --git a/src/setu/ibf.c b/src/setu/ibf.c new file mode 100644 index 000000000..1532afceb --- /dev/null +++ b/src/setu/ibf.c @@ -0,0 +1,409 @@ +/* + This file is part of GNUnet + Copyright (C) 2012 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/ibf.c + * @brief implementation of the invertible bloom filter + * @author Florian Dold + */ + +#include "ibf.h" + +/** + * Compute the key's hash from the key. + * Redefine to use a different hash function. + */ +#define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof(struct \ + IBF_KeyHash))) + +/** + * Create a key from a hashcode. + * + * @param hash the hashcode + * @return a key + */ +struct IBF_Key +ibf_key_from_hashcode (const struct GNUNET_HashCode *hash) +{ + return *(struct IBF_Key *) hash; +} + + +/** + * Create a hashcode from a key, by replicating the key + * until the hascode is filled + * + * @param key the key + * @param dst hashcode to store the result in + */ +void +ibf_hashcode_from_key (struct IBF_Key key, + struct GNUNET_HashCode *dst) +{ + struct IBF_Key *p; + unsigned int i; + const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode) + / sizeof(struct IBF_Key); + + p = (struct IBF_Key *) dst; + for (i = 0; i < keys_per_hashcode; i++) + *p++ = key; +} + + +/** + * Create an invertible bloom filter. + * + * @param size number of IBF buckets + * @param hash_num number of buckets one element is hashed in + * @return the newly created invertible bloom filter, NULL on error + */ +struct InvertibleBloomFilter * +ibf_create (uint32_t size, uint8_t hash_num) +{ + struct InvertibleBloomFilter *ibf; + + GNUNET_assert (0 != size); + + ibf = GNUNET_new (struct InvertibleBloomFilter); + ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t)); + if (NULL == ibf->count) + { + GNUNET_free (ibf); + return NULL; + } + ibf->key_sum = GNUNET_malloc_large (size * sizeof(struct IBF_Key)); + if (NULL == ibf->key_sum) + { + GNUNET_free (ibf->count); + GNUNET_free (ibf); + return NULL; + } + ibf->key_hash_sum = GNUNET_malloc_large (size * sizeof(struct IBF_KeyHash)); + if (NULL == ibf->key_hash_sum) + { + GNUNET_free (ibf->key_sum); + GNUNET_free (ibf->count); + GNUNET_free (ibf); + return NULL; + } + ibf->size = size; + ibf->hash_num = hash_num; + + return ibf; +} + + +/** + * Store unique bucket indices for the specified key in dst. + */ +static void +ibf_get_indices (const struct InvertibleBloomFilter *ibf, + struct IBF_Key key, + int *dst) +{ + uint32_t filled; + uint32_t i; + uint32_t bucket; + + bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key); + for (i = 0, filled = 0; filled < ibf->hash_num; i++) + { + unsigned int j; + uint64_t x; + for (j = 0; j < filled; j++) + if (dst[j] == bucket) + goto try_next; + dst[filled++] = bucket % ibf->size; +try_next:; + x = ((uint64_t) bucket << 32) | i; + bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x); + } +} + + +static void +ibf_insert_into (struct InvertibleBloomFilter *ibf, + struct IBF_Key key, + const int *buckets, int side) +{ + int i; + + for (i = 0; i < ibf->hash_num; i++) + { + const int bucket = buckets[i]; + ibf->count[bucket].count_val += side; + ibf->key_sum[bucket].key_val ^= key.key_val; + ibf->key_hash_sum[bucket].key_hash_val + ^= IBF_KEY_HASH_VAL (key); + } +} + + +/** + * Insert a key into an IBF. + * + * @param ibf the IBF + * @param key the element's hash code + */ +void +ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key) +{ + int buckets[ibf->hash_num]; + + GNUNET_assert (ibf->hash_num <= ibf->size); + ibf_get_indices (ibf, key, buckets); + ibf_insert_into (ibf, key, buckets, 1); +} + + +/** + * Remove a key from an IBF. + * + * @param ibf the IBF + * @param key the element's hash code + */ +void +ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key) +{ + int buckets[ibf->hash_num]; + + GNUNET_assert (ibf->hash_num <= ibf->size); + ibf_get_indices (ibf, key, buckets); + ibf_insert_into (ibf, key, buckets, -1); +} + + +/** + * Test is the IBF is empty, i.e. all counts, keys and key hashes are zero. + */ +static int +ibf_is_empty (struct InvertibleBloomFilter *ibf) +{ + int i; + + for (i = 0; i < ibf->size; i++) + { + if (0 != ibf->count[i].count_val) + return GNUNET_NO; + if (0 != ibf->key_hash_sum[i].key_hash_val) + return GNUNET_NO; + if (0 != ibf->key_sum[i].key_val) + return GNUNET_NO; + } + return GNUNET_YES; +} + + +/** + * Decode and remove an element from the IBF, if possible. + * + * @param ibf the invertible bloom filter to decode + * @param ret_side sign of the cell's count where the decoded element came from. + * A negative sign indicates that the element was recovered + * resides in an IBF that was previously subtracted from. + * @param ret_id receives the hash code of the decoded element, if successful + * @return GNUNET_YES if decoding an element was successful, + * GNUNET_NO if the IBF is empty, + * GNUNET_SYSERR if the decoding has failed + */ +int +ibf_decode (struct InvertibleBloomFilter *ibf, + int *ret_side, struct IBF_Key *ret_id) +{ + struct IBF_KeyHash hash; + int i; + int buckets[ibf->hash_num]; + + GNUNET_assert (NULL != ibf); + + for (i = 0; i < ibf->size; i++) + { + int j; + int hit; + + /* we can only decode from pure buckets */ + if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val)) + continue; + + hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]); + + /* test if the hash matches the key */ + if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val) + continue; + + /* test if key in bucket hits its own location, + * if not, the key hash was subject to collision */ + hit = GNUNET_NO; + ibf_get_indices (ibf, ibf->key_sum[i], buckets); + for (j = 0; j < ibf->hash_num; j++) + if (buckets[j] == i) + hit = GNUNET_YES; + + if (GNUNET_NO == hit) + continue; + + if (NULL != ret_side) + *ret_side = ibf->count[i].count_val; + if (NULL != ret_id) + *ret_id = ibf->key_sum[i]; + + /* insert on the opposite side, effectively removing the element */ + ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val); + + return GNUNET_YES; + } + + if (GNUNET_YES == ibf_is_empty (ibf)) + return GNUNET_NO; + return GNUNET_SYSERR; +} + + +/** + * Write buckets from an ibf to a buffer. + * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. + * + * @param ibf the ibf to write + * @param start with which bucket to start + * @param count how many buckets to write + * @param buf buffer to write the data to + */ +void +ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, + uint32_t count, void *buf) +{ + struct IBF_Key *key_dst; + struct IBF_KeyHash *key_hash_dst; + struct IBF_Count *count_dst; + + GNUNET_assert (start + count <= ibf->size); + + /* copy keys */ + key_dst = (struct IBF_Key *) buf; + GNUNET_memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst); + key_dst += count; + /* copy key hashes */ + key_hash_dst = (struct IBF_KeyHash *) key_dst; + GNUNET_memcpy (key_hash_dst, ibf->key_hash_sum + start, count + * sizeof *key_hash_dst); + key_hash_dst += count; + /* copy counts */ + count_dst = (struct IBF_Count *) key_hash_dst; + GNUNET_memcpy (count_dst, ibf->count + start, count * sizeof *count_dst); +} + + +/** + * Read buckets from a buffer into an ibf. + * + * @param buf pointer to the buffer to read from + * @param start which bucket to start at + * @param count how many buckets to read + * @param ibf the ibf to read from + */ +void +ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct + InvertibleBloomFilter *ibf) +{ + struct IBF_Key *key_src; + struct IBF_KeyHash *key_hash_src; + struct IBF_Count *count_src; + + GNUNET_assert (count > 0); + GNUNET_assert (start + count <= ibf->size); + + /* copy keys */ + key_src = (struct IBF_Key *) buf; + GNUNET_memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src); + key_src += count; + /* copy key hashes */ + key_hash_src = (struct IBF_KeyHash *) key_src; + GNUNET_memcpy (ibf->key_hash_sum + start, key_hash_src, count + * sizeof *key_hash_src); + key_hash_src += count; + /* copy counts */ + count_src = (struct IBF_Count *) key_hash_src; + GNUNET_memcpy (ibf->count + start, count_src, count * sizeof *count_src); +} + + +/** + * Subtract ibf2 from ibf1, storing the result in ibf1. + * The two IBF's must have the same parameters size and hash_num. + * + * @param ibf1 IBF that is subtracted from + * @param ibf2 IBF that will be subtracted from ibf1 + */ +void +ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct + InvertibleBloomFilter *ibf2) +{ + int i; + + GNUNET_assert (ibf1->size == ibf2->size); + GNUNET_assert (ibf1->hash_num == ibf2->hash_num); + + for (i = 0; i < ibf1->size; i++) + { + ibf1->count[i].count_val -= ibf2->count[i].count_val; + ibf1->key_hash_sum[i].key_hash_val ^= ibf2->key_hash_sum[i].key_hash_val; + ibf1->key_sum[i].key_val ^= ibf2->key_sum[i].key_val; + } +} + + +/** + * Create a copy of an IBF, the copy has to be destroyed properly. + * + * @param ibf the IBF to copy + */ +struct InvertibleBloomFilter * +ibf_dup (const struct InvertibleBloomFilter *ibf) +{ + struct InvertibleBloomFilter *copy; + + copy = GNUNET_malloc (sizeof *copy); + copy->hash_num = ibf->hash_num; + copy->size = ibf->size; + copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size + * sizeof(struct IBF_KeyHash)); + copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof(struct + IBF_Key)); + copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof(struct + IBF_Count)); + return copy; +} + + +/** + * Destroy all resources associated with the invertible bloom filter. + * No more ibf_*-functions may be called on ibf after calling destroy. + * + * @param ibf the intertible bloom filter to destroy + */ +void +ibf_destroy (struct InvertibleBloomFilter *ibf) +{ + GNUNET_free (ibf->key_sum); + GNUNET_free (ibf->key_hash_sum); + GNUNET_free (ibf->count); + GNUNET_free (ibf); +} diff --git a/src/setu/ibf.h b/src/setu/ibf.h new file mode 100644 index 000000000..7c2ab33b1 --- /dev/null +++ b/src/setu/ibf.h @@ -0,0 +1,255 @@ +/* + This file is part of GNUnet + Copyright (C) 2012 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/ibf.h + * @brief invertible bloom filter + * @author Florian Dold + */ + +#ifndef GNUNET_CONSENSUS_IBF_H +#define GNUNET_CONSENSUS_IBF_H + +#include "platform.h" +#include "gnunet_util_lib.h" + +#ifdef __cplusplus +extern "C" +{ +#if 0 /* keep Emacsens' auto-indent happy */ +} +#endif +#endif + + +/** + * Keys that can be inserted into and removed from an IBF. + */ +struct IBF_Key +{ + uint64_t key_val; +}; + + +/** + * Hash of an IBF key. + */ +struct IBF_KeyHash +{ + uint32_t key_hash_val; +}; + + +/** + * Type of the count field of IBF buckets. + */ +struct IBF_Count +{ + int8_t count_val; +}; + + +/** + * Size of one ibf bucket in bytes + */ +#define IBF_BUCKET_SIZE (sizeof(struct IBF_Count) + sizeof(struct IBF_Key) \ + + sizeof(struct IBF_KeyHash)) + + +/** + * Invertible bloom filter (IBF). + * + * An IBF is a counting bloom filter that has the ability to restore + * the hashes of its stored elements with high probability. + */ +struct InvertibleBloomFilter +{ + /** + * How many cells does this IBF have? + */ + uint32_t size; + + /** + * In how many cells do we hash one element? + * Usually 4 or 3. + */ + uint8_t hash_num; + + /** + * Xor sums of the elements' keys, used to identify the elements. + * Array of 'size' elements. + */ + struct IBF_Key *key_sum; + + /** + * Xor sums of the hashes of the keys of inserted elements. + * Array of 'size' elements. + */ + struct IBF_KeyHash *key_hash_sum; + + /** + * How many times has a bucket been hit? + * Can be negative, as a result of IBF subtraction. + * Array of 'size' elements. + */ + struct IBF_Count *count; +}; + + +/** + * Write buckets from an ibf to a buffer. + * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. + * + * @param ibf the ibf to write + * @param start with which bucket to start + * @param count how many buckets to write + * @param buf buffer to write the data to + */ +void +ibf_write_slice (const struct InvertibleBloomFilter *ibf, + uint32_t start, + uint32_t count, + void *buf); + + +/** + * Read buckets from a buffer into an ibf. + * + * @param buf pointer to the buffer to read from + * @param start which bucket to start at + * @param count how many buckets to read + * @param ibf the ibf to write to + */ +void +ibf_read_slice (const void *buf, + uint32_t start, + uint32_t count, + struct InvertibleBloomFilter *ibf); + + +/** + * Create a key from a hashcode. + * + * @param hash the hashcode + * @return a key + */ +struct IBF_Key +ibf_key_from_hashcode (const struct GNUNET_HashCode *hash); + + +/** + * Create a hashcode from a key, by replicating the key + * until the hascode is filled + * + * @param key the key + * @param dst hashcode to store the result in + */ +void +ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst); + + +/** + * Create an invertible bloom filter. + * + * @param size number of IBF buckets + * @param hash_num number of buckets one element is hashed in, usually 3 or 4 + * @return the newly created invertible bloom filter, NULL on error + */ +struct InvertibleBloomFilter * +ibf_create (uint32_t size, uint8_t hash_num); + + +/** + * Insert a key into an IBF. + * + * @param ibf the IBF + * @param key the element's hash code + */ +void +ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key); + + +/** + * Remove a key from an IBF. + * + * @param ibf the IBF + * @param key the element's hash code + */ +void +ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key); + + +/** + * Subtract ibf2 from ibf1, storing the result in ibf1. + * The two IBF's must have the same parameters size and hash_num. + * + * @param ibf1 IBF that is subtracted from + * @param ibf2 IBF that will be subtracted from ibf1 + */ +void +ibf_subtract (struct InvertibleBloomFilter *ibf1, + const struct InvertibleBloomFilter *ibf2); + + +/** + * Decode and remove an element from the IBF, if possible. + * + * @param ibf the invertible bloom filter to decode + * @param ret_side sign of the cell's count where the decoded element came from. + * A negative sign indicates that the element was recovered + * resides in an IBF that was previously subtracted from. + * @param ret_id receives the hash code of the decoded element, if successful + * @return #GNUNET_YES if decoding an element was successful, + * #GNUNET_NO if the IBF is empty, + * #GNUNET_SYSERR if the decoding has failed + */ +int +ibf_decode (struct InvertibleBloomFilter *ibf, + int *ret_side, + struct IBF_Key *ret_id); + + +/** + * Create a copy of an IBF, the copy has to be destroyed properly. + * + * @param ibf the IBF to copy + */ +struct InvertibleBloomFilter * +ibf_dup (const struct InvertibleBloomFilter *ibf); + + +/** + * Destroy all resources associated with the invertible bloom filter. + * No more ibf_*-functions may be called on ibf after calling destroy. + * + * @param ibf the intertible bloom filter to destroy + */ +void +ibf_destroy (struct InvertibleBloomFilter *ibf); + + +#if 0 /* keep Emacsens' auto-indent happy */ +{ +#endif +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/setu/ibf_sim.c b/src/setu/ibf_sim.c new file mode 100644 index 000000000..6415d00e1 --- /dev/null +++ b/src/setu/ibf_sim.c @@ -0,0 +1,142 @@ +/* + This file is part of GNUnet + Copyright (C) 2013 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/ibf_sim.c + * @brief implementation of simulation for invertible bloom filter + * @author Florian Dold + * + * This code was used for some internal experiments, it is not + * build or shipped as part of the GNUnet system. + */ +#include +#include +#include + +#define MAX_IBF_DECODE 16 + +/* report average over how many rounds? */ +#define ROUNDS 100000 + +/* enable one of the three below */ +// simple fix +#define FIX1 0 +// possibly slightly better fix for large IBF_DECODE values +#define FIX2 1 + +// SIGCOMM algorithm +#define STRATA 0 + +// print each value? +#define VERBOSE 0 +// avoid assembly? (ASM is about 50% faster) +#define SLOW 0 + +int +main (int argc, char **argv) +{ + unsigned int round; + unsigned int buckets[31]; // max is 2^31 as 'random' returns only between 0 and 2^31 + unsigned int i; + int j; + unsigned int r; + unsigned int ret; + unsigned long long total; + unsigned int want; + double predict; + + srandom (time (NULL)); + total = 0; + want = atoi (argv[1]); + for (round = 0; round < ROUNDS; round++) + { + memset (buckets, 0, sizeof(buckets)); + for (i = 0; i < want; i++) + { + /* FIXME: might want to use 'better' PRNG to avoid + PRNG-induced biases */ + r = random (); + if (0 == r) + continue; +#if SLOW + for (j = 0; (j < 31) && (0 == (r & (1 << j))); j++) + ; +#else + /* use assembly / gcc */ + j = __builtin_ffs (r) - 1; +#endif + buckets[j]++; + } + ret = 0; + predict = 0.0; + for (j = 31; j >= 0; j--) + { +#if FIX1 + /* improved algorithm, for 1000 elements with IBF-DECODE 8, I + get 990/1000 elements on average over 1 million runs; key + idea being to stop short of the 'last' possible IBF as + otherwise a "lowball" per-chance would unduely influence the + result */if ((j > 0) && + (buckets[j - 1] > MAX_IBF_DECODE)) + { + ret *= (1 << (j + 1)); + break; + } +#endif +#if FIX2 + /* another improvement: don't just always cut off the last one, + but rather try to predict based on all previous values where + that "last" one is; additional prediction can only really + work if MAX_IBF_DECODE is sufficiently high */ + if ((j > 0) && + ((buckets[j - 1] > MAX_IBF_DECODE) || + (predict > MAX_IBF_DECODE))) + { + ret *= (1 << (j + 1)); + break; + } +#endif +#if STRATA + /* original algorithm, for 1000 elements with IBF-DECODE 8, + I get 920/1000 elements on average over 1 million runs */ + if (buckets[j] > MAX_IBF_DECODE) + { + ret *= (1 << (j + 1)); + break; + } +#endif + ret += buckets[j]; + predict = (buckets[j] + 2.0 * predict) / 2.0; + } +#if VERBOSE + fprintf (stderr, "%u ", ret); +#endif + total += ret; + } + fprintf (stderr, "\n"); + fprintf (stdout, "average %llu\n", total / ROUNDS); + return 0; +} + + +/* TODO: should calculate stddev of the results to also be able to + say something about the stability of the results, outside of + large-scale averages -- gaining 8% precision at the expense of + 50% additional variance might not be worth it... */ diff --git a/src/setu/plugin_block_setu_test.c b/src/setu/plugin_block_setu_test.c new file mode 100644 index 000000000..1de086092 --- /dev/null +++ b/src/setu/plugin_block_setu_test.c @@ -0,0 +1,123 @@ +/* + This file is part of GNUnet + Copyright (C) 2017 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/plugin_block_set_test.c + * @brief set test block, recognizes elements with non-zero first byte as invalid + * @author Christian Grothoff + */ + +#include "platform.h" +#include "gnunet_block_plugin.h" +#include "gnunet_block_group_lib.h" + + +/** + * Function called to validate a reply or a request. For + * request evaluation, simply pass "NULL" for the reply_block. + * + * @param cls closure + * @param ctx block context + * @param type block type + * @param group block group to use + * @param eo control flags + * @param query original query (hash) + * @param xquery extrended query data (can be NULL, depending on type) + * @param xquery_size number of bytes in xquery + * @param reply_block response to validate + * @param reply_block_size number of bytes in reply block + * @return characterization of result + */ +static enum GNUNET_BLOCK_EvaluationResult +block_plugin_set_test_evaluate (void *cls, + struct GNUNET_BLOCK_Context *ctx, + enum GNUNET_BLOCK_Type type, + struct GNUNET_BLOCK_Group *group, + enum GNUNET_BLOCK_EvaluationOptions eo, + const struct GNUNET_HashCode *query, + const void *xquery, + size_t xquery_size, + const void *reply_block, + size_t reply_block_size) +{ + if ((NULL == reply_block) || + (reply_block_size == 0) || + (0 != ((char *) reply_block)[0])) + return GNUNET_BLOCK_EVALUATION_RESULT_INVALID; + return GNUNET_BLOCK_EVALUATION_OK_MORE; +} + + +/** + * Function called to obtain the key for a block. + * + * @param cls closure + * @param type block type + * @param block block to get the key for + * @param block_size number of bytes in block + * @param key set to the key (query) for the given block + * @return #GNUNET_OK on success, #GNUNET_SYSERR if type not supported + * (or if extracting a key from a block of this type does not work) + */ +static int +block_plugin_set_test_get_key (void *cls, + enum GNUNET_BLOCK_Type type, + const void *block, + size_t block_size, + struct GNUNET_HashCode *key) +{ + return GNUNET_SYSERR; +} + + +/** + * Entry point for the plugin. + */ +void * +libgnunet_plugin_block_set_test_init (void *cls) +{ + static enum GNUNET_BLOCK_Type types[] = { + GNUNET_BLOCK_TYPE_SET_TEST, + GNUNET_BLOCK_TYPE_ANY /* end of list */ + }; + struct GNUNET_BLOCK_PluginFunctions *api; + + api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions); + api->evaluate = &block_plugin_set_test_evaluate; + api->get_key = &block_plugin_set_test_get_key; + api->types = types; + return api; +} + + +/** + * Exit point from the plugin. + */ +void * +libgnunet_plugin_block_set_test_done (void *cls) +{ + struct GNUNET_BLOCK_PluginFunctions *api = cls; + + GNUNET_free (api); + return NULL; +} + + +/* end of plugin_block_set_test.c */ diff --git a/src/setu/setu.h b/src/setu/setu.h new file mode 100644 index 000000000..f1c5df92b --- /dev/null +++ b/src/setu/setu.h @@ -0,0 +1,304 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012-2014, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file set/set.h + * @brief messages used for the set union api + * @author Florian Dold + * @author Christian Grothoff + */ +#ifndef SET_H +#define SET_H + +#include "platform.h" +#include "gnunet_common.h" +#include "gnunet_set_service.h" + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Message sent by the client to the service to ask starting + * a new set to perform operations with. Includes the desired + * set operation type. + */ +struct GNUNET_SETU_CreateMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_CREATE + */ + struct GNUNET_MessageHeader header; + +}; + + +/** + * Message sent by the client to the service to start listening for + * incoming requests to perform a certain type of set operation for a + * certain type of application. + */ +struct GNUNET_SETU_ListenMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_LISTEN + */ + struct GNUNET_MessageHeader header; + + /** + * Always zero. + */ + uint32_t reserved GNUNET_PACKED; + + /** + * application id + */ + struct GNUNET_HashCode app_id; +}; + + +/** + * Message sent by a listening client to the service to accept + * performing the operation with the other peer. + */ +struct GNUNET_SETU_AcceptMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_ACCEPT + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the incoming request we want to accept. + */ + uint32_t accept_reject_id GNUNET_PACKED; + + /** + * Request ID to identify responses. + */ + uint32_t request_id GNUNET_PACKED; + + /** + * Always use delta operation instead of sending full sets, + * even it it's less efficient. + */ + uint8_t force_delta; + + /** + * Always send full sets, even if delta operations would + * be more efficient. + */ + uint8_t force_full; + + /** + * #GNUNET_YES to fail operations where Byzantine faults + * are suspected + */ + uint8_t byzantine; + + /** + * Lower bound for the set size, used only when + * byzantine mode is enabled. + */ + uint8_t byzantine_lower_bound; +}; + + +/** + * Message sent by a listening client to the service to reject + * performing the operation with the other peer. + */ +struct GNUNET_SETU_RejectMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_REJECT + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the incoming request we want to reject. + */ + uint32_t accept_reject_id GNUNET_PACKED; +}; + + +/** + * A request for an operation with another client. + */ +struct GNUNET_SETU_RequestMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_REQUEST. + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the to identify the request when accepting or + * rejecting it. + */ + uint32_t accept_id GNUNET_PACKED; + + /** + * Identity of the requesting peer. + */ + struct GNUNET_PeerIdentity peer_id; + + /* rest: context message, that is, application-specific + message to convince listener to pick up */ +}; + + +/** + * Message sent by client to service to initiate a set operation as a + * client (not as listener). A set (which determines the operation + * type) must already exist in association with this client. + */ +struct GNUNET_SETU_EvaluateMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_EVALUATE + */ + struct GNUNET_MessageHeader header; + + /** + * Id of our set to evaluate, chosen implicitly by the client when it + * calls #GNUNET_SETU_commit(). + */ + uint32_t request_id GNUNET_PACKED; + + /** + * Peer to evaluate the operation with + */ + struct GNUNET_PeerIdentity target_peer; + + /** + * Application id + */ + struct GNUNET_HashCode app_id; + + /** + * Always use delta operation instead of sending full sets, + * even it it's less efficient. + */ + uint8_t force_delta; + + /** + * Always send full sets, even if delta operations would + * be more efficient. + */ + uint8_t force_full; + + /** + * #GNUNET_YES to fail operations where Byzantine faults + * are suspected + */ + uint8_t byzantine; + + /** + * Lower bound for the set size, used only when + * byzantine mode is enabled. + */ + uint8_t byzantine_lower_bound; + + /* rest: context message, that is, application-specific + message to convince listener to pick up */ +}; + + +/** + * Message sent by the service to the client to indicate an + * element that is removed (set intersection) or added + * (set union) or part of the final result, depending on + * options specified for the operation. + */ +struct GNUNET_SETU_ResultMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_RESULT + */ + struct GNUNET_MessageHeader header; + + /** + * Current set size. + */ + uint64_t current_size; + + /** + * id the result belongs to + */ + uint32_t request_id GNUNET_PACKED; + + /** + * Was the evaluation successful? Contains + * an `enum GNUNET_SETU_Status` in NBO. + */ + uint16_t result_status GNUNET_PACKED; + + /** + * Type of the element attachted to the message, if any. + */ + uint16_t element_type GNUNET_PACKED; + + /* rest: the actual element */ +}; + + +/** + * Message sent by client to the service to add + * an element to the set. + */ +struct GNUNET_SETU_ElementMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_ADD + */ + struct GNUNET_MessageHeader header; + + /** + * Type of the element to add or remove. + */ + uint16_t element_type GNUNET_PACKED; + + /** + * For alignment, always zero. + */ + uint16_t reserved GNUNET_PACKED; + + /* rest: the actual element */ +}; + + +/** + * Sent to the service by the client in order to cancel a set operation. + */ +struct GNUNET_SETU_CancelMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETU_CANCEL + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the request we want to cancel. + */ + uint32_t request_id GNUNET_PACKED; +}; + + +GNUNET_NETWORK_STRUCT_END + +#endif diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c new file mode 100644 index 000000000..48260de55 --- /dev/null +++ b/src/setu/setu_api.c @@ -0,0 +1,872 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012-2016, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file set/setu_api.c + * @brief api for the set union service + * @author Florian Dold + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_setu_service.h" +#include "setu.h" + + +#define LOG(kind, ...) GNUNET_log_from (kind, "set-api", __VA_ARGS__) + +/** + * Opaque handle to a set. + */ +struct GNUNET_SETU_Handle +{ + /** + * Message queue for @e client. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Linked list of operations on the set. + */ + struct GNUNET_SETU_OperationHandle *ops_head; + + /** + * Linked list of operations on the set. + */ + struct GNUNET_SETU_OperationHandle *ops_tail; + + /** + * Should the set be destroyed once all operations are gone? + * #GNUNET_SYSERR if #GNUNET_SETU_destroy() must raise this flag, + * #GNUNET_YES if #GNUNET_SETU_destroy() did raise this flag. + */ + int destroy_requested; + + /** + * Has the set become invalid (e.g. service died)? + */ + int invalid; + +}; + + +/** + * Handle for a set operation request from another peer. + */ +struct GNUNET_SETU_Request +{ + /** + * Id of the request, used to identify the request when + * accepting/rejecting it. + */ + uint32_t accept_id; + + /** + * Has the request been accepted already? + * #GNUNET_YES/#GNUNET_NO + */ + int accepted; +}; + + +/** + * Handle to an operation. Only known to the service after committing + * the handle with a set. + */ +struct GNUNET_SETU_OperationHandle +{ + /** + * Function to be called when we have a result, + * or an error. + */ + GNUNET_SETU_ResultIterator result_cb; + + /** + * Closure for @e result_cb. + */ + void *result_cls; + + /** + * Local set used for the operation, + * NULL if no set has been provided by conclude yet. + */ + struct GNUNET_SETU_Handle *set; + + /** + * Message sent to the server on calling conclude, + * NULL if conclude has been called. + */ + struct GNUNET_MQ_Envelope *conclude_mqm; + + /** + * Address of the request if in the conclude message, + * used to patch the request id into the message when the set is known. + */ + uint32_t *request_id_addr; + + /** + * Handles are kept in a linked list. + */ + struct GNUNET_SETU_OperationHandle *prev; + + /** + * Handles are kept in a linked list. + */ + struct GNUNET_SETU_OperationHandle *next; + + /** + * Request ID to identify the operation within the set. + */ + uint32_t request_id; +}; + + +/** + * Opaque handle to a listen operation. + */ +struct GNUNET_SETU_ListenHandle +{ + /** + * Message queue for the client. + */ + struct GNUNET_MQ_Handle*mq; + + /** + * Configuration handle for the listener, stored + * here to be able to reconnect transparently on + * connection failure. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Function to call on a new incoming request, + * or on error. + */ + GNUNET_SETU_ListenCallback listen_cb; + + /** + * Closure for @e listen_cb. + */ + void *listen_cls; + + /** + * Application ID we listen for. + */ + struct GNUNET_HashCode app_id; + + /** + * Time to wait until we try to reconnect on failure. + */ + struct GNUNET_TIME_Relative reconnect_backoff; + + /** + * Task for reconnecting when the listener fails. + */ + struct GNUNET_SCHEDULER_Task *reconnect_task; + +}; + + +/** + * Check that the given @a msg is well-formed. + * + * @param cls closure + * @param msg message to check + * @return #GNUNET_OK if message is well-formed + */ +static int +check_result (void *cls, + const struct GNUNET_SETU_ResultMessage *msg) +{ + /* minimum size was already checked, everything else is OK! */ + return GNUNET_OK; +} + + +/** + * Handle result message for a set operation. + * + * @param cls the set + * @param mh the message + */ +static void +handle_result (void *cls, + const struct GNUNET_SETU_ResultMessage *msg) +{ + struct GNUNET_SETU_Handle *set = cls; + struct GNUNET_SETU_OperationHandle *oh; + struct GNUNET_SETU_Element e; + enum GNUNET_SETU_Status result_status; + int destroy_set; + + GNUNET_assert (NULL != set->mq); + result_status = (enum GNUNET_SETU_Status) ntohs (msg->result_status); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got result message with status %d\n", + result_status); + oh = GNUNET_MQ_assoc_get (set->mq, + ntohl (msg->request_id)); + if (NULL == oh) + { + /* 'oh' can be NULL if we canceled the operation, but the service + did not get the cancel message yet. */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring result from canceled operation\n"); + return; + } + + switch (result_status) + { + case GNUNET_SETU_STATUS_ADD_LOCAL: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Treating result as element\n"); + e.data = &msg[1]; + e.size = ntohs (msg->header.size) + - sizeof(struct GNUNET_SETU_ResultMessage); + e.element_type = ntohs (msg->element_type); + if (NULL != oh->result_cb) + oh->result_cb (oh->result_cls, + &e, + GNUNET_ntohll (msg->current_size), + result_status); + return; + case GNUNET_SETU_STATUS_FAILURE: + case GNUNET_SETU_STATUS_DONE: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Treating result as final status\n"); + GNUNET_MQ_assoc_remove (set->mq, + ntohl (msg->request_id)); + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + oh); + /* Need to do this calculation _before_ the result callback, + as IF the application still has a valid set handle, it + may trigger destruction of the set during the callback. */ + destroy_set = (GNUNET_YES == set->destroy_requested) && + (NULL == set->ops_head); + if (NULL != oh->result_cb) + { + oh->result_cb (oh->result_cls, + NULL, + GNUNET_ntohll (msg->current_size), + result_status); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "No callback for final status\n"); + } + if (destroy_set) + GNUNET_SETU_destroy (set); + GNUNET_free (oh); + return; + } +} + + +/** + * Destroy the given set operation. + * + * @param oh set operation to destroy + */ +static void +set_operation_destroy (struct GNUNET_SETU_OperationHandle *oh) +{ + struct GNUNET_SETU_Handle *set = oh->set; + struct GNUNET_SETU_OperationHandle *h_assoc; + + if (NULL != oh->conclude_mqm) + GNUNET_MQ_discard (oh->conclude_mqm); + /* is the operation already commited? */ + if (NULL != set) + { + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + oh); + h_assoc = GNUNET_MQ_assoc_remove (set->mq, + oh->request_id); + GNUNET_assert ((NULL == h_assoc) || + (h_assoc == oh)); + } + GNUNET_free (oh); +} + + +/** + * Cancel the given set operation. We need to send an explicit cancel + * message, as all operations one one set communicate using one + * handle. + * + * @param oh set operation to cancel + */ +void +GNUNET_SETU_operation_cancel (struct GNUNET_SETU_OperationHandle *oh) +{ + struct GNUNET_SETU_Handle *set = oh->set; + struct GNUNET_SETU_CancelMessage *m; + struct GNUNET_MQ_Envelope *mqm; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Cancelling SET operation\n"); + if (NULL != set) + { + mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETU_CANCEL); + m->request_id = htonl (oh->request_id); + GNUNET_MQ_send (set->mq, mqm); + } + set_operation_destroy (oh); + if ((NULL != set) && + (GNUNET_YES == set->destroy_requested) && + (NULL == set->ops_head)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Destroying set after operation cancel\n"); + GNUNET_SETU_destroy (set); + } +} + + +/** + * We encountered an error communicating with the set service while + * performing a set operation. Report to the application. + * + * @param cls the `struct GNUNET_SETU_Handle` + * @param error error code + */ +static void +handle_client_set_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_SETU_Handle *set = cls; + + LOG (GNUNET_ERROR_TYPE_ERROR, + "Handling client set error %d\n", + error); + while (NULL != set->ops_head) + { + if ((NULL != set->ops_head->result_cb) && + (GNUNET_NO == set->destroy_requested)) + set->ops_head->result_cb (set->ops_head->result_cls, + NULL, + 0, + GNUNET_SETU_STATUS_FAILURE); + set_operation_destroy (set->ops_head); + } + set->invalid = GNUNET_YES; +} + + +/** + * Create an empty set, supporting the specified operation. + * + * @param cfg configuration to use for connecting to the + * set service + * @return a handle to the set + */ +struct GNUNET_SETU_Handle * +GNUNET_SETU_create (const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_SETU_Handle *set = GNUNET_new (struct GNUNET_SETU_Handle); + struct GNUNET_MQ_MessageHandler mq_handlers[] = { + GNUNET_MQ_hd_var_size (result, + GNUNET_MESSAGE_TYPE_SETU_RESULT, + struct GNUNET_SETU_ResultMessage, + set), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETU_CreateMessage *create_msg; + + set->mq = GNUNET_CLIENT_connect (cfg, + "setu", + mq_handlers, + &handle_client_set_error, + set); + if (NULL == set->mq) + { + GNUNET_free (set); + return NULL; + } + mqm = GNUNET_MQ_msg (create_msg, + GNUNET_MESSAGE_TYPE_SETU_CREATE); + GNUNET_MQ_send (set->mq, + mqm); + return set; +} + + +/** + * Add an element to the given set. After the element has been added + * (in the sense of being transmitted to the set service), @a cont + * will be called. Multiple calls to GNUNET_SETU_add_element() can be + * queued. + * + * @param set set to add element to + * @param element element to add to the set + * @param cb continuation called after the element has been added + * @param cb_cls closure for @a cb + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the + * set is invalid (e.g. the set service crashed) + */ +int +GNUNET_SETU_add_element (struct GNUNET_SETU_Handle *set, + const struct GNUNET_SETU_Element *element, + GNUNET_SCHEDULER_TaskCallback cb, + void *cb_cls) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETU_ElementMessage *msg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "adding element of type %u to set %p\n", + (unsigned int) element->element_type, + set); + GNUNET_assert (NULL != set); + if (GNUNET_YES == set->invalid) + { + if (NULL != cb) + cb (cb_cls); + return GNUNET_SYSERR; + } + mqm = GNUNET_MQ_msg_extra (msg, + element->size, + GNUNET_MESSAGE_TYPE_SETU_ADD); + msg->element_type = htons (element->element_type); + GNUNET_memcpy (&msg[1], + element->data, + element->size); + GNUNET_MQ_notify_sent (mqm, + cb, + cb_cls); + GNUNET_MQ_send (set->mq, + mqm); + return GNUNET_OK; +} + + +/** + * Destroy the set handle if no operations are left, mark the set + * for destruction otherwise. + * + * @param set set handle to destroy + */ +void +GNUNET_SETU_destroy (struct GNUNET_SETU_Handle *set) +{ + /* destroying set while iterator is active is currently + not supported; we should expand the API to allow + clients to explicitly cancel the iteration! */ + GNUNET_assert (NULL != set); + if ((NULL != set->ops_head) || + (GNUNET_SYSERR == set->destroy_requested)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Set operations are pending, delaying set destruction\n"); + set->destroy_requested = GNUNET_YES; + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Really destroying set\n"); + if (NULL != set->mq) + { + GNUNET_MQ_destroy (set->mq); + set->mq = NULL; + } + GNUNET_free (set); +} + + +/** + * Prepare a set operation to be evaluated with another peer. + * The evaluation will not start until the client provides + * a local set with #GNUNET_SETU_commit(). + * + * @param other_peer peer with the other set + * @param app_id hash for the application using the set + * @param context_msg additional information for the request + * @param result_cb called on error or success + * @param result_cls closure for @e result_cb + * @return a handle to cancel the operation + */ +struct GNUNET_SETU_OperationHandle * +GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_HashCode *app_id, + const struct GNUNET_MessageHeader *context_msg, + const struct GNUNET_SETU_Option options[], + GNUNET_SETU_ResultIterator result_cb, + void *result_cls) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETU_OperationHandle *oh; + struct GNUNET_SETU_EvaluateMessage *msg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client prepares set union operation\n"); + oh = GNUNET_new (struct GNUNET_SETU_OperationHandle); + oh->result_cb = result_cb; + oh->result_cls = result_cls; + mqm = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SETU_EVALUATE, + context_msg); + msg->app_id = *app_id; + msg->target_peer = *other_peer; + for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++) + { + switch (opt->type) + { + case GNUNET_SETU_OPTION_BYZANTINE: + msg->byzantine = GNUNET_YES; + msg->byzantine_lower_bound = opt->v.num; + break; + case GNUNET_SETU_OPTION_FORCE_FULL: + msg->force_full = GNUNET_YES; + break; + case GNUNET_SETU_OPTION_FORCE_DELTA: + msg->force_delta = GNUNET_YES; + break; + default: + LOG (GNUNET_ERROR_TYPE_ERROR, + "Option with type %d not recognized\n", + (int) opt->type); + } + } + oh->conclude_mqm = mqm; + oh->request_id_addr = &msg->request_id; + return oh; +} + + +/** + * Connect to the set service in order to listen for requests. + * + * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect + */ +static void +listen_connect (void *cls); + + +/** + * Check validity of request message for a listen operation + * + * @param cls the listen handle + * @param msg the message + * @return #GNUNET_OK if the message is well-formed + */ +static int +check_request (void *cls, + const struct GNUNET_SETU_RequestMessage *msg) +{ + const struct GNUNET_MessageHeader *context_msg; + + if (ntohs (msg->header.size) == sizeof(*msg)) + return GNUNET_OK; /* no context message is OK */ + context_msg = GNUNET_MQ_extract_nested_mh (msg); + if (NULL == context_msg) + { + /* malformed context message is NOT ok */ + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle request message for a listen operation + * + * @param cls the listen handle + * @param msg the message + */ +static void +handle_request (void *cls, + const struct GNUNET_SETU_RequestMessage *msg) +{ + struct GNUNET_SETU_ListenHandle *lh = cls; + struct GNUNET_SETU_Request req; + const struct GNUNET_MessageHeader *context_msg; + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETU_RejectMessage *rmsg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Processing incoming operation request with id %u\n", + ntohl (msg->accept_id)); + /* we got another valid request => reset the backoff */ + lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + req.accept_id = ntohl (msg->accept_id); + req.accepted = GNUNET_NO; + context_msg = GNUNET_MQ_extract_nested_mh (msg); + /* calling #GNUNET_SETU_accept() in the listen cb will set req->accepted */ + lh->listen_cb (lh->listen_cls, + &msg->peer_id, + context_msg, + &req); + if (GNUNET_YES == req.accepted) + return; /* the accept-case is handled in #GNUNET_SETU_accept() */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Rejected request %u\n", + ntohl (msg->accept_id)); + mqm = GNUNET_MQ_msg (rmsg, + GNUNET_MESSAGE_TYPE_SETU_REJECT); + rmsg->accept_reject_id = msg->accept_id; + GNUNET_MQ_send (lh->mq, + mqm); +} + + +/** + * Our connection with the set service encountered an error, + * re-initialize with exponential back-off. + * + * @param cls the `struct GNUNET_SETU_ListenHandle *` + * @param error reason for the disconnect + */ +static void +handle_client_listener_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_SETU_ListenHandle *lh = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Listener broke down (%d), re-connecting\n", + (int) error); + GNUNET_MQ_destroy (lh->mq); + lh->mq = NULL; + lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, + &listen_connect, + lh); + lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff); +} + + +/** + * Connect to the set service in order to listen for requests. + * + * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect + */ +static void +listen_connect (void *cls) +{ + struct GNUNET_SETU_ListenHandle *lh = cls; + struct GNUNET_MQ_MessageHandler mq_handlers[] = { + GNUNET_MQ_hd_var_size (request, + GNUNET_MESSAGE_TYPE_SETU_REQUEST, + struct GNUNET_SETU_RequestMessage, + lh), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETU_ListenMessage *msg; + + lh->reconnect_task = NULL; + GNUNET_assert (NULL == lh->mq); + lh->mq = GNUNET_CLIENT_connect (lh->cfg, + "setu", + mq_handlers, + &handle_client_listener_error, + lh); + if (NULL == lh->mq) + return; + mqm = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SETU_LISTEN); + msg->app_id = lh->app_id; + GNUNET_MQ_send (lh->mq, + mqm); +} + + +/** + * Wait for set operation requests for the given application id + * + * @param cfg configuration to use for connecting to + * the set service, needs to be valid for the lifetime of the listen handle + * @param app_id id of the application that handles set operation requests + * @param listen_cb called for each incoming request matching the operation + * and application id + * @param listen_cls handle for @a listen_cb + * @return a handle that can be used to cancel the listen operation + */ +struct GNUNET_SETU_ListenHandle * +GNUNET_SETU_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_HashCode *app_id, + GNUNET_SETU_ListenCallback listen_cb, + void *listen_cls) +{ + struct GNUNET_SETU_ListenHandle *lh; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Starting listener for app %s\n", + GNUNET_h2s (app_id)); + lh = GNUNET_new (struct GNUNET_SETU_ListenHandle); + lh->listen_cb = listen_cb; + lh->listen_cls = listen_cls; + lh->cfg = cfg; + lh->app_id = *app_id; + lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + listen_connect (lh); + if (NULL == lh->mq) + { + GNUNET_free (lh); + return NULL; + } + return lh; +} + + +/** + * Cancel the given listen operation. + * + * @param lh handle for the listen operation + */ +void +GNUNET_SETU_listen_cancel (struct GNUNET_SETU_ListenHandle *lh) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Canceling listener %s\n", + GNUNET_h2s (&lh->app_id)); + if (NULL != lh->mq) + { + GNUNET_MQ_destroy (lh->mq); + lh->mq = NULL; + } + if (NULL != lh->reconnect_task) + { + GNUNET_SCHEDULER_cancel (lh->reconnect_task); + lh->reconnect_task = NULL; + } + GNUNET_free (lh); +} + + +/** + * Accept a request we got via #GNUNET_SETU_listen. Must be called during + * #GNUNET_SETU_listen, as the 'struct GNUNET_SETU_Request' becomes invalid + * afterwards. + * Call #GNUNET_SETU_commit to provide the local set to use for the operation, + * and to begin the exchange with the remote peer. + * + * @param request request to accept + * @param result_mode specified how results will be returned, + * see `enum GNUNET_SETU_ResultMode`. + * @param result_cb callback for the results + * @param result_cls closure for @a result_cb + * @return a handle to cancel the operation + */ +struct GNUNET_SETU_OperationHandle * +GNUNET_SETU_accept (struct GNUNET_SETU_Request *request, + const struct GNUNET_SETU_Option options[], + GNUNET_SETU_ResultIterator result_cb, + void *result_cls) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETU_OperationHandle *oh; + struct GNUNET_SETU_AcceptMessage *msg; + + GNUNET_assert (GNUNET_NO == request->accepted); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client accepts set union operation with id %u\n", + request->accept_id); + request->accepted = GNUNET_YES; + mqm = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SETU_ACCEPT); + msg->accept_reject_id = htonl (request->accept_id); + oh = GNUNET_new (struct GNUNET_SETU_OperationHandle); + oh->result_cb = result_cb; + oh->result_cls = result_cls; + oh->conclude_mqm = mqm; + oh->request_id_addr = &msg->request_id; + return oh; +} + + +/** + * Commit a set to be used with a set operation. + * This function is called once we have fully constructed + * the set that we want to use for the operation. At this + * time, the P2P protocol can then begin to exchange the + * set information and call the result callback with the + * result information. + * + * @param oh handle to the set operation + * @param set the set to use for the operation + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the + * set is invalid (e.g. the set service crashed) + */ +int +GNUNET_SETU_commit (struct GNUNET_SETU_OperationHandle *oh, + struct GNUNET_SETU_Handle *set) +{ + if (NULL != oh->set) + { + /* Some other set was already committed for this + * operation, there is a logic bug in the client of this API */ + GNUNET_break (0); + return GNUNET_OK; + } + GNUNET_assert (NULL != set); + if (GNUNET_YES == set->invalid) + return GNUNET_SYSERR; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client commits to SET\n"); + GNUNET_assert (NULL != oh->conclude_mqm); + oh->set = set; + GNUNET_CONTAINER_DLL_insert (set->ops_head, + set->ops_tail, + oh); + oh->request_id = GNUNET_MQ_assoc_add (set->mq, + oh); + *oh->request_id_addr = htonl (oh->request_id); + GNUNET_MQ_send (set->mq, + oh->conclude_mqm); + oh->conclude_mqm = NULL; + oh->request_id_addr = NULL; + return GNUNET_OK; +} + + +/** + * Hash a set element. + * + * @param element the element that should be hashed + * @param[out] ret_hash a pointer to where the hash of @a element + * should be stored + */ +void +GNUNET_SETU_element_hash (const struct GNUNET_SETU_Element *element, + struct GNUNET_HashCode *ret_hash) +{ + struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start (); + + /* It's not guaranteed that the element data is always after the element header, + so we need to hash the chunks separately. */ + GNUNET_CRYPTO_hash_context_read (ctx, + &element->size, + sizeof(uint16_t)); + GNUNET_CRYPTO_hash_context_read (ctx, + &element->element_type, + sizeof(uint16_t)); + GNUNET_CRYPTO_hash_context_read (ctx, + element->data, + element->size); + GNUNET_CRYPTO_hash_context_finish (ctx, + ret_hash); +} + + +/* end of setu_api.c */ diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c new file mode 100644 index 000000000..95119873c --- /dev/null +++ b/src/setu/test_setu_api.c @@ -0,0 +1,360 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/test_setu_api.c + * @brief testcase for setu_api.c + * @author Florian Dold + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_testing_lib.h" +#include "gnunet_setu_service.h" + + +static struct GNUNET_PeerIdentity local_id; + +static struct GNUNET_HashCode app_id; + +static struct GNUNET_SETU_Handle *set1; + +static struct GNUNET_SETU_Handle *set2; + +static struct GNUNET_SETU_ListenHandle *listen_handle; + +static struct GNUNET_SETU_OperationHandle *oh1; + +static struct GNUNET_SETU_OperationHandle *oh2; + +static const struct GNUNET_CONFIGURATION_Handle *config; + +static int ret; + +static struct GNUNET_SCHEDULER_Task *tt; + + +static void +result_cb_set1 (void *cls, + const struct GNUNET_SETU_Element *element, + uint64_t size, + enum GNUNET_SETU_Status status) +{ + switch (status) + { + case GNUNET_SETU_STATUS_ADD_LOCAL: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n"); + break; + + case GNUNET_SETU_STATUS_FAILURE: + GNUNET_break (0); + oh1 = NULL; + fprintf (stderr, "set 1: received failure status!\n"); + ret = 1; + if (NULL != tt) + { + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + } + GNUNET_SCHEDULER_shutdown (); + break; + + case GNUNET_SETU_STATUS_DONE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n"); + oh1 = NULL; + if (NULL != set1) + { + GNUNET_SETU_destroy (set1); + set1 = NULL; + } + if (NULL == set2) + { + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + GNUNET_SCHEDULER_shutdown (); + } + break; + + default: + GNUNET_assert (0); + } +} + + +static void +result_cb_set2 (void *cls, + const struct GNUNET_SETU_Element *element, + uint64_t size, + enum GNUNET_SETU_Status status) +{ + switch (status) + { + case GNUNET_SETU_STATUS_ADD_LOCAL: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n"); + break; + + case GNUNET_SETU_STATUS_FAILURE: + GNUNET_break (0); + oh2 = NULL; + fprintf (stderr, "set 2: received failure status\n"); + GNUNET_SCHEDULER_shutdown (); + ret = 1; + break; + + case GNUNET_SETU_STATUS_DONE: + oh2 = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n"); + GNUNET_SETU_destroy (set2); + set2 = NULL; + if (NULL == set1) + { + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + GNUNET_SCHEDULER_shutdown (); + } + break; + + default: + GNUNET_assert (0); + } +} + + +static void +listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SETU_Request *request) +{ + GNUNET_assert (NULL != context_msg); + GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n"); + oh2 = GNUNET_SETU_accept (request, + (struct GNUNET_SETU_Option[]){ 0 }, + &result_cb_set2, + NULL); + GNUNET_SETU_commit (oh2, set2); +} + + +/** + * Start the set operation. + * + * @param cls closure, unused + */ +static void +start (void *cls) +{ + struct GNUNET_MessageHeader context_msg; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n"); + context_msg.size = htons (sizeof context_msg); + context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); + listen_handle = GNUNET_SETU_listen (config, + &app_id, + &listen_cb, + NULL); + oh1 = GNUNET_SETU_prepare (&local_id, + &app_id, + &context_msg, + (struct GNUNET_SETU_Option[]){ 0 }, + &result_cb_set1, + NULL); + GNUNET_SETU_commit (oh1, set1); +} + + +/** + * Initialize the second set, continue + * + * @param cls closure, unused + */ +static void +init_set2 (void *cls) +{ + struct GNUNET_SETU_Element element; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n"); + + element.element_type = 0; + element.data = "hello"; + element.size = strlen (element.data); + GNUNET_SETU_add_element (set2, &element, NULL, NULL); + element.data = "quux"; + element.size = strlen (element.data); + GNUNET_SETU_add_element (set2, &element, NULL, NULL); + element.data = "baz"; + element.size = strlen (element.data); + GNUNET_SETU_add_element (set2, &element, &start, NULL); +} + + +/** + * Initialize the first set, continue. + */ +static void +init_set1 (void) +{ + struct GNUNET_SETU_Element element; + + element.element_type = 0; + element.data = "hello"; + element.size = strlen (element.data); + GNUNET_SETU_add_element (set1, &element, NULL, NULL); + element.data = "bar"; + element.size = strlen (element.data); + GNUNET_SETU_add_element (set1, &element, &init_set2, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); +} + + +/** + * Function run on timeout. + * + * @param cls closure + */ +static void +timeout_fail (void *cls) +{ + tt = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n"); + GNUNET_SCHEDULER_shutdown (); + ret = 1; +} + + +/** + * Function run on shutdown. + * + * @param cls closure + */ +static void +do_shutdown (void *cls) +{ + if (NULL != tt) + { + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + } + if (NULL != oh1) + { + GNUNET_SETU_operation_cancel (oh1); + oh1 = NULL; + } + if (NULL != oh2) + { + GNUNET_SETU_operation_cancel (oh2); + oh2 = NULL; + } + if (NULL != set1) + { + GNUNET_SETU_destroy (set1); + set1 = NULL; + } + if (NULL != set2) + { + GNUNET_SETU_destroy (set2); + set2 = NULL; + } + if (NULL != listen_handle) + { + GNUNET_SETU_listen_cancel (listen_handle); + listen_handle = NULL; + } +} + + +/** + * Signature of the 'main' function for a (single-peer) testcase that + * is run using 'GNUNET_TESTING_peer_run'. + * + * @param cls closure + * @param cfg configuration of the peer that was started + * @param peer identity of the peer that was created + */ +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_TESTING_Peer *peer) +{ + struct GNUNET_SETU_OperationHandle *my_oh; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Running preparatory tests\n"); + tt = GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), + &timeout_fail, + NULL); + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); + + config = cfg; + GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg, + &local_id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "my id (from CRYPTO): %s\n", + GNUNET_i2s (&local_id)); + GNUNET_TESTING_peer_get_identity (peer, + &local_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "my id (from TESTING): %s\n", + GNUNET_i2s (&local_id)); + set1 = GNUNET_SETU_create (cfg); + set2 = GNUNET_SETU_create (cfg); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Created sets %p and %p for union operation\n", + set1, + set2); + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); + + /* test if canceling an uncommited request works! */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Launching and instantly stopping set operation\n"); + my_oh = GNUNET_SETU_prepare (&local_id, + &app_id, + NULL, + (struct GNUNET_SETU_Option[]){ 0 }, + NULL, + NULL); + GNUNET_SETU_operation_cancel (my_oh); + + /* test the real set reconciliation */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Running real set-reconciliation\n"); + init_set1 (); +} + + +int +main (int argc, char **argv) +{ + GNUNET_log_setup ("test_setu_api", + "WARNING", + NULL); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Launching peer\n"); + if (0 != + GNUNET_TESTING_peer_run ("test_setu_api", + "test_setu.conf", + &run, + NULL)) + { + return 1; + } + return ret; +} -- cgit v1.2.3