From 4d607f2f2838431cc7a349441f8f018ab99633a2 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 18 Aug 2020 18:09:58 +0200 Subject: splitting of set intersection functionality from set service (not yet finished, FTBFS) --- configure.ac | 2 + po/POTFILES.in | 5 + src/include/gnunet_protocols.h | 94 +- src/include/gnunet_seti_service.h | 369 ++++ src/include/gnunet_setu_service.h | 4 +- src/seti/.gitignore | 3 + src/seti/Makefile.am | 90 + src/seti/gnunet-service-seti.c | 3274 +++++++++++++++++++++++++++++++ src/seti/gnunet-service-seti_protocol.h | 144 ++ src/seti/gnunet-seti-profiler.c | 480 +++++ src/seti/plugin_block_seti_test.c | 123 ++ src/seti/seti.conf.in | 12 + src/seti/seti.h | 267 +++ src/seti/seti_api.c | 895 +++++++++ src/seti/test_seti.conf | 33 + src/seti/test_seti_api.c | 393 ++++ 16 files changed, 6172 insertions(+), 16 deletions(-) create mode 100644 src/include/gnunet_seti_service.h create mode 100644 src/seti/.gitignore create mode 100644 src/seti/Makefile.am create mode 100644 src/seti/gnunet-service-seti.c create mode 100644 src/seti/gnunet-service-seti_protocol.h create mode 100644 src/seti/gnunet-seti-profiler.c create mode 100644 src/seti/plugin_block_seti_test.c create mode 100644 src/seti/seti.conf.in create mode 100644 src/seti/seti.h create mode 100644 src/seti/seti_api.c create mode 100644 src/seti/test_seti.conf create mode 100644 src/seti/test_seti_api.c diff --git a/configure.ac b/configure.ac index 72309c78d..bd92bd0e9 100644 --- a/configure.ac +++ b/configure.ac @@ -1939,6 +1939,8 @@ src/scalarproduct/Makefile src/scalarproduct/scalarproduct.conf src/set/Makefile src/set/set.conf +src/seti/Makefile +src/seti/seti.conf src/setu/Makefile src/setu/setu.conf src/sq/Makefile diff --git a/po/POTFILES.in b/po/POTFILES.in index 7d19122ca..12e27fa81 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -333,6 +333,11 @@ src/set/ibf.c src/set/ibf_sim.c src/set/plugin_block_set_test.c src/set/set_api.c +src/seti/gnunet-service-set_intersection.c +src/seti/gnunet-service-seti.c +src/seti/gnunet-seti-profiler.c +src/seti/plugin_block_seti_test.c +src/seti/setu_api.c src/setu/gnunet-service-setu.c src/setu/gnunet-service-setu_strata_estimator.c src/setu/gnunet-setu-ibf-profiler.c diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index c3fcde0b9..e9a2b1c0e 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -1713,80 +1713,146 @@ extern "C" { * Demand the whole element from the other * peer, given only the hash code. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL 565 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL 559 /** * Demand the whole element from the other * peer, given only the hash code. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND 566 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND 560 /** * Tell the other peer to send us a list of * hashes that match an IBF key. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY 567 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY 561 /** * Tell the other peer which hashes match a * given IBF key. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER 568 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER 562 /** * Request a set union operation from a remote peer. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST 581 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST 563 /** * Strata estimator. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE 582 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE 564 /** * Invertible bloom filter. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF 583 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF 565 /** * Actual set elements. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS 584 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS 566 /** * Requests for the elements with the given hashes. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENT_REQUESTS 585 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENT_REQUESTS 567 /** * Set operation is done. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE 586 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE 568 /** * Compressed strata estimator. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC 590 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC 569 /** * Request all missing elements from the other peer, * based on their sets and the elements we previously sent * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE 597 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE 570 /** * Send a set element, not as response to a demand but because * we're sending the full set. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT 598 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT 571 /** * Request all missing elements from the other peer, * based on their sets and the elements we previously sent * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS. */ -#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 599 +#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 572 + + +/******************************************************************************* + * SETI message types + ******************************************************************************/ + + +/** + * Cancel a set operation + */ +#define GNUNET_MESSAGE_TYPE_SETI_CANCEL 580 + +/** + * Add element to set. + */ +#define GNUNET_MESSAGE_TYPE_SETI_ADD 581 + +/** + * Create a new local set + */ +#define GNUNET_MESSAGE_TYPE_SETI_CREATE 582 + +/** + * Handle result message from operation + */ +#define GNUNET_MESSAGE_TYPE_SETI_RESULT 583 + +/** + * Evaluate a set operation + */ +#define GNUNET_MESSAGE_TYPE_SETI_EVALUATE 584 + +/** + * Listen for operation requests + */ +#define GNUNET_MESSAGE_TYPE_SETI_LISTEN 585 + +/** + * Reject a set request. + */ +#define GNUNET_MESSAGE_TYPE_SETI_REJECT 586 + +/** + * Accept an incoming set request + */ +#define GNUNET_MESSAGE_TYPE_SETI_ACCEPT 587 + +/** + * Notify the client of an incoming request from a remote peer + */ +#define GNUNET_MESSAGE_TYPE_SETI_REQUEST 588 + +/** + * Information about the element count for intersection + */ +#define GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO 591 + +/** + * Bloom filter message for intersection exchange started by Bob. + */ +#define GNUNET_MESSAGE_TYPE_SETI_P2P_BF 592 + +/** + * Intersection operation is done. + */ +#define GNUNET_MESSAGE_TYPE_SETI_P2P_DONE 593 /******************************************************************************* diff --git a/src/include/gnunet_seti_service.h b/src/include/gnunet_seti_service.h new file mode 100644 index 000000000..c0b6f41a5 --- /dev/null +++ b/src/include/gnunet_seti_service.h @@ -0,0 +1,369 @@ +/* + This file is part of GNUnet + Copyright (C) 2013, 2014, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @author Florian Dold + * @author Christian Grothoff + * + * @file + * Two-peer set intersection operations + * + * @defgroup set Set intersection service + * Two-peer set operations + * + * @{ + */ + +#ifndef GNUNET_SETI_SERVICE_H +#define GNUNET_SETI_SERVICE_H + +#ifdef __cplusplus +extern "C" +{ +#if 0 /* keep Emacsens' auto-indent happy */ +} +#endif +#endif + +#include "gnunet_common.h" +#include "gnunet_time_lib.h" +#include "gnunet_configuration_lib.h" + + +/** + * Maximum size of a context message for set operation requests. + */ +#define GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE ((1 << 16) - 1024) + +/** + * Opaque handle to a set. + */ +struct GNUNET_SETI_Handle; + +/** + * Opaque handle to a set operation request from another peer. + */ +struct GNUNET_SETI_Request; + +/** + * Opaque handle to a listen operation. + */ +struct GNUNET_SETI_ListenHandle; + +/** + * Opaque handle to a set operation. + */ +struct GNUNET_SETI_OperationHandle; + + +/** + * Status for the result callback + */ +enum GNUNET_SETI_Status +{ + + /** + * Element should be added to the result set of the local peer, i.e. the + * element is in the intersection. + */ + GNUNET_SETI_STATUS_ADD_LOCAL, + + /** + * Element should be delete from the result set of the local peer, i.e. the + * local peer is having an element that is not in the intersection. + */ + GNUNET_SETI_STATUS_DEL_LOCAL, + + /** + * The other peer refused to do the operation with us, or something went + * wrong. + */ + GNUNET_SETI_STATUS_FAILURE, + + /** + * Success, all elements have been sent (and received). + */ + GNUNET_SETI_STATUS_DONE +}; + + +/** + * Element stored in a set. + */ +struct GNUNET_SETI_Element +{ + /** + * Number of bytes in the buffer pointed to by data. + */ + uint16_t size; + + /** + * Application-specific element type. + */ + uint16_t element_type; + + /** + * Actual data of the element + */ + const void *data; +}; + + +/** + * Possible options to pass to a set operation. + * + * Used as tag for struct #GNUNET_SETI_Option. + */ +enum GNUNET_SETI_OptionType +{ + /** + * List terminator. + */ + GNUNET_SETI_OPTION_END = 0, + + /** + * Return the elements remaining in the intersection + * (#GNUNET_SETI_STATUS_ADD_LOCAL). If not given, the default is to return a + * list of the elements to be removed (#GNUNET_SETI_STATUS_DEL_LOCAL). + */ + GNUNET_SETI_OPTION_RETURN_INTERSECTION = 1, +}; + + +/** + * Option for set operations. + */ +struct GNUNET_SETI_Option +{ + /** + * Type of the option. + */ + enum GNUNET_SETI_OptionType type; + + /** + * Value for the option, only used with some options. + */ + union + { + uint64_t num; + } v; +}; + + +/** + * Callback for set union operation results. Called for each element + * in the result set. + * + * @param cls closure + * @param element a result element, only valid if status is #GNUNET_SETI_STATUS_OK + * @param current_size current set size + * @param status see `enum GNUNET_SETI_Status` + */ +typedef void +(*GNUNET_SETI_ResultIterator) (void *cls, + const struct GNUNET_SETI_Element *element, + uint64_t current_size, + enum GNUNET_SETI_Status status); + + +/** + * Called when another peer wants to do a set operation with the + * local peer. If a listen error occurs, the @a request is NULL. + * + * @param cls closure + * @param other_peer the other peer + * @param context_msg message with application specific information from + * the other peer + * @param request request from the other peer (never NULL), use GNUNET_SETI_accept() + * to accept it, otherwise the request will be refused + * Note that we can't just return value from the listen callback, + * as it is also necessary to specify the set we want to do the + * operation with, whith sometimes can be derived from the context + * message. It's necessary to specify the timeout. + */ +typedef void +(*GNUNET_SETI_ListenCallback) (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SETI_Request *request); + + +/** + * Create an empty set, supporting the specified operation. + * + * @param cfg configuration to use for connecting to the + * set service + * @return a handle to the set + */ +struct GNUNET_SETI_Handle * +GNUNET_SETI_create (const struct GNUNET_CONFIGURATION_Handle *cfg); + + +/** + * Add an element to the given set. + * + * @param set set to add element to + * @param element element to add to the set + * @param cb function to call when finished, can be NULL + * @param cb_cls closure for @a cb + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the + * set is invalid (e.g. the set service crashed) + */ +int +GNUNET_SETI_add_element (struct GNUNET_SETI_Handle *set, + const struct GNUNET_SETI_Element *element, + GNUNET_SCHEDULER_TaskCallback cb, + void *cb_cls); + + +/** + * Destroy the set handle, and free all associated resources. Operations may + * still be pending when a set is destroyed (and will be allowed to complete). + * + * @param set set to destroy + */ +void +GNUNET_SETI_destroy (struct GNUNET_SETI_Handle *set); + + +/** + * Prepare a set operation to be evaluated with another peer. The evaluation + * will not start until the client provides a local set with + * GNUNET_SETI_commit(). + * + * @param other_peer peer with the other set + * @param app_id hash for the application using the set + * @param context_msg additional information for the request + * @param options options to use when processing the request + * @param result_cb called on error or success + * @param result_cls closure for @a result_cb + * @return a handle to cancel the operation + */ +struct GNUNET_SETI_OperationHandle * +GNUNET_SETI_prepare (const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_HashCode *app_id, + const struct GNUNET_MessageHeader *context_msg, + const struct GNUNET_SETI_Option options[], + GNUNET_SETI_ResultIterator result_cb, + void *result_cls); + + +/** + * Wait for set operation requests for the given application ID. + * If the connection to the set service is lost, the listener is + * re-created transparently with exponential backoff. + * + * @param cfg configuration to use for connecting to + * the set service + * @param app_id id of the application that handles set operation requests + * @param listen_cb called for each incoming request matching the operation + * and application id + * @param listen_cls handle for @a listen_cb + * @return a handle that can be used to cancel the listen operation + */ +struct GNUNET_SETI_ListenHandle * +GNUNET_SETI_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_HashCode *app_id, + GNUNET_SETI_ListenCallback listen_cb, + void *listen_cls); + + +/** + * Cancel the given listen operation. After calling cancel, the + * listen callback for this listen handle will not be called again. + * Note that cancelling a listen operation will automatically reject + * all operations that have not yet been accepted. + * + * @param lh handle for the listen operation + */ +void +GNUNET_SETI_listen_cancel (struct GNUNET_SETI_ListenHandle *lh); + + +/** + * Accept a request we got via GNUNET_SETI_listen(). Must be called during + * GNUNET_SETI_listen(), as the `struct GNUNET_SETI_Request` becomes invalid + * afterwards. + * Call GNUNET_SETI_commit() to provide the local set to use for the operation, + * and to begin the exchange with the remote peer. + * + * @param request request to accept + * @param options options to use when processing the request + * @param result_cb callback for the results + * @param result_cls closure for @a result_cb + * @return a handle to cancel the operation + */ +struct GNUNET_SETI_OperationHandle * +GNUNET_SETI_accept (struct GNUNET_SETI_Request *request, + const struct GNUNET_SETI_Option options[], + GNUNET_SETI_ResultIterator result_cb, + void *result_cls); + + +/** + * Commit a set to be used with a set operation. + * This function is called once we have fully constructed + * the set that we want to use for the operation. At this + * time, the P2P protocol can then begin to exchange the + * set information and call the result callback with the + * result information. + * + * @param oh handle to the set operation + * @param set the set to use for the operation + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the + * set is invalid (e.g. the set service crashed) + */ +int +GNUNET_SETI_commit (struct GNUNET_SETI_OperationHandle *oh, + struct GNUNET_SETI_Handle *set); + + +/** + * Cancel the given set operation. May not be called after the operation's + * `GNUNET_SETI_ResultIterator` has been called with a status of + * #GNUNET_SETI_STATUS_FAILURE or #GNUNET_SETI_STATUS_DONE. + * + * @param oh set operation to cancel + */ +void +GNUNET_SETI_operation_cancel (struct GNUNET_SETI_OperationHandle *oh); + + +/** + * Hash a set element. + * + * @param element the element that should be hashed + * @param[out] ret_hash a pointer to where the hash of @a element + * should be stored + */ +void +GNUNET_SETI_element_hash (const struct GNUNET_SETI_Element *element, + struct GNUNET_HashCode *ret_hash); + + +#if 0 /* keep Emacsens' auto-indent happy */ +{ +#endif +#ifdef __cplusplus +} +#endif + +#endif + +/** @} */ /* end of group */ diff --git a/src/include/gnunet_setu_service.h b/src/include/gnunet_setu_service.h index 092c03198..d737b97c1 100644 --- a/src/include/gnunet_setu_service.h +++ b/src/include/gnunet_setu_service.h @@ -22,9 +22,9 @@ * @author Christian Grothoff * * @file - * Two-peer set operations + * Two-peer set union operations * - * @defgroup set Set service + * @defgroup set Set union service * Two-peer set operations * * @{ diff --git a/src/seti/.gitignore b/src/seti/.gitignore new file mode 100644 index 000000000..5f234a4c2 --- /dev/null +++ b/src/seti/.gitignore @@ -0,0 +1,3 @@ +gnunet-seti-profiler +gnunet-service-seti +test_seti_api diff --git a/src/seti/Makefile.am b/src/seti/Makefile.am new file mode 100644 index 000000000..d96ffff03 --- /dev/null +++ b/src/seti/Makefile.am @@ -0,0 +1,90 @@ +# This Makefile.am is in the public domain +AM_CPPFLAGS = -I$(top_srcdir)/src/include + +pkgcfgdir= $(pkgdatadir)/config.d/ + +libexecdir= $(pkglibdir)/libexec/ + +plugindir = $(libdir)/gnunet + +pkgcfg_DATA = \ + seti.conf + +if USE_COVERAGE + AM_CFLAGS = -fprofile-arcs -ftest-coverage +endif + +if HAVE_TESTING +bin_PROGRAMS = \ + gnunet-seti-profiler +endif + +libexec_PROGRAMS = \ + gnunet-service-seti + +lib_LTLIBRARIES = \ + libgnunetseti.la + +gnunet_seti_profiler_SOURCES = \ + gnunet-seti-profiler.c +gnunet_seti_profiler_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + libgnunetseti.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ + $(GN_LIBINTL) + + +gnunet_service_seti_SOURCES = \ + gnunet-service-seti.c \ + gnunet-service-set_protocol.h +gnunet_service_seti_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + $(top_builddir)/src/core/libgnunetcore.la \ + $(top_builddir)/src/cadet/libgnunetcadet.la \ + $(top_builddir)/src/block/libgnunetblock.la \ + libgnunetseti.la \ + $(GN_LIBINTL) + +libgnunetseti_la_SOURCES = \ + seti_api.c seti.h +libgnunetseti_la_LIBADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(LTLIBINTL) +libgnunetseti_la_LDFLAGS = \ + $(GN_LIB_LDFLAGS) + +if HAVE_TESTING +check_PROGRAMS = \ + test_seti_api +endif + +if ENABLE_TEST_RUN +AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME; +TESTS = $(check_PROGRAMS) +endif + +test_seti_api_SOURCES = \ + test_seti_api.c +test_seti_api_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ + libgnunetset.la + +plugin_LTLIBRARIES = \ + libgnunet_plugin_block_seti_test.la + +libgnunet_plugin_block_seti_test_la_SOURCES = \ + plugin_block_seti_test.c +libgnunet_plugin_block_seti_test_la_LIBADD = \ + $(top_builddir)/src/block/libgnunetblock.la \ + $(top_builddir)/src/block/libgnunetblockgroup.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(LTLIBINTL) +libgnunet_plugin_block_seti_test_la_LDFLAGS = \ + $(GN_PLUGIN_LDFLAGS) + + +EXTRA_DIST = \ + test_seti.conf diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c new file mode 100644 index 000000000..3b8da01cd --- /dev/null +++ b/src/seti/gnunet-service-seti.c @@ -0,0 +1,3274 @@ +/* + This file is part of GNUnet + Copyright (C) 2013-2017, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file set/gnunet-service-seti.c + * @brief two-peer set intersection operations + * @author Florian Dold + * @author Christian Grothoff + */ +#include "gnunet-service-seti_protocol.h" +#include "gnunet_statistics_service.h" + +/** + * How long do we hold on to an incoming channel if there is + * no local listener before giving up? + */ +#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES + + +/** + * Current phase we are in for a intersection operation. + */ +enum IntersectionOperationPhase +{ + /** + * We are just starting. + */ + PHASE_INITIAL, + + /** + * We have send the number of our elements to the other + * peer, but did not setup our element set yet. + */ + PHASE_COUNT_SENT, + + /** + * We have initialized our set and are now reducing it by exchanging + * Bloom filters until one party notices the their element hashes + * are equal. + */ + PHASE_BF_EXCHANGE, + + /** + * We must next send the P2P DONE message (after finishing mostly + * with the local client). Then we will wait for the channel to close. + */ + PHASE_MUST_SEND_DONE, + + /** + * We have received the P2P DONE message, and must finish with the + * local client before terminating the channel. + */ + PHASE_DONE_RECEIVED, + + /** + * The protocol is over. Results may still have to be sent to the + * client. + */ + PHASE_FINISHED +}; + + +/** + * Implementation-specific set state. Used as opaque pointer, and + * specified further in the respective implementation. + */ +struct SetState; + +/** + * Implementation-specific set operation. Used as opaque pointer, and + * specified further in the respective implementation. + */ +struct OperationState; + +/** + * A set that supports a specific operation with other peers. + */ +struct Set; + +/** + * Information about an element element in the set. All elements are + * stored in a hash-table from their hash-code to their 'struct + * Element', so that the remove and add operations are reasonably + * fast. + */ +struct ElementEntry; + +/** + * Operation context used to execute a set operation. + */ +struct Operation; + + +/** + * MutationEvent gives information about changes + * to an element (removal / addition) in a set content. + */ +struct MutationEvent +{ + /** + * First generation affected by this mutation event. + * + * If @a generation is 0, this mutation event is a list + * sentinel element. + */ + unsigned int generation; + + /** + * If @a added is #GNUNET_YES, then this is a + * `remove` event, otherwise it is an `add` event. + */ + int added; +}; + + +/** + * Information about an element element in the set. All elements are + * stored in a hash-table from their hash-code to their `struct + * Element`, so that the remove and add operations are reasonably + * fast. + */ +struct ElementEntry +{ + /** + * The actual element. The data for the element + * should be allocated at the end of this struct. + */ + struct GNUNET_SET_Element element; + + /** + * Hash of the element. For set union: Will be used to derive the + * different IBF keys for different salts. + */ + struct GNUNET_HashCode element_hash; + + /** + * If @a mutations is not NULL, it contains + * a list of mutations, ordered by increasing generation. + * The list is terminated by a sentinel event with `generation` + * set to 0. + * + * If @a mutations is NULL, then this element exists in all generations + * of the respective set content this element belongs to. + */ + struct MutationEvent *mutations; + + /** + * Number of elements in the array @a mutations. + */ + unsigned int mutations_size; + + /** + * #GNUNET_YES if the element is a remote element, and does not belong + * to the operation's set. + */ + int remote; +}; + + +/** + * A listener is inhabited by a client, and waits for evaluation + * requests from remote peers. + */ +struct Listener; + + +/** + * State we keep per client. + */ +struct ClientState +{ + /** + * Set, if associated with the client, otherwise NULL. + */ + struct Set *set; + + /** + * Listener, if associated with the client, otherwise NULL. + */ + struct Listener *listener; + + /** + * Client handle. + */ + struct GNUNET_SERVICE_Client *client; + + /** + * Message queue. + */ + struct GNUNET_MQ_Handle *mq; +}; + + +/** + * Operation context used to execute a set operation. + */ +struct Operation +{ + /** + * Kept in a DLL of the listener, if @e listener is non-NULL. + */ + struct Operation *next; + + /** + * Kept in a DLL of the listener, if @e listener is non-NULL. + */ + struct Operation *prev; + + /** + * Channel to the peer. + */ + struct GNUNET_CADET_Channel *channel; + + /** + * Port this operation runs on. + */ + struct Listener *listener; + + /** + * Message queue for the channel. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Context message, may be NULL. + */ + struct GNUNET_MessageHeader *context_msg; + + /** + * Set associated with the operation, NULL until the spec has been + * associated with a set. + */ + struct Set *set; + + /** + * Operation-specific operation state. Note that the exact + * type depends on this being a union or intersection operation + * (and thus on @e vt). + */ + struct OperationState *state; // FIXME: inline + + /** + * The identity of the requesting peer. Needs to + * be stored here as the op spec might not have been created yet. + */ + struct GNUNET_PeerIdentity peer; + + /** + * Timeout task, if the incoming peer has not been accepted + * after the timeout, it will be disconnected. + */ + struct GNUNET_SCHEDULER_Task *timeout_task; + + /** + * Salt to use for the operation. + */ + uint32_t salt; + + /** + * Remote peers element count + */ + uint32_t remote_element_count; + + /** + * ID used to identify an operation between service and client + */ + uint32_t client_request_id; + + /** + * When are elements sent to the client, and which elements are sent? + */ + enum GNUNET_SET_ResultMode result_mode; + + /** + * Always use delta operation instead of sending full sets, + * even it it's less efficient. + */ + int force_delta; + + /** + * Always send full sets, even if delta operations would + * be more efficient. + */ + int force_full; + + /** + * #GNUNET_YES to fail operations where Byzantine faults + * are suspected + */ + int byzantine; + + /** + * Lower bound for the set size, used only when + * byzantine mode is enabled. + */ + int byzantine_lower_bound; + + /** + * Unique request id for the request from a remote peer, sent to the + * client, which will accept or reject the request. Set to '0' iff + * the request has not been suggested yet. + */ + uint32_t suggest_id; + + /** + * Generation in which the operation handle + * was created. + */ + unsigned int generation_created; +}; + + +/** + * SetContent stores the actual set elements, which may be shared by + * multiple generations derived from one set. + */ +struct SetContent +{ + /** + * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`. + */ + struct GNUNET_CONTAINER_MultiHashMap *elements; + + /** + * Mutations requested by the client that we're + * unable to execute right now because we're iterating + * over the underlying hash map of elements. + */ + struct PendingMutation *pending_mutations_head; + + /** + * Mutations requested by the client that we're + * unable to execute right now because we're iterating + * over the underlying hash map of elements. + */ + struct PendingMutation *pending_mutations_tail; + + /** + * Number of references to the content. + */ + unsigned int refcount; + + /** + * FIXME: document! + */ + unsigned int latest_generation; + + /** + * Number of concurrently active iterators. + */ + int iterator_count; +}; + + +struct GenerationRange +{ + /** + * First generation that is excluded. + */ + unsigned int start; + + /** + * Generation after the last excluded generation. + */ + unsigned int end; +}; + + +/** + * Information about a mutation to apply to a set. + */ +struct PendingMutation +{ + /** + * Mutations are kept in a DLL. + */ + struct PendingMutation *prev; + + /** + * Mutations are kept in a DLL. + */ + struct PendingMutation *next; + + /** + * Set this mutation is about. + */ + struct Set *set; + + /** + * Message that describes the desired mutation. + * May only be a #GNUNET_MESSAGE_TYPE_SET_ADD or + * #GNUNET_MESSAGE_TYPE_SET_REMOVE. + */ + struct GNUNET_SET_ElementMessage *msg; +}; + + +/** + * A set that supports a specific operation with other peers. + */ +struct Set +{ + /** + * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`). + */ + struct Set *next; + + /** + * Sets are held in a doubly linked list. + */ + struct Set *prev; + + /** + * Client that owns the set. Only one client may own a set, + * and there can only be one set per client. + */ + struct ClientState *cs; + + /** + * Content, possibly shared by multiple sets, + * and thus reference counted. + */ + struct SetContent *content; + + /** + * Implementation-specific state. + */ + struct SetState *state; + + /** + * Current state of iterating elements for the client. + * NULL if we are not currently iterating. + */ + struct GNUNET_CONTAINER_MultiHashMapIterator *iter; + + /** + * Evaluate operations are held in a linked list. + */ + struct Operation *ops_head; + + /** + * Evaluate operations are held in a linked list. + */ + struct Operation *ops_tail; + + /** + * List of generations we have to exclude, due to lazy copies. + */ + struct GenerationRange *excluded_generations; + + /** + * Current generation, that is, number of previously executed + * operations and lazy copies on the underlying set content. + */ + unsigned int current_generation; + + /** + * Number of elements in array @a excluded_generations. + */ + unsigned int excluded_generations_size; + + /** + * Type of operation supported for this set + */ + enum GNUNET_SET_OperationType operation; + + /** + * Generation we're currently iteration over. + */ + unsigned int iter_generation; + + /** + * Each @e iter is assigned a unique number, so that the client + * can distinguish iterations. + */ + uint16_t iteration_id; +}; + + +/** + * State of an evaluate operation with another peer. + */ +struct OperationState +{ + /** + * The bf we currently receive + */ + struct GNUNET_CONTAINER_BloomFilter *remote_bf; + + /** + * BF of the set's element. + */ + struct GNUNET_CONTAINER_BloomFilter *local_bf; + + /** + * Remaining elements in the intersection operation. + * Maps element-id-hashes to 'elements in our set'. + */ + struct GNUNET_CONTAINER_MultiHashMap *my_elements; + + /** + * Iterator for sending the final set of @e my_elements to the client. + */ + struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter; + + /** + * Evaluate operations are held in a linked list. + */ + struct OperationState *next; + + /** + * Evaluate operations are held in a linked list. + */ + struct OperationState *prev; + + /** + * For multipart BF transmissions, we have to store the + * bloomfilter-data until we fully received it. + */ + char *bf_data; + + /** + * XOR of the keys of all of the elements (remaining) in my set. + * Always updated when elements are added or removed to + * @e my_elements. + */ + struct GNUNET_HashCode my_xor; + + /** + * XOR of the keys of all of the elements (remaining) in + * the other peer's set. Updated when we receive the + * other peer's Bloom filter. + */ + struct GNUNET_HashCode other_xor; + + /** + * How many bytes of @e bf_data are valid? + */ + uint32_t bf_data_offset; + + /** + * Current element count contained within @e my_elements. + * (May differ briefly during initialization.) + */ + uint32_t my_element_count; + + /** + * size of the bloomfilter in @e bf_data. + */ + uint32_t bf_data_size; + + /** + * size of the bloomfilter + */ + uint32_t bf_bits_per_element; + + /** + * Salt currently used for BF construction (by us or the other peer, + * depending on where we are in the code). + */ + uint32_t salt; + + /** + * Current state of the operation. + */ + enum IntersectionOperationPhase phase; + + /** + * Generation in which the operation handle + * was created. + */ + unsigned int generation_created; + + /** + * Did we send the client that we are done? + */ + int client_done_sent; + + /** + * Set whenever we reach the state where the death of the + * channel is perfectly find and should NOT result in the + * operation being cancelled. + */ + int channel_death_expected; +}; + + +/** + * Extra state required for efficient set intersection. + * Merely tracks the total number of elements. + */ +struct SetState +{ + /** + * Number of currently valid elements in the set which have not been + * removed. + */ + uint32_t current_set_element_count; +}; + + +/** + * A listener is inhabited by a client, and waits for evaluation + * requests from remote peers. + */ +struct Listener +{ + /** + * Listeners are held in a doubly linked list. + */ + struct Listener *next; + + /** + * Listeners are held in a doubly linked list. + */ + struct Listener *prev; + + /** + * Head of DLL of operations this listener is responsible for. + * Once the client has accepted/declined the operation, the + * operation is moved to the respective set's operation DLLS. + */ + struct Operation *op_head; + + /** + * Tail of DLL of operations this listener is responsible for. + * Once the client has accepted/declined the operation, the + * operation is moved to the respective set's operation DLLS. + */ + struct Operation *op_tail; + + /** + * Client that owns the listener. + * Only one client may own a listener. + */ + struct ClientState *cs; + + /** + * The port we are listening on with CADET. + */ + struct GNUNET_CADET_Port *open_port; + + /** + * Application ID for the operation, used to distinguish + * multiple operations of the same type with the same peer. + */ + struct GNUNET_HashCode app_id; + + /** + * The type of the operation. + */ + enum GNUNET_SET_OperationType operation; +}; + + +/** + * Handle to the cadet service, used to listen for and connect to + * remote peers. + */ +static struct GNUNET_CADET_Handle *cadet; + +/** + * Statistics handle. + */ +static struct GNUNET_STATISTICS_Handle *_GSS_statistics; + +/** + * Listeners are held in a doubly linked list. + */ +static struct Listener *listener_head; + +/** + * Listeners are held in a doubly linked list. + */ +static struct Listener *listener_tail; + +/** + * Number of active clients. + */ +static unsigned int num_clients; + +/** + * Are we in shutdown? if #GNUNET_YES and the number of clients + * drops to zero, disconnect from CADET. + */ +static int in_shutdown; + +/** + * Counter for allocating unique IDs for clients, used to identify + * incoming operation requests from remote peers, that the client can + * choose to accept or refuse. 0 must not be used (reserved for + * uninitialized). + */ +static uint32_t suggest_id; + + +/** + * If applicable in the current operation mode, send a result message + * to the client indicating we removed an element. + * + * @param op intersection operation + * @param element element to send + */ +static void +send_client_removed_element (struct Operation *op, + struct GNUNET_SET_Element *element) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; + + if (GNUNET_SET_RESULT_REMOVED != op->result_mode) + return; /* Wrong mode for transmitting removed elements */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending removed element (size %u) to client\n", + element->size); + GNUNET_STATISTICS_update (_GSS_statistics, + "# Element removed messages sent", + 1, + GNUNET_NO); + GNUNET_assert (0 != op->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, + element->size, + GNUNET_MESSAGE_TYPE_SET_RESULT); + if (NULL == ev) + { + GNUNET_break (0); + return; + } + rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->request_id = htonl (op->client_request_id); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_send (op->set->cs->mq, + ev); +} + + +/** + * Fills the "my_elements" hashmap with all relevant elements. + * + * @param cls the `struct Operation *` we are performing + * @param key current key code + * @param value the `struct ElementEntry *` from the hash map + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +filtered_map_initialization (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + struct GNUNET_HashCode mutated_hash; + + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "FIMA called for %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u (wrong generation)\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + return GNUNET_YES; /* element not valid in our operation's generation */ + } + + /* Test if element is in other peer's bloomfilter */ + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + if (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, + &mutated_hash)) + { + /* remove this element */ + send_client_removed_element (op, + &ee->element); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + return GNUNET_YES; + } + op->state->my_element_count++; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Filtered initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + + return GNUNET_YES; +} + + +/** + * Removes elements from our hashmap if they are not contained within the + * provided remote bloomfilter. + * + * @param cls closure with the `struct Operation *` + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +iterator_bf_reduce (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + struct GNUNET_HashCode mutated_hash; + + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + if (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, + &mutated_hash)) + { + GNUNET_break (0 < op->state->my_element_count); + op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + &ee->element_hash, + ee)); + send_client_removed_element (op, + &ee->element); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, keeping %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + } + return GNUNET_YES; +} + + +/** + * Create initial bloomfilter based on all the elements given. + * + * @param cls the `struct Operation *` + * @param key current key code + * @param value the `struct ElementEntry` to process + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +iterator_bf_create (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + struct GNUNET_HashCode mutated_hash; + + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initializing BF with hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, + &mutated_hash); + return GNUNET_YES; +} + + +/** + * Inform the client that the intersection operation has failed, + * and proceed to destroy the evaluate operation. + * + * @param op the intersection operation to fail + */ +static void +fail_intersection_operation (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *msg; + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Intersection operation failed\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# Intersection operations failed", + 1, + GNUNET_NO); + if (NULL != op->state->my_elements) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); + op->state->my_elements = NULL; + } + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_RESULT); + msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); + msg->request_id = htonl (op->client_request_id); + msg->element_type = htons (0); + GNUNET_MQ_send (op->set->cs->mq, + ev); + _GSS_operation_destroy (op, + GNUNET_YES); +} + + +/** + * Send a bloomfilter to our peer. After the result done message has + * been sent to the client, destroy the evaluate operation. + * + * @param op intersection operation + */ +static void +send_bloomfilter (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct BFMessage *msg; + uint32_t bf_size; + uint32_t bf_elementbits; + uint32_t chunk_size; + char *bf_data; + uint32_t offset; + + /* We consider the ratio of the set sizes to determine + the number of bits per element, as the smaller set + should use more bits to maximize its set reduction + potential and minimize overall bandwidth consumption. */ + bf_elementbits = 2 + ceil (log2 ((double) + (op->remote_element_count + / (double) op->state->my_element_count))); + if (bf_elementbits < 1) + bf_elementbits = 1; /* make sure k is not 0 */ + /* optimize BF-size to ~50% of bits set */ + bf_size = ceil ((double) (op->state->my_element_count + * bf_elementbits / log (2))); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending Bloom filter (%u) of size %u bytes\n", + (unsigned int) bf_elementbits, + (unsigned int) bf_size); + op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + bf_size, + bf_elementbits); + op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &iterator_bf_create, + op); + + /* send our Bloom filter */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# Intersection Bloom filters sent", + 1, + GNUNET_NO); + chunk_size = 60 * 1024 - sizeof(struct BFMessage); + if (bf_size <= chunk_size) + { + /* singlepart */ + chunk_size = bf_size; + ev = GNUNET_MQ_msg_extra (msg, + chunk_size, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_bloomfilter_get_raw_data ( + op->state->local_bf, + (char *) &msg[1], + bf_size)); + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); + } + else + { + /* multipart */ + bf_data = GNUNET_malloc (bf_size); + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_bloomfilter_get_raw_data ( + op->state->local_bf, + bf_data, + bf_size)); + offset = 0; + while (offset < bf_size) + { + if (bf_size - chunk_size < offset) + chunk_size = bf_size - offset; + ev = GNUNET_MQ_msg_extra (msg, + chunk_size, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_memcpy (&msg[1], + &bf_data[offset], + chunk_size); + offset += chunk_size; + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); + } + GNUNET_free (bf_data); + } + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; +} + + +/** + * Signal to the client that the operation has finished and + * destroy the operation. + * + * @param cls operation to destroy + */ +static void +send_client_done_and_destroy (void *cls) +{ + struct Operation *op = cls; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection succeeded, sending DONE to local client\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# Intersection operations succeeded", + 1, + GNUNET_NO); + ev = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->request_id = htonl (op->client_request_id); + rm->result_status = htons (GNUNET_SET_STATUS_DONE); + rm->element_type = htons (0); + GNUNET_MQ_send (op->set->cs->mq, + ev); + _GSS_operation_destroy (op, + GNUNET_YES); +} + + +/** + * Remember that we are done dealing with the local client + * AND have sent the other peer our message that we are done, + * so we are not just waiting for the channel to die before + * telling the local client that we are done as our last act. + * + * @param cls the `struct Operation`. + */ +static void +finished_local_operations (void *cls) +{ + struct Operation *op = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "DONE sent to other peer, now waiting for other end to close the channel\n"); + op->state->phase = PHASE_FINISHED; + op->state->channel_death_expected = GNUNET_YES; +} + + +/** + * Notify the other peer that we are done. Once this message + * is out, we still need to notify the local client that we + * are done. + * + * @param op operation to notify for. + */ +static void +send_p2p_done (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct IntersectionDoneMessage *idm; + + GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); + GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); + ev = GNUNET_MQ_msg (idm, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); + idm->final_element_count = htonl (op->state->my_element_count); + idm->element_xor_hash = op->state->my_xor; + GNUNET_MQ_notify_sent (ev, + &finished_local_operations, + op); + GNUNET_MQ_send (op->mq, + ev); +} + + +/** + * Send all elements in the full result iterator. + * + * @param cls the `struct Operation *` + */ +static void +send_remaining_elements (void *cls) +{ + struct Operation *op = cls; + const void *nxt; + const struct ElementEntry *ee; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; + const struct GNUNET_SET_Element *element; + int res; + + res = GNUNET_CONTAINER_multihashmap_iterator_next ( + op->state->full_result_iter, + NULL, + &nxt); + if (GNUNET_NO == res) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending done and destroy because iterator ran out\n"); + GNUNET_CONTAINER_multihashmap_iterator_destroy ( + op->state->full_result_iter); + op->state->full_result_iter = NULL; + if (PHASE_DONE_RECEIVED == op->state->phase) + { + op->state->phase = PHASE_FINISHED; + send_client_done_and_destroy (op); + } + else if (PHASE_MUST_SEND_DONE == op->state->phase) + { + send_p2p_done (op); + } + else + { + GNUNET_assert (0); + } + return; + } + ee = nxt; + element = &ee->element; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending element %s:%u to client (full set)\n", + GNUNET_h2s (&ee->element_hash), + element->size); + GNUNET_assert (0 != op->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, + element->size, + GNUNET_MESSAGE_TYPE_SET_RESULT); + GNUNET_assert (NULL != ev); + rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->request_id = htonl (op->client_request_id); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_notify_sent (ev, + &send_remaining_elements, + op); + GNUNET_MQ_send (op->set->cs->mq, + ev); +} + + +/** + * Fills the "my_elements" hashmap with the initial set of + * (non-deleted) elements from the set of the specification. + * + * @param cls closure with the `struct Operation *` + * @param key current key code for the element + * @param value value in the hash map with the `struct ElementEntry *` + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +initialize_map_unfiltered (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct ElementEntry *ee = value; + struct Operation *op = cls; + + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + return GNUNET_YES; /* element not live in operation's generation */ + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initial full initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + return GNUNET_YES; +} + + +/** + * Send our element count to the peer, in case our element count is + * lower than theirs. + * + * @param op intersection operation + */ +static void +send_element_count (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct IntersectionElementInfoMessage *msg; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending our element count (%u)\n", + op->state->my_element_count); + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); + msg->sender_element_count = htonl (op->state->my_element_count); + GNUNET_MQ_send (op->mq, ev); +} + + +/** + * We go first, initialize our map with all elements and + * send the first Bloom filter. + * + * @param op operation to start exchange for + */ +static void +begin_bf_exchange (struct Operation *op) +{ + op->state->phase = PHASE_BF_EXCHANGE; + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + &initialize_map_unfiltered, + op); + send_bloomfilter (op); +} + + +/** + * Handle the initial `struct IntersectionElementInfoMessage` from a + * remote peer. + * + * @param cls the intersection operation + * @param mh the header of the message + */ +void +handle_intersection_p2p_element_info (void *cls, + const struct + IntersectionElementInfoMessage *msg) +{ + struct Operation *op = cls; + + if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + op->remote_element_count = ntohl (msg->sender_element_count); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received remote element count (%u), I have %u\n", + op->remote_element_count, + op->state->my_element_count); + if (((PHASE_INITIAL != op->state->phase) && + (PHASE_COUNT_SENT != op->state->phase)) || + (op->state->my_element_count > op->remote_element_count) || + (0 == op->state->my_element_count) || + (0 == op->remote_element_count)) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + GNUNET_break (NULL == op->state->remote_bf); + begin_bf_exchange (op); + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Process a Bloomfilter once we got all the chunks. + * + * @param op the intersection operation + */ +static void +process_bf (struct Operation *op) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", + op->state->phase, + op->remote_element_count, + op->state->my_element_count, + GNUNET_CONTAINER_multihashmap_size (op->set->content->elements)); + switch (op->state->phase) + { + case PHASE_INITIAL: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + + case PHASE_COUNT_SENT: + /* This is the first BF being sent, build our initial map with + filtering in place */ + op->state->my_element_count = 0; + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + &filtered_map_initialization, + op); + break; + + case PHASE_BF_EXCHANGE: + /* Update our set by reduction */ + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &iterator_bf_reduce, + op); + break; + + case PHASE_MUST_SEND_DONE: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + + case PHASE_DONE_RECEIVED: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + + case PHASE_FINISHED: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); + op->state->remote_bf = NULL; + + if ((0 == op->state->my_element_count) || /* fully disjoint */ + ((op->state->my_element_count == op->remote_element_count) && + (0 == GNUNET_memcmp (&op->state->my_xor, + &op->state->other_xor)))) + { + /* we are done */ + op->state->phase = PHASE_MUST_SEND_DONE; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection succeeded, sending DONE to other peer\n"); + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; + if (GNUNET_SET_RESULT_FULL == op->result_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending full result set (%u elements)\n", + GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); + op->state->full_result_iter + = GNUNET_CONTAINER_multihashmap_iterator_create ( + op->state->my_elements); + send_remaining_elements (op); + return; + } + send_p2p_done (op); + return; + } + op->state->phase = PHASE_BF_EXCHANGE; + send_bloomfilter (op); +} + + +/** + * Check an BF message from a remote peer. + * + * @param cls the intersection operation + * @param msg the header of the message + * @return #GNUNET_OK if @a msg is well-formed + */ +static int +check_intersection_p2p_bf (void *cls, + const struct BFMessage *msg) +{ + struct Operation *op = cls; + + if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle an BF message from a remote peer. + * + * @param cls the intersection operation + * @param msg the header of the message + */ +static +handle_intersection_p2p_bf (void *cls, + const struct BFMessage *msg) +{ + struct Operation *op = cls; + uint32_t bf_size; + uint32_t chunk_size; + uint32_t bf_bits_per_element; + + switch (op->state->phase) + { + case PHASE_INITIAL: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + + case PHASE_COUNT_SENT: + case PHASE_BF_EXCHANGE: + bf_size = ntohl (msg->bloomfilter_total_length); + bf_bits_per_element = ntohl (msg->bits_per_element); + chunk_size = htons (msg->header.size) - sizeof(struct BFMessage); + op->state->other_xor = msg->element_xor_hash; + if (bf_size == chunk_size) + { + if (NULL != op->state->bf_data) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + /* single part, done here immediately */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init ((const char *) &msg[1], + bf_size, + bf_bits_per_element); + op->state->salt = ntohl (msg->sender_mutator); + op->remote_element_count = ntohl (msg->sender_element_count); + process_bf (op); + break; + } + /* multipart chunk */ + if (NULL == op->state->bf_data) + { + /* first chunk, initialize */ + op->state->bf_data = GNUNET_malloc (bf_size); + op->state->bf_data_size = bf_size; + op->state->bf_bits_per_element = bf_bits_per_element; + op->state->bf_data_offset = 0; + op->state->salt = ntohl (msg->sender_mutator); + op->remote_element_count = ntohl (msg->sender_element_count); + } + else + { + /* increment */ + if ((op->state->bf_data_size != bf_size) || + (op->state->bf_bits_per_element != bf_bits_per_element) || + (op->state->bf_data_offset + chunk_size > bf_size) || + (op->state->salt != ntohl (msg->sender_mutator)) || + (op->remote_element_count != ntohl (msg->sender_element_count))) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + } + GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset], + (const char *) &msg[1], + chunk_size); + op->state->bf_data_offset += chunk_size; + if (op->state->bf_data_offset == bf_size) + { + /* last chunk, run! */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data, + bf_size, + bf_bits_per_element); + GNUNET_free (op->state->bf_data); + op->state->bf_data = NULL; + op->state->bf_data_size = 0; + process_bf (op); + } + break; + + default: + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Remove all elements from our hashmap. + * + * @param cls closure with the `struct Operation *` + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +filter_all (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + + GNUNET_break (0 < op->state->my_element_count); + op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Final reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + &ee->element_hash, + ee)); + send_client_removed_element (op, + &ee->element); + return GNUNET_YES; +} + + +/** + * Handle a done message from a remote peer + * + * @param cls the intersection operation + * @param mh the message + */ +static void +handle_intersection_p2p_done (void *cls, + const struct IntersectionDoneMessage *idm) +{ + struct Operation *op = cls; + + if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + if (PHASE_BF_EXCHANGE != op->state->phase) + { + /* wrong phase to conclude? FIXME: Or should we allow this + if the other peer has _initially_ already an empty set? */ + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + if (0 == ntohl (idm->final_element_count)) + { + /* other peer determined empty set is the intersection, + remove all elements */ + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &filter_all, + op); + } + if ((op->state->my_element_count != ntohl (idm->final_element_count)) || + (0 != GNUNET_memcmp (&op->state->my_xor, + &idm->element_xor_hash))) + { + /* Other peer thinks we are done, but we disagree on the result! */ + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got IntersectionDoneMessage, have %u elements in intersection\n", + op->state->my_element_count); + op->state->phase = PHASE_DONE_RECEIVED; + GNUNET_CADET_receive_done (op->channel); + + GNUNET_assert (GNUNET_NO == op->state->client_done_sent); + if (GNUNET_SET_RESULT_FULL == op->result_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending full result set to client (%u elements)\n", + GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); + op->state->full_result_iter + = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); + send_remaining_elements (op); + return; + } + op->state->phase = PHASE_FINISHED; + send_client_done_and_destroy (op); +} + + +/** + * Initiate a set intersection operation with a remote peer. + * + * @param op operation that is created, should be initialized to + * begin the evaluation + * @param opaque_context message to be transmitted to the listener + * to convince it to accept, may be NULL + * @return operation-specific state to keep in @a op + */ +static struct OperationState * +intersection_evaluate (struct Operation *op, + const struct GNUNET_MessageHeader *opaque_context) +{ + struct OperationState *state; + struct GNUNET_MQ_Envelope *ev; + struct OperationRequestMessage *msg; + + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + opaque_context); + if (NULL == ev) + { + /* the context message is too large!? */ + GNUNET_break (0); + return NULL; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initiating intersection operation evaluation\n"); + state = GNUNET_new (struct OperationState); + /* we started the operation, thus we have to send the operation request */ + state->phase = PHASE_INITIAL; + state->my_element_count = op->set->state->current_set_element_count; + state->my_elements + = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, + GNUNET_YES); + + msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); + msg->element_count = htonl (state->my_element_count); + GNUNET_MQ_send (op->mq, + ev); + state->phase = PHASE_COUNT_SENT; + if (NULL != opaque_context) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request with context message\n"); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request without context message\n"); + return state; +} + + +/** + * Accept an intersection operation request from a remote peer. Only + * initializes the private operation state. + * + * @param op operation that will be accepted as an intersection operation + */ +static struct OperationState * +intersection_accept (struct Operation *op) +{ + struct OperationState *state; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Accepting set intersection operation\n"); + state = GNUNET_new (struct OperationState); + state->phase = PHASE_INITIAL; + state->my_element_count + = op->set->state->current_set_element_count; + state->my_elements + = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count, + op->remote_element_count), + GNUNET_YES); + op->state = state; + if (op->remote_element_count < state->my_element_count) + { + /* If the other peer (Alice) has fewer elements than us (Bob), + we just send the count as Alice should send the first BF */ + send_element_count (op); + state->phase = PHASE_COUNT_SENT; + return state; + } + /* We have fewer elements, so we start with the BF */ + begin_bf_exchange (op); + return state; +} + + +/** + * Destroy the intersection operation. Only things specific to the + * intersection operation are destroyed. + * + * @param op intersection operation to destroy + */ +static void +intersection_op_cancel (struct Operation *op) +{ + /* check if the op was canceled twice */ + GNUNET_assert (NULL != op->state); + if (NULL != op->state->remote_bf) + { + GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); + op->state->remote_bf = NULL; + } + if (NULL != op->state->local_bf) + { + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; + } + if (NULL != op->state->my_elements) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); + op->state->my_elements = NULL; + } + if (NULL != op->state->full_result_iter) + { + GNUNET_CONTAINER_multihashmap_iterator_destroy ( + op->state->full_result_iter); + op->state->full_result_iter = NULL; + } + GNUNET_free (op->state); + op->state = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying intersection op state done\n"); +} + + +/** + * Create a new set supporting the intersection operation. + * + * @return the newly created set + */ +static struct SetState * +intersection_set_create () +{ + struct SetState *set_state; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection set created\n"); + set_state = GNUNET_new (struct SetState); + set_state->current_set_element_count = 0; + + return set_state; +} + + +/** + * Add the element from the given element message to the set. + * + * @param set_state state of the set want to add to + * @param ee the element to add to the set + */ +static void +intersection_add (struct SetState *set_state, + struct ElementEntry *ee) +{ + set_state->current_set_element_count++; +} + + +/** + * Destroy a set that supports the intersection operation + * + * @param set_state the set to destroy + */ +static void +intersection_set_destroy (struct SetState *set_state) +{ + GNUNET_free (set_state); +} + + +/** + * Remove the element given in the element message from the set. + * + * @param set_state state of the set to remove from + * @param element set element to remove + */ +static void +intersection_remove (struct SetState *set_state, + struct ElementEntry *element) +{ + GNUNET_assert (0 < set_state->current_set_element_count); + set_state->current_set_element_count--; +} + + +/** + * Callback for channel death for the intersection operation. + * + * @param op operation that lost the channel + */ +static void +intersection_channel_death (struct Operation *op) +{ + if (GNUNET_YES == op->state->channel_death_expected) + { + /* oh goodie, we are done! */ + send_client_done_and_destroy (op); + } + else + { + /* sorry, channel went down early, too bad. */ + _GSS_operation_destroy (op, + GNUNET_YES); + } +} + + +/** + * Get the incoming socket associated with the given id. + * + * @param listener the listener to look in + * @param id id to look for + * @return the incoming socket associated with the id, + * or NULL if there is none + */ +static struct Operation * +get_incoming (uint32_t id) +{ + for (struct Listener *listener = listener_head; NULL != listener; + listener = listener->next) + { + for (struct Operation *op = listener->op_head; NULL != op; op = op->next) + if (op->suggest_id == id) + return op; + } + return NULL; +} + + +/** + * Destroy an incoming request from a remote peer + * + * @param op remote request to destroy + */ +static void +incoming_destroy (struct Operation *op) +{ + struct Listener *listener; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying incoming operation %p\n", + op); + if (NULL != (listener = op->listener)) + { + GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op); + op->listener = NULL; + } + if (NULL != op->timeout_task) + { + GNUNET_SCHEDULER_cancel (op->timeout_task); + op->timeout_task = NULL; + } + _GSS_operation_destroy2 (op); +} + + +/** + * Context for the #garbage_collect_cb(). + */ +struct GarbageContext +{ + /** + * Map for which we are garbage collecting removed elements. + */ + struct GNUNET_CONTAINER_MultiHashMap *map; + + /** + * Lowest generation for which an operation is still pending. + */ + unsigned int min_op_generation; + + /** + * Largest generation for which an operation is still pending. + */ + unsigned int max_op_generation; +}; + + +/** + * Function invoked to check if an element can be removed from + * the set's history because it is no longer needed. + * + * @param cls the `struct GarbageContext *` + * @param key key of the element in the map + * @param value the `struct ElementEntry *` + * @return #GNUNET_OK (continue to iterate) + */ +static int +garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value) +{ + // struct GarbageContext *gc = cls; + // struct ElementEntry *ee = value; + + // if (GNUNET_YES != ee->removed) + // return GNUNET_OK; + // if ( (gc->max_op_generation < ee->generation_added) || + // (ee->generation_removed > gc->min_op_generation) ) + // { + // GNUNET_assert (GNUNET_YES == + // GNUNET_CONTAINER_multihashmap_remove (gc->map, + // key, + // ee)); + // GNUNET_free (ee); + // } + return GNUNET_OK; +} + + +/** + * Collect and destroy elements that are not needed anymore, because + * their lifetime (as determined by their generation) does not overlap + * with any active set operation. + * + * @param set set to garbage collect + */ +static void +collect_generation_garbage (struct Set *set) +{ + struct GarbageContext gc; + + gc.min_op_generation = UINT_MAX; + gc.max_op_generation = 0; + for (struct Operation *op = set->ops_head; NULL != op; op = op->next) + { + gc.min_op_generation = + GNUNET_MIN (gc.min_op_generation, op->generation_created); + gc.max_op_generation = + GNUNET_MAX (gc.max_op_generation, op->generation_created); + } + gc.map = set->content->elements; + GNUNET_CONTAINER_multihashmap_iterate (set->content->elements, + &garbage_collect_cb, + &gc); +} + + +/** + * Is @a generation in the range of exclusions? + * + * @param generation generation to query + * @param excluded array of generations where the element is excluded + * @param excluded_size length of the @a excluded array + * @return #GNUNET_YES if @a generation is in any of the ranges + */ +static int +is_excluded_generation (unsigned int generation, + struct GenerationRange *excluded, + unsigned int excluded_size) +{ + for (unsigned int i = 0; i < excluded_size; i++) + if ((generation >= excluded[i].start) && (generation < excluded[i].end)) + return GNUNET_YES; + return GNUNET_NO; +} + + +/** + * Is element @a ee part of the set during @a query_generation? + * + * @param ee element to test + * @param query_generation generation to query + * @param excluded array of generations where the element is excluded + * @param excluded_size length of the @a excluded array + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ +static int +is_element_of_generation (struct ElementEntry *ee, + unsigned int query_generation, + struct GenerationRange *excluded, + unsigned int excluded_size) +{ + struct MutationEvent *mut; + int is_present; + + GNUNET_assert (NULL != ee->mutations); + if (GNUNET_YES == + is_excluded_generation (query_generation, excluded, excluded_size)) + { + GNUNET_break (0); + return GNUNET_NO; + } + + is_present = GNUNET_NO; + + /* Could be made faster with binary search, but lists + are small, so why bother. */ + for (unsigned int i = 0; i < ee->mutations_size; i++) + { + mut = &ee->mutations[i]; + + if (mut->generation > query_generation) + { + /* The mutation doesn't apply to our generation + anymore. We can'b break here, since mutations aren't + sorted by generation. */ + continue; + } + + if (GNUNET_YES == + is_excluded_generation (mut->generation, excluded, excluded_size)) + { + /* The generation is excluded (because it belongs to another + fork via a lazy copy) and thus mutations aren't considered + for membership testing. */ + continue; + } + + /* This would be an inconsistency in how we manage mutations. */ + if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added)) + GNUNET_assert (0); + /* Likewise. */ + if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added)) + GNUNET_assert (0); + + is_present = mut->added; + } + + return is_present; +} + + +/** + * Is element @a ee part of the set used by @a op? + * + * @param ee element to test + * @param op operation the defines the set and its generation + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ +int +_GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op) +{ + return is_element_of_generation (ee, + op->generation_created, + op->set->excluded_generations, + op->set->excluded_generations_size); +} + + +/** + * Destroy the given operation. Used for any operation where both + * peers were known and that thus actually had a vt and channel. Must + * not be used for operations where 'listener' is still set and we do + * not know the other peer. + * + * Call the implementation-specific cancel function of the operation. + * Disconnects from the remote peer. Does not disconnect the client, + * as there may be multiple operations per set. + * + * @param op operation to destroy + * @param gc #GNUNET_YES to perform garbage collection on the set + */ +void +_GSS_operation_destroy (struct Operation *op, int gc) +{ + struct Set *set = op->set; + struct GNUNET_CADET_Channel *channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op); + GNUNET_assert (NULL == op->listener); + if (NULL != op->state) + { + intersection_cancel (op); // FIXME: inline + op->state = NULL; + } + if (NULL != set) + { + GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op); + op->set = NULL; + } + if (NULL != op->context_msg) + { + GNUNET_free (op->context_msg); + op->context_msg = NULL; + } + if (NULL != (channel = op->channel)) + { + /* This will free op; called conditionally as this helper function + is also called from within the channel disconnect handler. */ + op->channel = NULL; + GNUNET_CADET_channel_destroy (channel); + } + if ((NULL != set) && (GNUNET_YES == gc)) + collect_generation_garbage (set); + /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, + * there was a channel end handler that will free 'op' on the call stack. */ +} + + +/** + * Callback called when a client connects to the service. + * + * @param cls closure for the service + * @param c the new client that connected to the service + * @param mq the message queue used to send messages to the client + * @return @a `struct ClientState` + */ +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *c, + struct GNUNET_MQ_Handle *mq) +{ + struct ClientState *cs; + + num_clients++; + cs = GNUNET_new (struct ClientState); + cs->client = c; + cs->mq = mq; + return cs; +} + + +/** + * Iterator over hash map entries to free element entries. + * + * @param cls closure + * @param key current key code + * @param value a `struct ElementEntry *` to be free'd + * @return #GNUNET_YES (continue to iterate) + */ +static int +destroy_elements_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct ElementEntry *ee = value; + + GNUNET_free (ee->mutations); + GNUNET_free (ee); + return GNUNET_YES; +} + + +/** + * Clean up after a client has disconnected + * + * @param cls closure, unused + * @param client the client to clean up after + * @param internal_cls the `struct ClientState` + */ +static void +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + void *internal_cls) +{ + struct ClientState *cs = internal_cls; + struct Operation *op; + struct Listener *listener; + struct Set *set; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n"); + if (NULL != (set = cs->set)) + { + struct SetContent *content = set->content; + struct PendingMutation *pm; + struct PendingMutation *pm_current; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n"); + /* Destroy pending set operations */ + while (NULL != set->ops_head) + _GSS_operation_destroy (set->ops_head, GNUNET_NO); + + /* Destroy operation-specific state */ + GNUNET_assert (NULL != set->state); + intersection_set_destroy (set->state); // FIXME: inline + set->state = NULL; + + /* Clean up ongoing iterations */ + if (NULL != set->iter) + { + GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); + set->iter = NULL; + set->iteration_id++; + } + + /* discard any pending mutations that reference this set */ + pm = content->pending_mutations_head; + while (NULL != pm) + { + pm_current = pm; + pm = pm->next; + if (pm_current->set == set) + { + GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, + content->pending_mutations_tail, + pm_current); + GNUNET_free (pm_current); + } + } + + /* free set content (or at least decrement RC) */ + set->content = NULL; + GNUNET_assert (0 != content->refcount); + content->refcount--; + if (0 == content->refcount) + { + GNUNET_assert (NULL != content->elements); + GNUNET_CONTAINER_multihashmap_iterate (content->elements, + &destroy_elements_iterator, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (content->elements); + content->elements = NULL; + GNUNET_free (content); + } + GNUNET_free (set->excluded_generations); + set->excluded_generations = NULL; + + GNUNET_free (set); + } + + if (NULL != (listener = cs->listener)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n"); + GNUNET_CADET_close_port (listener->open_port); + listener->open_port = NULL; + while (NULL != (op = listener->op_head)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Destroying incoming operation `%u' from peer `%s'\n", + (unsigned int) op->client_request_id, + GNUNET_i2s (&op->peer)); + incoming_destroy (op); + } + GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener); + GNUNET_free (listener); + } + GNUNET_free (cs); + num_clients--; + if ((GNUNET_YES == in_shutdown) && (0 == num_clients)) + { + if (NULL != cadet) + { + GNUNET_CADET_disconnect (cadet); + cadet = NULL; + } + } +} + + +/** + * Check a request for a set operation from another peer. + * + * @param cls the operation state + * @param msg the received message + * @return #GNUNET_OK if the channel should be kept alive, + * #GNUNET_SYSERR to destroy the channel + */ +static int +check_incoming_msg (void *cls, const struct OperationRequestMessage *msg) +{ + struct Operation *op = cls; + struct Listener *listener = op->listener; + const struct GNUNET_MessageHeader *nested_context; + + /* double operation request */ + if (0 != op->suggest_id) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + /* This should be equivalent to the previous condition, but can't hurt to check twice */ + if (NULL == op->listener) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (listener->operation != + (enum GNUNET_SET_OperationType) ntohl (msg->operation)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + nested_context = GNUNET_MQ_extract_nested_mh (msg); + if ((NULL != nested_context) && + (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle a request for a set operation from another peer. Checks if we + * have a listener waiting for such a request (and in that case initiates + * asking the listener about accepting the connection). If no listener + * is waiting, we queue the operation request in hope that a listener + * shows up soon (before timeout). + * + * This msg is expected as the first and only msg handled through the + * non-operation bound virtual table, acceptance of this operation replaces + * our virtual table and subsequent msgs would be routed differently (as + * we then know what type of operation this is). + * + * @param cls the operation state + * @param msg the received message + * @return #GNUNET_OK if the channel should be kept alive, + * #GNUNET_SYSERR to destroy the channel + */ +static void +handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg) +{ + struct Operation *op = cls; + struct Listener *listener = op->listener; + const struct GNUNET_MessageHeader *nested_context; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_SET_RequestMessage *cmsg; + + nested_context = GNUNET_MQ_extract_nested_mh (msg); + /* Make a copy of the nested_context (application-specific context + information that is opaque to set) so we can pass it to the + listener later on */ + if (NULL != nested_context) + op->context_msg = GNUNET_copy_message (nested_context); + op->remote_element_count = ntohl (msg->element_count); + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "Received P2P operation request (op %u, port %s) for active listener\n", + (uint32_t) ntohl (msg->operation), + GNUNET_h2s (&op->listener->app_id)); + GNUNET_assert (0 == op->suggest_id); + if (0 == suggest_id) + suggest_id++; + op->suggest_id = suggest_id++; + GNUNET_assert (NULL != op->timeout_task); + GNUNET_SCHEDULER_cancel (op->timeout_task); + op->timeout_task = NULL; + env = GNUNET_MQ_msg_nested_mh (cmsg, + GNUNET_MESSAGE_TYPE_SETI_REQUEST, + op->context_msg); + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "Suggesting incoming request with accept id %u to listener %p of client %p\n", + op->suggest_id, + listener, + listener->cs); + cmsg->accept_id = htonl (op->suggest_id); + cmsg->peer_id = op->peer; + GNUNET_MQ_send (listener->cs->mq, env); + /* NOTE: GNUNET_CADET_receive_done() will be called in + #handle_client_accept() */ +} + + +/** + * Add an element to @a set as specified by @a msg + * + * @param set set to manipulate + * @param msg message specifying the change + */ +static void +execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) +{ + struct GNUNET_SET_Element el; + struct ElementEntry *ee; + struct GNUNET_HashCode hash; + + GNUNET_assert (GNUNET_MESSAGE_TYPE_SETI_ADD == ntohs (msg->header.type)); + el.size = ntohs (msg->header.size) - sizeof(*msg); + el.data = &msg[1]; + el.element_type = ntohs (msg->element_type); + GNUNET_SET_element_hash (&el, &hash); + ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash); + if (NULL == ee) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserts element %s of size %u\n", + GNUNET_h2s (&hash), + el.size); + ee = GNUNET_malloc (el.size + sizeof(*ee)); + ee->element.size = el.size; + GNUNET_memcpy (&ee[1], el.data, el.size); + ee->element.data = &ee[1]; + ee->element.element_type = el.element_type; + ee->remote = GNUNET_NO; + ee->mutations = NULL; + ee->mutations_size = 0; + ee->element_hash = hash; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put ( + set->content->elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + } + else if (GNUNET_YES == + is_element_of_generation (ee, + set->current_generation, + set->excluded_generations, + set->excluded_generations_size)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserted element %s of size %u twice (ignored)\n", + GNUNET_h2s (&hash), + el.size); + + /* same element inserted twice */ + return; + } + + { + struct MutationEvent mut = { .generation = set->current_generation, + .added = GNUNET_YES }; + GNUNET_array_append (ee->mutations, ee->mutations_size, mut); + } + // FIXME: inline + intersection_add (set->state, + ee); +} + + +/** + * Perform a mutation on a set as specified by the @a msg + * + * @param set the set to mutate + * @param msg specification of what to change + */ +static void +execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) +{ + switch (ntohs (msg->header.type)) + { + case GNUNET_MESSAGE_TYPE_SETI_ADD: // FIXME: inline! + execute_add (set, msg); + break; + default: + GNUNET_break (0); + } +} + + +/** + * Execute mutations that were delayed on a set because of + * pending operations. + * + * @param set the set to execute mutations on + */ +static void +execute_delayed_mutations (struct Set *set) +{ + struct PendingMutation *pm; + + if (0 != set->content->iterator_count) + return; /* still cannot do this */ + while (NULL != (pm = set->content->pending_mutations_head)) + { + GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, + set->content->pending_mutations_tail, + pm); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Executing pending mutation on %p.\n", + pm->set); + execute_mutation (pm->set, pm->msg); + GNUNET_free (pm->msg); + GNUNET_free (pm); + } +} + + +/** + * Send the next element of a set to the set's client. The next element is given by + * the set's current hashmap iterator. The set's iterator will be set to NULL if there + * are no more elements in the set. The caller must ensure that the set's iterator is + * valid. + * + * The client will acknowledge each received element with a + * #GNUNET_MESSAGE_TYPE_SETI_ITER_ACK message. Our + * #handle_client_iter_ack() will then trigger the next transmission. + * Note that the #GNUNET_MESSAGE_TYPE_SETI_ITER_DONE is not acknowledged. + * + * @param set set that should send its next element to its client + */ +static void +send_client_element (struct Set *set) +{ + int ret; + struct ElementEntry *ee; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_IterResponseMessage *msg; + + GNUNET_assert (NULL != set->iter); + do + { + ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, + NULL, + (const void **) &ee); + if (GNUNET_NO == ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETI_ITER_DONE); + GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); + set->iter = NULL; + set->iteration_id++; + GNUNET_assert (set->content->iterator_count > 0); + set->content->iterator_count--; + execute_delayed_mutations (set); + GNUNET_MQ_send (set->cs->mq, ev); + return; + } + GNUNET_assert (NULL != ee); + } + while (GNUNET_NO == + is_element_of_generation (ee, + set->iter_generation, + set->excluded_generations, + set->excluded_generations_size)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending iteration element on %p.\n", + set); + ev = GNUNET_MQ_msg_extra (msg, + ee->element.size, + GNUNET_MESSAGE_TYPE_SETI_ITER_ELEMENT); + GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size); + msg->element_type = htons (ee->element.element_type); + msg->iteration_id = htons (set->iteration_id); + GNUNET_MQ_send (set->cs->mq, ev); +} + + +/** + * Called when a client wants to iterate the elements of a set. + * Checks if we have a set associated with the client and if we + * can right now start an iteration. If all checks out, starts + * sending the elements of the set to the client. + * + * @param cls client that sent the message + * @param m message sent by the client + */ +static void +handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m) +{ + struct ClientState *cs = cls; + struct Set *set; + + if (NULL == (set = cs->set)) + { + /* attempt to iterate over a non existing set */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + if (NULL != set->iter) + { + /* Only one concurrent iterate-action allowed per set */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Iterating set %p in gen %u with %u content elements\n", + (void *) set, + set->current_generation, + GNUNET_CONTAINER_multihashmap_size (set->content->elements)); + GNUNET_SERVICE_client_continue (cs->client); + set->content->iterator_count++; + set->iter = + GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); + set->iter_generation = set->current_generation; + send_client_element (set); +} + + +/** + * Called when a client wants to create a new set. This is typically + * the first request from a client, and includes the type of set + * operation to be performed. + * + * @param cls client that sent the message + * @param m message sent by the client + */ +static void +handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client created new set (operation %u)\n", + (uint32_t) ntohl (msg->operation)); + if (NULL != cs->set) + { + /* There can only be one set per client */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + set = GNUNET_new (struct Set); + switch (ntohl (msg->operation)) + { + case GNUNET_SET_OPERATION_INTERSECTION: + set->vt = _GSS_intersection_vt (); + break; + + case GNUNET_SET_OPERATION_UNION: + set->vt = _GSS_union_vt (); + break; + + default: + GNUNET_free (set); + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); + set->state = intersection_set_create (); // FIXME: inline + if (NULL == set->state) + { + /* initialization failed (i.e. out of memory) */ + GNUNET_free (set); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + set->content = GNUNET_new (struct SetContent); + set->content->refcount = 1; + set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + set->cs = cs; + cs->set = set; + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Timeout happens iff: + * - we suggested an operation to our listener, + * but did not receive a response in time + * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST + * + * @param cls channel context + * @param tc context information (why was this task triggered now) + */ +static void +incoming_timeout_cb (void *cls) +{ + struct Operation *op = cls; + + op->timeout_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Remote peer's incoming request timed out\n"); + incoming_destroy (op); +} + + +/** + * Method called whenever another peer has added us to a channel the + * other peer initiated. Only called (once) upon reception of data + * from a channel we listen on. + * + * The channel context represents the operation itself and gets added + * to a DLL, from where it gets looked up when our local listener + * client responds to a proposed/suggested operation or connects and + * associates with this operation. + * + * @param cls closure + * @param channel new handle to the channel + * @param source peer that started the channel + * @return initial channel context for the channel + * returns NULL on error + */ +static void * +channel_new_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *source) +{ + struct Listener *listener = cls; + struct Operation *op; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n"); + op = GNUNET_new (struct Operation); + op->listener = listener; + op->peer = *source; + op->channel = channel; + op->mq = GNUNET_CADET_get_mq (op->channel); + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, + &incoming_timeout_cb, + op); + GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op); + return op; +} + + +/** + * Function called whenever a channel is destroyed. Should clean up + * any associated state. It must NOT call + * GNUNET_CADET_channel_destroy() on the channel. + * + * The peer_disconnect function is part of a a virtual table set initially either + * when a peer creates a new channel with us, or once we create + * a new channel ourselves (evaluate). + * + * Once we know the exact type of operation (union/intersection), the vt is + * replaced with an operation specific instance (_GSS_[op]_vt). + * + * @param channel_ctx place where local state associated + * with the channel is stored + * @param channel connection to the other end (henceforth invalid) + */ +static void +channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel) +{ + struct Operation *op = channel_ctx; + + op->channel = NULL; + _GSS_operation_destroy2 (op); +} + + +/** + * This function probably should not exist + * and be replaced by inlining more specific + * logic in the various places where it is called. + */ +void +_GSS_operation_destroy2 (struct Operation *op) +{ + struct GNUNET_CADET_Channel *channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n"); + if (NULL != (channel = op->channel)) + { + /* This will free op; called conditionally as this helper function + is also called from within the channel disconnect handler. */ + op->channel = NULL; + GNUNET_CADET_channel_destroy (channel); + } + if (NULL != op->listener) + { + incoming_destroy (op); + return; + } + if (NULL != op->set) + intersection_channel_death (op); // FIXME: inline + else + _GSS_operation_destroy (op, GNUNET_YES); + GNUNET_free (op); +} + + +/** + * Function called whenever an MQ-channel's transmission window size changes. + * + * The first callback in an outgoing channel will be with a non-zero value + * and will mean the channel is connected to the destination. + * + * For an incoming channel it will be called immediately after the + * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. + * + * @param cls Channel closure. + * @param channel Connection to the other end (henceforth invalid). + * @param window_size New window size. If the is more messages than buffer size + * this value will be negative.. + */ +static void +channel_window_cb (void *cls, + const struct GNUNET_CADET_Channel *channel, + int window_size) +{ + /* FIXME: not implemented, we could do flow control here... */ +} + + +/** + * Called when a client wants to create a new listener. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) +{ + struct ClientState *cs = cls; + struct GNUNET_MQ_MessageHandler cadet_handlers[] = + { GNUNET_MQ_hd_var_size (incoming_msg, + GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST, + struct OperationRequestMessage, + NULL), + GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, + GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO, + struct IntersectionElementInfoMessage, + NULL), + GNUNET_MQ_hd_var_size (intersection_p2p_bf, + GNUNET_MESSAGE_TYPE_SETI_P2P_BF, + struct BFMessage, + NULL), + GNUNET_MQ_hd_fixed_size (intersection_p2p_done, + GNUNET_MESSAGE_TYPE_SETI_P2P_DONE, + struct IntersectionDoneMessage, + NULL), + GNUNET_MQ_handler_end () }; + struct Listener *listener; + + if (NULL != cs->listener) + { + /* max. one active listener per client! */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + listener = GNUNET_new (struct Listener); + listener->cs = cs; + cs->listener = listener; + listener->app_id = msg->app_id; + listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); + GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New listener created (op %u, port %s)\n", + listener->operation, + GNUNET_h2s (&listener->app_id)); + listener->open_port = GNUNET_CADET_open_port (cadet, + &msg->app_id, + &channel_new_cb, + listener, + &channel_window_cb, + &channel_end_cb, + cadet_handlers); + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Called when the listening client rejects an operation + * request by another peer. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg) +{ + struct ClientState *cs = cls; + struct Operation *op; + + op = get_incoming (ntohl (msg->accept_reject_id)); + if (NULL == op) + { + /* no matching incoming operation for this reject; + could be that the other peer already disconnected... */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Client rejected unknown operation %u\n", + (unsigned int) ntohl (msg->accept_reject_id)); + GNUNET_SERVICE_client_continue (cs->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer request (op %u, app %s) rejected by client\n", + op->listener->operation, + GNUNET_h2s (&cs->listener->app_id)); + _GSS_operation_destroy2 (op); + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Called when a client wants to add or remove an element to a set it inhabits. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static int +check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) +{ + /* NOTE: Technically, we should probably check with the + block library whether the element we are given is well-formed */ + return GNUNET_OK; +} + + +/** + * Called when a client wants to add or remove an element to a set it inhabits. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + + if (NULL == (set = cs->set)) + { + /* client without a set requested an operation */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_SERVICE_client_continue (cs->client); + + if (0 != set->content->iterator_count) + { + struct PendingMutation *pm; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n"); + pm = GNUNET_new (struct PendingMutation); + pm->msg = + (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header); + pm->set = set; + GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, + set->content->pending_mutations_tail, + pm); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n"); + execute_mutation (set, msg); +} + + +/** + * Advance the current generation of a set, + * adding exclusion ranges if necessary. + * + * @param set the set where we want to advance the generation + */ +static void +advance_generation (struct Set *set) +{ + struct GenerationRange r; + + if (set->current_generation == set->content->latest_generation) + { + set->content->latest_generation++; + set->current_generation++; + return; + } + + GNUNET_assert (set->current_generation < set->content->latest_generation); + + r.start = set->current_generation + 1; + r.end = set->content->latest_generation + 1; + set->content->latest_generation = r.end; + set->current_generation = r.end; + GNUNET_array_append (set->excluded_generations, + set->excluded_generations_size, + r); +} + + +/** + * Called when a client wants to initiate a set operation with another + * peer. Initiates the CADET connection to the listener and sends the + * request. + * + * @param cls client that sent the message + * @param msg message sent by the client + * @return #GNUNET_OK if the message is well-formed + */ +static int +check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) +{ + /* FIXME: suboptimal, even if the context below could be NULL, + there are malformed messages this does not check for... */ + return GNUNET_OK; +} + + +/** + * Called when a client wants to initiate a set operation with another + * peer. Initiates the CADET connection to the listener and sends the + * request. + * + * @param cls client that sent the message + * @param msg message sent by the client + */ +static void +handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) +{ + struct ClientState *cs = cls; + struct Operation *op = GNUNET_new (struct Operation); + const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (incoming_msg, + GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST, + struct OperationRequestMessage, + op), + GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, + GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO, + struct IntersectionElementInfoMessage, + op), + GNUNET_MQ_hd_var_size (intersection_p2p_bf, + GNUNET_MESSAGE_TYPE_SETI_P2P_BF, + struct BFMessage, + op), + GNUNET_MQ_hd_fixed_size (intersection_p2p_done, + GNUNET_MESSAGE_TYPE_SETI_P2P_DONE, + struct IntersectionDoneMessage, + op), + GNUNET_MQ_handler_end () + }; + struct Set *set; + const struct GNUNET_MessageHeader *context; + + if (NULL == (set = cs->set)) + { + GNUNET_break (0); + GNUNET_free (op); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + op->peer = msg->target_peer; + op->result_mode = ntohl (msg->result_mode); + op->client_request_id = ntohl (msg->request_id); + op->byzantine = msg->byzantine; + op->byzantine_lower_bound = msg->byzantine_lower_bound; + op->force_full = msg->force_full; + op->force_delta = msg->force_delta; + context = GNUNET_MQ_extract_nested_mh (msg); + + /* Advance generation values, so that + mutations won't interfer with the running operation. */ + op->set = set; + op->generation_created = set->current_generation; + advance_generation (set); + GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating new CADET channel to port %s for set operation type %u\n", + GNUNET_h2s (&msg->app_id), + set->operation); + op->channel = GNUNET_CADET_channel_create (cadet, + op, + &msg->target_peer, + &msg->app_id, + &channel_window_cb, + &channel_end_cb, + cadet_handlers); + op->mq = GNUNET_CADET_get_mq (op->channel); + op->state = intersection_evaluate (op, context); // FIXME: inline! + if (NULL == op->state) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Handle a request from the client to cancel a running set operation. + * + * @param cls the client + * @param msg the message + */ +static void +handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + struct Operation *op; + int found; + + if (NULL == (set = cs->set)) + { + /* client without a set requested an operation */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + found = GNUNET_NO; + for (op = set->ops_head; NULL != op; op = op->next) + { + if (op->client_request_id == ntohl (msg->request_id)) + { + found = GNUNET_YES; + break; + } + } + if (GNUNET_NO == found) + { + /* It may happen that the operation was already destroyed due to + * the other peer disconnecting. The client may not know about this + * yet and try to cancel the (just barely non-existent) operation. + * So this is not a hard error. + */GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Client canceled non-existent op %u\n", + (uint32_t) ntohl (msg->request_id)); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client requested cancel for op %u\n", + (uint32_t) ntohl (msg->request_id)); + _GSS_operation_destroy (op, GNUNET_YES); + } + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Handle a request from the client to accept a set operation that + * came from a remote peer. We forward the accept to the associated + * operation for handling + * + * @param cls the client + * @param msg the message + */ +static void +handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) +{ + struct ClientState *cs = cls; + struct Set *set; + struct Operation *op; + struct GNUNET_SET_ResultMessage *result_message; + struct GNUNET_MQ_Envelope *ev; + struct Listener *listener; + + if (NULL == (set = cs->set)) + { + /* client without a set requested to accept */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + op = get_incoming (ntohl (msg->accept_reject_id)); + if (NULL == op) + { + /* It is not an error if the set op does not exist -- it may + * have been destroyed when the partner peer disconnected. */ + GNUNET_log ( + GNUNET_ERROR_TYPE_INFO, + "Client %p accepted request %u of listener %p that is no longer active\n", + cs, + ntohl (msg->accept_reject_id), + cs->listener); + ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SETI_RESULT); + result_message->request_id = msg->request_id; + result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); + GNUNET_MQ_send (set->cs->mq, ev); + GNUNET_SERVICE_client_continue (cs->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client accepting request %u\n", + (uint32_t) ntohl (msg->accept_reject_id)); + listener = op->listener; + op->listener = NULL; + GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op); + op->set = set; + GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); + op->client_request_id = ntohl (msg->request_id); + op->result_mode = ntohl (msg->result_mode); + op->byzantine = msg->byzantine; + op->byzantine_lower_bound = msg->byzantine_lower_bound; + op->force_full = msg->force_full; + op->force_delta = msg->force_delta; + + /* Advance generation values, so that future mutations do not + interfer with the running operation. */ + op->generation_created = set->current_generation; + advance_generation (set); + GNUNET_assert (NULL == op->state); + op->state = intersection_accept (op); // FIXME: inline + if (NULL == op->state) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + /* Now allow CADET to continue, as we did not do this in + #handle_incoming_msg (as we wanted to first see if the + local client would accept the request). */ + GNUNET_CADET_receive_done (op->channel); + GNUNET_SERVICE_client_continue (cs->client); +} + + +/** + * Called to clean up, after a shutdown has been requested. + * + * @param cls closure, NULL + */ +static void +shutdown_task (void *cls) +{ + /* Delay actual shutdown to allow service to disconnect clients */ + in_shutdown = GNUNET_YES; + if (0 == num_clients) + { + if (NULL != cadet) + { + GNUNET_CADET_disconnect (cadet); + cadet = NULL; + } + } + GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); +} + + +/** + * Function called by the service's run + * method to run service-specific setup code. + * + * @param cls closure + * @param cfg configuration to use + * @param service the initialized service + */ +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_SERVICE_Handle *service) +{ + /* FIXME: need to modify SERVICE (!) API to allow + us to run a shutdown task *after* clients were + forcefully disconnected! */ + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); + _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); + cadet = GNUNET_CADET_connect (cfg); + if (NULL == cadet) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _ ("Could not connect to CADET service\n")); + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + +/** + * Define "main" method using service macro. + */ +GNUNET_SERVICE_MAIN ( + "set", + GNUNET_SERVICE_OPTION_NONE, + &run, + &client_connect_cb, + &client_disconnect_cb, + NULL, + GNUNET_MQ_hd_fixed_size (client_accept, + GNUNET_MESSAGE_TYPE_SETI_ACCEPT, + struct GNUNET_SET_AcceptMessage, + NULL), + GNUNET_MQ_hd_var_size (client_mutation, + GNUNET_MESSAGE_TYPE_SETI_ADD, + struct GNUNET_SET_ElementMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_create_set, + GNUNET_MESSAGE_TYPE_SETI_CREATE, + struct GNUNET_SET_CreateMessage, + NULL), + GNUNET_MQ_hd_var_size (client_evaluate, + GNUNET_MESSAGE_TYPE_SETI_EVALUATE, + struct GNUNET_SET_EvaluateMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_listen, + GNUNET_MESSAGE_TYPE_SETI_LISTEN, + struct GNUNET_SET_ListenMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_reject, + GNUNET_MESSAGE_TYPE_SETI_REJECT, + struct GNUNET_SET_RejectMessage, + NULL), + GNUNET_MQ_hd_fixed_size (client_cancel, + GNUNET_MESSAGE_TYPE_SETI_CANCEL, + struct GNUNET_SET_CancelMessage, + NULL), + GNUNET_MQ_handler_end ()); + + +/* end of gnunet-service-seti.c */ diff --git a/src/seti/gnunet-service-seti_protocol.h b/src/seti/gnunet-service-seti_protocol.h new file mode 100644 index 000000000..51968376e --- /dev/null +++ b/src/seti/gnunet-service-seti_protocol.h @@ -0,0 +1,144 @@ +/* + This file is part of GNUnet. + Copyright (C) 2013, 2014, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @author Florian Dold + * @author Christian Grothoff + * @file seti/gnunet-service-seti_protocol.h + * @brief Peer-to-Peer messages for gnunet set + */ +#ifndef SETI_PROTOCOL_H +#define SETI_PROTOCOL_H + +#include "platform.h" +#include "gnunet_common.h" + + +GNUNET_NETWORK_STRUCT_BEGIN + +struct OperationRequestMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST + */ + struct GNUNET_MessageHeader header; + + /** + * For Intersection: my element count + */ + uint32_t element_count GNUNET_PACKED; + + /** + * Application-specific identifier of the request. + */ + struct GNUNET_HashCode app_idX; + + /* rest: optional message */ +}; + + +/** + * During intersection, the first (and possibly second) message + * send it the number of elements in the set, to allow the peers + * to decide who should start with the Bloom filter. + */ +struct IntersectionElementInfoMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO + */ + struct GNUNET_MessageHeader header; + + /** + * mutator used with this bloomfilter. + */ + uint32_t sender_element_count GNUNET_PACKED; +}; + + +/** + * Bloom filter messages exchanged for set intersection calculation. + */ +struct BFMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF + */ + struct GNUNET_MessageHeader header; + + /** + * Number of elements the sender still has in the set. + */ + uint32_t sender_element_count GNUNET_PACKED; + + /** + * XOR of all hashes over all elements remaining in the set. + * Used to determine termination. + */ + struct GNUNET_HashCode element_xor_hash; + + /** + * Mutator used with this bloomfilter. + */ + uint32_t sender_mutator GNUNET_PACKED; + + /** + * Total length of the bloomfilter data. + */ + uint32_t bloomfilter_total_length GNUNET_PACKED; + + /** + * Number of bits (k-value) used in encoding the bloomfilter. + */ + uint32_t bits_per_element GNUNET_PACKED; + + /** + * rest: the sender's bloomfilter + */ +}; + + +/** + * Last message, send to confirm the final set. Contains the element + * count as it is possible that the peer determined that we were done + * by getting the empty set, which in that case also needs to be + * communicated. + */ +struct IntersectionDoneMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE + */ + struct GNUNET_MessageHeader header; + + /** + * Final number of elements in intersection. + */ + uint32_t final_element_count GNUNET_PACKED; + + /** + * XOR of all hashes over all elements remaining in the set. + */ + struct GNUNET_HashCode element_xor_hash; +}; + + +GNUNET_NETWORK_STRUCT_END + +#endif diff --git a/src/seti/gnunet-seti-profiler.c b/src/seti/gnunet-seti-profiler.c new file mode 100644 index 000000000..b8230bcfc --- /dev/null +++ b/src/seti/gnunet-seti-profiler.c @@ -0,0 +1,480 @@ +/* + This file is part of GNUnet + Copyright (C) 2013, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/gnunet-seti-profiler.c + * @brief profiling tool for set intersection + * @author Florian Dold + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" +#include "gnunet_seti_service.h" +#include "gnunet_testbed_service.h" + + +static int ret; + +static unsigned int num_a = 5; +static unsigned int num_b = 5; +static unsigned int num_c = 20; + +const static struct GNUNET_CONFIGURATION_Handle *config; + +struct SetInfo +{ + char *id; + struct GNUNET_SETI_Handle *set; + struct GNUNET_SETI_OperationHandle *oh; + struct GNUNET_CONTAINER_MultiHashMap *sent; + struct GNUNET_CONTAINER_MultiHashMap *received; + int done; +} info1, info2; + +static struct GNUNET_CONTAINER_MultiHashMap *common_sent; + +static struct GNUNET_HashCode app_id; + +static struct GNUNET_PeerIdentity local_peer; + +static struct GNUNET_SETI_ListenHandle *set_listener; + +static unsigned int use_intersection; + +static unsigned int element_size = 32; + +/** + * Handle to the statistics service. + */ +static struct GNUNET_STATISTICS_Handle *statistics; + +/** + * The profiler will write statistics + * for all peers to the file with this name. + */ +static char *statistics_filename; + +/** + * The profiler will write statistics + * for all peers to this file. + */ +static FILE *statistics_file; + + +static int +map_remove_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_CONTAINER_MultiHashMap *m = cls; + int ret; + + GNUNET_assert (NULL != key); + + ret = GNUNET_CONTAINER_multihashmap_remove_all (m, key); + if (GNUNET_OK != ret) + printf ("spurious element\n"); + return GNUNET_YES; +} + + +/** + * Callback function to process statistic values. + * + * @param cls closure + * @param subsystem name of subsystem that created the statistic + * @param name the name of the datum + * @param value the current value + * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not + * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + */ +static int +statistics_result (void *cls, + const char *subsystem, + const char *name, + uint64_t value, + int is_persistent) +{ + if (NULL != statistics_file) + { + fprintf (statistics_file, "%s\t%s\t%lu\n", subsystem, name, (unsigned + long) value); + } + return GNUNET_OK; +} + + +static void +statistics_done (void *cls, + int success) +{ + GNUNET_assert (GNUNET_YES == success); + if (NULL != statistics_file) + fclose (statistics_file); + GNUNET_SCHEDULER_shutdown (); +} + + +static void +check_all_done (void) +{ + if ((info1.done == GNUNET_NO) || (info2.done == GNUNET_NO)) + return; + + GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator, + info2.sent); + GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator, + info1.sent); + + printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size ( + info1.sent)); + printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size ( + info2.sent)); + + if (NULL == statistics_filename) + { + GNUNET_SCHEDULER_shutdown (); + return; + } + + statistics_file = fopen (statistics_filename, "w"); + GNUNET_STATISTICS_get (statistics, NULL, NULL, + &statistics_done, + &statistics_result, NULL); +} + + +static void +set_result_cb (void *cls, + const struct GNUNET_SETI_Element *element, + uint64_t current_size, + enum GNUNET_SETI_Status status) +{ + struct SetInfo *info = cls; + struct GNUNET_HashCode hash; + + GNUNET_assert (GNUNET_NO == info->done); + switch (status) + { + case GNUNET_SETI_STATUS_DONE: + info->done = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "set intersection done\n"); + check_all_done (); + info->oh = NULL; + return; + case GNUNET_SETI_STATUS_FAILURE: + info->oh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "failure\n"); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_SETI_STATUS_ADD_LOCAL: + GNUNET_CRYPTO_hash (element->data, + element->size, + &hash); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "set %s: keep element %s\n", + info->id, + GNUNET_h2s (&hash)); + break; + case GNUNET_SETI_STATUS_DEL_LOCAL: + GNUNET_CRYPTO_hash (element->data, + element->size, + &hash); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "set %s: remove element %s\n", + info->id, + GNUNET_h2s (&hash)); + return; + default: + GNUNET_assert (0); + } + + if (element->size != element_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "wrong element size: %u, expected %u\n", + element->size, + (unsigned int) sizeof(struct GNUNET_HashCode)); + GNUNET_assert (0); + } + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "set %s: got element (%s)\n", + info->id, GNUNET_h2s (element->data)); + GNUNET_assert (NULL != element->data); + { + struct GNUNET_HashCode data_hash; + + GNUNET_CRYPTO_hash (element->data, + element_size, + &data_hash); + GNUNET_CONTAINER_multihashmap_put (info->received, + &data_hash, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } +} + + +static void +set_listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SETI_Request *request) +{ + /* max. 1 option plus terminator */ + struct GNUNET_SETI_Option opts[2] = { { 0 } }; + unsigned int n_opts = 0; + + if (NULL == request) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "listener failed\n"); + return; + } + GNUNET_assert (NULL == info2.oh); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "set listen cb called\n"); + if (use_intersection) + { + opts[n_opts++] = (struct GNUNET_SETI_Option) { .type = + GNUNET_SETI_OPTION_RETURN_INTERSECTION }; + } + opts[n_opts].type = GNUNET_SETI_OPTION_END; + info2.oh = GNUNET_SETI_accept (request, + opts, + &set_result_cb, + &info2); + GNUNET_SETI_commit (info2.oh, + info2.set); +} + + +static int +set_insert_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_SETI_Handle *set = cls; + struct GNUNET_SETI_Element el; + + el.element_type = 0; + el.data = value; + el.size = element_size; + GNUNET_SETI_add_element (set, &el, NULL, NULL); + return GNUNET_YES; +} + + +static void +handle_shutdown (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shutting down set profiler\n"); + if (NULL != set_listener) + { + GNUNET_SETI_listen_cancel (set_listener); + set_listener = NULL; + } + if (NULL != info1.oh) + { + GNUNET_SETI_operation_cancel (info1.oh); + info1.oh = NULL; + } + if (NULL != info2.oh) + { + GNUNET_SETI_operation_cancel (info2.oh); + info2.oh = NULL; + } + if (NULL != info1.set) + { + GNUNET_SETI_destroy (info1.set); + info1.set = NULL; + } + if (NULL != info2.set) + { + GNUNET_SETI_destroy (info2.set); + info2.set = NULL; + } + GNUNET_STATISTICS_destroy (statistics, GNUNET_NO); +} + + +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_TESTING_Peer *peer) +{ + unsigned int i; + struct GNUNET_HashCode hash; + /* max. 1 option plus terminator */ + struct GNUNET_SETI_Option opts[2] = { { 0 } }; + unsigned int n_opts = 0; + + config = cfg; + + GNUNET_assert (element_size > 0); + + if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &local_peer)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "could not retrieve host identity\n"); + ret = 0; + return; + } + statistics = GNUNET_STATISTICS_create ("set-profiler", cfg); + GNUNET_SCHEDULER_add_shutdown (&handle_shutdown, NULL); + info1.id = "a"; + info2.id = "b"; + info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO); + info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO); + common_sent = GNUNET_CONTAINER_multihashmap_create (num_c + 1, GNUNET_NO); + info1.received = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO); + info2.received = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO); + for (i = 0; i < num_a; i++) + { + char *data = GNUNET_malloc (element_size); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size); + GNUNET_CRYPTO_hash (data, element_size, &hash); + GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + for (i = 0; i < num_b; i++) + { + char *data = GNUNET_malloc (element_size); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size); + GNUNET_CRYPTO_hash (data, element_size, &hash); + GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + for (i = 0; i < num_c; i++) + { + char *data = GNUNET_malloc (element_size); + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size); + GNUNET_CRYPTO_hash (data, element_size, &hash); + GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, data, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id); + + info1.set = GNUNET_SETI_create (config); + info2.set = GNUNET_SETI_create (config); + GNUNET_CONTAINER_multihashmap_iterate (info1.sent, + &set_insert_iterator, + info1.set); + GNUNET_CONTAINER_multihashmap_iterate (info2.sent, + &set_insert_iterator, + info2.set); + GNUNET_CONTAINER_multihashmap_iterate (common_sent, + &set_insert_iterator, + info1.set); + GNUNET_CONTAINER_multihashmap_iterate (common_sent, + &set_insert_iterator, + info2.set); + + set_listener = GNUNET_SETI_listen (config, + &app_id, + &set_listen_cb, + NULL); + if (use_intersection) + { + opts[n_opts++] = (struct GNUNET_SETI_Option) { .type = + GNUNET_SETI_OPTION_RETURN_INTERSECTION }; + } + opts[n_opts].type = GNUNET_SETI_OPTION_END; + + info1.oh = GNUNET_SETI_prepare (&local_peer, + &app_id, + NULL, + opts, + set_result_cb, + &info1); + GNUNET_SETI_commit (info1.oh, + info1.set); + GNUNET_SETI_destroy (info1.set); + info1.set = NULL; +} + + +static void +pre_run (void *cls, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + if (0 != GNUNET_TESTING_peer_run ("set-profiler", + cfgfile, + &run, NULL)) + ret = 2; +} + + +int +main (int argc, char **argv) +{ + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_uint ('A', + "num-first", + NULL, + gettext_noop ("number of values"), + &num_a), + GNUNET_GETOPT_option_uint ('B', + "num-second", + NULL, + gettext_noop ("number of values"), + &num_b), + GNUNET_GETOPT_option_uint ('C', + "num-common", + NULL, + gettext_noop ("number of values"), + &num_c), + GNUNET_GETOPT_option_uint ('i', + "use-intersection", + NULL, + gettext_noop ( + "return intersection instead of delta"), + &use_intersection), + GNUNET_GETOPT_option_uint ('w', + "element-size", + NULL, + gettext_noop ("element size"), + &element_size), + GNUNET_GETOPT_option_filename ('s', + "statistics", + "FILENAME", + gettext_noop ("write statistics to file"), + &statistics_filename), + GNUNET_GETOPT_OPTION_END + }; + + GNUNET_PROGRAM_run2 (argc, argv, + "gnunet-seti-profiler", + "help", + options, + &pre_run, + NULL, + GNUNET_YES); + return ret; +} diff --git a/src/seti/plugin_block_seti_test.c b/src/seti/plugin_block_seti_test.c new file mode 100644 index 000000000..1de086092 --- /dev/null +++ b/src/seti/plugin_block_seti_test.c @@ -0,0 +1,123 @@ +/* + This file is part of GNUnet + Copyright (C) 2017 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/plugin_block_set_test.c + * @brief set test block, recognizes elements with non-zero first byte as invalid + * @author Christian Grothoff + */ + +#include "platform.h" +#include "gnunet_block_plugin.h" +#include "gnunet_block_group_lib.h" + + +/** + * Function called to validate a reply or a request. For + * request evaluation, simply pass "NULL" for the reply_block. + * + * @param cls closure + * @param ctx block context + * @param type block type + * @param group block group to use + * @param eo control flags + * @param query original query (hash) + * @param xquery extrended query data (can be NULL, depending on type) + * @param xquery_size number of bytes in xquery + * @param reply_block response to validate + * @param reply_block_size number of bytes in reply block + * @return characterization of result + */ +static enum GNUNET_BLOCK_EvaluationResult +block_plugin_set_test_evaluate (void *cls, + struct GNUNET_BLOCK_Context *ctx, + enum GNUNET_BLOCK_Type type, + struct GNUNET_BLOCK_Group *group, + enum GNUNET_BLOCK_EvaluationOptions eo, + const struct GNUNET_HashCode *query, + const void *xquery, + size_t xquery_size, + const void *reply_block, + size_t reply_block_size) +{ + if ((NULL == reply_block) || + (reply_block_size == 0) || + (0 != ((char *) reply_block)[0])) + return GNUNET_BLOCK_EVALUATION_RESULT_INVALID; + return GNUNET_BLOCK_EVALUATION_OK_MORE; +} + + +/** + * Function called to obtain the key for a block. + * + * @param cls closure + * @param type block type + * @param block block to get the key for + * @param block_size number of bytes in block + * @param key set to the key (query) for the given block + * @return #GNUNET_OK on success, #GNUNET_SYSERR if type not supported + * (or if extracting a key from a block of this type does not work) + */ +static int +block_plugin_set_test_get_key (void *cls, + enum GNUNET_BLOCK_Type type, + const void *block, + size_t block_size, + struct GNUNET_HashCode *key) +{ + return GNUNET_SYSERR; +} + + +/** + * Entry point for the plugin. + */ +void * +libgnunet_plugin_block_set_test_init (void *cls) +{ + static enum GNUNET_BLOCK_Type types[] = { + GNUNET_BLOCK_TYPE_SET_TEST, + GNUNET_BLOCK_TYPE_ANY /* end of list */ + }; + struct GNUNET_BLOCK_PluginFunctions *api; + + api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions); + api->evaluate = &block_plugin_set_test_evaluate; + api->get_key = &block_plugin_set_test_get_key; + api->types = types; + return api; +} + + +/** + * Exit point from the plugin. + */ +void * +libgnunet_plugin_block_set_test_done (void *cls) +{ + struct GNUNET_BLOCK_PluginFunctions *api = cls; + + GNUNET_free (api); + return NULL; +} + + +/* end of plugin_block_set_test.c */ diff --git a/src/seti/seti.conf.in b/src/seti/seti.conf.in new file mode 100644 index 000000000..e4f7b60b5 --- /dev/null +++ b/src/seti/seti.conf.in @@ -0,0 +1,12 @@ +[seti] +START_ON_DEMAND = @START_ON_DEMAND@ +@UNIXONLY@PORT = 2106 +HOSTNAME = localhost +BINARY = gnunet-service-seti +ACCEPT_FROM = 127.0.0.1; +ACCEPT_FROM6 = ::1; +UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-seti.sock +UNIX_MATCH_UID = YES +UNIX_MATCH_GID = YES + +#PREFIX = valgrind diff --git a/src/seti/seti.h b/src/seti/seti.h new file mode 100644 index 000000000..aa7014034 --- /dev/null +++ b/src/seti/seti.h @@ -0,0 +1,267 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012-2014, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file set/seti.h + * @brief messages used for the set intersection api + * @author Florian Dold + * @author Christian Grothoff + */ +#ifndef SETI_H +#define SETI_H + +#include "platform.h" +#include "gnunet_common.h" +#include "gnunet_set_service.h" + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Message sent by the client to the service to ask starting + * a new set to perform operations with. + */ +struct GNUNET_SETI_CreateMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_CREATE + */ + struct GNUNET_MessageHeader header; +}; + + +/** + * Message sent by the client to the service to start listening for + * incoming requests to perform a certain type of set operation for a + * certain type of application. + */ +struct GNUNET_SETI_ListenMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_LISTEN + */ + struct GNUNET_MessageHeader header; + + /** + * Operation type, values of `enum GNUNET_SETI_OperationType` + */ + uint32_t operation GNUNET_PACKED; + + /** + * application id + */ + struct GNUNET_HashCode app_id; +}; + + +/** + * Message sent by a listening client to the service to accept + * performing the operation with the other peer. + */ +struct GNUNET_SETI_AcceptMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_ACCEPT + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the incoming request we want to accept. + */ + uint32_t accept_reject_id GNUNET_PACKED; + + /** + * Request ID to identify responses. + */ + uint32_t request_id GNUNET_PACKED; + + /** + * Return the intersection (1), instead of the elements to + * remove / the delta (0), in NBO. + */ + uint32_t return_intersection; + +}; + + +/** + * Message sent by a listening client to the service to reject + * performing the operation with the other peer. + */ +struct GNUNET_SETI_RejectMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_REJECT + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the incoming request we want to reject. + */ + uint32_t accept_reject_id GNUNET_PACKED; +}; + + +/** + * A request for an operation with another client. + */ +struct GNUNET_SETI_RequestMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_REQUEST. + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the to identify the request when accepting or + * rejecting it. + */ + uint32_t accept_id GNUNET_PACKED; + + /** + * Identity of the requesting peer. + */ + struct GNUNET_PeerIdentity peer_id; + + /* rest: context message, that is, application-specific + message to convince listener to pick up */ +}; + + +/** + * Message sent by client to service to initiate a set operation as a + * client (not as listener). A set (which determines the operation + * type) must already exist in association with this client. + */ +struct GNUNET_SETI_EvaluateMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_EVALUATE + */ + struct GNUNET_MessageHeader header; + + /** + * Id of our set to evaluate, chosen implicitly by the client when it + * calls #GNUNET_SETI_commit(). + */ + uint32_t request_id GNUNET_PACKED; + + /** + * Peer to evaluate the operation with + */ + struct GNUNET_PeerIdentity target_peer; + + /** + * Application id + */ + struct GNUNET_HashCode app_id; + + /** + * Return the intersection (1), instead of the elements to + * remove / the delta (0), in NBO. + */ + uint32_t return_intersection; + + /* rest: context message, that is, application-specific + message to convince listener to pick up */ +}; + + +/** + * Message sent by the service to the client to indicate an + * element that is removed (set intersection) or added + * (set union) or part of the final result, depending on + * options specified for the operation. + */ +struct GNUNET_SETI_ResultMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_RESULT + */ + struct GNUNET_MessageHeader header; + + /** + * Current set size. + */ + uint64_t current_size; + + /** + * id the result belongs to + */ + uint32_t request_id GNUNET_PACKED; + + /** + * Was the evaluation successful? Contains + * an `enum GNUNET_SETI_Status` in NBO. + */ + uint16_t result_status GNUNET_PACKED; + + /** + * Type of the element attachted to the message, if any. + */ + uint16_t element_type GNUNET_PACKED; + + /* rest: the actual element */ +}; + + +/** + * Message sent by client to the service to add an element to the set. + */ +struct GNUNET_SETI_ElementMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_ADD. + */ + struct GNUNET_MessageHeader header; + + /** + * Type of the element to add or remove. + */ + uint16_t element_type GNUNET_PACKED; + + /** + * For alignment, always zero. + */ + uint16_t reserved GNUNET_PACKED; + + /* rest: the actual element */ +}; + + +/** + * Sent to the service by the client + * in order to cancel a set operation. + */ +struct GNUNET_SETI_CancelMessage +{ + /** + * Type: #GNUNET_MESSAGE_TYPE_SETI_CANCEL + */ + struct GNUNET_MessageHeader header; + + /** + * ID of the request we want to cancel. + */ + uint32_t request_id GNUNET_PACKED; +}; + + +GNUNET_NETWORK_STRUCT_END + +#endif diff --git a/src/seti/seti_api.c b/src/seti/seti_api.c new file mode 100644 index 000000000..d80a60684 --- /dev/null +++ b/src/seti/seti_api.c @@ -0,0 +1,895 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012-2016, 2020 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ +/** + * @file seti/seti_api.c + * @brief api for the set service + * @author Florian Dold + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_seti_service.h" +#include "seti.h" + + +#define LOG(kind, ...) GNUNET_log_from (kind, "seti-api", __VA_ARGS__) + + +/** + * Opaque handle to a set. + */ +struct GNUNET_SETI_Handle +{ + /** + * Message queue for @e client. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Linked list of operations on the set. + */ + struct GNUNET_SETI_OperationHandle *ops_head; + + /** + * Linked list of operations on the set. + */ + struct GNUNET_SETI_OperationHandle *ops_tail; + + /** + * Configuration, needed when creating (lazy) copies. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Should the set be destroyed once all operations are gone? + * #GNUNET_SYSERR if #GNUNET_SETI_destroy() must raise this flag, + * #GNUNET_YES if #GNUNET_SETI_destroy() did raise this flag. + */ + int destroy_requested; + + /** + * Has the set become invalid (e.g. service died)? + */ + int invalid; + + /** + * Both client and service count the number of iterators + * created so far to match replies with iterators. + */ + uint16_t iteration_id; + +}; + + +/** + * Handle for a set operation request from another peer. + */ +struct GNUNET_SETI_Request +{ + /** + * Id of the request, used to identify the request when + * accepting/rejecting it. + */ + uint32_t accept_id; + + /** + * Has the request been accepted already? + * #GNUNET_YES/#GNUNET_NO + */ + int accepted; +}; + + +/** + * Handle to an operation. Only known to the service after committing + * the handle with a set. + */ +struct GNUNET_SETI_OperationHandle +{ + /** + * Function to be called when we have a result, + * or an error. + */ + GNUNET_SETI_ResultIterator result_cb; + + /** + * Closure for @e result_cb. + */ + void *result_cls; + + /** + * Local set used for the operation, + * NULL if no set has been provided by conclude yet. + */ + struct GNUNET_SETI_Handle *set; + + /** + * Message sent to the server on calling conclude, + * NULL if conclude has been called. + */ + struct GNUNET_MQ_Envelope *conclude_mqm; + + /** + * Address of the request if in the conclude message, + * used to patch the request id into the message when the set is known. + */ + uint32_t *request_id_addr; + + /** + * Handles are kept in a linked list. + */ + struct GNUNET_SETI_OperationHandle *prev; + + /** + * Handles are kept in a linked list. + */ + struct GNUNET_SETI_OperationHandle *next; + + /** + * Request ID to identify the operation within the set. + */ + uint32_t request_id; + + /** + * Should we return the resulting intersection (ADD) or + * the elements to remove (DEL)? + */ + int return_intersection; +}; + + +/** + * Opaque handle to a listen operation. + */ +struct GNUNET_SETI_ListenHandle +{ + /** + * Message queue for the client. + */ + struct GNUNET_MQ_Handle*mq; + + /** + * Configuration handle for the listener, stored + * here to be able to reconnect transparently on + * connection failure. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Function to call on a new incoming request, + * or on error. + */ + GNUNET_SETI_ListenCallback listen_cb; + + /** + * Closure for @e listen_cb. + */ + void *listen_cls; + + /** + * Task for reconnecting when the listener fails. + */ + struct GNUNET_SCHEDULER_Task *reconnect_task; + + /** + * Application ID we listen for. + */ + struct GNUNET_HashCode app_id; + + /** + * Time to wait until we try to reconnect on failure. + */ + struct GNUNET_TIME_Relative reconnect_backoff; + +}; + + +/** + * Check that the given @a msg is well-formed. + * + * @param cls closure + * @param msg message to check + * @return #GNUNET_OK if message is well-formed + */ +static int +check_result (void *cls, + const struct GNUNET_SETI_ResultMessage *msg) +{ + /* minimum size was already checked, everything else is OK! */ + return GNUNET_OK; +} + + +/** + * Handle result message for a set operation. + * + * @param cls the set + * @param mh the message + */ +static void +handle_result (void *cls, + const struct GNUNET_SETI_ResultMessage *msg) +{ + struct GNUNET_SETI_Handle *set = cls; + struct GNUNET_SETI_OperationHandle *oh; + struct GNUNET_SETI_Element e; + enum GNUNET_SETI_Status result_status; + int destroy_set; + + GNUNET_assert (NULL != set->mq); + result_status = (enum GNUNET_SETI_Status) ntohs (msg->result_status); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got result message with status %d\n", + result_status); + oh = GNUNET_MQ_assoc_get (set->mq, + ntohl (msg->request_id)); + if (NULL == oh) + { + /* 'oh' can be NULL if we canceled the operation, but the service + did not get the cancel message yet. */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring result from canceled operation\n"); + return; + } + + switch (result_status) + { + case GNUNET_SETI_STATUS_ADD_LOCAL: + case GNUNET_SETI_STATUS_DEL_LOCAL: + e.data = &msg[1]; + e.size = ntohs (msg->header.size) + - sizeof(struct GNUNET_SETI_ResultMessage); + e.element_type = ntohs (msg->element_type); + if (NULL != oh->result_cb) + oh->result_cb (oh->result_cls, + &e, + GNUNET_ntohll (msg->current_size), + result_status); + return; + case GNUNET_SETI_STATUS_FAILURE: + case GNUNET_SETI_STATUS_DONE: + GNUNET_MQ_assoc_remove (set->mq, + ntohl (msg->request_id)); + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + oh); + /* Need to do this calculation _before_ the result callback, + as IF the application still has a valid set handle, it + may trigger destruction of the set during the callback. */ + destroy_set = (GNUNET_YES == set->destroy_requested) && + (NULL == set->ops_head); + if (NULL != oh->result_cb) + { + oh->result_cb (oh->result_cls, + NULL, + GNUNET_ntohll (msg->current_size), + result_status); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "No callback for final status\n"); + } + if (destroy_set) + GNUNET_SETI_destroy (set); + GNUNET_free (oh); + return; + } +} + + +/** + * Destroy the given set operation. + * + * @param oh set operation to destroy + */ +static void +set_operation_destroy (struct GNUNET_SETI_OperationHandle *oh) +{ + struct GNUNET_SETI_Handle *set = oh->set; + struct GNUNET_SETI_OperationHandle *h_assoc; + + if (NULL != oh->conclude_mqm) + GNUNET_MQ_discard (oh->conclude_mqm); + /* is the operation already commited? */ + if (NULL != set) + { + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + oh); + h_assoc = GNUNET_MQ_assoc_remove (set->mq, + oh->request_id); + GNUNET_assert ((NULL == h_assoc) || + (h_assoc == oh)); + } + GNUNET_free (oh); +} + + +/** + * Cancel the given set operation. We need to send an explicit cancel + * message, as all operations one one set communicate using one + * handle. + * + * @param oh set operation to cancel + */ +void +GNUNET_SETI_operation_cancel (struct GNUNET_SETI_OperationHandle *oh) +{ + struct GNUNET_SETI_Handle *set = oh->set; + struct GNUNET_SETI_CancelMessage *m; + struct GNUNET_MQ_Envelope *mqm; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Cancelling SET operation\n"); + if (NULL != set) + { + mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETI_CANCEL); + m->request_id = htonl (oh->request_id); + GNUNET_MQ_send (set->mq, mqm); + } + set_operation_destroy (oh); + if ((NULL != set) && + (GNUNET_YES == set->destroy_requested) && + (NULL == set->ops_head)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Destroying set after operation cancel\n"); + GNUNET_SETI_destroy (set); + } +} + + +/** + * We encountered an error communicating with the set service while + * performing a set operation. Report to the application. + * + * @param cls the `struct GNUNET_SETI_Handle` + * @param error error code + */ +static void +handle_client_set_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_SETI_Handle *set = cls; + + LOG (GNUNET_ERROR_TYPE_ERROR, + "Handling client set error %d\n", + error); + while (NULL != set->ops_head) + { + if ((NULL != set->ops_head->result_cb) && + (GNUNET_NO == set->destroy_requested)) + set->ops_head->result_cb (set->ops_head->result_cls, + NULL, + 0, + GNUNET_SETI_STATUS_FAILURE); + set_operation_destroy (set->ops_head); + } + set->invalid = GNUNET_YES; +} + + +/** + * Create an empty set. + * + * @param cfg configuration to use for connecting to the + * set service + * @return a handle to the set + */ +struct GNUNET_SETI_Handle * +GNUNET_SETI_create (const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_SETI_Handle *set = GNUNET_new (struct GNUNET_SETI_Handle); + struct GNUNET_MQ_MessageHandler mq_handlers[] = { + GNUNET_MQ_hd_var_size (result, + GNUNET_MESSAGE_TYPE_SETI_RESULT, + struct GNUNET_SETI_ResultMessage, + set), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_CreateMessage *create_msg; + + set->cfg = cfg; + set->mq = GNUNET_CLIENT_connect (cfg, + "set", + mq_handlers, + &handle_client_set_error, + set); + if (NULL == set->mq) + { + GNUNET_free (set); + return NULL; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new intersection set\n"); + mqm = GNUNET_MQ_msg (create_msg, + GNUNET_MESSAGE_TYPE_SETI_CREATE); + GNUNET_MQ_send (set->mq, + mqm); + return set; +} + + +/** + * Add an element to the given set. After the element has been added + * (in the sense of being transmitted to the set service), @a cont + * will be called. Multiple calls to GNUNET_SETI_add_element() can be + * queued. + * + * @param set set to add element to + * @param element element to add to the set + * @param cb continuation called after the element has been added + * @param cb_cls closure for @a cont + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the + * set is invalid (e.g. the set service crashed) + */ +int +GNUNET_SETI_add_element (struct GNUNET_SETI_Handle *set, + const struct GNUNET_SETI_Element *element, + GNUNET_SCHEDULER_TaskCallback cb, + void *cb_cls) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_ElementMessage *msg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "adding element of type %u to set %p\n", + (unsigned int) element->element_type, + set); + if (GNUNET_YES == set->invalid) + { + if (NULL != cb) + cb (cb_cls); + return GNUNET_SYSERR; + } + mqm = GNUNET_MQ_msg_extra (msg, + element->size, + GNUNET_MESSAGE_TYPE_SETI_ADD); + msg->element_type = htons (element->element_type); + GNUNET_memcpy (&msg[1], + element->data, + element->size); + GNUNET_MQ_notify_sent (mqm, + cb, + cb_cls); + GNUNET_MQ_send (set->mq, + mqm); + return GNUNET_OK; +} + + +/** + * Destroy the set handle if no operations are left, mark the set + * for destruction otherwise. + * + * @param set set handle to destroy + */ +void +GNUNET_SETI_destroy (struct GNUNET_SETI_Handle *set) +{ + /* destroying set while iterator is active is currently + not supported; we should expand the API to allow + clients to explicitly cancel the iteration! */ + if ((NULL != set->ops_head) || + (GNUNET_SYSERR == set->destroy_requested)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Set operations are pending, delaying set destruction\n"); + set->destroy_requested = GNUNET_YES; + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Really destroying set\n"); + if (NULL != set->mq) + { + GNUNET_MQ_destroy (set->mq); + set->mq = NULL; + } + GNUNET_free (set); +} + + +/** + * Prepare a set operation to be evaluated with another peer. + * The evaluation will not start until the client provides + * a local set with #GNUNET_SETI_commit(). + * + * @param other_peer peer with the other set + * @param app_id hash for the application using the set + * @param context_msg additional information for the request + * @param options options to use when processing the request + * @param result_cb called on error or success + * @param result_cls closure for @e result_cb + * @return a handle to cancel the operation + */ +struct GNUNET_SETI_OperationHandle * +GNUNET_SETI_prepare (const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_HashCode *app_id, + const struct GNUNET_MessageHeader *context_msg, + const struct GNUNET_SETI_Option options[], + GNUNET_SETI_ResultIterator result_cb, + void *result_cls) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_OperationHandle *oh; + struct GNUNET_SETI_EvaluateMessage *msg; + + oh = GNUNET_new (struct GNUNET_SETI_OperationHandle); + oh->result_cb = result_cb; + oh->result_cls = result_cls; + mqm = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SETI_EVALUATE, + context_msg); + msg->app_id = *app_id; + msg->target_peer = *other_peer; + for (const struct GNUNET_SETI_Option *opt = options; + GNUNET_SETI_OPTION_END != opt->type; + opt++) + { + switch (opt->type) + { + case GNUNET_SETI_OPTION_RETURN_INTERSECTION: + msg->return_intersection = GNUNET_YES; + break; + default: + LOG (GNUNET_ERROR_TYPE_ERROR, + "Option with type %d not recognized\n", + (int) opt->type); + } + } + oh->conclude_mqm = mqm; + oh->request_id_addr = &msg->request_id; + return oh; +} + + +/** + * Connect to the set service in order to listen for requests. + * + * @param cls the `struct GNUNET_SETI_ListenHandle *` to connect + */ +static void +listen_connect (void *cls); + + +/** + * Check validity of request message for a listen operation + * + * @param cls the listen handle + * @param msg the message + * @return #GNUNET_OK if the message is well-formed + */ +static int +check_request (void *cls, + const struct GNUNET_SETI_RequestMessage *msg) +{ + const struct GNUNET_MessageHeader *context_msg; + + if (ntohs (msg->header.size) == sizeof(*msg)) + return GNUNET_OK; /* no context message is OK */ + context_msg = GNUNET_MQ_extract_nested_mh (msg); + if (NULL == context_msg) + { + /* malformed context message is NOT ok */ + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle request message for a listen operation + * + * @param cls the listen handle + * @param msg the message + */ +static void +handle_request (void *cls, + const struct GNUNET_SETI_RequestMessage *msg) +{ + struct GNUNET_SETI_ListenHandle *lh = cls; + struct GNUNET_SETI_Request req; + const struct GNUNET_MessageHeader *context_msg; + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_RejectMessage *rmsg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Processing incoming operation request with id %u\n", + ntohl (msg->accept_id)); + /* we got another valid request => reset the backoff */ + lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + req.accept_id = ntohl (msg->accept_id); + req.accepted = GNUNET_NO; + context_msg = GNUNET_MQ_extract_nested_mh (msg); + /* calling #GNUNET_SETI_accept() in the listen cb will set req->accepted */ + lh->listen_cb (lh->listen_cls, + &msg->peer_id, + context_msg, + &req); + if (GNUNET_YES == req.accepted) + return; /* the accept-case is handled in #GNUNET_SETI_accept() */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Rejected request %u\n", + ntohl (msg->accept_id)); + mqm = GNUNET_MQ_msg (rmsg, + GNUNET_MESSAGE_TYPE_SETI_REJECT); + rmsg->accept_reject_id = msg->accept_id; + GNUNET_MQ_send (lh->mq, + mqm); +} + + +/** + * Our connection with the set service encountered an error, + * re-initialize with exponential back-off. + * + * @param cls the `struct GNUNET_SETI_ListenHandle *` + * @param error reason for the disconnect + */ +static void +handle_client_listener_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_SETI_ListenHandle *lh = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Listener broke down (%d), re-connecting\n", + (int) error); + GNUNET_MQ_destroy (lh->mq); + lh->mq = NULL; + lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, + &listen_connect, + lh); + lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff); +} + + +/** + * Connect to the set service in order to listen for requests. + * + * @param cls the `struct GNUNET_SETI_ListenHandle *` to connect + */ +static void +listen_connect (void *cls) +{ + struct GNUNET_SETI_ListenHandle *lh = cls; + struct GNUNET_MQ_MessageHandler mq_handlers[] = { + GNUNET_MQ_hd_var_size (request, + GNUNET_MESSAGE_TYPE_SETI_REQUEST, + struct GNUNET_SETI_RequestMessage, + lh), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_ListenMessage *msg; + + lh->reconnect_task = NULL; + GNUNET_assert (NULL == lh->mq); + lh->mq = GNUNET_CLIENT_connect (lh->cfg, + "set", + mq_handlers, + &handle_client_listener_error, + lh); + if (NULL == lh->mq) + return; + mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETI_LISTEN); + msg->app_id = lh->app_id; + GNUNET_MQ_send (lh->mq, + mqm); +} + + +/** + * Wait for set operation requests for the given application id + * + * @param cfg configuration to use for connecting to + * the set service, needs to be valid for the lifetime of the listen handle + * @param app_id id of the application that handles set operation requests + * @param listen_cb called for each incoming request matching the operation + * and application id + * @param listen_cls handle for @a listen_cb + * @return a handle that can be used to cancel the listen operation + */ +struct GNUNET_SETI_ListenHandle * +GNUNET_SETI_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_HashCode *app_id, + GNUNET_SETI_ListenCallback listen_cb, + void *listen_cls) +{ + struct GNUNET_SETI_ListenHandle *lh; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Starting listener for app %s\n", + GNUNET_h2s (app_id)); + lh = GNUNET_new (struct GNUNET_SETI_ListenHandle); + lh->listen_cb = listen_cb; + lh->listen_cls = listen_cls; + lh->cfg = cfg; + lh->app_id = *app_id; + lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + listen_connect (lh); + if (NULL == lh->mq) + { + GNUNET_free (lh); + return NULL; + } + return lh; +} + + +/** + * Cancel the given listen operation. + * + * @param lh handle for the listen operation + */ +void +GNUNET_SETI_listen_cancel (struct GNUNET_SETI_ListenHandle *lh) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Canceling listener %s\n", + GNUNET_h2s (&lh->app_id)); + if (NULL != lh->mq) + { + GNUNET_MQ_destroy (lh->mq); + lh->mq = NULL; + } + if (NULL != lh->reconnect_task) + { + GNUNET_SCHEDULER_cancel (lh->reconnect_task); + lh->reconnect_task = NULL; + } + GNUNET_free (lh); +} + + +/** + * Accept a request we got via #GNUNET_SETI_listen. Must be called during + * #GNUNET_SETI_listen, as the 'struct GNUNET_SETI_Request' becomes invalid + * afterwards. + * Call #GNUNET_SETI_commit to provide the local set to use for the operation, + * and to begin the exchange with the remote peer. + * + * @param request request to accept + * @param options options to use when processing the request + * @param result_cb callback for the results + * @param result_cls closure for @a result_cb + * @return a handle to cancel the operation + */ +struct GNUNET_SETI_OperationHandle * +GNUNET_SETI_accept (struct GNUNET_SETI_Request *request, + const struct GNUNET_SETI_Option options[], + GNUNET_SETI_ResultIterator result_cb, + void *result_cls) +{ + struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_SETI_OperationHandle *oh; + struct GNUNET_SETI_AcceptMessage *msg; + + GNUNET_assert (GNUNET_NO == request->accepted); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client accepts set intersection operation with id %u\n", + request->accept_id); + request->accepted = GNUNET_YES; + mqm = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SETI_ACCEPT); + msg->accept_reject_id = htonl (request->accept_id); + oh = GNUNET_new (struct GNUNET_SETI_OperationHandle); + oh->result_cb = result_cb; + oh->result_cls = result_cls; + oh->conclude_mqm = mqm; + oh->request_id_addr = &msg->request_id; + for (const struct GNUNET_SETI_Option *opt = options; + GNUNET_SETI_OPTION_END != opt->type; + opt++) + { + switch (opt->type) + { + case GNUNET_SETI_OPTION_RETURN_INTERSECTION: + oh->return_intersection = GNUNET_YES; + break; + default: + LOG (GNUNET_ERROR_TYPE_ERROR, + "Option with type %d not recognized\n", + (int) opt->type); + } + } + return oh; +} + + +/** + * Commit a set to be used with a set operation. + * This function is called once we have fully constructed + * the set that we want to use for the operation. At this + * time, the P2P protocol can then begin to exchange the + * set information and call the result callback with the + * result information. + * + * @param oh handle to the set operation + * @param set the set to use for the operation + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the + * set is invalid (e.g. the set service crashed) + */ +int +GNUNET_SETI_commit (struct GNUNET_SETI_OperationHandle *oh, + struct GNUNET_SETI_Handle *set) +{ + if (NULL != oh->set) + { + /* Some other set was already committed for this + * operation, there is a logic bug in the client of this API */ + GNUNET_break (0); + return GNUNET_OK; + } + GNUNET_assert (NULL != set); + if (GNUNET_YES == set->invalid) + return GNUNET_SYSERR; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client commits to SET\n"); + GNUNET_assert (NULL != oh->conclude_mqm); + oh->set = set; + GNUNET_CONTAINER_DLL_insert (set->ops_head, + set->ops_tail, + oh); + oh->request_id = GNUNET_MQ_assoc_add (set->mq, + oh); + *oh->request_id_addr = htonl (oh->request_id); + GNUNET_MQ_send (set->mq, + oh->conclude_mqm); + oh->conclude_mqm = NULL; + oh->request_id_addr = NULL; + return GNUNET_OK; +} + + +/** + * Hash a set element. + * + * @param element the element that should be hashed + * @param[out] ret_hash a pointer to where the hash of @a element + * should be stored + */ +void +GNUNET_SETI_element_hash (const struct GNUNET_SETI_Element *element, + struct GNUNET_HashCode *ret_hash) +{ + struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start (); + + /* It's not guaranteed that the element data is always after the element header, + so we need to hash the chunks separately. */ + GNUNET_CRYPTO_hash_context_read (ctx, + &element->size, + sizeof(uint16_t)); + GNUNET_CRYPTO_hash_context_read (ctx, + &element->element_type, + sizeof(uint16_t)); + GNUNET_CRYPTO_hash_context_read (ctx, + element->data, + element->size); + GNUNET_CRYPTO_hash_context_finish (ctx, + ret_hash); +} + + +/* end of seti_api.c */ diff --git a/src/seti/test_seti.conf b/src/seti/test_seti.conf new file mode 100644 index 000000000..21fe984f8 --- /dev/null +++ b/src/seti/test_seti.conf @@ -0,0 +1,33 @@ +@INLINE@ ../../contrib/conf/gnunet/no_forcestart.conf + +[PATHS] +GNUNET_TEST_HOME = $GNUNET_TMP/test-gnunet-set/ + +[set] +START_ON_DEMAND = YES +#PREFIX = valgrind --leak-check=full +#PREFIX = gdbserver :1234 +OPTIONS = -L INFO + +[transport] +PLUGINS = unix +OPTIONS = -LERROR + +[nat] +RETURN_LOCAL_ADDRESSES = YES +DISABLEV6 = YES +USE_LOCALADDR = YES + +[peerinfo] +NO_IO = YES + +[nat] +# Use addresses from the local network interfaces (inluding loopback, but also others) +USE_LOCALADDR = YES + +# Disable IPv6 support +DISABLEV6 = NO + +# Do we use addresses from localhost address ranges? (::1, 127.0.0.0/8) +RETURN_LOCAL_ADDRESSES = YES + diff --git a/src/seti/test_seti_api.c b/src/seti/test_seti_api.c new file mode 100644 index 000000000..42dedb846 --- /dev/null +++ b/src/seti/test_seti_api.c @@ -0,0 +1,393 @@ +/* + This file is part of GNUnet. + Copyright (C) 2012-2014 GNUnet e.V. + + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file set/test_set_intersection_result_full.c + * @brief testcase for full result mode of the intersection set operation + * @author Christian Fuchs + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_testing_lib.h" +#include "gnunet_set_service.h" + + +static int ret; + +static struct GNUNET_PeerIdentity local_id; + +static struct GNUNET_HashCode app_id; + +static struct GNUNET_SET_Handle *set1; + +static struct GNUNET_SET_Handle *set2; + +static struct GNUNET_SET_ListenHandle *listen_handle; + +static const struct GNUNET_CONFIGURATION_Handle *config; + +static int iter_count; + +static struct GNUNET_SCHEDULER_Task *tt; + +static struct GNUNET_SET_OperationHandle *oh1; + +static struct GNUNET_SET_OperationHandle *oh2; + + +static void +result_cb_set1 (void *cls, + const struct GNUNET_SET_Element *element, + uint64_t current_size, + enum GNUNET_SET_Status status) +{ + static int count; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Processing result set 1 (%d)\n", + status); + switch (status) + { + case GNUNET_SET_STATUS_OK: + count++; + break; + + case GNUNET_SET_STATUS_FAILURE: + oh1 = NULL; + ret = 1; + break; + + case GNUNET_SET_STATUS_DONE: + oh1 = NULL; + GNUNET_assert (1 == count); + GNUNET_SET_destroy (set1); + set1 = NULL; + if (NULL == set2) + GNUNET_SCHEDULER_shutdown (); + break; + + default: + GNUNET_assert (0); + } +} + + +static void +result_cb_set2 (void *cls, + const struct GNUNET_SET_Element *element, + uint64_t current_size, + enum GNUNET_SET_Status status) +{ + static int count; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Processing result set 2 (%d)\n", + status); + switch (status) + { + case GNUNET_SET_STATUS_OK: + count++; + break; + + case GNUNET_SET_STATUS_FAILURE: + oh2 = NULL; + ret = 1; + break; + + case GNUNET_SET_STATUS_DONE: + oh2 = NULL; + GNUNET_assert (1 == count); + GNUNET_SET_destroy (set2); + set2 = NULL; + if (NULL == set1) + GNUNET_SCHEDULER_shutdown (); + break; + + default: + GNUNET_assert (0); + } +} + + +static void +listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SET_Request *request) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "starting intersection by accepting and committing\n"); + GNUNET_assert (NULL != context_msg); + GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY); + oh2 = GNUNET_SET_accept (request, + GNUNET_SET_RESULT_FULL, + (struct GNUNET_SET_Option[]) { 0 }, + &result_cb_set2, + NULL); + GNUNET_SET_commit (oh2, + set2); +} + + +/** + * Start the set operation. + * + * @param cls closure, unused + */ +static void +start (void *cls) +{ + struct GNUNET_MessageHeader context_msg; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "starting listener\n"); + context_msg.size = htons (sizeof context_msg); + context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY); + listen_handle = GNUNET_SET_listen (config, + GNUNET_SET_OPERATION_INTERSECTION, + &app_id, + &listen_cb, + NULL); + oh1 = GNUNET_SET_prepare (&local_id, + &app_id, + &context_msg, + GNUNET_SET_RESULT_FULL, + (struct GNUNET_SET_Option[]) { 0 }, + &result_cb_set1, + NULL); + GNUNET_SET_commit (oh1, + set1); +} + + +/** + * Initialize the second set, continue + * + * @param cls closure, unused + */ +static void +init_set2 (void *cls) +{ + struct GNUNET_SET_Element element; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "initializing set 2\n"); + element.element_type = 0; + element.data = "hello"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set2, + &element, + NULL, + NULL); + element.data = "quux"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set2, + &element, + NULL, + NULL); + element.data = "baz"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set2, + &element, + &start, + NULL); +} + + +/** + * Initialize the first set, continue. + */ +static void +init_set1 (void) +{ + struct GNUNET_SET_Element element; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "initializing set 1\n"); + element.element_type = 0; + element.data = "hello"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set1, + &element, + NULL, + NULL); + element.data = "bar"; + element.size = strlen (element.data); + GNUNET_SET_add_element (set1, + &element, + &init_set2, + NULL); +} + + +static int +iter_cb (void *cls, + const struct GNUNET_SET_Element *element) +{ + if (NULL == element) + { + GNUNET_assert (iter_count == 3); + GNUNET_SET_destroy (cls); + return GNUNET_YES; + } + iter_count++; + return GNUNET_YES; +} + + +static void +test_iter () +{ + struct GNUNET_SET_Element element; + struct GNUNET_SET_Handle *iter_set; + + iter_set = GNUNET_SET_create (config, + GNUNET_SET_OPERATION_INTERSECTION); + element.element_type = 0; + element.data = "hello"; + element.size = strlen (element.data); + GNUNET_SET_add_element (iter_set, + &element, + NULL, + NULL); + element.data = "bar"; + element.size = strlen (element.data); + GNUNET_SET_add_element (iter_set, + &element, + NULL, + NULL); + element.data = "quux"; + element.size = strlen (element.data); + GNUNET_SET_add_element (iter_set, + &element, + NULL, + NULL); + GNUNET_SET_iterate (iter_set, + &iter_cb, + iter_set); +} + + +/** + * Function run on shutdown. + * + * @param cls closure + */ +static void +do_shutdown (void *cls) +{ + if (NULL != tt) + { + GNUNET_SCHEDULER_cancel (tt); + tt = NULL; + } + if (NULL != oh1) + { + GNUNET_SET_operation_cancel (oh1); + oh1 = NULL; + } + if (NULL != oh2) + { + GNUNET_SET_operation_cancel (oh2); + oh2 = NULL; + } + if (NULL != set1) + { + GNUNET_SET_destroy (set1); + set1 = NULL; + } + if (NULL != set2) + { + GNUNET_SET_destroy (set2); + set2 = NULL; + } + if (NULL != listen_handle) + { + GNUNET_SET_listen_cancel (listen_handle); + listen_handle = NULL; + } +} + + +/** + * Function run on timeout. + * + * @param cls closure + */ +static void +timeout_fail (void *cls) +{ + tt = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Testcase failed with timeout\n"); + GNUNET_SCHEDULER_shutdown (); + ret = 1; +} + + +/** + * Signature of the 'main' function for a (single-peer) testcase that + * is run using 'GNUNET_TESTING_peer_run'. + * + * @param cls closure + * @param cfg configuration of the peer that was started + * @param peer identity of the peer that was created + */ +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_TESTING_Peer *peer) +{ + config = cfg; + GNUNET_TESTING_peer_get_identity (peer, + &local_id); + if (0) + test_iter (); + + tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply ( + GNUNET_TIME_UNIT_SECONDS, 5), + &timeout_fail, + NULL); + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, + NULL); + + set1 = GNUNET_SET_create (cfg, + GNUNET_SET_OPERATION_INTERSECTION); + set2 = GNUNET_SET_create (cfg, + GNUNET_SET_OPERATION_INTERSECTION); + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, + &app_id); + + /* test the real set reconciliation */ + init_set1 (); +} + + +int +main (int argc, + char **argv) +{ + if (0 != GNUNET_TESTING_peer_run ("test_set_intersection_result_full", + "test_set.conf", + &run, NULL)) + return 1; + return ret; +} -- cgit v1.2.3