aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Schanzenbach <mschanzenbach@posteo.de>2020-08-16 21:54:40 +0200
committerMartin Schanzenbach <mschanzenbach@posteo.de>2020-08-16 21:54:52 +0200
commit7cb5ed9817617de03b9238d4ee0df0262050b036 (patch)
tree318f9081dedd73391b6d5bc97022584e584e9b92
parent4dca8c3557a50ae5be9c105aa5fb2f85d99d917a (diff)
parentbe0475f2a583d203465d3091ff933806a5ace613 (diff)
downloadgnunet-7cb5ed9817617de03b9238d4ee0df0262050b036.tar.gz
gnunet-7cb5ed9817617de03b9238d4ee0df0262050b036.zip
Merge branch 'master' of ssh://gnunet.org/gnunet
-rw-r--r--configure.ac2
-rw-r--r--po/POTFILES.in19
-rw-r--r--src/include/gnunet_protocols.h132
-rw-r--r--src/setu/Makefile.am102
-rw-r--r--src/setu/gnunet-service-setu.c3482
-rw-r--r--src/setu/gnunet-service-setu.h393
-rw-r--r--src/setu/gnunet-service-setu_protocol.h226
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.c303
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.h169
-rw-r--r--src/setu/gnunet-setu-ibf-profiler.c308
-rw-r--r--src/setu/gnunet-setu-profiler.c499
-rw-r--r--src/setu/ibf.c409
-rw-r--r--src/setu/ibf.h255
-rw-r--r--src/setu/ibf_sim.c142
-rw-r--r--src/setu/plugin_block_setu_test.c123
-rw-r--r--src/setu/setu.h304
-rw-r--r--src/setu/setu_api.c872
-rw-r--r--src/setu/test_setu_api.c360
18 files changed, 8100 insertions, 0 deletions
diff --git a/configure.ac b/configure.ac
index e492242a6..bb205220c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1937,6 +1937,8 @@ src/scalarproduct/Makefile
1937src/scalarproduct/scalarproduct.conf 1937src/scalarproduct/scalarproduct.conf
1938src/set/Makefile 1938src/set/Makefile
1939src/set/set.conf 1939src/set/set.conf
1940src/setu/Makefile
1941src/setu/setu.conf
1940src/sq/Makefile 1942src/sq/Makefile
1941src/statistics/Makefile 1943src/statistics/Makefile
1942src/statistics/statistics.conf 1944src/statistics/statistics.conf
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 1f5cc81c3..c5503c343 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -333,6 +333,25 @@ src/set/ibf.c
333src/set/ibf_sim.c 333src/set/ibf_sim.c
334src/set/plugin_block_set_test.c 334src/set/plugin_block_set_test.c
335src/set/set_api.c 335src/set/set_api.c
336src/seti/gnunet-service-set.c
337src/seti/gnunet-service-set_intersection.c
338src/seti/gnunet-service-set_union.c
339src/seti/gnunet-service-set_union_strata_estimator.c
340src/seti/gnunet-set-ibf-profiler.c
341src/seti/gnunet-set-profiler.c
342src/seti/ibf.c
343src/seti/ibf_sim.c
344src/seti/plugin_block_set_test.c
345src/seti/set_api.c
346src/setu/gnunet-service-set_union.c
347src/setu/gnunet-service-set_union_strata_estimator.c
348src/setu/gnunet-service-setu.c
349src/setu/gnunet-setu-ibf-profiler.c
350src/setu/gnunet-setu-profiler.c
351src/setu/ibf.c
352src/setu/ibf_sim.c
353src/setu/plugin_block_setu_test.c
354src/setu/setu_api.c
336src/sq/sq.c 355src/sq/sq.c
337src/sq/sq_exec.c 356src/sq/sq_exec.c
338src/sq/sq_prepare.c 357src/sq/sq_prepare.c
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 5af58664f..c3fcde0b9 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1659,9 +1659,141 @@ extern "C" {
1659 1659
1660 1660
1661/******************************************************************************* 1661/*******************************************************************************
1662 * SETU message types
1663 ******************************************************************************/
1664
1665
1666/**
1667 * Cancel a set operation
1668 */
1669#define GNUNET_MESSAGE_TYPE_SETU_CANCEL 550
1670
1671/**
1672 * Add element to set
1673 */
1674#define GNUNET_MESSAGE_TYPE_SETU_ADD 551
1675
1676/**
1677 * Create a new local set
1678 */
1679#define GNUNET_MESSAGE_TYPE_SETU_CREATE 552
1680
1681/**
1682 * Handle result message from operation
1683 */
1684#define GNUNET_MESSAGE_TYPE_SETU_RESULT 553
1685
1686/**
1687 * Evaluate a set operation
1688 */
1689#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE 554
1690
1691/**
1692 * Listen for operation requests
1693 */
1694#define GNUNET_MESSAGE_TYPE_SETU_LISTEN 555
1695
1696/**
1697 * Reject a set request.
1698 */
1699#define GNUNET_MESSAGE_TYPE_SETU_REJECT 556
1700
1701/**
1702 * Accept an incoming set request
1703 */
1704#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT 557
1705
1706/**
1707 * Notify the client of an incoming request from a remote peer
1708 */
1709#define GNUNET_MESSAGE_TYPE_SETU_REQUEST 558
1710
1711
1712/**
1713 * Demand the whole element from the other
1714 * peer, given only the hash code.
1715 */
1716#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL 565
1717
1718/**
1719 * Demand the whole element from the other
1720 * peer, given only the hash code.
1721 */
1722#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND 566
1723
1724/**
1725 * Tell the other peer to send us a list of
1726 * hashes that match an IBF key.
1727 */
1728#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY 567
1729
1730/**
1731 * Tell the other peer which hashes match a
1732 * given IBF key.
1733 */
1734#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER 568
1735
1736/**
1737 * Request a set union operation from a remote peer.
1738 */
1739#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST 581
1740
1741/**
1742 * Strata estimator.
1743 */
1744#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE 582
1745
1746/**
1747 * Invertible bloom filter.
1748 */
1749#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF 583
1750
1751/**
1752 * Actual set elements.
1753 */
1754#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS 584
1755
1756/**
1757 * Requests for the elements with the given hashes.
1758 */
1759#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENT_REQUESTS 585
1760
1761/**
1762 * Set operation is done.
1763 */
1764#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE 586
1765
1766/**
1767 * Compressed strata estimator.
1768 */
1769#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC 590
1770
1771/**
1772 * Request all missing elements from the other peer,
1773 * based on their sets and the elements we previously sent
1774 * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
1775 */
1776#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE 597
1777
1778/**
1779 * Send a set element, not as response to a demand but because
1780 * we're sending the full set.
1781 */
1782#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT 598
1783
1784/**
1785 * Request all missing elements from the other peer,
1786 * based on their sets and the elements we previously sent
1787 * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
1788 */
1789#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 599
1790
1791
1792/*******************************************************************************
1662 * SET message types 1793 * SET message types
1663 ******************************************************************************/ 1794 ******************************************************************************/
1664 1795
1796
1665/** 1797/**
1666 * Demand the whole element from the other 1798 * Demand the whole element from the other
1667 * peer, given only the hash code. 1799 * peer, given only the hash code.
diff --git a/src/setu/Makefile.am b/src/setu/Makefile.am
new file mode 100644
index 000000000..b37ceba51
--- /dev/null
+++ b/src/setu/Makefile.am
@@ -0,0 +1,102 @@
1# This Makefile.am is in the public domain
2AM_CPPFLAGS = -I$(top_srcdir)/src/include
3
4pkgcfgdir= $(pkgdatadir)/config.d/
5
6libexecdir= $(pkglibdir)/libexec/
7
8plugindir = $(libdir)/gnunet
9
10pkgcfg_DATA = \
11 setu.conf
12
13if USE_COVERAGE
14 AM_CFLAGS = -fprofile-arcs -ftest-coverage
15endif
16
17if HAVE_TESTING
18bin_PROGRAMS = \
19 gnunet-setu-profiler
20
21noinst_PROGRAMS = \
22 gnunet-setu-ibf-profiler
23endif
24
25libexec_PROGRAMS = \
26 gnunet-service-setu
27
28lib_LTLIBRARIES = \
29 libgnunetsetu.la
30
31gnunet_setu_profiler_SOURCES = \
32 gnunet-setu-profiler.c
33gnunet_setu_profiler_LDADD = \
34 $(top_builddir)/src/util/libgnunetutil.la \
35 $(top_builddir)/src/statistics/libgnunetstatistics.la \
36 libgnunetsetu.la \
37 $(top_builddir)/src/testing/libgnunettesting.la \
38 $(GN_LIBINTL)
39
40
41gnunet_setu_ibf_profiler_SOURCES = \
42 gnunet-setu-ibf-profiler.c \
43 ibf.c
44gnunet_setu_ibf_profiler_LDADD = \
45 $(top_builddir)/src/util/libgnunetutil.la \
46 $(GN_LIBINTL)
47
48gnunet_service_setu_SOURCES = \
49 gnunet-service-setu.c gnunet-service-setu.h \
50 ibf.c ibf.h \
51 gnunet-service-setu_strata_estimator.c gnunet-service-setu_strata_estimator.h \
52 gnunet-service-setu_protocol.h
53gnunet_service_setu_LDADD = \
54 $(top_builddir)/src/util/libgnunetutil.la \
55 $(top_builddir)/src/statistics/libgnunetstatistics.la \
56 $(top_builddir)/src/core/libgnunetcore.la \
57 $(top_builddir)/src/cadet/libgnunetcadet.la \
58 $(top_builddir)/src/block/libgnunetblock.la \
59 libgnunetsetu.la \
60 $(GN_LIBINTL)
61
62libgnunetsetu_la_SOURCES = \
63 setu_api.c setu.h
64libgnunetsetu_la_LIBADD = \
65 $(top_builddir)/src/util/libgnunetutil.la \
66 $(LTLIBINTL)
67libgnunetsetu_la_LDFLAGS = \
68 $(GN_LIB_LDFLAGS)
69
70if HAVE_TESTING
71check_PROGRAMS = \
72 test_setu_api
73endif
74
75if ENABLE_TEST_RUN
76AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME;
77TESTS = $(check_PROGRAMS)
78endif
79
80test_setu_api_SOURCES = \
81 test_setu_api.c
82test_setu_api_LDADD = \
83 $(top_builddir)/src/util/libgnunetutil.la \
84 $(top_builddir)/src/testing/libgnunettesting.la \
85 libgnunetsetu.la
86
87plugin_LTLIBRARIES = \
88 libgnunet_plugin_block_setu_test.la
89
90libgnunet_plugin_block_setu_test_la_SOURCES = \
91 plugin_block_setu_test.c
92libgnunet_plugin_block_setu_test_la_LIBADD = \
93 $(top_builddir)/src/block/libgnunetblock.la \
94 $(top_builddir)/src/block/libgnunetblockgroup.la \
95 $(top_builddir)/src/util/libgnunetutil.la \
96 $(LTLIBINTL)
97libgnunet_plugin_block_setu_test_la_LDFLAGS = \
98 $(GN_PLUGIN_LDFLAGS)
99
100
101EXTRA_DIST = \
102 test_setu.conf
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
new file mode 100644
index 000000000..88edc622f
--- /dev/null
+++ b/src/setu/gnunet-service-setu.c
@@ -0,0 +1,3482 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file setu/gnunet-service-setu.c
22 * @brief set union operation
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h"
29#include "gnunet-service-setu.h"
30#include "ibf.h"
31#include "gnunet-service-setu_strata_estimator.h"
32#include "gnunet-service-setu_protocol.h"
33#include "gnunet_statistics_service.h"
34#include <gcrypt.h>
35
36
37#define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__)
38
39/**
40 * How long do we hold on to an incoming channel if there is
41 * no local listener before giving up?
42 */
43#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
44
45/**
46 * Number of IBFs in a strata estimator.
47 */
48#define SE_STRATA_COUNT 32
49
50/**
51 * Size of the IBFs in the strata estimator.
52 */
53#define SE_IBF_SIZE 80
54
55/**
56 * The hash num parameter for the difference digests and strata estimators.
57 */
58#define SE_IBF_HASH_NUM 4
59
60/**
61 * Number of buckets that can be transmitted in one message.
62 */
63#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
64
65/**
66 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
67 * Choose this value so that computing the IBF is still cheaper
68 * than transmitting all values.
69 */
70#define MAX_IBF_ORDER (20)
71
72/**
73 * Number of buckets used in the ibf per estimated
74 * difference.
75 */
76#define IBF_ALPHA 4
77
78
79/**
80 * Current phase we are in for a union operation.
81 */
82enum UnionOperationPhase
83{
84 /**
85 * We sent the request message, and expect a strata estimator.
86 */
87 PHASE_EXPECT_SE,
88
89 /**
90 * We sent the strata estimator, and expect an IBF. This phase is entered once
91 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
92 *
93 * XXX: could use better wording.
94 * XXX: repurposed to also expect a "request full set" message, should be renamed
95 *
96 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
97 */
98 PHASE_EXPECT_IBF,
99
100 /**
101 * Continuation for multi part IBFs.
102 */
103 PHASE_EXPECT_IBF_CONT,
104
105 /**
106 * We are decoding an IBF.
107 */
108 PHASE_INVENTORY_ACTIVE,
109
110 /**
111 * The other peer is decoding the IBF we just sent.
112 */
113 PHASE_INVENTORY_PASSIVE,
114
115 /**
116 * The protocol is almost finished, but we still have to flush our message
117 * queue and/or expect some elements.
118 */
119 PHASE_FINISH_CLOSING,
120
121 /**
122 * In the penultimate phase,
123 * we wait until all our demands
124 * are satisfied. Then we send a done
125 * message, and wait for another done message.
126 */
127 PHASE_FINISH_WAITING,
128
129 /**
130 * In the ultimate phase, we wait until
131 * our demands are satisfied and then
132 * quit (sending another DONE message).
133 */
134 PHASE_DONE,
135
136 /**
137 * After sending the full set, wait for responses with the elements
138 * that the local peer is missing.
139 */
140 PHASE_FULL_SENDING,
141};
142
143
144/**
145 * State of an evaluate operation with another peer.
146 */
147struct OperationState
148{
149 /**
150 * Copy of the set's strata estimator at the time of
151 * creation of this operation.
152 */
153 struct StrataEstimator *se;
154
155 /**
156 * The IBF we currently receive.
157 */
158 struct InvertibleBloomFilter *remote_ibf;
159
160 /**
161 * The IBF with the local set's element.
162 */
163 struct InvertibleBloomFilter *local_ibf;
164
165 /**
166 * Maps unsalted IBF-Keys to elements.
167 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
168 * Colliding IBF-Keys are linked.
169 */
170 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
171
172 /**
173 * Current state of the operation.
174 */
175 enum UnionOperationPhase phase;
176
177 /**
178 * Did we send the client that we are done?
179 */
180 int client_done_sent;
181
182 /**
183 * Number of ibf buckets already received into the @a remote_ibf.
184 */
185 unsigned int ibf_buckets_received;
186
187 /**
188 * Hashes for elements that we have demanded from the other peer.
189 */
190 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
191
192 /**
193 * Salt that we're using for sending IBFs
194 */
195 uint32_t salt_send;
196
197 /**
198 * Salt for the IBF we've received and that we're currently decoding.
199 */
200 uint32_t salt_receive;
201
202 /**
203 * Number of elements we received from the other peer
204 * that were not in the local set yet.
205 */
206 uint32_t received_fresh;
207
208 /**
209 * Total number of elements received from the other peer.
210 */
211 uint32_t received_total;
212
213 /**
214 * Initial size of our set, just before
215 * the operation started.
216 */
217 uint64_t initial_size;
218};
219
220
221/**
222 * The key entry is used to associate an ibf key with an element.
223 */
224struct KeyEntry
225{
226 /**
227 * IBF key for the entry, derived from the current salt.
228 */
229 struct IBF_Key ibf_key;
230
231 /**
232 * The actual element associated with the key.
233 *
234 * Only owned by the union operation if element->operation
235 * is #GNUNET_YES.
236 */
237 struct ElementEntry *element;
238
239 /**
240 * Did we receive this element?
241 * Even if element->is_foreign is false, we might
242 * have received the element, so this indicates that
243 * the other peer has it.
244 */
245 int received;
246};
247
248
249/**
250 * Used as a closure for sending elements
251 * with a specific IBF key.
252 */
253struct SendElementClosure
254{
255 /**
256 * The IBF key whose matching elements should be
257 * sent.
258 */
259 struct IBF_Key ibf_key;
260
261 /**
262 * Operation for which the elements
263 * should be sent.
264 */
265 struct Operation *op;
266};
267
268
269/**
270 * Extra state required for efficient set union.
271 */
272struct SetState
273{
274 /**
275 * The strata estimator is only generated once for
276 * each set.
277 * The IBF keys are derived from the element hashes with
278 * salt=0.
279 */
280 struct StrataEstimator *se;
281};
282
283
284/**
285 * A listener is inhabited by a client, and waits for evaluation
286 * requests from remote peers.
287 */
288struct Listener
289{
290 /**
291 * Listeners are held in a doubly linked list.
292 */
293 struct Listener *next;
294
295 /**
296 * Listeners are held in a doubly linked list.
297 */
298 struct Listener *prev;
299
300 /**
301 * Head of DLL of operations this listener is responsible for.
302 * Once the client has accepted/declined the operation, the
303 * operation is moved to the respective set's operation DLLS.
304 */
305 struct Operation *op_head;
306
307 /**
308 * Tail of DLL of operations this listener is responsible for.
309 * Once the client has accepted/declined the operation, the
310 * operation is moved to the respective set's operation DLLS.
311 */
312 struct Operation *op_tail;
313
314 /**
315 * Client that owns the listener.
316 * Only one client may own a listener.
317 */
318 struct ClientState *cs;
319
320 /**
321 * The port we are listening on with CADET.
322 */
323 struct GNUNET_CADET_Port *open_port;
324
325 /**
326 * Application ID for the operation, used to distinguish
327 * multiple operations of the same type with the same peer.
328 */
329 struct GNUNET_HashCode app_id;
330
331};
332
333
334/**
335 * Handle to the cadet service, used to listen for and connect to
336 * remote peers.
337 */
338static struct GNUNET_CADET_Handle *cadet;
339
340/**
341 * Statistics handle.
342 */
343struct GNUNET_STATISTICS_Handle *_GSS_statistics;
344
345/**
346 * Listeners are held in a doubly linked list.
347 */
348static struct Listener *listener_head;
349
350/**
351 * Listeners are held in a doubly linked list.
352 */
353static struct Listener *listener_tail;
354
355/**
356 * Number of active clients.
357 */
358static unsigned int num_clients;
359
360/**
361 * Are we in shutdown? if #GNUNET_YES and the number of clients
362 * drops to zero, disconnect from CADET.
363 */
364static int in_shutdown;
365
366/**
367 * Counter for allocating unique IDs for clients, used to identify
368 * incoming operation requests from remote peers, that the client can
369 * choose to accept or refuse. 0 must not be used (reserved for
370 * uninitialized).
371 */
372static uint32_t suggest_id;
373
374
375/**
376 * Iterator over hash map entries, called to
377 * destroy the linked list of colliding ibf key entries.
378 *
379 * @param cls closure
380 * @param key current key code
381 * @param value value in the hash map
382 * @return #GNUNET_YES if we should continue to iterate,
383 * #GNUNET_NO if not.
384 */
385static int
386destroy_key_to_element_iter (void *cls,
387 uint32_t key,
388 void *value)
389{
390 struct KeyEntry *k = value;
391
392 GNUNET_assert (NULL != k);
393 if (GNUNET_YES == k->element->remote)
394 {
395 GNUNET_free (k->element);
396 k->element = NULL;
397 }
398 GNUNET_free (k);
399 return GNUNET_YES;
400}
401
402
403/**
404 * Destroy the union operation. Only things specific to the union
405 * operation are destroyed.
406 *
407 * @param op union operation to destroy
408 */
409static void
410union_op_cancel (struct Operation *op)
411{
412 LOG (GNUNET_ERROR_TYPE_DEBUG,
413 "destroying union op\n");
414 /* check if the op was canceled twice */
415 GNUNET_assert (NULL != op->state);
416 if (NULL != op->state->remote_ibf)
417 {
418 ibf_destroy (op->state->remote_ibf);
419 op->state->remote_ibf = NULL;
420 }
421 if (NULL != op->state->demanded_hashes)
422 {
423 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
424 op->state->demanded_hashes = NULL;
425 }
426 if (NULL != op->state->local_ibf)
427 {
428 ibf_destroy (op->state->local_ibf);
429 op->state->local_ibf = NULL;
430 }
431 if (NULL != op->state->se)
432 {
433 strata_estimator_destroy (op->state->se);
434 op->state->se = NULL;
435 }
436 if (NULL != op->state->key_to_element)
437 {
438 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
439 &destroy_key_to_element_iter,
440 NULL);
441 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
442 op->state->key_to_element = NULL;
443 }
444 GNUNET_free (op->state);
445 op->state = NULL;
446 LOG (GNUNET_ERROR_TYPE_DEBUG,
447 "destroying union op done\n");
448}
449
450
451/**
452 * Inform the client that the union operation has failed,
453 * and proceed to destroy the evaluate operation.
454 *
455 * @param op the union operation to fail
456 */
457static void
458fail_union_operation (struct Operation *op)
459{
460 struct GNUNET_MQ_Envelope *ev;
461 struct GNUNET_SETU_ResultMessage *msg;
462
463 LOG (GNUNET_ERROR_TYPE_WARNING,
464 "union operation failed\n");
465 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
466 msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
467 msg->request_id = htonl (op->client_request_id);
468 msg->element_type = htons (0);
469 GNUNET_MQ_send (op->set->cs->mq,
470 ev);
471 _GSS_operation_destroy (op);
472}
473
474
475/**
476 * Derive the IBF key from a hash code and
477 * a salt.
478 *
479 * @param src the hash code
480 * @return the derived IBF key
481 */
482static struct IBF_Key
483get_ibf_key (const struct GNUNET_HashCode *src)
484{
485 struct IBF_Key key;
486 uint16_t salt = 0;
487
488 GNUNET_assert (GNUNET_OK ==
489 GNUNET_CRYPTO_kdf (&key, sizeof(key),
490 src, sizeof *src,
491 &salt, sizeof(salt),
492 NULL, 0));
493 return key;
494}
495
496
497/**
498 * Context for #op_get_element_iterator
499 */
500struct GetElementContext
501{
502 /**
503 * FIXME.
504 */
505 struct GNUNET_HashCode hash;
506
507 /**
508 * FIXME.
509 */
510 struct KeyEntry *k;
511};
512
513
514/**
515 * Iterator over the mapping from IBF keys to element entries. Checks if we
516 * have an element with a given GNUNET_HashCode.
517 *
518 * @param cls closure
519 * @param key current key code
520 * @param value value in the hash map
521 * @return #GNUNET_YES if we should search further,
522 * #GNUNET_NO if we've found the element.
523 */
524static int
525op_get_element_iterator (void *cls,
526 uint32_t key,
527 void *value)
528{
529 struct GetElementContext *ctx = cls;
530 struct KeyEntry *k = value;
531
532 GNUNET_assert (NULL != k);
533 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
534 &ctx->hash))
535 {
536 ctx->k = k;
537 return GNUNET_NO;
538 }
539 return GNUNET_YES;
540}
541
542
543/**
544 * Determine whether the given element is already in the operation's element
545 * set.
546 *
547 * @param op operation that should be tested for 'element_hash'
548 * @param element_hash hash of the element to look for
549 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
550 */
551static struct KeyEntry *
552op_get_element (struct Operation *op,
553 const struct GNUNET_HashCode *element_hash)
554{
555 int ret;
556 struct IBF_Key ibf_key;
557 struct GetElementContext ctx = { { { 0 } }, 0 };
558
559 ctx.hash = *element_hash;
560
561 ibf_key = get_ibf_key (element_hash);
562 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
563 (uint32_t) ibf_key.key_val,
564 op_get_element_iterator,
565 &ctx);
566
567 /* was the iteration aborted because we found the element? */
568 if (GNUNET_SYSERR == ret)
569 {
570 GNUNET_assert (NULL != ctx.k);
571 return ctx.k;
572 }
573 return NULL;
574}
575
576
577/**
578 * Insert an element into the union operation's
579 * key-to-element mapping. Takes ownership of 'ee'.
580 * Note that this does not insert the element in the set,
581 * only in the operation's key-element mapping.
582 * This is done to speed up re-tried operations, if some elements
583 * were transmitted, and then the IBF fails to decode.
584 *
585 * XXX: clarify ownership, doesn't sound right.
586 *
587 * @param op the union operation
588 * @param ee the element entry
589 * @parem received was this element received from the remote peer?
590 */
591static void
592op_register_element (struct Operation *op,
593 struct ElementEntry *ee,
594 int received)
595{
596 struct IBF_Key ibf_key;
597 struct KeyEntry *k;
598
599 ibf_key = get_ibf_key (&ee->element_hash);
600 k = GNUNET_new (struct KeyEntry);
601 k->element = ee;
602 k->ibf_key = ibf_key;
603 k->received = received;
604 GNUNET_assert (GNUNET_OK ==
605 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
606 (uint32_t) ibf_key.key_val,
607 k,
608 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
609}
610
611
612/**
613 * FIXME.
614 */
615static void
616salt_key (const struct IBF_Key *k_in,
617 uint32_t salt,
618 struct IBF_Key *k_out)
619{
620 int s = salt % 64;
621 uint64_t x = k_in->key_val;
622
623 /* rotate ibf key */
624 x = (x >> s) | (x << (64 - s));
625 k_out->key_val = x;
626}
627
628
629/**
630 * FIXME.
631 */
632static void
633unsalt_key (const struct IBF_Key *k_in,
634 uint32_t salt,
635 struct IBF_Key *k_out)
636{
637 int s = salt % 64;
638 uint64_t x = k_in->key_val;
639
640 x = (x << s) | (x >> (64 - s));
641 k_out->key_val = x;
642}
643
644
645/**
646 * Insert a key into an ibf.
647 *
648 * @param cls the ibf
649 * @param key unused
650 * @param value the key entry to get the key from
651 */
652static int
653prepare_ibf_iterator (void *cls,
654 uint32_t key,
655 void *value)
656{
657 struct Operation *op = cls;
658 struct KeyEntry *ke = value;
659 struct IBF_Key salted_key;
660
661 LOG (GNUNET_ERROR_TYPE_DEBUG,
662 "[OP %x] inserting %lx (hash %s) into ibf\n",
663 (void *) op,
664 (unsigned long) ke->ibf_key.key_val,
665 GNUNET_h2s (&ke->element->element_hash));
666 salt_key (&ke->ibf_key,
667 op->state->salt_send,
668 &salted_key);
669 ibf_insert (op->state->local_ibf, salted_key);
670 return GNUNET_YES;
671}
672
673
674/**
675 * Iterator for initializing the
676 * key-to-element mapping of a union operation
677 *
678 * @param cls the union operation `struct Operation *`
679 * @param key unused
680 * @param value the `struct ElementEntry *` to insert
681 * into the key-to-element mapping
682 * @return #GNUNET_YES (to continue iterating)
683 */
684static int
685init_key_to_element_iterator (void *cls,
686 const struct GNUNET_HashCode *key,
687 void *value)
688{
689 struct Operation *op = cls;
690 struct ElementEntry *ee = value;
691
692 /* make sure that the element belongs to the set at the time
693 * of creating the operation */
694 if (GNUNET_NO ==
695 _GSS_is_element_of_operation (ee,
696 op))
697 return GNUNET_YES;
698 GNUNET_assert (GNUNET_NO == ee->remote);
699 op_register_element (op,
700 ee,
701 GNUNET_NO);
702 return GNUNET_YES;
703}
704
705
706/**
707 * Initialize the IBF key to element mapping local to this set
708 * operation.
709 *
710 * @param op the set union operation
711 */
712static void
713initialize_key_to_element (struct Operation *op)
714{
715 unsigned int len;
716
717 GNUNET_assert (NULL == op->state->key_to_element);
718 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
719 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
720 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
721 &init_key_to_element_iterator,
722 op);
723}
724
725
726/**
727 * Create an ibf with the operation's elements
728 * of the specified size
729 *
730 * @param op the union operation
731 * @param size size of the ibf to create
732 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
733 */
734static int
735prepare_ibf (struct Operation *op,
736 uint32_t size)
737{
738 GNUNET_assert (NULL != op->state->key_to_element);
739
740 if (NULL != op->state->local_ibf)
741 ibf_destroy (op->state->local_ibf);
742 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
743 if (NULL == op->state->local_ibf)
744 {
745 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
746 "Failed to allocate local IBF\n");
747 return GNUNET_SYSERR;
748 }
749 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
750 &prepare_ibf_iterator,
751 op);
752 return GNUNET_OK;
753}
754
755
756/**
757 * Send an ibf of appropriate size.
758 *
759 * Fragments the IBF into multiple messages if necessary.
760 *
761 * @param op the union operation
762 * @param ibf_order order of the ibf to send, size=2^order
763 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
764 */
765static int
766send_ibf (struct Operation *op,
767 uint16_t ibf_order)
768{
769 unsigned int buckets_sent = 0;
770 struct InvertibleBloomFilter *ibf;
771
772 if (GNUNET_OK !=
773 prepare_ibf (op, 1 << ibf_order))
774 {
775 /* allocation failed */
776 return GNUNET_SYSERR;
777 }
778
779 LOG (GNUNET_ERROR_TYPE_DEBUG,
780 "sending ibf of size %u\n",
781 1 << ibf_order);
782
783 {
784 char name[64] = { 0 };
785 snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
786 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
787 }
788
789 ibf = op->state->local_ibf;
790
791 while (buckets_sent < (1 << ibf_order))
792 {
793 unsigned int buckets_in_message;
794 struct GNUNET_MQ_Envelope *ev;
795 struct IBFMessage *msg;
796
797 buckets_in_message = (1 << ibf_order) - buckets_sent;
798 /* limit to maximum */
799 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
800 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
801
802 ev = GNUNET_MQ_msg_extra (msg,
803 buckets_in_message * IBF_BUCKET_SIZE,
804 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
805 msg->reserved1 = 0;
806 msg->reserved2 = 0;
807 msg->order = ibf_order;
808 msg->offset = htonl (buckets_sent);
809 msg->salt = htonl (op->state->salt_send);
810 ibf_write_slice (ibf, buckets_sent,
811 buckets_in_message, &msg[1]);
812 buckets_sent += buckets_in_message;
813 LOG (GNUNET_ERROR_TYPE_DEBUG,
814 "ibf chunk size %u, %u/%u sent\n",
815 buckets_in_message,
816 buckets_sent,
817 1 << ibf_order);
818 GNUNET_MQ_send (op->mq, ev);
819 }
820
821 /* The other peer must decode the IBF, so
822 * we're passive. */
823 op->state->phase = PHASE_INVENTORY_PASSIVE;
824 return GNUNET_OK;
825}
826
827
828/**
829 * Compute the necessary order of an ibf
830 * from the size of the symmetric set difference.
831 *
832 * @param diff the difference
833 * @return the required size of the ibf
834 */
835static unsigned int
836get_order_from_difference (unsigned int diff)
837{
838 unsigned int ibf_order;
839
840 ibf_order = 2;
841 while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
842 ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
843 (ibf_order < MAX_IBF_ORDER))
844 ibf_order++;
845 // add one for correction
846 return ibf_order + 1;
847}
848
849
850/**
851 * Send a set element.
852 *
853 * @param cls the union operation `struct Operation *`
854 * @param key unused
855 * @param value the `struct ElementEntry *` to insert
856 * into the key-to-element mapping
857 * @return #GNUNET_YES (to continue iterating)
858 */
859static int
860send_full_element_iterator (void *cls,
861 const struct GNUNET_HashCode *key,
862 void *value)
863{
864 struct Operation *op = cls;
865 struct GNUNET_SETU_ElementMessage *emsg;
866 struct ElementEntry *ee = value;
867 struct GNUNET_SETU_Element *el = &ee->element;
868 struct GNUNET_MQ_Envelope *ev;
869
870 LOG (GNUNET_ERROR_TYPE_DEBUG,
871 "Sending element %s\n",
872 GNUNET_h2s (key));
873 ev = GNUNET_MQ_msg_extra (emsg,
874 el->size,
875 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
876 emsg->element_type = htons (el->element_type);
877 GNUNET_memcpy (&emsg[1],
878 el->data,
879 el->size);
880 GNUNET_MQ_send (op->mq,
881 ev);
882 return GNUNET_YES;
883}
884
885
886/**
887 * Switch to full set transmission for @a op.
888 *
889 * @param op operation to switch to full set transmission.
890 */
891static void
892send_full_set (struct Operation *op)
893{
894 struct GNUNET_MQ_Envelope *ev;
895
896 op->state->phase = PHASE_FULL_SENDING;
897 LOG (GNUNET_ERROR_TYPE_DEBUG,
898 "Dedicing to transmit the full set\n");
899 /* FIXME: use a more memory-friendly way of doing this with an
900 iterator, just as we do in the non-full case! */
901 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
902 &send_full_element_iterator,
903 op);
904 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
905 GNUNET_MQ_send (op->mq,
906 ev);
907}
908
909
910/**
911 * Handle a strata estimator from a remote peer
912 *
913 * @param cls the union operation
914 * @param msg the message
915 */
916static int
917check_union_p2p_strata_estimator (void *cls,
918 const struct StrataEstimatorMessage *msg)
919{
920 struct Operation *op = cls;
921 int is_compressed;
922 size_t len;
923
924 if (op->state->phase != PHASE_EXPECT_SE)
925 {
926 GNUNET_break (0);
927 return GNUNET_SYSERR;
928 }
929 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
930 msg->header.type));
931 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
932 if ((GNUNET_NO == is_compressed) &&
933 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
934 {
935 GNUNET_break (0);
936 return GNUNET_SYSERR;
937 }
938 return GNUNET_OK;
939}
940
941
942/**
943 * Handle a strata estimator from a remote peer
944 *
945 * @param cls the union operation
946 * @param msg the message
947 */
948static void
949handle_union_p2p_strata_estimator (void *cls,
950 const struct StrataEstimatorMessage *msg)
951{
952 struct Operation *op = cls;
953 struct StrataEstimator *remote_se;
954 unsigned int diff;
955 uint64_t other_size;
956 size_t len;
957 int is_compressed;
958
959 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
960 msg->header.type));
961 GNUNET_STATISTICS_update (_GSS_statistics,
962 "# bytes of SE received",
963 ntohs (msg->header.size),
964 GNUNET_NO);
965 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
966 other_size = GNUNET_ntohll (msg->set_size);
967 remote_se = strata_estimator_create (SE_STRATA_COUNT,
968 SE_IBF_SIZE,
969 SE_IBF_HASH_NUM);
970 if (NULL == remote_se)
971 {
972 /* insufficient resources, fail */
973 fail_union_operation (op);
974 return;
975 }
976 if (GNUNET_OK !=
977 strata_estimator_read (&msg[1],
978 len,
979 is_compressed,
980 remote_se))
981 {
982 /* decompression failed */
983 strata_estimator_destroy (remote_se);
984 fail_union_operation (op);
985 return;
986 }
987 GNUNET_assert (NULL != op->state->se);
988 diff = strata_estimator_difference (remote_se,
989 op->state->se);
990
991 if (diff > 200)
992 diff = diff * 3 / 2;
993
994 strata_estimator_destroy (remote_se);
995 strata_estimator_destroy (op->state->se);
996 op->state->se = NULL;
997 LOG (GNUNET_ERROR_TYPE_DEBUG,
998 "got se diff=%d, using ibf size %d\n",
999 diff,
1000 1U << get_order_from_difference (diff));
1001
1002 {
1003 char *set_debug;
1004
1005 set_debug = getenv ("GNUNET_SETU_BENCHMARK");
1006 if ((NULL != set_debug) &&
1007 (0 == strcmp (set_debug, "1")))
1008 {
1009 FILE *f = fopen ("set.log", "a");
1010 fprintf (f, "%llu\n", (unsigned long long) diff);
1011 fclose (f);
1012 }
1013 }
1014
1015 if ((GNUNET_YES == op->byzantine) &&
1016 (other_size < op->byzantine_lower_bound))
1017 {
1018 GNUNET_break (0);
1019 fail_union_operation (op);
1020 return;
1021 }
1022
1023 if ((GNUNET_YES == op->force_full) ||
1024 (diff > op->state->initial_size / 4) ||
1025 (0 == other_size))
1026 {
1027 LOG (GNUNET_ERROR_TYPE_DEBUG,
1028 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
1029 diff,
1030 op->state->initial_size);
1031 GNUNET_STATISTICS_update (_GSS_statistics,
1032 "# of full sends",
1033 1,
1034 GNUNET_NO);
1035 if ((op->state->initial_size <= other_size) ||
1036 (0 == other_size))
1037 {
1038 send_full_set (op);
1039 }
1040 else
1041 {
1042 struct GNUNET_MQ_Envelope *ev;
1043
1044 LOG (GNUNET_ERROR_TYPE_DEBUG,
1045 "Telling other peer that we expect its full set\n");
1046 op->state->phase = PHASE_EXPECT_IBF;
1047 ev = GNUNET_MQ_msg_header (
1048 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
1049 GNUNET_MQ_send (op->mq,
1050 ev);
1051 }
1052 }
1053 else
1054 {
1055 GNUNET_STATISTICS_update (_GSS_statistics,
1056 "# of ibf sends",
1057 1,
1058 GNUNET_NO);
1059 if (GNUNET_OK !=
1060 send_ibf (op,
1061 get_order_from_difference (diff)))
1062 {
1063 /* Internal error, best we can do is shut the connection */
1064 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1065 "Failed to send IBF, closing connection\n");
1066 fail_union_operation (op);
1067 return;
1068 }
1069 }
1070 GNUNET_CADET_receive_done (op->channel);
1071}
1072
1073
1074/**
1075 * Iterator to send elements to a remote peer
1076 *
1077 * @param cls closure with the element key and the union operation
1078 * @param key ignored
1079 * @param value the key entry
1080 */
1081static int
1082send_offers_iterator (void *cls,
1083 uint32_t key,
1084 void *value)
1085{
1086 struct SendElementClosure *sec = cls;
1087 struct Operation *op = sec->op;
1088 struct KeyEntry *ke = value;
1089 struct GNUNET_MQ_Envelope *ev;
1090 struct GNUNET_MessageHeader *mh;
1091
1092 /* Detect 32-bit key collision for the 64-bit IBF keys. */
1093 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1094 return GNUNET_YES;
1095
1096 ev = GNUNET_MQ_msg_header_extra (mh,
1097 sizeof(struct GNUNET_HashCode),
1098 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
1099
1100 GNUNET_assert (NULL != ev);
1101 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1102 LOG (GNUNET_ERROR_TYPE_DEBUG,
1103 "[OP %x] sending element offer (%s) to peer\n",
1104 (void *) op,
1105 GNUNET_h2s (&ke->element->element_hash));
1106 GNUNET_MQ_send (op->mq, ev);
1107 return GNUNET_YES;
1108}
1109
1110
1111/**
1112 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1113 *
1114 * @param op union operation
1115 * @param ibf_key IBF key of interest
1116 */
1117static void
1118send_offers_for_key (struct Operation *op,
1119 struct IBF_Key ibf_key)
1120{
1121 struct SendElementClosure send_cls;
1122
1123 send_cls.ibf_key = ibf_key;
1124 send_cls.op = op;
1125 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
1126 op->state->key_to_element,
1127 (uint32_t) ibf_key.
1128 key_val,
1129 &send_offers_iterator,
1130 &send_cls);
1131}
1132
1133
1134/**
1135 * Decode which elements are missing on each side, and
1136 * send the appropriate offers and inquiries.
1137 *
1138 * @param op union operation
1139 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1140 */
1141static int
1142decode_and_send (struct Operation *op)
1143{
1144 struct IBF_Key key;
1145 struct IBF_Key last_key;
1146 int side;
1147 unsigned int num_decoded;
1148 struct InvertibleBloomFilter *diff_ibf;
1149
1150 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1151
1152 if (GNUNET_OK !=
1153 prepare_ibf (op,
1154 op->state->remote_ibf->size))
1155 {
1156 GNUNET_break (0);
1157 /* allocation failed */
1158 return GNUNET_SYSERR;
1159 }
1160 diff_ibf = ibf_dup (op->state->local_ibf);
1161 ibf_subtract (diff_ibf,
1162 op->state->remote_ibf);
1163
1164 ibf_destroy (op->state->remote_ibf);
1165 op->state->remote_ibf = NULL;
1166
1167 LOG (GNUNET_ERROR_TYPE_DEBUG,
1168 "decoding IBF (size=%u)\n",
1169 diff_ibf->size);
1170
1171 num_decoded = 0;
1172 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1173
1174 while (1)
1175 {
1176 int res;
1177 int cycle_detected = GNUNET_NO;
1178
1179 last_key = key;
1180
1181 res = ibf_decode (diff_ibf, &side, &key);
1182 if (res == GNUNET_OK)
1183 {
1184 LOG (GNUNET_ERROR_TYPE_DEBUG,
1185 "decoded ibf key %lx\n",
1186 (unsigned long) key.key_val);
1187 num_decoded += 1;
1188 if ((num_decoded > diff_ibf->size) ||
1189 ((num_decoded > 1) &&
1190 (last_key.key_val == key.key_val)))
1191 {
1192 LOG (GNUNET_ERROR_TYPE_DEBUG,
1193 "detected cyclic ibf (decoded %u/%u)\n",
1194 num_decoded,
1195 diff_ibf->size);
1196 cycle_detected = GNUNET_YES;
1197 }
1198 }
1199 if ((GNUNET_SYSERR == res) ||
1200 (GNUNET_YES == cycle_detected))
1201 {
1202 int next_order;
1203 next_order = 0;
1204 while (1 << next_order < diff_ibf->size)
1205 next_order++;
1206 next_order++;
1207 if (next_order <= MAX_IBF_ORDER)
1208 {
1209 LOG (GNUNET_ERROR_TYPE_DEBUG,
1210 "decoding failed, sending larger ibf (size %u)\n",
1211 1 << next_order);
1212 GNUNET_STATISTICS_update (_GSS_statistics,
1213 "# of IBF retries",
1214 1,
1215 GNUNET_NO);
1216 op->state->salt_send++;
1217 if (GNUNET_OK !=
1218 send_ibf (op, next_order))
1219 {
1220 /* Internal error, best we can do is shut the connection */
1221 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1222 "Failed to send IBF, closing connection\n");
1223 fail_union_operation (op);
1224 ibf_destroy (diff_ibf);
1225 return GNUNET_SYSERR;
1226 }
1227 }
1228 else
1229 {
1230 GNUNET_STATISTICS_update (_GSS_statistics,
1231 "# of failed union operations (too large)",
1232 1,
1233 GNUNET_NO);
1234 // XXX: Send the whole set, element-by-element
1235 LOG (GNUNET_ERROR_TYPE_ERROR,
1236 "set union failed: reached ibf limit\n");
1237 fail_union_operation (op);
1238 ibf_destroy (diff_ibf);
1239 return GNUNET_SYSERR;
1240 }
1241 break;
1242 }
1243 if (GNUNET_NO == res)
1244 {
1245 struct GNUNET_MQ_Envelope *ev;
1246
1247 LOG (GNUNET_ERROR_TYPE_DEBUG,
1248 "transmitted all values, sending DONE\n");
1249 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1250 GNUNET_MQ_send (op->mq, ev);
1251 /* We now wait until we get a DONE message back
1252 * and then wait for our MQ to be flushed and all our
1253 * demands be delivered. */
1254 break;
1255 }
1256 if (1 == side)
1257 {
1258 struct IBF_Key unsalted_key;
1259
1260 unsalt_key (&key,
1261 op->state->salt_receive,
1262 &unsalted_key);
1263 send_offers_for_key (op,
1264 unsalted_key);
1265 }
1266 else if (-1 == side)
1267 {
1268 struct GNUNET_MQ_Envelope *ev;
1269 struct InquiryMessage *msg;
1270
1271 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1272 * the effort additional complexity. */
1273 ev = GNUNET_MQ_msg_extra (msg,
1274 sizeof(struct IBF_Key),
1275 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1276 msg->salt = htonl (op->state->salt_receive);
1277 GNUNET_memcpy (&msg[1],
1278 &key,
1279 sizeof(struct IBF_Key));
1280 LOG (GNUNET_ERROR_TYPE_DEBUG,
1281 "sending element inquiry for IBF key %lx\n",
1282 (unsigned long) key.key_val);
1283 GNUNET_MQ_send (op->mq, ev);
1284 }
1285 else
1286 {
1287 GNUNET_assert (0);
1288 }
1289 }
1290 ibf_destroy (diff_ibf);
1291 return GNUNET_OK;
1292}
1293
1294
1295/**
1296 * Check an IBF message from a remote peer.
1297 *
1298 * Reassemble the IBF from multiple pieces, and
1299 * process the whole IBF once possible.
1300 *
1301 * @param cls the union operation
1302 * @param msg the header of the message
1303 * @return #GNUNET_OK if @a msg is well-formed
1304 */
1305static int
1306check_union_p2p_ibf (void *cls,
1307 const struct IBFMessage *msg)
1308{
1309 struct Operation *op = cls;
1310 unsigned int buckets_in_message;
1311
1312 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1313 / IBF_BUCKET_SIZE;
1314 if (0 == buckets_in_message)
1315 {
1316 GNUNET_break_op (0);
1317 return GNUNET_SYSERR;
1318 }
1319 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1320 * IBF_BUCKET_SIZE)
1321 {
1322 GNUNET_break_op (0);
1323 return GNUNET_SYSERR;
1324 }
1325 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1326 {
1327 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1328 {
1329 GNUNET_break_op (0);
1330 return GNUNET_SYSERR;
1331 }
1332 if (1 << msg->order != op->state->remote_ibf->size)
1333 {
1334 GNUNET_break_op (0);
1335 return GNUNET_SYSERR;
1336 }
1337 if (ntohl (msg->salt) != op->state->salt_receive)
1338 {
1339 GNUNET_break_op (0);
1340 return GNUNET_SYSERR;
1341 }
1342 }
1343 else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1344 (op->state->phase != PHASE_EXPECT_IBF))
1345 {
1346 GNUNET_break_op (0);
1347 return GNUNET_SYSERR;
1348 }
1349
1350 return GNUNET_OK;
1351}
1352
1353
1354/**
1355 * Handle an IBF message from a remote peer.
1356 *
1357 * Reassemble the IBF from multiple pieces, and
1358 * process the whole IBF once possible.
1359 *
1360 * @param cls the union operation
1361 * @param msg the header of the message
1362 */
1363static void
1364handle_union_p2p_ibf (void *cls,
1365 const struct IBFMessage *msg)
1366{
1367 struct Operation *op = cls;
1368 unsigned int buckets_in_message;
1369
1370 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1371 / IBF_BUCKET_SIZE;
1372 if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1373 (op->state->phase == PHASE_EXPECT_IBF))
1374 {
1375 op->state->phase = PHASE_EXPECT_IBF_CONT;
1376 GNUNET_assert (NULL == op->state->remote_ibf);
1377 LOG (GNUNET_ERROR_TYPE_DEBUG,
1378 "Creating new ibf of size %u\n",
1379 1 << msg->order);
1380 op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1381 op->state->salt_receive = ntohl (msg->salt);
1382 LOG (GNUNET_ERROR_TYPE_DEBUG,
1383 "Receiving new IBF with salt %u\n",
1384 op->state->salt_receive);
1385 if (NULL == op->state->remote_ibf)
1386 {
1387 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1388 "Failed to parse remote IBF, closing connection\n");
1389 fail_union_operation (op);
1390 return;
1391 }
1392 op->state->ibf_buckets_received = 0;
1393 if (0 != ntohl (msg->offset))
1394 {
1395 GNUNET_break_op (0);
1396 fail_union_operation (op);
1397 return;
1398 }
1399 }
1400 else
1401 {
1402 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1403 LOG (GNUNET_ERROR_TYPE_DEBUG,
1404 "Received more of IBF\n");
1405 }
1406 GNUNET_assert (NULL != op->state->remote_ibf);
1407
1408 ibf_read_slice (&msg[1],
1409 op->state->ibf_buckets_received,
1410 buckets_in_message,
1411 op->state->remote_ibf);
1412 op->state->ibf_buckets_received += buckets_in_message;
1413
1414 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1415 {
1416 LOG (GNUNET_ERROR_TYPE_DEBUG,
1417 "received full ibf\n");
1418 op->state->phase = PHASE_INVENTORY_ACTIVE;
1419 if (GNUNET_OK !=
1420 decode_and_send (op))
1421 {
1422 /* Internal error, best we can do is shut down */
1423 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1424 "Failed to decode IBF, closing connection\n");
1425 fail_union_operation (op);
1426 return;
1427 }
1428 }
1429 GNUNET_CADET_receive_done (op->channel);
1430}
1431
1432
1433/**
1434 * Send a result message to the client indicating
1435 * that there is a new element.
1436 *
1437 * @param op union operation
1438 * @param element element to send
1439 * @param status status to send with the new element
1440 */
1441static void
1442send_client_element (struct Operation *op,
1443 const struct GNUNET_SETU_Element *element,
1444 int status)
1445{
1446 struct GNUNET_MQ_Envelope *ev;
1447 struct GNUNET_SETU_ResultMessage *rm;
1448
1449 LOG (GNUNET_ERROR_TYPE_DEBUG,
1450 "sending element (size %u) to client\n",
1451 element->size);
1452 GNUNET_assert (0 != op->client_request_id);
1453 ev = GNUNET_MQ_msg_extra (rm,
1454 element->size,
1455 GNUNET_MESSAGE_TYPE_SET_RESULT);
1456 if (NULL == ev)
1457 {
1458 GNUNET_MQ_discard (ev);
1459 GNUNET_break (0);
1460 return;
1461 }
1462 rm->result_status = htons (status);
1463 rm->request_id = htonl (op->client_request_id);
1464 rm->element_type = htons (element->element_type);
1465 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1466 op->state->key_to_element));
1467 GNUNET_memcpy (&rm[1],
1468 element->data,
1469 element->size);
1470 GNUNET_MQ_send (op->set->cs->mq,
1471 ev);
1472}
1473
1474
1475/**
1476 * Signal to the client that the operation has finished and
1477 * destroy the operation.
1478 *
1479 * @param cls operation to destroy
1480 */
1481static void
1482send_client_done (void *cls)
1483{
1484 struct Operation *op = cls;
1485 struct GNUNET_MQ_Envelope *ev;
1486 struct GNUNET_SETU_ResultMessage *rm;
1487
1488 if (GNUNET_YES == op->state->client_done_sent)
1489 {
1490 return;
1491 }
1492
1493 if (PHASE_DONE != op->state->phase)
1494 {
1495 LOG (GNUNET_ERROR_TYPE_WARNING,
1496 "Union operation failed\n");
1497 GNUNET_STATISTICS_update (_GSS_statistics,
1498 "# Union operations failed",
1499 1,
1500 GNUNET_NO);
1501 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1502 rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1503 rm->request_id = htonl (op->client_request_id);
1504 rm->element_type = htons (0);
1505 GNUNET_MQ_send (op->set->cs->mq,
1506 ev);
1507 return;
1508 }
1509
1510 op->state->client_done_sent = GNUNET_YES;
1511
1512 GNUNET_STATISTICS_update (_GSS_statistics,
1513 "# Union operations succeeded",
1514 1,
1515 GNUNET_NO);
1516 LOG (GNUNET_ERROR_TYPE_INFO,
1517 "Signalling client that union operation is done\n");
1518 ev = GNUNET_MQ_msg (rm,
1519 GNUNET_MESSAGE_TYPE_SET_RESULT);
1520 rm->request_id = htonl (op->client_request_id);
1521 rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
1522 rm->element_type = htons (0);
1523 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1524 op->state->key_to_element));
1525 GNUNET_MQ_send (op->set->cs->mq,
1526 ev);
1527}
1528
1529
1530/**
1531 * Tests if the operation is finished, and if so notify.
1532 *
1533 * @param op operation to check
1534 */
1535static void
1536maybe_finish (struct Operation *op)
1537{
1538 unsigned int num_demanded;
1539
1540 num_demanded = GNUNET_CONTAINER_multihashmap_size (
1541 op->state->demanded_hashes);
1542
1543 if (PHASE_FINISH_WAITING == op->state->phase)
1544 {
1545 LOG (GNUNET_ERROR_TYPE_DEBUG,
1546 "In PHASE_FINISH_WAITING, pending %u demands\n",
1547 num_demanded);
1548 if (0 == num_demanded)
1549 {
1550 struct GNUNET_MQ_Envelope *ev;
1551
1552 op->state->phase = PHASE_DONE;
1553 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1554 GNUNET_MQ_send (op->mq,
1555 ev);
1556 /* We now wait until the other peer sends P2P_OVER
1557 * after it got all elements from us. */
1558 }
1559 }
1560 if (PHASE_FINISH_CLOSING == op->state->phase)
1561 {
1562 LOG (GNUNET_ERROR_TYPE_DEBUG,
1563 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1564 num_demanded);
1565 if (0 == num_demanded)
1566 {
1567 op->state->phase = PHASE_DONE;
1568 send_client_done (op);
1569 _GSS_operation_destroy2 (op);
1570 }
1571 }
1572}
1573
1574
1575/**
1576 * Check an element message from a remote peer.
1577 *
1578 * @param cls the union operation
1579 * @param emsg the message
1580 */
1581static int
1582check_union_p2p_elements (void *cls,
1583 const struct GNUNET_SETU_ElementMessage *emsg)
1584{
1585 struct Operation *op = cls;
1586
1587 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1588 {
1589 GNUNET_break_op (0);
1590 return GNUNET_SYSERR;
1591 }
1592 return GNUNET_OK;
1593}
1594
1595
1596/**
1597 * Handle an element message from a remote peer.
1598 * Sent by the other peer either because we decoded an IBF and placed a demand,
1599 * or because the other peer switched to full set transmission.
1600 *
1601 * @param cls the union operation
1602 * @param emsg the message
1603 */
1604static void
1605handle_union_p2p_elements (void *cls,
1606 const struct GNUNET_SETU_ElementMessage *emsg)
1607{
1608 struct Operation *op = cls;
1609 struct ElementEntry *ee;
1610 struct KeyEntry *ke;
1611 uint16_t element_size;
1612
1613 element_size = ntohs (emsg->header.size) - sizeof(struct
1614 GNUNET_SETU_ElementMessage);
1615 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1616 GNUNET_memcpy (&ee[1],
1617 &emsg[1],
1618 element_size);
1619 ee->element.size = element_size;
1620 ee->element.data = &ee[1];
1621 ee->element.element_type = ntohs (emsg->element_type);
1622 ee->remote = GNUNET_YES;
1623 GNUNET_SETU_element_hash (&ee->element,
1624 &ee->element_hash);
1625 if (GNUNET_NO ==
1626 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1627 &ee->element_hash,
1628 NULL))
1629 {
1630 /* We got something we didn't demand, since it's not in our map. */
1631 GNUNET_break_op (0);
1632 fail_union_operation (op);
1633 return;
1634 }
1635
1636 LOG (GNUNET_ERROR_TYPE_DEBUG,
1637 "Got element (size %u, hash %s) from peer\n",
1638 (unsigned int) element_size,
1639 GNUNET_h2s (&ee->element_hash));
1640
1641 GNUNET_STATISTICS_update (_GSS_statistics,
1642 "# received elements",
1643 1,
1644 GNUNET_NO);
1645 GNUNET_STATISTICS_update (_GSS_statistics,
1646 "# exchanged elements",
1647 1,
1648 GNUNET_NO);
1649
1650 op->state->received_total++;
1651
1652 ke = op_get_element (op, &ee->element_hash);
1653 if (NULL != ke)
1654 {
1655 /* Got repeated element. Should not happen since
1656 * we track demands. */
1657 GNUNET_STATISTICS_update (_GSS_statistics,
1658 "# repeated elements",
1659 1,
1660 GNUNET_NO);
1661 ke->received = GNUNET_YES;
1662 GNUNET_free (ee);
1663 }
1664 else
1665 {
1666 LOG (GNUNET_ERROR_TYPE_DEBUG,
1667 "Registering new element from remote peer\n");
1668 op->state->received_fresh++;
1669 op_register_element (op, ee, GNUNET_YES);
1670 /* only send results immediately if the client wants it */
1671 send_client_element (op,
1672 &ee->element,
1673 GNUNET_SETU_STATUS_ADD_LOCAL);
1674 }
1675
1676 if ((op->state->received_total > 8) &&
1677 (op->state->received_fresh < op->state->received_total / 3))
1678 {
1679 /* The other peer gave us lots of old elements, there's something wrong. */
1680 GNUNET_break_op (0);
1681 fail_union_operation (op);
1682 return;
1683 }
1684 GNUNET_CADET_receive_done (op->channel);
1685 maybe_finish (op);
1686}
1687
1688
1689/**
1690 * Check a full element message from a remote peer.
1691 *
1692 * @param cls the union operation
1693 * @param emsg the message
1694 */
1695static int
1696check_union_p2p_full_element (void *cls,
1697 const struct GNUNET_SETU_ElementMessage *emsg)
1698{
1699 struct Operation *op = cls;
1700
1701 (void) op;
1702 // FIXME: check that we expect full elements here?
1703 return GNUNET_OK;
1704}
1705
1706
1707/**
1708 * Handle an element message from a remote peer.
1709 *
1710 * @param cls the union operation
1711 * @param emsg the message
1712 */
1713static void
1714handle_union_p2p_full_element (void *cls,
1715 const struct GNUNET_SETU_ElementMessage *emsg)
1716{
1717 struct Operation *op = cls;
1718 struct ElementEntry *ee;
1719 struct KeyEntry *ke;
1720 uint16_t element_size;
1721
1722 element_size = ntohs (emsg->header.size)
1723 - sizeof(struct GNUNET_SETU_ElementMessage);
1724 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1725 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1726 ee->element.size = element_size;
1727 ee->element.data = &ee[1];
1728 ee->element.element_type = ntohs (emsg->element_type);
1729 ee->remote = GNUNET_YES;
1730 GNUNET_SETU_element_hash (&ee->element, &ee->element_hash);
1731
1732 LOG (GNUNET_ERROR_TYPE_DEBUG,
1733 "Got element (full diff, size %u, hash %s) from peer\n",
1734 (unsigned int) element_size,
1735 GNUNET_h2s (&ee->element_hash));
1736
1737 GNUNET_STATISTICS_update (_GSS_statistics,
1738 "# received elements",
1739 1,
1740 GNUNET_NO);
1741 GNUNET_STATISTICS_update (_GSS_statistics,
1742 "# exchanged elements",
1743 1,
1744 GNUNET_NO);
1745
1746 op->state->received_total++;
1747
1748 ke = op_get_element (op, &ee->element_hash);
1749 if (NULL != ke)
1750 {
1751 /* Got repeated element. Should not happen since
1752 * we track demands. */
1753 GNUNET_STATISTICS_update (_GSS_statistics,
1754 "# repeated elements",
1755 1,
1756 GNUNET_NO);
1757 ke->received = GNUNET_YES;
1758 GNUNET_free (ee);
1759 }
1760 else
1761 {
1762 LOG (GNUNET_ERROR_TYPE_DEBUG,
1763 "Registering new element from remote peer\n");
1764 op->state->received_fresh++;
1765 op_register_element (op, ee, GNUNET_YES);
1766 /* only send results immediately if the client wants it */
1767 send_client_element (op,
1768 &ee->element,
1769 GNUNET_SETU_STATUS_ADD_LOCAL);
1770 }
1771
1772 if ((GNUNET_YES == op->byzantine) &&
1773 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1774 (op->state->received_fresh < op->state->received_total / 6))
1775 {
1776 /* The other peer gave us lots of old elements, there's something wrong. */
1777 LOG (GNUNET_ERROR_TYPE_ERROR,
1778 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1779 (unsigned long long) op->state->received_fresh,
1780 (unsigned long long) op->state->received_total);
1781 GNUNET_break_op (0);
1782 fail_union_operation (op);
1783 return;
1784 }
1785 GNUNET_CADET_receive_done (op->channel);
1786}
1787
1788
1789/**
1790 * Send offers (for GNUNET_Hash-es) in response
1791 * to inquiries (for IBF_Key-s).
1792 *
1793 * @param cls the union operation
1794 * @param msg the message
1795 */
1796static int
1797check_union_p2p_inquiry (void *cls,
1798 const struct InquiryMessage *msg)
1799{
1800 struct Operation *op = cls;
1801 unsigned int num_keys;
1802
1803 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1804 {
1805 GNUNET_break_op (0);
1806 return GNUNET_SYSERR;
1807 }
1808 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1809 / sizeof(struct IBF_Key);
1810 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1811 != num_keys * sizeof(struct IBF_Key))
1812 {
1813 GNUNET_break_op (0);
1814 return GNUNET_SYSERR;
1815 }
1816 return GNUNET_OK;
1817}
1818
1819
1820/**
1821 * Send offers (for GNUNET_Hash-es) in response
1822 * to inquiries (for IBF_Key-s).
1823 *
1824 * @param cls the union operation
1825 * @param msg the message
1826 */
1827static void
1828handle_union_p2p_inquiry (void *cls,
1829 const struct InquiryMessage *msg)
1830{
1831 struct Operation *op = cls;
1832 const struct IBF_Key *ibf_key;
1833 unsigned int num_keys;
1834
1835 LOG (GNUNET_ERROR_TYPE_DEBUG,
1836 "Received union inquiry\n");
1837 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1838 / sizeof(struct IBF_Key);
1839 ibf_key = (const struct IBF_Key *) &msg[1];
1840 while (0 != num_keys--)
1841 {
1842 struct IBF_Key unsalted_key;
1843
1844 unsalt_key (ibf_key,
1845 ntohl (msg->salt),
1846 &unsalted_key);
1847 send_offers_for_key (op,
1848 unsalted_key);
1849 ibf_key++;
1850 }
1851 GNUNET_CADET_receive_done (op->channel);
1852}
1853
1854
1855/**
1856 * Iterator over hash map entries, called to
1857 * destroy the linked list of colliding ibf key entries.
1858 *
1859 * @param cls closure
1860 * @param key current key code
1861 * @param value value in the hash map
1862 * @return #GNUNET_YES if we should continue to iterate,
1863 * #GNUNET_NO if not.
1864 */
1865static int
1866send_missing_full_elements_iter (void *cls,
1867 uint32_t key,
1868 void *value)
1869{
1870 struct Operation *op = cls;
1871 struct KeyEntry *ke = value;
1872 struct GNUNET_MQ_Envelope *ev;
1873 struct GNUNET_SETU_ElementMessage *emsg;
1874 struct ElementEntry *ee = ke->element;
1875
1876 if (GNUNET_YES == ke->received)
1877 return GNUNET_YES;
1878 ev = GNUNET_MQ_msg_extra (emsg,
1879 ee->element.size,
1880 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1881 GNUNET_memcpy (&emsg[1],
1882 ee->element.data,
1883 ee->element.size);
1884 emsg->element_type = htons (ee->element.element_type);
1885 GNUNET_MQ_send (op->mq,
1886 ev);
1887 return GNUNET_YES;
1888}
1889
1890
1891/**
1892 * Handle a request for full set transmission.
1893 *
1894 * @parem cls closure, a set union operation
1895 * @param mh the demand message
1896 */
1897static void
1898handle_union_p2p_request_full (void *cls,
1899 const struct GNUNET_MessageHeader *mh)
1900{
1901 struct Operation *op = cls;
1902
1903 LOG (GNUNET_ERROR_TYPE_DEBUG,
1904 "Received request for full set transmission\n");
1905 if (PHASE_EXPECT_IBF != op->state->phase)
1906 {
1907 GNUNET_break_op (0);
1908 fail_union_operation (op);
1909 return;
1910 }
1911
1912 // FIXME: we need to check that our set is larger than the
1913 // byzantine_lower_bound by some threshold
1914 send_full_set (op);
1915 GNUNET_CADET_receive_done (op->channel);
1916}
1917
1918
1919/**
1920 * Handle a "full done" message.
1921 *
1922 * @parem cls closure, a set union operation
1923 * @param mh the demand message
1924 */
1925static void
1926handle_union_p2p_full_done (void *cls,
1927 const struct GNUNET_MessageHeader *mh)
1928{
1929 struct Operation *op = cls;
1930
1931 switch (op->state->phase)
1932 {
1933 case PHASE_EXPECT_IBF:
1934 {
1935 struct GNUNET_MQ_Envelope *ev;
1936
1937 LOG (GNUNET_ERROR_TYPE_DEBUG,
1938 "got FULL DONE, sending elements that other peer is missing\n");
1939
1940 /* send all the elements that did not come from the remote peer */
1941 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1942 &send_missing_full_elements_iter,
1943 op);
1944
1945 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1946 GNUNET_MQ_send (op->mq,
1947 ev);
1948 op->state->phase = PHASE_DONE;
1949 /* we now wait until the other peer sends us the OVER message*/
1950 }
1951 break;
1952
1953 case PHASE_FULL_SENDING:
1954 {
1955 LOG (GNUNET_ERROR_TYPE_DEBUG,
1956 "got FULL DONE, finishing\n");
1957 /* We sent the full set, and got the response for that. We're done. */
1958 op->state->phase = PHASE_DONE;
1959 GNUNET_CADET_receive_done (op->channel);
1960 send_client_done (op);
1961 _GSS_operation_destroy2 (op);
1962 return;
1963 }
1964 break;
1965
1966 default:
1967 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1968 "Handle full done phase is %u\n",
1969 (unsigned) op->state->phase);
1970 GNUNET_break_op (0);
1971 fail_union_operation (op);
1972 return;
1973 }
1974 GNUNET_CADET_receive_done (op->channel);
1975}
1976
1977
1978/**
1979 * Check a demand by the other peer for elements based on a list
1980 * of `struct GNUNET_HashCode`s.
1981 *
1982 * @parem cls closure, a set union operation
1983 * @param mh the demand message
1984 * @return #GNUNET_OK if @a mh is well-formed
1985 */
1986static int
1987check_union_p2p_demand (void *cls,
1988 const struct GNUNET_MessageHeader *mh)
1989{
1990 struct Operation *op = cls;
1991 unsigned int num_hashes;
1992
1993 (void) op;
1994 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1995 / sizeof(struct GNUNET_HashCode);
1996 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1997 != num_hashes * sizeof(struct GNUNET_HashCode))
1998 {
1999 GNUNET_break_op (0);
2000 return GNUNET_SYSERR;
2001 }
2002 return GNUNET_OK;
2003}
2004
2005
2006/**
2007 * Handle a demand by the other peer for elements based on a list
2008 * of `struct GNUNET_HashCode`s.
2009 *
2010 * @parem cls closure, a set union operation
2011 * @param mh the demand message
2012 */
2013static void
2014handle_union_p2p_demand (void *cls,
2015 const struct GNUNET_MessageHeader *mh)
2016{
2017 struct Operation *op = cls;
2018 struct ElementEntry *ee;
2019 struct GNUNET_SETU_ElementMessage *emsg;
2020 const struct GNUNET_HashCode *hash;
2021 unsigned int num_hashes;
2022 struct GNUNET_MQ_Envelope *ev;
2023
2024 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2025 / sizeof(struct GNUNET_HashCode);
2026 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2027 num_hashes > 0;
2028 hash++, num_hashes--)
2029 {
2030 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2031 hash);
2032 if (NULL == ee)
2033 {
2034 /* Demand for non-existing element. */
2035 GNUNET_break_op (0);
2036 fail_union_operation (op);
2037 return;
2038 }
2039 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2040 {
2041 /* Probably confused lazily copied sets. */
2042 GNUNET_break_op (0);
2043 fail_union_operation (op);
2044 return;
2045 }
2046 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size,
2047 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
2048 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
2049 emsg->reserved = htons (0);
2050 emsg->element_type = htons (ee->element.element_type);
2051 LOG (GNUNET_ERROR_TYPE_DEBUG,
2052 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2053 (void *) op,
2054 (unsigned int) ee->element.size,
2055 GNUNET_h2s (&ee->element_hash));
2056 GNUNET_MQ_send (op->mq, ev);
2057 GNUNET_STATISTICS_update (_GSS_statistics,
2058 "# exchanged elements",
2059 1,
2060 GNUNET_NO);
2061 }
2062 GNUNET_CADET_receive_done (op->channel);
2063}
2064
2065
2066/**
2067 * Check offer (of `struct GNUNET_HashCode`s).
2068 *
2069 * @param cls the union operation
2070 * @param mh the message
2071 * @return #GNUNET_OK if @a mh is well-formed
2072 */
2073static int
2074check_union_p2p_offer (void *cls,
2075 const struct GNUNET_MessageHeader *mh)
2076{
2077 struct Operation *op = cls;
2078 unsigned int num_hashes;
2079
2080 /* look up elements and send them */
2081 if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2082 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2083 {
2084 GNUNET_break_op (0);
2085 return GNUNET_SYSERR;
2086 }
2087 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2088 / sizeof(struct GNUNET_HashCode);
2089 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2090 num_hashes * sizeof(struct GNUNET_HashCode))
2091 {
2092 GNUNET_break_op (0);
2093 return GNUNET_SYSERR;
2094 }
2095 return GNUNET_OK;
2096}
2097
2098
2099/**
2100 * Handle offers (of `struct GNUNET_HashCode`s) and
2101 * respond with demands (of `struct GNUNET_HashCode`s).
2102 *
2103 * @param cls the union operation
2104 * @param mh the message
2105 */
2106static void
2107handle_union_p2p_offer (void *cls,
2108 const struct GNUNET_MessageHeader *mh)
2109{
2110 struct Operation *op = cls;
2111 const struct GNUNET_HashCode *hash;
2112 unsigned int num_hashes;
2113
2114 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2115 / sizeof(struct GNUNET_HashCode);
2116 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2117 num_hashes > 0;
2118 hash++, num_hashes--)
2119 {
2120 struct ElementEntry *ee;
2121 struct GNUNET_MessageHeader *demands;
2122 struct GNUNET_MQ_Envelope *ev;
2123
2124 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2125 hash);
2126 if (NULL != ee)
2127 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2128 continue;
2129
2130 if (GNUNET_YES ==
2131 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2132 hash))
2133 {
2134 LOG (GNUNET_ERROR_TYPE_DEBUG,
2135 "Skipped sending duplicate demand\n");
2136 continue;
2137 }
2138
2139 GNUNET_assert (GNUNET_OK ==
2140 GNUNET_CONTAINER_multihashmap_put (
2141 op->state->demanded_hashes,
2142 hash,
2143 NULL,
2144 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2145
2146 LOG (GNUNET_ERROR_TYPE_DEBUG,
2147 "[OP %x] Requesting element (hash %s)\n",
2148 (void *) op, GNUNET_h2s (hash));
2149 ev = GNUNET_MQ_msg_header_extra (demands,
2150 sizeof(struct GNUNET_HashCode),
2151 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2152 GNUNET_memcpy (&demands[1],
2153 hash,
2154 sizeof(struct GNUNET_HashCode));
2155 GNUNET_MQ_send (op->mq, ev);
2156 }
2157 GNUNET_CADET_receive_done (op->channel);
2158}
2159
2160
2161/**
2162 * Handle a done message from a remote peer
2163 *
2164 * @param cls the union operation
2165 * @param mh the message
2166 */
2167static void
2168handle_union_p2p_done (void *cls,
2169 const struct GNUNET_MessageHeader *mh)
2170{
2171 struct Operation *op = cls;
2172
2173 switch (op->state->phase)
2174 {
2175 case PHASE_INVENTORY_PASSIVE:
2176 /* We got all requests, but still have to send our elements in response. */
2177 op->state->phase = PHASE_FINISH_WAITING;
2178
2179 LOG (GNUNET_ERROR_TYPE_DEBUG,
2180 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2181 /* The active peer is done sending offers
2182 * and inquiries. This means that all
2183 * our responses to that (demands and offers)
2184 * must be in flight (queued or in mesh).
2185 *
2186 * We should notify the active peer once
2187 * all our demands are satisfied, so that the active
2188 * peer can quit if we gave it everything.
2189 */GNUNET_CADET_receive_done (op->channel);
2190 maybe_finish (op);
2191 return;
2192
2193 case PHASE_INVENTORY_ACTIVE:
2194 LOG (GNUNET_ERROR_TYPE_DEBUG,
2195 "got DONE (as active partner), waiting to finish\n");
2196 /* All demands of the other peer are satisfied,
2197 * and we processed all offers, thus we know
2198 * exactly what our demands must be.
2199 *
2200 * We'll close the channel
2201 * to the other peer once our demands are met.
2202 */op->state->phase = PHASE_FINISH_CLOSING;
2203 GNUNET_CADET_receive_done (op->channel);
2204 maybe_finish (op);
2205 return;
2206
2207 default:
2208 GNUNET_break_op (0);
2209 fail_union_operation (op);
2210 return;
2211 }
2212}
2213
2214
2215/**
2216 * Handle a over message from a remote peer
2217 *
2218 * @param cls the union operation
2219 * @param mh the message
2220 */
2221static void
2222handle_union_p2p_over (void *cls,
2223 const struct GNUNET_MessageHeader *mh)
2224{
2225 send_client_done (cls);
2226}
2227
2228
2229/**
2230 * Initiate operation to evaluate a set union with a remote peer.
2231 *
2232 * @param op operation to perform (to be initialized)
2233 * @param opaque_context message to be transmitted to the listener
2234 * to convince it to accept, may be NULL
2235 */
2236static struct OperationState *
2237union_evaluate (struct Operation *op,
2238 const struct GNUNET_MessageHeader *opaque_context)
2239{
2240 struct OperationState *state;
2241 struct GNUNET_MQ_Envelope *ev;
2242 struct OperationRequestMessage *msg;
2243
2244 ev = GNUNET_MQ_msg_nested_mh (msg,
2245 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2246 opaque_context);
2247 if (NULL == ev)
2248 {
2249 /* the context message is too large */
2250 GNUNET_break (0);
2251 return NULL;
2252 }
2253 state = GNUNET_new (struct OperationState);
2254 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2255 GNUNET_NO);
2256 /* copy the current generation's strata estimator for this operation */
2257 state->se = strata_estimator_dup (op->set->state->se);
2258 /* we started the operation, thus we have to send the operation request */
2259 state->phase = PHASE_EXPECT_SE;
2260 state->salt_receive = state->salt_send = 42; // FIXME?????
2261 LOG (GNUNET_ERROR_TYPE_DEBUG,
2262 "Initiating union operation evaluation\n");
2263 GNUNET_STATISTICS_update (_GSS_statistics,
2264 "# of total union operations",
2265 1,
2266 GNUNET_NO);
2267 GNUNET_STATISTICS_update (_GSS_statistics,
2268 "# of initiated union operations",
2269 1,
2270 GNUNET_NO);
2271 GNUNET_MQ_send (op->mq,
2272 ev);
2273
2274 if (NULL != opaque_context)
2275 LOG (GNUNET_ERROR_TYPE_DEBUG,
2276 "sent op request with context message\n");
2277 else
2278 LOG (GNUNET_ERROR_TYPE_DEBUG,
2279 "sent op request without context message\n");
2280
2281 op->state = state;
2282 initialize_key_to_element (op);
2283 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
2284 state->key_to_element);
2285 return state;
2286}
2287
2288
2289/**
2290 * Get the incoming socket associated with the given id.
2291 *
2292 * @param listener the listener to look in
2293 * @param id id to look for
2294 * @return the incoming socket associated with the id,
2295 * or NULL if there is none
2296 */
2297static struct Operation *
2298get_incoming (uint32_t id)
2299{
2300 for (struct Listener *listener = listener_head;
2301 NULL != listener;
2302 listener = listener->next)
2303 {
2304 for (struct Operation *op = listener->op_head;
2305 NULL != op;
2306 op = op->next)
2307 if (op->suggest_id == id)
2308 return op;
2309 }
2310 return NULL;
2311}
2312
2313
2314/**
2315 * Destroy an incoming request from a remote peer
2316 *
2317 * @param op remote request to destroy
2318 */
2319static void
2320incoming_destroy (struct Operation *op)
2321{
2322 struct Listener *listener;
2323
2324 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325 "Destroying incoming operation %p\n",
2326 op);
2327 if (NULL != (listener = op->listener))
2328 {
2329 GNUNET_CONTAINER_DLL_remove (listener->op_head,
2330 listener->op_tail,
2331 op);
2332 op->listener = NULL;
2333 }
2334 if (NULL != op->timeout_task)
2335 {
2336 GNUNET_SCHEDULER_cancel (op->timeout_task);
2337 op->timeout_task = NULL;
2338 }
2339 _GSS_operation_destroy2 (op);
2340}
2341
2342
2343/**
2344 * Is element @a ee part of the set used by @a op?
2345 *
2346 * @param ee element to test
2347 * @param op operation the defines the set and its generation
2348 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
2349 */
2350int
2351_GSS_is_element_of_operation (struct ElementEntry *ee,
2352 struct Operation *op)
2353{
2354 return ee->generation >= op->generation_created;
2355}
2356
2357
2358/**
2359 * Destroy the given operation. Used for any operation where both
2360 * peers were known and that thus actually had a vt and channel. Must
2361 * not be used for operations where 'listener' is still set and we do
2362 * not know the other peer.
2363 *
2364 * Call the implementation-specific cancel function of the operation.
2365 * Disconnects from the remote peer. Does not disconnect the client,
2366 * as there may be multiple operations per set.
2367 *
2368 * @param op operation to destroy
2369 */
2370void
2371_GSS_operation_destroy (struct Operation *op)
2372{
2373 struct Set *set = op->set;
2374 struct GNUNET_CADET_Channel *channel;
2375
2376 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2377 "Destroying operation %p\n", op);
2378 GNUNET_assert (NULL == op->listener);
2379 if (NULL != op->state)
2380 {
2381 union_op_cancel (op);
2382 op->state = NULL;
2383 }
2384 if (NULL != set)
2385 {
2386 GNUNET_CONTAINER_DLL_remove (set->ops_head,
2387 set->ops_tail,
2388 op);
2389 op->set = NULL;
2390 }
2391 if (NULL != op->context_msg)
2392 {
2393 GNUNET_free (op->context_msg);
2394 op->context_msg = NULL;
2395 }
2396 if (NULL != (channel = op->channel))
2397 {
2398 /* This will free op; called conditionally as this helper function
2399 is also called from within the channel disconnect handler. */
2400 op->channel = NULL;
2401 GNUNET_CADET_channel_destroy (channel);
2402 }
2403 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
2404 * there was a channel end handler that will free 'op' on the call stack. */
2405}
2406
2407
2408/**
2409 * Callback called when a client connects to the service.
2410 *
2411 * @param cls closure for the service
2412 * @param c the new client that connected to the service
2413 * @param mq the message queue used to send messages to the client
2414 * @return @a `struct ClientState`
2415 */
2416static void *
2417client_connect_cb (void *cls,
2418 struct GNUNET_SERVICE_Client *c,
2419 struct GNUNET_MQ_Handle *mq)
2420{
2421 struct ClientState *cs;
2422
2423 num_clients++;
2424 cs = GNUNET_new (struct ClientState);
2425 cs->client = c;
2426 cs->mq = mq;
2427 return cs;
2428}
2429
2430
2431/**
2432 * Iterator over hash map entries to free element entries.
2433 *
2434 * @param cls closure
2435 * @param key current key code
2436 * @param value a `struct ElementEntry *` to be free'd
2437 * @return #GNUNET_YES (continue to iterate)
2438 */
2439static int
2440destroy_elements_iterator (void *cls,
2441 const struct GNUNET_HashCode *key,
2442 void *value)
2443{
2444 struct ElementEntry *ee = value;
2445
2446 GNUNET_free (ee);
2447 return GNUNET_YES;
2448}
2449
2450
2451/**
2452 * Clean up after a client has disconnected
2453 *
2454 * @param cls closure, unused
2455 * @param client the client to clean up after
2456 * @param internal_cls the `struct ClientState`
2457 */
2458static void
2459client_disconnect_cb (void *cls,
2460 struct GNUNET_SERVICE_Client *client,
2461 void *internal_cls)
2462{
2463 struct ClientState *cs = internal_cls;
2464 struct Operation *op;
2465 struct Listener *listener;
2466 struct Set *set;
2467
2468 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2469 "Client disconnected, cleaning up\n");
2470 if (NULL != (set = cs->set))
2471 {
2472 struct SetContent *content = set->content;
2473
2474 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2475 "Destroying client's set\n");
2476 /* Destroy pending set operations */
2477 while (NULL != set->ops_head)
2478 _GSS_operation_destroy (set->ops_head);
2479
2480 /* Destroy operation-specific state */
2481 GNUNET_assert (NULL != set->state);
2482 if (NULL != set->state->se)
2483 {
2484 strata_estimator_destroy (set->state->se);
2485 set->state->se = NULL;
2486 }
2487 GNUNET_free (set->state);
2488
2489 /* free set content (or at least decrement RC) */
2490 set->content = NULL;
2491 GNUNET_assert (0 != content->refcount);
2492 content->refcount--;
2493 if (0 == content->refcount)
2494 {
2495 GNUNET_assert (NULL != content->elements);
2496 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
2497 &destroy_elements_iterator,
2498 NULL);
2499 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
2500 content->elements = NULL;
2501 GNUNET_free (content);
2502 }
2503 GNUNET_free (set);
2504 }
2505
2506 if (NULL != (listener = cs->listener))
2507 {
2508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2509 "Destroying client's listener\n");
2510 GNUNET_CADET_close_port (listener->open_port);
2511 listener->open_port = NULL;
2512 while (NULL != (op = listener->op_head))
2513 {
2514 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2515 "Destroying incoming operation `%u' from peer `%s'\n",
2516 (unsigned int) op->client_request_id,
2517 GNUNET_i2s (&op->peer));
2518 incoming_destroy (op);
2519 }
2520 GNUNET_CONTAINER_DLL_remove (listener_head,
2521 listener_tail,
2522 listener);
2523 GNUNET_free (listener);
2524 }
2525 GNUNET_free (cs);
2526 num_clients--;
2527 if ( (GNUNET_YES == in_shutdown) &&
2528 (0 == num_clients) )
2529 {
2530 if (NULL != cadet)
2531 {
2532 GNUNET_CADET_disconnect (cadet);
2533 cadet = NULL;
2534 }
2535 }
2536}
2537
2538
2539/**
2540 * Check a request for a set operation from another peer.
2541 *
2542 * @param cls the operation state
2543 * @param msg the received message
2544 * @return #GNUNET_OK if the channel should be kept alive,
2545 * #GNUNET_SYSERR to destroy the channel
2546 */
2547static int
2548check_incoming_msg (void *cls,
2549 const struct OperationRequestMessage *msg)
2550{
2551 struct Operation *op = cls;
2552 struct Listener *listener = op->listener;
2553 const struct GNUNET_MessageHeader *nested_context;
2554
2555 /* double operation request */
2556 if (0 != op->suggest_id)
2557 {
2558 GNUNET_break_op (0);
2559 return GNUNET_SYSERR;
2560 }
2561 /* This should be equivalent to the previous condition, but can't hurt to check twice */
2562 if (NULL == listener)
2563 {
2564 GNUNET_break (0);
2565 return GNUNET_SYSERR;
2566 }
2567 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2568 if ((NULL != nested_context) &&
2569 (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
2570 {
2571 GNUNET_break_op (0);
2572 return GNUNET_SYSERR;
2573 }
2574 return GNUNET_OK;
2575}
2576
2577
2578/**
2579 * Handle a request for a set operation from another peer. Checks if we
2580 * have a listener waiting for such a request (and in that case initiates
2581 * asking the listener about accepting the connection). If no listener
2582 * is waiting, we queue the operation request in hope that a listener
2583 * shows up soon (before timeout).
2584 *
2585 * This msg is expected as the first and only msg handled through the
2586 * non-operation bound virtual table, acceptance of this operation replaces
2587 * our virtual table and subsequent msgs would be routed differently (as
2588 * we then know what type of operation this is).
2589 *
2590 * @param cls the operation state
2591 * @param msg the received message
2592 * @return #GNUNET_OK if the channel should be kept alive,
2593 * #GNUNET_SYSERR to destroy the channel
2594 */
2595static void
2596handle_incoming_msg (void *cls,
2597 const struct OperationRequestMessage *msg)
2598{
2599 struct Operation *op = cls;
2600 struct Listener *listener = op->listener;
2601 const struct GNUNET_MessageHeader *nested_context;
2602 struct GNUNET_MQ_Envelope *env;
2603 struct GNUNET_SETU_RequestMessage *cmsg;
2604
2605 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2606 /* Make a copy of the nested_context (application-specific context
2607 information that is opaque to set) so we can pass it to the
2608 listener later on */
2609 if (NULL != nested_context)
2610 op->context_msg = GNUNET_copy_message (nested_context);
2611 op->remote_element_count = ntohl (msg->element_count);
2612 GNUNET_log (
2613 GNUNET_ERROR_TYPE_DEBUG,
2614 "Received P2P operation request (port %s) for active listener\n",
2615 GNUNET_h2s (&op->listener->app_id));
2616 GNUNET_assert (0 == op->suggest_id);
2617 if (0 == suggest_id)
2618 suggest_id++;
2619 op->suggest_id = suggest_id++;
2620 GNUNET_assert (NULL != op->timeout_task);
2621 GNUNET_SCHEDULER_cancel (op->timeout_task);
2622 op->timeout_task = NULL;
2623 env = GNUNET_MQ_msg_nested_mh (cmsg,
2624 GNUNET_MESSAGE_TYPE_SETU_REQUEST,
2625 op->context_msg);
2626 GNUNET_log (
2627 GNUNET_ERROR_TYPE_DEBUG,
2628 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
2629 op->suggest_id,
2630 listener,
2631 listener->cs);
2632 cmsg->accept_id = htonl (op->suggest_id);
2633 cmsg->peer_id = op->peer;
2634 GNUNET_MQ_send (listener->cs->mq,
2635 env);
2636 /* NOTE: GNUNET_CADET_receive_done() will be called in
2637 #handle_client_accept() */
2638}
2639
2640
2641/**
2642 * Called when a client wants to create a new set. This is typically
2643 * the first request from a client, and includes the type of set
2644 * operation to be performed.
2645 *
2646 * @param cls client that sent the message
2647 * @param m message sent by the client
2648 */
2649static void
2650handle_client_create_set (void *cls,
2651 const struct GNUNET_SETU_CreateMessage *msg)
2652{
2653 struct ClientState *cs = cls;
2654 struct Set *set;
2655
2656 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2657 "Client created new set for union operation\n");
2658 if (NULL != cs->set)
2659 {
2660 /* There can only be one set per client */
2661 GNUNET_break (0);
2662 GNUNET_SERVICE_client_drop (cs->client);
2663 return;
2664 }
2665 set = GNUNET_new (struct Set);
2666 {
2667 struct SetState *set_state;
2668
2669 set_state = GNUNET_new (struct SetState); // FIXME: avoid this malloc, merge structs!
2670 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2671 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2672 if (NULL == set_state->se)
2673 {
2674 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2675 "Failed to allocate strata estimator\n");
2676 GNUNET_free (set);
2677 GNUNET_SERVICE_client_drop (cs->client);
2678 return;
2679 }
2680 set->state = set_state;
2681 }
2682 set->content = GNUNET_new (struct SetContent);
2683 set->content->refcount = 1;
2684 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
2685 GNUNET_YES);
2686 set->cs = cs;
2687 cs->set = set;
2688 GNUNET_SERVICE_client_continue (cs->client);
2689}
2690
2691
2692/**
2693 * Timeout happens iff:
2694 * - we suggested an operation to our listener,
2695 * but did not receive a response in time
2696 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
2697 *
2698 * @param cls channel context
2699 * @param tc context information (why was this task triggered now)
2700 */
2701static void
2702incoming_timeout_cb (void *cls)
2703{
2704 struct Operation *op = cls;
2705
2706 op->timeout_task = NULL;
2707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2708 "Remote peer's incoming request timed out\n");
2709 incoming_destroy (op);
2710}
2711
2712
2713/**
2714 * Method called whenever another peer has added us to a channel the
2715 * other peer initiated. Only called (once) upon reception of data
2716 * from a channel we listen on.
2717 *
2718 * The channel context represents the operation itself and gets added
2719 * to a DLL, from where it gets looked up when our local listener
2720 * client responds to a proposed/suggested operation or connects and
2721 * associates with this operation.
2722 *
2723 * @param cls closure
2724 * @param channel new handle to the channel
2725 * @param source peer that started the channel
2726 * @return initial channel context for the channel
2727 * returns NULL on error
2728 */
2729static void *
2730channel_new_cb (void *cls,
2731 struct GNUNET_CADET_Channel *channel,
2732 const struct GNUNET_PeerIdentity *source)
2733{
2734 struct Listener *listener = cls;
2735 struct Operation *op;
2736
2737 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2738 "New incoming channel\n");
2739 op = GNUNET_new (struct Operation);
2740 op->listener = listener;
2741 op->peer = *source;
2742 op->channel = channel;
2743 op->mq = GNUNET_CADET_get_mq (op->channel);
2744 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
2745 UINT32_MAX);
2746 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
2747 &incoming_timeout_cb,
2748 op);
2749 GNUNET_CONTAINER_DLL_insert (listener->op_head,
2750 listener->op_tail,
2751 op);
2752 return op;
2753}
2754
2755
2756/**
2757 * Function called whenever a channel is destroyed. Should clean up
2758 * any associated state. It must NOT call
2759 * GNUNET_CADET_channel_destroy() on the channel.
2760 *
2761 * The peer_disconnect function is part of a a virtual table set initially either
2762 * when a peer creates a new channel with us, or once we create
2763 * a new channel ourselves (evaluate).
2764 *
2765 * Once we know the exact type of operation (union/intersection), the vt is
2766 * replaced with an operation specific instance (_GSS_[op]_vt).
2767 *
2768 * @param channel_ctx place where local state associated
2769 * with the channel is stored
2770 * @param channel connection to the other end (henceforth invalid)
2771 */
2772static void
2773channel_end_cb (void *channel_ctx,
2774 const struct GNUNET_CADET_Channel *channel)
2775{
2776 struct Operation *op = channel_ctx;
2777
2778 op->channel = NULL;
2779 _GSS_operation_destroy2 (op);
2780}
2781
2782
2783/**
2784 * This function probably should not exist
2785 * and be replaced by inlining more specific
2786 * logic in the various places where it is called.
2787 */
2788void
2789_GSS_operation_destroy2 (struct Operation *op)
2790{
2791 struct GNUNET_CADET_Channel *channel;
2792
2793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2794 "channel_end_cb called\n");
2795 if (NULL != (channel = op->channel))
2796 {
2797 /* This will free op; called conditionally as this helper function
2798 is also called from within the channel disconnect handler. */
2799 op->channel = NULL;
2800 GNUNET_CADET_channel_destroy (channel);
2801 }
2802 if (NULL != op->listener)
2803 {
2804 incoming_destroy (op);
2805 return;
2806 }
2807 if (NULL != op->set)
2808 send_client_done (op);
2809 _GSS_operation_destroy (op);
2810 GNUNET_free (op);
2811}
2812
2813
2814/**
2815 * Function called whenever an MQ-channel's transmission window size changes.
2816 *
2817 * The first callback in an outgoing channel will be with a non-zero value
2818 * and will mean the channel is connected to the destination.
2819 *
2820 * For an incoming channel it will be called immediately after the
2821 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
2822 *
2823 * @param cls Channel closure.
2824 * @param channel Connection to the other end (henceforth invalid).
2825 * @param window_size New window size. If the is more messages than buffer size
2826 * this value will be negative..
2827 */
2828static void
2829channel_window_cb (void *cls,
2830 const struct GNUNET_CADET_Channel *channel,
2831 int window_size)
2832{
2833 /* FIXME: not implemented, we could do flow control here... */
2834}
2835
2836
2837/**
2838 * Called when a client wants to create a new listener.
2839 *
2840 * @param cls client that sent the message
2841 * @param msg message sent by the client
2842 */
2843static void
2844handle_client_listen (void *cls,
2845 const struct GNUNET_SETU_ListenMessage *msg)
2846{
2847 struct ClientState *cs = cls;
2848 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2849 GNUNET_MQ_hd_var_size (incoming_msg,
2850 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
2851 struct OperationRequestMessage,
2852 NULL),
2853 GNUNET_MQ_hd_var_size (union_p2p_ibf,
2854 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
2855 struct IBFMessage,
2856 NULL),
2857 GNUNET_MQ_hd_var_size (union_p2p_elements,
2858 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
2859 struct GNUNET_SETU_ElementMessage,
2860 NULL),
2861 GNUNET_MQ_hd_var_size (union_p2p_offer,
2862 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
2863 struct GNUNET_MessageHeader,
2864 NULL),
2865 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
2866 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
2867 struct InquiryMessage,
2868 NULL),
2869 GNUNET_MQ_hd_var_size (union_p2p_demand,
2870 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
2871 struct GNUNET_MessageHeader,
2872 NULL),
2873 GNUNET_MQ_hd_fixed_size (union_p2p_done,
2874 GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
2875 struct GNUNET_MessageHeader,
2876 NULL),
2877 GNUNET_MQ_hd_fixed_size (union_p2p_over,
2878 GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
2879 struct GNUNET_MessageHeader,
2880 NULL),
2881 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
2882 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
2883 struct GNUNET_MessageHeader,
2884 NULL),
2885 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
2886 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
2887 struct GNUNET_MessageHeader,
2888 NULL),
2889 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
2890 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
2891 struct StrataEstimatorMessage,
2892 NULL),
2893 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
2894 GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
2895 struct StrataEstimatorMessage,
2896 NULL),
2897 GNUNET_MQ_hd_var_size (union_p2p_full_element,
2898 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
2899 struct GNUNET_SETU_ElementMessage,
2900 NULL),
2901 GNUNET_MQ_handler_end ()
2902 };
2903 struct Listener *listener;
2904
2905 if (NULL != cs->listener)
2906 {
2907 /* max. one active listener per client! */
2908 GNUNET_break (0);
2909 GNUNET_SERVICE_client_drop (cs->client);
2910 return;
2911 }
2912 listener = GNUNET_new (struct Listener);
2913 listener->cs = cs;
2914 cs->listener = listener;
2915 listener->app_id = msg->app_id;
2916 GNUNET_CONTAINER_DLL_insert (listener_head,
2917 listener_tail,
2918 listener);
2919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2920 "New listener created (port %s)\n",
2921 GNUNET_h2s (&listener->app_id));
2922 listener->open_port = GNUNET_CADET_open_port (cadet,
2923 &msg->app_id,
2924 &channel_new_cb,
2925 listener,
2926 &channel_window_cb,
2927 &channel_end_cb,
2928 cadet_handlers);
2929 GNUNET_SERVICE_client_continue (cs->client);
2930}
2931
2932
2933/**
2934 * Called when the listening client rejects an operation
2935 * request by another peer.
2936 *
2937 * @param cls client that sent the message
2938 * @param msg message sent by the client
2939 */
2940static void
2941handle_client_reject (void *cls,
2942 const struct GNUNET_SETU_RejectMessage *msg)
2943{
2944 struct ClientState *cs = cls;
2945 struct Operation *op;
2946
2947 op = get_incoming (ntohl (msg->accept_reject_id));
2948 if (NULL == op)
2949 {
2950 /* no matching incoming operation for this reject;
2951 could be that the other peer already disconnected... */
2952 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2953 "Client rejected unknown operation %u\n",
2954 (unsigned int) ntohl (msg->accept_reject_id));
2955 GNUNET_SERVICE_client_continue (cs->client);
2956 return;
2957 }
2958 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2959 "Peer request (app %s) rejected by client\n",
2960 GNUNET_h2s (&cs->listener->app_id));
2961 _GSS_operation_destroy2 (op);
2962 GNUNET_SERVICE_client_continue (cs->client);
2963}
2964
2965
2966/**
2967 * Called when a client wants to add or remove an element to a set it inhabits.
2968 *
2969 * @param cls client that sent the message
2970 * @param msg message sent by the client
2971 */
2972static int
2973check_client_set_add (void *cls,
2974 const struct GNUNET_SETU_ElementMessage *msg)
2975{
2976 /* NOTE: Technically, we should probably check with the
2977 block library whether the element we are given is well-formed */
2978 return GNUNET_OK;
2979}
2980
2981
2982/**
2983 * Called when a client wants to add or remove an element to a set it inhabits.
2984 *
2985 * @param cls client that sent the message
2986 * @param msg message sent by the client
2987 */
2988static void
2989handle_client_set_add (void *cls,
2990 const struct GNUNET_SETU_ElementMessage *msg)
2991{
2992 struct ClientState *cs = cls;
2993 struct Set *set;
2994 struct GNUNET_SETU_Element el;
2995 struct ElementEntry *ee;
2996 struct GNUNET_HashCode hash;
2997
2998 if (NULL == (set = cs->set))
2999 {
3000 /* client without a set requested an operation */
3001 GNUNET_break (0);
3002 GNUNET_SERVICE_client_drop (cs->client);
3003 return;
3004 }
3005 GNUNET_SERVICE_client_continue (cs->client);
3006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
3007 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
3008 el.size = ntohs (msg->header.size) - sizeof(*msg);
3009 el.data = &msg[1];
3010 el.element_type = ntohs (msg->element_type);
3011 GNUNET_SETU_element_hash (&el,
3012 &hash);
3013 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
3014 &hash);
3015 if (NULL == ee)
3016 {
3017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3018 "Client inserts element %s of size %u\n",
3019 GNUNET_h2s (&hash),
3020 el.size);
3021 ee = GNUNET_malloc (el.size + sizeof(*ee));
3022 ee->element.size = el.size;
3023 GNUNET_memcpy (&ee[1], el.data, el.size);
3024 ee->element.data = &ee[1];
3025 ee->element.element_type = el.element_type;
3026 ee->remote = GNUNET_NO;
3027 ee->generation = set->current_generation;
3028 ee->element_hash = hash;
3029 GNUNET_break (GNUNET_YES ==
3030 GNUNET_CONTAINER_multihashmap_put (
3031 set->content->elements,
3032 &ee->element_hash,
3033 ee,
3034 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3035 }
3036 else
3037 {
3038 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3039 "Client inserted element %s of size %u twice (ignored)\n",
3040 GNUNET_h2s (&hash),
3041 el.size);
3042 /* same element inserted twice */
3043 return;
3044 }
3045 strata_estimator_insert (set->state->se,
3046 get_ibf_key (&ee->element_hash));
3047}
3048
3049
3050/**
3051 * Advance the current generation of a set,
3052 * adding exclusion ranges if necessary.
3053 *
3054 * @param set the set where we want to advance the generation
3055 */
3056static void
3057advance_generation (struct Set *set)
3058{
3059 set->content->latest_generation++;
3060 set->current_generation++;
3061}
3062
3063
3064/**
3065 * Called when a client wants to initiate a set operation with another
3066 * peer. Initiates the CADET connection to the listener and sends the
3067 * request.
3068 *
3069 * @param cls client that sent the message
3070 * @param msg message sent by the client
3071 * @return #GNUNET_OK if the message is well-formed
3072 */
3073static int
3074check_client_evaluate (void *cls,
3075 const struct GNUNET_SETU_EvaluateMessage *msg)
3076{
3077 /* FIXME: suboptimal, even if the context below could be NULL,
3078 there are malformed messages this does not check for... */
3079 return GNUNET_OK;
3080}
3081
3082
3083/**
3084 * Called when a client wants to initiate a set operation with another
3085 * peer. Initiates the CADET connection to the listener and sends the
3086 * request.
3087 *
3088 * @param cls client that sent the message
3089 * @param msg message sent by the client
3090 */
3091static void
3092handle_client_evaluate (void *cls,
3093 const struct GNUNET_SETU_EvaluateMessage *msg)
3094{
3095 struct ClientState *cs = cls;
3096 struct Operation *op = GNUNET_new (struct Operation);
3097 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3098 GNUNET_MQ_hd_var_size (incoming_msg,
3099 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
3100 struct OperationRequestMessage,
3101 op),
3102 GNUNET_MQ_hd_var_size (union_p2p_ibf,
3103 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
3104 struct IBFMessage,
3105 op),
3106 GNUNET_MQ_hd_var_size (union_p2p_elements,
3107 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
3108 struct GNUNET_SETU_ElementMessage,
3109 op),
3110 GNUNET_MQ_hd_var_size (union_p2p_offer,
3111 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
3112 struct GNUNET_MessageHeader,
3113 op),
3114 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3115 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
3116 struct InquiryMessage,
3117 op),
3118 GNUNET_MQ_hd_var_size (union_p2p_demand,
3119 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
3120 struct GNUNET_MessageHeader,
3121 op),
3122 GNUNET_MQ_hd_fixed_size (union_p2p_done,
3123 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
3124 struct GNUNET_MessageHeader,
3125 op),
3126 GNUNET_MQ_hd_fixed_size (union_p2p_over,
3127 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
3128 struct GNUNET_MessageHeader,
3129 op),
3130 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3131 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
3132 struct GNUNET_MessageHeader,
3133 op),
3134 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3135 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
3136 struct GNUNET_MessageHeader,
3137 op),
3138 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3139 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
3140 struct StrataEstimatorMessage,
3141 op),
3142 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3143 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
3144 struct StrataEstimatorMessage,
3145 op),
3146 GNUNET_MQ_hd_var_size (union_p2p_full_element,
3147 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
3148 struct GNUNET_SETU_ElementMessage,
3149 op),
3150 GNUNET_MQ_handler_end ()
3151 };
3152 struct Set *set;
3153 const struct GNUNET_MessageHeader *context;
3154
3155 if (NULL == (set = cs->set))
3156 {
3157 GNUNET_break (0);
3158 GNUNET_free (op);
3159 GNUNET_SERVICE_client_drop (cs->client);
3160 return;
3161 }
3162 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
3163 UINT32_MAX);
3164 op->peer = msg->target_peer;
3165 op->client_request_id = ntohl (msg->request_id);
3166 op->byzantine = msg->byzantine;
3167 op->byzantine_lower_bound = msg->byzantine_lower_bound;
3168 op->force_full = msg->force_full;
3169 op->force_delta = msg->force_delta;
3170 context = GNUNET_MQ_extract_nested_mh (msg);
3171
3172 /* Advance generation values, so that
3173 mutations won't interfer with the running operation. */
3174 op->set = set;
3175 op->generation_created = set->current_generation;
3176 advance_generation (set);
3177 GNUNET_CONTAINER_DLL_insert (set->ops_head,
3178 set->ops_tail,
3179 op);
3180 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3181 "Creating new CADET channel to port %s for set union\n",
3182 GNUNET_h2s (&msg->app_id));
3183 op->channel = GNUNET_CADET_channel_create (cadet,
3184 op,
3185 &msg->target_peer,
3186 &msg->app_id,
3187 &channel_window_cb,
3188 &channel_end_cb,
3189 cadet_handlers);
3190 op->mq = GNUNET_CADET_get_mq (op->channel);
3191 op->state = union_evaluate (op, context);
3192 if (NULL == op->state)
3193 {
3194 GNUNET_break (0);
3195 GNUNET_SERVICE_client_drop (cs->client);
3196 return;
3197 }
3198 GNUNET_SERVICE_client_continue (cs->client);
3199}
3200
3201
3202/**
3203 * Handle a request from the client to cancel a running set operation.
3204 *
3205 * @param cls the client
3206 * @param msg the message
3207 */
3208static void
3209handle_client_cancel (void *cls,
3210 const struct GNUNET_SETU_CancelMessage *msg)
3211{
3212 struct ClientState *cs = cls;
3213 struct Set *set;
3214 struct Operation *op;
3215 int found;
3216
3217 if (NULL == (set = cs->set))
3218 {
3219 /* client without a set requested an operation */
3220 GNUNET_break (0);
3221 GNUNET_SERVICE_client_drop (cs->client);
3222 return;
3223 }
3224 found = GNUNET_NO;
3225 for (op = set->ops_head; NULL != op; op = op->next)
3226 {
3227 if (op->client_request_id == ntohl (msg->request_id))
3228 {
3229 found = GNUNET_YES;
3230 break;
3231 }
3232 }
3233 if (GNUNET_NO == found)
3234 {
3235 /* It may happen that the operation was already destroyed due to
3236 * the other peer disconnecting. The client may not know about this
3237 * yet and try to cancel the (just barely non-existent) operation.
3238 * So this is not a hard error.
3239 *///
3240 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3241 "Client canceled non-existent op %u\n",
3242 (uint32_t) ntohl (msg->request_id));
3243 }
3244 else
3245 {
3246 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3247 "Client requested cancel for op %u\n",
3248 (uint32_t) ntohl (msg->request_id));
3249 _GSS_operation_destroy (op);
3250 }
3251 GNUNET_SERVICE_client_continue (cs->client);
3252}
3253
3254
3255/**
3256 * Handle a request from the client to accept a set operation that
3257 * came from a remote peer. We forward the accept to the associated
3258 * operation for handling
3259 *
3260 * @param cls the client
3261 * @param msg the message
3262 */
3263static void
3264handle_client_accept (void *cls,
3265 const struct GNUNET_SETU_AcceptMessage *msg)
3266{
3267 struct ClientState *cs = cls;
3268 struct Set *set;
3269 struct Operation *op;
3270 struct GNUNET_SETU_ResultMessage *result_message;
3271 struct GNUNET_MQ_Envelope *ev;
3272 struct Listener *listener;
3273
3274 if (NULL == (set = cs->set))
3275 {
3276 /* client without a set requested to accept */
3277 GNUNET_break (0);
3278 GNUNET_SERVICE_client_drop (cs->client);
3279 return;
3280 }
3281 op = get_incoming (ntohl (msg->accept_reject_id));
3282 if (NULL == op)
3283 {
3284 /* It is not an error if the set op does not exist -- it may
3285 * have been destroyed when the partner peer disconnected. */
3286 GNUNET_log (
3287 GNUNET_ERROR_TYPE_INFO,
3288 "Client %p accepted request %u of listener %p that is no longer active\n",
3289 cs,
3290 ntohl (msg->accept_reject_id),
3291 cs->listener);
3292 ev = GNUNET_MQ_msg (result_message,
3293 GNUNET_MESSAGE_TYPE_SET_RESULT);
3294 result_message->request_id = msg->request_id;
3295 result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
3296 GNUNET_MQ_send (set->cs->mq, ev);
3297 GNUNET_SERVICE_client_continue (cs->client);
3298 return;
3299 }
3300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3301 "Client accepting request %u\n",
3302 (uint32_t) ntohl (msg->accept_reject_id));
3303 listener = op->listener;
3304 op->listener = NULL;
3305 GNUNET_CONTAINER_DLL_remove (listener->op_head,
3306 listener->op_tail,
3307 op);
3308 op->set = set;
3309 GNUNET_CONTAINER_DLL_insert (set->ops_head,
3310 set->ops_tail,
3311 op);
3312 op->client_request_id = ntohl (msg->request_id);
3313 op->byzantine = msg->byzantine;
3314 op->byzantine_lower_bound = msg->byzantine_lower_bound;
3315 op->force_full = msg->force_full;
3316 op->force_delta = msg->force_delta;
3317
3318 /* Advance generation values, so that future mutations do not
3319 interfer with the running operation. */
3320 op->generation_created = set->current_generation;
3321 advance_generation (set);
3322 GNUNET_assert (NULL == op->state);
3323
3324 LOG (GNUNET_ERROR_TYPE_DEBUG,
3325 "accepting set union operation\n");
3326 GNUNET_STATISTICS_update (_GSS_statistics,
3327 "# of accepted union operations",
3328 1,
3329 GNUNET_NO);
3330 GNUNET_STATISTICS_update (_GSS_statistics,
3331 "# of total union operations",
3332 1,
3333 GNUNET_NO);
3334 {
3335 struct OperationState *state;
3336 const struct StrataEstimator *se;
3337 struct GNUNET_MQ_Envelope *ev;
3338 struct StrataEstimatorMessage *strata_msg;
3339 char *buf;
3340 size_t len;
3341 uint16_t type;
3342
3343 state = GNUNET_new (struct OperationState); // FIXME: merge with 'op' to avoid malloc!
3344 state->se = strata_estimator_dup (op->set->state->se);
3345 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3346 GNUNET_NO);
3347 state->salt_receive = state->salt_send = 42; // FIXME?????
3348 op->state = state;
3349 initialize_key_to_element (op);
3350 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3351 state->key_to_element);
3352
3353 /* kick off the operation */
3354 se = state->se;
3355 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
3356 len = strata_estimator_write (se,
3357 buf);
3358 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
3359 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
3360 else
3361 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
3362 ev = GNUNET_MQ_msg_extra (strata_msg,
3363 len,
3364 type);
3365 GNUNET_memcpy (&strata_msg[1],
3366 buf,
3367 len);
3368 GNUNET_free (buf);
3369 strata_msg->set_size
3370 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
3371 op->set->content->elements));
3372 GNUNET_MQ_send (op->mq,
3373 ev);
3374 state->phase = PHASE_EXPECT_IBF;
3375
3376 op->state = state;
3377 }
3378 /* Now allow CADET to continue, as we did not do this in
3379 #handle_incoming_msg (as we wanted to first see if the
3380 local client would accept the request). */
3381 GNUNET_CADET_receive_done (op->channel);
3382 GNUNET_SERVICE_client_continue (cs->client);
3383}
3384
3385
3386/**
3387 * Called to clean up, after a shutdown has been requested.
3388 *
3389 * @param cls closure, NULL
3390 */
3391static void
3392shutdown_task (void *cls)
3393{
3394 /* Delay actual shutdown to allow service to disconnect clients */
3395 in_shutdown = GNUNET_YES;
3396 if (0 == num_clients)
3397 {
3398 if (NULL != cadet)
3399 {
3400 GNUNET_CADET_disconnect (cadet);
3401 cadet = NULL;
3402 }
3403 }
3404 GNUNET_STATISTICS_destroy (_GSS_statistics,
3405 GNUNET_YES);
3406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3407 "handled shutdown request\n");
3408}
3409
3410
3411/**
3412 * Function called by the service's run
3413 * method to run service-specific setup code.
3414 *
3415 * @param cls closure
3416 * @param cfg configuration to use
3417 * @param service the initialized service
3418 */
3419static void
3420run (void *cls,
3421 const struct GNUNET_CONFIGURATION_Handle *cfg,
3422 struct GNUNET_SERVICE_Handle *service)
3423{
3424 /* FIXME: need to modify SERVICE (!) API to allow
3425 us to run a shutdown task *after* clients were
3426 forcefully disconnected! */
3427 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3428 NULL);
3429 _GSS_statistics = GNUNET_STATISTICS_create ("setu", cfg);
3430 cadet = GNUNET_CADET_connect (cfg);
3431 if (NULL == cadet)
3432 {
3433 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3434 _ ("Could not connect to CADET service\n"));
3435 GNUNET_SCHEDULER_shutdown ();
3436 return;
3437 }
3438}
3439
3440
3441/**
3442 * Define "main" method using service macro.
3443 */
3444GNUNET_SERVICE_MAIN (
3445 "set",
3446 GNUNET_SERVICE_OPTION_NONE,
3447 &run,
3448 &client_connect_cb,
3449 &client_disconnect_cb,
3450 NULL,
3451 GNUNET_MQ_hd_fixed_size (client_accept,
3452 GNUNET_MESSAGE_TYPE_SETU_ACCEPT,
3453 struct GNUNET_SETU_AcceptMessage,
3454 NULL),
3455 GNUNET_MQ_hd_var_size (client_set_add,
3456 GNUNET_MESSAGE_TYPE_SETU_ADD,
3457 struct GNUNET_SETU_ElementMessage,
3458 NULL),
3459 GNUNET_MQ_hd_fixed_size (client_create_set,
3460 GNUNET_MESSAGE_TYPE_SETU_CREATE,
3461 struct GNUNET_SETU_CreateMessage,
3462 NULL),
3463 GNUNET_MQ_hd_var_size (client_evaluate,
3464 GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
3465 struct GNUNET_SETU_EvaluateMessage,
3466 NULL),
3467 GNUNET_MQ_hd_fixed_size (client_listen,
3468 GNUNET_MESSAGE_TYPE_SETU_LISTEN,
3469 struct GNUNET_SETU_ListenMessage,
3470 NULL),
3471 GNUNET_MQ_hd_fixed_size (client_reject,
3472 GNUNET_MESSAGE_TYPE_SETU_REJECT,
3473 struct GNUNET_SETU_RejectMessage,
3474 NULL),
3475 GNUNET_MQ_hd_fixed_size (client_cancel,
3476 GNUNET_MESSAGE_TYPE_SETU_CANCEL,
3477 struct GNUNET_SETU_CancelMessage,
3478 NULL),
3479 GNUNET_MQ_handler_end ());
3480
3481
3482/* end of gnunet-service-setu.c */
diff --git a/src/setu/gnunet-service-setu.h b/src/setu/gnunet-service-setu.h
new file mode 100644
index 000000000..eb6b7a8e5
--- /dev/null
+++ b/src/setu/gnunet-service-setu.h
@@ -0,0 +1,393 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/gnunet-service-setu.h
22 * @brief common components for the implementation the different set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#ifndef GNUNET_SERVICE_SETU_H_PRIVATE
27#define GNUNET_SERVICE_SETU_H_PRIVATE
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_protocols.h"
32#include "gnunet_applications.h"
33#include "gnunet_core_service.h"
34#include "gnunet_cadet_service.h"
35#include "gnunet_setu_service.h"
36#include "setu.h"
37
38
39/**
40 * Implementation-specific set state. Used as opaque pointer, and
41 * specified further in the respective implementation.
42 */
43struct SetState;
44
45/**
46 * Implementation-specific set operation. Used as opaque pointer, and
47 * specified further in the respective implementation.
48 */
49struct OperationState;
50
51/**
52 * A set that supports a specific operation with other peers.
53 */
54struct Set;
55
56/**
57 * Information about an element element in the set. All elements are
58 * stored in a hash-table from their hash-code to their 'struct
59 * Element', so that the remove and add operations are reasonably
60 * fast.
61 */
62struct ElementEntry;
63
64/**
65 * Operation context used to execute a set operation.
66 */
67struct Operation;
68
69
70/**
71 * Information about an element element in the set. All elements are
72 * stored in a hash-table from their hash-code to their `struct
73 * Element`, so that the remove and add operations are reasonably
74 * fast.
75 */
76struct ElementEntry
77{
78 /**
79 * The actual element. The data for the element
80 * should be allocated at the end of this struct.
81 */
82 struct GNUNET_SETU_Element element;
83
84 /**
85 * Hash of the element. For set union: Will be used to derive the
86 * different IBF keys for different salts.
87 */
88 struct GNUNET_HashCode element_hash;
89
90 /**
91 * First generation that includes this element.
92 */
93 unsigned int generation;
94
95 /**
96 * #GNUNET_YES if the element is a remote element, and does not belong
97 * to the operation's set.
98 */
99 int remote;
100};
101
102
103/**
104 * A listener is inhabited by a client, and waits for evaluation
105 * requests from remote peers.
106 */
107struct Listener;
108
109
110/**
111 * State we keep per client.
112 */
113struct ClientState
114{
115 /**
116 * Set, if associated with the client, otherwise NULL.
117 */
118 struct Set *set;
119
120 /**
121 * Listener, if associated with the client, otherwise NULL.
122 */
123 struct Listener *listener;
124
125 /**
126 * Client handle.
127 */
128 struct GNUNET_SERVICE_Client *client;
129
130 /**
131 * Message queue.
132 */
133 struct GNUNET_MQ_Handle *mq;
134};
135
136
137/**
138 * Operation context used to execute a set operation.
139 */
140struct Operation
141{
142 /**
143 * Kept in a DLL of the listener, if @e listener is non-NULL.
144 */
145 struct Operation *next;
146
147 /**
148 * Kept in a DLL of the listener, if @e listener is non-NULL.
149 */
150 struct Operation *prev;
151
152 /**
153 * Channel to the peer.
154 */
155 struct GNUNET_CADET_Channel *channel;
156
157 /**
158 * Port this operation runs on.
159 */
160 struct Listener *listener;
161
162 /**
163 * Message queue for the channel.
164 */
165 struct GNUNET_MQ_Handle *mq;
166
167 /**
168 * Context message, may be NULL.
169 */
170 struct GNUNET_MessageHeader *context_msg;
171
172 /**
173 * Set associated with the operation, NULL until the spec has been
174 * associated with a set.
175 */
176 struct Set *set;
177
178 /**
179 * Operation-specific operation state. Note that the exact
180 * type depends on this being a union or intersection operation
181 * (and thus on @e vt).
182 */
183 struct OperationState *state;
184
185 /**
186 * The identity of the requesting peer. Needs to
187 * be stored here as the op spec might not have been created yet.
188 */
189 struct GNUNET_PeerIdentity peer;
190
191 /**
192 * Timeout task, if the incoming peer has not been accepted
193 * after the timeout, it will be disconnected.
194 */
195 struct GNUNET_SCHEDULER_Task *timeout_task;
196
197 /**
198 * Salt to use for the operation.
199 */
200 uint32_t salt;
201
202 /**
203 * Remote peers element count
204 */
205 uint32_t remote_element_count;
206
207 /**
208 * ID used to identify an operation between service and client
209 */
210 uint32_t client_request_id;
211
212 /**
213 * Always use delta operation instead of sending full sets,
214 * even it it's less efficient.
215 */
216 int force_delta;
217
218 /**
219 * Always send full sets, even if delta operations would
220 * be more efficient.
221 */
222 int force_full;
223
224 /**
225 * #GNUNET_YES to fail operations where Byzantine faults
226 * are suspected
227 */
228 int byzantine;
229
230 /**
231 * Lower bound for the set size, used only when
232 * byzantine mode is enabled.
233 */
234 int byzantine_lower_bound;
235
236 /**
237 * Unique request id for the request from a remote peer, sent to the
238 * client, which will accept or reject the request. Set to '0' iff
239 * the request has not been suggested yet.
240 */
241 uint32_t suggest_id;
242
243 /**
244 * Generation in which the operation handle
245 * was created.
246 */
247 unsigned int generation_created;
248};
249
250
251/**
252 * SetContent stores the actual set elements, which may be shared by
253 * multiple generations derived from one set.
254 */
255struct SetContent
256{
257 /**
258 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
259 */
260 struct GNUNET_CONTAINER_MultiHashMap *elements;
261
262 /**
263 * Number of references to the content.
264 */
265 unsigned int refcount;
266
267 /**
268 * FIXME: document!
269 */
270 unsigned int latest_generation;
271
272 /**
273 * Number of concurrently active iterators.
274 */
275 int iterator_count;
276};
277
278
279struct GenerationRange
280{
281 /**
282 * First generation that is excluded.
283 */
284 unsigned int start;
285
286 /**
287 * Generation after the last excluded generation.
288 */
289 unsigned int end;
290};
291
292
293/**
294 * A set that supports a specific operation with other peers.
295 */
296struct Set
297{
298 /**
299 * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
300 */
301 struct Set *next;
302
303 /**
304 * Sets are held in a doubly linked list.
305 */
306 struct Set *prev;
307
308 /**
309 * Client that owns the set. Only one client may own a set,
310 * and there can only be one set per client.
311 */
312 struct ClientState *cs;
313
314 /**
315 * Content, possibly shared by multiple sets,
316 * and thus reference counted.
317 */
318 struct SetContent *content;
319
320 /**
321 * Implementation-specific state.
322 */
323 struct SetState *state;
324
325 /**
326 * Evaluate operations are held in a linked list.
327 */
328 struct Operation *ops_head;
329
330 /**
331 * Evaluate operations are held in a linked list.
332 */
333 struct Operation *ops_tail;
334
335 /**
336 * Current generation, that is, number of previously executed
337 * operations and lazy copies on the underlying set content.
338 */
339 unsigned int current_generation;
340
341};
342
343
344extern struct GNUNET_STATISTICS_Handle *_GSS_statistics;
345
346
347/**
348 * Destroy the given operation. Used for any operation where both
349 * peers were known and that thus actually had a vt and channel. Must
350 * not be used for operations where 'listener' is still set and we do
351 * not know the other peer.
352 *
353 * Call the implementation-specific cancel function of the operation.
354 * Disconnects from the remote peer. Does not disconnect the client,
355 * as there may be multiple operations per set.
356 *
357 * @param op operation to destroy
358 */
359void
360_GSS_operation_destroy (struct Operation *op);
361
362
363/**
364 * This function probably should not exist
365 * and be replaced by inlining more specific
366 * logic in the various places where it is called.
367 */
368void
369_GSS_operation_destroy2 (struct Operation *op);
370
371
372/**
373 * Get the table with implementing functions for set union.
374 *
375 * @return the operation specific VTable
376 */
377const struct SetVT *
378_GSS_union_vt (void);
379
380
381/**
382 * Is element @a ee part of the set used by @a op?
383 *
384 * @param ee element to test
385 * @param op operation the defines the set and its generation
386 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
387 */
388int
389_GSS_is_element_of_operation (struct ElementEntry *ee,
390 struct Operation *op);
391
392
393#endif
diff --git a/src/setu/gnunet-service-setu_protocol.h b/src/setu/gnunet-service-setu_protocol.h
new file mode 100644
index 000000000..a2803ee47
--- /dev/null
+++ b/src/setu/gnunet-service-setu_protocol.h
@@ -0,0 +1,226 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2013, 2014 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @author Florian Dold
22 * @author Christian Grothoff
23 * @file set/gnunet-service-set_protocol.h
24 * @brief Peer-to-Peer messages for gnunet set
25 */
26#ifndef SET_PROTOCOL_H
27#define SET_PROTOCOL_H
28
29#include "platform.h"
30#include "gnunet_common.h"
31
32
33GNUNET_NETWORK_STRUCT_BEGIN
34
35struct OperationRequestMessage
36{
37 /**
38 * Type: #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
39 */
40 struct GNUNET_MessageHeader header;
41
42 /**
43 * Operation to request, values from `enum GNUNET_SET_OperationType`
44 */
45 uint32_t operation GNUNET_PACKED;
46
47 /**
48 * For Intersection: my element count
49 */
50 uint32_t element_count GNUNET_PACKED;
51
52 /**
53 * Application-specific identifier of the request.
54 */
55 struct GNUNET_HashCode app_idX;
56
57 /* rest: optional message */
58};
59
60
61/**
62 * Message containing buckets of an invertible bloom filter.
63 *
64 * If an IBF has too many buckets for an IBF message,
65 * it is split into multiple messages.
66 */
67struct IBFMessage
68{
69 /**
70 * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
71 */
72 struct GNUNET_MessageHeader header;
73
74 /**
75 * Order of the whole ibf, where
76 * num_buckets = 2^order
77 */
78 uint8_t order;
79
80 /**
81 * Padding, must be 0.
82 */
83 uint8_t reserved1;
84
85 /**
86 * Padding, must be 0.
87 */
88 uint16_t reserved2 GNUNET_PACKED;
89
90 /**
91 * Offset of the strata in the rest of the message
92 */
93 uint32_t offset GNUNET_PACKED;
94
95 /**
96 * Salt used when hashing elements for this IBF.
97 */
98 uint32_t salt GNUNET_PACKED;
99
100 /* rest: buckets */
101};
102
103
104struct InquiryMessage
105{
106 /**
107 * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
108 */
109 struct GNUNET_MessageHeader header;
110
111 /**
112 * Salt used when hashing elements for this inquiry.
113 */
114 uint32_t salt GNUNET_PACKED;
115
116 /**
117 * Reserved, set to 0.
118 */
119 uint32_t reserved GNUNET_PACKED;
120
121 /* rest: inquiry IBF keys */
122};
123
124
125/**
126 * During intersection, the first (and possibly second) message
127 * send it the number of elements in the set, to allow the peers
128 * to decide who should start with the Bloom filter.
129 */
130struct IntersectionElementInfoMessage
131{
132 /**
133 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO
134 */
135 struct GNUNET_MessageHeader header;
136
137 /**
138 * mutator used with this bloomfilter.
139 */
140 uint32_t sender_element_count GNUNET_PACKED;
141};
142
143
144/**
145 * Bloom filter messages exchanged for set intersection calculation.
146 */
147struct BFMessage
148{
149 /**
150 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
151 */
152 struct GNUNET_MessageHeader header;
153
154 /**
155 * Number of elements the sender still has in the set.
156 */
157 uint32_t sender_element_count GNUNET_PACKED;
158
159 /**
160 * XOR of all hashes over all elements remaining in the set.
161 * Used to determine termination.
162 */
163 struct GNUNET_HashCode element_xor_hash;
164
165 /**
166 * Mutator used with this bloomfilter.
167 */
168 uint32_t sender_mutator GNUNET_PACKED;
169
170 /**
171 * Total length of the bloomfilter data.
172 */
173 uint32_t bloomfilter_total_length GNUNET_PACKED;
174
175 /**
176 * Number of bits (k-value) used in encoding the bloomfilter.
177 */
178 uint32_t bits_per_element GNUNET_PACKED;
179
180 /**
181 * rest: the sender's bloomfilter
182 */
183};
184
185
186/**
187 * Last message, send to confirm the final set. Contains the element
188 * count as it is possible that the peer determined that we were done
189 * by getting the empty set, which in that case also needs to be
190 * communicated.
191 */
192struct IntersectionDoneMessage
193{
194 /**
195 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE
196 */
197 struct GNUNET_MessageHeader header;
198
199 /**
200 * Final number of elements in intersection.
201 */
202 uint32_t final_element_count GNUNET_PACKED;
203
204 /**
205 * XOR of all hashes over all elements remaining in the set.
206 */
207 struct GNUNET_HashCode element_xor_hash;
208};
209
210
211/**
212 * Strata estimator together with the peer's overall set size.
213 */
214struct StrataEstimatorMessage
215{
216 /**
217 * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C)
218 */
219 struct GNUNET_MessageHeader header;
220
221 uint64_t set_size;
222};
223
224GNUNET_NETWORK_STRUCT_END
225
226#endif
diff --git a/src/setu/gnunet-service-setu_strata_estimator.c b/src/setu/gnunet-service-setu_strata_estimator.c
new file mode 100644
index 000000000..0fa6a6f17
--- /dev/null
+++ b/src/setu/gnunet-service-setu_strata_estimator.c
@@ -0,0 +1,303 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/gnunet-service-setu_strata_estimator.c
22 * @brief invertible bloom filter
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "ibf.h"
29#include "gnunet-service-setu_strata_estimator.h"
30
31
32/**
33 * Should we try compressing the strata estimator? This will
34 * break compatibility with the 0.10.1-network.
35 */
36#define FAIL_10_1_COMPATIBILTIY 1
37
38
39/**
40 * Write the given strata estimator to the buffer.
41 *
42 * @param se strata estimator to serialize
43 * @param[out] buf buffer to write to, must be of appropriate size
44 * @return number of bytes written to @a buf
45 */
46size_t
47strata_estimator_write (const struct StrataEstimator *se,
48 void *buf)
49{
50 char *sbuf = buf;
51 unsigned int i;
52 size_t osize;
53
54 GNUNET_assert (NULL != se);
55 for (i = 0; i < se->strata_count; i++)
56 {
57 ibf_write_slice (se->strata[i],
58 0,
59 se->ibf_size,
60 &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]);
61 }
62 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
63#if FAIL_10_1_COMPATIBILTIY
64 {
65 char *cbuf;
66 size_t nsize;
67
68 if (GNUNET_YES ==
69 GNUNET_try_compression (buf,
70 osize,
71 &cbuf,
72 &nsize))
73 {
74 GNUNET_memcpy (buf, cbuf, nsize);
75 osize = nsize;
76 GNUNET_free (cbuf);
77 }
78 }
79#endif
80 return osize;
81}
82
83
84/**
85 * Read strata from the buffer into the given strata
86 * estimator. The strata estimator must already be allocated.
87 *
88 * @param buf buffer to read from
89 * @param buf_len number of bytes in @a buf
90 * @param is_compressed is the data compressed?
91 * @param[out] se strata estimator to write to
92 * @return #GNUNET_OK on success
93 */
94int
95strata_estimator_read (const void *buf,
96 size_t buf_len,
97 int is_compressed,
98 struct StrataEstimator *se)
99{
100 unsigned int i;
101 size_t osize;
102 char *dbuf;
103
104 dbuf = NULL;
105 if (GNUNET_YES == is_compressed)
106 {
107 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
108 dbuf = GNUNET_decompress (buf,
109 buf_len,
110 osize);
111 if (NULL == dbuf)
112 {
113 GNUNET_break_op (0); /* bad compressed input data */
114 return GNUNET_SYSERR;
115 }
116 buf = dbuf;
117 buf_len = osize;
118 }
119
120 if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE)
121 {
122 GNUNET_break (0); /* very odd error */
123 GNUNET_free (dbuf);
124 return GNUNET_SYSERR;
125 }
126
127 for (i = 0; i < se->strata_count; i++)
128 {
129 ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]);
130 buf += se->ibf_size * IBF_BUCKET_SIZE;
131 }
132 GNUNET_free (dbuf);
133 return GNUNET_OK;
134}
135
136
137/**
138 * Add a key to the strata estimator.
139 *
140 * @param se strata estimator to add the key to
141 * @param key key to add
142 */
143void
144strata_estimator_insert (struct StrataEstimator *se,
145 struct IBF_Key key)
146{
147 uint64_t v;
148 unsigned int i;
149
150 v = key.key_val;
151 /* count trailing '1'-bits of v */
152 for (i = 0; v & 1; v >>= 1, i++)
153 /* empty */;
154 ibf_insert (se->strata[i], key);
155}
156
157
158/**
159 * Remove a key from the strata estimator.
160 *
161 * @param se strata estimator to remove the key from
162 * @param key key to remove
163 */
164void
165strata_estimator_remove (struct StrataEstimator *se,
166 struct IBF_Key key)
167{
168 uint64_t v;
169 unsigned int i;
170
171 v = key.key_val;
172 /* count trailing '1'-bits of v */
173 for (i = 0; v & 1; v >>= 1, i++)
174 /* empty */;
175 ibf_remove (se->strata[i], key);
176}
177
178
179/**
180 * Create a new strata estimator with the given parameters.
181 *
182 * @param strata_count number of stratas, that is, number of ibfs in the estimator
183 * @param ibf_size size of each ibf stratum
184 * @param ibf_hashnum hashnum parameter of each ibf
185 * @return a freshly allocated, empty strata estimator, NULL on error
186 */
187struct StrataEstimator *
188strata_estimator_create (unsigned int strata_count,
189 uint32_t ibf_size,
190 uint8_t ibf_hashnum)
191{
192 struct StrataEstimator *se;
193 unsigned int i;
194 unsigned int j;
195
196 se = GNUNET_new (struct StrataEstimator);
197 se->strata_count = strata_count;
198 se->ibf_size = ibf_size;
199 se->strata = GNUNET_new_array (strata_count,
200 struct InvertibleBloomFilter *);
201 for (i = 0; i < strata_count; i++)
202 {
203 se->strata[i] = ibf_create (ibf_size, ibf_hashnum);
204 if (NULL == se->strata[i])
205 {
206 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
207 "Failed to allocate memory for strata estimator\n");
208 for (j = 0; j < i; j++)
209 ibf_destroy (se->strata[i]);
210 GNUNET_free (se);
211 return NULL;
212 }
213 }
214 return se;
215}
216
217
218/**
219 * Estimate set difference with two strata estimators,
220 * i.e. arrays of IBFs.
221 * Does not not modify its arguments.
222 *
223 * @param se1 first strata estimator
224 * @param se2 second strata estimator
225 * @return the estimated difference
226 */
227unsigned int
228strata_estimator_difference (const struct StrataEstimator *se1,
229 const struct StrataEstimator *se2)
230{
231 unsigned int count;
232
233 GNUNET_assert (se1->strata_count == se2->strata_count);
234 count = 0;
235 for (int i = se1->strata_count - 1; i >= 0; i--)
236 {
237 struct InvertibleBloomFilter *diff;
238 /* number of keys decoded from the ibf */
239
240 /* FIXME: implement this without always allocating new IBFs */
241 diff = ibf_dup (se1->strata[i]);
242 ibf_subtract (diff, se2->strata[i]);
243 for (int ibf_count = 0; GNUNET_YES; ibf_count++)
244 {
245 int more;
246
247 more = ibf_decode (diff, NULL, NULL);
248 if (GNUNET_NO == more)
249 {
250 count += ibf_count;
251 break;
252 }
253 /* Estimate if decoding fails or would not terminate */
254 if ((GNUNET_SYSERR == more) || (ibf_count > diff->size))
255 {
256 ibf_destroy (diff);
257 return count * (1 << (i + 1));
258 }
259 }
260 ibf_destroy (diff);
261 }
262 return count;
263}
264
265
266/**
267 * Make a copy of a strata estimator.
268 *
269 * @param se the strata estimator to copy
270 * @return the copy
271 */
272struct StrataEstimator *
273strata_estimator_dup (struct StrataEstimator *se)
274{
275 struct StrataEstimator *c;
276 unsigned int i;
277
278 c = GNUNET_new (struct StrataEstimator);
279 c->strata_count = se->strata_count;
280 c->ibf_size = se->ibf_size;
281 c->strata = GNUNET_new_array (se->strata_count,
282 struct InvertibleBloomFilter *);
283 for (i = 0; i < se->strata_count; i++)
284 c->strata[i] = ibf_dup (se->strata[i]);
285 return c;
286}
287
288
289/**
290 * Destroy a strata estimator, free all of its resources.
291 *
292 * @param se strata estimator to destroy.
293 */
294void
295strata_estimator_destroy (struct StrataEstimator *se)
296{
297 unsigned int i;
298
299 for (i = 0; i < se->strata_count; i++)
300 ibf_destroy (se->strata[i]);
301 GNUNET_free (se->strata);
302 GNUNET_free (se);
303}
diff --git a/src/setu/gnunet-service-setu_strata_estimator.h b/src/setu/gnunet-service-setu_strata_estimator.h
new file mode 100644
index 000000000..afdbcdbbf
--- /dev/null
+++ b/src/setu/gnunet-service-setu_strata_estimator.h
@@ -0,0 +1,169 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/gnunet-service-setu_strata_estimator.h
23 * @brief estimator of set difference
24 * @author Florian Dold
25 */
26
27#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
28#define GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
29
30#include "platform.h"
31#include "gnunet_common.h"
32#include "gnunet_util_lib.h"
33
34#ifdef __cplusplus
35extern "C"
36{
37#if 0 /* keep Emacsens' auto-indent happy */
38}
39#endif
40#endif
41
42
43/**
44 * A handle to a strata estimator.
45 */
46struct StrataEstimator
47{
48 /**
49 * The IBFs of this strata estimator.
50 */
51 struct InvertibleBloomFilter **strata;
52
53 /**
54 * Size of the IBF array in @e strata
55 */
56 unsigned int strata_count;
57
58 /**
59 * Size of each IBF stratum (in bytes)
60 */
61 unsigned int ibf_size;
62};
63
64
65/**
66 * Write the given strata estimator to the buffer.
67 *
68 * @param se strata estimator to serialize
69 * @param[out] buf buffer to write to, must be of appropriate size
70 * @return number of bytes written to @a buf
71 */
72size_t
73strata_estimator_write (const struct StrataEstimator *se,
74 void *buf);
75
76
77/**
78 * Read strata from the buffer into the given strata
79 * estimator. The strata estimator must already be allocated.
80 *
81 * @param buf buffer to read from
82 * @param buf_len number of bytes in @a buf
83 * @param is_compressed is the data compressed?
84 * @param[out] se strata estimator to write to
85 * @return #GNUNET_OK on success
86 */
87int
88strata_estimator_read (const void *buf,
89 size_t buf_len,
90 int is_compressed,
91 struct StrataEstimator *se);
92
93
94/**
95 * Create a new strata estimator with the given parameters.
96 *
97 * @param strata_count number of stratas, that is, number of ibfs in the estimator
98 * @param ibf_size size of each ibf stratum
99 * @param ibf_hashnum hashnum parameter of each ibf
100 * @return a freshly allocated, empty strata estimator, NULL on error
101 */
102struct StrataEstimator *
103strata_estimator_create (unsigned int strata_count,
104 uint32_t ibf_size,
105 uint8_t ibf_hashnum);
106
107
108/**
109 * Get an estimation of the symmetric difference of the elements
110 * contained in both strata estimators.
111 *
112 * @param se1 first strata estimator
113 * @param se2 second strata estimator
114 * @return abs(|se1| - |se2|)
115 */
116unsigned int
117strata_estimator_difference (const struct StrataEstimator *se1,
118 const struct StrataEstimator *se2);
119
120
121/**
122 * Add a key to the strata estimator.
123 *
124 * @param se strata estimator to add the key to
125 * @param key key to add
126 */
127void
128strata_estimator_insert (struct StrataEstimator *se,
129 struct IBF_Key key);
130
131
132/**
133 * Remove a key from the strata estimator.
134 *
135 * @param se strata estimator to remove the key from
136 * @param key key to remove
137 */
138void
139strata_estimator_remove (struct StrataEstimator *se,
140 struct IBF_Key key);
141
142
143/**
144 * Destroy a strata estimator, free all of its resources.
145 *
146 * @param se strata estimator to destroy.
147 */
148void
149strata_estimator_destroy (struct StrataEstimator *se);
150
151
152/**
153 * Make a copy of a strata estimator.
154 *
155 * @param se the strata estimator to copy
156 * @return the copy
157 */
158struct StrataEstimator *
159strata_estimator_dup (struct StrataEstimator *se);
160
161
162#if 0 /* keep Emacsens' auto-indent happy */
163{
164#endif
165#ifdef __cplusplus
166}
167#endif
168
169#endif
diff --git a/src/setu/gnunet-setu-ibf-profiler.c b/src/setu/gnunet-setu-ibf-profiler.c
new file mode 100644
index 000000000..944b63d30
--- /dev/null
+++ b/src/setu/gnunet-setu-ibf-profiler.c
@@ -0,0 +1,308 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/gnunet-set-ibf-profiler.c
23 * @brief tool for profiling the invertible bloom filter implementation
24 * @author Florian Dold
25 */
26
27#include "platform.h"
28#include "gnunet_util_lib.h"
29
30#include "ibf.h"
31
32static unsigned int asize = 10;
33static unsigned int bsize = 10;
34static unsigned int csize = 10;
35static unsigned int hash_num = 4;
36static unsigned int ibf_size = 80;
37
38/* FIXME: add parameter for this */
39static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
40
41static struct GNUNET_CONTAINER_MultiHashMap *set_a;
42static struct GNUNET_CONTAINER_MultiHashMap *set_b;
43/* common elements in a and b */
44static struct GNUNET_CONTAINER_MultiHashMap *set_c;
45
46static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
47
48static struct InvertibleBloomFilter *ibf_a;
49static struct InvertibleBloomFilter *ibf_b;
50
51
52static void
53register_hashcode (struct GNUNET_HashCode *hash)
54{
55 struct GNUNET_HashCode replicated;
56 struct IBF_Key key;
57
58 key = ibf_key_from_hashcode (hash);
59 ibf_hashcode_from_key (key, &replicated);
60 (void) GNUNET_CONTAINER_multihashmap_put (
61 key_to_hashcode,
62 &replicated,
63 GNUNET_memdup (hash, sizeof *hash),
64 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
65}
66
67
68static void
69iter_hashcodes (struct IBF_Key key,
70 GNUNET_CONTAINER_MulitHashMapIteratorCallback iter,
71 void *cls)
72{
73 struct GNUNET_HashCode replicated;
74
75 ibf_hashcode_from_key (key, &replicated);
76 GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode,
77 &replicated,
78 iter,
79 cls);
80}
81
82
83static int
84insert_iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
85{
86 struct InvertibleBloomFilter *ibf = cls;
87
88 ibf_insert (ibf, ibf_key_from_hashcode (key));
89 return GNUNET_YES;
90}
91
92
93static int
94remove_iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
95{
96 struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
97
98 /* if remove fails, there just was a collision with another key */
99 (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
100 return GNUNET_YES;
101}
102
103
104static void
105run (void *cls,
106 char *const *args,
107 const char *cfgfile,
108 const struct GNUNET_CONFIGURATION_Handle *cfg)
109{
110 struct GNUNET_HashCode id;
111 struct IBF_Key ibf_key;
112 int i;
113 int side;
114 int res;
115 struct GNUNET_TIME_Absolute start_time;
116 struct GNUNET_TIME_Relative delta_time;
117
118 set_a =
119 GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + csize)),
120 GNUNET_NO);
121 set_b =
122 GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + csize)),
123 GNUNET_NO);
124 set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
125 GNUNET_NO);
126
127 key_to_hashcode =
128 GNUNET_CONTAINER_multihashmap_create (((asize + bsize + csize == 0)
129 ? 1
130 : (asize + bsize + csize)),
131 GNUNET_NO);
132
133 printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
134 hash_num,
135 ibf_size,
136 asize,
137 bsize,
138 csize);
139
140 i = 0;
141 while (i < asize)
142 {
143 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
144 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
145 continue;
146 GNUNET_break (GNUNET_OK ==
147 GNUNET_CONTAINER_multihashmap_put (
148 set_a,
149 &id,
150 NULL,
151 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
152 register_hashcode (&id);
153 i++;
154 }
155 i = 0;
156 while (i < bsize)
157 {
158 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
159 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
160 continue;
161 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
162 continue;
163 GNUNET_break (GNUNET_OK ==
164 GNUNET_CONTAINER_multihashmap_put (
165 set_b,
166 &id,
167 NULL,
168 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
169 register_hashcode (&id);
170 i++;
171 }
172 i = 0;
173 while (i < csize)
174 {
175 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
176 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
177 continue;
178 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
179 continue;
180 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
181 continue;
182 GNUNET_break (GNUNET_OK ==
183 GNUNET_CONTAINER_multihashmap_put (
184 set_c,
185 &id,
186 NULL,
187 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
188 register_hashcode (&id);
189 i++;
190 }
191
192 ibf_a = ibf_create (ibf_size, hash_num);
193 ibf_b = ibf_create (ibf_size, hash_num);
194 if ((NULL == ibf_a) || (NULL == ibf_b))
195 {
196 /* insufficient memory */
197 GNUNET_break (0);
198 GNUNET_SCHEDULER_shutdown ();
199 return;
200 }
201
202
203 printf ("generated sets\n");
204
205 start_time = GNUNET_TIME_absolute_get ();
206
207 GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
208 GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b);
209 GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a);
210 GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b);
211
212 delta_time = GNUNET_TIME_absolute_get_duration (start_time);
213
214 printf ("encoded in: %s\n",
215 GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
216
217 ibf_subtract (ibf_a, ibf_b);
218
219
220 start_time = GNUNET_TIME_absolute_get ();
221
222 for (i = 0; i <= asize + bsize; i++)
223 {
224 res = ibf_decode (ibf_a, &side, &ibf_key);
225 if (GNUNET_SYSERR == res)
226 {
227 printf ("decode failed, %u/%u elements left\n",
228 GNUNET_CONTAINER_multihashmap_size (set_a)
229 + GNUNET_CONTAINER_multihashmap_size (set_b),
230 asize + bsize);
231 return;
232 }
233 if (GNUNET_NO == res)
234 {
235 if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) &&
236 (0 == GNUNET_CONTAINER_multihashmap_size (set_a)))
237 {
238 delta_time = GNUNET_TIME_absolute_get_duration (start_time);
239 printf ("decoded successfully in: %s\n",
240 GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
241 }
242 else
243 {
244 printf ("decode missed elements (should never happen)\n");
245 }
246 return;
247 }
248
249 if (side == 1)
250 iter_hashcodes (ibf_key, remove_iterator, set_a);
251 if (side == -1)
252 iter_hashcodes (ibf_key, remove_iterator, set_b);
253 }
254 printf ("cyclic IBF, %u/%u elements left\n",
255 GNUNET_CONTAINER_multihashmap_size (set_a)
256 + GNUNET_CONTAINER_multihashmap_size (set_b),
257 asize + bsize);
258}
259
260
261int
262main (int argc, char **argv)
263{
264 struct GNUNET_GETOPT_CommandLineOption options[] = {
265 GNUNET_GETOPT_option_uint ('A',
266 "asize",
267 NULL,
268 gettext_noop ("number of element in set A-B"),
269 &asize),
270
271 GNUNET_GETOPT_option_uint ('B',
272 "bsize",
273 NULL,
274 gettext_noop ("number of element in set B-A"),
275 &bsize),
276
277 GNUNET_GETOPT_option_uint ('C',
278 "csize",
279 NULL,
280 gettext_noop (
281 "number of common elements in A and B"),
282 &csize),
283
284 GNUNET_GETOPT_option_uint ('k',
285 "hash-num",
286 NULL,
287 gettext_noop ("hash num"),
288 &hash_num),
289
290 GNUNET_GETOPT_option_uint ('s',
291 "ibf-size",
292 NULL,
293 gettext_noop ("ibf size"),
294 &ibf_size),
295
296 GNUNET_GETOPT_OPTION_END
297 };
298
299 GNUNET_PROGRAM_run2 (argc,
300 argv,
301 "gnunet-consensus-ibf",
302 "help",
303 options,
304 &run,
305 NULL,
306 GNUNET_YES);
307 return 0;
308}
diff --git a/src/setu/gnunet-setu-profiler.c b/src/setu/gnunet-setu-profiler.c
new file mode 100644
index 000000000..8d6a2dc8c
--- /dev/null
+++ b/src/setu/gnunet-setu-profiler.c
@@ -0,0 +1,499 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file setu/gnunet-setu-profiler.c
23 * @brief profiling tool for set
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h"
29#include "gnunet_setu_service.h"
30#include "gnunet_testbed_service.h"
31
32
33static int ret;
34
35static unsigned int num_a = 5;
36static unsigned int num_b = 5;
37static unsigned int num_c = 20;
38
39static char *op_str = "union";
40
41const static struct GNUNET_CONFIGURATION_Handle *config;
42
43struct SetInfo
44{
45 char *id;
46 struct GNUNET_SETU_Handle *set;
47 struct GNUNET_SETU_OperationHandle *oh;
48 struct GNUNET_CONTAINER_MultiHashMap *sent;
49 struct GNUNET_CONTAINER_MultiHashMap *received;
50 int done;
51} info1, info2;
52
53static struct GNUNET_CONTAINER_MultiHashMap *common_sent;
54
55static struct GNUNET_HashCode app_id;
56
57static struct GNUNET_PeerIdentity local_peer;
58
59static struct GNUNET_SETU_ListenHandle *set_listener;
60
61static int byzantine;
62static unsigned int force_delta;
63static unsigned int force_full;
64static unsigned int element_size = 32;
65
66/**
67 * Handle to the statistics service.
68 */
69static struct GNUNET_STATISTICS_Handle *statistics;
70
71/**
72 * The profiler will write statistics
73 * for all peers to the file with this name.
74 */
75static char *statistics_filename;
76
77/**
78 * The profiler will write statistics
79 * for all peers to this file.
80 */
81static FILE *statistics_file;
82
83
84static int
85map_remove_iterator (void *cls,
86 const struct GNUNET_HashCode *key,
87 void *value)
88{
89 struct GNUNET_CONTAINER_MultiHashMap *m = cls;
90 int ret;
91
92 GNUNET_assert (NULL != key);
93
94 ret = GNUNET_CONTAINER_multihashmap_remove_all (m, key);
95 if (GNUNET_OK != ret)
96 printf ("spurious element\n");
97 return GNUNET_YES;
98}
99
100
101/**
102 * Callback function to process statistic values.
103 *
104 * @param cls closure
105 * @param subsystem name of subsystem that created the statistic
106 * @param name the name of the datum
107 * @param value the current value
108 * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
109 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
110 */
111static int
112statistics_result (void *cls,
113 const char *subsystem,
114 const char *name,
115 uint64_t value,
116 int is_persistent)
117{
118 if (NULL != statistics_file)
119 {
120 fprintf (statistics_file, "%s\t%s\t%lu\n", subsystem, name, (unsigned
121 long) value);
122 }
123 return GNUNET_OK;
124}
125
126
127static void
128statistics_done (void *cls,
129 int success)
130{
131 GNUNET_assert (GNUNET_YES == success);
132 if (NULL != statistics_file)
133 fclose (statistics_file);
134 GNUNET_SCHEDULER_shutdown ();
135}
136
137
138static void
139check_all_done (void)
140{
141 if ((info1.done == GNUNET_NO) || (info2.done == GNUNET_NO))
142 return;
143
144 GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator,
145 info2.sent);
146 GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator,
147 info1.sent);
148
149 printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
150 info1.sent));
151 printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
152 info2.sent));
153
154 if (NULL == statistics_filename)
155 {
156 GNUNET_SCHEDULER_shutdown ();
157 return;
158 }
159
160 statistics_file = fopen (statistics_filename, "w");
161 GNUNET_STATISTICS_get (statistics, NULL, NULL,
162 &statistics_done,
163 &statistics_result, NULL);
164}
165
166
167static void
168set_result_cb (void *cls,
169 const struct GNUNET_SETU_Element *element,
170 uint64_t current_size,
171 enum GNUNET_SETU_Status status)
172{
173 struct SetInfo *info = cls;
174
175 GNUNET_assert (GNUNET_NO == info->done);
176 switch (status)
177 {
178 case GNUNET_SETU_STATUS_DONE:
179 info->done = GNUNET_YES;
180 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s done\n", info->id);
181 check_all_done ();
182 info->oh = NULL;
183 return;
184
185 case GNUNET_SETU_STATUS_FAILURE:
186 info->oh = NULL;
187 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n");
188 GNUNET_SCHEDULER_shutdown ();
189 return;
190
191 case GNUNET_SETU_STATUS_ADD_LOCAL:
192 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: local element\n", info->id);
193 break;
194 default:
195 GNUNET_assert (0);
196 }
197
198 if (element->size != element_size)
199 {
200 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
201 "wrong element size: %u, expected %u\n",
202 element->size,
203 (unsigned int) sizeof(struct GNUNET_HashCode));
204 GNUNET_assert (0);
205 }
206
207 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: got element (%s)\n",
208 info->id, GNUNET_h2s (element->data));
209 GNUNET_assert (NULL != element->data);
210 struct GNUNET_HashCode data_hash;
211 GNUNET_CRYPTO_hash (element->data, element_size, &data_hash);
212 GNUNET_CONTAINER_multihashmap_put (info->received,
213 &data_hash, NULL,
214 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
215}
216
217
218static void
219set_listen_cb (void *cls,
220 const struct GNUNET_PeerIdentity *other_peer,
221 const struct GNUNET_MessageHeader *context_msg,
222 struct GNUNET_SETU_Request *request)
223{
224 /* max. 2 options plus terminator */
225 struct GNUNET_SETU_Option opts[3] = { { 0 } };
226 unsigned int n_opts = 0;
227
228 if (NULL == request)
229 {
230 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
231 "listener failed\n");
232 return;
233 }
234 GNUNET_assert (NULL == info2.oh);
235 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
236 "set listen cb called\n");
237 if (byzantine)
238 {
239 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
240 GNUNET_SETU_OPTION_BYZANTINE };
241 }
242 GNUNET_assert (! (force_full && force_delta));
243 if (force_full)
244 {
245 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
246 GNUNET_SETU_OPTION_FORCE_FULL };
247 }
248 if (force_delta)
249 {
250 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
251 GNUNET_SETU_OPTION_FORCE_DELTA };
252 }
253
254 opts[n_opts].type = 0;
255 info2.oh = GNUNET_SETU_accept (request,
256 opts,
257 set_result_cb, &info2);
258 GNUNET_SETU_commit (info2.oh, info2.set);
259}
260
261
262static int
263set_insert_iterator (void *cls,
264 const struct GNUNET_HashCode *key,
265 void *value)
266{
267 struct GNUNET_SETU_Handle *set = cls;
268 struct GNUNET_SETU_Element el;
269
270 el.element_type = 0;
271 el.data = value;
272 el.size = element_size;
273 GNUNET_SETU_add_element (set, &el, NULL, NULL);
274 return GNUNET_YES;
275}
276
277
278static void
279handle_shutdown (void *cls)
280{
281 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
282 "Shutting down set profiler\n");
283 if (NULL != set_listener)
284 {
285 GNUNET_SETU_listen_cancel (set_listener);
286 set_listener = NULL;
287 }
288 if (NULL != info1.oh)
289 {
290 GNUNET_SETU_operation_cancel (info1.oh);
291 info1.oh = NULL;
292 }
293 if (NULL != info2.oh)
294 {
295 GNUNET_SETU_operation_cancel (info2.oh);
296 info2.oh = NULL;
297 }
298 if (NULL != info1.set)
299 {
300 GNUNET_SETU_destroy (info1.set);
301 info1.set = NULL;
302 }
303 if (NULL != info2.set)
304 {
305 GNUNET_SETU_destroy (info2.set);
306 info2.set = NULL;
307 }
308 GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
309}
310
311
312static void
313run (void *cls,
314 const struct GNUNET_CONFIGURATION_Handle *cfg,
315 struct GNUNET_TESTING_Peer *peer)
316{
317 unsigned int i;
318 struct GNUNET_HashCode hash;
319 /* max. 2 options plus terminator */
320 struct GNUNET_SETU_Option opts[3] = { { 0 } };
321 unsigned int n_opts = 0;
322
323 config = cfg;
324
325 GNUNET_assert (element_size > 0);
326
327 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &local_peer))
328 {
329 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
330 ret = 0;
331 return;
332 }
333
334 statistics = GNUNET_STATISTICS_create ("set-profiler", cfg);
335
336 GNUNET_SCHEDULER_add_shutdown (&handle_shutdown, NULL);
337
338 info1.id = "a";
339 info2.id = "b";
340
341 info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
342 info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
343 common_sent = GNUNET_CONTAINER_multihashmap_create (num_c + 1, GNUNET_NO);
344
345 info1.received = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
346 info2.received = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
347
348 for (i = 0; i < num_a; i++)
349 {
350 char *data = GNUNET_malloc (element_size);
351 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
352 GNUNET_CRYPTO_hash (data, element_size, &hash);
353 GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, data,
354 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
355 }
356
357 for (i = 0; i < num_b; i++)
358 {
359 char *data = GNUNET_malloc (element_size);
360 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
361 GNUNET_CRYPTO_hash (data, element_size, &hash);
362 GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, data,
363 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
364 }
365
366 for (i = 0; i < num_c; i++)
367 {
368 char *data = GNUNET_malloc (element_size);
369 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
370 GNUNET_CRYPTO_hash (data, element_size, &hash);
371 GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, data,
372 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
373 }
374
375 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id);
376
377 /* FIXME: also implement intersection etc. */
378 info1.set = GNUNET_SETU_create (config);
379 info2.set = GNUNET_SETU_create (config);
380
381 GNUNET_CONTAINER_multihashmap_iterate (info1.sent, set_insert_iterator,
382 info1.set);
383 GNUNET_CONTAINER_multihashmap_iterate (info2.sent, set_insert_iterator,
384 info2.set);
385 GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
386 info1.set);
387 GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
388 info2.set);
389
390 set_listener = GNUNET_SETU_listen (config,
391 &app_id,
392 &set_listen_cb,
393 NULL);
394
395
396 if (byzantine)
397 {
398 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
399 GNUNET_SETU_OPTION_BYZANTINE };
400 }
401 GNUNET_assert (! (force_full && force_delta));
402 if (force_full)
403 {
404 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
405 GNUNET_SETU_OPTION_FORCE_FULL };
406 }
407 if (force_delta)
408 {
409 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
410 GNUNET_SETU_OPTION_FORCE_DELTA };
411 }
412
413 opts[n_opts].type = 0;
414
415 info1.oh = GNUNET_SETU_prepare (&local_peer, &app_id, NULL,
416 opts,
417 set_result_cb, &info1);
418 GNUNET_SETU_commit (info1.oh, info1.set);
419 GNUNET_SETU_destroy (info1.set);
420 info1.set = NULL;
421}
422
423
424static void
425pre_run (void *cls, char *const *args, const char *cfgfile,
426 const struct GNUNET_CONFIGURATION_Handle *cfg)
427{
428 if (0 != GNUNET_TESTING_peer_run ("set-profiler",
429 cfgfile,
430 &run, NULL))
431 ret = 2;
432}
433
434
435int
436main (int argc, char **argv)
437{
438 struct GNUNET_GETOPT_CommandLineOption options[] = {
439 GNUNET_GETOPT_option_uint ('A',
440 "num-first",
441 NULL,
442 gettext_noop ("number of values"),
443 &num_a),
444
445 GNUNET_GETOPT_option_uint ('B',
446 "num-second",
447 NULL,
448 gettext_noop ("number of values"),
449 &num_b),
450
451 GNUNET_GETOPT_option_flag ('b',
452 "byzantine",
453 gettext_noop ("use byzantine mode"),
454 &byzantine),
455
456 GNUNET_GETOPT_option_uint ('f',
457 "force-full",
458 NULL,
459 gettext_noop ("force sending full set"),
460 &force_full),
461
462 GNUNET_GETOPT_option_uint ('d',
463 "force-delta",
464 NULL,
465 gettext_noop ("number delta operation"),
466 &force_delta),
467
468 GNUNET_GETOPT_option_uint ('C',
469 "num-common",
470 NULL,
471 gettext_noop ("number of values"),
472 &num_c),
473
474 GNUNET_GETOPT_option_string ('x',
475 "operation",
476 NULL,
477 gettext_noop ("operation to execute"),
478 &op_str),
479
480 GNUNET_GETOPT_option_uint ('w',
481 "element-size",
482 NULL,
483 gettext_noop ("element size"),
484 &element_size),
485
486 GNUNET_GETOPT_option_filename ('s',
487 "statistics",
488 "FILENAME",
489 gettext_noop ("write statistics to file"),
490 &statistics_filename),
491
492 GNUNET_GETOPT_OPTION_END
493 };
494
495 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set-profiler",
496 "help",
497 options, &pre_run, NULL, GNUNET_YES);
498 return ret;
499}
diff --git a/src/setu/ibf.c b/src/setu/ibf.c
new file mode 100644
index 000000000..1532afceb
--- /dev/null
+++ b/src/setu/ibf.c
@@ -0,0 +1,409 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/ibf.c
23 * @brief implementation of the invertible bloom filter
24 * @author Florian Dold
25 */
26
27#include "ibf.h"
28
29/**
30 * Compute the key's hash from the key.
31 * Redefine to use a different hash function.
32 */
33#define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof(struct \
34 IBF_KeyHash)))
35
36/**
37 * Create a key from a hashcode.
38 *
39 * @param hash the hashcode
40 * @return a key
41 */
42struct IBF_Key
43ibf_key_from_hashcode (const struct GNUNET_HashCode *hash)
44{
45 return *(struct IBF_Key *) hash;
46}
47
48
49/**
50 * Create a hashcode from a key, by replicating the key
51 * until the hascode is filled
52 *
53 * @param key the key
54 * @param dst hashcode to store the result in
55 */
56void
57ibf_hashcode_from_key (struct IBF_Key key,
58 struct GNUNET_HashCode *dst)
59{
60 struct IBF_Key *p;
61 unsigned int i;
62 const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode)
63 / sizeof(struct IBF_Key);
64
65 p = (struct IBF_Key *) dst;
66 for (i = 0; i < keys_per_hashcode; i++)
67 *p++ = key;
68}
69
70
71/**
72 * Create an invertible bloom filter.
73 *
74 * @param size number of IBF buckets
75 * @param hash_num number of buckets one element is hashed in
76 * @return the newly created invertible bloom filter, NULL on error
77 */
78struct InvertibleBloomFilter *
79ibf_create (uint32_t size, uint8_t hash_num)
80{
81 struct InvertibleBloomFilter *ibf;
82
83 GNUNET_assert (0 != size);
84
85 ibf = GNUNET_new (struct InvertibleBloomFilter);
86 ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t));
87 if (NULL == ibf->count)
88 {
89 GNUNET_free (ibf);
90 return NULL;
91 }
92 ibf->key_sum = GNUNET_malloc_large (size * sizeof(struct IBF_Key));
93 if (NULL == ibf->key_sum)
94 {
95 GNUNET_free (ibf->count);
96 GNUNET_free (ibf);
97 return NULL;
98 }
99 ibf->key_hash_sum = GNUNET_malloc_large (size * sizeof(struct IBF_KeyHash));
100 if (NULL == ibf->key_hash_sum)
101 {
102 GNUNET_free (ibf->key_sum);
103 GNUNET_free (ibf->count);
104 GNUNET_free (ibf);
105 return NULL;
106 }
107 ibf->size = size;
108 ibf->hash_num = hash_num;
109
110 return ibf;
111}
112
113
114/**
115 * Store unique bucket indices for the specified key in dst.
116 */
117static void
118ibf_get_indices (const struct InvertibleBloomFilter *ibf,
119 struct IBF_Key key,
120 int *dst)
121{
122 uint32_t filled;
123 uint32_t i;
124 uint32_t bucket;
125
126 bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key);
127 for (i = 0, filled = 0; filled < ibf->hash_num; i++)
128 {
129 unsigned int j;
130 uint64_t x;
131 for (j = 0; j < filled; j++)
132 if (dst[j] == bucket)
133 goto try_next;
134 dst[filled++] = bucket % ibf->size;
135try_next:;
136 x = ((uint64_t) bucket << 32) | i;
137 bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
138 }
139}
140
141
142static void
143ibf_insert_into (struct InvertibleBloomFilter *ibf,
144 struct IBF_Key key,
145 const int *buckets, int side)
146{
147 int i;
148
149 for (i = 0; i < ibf->hash_num; i++)
150 {
151 const int bucket = buckets[i];
152 ibf->count[bucket].count_val += side;
153 ibf->key_sum[bucket].key_val ^= key.key_val;
154 ibf->key_hash_sum[bucket].key_hash_val
155 ^= IBF_KEY_HASH_VAL (key);
156 }
157}
158
159
160/**
161 * Insert a key into an IBF.
162 *
163 * @param ibf the IBF
164 * @param key the element's hash code
165 */
166void
167ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
168{
169 int buckets[ibf->hash_num];
170
171 GNUNET_assert (ibf->hash_num <= ibf->size);
172 ibf_get_indices (ibf, key, buckets);
173 ibf_insert_into (ibf, key, buckets, 1);
174}
175
176
177/**
178 * Remove a key from an IBF.
179 *
180 * @param ibf the IBF
181 * @param key the element's hash code
182 */
183void
184ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
185{
186 int buckets[ibf->hash_num];
187
188 GNUNET_assert (ibf->hash_num <= ibf->size);
189 ibf_get_indices (ibf, key, buckets);
190 ibf_insert_into (ibf, key, buckets, -1);
191}
192
193
194/**
195 * Test is the IBF is empty, i.e. all counts, keys and key hashes are zero.
196 */
197static int
198ibf_is_empty (struct InvertibleBloomFilter *ibf)
199{
200 int i;
201
202 for (i = 0; i < ibf->size; i++)
203 {
204 if (0 != ibf->count[i].count_val)
205 return GNUNET_NO;
206 if (0 != ibf->key_hash_sum[i].key_hash_val)
207 return GNUNET_NO;
208 if (0 != ibf->key_sum[i].key_val)
209 return GNUNET_NO;
210 }
211 return GNUNET_YES;
212}
213
214
215/**
216 * Decode and remove an element from the IBF, if possible.
217 *
218 * @param ibf the invertible bloom filter to decode
219 * @param ret_side sign of the cell's count where the decoded element came from.
220 * A negative sign indicates that the element was recovered
221 * resides in an IBF that was previously subtracted from.
222 * @param ret_id receives the hash code of the decoded element, if successful
223 * @return GNUNET_YES if decoding an element was successful,
224 * GNUNET_NO if the IBF is empty,
225 * GNUNET_SYSERR if the decoding has failed
226 */
227int
228ibf_decode (struct InvertibleBloomFilter *ibf,
229 int *ret_side, struct IBF_Key *ret_id)
230{
231 struct IBF_KeyHash hash;
232 int i;
233 int buckets[ibf->hash_num];
234
235 GNUNET_assert (NULL != ibf);
236
237 for (i = 0; i < ibf->size; i++)
238 {
239 int j;
240 int hit;
241
242 /* we can only decode from pure buckets */
243 if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val))
244 continue;
245
246 hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]);
247
248 /* test if the hash matches the key */
249 if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val)
250 continue;
251
252 /* test if key in bucket hits its own location,
253 * if not, the key hash was subject to collision */
254 hit = GNUNET_NO;
255 ibf_get_indices (ibf, ibf->key_sum[i], buckets);
256 for (j = 0; j < ibf->hash_num; j++)
257 if (buckets[j] == i)
258 hit = GNUNET_YES;
259
260 if (GNUNET_NO == hit)
261 continue;
262
263 if (NULL != ret_side)
264 *ret_side = ibf->count[i].count_val;
265 if (NULL != ret_id)
266 *ret_id = ibf->key_sum[i];
267
268 /* insert on the opposite side, effectively removing the element */
269 ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val);
270
271 return GNUNET_YES;
272 }
273
274 if (GNUNET_YES == ibf_is_empty (ibf))
275 return GNUNET_NO;
276 return GNUNET_SYSERR;
277}
278
279
280/**
281 * Write buckets from an ibf to a buffer.
282 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
283 *
284 * @param ibf the ibf to write
285 * @param start with which bucket to start
286 * @param count how many buckets to write
287 * @param buf buffer to write the data to
288 */
289void
290ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start,
291 uint32_t count, void *buf)
292{
293 struct IBF_Key *key_dst;
294 struct IBF_KeyHash *key_hash_dst;
295 struct IBF_Count *count_dst;
296
297 GNUNET_assert (start + count <= ibf->size);
298
299 /* copy keys */
300 key_dst = (struct IBF_Key *) buf;
301 GNUNET_memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst);
302 key_dst += count;
303 /* copy key hashes */
304 key_hash_dst = (struct IBF_KeyHash *) key_dst;
305 GNUNET_memcpy (key_hash_dst, ibf->key_hash_sum + start, count
306 * sizeof *key_hash_dst);
307 key_hash_dst += count;
308 /* copy counts */
309 count_dst = (struct IBF_Count *) key_hash_dst;
310 GNUNET_memcpy (count_dst, ibf->count + start, count * sizeof *count_dst);
311}
312
313
314/**
315 * Read buckets from a buffer into an ibf.
316 *
317 * @param buf pointer to the buffer to read from
318 * @param start which bucket to start at
319 * @param count how many buckets to read
320 * @param ibf the ibf to read from
321 */
322void
323ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct
324 InvertibleBloomFilter *ibf)
325{
326 struct IBF_Key *key_src;
327 struct IBF_KeyHash *key_hash_src;
328 struct IBF_Count *count_src;
329
330 GNUNET_assert (count > 0);
331 GNUNET_assert (start + count <= ibf->size);
332
333 /* copy keys */
334 key_src = (struct IBF_Key *) buf;
335 GNUNET_memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src);
336 key_src += count;
337 /* copy key hashes */
338 key_hash_src = (struct IBF_KeyHash *) key_src;
339 GNUNET_memcpy (ibf->key_hash_sum + start, key_hash_src, count
340 * sizeof *key_hash_src);
341 key_hash_src += count;
342 /* copy counts */
343 count_src = (struct IBF_Count *) key_hash_src;
344 GNUNET_memcpy (ibf->count + start, count_src, count * sizeof *count_src);
345}
346
347
348/**
349 * Subtract ibf2 from ibf1, storing the result in ibf1.
350 * The two IBF's must have the same parameters size and hash_num.
351 *
352 * @param ibf1 IBF that is subtracted from
353 * @param ibf2 IBF that will be subtracted from ibf1
354 */
355void
356ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct
357 InvertibleBloomFilter *ibf2)
358{
359 int i;
360
361 GNUNET_assert (ibf1->size == ibf2->size);
362 GNUNET_assert (ibf1->hash_num == ibf2->hash_num);
363
364 for (i = 0; i < ibf1->size; i++)
365 {
366 ibf1->count[i].count_val -= ibf2->count[i].count_val;
367 ibf1->key_hash_sum[i].key_hash_val ^= ibf2->key_hash_sum[i].key_hash_val;
368 ibf1->key_sum[i].key_val ^= ibf2->key_sum[i].key_val;
369 }
370}
371
372
373/**
374 * Create a copy of an IBF, the copy has to be destroyed properly.
375 *
376 * @param ibf the IBF to copy
377 */
378struct InvertibleBloomFilter *
379ibf_dup (const struct InvertibleBloomFilter *ibf)
380{
381 struct InvertibleBloomFilter *copy;
382
383 copy = GNUNET_malloc (sizeof *copy);
384 copy->hash_num = ibf->hash_num;
385 copy->size = ibf->size;
386 copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size
387 * sizeof(struct IBF_KeyHash));
388 copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof(struct
389 IBF_Key));
390 copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof(struct
391 IBF_Count));
392 return copy;
393}
394
395
396/**
397 * Destroy all resources associated with the invertible bloom filter.
398 * No more ibf_*-functions may be called on ibf after calling destroy.
399 *
400 * @param ibf the intertible bloom filter to destroy
401 */
402void
403ibf_destroy (struct InvertibleBloomFilter *ibf)
404{
405 GNUNET_free (ibf->key_sum);
406 GNUNET_free (ibf->key_hash_sum);
407 GNUNET_free (ibf->count);
408 GNUNET_free (ibf);
409}
diff --git a/src/setu/ibf.h b/src/setu/ibf.h
new file mode 100644
index 000000000..7c2ab33b1
--- /dev/null
+++ b/src/setu/ibf.h
@@ -0,0 +1,255 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/ibf.h
23 * @brief invertible bloom filter
24 * @author Florian Dold
25 */
26
27#ifndef GNUNET_CONSENSUS_IBF_H
28#define GNUNET_CONSENSUS_IBF_H
29
30#include "platform.h"
31#include "gnunet_util_lib.h"
32
33#ifdef __cplusplus
34extern "C"
35{
36#if 0 /* keep Emacsens' auto-indent happy */
37}
38#endif
39#endif
40
41
42/**
43 * Keys that can be inserted into and removed from an IBF.
44 */
45struct IBF_Key
46{
47 uint64_t key_val;
48};
49
50
51/**
52 * Hash of an IBF key.
53 */
54struct IBF_KeyHash
55{
56 uint32_t key_hash_val;
57};
58
59
60/**
61 * Type of the count field of IBF buckets.
62 */
63struct IBF_Count
64{
65 int8_t count_val;
66};
67
68
69/**
70 * Size of one ibf bucket in bytes
71 */
72#define IBF_BUCKET_SIZE (sizeof(struct IBF_Count) + sizeof(struct IBF_Key) \
73 + sizeof(struct IBF_KeyHash))
74
75
76/**
77 * Invertible bloom filter (IBF).
78 *
79 * An IBF is a counting bloom filter that has the ability to restore
80 * the hashes of its stored elements with high probability.
81 */
82struct InvertibleBloomFilter
83{
84 /**
85 * How many cells does this IBF have?
86 */
87 uint32_t size;
88
89 /**
90 * In how many cells do we hash one element?
91 * Usually 4 or 3.
92 */
93 uint8_t hash_num;
94
95 /**
96 * Xor sums of the elements' keys, used to identify the elements.
97 * Array of 'size' elements.
98 */
99 struct IBF_Key *key_sum;
100
101 /**
102 * Xor sums of the hashes of the keys of inserted elements.
103 * Array of 'size' elements.
104 */
105 struct IBF_KeyHash *key_hash_sum;
106
107 /**
108 * How many times has a bucket been hit?
109 * Can be negative, as a result of IBF subtraction.
110 * Array of 'size' elements.
111 */
112 struct IBF_Count *count;
113};
114
115
116/**
117 * Write buckets from an ibf to a buffer.
118 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
119 *
120 * @param ibf the ibf to write
121 * @param start with which bucket to start
122 * @param count how many buckets to write
123 * @param buf buffer to write the data to
124 */
125void
126ibf_write_slice (const struct InvertibleBloomFilter *ibf,
127 uint32_t start,
128 uint32_t count,
129 void *buf);
130
131
132/**
133 * Read buckets from a buffer into an ibf.
134 *
135 * @param buf pointer to the buffer to read from
136 * @param start which bucket to start at
137 * @param count how many buckets to read
138 * @param ibf the ibf to write to
139 */
140void
141ibf_read_slice (const void *buf,
142 uint32_t start,
143 uint32_t count,
144 struct InvertibleBloomFilter *ibf);
145
146
147/**
148 * Create a key from a hashcode.
149 *
150 * @param hash the hashcode
151 * @return a key
152 */
153struct IBF_Key
154ibf_key_from_hashcode (const struct GNUNET_HashCode *hash);
155
156
157/**
158 * Create a hashcode from a key, by replicating the key
159 * until the hascode is filled
160 *
161 * @param key the key
162 * @param dst hashcode to store the result in
163 */
164void
165ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst);
166
167
168/**
169 * Create an invertible bloom filter.
170 *
171 * @param size number of IBF buckets
172 * @param hash_num number of buckets one element is hashed in, usually 3 or 4
173 * @return the newly created invertible bloom filter, NULL on error
174 */
175struct InvertibleBloomFilter *
176ibf_create (uint32_t size, uint8_t hash_num);
177
178
179/**
180 * Insert a key into an IBF.
181 *
182 * @param ibf the IBF
183 * @param key the element's hash code
184 */
185void
186ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
187
188
189/**
190 * Remove a key from an IBF.
191 *
192 * @param ibf the IBF
193 * @param key the element's hash code
194 */
195void
196ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
197
198
199/**
200 * Subtract ibf2 from ibf1, storing the result in ibf1.
201 * The two IBF's must have the same parameters size and hash_num.
202 *
203 * @param ibf1 IBF that is subtracted from
204 * @param ibf2 IBF that will be subtracted from ibf1
205 */
206void
207ibf_subtract (struct InvertibleBloomFilter *ibf1,
208 const struct InvertibleBloomFilter *ibf2);
209
210
211/**
212 * Decode and remove an element from the IBF, if possible.
213 *
214 * @param ibf the invertible bloom filter to decode
215 * @param ret_side sign of the cell's count where the decoded element came from.
216 * A negative sign indicates that the element was recovered
217 * resides in an IBF that was previously subtracted from.
218 * @param ret_id receives the hash code of the decoded element, if successful
219 * @return #GNUNET_YES if decoding an element was successful,
220 * #GNUNET_NO if the IBF is empty,
221 * #GNUNET_SYSERR if the decoding has failed
222 */
223int
224ibf_decode (struct InvertibleBloomFilter *ibf,
225 int *ret_side,
226 struct IBF_Key *ret_id);
227
228
229/**
230 * Create a copy of an IBF, the copy has to be destroyed properly.
231 *
232 * @param ibf the IBF to copy
233 */
234struct InvertibleBloomFilter *
235ibf_dup (const struct InvertibleBloomFilter *ibf);
236
237
238/**
239 * Destroy all resources associated with the invertible bloom filter.
240 * No more ibf_*-functions may be called on ibf after calling destroy.
241 *
242 * @param ibf the intertible bloom filter to destroy
243 */
244void
245ibf_destroy (struct InvertibleBloomFilter *ibf);
246
247
248#if 0 /* keep Emacsens' auto-indent happy */
249{
250#endif
251#ifdef __cplusplus
252}
253#endif
254
255#endif
diff --git a/src/setu/ibf_sim.c b/src/setu/ibf_sim.c
new file mode 100644
index 000000000..6415d00e1
--- /dev/null
+++ b/src/setu/ibf_sim.c
@@ -0,0 +1,142 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/ibf_sim.c
23 * @brief implementation of simulation for invertible bloom filter
24 * @author Florian Dold
25 *
26 * This code was used for some internal experiments, it is not
27 * build or shipped as part of the GNUnet system.
28 */
29#include <stdlib.h>
30#include <stdio.h>
31#include <string.h>
32
33#define MAX_IBF_DECODE 16
34
35/* report average over how many rounds? */
36#define ROUNDS 100000
37
38/* enable one of the three below */
39// simple fix
40#define FIX1 0
41// possibly slightly better fix for large IBF_DECODE values
42#define FIX2 1
43
44// SIGCOMM algorithm
45#define STRATA 0
46
47// print each value?
48#define VERBOSE 0
49// avoid assembly? (ASM is about 50% faster)
50#define SLOW 0
51
52int
53main (int argc, char **argv)
54{
55 unsigned int round;
56 unsigned int buckets[31]; // max is 2^31 as 'random' returns only between 0 and 2^31
57 unsigned int i;
58 int j;
59 unsigned int r;
60 unsigned int ret;
61 unsigned long long total;
62 unsigned int want;
63 double predict;
64
65 srandom (time (NULL));
66 total = 0;
67 want = atoi (argv[1]);
68 for (round = 0; round < ROUNDS; round++)
69 {
70 memset (buckets, 0, sizeof(buckets));
71 for (i = 0; i < want; i++)
72 {
73 /* FIXME: might want to use 'better' PRNG to avoid
74 PRNG-induced biases */
75 r = random ();
76 if (0 == r)
77 continue;
78#if SLOW
79 for (j = 0; (j < 31) && (0 == (r & (1 << j))); j++)
80 ;
81#else
82 /* use assembly / gcc */
83 j = __builtin_ffs (r) - 1;
84#endif
85 buckets[j]++;
86 }
87 ret = 0;
88 predict = 0.0;
89 for (j = 31; j >= 0; j--)
90 {
91#if FIX1
92 /* improved algorithm, for 1000 elements with IBF-DECODE 8, I
93 get 990/1000 elements on average over 1 million runs; key
94 idea being to stop short of the 'last' possible IBF as
95 otherwise a "lowball" per-chance would unduely influence the
96 result */if ((j > 0) &&
97 (buckets[j - 1] > MAX_IBF_DECODE))
98 {
99 ret *= (1 << (j + 1));
100 break;
101 }
102#endif
103#if FIX2
104 /* another improvement: don't just always cut off the last one,
105 but rather try to predict based on all previous values where
106 that "last" one is; additional prediction can only really
107 work if MAX_IBF_DECODE is sufficiently high */
108 if ((j > 0) &&
109 ((buckets[j - 1] > MAX_IBF_DECODE) ||
110 (predict > MAX_IBF_DECODE)))
111 {
112 ret *= (1 << (j + 1));
113 break;
114 }
115#endif
116#if STRATA
117 /* original algorithm, for 1000 elements with IBF-DECODE 8,
118 I get 920/1000 elements on average over 1 million runs */
119 if (buckets[j] > MAX_IBF_DECODE)
120 {
121 ret *= (1 << (j + 1));
122 break;
123 }
124#endif
125 ret += buckets[j];
126 predict = (buckets[j] + 2.0 * predict) / 2.0;
127 }
128#if VERBOSE
129 fprintf (stderr, "%u ", ret);
130#endif
131 total += ret;
132 }
133 fprintf (stderr, "\n");
134 fprintf (stdout, "average %llu\n", total / ROUNDS);
135 return 0;
136}
137
138
139/* TODO: should calculate stddev of the results to also be able to
140 say something about the stability of the results, outside of
141 large-scale averages -- gaining 8% precision at the expense of
142 50% additional variance might not be worth it... */
diff --git a/src/setu/plugin_block_setu_test.c b/src/setu/plugin_block_setu_test.c
new file mode 100644
index 000000000..1de086092
--- /dev/null
+++ b/src/setu/plugin_block_setu_test.c
@@ -0,0 +1,123 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/plugin_block_set_test.c
23 * @brief set test block, recognizes elements with non-zero first byte as invalid
24 * @author Christian Grothoff
25 */
26
27#include "platform.h"
28#include "gnunet_block_plugin.h"
29#include "gnunet_block_group_lib.h"
30
31
32/**
33 * Function called to validate a reply or a request. For
34 * request evaluation, simply pass "NULL" for the reply_block.
35 *
36 * @param cls closure
37 * @param ctx block context
38 * @param type block type
39 * @param group block group to use
40 * @param eo control flags
41 * @param query original query (hash)
42 * @param xquery extrended query data (can be NULL, depending on type)
43 * @param xquery_size number of bytes in xquery
44 * @param reply_block response to validate
45 * @param reply_block_size number of bytes in reply block
46 * @return characterization of result
47 */
48static enum GNUNET_BLOCK_EvaluationResult
49block_plugin_set_test_evaluate (void *cls,
50 struct GNUNET_BLOCK_Context *ctx,
51 enum GNUNET_BLOCK_Type type,
52 struct GNUNET_BLOCK_Group *group,
53 enum GNUNET_BLOCK_EvaluationOptions eo,
54 const struct GNUNET_HashCode *query,
55 const void *xquery,
56 size_t xquery_size,
57 const void *reply_block,
58 size_t reply_block_size)
59{
60 if ((NULL == reply_block) ||
61 (reply_block_size == 0) ||
62 (0 != ((char *) reply_block)[0]))
63 return GNUNET_BLOCK_EVALUATION_RESULT_INVALID;
64 return GNUNET_BLOCK_EVALUATION_OK_MORE;
65}
66
67
68/**
69 * Function called to obtain the key for a block.
70 *
71 * @param cls closure
72 * @param type block type
73 * @param block block to get the key for
74 * @param block_size number of bytes in block
75 * @param key set to the key (query) for the given block
76 * @return #GNUNET_OK on success, #GNUNET_SYSERR if type not supported
77 * (or if extracting a key from a block of this type does not work)
78 */
79static int
80block_plugin_set_test_get_key (void *cls,
81 enum GNUNET_BLOCK_Type type,
82 const void *block,
83 size_t block_size,
84 struct GNUNET_HashCode *key)
85{
86 return GNUNET_SYSERR;
87}
88
89
90/**
91 * Entry point for the plugin.
92 */
93void *
94libgnunet_plugin_block_set_test_init (void *cls)
95{
96 static enum GNUNET_BLOCK_Type types[] = {
97 GNUNET_BLOCK_TYPE_SET_TEST,
98 GNUNET_BLOCK_TYPE_ANY /* end of list */
99 };
100 struct GNUNET_BLOCK_PluginFunctions *api;
101
102 api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions);
103 api->evaluate = &block_plugin_set_test_evaluate;
104 api->get_key = &block_plugin_set_test_get_key;
105 api->types = types;
106 return api;
107}
108
109
110/**
111 * Exit point from the plugin.
112 */
113void *
114libgnunet_plugin_block_set_test_done (void *cls)
115{
116 struct GNUNET_BLOCK_PluginFunctions *api = cls;
117
118 GNUNET_free (api);
119 return NULL;
120}
121
122
123/* end of plugin_block_set_test.c */
diff --git a/src/setu/setu.h b/src/setu/setu.h
new file mode 100644
index 000000000..f1c5df92b
--- /dev/null
+++ b/src/setu/setu.h
@@ -0,0 +1,304 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2014, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/set.h
22 * @brief messages used for the set union api
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#ifndef SET_H
27#define SET_H
28
29#include "platform.h"
30#include "gnunet_common.h"
31#include "gnunet_set_service.h"
32
33GNUNET_NETWORK_STRUCT_BEGIN
34
35/**
36 * Message sent by the client to the service to ask starting
37 * a new set to perform operations with. Includes the desired
38 * set operation type.
39 */
40struct GNUNET_SETU_CreateMessage
41{
42 /**
43 * Type: #GNUNET_MESSAGE_TYPE_SETU_CREATE
44 */
45 struct GNUNET_MessageHeader header;
46
47};
48
49
50/**
51 * Message sent by the client to the service to start listening for
52 * incoming requests to perform a certain type of set operation for a
53 * certain type of application.
54 */
55struct GNUNET_SETU_ListenMessage
56{
57 /**
58 * Type: #GNUNET_MESSAGE_TYPE_SETU_LISTEN
59 */
60 struct GNUNET_MessageHeader header;
61
62 /**
63 * Always zero.
64 */
65 uint32_t reserved GNUNET_PACKED;
66
67 /**
68 * application id
69 */
70 struct GNUNET_HashCode app_id;
71};
72
73
74/**
75 * Message sent by a listening client to the service to accept
76 * performing the operation with the other peer.
77 */
78struct GNUNET_SETU_AcceptMessage
79{
80 /**
81 * Type: #GNUNET_MESSAGE_TYPE_SETU_ACCEPT
82 */
83 struct GNUNET_MessageHeader header;
84
85 /**
86 * ID of the incoming request we want to accept.
87 */
88 uint32_t accept_reject_id GNUNET_PACKED;
89
90 /**
91 * Request ID to identify responses.
92 */
93 uint32_t request_id GNUNET_PACKED;
94
95 /**
96 * Always use delta operation instead of sending full sets,
97 * even it it's less efficient.
98 */
99 uint8_t force_delta;
100
101 /**
102 * Always send full sets, even if delta operations would
103 * be more efficient.
104 */
105 uint8_t force_full;
106
107 /**
108 * #GNUNET_YES to fail operations where Byzantine faults
109 * are suspected
110 */
111 uint8_t byzantine;
112
113 /**
114 * Lower bound for the set size, used only when
115 * byzantine mode is enabled.
116 */
117 uint8_t byzantine_lower_bound;
118};
119
120
121/**
122 * Message sent by a listening client to the service to reject
123 * performing the operation with the other peer.
124 */
125struct GNUNET_SETU_RejectMessage
126{
127 /**
128 * Type: #GNUNET_MESSAGE_TYPE_SETU_REJECT
129 */
130 struct GNUNET_MessageHeader header;
131
132 /**
133 * ID of the incoming request we want to reject.
134 */
135 uint32_t accept_reject_id GNUNET_PACKED;
136};
137
138
139/**
140 * A request for an operation with another client.
141 */
142struct GNUNET_SETU_RequestMessage
143{
144 /**
145 * Type: #GNUNET_MESSAGE_TYPE_SETU_REQUEST.
146 */
147 struct GNUNET_MessageHeader header;
148
149 /**
150 * ID of the to identify the request when accepting or
151 * rejecting it.
152 */
153 uint32_t accept_id GNUNET_PACKED;
154
155 /**
156 * Identity of the requesting peer.
157 */
158 struct GNUNET_PeerIdentity peer_id;
159
160 /* rest: context message, that is, application-specific
161 message to convince listener to pick up */
162};
163
164
165/**
166 * Message sent by client to service to initiate a set operation as a
167 * client (not as listener). A set (which determines the operation
168 * type) must already exist in association with this client.
169 */
170struct GNUNET_SETU_EvaluateMessage
171{
172 /**
173 * Type: #GNUNET_MESSAGE_TYPE_SETU_EVALUATE
174 */
175 struct GNUNET_MessageHeader header;
176
177 /**
178 * Id of our set to evaluate, chosen implicitly by the client when it
179 * calls #GNUNET_SETU_commit().
180 */
181 uint32_t request_id GNUNET_PACKED;
182
183 /**
184 * Peer to evaluate the operation with
185 */
186 struct GNUNET_PeerIdentity target_peer;
187
188 /**
189 * Application id
190 */
191 struct GNUNET_HashCode app_id;
192
193 /**
194 * Always use delta operation instead of sending full sets,
195 * even it it's less efficient.
196 */
197 uint8_t force_delta;
198
199 /**
200 * Always send full sets, even if delta operations would
201 * be more efficient.
202 */
203 uint8_t force_full;
204
205 /**
206 * #GNUNET_YES to fail operations where Byzantine faults
207 * are suspected
208 */
209 uint8_t byzantine;
210
211 /**
212 * Lower bound for the set size, used only when
213 * byzantine mode is enabled.
214 */
215 uint8_t byzantine_lower_bound;
216
217 /* rest: context message, that is, application-specific
218 message to convince listener to pick up */
219};
220
221
222/**
223 * Message sent by the service to the client to indicate an
224 * element that is removed (set intersection) or added
225 * (set union) or part of the final result, depending on
226 * options specified for the operation.
227 */
228struct GNUNET_SETU_ResultMessage
229{
230 /**
231 * Type: #GNUNET_MESSAGE_TYPE_SETU_RESULT
232 */
233 struct GNUNET_MessageHeader header;
234
235 /**
236 * Current set size.
237 */
238 uint64_t current_size;
239
240 /**
241 * id the result belongs to
242 */
243 uint32_t request_id GNUNET_PACKED;
244
245 /**
246 * Was the evaluation successful? Contains
247 * an `enum GNUNET_SETU_Status` in NBO.
248 */
249 uint16_t result_status GNUNET_PACKED;
250
251 /**
252 * Type of the element attachted to the message, if any.
253 */
254 uint16_t element_type GNUNET_PACKED;
255
256 /* rest: the actual element */
257};
258
259
260/**
261 * Message sent by client to the service to add
262 * an element to the set.
263 */
264struct GNUNET_SETU_ElementMessage
265{
266 /**
267 * Type: #GNUNET_MESSAGE_TYPE_SETU_ADD
268 */
269 struct GNUNET_MessageHeader header;
270
271 /**
272 * Type of the element to add or remove.
273 */
274 uint16_t element_type GNUNET_PACKED;
275
276 /**
277 * For alignment, always zero.
278 */
279 uint16_t reserved GNUNET_PACKED;
280
281 /* rest: the actual element */
282};
283
284
285/**
286 * Sent to the service by the client in order to cancel a set operation.
287 */
288struct GNUNET_SETU_CancelMessage
289{
290 /**
291 * Type: #GNUNET_MESSAGE_TYPE_SETU_CANCEL
292 */
293 struct GNUNET_MessageHeader header;
294
295 /**
296 * ID of the request we want to cancel.
297 */
298 uint32_t request_id GNUNET_PACKED;
299};
300
301
302GNUNET_NETWORK_STRUCT_END
303
304#endif
diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c
new file mode 100644
index 000000000..48260de55
--- /dev/null
+++ b/src/setu/setu_api.c
@@ -0,0 +1,872 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2016, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/setu_api.c
22 * @brief api for the set union service
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_protocols.h"
29#include "gnunet_setu_service.h"
30#include "setu.h"
31
32
33#define LOG(kind, ...) GNUNET_log_from (kind, "set-api", __VA_ARGS__)
34
35/**
36 * Opaque handle to a set.
37 */
38struct GNUNET_SETU_Handle
39{
40 /**
41 * Message queue for @e client.
42 */
43 struct GNUNET_MQ_Handle *mq;
44
45 /**
46 * Linked list of operations on the set.
47 */
48 struct GNUNET_SETU_OperationHandle *ops_head;
49
50 /**
51 * Linked list of operations on the set.
52 */
53 struct GNUNET_SETU_OperationHandle *ops_tail;
54
55 /**
56 * Should the set be destroyed once all operations are gone?
57 * #GNUNET_SYSERR if #GNUNET_SETU_destroy() must raise this flag,
58 * #GNUNET_YES if #GNUNET_SETU_destroy() did raise this flag.
59 */
60 int destroy_requested;
61
62 /**
63 * Has the set become invalid (e.g. service died)?
64 */
65 int invalid;
66
67};
68
69
70/**
71 * Handle for a set operation request from another peer.
72 */
73struct GNUNET_SETU_Request
74{
75 /**
76 * Id of the request, used to identify the request when
77 * accepting/rejecting it.
78 */
79 uint32_t accept_id;
80
81 /**
82 * Has the request been accepted already?
83 * #GNUNET_YES/#GNUNET_NO
84 */
85 int accepted;
86};
87
88
89/**
90 * Handle to an operation. Only known to the service after committing
91 * the handle with a set.
92 */
93struct GNUNET_SETU_OperationHandle
94{
95 /**
96 * Function to be called when we have a result,
97 * or an error.
98 */
99 GNUNET_SETU_ResultIterator result_cb;
100
101 /**
102 * Closure for @e result_cb.
103 */
104 void *result_cls;
105
106 /**
107 * Local set used for the operation,
108 * NULL if no set has been provided by conclude yet.
109 */
110 struct GNUNET_SETU_Handle *set;
111
112 /**
113 * Message sent to the server on calling conclude,
114 * NULL if conclude has been called.
115 */
116 struct GNUNET_MQ_Envelope *conclude_mqm;
117
118 /**
119 * Address of the request if in the conclude message,
120 * used to patch the request id into the message when the set is known.
121 */
122 uint32_t *request_id_addr;
123
124 /**
125 * Handles are kept in a linked list.
126 */
127 struct GNUNET_SETU_OperationHandle *prev;
128
129 /**
130 * Handles are kept in a linked list.
131 */
132 struct GNUNET_SETU_OperationHandle *next;
133
134 /**
135 * Request ID to identify the operation within the set.
136 */
137 uint32_t request_id;
138};
139
140
141/**
142 * Opaque handle to a listen operation.
143 */
144struct GNUNET_SETU_ListenHandle
145{
146 /**
147 * Message queue for the client.
148 */
149 struct GNUNET_MQ_Handle*mq;
150
151 /**
152 * Configuration handle for the listener, stored
153 * here to be able to reconnect transparently on
154 * connection failure.
155 */
156 const struct GNUNET_CONFIGURATION_Handle *cfg;
157
158 /**
159 * Function to call on a new incoming request,
160 * or on error.
161 */
162 GNUNET_SETU_ListenCallback listen_cb;
163
164 /**
165 * Closure for @e listen_cb.
166 */
167 void *listen_cls;
168
169 /**
170 * Application ID we listen for.
171 */
172 struct GNUNET_HashCode app_id;
173
174 /**
175 * Time to wait until we try to reconnect on failure.
176 */
177 struct GNUNET_TIME_Relative reconnect_backoff;
178
179 /**
180 * Task for reconnecting when the listener fails.
181 */
182 struct GNUNET_SCHEDULER_Task *reconnect_task;
183
184};
185
186
187/**
188 * Check that the given @a msg is well-formed.
189 *
190 * @param cls closure
191 * @param msg message to check
192 * @return #GNUNET_OK if message is well-formed
193 */
194static int
195check_result (void *cls,
196 const struct GNUNET_SETU_ResultMessage *msg)
197{
198 /* minimum size was already checked, everything else is OK! */
199 return GNUNET_OK;
200}
201
202
203/**
204 * Handle result message for a set operation.
205 *
206 * @param cls the set
207 * @param mh the message
208 */
209static void
210handle_result (void *cls,
211 const struct GNUNET_SETU_ResultMessage *msg)
212{
213 struct GNUNET_SETU_Handle *set = cls;
214 struct GNUNET_SETU_OperationHandle *oh;
215 struct GNUNET_SETU_Element e;
216 enum GNUNET_SETU_Status result_status;
217 int destroy_set;
218
219 GNUNET_assert (NULL != set->mq);
220 result_status = (enum GNUNET_SETU_Status) ntohs (msg->result_status);
221 LOG (GNUNET_ERROR_TYPE_DEBUG,
222 "Got result message with status %d\n",
223 result_status);
224 oh = GNUNET_MQ_assoc_get (set->mq,
225 ntohl (msg->request_id));
226 if (NULL == oh)
227 {
228 /* 'oh' can be NULL if we canceled the operation, but the service
229 did not get the cancel message yet. */
230 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
231 "Ignoring result from canceled operation\n");
232 return;
233 }
234
235 switch (result_status)
236 {
237 case GNUNET_SETU_STATUS_ADD_LOCAL:
238 LOG (GNUNET_ERROR_TYPE_DEBUG,
239 "Treating result as element\n");
240 e.data = &msg[1];
241 e.size = ntohs (msg->header.size)
242 - sizeof(struct GNUNET_SETU_ResultMessage);
243 e.element_type = ntohs (msg->element_type);
244 if (NULL != oh->result_cb)
245 oh->result_cb (oh->result_cls,
246 &e,
247 GNUNET_ntohll (msg->current_size),
248 result_status);
249 return;
250 case GNUNET_SETU_STATUS_FAILURE:
251 case GNUNET_SETU_STATUS_DONE:
252 LOG (GNUNET_ERROR_TYPE_DEBUG,
253 "Treating result as final status\n");
254 GNUNET_MQ_assoc_remove (set->mq,
255 ntohl (msg->request_id));
256 GNUNET_CONTAINER_DLL_remove (set->ops_head,
257 set->ops_tail,
258 oh);
259 /* Need to do this calculation _before_ the result callback,
260 as IF the application still has a valid set handle, it
261 may trigger destruction of the set during the callback. */
262 destroy_set = (GNUNET_YES == set->destroy_requested) &&
263 (NULL == set->ops_head);
264 if (NULL != oh->result_cb)
265 {
266 oh->result_cb (oh->result_cls,
267 NULL,
268 GNUNET_ntohll (msg->current_size),
269 result_status);
270 }
271 else
272 {
273 LOG (GNUNET_ERROR_TYPE_DEBUG,
274 "No callback for final status\n");
275 }
276 if (destroy_set)
277 GNUNET_SETU_destroy (set);
278 GNUNET_free (oh);
279 return;
280 }
281}
282
283
284/**
285 * Destroy the given set operation.
286 *
287 * @param oh set operation to destroy
288 */
289static void
290set_operation_destroy (struct GNUNET_SETU_OperationHandle *oh)
291{
292 struct GNUNET_SETU_Handle *set = oh->set;
293 struct GNUNET_SETU_OperationHandle *h_assoc;
294
295 if (NULL != oh->conclude_mqm)
296 GNUNET_MQ_discard (oh->conclude_mqm);
297 /* is the operation already commited? */
298 if (NULL != set)
299 {
300 GNUNET_CONTAINER_DLL_remove (set->ops_head,
301 set->ops_tail,
302 oh);
303 h_assoc = GNUNET_MQ_assoc_remove (set->mq,
304 oh->request_id);
305 GNUNET_assert ((NULL == h_assoc) ||
306 (h_assoc == oh));
307 }
308 GNUNET_free (oh);
309}
310
311
312/**
313 * Cancel the given set operation. We need to send an explicit cancel
314 * message, as all operations one one set communicate using one
315 * handle.
316 *
317 * @param oh set operation to cancel
318 */
319void
320GNUNET_SETU_operation_cancel (struct GNUNET_SETU_OperationHandle *oh)
321{
322 struct GNUNET_SETU_Handle *set = oh->set;
323 struct GNUNET_SETU_CancelMessage *m;
324 struct GNUNET_MQ_Envelope *mqm;
325
326 LOG (GNUNET_ERROR_TYPE_DEBUG,
327 "Cancelling SET operation\n");
328 if (NULL != set)
329 {
330 mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETU_CANCEL);
331 m->request_id = htonl (oh->request_id);
332 GNUNET_MQ_send (set->mq, mqm);
333 }
334 set_operation_destroy (oh);
335 if ((NULL != set) &&
336 (GNUNET_YES == set->destroy_requested) &&
337 (NULL == set->ops_head))
338 {
339 LOG (GNUNET_ERROR_TYPE_DEBUG,
340 "Destroying set after operation cancel\n");
341 GNUNET_SETU_destroy (set);
342 }
343}
344
345
346/**
347 * We encountered an error communicating with the set service while
348 * performing a set operation. Report to the application.
349 *
350 * @param cls the `struct GNUNET_SETU_Handle`
351 * @param error error code
352 */
353static void
354handle_client_set_error (void *cls,
355 enum GNUNET_MQ_Error error)
356{
357 struct GNUNET_SETU_Handle *set = cls;
358
359 LOG (GNUNET_ERROR_TYPE_ERROR,
360 "Handling client set error %d\n",
361 error);
362 while (NULL != set->ops_head)
363 {
364 if ((NULL != set->ops_head->result_cb) &&
365 (GNUNET_NO == set->destroy_requested))
366 set->ops_head->result_cb (set->ops_head->result_cls,
367 NULL,
368 0,
369 GNUNET_SETU_STATUS_FAILURE);
370 set_operation_destroy (set->ops_head);
371 }
372 set->invalid = GNUNET_YES;
373}
374
375
376/**
377 * Create an empty set, supporting the specified operation.
378 *
379 * @param cfg configuration to use for connecting to the
380 * set service
381 * @return a handle to the set
382 */
383struct GNUNET_SETU_Handle *
384GNUNET_SETU_create (const struct GNUNET_CONFIGURATION_Handle *cfg)
385{
386 struct GNUNET_SETU_Handle *set = GNUNET_new (struct GNUNET_SETU_Handle);
387 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
388 GNUNET_MQ_hd_var_size (result,
389 GNUNET_MESSAGE_TYPE_SETU_RESULT,
390 struct GNUNET_SETU_ResultMessage,
391 set),
392 GNUNET_MQ_handler_end ()
393 };
394 struct GNUNET_MQ_Envelope *mqm;
395 struct GNUNET_SETU_CreateMessage *create_msg;
396
397 set->mq = GNUNET_CLIENT_connect (cfg,
398 "setu",
399 mq_handlers,
400 &handle_client_set_error,
401 set);
402 if (NULL == set->mq)
403 {
404 GNUNET_free (set);
405 return NULL;
406 }
407 mqm = GNUNET_MQ_msg (create_msg,
408 GNUNET_MESSAGE_TYPE_SETU_CREATE);
409 GNUNET_MQ_send (set->mq,
410 mqm);
411 return set;
412}
413
414
415/**
416 * Add an element to the given set. After the element has been added
417 * (in the sense of being transmitted to the set service), @a cont
418 * will be called. Multiple calls to GNUNET_SETU_add_element() can be
419 * queued.
420 *
421 * @param set set to add element to
422 * @param element element to add to the set
423 * @param cb continuation called after the element has been added
424 * @param cb_cls closure for @a cb
425 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
426 * set is invalid (e.g. the set service crashed)
427 */
428int
429GNUNET_SETU_add_element (struct GNUNET_SETU_Handle *set,
430 const struct GNUNET_SETU_Element *element,
431 GNUNET_SCHEDULER_TaskCallback cb,
432 void *cb_cls)
433{
434 struct GNUNET_MQ_Envelope *mqm;
435 struct GNUNET_SETU_ElementMessage *msg;
436
437 LOG (GNUNET_ERROR_TYPE_DEBUG,
438 "adding element of type %u to set %p\n",
439 (unsigned int) element->element_type,
440 set);
441 GNUNET_assert (NULL != set);
442 if (GNUNET_YES == set->invalid)
443 {
444 if (NULL != cb)
445 cb (cb_cls);
446 return GNUNET_SYSERR;
447 }
448 mqm = GNUNET_MQ_msg_extra (msg,
449 element->size,
450 GNUNET_MESSAGE_TYPE_SETU_ADD);
451 msg->element_type = htons (element->element_type);
452 GNUNET_memcpy (&msg[1],
453 element->data,
454 element->size);
455 GNUNET_MQ_notify_sent (mqm,
456 cb,
457 cb_cls);
458 GNUNET_MQ_send (set->mq,
459 mqm);
460 return GNUNET_OK;
461}
462
463
464/**
465 * Destroy the set handle if no operations are left, mark the set
466 * for destruction otherwise.
467 *
468 * @param set set handle to destroy
469 */
470void
471GNUNET_SETU_destroy (struct GNUNET_SETU_Handle *set)
472{
473 /* destroying set while iterator is active is currently
474 not supported; we should expand the API to allow
475 clients to explicitly cancel the iteration! */
476 GNUNET_assert (NULL != set);
477 if ((NULL != set->ops_head) ||
478 (GNUNET_SYSERR == set->destroy_requested))
479 {
480 LOG (GNUNET_ERROR_TYPE_DEBUG,
481 "Set operations are pending, delaying set destruction\n");
482 set->destroy_requested = GNUNET_YES;
483 return;
484 }
485 LOG (GNUNET_ERROR_TYPE_DEBUG,
486 "Really destroying set\n");
487 if (NULL != set->mq)
488 {
489 GNUNET_MQ_destroy (set->mq);
490 set->mq = NULL;
491 }
492 GNUNET_free (set);
493}
494
495
496/**
497 * Prepare a set operation to be evaluated with another peer.
498 * The evaluation will not start until the client provides
499 * a local set with #GNUNET_SETU_commit().
500 *
501 * @param other_peer peer with the other set
502 * @param app_id hash for the application using the set
503 * @param context_msg additional information for the request
504 * @param result_cb called on error or success
505 * @param result_cls closure for @e result_cb
506 * @return a handle to cancel the operation
507 */
508struct GNUNET_SETU_OperationHandle *
509GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
510 const struct GNUNET_HashCode *app_id,
511 const struct GNUNET_MessageHeader *context_msg,
512 const struct GNUNET_SETU_Option options[],
513 GNUNET_SETU_ResultIterator result_cb,
514 void *result_cls)
515{
516 struct GNUNET_MQ_Envelope *mqm;
517 struct GNUNET_SETU_OperationHandle *oh;
518 struct GNUNET_SETU_EvaluateMessage *msg;
519
520 LOG (GNUNET_ERROR_TYPE_DEBUG,
521 "Client prepares set union operation\n");
522 oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
523 oh->result_cb = result_cb;
524 oh->result_cls = result_cls;
525 mqm = GNUNET_MQ_msg_nested_mh (msg,
526 GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
527 context_msg);
528 msg->app_id = *app_id;
529 msg->target_peer = *other_peer;
530 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
531 {
532 switch (opt->type)
533 {
534 case GNUNET_SETU_OPTION_BYZANTINE:
535 msg->byzantine = GNUNET_YES;
536 msg->byzantine_lower_bound = opt->v.num;
537 break;
538 case GNUNET_SETU_OPTION_FORCE_FULL:
539 msg->force_full = GNUNET_YES;
540 break;
541 case GNUNET_SETU_OPTION_FORCE_DELTA:
542 msg->force_delta = GNUNET_YES;
543 break;
544 default:
545 LOG (GNUNET_ERROR_TYPE_ERROR,
546 "Option with type %d not recognized\n",
547 (int) opt->type);
548 }
549 }
550 oh->conclude_mqm = mqm;
551 oh->request_id_addr = &msg->request_id;
552 return oh;
553}
554
555
556/**
557 * Connect to the set service in order to listen for requests.
558 *
559 * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
560 */
561static void
562listen_connect (void *cls);
563
564
565/**
566 * Check validity of request message for a listen operation
567 *
568 * @param cls the listen handle
569 * @param msg the message
570 * @return #GNUNET_OK if the message is well-formed
571 */
572static int
573check_request (void *cls,
574 const struct GNUNET_SETU_RequestMessage *msg)
575{
576 const struct GNUNET_MessageHeader *context_msg;
577
578 if (ntohs (msg->header.size) == sizeof(*msg))
579 return GNUNET_OK; /* no context message is OK */
580 context_msg = GNUNET_MQ_extract_nested_mh (msg);
581 if (NULL == context_msg)
582 {
583 /* malformed context message is NOT ok */
584 GNUNET_break_op (0);
585 return GNUNET_SYSERR;
586 }
587 return GNUNET_OK;
588}
589
590
591/**
592 * Handle request message for a listen operation
593 *
594 * @param cls the listen handle
595 * @param msg the message
596 */
597static void
598handle_request (void *cls,
599 const struct GNUNET_SETU_RequestMessage *msg)
600{
601 struct GNUNET_SETU_ListenHandle *lh = cls;
602 struct GNUNET_SETU_Request req;
603 const struct GNUNET_MessageHeader *context_msg;
604 struct GNUNET_MQ_Envelope *mqm;
605 struct GNUNET_SETU_RejectMessage *rmsg;
606
607 LOG (GNUNET_ERROR_TYPE_DEBUG,
608 "Processing incoming operation request with id %u\n",
609 ntohl (msg->accept_id));
610 /* we got another valid request => reset the backoff */
611 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
612 req.accept_id = ntohl (msg->accept_id);
613 req.accepted = GNUNET_NO;
614 context_msg = GNUNET_MQ_extract_nested_mh (msg);
615 /* calling #GNUNET_SETU_accept() in the listen cb will set req->accepted */
616 lh->listen_cb (lh->listen_cls,
617 &msg->peer_id,
618 context_msg,
619 &req);
620 if (GNUNET_YES == req.accepted)
621 return; /* the accept-case is handled in #GNUNET_SETU_accept() */
622 LOG (GNUNET_ERROR_TYPE_DEBUG,
623 "Rejected request %u\n",
624 ntohl (msg->accept_id));
625 mqm = GNUNET_MQ_msg (rmsg,
626 GNUNET_MESSAGE_TYPE_SETU_REJECT);
627 rmsg->accept_reject_id = msg->accept_id;
628 GNUNET_MQ_send (lh->mq,
629 mqm);
630}
631
632
633/**
634 * Our connection with the set service encountered an error,
635 * re-initialize with exponential back-off.
636 *
637 * @param cls the `struct GNUNET_SETU_ListenHandle *`
638 * @param error reason for the disconnect
639 */
640static void
641handle_client_listener_error (void *cls,
642 enum GNUNET_MQ_Error error)
643{
644 struct GNUNET_SETU_ListenHandle *lh = cls;
645
646 LOG (GNUNET_ERROR_TYPE_DEBUG,
647 "Listener broke down (%d), re-connecting\n",
648 (int) error);
649 GNUNET_MQ_destroy (lh->mq);
650 lh->mq = NULL;
651 lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
652 &listen_connect,
653 lh);
654 lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
655}
656
657
658/**
659 * Connect to the set service in order to listen for requests.
660 *
661 * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
662 */
663static void
664listen_connect (void *cls)
665{
666 struct GNUNET_SETU_ListenHandle *lh = cls;
667 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
668 GNUNET_MQ_hd_var_size (request,
669 GNUNET_MESSAGE_TYPE_SETU_REQUEST,
670 struct GNUNET_SETU_RequestMessage,
671 lh),
672 GNUNET_MQ_handler_end ()
673 };
674 struct GNUNET_MQ_Envelope *mqm;
675 struct GNUNET_SETU_ListenMessage *msg;
676
677 lh->reconnect_task = NULL;
678 GNUNET_assert (NULL == lh->mq);
679 lh->mq = GNUNET_CLIENT_connect (lh->cfg,
680 "setu",
681 mq_handlers,
682 &handle_client_listener_error,
683 lh);
684 if (NULL == lh->mq)
685 return;
686 mqm = GNUNET_MQ_msg (msg,
687 GNUNET_MESSAGE_TYPE_SETU_LISTEN);
688 msg->app_id = lh->app_id;
689 GNUNET_MQ_send (lh->mq,
690 mqm);
691}
692
693
694/**
695 * Wait for set operation requests for the given application id
696 *
697 * @param cfg configuration to use for connecting to
698 * the set service, needs to be valid for the lifetime of the listen handle
699 * @param app_id id of the application that handles set operation requests
700 * @param listen_cb called for each incoming request matching the operation
701 * and application id
702 * @param listen_cls handle for @a listen_cb
703 * @return a handle that can be used to cancel the listen operation
704 */
705struct GNUNET_SETU_ListenHandle *
706GNUNET_SETU_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
707 const struct GNUNET_HashCode *app_id,
708 GNUNET_SETU_ListenCallback listen_cb,
709 void *listen_cls)
710{
711 struct GNUNET_SETU_ListenHandle *lh;
712
713 LOG (GNUNET_ERROR_TYPE_DEBUG,
714 "Starting listener for app %s\n",
715 GNUNET_h2s (app_id));
716 lh = GNUNET_new (struct GNUNET_SETU_ListenHandle);
717 lh->listen_cb = listen_cb;
718 lh->listen_cls = listen_cls;
719 lh->cfg = cfg;
720 lh->app_id = *app_id;
721 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
722 listen_connect (lh);
723 if (NULL == lh->mq)
724 {
725 GNUNET_free (lh);
726 return NULL;
727 }
728 return lh;
729}
730
731
732/**
733 * Cancel the given listen operation.
734 *
735 * @param lh handle for the listen operation
736 */
737void
738GNUNET_SETU_listen_cancel (struct GNUNET_SETU_ListenHandle *lh)
739{
740 LOG (GNUNET_ERROR_TYPE_DEBUG,
741 "Canceling listener %s\n",
742 GNUNET_h2s (&lh->app_id));
743 if (NULL != lh->mq)
744 {
745 GNUNET_MQ_destroy (lh->mq);
746 lh->mq = NULL;
747 }
748 if (NULL != lh->reconnect_task)
749 {
750 GNUNET_SCHEDULER_cancel (lh->reconnect_task);
751 lh->reconnect_task = NULL;
752 }
753 GNUNET_free (lh);
754}
755
756
757/**
758 * Accept a request we got via #GNUNET_SETU_listen. Must be called during
759 * #GNUNET_SETU_listen, as the 'struct GNUNET_SETU_Request' becomes invalid
760 * afterwards.
761 * Call #GNUNET_SETU_commit to provide the local set to use for the operation,
762 * and to begin the exchange with the remote peer.
763 *
764 * @param request request to accept
765 * @param result_mode specified how results will be returned,
766 * see `enum GNUNET_SETU_ResultMode`.
767 * @param result_cb callback for the results
768 * @param result_cls closure for @a result_cb
769 * @return a handle to cancel the operation
770 */
771struct GNUNET_SETU_OperationHandle *
772GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
773 const struct GNUNET_SETU_Option options[],
774 GNUNET_SETU_ResultIterator result_cb,
775 void *result_cls)
776{
777 struct GNUNET_MQ_Envelope *mqm;
778 struct GNUNET_SETU_OperationHandle *oh;
779 struct GNUNET_SETU_AcceptMessage *msg;
780
781 GNUNET_assert (GNUNET_NO == request->accepted);
782 LOG (GNUNET_ERROR_TYPE_DEBUG,
783 "Client accepts set union operation with id %u\n",
784 request->accept_id);
785 request->accepted = GNUNET_YES;
786 mqm = GNUNET_MQ_msg (msg,
787 GNUNET_MESSAGE_TYPE_SETU_ACCEPT);
788 msg->accept_reject_id = htonl (request->accept_id);
789 oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
790 oh->result_cb = result_cb;
791 oh->result_cls = result_cls;
792 oh->conclude_mqm = mqm;
793 oh->request_id_addr = &msg->request_id;
794 return oh;
795}
796
797
798/**
799 * Commit a set to be used with a set operation.
800 * This function is called once we have fully constructed
801 * the set that we want to use for the operation. At this
802 * time, the P2P protocol can then begin to exchange the
803 * set information and call the result callback with the
804 * result information.
805 *
806 * @param oh handle to the set operation
807 * @param set the set to use for the operation
808 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
809 * set is invalid (e.g. the set service crashed)
810 */
811int
812GNUNET_SETU_commit (struct GNUNET_SETU_OperationHandle *oh,
813 struct GNUNET_SETU_Handle *set)
814{
815 if (NULL != oh->set)
816 {
817 /* Some other set was already committed for this
818 * operation, there is a logic bug in the client of this API */
819 GNUNET_break (0);
820 return GNUNET_OK;
821 }
822 GNUNET_assert (NULL != set);
823 if (GNUNET_YES == set->invalid)
824 return GNUNET_SYSERR;
825 LOG (GNUNET_ERROR_TYPE_DEBUG,
826 "Client commits to SET\n");
827 GNUNET_assert (NULL != oh->conclude_mqm);
828 oh->set = set;
829 GNUNET_CONTAINER_DLL_insert (set->ops_head,
830 set->ops_tail,
831 oh);
832 oh->request_id = GNUNET_MQ_assoc_add (set->mq,
833 oh);
834 *oh->request_id_addr = htonl (oh->request_id);
835 GNUNET_MQ_send (set->mq,
836 oh->conclude_mqm);
837 oh->conclude_mqm = NULL;
838 oh->request_id_addr = NULL;
839 return GNUNET_OK;
840}
841
842
843/**
844 * Hash a set element.
845 *
846 * @param element the element that should be hashed
847 * @param[out] ret_hash a pointer to where the hash of @a element
848 * should be stored
849 */
850void
851GNUNET_SETU_element_hash (const struct GNUNET_SETU_Element *element,
852 struct GNUNET_HashCode *ret_hash)
853{
854 struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
855
856 /* It's not guaranteed that the element data is always after the element header,
857 so we need to hash the chunks separately. */
858 GNUNET_CRYPTO_hash_context_read (ctx,
859 &element->size,
860 sizeof(uint16_t));
861 GNUNET_CRYPTO_hash_context_read (ctx,
862 &element->element_type,
863 sizeof(uint16_t));
864 GNUNET_CRYPTO_hash_context_read (ctx,
865 element->data,
866 element->size);
867 GNUNET_CRYPTO_hash_context_finish (ctx,
868 ret_hash);
869}
870
871
872/* end of setu_api.c */
diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c
new file mode 100644
index 000000000..95119873c
--- /dev/null
+++ b/src/setu/test_setu_api.c
@@ -0,0 +1,360 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/test_setu_api.c
23 * @brief testcase for setu_api.c
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_testing_lib.h"
29#include "gnunet_setu_service.h"
30
31
32static struct GNUNET_PeerIdentity local_id;
33
34static struct GNUNET_HashCode app_id;
35
36static struct GNUNET_SETU_Handle *set1;
37
38static struct GNUNET_SETU_Handle *set2;
39
40static struct GNUNET_SETU_ListenHandle *listen_handle;
41
42static struct GNUNET_SETU_OperationHandle *oh1;
43
44static struct GNUNET_SETU_OperationHandle *oh2;
45
46static const struct GNUNET_CONFIGURATION_Handle *config;
47
48static int ret;
49
50static struct GNUNET_SCHEDULER_Task *tt;
51
52
53static void
54result_cb_set1 (void *cls,
55 const struct GNUNET_SETU_Element *element,
56 uint64_t size,
57 enum GNUNET_SETU_Status status)
58{
59 switch (status)
60 {
61 case GNUNET_SETU_STATUS_ADD_LOCAL:
62 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n");
63 break;
64
65 case GNUNET_SETU_STATUS_FAILURE:
66 GNUNET_break (0);
67 oh1 = NULL;
68 fprintf (stderr, "set 1: received failure status!\n");
69 ret = 1;
70 if (NULL != tt)
71 {
72 GNUNET_SCHEDULER_cancel (tt);
73 tt = NULL;
74 }
75 GNUNET_SCHEDULER_shutdown ();
76 break;
77
78 case GNUNET_SETU_STATUS_DONE:
79 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
80 oh1 = NULL;
81 if (NULL != set1)
82 {
83 GNUNET_SETU_destroy (set1);
84 set1 = NULL;
85 }
86 if (NULL == set2)
87 {
88 GNUNET_SCHEDULER_cancel (tt);
89 tt = NULL;
90 GNUNET_SCHEDULER_shutdown ();
91 }
92 break;
93
94 default:
95 GNUNET_assert (0);
96 }
97}
98
99
100static void
101result_cb_set2 (void *cls,
102 const struct GNUNET_SETU_Element *element,
103 uint64_t size,
104 enum GNUNET_SETU_Status status)
105{
106 switch (status)
107 {
108 case GNUNET_SETU_STATUS_ADD_LOCAL:
109 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n");
110 break;
111
112 case GNUNET_SETU_STATUS_FAILURE:
113 GNUNET_break (0);
114 oh2 = NULL;
115 fprintf (stderr, "set 2: received failure status\n");
116 GNUNET_SCHEDULER_shutdown ();
117 ret = 1;
118 break;
119
120 case GNUNET_SETU_STATUS_DONE:
121 oh2 = NULL;
122 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
123 GNUNET_SETU_destroy (set2);
124 set2 = NULL;
125 if (NULL == set1)
126 {
127 GNUNET_SCHEDULER_cancel (tt);
128 tt = NULL;
129 GNUNET_SCHEDULER_shutdown ();
130 }
131 break;
132
133 default:
134 GNUNET_assert (0);
135 }
136}
137
138
139static void
140listen_cb (void *cls,
141 const struct GNUNET_PeerIdentity *other_peer,
142 const struct GNUNET_MessageHeader *context_msg,
143 struct GNUNET_SETU_Request *request)
144{
145 GNUNET_assert (NULL != context_msg);
146 GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
147 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n");
148 oh2 = GNUNET_SETU_accept (request,
149 (struct GNUNET_SETU_Option[]){ 0 },
150 &result_cb_set2,
151 NULL);
152 GNUNET_SETU_commit (oh2, set2);
153}
154
155
156/**
157 * Start the set operation.
158 *
159 * @param cls closure, unused
160 */
161static void
162start (void *cls)
163{
164 struct GNUNET_MessageHeader context_msg;
165
166 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n");
167 context_msg.size = htons (sizeof context_msg);
168 context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
169 listen_handle = GNUNET_SETU_listen (config,
170 &app_id,
171 &listen_cb,
172 NULL);
173 oh1 = GNUNET_SETU_prepare (&local_id,
174 &app_id,
175 &context_msg,
176 (struct GNUNET_SETU_Option[]){ 0 },
177 &result_cb_set1,
178 NULL);
179 GNUNET_SETU_commit (oh1, set1);
180}
181
182
183/**
184 * Initialize the second set, continue
185 *
186 * @param cls closure, unused
187 */
188static void
189init_set2 (void *cls)
190{
191 struct GNUNET_SETU_Element element;
192
193 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
194
195 element.element_type = 0;
196 element.data = "hello";
197 element.size = strlen (element.data);
198 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
199 element.data = "quux";
200 element.size = strlen (element.data);
201 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
202 element.data = "baz";
203 element.size = strlen (element.data);
204 GNUNET_SETU_add_element (set2, &element, &start, NULL);
205}
206
207
208/**
209 * Initialize the first set, continue.
210 */
211static void
212init_set1 (void)
213{
214 struct GNUNET_SETU_Element element;
215
216 element.element_type = 0;
217 element.data = "hello";
218 element.size = strlen (element.data);
219 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
220 element.data = "bar";
221 element.size = strlen (element.data);
222 GNUNET_SETU_add_element (set1, &element, &init_set2, NULL);
223 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
224}
225
226
227/**
228 * Function run on timeout.
229 *
230 * @param cls closure
231 */
232static void
233timeout_fail (void *cls)
234{
235 tt = NULL;
236 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n");
237 GNUNET_SCHEDULER_shutdown ();
238 ret = 1;
239}
240
241
242/**
243 * Function run on shutdown.
244 *
245 * @param cls closure
246 */
247static void
248do_shutdown (void *cls)
249{
250 if (NULL != tt)
251 {
252 GNUNET_SCHEDULER_cancel (tt);
253 tt = NULL;
254 }
255 if (NULL != oh1)
256 {
257 GNUNET_SETU_operation_cancel (oh1);
258 oh1 = NULL;
259 }
260 if (NULL != oh2)
261 {
262 GNUNET_SETU_operation_cancel (oh2);
263 oh2 = NULL;
264 }
265 if (NULL != set1)
266 {
267 GNUNET_SETU_destroy (set1);
268 set1 = NULL;
269 }
270 if (NULL != set2)
271 {
272 GNUNET_SETU_destroy (set2);
273 set2 = NULL;
274 }
275 if (NULL != listen_handle)
276 {
277 GNUNET_SETU_listen_cancel (listen_handle);
278 listen_handle = NULL;
279 }
280}
281
282
283/**
284 * Signature of the 'main' function for a (single-peer) testcase that
285 * is run using 'GNUNET_TESTING_peer_run'.
286 *
287 * @param cls closure
288 * @param cfg configuration of the peer that was started
289 * @param peer identity of the peer that was created
290 */
291static void
292run (void *cls,
293 const struct GNUNET_CONFIGURATION_Handle *cfg,
294 struct GNUNET_TESTING_Peer *peer)
295{
296 struct GNUNET_SETU_OperationHandle *my_oh;
297
298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
299 "Running preparatory tests\n");
300 tt = GNUNET_SCHEDULER_add_delayed (
301 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
302 &timeout_fail,
303 NULL);
304 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
305
306 config = cfg;
307 GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg,
308 &local_id));
309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
310 "my id (from CRYPTO): %s\n",
311 GNUNET_i2s (&local_id));
312 GNUNET_TESTING_peer_get_identity (peer,
313 &local_id);
314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
315 "my id (from TESTING): %s\n",
316 GNUNET_i2s (&local_id));
317 set1 = GNUNET_SETU_create (cfg);
318 set2 = GNUNET_SETU_create (cfg);
319 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
320 "Created sets %p and %p for union operation\n",
321 set1,
322 set2);
323 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
324
325 /* test if canceling an uncommited request works! */
326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327 "Launching and instantly stopping set operation\n");
328 my_oh = GNUNET_SETU_prepare (&local_id,
329 &app_id,
330 NULL,
331 (struct GNUNET_SETU_Option[]){ 0 },
332 NULL,
333 NULL);
334 GNUNET_SETU_operation_cancel (my_oh);
335
336 /* test the real set reconciliation */
337 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
338 "Running real set-reconciliation\n");
339 init_set1 ();
340}
341
342
343int
344main (int argc, char **argv)
345{
346 GNUNET_log_setup ("test_setu_api",
347 "WARNING",
348 NULL);
349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
350 "Launching peer\n");
351 if (0 !=
352 GNUNET_TESTING_peer_run ("test_setu_api",
353 "test_setu.conf",
354 &run,
355 NULL))
356 {
357 return 1;
358 }
359 return ret;
360}