summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Schanzenbach <mschanzenbach@posteo.de>2020-08-16 21:54:40 +0200
committerMartin Schanzenbach <mschanzenbach@posteo.de>2020-08-16 21:54:52 +0200
commit7cb5ed9817617de03b9238d4ee0df0262050b036 (patch)
tree318f9081dedd73391b6d5bc97022584e584e9b92
parent4dca8c3557a50ae5be9c105aa5fb2f85d99d917a (diff)
parentbe0475f2a583d203465d3091ff933806a5ace613 (diff)
Merge branch 'master' of ssh://gnunet.org/gnunet
-rw-r--r--configure.ac2
-rw-r--r--po/POTFILES.in19
-rw-r--r--src/include/gnunet_protocols.h132
-rw-r--r--src/setu/Makefile.am102
-rw-r--r--src/setu/gnunet-service-setu.c3482
-rw-r--r--src/setu/gnunet-service-setu.h393
-rw-r--r--src/setu/gnunet-service-setu_protocol.h226
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.c303
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.h169
-rw-r--r--src/setu/gnunet-setu-ibf-profiler.c308
-rw-r--r--src/setu/gnunet-setu-profiler.c499
-rw-r--r--src/setu/ibf.c409
-rw-r--r--src/setu/ibf.h255
-rw-r--r--src/setu/ibf_sim.c142
-rw-r--r--src/setu/plugin_block_setu_test.c123
-rw-r--r--src/setu/setu.h304
-rw-r--r--src/setu/setu_api.c872
-rw-r--r--src/setu/test_setu_api.c360
18 files changed, 8100 insertions, 0 deletions
diff --git a/configure.ac b/configure.ac
index e492242a6..bb205220c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1937,6 +1937,8 @@ src/scalarproduct/Makefile
src/scalarproduct/scalarproduct.conf
src/set/Makefile
src/set/set.conf
+src/setu/Makefile
+src/setu/setu.conf
src/sq/Makefile
src/statistics/Makefile
src/statistics/statistics.conf
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 1f5cc81c3..c5503c343 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -333,6 +333,25 @@ src/set/ibf.c
src/set/ibf_sim.c
src/set/plugin_block_set_test.c
src/set/set_api.c
+src/seti/gnunet-service-set.c
+src/seti/gnunet-service-set_intersection.c
+src/seti/gnunet-service-set_union.c
+src/seti/gnunet-service-set_union_strata_estimator.c
+src/seti/gnunet-set-ibf-profiler.c
+src/seti/gnunet-set-profiler.c
+src/seti/ibf.c
+src/seti/ibf_sim.c
+src/seti/plugin_block_set_test.c
+src/seti/set_api.c
+src/setu/gnunet-service-set_union.c
+src/setu/gnunet-service-set_union_strata_estimator.c
+src/setu/gnunet-service-setu.c
+src/setu/gnunet-setu-ibf-profiler.c
+src/setu/gnunet-setu-profiler.c
+src/setu/ibf.c
+src/setu/ibf_sim.c
+src/setu/plugin_block_setu_test.c
+src/setu/setu_api.c
src/sq/sq.c
src/sq/sq_exec.c
src/sq/sq_prepare.c
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 5af58664f..c3fcde0b9 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1659,9 +1659,141 @@ extern "C" {
/*******************************************************************************
+ * SETU message types
+ ******************************************************************************/
+
+
+/**
+ * Cancel a set operation
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_CANCEL 550
+
+/**
+ * Add element to set
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_ADD 551
+
+/**
+ * Create a new local set
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_CREATE 552
+
+/**
+ * Handle result message from operation
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_RESULT 553
+
+/**
+ * Evaluate a set operation
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE 554
+
+/**
+ * Listen for operation requests
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_LISTEN 555
+
+/**
+ * Reject a set request.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_REJECT 556
+
+/**
+ * Accept an incoming set request
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT 557
+
+/**
+ * Notify the client of an incoming request from a remote peer
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_REQUEST 558
+
+
+/**
+ * Demand the whole element from the other
+ * peer, given only the hash code.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL 565
+
+/**
+ * Demand the whole element from the other
+ * peer, given only the hash code.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND 566
+
+/**
+ * Tell the other peer to send us a list of
+ * hashes that match an IBF key.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY 567
+
+/**
+ * Tell the other peer which hashes match a
+ * given IBF key.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER 568
+
+/**
+ * Request a set union operation from a remote peer.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST 581
+
+/**
+ * Strata estimator.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE 582
+
+/**
+ * Invertible bloom filter.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF 583
+
+/**
+ * Actual set elements.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS 584
+
+/**
+ * Requests for the elements with the given hashes.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENT_REQUESTS 585
+
+/**
+ * Set operation is done.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE 586
+
+/**
+ * Compressed strata estimator.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC 590
+
+/**
+ * Request all missing elements from the other peer,
+ * based on their sets and the elements we previously sent
+ * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE 597
+
+/**
+ * Send a set element, not as response to a demand but because
+ * we're sending the full set.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT 598
+
+/**
+ * Request all missing elements from the other peer,
+ * based on their sets and the elements we previously sent
+ * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
+ */
+#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 599
+
+
+/*******************************************************************************
* SET message types
******************************************************************************/
+
/**
* Demand the whole element from the other
* peer, given only the hash code.
diff --git a/src/setu/Makefile.am b/src/setu/Makefile.am
new file mode 100644
index 000000000..b37ceba51
--- /dev/null
+++ b/src/setu/Makefile.am
@@ -0,0 +1,102 @@
+# This Makefile.am is in the public domain
+AM_CPPFLAGS = -I$(top_srcdir)/src/include
+
+pkgcfgdir= $(pkgdatadir)/config.d/
+
+libexecdir= $(pkglibdir)/libexec/
+
+plugindir = $(libdir)/gnunet
+
+pkgcfg_DATA = \
+ setu.conf
+
+if USE_COVERAGE
+ AM_CFLAGS = -fprofile-arcs -ftest-coverage
+endif
+
+if HAVE_TESTING
+bin_PROGRAMS = \
+ gnunet-setu-profiler
+
+noinst_PROGRAMS = \
+ gnunet-setu-ibf-profiler
+endif
+
+libexec_PROGRAMS = \
+ gnunet-service-setu
+
+lib_LTLIBRARIES = \
+ libgnunetsetu.la
+
+gnunet_setu_profiler_SOURCES = \
+ gnunet-setu-profiler.c
+gnunet_setu_profiler_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/statistics/libgnunetstatistics.la \
+ libgnunetsetu.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
+ $(GN_LIBINTL)
+
+
+gnunet_setu_ibf_profiler_SOURCES = \
+ gnunet-setu-ibf-profiler.c \
+ ibf.c
+gnunet_setu_ibf_profiler_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(GN_LIBINTL)
+
+gnunet_service_setu_SOURCES = \
+ gnunet-service-setu.c gnunet-service-setu.h \
+ ibf.c ibf.h \
+ gnunet-service-setu_strata_estimator.c gnunet-service-setu_strata_estimator.h \
+ gnunet-service-setu_protocol.h
+gnunet_service_setu_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/statistics/libgnunetstatistics.la \
+ $(top_builddir)/src/core/libgnunetcore.la \
+ $(top_builddir)/src/cadet/libgnunetcadet.la \
+ $(top_builddir)/src/block/libgnunetblock.la \
+ libgnunetsetu.la \
+ $(GN_LIBINTL)
+
+libgnunetsetu_la_SOURCES = \
+ setu_api.c setu.h
+libgnunetsetu_la_LIBADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(LTLIBINTL)
+libgnunetsetu_la_LDFLAGS = \
+ $(GN_LIB_LDFLAGS)
+
+if HAVE_TESTING
+check_PROGRAMS = \
+ test_setu_api
+endif
+
+if ENABLE_TEST_RUN
+AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME;
+TESTS = $(check_PROGRAMS)
+endif
+
+test_setu_api_SOURCES = \
+ test_setu_api.c
+test_setu_api_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
+ libgnunetsetu.la
+
+plugin_LTLIBRARIES = \
+ libgnunet_plugin_block_setu_test.la
+
+libgnunet_plugin_block_setu_test_la_SOURCES = \
+ plugin_block_setu_test.c
+libgnunet_plugin_block_setu_test_la_LIBADD = \
+ $(top_builddir)/src/block/libgnunetblock.la \
+ $(top_builddir)/src/block/libgnunetblockgroup.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(LTLIBINTL)
+libgnunet_plugin_block_setu_test_la_LDFLAGS = \
+ $(GN_PLUGIN_LDFLAGS)
+
+
+EXTRA_DIST = \
+ test_setu.conf
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
new file mode 100644
index 000000000..88edc622f
--- /dev/null
+++ b/src/setu/gnunet-service-setu.c
@@ -0,0 +1,3482 @@
+/*
+ This file is part of GNUnet
+ Copyright (C) 2013-2017, 2020 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file setu/gnunet-service-setu.c
+ * @brief set union operation
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_statistics_service.h"
+#include "gnunet-service-setu.h"
+#include "ibf.h"
+#include "gnunet-service-setu_strata_estimator.h"
+#include "gnunet-service-setu_protocol.h"
+#include "gnunet_statistics_service.h"
+#include <gcrypt.h>
+
+
+#define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__)
+
+/**
+ * How long do we hold on to an incoming channel if there is
+ * no local listener before giving up?
+ */
+#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
+
+/**
+ * Number of IBFs in a strata estimator.
+ */
+#define SE_STRATA_COUNT 32
+
+/**
+ * Size of the IBFs in the strata estimator.
+ */
+#define SE_IBF_SIZE 80
+
+/**
+ * The hash num parameter for the difference digests and strata estimators.
+ */
+#define SE_IBF_HASH_NUM 4
+
+/**
+ * Number of buckets that can be transmitted in one message.
+ */
+#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
+
+/**
+ * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
+ * Choose this value so that computing the IBF is still cheaper
+ * than transmitting all values.
+ */
+#define MAX_IBF_ORDER (20)
+
+/**
+ * Number of buckets used in the ibf per estimated
+ * difference.
+ */
+#define IBF_ALPHA 4
+
+
+/**
+ * Current phase we are in for a union operation.
+ */
+enum UnionOperationPhase
+{
+ /**
+ * We sent the request message, and expect a strata estimator.
+ */
+ PHASE_EXPECT_SE,
+
+ /**
+ * We sent the strata estimator, and expect an IBF. This phase is entered once
+ * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
+ *
+ * XXX: could use better wording.
+ * XXX: repurposed to also expect a "request full set" message, should be renamed
+ *
+ * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
+ */
+ PHASE_EXPECT_IBF,
+
+ /**
+ * Continuation for multi part IBFs.
+ */
+ PHASE_EXPECT_IBF_CONT,
+
+ /**
+ * We are decoding an IBF.
+ */
+ PHASE_INVENTORY_ACTIVE,
+
+ /**
+ * The other peer is decoding the IBF we just sent.
+ */
+ PHASE_INVENTORY_PASSIVE,
+
+ /**
+ * The protocol is almost finished, but we still have to flush our message
+ * queue and/or expect some elements.
+ */
+ PHASE_FINISH_CLOSING,
+
+ /**
+ * In the penultimate phase,
+ * we wait until all our demands
+ * are satisfied. Then we send a done
+ * message, and wait for another done message.
+ */
+ PHASE_FINISH_WAITING,
+
+ /**
+ * In the ultimate phase, we wait until
+ * our demands are satisfied and then
+ * quit (sending another DONE message).
+ */
+ PHASE_DONE,
+
+ /**
+ * After sending the full set, wait for responses with the elements
+ * that the local peer is missing.
+ */
+ PHASE_FULL_SENDING,
+};
+
+
+/**
+ * State of an evaluate operation with another peer.
+ */
+struct OperationState
+{
+ /**
+ * Copy of the set's strata estimator at the time of
+ * creation of this operation.
+ */
+ struct StrataEstimator *se;
+
+ /**
+ * The IBF we currently receive.
+ */
+ struct InvertibleBloomFilter *remote_ibf;
+
+ /**
+ * The IBF with the local set's element.
+ */
+ struct InvertibleBloomFilter *local_ibf;
+
+ /**
+ * Maps unsalted IBF-Keys to elements.
+ * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
+ * Colliding IBF-Keys are linked.
+ */
+ struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
+
+ /**
+ * Current state of the operation.
+ */
+ enum UnionOperationPhase phase;
+
+ /**
+ * Did we send the client that we are done?
+ */
+ int client_done_sent;
+
+ /**
+ * Number of ibf buckets already received into the @a remote_ibf.
+ */
+ unsigned int ibf_buckets_received;
+
+ /**
+ * Hashes for elements that we have demanded from the other peer.
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
+
+ /**
+ * Salt that we're using for sending IBFs
+ */
+ uint32_t salt_send;
+
+ /**
+ * Salt for the IBF we've received and that we're currently decoding.
+ */
+ uint32_t salt_receive;
+
+ /**
+ * Number of elements we received from the other peer
+ * that were not in the local set yet.
+ */
+ uint32_t received_fresh;
+
+ /**
+ * Total number of elements received from the other peer.
+ */
+ uint32_t received_total;
+
+ /**
+ * Initial size of our set, just before
+ * the operation started.
+ */
+ uint64_t initial_size;
+};
+
+
+/**
+ * The key entry is used to associate an ibf key with an element.
+ */
+struct KeyEntry
+{
+ /**
+ * IBF key for the entry, derived from the current salt.
+ */
+ struct IBF_Key ibf_key;
+
+ /**
+ * The actual element associated with the key.
+ *
+ * Only owned by the union operation if element->operation
+ * is #GNUNET_YES.
+ */
+ struct ElementEntry *element;
+
+ /**
+ * Did we receive this element?
+ * Even if element->is_foreign is false, we might
+ * have received the element, so this indicates that
+ * the other peer has it.
+ */
+ int received;
+};
+
+
+/**
+ * Used as a closure for sending elements
+ * with a specific IBF key.
+ */
+struct SendElementClosure
+{
+ /**
+ * The IBF key whose matching elements should be
+ * sent.
+ */
+ struct IBF_Key ibf_key;
+
+ /**
+ * Operation for which the elements
+ * should be sent.
+ */
+ struct Operation *op;
+};
+
+
+/**
+ * Extra state required for efficient set union.
+ */
+struct SetState
+{
+ /**
+ * The strata estimator is only generated once for
+ * each set.
+ * The IBF keys are derived from the element hashes with
+ * salt=0.
+ */
+ struct StrataEstimator *se;
+};
+
+
+/**
+ * A listener is inhabited by a client, and waits for evaluation
+ * requests from remote peers.
+ */
+struct Listener
+{
+ /**
+ * Listeners are held in a doubly linked list.
+ */
+ struct Listener *next;
+
+ /**
+ * Listeners are held in a doubly linked list.
+ */
+ struct Listener *prev;
+
+ /**
+ * Head of DLL of operations this listener is responsible for.
+ * Once the client has accepted/declined the operation, the
+ * operation is moved to the respective set's operation DLLS.
+ */
+ struct Operation *op_head;
+
+ /**
+ * Tail of DLL of operations this listener is responsible for.
+ * Once the client has accepted/declined the operation, the
+ * operation is moved to the respective set's operation DLLS.
+ */
+ struct Operation *op_tail;
+
+ /**
+ * Client that owns the listener.
+ * Only one client may own a listener.
+ */
+ struct ClientState *cs;
+
+ /**
+ * The port we are listening on with CADET.
+ */
+ struct GNUNET_CADET_Port *open_port;
+
+ /**
+ * Application ID for the operation, used to distinguish
+ * multiple operations of the same type with the same peer.
+ */
+ struct GNUNET_HashCode app_id;
+
+};
+
+
+/**
+ * Handle to the cadet service, used to listen for and connect to
+ * remote peers.
+ */
+static struct GNUNET_CADET_Handle *cadet;
+
+/**
+ * Statistics handle.
+ */
+struct GNUNET_STATISTICS_Handle *_GSS_statistics;
+
+/**
+ * Listeners are held in a doubly linked list.
+ */
+static struct Listener *listener_head;
+
+/**
+ * Listeners are held in a doubly linked list.
+ */
+static struct Listener *listener_tail;
+
+/**
+ * Number of active clients.
+ */
+static unsigned int num_clients;
+
+/**
+ * Are we in shutdown? if #GNUNET_YES and the number of clients
+ * drops to zero, disconnect from CADET.
+ */
+static int in_shutdown;
+
+/**
+ * Counter for allocating unique IDs for clients, used to identify
+ * incoming operation requests from remote peers, that the client can
+ * choose to accept or refuse. 0 must not be used (reserved for
+ * uninitialized).
+ */
+static uint32_t suggest_id;
+
+
+/**
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
+ */
+static int
+destroy_key_to_element_iter (void *cls,
+ uint32_t key,
+ void *value)
+{
+ struct KeyEntry *k = value;
+
+ GNUNET_assert (NULL != k);
+ if (GNUNET_YES == k->element->remote)
+ {
+ GNUNET_free (k->element);
+ k->element = NULL;
+ }
+ GNUNET_free (k);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Destroy the union operation. Only things specific to the union
+ * operation are destroyed.
+ *
+ * @param op union operation to destroy
+ */
+static void
+union_op_cancel (struct Operation *op)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "destroying union op\n");
+ /* check if the op was canceled twice */
+ GNUNET_assert (NULL != op->state);
+ if (NULL != op->state->remote_ibf)
+ {
+ ibf_destroy (op->state->remote_ibf);
+ op->state->remote_ibf = NULL;
+ }
+ if (NULL != op->state->demanded_hashes)
+ {
+ GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
+ op->state->demanded_hashes = NULL;
+ }
+ if (NULL != op->state->local_ibf)
+ {
+ ibf_destroy (op->state->local_ibf);
+ op->state->local_ibf = NULL;
+ }
+ if (NULL != op->state->se)
+ {
+ strata_estimator_destroy (op->state->se);
+ op->state->se = NULL;
+ }
+ if (NULL != op->state->key_to_element)
+ {
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+ &destroy_key_to_element_iter,
+ NULL);
+ GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
+ op->state->key_to_element = NULL;
+ }
+ GNUNET_free (op->state);
+ op->state = NULL;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "destroying union op done\n");
+}
+
+
+/**
+ * Inform the client that the union operation has failed,
+ * and proceed to destroy the evaluate operation.
+ *
+ * @param op the union operation to fail
+ */
+static void
+fail_union_operation (struct Operation *op)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SETU_ResultMessage *msg;
+
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "union operation failed\n");
+ ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
+ msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
+ msg->request_id = htonl (op->client_request_id);
+ msg->element_type = htons (0);
+ GNUNET_MQ_send (op->set->cs->mq,
+ ev);
+ _GSS_operation_destroy (op);
+}
+
+
+/**
+ * Derive the IBF key from a hash code and
+ * a salt.
+ *
+ * @param src the hash code
+ * @return the derived IBF key
+ */
+static struct IBF_Key
+get_ibf_key (const struct GNUNET_HashCode *src)
+{
+ struct IBF_Key key;
+ uint16_t salt = 0;
+
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CRYPTO_kdf (&key, sizeof(key),
+ src, sizeof *src,
+ &salt, sizeof(salt),
+ NULL, 0));
+ return key;
+}
+
+
+/**
+ * Context for #op_get_element_iterator
+ */
+struct GetElementContext
+{
+ /**
+ * FIXME.
+ */
+ struct GNUNET_HashCode hash;
+
+ /**
+ * FIXME.
+ */
+ struct KeyEntry *k;
+};
+
+
+/**
+ * Iterator over the mapping from IBF keys to element entries. Checks if we
+ * have an element with a given GNUNET_HashCode.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should search further,
+ * #GNUNET_NO if we've found the element.
+ */
+static int
+op_get_element_iterator (void *cls,
+ uint32_t key,
+ void *value)
+{
+ struct GetElementContext *ctx = cls;
+ struct KeyEntry *k = value;
+
+ GNUNET_assert (NULL != k);
+ if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
+ &ctx->hash))
+ {
+ ctx->k = k;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Determine whether the given element is already in the operation's element
+ * set.
+ *
+ * @param op operation that should be tested for 'element_hash'
+ * @param element_hash hash of the element to look for
+ * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
+ */
+static struct KeyEntry *
+op_get_element (struct Operation *op,
+ const struct GNUNET_HashCode *element_hash)
+{
+ int ret;
+ struct IBF_Key ibf_key;
+ struct GetElementContext ctx = { { { 0 } }, 0 };
+
+ ctx.hash = *element_hash;
+
+ ibf_key = get_ibf_key (element_hash);
+ ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
+ (uint32_t) ibf_key.key_val,
+ op_get_element_iterator,
+ &ctx);
+
+ /* was the iteration aborted because we found the element? */
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_assert (NULL != ctx.k);
+ return ctx.k;
+ }
+ return NULL;
+}
+
+
+/**
+ * Insert an element into the union operation's
+ * key-to-element mapping. Takes ownership of 'ee'.
+ * Note that this does not insert the element in the set,
+ * only in the operation's key-element mapping.
+ * This is done to speed up re-tried operations, if some elements
+ * were transmitted, and then the IBF fails to decode.
+ *
+ * XXX: clarify ownership, doesn't sound right.
+ *
+ * @param op the union operation
+ * @param ee the element entry
+ * @parem received was this element received from the remote peer?
+ */
+static void
+op_register_element (struct Operation *op,
+ struct ElementEntry *ee,
+ int received)
+{
+ struct IBF_Key ibf_key;
+ struct KeyEntry *k;
+
+ ibf_key = get_ibf_key (&ee->element_hash);
+ k = GNUNET_new (struct KeyEntry);
+ k->element = ee;
+ k->ibf_key = ibf_key;
+ k->received = received;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
+ (uint32_t) ibf_key.key_val,
+ k,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+}
+
+
+/**
+ * FIXME.
+ */
+static void
+salt_key (const struct IBF_Key *k_in,
+ uint32_t salt,
+ struct IBF_Key *k_out)
+{
+ int s = salt % 64;
+ uint64_t x = k_in->key_val;
+
+ /* rotate ibf key */
+ x = (x >> s) | (x << (64 - s));
+ k_out->key_val = x;
+}
+
+
+/**
+ * FIXME.
+ */
+static void
+unsalt_key (const struct IBF_Key *k_in,
+ uint32_t salt,
+ struct IBF_Key *k_out)
+{
+ int s = salt % 64;
+ uint64_t x = k_in->key_val;
+
+ x = (x << s) | (x >> (64 - s));
+ k_out->key_val = x;
+}
+
+
+/**
+ * Insert a key into an ibf.
+ *
+ * @param cls the ibf
+ * @param key unused
+ * @param value the key entry to get the key from
+ */
+static int
+prepare_ibf_iterator (void *cls,
+ uint32_t key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct KeyEntry *ke = value;
+ struct IBF_Key salted_key;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "[OP %x] inserting %lx (hash %s) into ibf\n",
+ (void *) op,
+ (unsigned long) ke->ibf_key.key_val,
+ GNUNET_h2s (&ke->element->element_hash));
+ salt_key (&ke->ibf_key,
+ op->state->salt_send,
+ &salted_key);
+ ibf_insert (op->state->local_ibf, salted_key);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Iterator for initializing the
+ * key-to-element mapping of a union operation
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ * into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+init_key_to_element_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee = value;
+
+ /* make sure that the element belongs to the set at the time
+ * of creating the operation */
+ if (GNUNET_NO ==
+ _GSS_is_element_of_operation (ee,
+ op))
+ return GNUNET_YES;
+ GNUNET_assert (GNUNET_NO == ee->remote);
+ op_register_element (op,
+ ee,
+ GNUNET_NO);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Initialize the IBF key to element mapping local to this set
+ * operation.
+ *
+ * @param op the set union operation
+ */
+static void
+initialize_key_to_element (struct Operation *op)
+{
+ unsigned int len;
+
+ GNUNET_assert (NULL == op->state->key_to_element);
+ len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
+ op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
+ GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
+ &init_key_to_element_iterator,
+ op);
+}
+
+
+/**
+ * Create an ibf with the operation's elements
+ * of the specified size
+ *
+ * @param op the union operation
+ * @param size size of the ibf to create
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ */
+static int
+prepare_ibf (struct Operation *op,
+ uint32_t size)
+{
+ GNUNET_assert (NULL != op->state->key_to_element);
+
+ if (NULL != op->state->local_ibf)
+ ibf_destroy (op->state->local_ibf);
+ op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
+ if (NULL == op->state->local_ibf)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to allocate local IBF\n");
+ return GNUNET_SYSERR;
+ }
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+ &prepare_ibf_iterator,
+ op);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Send an ibf of appropriate size.
+ *
+ * Fragments the IBF into multiple messages if necessary.
+ *
+ * @param op the union operation
+ * @param ibf_order order of the ibf to send, size=2^order
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ */
+static int
+send_ibf (struct Operation *op,
+ uint16_t ibf_order)
+{
+ unsigned int buckets_sent = 0;
+ struct InvertibleBloomFilter *ibf;
+
+ if (GNUNET_OK !=
+ prepare_ibf (op, 1 << ibf_order))
+ {
+ /* allocation failed */
+ return GNUNET_SYSERR;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "sending ibf of size %u\n",
+ 1 << ibf_order);
+
+ {
+ char name[64] = { 0 };
+ snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
+ GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
+ }
+
+ ibf = op->state->local_ibf;
+
+ while (buckets_sent < (1 << ibf_order))
+ {
+ unsigned int buckets_in_message;
+ struct GNUNET_MQ_Envelope *ev;
+ struct IBFMessage *msg;
+
+ buckets_in_message = (1 << ibf_order) - buckets_sent;
+ /* limit to maximum */
+ if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
+ buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
+
+ ev = GNUNET_MQ_msg_extra (msg,
+ buckets_in_message * IBF_BUCKET_SIZE,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
+ msg->reserved1 = 0;
+ msg->reserved2 = 0;
+ msg->order = ibf_order;
+ msg->offset = htonl (buckets_sent);
+ msg->salt = htonl (op->state->salt_send);
+ ibf_write_slice (ibf, buckets_sent,
+ buckets_in_message, &msg[1]);
+ buckets_sent += buckets_in_message;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "ibf chunk size %u, %u/%u sent\n",
+ buckets_in_message,
+ buckets_sent,
+ 1 << ibf_order);
+ GNUNET_MQ_send (op->mq, ev);
+ }
+
+ /* The other peer must decode the IBF, so
+ * we're passive. */
+ op->state->phase = PHASE_INVENTORY_PASSIVE;
+ return GNUNET_OK;
+}
+
+
+/**
+ * Compute the necessary order of an ibf
+ * from the size of the symmetric set difference.
+ *
+ * @param diff the difference
+ * @return the required size of the ibf
+ */
+static unsigned int
+get_order_from_difference (unsigned int diff)
+{
+ unsigned int ibf_order;
+
+ ibf_order = 2;
+ while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
+ ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
+ (ibf_order < MAX_IBF_ORDER))
+ ibf_order++;
+ // add one for correction
+ return ibf_order + 1;
+}
+
+
+/**
+ * Send a set element.
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ * into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+send_full_element_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct GNUNET_SETU_ElementMessage *emsg;
+ struct ElementEntry *ee = value;
+ struct GNUNET_SETU_Element *el = &ee->element;
+ struct GNUNET_MQ_Envelope *ev;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending element %s\n",
+ GNUNET_h2s (key));
+ ev = GNUNET_MQ_msg_extra (emsg,
+ el->size,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+ emsg->element_type = htons (el->element_type);
+ GNUNET_memcpy (&emsg[1],
+ el->data,
+ el->size);
+ GNUNET_MQ_send (op->mq,
+ ev);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Switch to full set transmission for @a op.
+ *
+ * @param op operation to switch to full set transmission.
+ */
+static void
+send_full_set (struct Operation *op)
+{
+ struct GNUNET_MQ_Envelope *ev;
+
+ op->state->phase = PHASE_FULL_SENDING;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Dedicing to transmit the full set\n");
+ /* FIXME: use a more memory-friendly way of doing this with an
+ iterator, just as we do in the non-full case! */
+ (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
+ &send_full_element_iterator,
+ op);
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+ GNUNET_MQ_send (op->mq,
+ ev);
+}
+
+
+/**
+ * Handle a strata estimator from a remote peer
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+static int
+check_union_p2p_strata_estimator (void *cls,
+ const struct StrataEstimatorMessage *msg)
+{
+ struct Operation *op = cls;
+ int is_compressed;
+ size_t len;
+
+ if (op->state->phase != PHASE_EXPECT_SE)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
+ msg->header.type));
+ len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
+ if ((GNUNET_NO == is_compressed) &&
+ (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle a strata estimator from a remote peer
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+static void
+handle_union_p2p_strata_estimator (void *cls,
+ const struct StrataEstimatorMessage *msg)
+{
+ struct Operation *op = cls;
+ struct StrataEstimator *remote_se;
+ unsigned int diff;
+ uint64_t other_size;
+ size_t len;
+ int is_compressed;
+
+ is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
+ msg->header.type));
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# bytes of SE received",
+ ntohs (msg->header.size),
+ GNUNET_NO);
+ len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
+ other_size = GNUNET_ntohll (msg->set_size);
+ remote_se = strata_estimator_create (SE_STRATA_COUNT,
+ SE_IBF_SIZE,
+ SE_IBF_HASH_NUM);
+ if (NULL == remote_se)
+ {
+ /* insufficient resources, fail */
+ fail_union_operation (op);
+ return;
+ }
+ if (GNUNET_OK !=
+ strata_estimator_read (&msg[1],
+ len,
+ is_compressed,
+ remote_se))
+ {
+ /* decompression failed */
+ strata_estimator_destroy (remote_se);
+ fail_union_operation (op);
+ return;
+ }
+ GNUNET_assert (NULL != op->state->se);
+ diff = strata_estimator_difference (remote_se,
+ op->state->se);
+
+ if (diff > 200)
+ diff = diff * 3 / 2;
+
+ strata_estimator_destroy (remote_se);
+ strata_estimator_destroy (op->state->se);
+ op->state->se = NULL;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "got se diff=%d, using ibf size %d\n",
+ diff,
+ 1U << get_order_from_difference (diff));
+
+ {
+ char *set_debug;
+
+ set_debug = getenv ("GNUNET_SETU_BENCHMARK");
+ if ((NULL != set_debug) &&
+ (0 == strcmp (set_debug, "1")))
+ {
+ FILE *f = fopen ("set.log", "a");
+ fprintf (f, "%llu\n", (unsigned long long) diff);
+ fclose (f);
+ }
+ }
+
+ if ((GNUNET_YES == op->byzantine) &&
+ (other_size < op->byzantine_lower_bound))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ if ((GNUNET_YES == op->force_full) ||
+ (diff > op->state->initial_size / 4) ||
+ (0 == other_size))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
+ diff,
+ op->state->initial_size);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of full sends",
+ 1,
+ GNUNET_NO);
+ if ((op->state->initial_size <= other_size) ||
+ (0 == other_size))
+ {
+ send_full_set (op);
+ }
+ else
+ {
+ struct GNUNET_MQ_Envelope *ev;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Telling other peer that we expect its full set\n");
+ op->state->phase = PHASE_EXPECT_IBF;
+ ev = GNUNET_MQ_msg_header (
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
+ GNUNET_MQ_send (op->mq,
+ ev);
+ }
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of ibf sends",
+ 1,
+ GNUNET_NO);
+ if (GNUNET_OK !=
+ send_ibf (op,
+ get_order_from_difference (diff)))
+ {
+ /* Internal error, best we can do is shut the connection */
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to send IBF, closing connection\n");
+ fail_union_operation (op);
+ return;
+ }
+ }
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Iterator to send elements to a remote peer
+ *
+ * @param cls closure with the element key and the union operation
+ * @param key ignored
+ * @param value the key entry
+ */
+static int
+send_offers_iterator (void *cls,
+ uint32_t key,
+ void *value)
+{
+ struct SendElementClosure *sec = cls;
+ struct Operation *op = sec->op;
+ struct KeyEntry *ke = value;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_MessageHeader *mh;
+
+ /* Detect 32-bit key collision for the 64-bit IBF keys. */
+ if (ke->ibf_key.key_val != sec->ibf_key.key_val)
+ return GNUNET_YES;
+
+ ev = GNUNET_MQ_msg_header_extra (mh,
+ sizeof(struct GNUNET_HashCode),
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
+
+ GNUNET_assert (NULL != ev);
+ *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "[OP %x] sending element offer (%s) to peer\n",
+ (void *) op,
+ GNUNET_h2s (&ke->element->element_hash));
+ GNUNET_MQ_send (op->mq, ev);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
+ *
+ * @param op union operation
+ * @param ibf_key IBF key of interest
+ */
+static void
+send_offers_for_key (struct Operation *op,
+ struct IBF_Key ibf_key)
+{
+ struct SendElementClosure send_cls;
+
+ send_cls.ibf_key = ibf_key;
+ send_cls.op = op;
+ (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
+ op->state->key_to_element,
+ (uint32_t) ibf_key.
+ key_val,
+ &send_offers_iterator,
+ &send_cls);
+}
+
+
+/**
+ * Decode which elements are missing on each side, and
+ * send the appropriate offers and inquiries.
+ *
+ * @param op union operation
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ */
+static int
+decode_and_send (struct Operation *op)
+{
+ struct IBF_Key key;
+ struct IBF_Key last_key;
+ int side;
+ unsigned int num_decoded;
+ struct InvertibleBloomFilter *diff_ibf;
+
+ GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
+
+ if (GNUNET_OK !=
+ prepare_ibf (op,
+ op->state->remote_ibf->size))
+ {
+ GNUNET_break (0);
+ /* allocation failed */
+ return GNUNET_SYSERR;
+ }
+ diff_ibf = ibf_dup (op->state->local_ibf);
+ ibf_subtract (diff_ibf,
+ op->state->remote_ibf);
+
+ ibf_destroy (op->state->remote_ibf);
+ op->state->remote_ibf = NULL;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "decoding IBF (size=%u)\n",
+ diff_ibf->size);
+
+ num_decoded = 0;
+ key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
+
+ while (1)
+ {
+ int res;
+ int cycle_detected = GNUNET_NO;
+
+ last_key = key;
+
+ res = ibf_decode (diff_ibf, &side, &key);
+ if (res == GNUNET_OK)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "decoded ibf key %lx\n",
+ (unsigned long) key.key_val);
+ num_decoded += 1;
+ if ((num_decoded > diff_ibf->size) ||
+ ((num_decoded > 1) &&
+ (last_key.key_val == key.key_val)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "detected cyclic ibf (decoded %u/%u)\n",
+ num_decoded,
+ diff_ibf->size);
+ cycle_detected = GNUNET_YES;
+ }
+ }
+ if ((GNUNET_SYSERR == res) ||
+ (GNUNET_YES == cycle_detected))
+ {
+ int next_order;
+ next_order = 0;
+ while (1 << next_order < diff_ibf->size)
+ next_order++;
+ next_order++;
+ if (next_order <= MAX_IBF_ORDER)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "decoding failed, sending larger ibf (size %u)\n",
+ 1 << next_order);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of IBF retries",
+ 1,
+ GNUNET_NO);
+ op->state->salt_send++;
+ if (GNUNET_OK !=
+ send_ibf (op, next_order))
+ {
+ /* Internal error, best we can do is shut the connection */
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to send IBF, closing connection\n");
+ fail_union_operation (op);
+ ibf_destroy (diff_ibf);
+ return GNUNET_SYSERR;
+ }
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of failed union operations (too large)",
+ 1,
+ GNUNET_NO);
+ // XXX: Send the whole set, element-by-element
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "set union failed: reached ibf limit\n");
+ fail_union_operation (op);
+ ibf_destroy (diff_ibf);
+ return GNUNET_SYSERR;
+ }
+ break;
+ }
+ if (GNUNET_NO == res)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "transmitted all values, sending DONE\n");
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
+ GNUNET_MQ_send (op->mq, ev);
+ /* We now wait until we get a DONE message back
+ * and then wait for our MQ to be flushed and all our
+ * demands be delivered. */
+ break;
+ }
+ if (1 == side)
+ {
+ struct IBF_Key unsalted_key;
+
+ unsalt_key (&key,
+ op->state->salt_receive,
+ &unsalted_key);
+ send_offers_for_key (op,
+ unsalted_key);
+ }
+ else if (-1 == side)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+ struct InquiryMessage *msg;
+
+ /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
+ * the effort additional complexity. */
+ ev = GNUNET_MQ_msg_extra (msg,
+ sizeof(struct IBF_Key),
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
+ msg->salt = htonl (op->state->salt_receive);
+ GNUNET_memcpy (&msg[1],
+ &key,
+ sizeof(struct IBF_Key));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "sending element inquiry for IBF key %lx\n",
+ (unsigned long) key.key_val);
+ GNUNET_MQ_send (op->mq, ev);
+ }
+ else
+ {
+ GNUNET_assert (0);
+ }
+ }
+ ibf_destroy (diff_ibf);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Check an IBF message from a remote peer.
+ *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
+ * @param cls the union operation
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
+ */
+static int
+check_union_p2p_ibf (void *cls,
+ const struct IBFMessage *msg)
+{
+ struct Operation *op = cls;
+ unsigned int buckets_in_message;
+
+ buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
+ / IBF_BUCKET_SIZE;
+ if (0 == buckets_in_message)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
+ * IBF_BUCKET_SIZE)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (op->state->phase == PHASE_EXPECT_IBF_CONT)
+ {
+ if (ntohl (msg->offset) != op->state->ibf_buckets_received)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (1 << msg->order != op->state->remote_ibf->size)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (ntohl (msg->salt) != op->state->salt_receive)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ }
+ else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+ (op->state->phase != PHASE_EXPECT_IBF))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle an IBF message from a remote peer.
+ *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
+ * @param cls the union operation
+ * @param msg the header of the message
+ */
+static void
+handle_union_p2p_ibf (void *cls,
+ const struct IBFMessage *msg)
+{
+ struct Operation *op = cls;
+ unsigned int buckets_in_message;
+
+ buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
+ / IBF_BUCKET_SIZE;
+ if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
+ (op->state->phase == PHASE_EXPECT_IBF))
+ {
+ op->state->phase = PHASE_EXPECT_IBF_CONT;
+ GNUNET_assert (NULL == op->state->remote_ibf);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new ibf of size %u\n",
+ 1 << msg->order);
+ op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
+ op->state->salt_receive = ntohl (msg->salt);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving new IBF with salt %u\n",
+ op->state->salt_receive);
+ if (NULL == op->state->remote_ibf)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to parse remote IBF, closing connection\n");
+ fail_union_operation (op);
+ return;
+ }
+ op->state->ibf_buckets_received = 0;
+ if (0 != ntohl (msg->offset))
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ }
+ else
+ {
+ GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received more of IBF\n");
+ }
+ GNUNET_assert (NULL != op->state->remote_ibf);
+
+ ibf_read_slice (&msg[1],
+ op->state->ibf_buckets_received,
+ buckets_in_message,
+ op->state->remote_ibf);
+ op->state->ibf_buckets_received += buckets_in_message;
+
+ if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "received full ibf\n");
+ op->state->phase = PHASE_INVENTORY_ACTIVE;
+ if (GNUNET_OK !=
+ decode_and_send (op))
+ {
+ /* Internal error, best we can do is shut down */
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to decode IBF, closing connection\n");
+ fail_union_operation (op);
+ return;
+ }
+ }
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Send a result message to the client indicating
+ * that there is a new element.
+ *
+ * @param op union operation
+ * @param element element to send
+ * @param status status to send with the new element
+ */
+static void
+send_client_element (struct Operation *op,
+ const struct GNUNET_SETU_Element *element,
+ int status)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SETU_ResultMessage *rm;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "sending element (size %u) to client\n",
+ element->size);
+ GNUNET_assert (0 != op->client_request_id);
+ ev = GNUNET_MQ_msg_extra (rm,
+ element->size,
+ GNUNET_MESSAGE_TYPE_SET_RESULT);
+ if (NULL == ev)
+ {
+ GNUNET_MQ_discard (ev);
+ GNUNET_break (0);
+ return;
+ }
+ rm->result_status = htons (status);
+ rm->request_id = htonl (op->client_request_id);
+ rm->element_type = htons (element->element_type);
+ rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
+ op->state->key_to_element));
+ GNUNET_memcpy (&rm[1],
+ element->data,
+ element->size);
+ GNUNET_MQ_send (op->set->cs->mq,
+ ev);
+}
+
+
+/**
+ * Signal to the client that the operation has finished and
+ * destroy the operation.
+ *
+ * @param cls operation to destroy
+ */
+static void
+send_client_done (void *cls)
+{
+ struct Operation *op = cls;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SETU_ResultMessage *rm;
+
+ if (GNUNET_YES == op->state->client_done_sent)
+ {
+ return;
+ }
+
+ if (PHASE_DONE != op->state->phase)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Union operation failed\n");
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# Union operations failed",
+ 1,
+ GNUNET_NO);
+ ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
+ rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
+ rm->request_id = htonl (op->client_request_id);
+ rm->element_type = htons (0);
+ GNUNET_MQ_send (op->set->cs->mq,
+ ev);
+ return;
+ }
+
+ op->state->client_done_sent = GNUNET_YES;
+
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# Union operations succeeded",
+ 1,
+ GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Signalling client that union operation is done\n");
+ ev = GNUNET_MQ_msg (rm,
+ GNUNET_MESSAGE_TYPE_SET_RESULT);
+ rm->request_id = htonl (op->client_request_id);
+ rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
+ rm->element_type = htons (0);
+ rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
+ op->state->key_to_element));
+ GNUNET_MQ_send (op->set->cs->mq,
+ ev);
+}
+
+
+/**
+ * Tests if the operation is finished, and if so notify.
+ *
+ * @param op operation to check
+ */
+static void
+maybe_finish (struct Operation *op)
+{
+ unsigned int num_demanded;
+
+ num_demanded = GNUNET_CONTAINER_multihashmap_size (
+ op->state->demanded_hashes);
+
+ if (PHASE_FINISH_WAITING == op->state->phase)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "In PHASE_FINISH_WAITING, pending %u demands\n",
+ num_demanded);
+ if (0 == num_demanded)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+
+ op->state->phase = PHASE_DONE;
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
+ GNUNET_MQ_send (op->mq,
+ ev);
+ /* We now wait until the other peer sends P2P_OVER
+ * after it got all elements from us. */
+ }
+ }
+ if (PHASE_FINISH_CLOSING == op->state->phase)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "In PHASE_FINISH_CLOSING, pending %u demands\n",
+ num_demanded);
+ if (0 == num_demanded)
+ {
+ op->state->phase = PHASE_DONE;
+ send_client_done (op);
+ _GSS_operation_destroy2 (op);
+ }
+ }
+}
+
+
+/**
+ * Check an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+static int
+check_union_p2p_elements (void *cls,
+ const struct GNUNET_SETU_ElementMessage *emsg)
+{
+ struct Operation *op = cls;
+
+ if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle an element message from a remote peer.
+ * Sent by the other peer either because we decoded an IBF and placed a demand,
+ * or because the other peer switched to full set transmission.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+static void
+handle_union_p2p_elements (void *cls,
+ const struct GNUNET_SETU_ElementMessage *emsg)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ struct KeyEntry *ke;
+ uint16_t element_size;
+
+ element_size = ntohs (emsg->header.size) - sizeof(struct
+ GNUNET_SETU_ElementMessage);
+ ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
+ GNUNET_memcpy (&ee[1],
+ &emsg[1],
+ element_size);
+ ee->element.size = element_size;
+ ee->element.data = &ee[1];
+ ee->element.element_type = ntohs (emsg->element_type);
+ ee->remote = GNUNET_YES;
+ GNUNET_SETU_element_hash (&ee->element,
+ &ee->element_hash);
+ if (GNUNET_NO ==
+ GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
+ &ee->element_hash,
+ NULL))
+ {
+ /* We got something we didn't demand, since it's not in our map. */
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got element (size %u, hash %s) from peer\n",
+ (unsigned int) element_size,
+ GNUNET_h2s (&ee->element_hash));
+
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# received elements",
+ 1,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# exchanged elements",
+ 1,
+ GNUNET_NO);
+
+ op->state->received_total++;
+
+ ke = op_get_element (op, &ee->element_hash);
+ if (NULL != ke)
+ {
+ /* Got repeated element. Should not happen since
+ * we track demands. */
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# repeated elements",
+ 1,
+ GNUNET_NO);
+ ke->received = GNUNET_YES;
+ GNUNET_free (ee);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Registering new element from remote peer\n");
+ op->state->received_fresh++;
+ op_register_element (op, ee, GNUNET_YES);
+ /* only send results immediately if the client wants it */
+ send_client_element (op,
+ &ee->element,
+ GNUNET_SETU_STATUS_ADD_LOCAL);
+ }
+
+ if ((op->state->received_total > 8) &&
+ (op->state->received_fresh < op->state->received_total / 3))
+ {
+ /* The other peer gave us lots of old elements, there's something wrong. */
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ GNUNET_CADET_receive_done (op->channel);
+ maybe_finish (op);
+}
+
+
+/**
+ * Check a full element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+static int
+check_union_p2p_full_element (void *cls,
+ const struct GNUNET_SETU_ElementMessage *emsg)
+{
+ struct Operation *op = cls;
+
+ (void) op;
+ // FIXME: check that we expect full elements here?
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+static void
+handle_union_p2p_full_element (void *cls,
+ const struct GNUNET_SETU_ElementMessage *emsg)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ struct KeyEntry *ke;
+ uint16_t element_size;
+
+ element_size = ntohs (emsg->header.size)
+ - sizeof(struct GNUNET_SETU_ElementMessage);
+ ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
+ GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+ ee->element.size = element_size;
+ ee->element.data = &ee[1];
+ ee->element.element_type = ntohs (emsg->element_type);
+ ee->remote = GNUNET_YES;
+ GNUNET_SETU_element_hash (&ee->element, &ee->element_hash);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got element (full diff, size %u, hash %s) from peer\n",
+ (unsigned int) element_size,
+ GNUNET_h2s (&ee->element_hash));
+
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# received elements",
+ 1,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# exchanged elements",
+ 1,
+ GNUNET_NO);
+
+ op->state->received_total++;
+
+ ke = op_get_element (op, &ee->element_hash);
+ if (NULL != ke)
+ {
+ /* Got repeated element. Should not happen since
+ * we track demands. */
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# repeated elements",
+ 1,
+ GNUNET_NO);
+ ke->received = GNUNET_YES;
+ GNUNET_free (ee);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Registering new element from remote peer\n");
+ op->state->received_fresh++;
+ op_register_element (op, ee, GNUNET_YES);
+ /* only send results immediately if the client wants it */
+ send_client_element (op,
+ &ee->element,
+ GNUNET_SETU_STATUS_ADD_LOCAL);
+ }
+
+ if ((GNUNET_YES == op->byzantine) &&
+ (op->state->received_total > 384 + op->state->received_fresh * 4) &&
+ (op->state->received_fresh < op->state->received_total / 6))
+ {
+ /* The other peer gave us lots of old elements, there's something wrong. */
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Other peer sent only %llu/%llu fresh elements, failing operation\n",
+ (unsigned long long) op->state->received_fresh,
+ (unsigned long long) op->state->received_total);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+static int
+check_union_p2p_inquiry (void *cls,
+ const struct InquiryMessage *msg)
+{
+ struct Operation *op = cls;
+ unsigned int num_keys;
+
+ if (op->state->phase != PHASE_INVENTORY_PASSIVE)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
+ / sizeof(struct IBF_Key);
+ if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
+ != num_keys * sizeof(struct IBF_Key))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+static void
+handle_union_p2p_inquiry (void *cls,
+ const struct InquiryMessage *msg)
+{
+ struct Operation *op = cls;
+ const struct IBF_Key *ibf_key;
+ unsigned int num_keys;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received union inquiry\n");
+ num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
+ / sizeof(struct IBF_Key);
+ ibf_key = (const struct IBF_Key *) &msg[1];
+ while (0 != num_keys--)
+ {
+ struct IBF_Key unsalted_key;
+
+ unsalt_key (ibf_key,
+ ntohl (msg->salt),
+ &unsalted_key);
+ send_offers_for_key (op,
+ unsalted_key);
+ ibf_key++;
+ }
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
+ */
+static int
+send_missing_full_elements_iter (void *cls,
+ uint32_t key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct KeyEntry *ke = value;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SETU_ElementMessage *emsg;
+ struct ElementEntry *ee = ke->element;
+
+ if (GNUNET_YES == ke->received)
+ return GNUNET_YES;
+ ev = GNUNET_MQ_msg_extra (emsg,
+ ee->element.size,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+ GNUNET_memcpy (&emsg[1],
+ ee->element.data,
+ ee->element.size);
+ emsg->element_type = htons (ee->element.element_type);
+ GNUNET_MQ_send (op->mq,
+ ev);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Handle a request for full set transmission.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_union_p2p_request_full (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received request for full set transmission\n");
+ if (PHASE_EXPECT_IBF != op->state->phase)
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ // FIXME: we need to check that our set is larger than the
+ // byzantine_lower_bound by some threshold
+ send_full_set (op);
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Handle a "full done" message.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_union_p2p_full_done (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+
+ switch (op->state->phase)
+ {
+ case PHASE_EXPECT_IBF:
+ {
+ struct GNUNET_MQ_Envelope *ev;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "got FULL DONE, sending elements that other peer is missing\n");
+
+ /* send all the elements that did not come from the remote peer */
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+ &send_missing_full_elements_iter,
+ op);
+
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+ GNUNET_MQ_send (op->mq,
+ ev);
+ op->state->phase = PHASE_DONE;
+ /* we now wait until the other peer sends us the OVER message*/
+ }
+ break;
+
+ case PHASE_FULL_SENDING:
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "got FULL DONE, finishing\n");
+ /* We sent the full set, and got the response for that. We're done. */
+ op->state->phase = PHASE_DONE;
+ GNUNET_CADET_receive_done (op->channel);
+ send_client_done (op);
+ _GSS_operation_destroy2 (op);
+ return;
+ }
+ break;
+
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Handle full done phase is %u\n",
+ (unsigned) op->state->phase);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Check a demand by the other peer for elements based on a list
+ * of `struct GNUNET_HashCode`s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ * @return #GNUNET_OK if @a mh is well-formed
+ */
+static int
+check_union_p2p_demand (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ unsigned int num_hashes;
+
+ (void) op;
+ num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+ / sizeof(struct GNUNET_HashCode);
+ if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+ != num_hashes * sizeof(struct GNUNET_HashCode))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle a demand by the other peer for elements based on a list
+ * of `struct GNUNET_HashCode`s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_union_p2p_demand (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ struct GNUNET_SETU_ElementMessage *emsg;
+ const struct GNUNET_HashCode *hash;
+ unsigned int num_hashes;
+ struct GNUNET_MQ_Envelope *ev;
+
+ num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+ / sizeof(struct GNUNET_HashCode);
+ for (hash = (const struct GNUNET_HashCode *) &mh[1];
+ num_hashes > 0;
+ hash++, num_hashes--)
+ {
+ ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
+ hash);
+ if (NULL == ee)
+ {
+ /* Demand for non-existing element. */
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
+ {
+ /* Probably confused lazily copied sets. */
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ ev = GNUNET_MQ_msg_extra (emsg, ee->element.size,
+ GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
+ GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+ emsg->reserved = htons (0);
+ emsg->element_type = htons (ee->element.element_type);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
+ (void *) op,
+ (unsigned int) ee->element.size,
+ GNUNET_h2s (&ee->element_hash));
+ GNUNET_MQ_send (op->mq, ev);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# exchanged elements",
+ 1,
+ GNUNET_NO);
+ }
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Check offer (of `struct GNUNET_HashCode`s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ * @return #GNUNET_OK if @a mh is well-formed
+ */
+static int
+check_union_p2p_offer (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ unsigned int num_hashes;
+
+ /* look up elements and send them */
+ if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+ (op->state->phase != PHASE_INVENTORY_ACTIVE))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+ / sizeof(struct GNUNET_HashCode);
+ if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
+ num_hashes * sizeof(struct GNUNET_HashCode))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle offers (of `struct GNUNET_HashCode`s) and
+ * respond with demands (of `struct GNUNET_HashCode`s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_union_p2p_offer (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ const struct GNUNET_HashCode *hash;
+ unsigned int num_hashes;
+
+ num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
+ / sizeof(struct GNUNET_HashCode);
+ for (hash = (const struct GNUNET_HashCode *) &mh[1];
+ num_hashes > 0;
+ hash++, num_hashes--)
+ {
+ struct ElementEntry *ee;
+ struct GNUNET_MessageHeader *demands;
+ struct GNUNET_MQ_Envelope *ev;
+
+ ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
+ hash);
+ if (NULL != ee)
+ if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
+ continue;
+
+ if (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
+ hash))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Skipped sending duplicate demand\n");
+ continue;
+ }
+
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (
+ op->state->demanded_hashes,
+ hash,
+ NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "[OP %x] Requesting element (hash %s)\n",
+ (void *) op, GNUNET_h2s (hash));
+ ev = GNUNET_MQ_msg_header_extra (demands,
+ sizeof(struct GNUNET_HashCode),
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
+ GNUNET_memcpy (&demands[1],
+ hash,
+ sizeof(struct GNUNET_HashCode));
+ GNUNET_MQ_send (op->mq, ev);
+ }
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Handle a done message from a remote peer
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_union_p2p_done (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+
+ switch (op->state->phase)
+ {
+ case PHASE_INVENTORY_PASSIVE:
+ /* We got all requests, but still have to send our elements in response. */
+ op->state->phase = PHASE_FINISH_WAITING;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "got DONE (as passive partner), waiting for our demands to be satisfied\n");
+ /* The active peer is done sending offers
+ * and inquiries. This means that all
+ * our responses to that (demands and offers)
+ * must be in flight (queued or in mesh).
+ *
+ * We should notify the active peer once
+ * all our demands are satisfied, so that the active
+ * peer can quit if we gave it everything.
+ */GNUNET_CADET_receive_done (op->channel);
+ maybe_finish (op);
+ return;
+
+ case PHASE_INVENTORY_ACTIVE:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "got DONE (as active partner), waiting to finish\n");
+ /* All demands of the other peer are satisfied,
+ * and we processed all offers, thus we know
+ * exactly what our demands must be.
+ *
+ * We'll close the channel
+ * to the other peer once our demands are met.
+ */op->state->phase = PHASE_FINISH_CLOSING;
+ GNUNET_CADET_receive_done (op->channel);
+ maybe_finish (op);
+ return;
+
+ default:
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+}
+
+
+/**
+ * Handle a over message from a remote peer
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_union_p2p_over (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ send_client_done (cls);
+}
+
+
+/**
+ * Initiate operation to evaluate a set union with a remote peer.
+ *
+ * @param op operation to perform (to be initialized)
+ * @param opaque_context message to be transmitted to the listener
+ * to convince it to accept, may be NULL
+ */
+static struct OperationState *
+union_evaluate (struct Operation *op,
+ const struct GNUNET_MessageHeader *opaque_context)
+{
+ struct OperationState *state;
+ struct GNUNET_MQ_Envelope *ev;
+ struct OperationRequestMessage *msg;
+
+ ev = GNUNET_MQ_msg_nested_mh (msg,
+ GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+ opaque_context);
+ if (NULL == ev)
+ {
+ /* the context message is too large */
+ GNUNET_break (0);
+ return NULL;
+ }
+ state = GNUNET_new (struct OperationState);
+ state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
+ GNUNET_NO);
+ /* copy the current generation's strata estimator for this operation */
+ state->se = strata_estimator_dup (op->set->state->se);
+ /* we started the operation, thus we have to send the operation request */
+ state->phase = PHASE_EXPECT_SE;
+ state->salt_receive = state->salt_send = 42; // FIXME?????
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Initiating union operation evaluation\n");
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of total union operations",
+ 1,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of initiated union operations",
+ 1,
+ GNUNET_NO);
+ GNUNET_MQ_send (op->mq,
+ ev);
+
+ if (NULL != opaque_context)
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "sent op request with context message\n");
+ else
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "sent op request without context message\n");
+
+ op->state = state;
+ initialize_key_to_element (op);
+ state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
+ state->key_to_element);
+ return state;
+}
+
+
+/**
+ * Get the incoming socket associated with the given id.
+ *
+ * @param listener the listener to look in
+ * @param id id to look for
+ * @return the incoming socket associated with the id,
+ * or NULL if there is none
+ */
+static struct Operation *
+get_incoming (uint32_t id)
+{
+ for (struct Listener *listener = listener_head;
+ NULL != listener;
+ listener = listener->next)
+ {
+ for (struct Operation *op = listener->op_head;
+ NULL != op;
+ op = op->next)
+ if (op->suggest_id == id)
+ return op;
+ }
+ return NULL;
+}
+
+
+/**
+ * Destroy an incoming request from a remote peer
+ *
+ * @param op remote request to destroy
+ */
+static void
+incoming_destroy (struct Operation *op)
+{
+ struct Listener *listener;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying incoming operation %p\n",
+ op);
+ if (NULL != (listener = op->listener))
+ {
+ GNUNET_CONTAINER_DLL_remove (listener->op_head,
+ listener->op_tail,
+ op);
+ op->listener = NULL;
+ }
+ if (NULL != op->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (op->timeout_task);
+ op->timeout_task = NULL;
+ }
+ _GSS_operation_destroy2 (op);
+}
+
+
+/**
+ * Is element @a ee part of the set used by @a op?
+ *
+ * @param ee element to test
+ * @param op operation the defines the set and its generation
+ * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
+ */
+int
+_GSS_is_element_of_operation (struct ElementEntry *ee,
+ struct Operation *op)
+{
+ return ee->generation >= op->generation_created;
+}
+
+
+/**
+ * Destroy the given operation. Used for any operation where both
+ * peers were known and that thus actually had a vt and channel. Must
+ * not be used for operations where 'listener' is still set and we do
+ * not know the other peer.
+ *
+ * Call the implementation-specific cancel function of the operation.
+ * Disconnects from the remote peer. Does not disconnect the client,
+ * as there may be multiple operations per set.
+ *
+ * @param op operation to destroy
+ */
+void
+_GSS_operation_destroy (struct Operation *op)
+{
+ struct Set *set = op->set;
+ struct GNUNET_CADET_Channel *channel;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying operation %p\n", op);
+ GNUNET_assert (NULL == op->listener);
+ if (NULL != op->state)
+ {
+ union_op_cancel (op);
+ op->state = NULL;
+ }
+ if (NULL != set)
+ {
+ GNUNET_CONTAINER_DLL_remove (set->ops_head,
+ set->ops_tail,
+ op);
+ op->set = NULL;
+ }
+ if (NULL != op->context_msg)
+ {
+ GNUNET_free (op->context_msg);
+ op->context_msg = NULL;
+ }
+ if (NULL != (channel = op->channel))
+ {
+ /* This will free op; called conditionally as this helper function
+ is also called from within the channel disconnect handler. */
+ op->channel = NULL;
+ GNUNET_CADET_channel_destroy (channel);
+ }
+ /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
+ * there was a channel end handler that will free 'op' on the call stack. */
+}
+
+
+/**
+ * Callback called when a client connects to the service.
+ *
+ * @param cls closure for the service
+ * @param c the new client that connected to the service
+ * @param mq the message queue used to send messages to the client
+ * @return @a `struct ClientState`
+ */
+static void *
+client_connect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *c,
+ struct GNUNET_MQ_Handle *mq)
+{
+ struct ClientState *cs;
+
+ num_clients++;
+ cs = GNUNET_new (struct ClientState);
+ cs->client = c;
+ cs->mq = mq;
+ return cs;
+}
+
+
+/**
+ * Iterator over hash map entries to free element entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value a `struct ElementEntry *` to be free'd
+ * @return #GNUNET_YES (continue to iterate)
+ */
+static int
+destroy_elements_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct ElementEntry *ee = value;
+
+ GNUNET_free (ee);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Clean up after a client has disconnected
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ * @param internal_cls the `struct ClientState`
+ */
+static void
+client_disconnect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ void *internal_cls)
+{
+ struct ClientState *cs = internal_cls;
+ struct Operation *op;
+ struct Listener *listener;
+ struct Set *set;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client disconnected, cleaning up\n");
+ if (NULL != (set = cs->set))
+ {
+ struct SetContent *content = set->content;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying client's set\n");
+ /* Destroy pending set operations */
+ while (NULL != set->ops_head)
+ _GSS_operation_destroy (set->ops_head);
+
+ /* Destroy operation-specific state */
+ GNUNET_assert (NULL != set->state);
+ if (NULL != set->state->se)
+ {
+ strata_estimator_destroy (set->state->se);
+ set->state->se = NULL;
+ }
+ GNUNET_free (set->state);
+
+ /* free set content (or at least decrement RC) */
+ set->content = NULL;
+ GNUNET_assert (0 != content->refcount);
+ content->refcount--;
+ if (0 == content->refcount)
+ {
+ GNUNET_assert (NULL != content->elements);
+ GNUNET_CONTAINER_multihashmap_iterate (content->elements,
+ &destroy_elements_iterator,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (content->elements);
+ content->elements = NULL;
+ GNUNET_free (content);
+ }
+ GNUNET_free (set);
+ }
+
+ if (NULL != (listener = cs->listener))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying client's listener\n");
+ GNUNET_CADET_close_port (listener->open_port);
+ listener->open_port = NULL;
+ while (NULL != (op = listener->op_head))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Destroying incoming operation `%u' from peer `%s'\n",
+ (unsigned int) op->client_request_id,
+ GNUNET_i2s (&op->peer));
+ incoming_destroy (op);
+ }
+ GNUNET_CONTAINER_DLL_remove (listener_head,
+ listener_tail,
+ listener);
+ GNUNET_free (listener);
+ }
+ GNUNET_free (cs);
+ num_clients--;
+ if ( (GNUNET_YES == in_shutdown) &&
+ (0 == num_clients) )
+ {
+ if (NULL != cadet)
+ {
+ GNUNET_CADET_disconnect (cadet);
+ cadet = NULL;
+ }
+ }
+}
+
+
+/**
+ * Check a request for a set operation from another peer.
+ *
+ * @param cls the operation state
+ * @param msg the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ * #GNUNET_SYSERR to destroy the channel
+ */
+static int
+check_incoming_msg (void *cls,
+ const struct OperationRequestMessage *msg)
+{
+ struct Operation *op = cls;
+ struct Listener *listener = op->listener;
+ const struct GNUNET_MessageHeader *nested_context;
+
+ /* double operation request */
+ if (0 != op->suggest_id)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ /* This should be equivalent to the previous condition, but can't hurt to check twice */
+ if (NULL == listener)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ nested_context = GNUNET_MQ_extract_nested_mh (msg);
+ if ((NULL != nested_context) &&
+ (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle a request for a set operation from another peer. Checks if we
+ * have a listener waiting for such a request (and in that case initiates
+ * asking the listener about accepting the connection). If no listener
+ * is waiting, we queue the operation request in hope that a listener
+ * shows up soon (before timeout).
+ *
+ * This msg is expected as the first and only msg handled through the
+ * non-operation bound virtual table, acceptance of this operation replaces
+ * our virtual table and subsequent msgs would be routed differently (as
+ * we then know what type of operation this is).
+ *
+ * @param cls the operation state
+ * @param msg the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ * #GNUNET_SYSERR to destroy the channel
+ */
+static void
+handle_incoming_msg (void *cls,
+ const struct OperationRequestMessage *msg)
+{
+ struct Operation *op = cls;
+ struct Listener *listener = op->listener;
+ const struct GNUNET_MessageHeader *nested_context;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_SETU_RequestMessage *cmsg;
+
+ nested_context = GNUNET_MQ_extract_nested_mh (msg);
+ /* Make a copy of the nested_context (application-specific context
+ information that is opaque to set) so we can pass it to the
+ listener later on */
+ if (NULL != nested_context)
+ op->context_msg = GNUNET_copy_message (nested_context);
+ op->remote_element_count = ntohl (msg->element_count);
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_DEBUG,
+ "Received P2P operation request (port %s) for active listener\n",
+ GNUNET_h2s (&op->listener->app_id));
+ GNUNET_assert (0 == op->suggest_id);
+ if (0 == suggest_id)
+ suggest_id++;
+ op->suggest_id = suggest_id++;
+ GNUNET_assert (NULL != op->timeout_task);
+ GNUNET_SCHEDULER_cancel (op->timeout_task);
+ op->timeout_task = NULL;
+ env = GNUNET_MQ_msg_nested_mh (cmsg,
+ GNUNET_MESSAGE_TYPE_SETU_REQUEST,
+ op->context_msg);
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_DEBUG,
+ "Suggesting incoming request with accept id %u to listener %p of client %p\n",
+ op->suggest_id,
+ listener,
+ listener->cs);
+ cmsg->accept_id = htonl (op->suggest_id);
+ cmsg->peer_id = op->peer;
+ GNUNET_MQ_send (listener->cs->mq,
+ env);
+ /* NOTE: GNUNET_CADET_receive_done() will be called in
+ #handle_client_accept() */
+}
+
+
+/**
+ * Called when a client wants to create a new set. This is typically
+ * the first request from a client, and includes the type of set
+ * operation to be performed.
+ *
+ * @param cls client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_create_set (void *cls,
+ const struct GNUNET_SETU_CreateMessage *msg)
+{
+ struct ClientState *cs = cls;
+ struct Set *set;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client created new set for union operation\n");
+ if (NULL != cs->set)
+ {
+ /* There can only be one set per client */
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (cs->client);
+ return;
+ }
+ set = GNUNET_new (struct Set);
+ {
+ struct SetState *set_state;
+
+ set_state = GNUNET_new (struct SetState); // FIXME: avoid this malloc, merge structs!
+ set_state->se = strata_estimator_create (SE_STRATA_COUNT,
+ SE_IBF_SIZE, SE_IBF_HASH_NUM);
+ if (NULL == set_state->se)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to allocate strata estimator\n");
+ GNUNET_free (set);
+ GNUNET_SERVICE_client_drop (cs->client);
+ return;
+ }
+ set->state = set_state;
+ }
+ set->content = GNUNET_new (struct SetContent);
+ set->content->refcount = 1;
+ set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
+ GNUNET_YES);
+ set->cs = cs;
+ cs->set = set;
+ GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Timeout happens iff:
+ * - we suggested an operation to our listener,
+ * but did not receive a response in time
+ * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
+ *
+ * @param cls channel context
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+incoming_timeout_cb (void *cls)
+{
+ struct Operation *op = cls;
+
+ op->timeout_task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Remote peer's incoming request timed out\n");
+ incoming_destroy (op);
+}
+
+
+/**
+ * Method called whenever another peer has added us to a channel the
+ * other peer initiated. Only called (once) upon reception of data
+ * from a channel we listen on.
+ *
+ * The channel context represents the operation itself and gets added
+ * to a DLL, from where it gets looked up when our local listener
+ * client responds to a proposed/suggested operation or connects and
+ * associates with this operation.
+ *
+ * @param cls closure
+ * @param channel new handle to the channel
+ * @param source peer that started the channel
+ * @return initial channel context for the channel
+ * returns NULL on error
+ */
+static void *
+channel_new_cb (void *cls,
+ struct GNUNET_CADET_Channel *channel,
+ const struct GNUNET_PeerIdentity *source)
+{
+ struct Listener *listener = cls;
+ struct Operation *op;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "New incoming channel\n");
+ op = GNUNET_new (struct Operation);
+ op->listener = listener;
+ op->peer = *source;
+ op->channel = channel;
+ op->mq = GNUNET_CADET_get_mq (op->channel);
+ op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+ UINT32_MAX);
+ op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
+ &incoming_timeout_cb,
+ op);
+ GNUNET_CONTAINER_DLL_insert (listener->op_head,
+ listener->op_tail,
+ op);
+ return op;
+}
+
+
+/**
+ * Function called whenever a channel is destroyed. Should clean up
+ * any associated state. It must NOT call
+ * GNUNET_CADET_channel_destroy() on the channel.
+ *
+ * The peer_disconnect function is part of a a virtual table set initially either
+ * when a peer creates a new channel with us, or once we create
+ * a new channel ourselves (evaluate).
+ *
+ * Once we know the exact type of operation (union/intersection), the vt is
+ * replaced with an operation specific instance (_GSS_[op]_vt).
+ *
+ * @param channel_ctx place where local state associated
+ * with the channel is stored
+ * @param channel connection to the other end (henceforth invalid)
+ */
+static void
+channel_end_cb (void *channel_ctx,
+ const struct GNUNET_CADET_Channel *channel)
+{
+ struct Operation *op = channel_ctx;
+
+ op->channel = NULL;
+ _GSS_operation_destroy2 (op);
+}
+
+
+/**
+ * This function probably should not exist
+ * and be replaced by inlining more specific
+ * logic in the various places where it is called.
+ */
+void
+_GSS_operation_destroy2 (struct Operation *op)
+{
+ struct GNUNET_CADET_Channel *channel;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "channel_end_cb called\n");
+ if (NULL != (channel = op->channel))
+ {
+ /* This will free op; called conditionally as this helper function
+ is also called from within the channel disconnect handler. */
+ op->channel = NULL;
+ GNUNET_CADET_channel_destroy (channel);
+ }
+ if (NULL != op->listener)
+ {
+ incoming_destroy (op);
+ return;
+ }
+ if (NULL != op->set)
+ send_client_done (op);
+ _GSS_operation_destroy (op);
+ GNUNET_free (op);
+}
+
+
+/**
+ * Function called whenever an MQ-channel's transmission window size changes.
+ *
+ * The first callback in an outgoing channel will be with a non-zero value
+ * and will mean the channel is connected to the destination.
+ *
+ * For an incoming channel it will be called immediately after the
+ * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
+ *
+ * @param cls Channel closure.
+ * @param channel Connection to the other end (henceforth invalid).
+ * @param window_size New window size. If the is more messages than buffer size
+ * this value will be negative..
+ */
+static void
+channel_window_cb (void *cls,
+ const struct GNUNET_CADET_Channel *channel,
+ int window_size)
+{
+ /* FIXME: not implemented, we could do flow control here... */
+}
+
+
+/**
+ * Called when a client wants to create a new listener.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static void
+handle_client_listen (void *cls,
+ const struct GNUNET_SETU_ListenMessage *msg)
+{
+ struct ClientState *cs = cls;
+ struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+ GNUNET_MQ_hd_var_size (incoming_msg,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
+ struct OperationRequestMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_ibf,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
+ struct IBFMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_elements,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
+ struct GNUNET_SETU_ElementMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_offer,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_inquiry,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
+ struct InquiryMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_demand,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (union_p2p_done,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (union_p2p_over,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
+ struct StrataEstimatorMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
+ struct StrataEstimatorMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_full_element,
+ GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
+ struct GNUNET_SETU_ElementMessage,
+ NULL),
+ GNUNET_MQ_handler_end ()
+ };
+ struct Listener *listener;
+
+ if (NULL != cs->listener)
+ {
+ /* max. one active listener per client! */
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (cs->client);
+ return;
+ }
+ listener = GNUNET_new (struct Listener);
+ listener->cs = cs;
+ cs->listener = listener;
+ listener->app_id = msg->app_id;
+ GNUNET_CONTAINER_DLL_insert (listener_head,
+ listener_tail,
+ listener);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "New listener created (port %s)\n",
+ GNUNET_h2s (&listener->app_id));
+ listener->open_port = GNUNET_CADET_open_port (cadet,
+ &msg->app_id,
+ &channel_new_cb,
+ listener,
+ &channel_window_cb,
+ &channel_end_cb,
+ cadet_handlers);
+ GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Called when the listening client rejects an operation
+ * request by another peer.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static void
+handle_client_reject (void *cls,
+ const struct GNUNET_SETU_RejectMessage *msg)
+{
+ struct ClientState *cs = cls;
+ struct Operation *op;
+
+ op = get_incoming (ntohl (msg->accept_reject_id));
+ if (NULL == op)
+ {
+ /* no matching incoming operation for this reject;
+ could be that the other peer already disconnected... */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Client rejected unknown operation %u\n",
+ (unsigned int) ntohl (msg->accept_reject_id));
+ GNUNET_SERVICE_client_continue (cs->client);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer request (app %s) rejected by client\n",
+ GNUNET_h2s (&cs->listener->app_id));
+ _GSS_operation_destroy2 (op);
+ GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Called when a client wants to add or remove an element to a set it inhabits.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static int
+check_client_set_add (void *cls,
+ const struct GNUNET_SETU_ElementMessage *msg)
+{
+ /* NOTE: Technically, we should probably check with the
+ block library whether the element we are given is well-formed */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Called when a client wants to add or remove an element to a set it inhabits.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static void
+handle_client_set_add (void *cls,
+ const struct GNUNET_SETU_ElementMessage *msg)
+{
+ struct ClientState *cs = cls;
+ struct Set *set;
+ struct GNUNET_SETU_Element el;
+ struct ElementEntry *ee;
+ struct GNUNET_HashCode hash;
+
+ if (NULL == (set = cs->set))
+ {
+ /* client without a set requested an operation */
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (cs->client);
+ return;
+ }
+ GNUNET_SERVICE_client_continue (cs->client);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
+ el.size = ntohs (msg->header.size) - sizeof(*msg);
+ el.data = &msg[1];
+ el.element_type = ntohs (msg->element_type);
+ GNUNET_SETU_element_hash (&el,
+ &hash);
+ ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
+ &hash);
+ if (NULL == ee)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client inserts element %s of size %u\n",
+ GNUNET_h2s (&hash),
+ el.size);
+ ee = GNUNET_malloc (el.size + sizeof(*ee));
+ ee->element.size = el.size;
+ GNUNET_memcpy (&ee[1], el.data, el.size);
+ ee->element.data = &ee[1];
+ ee->element.element_type = el.element_type;
+ ee->remote = GNUNET_NO;
+ ee->generation = set->current_generation;
+ ee->element_hash = hash;
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_put (
+ set->content->elements,
+ &ee->element_hash,
+ ee,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client inserted element %s of size %u twice (ignored)\n",
+ GNUNET_h2s (&hash),
+ el.size);
+ /* same element inserted twice */
+ return;
+ }
+ strata_estimator_insert (set->state->se,
+ get_ibf_key (&ee->element_hash));
+}
+
+
+/**
+ * Advance the current generation of a set,
+ * adding exclusion ranges if necessary.
+ *
+ * @param set the set where we want to advance the generation
+ */
+static void
+advance_generation (struct Set *set)
+{
+ set->content->latest_generation++;
+ set->current_generation++;
+}
+
+
+/**
+ * Called when a client wants to initiate a set operation with another
+ * peer. Initiates the CADET connection to the listener and sends the
+ * request.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ * @return #GNUNET_OK if the message is well-formed
+ */
+static int
+check_client_evaluate (void *cls,
+ const struct GNUNET_SETU_EvaluateMessage *msg)
+{
+ /* FIXME: suboptimal, even if the context below could be NULL,
+ there are malformed messages this does not check for... */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Called when a client wants to initiate a set operation with another
+ * peer. Initiates the CADET connection to the listener and sends the
+ * request.
+ *
+ * @param cls client that sent the message
+ * @param msg message sent by the client
+ */
+static void
+handle_client_evaluate (void *cls,
+ const struct GNUNET_SETU_EvaluateMessage *msg)
+{
+ struct ClientState *cs = cls;
+ struct Operation *op = GNUNET_new (struct Operation);
+ const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
+ GNUNET_MQ_hd_var_size (incoming_msg,
+ GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+ struct OperationRequestMessage,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_ibf,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
+ struct IBFMessage,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_elements,
+ GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
+ struct GNUNET_SETU_ElementMessage,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_offer,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_inquiry,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
+ struct InquiryMessage,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_demand,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_fixed_size (union_p2p_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_fixed_size (union_p2p_over,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
+ struct StrataEstimatorMessage,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
+ struct StrataEstimatorMessage,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_full_element,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+ struct GNUNET_SETU_ElementMessage,
+ op),
+ GNUNET_MQ_handler_end ()
+ };
+ struct Set *set;
+ const struct GNUNET_MessageHeader *context;
+
+ if (NULL == (set = cs->set))
+ {
+ GNUNET_break (0);
+ GNUNET_free (op);
+ GNUNET_SERVICE_client_drop (cs->client);
+ return;
+ }
+ op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+ UINT32_MAX);
+ op->peer = msg->target_peer;
+ op->client_request_id = ntohl (msg->request_id);
+ op->byzantine = msg->byzantine;
+ op->byzantine_lower_bound = msg->byzantine_lower_bound;
+ op->force_full = msg->force_full;
+ op->force_delta = msg->force_delta;
+ context = GNUNET_MQ_extract_nested_mh (msg);
+
+ /* Advance generation values, so that
+ mutations won't interfer with the running operation. */
+ op->set = set;
+ op->generation_created = set->current_generation;
+ advance_generation (set);
+ GNUNET_CONTAINER_DLL_insert (set->ops_head,
+ set->ops_tail,
+ op);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new CADET channel to port %s for set union\n",
+ GNUNET_h2s (&msg->app_id));
+ op->channel = GNUNET_CADET_channel_create (cadet,
+ op,
+ &msg->target_peer,
+ &msg->app_id,
+ &channel_window_cb,
+ &channel_end_cb,
+ cadet_handlers);
+ op->mq = GNUNET_CADET_get_mq (op->channel);
+ op->state = union_evaluate (op, context);
+ if (NULL == op->state)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (cs->client);
+ return;
+ }
+ GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Handle a request from the client to cancel a running set operation.
+ *
+ * @param cls the client
+ * @param msg the message
+ */
+static void
+handle_client_cancel (void *cls,
+ const struct GNUNET_SETU_CancelMessage *msg)
+{
+ struct ClientState *cs = cls;
+ struct Set *set;
+ struct Operation *op;
+ int found;
+
+ if (NULL == (set = cs->set))
+ {
+ /* client without a set requested an operation */
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (cs->client);
+ return;
+ }
+ found = GNUNET_NO;
+ for (op = set->ops_head; NULL != op; op = op->next)
+ {
+ if (op->client_request_id == ntohl (msg->request_id))
+ {
+ found = GNUNET_YES;
+ break;
+ }
+ }
+ if (GNUNET_NO == found)
+ {
+ /* It may happen that the operation was already destroyed due to
+ * the other peer disconnecting. The client may not know about this
+ * yet and try to cancel the (just barely non-existent) operation.
+ * So this is not a hard error.
+ *///
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Client canceled non-existent op %u\n",
+ (uint32_t) ntohl (msg->request_id));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client requested cancel for op %u\n",
+ (uint32_t) ntohl (msg->request_id));
+ _GSS_operation_destroy (op);
+ }
+ GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Handle a request from the client to accept a set operation that
+ * came from a remote peer. We forward the accept to the associated
+ * operation for handling
+ *
+ * @param cls the client
+ * @param msg the message
+ */
+static void
+handle_client_accept (void *cls,
+ const struct GNUNET_SETU_AcceptMessage *msg)
+{
+ struct ClientState *cs = cls;
+ struct Set *set;
+ struct Operation *op;
+ struct GNUNET_SETU_ResultMessage *result_message;
+ struct GNUNET_MQ_Envelope *ev;
+ struct Listener *listener;
+
+ if (NULL == (set = cs->set))
+ {
+ /* client without a set requested to accept */
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (cs->client);
+ return;
+ }
+ op = get_incoming (ntohl (msg->accept_reject_id));
+ if (NULL == op)
+ {
+ /* It is not an error if the set op does not exist -- it may
+ * have been destroyed when the partner peer disconnected. */
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_INFO,
+ "Client %p accepted request %u of listener %p that is no longer active\n",
+ cs,
+ ntohl (msg->accept_reject_id),
+ cs->listener);
+ ev = GNUNET_MQ_msg (result_message,
+ GNUNET_MESSAGE_TYPE_SET_RESULT);
+ result_message->request_id = msg->request_id;
+ result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
+ GNUNET_MQ_send (set->cs->mq, ev);
+ GNUNET_SERVICE_client_continue (cs->client);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client accepting request %u\n",
+ (uint32_t) ntohl (msg->accept_reject_id));
+ listener = op->listener;
+ op->listener = NULL;
+ GNUNET_CONTAINER_DLL_remove (listener->op_head,
+ listener->op_tail,
+ op);
+ op->set = set;
+ GNUNET_CONTAINER_DLL_insert (set->ops_head,
+ set->ops_tail,
+ op);
+ op->client_request_id = ntohl (msg->request_id);
+ op->byzantine = msg->byzantine;
+ op->byzantine_lower_bound = msg->byzantine_lower_bound;
+ op->force_full = msg->force_full;
+ op->force_delta = msg->force_delta;
+
+ /* Advance generation values, so that future mutations do not
+ interfer with the running operation. */
+ op->generation_created = set->current_generation;
+ advance_generation (set);
+ GNUNET_assert (NULL == op->state);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "accepting set union operation\n");
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of accepted union operations",
+ 1,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of total union operations",
+ 1,
+ GNUNET_NO);
+ {
+ struct OperationState *state;
+ const struct StrataEstimator *se;
+ struct GNUNET_MQ_Envelope *ev;
+ struct StrataEstimatorMessage *strata_msg;
+ char *buf;
+ size_t len;
+ uint16_t type;
+
+ state = GNUNET_new (struct OperationState); // FIXME: merge with 'op' to avoid malloc!
+ state->se = strata_estimator_dup (op->set->state->se);
+ state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
+ GNUNET_NO);
+ state->salt_receive = state->salt_send = 42; // FIXME?????
+ op->state = state;
+ initialize_key_to_element (op);
+ state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
+ state->key_to_element);
+
+ /* kick off the operation */
+ se = state->se;
+ buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
+ len = strata_estimator_write (se,
+ buf);
+ if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
+ type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
+ else
+ type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
+ ev = GNUNET_MQ_msg_extra (strata_msg,
+ len,
+ type);
+ GNUNET_memcpy (&strata_msg[1],
+ buf,
+ len);
+ GNUNET_free (buf);
+ strata_msg->set_size
+ = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
+ op->set->content->elements));
+ GNUNET_MQ_send (op->mq,
+ ev);
+ state->phase = PHASE_EXPECT_IBF;
+
+ op->state = state;
+ }
+ /* Now allow CADET to continue, as we did not do this in
+ #handle_incoming_msg (as we wanted to first see if the
+ local client would accept the request). */
+ GNUNET_CADET_receive_done (op->channel);
+ GNUNET_SERVICE_client_continue (cs->client);
+}
+
+
+/**
+ * Called to clean up, after a shutdown has been requested.
+ *
+ * @param cls closure, NULL
+ */
+static void
+shutdown_task (void *cls)
+{
+ /* Delay actual shutdown to allow service to disconnect clients */
+ in_shutdown = GNUNET_YES;
+ if (0 == num_clients)
+ {
+ if (NULL != cadet)
+ {
+ GNUNET_CADET_disconnect (cadet);
+ cadet = NULL;
+ }
+ }
+ GNUNET_STATISTICS_destroy (_GSS_statistics,
+ GNUNET_YES);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "handled shutdown request\n");
+}
+
+
+/**
+ * Function called by the service's run
+ * method to run service-specific setup code.
+ *
+ * @param cls closure
+ * @param cfg configuration to use
+ * @param service the initialized service
+ */
+static void
+run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_SERVICE_Handle *service)
+{
+ /* FIXME: need to modify SERVICE (!) API to allow
+ us to run a shutdown task *after* clients were
+ forcefully disconnected! */
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+ NULL);
+ _GSS_statistics = GNUNET_STATISTICS_create ("setu", cfg);
+ cadet = GNUNET_CADET_connect (cfg);
+ if (NULL == cadet)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _ ("Could not connect to CADET service\n"));
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+}
+
+
+/**
+ * Define "main" method using service macro.
+ */
+GNUNET_SERVICE_MAIN (
+ "set",
+ GNUNET_SERVICE_OPTION_NONE,
+ &run,
+ &client_connect_cb,
+ &client_disconnect_cb,
+ NULL,
+ GNUNET_MQ_hd_fixed_size (client_accept,
+ GNUNET_MESSAGE_TYPE_SETU_ACCEPT,
+ struct GNUNET_SETU_AcceptMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (client_set_add,
+ GNUNET_MESSAGE_TYPE_SETU_ADD,
+ struct GNUNET_SETU_ElementMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (client_create_set,
+ GNUNET_MESSAGE_TYPE_SETU_CREATE,
+ struct GNUNET_SETU_CreateMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (client_evaluate,
+ GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
+ struct GNUNET_SETU_EvaluateMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (client_listen,
+ GNUNET_MESSAGE_TYPE_SETU_LISTEN,
+ struct GNUNET_SETU_ListenMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (client_reject,
+ GNUNET_MESSAGE_TYPE_SETU_REJECT,
+ struct GNUNET_SETU_RejectMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (client_cancel,
+ GNUNET_MESSAGE_TYPE_SETU_CANCEL,
+ struct GNUNET_SETU_CancelMessage,
+ NULL),
+ GNUNET_MQ_handler_end ());
+
+
+/* end of gnunet-service-setu.c */
diff --git a/src/setu/gnunet-service-setu.h b/src/setu/gnunet-service-setu.h
new file mode 100644
index 000000000..eb6b7a8e5
--- /dev/null
+++ b/src/setu/gnunet-service-setu.h
@@ -0,0 +1,393 @@
+/*
+ This file is part of GNUnet
+ Copyright (C) 2013-2017 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file set/gnunet-service-setu.h
+ * @brief common components for the implementation the different set operations
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#ifndef GNUNET_SERVICE_SETU_H_PRIVATE
+#define GNUNET_SERVICE_SETU_H_PRIVATE
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet_core_service.h"
+#include "gnunet_cadet_service.h"
+#include "gnunet_setu_service.h"
+#include "setu.h"
+
+
+/**
+ * Implementation-specific set state. Used as opaque pointer, and
+ * specified further in the respective implementation.
+ */
+struct SetState;
+
+/**
+ * Implementation-specific set operation. Used as opaque pointer, and
+ * specified further in the respective implementation.
+ */
+struct OperationState;
+
+/**
+ * A set that supports a specific operation with other peers.
+ */
+struct Set;
+
+/**
+ * Information about an element element in the set. All elements are
+ * stored in a hash-table from their hash-code to their 'struct
+ * Element', so that the remove and add operations are reasonably
+ * fast.
+ */
+struct ElementEntry;
+
+/**
+ * Operation context used to execute a set operation.
+ */
+struct Operation;
+
+
+/**
+ * Information about an element element in the set. All elements are
+ * stored in a hash-table from their hash-code to their `struct
+ * Element`, so that the remove and add operations are reasonably
+ * fast.
+ */
+struct ElementEntry
+{
+ /**
+ * The actual element. The data for the element
+ * should be allocated at the end of this struct.
+ */
+ struct GNUNET_SETU_Element element;
+
+ /**
+ * Hash of the element. For set union: Will be used to derive the
+ * different IBF keys for different salts.
+ */
+ struct GNUNET_HashCode element_hash;
+
+ /**
+ * First generation that includes this element.
+ */
+ unsigned int generation;
+
+ /**
+ * #GNUNET_YES if the element is a remote element, and does not belong
+ * to the operation's set.
+ */
+ int remote;
+};
+
+
+/**
+ * A listener is inhabited by a client, and waits for evaluation
+ * requests from remote peers.
+ */
+struct Listener;
+
+
+/**
+ * State we keep per client.
+ */
+struct ClientState
+{
+ /**
+ * Set, if associated with the client, otherwise NULL.
+ */
+ struct Set *set;
+
+ /**
+ * Listener, if associated with the client, otherwise NULL.
+ */
+ struct Listener *listener;
+
+ /**
+ * Client handle.
+ */
+ struct GNUNET_SERVICE_Client *client;
+
+ /**
+ * Message queue.
+ */
+ struct GNUNET_MQ_Handle *mq;
+};
+
+
+/**
+ * Operation context used to execute a set operation.
+ */
+struct Operation
+{
+ /**
+ * Kept in a DLL of the listener, if @e listener is non-NULL.
+ */
+ struct Operation *next;
+
+ /**
+ * Kept in a DLL of the listener, if @e listener is non-NULL.
+ */
+ struct Operation *prev;
+
+ /**
+ * Channel to the peer.
+ */
+ struct GNUNET_CADET_Channel *channel;
+
+ /**
+ * Port this operation runs on.
+ */
+ struct Listener *listener;
+
+ /**
+ * Message queue for the channel.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Context message, may be NULL.
+ */
+ struct GNUNET_MessageHeader *context_msg;
+
+ /**
+ * Set associated with the operation, NULL until the spec has been
+ * associated with a set.
+ */
+ struct Set *set;
+
+ /**
+ * Operation-specific operation state. Note that the exact
+ * type depends on this being a union or intersection operation
+ * (and thus on @e vt).
+ */
+ struct OperationState *state;
+
+ /**
+ * The identity of the requesting peer. Needs to
+ * be stored here as the op spec might not have been created yet.
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Timeout task, if the incoming peer has not been accepted
+ * after the timeout, it will be disconnected.
+ */
+ struct GNUNET_SCHEDULER_Task *timeout_task;
+
+ /**
+ * Salt to use for the operation.
+ */
+ uint32_t salt;
+
+ /**
+ * Remote peers element count
+ */
+ uint32_t remote_element_count;
+
+ /**
+ * ID used to identify an operation between service and client
+ */
+ uint32_t client_request_id;
+
+ /**
+ * Always use delta operation instead of sending full sets,
+ * even it it's less efficient.
+ */
+ int force_delta;
+
+ /**
+ * Always send full sets, even if delta operations would
+ * be more efficient.
+ */
+ int force_full;
+
+ /**
+ * #GNUNET_YES to fail operations where Byzantine faults
+ * are suspected
+ */
+ int byzantine;
+
+ /**
+ * Lower bound for the set size, used only when
+ * byzantine mode is enabled.
+ */
+ int byzantine_lower_bound;
+
+ /**
+ * Unique request id for the request from a remote peer, sent to the
+ * client, which will accept or reject the request. Set to '0' iff
+ * the request has not been suggested yet.
+ */
+ uint32_t suggest_id;
+
+ /**
+ * Generation in which the operation handle
+ * was created.
+ */
+ unsigned int generation_created;
+};
+
+
+/**
+ * SetContent stores the actual set elements, which may be shared by
+ * multiple generations derived from one set.
+ */
+struct SetContent
+{
+ /**
+ * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *elements;
+
+ /**
+ * Number of references to the content.
+ */
+ unsigned int refcount;
+
+ /**
+ * FIXME: document!
+ */
+ unsigned int latest_generation;
+
+ /**
+ * Number of concurrently active iterators.
+ */
+ int iterator_count;
+};
+
+
+struct GenerationRange
+{
+ /**
+ * First generation that is excluded.
+ */
+ unsigned int start;
+
+ /**
+ * Generation after the last excluded generation.
+ */
+ unsigned int end;
+};
+
+
+/**
+ * A set that supports a specific operation with other peers.
+ */
+struct Set
+{
+ /**
+ * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
+ */
+ struct Set *next;
+
+ /**
+ * Sets are held in a doubly linked list.
+ */
+ struct Set *prev;
+
+ /**
+ * Client that owns the set. Only one client may own a set,
+ * and there can only be one set per client.
+ */
+ struct ClientState *cs;
+
+ /**
+ * Content, possibly shared by multiple sets,
+ * and thus reference counted.
+ */
+ struct SetContent *content;
+
+ /**
+ * Implementation-specific state.
+ */
+ struct SetState *state;
+
+ /**
+ * Evaluate operations are held in a linked list.
+ */
+ struct Operation *ops_head;
+
+ /**
+ * Evaluate operations are held in a linked list.
+ */
+ struct Operation *ops_tail;
+
+ /**
+ * Current generation, that is, number of previously executed
+ * operations and lazy copies on the underlying set content.
+ */
+ unsigned int current_generation;
+
+};
+
+
+extern struct GNUNET_STATISTICS_Handle *_GSS_statistics;
+
+
+/**
+ * Destroy the given operation. Used for any operation where both
+ * peers were known and that thus actually had a vt and channel. Must
+ * not be used for operations where 'listener' is still set and we do
+ * not know the other peer.
+ *
+ * Call the implementation-specific cancel function of the operation.
+ * Disconnects from the remote peer. Does not disconnect the client,
+ * as there may be multiple operations per set.
+ *
+ * @param op operation to destroy
+ */
+void
+_GSS_operation_destroy (struct Operation *op);
+
+
+/**
+ * This function probably should not exist
+ * and be replaced by inlining more specific
+ * logic in the various places where it is called.
+ */
+void
+_GSS_operation_destroy2 (struct Operation *op);
+
+
+/**
+ * Get the table with implementing functions for set union.
+ *
+ * @return the operation specific VTable
+ */
+const struct SetVT *
+_GSS_union_vt (void);
+
+
+/**
+ * Is element @a ee part of the set used by @a op?
+ *
+ * @param ee element to test
+ * @param op operation the defines the set and its generation
+ * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
+ */
+int
+_GSS_is_element_of_operation (struct ElementEntry *ee,
+ struct Operation *op);
+
+
+#endif
diff --git a/src/setu/gnunet-service-setu_protocol.h b/src/setu/gnunet-service-setu_protocol.h
new file mode 100644
index 000000000..a2803ee47
--- /dev/null
+++ b/src/setu/gnunet-service-setu_protocol.h
@@ -0,0 +1,226 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2013, 2014 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @author Florian Dold
+ * @author Christian Grothoff
+ * @file set/gnunet-service-set_protocol.h
+ * @brief Peer-to-Peer messages for gnunet set
+ */
+#ifndef SET_PROTOCOL_H
+#define SET_PROTOCOL_H
+
+#include "platform.h"
+#include "gnunet_common.h"
+
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+struct OperationRequestMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Operation to request, values from `enum GNUNET_SET_OperationType`
+ */
+ uint32_t operation GNUNET_PACKED;
+
+ /**
+ * For Intersection: my element count
+ */
+ uint32_t element_count GNUNET_PACKED;
+
+ /**
+ * Application-specific identifier of the request.
+ */
+ struct GNUNET_HashCode app_idX;
+
+ /* rest: optional message */
+};
+
+
+/**
+ * Message containing buckets of an invertible bloom filter.
+ *
+ * If an IBF has too many buckets for an IBF message,
+ * it is split into multiple messages.
+ */
+struct IBFMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Order of the whole ibf, where
+ * num_buckets = 2^order
+ */
+ uint8_t order;
+
+ /**
+ * Padding, must be 0.
+ */
+ uint8_t reserved1;
+
+ /**
+ * Padding, must be 0.
+ */
+ uint16_t reserved2 GNUNET_PACKED;
+
+ /**
+ * Offset of the strata in the rest of the message
+ */
+ uint32_t offset GNUNET_PACKED;
+
+ /**
+ * Salt used when hashing elements for this IBF.
+ */
+ uint32_t salt GNUNET_PACKED;
+
+ /* rest: buckets */
+};
+
+
+struct InquiryMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Salt used when hashing elements for this inquiry.
+ */
+ uint32_t salt GNUNET_PACKED;
+
+ /**
+ * Reserved, set to 0.
+ */
+ uint32_t reserved GNUNET_PACKED;
+
+ /* rest: inquiry IBF keys */
+};
+
+
+/**
+ * During intersection, the first (and possibly second) message
+ * send it the number of elements in the set, to allow the peers
+ * to decide who should start with the Bloom filter.
+ */
+struct IntersectionElementInfoMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * mutator used with this bloomfilter.
+ */
+ uint32_t sender_element_count GNUNET_PACKED;
+};
+
+
+/**
+ * Bloom filter messages exchanged for set intersection calculation.
+ */
+struct BFMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Number of elements the sender still has in the set.
+ */
+ uint32_t sender_element_count GNUNET_PACKED;
+
+ /**
+ * XOR of all hashes over all elements remaining in the set.
+ * Used to determine termination.
+ */
+ struct GNUNET_HashCode element_xor_hash;
+
+ /**
+ * Mutator used with this bloomfilter.
+ */
+ uint32_t sender_mutator GNUNET_PACKED;
+
+ /**
+ * Total length of the bloomfilter data.
+ */
+ uint32_t bloomfilter_total_length GNUNET_PACKED;
+
+ /**
+ * Number of bits (k-value) used in encoding the bloomfilter.
+ */
+ uint32_t bits_per_element GNUNET_PACKED;
+
+ /**
+ * rest: the sender's bloomfilter
+ */
+};
+
+
+/**
+ * Last message, send to confirm the final set. Contains the element
+ * count as it is possible that the peer determined that we were done
+ * by getting the empty set, which in that case also needs to be
+ * communicated.
+ */
+struct IntersectionDoneMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Final number of elements in intersection.
+ */
+ uint32_t final_element_count GNUNET_PACKED;
+
+ /**
+ * XOR of all hashes over all elements remaining in the set.
+ */
+ struct GNUNET_HashCode element_xor_hash;
+};
+
+
+/**
+ * Strata estimator together with the peer's overall set size.
+ */
+struct StrataEstimatorMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C)
+ */
+ struct GNUNET_MessageHeader header;
+
+ uint64_t set_size;
+};
+
+GNUNET_NETWORK_STRUCT_END
+
+#endif
diff --git a/src/setu/gnunet-service-setu_strata_estimator.c b/src/setu/gnunet-service-setu_strata_estimator.c
new file mode 100644
index 000000000..0fa6a6f17
--- /dev/null
+++ b/src/setu/gnunet-service-setu_strata_estimator.c
@@ -0,0 +1,303 @@
+/*
+ This file is part of GNUnet
+ Copyright (C) 2012 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file set/gnunet-service-setu_strata_estimator.c
+ * @brief invertible bloom filter
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "ibf.h"
+#include "gnunet-service-setu_strata_estimator.h"
+
+
+/**
+ * Should we try compressing the strata estimator? This will
+ * break compatibility with the 0.10.1-network.
+ */
+#define FAIL_10_1_COMPATIBILTIY 1
+
+
+/**
+ * Write the given strata estimator to the buffer.
+ *
+ * @param se strata estimator to serialize
+ * @param[out] buf buffer to write to, must be of appropriate size
+ * @return number of bytes written to @a buf
+ */
+size_t
+strata_estimator_write (const struct StrataEstimator *se,
+ void *buf)
+{
+ char *sbuf = buf;
+ unsigned int i;
+ size_t osize;
+
+ GNUNET_assert (NULL != se);
+ for (i = 0; i < se->strata_count; i++)
+ {
+ ibf_write_slice (se->strata[i],
+ 0,
+ se->ibf_size,
+ &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]);
+ }
+ osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
+#if FAIL_10_1_COMPATIBILTIY
+ {
+ char *cbuf;
+ size_t nsize;
+
+ if (GNUNET_YES ==
+ GNUNET_try_compression (buf,
+ osize,
+ &cbuf,
+ &nsize))
+ {
+ GNUNET_memcpy (buf, cbuf, nsize);
+ osize = nsize;
+ GNUNET_free (cbuf);
+ }
+ }
+#endif
+ return osize;
+}
+
+
+/**
+ * Read strata from the buffer into the given strata
+ * estimator. The strata estimator must already be allocated.
+ *
+ * @param buf buffer to read from
+ * @param buf_len number of bytes in @a buf
+ * @param is_compressed is the data compressed?
+ * @param[out] se strata estimator to write to
+ * @return #GNUNET_OK on success
+ */
+int
+strata_estimator_read (const void *buf,
+ size_t buf_len,
+ int is_compressed,
+ struct StrataEstimator *se)
+{
+ unsigned int i;
+ size_t osize;
+ char *dbuf;
+
+ dbuf = NULL;
+ if (GNUNET_YES == is_compressed)
+ {
+ osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
+ dbuf = GNUNET_decompress (buf,
+ buf_len,
+ osize);
+ if (NULL == dbuf)
+ {
+ GNUNET_break_op (0); /* bad compressed input data */
+ return GNUNET_SYSERR;
+ }
+ buf = dbuf;
+ buf_len = osize;
+ }
+
+ if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE)
+ {
+ GNUNET_break (0); /* very odd error */
+ GNUNET_free (dbuf);
+ return GNUNET_SYSERR;
+ }
+
+ for (i = 0; i < se->strata_count; i++)
+ {
+ ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]);
+ buf += se->ibf_size * IBF_BUCKET_SIZE;
+ }
+ GNUNET_free (dbuf);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Add a key to the strata estimator.
+ *
+ * @param se strata estimator to add the key to
+ * @param key key to add
+ */
+void
+strata_estimator_insert (struct StrataEstimator *se,
+ struct IBF_Key key)
+{
+ uint64_t v;
+ unsigned int i;
+
+ v = key.key_val;
+ /* count trailing '1'-bits of v */
+ for (i = 0; v & 1; v >>= 1, i++)
+ /* empty */;
+ ibf_insert (se->strata[i], key);
+}
+
+
+/**
+ * Remove a key from the strata estimator.
+ *
+ * @param se strata estimator to remove the key from
+ * @param key key to remove
+ */
+void
+strata_estimator_remove (struct StrataEstimator *se,
+ struct IBF_Key key)
+{
+ uint64_t v;
+ unsigned int i;
+
+ v = key.key_val;
+ /* count trailing '1'-bits of v */
+ for (i = 0; v & 1; v >>= 1, i++)
+ /* empty */;
+ ibf_remove (se->strata[i], key);
+}
+
+
+/**
+ * Create a new strata estimator with the given parameters.
+ *
+ * @param strata_count number of stratas, that is, number of ibfs in the estimator
+ * @param ibf_size size of each ibf stratum
+ * @param ibf_hashnum hashnum parameter of each ibf
+ * @return a freshly allocated, empty strata estimator, NULL on error
+ */
+struct StrataEstimator *
+strata_estimator_create (unsigned int strata_count,
+ uint32_t ibf_size,
+ uint8_t ibf_hashnum)
+{
+ struct StrataEstimator *se;
+ unsigned int i;
+ unsigned int j;
+
+ se = GNUNET_new (struct StrataEstimator);
+ se->strata_count = strata_count;
+ se->ibf_size = ibf_size;
+ se->strata = GNUNET_new_array (strata_count,
+ struct InvertibleBloomFilter *);
+ for (i = 0; i < strata_count; i++)
+ {
+ se->strata[i] = ibf_create (ibf_size, ibf_hashnum);
+ if (NULL == se->strata[i])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to allocate memory for strata estimator\n");
+ for (j = 0; j < i; j++)
+ ibf_destroy (se->strata[i]);
+ GNUNET_free (se);
+ return NULL;
+ }
+ }
+ return se;
+}
+
+
+/**
+ * Estimate set difference with two strata estimators,
+ * i.e. arrays of IBFs.
+ * Does not not modify its arguments.
+ *
+ * @param se1 first strata estimator
+ * @param se2 second strata estimator
+ * @return the estimated difference
+ */
+unsigned int
+strata_estimator_difference (const struct StrataEstimator *se1,
+ const struct StrataEstimator *se2)
+{
+ unsigned int count;
+
+ GNUNET_assert (se1->strata_count == se2->strata_count);
+ count = 0;
+ for (int i = se1->strata_count - 1; i >= 0; i--)
+ {
+ struct InvertibleBloomFilter *diff;
+ /* number of keys decoded from the ibf */
+
+ /* FIXME: implement this without always allocating new IBFs */
+ diff = ibf_dup (se1->strata[i]);
+ ibf_subtract (diff, se2->strata[i]);
+ for (int ibf_count = 0; GNUNET_YES; ibf_count++)
+ {
+ int more;
+
+ more = ibf_decode (diff, NULL, NULL);
+ if (GNUNET_NO == more)
+ {
+ count += ibf_count;
+ break;
+ }
+ /* Estimate if decoding fails or would not terminate */
+ if ((GNUNET_SYSERR == more) || (ibf_count > diff->size))
+ {
+ ibf_destroy (diff);
+ return count * (1 << (i + 1));
+ }
+ }
+ ibf_destroy (diff);
+ }
+ return count;
+}
+
+
+/**
+ * Make a copy of a strata estimator.
+ *
+ * @param se the strata estimator to copy
+ * @return the copy
+ */
+struct StrataEstimator *
+strata_estimator_dup (struct StrataEstimator *se)
+{
+ struct StrataEstimator *c;
+ unsigned int i;
+
+ c = GNUNET_new (struct StrataEstimator);
+ c->strata_count = se->strata_count;
+ c->ibf_size = se->ibf_size;
+ c->strata = GNUNET_new_array (se->strata_count,
+ struct InvertibleBloomFilter *);
+ for (i = 0; i < se->strata_count; i++)
+ c->strata[i] = ibf_dup (se->strata[i]);
+ return c;
+}
+
+
+/**
+ * Destroy a strata estimator, free all of its resources.
+ *
+ * @param se strata estimator to destroy.
+ */
+void
+strata_estimator_destroy (struct StrataEstimator *se)
+{
+ unsigned int i;
+
+ for (i = 0; i < se->strata_count; i++)
+ ibf_destroy (se->strata[i]);
+ GNUNET_free (se->strata);
+ GNUNET_free (se);
+}
diff --git a/src/setu/gnunet-service-setu_strata_estimator.h b/src/setu/gnunet-service-setu_strata_estimator.h
new file mode 100644
index 000000000..afdbcdbbf
--- /dev/null
+++ b/src/setu/gnunet-service-setu_strata_estimator.h
@@ -0,0 +1,169 @@
+/*
+ This file is part of GNUnet
+ Copyright (C) 2012 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/gnunet-service-setu_strata_estimator.h
+ * @brief estimator of set difference
+ * @author Florian Dold
+ */
+
+#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
+#define GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_util_lib.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0 /* keep Emacsens' auto-indent happy */
+}
+#endif
+#endif
+
+
+/**
+ * A handle to a strata estimator.
+ */
+struct StrataEstimator
+{
+ /**
+ * The IBFs of this strata estimator.
+ */
+ struct InvertibleBloomFilter **strata;
+
+ /**
+ * Size of the IBF array in @e strata
+ */
+ unsigned int strata_count;
+
+ /**
+ * Size of each IBF stratum (in bytes)
+ */
+ unsigned int ibf_size;
+};
+
+
+/**
+ * Write the given strata estimator to the buffer.
+ *
+ * @param se strata estimator to serialize
+ * @param[out] buf buffer to write to, must be of appropriate size
+ * @return number of bytes written to @a buf
+ */
+size_t
+strata_estimator_write (const struct StrataEstimator *se,
+ void *buf);
+
+
+/**
+ * Read strata from the buffer into the given strata
+ * estimator. The strata estimator must already be allocated.
+ *
+ * @param buf buffer to read from
+ * @param buf_len number of bytes in @a buf
+ * @param is_compressed is the data compressed?
+ * @param[out] se strata estimator to write to
+ * @return #GNUNET_OK on success
+ */
+int
+strata_estimator_read (const void *buf,
+ size_t buf_len,
+ int is_compressed,
+ struct StrataEstimator *se);
+
+
+/**
+ * Create a new strata estimator with the given parameters.
+ *
+ * @param strata_count number of stratas, that is, number of ibfs in the estimator
+ * @param ibf_size size of each ibf stratum
+ * @param ibf_hashnum hashnum parameter of each ibf
+ * @return a freshly allocated, empty strata estimator, NULL on error
+ */
+struct StrataEstimator *
+strata_estimator_create (unsigned int strata_count,
+ uint32_t ibf_size,
+ uint8_t ibf_hashnum);
+
+
+/**
+ * Get an estimation of the symmetric difference of the elements
+ * contained in both strata estimators.
+ *
+ * @param se1 first strata estimator
+ * @param se2 second strata estimator
+ * @return abs(|se1| - |se2|)
+ */
+unsigned int
+strata_estimator_difference (const struct StrataEstimator *se1,
+ const struct StrataEstimator *se2);
+
+
+/**
+ * Add a key to the strata estimator.
+ *
+ * @param se strata estimator to add the key to
+ * @param key key to add
+ */
+void
+strata_estimator_insert (struct StrataEstimator *se,
+ struct IBF_Key key);
+
+
+/**
+ * Remove a key from the strata estimator.
+ *
+ * @param se strata estimator to remove the key from
+ * @param key key to remove
+ */
+void
+strata_estimator_remove (struct StrataEstimator *se,
+ struct IBF_Key key);
+
+
+/**
+ * Destroy a strata estimator, free all of its resources.
+ *
+ * @param se strata estimator to destroy.
+ */
+void
+strata_estimator_destroy (struct StrataEstimator *se);
+
+
+/**
+ * Make a copy of a strata estimator.
+ *
+ * @param se the strata estimator to copy
+ * @return the copy
+ */
+struct StrataEstimator *
+strata_estimator_dup (struct StrataEstimator *se);
+
+
+#if 0 /* keep Emacsens' auto-indent happy */
+{
+#endif
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/setu/gnunet-setu-ibf-profiler.c b/src/setu/gnunet-setu-ibf-profiler.c
new file mode 100644
index 000000000..944b63d30
--- /dev/null
+++ b/src/setu/gnunet-setu-ibf-profiler.c
@@ -0,0 +1,308 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2012 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/gnunet-set-ibf-profiler.c
+ * @brief tool for profiling the invertible bloom filter implementation
+ * @author Florian Dold
+ */
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+
+#include "ibf.h"
+
+static unsigned int asize = 10;
+static unsigned int bsize = 10;
+static unsigned int csize = 10;
+static unsigned int hash_num = 4;
+static unsigned int ibf_size = 80;
+
+/* FIXME: add parameter for this */
+static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
+
+static struct GNUNET_CONTAINER_MultiHashMap *set_a;
+static struct GNUNET_CONTAINER_MultiHashMap *set_b;
+/* common elements in a and b */
+static struct GNUNET_CONTAINER_MultiHashMap *set_c;
+
+static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
+
+static struct InvertibleBloomFilter *ibf_a;
+static struct InvertibleBloomFilter *ibf_b;
+
+
+static void
+register_hashcode (struct GNUNET_HashCode *hash)
+{
+ struct GNUNET_HashCode replicated;
+ struct IBF_Key key;
+
+ key = ibf_key_from_hashcode (hash);
+ ibf_hashcode_from_key (key, &replicated);
+ (void) GNUNET_CONTAINER_multihashmap_put (
+ key_to_hashcode,
+ &replicated,
+ GNUNET_memdup (hash, sizeof *hash),
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+}
+
+
+static void
+iter_hashcodes (struct IBF_Key key,
+ GNUNET_CONTAINER_MulitHashMapIteratorCallback iter,
+ void *cls)
+{
+ struct GNUNET_HashCode replicated;
+
+ ibf_hashcode_from_key (key, &replicated);
+ GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode,
+ &replicated,
+ iter,
+ cls);
+}
+
+
+static int
+insert_iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
+{
+ struct InvertibleBloomFilter *ibf = cls;
+
+ ibf_insert (ibf, ibf_key_from_hashcode (key));
+ return GNUNET_YES;
+}
+
+
+static int
+remove_iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
+{
+ struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
+
+ /* if remove fails, there just was a collision with another key */
+ (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
+ return GNUNET_YES;
+}
+
+
+static void
+run (void *cls,
+ char *const *args,
+ const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct GNUNET_HashCode id;
+ struct IBF_Key ibf_key;
+ int i;
+ int side;
+ int res;
+ struct GNUNET_TIME_Absolute start_time;
+ struct GNUNET_TIME_Relative delta_time;
+
+ set_a =
+ GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + csize)),
+ GNUNET_NO);
+ set_b =
+ GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + csize)),
+ GNUNET_NO);
+ set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
+ GNUNET_NO);
+
+ key_to_hashcode =
+ GNUNET_CONTAINER_multihashmap_create (((asize + bsize + csize == 0)
+ ? 1
+ : (asize + bsize + csize)),
+ GNUNET_NO);
+
+ printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
+ hash_num,
+ ibf_size,
+ asize,
+ bsize,
+ csize);
+
+ i = 0;
+ while (i < asize)
+ {
+ GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+ continue;
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (
+ set_a,
+ &id,
+ NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ register_hashcode (&id);
+ i++;
+ }
+ i = 0;
+ while (i < bsize)
+ {
+ GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+ continue;
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
+ continue;
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (
+ set_b,
+ &id,
+ NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ register_hashcode (&id);
+ i++;
+ }
+ i = 0;
+ while (i < csize)
+ {
+ GNUNET_CRYPTO_hash_create_random (random_quality, &id);
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
+ continue;
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
+ continue;
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
+ continue;
+ GNUNET_break (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (
+ set_c,
+ &id,
+ NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ register_hashcode (&id);
+ i++;
+ }
+
+ ibf_a = ibf_create (ibf_size, hash_num);
+ ibf_b = ibf_create (ibf_size, hash_num);
+ if ((NULL == ibf_a) || (NULL == ibf_b))
+ {
+ /* insufficient memory */
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+
+
+ printf ("generated sets\n");
+
+ start_time = GNUNET_TIME_absolute_get ();
+
+ GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
+ GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b);
+ GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a);
+ GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b);
+
+ delta_time = GNUNET_TIME_absolute_get_duration (start_time);
+
+ printf ("encoded in: %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
+
+ ibf_subtract (ibf_a, ibf_b);
+
+
+ start_time = GNUNET_TIME_absolute_get ();
+
+ for (i = 0; i <= asize + bsize; i++)
+ {
+ res = ibf_decode (ibf_a, &side, &ibf_key);
+ if (GNUNET_SYSERR == res)
+ {
+ printf ("decode failed, %u/%u elements left\n",
+ GNUNET_CONTAINER_multihashmap_size (set_a)
+ + GNUNET_CONTAINER_multihashmap_size (set_b),
+ asize + bsize);
+ return;
+ }
+ if (GNUNET_NO == res)
+ {
+ if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) &&
+ (0 == GNUNET_CONTAINER_multihashmap_size (set_a)))
+ {
+ delta_time = GNUNET_TIME_absolute_get_duration (start_time);
+ printf ("decoded successfully in: %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
+ }
+ else
+ {
+ printf ("decode missed elements (should never happen)\n");
+ }
+ return;
+ }
+
+ if (side == 1)
+ iter_hashcodes (ibf_key, remove_iterator, set_a);
+ if (side == -1)
+ iter_hashcodes (ibf_key, remove_iterator, set_b);
+ }
+ printf ("cyclic IBF, %u/%u elements left\n",
+ GNUNET_CONTAINER_multihashmap_size (set_a)
+ + GNUNET_CONTAINER_multihashmap_size (set_b),
+ asize + bsize);
+}
+
+
+int
+main (int argc, char **argv)
+{
+ struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_uint ('A',
+ "asize",
+ NULL,
+ gettext_noop ("number of element in set A-B"),
+ &asize),
+
+ GNUNET_GETOPT_option_uint ('B',
+ "bsize",
+ NULL,
+ gettext_noop ("number of element in set B-A"),
+ &bsize),
+
+ GNUNET_GETOPT_option_uint ('C',
+ "csize",
+ NULL,
+ gettext_noop (
+ "number of common elements in A and B"),
+ &csize),
+
+ GNUNET_GETOPT_option_uint ('k',
+ "hash-num",
+ NULL,
+ gettext_noop ("hash num"),
+ &hash_num),
+
+ GNUNET_GETOPT_option_uint ('s',
+ "ibf-size",
+ NULL,
+ gettext_noop ("ibf size"),
+ &ibf_size),
+
+ GNUNET_GETOPT_OPTION_END
+ };
+
+ GNUNET_PROGRAM_run2 (argc,
+ argv,
+ "gnunet-consensus-ibf",
+ "help",
+ options,
+ &run,
+ NULL,
+ GNUNET_YES);
+ return 0;
+}
diff --git a/src/setu/gnunet-setu-profiler.c b/src/setu/gnunet-setu-profiler.c
new file mode 100644
index 000000000..8d6a2dc8c
--- /dev/null
+++ b/src/setu/gnunet-setu-profiler.c
@@ -0,0 +1,499 @@
+/*
+ This file is part of GNUnet
+ Copyright (C) 2013 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file setu/gnunet-setu-profiler.c
+ * @brief profiling tool for set
+ * @author Florian Dold
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_statistics_service.h"
+#include "gnunet_setu_service.h"
+#include "gnunet_testbed_service.h"
+
+
+static int ret;
+
+static unsigned int num_a = 5;
+static unsigned int num_b = 5;
+static unsigned int num_c = 20;
+
+static char *op_str = "union";
+
+const static struct GNUNET_CONFIGURATION_Handle *config;
+
+struct SetInfo
+{
+ char *id;
+ struct GNUNET_SETU_Handle *set;
+ struct GNUNET_SETU_OperationHandle *oh;
+ struct GNUNET_CONTAINER_MultiHashMap *sent;
+ struct GNUNET_CONTAINER_MultiHashMap *received;
+ int done;
+} info1, info2;
+
+static struct GNUNET_CONTAINER_MultiHashMap *common_sent;
+
+static struct GNUNET_HashCode app_id;
+
+static struct GNUNET_PeerIdentity local_peer;
+
+static struct GNUNET_SETU_ListenHandle *set_listener;
+
+static int byzantine;
+static unsigned int force_delta;
+static unsigned int force_full;
+static unsigned int element_size = 32;
+
+/**
+ * Handle to the statistics service.
+ */
+static struct GNUNET_STATISTICS_Handle *statistics;
+
+/**
+ * The profiler will write statistics
+ * for all peers to the file with this name.
+ */
+static char *statistics_filename;
+
+/**
+ * The profiler will write statistics
+ * for all peers to this file.
+ */
+static FILE *statistics_file;
+
+
+static int
+map_remove_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_CONTAINER_MultiHashMap *m = cls;
+ int ret;
+
+ GNUNET_assert (NULL != key);
+
+ ret = GNUNET_CONTAINER_multihashmap_remove_all (m, key);
+ if (GNUNET_OK != ret)
+ printf ("spurious element\n");
+ return GNUNET_YES;
+}
+
+
+/**
+ * Callback function to process statistic values.
+ *
+ * @param cls closure
+ * @param subsystem name of subsystem that created the statistic
+ * @param name the name of the datum
+ * @param value the current value
+ * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
+ * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
+ */
+static int
+statistics_result (void *cls,
+ const char *subsystem,
+ const char *name,
+ uint64_t value,
+ int is_persistent)
+{
+ if (NULL != statistics_file)
+ {
+ fprintf (statistics_file, "%s\t%s\t%lu\n", subsystem, name, (unsigned
+ long) value);
+ }
+ return GNUNET_OK;
+}
+
+
+static void
+statistics_done (void *cls,
+ int success)
+{
+ GNUNET_assert (GNUNET_YES == success);
+ if (NULL != statistics_file)
+ fclose (statistics_file);
+ GNUNET_SCHEDULER_shutdown ();
+}
+
+
+static void
+check_all_done (void)
+{
+ if ((info1.done == GNUNET_NO) || (info2.done == GNUNET_NO))
+ return;
+
+ GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator,
+ info2.sent);
+ GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator,
+ info1.sent);
+
+ printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
+ info1.sent));
+ printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
+ info2.sent));
+
+ if (NULL == statistics_filename)
+ {
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+
+ statistics_file = fopen (statistics_filename, "w");
+ GNUNET_STATISTICS_get (statistics, NULL, NULL,
+ &statistics_done,
+ &statistics_result, NULL);
+}
+
+
+static void
+set_result_cb (void *cls,
+ const struct GNUNET_SETU_Element *element,
+ uint64_t current_size,
+ enum GNUNET_SETU_Status status)
+{
+ struct SetInfo *info = cls;
+
+ GNUNET_assert (GNUNET_NO == info->done);
+ switch (status)
+ {
+ case GNUNET_SETU_STATUS_DONE:
+ info->done = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s done\n", info->id);
+ check_all_done ();
+ info->oh = NULL;
+ return;
+
+ case GNUNET_SETU_STATUS_FAILURE:
+ info->oh = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+
+ case GNUNET_SETU_STATUS_ADD_LOCAL:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: local element\n", info->id);
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+
+ if (element->size != element_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "wrong element size: %u, expected %u\n",
+ element->size,
+ (unsigned int) sizeof(struct GNUNET_HashCode));
+ GNUNET_assert (0);
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: got element (%s)\n",
+ info->id, GNUNET_h2s (element->data));
+ GNUNET_assert (NULL != element->data);
+ struct GNUNET_HashCode data_hash;
+ GNUNET_CRYPTO_hash (element->data, element_size, &data_hash);
+ GNUNET_CONTAINER_multihashmap_put (info->received,
+ &data_hash, NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+}
+
+
+static void
+set_listen_cb (void *cls,
+ const struct GNUNET_PeerIdentity *other_peer,
+ const struct GNUNET_MessageHeader *context_msg,
+ struct GNUNET_SETU_Request *request)
+{
+ /* max. 2 options plus terminator */
+ struct GNUNET_SETU_Option opts[3] = { { 0 } };
+ unsigned int n_opts = 0;
+
+ if (NULL == request)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "listener failed\n");
+ return;
+ }
+ GNUNET_assert (NULL == info2.oh);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "set listen cb called\n");
+ if (byzantine)
+ {
+ opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+ GNUNET_SETU_OPTION_BYZANTINE };
+ }
+ GNUNET_assert (! (force_full && force_delta));
+ if (force_full)
+ {
+ opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+ GNUNET_SETU_OPTION_FORCE_FULL };
+ }
+ if (force_delta)
+ {
+ opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+ GNUNET_SETU_OPTION_FORCE_DELTA };
+ }
+
+ opts[n_opts].type = 0;
+ info2.oh = GNUNET_SETU_accept (request,
+ opts,
+ set_result_cb, &info2);
+ GNUNET_SETU_commit (info2.oh, info2.set);
+}
+
+
+static int
+set_insert_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_SETU_Handle *set = cls;
+ struct GNUNET_SETU_Element el;
+
+ el.element_type = 0;
+ el.data = value;
+ el.size = element_size;
+ GNUNET_SETU_add_element (set, &el, NULL, NULL);
+ return GNUNET_YES;
+}
+
+
+static void
+handle_shutdown (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Shutting down set profiler\n");
+ if (NULL != set_listener)
+ {
+ GNUNET_SETU_listen_cancel (set_listener);
+ set_listener = NULL;
+ }
+ if (NULL != info1.oh)
+ {
+ GNUNET_SETU_operation_cancel (info1.oh);
+ info1.oh = NULL;
+ }
+ if (NULL != info2.oh)
+ {
+ GNUNET_SETU_operation_cancel (info2.oh);
+ info2.oh = NULL;
+ }
+ if (NULL != info1.set)
+ {
+ GNUNET_SETU_destroy (info1.set);
+ info1.set = NULL;
+ }
+ if (NULL != info2.set)
+ {
+ GNUNET_SETU_destroy (info2.set);
+ info2.set = NULL;
+ }
+ GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
+}
+
+
+static void
+run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_TESTING_Peer *peer)
+{
+ unsigned int i;
+ struct GNUNET_HashCode hash;
+ /* max. 2 options plus terminator */
+ struct GNUNET_SETU_Option opts[3] = { { 0 } };
+ unsigned int n_opts = 0;
+
+ config = cfg;
+
+ GNUNET_assert (element_size > 0);
+
+ if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &local_peer))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
+ ret = 0;
+ return;
+ }
+
+ statistics = GNUNET_STATISTICS_create ("set-profiler", cfg);
+
+ GNUNET_SCHEDULER_add_shutdown (&handle_shutdown, NULL);
+
+ info1.id = "a";
+ info2.id = "b";
+
+ info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
+ info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
+ common_sent = GNUNET_CONTAINER_multihashmap_create (num_c + 1, GNUNET_NO);
+
+ info1.received = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
+ info2.received = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
+
+ for (i = 0; i < num_a; i++)
+ {
+ char *data = GNUNET_malloc (element_size);
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
+ GNUNET_CRYPTO_hash (data, element_size, &hash);
+ GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, data,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+ }
+
+ for (i = 0; i < num_b; i++)
+ {
+ char *data = GNUNET_malloc (element_size);
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
+ GNUNET_CRYPTO_hash (data, element_size, &hash);
+ GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, data,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+ }
+
+ for (i = 0; i < num_c; i++)
+ {
+ char *data = GNUNET_malloc (element_size);
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
+ GNUNET_CRYPTO_hash (data, element_size, &hash);
+ GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, data,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+ }
+
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id);
+
+ /* FIXME: also implement intersection etc. */
+ info1.set = GNUNET_SETU_create (config);
+ info2.set = GNUNET_SETU_create (config);
+
+ GNUNET_CONTAINER_multihashmap_iterate (info1.sent, set_insert_iterator,
+ info1.set);
+ GNUNET_CONTAINER_multihashmap_iterate (info2.sent, set_insert_iterator,
+ info2.set);
+ GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
+ info1.set);
+ GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
+ info2.set);
+
+ set_listener = GNUNET_SETU_listen (config,
+ &app_id,
+ &set_listen_cb,
+ NULL);
+
+
+ if (byzantine)
+ {
+ opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+ GNUNET_SETU_OPTION_BYZANTINE };
+ }
+ GNUNET_assert (! (force_full && force_delta));
+ if (force_full)
+ {
+ opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+ GNUNET_SETU_OPTION_FORCE_FULL };
+ }
+ if (force_delta)
+ {
+ opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
+ GNUNET_SETU_OPTION_FORCE_DELTA };
+ }
+
+ opts[n_opts].type = 0;
+
+ info1.oh = GNUNET_SETU_prepare (&local_peer, &app_id, NULL,
+ opts,
+ set_result_cb, &info1);
+ GNUNET_SETU_commit (info1.oh, info1.set);
+ GNUNET_SETU_destroy (info1.set);
+ info1.set = NULL;
+}
+
+
+static void
+pre_run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ if (0 != GNUNET_TESTING_peer_run ("set-profiler",
+ cfgfile,
+ &run, NULL))
+ ret = 2;
+}
+
+
+int
+main (int argc, char **argv)
+{
+ struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_uint ('A',
+ "num-first",
+ NULL,
+ gettext_noop ("number of values"),
+ &num_a),
+
+ GNUNET_GETOPT_option_uint ('B',
+ "num-second",
+ NULL,
+ gettext_noop ("number of values"),
+ &num_b),
+
+ GNUNET_GETOPT_option_flag ('b',
+ "byzantine",
+ gettext_noop ("use byzantine mode"),
+ &byzantine),
+
+ GNUNET_GETOPT_option_uint ('f',
+ "force-full",
+ NULL,
+ gettext_noop ("force sending full set"),
+ &force_full),
+
+ GNUNET_GETOPT_option_uint ('d',
+ "force-delta",
+ NULL,
+ gettext_noop ("number delta operation"),
+ &force_delta),
+
+ GNUNET_GETOPT_option_uint ('C',
+ "num-common",
+ NULL,
+ gettext_noop ("number of values"),
+ &num_c),
+
+ GNUNET_GETOPT_option_string ('x',
+ "operation",
+ NULL,
+ gettext_noop ("operation to execute"),
+ &op_str),
+
+ GNUNET_GETOPT_option_uint ('w',
+ "element-size",
+ NULL,
+ gettext_noop ("element size"),
+ &element_size),
+
+ GNUNET_GETOPT_option_filename ('s',
+ "statistics",
+ "FILENAME",
+ gettext_noop ("write statistics to file"),
+ &statistics_filename),
+
+ GNUNET_GETOPT_OPTION_END
+ };
+
+ GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set-profiler",
+ "help",
+ options, &pre_run, NULL, GNUNET_YES);
+ return ret;
+}
diff --git a/src/setu/ibf.c b/src/setu/ibf.c
new file mode 100644
index 000000000..1532afceb
--- /dev/null
+++ b/src/setu/ibf.c
@@ -0,0 +1,409 @@
+/*
+ This file is part of GNUnet
+ Copyright (C) 2012 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/ibf.c
+ * @brief implementation of the invertible bloom filter
+ * @author Florian Dold
+ */
+
+#include "ibf.h"
+
+/**
+ * Compute the key's hash from the key.
+ * Redefine to use a different hash function.
+ */
+#define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof(struct \
+ IBF_KeyHash)))
+
+/**
+ * Create a key from a hashcode.
+ *
+ * @param hash the hashcode
+ * @return a key
+ */
+struct IBF_Key
+ibf_key_from_hashcode (const struct GNUNET_HashCode *hash)
+{
+ return *(struct IBF_Key *) hash;
+}
+
+
+/**
+ * Create a hashcode from a key, by replicating the key
+ * until the hascode is filled
+ *
+ * @param key the key
+ * @param dst hashcode to store the result in
+ */
+void
+ibf_hashcode_from_key (struct IBF_Key key,
+ struct GNUNET_HashCode *dst)
+{
+ struct IBF_Key *p;
+ unsigned int i;
+ const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode)
+ / sizeof(struct IBF_Key);
+
+ p = (struct IBF_Key *) dst;
+ for (i = 0; i < keys_per_hashcode; i++)
+ *p++ = key;
+}
+
+
+/**
+ * Create an invertible bloom filter.
+ *
+ * @param size number of IBF buckets
+ * @param hash_num number of buckets one element is hashed in
+ * @return the newly created invertible bloom filter, NULL on error
+ */
+struct InvertibleBloomFilter *
+ibf_create (uint32_t size, uint8_t hash_num)
+{
+ struct InvertibleBloomFilter *ibf;
+
+ GNUNET_assert (0 != size);
+
+ ibf = GNUNET_new (struct InvertibleBloomFilter);
+ ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t));
+ if (NULL == ibf->count)
+ {
+ GNUNET_free (ibf);
+ return NULL;
+ }
+ ibf->key_sum = GNUNET_malloc_large (size * sizeof(struct IBF_Key));
+ if (NULL == ibf->key_sum)
+ {
+ GNUNET_free (ibf->count);
+ GNUNET_free (ibf);
+ return NULL;
+ }
+ ibf->key_hash_sum = GNUNET_malloc_large (size * sizeof(struct IBF_KeyHash));
+ if (NULL == ibf->key_hash_sum)
+ {
+ GNUNET_free (ibf->key_sum);
+ GNUNET_free (ibf->count);
+ GNUNET_free (ibf);
+ return NULL;
+ }
+ ibf->size = size;
+ ibf->hash_num = hash_num;
+
+ return ibf;
+}
+
+
+/**
+ * Store unique bucket indices for the specified key in dst.
+ */
+static void
+ibf_get_indices (const struct InvertibleBloomFilter *ibf,
+ struct IBF_Key key,
+ int *dst)
+{
+ uint32_t filled;
+ uint32_t i;
+ uint32_t bucket;
+
+ bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key);
+ for (i = 0, filled = 0; filled < ibf->hash_num; i++)
+ {
+ unsigned int j;
+ uint64_t x;
+ for (j = 0; j < filled; j++)
+ if (dst[j] == bucket)
+ goto try_next;
+ dst[filled++] = bucket % ibf->size;
+try_next:;
+ x = ((uint64_t) bucket << 32) | i;
+ bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
+ }
+}
+
+
+static void
+ibf_insert_into (struct InvertibleBloomFilter *ibf,
+ struct IBF_Key key,
+ const int *buckets, int side)
+{
+ int i;
+
+ for (i = 0; i < ibf->hash_num; i++)
+ {
+ const int bucket = buckets[i];
+ ibf->count[bucket].count_val += side;
+ ibf->key_sum[bucket].key_val ^= key.key_val;
+ ibf->key_hash_sum[bucket].key_hash_val
+ ^= IBF_KEY_HASH_VAL (key);
+ }
+}
+
+
+/**
+ * Insert a key into an IBF.
+ *
+ * @param ibf the IBF
+ * @param key the element's hash code
+ */
+void
+ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
+{
+ int buckets[ibf->hash_num];
+
+ GNUNET_assert (ibf->hash_num <= ibf->size);
+ ibf_get_indices (ibf, key, buckets);
+ ibf_insert_into (ibf, key, buckets, 1);
+}
+
+
+/**
+ * Remove a key from an IBF.
+ *
+ * @param ibf the IBF
+ * @param key the element's hash code
+ */
+void
+ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
+{
+ int buckets[ibf->hash_num];
+
+ GNUNET_assert (ibf->hash_num <= ibf->size);
+ ibf_get_indices (ibf, key, buckets);
+ ibf_insert_into (ibf, key, buckets, -1);
+}
+
+
+/**
+ * Test is the IBF is empty, i.e. all counts, keys and key hashes are zero.
+ */
+static int
+ibf_is_empty (struct InvertibleBloomFilter *ibf)
+{
+ int i;
+
+ for (i = 0; i < ibf->size; i++)
+ {
+ if (0 != ibf->count[i].count_val)
+ return GNUNET_NO;
+ if (0 != ibf->key_hash_sum[i].key_hash_val)
+ return GNUNET_NO;
+ if (0 != ibf->key_sum[i].key_val)
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Decode and remove an element from the IBF, if possible.
+ *
+ * @param ibf the invertible bloom filter to decode
+ * @param ret_side sign of the cell's count where the decoded element came from.
+ * A negative sign indicates that the element was recovered
+ * resides in an IBF that was previously subtracted from.
+ * @param ret_id receives the hash code of the decoded element, if successful
+ * @return GNUNET_YES if decoding an element was successful,
+ * GNUNET_NO if the IBF is empty,
+ * GNUNET_SYSERR if the decoding has failed
+ */
+int
+ibf_decode (struct InvertibleBloomFilter *ibf,
+ int *ret_side, struct IBF_Key *ret_id)
+{
+ struct IBF_KeyHash hash;
+ int i;
+ int buckets[ibf->hash_num];
+
+ GNUNET_assert (NULL != ibf);
+
+ for (i = 0; i < ibf->size; i++)
+ {
+ int j;
+ int hit;
+
+ /* we can only decode from pure buckets */
+ if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val))
+ continue;
+
+ hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]);
+
+ /* test if the hash matches the key */
+ if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val)
+ continue;
+
+ /* test if key in bucket hits its own location,
+ * if not, the key hash was subject to collision */
+ hit = GNUNET_NO;
+ ibf_get_indices (ibf, ibf->key_sum[i], buckets);
+ for (j = 0; j < ibf->hash_num; j++)
+ if (buckets[j] == i)
+ hit = GNUNET_YES;
+
+ if (GNUNET_NO == hit)
+ continue;
+
+ if (NULL != ret_side)
+ *ret_side = ibf->count[i].count_val;
+ if (NULL != ret_id)
+ *ret_id = ibf->key_sum[i];
+
+ /* insert on the opposite side, effectively removing the element */
+ ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val);
+
+ return GNUNET_YES;
+ }
+
+ if (GNUNET_YES == ibf_is_empty (ibf))
+ return GNUNET_NO;
+ return GNUNET_SYSERR;
+}
+
+
+/**
+ * Write buckets from an ibf to a buffer.
+ * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
+ *
+ * @param ibf the ibf to write
+ * @param start with which bucket to start
+ * @param count how many buckets to write
+ * @param buf buffer to write the data to
+ */
+void
+ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start,
+ uint32_t count, void *buf)
+{
+ struct IBF_Key *key_dst;
+ struct IBF_KeyHash *key_hash_dst;
+ struct IBF_Count *count_dst;
+
+ GNUNET_assert (start + count <= ibf->size);
+
+ /* copy keys */
+ key_dst = (struct IBF_Key *) buf;
+ GNUNET_memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst);
+ key_dst += count;
+ /* copy key hashes */
+ key_hash_dst = (struct IBF_KeyHash *) key_dst;
+ GNUNET_memcpy (key_hash_dst, ibf->key_hash_sum + start, count
+ * sizeof *key_hash_dst);
+ key_hash_dst += count;
+ /* copy counts */
+ count_dst = (struct IBF_Count *) key_hash_dst;
+ GNUNET_memcpy (count_dst, ibf->count + start, count * sizeof *count_dst);
+}
+
+
+/**
+ * Read buckets from a buffer into an ibf.
+ *
+ * @param buf pointer to the buffer to read from
+ * @param start which bucket to start at
+ * @param count how many buckets to read
+ * @param ibf the ibf to read from
+ */
+void
+ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct
+ InvertibleBloomFilter *ibf)
+{
+ struct IBF_Key *key_src;
+ struct IBF_KeyHash *key_hash_src;
+ struct IBF_Count *count_src;
+
+ GNUNET_assert (count > 0);
+ GNUNET_assert (start + count <= ibf->size);
+
+ /* copy keys */
+ key_src = (struct IBF_Key *) buf;
+ GNUNET_memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src);
+ key_src += count;
+ /* copy key hashes */
+ key_hash_src = (struct IBF_KeyHash *) key_src;
+ GNUNET_memcpy (ibf->key_hash_sum + start, key_hash_src, count
+ * sizeof *key_hash_src);
+ key_hash_src += count;
+ /* copy counts */
+ count_src = (struct IBF_Count *) key_hash_src;
+ GNUNET_memcpy (ibf->count + start, count_src, count * sizeof *count_src);
+}
+
+
+/**
+ * Subtract ibf2 from ibf1, storing the result in ibf1.
+ * The two IBF's must have the same parameters size and hash_num.
+ *
+ * @param ibf1 IBF that is subtracted from
+ * @param ibf2 IBF that will be subtracted from ibf1
+ */
+void
+ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct
+ InvertibleBloomFilter *ibf2)
+{
+ int i;
+
+ GNUNET_assert (ibf1->size == ibf2->size);
+ GNUNET_assert (ibf1->hash_num == ibf2->hash_num);
+
+ for (i = 0; i < ibf1->size; i++)
+ {
+ ibf1->count[i].count_val -= ibf2->count[i].count_val;
+ ibf1->key_hash_sum[i].key_hash_val ^= ibf2->key_hash_sum[i].key_hash_val;
+ ibf1->key_sum[i].key_val ^= ibf2->key_sum[i].key_val;
+ }
+}
+
+
+/**
+ * Create a copy of an IBF, the copy has to be destroyed properly.
+ *
+ * @param ibf the IBF to copy
+ */
+struct InvertibleBloomFilter *
+ibf_dup (const struct InvertibleBloomFilter *ibf)
+{
+ struct InvertibleBloomFilter *copy;
+
+ copy = GNUNET_malloc (sizeof *copy);
+ copy->hash_num = ibf->hash_num;
+ copy->size = ibf->size;
+ copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size
+ * sizeof(struct IBF_KeyHash));
+ copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof(struct
+ IBF_Key));
+ copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof(struct
+ IBF_Count));
+ return copy;
+}
+
+
+/**
+ * Destroy all resources associated with the invertible bloom filter.
+ * No more ibf_*-functions may be called on ibf after calling destroy.
+ *
+ * @param ibf the intertible bloom filter to destroy
+ */
+void
+ibf_destroy (struct InvertibleBloomFilter *ibf)
+{
+ GNUNET_free (ibf->key_sum);
+ GNUNET_free (ibf->key_hash_sum);
+ GNUNET_free (ibf->count);
+ GNUNET_free (ibf);
+}
diff --git a/src/setu/ibf.h b/src/setu/ibf.h
new file mode 100644
index 000000000..7c2ab33b1
--- /dev/null
+++ b/src/setu/ibf.h
@@ -0,0 +1,255 @@
+/*
+ This file is part of GNUnet
+ Copyright (C) 2012 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/ibf.h
+ * @brief invertible bloom filter
+ * @author Florian Dold
+ */
+
+#ifndef GNUNET_CONSENSUS_IBF_H
+#define GNUNET_CONSENSUS_IBF_H
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0 /* keep Emacsens' auto-indent happy */
+}
+#endif
+#endif
+
+
+/**
+ * Keys that can be inserted into and removed from an IBF.
+ */
+struct IBF_Key
+{
+ uint64_t key_val;
+};
+
+
+/**
+ * Hash of an IBF key.
+ */
+struct IBF_KeyHash
+{
+ uint32_t key_hash_val;
+};
+
+
+/**
+ * Type of the count field of IBF buckets.
+ */
+struct IBF_Count
+{
+ int8_t count_val;
+};
+
+
+/**
+ * Size of one ibf bucket in bytes
+ */
+#define IBF_BUCKET_SIZE (sizeof(struct IBF_Count) + sizeof(struct IBF_Key) \
+ + sizeof(struct IBF_KeyHash))
+
+
+/**
+ * Invertible bloom filter (IBF).
+ *
+ * An IBF is a counting bloom filter that has the ability to restore
+ * the hashes of its stored elements with high probability.
+ */
+struct InvertibleBloomFilter
+{
+ /**
+ * How many cells does this IBF have?
+ */
+ uint32_t size;
+
+ /**
+ * In how many cells do we hash one element?
+ * Usually 4 or 3.
+ */
+ uint8_t hash_num;
+
+ /**
+ * Xor sums of the elements' keys, used to identify the elements.
+ * Array of 'size' elements.
+ */
+ struct IBF_Key *key_sum;
+
+ /**
+ * Xor sums of the hashes of the keys of inserted elements.
+ * Array of 'size' elements.
+ */
+ struct IBF_KeyHash *key_hash_sum;
+
+ /**
+ * How many times has a bucket been hit?
+ * Can be negative, as a result of IBF subtraction.
+ * Array of 'size' elements.
+ */
+ struct IBF_Count *count;
+};
+
+
+/**
+ * Write buckets from an ibf to a buffer.
+ * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
+ *
+ * @param ibf the ibf to write
+ * @param start with which bucket to start
+ * @param count how many buckets to write
+ * @param buf buffer to write the data to
+ */
+void
+ibf_write_slice (const struct InvertibleBloomFilter *ibf,
+ uint32_t start,
+ uint32_t count,
+ void *buf);
+
+
+/**
+ * Read buckets from a buffer into an ibf.
+ *
+ * @param buf pointer to the buffer to read from
+ * @param start which bucket to start at
+ * @param count how many buckets to read
+ * @param ibf the ibf to write to
+ */
+void
+ibf_read_slice (const void *buf,
+ uint32_t start,
+ uint32_t count,
+ struct InvertibleBloomFilter *ibf);
+
+
+/**
+ * Create a key from a hashcode.
+ *
+ * @param hash the hashcode
+ * @return a key
+ */
+struct IBF_Key
+ibf_key_from_hashcode (const struct GNUNET_HashCode *hash);
+
+
+/**
+ * Create a hashcode from a key, by replicating the key
+ * until the hascode is filled
+ *
+ * @param key the key
+ * @param dst hashcode to store the result in
+ */
+void
+ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst);
+
+
+/**
+ * Create an invertible bloom filter.
+ *
+ * @param size number of IBF buckets
+ * @param hash_num number of buckets one element is hashed in, usually 3 or 4
+ * @return the newly created invertible bloom filter, NULL on error
+ */
+struct InvertibleBloomFilter *
+ibf_create (uint32_t size, uint8_t hash_num);
+
+
+/**
+ * Insert a key into an IBF.
+ *
+ * @param ibf the IBF
+ * @param key the element's hash code
+ */
+void
+ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
+
+
+/**
+ * Remove a key from an IBF.
+ *
+ * @param ibf the IBF
+ * @param key the element's hash code
+ */
+void
+ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
+
+
+/**
+ * Subtract ibf2 from ibf1, storing the result in ibf1.
+ * The two IBF's must have the same parameters size and hash_num.
+ *
+ * @param ibf1 IBF that is subtracted from
+ * @param ibf2 IBF that will be subtracted from ibf1
+ */
+void
+ibf_subtract (struct InvertibleBloomFilter *ibf1,
+ const struct InvertibleBloomFilter *ibf2);
+
+
+/**
+ * Decode and remove an element from the IBF, if possible.
+ *
+ * @param ibf the invertible bloom filter to decode
+ * @param ret_side sign of the cell's count where the decoded element came from.
+ * A negative sign indicates that the element was recovered
+ * resides in an IBF that was previously subtracted from.
+ * @param ret_id receives the hash code of the decoded element, if successful
+ * @return #GNUNET_YES if decoding an element was successful,
+ * #GNUNET_NO if the IBF is empty,
+ * #GNUNET_SYSERR if the decoding has failed
+ */
+int
+ibf_decode (struct InvertibleBloomFilter *ibf,
+ int *ret_side,
+ struct IBF_Key *ret_id);
+
+
+/**
+ * Create a copy of an IBF, the copy has to be destroyed properly.
+ *
+ * @param ibf the IBF to copy
+ */
+struct InvertibleBloomFilter *
+ibf_dup (const struct InvertibleBloomFilter *ibf);
+
+
+/**
+ * Destroy all resources associated with the invertible bloom filter.
+ * No more ibf_*-functions may be called on ibf after calling destroy.
+ *
+ * @param ibf the intertible bloom filter to destroy
+ */
+void
+ibf_destroy (struct InvertibleBloomFilter *ibf);
+
+
+#if 0 /* keep Emacsens' auto-indent happy */
+{
+#endif
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/setu/ibf_sim.c b/src/setu/ibf_sim.c
new file mode 100644
index 000000000..6415d00e1
--- /dev/null
+++ b/src/setu/ibf_sim.c
@@ -0,0 +1,142 @@
+/*
+ This file is part of GNUnet
+ Copyright (C) 2013 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/ibf_sim.c
+ * @brief implementation of simulation for invertible bloom filter
+ * @author Florian Dold
+ *
+ * This code was used for some internal experiments, it is not
+ * build or shipped as part of the GNUnet system.
+ */
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#define MAX_IBF_DECODE 16
+
+/* report average over how many rounds? */
+#define ROUNDS 100000
+
+/* enable one of the three below */
+// simple fix
+#define FIX1 0
+// possibly slightly better fix for large IBF_DECODE values
+#define FIX2 1
+
+// SIGCOMM algorithm
+#define STRATA 0
+
+// print each value?
+#define VERBOSE 0
+// avoid assembly? (ASM is about 50% faster)
+#define SLOW 0
+
+int
+main (int argc, char **argv)
+{
+ unsigned int round;
+ unsigned int buckets[31]; // max is 2^31 as 'random' returns only between 0 and 2^31
+ unsigned int i;
+ int j;
+ unsigned int r;
+ unsigned int ret;
+ unsigned long long total;
+ unsigned int want;
+ double predict;
+
+ srandom (time (NULL));
+ total = 0;
+ want = atoi (argv[1]);
+ for (round = 0; round < ROUNDS; round++)
+ {
+ memset (buckets, 0, sizeof(buckets));
+ for (i = 0; i < want; i++)
+ {
+ /* FIXME: might want to use 'better' PRNG to avoid
+ PRNG-induced biases */
+ r = random ();
+ if (0 == r)
+ continue;
+#if SLOW
+ for (j = 0; (j < 31) && (0 == (r & (1 << j))); j++)
+ ;
+#else
+ /* use assembly / gcc */
+ j = __builtin_ffs (r) - 1;
+#endif
+ buckets[j]++;
+ }
+ ret = 0;
+ predict = 0.0;
+ for (j = 31; j >= 0; j--)
+ {
+#if FIX1
+ /* improved algorithm, for 1000 elements with IBF-DECODE 8, I
+ get 990/1000 elements on average over 1 million runs; key
+ idea being to stop short of the 'last' possible IBF as
+ otherwise a "lowball" per-chance would unduely influence the
+ result */if ((j > 0) &&
+ (buckets[j - 1] > MAX_IBF_DECODE))
+ {
+ ret *= (1 << (j + 1));
+ break;
+ }
+#endif
+#if FIX2
+ /* another improvement: don't just always cut off the last one,
+ but rather try to predict based on all previous values where
+ that "last" one is; additional prediction can only really
+ work if MAX_IBF_DECODE is sufficiently high */
+ if ((j > 0) &&
+ ((buckets[j - 1] > MAX_IBF_DECODE) ||
+ (predict > MAX_IBF_DECODE)))
+ {
+ ret *= (1 << (j + 1));
+ break;
+ }
+#endif
+#if STRATA
+ /* original algorithm, for 1000 elements with IBF-DECODE 8,
+ I get 920/1000 elements on average over 1 million runs */
+ if (buckets[j] > MAX_IBF_DECODE)
+ {
+ ret *= (1 << (j + 1));
+ break;
+ }
+#endif
+ ret += buckets[j];
+ predict = (buckets[j] + 2.0 * predict) / 2.0;
+ }
+#if VERBOSE
+ fprintf (stderr, "%u ", ret);
+#endif
+ total += ret;
+ }
+ fprintf (stderr, "\n");
+ fprintf (stdout, "average %llu\n", total / ROUNDS);
+ return 0;
+}
+
+
+/* TODO: should calculate stddev of the results to also be able to
+ say something about the stability of the results, outside of
+ large-scale averages -- gaining 8% precision at the expense of
+ 50% additional variance might not be worth it... */
diff --git a/src/setu/plugin_block_setu_test.c b/src/setu/plugin_block_setu_test.c
new file mode 100644
index 000000000..1de086092
--- /dev/null
+++ b/src/setu/plugin_block_setu_test.c
@@ -0,0 +1,123 @@
+/*
+ This file is part of GNUnet
+ Copyright (C) 2017 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/plugin_block_set_test.c
+ * @brief set test block, recognizes elements with non-zero first byte as invalid
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_block_plugin.h"
+#include "gnunet_block_group_lib.h"
+
+
+/**
+ * Function called to validate a reply or a request. For
+ * request evaluation, simply pass "NULL" for the reply_block.
+ *
+ * @param cls closure
+ * @param ctx block context
+ * @param type block type
+ * @param group block group to use
+ * @param eo control flags
+ * @param query original query (hash)
+ * @param xquery extrended query data (can be NULL, depending on type)
+ * @param xquery_size number of bytes in xquery
+ * @param reply_block response to validate
+ * @param reply_block_size number of bytes in reply block
+ * @return characterization of result
+ */
+static enum GNUNET_BLOCK_EvaluationResult
+block_plugin_set_test_evaluate (void *cls,
+ struct GNUNET_BLOCK_Context *ctx,
+ enum GNUNET_BLOCK_Type type,
+ struct GNUNET_BLOCK_Group *group,
+ enum GNUNET_BLOCK_EvaluationOptions eo,
+ const struct GNUNET_HashCode *query,
+ const void *xquery,
+ size_t xquery_size,
+ const void *reply_block,
+ size_t reply_block_size)
+{
+ if ((NULL == reply_block) ||
+ (reply_block_size == 0) ||
+ (0 != ((char *) reply_block)[0]))
+ return GNUNET_BLOCK_EVALUATION_RESULT_INVALID;
+ return GNUNET_BLOCK_EVALUATION_OK_MORE;
+}
+
+
+/**
+ * Function called to obtain the key for a block.
+ *
+ * @param cls closure
+ * @param type block type
+ * @param block block to get the key for
+ * @param block_size number of bytes in block
+ * @param key set to the key (query) for the given block
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR if type not supported
+ * (or if extracting a key from a block of this type does not work)
+ */
+static int
+block_plugin_set_test_get_key (void *cls,
+ enum GNUNET_BLOCK_Type type,
+ const void *block,
+ size_t block_size,
+ struct GNUNET_HashCode *key)
+{
+ return GNUNET_SYSERR;
+}
+
+
+/**
+ * Entry point for the plugin.
+ */
+void *
+libgnunet_plugin_block_set_test_init (void *cls)
+{
+ static enum GNUNET_BLOCK_Type types[] = {
+ GNUNET_BLOCK_TYPE_SET_TEST,
+ GNUNET_BLOCK_TYPE_ANY /* end of list */
+ };
+ struct GNUNET_BLOCK_PluginFunctions *api;
+
+ api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions);
+ api->evaluate = &block_plugin_set_test_evaluate;
+ api->get_key = &block_plugin_set_test_get_key;
+ api->types = types;
+ return api;
+}
+
+
+/**
+ * Exit point from the plugin.
+ */
+void *
+libgnunet_plugin_block_set_test_done (void *cls)
+{
+ struct GNUNET_BLOCK_PluginFunctions *api = cls;
+
+ GNUNET_free (api);
+ return NULL;
+}
+
+
+/* end of plugin_block_set_test.c */
diff --git a/src/setu/setu.h b/src/setu/setu.h
new file mode 100644
index 000000000..f1c5df92b
--- /dev/null
+++ b/src/setu/setu.h
@@ -0,0 +1,304 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2012-2014, 2020 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file set/set.h
+ * @brief messages used for the set union api
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#ifndef SET_H
+#define SET_H
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_set_service.h"
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+/**
+ * Message sent by the client to the service to ask starting
+ * a new set to perform operations with. Includes the desired
+ * set operation type.
+ */
+struct GNUNET_SETU_CreateMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_CREATE
+ */
+ struct GNUNET_MessageHeader header;
+
+};
+
+
+/**
+ * Message sent by the client to the service to start listening for
+ * incoming requests to perform a certain type of set operation for a
+ * certain type of application.
+ */
+struct GNUNET_SETU_ListenMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_LISTEN
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Always zero.
+ */
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * application id
+ */
+ struct GNUNET_HashCode app_id;
+};
+
+
+/**
+ * Message sent by a listening client to the service to accept
+ * performing the operation with the other peer.
+ */
+struct GNUNET_SETU_AcceptMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_ACCEPT
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * ID of the incoming request we want to accept.
+ */
+ uint32_t accept_reject_id GNUNET_PACKED;
+
+ /**
+ * Request ID to identify responses.
+ */
+ uint32_t request_id GNUNET_PACKED;
+
+ /**
+ * Always use delta operation instead of sending full sets,
+ * even it it's less efficient.
+ */
+ uint8_t force_delta;
+
+ /**
+ * Always send full sets, even if delta operations would
+ * be more efficient.
+ */
+ uint8_t force_full;
+
+ /**
+ * #GNUNET_YES to fail operations where Byzantine faults
+ * are suspected
+ */
+ uint8_t byzantine;
+
+ /**
+ * Lower bound for the set size, used only when
+ * byzantine mode is enabled.
+ */
+ uint8_t byzantine_lower_bound;
+};
+
+
+/**
+ * Message sent by a listening client to the service to reject
+ * performing the operation with the other peer.
+ */
+struct GNUNET_SETU_RejectMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_REJECT
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * ID of the incoming request we want to reject.
+ */
+ uint32_t accept_reject_id GNUNET_PACKED;
+};
+
+
+/**
+ * A request for an operation with another client.
+ */
+struct GNUNET_SETU_RequestMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_REQUEST.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * ID of the to identify the request when accepting or
+ * rejecting it.
+ */
+ uint32_t accept_id GNUNET_PACKED;
+
+ /**
+ * Identity of the requesting peer.
+ */
+ struct GNUNET_PeerIdentity peer_id;
+
+ /* rest: context message, that is, application-specific
+ message to convince listener to pick up */
+};
+
+
+/**
+ * Message sent by client to service to initiate a set operation as a
+ * client (not as listener). A set (which determines the operation
+ * type) must already exist in association with this client.
+ */
+struct GNUNET_SETU_EvaluateMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_EVALUATE
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Id of our set to evaluate, chosen implicitly by the client when it
+ * calls #GNUNET_SETU_commit().
+ */
+ uint32_t request_id GNUNET_PACKED;
+
+ /**
+ * Peer to evaluate the operation with
+ */
+ struct GNUNET_PeerIdentity target_peer;
+
+ /**
+ * Application id
+ */
+ struct GNUNET_HashCode app_id;
+
+ /**
+ * Always use delta operation instead of sending full sets,
+ * even it it's less efficient.
+ */
+ uint8_t force_delta;
+
+ /**
+ * Always send full sets, even if delta operations would
+ * be more efficient.
+ */
+ uint8_t force_full;
+
+ /**
+ * #GNUNET_YES to fail operations where Byzantine faults
+ * are suspected
+ */
+ uint8_t byzantine;
+
+ /**
+ * Lower bound for the set size, used only when
+ * byzantine mode is enabled.
+ */
+ uint8_t byzantine_lower_bound;
+
+ /* rest: context message, that is, application-specific
+ message to convince listener to pick up */
+};
+
+
+/**
+ * Message sent by the service to the client to indicate an
+ * element that is removed (set intersection) or added
+ * (set union) or part of the final result, depending on
+ * options specified for the operation.
+ */
+struct GNUNET_SETU_ResultMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_RESULT
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Current set size.
+ */
+ uint64_t current_size;
+
+ /**
+ * id the result belongs to
+ */
+ uint32_t request_id GNUNET_PACKED;
+
+ /**
+ * Was the evaluation successful? Contains
+ * an `enum GNUNET_SETU_Status` in NBO.
+ */
+ uint16_t result_status GNUNET_PACKED;
+
+ /**
+ * Type of the element attachted to the message, if any.
+ */
+ uint16_t element_type GNUNET_PACKED;
+
+ /* rest: the actual element */
+};
+
+
+/**
+ * Message sent by client to the service to add
+ * an element to the set.
+ */
+struct GNUNET_SETU_ElementMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_ADD
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Type of the element to add or remove.
+ */
+ uint16_t element_type GNUNET_PACKED;
+
+ /**
+ * For alignment, always zero.
+ */
+ uint16_t reserved GNUNET_PACKED;
+
+ /* rest: the actual element */
+};
+
+
+/**
+ * Sent to the service by the client in order to cancel a set operation.
+ */
+struct GNUNET_SETU_CancelMessage
+{
+ /**
+ * Type: #GNUNET_MESSAGE_TYPE_SETU_CANCEL
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * ID of the request we want to cancel.
+ */
+ uint32_t request_id GNUNET_PACKED;
+};
+
+
+GNUNET_NETWORK_STRUCT_END
+
+#endif
diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c
new file mode 100644
index 000000000..48260de55
--- /dev/null
+++ b/src/setu/setu_api.c
@@ -0,0 +1,872 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2012-2016, 2020 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file set/setu_api.c
+ * @brief api for the set union service
+ * @author Florian Dold
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_setu_service.h"
+#include "setu.h"
+
+
+#define LOG(kind, ...) GNUNET_log_from (kind, "set-api", __VA_ARGS__)
+
+/**
+ * Opaque handle to a set.
+ */
+struct GNUNET_SETU_Handle
+{
+ /**
+ * Message queue for @e client.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Linked list of operations on the set.
+ */
+ struct GNUNET_SETU_OperationHandle *ops_head;
+
+ /**
+ * Linked list of operations on the set.
+ */
+ struct GNUNET_SETU_OperationHandle *ops_tail;
+
+ /**
+ * Should the set be destroyed once all operations are gone?
+ * #GNUNET_SYSERR if #GNUNET_SETU_destroy() must raise this flag,
+ * #GNUNET_YES if #GNUNET_SETU_destroy() did raise this flag.
+ */
+ int destroy_requested;
+
+ /**
+ * Has the set become invalid (e.g. service died)?
+ */
+ int invalid;
+
+};
+
+
+/**
+ * Handle for a set operation request from another peer.
+ */
+struct GNUNET_SETU_Request
+{
+ /**
+ * Id of the request, used to identify the request when
+ * accepting/rejecting it.
+ */
+ uint32_t accept_id;
+
+ /**
+ * Has the request been accepted already?
+ * #GNUNET_YES/#GNUNET_NO
+ */
+ int accepted;
+};
+
+
+/**
+ * Handle to an operation. Only known to the service after committing
+ * the handle with a set.
+ */
+struct GNUNET_SETU_OperationHandle
+{
+ /**
+ * Function to be called when we have a result,
+ * or an error.
+ */
+ GNUNET_SETU_ResultIterator result_cb;
+
+ /**
+ * Closure for @e result_cb.
+ */
+ void *result_cls;
+
+ /**
+ * Local set used for the operation,
+ * NULL if no set has been provided by conclude yet.
+ */
+ struct GNUNET_SETU_Handle *set;
+
+ /**
+ * Message sent to the server on calling conclude,
+ * NULL if conclude has been called.
+ */
+ struct GNUNET_MQ_Envelope *conclude_mqm;
+
+ /**
+ * Address of the request if in the conclude message,
+ * used to patch the request id into the message when the set is known.
+ */
+ uint32_t *request_id_addr;
+
+ /**
+ * Handles are kept in a linked list.
+ */
+ struct GNUNET_SETU_OperationHandle *prev;
+
+ /**
+ * Handles are kept in a linked list.
+ */
+ struct GNUNET_SETU_OperationHandle *next;
+
+ /**
+ * Request ID to identify the operation within the set.
+ */
+ uint32_t request_id;
+};
+
+
+/**
+ * Opaque handle to a listen operation.
+ */
+struct GNUNET_SETU_ListenHandle
+{
+ /**
+ * Message queue for the client.
+ */
+ struct GNUNET_MQ_Handle*mq;
+
+ /**
+ * Configuration handle for the listener, stored
+ * here to be able to reconnect transparently on
+ * connection failure.
+ */
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ /**
+ * Function to call on a new incoming request,
+ * or on error.
+ */
+ GNUNET_SETU_ListenCallback listen_cb;
+
+ /**
+ * Closure for @e listen_cb.
+ */
+ void *listen_cls;
+
+ /**
+ * Application ID we listen for.
+ */
+ struct GNUNET_HashCode app_id;
+
+ /**
+ * Time to wait until we try to reconnect on failure.
+ */
+ struct GNUNET_TIME_Relative reconnect_backoff;
+
+ /**
+ * Task for reconnecting when the listener fails.
+ */
+ struct GNUNET_SCHEDULER_Task *reconnect_task;
+
+};
+
+
+/**
+ * Check that the given @a msg is well-formed.
+ *
+ * @param cls closure
+ * @param msg message to check
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_result (void *cls,
+ const struct GNUNET_SETU_ResultMessage *msg)
+{
+ /* minimum size was already checked, everything else is OK! */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle result message for a set operation.
+ *
+ * @param cls the set
+ * @param mh the message
+ */
+static void
+handle_result (void *cls,
+ const struct GNUNET_SETU_ResultMessage *msg)
+{
+ struct GNUNET_SETU_Handle *set = cls;
+ struct GNUNET_SETU_OperationHandle *oh;
+ struct GNUNET_SETU_Element e;
+ enum GNUNET_SETU_Status result_status;
+ int destroy_set;
+
+ GNUNET_assert (NULL != set->mq);
+ result_status = (enum GNUNET_SETU_Status) ntohs (msg->result_status);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got result message with status %d\n",
+ result_status);
+ oh = GNUNET_MQ_assoc_get (set->mq,
+ ntohl (msg->request_id));
+ if (NULL == oh)
+ {
+ /* 'oh' can be NULL if we canceled the operation, but the service
+ did not get the cancel message yet. */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring result from canceled operation\n");
+ return;
+ }
+
+ switch (result_status)
+ {
+ case GNUNET_SETU_STATUS_ADD_LOCAL:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Treating result as element\n");
+ e.data = &msg[1];
+ e.size = ntohs (msg->header.size)
+ - sizeof(struct GNUNET_SETU_ResultMessage);
+ e.element_type = ntohs (msg->element_type);
+ if (NULL != oh->result_cb)
+ oh->result_cb (oh->result_cls,
+ &e,
+ GNUNET_ntohll (msg->current_size),
+ result_status);
+ return;
+ case GNUNET_SETU_STATUS_FAILURE:
+ case GNUNET_SETU_STATUS_DONE:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Treating result as final status\n");
+ GNUNET_MQ_assoc_remove (set->mq,
+ ntohl (msg->request_id));
+ GNUNET_CONTAINER_DLL_remove (set->ops_head,
+ set->ops_tail,
+ oh);
+ /* Need to do this calculation _before_ the result callback,
+ as IF the application still has a valid set handle, it
+ may trigger destruction of the set during the callback. */
+ destroy_set = (GNUNET_YES == set->destroy_requested) &&
+ (NULL == set->ops_head);
+ if (NULL != oh->result_cb)
+ {
+ oh->result_cb (oh->result_cls,
+ NULL,
+ GNUNET_ntohll (msg->current_size),
+ result_status);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "No callback for final status\n");
+ }
+ if (destroy_set)
+ GNUNET_SETU_destroy (set);
+ GNUNET_free (oh);
+ return;
+ }
+}
+
+
+/**
+ * Destroy the given set operation.
+ *
+ * @param oh set operation to destroy
+ */
+static void
+set_operation_destroy (struct GNUNET_SETU_OperationHandle *oh)
+{
+ struct GNUNET_SETU_Handle *set = oh->set;
+ struct GNUNET_SETU_OperationHandle *h_assoc;
+
+ if (NULL != oh->conclude_mqm)
+ GNUNET_MQ_discard (oh->conclude_mqm);
+ /* is the operation already commited? */
+ if (NULL != set)
+ {
+ GNUNET_CONTAINER_DLL_remove (set->ops_head,
+ set->ops_tail,
+ oh);
+ h_assoc = GNUNET_MQ_assoc_remove (set->mq,
+ oh->request_id);
+ GNUNET_assert ((NULL == h_assoc) ||
+ (h_assoc == oh));
+ }
+ GNUNET_free (oh);
+}
+
+
+/**
+ * Cancel the given set operation. We need to send an explicit cancel
+ * message, as all operations one one set communicate using one
+ * handle.
+ *
+ * @param oh set operation to cancel
+ */
+void
+GNUNET_SETU_operation_cancel (struct GNUNET_SETU_OperationHandle *oh)
+{
+ struct GNUNET_SETU_Handle *set = oh->set;
+ struct GNUNET_SETU_CancelMessage *m;
+ struct GNUNET_MQ_Envelope *mqm;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Cancelling SET operation\n");
+ if (NULL != set)
+ {
+ mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETU_CANCEL);
+ m->request_id = htonl (oh->request_id);
+ GNUNET_MQ_send (set->mq, mqm);
+ }
+ set_operation_destroy (oh);
+ if ((NULL != set) &&
+ (GNUNET_YES == set->destroy_requested) &&
+ (NULL == set->ops_head))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying set after operation cancel\n");
+ GNUNET_SETU_destroy (set);
+ }
+}
+
+
+/**
+ * We encountered an error communicating with the set service while
+ * performing a set operation. Report to the application.
+ *
+ * @param cls the `struct GNUNET_SETU_Handle`
+ * @param error error code
+ */
+static void
+handle_client_set_error (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_SETU_Handle *set = cls;
+
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Handling client set error %d\n",
+ error);
+ while (NULL != set->ops_head)
+ {
+ if ((NULL != set->ops_head->result_cb) &&
+ (GNUNET_NO == set->destroy_requested))
+ set->ops_head->result_cb (set->ops_head->result_cls,
+ NULL,
+ 0,
+ GNUNET_SETU_STATUS_FAILURE);
+ set_operation_destroy (set->ops_head);
+ }
+ set->invalid = GNUNET_YES;
+}
+
+
+/**
+ * Create an empty set, supporting the specified operation.
+ *
+ * @param cfg configuration to use for connecting to the
+ * set service
+ * @return a handle to the set
+ */
+struct GNUNET_SETU_Handle *
+GNUNET_SETU_create (const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct GNUNET_SETU_Handle *set = GNUNET_new (struct GNUNET_SETU_Handle);
+ struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ GNUNET_MQ_hd_var_size (result,
+ GNUNET_MESSAGE_TYPE_SETU_RESULT,
+ struct GNUNET_SETU_ResultMessage,
+ set),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_SETU_CreateMessage *create_msg;
+
+ set->mq = GNUNET_CLIENT_connect (cfg,
+ "setu",
+ mq_handlers,
+ &handle_client_set_error,
+ set);
+ if (NULL == set->mq)
+ {
+ GNUNET_free (set);
+ return NULL;
+ }
+ mqm = GNUNET_MQ_msg (create_msg,
+ GNUNET_MESSAGE_TYPE_SETU_CREATE);
+ GNUNET_MQ_send (set->mq,
+ mqm);
+ return set;
+}
+
+
+/**
+ * Add an element to the given set. After the element has been added
+ * (in the sense of being transmitted to the set service), @a cont
+ * will be called. Multiple calls to GNUNET_SETU_add_element() can be
+ * queued.
+ *
+ * @param set set to add element to
+ * @param element element to add to the set
+ * @param cb continuation called after the element has been added
+ * @param cb_cls closure for @a cb
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
+ * set is invalid (e.g. the set service crashed)
+ */
+int
+GNUNET_SETU_add_element (struct GNUNET_SETU_Handle *set,
+ const struct GNUNET_SETU_Element *element,
+ GNUNET_SCHEDULER_TaskCallback cb,
+ void *cb_cls)
+{
+ struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_SETU_ElementMessage *msg;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "adding element of type %u to set %p\n",
+ (unsigned int) element->element_type,
+ set);
+ GNUNET_assert (NULL != set);
+ if (GNUNET_YES == set->invalid)
+ {
+ if (NULL != cb)
+ cb (cb_cls);
+ return GNUNET_SYSERR;
+ }
+ mqm = GNUNET_MQ_msg_extra (msg,
+ element->size,
+ GNUNET_MESSAGE_TYPE_SETU_ADD);
+ msg->element_type = htons (element->element_type);
+ GNUNET_memcpy (&msg[1],
+ element->data,
+ element->size);
+ GNUNET_MQ_notify_sent (mqm,
+ cb,
+ cb_cls);
+ GNUNET_MQ_send (set->mq,
+ mqm);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Destroy the set handle if no operations are left, mark the set
+ * for destruction otherwise.
+ *
+ * @param set set handle to destroy
+ */
+void
+GNUNET_SETU_destroy (struct GNUNET_SETU_Handle *set)
+{
+ /* destroying set while iterator is active is currently
+ not supported; we should expand the API to allow
+ clients to explicitly cancel the iteration! */
+ GNUNET_assert (NULL != set);
+ if ((NULL != set->ops_head) ||
+ (GNUNET_SYSERR == set->destroy_requested))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Set operations are pending, delaying set destruction\n");
+ set->destroy_requested = GNUNET_YES;
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Really destroying set\n");
+ if (NULL != set->mq)
+ {
+ GNUNET_MQ_destroy (set->mq);
+ set->mq = NULL;
+ }
+ GNUNET_free (set);
+}
+
+
+/**
+ * Prepare a set operation to be evaluated with another peer.
+ * The evaluation will not start until the client provides
+ * a local set with #GNUNET_SETU_commit().
+ *
+ * @param other_peer peer with the other set
+ * @param app_id hash for the application using the set
+ * @param context_msg additional information for the request
+ * @param result_cb called on error or success
+ * @param result_cls closure for @e result_cb
+ * @return a handle to cancel the operation
+ */
+struct GNUNET_SETU_OperationHandle *
+GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
+ const struct GNUNET_HashCode *app_id,
+ const struct GNUNET_MessageHeader *context_msg,
+ const struct GNUNET_SETU_Option options[],
+ GNUNET_SETU_ResultIterator result_cb,
+ void *result_cls)
+{
+ struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_SETU_OperationHandle *oh;
+ struct GNUNET_SETU_EvaluateMessage *msg;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client prepares set union operation\n");
+ oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
+ oh->result_cb = result_cb;
+ oh->result_cls = result_cls;
+ mqm = GNUNET_MQ_msg_nested_mh (msg,
+ GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
+ context_msg);
+ msg->app_id = *app_id;
+ msg->target_peer = *other_peer;
+ for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
+ {
+ switch (opt->type)
+ {
+ case GNUNET_SETU_OPTION_BYZANTINE:
+ msg->byzantine = GNUNET_YES;
+ msg->byzantine_lower_bound = opt->v.num;
+ break;
+ case GNUNET_SETU_OPTION_FORCE_FULL:
+ msg->force_full = GNUNET_YES;
+ break;
+ case GNUNET_SETU_OPTION_FORCE_DELTA:
+ msg->force_delta = GNUNET_YES;
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Option with type %d not recognized\n",
+ (int) opt->type);
+ }
+ }
+ oh->conclude_mqm = mqm;
+ oh->request_id_addr = &msg->request_id;
+ return oh;
+}
+
+
+/**
+ * Connect to the set service in order to listen for requests.
+ *
+ * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
+ */
+static void
+listen_connect (void *cls);
+
+
+/**
+ * Check validity of request message for a listen operation
+ *
+ * @param cls the listen handle
+ * @param msg the message
+ * @return #GNUNET_OK if the message is well-formed
+ */
+static int
+check_request (void *cls,
+ const struct GNUNET_SETU_RequestMessage *msg)
+{
+ const struct GNUNET_MessageHeader *context_msg;
+
+ if (ntohs (msg->header.size) == sizeof(*msg))
+ return GNUNET_OK; /* no context message is OK */
+ context_msg = GNUNET_MQ_extract_nested_mh (msg);
+ if (NULL == context_msg)
+ {
+ /* malformed context message is NOT ok */
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle request message for a listen operation
+ *
+ * @param cls the listen handle
+ * @param msg the message
+ */
+static void
+handle_request (void *cls,
+ const struct GNUNET_SETU_RequestMessage *msg)
+{
+ struct GNUNET_SETU_ListenHandle *lh = cls;
+ struct GNUNET_SETU_Request req;
+ const struct GNUNET_MessageHeader *context_msg;
+ struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_SETU_RejectMessage *rmsg;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Processing incoming operation request with id %u\n",
+ ntohl (msg->accept_id));
+ /* we got another valid request => reset the backoff */
+ lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+ req.accept_id = ntohl (msg->accept_id);
+ req.accepted = GNUNET_NO;
+ context_msg = GNUNET_MQ_extract_nested_mh (msg);
+ /* calling #GNUNET_SETU_accept() in the listen cb will set req->accepted */
+ lh->listen_cb (lh->listen_cls,
+ &msg->peer_id,
+ context_msg,
+ &req);
+ if (GNUNET_YES == req.accepted)
+ return; /* the accept-case is handled in #GNUNET_SETU_accept() */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Rejected request %u\n",
+ ntohl (msg->accept_id));
+ mqm = GNUNET_MQ_msg (rmsg,
+ GNUNET_MESSAGE_TYPE_SETU_REJECT);
+ rmsg->accept_reject_id = msg->accept_id;
+ GNUNET_MQ_send (lh->mq,
+ mqm);
+}
+
+
+/**
+ * Our connection with the set service encountered an error,
+ * re-initialize with exponential back-off.
+ *
+ * @param cls the `struct GNUNET_SETU_ListenHandle *`
+ * @param error reason for the disconnect
+ */
+static void
+handle_client_listener_error (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_SETU_ListenHandle *lh = cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Listener broke down (%d), re-connecting\n",
+ (int) error);
+ GNUNET_MQ_destroy (lh->mq);
+ lh->mq = NULL;
+ lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
+ &listen_connect,
+ lh);
+ lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
+}
+
+
+/**
+ * Connect to the set service in order to listen for requests.
+ *
+ * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
+ */
+static void
+listen_connect (void *cls)
+{
+ struct GNUNET_SETU_ListenHandle *lh = cls;
+ struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ GNUNET_MQ_hd_var_size (request,
+ GNUNET_MESSAGE_TYPE_SETU_REQUEST,
+ struct GNUNET_SETU_RequestMessage,
+ lh),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_SETU_ListenMessage *msg;
+
+ lh->reconnect_task = NULL;
+ GNUNET_assert (NULL == lh->mq);
+ lh->mq = GNUNET_CLIENT_connect (lh->cfg,
+ "setu",
+ mq_handlers,
+ &handle_client_listener_error,
+ lh);
+ if (NULL == lh->mq)
+ return;
+ mqm = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_SETU_LISTEN);
+ msg->app_id = lh->app_id;
+ GNUNET_MQ_send (lh->mq,
+ mqm);
+}
+
+
+/**
+ * Wait for set operation requests for the given application id
+ *
+ * @param cfg configuration to use for connecting to
+ * the set service, needs to be valid for the lifetime of the listen handle
+ * @param app_id id of the application that handles set operation requests
+ * @param listen_cb called for each incoming request matching the operation
+ * and application id
+ * @param listen_cls handle for @a listen_cb
+ * @return a handle that can be used to cancel the listen operation
+ */
+struct GNUNET_SETU_ListenHandle *
+GNUNET_SETU_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
+ const struct GNUNET_HashCode *app_id,
+ GNUNET_SETU_ListenCallback listen_cb,
+ void *listen_cls)
+{
+ struct GNUNET_SETU_ListenHandle *lh;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting listener for app %s\n",
+ GNUNET_h2s (app_id));
+ lh = GNUNET_new (struct GNUNET_SETU_ListenHandle);
+ lh->listen_cb = listen_cb;
+ lh->listen_cls = listen_cls;
+ lh->cfg = cfg;
+ lh->app_id = *app_id;
+ lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+ listen_connect (lh);
+ if (NULL == lh->mq)
+ {
+ GNUNET_free (lh);
+ return NULL;
+ }
+ return lh;
+}
+
+
+/**
+ * Cancel the given listen operation.
+ *
+ * @param lh handle for the listen operation
+ */
+void
+GNUNET_SETU_listen_cancel (struct GNUNET_SETU_ListenHandle *lh)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Canceling listener %s\n",
+ GNUNET_h2s (&lh->app_id));
+ if (NULL != lh->mq)
+ {
+ GNUNET_MQ_destroy (lh->mq);
+ lh->mq = NULL;
+ }
+ if (NULL != lh->reconnect_task)
+ {
+ GNUNET_SCHEDULER_cancel (lh->reconnect_task);
+ lh->reconnect_task = NULL;
+ }
+ GNUNET_free (lh);
+}
+
+
+/**
+ * Accept a request we got via #GNUNET_SETU_listen. Must be called during
+ * #GNUNET_SETU_listen, as the 'struct GNUNET_SETU_Request' becomes invalid
+ * afterwards.
+ * Call #GNUNET_SETU_commit to provide the local set to use for the operation,
+ * and to begin the exchange with the remote peer.
+ *
+ * @param request request to accept
+ * @param result_mode specified how results will be returned,
+ * see `enum GNUNET_SETU_ResultMode`.
+ * @param result_cb callback for the results
+ * @param result_cls closure for @a result_cb
+ * @return a handle to cancel the operation
+ */
+struct GNUNET_SETU_OperationHandle *
+GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
+ const struct GNUNET_SETU_Option options[],
+ GNUNET_SETU_ResultIterator result_cb,
+ void *result_cls)
+{
+ struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_SETU_OperationHandle *oh;
+ struct GNUNET_SETU_AcceptMessage *msg;
+
+ GNUNET_assert (GNUNET_NO == request->accepted);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client accepts set union operation with id %u\n",
+ request->accept_id);
+ request->accepted = GNUNET_YES;
+ mqm = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_SETU_ACCEPT);
+ msg->accept_reject_id = htonl (request->accept_id);
+ oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
+ oh->result_cb = result_cb;
+ oh->result_cls = result_cls;
+ oh->conclude_mqm = mqm;
+ oh->request_id_addr = &msg->request_id;
+ return oh;
+}
+
+
+/**
+ * Commit a set to be used with a set operation.
+ * This function is called once we have fully constructed
+ * the set that we want to use for the operation. At this
+ * time, the P2P protocol can then begin to exchange the
+ * set information and call the result callback with the
+ * result information.
+ *
+ * @param oh handle to the set operation
+ * @param set the set to use for the operation
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
+ * set is invalid (e.g. the set service crashed)
+ */
+int
+GNUNET_SETU_commit (struct GNUNET_SETU_OperationHandle *oh,
+ struct GNUNET_SETU_Handle *set)
+{
+ if (NULL != oh->set)
+ {
+ /* Some other set was already committed for this
+ * operation, there is a logic bug in the client of this API */
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+ GNUNET_assert (NULL != set);
+ if (GNUNET_YES == set->invalid)
+ return GNUNET_SYSERR;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client commits to SET\n");
+ GNUNET_assert (NULL != oh->conclude_mqm);
+ oh->set = set;
+ GNUNET_CONTAINER_DLL_insert (set->ops_head,
+ set->ops_tail,
+ oh);
+ oh->request_id = GNUNET_MQ_assoc_add (set->mq,
+ oh);
+ *oh->request_id_addr = htonl (oh->request_id);
+ GNUNET_MQ_send (set->mq,
+ oh->conclude_mqm);
+ oh->conclude_mqm = NULL;
+ oh->request_id_addr = NULL;
+ return GNUNET_OK;
+}
+
+
+/**
+ * Hash a set element.
+ *
+ * @param element the element that should be hashed
+ * @param[out] ret_hash a pointer to where the hash of @a element
+ * should be stored
+ */
+void
+GNUNET_SETU_element_hash (const struct GNUNET_SETU_Element *element,
+ struct GNUNET_HashCode *ret_hash)
+{
+ struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
+
+ /* It's not guaranteed that the element data is always after the element header,
+ so we need to hash the chunks separately. */
+ GNUNET_CRYPTO_hash_context_read (ctx,
+ &element->size,
+ sizeof(uint16_t));
+ GNUNET_CRYPTO_hash_context_read (ctx,
+ &element->element_type,
+ sizeof(uint16_t));
+ GNUNET_CRYPTO_hash_context_read (ctx,
+ element->data,
+ element->size);
+ GNUNET_CRYPTO_hash_context_finish (ctx,
+ ret_hash);
+}
+
+
+/* end of setu_api.c */
diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c
new file mode 100644
index 000000000..95119873c
--- /dev/null
+++ b/src/setu/test_setu_api.c
@@ -0,0 +1,360 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2012 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+
+/**
+ * @file set/test_setu_api.c
+ * @brief testcase for setu_api.c
+ * @author Florian Dold
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_setu_service.h"
+
+
+static struct GNUNET_PeerIdentity local_id;
+
+static struct GNUNET_HashCode app_id;
+
+static struct GNUNET_SETU_Handle *set1;
+
+static struct GNUNET_SETU_Handle *set2;
+
+static struct GNUNET_SETU_ListenHandle *listen_handle;
+
+static struct GNUNET_SETU_OperationHandle *oh1;
+
+static struct GNUNET_SETU_OperationHandle *oh2;
+
+static const struct GNUNET_CONFIGURATION_Handle *config;
+
+static int ret;
+
+static struct GNUNET_SCHEDULER_Task *tt;
+
+
+static void
+result_cb_set1 (void *cls,
+ const struct GNUNET_SETU_Element *element,
+ uint64_t size,
+ enum GNUNET_SETU_Status status)
+{
+ switch (status)
+ {
+ case GNUNET_SETU_STATUS_ADD_LOCAL:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n");
+ break;
+
+ case GNUNET_SETU_STATUS_FAILURE:
+ GNUNET_break (0);
+ oh1 = NULL;
+ fprintf (stderr, "set 1: received failure status!\n");
+ ret = 1;
+ if (NULL != tt)
+ {
+ GNUNET_SCHEDULER_cancel (tt);
+ tt = NULL;
+ }
+ GNUNET_SCHEDULER_shutdown ();
+ break;
+
+ case GNUNET_SETU_STATUS_DONE:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
+ oh1 = NULL;
+ if (NULL != set1)
+ {
+ GNUNET_SETU_destroy (set1);
+ set1 = NULL;
+ }
+ if (NULL == set2)
+ {
+ GNUNET_SCHEDULER_cancel (tt);
+ tt = NULL;
+ GNUNET_SCHEDULER_shutdown ();
+ }
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+static void
+result_cb_set2 (void *cls,
+ const struct GNUNET_SETU_Element *element,
+ uint64_t size,
+ enum GNUNET_SETU_Status status)
+{
+ switch (status)
+ {
+ case GNUNET_SETU_STATUS_ADD_LOCAL:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n");
+ break;
+
+ case GNUNET_SETU_STATUS_FAILURE:
+ GNUNET_break (0);
+ oh2 = NULL;
+ fprintf (stderr, "set 2: received failure status\n");
+ GNUNET_SCHEDULER_shutdown ();
+ ret = 1;
+ break;
+
+ case GNUNET_SETU_STATUS_DONE:
+ oh2 = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
+ GNUNET_SETU_destroy (set2);
+ set2 = NULL;
+ if (NULL == set1)
+ {
+ GNUNET_SCHEDULER_cancel (tt);
+ tt = NULL;
+ GNUNET_SCHEDULER_shutdown ();
+ }
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+static void
+listen_cb (void *cls,
+ const struct GNUNET_PeerIdentity *other_peer,
+ const struct GNUNET_MessageHeader *context_msg,
+ struct GNUNET_SETU_Request *request)
+{
+ GNUNET_assert (NULL != context_msg);
+ GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n");
+ oh2 = GNUNET_SETU_accept (request,
+ (struct GNUNET_SETU_Option[]){ 0 },
+ &result_cb_set2,
+ NULL);
+ GNUNET_SETU_commit (oh2, set2);
+}
+
+
+/**
+ * Start the set operation.
+ *
+ * @param cls closure, unused
+ */
+static void
+start (void *cls)
+{
+ struct GNUNET_MessageHeader context_msg;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n");
+ context_msg.size = htons (sizeof context_msg);
+ context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
+ listen_handle = GNUNET_SETU_listen (config,
+ &app_id,
+ &listen_cb,
+ NULL);
+ oh1 = GNUNET_SETU_prepare (&local_id,
+ &app_id,
+ &context_msg,
+ (struct GNUNET_SETU_Option[]){ 0 },
+ &result_cb_set1,
+ NULL);
+ GNUNET_SETU_commit (oh1, set1);
+}
+
+
+/**
+ * Initialize the second set, continue
+ *
+ * @param cls closure, unused
+ */
+static void
+init_set2 (void *cls)
+{
+ struct GNUNET_SETU_Element element;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
+
+ element.element_type = 0;
+ element.data = "hello";
+ element.size = strlen (element.data);
+ GNUNET_SETU_add_element (set2, &element, NULL, NULL);
+ element.data = "quux";
+ element.size = strlen (element.data);
+ GNUNET_SETU_add_element (set2, &element, NULL, NULL);
+ element.data = "baz";
+ element.size = strlen (element.data);
+ GNUNET_SETU_add_element (set2, &element, &start, NULL);
+}
+
+
+/**
+ * Initialize the first set, continue.
+ */
+static void
+init_set1 (void)
+{
+ struct GNUNET_SETU_Element element;
+
+ element.element_type = 0;
+ element.data = "hello";
+ element.size = strlen (element.data);
+ GNUNET_SETU_add_element (set1, &element, NULL, NULL);
+ element.data = "bar";
+ element.size = strlen (element.data);
+ GNUNET_SETU_add_element (set1, &element, &init_set2, NULL);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
+}
+
+
+/**
+ * Function run on timeout.
+ *
+ * @param cls closure
+ */
+static void
+timeout_fail (void *cls)
+{
+ tt = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n");
+ GNUNET_SCHEDULER_shutdown ();
+ ret = 1;
+}
+
+
+/**
+ * Function run on shutdown.
+ *
+ * @param cls closure
+ */
+static void
+do_shutdown (void *cls)
+{
+ if (NULL != tt)
+ {
+ GNUNET_SCHEDULER_cancel (tt);
+ tt = NULL;
+ }
+ if (NULL != oh1)
+ {
+ GNUNET_SETU_operation_cancel (oh1);
+ oh1 = NULL;
+ }
+ if (NULL != oh2)
+ {
+ GNUNET_SETU_operation_cancel (oh2);
+ oh2 = NULL;
+ }
+ if (NULL != set1)
+ {
+ GNUNET_SETU_destroy (set1);
+ set1 = NULL;
+ }
+ if (NULL != set2)
+ {
+ GNUNET_SETU_destroy (set2);
+ set2 = NULL;
+ }
+ if (NULL != listen_handle)
+ {
+ GNUNET_SETU_listen_cancel (listen_handle);
+ listen_handle = NULL;
+ }
+}
+
+
+/**
+ * Signature of the 'main' function for a (single-peer) testcase that
+ * is run using 'GNUNET_TESTING_peer_run'.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer that was started
+ * @param peer identity of the peer that was created
+ */
+static void
+run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_TESTING_Peer *peer)
+{
+ struct GNUNET_SETU_OperationHandle *my_oh;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Running preparatory tests\n");
+ tt = GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
+ &timeout_fail,
+ NULL);
+ GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
+
+ config = cfg;
+ GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg,
+ &local_id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "my id (from CRYPTO): %s\n",
+ GNUNET_i2s (&local_id));
+ GNUNET_TESTING_peer_get_identity (peer,
+ &local_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "my id (from TESTING): %s\n",
+ GNUNET_i2s (&local_id));
+ set1 = GNUNET_SETU_create (cfg);
+ set2 = GNUNET_SETU_create (cfg);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Created sets %p and %p for union operation\n",
+ set1,
+ set2);
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
+
+ /* test if canceling an uncommited request works! */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Launching and instantly stopping set operation\n");
+ my_oh = GNUNET_SETU_prepare (&local_id,
+ &app_id,
+ NULL,
+ (struct GNUNET_SETU_Option[]){ 0 },
+ NULL,
+ NULL);
+ GNUNET_SETU_operation_cancel (my_oh);
+
+ /* test the real set reconciliation */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Running real set-reconciliation\n");
+ init_set1 ();
+}
+
+
+int
+main (int argc, char **argv)
+{
+ GNUNET_log_setup ("test_setu_api",
+ "WARNING",
+ NULL);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Launching peer\n");
+ if (0 !=
+ GNUNET_TESTING_peer_run ("test_setu_api",
+ "test_setu.conf",
+ &run,
+ NULL))
+ {
+ return 1;
+ }
+ return ret;
+}