From 4d607f2f2838431cc7a349441f8f018ab99633a2 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 18 Aug 2020 18:09:58 +0200 Subject: splitting of set intersection functionality from set service (not yet finished, FTBFS) --- src/seti/.gitignore | 3 + src/seti/Makefile.am | 90 + src/seti/gnunet-service-seti.c | 3274 +++++++++++++++++++++++++++++++ src/seti/gnunet-service-seti_protocol.h | 144 ++ src/seti/gnunet-seti-profiler.c | 480 +++++ src/seti/plugin_block_seti_test.c | 123 ++ src/seti/seti.conf.in | 12 + src/seti/seti.h | 267 +++ src/seti/seti_api.c | 895 +++++++++ src/seti/test_seti.conf | 33 + src/seti/test_seti_api.c | 393 ++++ 11 files changed, 5714 insertions(+) create mode 100644 src/seti/.gitignore create mode 100644 src/seti/Makefile.am create mode 100644 src/seti/gnunet-service-seti.c create mode 100644 src/seti/gnunet-service-seti_protocol.h create mode 100644 src/seti/gnunet-seti-profiler.c create mode 100644 src/seti/plugin_block_seti_test.c create mode 100644 src/seti/seti.conf.in create mode 100644 src/seti/seti.h create mode 100644 src/seti/seti_api.c create mode 100644 src/seti/test_seti.conf create mode 100644 src/seti/test_seti_api.c (limited to 'src/seti') diff --git a/src/seti/.gitignore b/src/seti/.gitignore new file mode 100644 index 000000000..5f234a4c2 --- /dev/null +++ b/src/seti/.gitignore @@ -0,0 +1,3 @@ +gnunet-seti-profiler +gnunet-service-seti +test_seti_api diff --git a/src/seti/Makefile.am b/src/seti/Makefile.am new file mode 100644 index 000000000..d96ffff03 --- /dev/null +++ b/src/seti/Makefile.am @@ -0,0 +1,90 @@ +# This Makefile.am is in the public domain +AM_CPPFLAGS = -I$(top_srcdir)/src/include + +pkgcfgdir= $(pkgdatadir)/config.d/ + +libexecdir= $(pkglibdir)/libexec/ + +plugindir = $(libdir)/gnunet + +pkgcfg_DATA = \ + seti.conf + +if USE_COVERAGE + AM_CFLAGS = -fprofile-arcs -ftest-coverage +endif + +if HAVE_TESTING +bin_PROGRAMS = \ + gnunet-seti-profiler +endif + +libexec_PROGRAMS = \ + gnunet-service-seti + +lib_LTLIBRARIES = \ + libgnunetseti.la + +gnunet_seti_profiler_SOURCES = \ + gnunet-seti-profiler.c +gnunet_seti_profiler_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + libgnunetseti.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ + $(GN_LIBINTL) + + +gnunet_service_seti_SOURCES = \ + gnunet-service-seti.c \ + gnunet-service-set_protocol.h +gnunet_service_seti_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + $(top_builddir)/src/core/libgnunetcore.la \ + $(top_builddir)/src/cadet/libgnunetcadet.la \ + $(top_builddir)/src/block/libgnunetblock.la \ + libgnunetseti.la \ + $(GN_LIBINTL) + +libgnunetseti_la_SOURCES = \ + seti_api.c seti.h +libgnunetseti_la_LIBADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(LTLIBINTL) +libgnunetseti_la_LDFLAGS = \ + $(GN_LIB_LDFLAGS) + +if HAVE_TESTING +check_PROGRAMS = \ + test_seti_api +endif + +if ENABLE_TEST_RUN +AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME; +TESTS = $(check_PROGRAMS) +endif + +test_seti_api_SOURCES = \ + test_seti_api.c +test_seti_api_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ + libgnunetset.la + +plugin_LTLIBRARIES = \ + libgnunet_plugin_block_seti_test.la + +libgnunet_plugin_block_seti_test_la_SOURCES = \ + plugin_block_seti_test.c +libgnunet_plugin_block_seti_test_la_LIBADD = \ + $(top_builddir)/src/block/libgnunetblock.la \ + $(top_builddir)/src/block/libgnunetblockgroup.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(LTLIBINTL) +libgnunet_plugin_block_seti_test_la_LDFLAGS = \ + $(GN_PLUGIN_LDFLAGS) + + +EXTRA_DIST = \ + test_seti.conf diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c new file mode 100644 index 000000000..3b8da01cd --- /dev/null +++ b/src/seti/gnunet-service-seti.c @@ -0,0 +1,3274 @@ +/* + This file is part of GNUnet + Copyright (C) 2013-2017, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file set/gnunet-service-seti.c + * @brief two-peer set intersection operations + * @author Florian Dold + * @author Christian Grothoff + */ +#include "gnunet-service-seti_protocol.h" +#include "gnunet_statistics_service.h" + +/** + * How long do we hold on to an incoming channel if there is + * no local listener before giving up? + */ +#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES + + +/** + * Current phase we are in for a intersection operation. + */ +enum IntersectionOperationPhase +{ + /** + * We are just starting. + */ + PHASE_INITIAL, + + /** + * We have send the number of our elements to the other + * peer, but did not setup our element set yet. + */ + PHASE_COUNT_SENT, + + /** + * We have initialized our set and are now reducing it by exchanging + * Bloom filters until one party notices the their element hashes + * are equal. + */ + PHASE_BF_EXCHANGE, + + /** + * We must next send the P2P DONE message (after finishing mostly + * with the local client). Then we will wait for the channel to close. + */ + PHASE_MUST_SEND_DONE, + + /** + * We have received the P2P DONE message, and must finish with the + * local client before terminating the channel. + */ + PHASE_DONE_RECEIVED, + + /** + * The protocol is over. Results may still have to be sent to the + * client. + */ + PHASE_FINISHED +}; + + +/** + * Implementation-specific set state. Used as opaque pointer, and + * specified further in the respective implementation. + */ +struct SetState; + +/** + * Implementation-specific set operation. Used as opaque pointer, and + * specified further in the respective implementation. + */ +struct OperationState; + +/** + * A set that supports a specific operation with other peers. + */ +struct Set; + +/** + * Information about an element element in the set. All elements are + * stored in a hash-table from their hash-code to their 'struct + * Element', so that the remove and add operations are reasonably + * fast. + */ +struct ElementEntry; + +/** + * Operation context used to execute a set operation. + */ +struct Operation; + + +/** + * MutationEvent gives information about changes + * to an element (removal / addition) in a set content. + */ +struct MutationEvent +{ + /** + * First generation affected by this mutation event. + * + * If @a generation is 0, this mutation event is a list + * sentinel element. + */ + unsigned int generation; + + /** + * If @a added is #GNUNET_YES, then this is a + * `remove` event, otherwise it is an `add` event. + */ + int added; +}; + + +/** + * Information about an element element in the set. All elements are + * stored in a hash-table from their hash-code to their `struct + * Element`, so that the remove and add operations are reasonably + * fast. + */ +struct ElementEntry +{ + /** + * The actual element. The data for the element + * should be allocated at the end of this struct. + */ + struct GNUNET_SET_Element element; + + /** + * Hash of the element. For set union: Will be used to derive the + * different IBF keys for different salts. + */ + struct GNUNET_HashCode element_hash; + + /** + * If @a mutations is not NULL, it contains + * a list of mutations, ordered by increasing generation. + * The list is terminated by a sentinel event with `generation` + * set to 0. + * + * If @a mutations is NULL, then this element exists in all generations + * of the respective set content this element belongs to. + */ + struct MutationEvent *mutations; + + /** + * Number of elements in the array @a mutations. + */ + unsigned int mutations_size; + + /** + * #GNUNET_YES if the element is a remote element, and does not belong + * to the operation's set. + */ + int remote; +}; + + +/** + * A listener is inhabited by a client, and waits for evaluation + * requests from remote peers. + */ +struct Listener; + + +/** + * State we keep per client. + */ +struct ClientState +{ + /** + * Set, if associated with the client, otherwise NULL. + */ + struct Set *set; + + /** + * Listener, if associated with the client, otherwise NULL. + */ + struct Listener *listener; + + /** + * Client handle. + */ + struct GNUNET_SERVICE_Client *client; + + /** + * Message queue. + */ + struct GNUNET_MQ_Handle *mq; +}; + + +/** + * Operation context used to execute a set operation. + */ +struct Operation +{ + /** + * Kept in a DLL of the listener, if @e listener is non-NULL. + */ + struct Operation *next; + + /** + * Kept in a DLL of the listener, if @e listener is non-NULL. + */ + struct Operation *prev; + + /** + * Channel to the peer. + */ + struct GNUNET_CADET_Channel *channel; + + /** + * Port this operation runs on. + */ + struct Listener *listener; + + /** + * Message queue for the channel. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Context message, may be NULL. + */ + struct GNUNET_MessageHeader *context_msg; + + /** + * Set associated with the operation, NULL until the spec has been + * associated with a set. + */ + struct Set *set; + + /** + * Operation-specific operation state. Note that the exact + * type depends on this being a union or intersection operation + * (and thus on @e vt). + */ + struct OperationState *state; // FIXME: inline + + /** + * The identity of the requesting peer. Needs to + * be stored here as the op spec might not have been created yet. + */ + struct GNUNET_PeerIdentity peer; + + /** + * Timeout task, if the incoming peer has not been accepted + * after the timeout, it will be disconnected. + */ + struct GNUNET_SCHEDULER_Task *timeout_task; + + /** + * Salt to use for the operation. + */ + uint32_t salt; + + /** + * Remote peers element count + */ + uint32_t remote_element_count; + + /** + * ID used to identify an operation between service and client + */ + uint32_t client_request_id; + + /** + * When are elements sent to the client, and which elements are sent? + */ + enum GNUNET_SET_ResultMode result_mode; + + /** + * Always use delta operation instead of sending full sets, + * even it it's less efficient. + */ + int force_delta; + + /** + * Always send full sets, even if delta operations would + * be more efficient. + */ + int force_full; + + /** + * #GNUNET_YES to fail operations where Byzantine faults + * are suspected + */ + int byzantine; + + /** + * Lower bound for the set size, used only when + * byzantine mode is enabled. + */ + int byzantine_lower_bound; + + /** + * Unique request id for the request from a remote peer, sent to the + * client, which will accept or reject the request. Set to '0' iff + * the request has not been suggested yet. + */ + uint32_t suggest_id; + + /** + * Generation in which the operation handle + * was created. + */ + unsigned int generation_created; +}; + + +/** + * SetContent stores the actual set elements, which may be shared by + * multiple generations derived from one set. + */ +struct SetContent +{ + /** + * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`. + */ + struct GNUNET_CONTAINER_MultiHashMap *elements; + + /** + * Mutations requested by the client that we're + * unable to execute right now because we're iterating + * over the underlying hash map of elements. + */ + struct PendingMutation *pending_mutations_head; + + /** + * Mutations requested by the client that we're + * unable to execute right now because we're iterating + * over the underlying hash map of elements. + */ + struct PendingMutation *pending_mutations_tail; + + /** + * Number of references to the content. + */ + unsigned int refcount; + + /** + * FIXME: document! + */ + unsigned int latest_generation; + + /** + * Number of concurrently active iterators. + */ + int iterator_count; +}; + + +struct GenerationRange +{ + /** + * First generation that is excluded. + */ + unsigned int start; + + /** + * Generation after the last excluded generation. + */ + unsigned int end; +}; + + +/** + * Information about a mutation to apply to a set. + */ +struct PendingMutation +{ + /** + * Mutations are kept in a DLL. + */ + struct PendingMutation *prev; + + /** + * Mutations are kept in a DLL. + */ + struct PendingMutation *next; + + /** + * Set this mutation is about. + */ + struct Set *set; + + /** + * Message that describes the desired mutation. + * May only be a #GNUNET_MESSAGE_TYPE_SET_ADD or + * #GNUNET_MESSAGE_TYPE_SET_REMOVE. + */ + struct GNUNET_SET_ElementMessage *msg; +}; + + +/** + * A set that supports a specific operation with other peers. + */ +struct Set +{ + /** + * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`). + */ + struct Set *next; + + /** + * Sets are held in a doubly linked list. + */ + struct Set *prev; + + /** + * Client that owns the set. Only one client may own a set, + * and there can only be one set per client. + */ + struct ClientState *cs; + + /** + * Content, possibly shared by multiple sets, + * and thus reference counted. + */ + struct SetContent *content; + + /** + * Implementation-specific state. + */ + struct SetState *state; + + /** + * Current state of iterating elements for the client. + * NULL if we are not currently iterating. + */ + struct GNUNET_CONTAINER_MultiHashMapIterator *iter; + + /** + * Evaluate operations are held in a linked list. + */ + struct Operation *ops_head; + + /** + * Evaluate operations are held in a linked list. + */ + struct Operation *ops_tail; + + /** + * List of generations we have to exclude, due to lazy copies. + */ + struct GenerationRange *excluded_generations; + + /** + * Current generation, that is, number of previously executed + * operations and lazy copies on the underlying set content. + */ + unsigned int current_generation; + + /** + * Number of elements in array @a excluded_generations. + */ + unsigned int excluded_generations_size; + + /** + * Type of operation supported for this set + */ + enum GNUNET_SET_OperationType operation; + + /** + * Generation we're currently iteration over. + */ + unsigned int iter_generation; + + /** + * Each @e iter is assigned a unique number, so that the client + * can distinguish iterations. + */ + uint16_t iteration_id; +}; + + +/** + * State of an evaluate operation with another peer. + */ +struct OperationState +{ + /** + * The bf we currently receive + */ + struct GNUNET_CONTAINER_BloomFilter *remote_bf; + + /** + * BF of the set's element. + */ + struct GNUNET_CONTAINER_BloomFilter *local_bf; + + /** + * Remaining elements in the intersection operation. + * Maps element-id-hashes to 'elements in our set'. + */ + struct GNUNET_CONTAINER_MultiHashMap *my_elements; + + /** + * Iterator for sending the final set of @e my_elements to the client. + */ + struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter; + + /** + * Evaluate operations are held in a linked list. + */ + struct OperationState *next; + + /** + * Evaluate operations are held in a linked list. + */ + struct OperationState *prev; + + /** + * For multipart BF transmissions, we have to store the + * bloomfilter-data until we fully received it. + */ + char *bf_data; + + /** + * XOR of the keys of all of the elements (remaining) in my set. + * Always updated when elements are added or removed to + * @e my_elements. + */ + struct GNUNET_HashCode my_xor; + + /** + * XOR of the keys of all of the elements (remaining) in + * the other peer's set. Updated when we receive the + * other peer's Bloom filter. + */ + struct GNUNET_HashCode other_xor; + + /** + * How many bytes of @e bf_data are valid? + */ + uint32_t bf_data_offset; + + /** + * Current element count contained within @e my_elements. + * (May differ briefly during initialization.) + */ + uint32_t my_element_count; + + /** + * size of the bloomfilter in @e bf_data. + */ + uint32_t bf_data_size; + + /** + * size of the bloomfilter + */ + uint32_t bf_bits_per_element; + + /** + * Salt currently used for BF construction (by us or the other peer, + * depending on where we are in the code). + */ + uint32_t salt; + + /** + * Current state of the operation. + */ + enum IntersectionOperationPhase phase; + + /** + * Generation in which the operation handle + * was created. + */ + unsigned int generation_created; + + /** + * Did we send the client that we are done? + */ + int client_done_sent; + + /** + * Set whenever we reach the state where the death of the + * channel is perfectly find and should NOT result in the + * operation being cancelled. + */ + int channel_death_expected; +}; + + +/** + * Extra state required for efficient set intersection. + * Merely tracks the total number of elements. + */ +struct SetState +{ + /** + * Number of currently valid elements in the set which have not been + * removed. + */ + uint32_t current_set_element_count; +}; + + +/** + * A listener is inhabited by a client, and waits for evaluation + * requests from remote peers. + */ +struct Listener +{ + /** + * Listeners are held in a doubly linked list. + */ + struct Listener *next; + + /** + * Listeners are held in a doubly linked list. + */ + struct Listener *prev; + + /** + * Head of DLL of operations this listener is responsible for. + * Once the client has accepted/declined the operation, the + * operation is moved to the respective set's operation DLLS. + */ + struct Operation *op_head; + + /** + * Tail of DLL of operations this listener is responsible for. + * Once the client has accepted/declined the operation, the + * operation is moved to the respective set's operation DLLS. + */ + struct Operation *op_tail; + + /** + * Client that owns the listener. + * Only one client may own a listener. + */ + struct ClientState *cs; + + /** + * The port we are listening on with CADET. + */ + struct GNUNET_CADET_Port *open_port; + + /** + * Application ID for the operation, used to distinguish + * multiple operations of the same type with the same peer. + */ + struct GNUNET_HashCode app_id; + + /** + * The type of the operation. + */ + enum GNUNET_SET_OperationType operation; +}; + + +/** + * Handle to the cadet service, used to listen for and connect to + * remote peers. + */ +static struct GNUNET_CADET_Handle *cadet; + +/** + * Statistics handle. + */ +static struct GNUNET_STATISTICS_Handle *_GSS_statistics; + +/** + * Listeners are held in a doubly linked list. + */ +static struct Listener *listener_head; + +/** + * Listeners are held in a doubly linked list. + */ +static struct Listener *listener_tail; + +/** + * Number of active clients. + */ +static unsigned int num_clients; + +/** + * Are we in shutdown? if #GNUNET_YES and the number of clients + * drops to zero, disconnect from CADET. + */ +static int in_shutdown; + +/** + * Counter for allocating unique IDs for clients, used to identify + * incoming operation requests from remote peers, that the client can + * choose to accept or refuse. 0 must not be used (reserved for + * uninitialized). + */ +static uint32_t suggest_id; + + +/** + * If applicable in the current operation mode, send a result message + * to the client indicating we removed an element. + * + * @param op intersection operation + * @param element element to send + */ +static void +send_client_removed_element (struct Operation *op, + struct GNUNET_SET_Element *element) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; + + if (GNUNET_SET_RESULT_REMOVED != op->result_mode) + return; /* Wrong mode for transmitting removed elements */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending removed element (size %u) to client\n", + element->size); + GNUNET_STATISTICS_update (_GSS_statistics, + "# Element removed messages sent", + 1, + GNUNET_NO); + GNUNET_assert (0 != op->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, + element->size, + GNUNET_MESSAGE_TYPE_SET_RESULT); + if (NULL == ev) + { + GNUNET_break (0); + return; + } + rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->request_id = htonl (op->client_request_id); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_send (op->set->cs->mq, + ev); +} + + +/** + * Fills the "my_elements" hashmap with all relevant elements. + * + * @param cls the `struct Operation *` we are performing + * @param key current key code + * @param value the `struct ElementEntry *` from the hash map + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +filtered_map_initialization (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + struct GNUNET_HashCode mutated_hash; + + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "FIMA called for %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u (wrong generation)\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + return GNUNET_YES; /* element not valid in our operation's generation */ + } + + /* Test if element is in other peer's bloomfilter */ + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + if (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, + &mutated_hash)) + { + /* remove this element */ + send_client_removed_element (op, + &ee->element); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + return GNUNET_YES; + } + op->state->my_element_count++; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Filtered initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + + return GNUNET_YES; +} + + +/** + * Removes elements from our hashmap if they are not contained within the + * provided remote bloomfilter. + * + * @param cls closure with the `struct Operation *` + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +iterator_bf_reduce (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + struct GNUNET_HashCode mutated_hash; + + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + if (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, + &mutated_hash)) + { + GNUNET_break (0 < op->state->my_element_count); + op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + &ee->element_hash, + ee)); + send_client_removed_element (op, + &ee->element); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, keeping %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + } + return GNUNET_YES; +} + + +/** + * Create initial bloomfilter based on all the elements given. + * + * @param cls the `struct Operation *` + * @param key current key code + * @param value the `struct ElementEntry` to process + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +iterator_bf_create (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + struct GNUNET_HashCode mutated_hash; + + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initializing BF with hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, + &mutated_hash); + return GNUNET_YES; +} + + +/** + * Inform the client that the intersection operation has failed, + * and proceed to destroy the evaluate operation. + * + * @param op the intersection operation to fail + */ +static void +fail_intersection_operation (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *msg; + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Intersection operation failed\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# Intersection operations failed", + 1, + GNUNET_NO); + if (NULL != op->state->my_elements) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); + op->state->my_elements = NULL; + } + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_RESULT); + msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); + msg->request_id = htonl (op->client_request_id); + msg->element_type = htons (0); + GNUNET_MQ_send (op->set->cs->mq, + ev); + _GSS_operation_destroy (op, + GNUNET_YES); +} + + +/** + * Send a bloomfilter to our peer. After the result done message has + * been sent to the client, destroy the evaluate operation. + * + * @param op intersection operation + */ +static void +send_bloomfilter (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct BFMessage *msg; + uint32_t bf_size; + uint32_t bf_elementbits; + uint32_t chunk_size; + char *bf_data; + uint32_t offset; + + /* We consider the ratio of the set sizes to determine + the number of bits per element, as the smaller set + should use more bits to maximize its set reduction + potential and minimize overall bandwidth consumption. */ + bf_elementbits = 2 + ceil (log2 ((double) + (op->remote_element_count + / (double) op->state->my_element_count))); + if (bf_elementbits < 1) + bf_elementbits = 1; /* make sure k is not 0 */ + /* optimize BF-size to ~50% of bits set */ + bf_size = ceil ((double) (op->state->my_element_count + * bf_elementbits / log (2))); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending Bloom filter (%u) of size %u bytes\n", + (unsigned int) bf_elementbits, + (unsigned int) bf_size); + op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + bf_size, + bf_elementbits); + op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &iterator_bf_create, + op); + + /* send our Bloom filter */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# Intersection Bloom filters sent", + 1, + GNUNET_NO); + chunk_size = 60 * 1024 - sizeof(struct BFMessage); + if (bf_size <= chunk_size) + { + /* singlepart */ + chunk_size = bf_size; + ev = GNUNET_MQ_msg_extra (msg, + chunk_size, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_bloomfilter_get_raw_data ( + op->state->local_bf, + (char *) &msg[1], + bf_size)); + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); + } + else + { + /* multipart */ + bf_data = GNUNET_malloc (bf_size); + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_bloomfilter_get_raw_data ( + op->state->local_bf, + bf_data, + bf_size)); + offset = 0; + while (offset < bf_size) + { + if (bf_size - chunk_size < offset) + chunk_size = bf_size - offset; + ev = GNUNET_MQ_msg_extra (msg, + chunk_size, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_memcpy (&msg[1], + &bf_data[offset], + chunk_size); + offset += chunk_size; + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); + } + GNUNET_free (bf_data); + } + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; +} + + +/** + * Signal to the client that the operation has finished and + * destroy the operation. + * + * @param cls operation to destroy + */ +static void +send_client_done_and_destroy (void *cls) +{ + struct Operation *op = cls; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection succeeded, sending DONE to local client\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# Intersection operations succeeded", + 1, + GNUNET_NO); + ev = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->request_id = htonl (op->client_request_id); + rm->result_status = htons (GNUNET_SET_STATUS_DONE); + rm->element_type = htons (0); + GNUNET_MQ_send (op->set->cs->mq, + ev); + _GSS_operation_destroy (op, + GNUNET_YES); +} + + +/** + * Remember that we are done dealing with the local client + * AND have sent the other peer our message that we are done, + * so we are not just waiting for the channel to die before + * telling the local client that we are done as our last act. + * + * @param cls the `struct Operation`. + */ +static void +finished_local_operations (void *cls) +{ + struct Operation *op = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "DONE sent to other peer, now waiting for other end to close the channel\n"); + op->state->phase = PHASE_FINISHED; + op->state->channel_death_expected = GNUNET_YES; +} + + +/** + * Notify the other peer that we are done. Once this message + * is out, we still need to notify the local client that we + * are done. + * + * @param op operation to notify for. + */ +static void +send_p2p_done (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct IntersectionDoneMessage *idm; + + GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); + GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); + ev = GNUNET_MQ_msg (idm, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); + idm->final_element_count = htonl (op->state->my_element_count); + idm->element_xor_hash = op->state->my_xor; + GNUNET_MQ_notify_sent (ev, + &finished_local_operations, + op); + GNUNET_MQ_send (op->mq, + ev); +} + + +/** + * Send all elements in the full result iterator. + * + * @param cls the `struct Operation *` + */ +static void +send_remaining_elements (void *cls) +{ + struct Operation *op = cls; + const void *nxt; + const struct ElementEntry *ee; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; + const struct GNUNET_SET_Element *element; + int res; + + res = GNUNET_CONTAINER_multihashmap_iterator_next ( + op->state->full_result_iter, + NULL, + &nxt); + if (GNUNET_NO == res) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending done and destroy because iterator ran out\n"); + GNUNET_CONTAINER_multihashmap_iterator_destroy ( + op->state->full_result_iter); + op->state->full_result_iter = NULL; + if (PHASE_DONE_RECEIVED == op->state->phase) + { + op->state->phase = PHASE_FINISHED; + send_client_done_and_destroy (op); + } + else if (PHASE_MUST_SEND_DONE == op->state->phase) + { + send_p2p_done (op); + } + else + { + GNUNET_assert (0); + } + return; + } + ee = nxt; + element = &ee->element; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending element %s:%u to client (full set)\n", + GNUNET_h2s (&ee->element_hash), + element->size); + GNUNET_assert (0 != op->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, + element->size, + GNUNET_MESSAGE_TYPE_SET_RESULT); + GNUNET_assert (NULL != ev); + rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->request_id = htonl (op->client_request_id); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_notify_sent (ev, + &send_remaining_elements, + op); + GNUNET_MQ_send (op->set->cs->mq, + ev); +} + + +/** + * Fills the "my_elements" hashmap with the initial set of + * (non-deleted) elements from the set of the specification. + * + * @param cls closure with the `struct Operation *` + * @param key current key code for the element + * @param value value in the hash map with the `struct ElementEntry *` + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +initialize_map_unfiltered (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct ElementEntry *ee = value; + struct Operation *op = cls; + + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + return GNUNET_YES; /* element not live in operation's generation */ + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initial full initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + return GNUNET_YES; +} + + +/** + * Send our element count to the peer, in case our element count is + * lower than theirs. + * + * @param op intersection operation + */ +static void +send_element_count (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct IntersectionElementInfoMessage *msg; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending our element count (%u)\n", + op->state->my_element_count); + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); + msg->sender_element_count = htonl (op->state->my_element_count); + GNUNET_MQ_send (op->mq, ev); +} + + +/** + * We go first, initialize our map with all elements and + * send the first Bloom filter. + * + * @param op operation to start exchange for + */ +static void +begin_bf_exchange (struct Operation *op) +{ + op->state->phase = PHASE_BF_EXCHANGE; + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + &initialize_map_unfiltered, + op); + send_bloomfilter (op); +} + + +/** + * Handle the initial `struct IntersectionElementInfoMessage` from a + * remote peer. + * + * @param cls the intersection operation + * @param mh the header of the message + */ +void +handle_intersection_p2p_element_info (void *cls, + const struct + IntersectionElementInfoMessage *msg) +{ + struct Operation *op = cls; + + if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + op->remote_element_count = ntohl (msg->sender_element_count); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received remote element count (%u), I have %u\n", + op->remote_element_count, + op->state->my_element_count); + if (((PHASE_INITIAL != op->state->phase) && + (PHASE_COUNT_SENT != op->state->phase)) || + (op->state->my_element_count > op->remote_element_count) || + (0 == op->state->my_element_count) || + (0 == op->remote_element_count)) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + GNUNET_break (NULL == op->state->remote_bf); + begin_bf_exchange (op); + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Process a Bloomfilter once we got all the chunks. + * + * @param op the intersection operation + */ +static void +process_bf (struct Operation *op) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", + op->state->phase, + op->remote_element_count, + op->state->my_element_count, + GNUNET_CONTAINER_multihashmap_size (op->set->content->elements)); + switch (op->state->phase) + { + case PHASE_INITIAL: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + + case PHASE_COUNT_SENT: + /* This is the first BF being sent, build our initial map with + filtering in place */ + op->state->my_element_count = 0; + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + &filtered_map_initialization, + op); + break; + + case PHASE_BF_EXCHANGE: + /* Update our set by reduction */ + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &iterator_bf_reduce, + op); + break; + + case PHASE_MUST_SEND_DONE: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + + case PHASE_DONE_RECEIVED: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + + case PHASE_FINISHED: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); + op->state->remote_bf = NULL; + + if ((0 == op->state->my_element_count) || /* fully disjoint */ + ((op->state->my_element_count == op->remote_element_count) && + (0 == GNUNET_memcmp (&op->state->my_xor, + &op->state->other_xor)))) + { + /* we are done */ + op->state->phase = PHASE_MUST_SEND_DONE; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection succeeded, sending DONE to other peer\n"); + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; + if (GNUNET_SET_RESULT_FULL == op->result_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending full result set (%u elements)\n", + GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); + op->state->full_result_iter + = GNUNET_CONTAINER_multihashmap_iterator_create ( + op->state->my_elements); + send_remaining_elements (op); + return; + } + send_p2p_done (op); + return; + } + op->state->phase = PHASE_BF_EXCHANGE; + send_bloomfilter (op); +} + + +/** + * Check an BF message from a remote peer. + * + * @param cls the intersection operation + * @param msg the header of the message + * @return #GNUNET_OK if @a msg is well-formed + */ +static int +check_intersection_p2p_bf (void *cls, + const struct BFMessage *msg) +{ + struct Operation *op = cls; + + if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle an BF message from a remote peer. + * + * @param cls the intersection operation + * @param msg the header of the message + */ +static +handle_intersection_p2p_bf (void *cls, + const struct BFMessage *msg) +{ + struct Operation *op = cls; + uint32_t bf_size; + uint32_t chunk_size; + uint32_t bf_bits_per_element; + + switch (op->state->phase) + { + case PHASE_INITIAL: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + + case PHASE_COUNT_SENT: + case PHASE_BF_EXCHANGE: + bf_size = ntohl (msg->bloomfilter_total_length); + bf_bits_per_element = ntohl (msg->bits_per_element); + chunk_size = htons (msg->header.size) - sizeof(struct BFMessage); + op->state->other_xor = msg->element_xor_hash; + if (bf_size == chunk_size) + { + if (NULL != op->state->bf_data) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + /* single part, done here immediately */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init ((const char *) &msg[1], + bf_size, + bf_bits_per_element); + op->state->salt = ntohl (msg->sender_mutator); + op->remote_element_count = ntohl (msg->sender_element_count); + process_bf (op); + break; + } + /* multipart chunk */ + if (NULL == op->state->bf_data) + { + /* first chunk, initialize */ + op->state->bf_data = GNUNET_malloc (bf_size); + op->state->bf_data_size = bf_size; + op->state->bf_bits_per_element = bf_bits_per_element; + op->state->bf_data_offset = 0; + op->state->salt = ntohl (msg->sender_mutator); + op->remote_element_count = ntohl (msg->sender_element_count); + } + else + { + /* increment */ + if ((op->state->bf_data_size != bf_size) || + (op->state->bf_bits_per_element != bf_bits_per_element) || + (op->state->bf_data_offset + chunk_size > bf_size) || + (op->state->salt != ntohl (msg->sender_mutator)) || + (op->remote_element_count != ntohl (msg->sender_element_count))) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + } + GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset], + (const char *) &msg[1], + chunk_size); + op->state->bf_data_offset += chunk_size; + if (op->state->bf_data_offset == bf_size) + { + /* last chunk, run! */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data, + bf_size, + bf_bits_per_element); + GNUNET_free (op->state->bf_data); + op->state->bf_data = NULL; + op->state->bf_data_size = 0; + process_bf (op); + } + break; + + default: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Remove all elements from our hashmap. + * + * @param cls closure with the `struct Operation *` + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +filter_all (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + + GNUNET_break (0 < op->state->my_element_count); + op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Final reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + &ee->element_hash, + ee)); + send_client_removed_element (op, + &ee->element); + return GNUNET_YES; +} + + +/** + * Handle a done message from a remote peer + * + * @param cls the intersection operation + * @param mh the message + */ +static void +handle_intersection_p2p_done (void *cls, + const struct IntersectionDoneMessage *idm) +{ + struct Operation *op = cls; + + if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + if (PHASE_BF_EXCHANGE != op->state->phase) + { + /* wrong phase to conclude? FIXME: Or should we allow this + if the other peer has _initially_ already an empty set? */ + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + if (0 == ntohl (idm->final_element_count)) + { + /* other peer determined empty set is the intersection, + remove all elements */ + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &filter_all, + op); + } + if ((op->state->my_element_count != ntohl (idm->final_element_count)) || + (0 != GNUNET_memcmp (&op->state->my_xor, + &idm->element_xor_hash))) + { + /* Other peer thinks we are done, but we disagree on the result! */ + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got IntersectionDoneMessage, have %u elements in intersection\n", + op->state->my_element_count); + op->state->phase = PHASE_DONE_RECEIVED; + GNUNET_CADET_receive_done (op->channel); + + GNUNET_assert (GNUNET_NO == op->state->client_done_sent); + if (GNUNET_SET_RESULT_FULL == op->result_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending full result set to client (%u elements)\n", + GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); + op->state->full_result_iter + = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); + send_remaining_elements (op); + return; + } + op->state->phase = PHASE_FINISHED; + send_client_done_and_destroy (op); +} + + +/** + * Initiate a set intersection operation with a remote peer. + * + * @param op operation that is created, should be initialized to + * begin the evaluation + * @param opaque_context message to be transmitted to the listener + * to convince it to accept, may be NULL + * @return operation-specific state to keep in @a op + */ +static struct OperationState * +intersection_evaluate (struct Operation *op, + const struct GNUNET_MessageHeader *opaque_context) +{ + struct OperationState *state; + struct GNUNET_MQ_Envelope *ev; + struct OperationRequestMessage *msg; + + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + opaque_context); + if (NULL == ev) + { + /* the context message is too large!? */ + GNUNET_break (0); + return NULL; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initiating intersection operation evaluation\n"); + state = GNUNET_new (struct OperationState); + /* we started the operation, thus we have to send the operation request */ + state->phase = PHASE_INITIAL; + state->my_element_count = op->set->state->current_set_element_count; + state->my_elements + = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, + GNUNET_YES); + + msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); + msg->element_count = htonl (state->my_element_count); + GNUNET_MQ_send (op->mq, + ev); + state->phase = PHASE_COUNT_SENT; + if (NULL != opaque_context) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request with context message\n"); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request without context message\n"); + return state; +} + + +/** + * Accept an intersection operation request from a remote peer. Only + * initializes the private operation state. + * + * @param op operation that will be accepted as an intersection operation + */ +static struct OperationState * +intersection_accept (struct Operation *op) +{ + struct OperationState *state; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Accepting set intersection operation\n"); + state = GNUNET_new (struct OperationState); + state->phase = PHASE_INITIAL; + state->my_element_count + = op->set->state->current_set_element_count; + state->my_elements + = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count, + op->remote_element_count), + GNUNET_YES); + op->state = state; + if (op->remote_element_count < state->my_element_count) + { + /* If the other peer (Alice) has fewer elements than us (Bob), + we just send the count as Alice should send the first BF */ + send_element_count (op); + state->phase = PHASE_COUNT_SENT; + return state; + } + /* We have fewer elements, so we start with the BF */ + begin_bf_exchange (op); + return state; +} + + +/** + * Destroy the intersection operation. Only things specific to the + * intersection operation are destroyed. + * + * @param op intersection operation to destroy + */ +static void +intersection_op_cancel (struct Operation *op) +{ + /* check if the op was canceled twice */ + GNUNET_assert (NULL != op->state); + if (NULL != op->state->remote_bf) + { + GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); + op->state->remote_bf = NULL; + } + if (NULL != op->state->local_bf) + { + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; + } + if (NULL != op->state->my_elements) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); + op->state->my_elements = NULL; + } + if (NULL != op->state->full_result_iter) + { + GNUNET_CONTAINER_multihashmap_iterator_destroy ( + op->state->full_result_iter); + op->state->full_result_iter = NULL; + } + GNUNET_free (op->state); + op->state = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying intersection op state done\n"); +} + + +/** + * Create a new set supporting the intersection operation. + * + * @return the newly created set + */ +static struct SetState * +intersection_set_create () +{ + struct SetState *set_state; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection set created\n"); + set_state = GNUNET_new (struct SetState); + set_state->current_set_element_count = 0; + + return set_state; +} + + +/** + * Add the element from the given element message to the set. + * + * @param set_state state of the set want to add to + * @param ee the element to add to the set + */ +static void +intersection_add (struct SetState *set_state, + struct ElementEntry *ee) +{ + set_state->current_set_element_count++; +} + + +/** + * Destroy a set that supports the intersection operation + * + * @param set_state the set to destroy + */ +static void +intersection_set_destroy (struct SetState *set_state) +{ + GNUNET_free (set_state); +} + + +/** + * Remove the element given in the element message from the set. + * + * @param set_state state of the set to remove from + * @param element set element to remove + */ +static void +intersection_remove (struct SetState *set_state, + struct ElementEntry *element) +{ + GNUNET_assert (0 < set_state->current_set_element_count); + set_state->current_set_element_count--; +} + + +/** + * Callback for channel death for the intersection operation. + * + * @param op operation that lost the channel + */ +static void +intersection_channel_death (struct Operation *op) +{ + if (GNUNET_YES == op->state->channel_death_expected) + { + /* oh goodie, we are done! */ + send_client_done_and_destroy (op); + } + else + { + /* sorry, channel went down early, too bad. */ + _GSS_operation_destroy (op, + GNUNET_YES); + } +} + + +/** + * Get the incoming socket associated with the given id. + * + * @param listener the listener to look in + * @param id id to look for + * @return the incoming socket associated with the id, + * or NULL if there is none + */ +static struct Operation * +get_incoming (uint32_t id) +{ + for (struct Listener *listener = listener_head; NULL != listener; + listener = listener->next) + { + for (struct Operation *op = listener->op_head; NULL != op; op = op->next) + if (op->suggest_id == id) + return op; + } + return NULL; +} + + +/** + * Destroy an incoming request from a remote peer + * + * @param op remote request to destroy + */ +static void +incoming_destroy (struct Operation *op) +{ + struct Listener *listener; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying incoming operation %p\n", + op); + if (NULL != (listener = op->listener)) + { + GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op); + op->listener = NULL; + } + if (NULL != op->timeout_task) + { + GNUNET_SCHEDULER_cancel (op->timeout_task); + op->timeout_task = NULL; + } + _GSS_operation_destroy2 (op); +} + + +/** + * Context for the #garbage_collect_cb(). + */ +struct GarbageContext +{ + /** + * Map for which we are garbage collecting removed elements. + */ + struct GNUNET_CONTAINER_MultiHashMap *map; + + /** + * Lowest generation for which an operation is still pending. + */ + unsigned int min_op_generation; + + /** + * Largest generation for which an operation is still pending. + */ + unsigned int max_op_generation; +}; + + +/** + * Function invoked to check if an element can be removed from + * the set's history because it is no longer needed. + * + * @param cls the `struct GarbageContext *` + * @param key key of the element in the map + * @param value the `struct ElementEntry *` + * @return #GNUNET_OK (continue to iterate) + */ +static int +garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value) +{ + // struct GarbageContext *gc = cls; + // struct ElementEntry *ee = value; + + // if (GNUNET_YES != ee->removed) + // return GNUNET_OK; + // if ( (gc->max_op_generation < ee->generation_added) || + // (ee->generation_removed > gc->min_op_generation) ) + // { + // GNUNET_assert (GNUNET_YES == + // GNUNET_CONTAINER_multihashmap_remove (gc->map, + // key, + // ee)); + // GNUNET_free (ee); + // } + return GNUNET_OK; +} + + +/** + * Collect and destroy elements that are not needed anymore, because + * their lifetime (as determined by their generation) does not overlap + * with any active set operation. + * + * @param set set to garbage collect + */ +static void +collect_generation_garbage (struct Set *set) +{ + struct GarbageContext gc; + + gc.min_op_generation = UINT_MAX; + gc.max_op_generation = 0; + for (struct Operation *op = set->ops_head; NULL != op; op = op->next) + { + gc.min_op_generation = + GNUNET_MIN (gc.min_op_generation, op->generation_created); + gc.max_op_generation = + GNUNET_MAX (gc.max_op_generation, op->generation_created); + } + gc.map = set->content->elements; + GNUNET_CONTAINER_multihashmap_iterate (set->content->elements, + &garbage_collect_cb, + &gc); +} + + +/** + * Is @a generation in the range of exclusions? + * + * @param generation generation to query + * @param excluded array of generations where the element is excluded + * @param excluded_size length of the @a excluded array + * @return #GNUNET_YES if @a generation is in any of the ranges + */ +static int +is_excluded_generation (unsigned int generation, + struct GenerationRange *excluded, + unsigned int excluded_size) +{ + for (unsigned int i = 0; i < excluded_size; i++) + if ((generation >= excluded[i].start) && (generation < excluded[i].end)) + return GNUNET_YES; + return GNUNET_NO; +} + + +/** + * Is element @a ee part of the set during @a query_generation? + * + * @param ee element to test + * @param query_generation generation to query + * @param excluded array of generations where the element is excluded + * @param excluded_size length of the @a excluded array + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ +static int +is_element_of_generation (struct ElementEntry *ee, + unsigned int query_generation, + struct GenerationRange *excluded, + unsigned int excluded_size) +{ + struct MutationEvent *mut; + int is_present; + + GNUNET_assert (NULL != ee->mutations); + if (GNUNET_YES == + is_excluded_generation (query_generation, excluded, excluded_size)) + { + GNUNET_break (0); + return GNUNET_NO; + } + + is_present = GNUNET_NO; + + /* Could be made faster with binary search, but lists + are small, so why bother. */ + for (unsigned int i = 0; i < ee->mutations_size; i++) + { + mut = &ee->mutations[i]; + + if (mut->generation > query_generation) + { + /* The mutation doesn't apply to our generation + anymore. We can'b break here, since mutations aren't + sorted by generation. */ + continue; + } + + if (GNUNET_YES == + is_excluded_generation (mut->generation, excluded, excluded_size)) + { + /* The generation is excluded (because it belongs to another + fork via a lazy copy) and thus mutations aren't considered + for membership testing. */ + continue; + } + + /* This would be an inconsistency in how we manage mutations. */ + if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added)) + GNUNET_assert (0); + /* Likewise. */ + if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added)) + GNUNET_assert (0); + + is_present = mut->added; + } + + return is_present; +} + + +/** + * Is element @a ee part of the set used by @a op? + * + * @param ee element to test + * @param op operation the defines the set and its generation + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ +int +_GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op) +{ + return is_element_of_generation (ee, + op->generation_created, + op->set->excluded_generations, + op->set->excluded_generations_size); +} + + +/** + * Destroy the given operation. Used for any operation where both + * peers were known and that thus actually had a vt and channel. Must + * not be used for operations where 'listener' is still set and we do + * not know the other peer. + * + * Call the implementation-specific cancel function of the operation. + * Disconnects from the remote peer. Does not disconnect the client, + * as there may be multiple operations per set. + * + * @param op operation to destroy + * @param gc #GNUNET_YES to perform garbage collection on the set + */ +void +_GSS_operation_destroy (struct Operation *op, int gc) +{ + struct Set *set = op->set; + struct GNUNET_CADET_Channel *channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op); + GNUNET_assert (NULL == op->listener); + if (NULL != op->state) + { + intersection_cancel (op); // FIXME: inline + op->state = NULL; + } + if (NULL != set) + { + GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op); + op->set = NULL; + } + if (NULL != op->context_msg) + { + GNUNET_free (op->context_msg); + op->context_msg = NULL; + } + if (NULL != (channel = op->channel)) + { + /* This will free op; called conditionally as this helper function + is also called from within the channel disconnect handler. */ + op->channel = NULL; + GNUNET_CADET_channel_destroy (channel); + } + if ((NULL != set) && (GNUNET_YES == gc)) + collect_generation_garbage (set); + /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, + * there was a channel end handler that will free 'op' on the call stack. */ +} + + +/** + * Callback called when a client connects to the service. + * + * @param cls closure for the service + * @param c the new client that connected to the service + * @param mq the message queue used to send messages to the client + * @return @a `struct ClientState` + */ +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *c, + struct GNUNET_MQ_Handle *mq) +{ + struct ClientState *cs; + + num_clients++; + cs = GNUNET_new (struct ClientState); + cs->client = c; + cs->mq = mq; + return cs; +} + + +/** + * Iterator over hash map entries to free element entries. + * + * @param cls closure + * @param key current key code + * @param value a `struct ElementEntry *` to be free'd + * @return #GNUNET_YES (continue to iterate) + */ +static int +destroy_elements_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct ElementEntry *ee = value; + + GNUNET_free (ee->mutations); + GNUNET_free (ee); + return GNUNET_YES; +} + + +/** + * Clean up after a client has disconnected + * + * @param cls closure, unused + * @param client the client to clean up after + * @param internal_cls the `struct ClientState` + */ +static void +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + void *internal_cls) +{ + struct ClientState *cs = internal_cls; + struct Operation *op; + struct Listener *listener; + struct Set *set; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n"); + if (NULL != (set = cs->set)) + { + struct SetContent *content = set->content; + struct PendingMutation *pm; + struct PendingMutation *pm_current; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n"); + /* Destroy pending set operations */ + while (NULL != set->ops_head) + _GSS_operation_destroy (set->ops_head, GNUNET_NO); + + /* Destroy operation-specific state */ + GNUNET_assert (NULL != set->state); + intersection_set_destroy (set->state); // FIXME: inline + set->state = NULL; + + /* Clean up ongoing iterations */ + if (NULL != set->iter) + { + GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); + set->iter = NULL; + set->iteration_id++; + } + + /* discard any pending mutations that reference this set */ + pm = content->pending_mutations_head; + while (NULL != pm) + { + pm_current = pm; + pm = pm->next; + if (pm_current->set == set) + { + GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, + content->pending_mutations_tail, + pm_current); + GNUNET_free (pm_current); + } + } + + /* free set content (or at least decrement RC) */ + set->content = NULL; + GNUNET_assert (0 != content->refcount); + content->refcount--; + if (0 == content->refcount) + { + GNUNET_assert (NULL != content->elements); + GNUNET_CONTAINER_multihashmap_iterate (content->elements, + &destroy_elements_iterator, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (content->elements); + content->elements = NULL; + GNUNET_free (content); + } + GNUNET_free (set->excluded_generations); + set->excluded_generations = NULL; + + GNUNET_free (set); + } + + if (NULL != (listener = cs->listener)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n"); + GNUNET_CADET_close_port (listener->open_port); + listener->open_port = NULL; + while (NULL != (op = listener->op_head)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Destroying incoming operation `%u' from peer `%s'\n", + (unsigned int) op->client_request_id, + GNUNET_i2s (&op->peer)); + incoming_destroy (op); + } + GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener); + GNUNET_free (listener); + } + GNUNET_free (cs); + num_clients--; + if ((GNUNET_YES == in_shutdown) && (0 == num_clients)) + { + if (NULL != cadet) + { + GNUNET_CADET_disconnect (cadet); + cadet = NULL; + } + } +} + + +/** + * Check a request for a set operation from another peer. + * + * @param cls the operation state + * @param msg the received message + * @return #GNUNET_OK if the channel should be kept alive, + * #GNUNET_SYSERR to destroy the channel + */ +static int +check_incoming_msg (void *cls, const struct OperationRequestMessage *msg) +{ + struct Operation *op = cls; + struct Listener *listener = op->listener; + const struct GNUNET_MessageHeader *nested_context; + + /* double operation request */ + if (0 != op->suggest_id) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + /* This should be equivalent to the previous condition, but can't hurt to check twice */ + if (NULL == op->listener) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (listener->operation != + (enum GNUNET_SET_OperationType) ntohl (msg->operation)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + nested_context = GNUNET_MQ_extract_nested_mh (msg); + if ((NULL != nested_context) && + (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle a request for a set operation from another peer. Checks if we + * have a listener waiting for such a request (and in that case initiates + * asking the listener about accepting the connection). If no listener + * is waiting, we queue the operation request in hope that a listener + * shows up soon (before timeout). + * + * This msg is expected as the first and only msg handled through the + * non-operation bound virtual table, acceptance of this operation replaces + * our virtual table and subsequent msgs would be routed differently (as + * we then know what type of operation this is). + * + * @param cls the operation state + * @param msg the received message + * @return #GNUNET_OK if the channel should be kept alive, + * #GNUNET_SYSERR to destroy the channel + */ +static void +handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg) +{ + struct Operation *op = cls; + struct Listener *listener = op->listener; + const struct GNUNET_MessageHeader *nested_context; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_SET_RequestMessage *cmsg; + + nested_context = GNUNET_MQ_extract_nested_mh (msg); + /* Make a copy of the nested_context (application-specific context + information that is opaque to set) so we can pass it to the + listener later on */ + if (NULL != nested_context) + op->context_msg = GNUNET_copy_message (nested_context); + op->remote_element_count = ntohl (msg->element_count); + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "Received P2P operation request (op %u, port %s) for active listener\n", + (uint32_t) ntohl (msg->operation), + GNUNET_h2s (&op->listener->app_id)); + GNUNET_assert (0 == op->suggest_id); + if (0 == suggest_id) + suggest_id++; + op->suggest_id = suggest_id++; + GNUNET_assert (NULL != op->timeout_task); + GNUNET_SCHEDULER_cancel (op->timeout_task); + op->timeout_task = NULL; + env = GNUNET_MQ_msg_nested_mh (cmsg, + GNUNET_MESSAGE_TYPE_SETI_REQUEST, + op->context_msg); + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "Suggesting incoming request with accept id %u to listener %p of client %p\n", + op->suggest_id, + listener, + listener->cs); + cmsg->accept_id = htonl (op->suggest_id); + cmsg->peer_id = op->peer; + GNUNET_MQ_send (listener->cs->mq, env); + /* NOTE: GNUNET_CADET_receive_done() will be called in + #handle_client_accept() */ +} + + +/** + * Add an element to @a set as specified by @a msg + * + * @param set set to manipulate + * @param msg message specifying the change + */ +static void +execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) +{ + struct GNUNET_SET_Element el; + struct ElementEntry *ee; + struct GNUNET_HashCode hash; + + GNUNET_assert (GNUNET_MESSAGE_TYPE_SETI_ADD == ntohs (msg->header.type)); + el.size = ntohs (msg->header.size) - sizeof(*msg); + el.data = &msg[1]; + el.element_type = ntohs (msg->element_type); + GNUNET_SET_element_hash (&el, &hash); + ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash); + if (NULL == ee) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserts element %s of size %u\n", + GNUNET_h2s (&hash), + el.size); + ee = GNUNET_malloc (el.size + sizeof(*ee)); + ee->element.size = el.size; + GNUNET_memcpy (&ee[1], el.data, el.size); + ee->element.data = &ee[1]; + ee->element.element_type = el.element_type; + ee->remote = GNUNET_NO; + ee->mutations = NULL; + ee->mutations_size = 0; + ee->element_hash = hash; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put ( + set->content->elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + } + else if (GNUNET_YES == + is_element_of_generation (ee, + set->current_generation, + set->excluded_generations, + set->excluded_generations_size)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserted element %s of size %u twice (ignored)\n", + GNUNET_h2s (&hash), + el.size); + + /* same element inserted twice */ + return; + } + + { + struct MutationEvent mut = { .generation = set->current_generation, + .added = GNUNET_YES }; + GNUNET_array_append (ee->mutations, ee->mutations_size, mut); + } + // FIXME: inline + intersection_add (set->state, + ee); +} + + +/** + * Perform a mutation on a set as specified by the @a msg + * + * @param set the set to mutate + * @param msg specification of what to change + */ +static void +execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) +{ + switch (ntohs (msg->header.type)) + { + case GNUNET_MESSAGE_TYPE_SETI_ADD: // FIXME: inline! + execute_add (set, msg); + break; + default: + GNUNET_break (0); + } +} + + +/** + * Execute mutations that were delayed on a set because of + * pending operations. + * + * @param set the set to execute mutations on + */ +static void +execute_delayed_mutations (struct Set *set) +{ + struct PendingMutation *pm; + + if (0 != set->content->iterator_count) + return; /* still cannot do this */ + while (NULL != (pm = set->content->pending_mutations_head)) + { + GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, + set->content->pending_mutations_tail, + pm); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Executing pending mutation on %p.\n", + pm->set); + execute_mutation (pm->set, pm->msg); + GNUNET_free (pm->msg); + GNUNET_free (pm); + } +} + + +/** + * Send the next element of a set to the set's client. The next element is given by + * the set's current hashmap iterator. The set's iterator will be set to NULL if there + * are no more elements in the set. The caller must ensure that the set's iterator is + * valid. + * + * The client will acknowledge each received element with a + * #GNUNET_MESSAGE_TYPE_SETI_ITER_ACK message. Our + * #handle_client_iter_ack() will then trigger the next transmission. + * Note that the #GNUNET_MESSAGE_TYPE_SETI_ITER_DONE is not acknowledged. + * + * @param set set that should send its next element to its client + */ +static void +send_client_element (struct Set *set) +{ + int ret; + struct ElementEntry *ee; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_IterResponseMessage *msg; + + GNUNET_assert (NULL != set->iter); + do + { + ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, + NULL, + (const void **) &ee); + if (GNUNET_NO == ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETI_ITER_DONE); + GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); + set->iter = NULL; + set->iteration_id++; + GNUNET_assert (set->content->iterator_count > 0); + set->content->iterator_count--; + execute_delayed_mutations (set); + GNUNET_MQ_send (set->cs->mq, ev); + return; + } + GNUNET_assert (NULL != ee); + } + while (GNUNET_NO == + is_element_of_generation (ee, + set->iter_generation, + set->excluded_generations, + set->excluded_generations_size)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending iteration element on %p.\n", + set); + ev = GNUNET_MQ_msg_extra (msg, + ee->element.size, + GNUNET_MESSAGE_TYPE_SETI_ITER_ELEMENT); + GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size); + msg->element_type = htons (ee->element.element_type); + msg->iteration_id = htons (set->iteration_id); + GNUNET_MQ_send (set->cs->mq, ev); +} + + +/** + * Called when a client wants to iterate the elements of a set. + * Checks if we have a set associated with the client and if we + * can right now start an iteration. If all checks out, starts + * sending the elements of the set to the client. + * + * @param cls client that sent the message + * @param m message sent by the client + */ +static void +handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m) +{ + struct ClientState *cs = cls; + struct Set *set; + + if (NULL == (set = cs->set)) + { + /* attempt to iterate over a non existing set */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + if (NULL != set->iter) + { + /* Only one concurrent iterate-action allowed per set */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Iterating set %p in gen %u with %u content elements\n", + (void *) set, + set->current_generation, + GNUNET_CONTAINER_multihashmap_size (set->content->elements)); + GNUNET_SERVICE_client_continue (cs->client); + set->content->iterator_count++; + set->iter = + GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); + set->iter_generation = set->current_generation; + send_client_element (set); +} + + +/** + * Called when a client wants to create a new set. This is typically + * the first request from a client, and includes the type of set + * operation to be performed. + * + * @param cls client that sent the message + * @param m message sent by the client + */ +static void +handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client created new set (operation %u)\n", + (uint32_t) ntohl (msg->operation)); + if (NULL != cs->set) + { + /* There can only be one set per client */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + set = GNUNET_new (struct Set); + switch (ntohl (msg->operation)) + { + case GNUNET_SET_OPERATION_INTERSECTION: + set->vt = _GSS_intersection_vt (); + break; + + case GNUNET_SET_OPERATION_UNION: + set->vt = _GSS_union_vt (); + break; + + default: + GNUNET_free (set); + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); + set->state = intersection_set_create (); // FIXME: inline + if (NULL == set->state) + { + /* initialization failed (i.e. out of memory) */ + GNUNET_free (set); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + set->content = GNUNET_new (struct SetContent); + set->content->refcount = 1; + set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + set->cs = cs; + cs->set = set; + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Timeout happens iff: + * - we suggested an operation to our listener, + * but did not receive a response in time + * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST + * + * @param cls channel context + * @param tc context information (why was this task triggered now) + */ +static void +incoming_timeout_cb (void *cls) +{ + struct Operation *op = cls; + + op->timeout_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Remote peer's incoming request timed out\n"); + incoming_destroy (op); +} + + +/** + * Method called whenever another peer has added us to a channel the + * other peer initiated. Only called (once) upon reception of data + * from a channel we listen on. + * + * The channel context represents the operation itself and gets added + * to a DLL, from where it gets looked up when our local listener + * client responds to a proposed/suggested operation or connects and + * associates with this operation. + * + * @param cls closure + * @param channel new handle to the channel + * @param source peer that started the channel + * @return initial channel context for the channel + * returns NULL on error + */ +static void * +channel_new_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *source) +{ + struct Listener *listener = cls; + struct Operation *op; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n"); + op = GNUNET_new (struct Operation); + op->listener = listener; + op->peer = *source; + op->channel = channel; + op->mq = GNUNET_CADET_get_mq (op->channel); + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, + &incoming_timeout_cb, + op); + GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op); + return op; +} + + +/** + * Function called whenever a channel is destroyed. Should clean up + * any associated state. It must NOT call + * GNUNET_CADET_channel_destroy() on the channel. + * + * The peer_disconnect function is part of a a virtual table set initially either + * when a peer creates a new channel with us, or once we create + * a new channel ourselves (evaluate). + * + * Once we know the exact type of operation (union/intersection), the vt is + * replaced with an operation specific instance (_GSS_[op]_vt). + * + * @param channel_ctx place where local state associated + * with the channel is stored + * @param channel connection to the other end (henceforth invalid) + */ +static void +channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel) +{ + struct Operation *op = channel_ctx; + + op->channel = NULL; + _GSS_operation_destroy2 (op); +} + + +/** + * This function probably should not exist + * and be replaced by inlining more specific + * logic in the various places where it is called. + */ +void +_GSS_operation_destroy2 (struct Operation *op) +{ + struct GNUNET_CADET_Channel *channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n"); + if (NULL != (channel = op->channel)) + { + /* This will free op; called conditionally as this helper function + is also called from within the channel disconnect handler. */ + op->channel = NULL; + GNUNET_CADET_channel_destroy (channel); + } + if (NULL != op->listener) + { + incoming_destroy (op); + return; + } + if (NULL != op->set) + intersection_channel_death (op); // FIXME: inline + else + _GSS_operation_destroy (op, GNUNET_YES); + GNUNET_free (op); +} + + +/** + * Function called whenever an MQ-channel's transmission window size changes. + * + * The first callback in an outgoing channel will be with a non-zero value + * and will mean the channel is connected to the destination. + * + * For an incoming channel it will be called immediately after the + * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. + * + * @param cls Channel closure. + * @param channel Connection to the other end (henceforth invalid). + * @param window_size New window size. If the is more messages than buffer size + * this value will be negative.. + */ +static void +channel_window_cb (void *cls, + const struct GNUNET_CADET_Channel *channel, + int window_size) +{ + /* FIXME: not implemented, we could do flow control here... */ +} + + +/** + * Called when a client wants to create a new listener. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) +{ + struct ClientState *cs = cls; + struct GNUNET_MQ_MessageHandler cadet_handlers[] = + { GNUNET_MQ_hd_var_size (incoming_msg, + GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST, + struct OperationRequestMessage, + NULL), + GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, + GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO, + struct IntersectionElementInfoMessage, + NULL), + GNUNET_MQ_hd_var_size (intersection_p2p_bf, + GNUNET_MESSAGE_TYPE_SETI_P2P_BF, + struct BFMessage, + NULL), + GNUNET_MQ_hd_fixed_size (intersection_p2p_done, + GNUNET_MESSAGE_TYPE_SETI_P2P_DONE, + struct IntersectionDoneMessage, + NULL), + GNUNET_MQ_handler_end () }; + struct Listener *listener; + + if (NULL != cs->listener) + { + /* max. one active listener per client! */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + listener = GNUNET_new (struct Listener); + listener->cs = cs; + cs->listener = listener; + listener->app_id = msg->app_id; + listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); + GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New listener created (op %u, port %s)\n", + listener->operation, + GNUNET_h2s (&listener->app_id)); + listener->open_port = GNUNET_CADET_open_port (cadet, + &msg->app_id, + &channel_new_cb, + listener, + &channel_window_cb, + &channel_end_cb, + cadet_handlers); + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Called when the listening client rejects an operation + * request by another peer. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg) +{ + struct ClientState *cs = cls; + struct Operation *op; + + op = get_incoming (ntohl (msg->accept_reject_id)); + if (NULL == op) + { + /* no matching incoming operation for this reject; + could be that the other peer already disconnected... */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Client rejected unknown operation %u\n", + (unsigned int) ntohl (msg->accept_reject_id)); + GNUNET_SERVICE_client_continue (cs->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer request (op %u, app %s) rejected by client\n", + op->listener->operation, + GNUNET_h2s (&cs->listener->app_id)); + _GSS_operation_destroy2 (op); + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Called when a client wants to add or remove an element to a set it inhabits. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static int +check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) +{ + /* NOTE: Technically, we should probably check with the + block library whether the element we are given is well-formed */ + return GNUNET_OK; +} + + +/** + * Called when a client wants to add or remove an element to a set it inhabits. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + + if (NULL == (set = cs->set)) + { + /* client without a set requested an operation */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_SERVICE_client_continue (cs->client); + + if (0 != set->content->iterator_count) + { + struct PendingMutation *pm; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n"); + pm = GNUNET_new (struct PendingMutation); + pm->msg = + (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header); + pm->set = set; + GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, + set->content->pending_mutations_tail, + pm); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n"); + execute_mutation (set, msg); +} + + +/** + * Advance the current generation of a set, + * adding exclusion ranges if necessary. + * + * @param set the set where we want to advance the generation + */ +static void +advance_generation (struct Set *set) +{ + struct GenerationRange r; + + if (set->current_generation == set->content->latest_generation) + { + set->content->latest_generation++; + set->current_generation++; + return; + } + + GNUNET_assert (set->current_generation < set->content->latest_generation); + + r.start = set->current_generation + 1; + r.end = set->content->latest_generation + 1; + set->content->latest_generation = r.end; + set->current_generation = r.end; + GNUNET_array_append (set->excluded_generations, + set->excluded_generations_size, + r); +} + + +/** + * Called when a client wants to initiate a set operation with another + * peer. Initiates the CADET connection to the listener and sends the + * request. + * + * @param cls client that sent the message + * @param msg message sent by the client + * @return #GNUNET_OK if the message is well-formed + */ +static int +check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) +{ + /* FIXME: suboptimal, even if the context below could be NULL, + there are malformed messages this does not check for... */ + return GNUNET_OK; +} + + +/** + * Called when a client wants to initiate a set operation with another + * peer. Initiates the CADET connection to the listener and sends the + * request. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) +{ + struct ClientState *cs = cls; + struct Operation *op = GNUNET_new (struct Operation); + const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (incoming_msg, + GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST, + struct OperationRequestMessage, + op), + GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, + GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO, + struct IntersectionElementInfoMessage, + op), + GNUNET_MQ_hd_var_size (intersection_p2p_bf, + GNUNET_MESSAGE_TYPE_SETI_P2P_BF, + struct BFMessage, + op), + GNUNET_MQ_hd_fixed_size (intersection_p2p_done, + GNUNET_MESSAGE_TYPE_SETI_P2P_DONE, + struct IntersectionDoneMessage, + op), + GNUNET_MQ_handler_end () + }; + struct Set *set; + const struct GNUNET_MessageHeader *context; + + if (NULL == (set = cs->set)) + { + GNUNET_break (0); + GNUNET_free (op); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + op->peer = msg->target_peer; + op->result_mode = ntohl (msg->result_mode); + op->client_request_id = ntohl (msg->request_id); + op->byzantine = msg->byzantine; + op->byzantine_lower_bound = msg->byzantine_lower_bound; + op->force_full = msg->force_full; + op->force_delta = msg->force_delta; + context = GNUNET_MQ_extract_nested_mh (msg); + + /* Advance generation values, so that + mutations won't interfer with the running operation. */ + op->set = set; + op->generation_created = set->current_generation; + advance_generation (set); + GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating new CADET channel to port %s for set operation type %u\n", + GNUNET_h2s (&msg->app_id), + set->operation); + op->channel = GNUNET_CADET_channel_create (cadet, + op, + &msg->target_peer, + &msg->app_id, + &channel_window_cb, + &channel_end_cb, + cadet_handlers); + op->mq = GNUNET_CADET_get_mq (op->channel); + op->state = intersection_evaluate (op, context); // FIXME: inline! + if (NULL == op->state) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Handle a request from the client to cancel a running set operation. + * + * @param cls the client + * @param msg the message + */ +static void +handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + struct Operation *op; + int found; + + if (NULL == (set = cs->set)) + { + /* client without a set requested an operation */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + found = GNUNET_NO; + for (op = set->ops_head; NULL != op; op = op->next) + { + if (op->client_request_id == ntohl (msg->request_id)) + { + found = GNUNET_YES; + break; + } + } + if (GNUNET_NO == found) + { + /* It may happen that the operation was already destroyed due to + * the other peer disconnecting. The client may not know about this + * yet and try to cancel the (just barely non-existent) operation. + * So this is not a hard error. + */GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Client canceled non-existent op %u\n", + (uint32_t) ntohl (msg->request_id)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client requested cancel for op %u\n", + (uint32_t) ntohl (msg->request_id)); + _GSS_operation_destroy (op, GNUNET_YES); + } + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Handle a request from the client to accept a set operation that + * came from a remote peer. We forward the accept to the associated + * operation for handling + * + * @param cls the client + * @param msg the message + */ +static void +handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + struct Operation *op; + struct GNUNET_SET_ResultMessage *result_message; + struct GNUNET_MQ_Envelope *ev; + struct Listener *listener; + + if (NULL == (set = cs->set)) + { + /* client without a set requested to accept */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + op = get_incoming (ntohl (msg->accept_reject_id)); + if (NULL == op) + { + /* It is not an error if the set op does not exist -- it may + * have been destroyed when the partner peer disconnected. */ + GNUNET_log ( + GNUNET_ERROR_TYPE_INFO, + "Client %p accepted request %u of listener %p that is no longer active\n", + cs, + ntohl (msg->accept_reject_id), + cs->listener); + ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SETI_RESULT); + result_message->request_id = msg->request_id; + result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); + GNUNET_MQ_send (set->cs->mq, ev); + GNUNET_SERVICE_client_continue (cs->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client accepting request %u\n", + (uint32_t) ntohl (msg->accept_reject_id)); + listener = op->listener; + op->listener = NULL; + GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op); + op->set = set; + GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); + op->client_request_id = ntohl (msg->request_id); + op->result_mode = ntohl (msg->result_mode); + op->byzantine = msg->byzantine; + op->byzantine_lower_bound = msg->byzantine_lower_bound; + op->force_full = msg->force_full; + op->force_delta = msg->force_delta; + + /* Advance generation values, so that future mutations do not + interfer with the running operation. */ + op->generation_created = set->current_generation; + advance_generation (set); + GNUNET_assert (NULL == op->state); + op->state = intersection_accept (op); // FIXME: inline + if (NULL == op->state) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + /* Now allow CADET to continue, as we did not do this in + #handle_incoming_msg (as we wanted to first see if the + local client would accept the request). */ + GNUNET_CADET_receive_done (op->channel); + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Called to clean up, after a shutdown has been requested. + * + * @param cls closure, NULL + */ +static void +shutdown_task (void *cls) +{ + /* Delay actual shutdown to allow service to disconnect clients */ + in_shutdown = GNUNET_YES; + if (0 == num_clients) + { + if (NULL != cadet) + { + GNUNET_CADET_disconnect (cadet); + cadet = NULL; + } + } + GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); +} + + +/** + * Function called by the service's run + * method to run service-specific setup code. + * + * @param cls closure + * @param cfg configuration to use + * @param service the initialized service + */ +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_SERVICE_Handle *service) +{ + /* FIXME: need to modify SERVICE (!) API to allow + us to run a shutdown task *after* clients were + forcefully disconnected! */ + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); + _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); + cadet = GNUNET_CADET_connect (cfg); + if (NULL == cadet) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _ ("Could not connect to CADET service\n")); + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + +/** + * Define "main" method using service macro. + */ +GNUNET_SERVICE_MAIN ( + "set", + GNUNET_SERVICE_OPTION_NONE, + &run, + &client_connect_cb, + &client_disconnect_cb, + NULL, + GNUNET_MQ_hd_fixed_size (client_accept, + GNUNET_MESSAGE_TYPE_SETI_ACCEPT, + struct GNUNET_SET_AcceptMessage, + NULL), + GNUNET_MQ_hd_var_size (client_mutation, + GNUNET_MESSAGE_TYPE_SETI_ADD, + struct GNUNET_SET_ElementMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_create_set, + GNUNET_MESSAGE_TYPE_SETI_CREATE, + struct GNUNET_SET_CreateMessage, + NULL), + GNUNET_MQ_hd_var_size (client_evaluate, + GNUNET_MESSAGE_TYPE_SETI_EVALUATE, + struct GNUNET_SET_EvaluateMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_listen, + GNUNET_MESSAGE_TYPE_SETI_LISTEN, + struct GNUNET_SET_ListenMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_reject, + GNUNET_MESSAGE_TYPE_SETI_REJECT, + struct GNUNET_SET_RejectMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_cancel, + GNUNET_MESSAGE_TYPE_SETI_CANCEL, + struct GNUNET_SET_CancelMessage, + NULL), + GNUNET_MQ_handler_end ()); + + +/* end of gnunet-service-seti.c */ diff --git a/src/seti/gnunet-service-seti_protocol.h b/src/seti/gnunet-service-seti_protocol.h new file mode 100644 index 000000000..51968376e --- /dev/null +++ b/src/seti/gnunet-service-seti_protocol.h @@ -0,0 +1,144 @@ +/* + This file is part of GNUnet. + Copyright (C) 2013, 2014, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @author Florian Dold + * @author Christian Grothoff + * @file seti/gnunet-service-seti_protocol.h + * @brief Peer-to-Peer messages for gnunet set + */ +#ifndef SETI_PROTOCOL_H +#define SETI_PROTOCOL_H + +#include "platform.h" +#include "gnunet_common.h" + + +GNUNET_NETWORK_STRUCT_BEGIN + +struct OperationRequestMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST + */ + struct GNUNET_MessageHeader header; + + /** + * For Intersection: my element count + */ + uint32_t element_count GNUNET_PACKED; + + /** + * Application-specific identifier of the request. + */ + struct GNUNET_HashCode app_idX; + + /* rest: optional message */ +}; + + +/** + * During intersection, the first (and possibly second) message + * send it the number of elements in the set, to allow the peers + * to decide who should start with the Bloom filter. + */ +struct IntersectionElementInfoMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO + */ + struct GNUNET_MessageHeader header; + + /** + * mutator used with this bloomfilter. + */ + uint32_t sender_element_count GNUNET_PACKED; +}; + + +/** + * Bloom filter messages exchanged for set intersection calculation. + */ +struct BFMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF + */ + struct GNUNET_MessageHeader header; + + /** + * Number of elements the sender still has in the set. + */ + uint32_t sender_element_count GNUNET_PACKED; + + /** + * XOR of all hashes over all elements remaining in the set. + * Used to determine termination. + */ + struct GNUNET_HashCode element_xor_hash; + + /** + * Mutator used with this bloomfilter. + */ + uint32_t sender_mutator GNUNET_PACKED; + + /** + * Total length of the bloomfilter data. + */ + uint32_t bloomfilter_total_length GNUNET_PACKED; + + /** + * Number of bits (k-value) used in encoding the bloomfilter. + */ + uint32_t bits_per_element GNUNET_PACKED; + + /** + * rest: the sender's bloomfilter + */ +}; + + +/** + * Last message, send to confirm the final set. Contains the element + * count as it is possible that the peer determined that we were done + * by getting the empty set, which in that case also needs to be + * communicated. + */ +struct IntersectionDoneMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE + */ + struct GNUNET_MessageHeader header; + + /** + * Final number of elements in intersection. + */ + uint32_t final_element_count GNUNET_PACKED; + + /** + * XOR of all hashes over all elements remaining in the set. + */ + struct GNUNET_HashCode element_xor_hash; +}; + + +GNUNET_NETWORK_STRUCT_END + +#endif diff --git a/src/seti/gnunet-seti-profiler.c b/src/seti/gnunet-seti-profiler.c new file mode 100644 index 000000000..b8230bcfc --- /dev/null +++ b/src/seti/gnunet-seti-profiler.c @@ -0,0 +1,480 @@ +/* + This file is part of GNUnet + Copyright (C) 2013, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/gnunet-seti-profiler.c + * @brief profiling tool for set intersection + * @author Florian Dold + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" +#include "gnunet_seti_service.h" +#include "gnunet_testbed_service.h" + + +static int ret; + +static unsigned int num_a = 5; +static unsigned int num_b = 5; +static unsigned int num_c = 20; + +const static struct GNUNET_CONFIGURATION_Handle *config; + +struct SetInfo +{ + char *id; + struct GNUNET_SETI_Handle *set; + struct GNUNET_SETI_OperationHandle *oh; + struct GNUNET_CONTAINER_MultiHashMap *sent; + struct GNUNET_CONTAINER_MultiHashMap *received; + int done; +} info1, info2; + +static struct GNUNET_CONTAINER_MultiHashMap *common_sent; + +static struct GNUNET_HashCode app_id; + +static struct GNUNET_PeerIdentity local_peer; + +static struct GNUNET_SETI_ListenHandle *set_listener; + +static unsigned int use_intersection; + +static unsigned int element_size = 32; + +/** + * Handle to the statistics service. + */ +static struct GNUNET_STATISTICS_Handle *statistics; + +/** + * The profiler will write statistics + * for all peers to the file with this name. + */ +static char *statistics_filename; + +/** + * The profiler will write statistics + * for all peers to this file. + */ +static FILE *statistics_file; + + +static int +map_remove_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_CONTAINER_MultiHashMap *m = cls; + int ret; + + GNUNET_assert (NULL != key); + + ret = GNUNET_CONTAINER_multihashmap_remove_all (m, key); + if (GNUNET_OK != ret) + printf ("spurious element\n"); + return GNUNET_YES; +} + + +/** + * Callback function to process statistic values. + * + * @param cls closure + * @param subsystem name of subsystem that created the statistic + * @param name the name of the datum + * @param value the current value + * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not + * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + */ +static int +statistics_result (void *cls, + const char *subsystem, + const char *name, + uint64_t value, + int is_persistent) +{ + if (NULL != statistics_file) + { + fprintf (statistics_file, "%s\t%s\t%lu\n", subsystem, name, (unsigned + long) value); + } + return GNUNET_OK; +} + + +static void +statistics_done (void *cls, + int success) +{ + GNUNET_assert (GNUNET_YES == success); + if (NULL != statistics_file) + fclose (statistics_file); + GNUNET_SCHEDULER_shutdown (); +} + + +static void +check_all_done (void) +{ + if ((info1.done == GNUNET_NO) || (info2.done == GNUNET_NO)) + return; + + GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator, + info2.sent); + GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator, + info1.sent); + + printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size ( + info1.sent)); + printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size ( + info2.sent)); + + if (NULL == statistics_filename) + { + GNUNET_SCHEDULER_shutdown (); + return; + } + + statistics_file = fopen (statistics_filename, "w"); + GNUNET_STATISTICS_get (statistics, NULL, NULL, + &statistics_done, + &statistics_result, NULL); +} + + +static void +set_result_cb (void *cls, + const struct GNUNET_SETI_Element *element, + uint64_t current_size, + enum GNUNET_SETI_Status status) +{ + struct SetInfo *info = cls; + struct GNUNET_HashCode hash; + + GNUNET_assert (GNUNET_NO == info->done); + switch (status) + { + case GNUNET_SETI_STATUS_DONE: + info->done = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "set intersection done\n"); + check_all_done (); + info->oh = NULL; + return; + case GNUNET_SETI_STATUS_FAILURE: + info->oh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "failure\n"); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_SETI_STATUS_ADD_LOCAL: + GNUNET_CRYPTO_hash (element->data, + element->size, + &hash); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "set %s: keep element %s\n", + info->id, + GNUNET_h2s (&hash)); + break; + case GNUNET_SETI_STATUS_DEL_LOCAL: + GNUNET_CRYPTO_hash (element->data, + element->size, + &hash); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "set %s: remove element %s\n", + info->id, + GNUNET_h2s (&hash)); + return; + default: + GNUNET_assert (0); + } + + if (element->size != element_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "wrong element size: %u, expected %u\n", + element->size, + (unsigned int) sizeof(struct GNUNET_HashCode)); + GNUNET_assert (0); + } + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "set %s: got element (%s)\n", + info->id, GNUNET_h2s (element->data)); + GNUNET_assert (NULL != element->data); + { + struct GNUNET_HashCode data_hash; + + GNUNET_CRYPTO_hash (element->data, + element_size, + &data_hash); + GNUNET_CONTAINER_multihashmap_put (info->received, + &data_hash, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } +} + + +static void +set_listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SETI_Request *request) +{ + /* max. 1 option plus terminator */ + struct GNUNET_SETI_Option opts[2] = { { 0 } }; + unsigned int n_opts = 0; + + if (NULL == request) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "listener failed\n"); + return; + } + GNUNET_assert (NULL == info2.oh); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "set listen cb called\n"); + if (use_intersection) + { + opts[n_opts++] = (struct GNUNET_SETI_Option) { .type = + GNUNET_SETI_OPTION_RETURN_INTERSECTION }; + } + opts[n_opts].type = GNUNET_SETI_OPTION_END; + info2.oh = GNUNET_SETI_accept (request, + opts, + &set_result_cb, + &info2); + GNUNET_SETI_commit (info2.oh, + info2.set); +} + + +static int +set_insert_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_SETI_Handle *set = cls; + struct GNUNET_SETI_Element el; + + el.element_type = 0; + el.data = value; + el.size = element_size; + GNUNET_SETI_add_element (set, &el, NULL, NULL); + return GNUNET_YES; +} + + +static void +handle_shutdown (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shutting down set profiler\n"); + if (NULL != set_listener) + { + GNUNET_SETI_listen_cancel (set_listener); + set_listener = NULL; + } + if (NULL != info1.oh) + { + GNUNET_SETI_operation_cancel (info1.oh); + info1.oh = NULL; + } + if (NULL != info2.oh) + { + GNUNET_SETI_operation_cancel (info2.oh); + info2.oh = NULL; + } + if (NULL != info1.set) + { + GNUNET_SETI_destroy (info1.set); + info1.set = NULL; + } + if (NULL != info2.set) + { + GNUNET_SETI_destroy (info2.set); + info2.set = NULL; + } + GNUNET_STATISTICS_destroy (statistics, GNUNET_NO); +} + + +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_TESTING_Peer *peer) +{ + unsigned int i; + struct GNUNET_HashCode hash; + /* max. 1 option plus terminator */ + struct GNUNET_SETI_Option opts[2] = { { 0 } }; + unsigned int n_opts = 0; + + config = cfg; + + GNUNET_assert (element_size > 0); + + if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &local_peer)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "could not retrieve host identity\n"); + ret = 0; + return; + } + statistics = GNUNET_STATISTICS_create ("set-profiler", cfg); + GNUNET_SCHEDULER_add_shutdown (&handle_shutdown, NULL); + info1.id = "a"; + info2.id = "b"; + info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO); + info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO); + common_sent = GNUNET_CONTAINER_multihashmap_create (num_c + 1, GNUNET_NO); + info1.received = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO); + info2.received = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO); + for (i = 0; i < num_a; i++) + { + char *data = GNUNET_malloc (element_size); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size); + GNUNET_CRYPTO_hash (data, element_size, &hash); + GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + for (i = 0; i < num_b; i++) + { + char *data = GNUNET_malloc (element_size); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size); + GNUNET_CRYPTO_hash (data, element_size, &hash); + GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + for (i = 0; i < num_c; i++) + { + char *data = GNUNET_malloc (element_size); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size); + GNUNET_CRYPTO_hash (data, element_size, &hash); + GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id); + + info1.set = GNUNET_SETI_create (config); + info2.set = GNUNET_SETI_create (config); + GNUNET_CONTAINER_multihashmap_iterate (info1.sent, + &set_insert_iterator, + info1.set); + GNUNET_CONTAINER_multihashmap_iterate (info2.sent, + &set_insert_iterator, + info2.set); + GNUNET_CONTAINER_multihashmap_iterate (common_sent, + &set_insert_iterator, + info1.set); + GNUNET_CONTAINER_multihashmap_iterate (common_sent, + &set_insert_iterator, + info2.set); + + set_listener = GNUNET_SETI_listen (config, + &app_id, + &set_listen_cb, + NULL); + if (use_intersection) + { + opts[n_opts++] = (struct GNUNET_SETI_Option) { .type = + GNUNET_SETI_OPTION_RETURN_INTERSECTION }; + } + opts[n_opts].type = GNUNET_SETI_OPTION_END; + + info1.oh = GNUNET_SETI_prepare (&local_peer, + &app_id, + NULL, + opts, + set_result_cb, + &info1); + GNUNET_SETI_commit (info1.oh, + info1.set); + GNUNET_SETI_destroy (info1.set); + info1.set = NULL; +} + + +static void +pre_run (void *cls, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + if (0 != GNUNET_TESTING_peer_run ("set-profiler", + cfgfile, + &run, NULL)) + ret = 2; +} + + +int +main (int argc, char **argv) +{ + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_uint ('A', + "num-first", + NULL, + gettext_noop ("number of values"), + &num_a), + GNUNET_GETOPT_option_uint ('B', + "num-second", + NULL, + gettext_noop ("number of values"), + &num_b), + GNUNET_GETOPT_option_uint ('C', + "num-common", + NULL, + gettext_noop ("number of values"), + &num_c), + GNUNET_GETOPT_option_uint ('i', + "use-intersection", + NULL, + gettext_noop ( + "return intersection instead of delta"), + &use_intersection), + GNUNET_GETOPT_option_uint ('w', + "element-size", + NULL, + gettext_noop ("element size"), + &element_size), + GNUNET_GETOPT_option_filename ('s', + "statistics", + "FILENAME", + gettext_noop ("write statistics to file"), + &statistics_filename), + GNUNET_GETOPT_OPTION_END + }; + + GNUNET_PROGRAM_run2 (argc, argv, + "gnunet-seti-profiler", + "help", + options, + &pre_run, + NULL, + GNUNET_YES); + return ret; +} diff --git a/src/seti/plugin_block_seti_test.c b/src/seti/plugin_block_seti_test.c new file mode 100644 index 000000000..1de086092 --- /dev/null +++ b/src/seti/plugin_block_seti_test.c @@ -0,0 +1,123 @@ +/* + This file is part of GNUnet + Copyright (C) 2017 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/plugin_block_set_test.c + * @brief set test block, recognizes elements with non-zero first byte as invalid + * @author Christian Grothoff + */ + +#include "platform.h" +#include "gnunet_block_plugin.h" +#include "gnunet_block_group_lib.h" + + +/** + * Function called to validate a reply or a request. For + * request evaluation, simply pass "NULL" for the reply_block. + * + * @param cls closure + * @param ctx block context + * @param type block type + * @param group block group to use + * @param eo control flags + * @param query original query (hash) + * @param xquery extrended query data (can be NULL, depending on type) + * @param xquery_size number of bytes in xquery + * @param reply_block response to validate + * @param reply_block_size number of bytes in reply block + * @return characterization of result + */ +static enum GNUNET_BLOCK_EvaluationResult +block_plugin_set_test_evaluate (void *cls, + struct GNUNET_BLOCK_Context *ctx, + enum GNUNET_BLOCK_Type type, + struct GNUNET_BLOCK_Group *group, + enum GNUNET_BLOCK_EvaluationOptions eo, + const struct GNUNET_HashCode *query, + const void *xquery, + size_t xquery_size, + const void *reply_block, + size_t reply_block_size) +{ + if ((NULL == reply_block) || + (reply_block_size == 0) || + (0 != ((char *) reply_block)[0])) + return GNUNET_BLOCK_EVALUATION_RESULT_INVALID; + return GNUNET_BLOCK_EVALUATION_OK_MORE; +} + + +/** + * Function called to obtain the key for a block. + * + * @param cls closure + * @param type block type + * @param block block to get the key for + * @param block_size number of bytes in block + * @param key set to the key (query) for the given block + * @return #GNUNET_OK on success, #GNUNET_SYSERR if type not supported + * (or if extracting a key from a block of this type does not work) + */ +static int +block_plugin_set_test_get_key (void *cls, + enum GNUNET_BLOCK_Type type, + const void *block, + size_t block_size, + struct GNUNET_HashCode *key) +{ + return GNUNET_SYSERR; +} + + +/** + * Entry point for the plugin. + */ +void * +libgnunet_plugin_block_set_test_init (void *cls) +{ + static enum GNUNET_BLOCK_Type types[] = { + GNUNET_BLOCK_TYPE_SET_TEST, + GNUNET_BLOCK_TYPE_ANY /* end of list */ + }; + struct GNUNET_BLOCK_PluginFunctions *api; + + api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions); + api->evaluate = &block_plugin_set_test_evaluate; + api->get_key = &block_plugin_set_test_get_key; + api->types = types; + return api; +} + + +/** + * Exit point from the plugin. + */ +void * +libgnunet_plugin_block_set_test_done (void *cls) +{ + struct GNUNET_BLOCK_PluginFunctions *api = cls; + + GNUNET_free (api); + return NULL; +} + + +/* end of plugin_block_set_test.c */ diff --git a/src/seti/seti.conf.in b/src/seti/seti.conf.in new file mode 100644 index 000000000..e4f7b60b5 --- /dev/null +++ b/src/seti/seti.conf.in @@ -0,0 +1,12 @@ +[seti] +START_ON_DEMAND = @START_ON_DEMAND@ +@UNIXONLY@PORT = 2106 +HOSTNAME = localhost +BINARY = gnunet-service-seti +ACCEPT_FROM = 127.0.0.1; +ACCEPT_FROM6 = ::1; +UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-seti.sock +UNIX_MATCH_UID = YES +UNIX_MATCH_GID = YES + +#PREFIX = valgrind diff --git a/src/seti/seti.h b/src/seti/seti.h new file mode 100644 index 000000000..aa7014034 --- /dev/null +++ b/src/seti/seti.h @@ -0,0 +1,267 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012-2014, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file set/seti.h + * @brief messages used for the set intersection api + * @author Florian Dold + * @author Christian Grothoff + */ +#ifndef SETI_H +#define SETI_H + +#include "platform.h" +#include "gnunet_common.h" +#include "gnunet_set_service.h" + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Message sent by the client to the service to ask starting + * a new set to perform operations with. + */ +struct GNUNET_SETI_CreateMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_CREATE + */ + struct GNUNET_MessageHeader header; +}; + + +/** + * Message sent by the client to the service to start listening for + * incoming requests to perform a certain type of set operation for a + * certain type of application. + */ +struct GNUNET_SETI_ListenMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_LISTEN + */ + struct GNUNET_MessageHeader header; + + /** + * Operation type, values of `enum GNUNET_SETI_OperationType` + */ + uint32_t operation GNUNET_PACKED; + + /** + * application id + */ + struct GNUNET_HashCode app_id; +}; + + +/** + * Message sent by a listening client to the service to accept + * performing the operation with the other peer. + */ +struct GNUNET_SETI_AcceptMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_ACCEPT + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the incoming request we want to accept. + */ + uint32_t accept_reject_id GNUNET_PACKED; + + /** + * Request ID to identify responses. + */ + uint32_t request_id GNUNET_PACKED; + + /** + * Return the intersection (1), instead of the elements to + * remove / the delta (0), in NBO. + */ + uint32_t return_intersection; + +}; + + +/** + * Message sent by a listening client to the service to reject + * performing the operation with the other peer. + */ +struct GNUNET_SETI_RejectMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_REJECT + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the incoming request we want to reject. + */ + uint32_t accept_reject_id GNUNET_PACKED; +}; + + +/** + * A request for an operation with another client. + */ +struct GNUNET_SETI_RequestMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_REQUEST. + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the to identify the request when accepting or + * rejecting it. + */ + uint32_t accept_id GNUNET_PACKED; + + /** + * Identity of the requesting peer. + */ + struct GNUNET_PeerIdentity peer_id; + + /* rest: context message, that is, application-specific + message to convince listener to pick up */ +}; + + +/** + * Message sent by client to service to initiate a set operation as a + * client (not as listener). A set (which determines the operation + * type) must already exist in association with this client. + */ +struct GNUNET_SETI_EvaluateMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_EVALUATE + */ + struct GNUNET_MessageHeader header; + + /** + * Id of our set to evaluate, chosen implicitly by the client when it + * calls #GNUNET_SETI_commit(). + */ + uint32_t request_id GNUNET_PACKED; + + /** + * Peer to evaluate the operation with + */ + struct GNUNET_PeerIdentity target_peer; + + /** + * Application id + */ + struct GNUNET_HashCode app_id; + + /** + * Return the intersection (1), instead of the elements to + * remove / the delta (0), in NBO. + */ + uint32_t return_intersection; + + /* rest: context message, that is, application-specific + message to convince listener to pick up */ +}; + + +/** + * Message sent by the service to the client to indicate an + * element that is removed (set intersection) or added + * (set union) or part of the final result, depending on + * options specified for the operation. + */ +struct GNUNET_SETI_ResultMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_RESULT + */ + struct GNUNET_MessageHeader header; + + /** + * Current set size. + */ + uint64_t current_size; + + /** + * id the result belongs to + */ + uint32_t request_id GNUNET_PACKED; + + /** + * Was the evaluation successful? Contains + * an `enum GNUNET_SETI_Status` in NBO. + */ + uint16_t result_status GNUNET_PACKED; + + /** + * Type of the element attachted to the message, if any. + */ + uint16_t element_type GNUNET_PACKED; + + /* rest: the actual element */ +}; + + +/** + * Message sent by client to the service to add an element to the set. + */ +struct GNUNET_SETI_ElementMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_ADD. + */ + struct GNUNET_MessageHeader header; + + /** + * Type of the element to add or remove. + */ + uint16_t element_type GNUNET_PACKED; + + /** + * For alignment, always zero. + */ + uint16_t reserved GNUNET_PACKED; + + /* rest: the actual element */ +}; + + +/** + * Sent to the service by the client + * in order to cancel a set operation. + */ +struct GNUNET_SETI_CancelMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_CANCEL + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the request we want to cancel. + */ + uint32_t request_id GNUNET_PACKED; +}; + + +GNUNET_NETWORK_STRUCT_END + +#endif diff --git a/src/seti/seti_api.c b/src/seti/seti_api.c new file mode 100644 index 000000000..d80a60684 --- /dev/null +++ b/src/seti/seti_api.c @@ -0,0 +1,895 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012-2016, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file seti/seti_api.c + * @brief api for the set service + * @author Florian Dold + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_seti_service.h" +#include "seti.h" + + +#define LOG(kind, ...) GNUNET_log_from (kind, "seti-api", __VA_ARGS__) + + +/** + * Opaque handle to a set. + */ +struct GNUNET_SETI_Handle +{ + /** + * Message queue for @e client. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Linked list of operations on the set. + */ + struct GNUNET_SETI_OperationHandle *ops_head; + + /** + * Linked list of operations on the set. + */ + struct GNUNET_SETI_OperationHandle *ops_tail; + + /** + * Configuration, needed when creating (lazy) copies. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Should the set be destroyed once all operations are gone? + * #GNUNET_SYSERR if #GNUNET_SETI_destroy() must raise this flag, + * #GNUNET_YES if #GNUNET_SETI_destroy() did raise this flag. + */ + int destroy_requested; + + /** + * Has the set become invalid (e.g. service died)? + */ + int invalid; + + /** + * Both client and service count the number of iterators + * created so far to match replies with iterators. + */ + uint16_t iteration_id; + +}; + + +/** + * Handle for a set operation request from another peer. + */ +struct GNUNET_SETI_Request +{ + /** + * Id of the request, used to identify the request when + * accepting/rejecting it. + */ + uint32_t accept_id; + + /** + * Has the request been accepted already? + * #GNUNET_YES/#GNUNET_NO + */ + int accepted; +}; + + +/** + * Handle to an operation. Only known to the service after committing + * the handle with a set. + */ +struct GNUNET_SETI_OperationHandle +{ + /** + * Function to be called when we have a result, + * or an error. + */ + GNUNET_SETI_ResultIterator result_cb; + + /** + * Closure for @e result_cb. + */ + void *result_cls; + + /** + * Local set used for the operation, + * NULL if no set has been provided by conclude yet. + */ + struct GNUNET_SETI_Handle *set; + + /** + * Message sent to the server on calling conclude, + * NULL if conclude has been called. + */ + struct GNUNET_MQ_Envelope *conclude_mqm; + + /** + * Address of the request if in the conclude message, + * used to patch the request id into the message when the set is known. + */ + uint32_t *request_id_addr; + + /** + * Handles are kept in a linked list. + */ + struct GNUNET_SETI_OperationHandle *prev; + + /** + * Handles are kept in a linked list. + */ + struct GNUNET_SETI_OperationHandle *next; + + /** + * Request ID to identify the operation within the set. + */ + uint32_t request_id; + + /** + * Should we return the resulting intersection (ADD) or + * the elements to remove (DEL)? + */ + int return_intersection; +}; + + +/** + * Opaque handle to a listen operation. + */ +struct GNUNET_SETI_ListenHandle +{ + /** + * Message queue for the client. + */ + struct GNUNET_MQ_Handle*mq; + + /** + * Configuration handle for the listener, stored + * here to be able to reconnect transparently on + * connection failure. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Function to call on a new incoming request, + * or on error. + */ + GNUNET_SETI_ListenCallback listen_cb; + + /** + * Closure for @e listen_cb. + */ + void *listen_cls; + + /** + * Task for reconnecting when the listener fails. + */ + struct GNUNET_SCHEDULER_Task *reconnect_task; + + /** + * Application ID we listen for. + */ + struct GNUNET_HashCode app_id; + + /** + * Time to wait until we try to reconnect on failure. + */ + struct GNUNET_TIME_Relative reconnect_backoff; + +}; + + +/** + * Check that the given @a msg is well-formed. + * + * @param cls closure + * @param msg message to check + * @return #GNUNET_OK if message is well-formed + */ +static int +check_result (void *cls, + const struct GNUNET_SETI_ResultMessage *msg) +{ + /* minimum size was already checked, everything else is OK! */ + return GNUNET_OK; +} + + +/** + * Handle result message for a set operation. + * + * @param cls the set + * @param mh the message + */ +static void +handle_result (void *cls, + const struct GNUNET_SETI_ResultMessage *msg) +{ + struct GNUNET_SETI_Handle *set = cls; + struct GNUNET_SETI_OperationHandle *oh; + struct GNUNET_SETI_Element e; + enum GNUNET_SETI_Status result_status; + int destroy_set; + + GNUNET_assert (NULL != set->mq); + result_status = (enum GNUNET_SETI_Status) ntohs (msg->result_status); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got result message with status %d\n", + result_status); + oh = GNUNET_MQ_assoc_get (set->mq, + ntohl (msg->request_id)); + if (NULL == oh) + { + /* 'oh' can be NULL if we canceled the operation, but the service + did not get the cancel message yet. */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring result from canceled operation\n"); + return; + } + + switch (result_status) + { + case GNUNET_SETI_STATUS_ADD_LOCAL: + case GNUNET_SETI_STATUS_DEL_LOCAL: + e.data = &msg[1]; + e.size = ntohs (msg->header.size) + - sizeof(struct GNUNET_SETI_ResultMessage); + e.element_type = ntohs (msg->element_type); + if (NULL != oh->result_cb) + oh->result_cb (oh->result_cls, + &e, + GNUNET_ntohll (msg->current_size), + result_status); + return; + case GNUNET_SETI_STATUS_FAILURE: + case GNUNET_SETI_STATUS_DONE: + GNUNET_MQ_assoc_remove (set->mq, + ntohl (msg->request_id)); + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + oh); + /* Need to do this calculation _before_ the result callback, + as IF the application still has a valid set handle, it + may trigger destruction of the set during the callback. */ + destroy_set = (GNUNET_YES == set->destroy_requested) && + (NULL == set->ops_head); + if (NULL != oh->result_cb) + { + oh->result_cb (oh->result_cls, + NULL, + GNUNET_ntohll (msg->current_size), + result_status); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "No callback for final status\n"); + } + if (destroy_set) + GNUNET_SETI_destroy (set); + GNUNET_free (oh); + return; + } +} + + +/** + * Destroy the given set operation. + * + * @param oh set operation to destroy + */ +static void +set_operation_destroy (struct GNUNET_SETI_OperationHandle *oh) +{ + struct GNUNET_SETI_Handle *set = oh->set; + struct GNUNET_SETI_OperationHandle *h_assoc; + + if (NULL != oh->conclude_mqm) + GNUNET_MQ_discard (oh->conclude_mqm); + /* is the operation already commited? */ + if (NULL != set) + { + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + oh); + h_assoc = GNUNET_MQ_assoc_remove (set->mq, + oh->request_id); + GNUNET_assert ((NULL == h_assoc) || + (h_assoc == oh)); + } + GNUNET_free (oh); +} + + +/** + * Cancel the given set operation. We need to send an explicit cancel + * message, as all operations one one set communicate using one + * handle. + * + * @param oh set operation to cancel + */ +void +GNUNET_SETI_operation_cancel (struct GNUNET_SETI_OperationHandle *oh) +{ + struct GNUNET_SETI_Handle *set = oh->set; + struct GNUNET_SETI_CancelMessage *m; + struct GNUNET_MQ_Envelope *mqm; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Cancelling SET operation\n"); + if (NULL != set) + { + mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETI_CANCEL); + m->request_id = htonl (oh->request_id); + GNUNET_MQ_send (set->mq, mqm); + } + set_operation_destroy (oh); + if ((NULL != set) && + (GNUNET_YES == set->destroy_requested) && + (NULL == set->ops_head)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Destroying set after operation cancel\n"); + GNUNET_SETI_destroy (set); + } +} + + +/** + * We encountered an error communicating with the set service while + * performing a set operation. Report to the application. + * + * @param cls the `struct GNUNET_SETI_Handle` + * @param error error code + */ +static void +handle_client_set_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_SETI_Handle *set = cls; + + LOG (GNUNET_ERROR_TYPE_ERROR, + "Handling client set error %d\n", + error); + while (NULL != set->ops_head) + { + if ((NULL != set->ops_head->result_cb) && + (GNUNET_NO == set->destroy_requested)) + set->ops_head->result_cb (set->ops_head->result_cls, + NULL, + 0, + GNUNET_SETI_STATUS_FAILURE); + set_operation_destroy (set->ops_head); + } + set->invalid = GNUNET_YES; +} + + +/** + * Create an empty set. + * + * @param cfg configuration to use for connecting to the + * set service + * @return a handle to the set + */ +struct GNUNET_SETI_Handle * +GNUNET_SETI_create (const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_SETI_Handle *set = GNUNET_new (struct GNUNET_SETI_Handle); + struct GNUNET_MQ_MessageHandler mq_handlers[] = { + GNUNET_MQ_hd_var_size (result, + GNUNET_MESSAGE_TYPE_SETI_RESULT, + struct GNUNET_SETI_ResultMessage, + set), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_CreateMessage *create_msg; + + set->cfg = cfg; + set->mq = GNUNET_CLIENT_connect (cfg, + "set", + mq_handlers, + &handle_client_set_error, + set); + if (NULL == set->mq) + { + GNUNET_free (set); + return NULL; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new intersection set\n"); + mqm = GNUNET_MQ_msg (create_msg, + GNUNET_MESSAGE_TYPE_SETI_CREATE); + GNUNET_MQ_send (set->mq, + mqm); + return set; +} + + +/** + * Add an element to the given set. After the element has been added + * (in the sense of being transmitted to the set service), @a cont + * will be called. Multiple calls to GNUNET_SETI_add_element() can be + * queued. + * + * @param set set to add element to + * @param element element to add to the set + * @param cb continuation called after the element has been added + * @param cb_cls closure for @a cont + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the + * set is invalid (e.g. the set service crashed) + */ +int +GNUNET_SETI_add_element (struct GNUNET_SETI_Handle *set, + const struct GNUNET_SETI_Element *element, + GNUNET_SCHEDULER_TaskCallback cb, + void *cb_cls) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_ElementMessage *msg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "adding element of type %u to set %p\n", + (unsigned int) element->element_type, + set); + if (GNUNET_YES == set->invalid) + { + if (NULL != cb) + cb (cb_cls); + return GNUNET_SYSERR; + } + mqm = GNUNET_MQ_msg_extra (msg, + element->size, + GNUNET_MESSAGE_TYPE_SETI_ADD); + msg->element_type = htons (element->element_type); + GNUNET_memcpy (&msg[1], + element->data, + element->size); + GNUNET_MQ_notify_sent (mqm, + cb, + cb_cls); + GNUNET_MQ_send (set->mq, + mqm); + return GNUNET_OK; +} + + +/** + * Destroy the set handle if no operations are left, mark the set + * for destruction otherwise. + * + * @param set set handle to destroy + */ +void +GNUNET_SETI_destroy (struct GNUNET_SETI_Handle *set) +{ + /* destroying set while iterator is active is currently + not supported; we should expand the API to allow + clients to explicitly cancel the iteration! */ + if ((NULL != set->ops_head) || + (GNUNET_SYSERR == set->destroy_requested)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Set operations are pending, delaying set destruction\n"); + set->destroy_requested = GNUNET_YES; + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Really destroying set\n"); + if (NULL != set->mq) + { + GNUNET_MQ_destroy (set->mq); + set->mq = NULL; + } + GNUNET_free (set); +} + + +/** + * Prepare a set operation to be evaluated with another peer. + * The evaluation will not start until the client provides + * a local set with #GNUNET_SETI_commit(). + * + * @param other_peer peer with the other set + * @param app_id hash for the application using the set + * @param context_msg additional information for the request + * @param options options to use when processing the request + * @param result_cb called on error or success + * @param result_cls closure for @e result_cb + * @return a handle to cancel the operation + */ +struct GNUNET_SETI_OperationHandle * +GNUNET_SETI_prepare (const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_HashCode *app_id, + const struct GNUNET_MessageHeader *context_msg, + const struct GNUNET_SETI_Option options[], + GNUNET_SETI_ResultIterator result_cb, + void *result_cls) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_OperationHandle *oh; + struct GNUNET_SETI_EvaluateMessage *msg; + + oh = GNUNET_new (struct GNUNET_SETI_OperationHandle); + oh->result_cb = result_cb; + oh->result_cls = result_cls; + mqm = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SETI_EVALUATE, + context_msg); + msg->app_id = *app_id; + msg->target_peer = *other_peer; + for (const struct GNUNET_SETI_Option *opt = options; + GNUNET_SETI_OPTION_END != opt->type; + opt++) + { + switch (opt->type) + { + case GNUNET_SETI_OPTION_RETURN_INTERSECTION: + msg->return_intersection = GNUNET_YES; + break; + default: + LOG (GNUNET_ERROR_TYPE_ERROR, + "Option with type %d not recognized\n", + (int) opt->type); + } + } + oh->conclude_mqm = mqm; + oh->request_id_addr = &msg->request_id; + return oh; +} + + +/** + * Connect to the set service in order to listen for requests. + * + * @param cls the `struct GNUNET_SETI_ListenHandle *` to connect + */ +static void +listen_connect (void *cls); + + +/** + * Check validity of request message for a listen operation + * + * @param cls the listen handle + * @param msg the message + * @return #GNUNET_OK if the message is well-formed + */ +static int +check_request (void *cls, + const struct GNUNET_SETI_RequestMessage *msg) +{ + const struct GNUNET_MessageHeader *context_msg; + + if (ntohs (msg->header.size) == sizeof(*msg)) + return GNUNET_OK; /* no context message is OK */ + context_msg = GNUNET_MQ_extract_nested_mh (msg); + if (NULL == context_msg) + { + /* malformed context message is NOT ok */ + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle request message for a listen operation + * + * @param cls the listen handle + * @param msg the message + */ +static void +handle_request (void *cls, + const struct GNUNET_SETI_RequestMessage *msg) +{ + struct GNUNET_SETI_ListenHandle *lh = cls; + struct GNUNET_SETI_Request req; + const struct GNUNET_MessageHeader *context_msg; + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_RejectMessage *rmsg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Processing incoming operation request with id %u\n", + ntohl (msg->accept_id)); + /* we got another valid request => reset the backoff */ + lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + req.accept_id = ntohl (msg->accept_id); + req.accepted = GNUNET_NO; + context_msg = GNUNET_MQ_extract_nested_mh (msg); + /* calling #GNUNET_SETI_accept() in the listen cb will set req->accepted */ + lh->listen_cb (lh->listen_cls, + &msg->peer_id, + context_msg, + &req); + if (GNUNET_YES == req.accepted) + return; /* the accept-case is handled in #GNUNET_SETI_accept() */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Rejected request %u\n", + ntohl (msg->accept_id)); + mqm = GNUNET_MQ_msg (rmsg, + GNUNET_MESSAGE_TYPE_SETI_REJECT); + rmsg->accept_reject_id = msg->accept_id; + GNUNET_MQ_send (lh->mq, + mqm); +} + + +/** + * Our connection with the set service encountered an error, + * re-initialize with exponential back-off. + * + * @param cls the `struct GNUNET_SETI_ListenHandle *` + * @param error reason for the disconnect + */ +static void +handle_client_listener_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_SETI_ListenHandle *lh = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Listener broke down (%d), re-connecting\n", + (int) error); + GNUNET_MQ_destroy (lh->mq); + lh->mq = NULL; + lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, + &listen_connect, + lh); + lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff); +} + + +/** + * Connect to the set service in order to listen for requests. + * + * @param cls the `struct GNUNET_SETI_ListenHandle *` to connect + */ +static void +listen_connect (void *cls) +{ + struct GNUNET_SETI_ListenHandle *lh = cls; + struct GNUNET_MQ_MessageHandler mq_handlers[] = { + GNUNET_MQ_hd_var_size (request, + GNUNET_MESSAGE_TYPE_SETI_REQUEST, + struct GNUNET_SETI_RequestMessage, + lh), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_ListenMessage *msg; + + lh->reconnect_task = NULL; + GNUNET_assert (NULL == lh->mq); + lh->mq = GNUNET_CLIENT_connect (lh->cfg, + "set", + mq_handlers, + &handle_client_listener_error, + lh); + if (NULL == lh->mq) + return; + mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETI_LISTEN); + msg->app_id = lh->app_id; + GNUNET_MQ_send (lh->mq, + mqm); +} + + +/** + * Wait for set operation requests for the given application id + * + * @param cfg configuration to use for connecting to + * the set service, needs to be valid for the lifetime of the listen handle + * @param app_id id of the application that handles set operation requests + * @param listen_cb called for each incoming request matching the operation + * and application id + * @param listen_cls handle for @a listen_cb + * @return a handle that can be used to cancel the listen operation + */ +struct GNUNET_SETI_ListenHandle * +GNUNET_SETI_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_HashCode *app_id, + GNUNET_SETI_ListenCallback listen_cb, + void *listen_cls) +{ + struct GNUNET_SETI_ListenHandle *lh; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Starting listener for app %s\n", + GNUNET_h2s (app_id)); + lh = GNUNET_new (struct GNUNET_SETI_ListenHandle); + lh->listen_cb = listen_cb; + lh->listen_cls = listen_cls; + lh->cfg = cfg; + lh->app_id = *app_id; + lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + listen_connect (lh); + if (NULL == lh->mq) + { + GNUNET_free (lh); + return NULL; + } + return lh; +} + + +/** + * Cancel the given listen operation. + * + * @param lh handle for the listen operation + */ +void +GNUNET_SETI_listen_cancel (struct GNUNET_SETI_ListenHandle *lh) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Canceling listener %s\n", + GNUNET_h2s (&lh->app_id)); + if (NULL != lh->mq) + { + GNUNET_MQ_destroy (lh->mq); + lh->mq = NULL; + } + if (NULL != lh->reconnect_task) + { + GNUNET_SCHEDULER_cancel (lh->reconnect_task); + lh->reconnect_task = NULL; + } + GNUNET_free (lh); +} + + +/** + * Accept a request we got via #GNUNET_SETI_listen. Must be called during + * #GNUNET_SETI_listen, as the 'struct GNUNET_SETI_Request' becomes invalid + * afterwards. + * Call #GNUNET_SETI_commit to provide the local set to use for the operation, + * and to begin the exchange with the remote peer. + * + * @param request request to accept + * @param options options to use when processing the request + * @param result_cb callback for the results + * @param result_cls closure for @a result_cb + * @return a handle to cancel the operation + */ +struct GNUNET_SETI_OperationHandle * +GNUNET_SETI_accept (struct GNUNET_SETI_Request *request, + const struct GNUNET_SETI_Option options[], + GNUNET_SETI_ResultIterator result_cb, + void *result_cls) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_OperationHandle *oh; + struct GNUNET_SETI_AcceptMessage *msg; + + GNUNET_assert (GNUNET_NO == request->accepted); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client accepts set intersection operation with id %u\n", + request->accept_id); + request->accepted = GNUNET_YES; + mqm = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SETI_ACCEPT); + msg->accept_reject_id = htonl (request->accept_id); + oh = GNUNET_new (struct GNUNET_SETI_OperationHandle); + oh->result_cb = result_cb; + oh->result_cls = result_cls; + oh->conclude_mqm = mqm; + oh->request_id_addr = &msg->request_id; + for (const struct GNUNET_SETI_Option *opt = options; + GNUNET_SETI_OPTION_END != opt->type; + opt++) + { + switch (opt->type) + { + case GNUNET_SETI_OPTION_RETURN_INTERSECTION: + oh->return_intersection = GNUNET_YES; + break; + default: + LOG (GNUNET_ERROR_TYPE_ERROR, + "Option with type %d not recognized\n", + (int) opt->type); + } + } + return oh; +} + + +/** + * Commit a set to be used with a set operation. + * This function is called once we have fully constructed + * the set that we want to use for the operation. At this + * time, the P2P protocol can then begin to exchange the + * set information and call the result callback with the + * result information. + * + * @param oh handle to the set operation + * @param set the set to use for the operation + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the + * set is invalid (e.g. the set service crashed) + */ +int +GNUNET_SETI_commit (struct GNUNET_SETI_OperationHandle *oh, + struct GNUNET_SETI_Handle *set) +{ + if (NULL != oh->set) + { + /* Some other set was already committed for this + * operation, there is a logic bug in the client of this API */ + GNUNET_break (0); + return GNUNET_OK; + } + GNUNET_assert (NULL != set); + if (GNUNET_YES == set->invalid) + return GNUNET_SYSERR; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client commits to SET\n"); + GNUNET_assert (NULL != oh->conclude_mqm); + oh->set = set; + GNUNET_CONTAINER_DLL_insert (set->ops_head, + set->ops_tail, + oh); + oh->request_id = GNUNET_MQ_assoc_add (set->mq, + oh); + *oh->request_id_addr = htonl (oh->request_id); + GNUNET_MQ_send (set->mq, + oh->conclude_mqm); + oh->conclude_mqm = NULL; + oh->request_id_addr = NULL; + return GNUNET_OK; +} + + +/** + * Hash a set element. + * + * @param element the element that should be hashed + * @param[out] ret_hash a pointer to where the hash of @a element + * should be stored + */ +void +GNUNET_SETI_element_hash (const struct GNUNET_SETI_Element *element, + struct GNUNET_HashCode *ret_hash) +{ + struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start (); + + /* It's not guaranteed that the element data is always after the element header, + so we need to hash the chunks separately. */ + GNUNET_CRYPTO_hash_context_read (ctx, + &element->size, + sizeof(uint16_t)); + GNUNET_CRYPTO_hash_context_read (ctx, + &element->element_type, + sizeof(uint16_t)); + GNUNET_CRYPTO_hash_context_read (ctx, + element->data, + element->size); + GNUNET_CRYPTO_hash_context_finish (ctx, + ret_hash); +} + + +/* end of seti_api.c */ diff --git a/src/seti/test_seti.conf b/src/seti/test_seti.conf new file mode 100644 index 000000000..21fe984f8 --- /dev/null +++ b/src/seti/test_seti.conf @@ -0,0 +1,33 @@ +@INLINE@ ../../contrib/conf/gnunet/no_forcestart.conf + +[PATHS] +GNUNET_TEST_HOME = $GNUNET_TMP/test-gnunet-set/ + +[set] +START_ON_DEMAND = YES +#PREFIX = valgrind --leak-check=full +#PREFIX = gdbserver :1234 +OPTIONS = -L INFO + +[transport] +PLUGINS = unix +OPTIONS = -LERROR + +[nat] +RETURN_LOCAL_ADDRESSES = YES +DISABLEV6 = YES +USE_LOCALADDR = YES + +[peerinfo] +NO_IO = YES + +[nat] +# Use addresses from the local network interfaces (inluding loopback, but also others) +USE_LOCALADDR = YES + +# Disable IPv6 support +DISABLEV6 = NO + +# Do we use addresses from localhost address ranges? (::1, 127.0.0.0/8) +RETURN_LOCAL_ADDRESSES = YES + diff --git a/src/seti/test_seti_api.c b/src/seti/test_seti_api.c new file mode 100644 index 000000000..42dedb846 --- /dev/null +++ b/src/seti/test_seti_api.c @@ -0,0 +1,393 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012-2014 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/test_set_intersection_result_full.c + * @brief testcase for full result mode of the intersection set operation + * @author Christian Fuchs + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_testing_lib.h" +#include "gnunet_set_service.h" + + +static int ret; + +static struct GNUNET_PeerIdentity local_id; + +static struct GNUNET_HashCode app_id; + +static struct GNUNET_SET_Handle *set1; + +static struct GNUNET_SET_Handle *set2; + +static struct GNUNET_SET_ListenHandle *listen_handle; + +static const struct GNUNET_CONFIGURATION_Handle *config; + +static int iter_count; + +static struct GNUNET_SCHEDULER_Task *tt; + +static struct GNUNET_SET_OperationHandle *oh1; + +static struct GNUNET_SET_OperationHandle *oh2; + + +static void +result_cb_set1 (void *cls, + const struct GNUNET_SET_Element *element, + uint64_t current_size, + enum GNUNET_SET_Status status) +{ + static int count; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Processing result set 1 (%d)\n", + status); + switch (status) + { + case GNUNET_SET_STATUS_OK: + count++; + break; + + case GNUNET_SET_STATUS_FAILURE: + oh1 = NULL; + ret = 1; + break; + + case GNUNET_SET_STATUS_DONE: + oh1 = NULL; + GNUNET_assert (1 == count); + GNUNET_SET_destroy (set1); + set1 = NULL; + if (NULL == set2) + GNUNET_SCHEDULER_shutdown (); + break; + + default: + GNUNET_assert (0); + } +} + + +static void +result_cb_set2 (void *cls, + const struct GNUNET_SET_Element *element, + uint64_t current_size, + enum GNUNET_SET_Status status) +{ + static int count; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Processing result set 2 (%d)\n", + status); + switch (status) + { + case GNUNET_SET_STATUS_OK: + count++; + break; + + case GNUNET_SET_STATUS_FAILURE: + oh2 = NULL; + ret = 1; + break; + + case GNUNET_SET_STATUS_DONE: + oh2 = NULL; + GNUNET_assert (1 == count); + GNUNET_SET_destroy (set2); + set2 = NULL; + if (NULL == set1) + GNUNET_SCHEDULER_shutdown (); + break; + + default: + GNUNET_assert (0); + } +} + + +static void +listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SET_Request *request) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "starting intersection by accepting and committing\n"); + GNUNET_assert (NULL != context_msg); + GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); + oh2 = GNUNET_SET_accept (request, + GNUNET_SET_RESULT_FULL, + (struct GNUNET_SET_Option[]) { 0 }, + &result_cb_set2, + NULL); + GNUNET_SET_commit (oh2, + set2); +} + + +/** + * Start the set operation. + * + * @param cls closure, unused + */ +static void +start (void *cls) +{ + struct GNUNET_MessageHeader context_msg; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "starting listener\n"); + context_msg.size = htons (sizeof context_msg); + context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); + listen_handle = GNUNET_SET_listen (config, + GNUNET_SET_OPERATION_INTERSECTION, + &app_id, + &listen_cb, + NULL); + oh1 = GNUNET_SET_prepare (&local_id, + &app_id, + &context_msg, + GNUNET_SET_RESULT_FULL, + (struct GNUNET_SET_Option[]) { 0 }, + &result_cb_set1, + NULL); + GNUNET_SET_commit (oh1, + set1); +} + + +/** + * Initialize the second set, continue + * + * @param cls closure, unused + */ +static void +init_set2 (void *cls) +{ + struct GNUNET_SET_Element element; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "initializing set 2\n"); + element.element_type = 0; + element.data = "hello"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set2, + &element, + NULL, + NULL); + element.data = "quux"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set2, + &element, + NULL, + NULL); + element.data = "baz"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set2, + &element, + &start, + NULL); +} + + +/** + * Initialize the first set, continue. + */ +static void +init_set1 (void) +{ + struct GNUNET_SET_Element element; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "initializing set 1\n"); + element.element_type = 0; + element.data = "hello"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set1, + &element, + NULL, + NULL); + element.data = "bar"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set1, + &element, + &init_set2, + NULL); +} + + +static int +iter_cb (void *cls, + const struct GNUNET_SET_Element *element) +{ + if (NULL == element) + { + GNUNET_assert (iter_count == 3); + GNUNET_SET_destroy (cls); + return GNUNET_YES; + } + iter_count++; + return GNUNET_YES; +} + + +static void +test_iter () +{ + struct GNUNET_SET_Element element; + struct GNUNET_SET_Handle *iter_set; + + iter_set = GNUNET_SET_create (config, + GNUNET_SET_OPERATION_INTERSECTION); + element.element_type = 0; + element.data = "hello"; + element.size = strlen (element.data); + GNUNET_SET_add_element (iter_set, + &element, + NULL, + NULL); + element.data = "bar"; + element.size = strlen (element.data); + GNUNET_SET_add_element (iter_set, + &element, + NULL, + NULL); + element.data = "quux"; + element.size = strlen (element.data); + GNUNET_SET_add_element (iter_set, + &element, + NULL, + NULL); + GNUNET_SET_iterate (iter_set, + &iter_cb, + iter_set); +} + + +/** + * Function run on shutdown. + * + * @param cls closure + */ +static void +do_shutdown (void *cls) +{ + if (NULL != tt) + { + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + } + if (NULL != oh1) + { + GNUNET_SET_operation_cancel (oh1); + oh1 = NULL; + } + if (NULL != oh2) + { + GNUNET_SET_operation_cancel (oh2); + oh2 = NULL; + } + if (NULL != set1) + { + GNUNET_SET_destroy (set1); + set1 = NULL; + } + if (NULL != set2) + { + GNUNET_SET_destroy (set2); + set2 = NULL; + } + if (NULL != listen_handle) + { + GNUNET_SET_listen_cancel (listen_handle); + listen_handle = NULL; + } +} + + +/** + * Function run on timeout. + * + * @param cls closure + */ +static void +timeout_fail (void *cls) +{ + tt = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Testcase failed with timeout\n"); + GNUNET_SCHEDULER_shutdown (); + ret = 1; +} + + +/** + * Signature of the 'main' function for a (single-peer) testcase that + * is run using 'GNUNET_TESTING_peer_run'. + * + * @param cls closure + * @param cfg configuration of the peer that was started + * @param peer identity of the peer that was created + */ +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_TESTING_Peer *peer) +{ + config = cfg; + GNUNET_TESTING_peer_get_identity (peer, + &local_id); + if (0) + test_iter (); + + tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply ( + GNUNET_TIME_UNIT_SECONDS, 5), + &timeout_fail, + NULL); + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, + NULL); + + set1 = GNUNET_SET_create (cfg, + GNUNET_SET_OPERATION_INTERSECTION); + set2 = GNUNET_SET_create (cfg, + GNUNET_SET_OPERATION_INTERSECTION); + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, + &app_id); + + /* test the real set reconciliation */ + init_set1 (); +} + + +int +main (int argc, + char **argv) +{ + if (0 != GNUNET_TESTING_peer_run ("test_set_intersection_result_full", + "test_set.conf", + &run, NULL)) + return 1; + return ret; +} -- cgit v1.2.3 From 3fb3bf908f4977aef6bde6a450954e2704b14bcb Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 18 Aug 2020 19:31:39 +0200 Subject: -splitting of set intersection functionality from set service (not yet finished, FTBFS) --- src/seti/gnunet-service-seti.c | 997 ++++++++++------------------------------- 1 file changed, 244 insertions(+), 753 deletions(-) (limited to 'src/seti') diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c index 3b8da01cd..037181bde 100644 --- a/src/seti/gnunet-service-seti.c +++ b/src/seti/gnunet-service-seti.c @@ -107,28 +107,6 @@ struct ElementEntry; struct Operation; -/** - * MutationEvent gives information about changes - * to an element (removal / addition) in a set content. - */ -struct MutationEvent -{ - /** - * First generation affected by this mutation event. - * - * If @a generation is 0, this mutation event is a list - * sentinel element. - */ - unsigned int generation; - - /** - * If @a added is #GNUNET_YES, then this is a - * `remove` event, otherwise it is an `add` event. - */ - int added; -}; - - /** * Information about an element element in the set. All elements are * stored in a hash-table from their hash-code to their `struct @@ -150,20 +128,9 @@ struct ElementEntry struct GNUNET_HashCode element_hash; /** - * If @a mutations is not NULL, it contains - * a list of mutations, ordered by increasing generation. - * The list is terminated by a sentinel event with `generation` - * set to 0. - * - * If @a mutations is NULL, then this element exists in all generations - * of the respective set content this element belongs to. - */ - struct MutationEvent *mutations; - - /** - * Number of elements in the array @a mutations. + * Generation in which the element was added. */ - unsigned int mutations_size; + unsigned int generation_added; /** * #GNUNET_YES if the element is a remote element, and does not belong @@ -285,7 +252,13 @@ struct Operation /** * When are elements sent to the client, and which elements are sent? */ - enum GNUNET_SET_ResultMode result_mode; + int return_intersection; + + /** + * Lower bound for the set size, used only when + * byzantine mode is enabled. + */ + int byzantine_lower_bound; /** * Always use delta operation instead of sending full sets, @@ -305,12 +278,6 @@ struct Operation */ int byzantine; - /** - * Lower bound for the set size, used only when - * byzantine mode is enabled. - */ - int byzantine_lower_bound; - /** * Unique request id for the request from a remote peer, sent to the * client, which will accept or reject the request. Set to '0' iff @@ -319,8 +286,7 @@ struct Operation uint32_t suggest_id; /** - * Generation in which the operation handle - * was created. + * Generation in which the operation handle was created. */ unsigned int generation_created; }; @@ -337,20 +303,6 @@ struct SetContent */ struct GNUNET_CONTAINER_MultiHashMap *elements; - /** - * Mutations requested by the client that we're - * unable to execute right now because we're iterating - * over the underlying hash map of elements. - */ - struct PendingMutation *pending_mutations_head; - - /** - * Mutations requested by the client that we're - * unable to execute right now because we're iterating - * over the underlying hash map of elements. - */ - struct PendingMutation *pending_mutations_tail; - /** * Number of references to the content. */ @@ -368,49 +320,6 @@ struct SetContent }; -struct GenerationRange -{ - /** - * First generation that is excluded. - */ - unsigned int start; - - /** - * Generation after the last excluded generation. - */ - unsigned int end; -}; - - -/** - * Information about a mutation to apply to a set. - */ -struct PendingMutation -{ - /** - * Mutations are kept in a DLL. - */ - struct PendingMutation *prev; - - /** - * Mutations are kept in a DLL. - */ - struct PendingMutation *next; - - /** - * Set this mutation is about. - */ - struct Set *set; - - /** - * Message that describes the desired mutation. - * May only be a #GNUNET_MESSAGE_TYPE_SET_ADD or - * #GNUNET_MESSAGE_TYPE_SET_REMOVE. - */ - struct GNUNET_SET_ElementMessage *msg; -}; - - /** * A set that supports a specific operation with other peers. */ @@ -443,12 +352,6 @@ struct Set */ struct SetState *state; - /** - * Current state of iterating elements for the client. - * NULL if we are not currently iterating. - */ - struct GNUNET_CONTAINER_MultiHashMapIterator *iter; - /** * Evaluate operations are held in a linked list. */ @@ -459,37 +362,12 @@ struct Set */ struct Operation *ops_tail; - /** - * List of generations we have to exclude, due to lazy copies. - */ - struct GenerationRange *excluded_generations; - /** * Current generation, that is, number of previously executed * operations and lazy copies on the underlying set content. */ unsigned int current_generation; - /** - * Number of elements in array @a excluded_generations. - */ - unsigned int excluded_generations_size; - - /** - * Type of operation supported for this set - */ - enum GNUNET_SET_OperationType operation; - - /** - * Generation we're currently iteration over. - */ - unsigned int iter_generation; - - /** - * Each @e iter is assigned a unique number, so that the client - * can distinguish iterations. - */ - uint16_t iteration_id; }; @@ -724,7 +602,7 @@ send_client_removed_element (struct Operation *op, struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - if (GNUNET_SET_RESULT_REMOVED != op->result_mode) + if (GNUNET_NO != op->return_intersection) return; /* Wrong mode for transmitting removed elements */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending removed element (size %u) to client\n", @@ -736,13 +614,13 @@ send_client_removed_element (struct Operation *op, GNUNET_assert (0 != op->client_request_id); ev = GNUNET_MQ_msg_extra (rm, element->size, - GNUNET_MESSAGE_TYPE_SET_RESULT); + GNUNET_MESSAGE_TYPE_SETI_RESULT); if (NULL == ev) { GNUNET_break (0); return; } - rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->result_status = htons (GNUNET_SET_STATUS_DEL_LOCAL); rm->request_id = htonl (op->client_request_id); rm->element_type = element->element_type; GNUNET_memcpy (&rm[1], @@ -770,7 +648,6 @@ filtered_map_initialization (void *cls, struct ElementEntry *ee = value; struct GNUNET_HashCode mutated_hash; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "FIMA called for %s:%u\n", GNUNET_h2s (&ee->element_hash), @@ -934,8 +811,8 @@ fail_intersection_operation (struct Operation *op) op->state->my_elements = NULL; } ev = GNUNET_MQ_msg (msg, - GNUNET_MESSAGE_TYPE_SET_RESULT); - msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); + GNUNET_MESSAGE_TYPE_SETI_RESULT); + msg->result_status = htons (GNUNET_SETI_STATUS_FAILURE); msg->request_id = htonl (op->client_request_id); msg->element_type = htons (0); GNUNET_MQ_send (op->set->cs->mq, @@ -999,7 +876,7 @@ send_bloomfilter (struct Operation *op) chunk_size = bf_size; ev = GNUNET_MQ_msg_extra (msg, chunk_size, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_BF); GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_bloomfilter_get_raw_data ( op->state->local_bf, @@ -1028,7 +905,7 @@ send_bloomfilter (struct Operation *op) chunk_size = bf_size - offset; ev = GNUNET_MQ_msg_extra (msg, chunk_size, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_BF); GNUNET_memcpy (&msg[1], &bf_data[offset], chunk_size); @@ -1067,7 +944,7 @@ send_client_done_and_destroy (void *cls) 1, GNUNET_NO); ev = GNUNET_MQ_msg (rm, - GNUNET_MESSAGE_TYPE_SET_RESULT); + GNUNET_MESSAGE_TYPE_SETI_RESULT); rm->request_id = htonl (op->client_request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); rm->element_type = htons (0); @@ -1114,7 +991,7 @@ send_p2p_done (struct Operation *op) GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); ev = GNUNET_MQ_msg (idm, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); + GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_DONE); idm->final_element_count = htonl (op->state->my_element_count); idm->element_xor_hash = op->state->my_xor; GNUNET_MQ_notify_sent (ev, @@ -1176,7 +1053,7 @@ send_remaining_elements (void *cls) GNUNET_assert (0 != op->client_request_id); ev = GNUNET_MQ_msg_extra (rm, element->size, - GNUNET_MESSAGE_TYPE_SET_RESULT); + GNUNET_MESSAGE_TYPE_SETI_RESULT); GNUNET_assert (NULL != ev); rm->result_status = htons (GNUNET_SET_STATUS_OK); rm->request_id = htonl (op->client_request_id); @@ -1243,7 +1120,7 @@ send_element_count (struct Operation *op) "Sending our element count (%u)\n", op->state->my_element_count); ev = GNUNET_MQ_msg (msg, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); + GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_ELEMENT_INFO); msg->sender_element_count = htonl (op->state->my_element_count); GNUNET_MQ_send (op->mq, ev); } @@ -1273,7 +1150,7 @@ begin_bf_exchange (struct Operation *op) * @param cls the intersection operation * @param mh the header of the message */ -void +static void handle_intersection_p2p_element_info (void *cls, const struct IntersectionElementInfoMessage *msg) @@ -1327,7 +1204,6 @@ process_bf (struct Operation *op) GNUNET_break_op (0); fail_intersection_operation (op); return; - case PHASE_COUNT_SENT: /* This is the first BF being sent, build our initial map with filtering in place */ @@ -1336,24 +1212,20 @@ process_bf (struct Operation *op) &filtered_map_initialization, op); break; - case PHASE_BF_EXCHANGE: /* Update our set by reduction */ GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, &iterator_bf_reduce, op); break; - case PHASE_MUST_SEND_DONE: GNUNET_break_op (0); fail_intersection_operation (op); return; - case PHASE_DONE_RECEIVED: GNUNET_break_op (0); fail_intersection_operation (op); return; - case PHASE_FINISHED: GNUNET_break_op (0); fail_intersection_operation (op); @@ -1420,7 +1292,7 @@ check_intersection_p2p_bf (void *cls, * @param cls the intersection operation * @param msg the header of the message */ -static +static void handle_intersection_p2p_bf (void *cls, const struct BFMessage *msg) { @@ -1612,214 +1484,6 @@ handle_intersection_p2p_done (void *cls, } -/** - * Initiate a set intersection operation with a remote peer. - * - * @param op operation that is created, should be initialized to - * begin the evaluation - * @param opaque_context message to be transmitted to the listener - * to convince it to accept, may be NULL - * @return operation-specific state to keep in @a op - */ -static struct OperationState * -intersection_evaluate (struct Operation *op, - const struct GNUNET_MessageHeader *opaque_context) -{ - struct OperationState *state; - struct GNUNET_MQ_Envelope *ev; - struct OperationRequestMessage *msg; - - ev = GNUNET_MQ_msg_nested_mh (msg, - GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, - opaque_context); - if (NULL == ev) - { - /* the context message is too large!? */ - GNUNET_break (0); - return NULL; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Initiating intersection operation evaluation\n"); - state = GNUNET_new (struct OperationState); - /* we started the operation, thus we have to send the operation request */ - state->phase = PHASE_INITIAL; - state->my_element_count = op->set->state->current_set_element_count; - state->my_elements - = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, - GNUNET_YES); - - msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); - msg->element_count = htonl (state->my_element_count); - GNUNET_MQ_send (op->mq, - ev); - state->phase = PHASE_COUNT_SENT; - if (NULL != opaque_context) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sent op request with context message\n"); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sent op request without context message\n"); - return state; -} - - -/** - * Accept an intersection operation request from a remote peer. Only - * initializes the private operation state. - * - * @param op operation that will be accepted as an intersection operation - */ -static struct OperationState * -intersection_accept (struct Operation *op) -{ - struct OperationState *state; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Accepting set intersection operation\n"); - state = GNUNET_new (struct OperationState); - state->phase = PHASE_INITIAL; - state->my_element_count - = op->set->state->current_set_element_count; - state->my_elements - = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count, - op->remote_element_count), - GNUNET_YES); - op->state = state; - if (op->remote_element_count < state->my_element_count) - { - /* If the other peer (Alice) has fewer elements than us (Bob), - we just send the count as Alice should send the first BF */ - send_element_count (op); - state->phase = PHASE_COUNT_SENT; - return state; - } - /* We have fewer elements, so we start with the BF */ - begin_bf_exchange (op); - return state; -} - - -/** - * Destroy the intersection operation. Only things specific to the - * intersection operation are destroyed. - * - * @param op intersection operation to destroy - */ -static void -intersection_op_cancel (struct Operation *op) -{ - /* check if the op was canceled twice */ - GNUNET_assert (NULL != op->state); - if (NULL != op->state->remote_bf) - { - GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); - op->state->remote_bf = NULL; - } - if (NULL != op->state->local_bf) - { - GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); - op->state->local_bf = NULL; - } - if (NULL != op->state->my_elements) - { - GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); - op->state->my_elements = NULL; - } - if (NULL != op->state->full_result_iter) - { - GNUNET_CONTAINER_multihashmap_iterator_destroy ( - op->state->full_result_iter); - op->state->full_result_iter = NULL; - } - GNUNET_free (op->state); - op->state = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Destroying intersection op state done\n"); -} - - -/** - * Create a new set supporting the intersection operation. - * - * @return the newly created set - */ -static struct SetState * -intersection_set_create () -{ - struct SetState *set_state; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Intersection set created\n"); - set_state = GNUNET_new (struct SetState); - set_state->current_set_element_count = 0; - - return set_state; -} - - -/** - * Add the element from the given element message to the set. - * - * @param set_state state of the set want to add to - * @param ee the element to add to the set - */ -static void -intersection_add (struct SetState *set_state, - struct ElementEntry *ee) -{ - set_state->current_set_element_count++; -} - - -/** - * Destroy a set that supports the intersection operation - * - * @param set_state the set to destroy - */ -static void -intersection_set_destroy (struct SetState *set_state) -{ - GNUNET_free (set_state); -} - - -/** - * Remove the element given in the element message from the set. - * - * @param set_state state of the set to remove from - * @param element set element to remove - */ -static void -intersection_remove (struct SetState *set_state, - struct ElementEntry *element) -{ - GNUNET_assert (0 < set_state->current_set_element_count); - set_state->current_set_element_count--; -} - - -/** - * Callback for channel death for the intersection operation. - * - * @param op operation that lost the channel - */ -static void -intersection_channel_death (struct Operation *op) -{ - if (GNUNET_YES == op->state->channel_death_expected) - { - /* oh goodie, we are done! */ - send_client_done_and_destroy (op); - } - else - { - /* sorry, channel went down early, too bad. */ - _GSS_operation_destroy (op, - GNUNET_YES); - } -} - - /** * Get the incoming socket associated with the given id. * @@ -1869,171 +1533,6 @@ incoming_destroy (struct Operation *op) } -/** - * Context for the #garbage_collect_cb(). - */ -struct GarbageContext -{ - /** - * Map for which we are garbage collecting removed elements. - */ - struct GNUNET_CONTAINER_MultiHashMap *map; - - /** - * Lowest generation for which an operation is still pending. - */ - unsigned int min_op_generation; - - /** - * Largest generation for which an operation is still pending. - */ - unsigned int max_op_generation; -}; - - -/** - * Function invoked to check if an element can be removed from - * the set's history because it is no longer needed. - * - * @param cls the `struct GarbageContext *` - * @param key key of the element in the map - * @param value the `struct ElementEntry *` - * @return #GNUNET_OK (continue to iterate) - */ -static int -garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value) -{ - // struct GarbageContext *gc = cls; - // struct ElementEntry *ee = value; - - // if (GNUNET_YES != ee->removed) - // return GNUNET_OK; - // if ( (gc->max_op_generation < ee->generation_added) || - // (ee->generation_removed > gc->min_op_generation) ) - // { - // GNUNET_assert (GNUNET_YES == - // GNUNET_CONTAINER_multihashmap_remove (gc->map, - // key, - // ee)); - // GNUNET_free (ee); - // } - return GNUNET_OK; -} - - -/** - * Collect and destroy elements that are not needed anymore, because - * their lifetime (as determined by their generation) does not overlap - * with any active set operation. - * - * @param set set to garbage collect - */ -static void -collect_generation_garbage (struct Set *set) -{ - struct GarbageContext gc; - - gc.min_op_generation = UINT_MAX; - gc.max_op_generation = 0; - for (struct Operation *op = set->ops_head; NULL != op; op = op->next) - { - gc.min_op_generation = - GNUNET_MIN (gc.min_op_generation, op->generation_created); - gc.max_op_generation = - GNUNET_MAX (gc.max_op_generation, op->generation_created); - } - gc.map = set->content->elements; - GNUNET_CONTAINER_multihashmap_iterate (set->content->elements, - &garbage_collect_cb, - &gc); -} - - -/** - * Is @a generation in the range of exclusions? - * - * @param generation generation to query - * @param excluded array of generations where the element is excluded - * @param excluded_size length of the @a excluded array - * @return #GNUNET_YES if @a generation is in any of the ranges - */ -static int -is_excluded_generation (unsigned int generation, - struct GenerationRange *excluded, - unsigned int excluded_size) -{ - for (unsigned int i = 0; i < excluded_size; i++) - if ((generation >= excluded[i].start) && (generation < excluded[i].end)) - return GNUNET_YES; - return GNUNET_NO; -} - - -/** - * Is element @a ee part of the set during @a query_generation? - * - * @param ee element to test - * @param query_generation generation to query - * @param excluded array of generations where the element is excluded - * @param excluded_size length of the @a excluded array - * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not - */ -static int -is_element_of_generation (struct ElementEntry *ee, - unsigned int query_generation, - struct GenerationRange *excluded, - unsigned int excluded_size) -{ - struct MutationEvent *mut; - int is_present; - - GNUNET_assert (NULL != ee->mutations); - if (GNUNET_YES == - is_excluded_generation (query_generation, excluded, excluded_size)) - { - GNUNET_break (0); - return GNUNET_NO; - } - - is_present = GNUNET_NO; - - /* Could be made faster with binary search, but lists - are small, so why bother. */ - for (unsigned int i = 0; i < ee->mutations_size; i++) - { - mut = &ee->mutations[i]; - - if (mut->generation > query_generation) - { - /* The mutation doesn't apply to our generation - anymore. We can'b break here, since mutations aren't - sorted by generation. */ - continue; - } - - if (GNUNET_YES == - is_excluded_generation (mut->generation, excluded, excluded_size)) - { - /* The generation is excluded (because it belongs to another - fork via a lazy copy) and thus mutations aren't considered - for membership testing. */ - continue; - } - - /* This would be an inconsistency in how we manage mutations. */ - if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added)) - GNUNET_assert (0); - /* Likewise. */ - if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added)) - GNUNET_assert (0); - - is_present = mut->added; - } - - return is_present; -} - - /** * Is element @a ee part of the set used by @a op? * @@ -2041,13 +1540,11 @@ is_element_of_generation (struct ElementEntry *ee, * @param op operation the defines the set and its generation * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not */ -int -_GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op) +static int +_GSS_is_element_of_operation (struct ElementEntry *ee, + struct Operation *op) { - return is_element_of_generation (ee, - op->generation_created, - op->set->excluded_generations, - op->set->excluded_generations_size); + return op->generation_created >= ee->generation_added; } @@ -2062,10 +1559,9 @@ _GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op) * as there may be multiple operations per set. * * @param op operation to destroy - * @param gc #GNUNET_YES to perform garbage collection on the set */ -void -_GSS_operation_destroy (struct Operation *op, int gc) +static void +_GSS_operation_destroy (struct Operation *op) { struct Set *set = op->set; struct GNUNET_CADET_Channel *channel; @@ -2074,12 +1570,39 @@ _GSS_operation_destroy (struct Operation *op, int gc) GNUNET_assert (NULL == op->listener); if (NULL != op->state) { - intersection_cancel (op); // FIXME: inline + /* check if the op was canceled twice */ + GNUNET_assert (NULL != op->state); + if (NULL != op->state->remote_bf) + { + GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); + op->state->remote_bf = NULL; + } + if (NULL != op->state->local_bf) + { + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; + } + if (NULL != op->state->my_elements) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); + op->state->my_elements = NULL; + } + if (NULL != op->state->full_result_iter) + { + GNUNET_CONTAINER_multihashmap_iterator_destroy ( + op->state->full_result_iter); + op->state->full_result_iter = NULL; + } + GNUNET_free (op->state); op->state = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying intersection op state done\n"); } if (NULL != set) { - GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op); + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + op); op->set = NULL; } if (NULL != op->context_msg) @@ -2094,8 +1617,6 @@ _GSS_operation_destroy (struct Operation *op, int gc) op->channel = NULL; GNUNET_CADET_channel_destroy (channel); } - if ((NULL != set) && (GNUNET_YES == gc)) - collect_generation_garbage (set); /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, * there was a channel end handler that will free 'op' on the call stack. */ } @@ -2166,8 +1687,6 @@ client_disconnect_cb (void *cls, if (NULL != (set = cs->set)) { struct SetContent *content = set->content; - struct PendingMutation *pm; - struct PendingMutation *pm_current; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n"); /* Destroy pending set operations */ @@ -2176,8 +1695,7 @@ client_disconnect_cb (void *cls, /* Destroy operation-specific state */ GNUNET_assert (NULL != set->state); - intersection_set_destroy (set->state); // FIXME: inline - set->state = NULL; + GNUNET_free (set->state); /* Clean up ongoing iterations */ if (NULL != set->iter) @@ -2187,21 +1705,6 @@ client_disconnect_cb (void *cls, set->iteration_id++; } - /* discard any pending mutations that reference this set */ - pm = content->pending_mutations_head; - while (NULL != pm) - { - pm_current = pm; - pm = pm->next; - if (pm_current->set == set) - { - GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, - content->pending_mutations_tail, - pm_current); - GNUNET_free (pm_current); - } - } - /* free set content (or at least decrement RC) */ set->content = NULL; GNUNET_assert (0 != content->refcount); @@ -2260,7 +1763,8 @@ client_disconnect_cb (void *cls, * #GNUNET_SYSERR to destroy the channel */ static int -check_incoming_msg (void *cls, const struct OperationRequestMessage *msg) +check_incoming_msg (void *cls, + const struct OperationRequestMessage *msg) { struct Operation *op = cls; struct Listener *listener = op->listener; @@ -2313,7 +1817,8 @@ check_incoming_msg (void *cls, const struct OperationRequestMessage *msg) * #GNUNET_SYSERR to destroy the channel */ static void -handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg) +handle_incoming_msg (void *cls, + const struct OperationRequestMessage *msg) { struct Operation *op = cls; struct Listener *listener = op->listener; @@ -2357,121 +1862,6 @@ handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg) } -/** - * Add an element to @a set as specified by @a msg - * - * @param set set to manipulate - * @param msg message specifying the change - */ -static void -execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) -{ - struct GNUNET_SET_Element el; - struct ElementEntry *ee; - struct GNUNET_HashCode hash; - - GNUNET_assert (GNUNET_MESSAGE_TYPE_SETI_ADD == ntohs (msg->header.type)); - el.size = ntohs (msg->header.size) - sizeof(*msg); - el.data = &msg[1]; - el.element_type = ntohs (msg->element_type); - GNUNET_SET_element_hash (&el, &hash); - ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash); - if (NULL == ee) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client inserts element %s of size %u\n", - GNUNET_h2s (&hash), - el.size); - ee = GNUNET_malloc (el.size + sizeof(*ee)); - ee->element.size = el.size; - GNUNET_memcpy (&ee[1], el.data, el.size); - ee->element.data = &ee[1]; - ee->element.element_type = el.element_type; - ee->remote = GNUNET_NO; - ee->mutations = NULL; - ee->mutations_size = 0; - ee->element_hash = hash; - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_put ( - set->content->elements, - &ee->element_hash, - ee, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - } - else if (GNUNET_YES == - is_element_of_generation (ee, - set->current_generation, - set->excluded_generations, - set->excluded_generations_size)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client inserted element %s of size %u twice (ignored)\n", - GNUNET_h2s (&hash), - el.size); - - /* same element inserted twice */ - return; - } - - { - struct MutationEvent mut = { .generation = set->current_generation, - .added = GNUNET_YES }; - GNUNET_array_append (ee->mutations, ee->mutations_size, mut); - } - // FIXME: inline - intersection_add (set->state, - ee); -} - - -/** - * Perform a mutation on a set as specified by the @a msg - * - * @param set the set to mutate - * @param msg specification of what to change - */ -static void -execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) -{ - switch (ntohs (msg->header.type)) - { - case GNUNET_MESSAGE_TYPE_SETI_ADD: // FIXME: inline! - execute_add (set, msg); - break; - default: - GNUNET_break (0); - } -} - - -/** - * Execute mutations that were delayed on a set because of - * pending operations. - * - * @param set the set to execute mutations on - */ -static void -execute_delayed_mutations (struct Set *set) -{ - struct PendingMutation *pm; - - if (0 != set->content->iterator_count) - return; /* still cannot do this */ - while (NULL != (pm = set->content->pending_mutations_head)) - { - GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, - set->content->pending_mutations_tail, - pm); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Executing pending mutation on %p.\n", - pm->set); - execute_mutation (pm->set, pm->msg); - GNUNET_free (pm->msg); - GNUNET_free (pm); - } -} - - /** * Send the next element of a set to the set's client. The next element is given by * the set's current hashmap iterator. The set's iterator will be set to NULL if there @@ -2542,7 +1932,8 @@ send_client_element (struct Set *set) * @param m message sent by the client */ static void -handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m) +handle_client_iterate (void *cls, + const struct GNUNET_MessageHeader *m) { struct ClientState *cs = cls; struct Set *set; @@ -2584,7 +1975,8 @@ handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m) * @param m message sent by the client */ static void -handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) +handle_client_create_set (void *cls, + const struct GNUNET_SET_CreateMessage *msg) { struct ClientState *cs = cls; struct Set *set; @@ -2600,24 +1992,18 @@ handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) return; } set = GNUNET_new (struct Set); - switch (ntohl (msg->operation)) { - case GNUNET_SET_OPERATION_INTERSECTION: - set->vt = _GSS_intersection_vt (); - break; + struct SetState *set_state; - case GNUNET_SET_OPERATION_UNION: - set->vt = _GSS_union_vt (); - break; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection set created\n"); + set_state = GNUNET_new (struct SetState); + set_state->current_set_element_count = 0; - default: - GNUNET_free (set); - GNUNET_break (0); - GNUNET_SERVICE_client_drop (cs->client); - return; + set->state = set_state; } - set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); - set->state = intersection_set_create (); // FIXME: inline + + if (NULL == set->state) { /* initialization failed (i.e. out of memory) */ @@ -2627,7 +2013,8 @@ handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) } set->content = GNUNET_new (struct SetContent); set->content->refcount = 1; - set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, + GNUNET_YES); set->cs = cs; cs->set = set; GNUNET_SERVICE_client_continue (cs->client); @@ -2679,17 +2066,21 @@ channel_new_cb (void *cls, struct Listener *listener = cls; struct Operation *op; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New incoming channel\n"); op = GNUNET_new (struct Operation); op->listener = listener; op->peer = *source; op->channel = channel; op->mq = GNUNET_CADET_get_mq (op->channel); - op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, &incoming_timeout_cb, op); - GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op); + GNUNET_CONTAINER_DLL_insert (listener->op_head, + listener->op_tail, + op); return op; } @@ -2711,7 +2102,8 @@ channel_new_cb (void *cls, * @param channel connection to the other end (henceforth invalid) */ static void -channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel) +channel_end_cb (void *channel_ctx, + const struct GNUNET_CADET_Channel *channel) { struct Operation *op = channel_ctx; @@ -2725,12 +2117,13 @@ channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel) * and be replaced by inlining more specific * logic in the various places where it is called. */ -void +static void _GSS_operation_destroy2 (struct Operation *op) { struct GNUNET_CADET_Channel *channel; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "channel_end_cb called\n"); if (NULL != (channel = op->channel)) { /* This will free op; called conditionally as this helper function @@ -2744,9 +2137,22 @@ _GSS_operation_destroy2 (struct Operation *op) return; } if (NULL != op->set) - intersection_channel_death (op); // FIXME: inline + { + if (GNUNET_YES == op->state->channel_death_expected) + { + /* oh goodie, we are done! */ + send_client_done_and_destroy (op); + } + else + { + /* sorry, channel went down early, too bad. */ + _GSS_operation_destroy (op, + GNUNET_YES); + } + } else - _GSS_operation_destroy (op, GNUNET_YES); + _GSS_operation_destroy (op, + GNUNET_YES); GNUNET_free (op); } @@ -2781,7 +2187,8 @@ channel_window_cb (void *cls, * @param msg message sent by the client */ static void -handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) +handle_client_listen (void *cls, + const struct GNUNET_SET_ListenMessage *msg) { struct ClientState *cs = cls; struct GNUNET_MQ_MessageHandler cadet_handlers[] = @@ -2816,7 +2223,9 @@ handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) cs->listener = listener; listener->app_id = msg->app_id; listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); - GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener); + GNUNET_CONTAINER_DLL_insert (listener_head, + listener_tail, + listener); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New listener created (op %u, port %s)\n", listener->operation, @@ -2840,7 +2249,8 @@ handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) * @param msg message sent by the client */ static void -handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg) +handle_client_reject (void *cls, + const struct GNUNET_SET_RejectMessage *msg) { struct ClientState *cs = cls; struct Operation *op; @@ -2872,7 +2282,8 @@ handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg) * @param msg message sent by the client */ static int -check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) +check_client_mutation (void *cls, + const struct GNUNET_SET_ElementMessage *msg) { /* NOTE: Technically, we should probably check with the block library whether the element we are given is well-formed */ @@ -2881,16 +2292,20 @@ check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) /** - * Called when a client wants to add or remove an element to a set it inhabits. + * Called when a client wants to add an element to a set it inhabits. * * @param cls client that sent the message * @param msg message sent by the client */ static void -handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) +handle_client_set_add (void *cls, + const struct GNUNET_SET_ElementMessage *msg) { struct ClientState *cs = cls; struct Set *set; + struct GNUNET_SET_Element el; + struct ElementEntry *ee; + struct GNUNET_HashCode hash; if (NULL == (set = cs->set)) { @@ -2900,23 +2315,45 @@ handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) return; } GNUNET_SERVICE_client_continue (cs->client); - - if (0 != set->content->iterator_count) + el.size = ntohs (msg->header.size) - sizeof(*msg); + el.data = &msg[1]; + el.element_type = ntohs (msg->element_type); + GNUNET_ISET_element_hash (&el, + &hash); + ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, + &hash); + if (NULL == ee) { - struct PendingMutation *pm; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n"); - pm = GNUNET_new (struct PendingMutation); - pm->msg = - (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header); - pm->set = set; - GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, - set->content->pending_mutations_tail, - pm); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserts element %s of size %u\n", + GNUNET_h2s (&hash), + el.size); + ee = GNUNET_malloc (el.size + sizeof(*ee)); + ee->element.size = el.size; + GNUNET_memcpy (&ee[1], el.data, el.size); + ee->element.data = &ee[1]; + ee->element.element_type = el.element_type; + ee->remote = GNUNET_NO; + ee->mutations = NULL; + ee->mutations_size = 0; + ee->element_hash = hash; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put ( + set->content->elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserted element %s of size %u twice (ignored)\n", + GNUNET_h2s (&hash), + el.size); + /* same element inserted twice */ return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n"); - execute_mutation (set, msg); + set->state->current_set_element_count++; } @@ -2929,24 +2366,13 @@ handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) static void advance_generation (struct Set *set) { - struct GenerationRange r; - if (set->current_generation == set->content->latest_generation) { set->content->latest_generation++; set->current_generation++; return; } - GNUNET_assert (set->current_generation < set->content->latest_generation); - - r.start = set->current_generation + 1; - r.end = set->content->latest_generation + 1; - set->content->latest_generation = r.end; - set->current_generation = r.end; - GNUNET_array_append (set->excluded_generations, - set->excluded_generations_size, - r); } @@ -2960,7 +2386,8 @@ advance_generation (struct Set *set) * @return #GNUNET_OK if the message is well-formed */ static int -check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) +check_client_evaluate (void *cls, + const struct GNUNET_SET_EvaluateMessage *msg) { /* FIXME: suboptimal, even if the context below could be NULL, there are malformed messages this does not check for... */ @@ -2977,7 +2404,8 @@ check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) * @param msg message sent by the client */ static void -handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) +handle_client_evaluate (void *cls, + const struct GNUNET_SET_EvaluateMessage *msg) { struct ClientState *cs = cls; struct Operation *op = GNUNET_new (struct Operation); @@ -3010,7 +2438,8 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) GNUNET_SERVICE_client_drop (cs->client); return; } - op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); op->peer = msg->target_peer; op->result_mode = ntohl (msg->result_mode); op->client_request_id = ntohl (msg->request_id); @@ -3025,7 +2454,9 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) op->set = set; op->generation_created = set->current_generation; advance_generation (set); - GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); + GNUNET_CONTAINER_DLL_insert (set->ops_head, + set->ops_tail, + op); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating new CADET channel to port %s for set operation type %u\n", GNUNET_h2s (&msg->app_id), @@ -3038,12 +2469,42 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) &channel_end_cb, cadet_handlers); op->mq = GNUNET_CADET_get_mq (op->channel); - op->state = intersection_evaluate (op, context); // FIXME: inline! - if (NULL == op->state) { - GNUNET_break (0); - GNUNET_SERVICE_client_drop (cs->client); - return; + struct OperationState *state; + struct GNUNET_MQ_Envelope *ev; + struct OperationRequestMessage *msg; + + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST, + context); + if (NULL == ev) + { + /* the context message is too large!? */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initiating intersection operation evaluation\n"); + state = GNUNET_new (struct OperationState); + /* we started the operation, thus we have to send the operation request */ + state->phase = PHASE_INITIAL; + state->my_element_count = op->set->state->current_set_element_count; + state->my_elements + = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, + GNUNET_YES); + + msg->element_count = htonl (state->my_element_count); + GNUNET_MQ_send (op->mq, + ev); + state->phase = PHASE_COUNT_SENT; + if (NULL != opaque_context) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request with context message\n"); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request without context message\n"); + op->state = state; } GNUNET_SERVICE_client_continue (cs->client); } @@ -3056,7 +2517,8 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) * @param msg the message */ static void -handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) +handle_client_cancel (void *cls, + const struct GNUNET_SET_CancelMessage *msg) { struct ClientState *cs = cls; struct Set *set; @@ -3085,7 +2547,8 @@ handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) * the other peer disconnecting. The client may not know about this * yet and try to cancel the (just barely non-existent) operation. * So this is not a hard error. - */GNUNET_log (GNUNET_ERROR_TYPE_INFO, + */// + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Client canceled non-existent op %u\n", (uint32_t) ntohl (msg->request_id)); } @@ -3109,7 +2572,8 @@ handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) * @param msg the message */ static void -handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) +handle_client_accept (void *cls, + const struct GNUNET_SET_AcceptMessage *msg) { struct ClientState *cs = cls; struct Set *set; @@ -3136,7 +2600,8 @@ handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) cs, ntohl (msg->accept_reject_id), cs->listener); - ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SETI_RESULT); + ev = GNUNET_MQ_msg (result_message, + GNUNET_MESSAGE_TYPE_SETI_RESULT); result_message->request_id = msg->request_id; result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); GNUNET_MQ_send (set->cs->mq, ev); @@ -3163,12 +2628,34 @@ handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) op->generation_created = set->current_generation; advance_generation (set); GNUNET_assert (NULL == op->state); - op->state = intersection_accept (op); // FIXME: inline - if (NULL == op->state) { - GNUNET_break (0); - GNUNET_SERVICE_client_drop (cs->client); - return; + struct OperationState *state; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Accepting set intersection operation\n"); + state = GNUNET_new (struct OperationState); + state->phase = PHASE_INITIAL; + state->my_element_count + = op->set->state->current_set_element_count; + state->my_elements + = GNUNET_CONTAINER_multihashmap_create ( + GNUNET_MIN (state->my_element_count, + op->remote_element_count), + GNUNET_YES); + op->state = state; + if (op->remote_element_count < state->my_element_count) + { + /* If the other peer (Alice) has fewer elements than us (Bob), + we just send the count as Alice should send the first BF */ + send_element_count (op); + state->phase = PHASE_COUNT_SENT; + } + else + { + /* We have fewer elements, so we start with the BF */ + begin_bf_exchange (op); + } + op->state = state; } /* Now allow CADET to continue, as we did not do this in #handle_incoming_msg (as we wanted to first see if the @@ -3196,8 +2683,10 @@ shutdown_task (void *cls) cadet = NULL; } } - GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); + GNUNET_STATISTICS_destroy (_GSS_statistics, + GNUNET_YES); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "handled shutdown request\n"); } @@ -3217,8 +2706,10 @@ run (void *cls, /* FIXME: need to modify SERVICE (!) API to allow us to run a shutdown task *after* clients were forcefully disconnected! */ - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); - _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); + _GSS_statistics = GNUNET_STATISTICS_create ("seti", + cfg); cadet = GNUNET_CADET_connect (cfg); if (NULL == cadet) { @@ -3234,7 +2725,7 @@ run (void *cls, * Define "main" method using service macro. */ GNUNET_SERVICE_MAIN ( - "set", + "seti", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, @@ -3244,7 +2735,7 @@ GNUNET_SERVICE_MAIN ( GNUNET_MESSAGE_TYPE_SETI_ACCEPT, struct GNUNET_SET_AcceptMessage, NULL), - GNUNET_MQ_hd_var_size (client_mutation, + GNUNET_MQ_hd_var_size (client_set_add, GNUNET_MESSAGE_TYPE_SETI_ADD, struct GNUNET_SET_ElementMessage, NULL), -- cgit v1.2.3 From 9cbc1f9c76ae5de5bcfdc15f935cd04200c0e013 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 19 Aug 2020 00:06:35 +0200 Subject: -fix FTBFS for seti --- src/include/gnunet_protocols.h | 5 + src/seti/gnunet-service-seti.c | 1204 ++++++++++++++++------------------------ 2 files changed, 472 insertions(+), 737 deletions(-) (limited to 'src/seti') diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index e9a2b1c0e..4526b75d9 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -1854,6 +1854,11 @@ extern "C" { */ #define GNUNET_MESSAGE_TYPE_SETI_P2P_DONE 593 +/** + * Request to begin set intersection operation. + */ +#define GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST 594 + /******************************************************************************* * SET message types diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c index 037181bde..7159a7ba2 100644 --- a/src/seti/gnunet-service-seti.c +++ b/src/seti/gnunet-service-seti.c @@ -25,6 +25,10 @@ */ #include "gnunet-service-seti_protocol.h" #include "gnunet_statistics_service.h" +#include "gnunet_cadet_service.h" +#include "gnunet_seti_service.h" +#include "gnunet_block_lib.h" +#include "seti.h" /** * How long do we hold on to an incoming channel if there is @@ -76,18 +80,6 @@ enum IntersectionOperationPhase }; -/** - * Implementation-specific set state. Used as opaque pointer, and - * specified further in the respective implementation. - */ -struct SetState; - -/** - * Implementation-specific set operation. Used as opaque pointer, and - * specified further in the respective implementation. - */ -struct OperationState; - /** * A set that supports a specific operation with other peers. */ @@ -119,7 +111,7 @@ struct ElementEntry * The actual element. The data for the element * should be allocated at the end of this struct. */ - struct GNUNET_SET_Element element; + struct GNUNET_SETI_Element element; /** * Hash of the element. For set union: Will be used to derive the @@ -179,6 +171,26 @@ struct ClientState */ struct Operation { + /** + * The identity of the requesting peer. Needs to + * be stored here as the op spec might not have been created yet. + */ + struct GNUNET_PeerIdentity peer; + + /** + * XOR of the keys of all of the elements (remaining) in my set. + * Always updated when elements are added or removed to + * @e my_elements. + */ + struct GNUNET_HashCode my_xor; + + /** + * XOR of the keys of all of the elements (remaining) in + * the other peer's set. Updated when we receive the + * other peer's Bloom filter. + */ + struct GNUNET_HashCode other_xor; + /** * Kept in a DLL of the listener, if @e listener is non-NULL. */ @@ -216,17 +228,31 @@ struct Operation struct Set *set; /** - * Operation-specific operation state. Note that the exact - * type depends on this being a union or intersection operation - * (and thus on @e vt). + * The bf we currently receive */ - struct OperationState *state; // FIXME: inline + struct GNUNET_CONTAINER_BloomFilter *remote_bf; /** - * The identity of the requesting peer. Needs to - * be stored here as the op spec might not have been created yet. + * BF of the set's element. */ - struct GNUNET_PeerIdentity peer; + struct GNUNET_CONTAINER_BloomFilter *local_bf; + + /** + * Remaining elements in the intersection operation. + * Maps element-id-hashes to 'elements in our set'. + */ + struct GNUNET_CONTAINER_MultiHashMap *my_elements; + + /** + * Iterator for sending the final set of @e my_elements to the client. + */ + struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter; + + /** + * For multipart BF transmissions, we have to store the + * bloomfilter-data until we fully received it. + */ + char *bf_data; /** * Timeout task, if the incoming peer has not been accepted @@ -235,48 +261,68 @@ struct Operation struct GNUNET_SCHEDULER_Task *timeout_task; /** - * Salt to use for the operation. + * How many bytes of @e bf_data are valid? + */ + uint32_t bf_data_offset; + + /** + * Current element count contained within @e my_elements. + * (May differ briefly during initialization.) + */ + uint32_t my_element_count; + + /** + * size of the bloomfilter in @e bf_data. + */ + uint32_t bf_data_size; + + /** + * size of the bloomfilter + */ + uint32_t bf_bits_per_element; + + /** + * Salt currently used for BF construction (by us or the other peer, + * depending on where we are in the code). */ uint32_t salt; /** - * Remote peers element count + * Current state of the operation. */ - uint32_t remote_element_count; + enum IntersectionOperationPhase phase; /** - * ID used to identify an operation between service and client + * Generation in which the operation handle was created. */ - uint32_t client_request_id; + unsigned int generation_created; /** - * When are elements sent to the client, and which elements are sent? + * Did we send the client that we are done? */ - int return_intersection; + int client_done_sent; /** - * Lower bound for the set size, used only when - * byzantine mode is enabled. + * Set whenever we reach the state where the death of the + * channel is perfectly find and should NOT result in the + * operation being cancelled. */ - int byzantine_lower_bound; + int channel_death_expected; /** - * Always use delta operation instead of sending full sets, - * even it it's less efficient. + * Remote peers element count */ - int force_delta; + uint32_t remote_element_count; /** - * Always send full sets, even if delta operations would - * be more efficient. + * ID used to identify an operation between service and client */ - int force_full; + uint32_t client_request_id; /** - * #GNUNET_YES to fail operations where Byzantine faults - * are suspected + * When are elements sent to the client, and which elements are sent? */ - int byzantine; + int return_intersection; /** * Unique request id for the request from a remote peer, sent to the @@ -285,10 +331,6 @@ struct Operation */ uint32_t suggest_id; - /** - * Generation in which the operation handle was created. - */ - unsigned int generation_created; }; @@ -348,9 +390,10 @@ struct Set struct SetContent *content; /** - * Implementation-specific state. + * Number of currently valid elements in the set which have not been + * removed. */ - struct SetState *state; + uint32_t current_set_element_count; /** * Evaluate operations are held in a linked list. @@ -371,128 +414,6 @@ struct Set }; -/** - * State of an evaluate operation with another peer. - */ -struct OperationState -{ - /** - * The bf we currently receive - */ - struct GNUNET_CONTAINER_BloomFilter *remote_bf; - - /** - * BF of the set's element. - */ - struct GNUNET_CONTAINER_BloomFilter *local_bf; - - /** - * Remaining elements in the intersection operation. - * Maps element-id-hashes to 'elements in our set'. - */ - struct GNUNET_CONTAINER_MultiHashMap *my_elements; - - /** - * Iterator for sending the final set of @e my_elements to the client. - */ - struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter; - - /** - * Evaluate operations are held in a linked list. - */ - struct OperationState *next; - - /** - * Evaluate operations are held in a linked list. - */ - struct OperationState *prev; - - /** - * For multipart BF transmissions, we have to store the - * bloomfilter-data until we fully received it. - */ - char *bf_data; - - /** - * XOR of the keys of all of the elements (remaining) in my set. - * Always updated when elements are added or removed to - * @e my_elements. - */ - struct GNUNET_HashCode my_xor; - - /** - * XOR of the keys of all of the elements (remaining) in - * the other peer's set. Updated when we receive the - * other peer's Bloom filter. - */ - struct GNUNET_HashCode other_xor; - - /** - * How many bytes of @e bf_data are valid? - */ - uint32_t bf_data_offset; - - /** - * Current element count contained within @e my_elements. - * (May differ briefly during initialization.) - */ - uint32_t my_element_count; - - /** - * size of the bloomfilter in @e bf_data. - */ - uint32_t bf_data_size; - - /** - * size of the bloomfilter - */ - uint32_t bf_bits_per_element; - - /** - * Salt currently used for BF construction (by us or the other peer, - * depending on where we are in the code). - */ - uint32_t salt; - - /** - * Current state of the operation. - */ - enum IntersectionOperationPhase phase; - - /** - * Generation in which the operation handle - * was created. - */ - unsigned int generation_created; - - /** - * Did we send the client that we are done? - */ - int client_done_sent; - - /** - * Set whenever we reach the state where the death of the - * channel is perfectly find and should NOT result in the - * operation being cancelled. - */ - int channel_death_expected; -}; - - -/** - * Extra state required for efficient set intersection. - * Merely tracks the total number of elements. - */ -struct SetState -{ - /** - * Number of currently valid elements in the set which have not been - * removed. - */ - uint32_t current_set_element_count; -}; - - /** * A listener is inhabited by a client, and waits for evaluation * requests from remote peers. @@ -540,10 +461,6 @@ struct Listener */ struct GNUNET_HashCode app_id; - /** - * The type of the operation. - */ - enum GNUNET_SET_OperationType operation; }; @@ -597,10 +514,10 @@ static uint32_t suggest_id; */ static void send_client_removed_element (struct Operation *op, - struct GNUNET_SET_Element *element) + struct GNUNET_SETI_Element *element) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *rm; + struct GNUNET_SETI_ResultMessage *rm; if (GNUNET_NO != op->return_intersection) return; /* Wrong mode for transmitting removed elements */ @@ -620,7 +537,7 @@ send_client_removed_element (struct Operation *op, GNUNET_break (0); return; } - rm->result_status = htons (GNUNET_SET_STATUS_DEL_LOCAL); + rm->result_status = htons (GNUNET_SETI_STATUS_DEL_LOCAL); rm->request_id = htonl (op->client_request_id); rm->element_type = element->element_type; GNUNET_memcpy (&rm[1], @@ -631,6 +548,21 @@ send_client_removed_element (struct Operation *op, } +/** + * Is element @a ee part of the set used by @a op? + * + * @param ee element to test + * @param op operation the defines the set and its generation + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ +static int +_GSS_is_element_of_operation (struct ElementEntry *ee, + struct Operation *op) +{ + return op->generation_created >= ee->generation_added; +} + + /** * Fills the "my_elements" hashmap with all relevant elements. * @@ -664,14 +596,14 @@ filtered_map_initialization (void *cls, /* Test if element is in other peer's bloomfilter */ GNUNET_BLOCK_mingle_hash (&ee->element_hash, - op->state->salt, + op->salt, &mutated_hash); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Testing mingled hash %s with salt %u\n", GNUNET_h2s (&mutated_hash), - op->state->salt); + op->salt); if (GNUNET_NO == - GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, + GNUNET_CONTAINER_bloomfilter_test (op->remote_bf, &mutated_hash)) { /* remove this element */ @@ -683,16 +615,16 @@ filtered_map_initialization (void *cls, ee->element.size); return GNUNET_YES; } - op->state->my_element_count++; - GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + op->my_element_count++; + GNUNET_CRYPTO_hash_xor (&op->my_xor, &ee->element_hash, - &op->state->my_xor); + &op->my_xor); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Filtered initialization of my_elements, adding %s:%u\n", GNUNET_h2s (&ee->element_hash), ee->element.size); GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + GNUNET_CONTAINER_multihashmap_put (op->my_elements, &ee->element_hash, ee, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); @@ -720,27 +652,27 @@ iterator_bf_reduce (void *cls, struct GNUNET_HashCode mutated_hash; GNUNET_BLOCK_mingle_hash (&ee->element_hash, - op->state->salt, + op->salt, &mutated_hash); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Testing mingled hash %s with salt %u\n", GNUNET_h2s (&mutated_hash), - op->state->salt); + op->salt); if (GNUNET_NO == - GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, + GNUNET_CONTAINER_bloomfilter_test (op->remote_bf, &mutated_hash)) { - GNUNET_break (0 < op->state->my_element_count); - op->state->my_element_count--; - GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + GNUNET_break (0 < op->my_element_count); + op->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->my_xor, &ee->element_hash, - &op->state->my_xor); + &op->my_xor); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bloom filter reduction of my_elements, removing %s:%u\n", GNUNET_h2s (&ee->element_hash), ee->element.size); GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + GNUNET_CONTAINER_multihashmap_remove (op->my_elements, &ee->element_hash, ee)); send_client_removed_element (op, @@ -775,18 +707,196 @@ iterator_bf_create (void *cls, struct GNUNET_HashCode mutated_hash; GNUNET_BLOCK_mingle_hash (&ee->element_hash, - op->state->salt, + op->salt, &mutated_hash); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Initializing BF with hash %s with salt %u\n", GNUNET_h2s (&mutated_hash), - op->state->salt); - GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, + op->salt); + GNUNET_CONTAINER_bloomfilter_add (op->local_bf, &mutated_hash); return GNUNET_YES; } +/** + * Destroy the given operation. Used for any operation where both + * peers were known and that thus actually had a vt and channel. Must + * not be used for operations where 'listener' is still set and we do + * not know the other peer. + * + * Call the implementation-specific cancel function of the operation. + * Disconnects from the remote peer. Does not disconnect the client, + * as there may be multiple operations per set. + * + * @param op operation to destroy + */ +static void +_GSS_operation_destroy (struct Operation *op) +{ + struct Set *set = op->set; + struct GNUNET_CADET_Channel *channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op); + GNUNET_assert (NULL == op->listener); + if (NULL != op->remote_bf) + { + GNUNET_CONTAINER_bloomfilter_free (op->remote_bf); + op->remote_bf = NULL; + } + if (NULL != op->local_bf) + { + GNUNET_CONTAINER_bloomfilter_free (op->local_bf); + op->local_bf = NULL; + } + if (NULL != op->my_elements) + { + GNUNET_CONTAINER_multihashmap_destroy (op->my_elements); + op->my_elements = NULL; + } + if (NULL != op->full_result_iter) + { + GNUNET_CONTAINER_multihashmap_iterator_destroy ( + op->full_result_iter); + op->full_result_iter = NULL; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying intersection op state done\n"); + if (NULL != set) + { + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + op); + op->set = NULL; + } + if (NULL != op->context_msg) + { + GNUNET_free (op->context_msg); + op->context_msg = NULL; + } + if (NULL != (channel = op->channel)) + { + /* This will free op; called conditionally as this helper function + is also called from within the channel disconnect handler. */ + op->channel = NULL; + GNUNET_CADET_channel_destroy (channel); + } + /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, + * there was a channel end handler that will free 'op' on the call stack. */ +} + + +/** + * This function probably should not exist + * and be replaced by inlining more specific + * logic in the various places where it is called. + */ +static void +_GSS_operation_destroy2 (struct Operation *op); + + +/** + * Destroy an incoming request from a remote peer + * + * @param op remote request to destroy + */ +static void +incoming_destroy (struct Operation *op) +{ + struct Listener *listener; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying incoming operation %p\n", + op); + if (NULL != (listener = op->listener)) + { + GNUNET_CONTAINER_DLL_remove (listener->op_head, + listener->op_tail, + op); + op->listener = NULL; + } + if (NULL != op->timeout_task) + { + GNUNET_SCHEDULER_cancel (op->timeout_task); + op->timeout_task = NULL; + } + _GSS_operation_destroy2 (op); +} + + +/** + * Signal to the client that the operation has finished and + * destroy the operation. + * + * @param cls operation to destroy + */ +static void +send_client_done_and_destroy (void *cls) +{ + struct Operation *op = cls; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SETI_ResultMessage *rm; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection succeeded, sending DONE to local client\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# Intersection operations succeeded", + 1, + GNUNET_NO); + ev = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_SETI_RESULT); + rm->request_id = htonl (op->client_request_id); + rm->result_status = htons (GNUNET_SETI_STATUS_DONE); + rm->element_type = htons (0); + GNUNET_MQ_send (op->set->cs->mq, + ev); + _GSS_operation_destroy (op); +} + + +/** + * This function probably should not exist + * and be replaced by inlining more specific + * logic in the various places where it is called. + */ +static void +_GSS_operation_destroy2 (struct Operation *op) +{ + struct GNUNET_CADET_Channel *channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "channel_end_cb called\n"); + if (NULL != (channel = op->channel)) + { + /* This will free op; called conditionally as this helper function + is also called from within the channel disconnect handler. */ + op->channel = NULL; + GNUNET_CADET_channel_destroy (channel); + } + if (NULL != op->listener) + { + incoming_destroy (op); + return; + } + if (NULL != op->set) + { + if (GNUNET_YES == op->channel_death_expected) + { + /* oh goodie, we are done! */ + send_client_done_and_destroy (op); + } + else + { + /* sorry, channel went down early, too bad. */ + _GSS_operation_destroy (op); + } + } + else + _GSS_operation_destroy (op); + GNUNET_free (op); +} + + /** * Inform the client that the intersection operation has failed, * and proceed to destroy the evaluate operation. @@ -797,7 +907,7 @@ static void fail_intersection_operation (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *msg; + struct GNUNET_SETI_ResultMessage *msg; GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Intersection operation failed\n"); @@ -805,10 +915,10 @@ fail_intersection_operation (struct Operation *op) "# Intersection operations failed", 1, GNUNET_NO); - if (NULL != op->state->my_elements) + if (NULL != op->my_elements) { - GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); - op->state->my_elements = NULL; + GNUNET_CONTAINER_multihashmap_destroy (op->my_elements); + op->my_elements = NULL; } ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETI_RESULT); @@ -817,8 +927,7 @@ fail_intersection_operation (struct Operation *op) msg->element_type = htons (0); GNUNET_MQ_send (op->set->cs->mq, ev); - _GSS_operation_destroy (op, - GNUNET_YES); + _GSS_operation_destroy (op); } @@ -845,22 +954,22 @@ send_bloomfilter (struct Operation *op) potential and minimize overall bandwidth consumption. */ bf_elementbits = 2 + ceil (log2 ((double) (op->remote_element_count - / (double) op->state->my_element_count))); + / (double) op->my_element_count))); if (bf_elementbits < 1) bf_elementbits = 1; /* make sure k is not 0 */ /* optimize BF-size to ~50% of bits set */ - bf_size = ceil ((double) (op->state->my_element_count + bf_size = ceil ((double) (op->my_element_count * bf_elementbits / log (2))); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending Bloom filter (%u) of size %u bytes\n", (unsigned int) bf_elementbits, (unsigned int) bf_size); - op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - bf_size, - bf_elementbits); - op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, - UINT32_MAX); - GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + op->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + bf_size, + bf_elementbits); + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); + GNUNET_CONTAINER_multihashmap_iterate (op->my_elements, &iterator_bf_create, op); @@ -876,17 +985,17 @@ send_bloomfilter (struct Operation *op) chunk_size = bf_size; ev = GNUNET_MQ_msg_extra (msg, chunk_size, - GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_BF); + GNUNET_MESSAGE_TYPE_SETI_P2P_BF); GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_bloomfilter_get_raw_data ( - op->state->local_bf, + op->local_bf, (char *) &msg[1], bf_size)); - msg->sender_element_count = htonl (op->state->my_element_count); + msg->sender_element_count = htonl (op->my_element_count); msg->bloomfilter_total_length = htonl (bf_size); msg->bits_per_element = htonl (bf_elementbits); - msg->sender_mutator = htonl (op->state->salt); - msg->element_xor_hash = op->state->my_xor; + msg->sender_mutator = htonl (op->salt); + msg->element_xor_hash = op->my_xor; GNUNET_MQ_send (op->mq, ev); } else @@ -895,7 +1004,7 @@ send_bloomfilter (struct Operation *op) bf_data = GNUNET_malloc (bf_size); GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_bloomfilter_get_raw_data ( - op->state->local_bf, + op->local_bf, bf_data, bf_size)); offset = 0; @@ -905,53 +1014,22 @@ send_bloomfilter (struct Operation *op) chunk_size = bf_size - offset; ev = GNUNET_MQ_msg_extra (msg, chunk_size, - GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_BF); + GNUNET_MESSAGE_TYPE_SETI_P2P_BF); GNUNET_memcpy (&msg[1], &bf_data[offset], chunk_size); offset += chunk_size; - msg->sender_element_count = htonl (op->state->my_element_count); + msg->sender_element_count = htonl (op->my_element_count); msg->bloomfilter_total_length = htonl (bf_size); msg->bits_per_element = htonl (bf_elementbits); - msg->sender_mutator = htonl (op->state->salt); - msg->element_xor_hash = op->state->my_xor; + msg->sender_mutator = htonl (op->salt); + msg->element_xor_hash = op->my_xor; GNUNET_MQ_send (op->mq, ev); } GNUNET_free (bf_data); } - GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); - op->state->local_bf = NULL; -} - - -/** - * Signal to the client that the operation has finished and - * destroy the operation. - * - * @param cls operation to destroy - */ -static void -send_client_done_and_destroy (void *cls) -{ - struct Operation *op = cls; - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *rm; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Intersection succeeded, sending DONE to local client\n"); - GNUNET_STATISTICS_update (_GSS_statistics, - "# Intersection operations succeeded", - 1, - GNUNET_NO); - ev = GNUNET_MQ_msg (rm, - GNUNET_MESSAGE_TYPE_SETI_RESULT); - rm->request_id = htonl (op->client_request_id); - rm->result_status = htons (GNUNET_SET_STATUS_DONE); - rm->element_type = htons (0); - GNUNET_MQ_send (op->set->cs->mq, - ev); - _GSS_operation_destroy (op, - GNUNET_YES); + GNUNET_CONTAINER_bloomfilter_free (op->local_bf); + op->local_bf = NULL; } @@ -970,8 +1048,8 @@ finished_local_operations (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DONE sent to other peer, now waiting for other end to close the channel\n"); - op->state->phase = PHASE_FINISHED; - op->state->channel_death_expected = GNUNET_YES; + op->phase = PHASE_FINISHED; + op->channel_death_expected = GNUNET_YES; } @@ -988,12 +1066,12 @@ send_p2p_done (struct Operation *op) struct GNUNET_MQ_Envelope *ev; struct IntersectionDoneMessage *idm; - GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); - GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); + GNUNET_assert (PHASE_MUST_SEND_DONE == op->phase); + GNUNET_assert (GNUNET_NO == op->channel_death_expected); ev = GNUNET_MQ_msg (idm, - GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_DONE); - idm->final_element_count = htonl (op->state->my_element_count); - idm->element_xor_hash = op->state->my_xor; + GNUNET_MESSAGE_TYPE_SETI_P2P_DONE); + idm->final_element_count = htonl (op->my_element_count); + idm->element_xor_hash = op->my_xor; GNUNET_MQ_notify_sent (ev, &finished_local_operations, op); @@ -1014,12 +1092,12 @@ send_remaining_elements (void *cls) const void *nxt; const struct ElementEntry *ee; struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *rm; - const struct GNUNET_SET_Element *element; + struct GNUNET_SETI_ResultMessage *rm; + const struct GNUNET_SETI_Element *element; int res; res = GNUNET_CONTAINER_multihashmap_iterator_next ( - op->state->full_result_iter, + op->full_result_iter, NULL, &nxt); if (GNUNET_NO == res) @@ -1027,14 +1105,14 @@ send_remaining_elements (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending done and destroy because iterator ran out\n"); GNUNET_CONTAINER_multihashmap_iterator_destroy ( - op->state->full_result_iter); - op->state->full_result_iter = NULL; - if (PHASE_DONE_RECEIVED == op->state->phase) + op->full_result_iter); + op->full_result_iter = NULL; + if (PHASE_DONE_RECEIVED == op->phase) { - op->state->phase = PHASE_FINISHED; + op->phase = PHASE_FINISHED; send_client_done_and_destroy (op); } - else if (PHASE_MUST_SEND_DONE == op->state->phase) + else if (PHASE_MUST_SEND_DONE == op->phase) { send_p2p_done (op); } @@ -1055,7 +1133,7 @@ send_remaining_elements (void *cls) element->size, GNUNET_MESSAGE_TYPE_SETI_RESULT); GNUNET_assert (NULL != ev); - rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->result_status = htons (GNUNET_SETI_STATUS_ADD_LOCAL); rm->request_id = htonl (op->client_request_id); rm->element_type = element->element_type; GNUNET_memcpy (&rm[1], @@ -1088,15 +1166,15 @@ initialize_map_unfiltered (void *cls, if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) return GNUNET_YES; /* element not live in operation's generation */ - GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + GNUNET_CRYPTO_hash_xor (&op->my_xor, &ee->element_hash, - &op->state->my_xor); + &op->my_xor); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Initial full initialization of my_elements, adding %s:%u\n", GNUNET_h2s (&ee->element_hash), ee->element.size); GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + GNUNET_CONTAINER_multihashmap_put (op->my_elements, &ee->element_hash, ee, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); @@ -1118,10 +1196,10 @@ send_element_count (struct Operation *op) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending our element count (%u)\n", - op->state->my_element_count); + op->my_element_count); ev = GNUNET_MQ_msg (msg, - GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_ELEMENT_INFO); - msg->sender_element_count = htonl (op->state->my_element_count); + GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO); + msg->sender_element_count = htonl (op->my_element_count); GNUNET_MQ_send (op->mq, ev); } @@ -1135,7 +1213,7 @@ send_element_count (struct Operation *op) static void begin_bf_exchange (struct Operation *op) { - op->state->phase = PHASE_BF_EXCHANGE; + op->phase = PHASE_BF_EXCHANGE; GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, &initialize_map_unfiltered, op); @@ -1157,28 +1235,22 @@ handle_intersection_p2p_element_info (void *cls, { struct Operation *op = cls; - if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) - { - GNUNET_break_op (0); - fail_intersection_operation (op); - return; - } op->remote_element_count = ntohl (msg->sender_element_count); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received remote element count (%u), I have %u\n", op->remote_element_count, - op->state->my_element_count); - if (((PHASE_INITIAL != op->state->phase) && - (PHASE_COUNT_SENT != op->state->phase)) || - (op->state->my_element_count > op->remote_element_count) || - (0 == op->state->my_element_count) || + op->my_element_count); + if (((PHASE_INITIAL != op->phase) && + (PHASE_COUNT_SENT != op->phase)) || + (op->my_element_count > op->remote_element_count) || + (0 == op->my_element_count) || (0 == op->remote_element_count)) { GNUNET_break_op (0); fail_intersection_operation (op); return; } - GNUNET_break (NULL == op->state->remote_bf); + GNUNET_break (NULL == op->remote_bf); begin_bf_exchange (op); GNUNET_CADET_receive_done (op->channel); } @@ -1194,11 +1266,11 @@ process_bf (struct Operation *op) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", - op->state->phase, + op->phase, op->remote_element_count, - op->state->my_element_count, + op->my_element_count, GNUNET_CONTAINER_multihashmap_size (op->set->content->elements)); - switch (op->state->phase) + switch (op->phase) { case PHASE_INITIAL: GNUNET_break_op (0); @@ -1207,14 +1279,14 @@ process_bf (struct Operation *op) case PHASE_COUNT_SENT: /* This is the first BF being sent, build our initial map with filtering in place */ - op->state->my_element_count = 0; + op->my_element_count = 0; GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, &filtered_map_initialization, op); break; case PHASE_BF_EXCHANGE: /* Update our set by reduction */ - GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + GNUNET_CONTAINER_multihashmap_iterate (op->my_elements, &iterator_bf_reduce, op); break; @@ -1231,35 +1303,35 @@ process_bf (struct Operation *op) fail_intersection_operation (op); return; } - GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); - op->state->remote_bf = NULL; + GNUNET_CONTAINER_bloomfilter_free (op->remote_bf); + op->remote_bf = NULL; - if ((0 == op->state->my_element_count) || /* fully disjoint */ - ((op->state->my_element_count == op->remote_element_count) && - (0 == GNUNET_memcmp (&op->state->my_xor, - &op->state->other_xor)))) + if ((0 == op->my_element_count) || /* fully disjoint */ + ((op->my_element_count == op->remote_element_count) && + (0 == GNUNET_memcmp (&op->my_xor, + &op->other_xor)))) { /* we are done */ - op->state->phase = PHASE_MUST_SEND_DONE; + op->phase = PHASE_MUST_SEND_DONE; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE to other peer\n"); - GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); - op->state->local_bf = NULL; - if (GNUNET_SET_RESULT_FULL == op->result_mode) + GNUNET_CONTAINER_bloomfilter_free (op->local_bf); + op->local_bf = NULL; + if (GNUNET_YES == op->return_intersection) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending full result set (%u elements)\n", - GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); - op->state->full_result_iter + GNUNET_CONTAINER_multihashmap_size (op->my_elements)); + op->full_result_iter = GNUNET_CONTAINER_multihashmap_iterator_create ( - op->state->my_elements); + op->my_elements); send_remaining_elements (op); return; } send_p2p_done (op); return; } - op->state->phase = PHASE_BF_EXCHANGE; + op->phase = PHASE_BF_EXCHANGE; send_bloomfilter (op); } @@ -1277,11 +1349,7 @@ check_intersection_p2p_bf (void *cls, { struct Operation *op = cls; - if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + (void) op; return GNUNET_OK; } @@ -1301,7 +1369,7 @@ handle_intersection_p2p_bf (void *cls, uint32_t chunk_size; uint32_t bf_bits_per_element; - switch (op->state->phase) + switch (op->phase) { case PHASE_INITIAL: GNUNET_break_op (0); @@ -1313,43 +1381,43 @@ handle_intersection_p2p_bf (void *cls, bf_size = ntohl (msg->bloomfilter_total_length); bf_bits_per_element = ntohl (msg->bits_per_element); chunk_size = htons (msg->header.size) - sizeof(struct BFMessage); - op->state->other_xor = msg->element_xor_hash; + op->other_xor = msg->element_xor_hash; if (bf_size == chunk_size) { - if (NULL != op->state->bf_data) + if (NULL != op->bf_data) { GNUNET_break_op (0); fail_intersection_operation (op); return; } /* single part, done here immediately */ - op->state->remote_bf + op->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char *) &msg[1], bf_size, bf_bits_per_element); - op->state->salt = ntohl (msg->sender_mutator); + op->salt = ntohl (msg->sender_mutator); op->remote_element_count = ntohl (msg->sender_element_count); process_bf (op); break; } /* multipart chunk */ - if (NULL == op->state->bf_data) + if (NULL == op->bf_data) { /* first chunk, initialize */ - op->state->bf_data = GNUNET_malloc (bf_size); - op->state->bf_data_size = bf_size; - op->state->bf_bits_per_element = bf_bits_per_element; - op->state->bf_data_offset = 0; - op->state->salt = ntohl (msg->sender_mutator); + op->bf_data = GNUNET_malloc (bf_size); + op->bf_data_size = bf_size; + op->bf_bits_per_element = bf_bits_per_element; + op->bf_data_offset = 0; + op->salt = ntohl (msg->sender_mutator); op->remote_element_count = ntohl (msg->sender_element_count); } else { /* increment */ - if ((op->state->bf_data_size != bf_size) || - (op->state->bf_bits_per_element != bf_bits_per_element) || - (op->state->bf_data_offset + chunk_size > bf_size) || - (op->state->salt != ntohl (msg->sender_mutator)) || + if ((op->bf_data_size != bf_size) || + (op->bf_bits_per_element != bf_bits_per_element) || + (op->bf_data_offset + chunk_size > bf_size) || + (op->salt != ntohl (msg->sender_mutator)) || (op->remote_element_count != ntohl (msg->sender_element_count))) { GNUNET_break_op (0); @@ -1357,20 +1425,20 @@ handle_intersection_p2p_bf (void *cls, return; } } - GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset], + GNUNET_memcpy (&op->bf_data[op->bf_data_offset], (const char *) &msg[1], chunk_size); - op->state->bf_data_offset += chunk_size; - if (op->state->bf_data_offset == bf_size) + op->bf_data_offset += chunk_size; + if (op->bf_data_offset == bf_size) { /* last chunk, run! */ - op->state->remote_bf - = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data, + op->remote_bf + = GNUNET_CONTAINER_bloomfilter_init (op->bf_data, bf_size, bf_bits_per_element); - GNUNET_free (op->state->bf_data); - op->state->bf_data = NULL; - op->state->bf_data_size = 0; + GNUNET_free (op->bf_data); + op->bf_data = NULL; + op->bf_data_size = 0; process_bf (op); } break; @@ -1400,17 +1468,17 @@ filter_all (void *cls, struct Operation *op = cls; struct ElementEntry *ee = value; - GNUNET_break (0 < op->state->my_element_count); - op->state->my_element_count--; - GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + GNUNET_break (0 < op->my_element_count); + op->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->my_xor, &ee->element_hash, - &op->state->my_xor); + &op->my_xor); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Final reduction of my_elements, removing %s:%u\n", GNUNET_h2s (&ee->element_hash), ee->element.size); GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + GNUNET_CONTAINER_multihashmap_remove (op->my_elements, &ee->element_hash, ee)); send_client_removed_element (op, @@ -1431,13 +1499,7 @@ handle_intersection_p2p_done (void *cls, { struct Operation *op = cls; - if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) - { - GNUNET_break_op (0); - fail_intersection_operation (op); - return; - } - if (PHASE_BF_EXCHANGE != op->state->phase) + if (PHASE_BF_EXCHANGE != op->phase) { /* wrong phase to conclude? FIXME: Or should we allow this if the other peer has _initially_ already an empty set? */ @@ -1449,12 +1511,12 @@ handle_intersection_p2p_done (void *cls, { /* other peer determined empty set is the intersection, remove all elements */ - GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + GNUNET_CONTAINER_multihashmap_iterate (op->my_elements, &filter_all, op); } - if ((op->state->my_element_count != ntohl (idm->final_element_count)) || - (0 != GNUNET_memcmp (&op->state->my_xor, + if ((op->my_element_count != ntohl (idm->final_element_count)) || + (0 != GNUNET_memcmp (&op->my_xor, &idm->element_xor_hash))) { /* Other peer thinks we are done, but we disagree on the result! */ @@ -1464,22 +1526,22 @@ handle_intersection_p2p_done (void *cls, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got IntersectionDoneMessage, have %u elements in intersection\n", - op->state->my_element_count); - op->state->phase = PHASE_DONE_RECEIVED; + op->my_element_count); + op->phase = PHASE_DONE_RECEIVED; GNUNET_CADET_receive_done (op->channel); - GNUNET_assert (GNUNET_NO == op->state->client_done_sent); - if (GNUNET_SET_RESULT_FULL == op->result_mode) + GNUNET_assert (GNUNET_NO == op->client_done_sent); + if (GNUNET_YES == op->return_intersection) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending full result set to client (%u elements)\n", - GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); - op->state->full_result_iter - = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); + GNUNET_CONTAINER_multihashmap_size (op->my_elements)); + op->full_result_iter + = GNUNET_CONTAINER_multihashmap_iterator_create (op->my_elements); send_remaining_elements (op); return; } - op->state->phase = PHASE_FINISHED; + op->phase = PHASE_FINISHED; send_client_done_and_destroy (op); } @@ -1506,122 +1568,6 @@ get_incoming (uint32_t id) } -/** - * Destroy an incoming request from a remote peer - * - * @param op remote request to destroy - */ -static void -incoming_destroy (struct Operation *op) -{ - struct Listener *listener; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Destroying incoming operation %p\n", - op); - if (NULL != (listener = op->listener)) - { - GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op); - op->listener = NULL; - } - if (NULL != op->timeout_task) - { - GNUNET_SCHEDULER_cancel (op->timeout_task); - op->timeout_task = NULL; - } - _GSS_operation_destroy2 (op); -} - - -/** - * Is element @a ee part of the set used by @a op? - * - * @param ee element to test - * @param op operation the defines the set and its generation - * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not - */ -static int -_GSS_is_element_of_operation (struct ElementEntry *ee, - struct Operation *op) -{ - return op->generation_created >= ee->generation_added; -} - - -/** - * Destroy the given operation. Used for any operation where both - * peers were known and that thus actually had a vt and channel. Must - * not be used for operations where 'listener' is still set and we do - * not know the other peer. - * - * Call the implementation-specific cancel function of the operation. - * Disconnects from the remote peer. Does not disconnect the client, - * as there may be multiple operations per set. - * - * @param op operation to destroy - */ -static void -_GSS_operation_destroy (struct Operation *op) -{ - struct Set *set = op->set; - struct GNUNET_CADET_Channel *channel; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op); - GNUNET_assert (NULL == op->listener); - if (NULL != op->state) - { - /* check if the op was canceled twice */ - GNUNET_assert (NULL != op->state); - if (NULL != op->state->remote_bf) - { - GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); - op->state->remote_bf = NULL; - } - if (NULL != op->state->local_bf) - { - GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); - op->state->local_bf = NULL; - } - if (NULL != op->state->my_elements) - { - GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); - op->state->my_elements = NULL; - } - if (NULL != op->state->full_result_iter) - { - GNUNET_CONTAINER_multihashmap_iterator_destroy ( - op->state->full_result_iter); - op->state->full_result_iter = NULL; - } - GNUNET_free (op->state); - op->state = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Destroying intersection op state done\n"); - } - if (NULL != set) - { - GNUNET_CONTAINER_DLL_remove (set->ops_head, - set->ops_tail, - op); - op->set = NULL; - } - if (NULL != op->context_msg) - { - GNUNET_free (op->context_msg); - op->context_msg = NULL; - } - if (NULL != (channel = op->channel)) - { - /* This will free op; called conditionally as this helper function - is also called from within the channel disconnect handler. */ - op->channel = NULL; - GNUNET_CADET_channel_destroy (channel); - } - /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, - * there was a channel end handler that will free 'op' on the call stack. */ -} - - /** * Callback called when a client connects to the service. * @@ -1660,7 +1606,6 @@ destroy_elements_iterator (void *cls, { struct ElementEntry *ee = value; - GNUNET_free (ee->mutations); GNUNET_free (ee); return GNUNET_YES; } @@ -1691,19 +1636,7 @@ client_disconnect_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n"); /* Destroy pending set operations */ while (NULL != set->ops_head) - _GSS_operation_destroy (set->ops_head, GNUNET_NO); - - /* Destroy operation-specific state */ - GNUNET_assert (NULL != set->state); - GNUNET_free (set->state); - - /* Clean up ongoing iterations */ - if (NULL != set->iter) - { - GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); - set->iter = NULL; - set->iteration_id++; - } + _GSS_operation_destroy (set->ops_head); /* free set content (or at least decrement RC) */ set->content = NULL; @@ -1719,9 +1652,6 @@ client_disconnect_cb (void *cls, content->elements = NULL; GNUNET_free (content); } - GNUNET_free (set->excluded_generations); - set->excluded_generations = NULL; - GNUNET_free (set); } @@ -1777,20 +1707,14 @@ check_incoming_msg (void *cls, return GNUNET_SYSERR; } /* This should be equivalent to the previous condition, but can't hurt to check twice */ - if (NULL == op->listener) + if (NULL == listener) { GNUNET_break (0); return GNUNET_SYSERR; } - if (listener->operation != - (enum GNUNET_SET_OperationType) ntohl (msg->operation)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } nested_context = GNUNET_MQ_extract_nested_mh (msg); if ((NULL != nested_context) && - (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE)) + (ntohs (nested_context->size) > GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE)) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -1824,7 +1748,7 @@ handle_incoming_msg (void *cls, struct Listener *listener = op->listener; const struct GNUNET_MessageHeader *nested_context; struct GNUNET_MQ_Envelope *env; - struct GNUNET_SET_RequestMessage *cmsg; + struct GNUNET_SETI_RequestMessage *cmsg; nested_context = GNUNET_MQ_extract_nested_mh (msg); /* Make a copy of the nested_context (application-specific context @@ -1835,8 +1759,7 @@ handle_incoming_msg (void *cls, op->remote_element_count = ntohl (msg->element_count); GNUNET_log ( GNUNET_ERROR_TYPE_DEBUG, - "Received P2P operation request (op %u, port %s) for active listener\n", - (uint32_t) ntohl (msg->operation), + "Received P2P operation request (port %s) for active listener\n", GNUNET_h2s (&op->listener->app_id)); GNUNET_assert (0 == op->suggest_id); if (0 == suggest_id) @@ -1862,110 +1785,6 @@ handle_incoming_msg (void *cls, } -/** - * Send the next element of a set to the set's client. The next element is given by - * the set's current hashmap iterator. The set's iterator will be set to NULL if there - * are no more elements in the set. The caller must ensure that the set's iterator is - * valid. - * - * The client will acknowledge each received element with a - * #GNUNET_MESSAGE_TYPE_SETI_ITER_ACK message. Our - * #handle_client_iter_ack() will then trigger the next transmission. - * Note that the #GNUNET_MESSAGE_TYPE_SETI_ITER_DONE is not acknowledged. - * - * @param set set that should send its next element to its client - */ -static void -send_client_element (struct Set *set) -{ - int ret; - struct ElementEntry *ee; - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_IterResponseMessage *msg; - - GNUNET_assert (NULL != set->iter); - do - { - ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, - NULL, - (const void **) &ee); - if (GNUNET_NO == ret) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set); - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETI_ITER_DONE); - GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); - set->iter = NULL; - set->iteration_id++; - GNUNET_assert (set->content->iterator_count > 0); - set->content->iterator_count--; - execute_delayed_mutations (set); - GNUNET_MQ_send (set->cs->mq, ev); - return; - } - GNUNET_assert (NULL != ee); - } - while (GNUNET_NO == - is_element_of_generation (ee, - set->iter_generation, - set->excluded_generations, - set->excluded_generations_size)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending iteration element on %p.\n", - set); - ev = GNUNET_MQ_msg_extra (msg, - ee->element.size, - GNUNET_MESSAGE_TYPE_SETI_ITER_ELEMENT); - GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size); - msg->element_type = htons (ee->element.element_type); - msg->iteration_id = htons (set->iteration_id); - GNUNET_MQ_send (set->cs->mq, ev); -} - - -/** - * Called when a client wants to iterate the elements of a set. - * Checks if we have a set associated with the client and if we - * can right now start an iteration. If all checks out, starts - * sending the elements of the set to the client. - * - * @param cls client that sent the message - * @param m message sent by the client - */ -static void -handle_client_iterate (void *cls, - const struct GNUNET_MessageHeader *m) -{ - struct ClientState *cs = cls; - struct Set *set; - - if (NULL == (set = cs->set)) - { - /* attempt to iterate over a non existing set */ - GNUNET_break (0); - GNUNET_SERVICE_client_drop (cs->client); - return; - } - if (NULL != set->iter) - { - /* Only one concurrent iterate-action allowed per set */ - GNUNET_break (0); - GNUNET_SERVICE_client_drop (cs->client); - return; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Iterating set %p in gen %u with %u content elements\n", - (void *) set, - set->current_generation, - GNUNET_CONTAINER_multihashmap_size (set->content->elements)); - GNUNET_SERVICE_client_continue (cs->client); - set->content->iterator_count++; - set->iter = - GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); - set->iter_generation = set->current_generation; - send_client_element (set); -} - - /** * Called when a client wants to create a new set. This is typically * the first request from a client, and includes the type of set @@ -1976,14 +1795,13 @@ handle_client_iterate (void *cls, */ static void handle_client_create_set (void *cls, - const struct GNUNET_SET_CreateMessage *msg) + const struct GNUNET_SETI_CreateMessage *msg) { struct ClientState *cs = cls; struct Set *set; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client created new set (operation %u)\n", - (uint32_t) ntohl (msg->operation)); + "Client created new intersection set\n"); if (NULL != cs->set) { /* There can only be one set per client */ @@ -1992,25 +1810,6 @@ handle_client_create_set (void *cls, return; } set = GNUNET_new (struct Set); - { - struct SetState *set_state; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Intersection set created\n"); - set_state = GNUNET_new (struct SetState); - set_state->current_set_element_count = 0; - - set->state = set_state; - } - - - if (NULL == set->state) - { - /* initialization failed (i.e. out of memory) */ - GNUNET_free (set); - GNUNET_SERVICE_client_drop (cs->client); - return; - } set->content = GNUNET_new (struct SetContent); set->content->refcount = 1; set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, @@ -2112,51 +1911,6 @@ channel_end_cb (void *channel_ctx, } -/** - * This function probably should not exist - * and be replaced by inlining more specific - * logic in the various places where it is called. - */ -static void -_GSS_operation_destroy2 (struct Operation *op) -{ - struct GNUNET_CADET_Channel *channel; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "channel_end_cb called\n"); - if (NULL != (channel = op->channel)) - { - /* This will free op; called conditionally as this helper function - is also called from within the channel disconnect handler. */ - op->channel = NULL; - GNUNET_CADET_channel_destroy (channel); - } - if (NULL != op->listener) - { - incoming_destroy (op); - return; - } - if (NULL != op->set) - { - if (GNUNET_YES == op->state->channel_death_expected) - { - /* oh goodie, we are done! */ - send_client_done_and_destroy (op); - } - else - { - /* sorry, channel went down early, too bad. */ - _GSS_operation_destroy (op, - GNUNET_YES); - } - } - else - _GSS_operation_destroy (op, - GNUNET_YES); - GNUNET_free (op); -} - - /** * Function called whenever an MQ-channel's transmission window size changes. * @@ -2188,11 +1942,11 @@ channel_window_cb (void *cls, */ static void handle_client_listen (void *cls, - const struct GNUNET_SET_ListenMessage *msg) + const struct GNUNET_SETI_ListenMessage *msg) { struct ClientState *cs = cls; - struct GNUNET_MQ_MessageHandler cadet_handlers[] = - { GNUNET_MQ_hd_var_size (incoming_msg, + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (incoming_msg, GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST, struct OperationRequestMessage, NULL), @@ -2208,7 +1962,8 @@ handle_client_listen (void *cls, GNUNET_MESSAGE_TYPE_SETI_P2P_DONE, struct IntersectionDoneMessage, NULL), - GNUNET_MQ_handler_end () }; + GNUNET_MQ_handler_end () + }; struct Listener *listener; if (NULL != cs->listener) @@ -2222,13 +1977,11 @@ handle_client_listen (void *cls, listener->cs = cs; cs->listener = listener; listener->app_id = msg->app_id; - listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New listener created (op %u, port %s)\n", - listener->operation, + "New listener for set intersection created (port %s)\n", GNUNET_h2s (&listener->app_id)); listener->open_port = GNUNET_CADET_open_port (cadet, &msg->app_id, @@ -2250,7 +2003,7 @@ handle_client_listen (void *cls, */ static void handle_client_reject (void *cls, - const struct GNUNET_SET_RejectMessage *msg) + const struct GNUNET_SETI_RejectMessage *msg) { struct ClientState *cs = cls; struct Operation *op; @@ -2267,8 +2020,7 @@ handle_client_reject (void *cls, return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer request (op %u, app %s) rejected by client\n", - op->listener->operation, + "Peer request (app %s) rejected by client\n", GNUNET_h2s (&cs->listener->app_id)); _GSS_operation_destroy2 (op); GNUNET_SERVICE_client_continue (cs->client); @@ -2282,8 +2034,8 @@ handle_client_reject (void *cls, * @param msg message sent by the client */ static int -check_client_mutation (void *cls, - const struct GNUNET_SET_ElementMessage *msg) +check_client_set_add (void *cls, + const struct GNUNET_SETI_ElementMessage *msg) { /* NOTE: Technically, we should probably check with the block library whether the element we are given is well-formed */ @@ -2299,11 +2051,11 @@ check_client_mutation (void *cls, */ static void handle_client_set_add (void *cls, - const struct GNUNET_SET_ElementMessage *msg) + const struct GNUNET_SETI_ElementMessage *msg) { struct ClientState *cs = cls; struct Set *set; - struct GNUNET_SET_Element el; + struct GNUNET_SETI_Element el; struct ElementEntry *ee; struct GNUNET_HashCode hash; @@ -2318,7 +2070,7 @@ handle_client_set_add (void *cls, el.size = ntohs (msg->header.size) - sizeof(*msg); el.data = &msg[1]; el.element_type = ntohs (msg->element_type); - GNUNET_ISET_element_hash (&el, + GNUNET_SETI_element_hash (&el, &hash); ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash); @@ -2334,8 +2086,6 @@ handle_client_set_add (void *cls, ee->element.data = &ee[1]; ee->element.element_type = el.element_type; ee->remote = GNUNET_NO; - ee->mutations = NULL; - ee->mutations_size = 0; ee->element_hash = hash; GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_multihashmap_put ( @@ -2353,7 +2103,7 @@ handle_client_set_add (void *cls, /* same element inserted twice */ return; } - set->state->current_set_element_count++; + set->current_set_element_count++; } @@ -2387,7 +2137,7 @@ advance_generation (struct Set *set) */ static int check_client_evaluate (void *cls, - const struct GNUNET_SET_EvaluateMessage *msg) + const struct GNUNET_SETI_EvaluateMessage *msg) { /* FIXME: suboptimal, even if the context below could be NULL, there are malformed messages this does not check for... */ @@ -2405,7 +2155,7 @@ check_client_evaluate (void *cls, */ static void handle_client_evaluate (void *cls, - const struct GNUNET_SET_EvaluateMessage *msg) + const struct GNUNET_SETI_EvaluateMessage *msg) { struct ClientState *cs = cls; struct Operation *op = GNUNET_new (struct Operation); @@ -2441,12 +2191,7 @@ handle_client_evaluate (void *cls, op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); op->peer = msg->target_peer; - op->result_mode = ntohl (msg->result_mode); op->client_request_id = ntohl (msg->request_id); - op->byzantine = msg->byzantine; - op->byzantine_lower_bound = msg->byzantine_lower_bound; - op->force_full = msg->force_full; - op->force_delta = msg->force_delta; context = GNUNET_MQ_extract_nested_mh (msg); /* Advance generation values, so that @@ -2458,9 +2203,8 @@ handle_client_evaluate (void *cls, set->ops_tail, op); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating new CADET channel to port %s for set operation type %u\n", - GNUNET_h2s (&msg->app_id), - set->operation); + "Creating new CADET channel to port %s for set intersection\n", + GNUNET_h2s (&msg->app_id)); op->channel = GNUNET_CADET_channel_create (cadet, op, &msg->target_peer, @@ -2470,7 +2214,6 @@ handle_client_evaluate (void *cls, cadet_handlers); op->mq = GNUNET_CADET_get_mq (op->channel); { - struct OperationState *state; struct GNUNET_MQ_Envelope *ev; struct OperationRequestMessage *msg; @@ -2486,25 +2229,23 @@ handle_client_evaluate (void *cls, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Initiating intersection operation evaluation\n"); - state = GNUNET_new (struct OperationState); /* we started the operation, thus we have to send the operation request */ - state->phase = PHASE_INITIAL; - state->my_element_count = op->set->state->current_set_element_count; - state->my_elements - = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, + op->phase = PHASE_INITIAL; + op->my_element_count = op->set->current_set_element_count; + op->my_elements + = GNUNET_CONTAINER_multihashmap_create (op->my_element_count, GNUNET_YES); - msg->element_count = htonl (state->my_element_count); + msg->element_count = htonl (op->my_element_count); GNUNET_MQ_send (op->mq, ev); - state->phase = PHASE_COUNT_SENT; - if (NULL != opaque_context) + op->phase = PHASE_COUNT_SENT; + if (NULL != context) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent op request with context message\n"); else GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent op request without context message\n"); - op->state = state; } GNUNET_SERVICE_client_continue (cs->client); } @@ -2518,7 +2259,7 @@ handle_client_evaluate (void *cls, */ static void handle_client_cancel (void *cls, - const struct GNUNET_SET_CancelMessage *msg) + const struct GNUNET_SETI_CancelMessage *msg) { struct ClientState *cs = cls; struct Set *set; @@ -2557,7 +2298,7 @@ handle_client_cancel (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client requested cancel for op %u\n", (uint32_t) ntohl (msg->request_id)); - _GSS_operation_destroy (op, GNUNET_YES); + _GSS_operation_destroy (op); } GNUNET_SERVICE_client_continue (cs->client); } @@ -2573,12 +2314,12 @@ handle_client_cancel (void *cls, */ static void handle_client_accept (void *cls, - const struct GNUNET_SET_AcceptMessage *msg) + const struct GNUNET_SETI_AcceptMessage *msg) { struct ClientState *cs = cls; struct Set *set; struct Operation *op; - struct GNUNET_SET_ResultMessage *result_message; + struct GNUNET_SETI_ResultMessage *result_message; struct GNUNET_MQ_Envelope *ev; struct Listener *listener; @@ -2603,7 +2344,7 @@ handle_client_accept (void *cls, ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SETI_RESULT); result_message->request_id = msg->request_id; - result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); + result_message->result_status = htons (GNUNET_SETI_STATUS_FAILURE); GNUNET_MQ_send (set->cs->mq, ev); GNUNET_SERVICE_client_continue (cs->client); return; @@ -2617,45 +2358,34 @@ handle_client_accept (void *cls, op->set = set; GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); op->client_request_id = ntohl (msg->request_id); - op->result_mode = ntohl (msg->result_mode); - op->byzantine = msg->byzantine; - op->byzantine_lower_bound = msg->byzantine_lower_bound; - op->force_full = msg->force_full; - op->force_delta = msg->force_delta; /* Advance generation values, so that future mutations do not interfer with the running operation. */ op->generation_created = set->current_generation; advance_generation (set); - GNUNET_assert (NULL == op->state); { - struct OperationState *state; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Accepting set intersection operation\n"); - state = GNUNET_new (struct OperationState); - state->phase = PHASE_INITIAL; - state->my_element_count - = op->set->state->current_set_element_count; - state->my_elements + op->phase = PHASE_INITIAL; + op->my_element_count + = op->set->current_set_element_count; + op->my_elements = GNUNET_CONTAINER_multihashmap_create ( - GNUNET_MIN (state->my_element_count, + GNUNET_MIN (op->my_element_count, op->remote_element_count), GNUNET_YES); - op->state = state; - if (op->remote_element_count < state->my_element_count) + if (op->remote_element_count < op->my_element_count) { /* If the other peer (Alice) has fewer elements than us (Bob), we just send the count as Alice should send the first BF */ send_element_count (op); - state->phase = PHASE_COUNT_SENT; + op->phase = PHASE_COUNT_SENT; } else { /* We have fewer elements, so we start with the BF */ begin_bf_exchange (op); } - op->state = state; } /* Now allow CADET to continue, as we did not do this in #handle_incoming_msg (as we wanted to first see if the @@ -2733,31 +2463,31 @@ GNUNET_SERVICE_MAIN ( NULL, GNUNET_MQ_hd_fixed_size (client_accept, GNUNET_MESSAGE_TYPE_SETI_ACCEPT, - struct GNUNET_SET_AcceptMessage, + struct GNUNET_SETI_AcceptMessage, NULL), GNUNET_MQ_hd_var_size (client_set_add, GNUNET_MESSAGE_TYPE_SETI_ADD, - struct GNUNET_SET_ElementMessage, + struct GNUNET_SETI_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size (client_create_set, GNUNET_MESSAGE_TYPE_SETI_CREATE, - struct GNUNET_SET_CreateMessage, + struct GNUNET_SETI_CreateMessage, NULL), GNUNET_MQ_hd_var_size (client_evaluate, GNUNET_MESSAGE_TYPE_SETI_EVALUATE, - struct GNUNET_SET_EvaluateMessage, + struct GNUNET_SETI_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size (client_listen, GNUNET_MESSAGE_TYPE_SETI_LISTEN, - struct GNUNET_SET_ListenMessage, + struct GNUNET_SETI_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size (client_reject, GNUNET_MESSAGE_TYPE_SETI_REJECT, - struct GNUNET_SET_RejectMessage, + struct GNUNET_SETI_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size (client_cancel, GNUNET_MESSAGE_TYPE_SETI_CANCEL, - struct GNUNET_SET_CancelMessage, + struct GNUNET_SETI_CancelMessage, NULL), GNUNET_MQ_handler_end ()); -- cgit v1.2.3 From 62853641f7ddb1e1b52b105c48039b3c65f36a32 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 19 Aug 2020 00:07:09 +0200 Subject: -fix FTBFS for seti --- src/seti/Makefile.am | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/seti') diff --git a/src/seti/Makefile.am b/src/seti/Makefile.am index d96ffff03..b4980b670 100644 --- a/src/seti/Makefile.am +++ b/src/seti/Makefile.am @@ -70,7 +70,7 @@ test_seti_api_SOURCES = \ test_seti_api_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/testing/libgnunettesting.la \ - libgnunetset.la + libgnunetseti.la plugin_LTLIBRARIES = \ libgnunet_plugin_block_seti_test.la -- cgit v1.2.3 From 076acb9981bc141c7905e0dcf3f9fcda3497a8aa Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 19 Aug 2020 00:24:48 +0200 Subject: -fix test FTBFS --- src/seti/test_seti_api.c | 224 ++++++++++++++++++----------------------------- 1 file changed, 86 insertions(+), 138 deletions(-) (limited to 'src/seti') diff --git a/src/seti/test_seti_api.c b/src/seti/test_seti_api.c index 42dedb846..22a3a06e5 100644 --- a/src/seti/test_seti_api.c +++ b/src/seti/test_seti_api.c @@ -19,7 +19,7 @@ */ /** - * @file set/test_set_intersection_result_full.c + * @file set/test_seti_api.c * @brief testcase for full result mode of the intersection set operation * @author Christian Fuchs * @author Christian Grothoff @@ -27,7 +27,7 @@ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_testing_lib.h" -#include "gnunet_set_service.h" +#include "gnunet_seti_service.h" static int ret; @@ -36,28 +36,26 @@ static struct GNUNET_PeerIdentity local_id; static struct GNUNET_HashCode app_id; -static struct GNUNET_SET_Handle *set1; +static struct GNUNET_SETI_Handle *set1; -static struct GNUNET_SET_Handle *set2; +static struct GNUNET_SETI_Handle *set2; -static struct GNUNET_SET_ListenHandle *listen_handle; +static struct GNUNET_SETI_ListenHandle *listen_handle; static const struct GNUNET_CONFIGURATION_Handle *config; -static int iter_count; - static struct GNUNET_SCHEDULER_Task *tt; -static struct GNUNET_SET_OperationHandle *oh1; +static struct GNUNET_SETI_OperationHandle *oh1; -static struct GNUNET_SET_OperationHandle *oh2; +static struct GNUNET_SETI_OperationHandle *oh2; static void result_cb_set1 (void *cls, - const struct GNUNET_SET_Element *element, + const struct GNUNET_SETI_Element *element, uint64_t current_size, - enum GNUNET_SET_Status status) + enum GNUNET_SETI_Status status) { static int count; @@ -66,19 +64,17 @@ result_cb_set1 (void *cls, status); switch (status) { - case GNUNET_SET_STATUS_OK: + case GNUNET_SETI_STATUS_ADD_LOCAL: count++; break; - - case GNUNET_SET_STATUS_FAILURE: + case GNUNET_SETI_STATUS_FAILURE: oh1 = NULL; ret = 1; break; - - case GNUNET_SET_STATUS_DONE: + case GNUNET_SETI_STATUS_DONE: oh1 = NULL; GNUNET_assert (1 == count); - GNUNET_SET_destroy (set1); + GNUNET_SETI_destroy (set1); set1 = NULL; if (NULL == set2) GNUNET_SCHEDULER_shutdown (); @@ -92,9 +88,9 @@ result_cb_set1 (void *cls, static void result_cb_set2 (void *cls, - const struct GNUNET_SET_Element *element, + const struct GNUNET_SETI_Element *element, uint64_t current_size, - enum GNUNET_SET_Status status) + enum GNUNET_SETI_Status status) { static int count; @@ -103,24 +99,21 @@ result_cb_set2 (void *cls, status); switch (status) { - case GNUNET_SET_STATUS_OK: + case GNUNET_SETI_STATUS_ADD_LOCAL: count++; break; - - case GNUNET_SET_STATUS_FAILURE: + case GNUNET_SETI_STATUS_FAILURE: oh2 = NULL; ret = 1; break; - - case GNUNET_SET_STATUS_DONE: + case GNUNET_SETI_STATUS_DONE: oh2 = NULL; GNUNET_assert (1 == count); - GNUNET_SET_destroy (set2); + GNUNET_SETI_destroy (set2); set2 = NULL; if (NULL == set1) GNUNET_SCHEDULER_shutdown (); break; - default: GNUNET_assert (0); } @@ -131,19 +124,23 @@ static void listen_cb (void *cls, const struct GNUNET_PeerIdentity *other_peer, const struct GNUNET_MessageHeader *context_msg, - struct GNUNET_SET_Request *request) + struct GNUNET_SETI_Request *request) { + struct GNUNET_SETI_Option opts[] = { + { .type = GNUNET_SETI_OPTION_RETURN_INTERSECTION }, + { .type = GNUNET_SETI_OPTION_END } + }; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "starting intersection by accepting and committing\n"); GNUNET_assert (NULL != context_msg); GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); - oh2 = GNUNET_SET_accept (request, - GNUNET_SET_RESULT_FULL, - (struct GNUNET_SET_Option[]) { 0 }, - &result_cb_set2, - NULL); - GNUNET_SET_commit (oh2, - set2); + oh2 = GNUNET_SETI_accept (request, + opts, + &result_cb_set2, + NULL); + GNUNET_SETI_commit (oh2, + set2); } @@ -156,25 +153,27 @@ static void start (void *cls) { struct GNUNET_MessageHeader context_msg; + struct GNUNET_SETI_Option opts[] = { + { .type = GNUNET_SETI_OPTION_RETURN_INTERSECTION }, + { .type = GNUNET_SETI_OPTION_END } + }; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "starting listener\n"); - context_msg.size = htons (sizeof context_msg); + context_msg.size = htons (sizeof (context_msg)); context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); - listen_handle = GNUNET_SET_listen (config, - GNUNET_SET_OPERATION_INTERSECTION, - &app_id, - &listen_cb, - NULL); - oh1 = GNUNET_SET_prepare (&local_id, - &app_id, - &context_msg, - GNUNET_SET_RESULT_FULL, - (struct GNUNET_SET_Option[]) { 0 }, - &result_cb_set1, - NULL); - GNUNET_SET_commit (oh1, - set1); + listen_handle = GNUNET_SETI_listen (config, + &app_id, + &listen_cb, + NULL); + oh1 = GNUNET_SETI_prepare (&local_id, + &app_id, + &context_msg, + opts, + &result_cb_set1, + NULL); + GNUNET_SETI_commit (oh1, + set1); } @@ -186,29 +185,29 @@ start (void *cls) static void init_set2 (void *cls) { - struct GNUNET_SET_Element element; + struct GNUNET_SETI_Element element; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n"); element.element_type = 0; element.data = "hello"; element.size = strlen (element.data); - GNUNET_SET_add_element (set2, - &element, - NULL, - NULL); + GNUNET_SETI_add_element (set2, + &element, + NULL, + NULL); element.data = "quux"; element.size = strlen (element.data); - GNUNET_SET_add_element (set2, - &element, - NULL, - NULL); + GNUNET_SETI_add_element (set2, + &element, + NULL, + NULL); element.data = "baz"; element.size = strlen (element.data); - GNUNET_SET_add_element (set2, - &element, - &start, - NULL); + GNUNET_SETI_add_element (set2, + &element, + &start, + NULL); } @@ -218,71 +217,23 @@ init_set2 (void *cls) static void init_set1 (void) { - struct GNUNET_SET_Element element; + struct GNUNET_SETI_Element element; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 1\n"); element.element_type = 0; element.data = "hello"; element.size = strlen (element.data); - GNUNET_SET_add_element (set1, - &element, - NULL, - NULL); - element.data = "bar"; - element.size = strlen (element.data); - GNUNET_SET_add_element (set1, - &element, - &init_set2, - NULL); -} - - -static int -iter_cb (void *cls, - const struct GNUNET_SET_Element *element) -{ - if (NULL == element) - { - GNUNET_assert (iter_count == 3); - GNUNET_SET_destroy (cls); - return GNUNET_YES; - } - iter_count++; - return GNUNET_YES; -} - - -static void -test_iter () -{ - struct GNUNET_SET_Element element; - struct GNUNET_SET_Handle *iter_set; - - iter_set = GNUNET_SET_create (config, - GNUNET_SET_OPERATION_INTERSECTION); - element.element_type = 0; - element.data = "hello"; - element.size = strlen (element.data); - GNUNET_SET_add_element (iter_set, - &element, - NULL, - NULL); + GNUNET_SETI_add_element (set1, + &element, + NULL, + NULL); element.data = "bar"; element.size = strlen (element.data); - GNUNET_SET_add_element (iter_set, - &element, - NULL, - NULL); - element.data = "quux"; - element.size = strlen (element.data); - GNUNET_SET_add_element (iter_set, - &element, - NULL, - NULL); - GNUNET_SET_iterate (iter_set, - &iter_cb, - iter_set); + GNUNET_SETI_add_element (set1, + &element, + &init_set2, + NULL); } @@ -301,27 +252,27 @@ do_shutdown (void *cls) } if (NULL != oh1) { - GNUNET_SET_operation_cancel (oh1); + GNUNET_SETI_operation_cancel (oh1); oh1 = NULL; } if (NULL != oh2) { - GNUNET_SET_operation_cancel (oh2); + GNUNET_SETI_operation_cancel (oh2); oh2 = NULL; } if (NULL != set1) { - GNUNET_SET_destroy (set1); + GNUNET_SETI_destroy (set1); set1 = NULL; } if (NULL != set2) { - GNUNET_SET_destroy (set2); + GNUNET_SETI_destroy (set2); set2 = NULL; } if (NULL != listen_handle) { - GNUNET_SET_listen_cancel (listen_handle); + GNUNET_SETI_listen_cancel (listen_handle); listen_handle = NULL; } } @@ -359,20 +310,16 @@ run (void *cls, config = cfg; GNUNET_TESTING_peer_get_identity (peer, &local_id); - if (0) - test_iter (); - - tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply ( - GNUNET_TIME_UNIT_SECONDS, 5), - &timeout_fail, - NULL); + tt = GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + 5), + &timeout_fail, + NULL); GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); - set1 = GNUNET_SET_create (cfg, - GNUNET_SET_OPERATION_INTERSECTION); - set2 = GNUNET_SET_create (cfg, - GNUNET_SET_OPERATION_INTERSECTION); + set1 = GNUNET_SETI_create (cfg); + set2 = GNUNET_SETI_create (cfg); GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); @@ -385,9 +332,10 @@ int main (int argc, char **argv) { - if (0 != GNUNET_TESTING_peer_run ("test_set_intersection_result_full", - "test_set.conf", - &run, NULL)) + if (0 != GNUNET_TESTING_peer_run ("test_seti_api", + "test_seti.conf", + &run, + NULL)) return 1; return ret; } -- cgit v1.2.3 From 1eda2878cf1efef694c911c318427f14ecbdf8f3 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 19 Aug 2020 00:46:39 +0200 Subject: -fix set/seti migration issues --- src/include/gnunet_block_lib.h | 13 ++++++++++++ src/seti/gnunet-service-seti.c | 20 +++++++++++++++--- src/seti/plugin_block_seti_test.c | 44 +++++++++++++++++++-------------------- src/seti/seti_api.c | 8 +++---- src/seti/test_seti.conf | 3 +-- src/setu/plugin_block_setu_test.c | 44 +++++++++++++++++++-------------------- 6 files changed, 79 insertions(+), 53 deletions(-) (limited to 'src/seti') diff --git a/src/include/gnunet_block_lib.h b/src/include/gnunet_block_lib.h index 18ca6f63f..73b51252e 100644 --- a/src/include/gnunet_block_lib.h +++ b/src/include/gnunet_block_lib.h @@ -137,6 +137,19 @@ enum GNUNET_BLOCK_Type * Contains either special marker elements or a nested block. */ GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT = 25, + + /** + * Block for testing set intersection. If first byte of the block + * is non-zero, the block is considered invalid. + */ + GNUNET_BLOCK_TYPE_SETI_TEST = 24, + + /** + * Block for testing set union. If first byte of the block + * is non-zero, the block is considered invalid. + */ + GNUNET_BLOCK_TYPE_SETU_TEST = 24, + }; diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c index 7159a7ba2..618d53128 100644 --- a/src/seti/gnunet-service-seti.c +++ b/src/seti/gnunet-service-seti.c @@ -519,8 +519,11 @@ send_client_removed_element (struct Operation *op, struct GNUNET_MQ_Envelope *ev; struct GNUNET_SETI_ResultMessage *rm; - if (GNUNET_NO != op->return_intersection) + if (GNUNET_YES == op->return_intersection) + { + GNUNET_break (0); return; /* Wrong mode for transmitting removed elements */ + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending removed element (size %u) to client\n", element->size); @@ -1096,6 +1099,11 @@ send_remaining_elements (void *cls) const struct GNUNET_SETI_Element *element; int res; + if (GNUNET_NO == op->return_intersection) + { + GNUNET_break (0); + return; /* Wrong mode for transmitting removed elements */ + } res = GNUNET_CONTAINER_multihashmap_iterator_next ( op->full_result_iter, NULL, @@ -2191,6 +2199,7 @@ handle_client_evaluate (void *cls, op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); op->peer = msg->target_peer; + op->return_intersection = htonl (msg->return_intersection); op->client_request_id = ntohl (msg->request_id); context = GNUNET_MQ_extract_nested_mh (msg); @@ -2354,9 +2363,14 @@ handle_client_accept (void *cls, (uint32_t) ntohl (msg->accept_reject_id)); listener = op->listener; op->listener = NULL; - GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op); + op->return_intersection = htonl (msg->return_intersection); + GNUNET_CONTAINER_DLL_remove (listener->op_head, + listener->op_tail, + op); op->set = set; - GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); + GNUNET_CONTAINER_DLL_insert (set->ops_head, + set->ops_tail, + op); op->client_request_id = ntohl (msg->request_id); /* Advance generation values, so that future mutations do not diff --git a/src/seti/plugin_block_seti_test.c b/src/seti/plugin_block_seti_test.c index 1de086092..55cf31bea 100644 --- a/src/seti/plugin_block_seti_test.c +++ b/src/seti/plugin_block_seti_test.c @@ -19,7 +19,7 @@ */ /** - * @file set/plugin_block_set_test.c + * @file seti/plugin_block_seti_test.c * @brief set test block, recognizes elements with non-zero first byte as invalid * @author Christian Grothoff */ @@ -46,16 +46,16 @@ * @return characterization of result */ static enum GNUNET_BLOCK_EvaluationResult -block_plugin_set_test_evaluate (void *cls, - struct GNUNET_BLOCK_Context *ctx, - enum GNUNET_BLOCK_Type type, - struct GNUNET_BLOCK_Group *group, - enum GNUNET_BLOCK_EvaluationOptions eo, - const struct GNUNET_HashCode *query, - const void *xquery, - size_t xquery_size, - const void *reply_block, - size_t reply_block_size) +block_plugin_seti_test_evaluate (void *cls, + struct GNUNET_BLOCK_Context *ctx, + enum GNUNET_BLOCK_Type type, + struct GNUNET_BLOCK_Group *group, + enum GNUNET_BLOCK_EvaluationOptions eo, + const struct GNUNET_HashCode *query, + const void *xquery, + size_t xquery_size, + const void *reply_block, + size_t reply_block_size) { if ((NULL == reply_block) || (reply_block_size == 0) || @@ -77,11 +77,11 @@ block_plugin_set_test_evaluate (void *cls, * (or if extracting a key from a block of this type does not work) */ static int -block_plugin_set_test_get_key (void *cls, - enum GNUNET_BLOCK_Type type, - const void *block, - size_t block_size, - struct GNUNET_HashCode *key) +block_plugin_seti_test_get_key (void *cls, + enum GNUNET_BLOCK_Type type, + const void *block, + size_t block_size, + struct GNUNET_HashCode *key) { return GNUNET_SYSERR; } @@ -91,17 +91,17 @@ block_plugin_set_test_get_key (void *cls, * Entry point for the plugin. */ void * -libgnunet_plugin_block_set_test_init (void *cls) +libgnunet_plugin_block_seti_test_init (void *cls) { static enum GNUNET_BLOCK_Type types[] = { - GNUNET_BLOCK_TYPE_SET_TEST, + GNUNET_BLOCK_TYPE_SETI_TEST, GNUNET_BLOCK_TYPE_ANY /* end of list */ }; struct GNUNET_BLOCK_PluginFunctions *api; api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions); - api->evaluate = &block_plugin_set_test_evaluate; - api->get_key = &block_plugin_set_test_get_key; + api->evaluate = &block_plugin_seti_test_evaluate; + api->get_key = &block_plugin_seti_test_get_key; api->types = types; return api; } @@ -111,7 +111,7 @@ libgnunet_plugin_block_set_test_init (void *cls) * Exit point from the plugin. */ void * -libgnunet_plugin_block_set_test_done (void *cls) +libgnunet_plugin_block_seti_test_done (void *cls) { struct GNUNET_BLOCK_PluginFunctions *api = cls; @@ -120,4 +120,4 @@ libgnunet_plugin_block_set_test_done (void *cls) } -/* end of plugin_block_set_test.c */ +/* end of plugin_block_seti_test.c */ diff --git a/src/seti/seti_api.c b/src/seti/seti_api.c index d80a60684..337b7b219 100644 --- a/src/seti/seti_api.c +++ b/src/seti/seti_api.c @@ -411,7 +411,7 @@ GNUNET_SETI_create (const struct GNUNET_CONFIGURATION_Handle *cfg) set->cfg = cfg; set->mq = GNUNET_CLIENT_connect (cfg, - "set", + "seti", mq_handlers, &handle_client_set_error, set); @@ -549,7 +549,7 @@ GNUNET_SETI_prepare (const struct GNUNET_PeerIdentity *other_peer, switch (opt->type) { case GNUNET_SETI_OPTION_RETURN_INTERSECTION: - msg->return_intersection = GNUNET_YES; + msg->return_intersection = htonl (GNUNET_YES); break; default: LOG (GNUNET_ERROR_TYPE_ERROR, @@ -687,7 +687,7 @@ listen_connect (void *cls) lh->reconnect_task = NULL; GNUNET_assert (NULL == lh->mq); lh->mq = GNUNET_CLIENT_connect (lh->cfg, - "set", + "seti", mq_handlers, &handle_client_listener_error, lh); @@ -806,7 +806,7 @@ GNUNET_SETI_accept (struct GNUNET_SETI_Request *request, switch (opt->type) { case GNUNET_SETI_OPTION_RETURN_INTERSECTION: - oh->return_intersection = GNUNET_YES; + oh->return_intersection = htonl (GNUNET_YES); break; default: LOG (GNUNET_ERROR_TYPE_ERROR, diff --git a/src/seti/test_seti.conf b/src/seti/test_seti.conf index 21fe984f8..c87433419 100644 --- a/src/seti/test_seti.conf +++ b/src/seti/test_seti.conf @@ -3,7 +3,7 @@ [PATHS] GNUNET_TEST_HOME = $GNUNET_TMP/test-gnunet-set/ -[set] +[seti] START_ON_DEMAND = YES #PREFIX = valgrind --leak-check=full #PREFIX = gdbserver :1234 @@ -30,4 +30,3 @@ DISABLEV6 = NO # Do we use addresses from localhost address ranges? (::1, 127.0.0.0/8) RETURN_LOCAL_ADDRESSES = YES - diff --git a/src/setu/plugin_block_setu_test.c b/src/setu/plugin_block_setu_test.c index 1de086092..fd0c8a680 100644 --- a/src/setu/plugin_block_setu_test.c +++ b/src/setu/plugin_block_setu_test.c @@ -19,7 +19,7 @@ */ /** - * @file set/plugin_block_set_test.c + * @file setu/plugin_block_setu_test.c * @brief set test block, recognizes elements with non-zero first byte as invalid * @author Christian Grothoff */ @@ -46,16 +46,16 @@ * @return characterization of result */ static enum GNUNET_BLOCK_EvaluationResult -block_plugin_set_test_evaluate (void *cls, - struct GNUNET_BLOCK_Context *ctx, - enum GNUNET_BLOCK_Type type, - struct GNUNET_BLOCK_Group *group, - enum GNUNET_BLOCK_EvaluationOptions eo, - const struct GNUNET_HashCode *query, - const void *xquery, - size_t xquery_size, - const void *reply_block, - size_t reply_block_size) +block_plugin_setu_test_evaluate (void *cls, + struct GNUNET_BLOCK_Context *ctx, + enum GNUNET_BLOCK_Type type, + struct GNUNET_BLOCK_Group *group, + enum GNUNET_BLOCK_EvaluationOptions eo, + const struct GNUNET_HashCode *query, + const void *xquery, + size_t xquery_size, + const void *reply_block, + size_t reply_block_size) { if ((NULL == reply_block) || (reply_block_size == 0) || @@ -77,11 +77,11 @@ block_plugin_set_test_evaluate (void *cls, * (or if extracting a key from a block of this type does not work) */ static int -block_plugin_set_test_get_key (void *cls, - enum GNUNET_BLOCK_Type type, - const void *block, - size_t block_size, - struct GNUNET_HashCode *key) +block_plugin_setu_test_get_key (void *cls, + enum GNUNET_BLOCK_Type type, + const void *block, + size_t block_size, + struct GNUNET_HashCode *key) { return GNUNET_SYSERR; } @@ -91,17 +91,17 @@ block_plugin_set_test_get_key (void *cls, * Entry point for the plugin. */ void * -libgnunet_plugin_block_set_test_init (void *cls) +libgnunet_plugin_block_setu_test_init (void *cls) { static enum GNUNET_BLOCK_Type types[] = { - GNUNET_BLOCK_TYPE_SET_TEST, + GNUNET_BLOCK_TYPE_SETU_TEST, GNUNET_BLOCK_TYPE_ANY /* end of list */ }; struct GNUNET_BLOCK_PluginFunctions *api; api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions); - api->evaluate = &block_plugin_set_test_evaluate; - api->get_key = &block_plugin_set_test_get_key; + api->evaluate = &block_plugin_setu_test_evaluate; + api->get_key = &block_plugin_setu_test_get_key; api->types = types; return api; } @@ -111,7 +111,7 @@ libgnunet_plugin_block_set_test_init (void *cls) * Exit point from the plugin. */ void * -libgnunet_plugin_block_set_test_done (void *cls) +libgnunet_plugin_block_setu_test_done (void *cls) { struct GNUNET_BLOCK_PluginFunctions *api = cls; @@ -120,4 +120,4 @@ libgnunet_plugin_block_set_test_done (void *cls) } -/* end of plugin_block_set_test.c */ +/* end of plugin_block_setu_test.c */ -- cgit v1.2.3 From 5660868ce7523c4bda8e25954fda0ae2d28a9702 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 19 Aug 2020 10:55:36 +0200 Subject: fix seti testcase --- src/seti/gnunet-service-seti.c | 6 ++++++ src/seti/seti_api.c | 3 ++- src/seti/test_seti_api.c | 12 ++++++++---- 3 files changed, 16 insertions(+), 5 deletions(-) (limited to 'src/seti') diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c index 618d53128..af478233b 100644 --- a/src/seti/gnunet-service-seti.c +++ b/src/seti/gnunet-service-seti.c @@ -2200,6 +2200,9 @@ handle_client_evaluate (void *cls, UINT32_MAX); op->peer = msg->target_peer; op->return_intersection = htonl (msg->return_intersection); + fprintf (stderr, + "Return intersection for evaluate is %d\n", + op->return_intersection); op->client_request_id = ntohl (msg->request_id); context = GNUNET_MQ_extract_nested_mh (msg); @@ -2364,6 +2367,9 @@ handle_client_accept (void *cls, listener = op->listener; op->listener = NULL; op->return_intersection = htonl (msg->return_intersection); + fprintf (stderr, + "Return intersection for accept is %d\n", + op->return_intersection); GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op); diff --git a/src/seti/seti_api.c b/src/seti/seti_api.c index 337b7b219..5b88b0469 100644 --- a/src/seti/seti_api.c +++ b/src/seti/seti_api.c @@ -806,7 +806,8 @@ GNUNET_SETI_accept (struct GNUNET_SETI_Request *request, switch (opt->type) { case GNUNET_SETI_OPTION_RETURN_INTERSECTION: - oh->return_intersection = htonl (GNUNET_YES); + oh->return_intersection = GNUNET_YES; + msg->return_intersection = htonl (GNUNET_YES); break; default: LOG (GNUNET_ERROR_TYPE_ERROR, diff --git a/src/seti/test_seti_api.c b/src/seti/test_seti_api.c index 22a3a06e5..9074fab41 100644 --- a/src/seti/test_seti_api.c +++ b/src/seti/test_seti_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2012-2014 GNUnet e.V. + Copyright (C) 2012-2014, 2020 GNUnet e.V. GNUnet is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published @@ -108,14 +108,18 @@ result_cb_set2 (void *cls, break; case GNUNET_SETI_STATUS_DONE: oh2 = NULL; - GNUNET_assert (1 == count); + GNUNET_break (1 == count); + if (1 != count) + ret |= 2; GNUNET_SETI_destroy (set2); set2 = NULL; if (NULL == set1) GNUNET_SCHEDULER_shutdown (); break; - default: - GNUNET_assert (0); + case GNUNET_SETI_STATUS_DEL_LOCAL: + /* unexpected! */ + ret = 1; + break; } } -- cgit v1.2.3