aboutsummaryrefslogtreecommitdiff
path: root/src/setu
diff options
context:
space:
mode:
Diffstat (limited to 'src/setu')
-rw-r--r--src/setu/.gitignore6
-rw-r--r--src/setu/Makefile.am102
-rw-r--r--src/setu/gnunet-service-setu.c3683
-rw-r--r--src/setu/gnunet-service-setu_protocol.h226
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.c303
-rw-r--r--src/setu/gnunet-service-setu_strata_estimator.h169
-rw-r--r--src/setu/gnunet-setu-ibf-profiler.c308
-rw-r--r--src/setu/gnunet-setu-profiler.c499
-rw-r--r--src/setu/ibf.c409
-rw-r--r--src/setu/ibf.h255
-rw-r--r--src/setu/ibf_sim.c142
-rw-r--r--src/setu/plugin_block_setu_test.c123
-rw-r--r--src/setu/setu.conf.in12
-rw-r--r--src/setu/setu.h315
-rw-r--r--src/setu/setu_api.c897
-rw-r--r--src/setu/test_setu_api.c360
16 files changed, 7809 insertions, 0 deletions
diff --git a/src/setu/.gitignore b/src/setu/.gitignore
new file mode 100644
index 000000000..35295449b
--- /dev/null
+++ b/src/setu/.gitignore
@@ -0,0 +1,6 @@
1gnunet-setu-profiler
2gnunet-service-setu
3gnunet-setu-ibf-profiler
4test_setu_api
5test_setu_copy
6test_setu_result_symmetric
diff --git a/src/setu/Makefile.am b/src/setu/Makefile.am
new file mode 100644
index 000000000..b37ceba51
--- /dev/null
+++ b/src/setu/Makefile.am
@@ -0,0 +1,102 @@
1# This Makefile.am is in the public domain
2AM_CPPFLAGS = -I$(top_srcdir)/src/include
3
4pkgcfgdir= $(pkgdatadir)/config.d/
5
6libexecdir= $(pkglibdir)/libexec/
7
8plugindir = $(libdir)/gnunet
9
10pkgcfg_DATA = \
11 setu.conf
12
13if USE_COVERAGE
14 AM_CFLAGS = -fprofile-arcs -ftest-coverage
15endif
16
17if HAVE_TESTING
18bin_PROGRAMS = \
19 gnunet-setu-profiler
20
21noinst_PROGRAMS = \
22 gnunet-setu-ibf-profiler
23endif
24
25libexec_PROGRAMS = \
26 gnunet-service-setu
27
28lib_LTLIBRARIES = \
29 libgnunetsetu.la
30
31gnunet_setu_profiler_SOURCES = \
32 gnunet-setu-profiler.c
33gnunet_setu_profiler_LDADD = \
34 $(top_builddir)/src/util/libgnunetutil.la \
35 $(top_builddir)/src/statistics/libgnunetstatistics.la \
36 libgnunetsetu.la \
37 $(top_builddir)/src/testing/libgnunettesting.la \
38 $(GN_LIBINTL)
39
40
41gnunet_setu_ibf_profiler_SOURCES = \
42 gnunet-setu-ibf-profiler.c \
43 ibf.c
44gnunet_setu_ibf_profiler_LDADD = \
45 $(top_builddir)/src/util/libgnunetutil.la \
46 $(GN_LIBINTL)
47
48gnunet_service_setu_SOURCES = \
49 gnunet-service-setu.c gnunet-service-setu.h \
50 ibf.c ibf.h \
51 gnunet-service-setu_strata_estimator.c gnunet-service-setu_strata_estimator.h \
52 gnunet-service-setu_protocol.h
53gnunet_service_setu_LDADD = \
54 $(top_builddir)/src/util/libgnunetutil.la \
55 $(top_builddir)/src/statistics/libgnunetstatistics.la \
56 $(top_builddir)/src/core/libgnunetcore.la \
57 $(top_builddir)/src/cadet/libgnunetcadet.la \
58 $(top_builddir)/src/block/libgnunetblock.la \
59 libgnunetsetu.la \
60 $(GN_LIBINTL)
61
62libgnunetsetu_la_SOURCES = \
63 setu_api.c setu.h
64libgnunetsetu_la_LIBADD = \
65 $(top_builddir)/src/util/libgnunetutil.la \
66 $(LTLIBINTL)
67libgnunetsetu_la_LDFLAGS = \
68 $(GN_LIB_LDFLAGS)
69
70if HAVE_TESTING
71check_PROGRAMS = \
72 test_setu_api
73endif
74
75if ENABLE_TEST_RUN
76AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME;
77TESTS = $(check_PROGRAMS)
78endif
79
80test_setu_api_SOURCES = \
81 test_setu_api.c
82test_setu_api_LDADD = \
83 $(top_builddir)/src/util/libgnunetutil.la \
84 $(top_builddir)/src/testing/libgnunettesting.la \
85 libgnunetsetu.la
86
87plugin_LTLIBRARIES = \
88 libgnunet_plugin_block_setu_test.la
89
90libgnunet_plugin_block_setu_test_la_SOURCES = \
91 plugin_block_setu_test.c
92libgnunet_plugin_block_setu_test_la_LIBADD = \
93 $(top_builddir)/src/block/libgnunetblock.la \
94 $(top_builddir)/src/block/libgnunetblockgroup.la \
95 $(top_builddir)/src/util/libgnunetutil.la \
96 $(LTLIBINTL)
97libgnunet_plugin_block_setu_test_la_LDFLAGS = \
98 $(GN_PLUGIN_LDFLAGS)
99
100
101EXTRA_DIST = \
102 test_setu.conf
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
new file mode 100644
index 000000000..326589186
--- /dev/null
+++ b/src/setu/gnunet-service-setu.c
@@ -0,0 +1,3683 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file setu/gnunet-service-setu.c
22 * @brief set union operation
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h"
29#include "ibf.h"
30#include "gnunet_protocols.h"
31#include "gnunet_applications.h"
32#include "gnunet_cadet_service.h"
33#include "gnunet-service-setu_strata_estimator.h"
34#include "gnunet-service-setu_protocol.h"
35#include "gnunet_statistics_service.h"
36#include <gcrypt.h>
37#include "gnunet_setu_service.h"
38#include "setu.h"
39
40#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
41
42/**
43 * How long do we hold on to an incoming channel if there is
44 * no local listener before giving up?
45 */
46#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
47
48/**
49 * Number of IBFs in a strata estimator.
50 */
51#define SE_STRATA_COUNT 32
52
53/**
54 * Size of the IBFs in the strata estimator.
55 */
56#define SE_IBF_SIZE 80
57
58/**
59 * The hash num parameter for the difference digests and strata estimators.
60 */
61#define SE_IBF_HASH_NUM 4
62
63/**
64 * Number of buckets that can be transmitted in one message.
65 */
66#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
67
68/**
69 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
70 * Choose this value so that computing the IBF is still cheaper
71 * than transmitting all values.
72 */
73#define MAX_IBF_ORDER (20)
74
75/**
76 * Number of buckets used in the ibf per estimated
77 * difference.
78 */
79#define IBF_ALPHA 4
80
81
82/**
83 * Current phase we are in for a union operation.
84 */
85enum UnionOperationPhase
86{
87 /**
88 * We sent the request message, and expect a strata estimator.
89 */
90 PHASE_EXPECT_SE,
91
92 /**
93 * We sent the strata estimator, and expect an IBF. This phase is entered once
94 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
95 *
96 * XXX: could use better wording.
97 * XXX: repurposed to also expect a "request full set" message, should be renamed
98 *
99 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
100 */
101 PHASE_EXPECT_IBF,
102
103 /**
104 * Continuation for multi part IBFs.
105 */
106 PHASE_EXPECT_IBF_CONT,
107
108 /**
109 * We are decoding an IBF.
110 */
111 PHASE_INVENTORY_ACTIVE,
112
113 /**
114 * The other peer is decoding the IBF we just sent.
115 */
116 PHASE_INVENTORY_PASSIVE,
117
118 /**
119 * The protocol is almost finished, but we still have to flush our message
120 * queue and/or expect some elements.
121 */
122 PHASE_FINISH_CLOSING,
123
124 /**
125 * In the penultimate phase, we wait until all our demands are satisfied.
126 * Then we send a done message, and wait for another done message.
127 */
128 PHASE_FINISH_WAITING,
129
130 /**
131 * In the ultimate phase, we wait until our demands are satisfied and then
132 * quit (sending another DONE message).
133 */
134 PHASE_DONE,
135
136 /**
137 * After sending the full set, wait for responses with the elements
138 * that the local peer is missing.
139 */
140 PHASE_FULL_SENDING,
141};
142
143
144/**
145 * Information about an element element in the set. All elements are
146 * stored in a hash-table from their hash-code to their `struct
147 * Element`, so that the remove and add operations are reasonably
148 * fast.
149 */
150struct ElementEntry
151{
152 /**
153 * The actual element. The data for the element
154 * should be allocated at the end of this struct.
155 */
156 struct GNUNET_SETU_Element element;
157
158 /**
159 * Hash of the element. For set union: Will be used to derive the
160 * different IBF keys for different salts.
161 */
162 struct GNUNET_HashCode element_hash;
163
164 /**
165 * First generation that includes this element.
166 */
167 unsigned int generation;
168
169 /**
170 * #GNUNET_YES if the element is a remote element, and does not belong
171 * to the operation's set.
172 */
173 int remote;
174};
175
176
177/**
178 * A listener is inhabited by a client, and waits for evaluation
179 * requests from remote peers.
180 */
181struct Listener;
182
183
184/**
185 * A set that supports a specific operation with other peers.
186 */
187struct Set;
188
189
190/**
191 * State we keep per client.
192 */
193struct ClientState
194{
195 /**
196 * Set, if associated with the client, otherwise NULL.
197 */
198 struct Set *set;
199
200 /**
201 * Listener, if associated with the client, otherwise NULL.
202 */
203 struct Listener *listener;
204
205 /**
206 * Client handle.
207 */
208 struct GNUNET_SERVICE_Client *client;
209
210 /**
211 * Message queue.
212 */
213 struct GNUNET_MQ_Handle *mq;
214};
215
216
217/**
218 * Operation context used to execute a set operation.
219 */
220struct Operation
221{
222
223 /**
224 * The identity of the requesting peer. Needs to
225 * be stored here as the op spec might not have been created yet.
226 */
227 struct GNUNET_PeerIdentity peer;
228
229 /**
230 * Initial size of our set, just before the operation started.
231 */
232 uint64_t initial_size;
233
234 /**
235 * Kept in a DLL of the listener, if @e listener is non-NULL.
236 */
237 struct Operation *next;
238
239 /**
240 * Kept in a DLL of the listener, if @e listener is non-NULL.
241 */
242 struct Operation *prev;
243
244 /**
245 * Channel to the peer.
246 */
247 struct GNUNET_CADET_Channel *channel;
248
249 /**
250 * Port this operation runs on.
251 */
252 struct Listener *listener;
253
254 /**
255 * Message queue for the channel.
256 */
257 struct GNUNET_MQ_Handle *mq;
258
259 /**
260 * Context message, may be NULL.
261 */
262 struct GNUNET_MessageHeader *context_msg;
263
264 /**
265 * Set associated with the operation, NULL until the spec has been
266 * associated with a set.
267 */
268 struct Set *set;
269
270 /**
271 * Copy of the set's strata estimator at the time of
272 * creation of this operation.
273 */
274 struct StrataEstimator *se;
275
276 /**
277 * The IBF we currently receive.
278 */
279 struct InvertibleBloomFilter *remote_ibf;
280
281 /**
282 * The IBF with the local set's element.
283 */
284 struct InvertibleBloomFilter *local_ibf;
285
286 /**
287 * Maps unsalted IBF-Keys to elements.
288 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
289 * Colliding IBF-Keys are linked.
290 */
291 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
292
293 /**
294 * Timeout task, if the incoming peer has not been accepted
295 * after the timeout, it will be disconnected.
296 */
297 struct GNUNET_SCHEDULER_Task *timeout_task;
298
299 /**
300 * Hashes for elements that we have demanded from the other peer.
301 */
302 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
303
304 /**
305 * Current state of the operation.
306 */
307 enum UnionOperationPhase phase;
308
309 /**
310 * Did we send the client that we are done?
311 */
312 int client_done_sent;
313
314 /**
315 * Number of ibf buckets already received into the @a remote_ibf.
316 */
317 unsigned int ibf_buckets_received;
318
319 /**
320 * Salt that we're using for sending IBFs
321 */
322 uint32_t salt_send;
323
324 /**
325 * Salt for the IBF we've received and that we're currently decoding.
326 */
327 uint32_t salt_receive;
328
329 /**
330 * Number of elements we received from the other peer
331 * that were not in the local set yet.
332 */
333 uint32_t received_fresh;
334
335 /**
336 * Total number of elements received from the other peer.
337 */
338 uint32_t received_total;
339
340 /**
341 * Salt to use for the operation.
342 */
343 uint32_t salt;
344
345 /**
346 * Remote peers element count
347 */
348 uint32_t remote_element_count;
349
350 /**
351 * ID used to identify an operation between service and client
352 */
353 uint32_t client_request_id;
354
355 /**
356 * Always use delta operation instead of sending full sets,
357 * even it it's less efficient.
358 */
359 int force_delta;
360
361 /**
362 * Always send full sets, even if delta operations would
363 * be more efficient.
364 */
365 int force_full;
366
367 /**
368 * #GNUNET_YES to fail operations where Byzantine faults
369 * are suspected
370 */
371 int byzantine;
372
373 /**
374 * #GNUNET_YES to also send back set elements we are sending to
375 * the remote peer.
376 */
377 int symmetric;
378
379 /**
380 * Lower bound for the set size, used only when
381 * byzantine mode is enabled.
382 */
383 int byzantine_lower_bound;
384
385 /**
386 * Unique request id for the request from a remote peer, sent to the
387 * client, which will accept or reject the request. Set to '0' iff
388 * the request has not been suggested yet.
389 */
390 uint32_t suggest_id;
391
392 /**
393 * Generation in which the operation handle
394 * was created.
395 */
396 unsigned int generation_created;
397};
398
399
400/**
401 * SetContent stores the actual set elements, which may be shared by
402 * multiple generations derived from one set.
403 */
404struct SetContent
405{
406 /**
407 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
408 */
409 struct GNUNET_CONTAINER_MultiHashMap *elements;
410
411 /**
412 * Number of references to the content.
413 */
414 unsigned int refcount;
415
416 /**
417 * FIXME: document!
418 */
419 unsigned int latest_generation;
420
421 /**
422 * Number of concurrently active iterators.
423 */
424 int iterator_count;
425};
426
427
428/**
429 * A set that supports a specific operation with other peers.
430 */
431struct Set
432{
433 /**
434 * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
435 */
436 struct Set *next;
437
438 /**
439 * Sets are held in a doubly linked list.
440 */
441 struct Set *prev;
442
443 /**
444 * Client that owns the set. Only one client may own a set,
445 * and there can only be one set per client.
446 */
447 struct ClientState *cs;
448
449 /**
450 * Content, possibly shared by multiple sets,
451 * and thus reference counted.
452 */
453 struct SetContent *content;
454
455 /**
456 * The strata estimator is only generated once for each set. The IBF keys
457 * are derived from the element hashes with salt=0.
458 */
459 struct StrataEstimator *se;
460
461 /**
462 * Evaluate operations are held in a linked list.
463 */
464 struct Operation *ops_head;
465
466 /**
467 * Evaluate operations are held in a linked list.
468 */
469 struct Operation *ops_tail;
470
471 /**
472 * Current generation, that is, number of previously executed
473 * operations and lazy copies on the underlying set content.
474 */
475 unsigned int current_generation;
476
477};
478
479
480/**
481 * The key entry is used to associate an ibf key with an element.
482 */
483struct KeyEntry
484{
485 /**
486 * IBF key for the entry, derived from the current salt.
487 */
488 struct IBF_Key ibf_key;
489
490 /**
491 * The actual element associated with the key.
492 *
493 * Only owned by the union operation if element->operation
494 * is #GNUNET_YES.
495 */
496 struct ElementEntry *element;
497
498 /**
499 * Did we receive this element? Even if element->is_foreign is false, we
500 * might have received the element, so this indicates that the other peer
501 * has it.
502 */
503 int received;
504};
505
506
507/**
508 * Used as a closure for sending elements
509 * with a specific IBF key.
510 */
511struct SendElementClosure
512{
513 /**
514 * The IBF key whose matching elements should be
515 * sent.
516 */
517 struct IBF_Key ibf_key;
518
519 /**
520 * Operation for which the elements
521 * should be sent.
522 */
523 struct Operation *op;
524};
525
526
527/**
528 * A listener is inhabited by a client, and waits for evaluation
529 * requests from remote peers.
530 */
531struct Listener
532{
533 /**
534 * Listeners are held in a doubly linked list.
535 */
536 struct Listener *next;
537
538 /**
539 * Listeners are held in a doubly linked list.
540 */
541 struct Listener *prev;
542
543 /**
544 * Head of DLL of operations this listener is responsible for.
545 * Once the client has accepted/declined the operation, the
546 * operation is moved to the respective set's operation DLLS.
547 */
548 struct Operation *op_head;
549
550 /**
551 * Tail of DLL of operations this listener is responsible for.
552 * Once the client has accepted/declined the operation, the
553 * operation is moved to the respective set's operation DLLS.
554 */
555 struct Operation *op_tail;
556
557 /**
558 * Client that owns the listener.
559 * Only one client may own a listener.
560 */
561 struct ClientState *cs;
562
563 /**
564 * The port we are listening on with CADET.
565 */
566 struct GNUNET_CADET_Port *open_port;
567
568 /**
569 * Application ID for the operation, used to distinguish
570 * multiple operations of the same type with the same peer.
571 */
572 struct GNUNET_HashCode app_id;
573
574};
575
576
577/**
578 * Handle to the cadet service, used to listen for and connect to
579 * remote peers.
580 */
581static struct GNUNET_CADET_Handle *cadet;
582
583/**
584 * Statistics handle.
585 */
586static struct GNUNET_STATISTICS_Handle *_GSS_statistics;
587
588/**
589 * Listeners are held in a doubly linked list.
590 */
591static struct Listener *listener_head;
592
593/**
594 * Listeners are held in a doubly linked list.
595 */
596static struct Listener *listener_tail;
597
598/**
599 * Number of active clients.
600 */
601static unsigned int num_clients;
602
603/**
604 * Are we in shutdown? if #GNUNET_YES and the number of clients
605 * drops to zero, disconnect from CADET.
606 */
607static int in_shutdown;
608
609/**
610 * Counter for allocating unique IDs for clients, used to identify incoming
611 * operation requests from remote peers, that the client can choose to accept
612 * or refuse. 0 must not be used (reserved for uninitialized).
613 */
614static uint32_t suggest_id;
615
616
617/**
618 * Iterator over hash map entries, called to
619 * destroy the linked list of colliding ibf key entries.
620 *
621 * @param cls closure
622 * @param key current key code
623 * @param value value in the hash map
624 * @return #GNUNET_YES if we should continue to iterate,
625 * #GNUNET_NO if not.
626 */
627static int
628destroy_key_to_element_iter (void *cls,
629 uint32_t key,
630 void *value)
631{
632 struct KeyEntry *k = value;
633
634 GNUNET_assert (NULL != k);
635 if (GNUNET_YES == k->element->remote)
636 {
637 GNUNET_free (k->element);
638 k->element = NULL;
639 }
640 GNUNET_free (k);
641 return GNUNET_YES;
642}
643
644
645/**
646 * Signal to the client that the operation has finished and
647 * destroy the operation.
648 *
649 * @param cls operation to destroy
650 */
651static void
652send_client_done (void *cls)
653{
654 struct Operation *op = cls;
655 struct GNUNET_MQ_Envelope *ev;
656 struct GNUNET_SETU_ResultMessage *rm;
657
658 if (GNUNET_YES == op->client_done_sent)
659 return;
660 if (PHASE_DONE != op->phase)
661 {
662 LOG (GNUNET_ERROR_TYPE_WARNING,
663 "Union operation failed\n");
664 GNUNET_STATISTICS_update (_GSS_statistics,
665 "# Union operations failed",
666 1,
667 GNUNET_NO);
668 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SETU_RESULT);
669 rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
670 rm->request_id = htonl (op->client_request_id);
671 rm->element_type = htons (0);
672 GNUNET_MQ_send (op->set->cs->mq,
673 ev);
674 return;
675 }
676
677 op->client_done_sent = GNUNET_YES;
678
679 GNUNET_STATISTICS_update (_GSS_statistics,
680 "# Union operations succeeded",
681 1,
682 GNUNET_NO);
683 LOG (GNUNET_ERROR_TYPE_INFO,
684 "Signalling client that union operation is done\n");
685 ev = GNUNET_MQ_msg (rm,
686 GNUNET_MESSAGE_TYPE_SETU_RESULT);
687 rm->request_id = htonl (op->client_request_id);
688 rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
689 rm->element_type = htons (0);
690 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
691 op->key_to_element));
692 GNUNET_MQ_send (op->set->cs->mq,
693 ev);
694}
695
696
697/* FIXME: the destroy logic is a mess and should be cleaned up! */
698
699/**
700 * Destroy the given operation. Used for any operation where both
701 * peers were known and that thus actually had a vt and channel. Must
702 * not be used for operations where 'listener' is still set and we do
703 * not know the other peer.
704 *
705 * Call the implementation-specific cancel function of the operation.
706 * Disconnects from the remote peer. Does not disconnect the client,
707 * as there may be multiple operations per set.
708 *
709 * @param op operation to destroy
710 */
711static void
712_GSS_operation_destroy (struct Operation *op)
713{
714 struct Set *set = op->set;
715 struct GNUNET_CADET_Channel *channel;
716
717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718 "Destroying union operation %p\n",
719 op);
720 GNUNET_assert (NULL == op->listener);
721 /* check if the op was canceled twice */
722 if (NULL != op->remote_ibf)
723 {
724 ibf_destroy (op->remote_ibf);
725 op->remote_ibf = NULL;
726 }
727 if (NULL != op->demanded_hashes)
728 {
729 GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
730 op->demanded_hashes = NULL;
731 }
732 if (NULL != op->local_ibf)
733 {
734 ibf_destroy (op->local_ibf);
735 op->local_ibf = NULL;
736 }
737 if (NULL != op->se)
738 {
739 strata_estimator_destroy (op->se);
740 op->se = NULL;
741 }
742 if (NULL != op->key_to_element)
743 {
744 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
745 &destroy_key_to_element_iter,
746 NULL);
747 GNUNET_CONTAINER_multihashmap32_destroy (op->key_to_element);
748 op->key_to_element = NULL;
749 }
750 if (NULL != set)
751 {
752 GNUNET_CONTAINER_DLL_remove (set->ops_head,
753 set->ops_tail,
754 op);
755 op->set = NULL;
756 }
757 if (NULL != op->context_msg)
758 {
759 GNUNET_free (op->context_msg);
760 op->context_msg = NULL;
761 }
762 if (NULL != (channel = op->channel))
763 {
764 /* This will free op; called conditionally as this helper function
765 is also called from within the channel disconnect handler. */
766 op->channel = NULL;
767 GNUNET_CADET_channel_destroy (channel);
768 }
769 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
770 * there was a channel end handler that will free 'op' on the call stack. */
771}
772
773
774/**
775 * This function probably should not exist
776 * and be replaced by inlining more specific
777 * logic in the various places where it is called.
778 */
779static void
780_GSS_operation_destroy2 (struct Operation *op);
781
782
783/**
784 * Destroy an incoming request from a remote peer
785 *
786 * @param op remote request to destroy
787 */
788static void
789incoming_destroy (struct Operation *op)
790{
791 struct Listener *listener;
792
793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794 "Destroying incoming operation %p\n",
795 op);
796 if (NULL != (listener = op->listener))
797 {
798 GNUNET_CONTAINER_DLL_remove (listener->op_head,
799 listener->op_tail,
800 op);
801 op->listener = NULL;
802 }
803 if (NULL != op->timeout_task)
804 {
805 GNUNET_SCHEDULER_cancel (op->timeout_task);
806 op->timeout_task = NULL;
807 }
808 _GSS_operation_destroy2 (op);
809}
810
811
812/**
813 * This function probably should not exist
814 * and be replaced by inlining more specific
815 * logic in the various places where it is called.
816 */
817static void
818_GSS_operation_destroy2 (struct Operation *op)
819{
820 struct GNUNET_CADET_Channel *channel;
821
822 if (NULL != (channel = op->channel))
823 {
824 /* This will free op; called conditionally as this helper function
825 is also called from within the channel disconnect handler. */
826 op->channel = NULL;
827 GNUNET_CADET_channel_destroy (channel);
828 }
829 if (NULL != op->listener)
830 {
831 incoming_destroy (op);
832 return;
833 }
834 if (NULL != op->set)
835 send_client_done (op);
836 _GSS_operation_destroy (op);
837 GNUNET_free (op);
838}
839
840
841/**
842 * Inform the client that the union operation has failed,
843 * and proceed to destroy the evaluate operation.
844 *
845 * @param op the union operation to fail
846 */
847static void
848fail_union_operation (struct Operation *op)
849{
850 struct GNUNET_MQ_Envelope *ev;
851 struct GNUNET_SETU_ResultMessage *msg;
852
853 LOG (GNUNET_ERROR_TYPE_WARNING,
854 "union operation failed\n");
855 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETU_RESULT);
856 msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
857 msg->request_id = htonl (op->client_request_id);
858 msg->element_type = htons (0);
859 GNUNET_MQ_send (op->set->cs->mq,
860 ev);
861 _GSS_operation_destroy (op);
862}
863
864
865/**
866 * Derive the IBF key from a hash code and
867 * a salt.
868 *
869 * @param src the hash code
870 * @return the derived IBF key
871 */
872static struct IBF_Key
873get_ibf_key (const struct GNUNET_HashCode *src)
874{
875 struct IBF_Key key;
876 uint16_t salt = 0;
877
878 GNUNET_assert (GNUNET_OK ==
879 GNUNET_CRYPTO_kdf (&key, sizeof(key),
880 src, sizeof *src,
881 &salt, sizeof(salt),
882 NULL, 0));
883 return key;
884}
885
886
887/**
888 * Context for #op_get_element_iterator
889 */
890struct GetElementContext
891{
892 /**
893 * FIXME.
894 */
895 struct GNUNET_HashCode hash;
896
897 /**
898 * FIXME.
899 */
900 struct KeyEntry *k;
901};
902
903
904/**
905 * Iterator over the mapping from IBF keys to element entries. Checks if we
906 * have an element with a given GNUNET_HashCode.
907 *
908 * @param cls closure
909 * @param key current key code
910 * @param value value in the hash map
911 * @return #GNUNET_YES if we should search further,
912 * #GNUNET_NO if we've found the element.
913 */
914static int
915op_get_element_iterator (void *cls,
916 uint32_t key,
917 void *value)
918{
919 struct GetElementContext *ctx = cls;
920 struct KeyEntry *k = value;
921
922 GNUNET_assert (NULL != k);
923 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
924 &ctx->hash))
925 {
926 ctx->k = k;
927 return GNUNET_NO;
928 }
929 return GNUNET_YES;
930}
931
932
933/**
934 * Determine whether the given element is already in the operation's element
935 * set.
936 *
937 * @param op operation that should be tested for 'element_hash'
938 * @param element_hash hash of the element to look for
939 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
940 */
941static struct KeyEntry *
942op_get_element (struct Operation *op,
943 const struct GNUNET_HashCode *element_hash)
944{
945 int ret;
946 struct IBF_Key ibf_key;
947 struct GetElementContext ctx = { { { 0 } }, 0 };
948
949 ctx.hash = *element_hash;
950
951 ibf_key = get_ibf_key (element_hash);
952 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->key_to_element,
953 (uint32_t) ibf_key.key_val,
954 &op_get_element_iterator,
955 &ctx);
956
957 /* was the iteration aborted because we found the element? */
958 if (GNUNET_SYSERR == ret)
959 {
960 GNUNET_assert (NULL != ctx.k);
961 return ctx.k;
962 }
963 return NULL;
964}
965
966
967/**
968 * Insert an element into the union operation's
969 * key-to-element mapping. Takes ownership of 'ee'.
970 * Note that this does not insert the element in the set,
971 * only in the operation's key-element mapping.
972 * This is done to speed up re-tried operations, if some elements
973 * were transmitted, and then the IBF fails to decode.
974 *
975 * XXX: clarify ownership, doesn't sound right.
976 *
977 * @param op the union operation
978 * @param ee the element entry
979 * @parem received was this element received from the remote peer?
980 */
981static void
982op_register_element (struct Operation *op,
983 struct ElementEntry *ee,
984 int received)
985{
986 struct IBF_Key ibf_key;
987 struct KeyEntry *k;
988
989 ibf_key = get_ibf_key (&ee->element_hash);
990 k = GNUNET_new (struct KeyEntry);
991 k->element = ee;
992 k->ibf_key = ibf_key;
993 k->received = received;
994 GNUNET_assert (GNUNET_OK ==
995 GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
996 (uint32_t) ibf_key.key_val,
997 k,
998 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
999}
1000
1001
1002/**
1003 * FIXME.
1004 */
1005static void
1006salt_key (const struct IBF_Key *k_in,
1007 uint32_t salt,
1008 struct IBF_Key *k_out)
1009{
1010 int s = salt % 64;
1011 uint64_t x = k_in->key_val;
1012
1013 /* rotate ibf key */
1014 x = (x >> s) | (x << (64 - s));
1015 k_out->key_val = x;
1016}
1017
1018
1019/**
1020 * FIXME.
1021 */
1022static void
1023unsalt_key (const struct IBF_Key *k_in,
1024 uint32_t salt,
1025 struct IBF_Key *k_out)
1026{
1027 int s = salt % 64;
1028 uint64_t x = k_in->key_val;
1029
1030 x = (x << s) | (x >> (64 - s));
1031 k_out->key_val = x;
1032}
1033
1034
1035/**
1036 * Insert a key into an ibf.
1037 *
1038 * @param cls the ibf
1039 * @param key unused
1040 * @param value the key entry to get the key from
1041 */
1042static int
1043prepare_ibf_iterator (void *cls,
1044 uint32_t key,
1045 void *value)
1046{
1047 struct Operation *op = cls;
1048 struct KeyEntry *ke = value;
1049 struct IBF_Key salted_key;
1050
1051 LOG (GNUNET_ERROR_TYPE_DEBUG,
1052 "[OP %x] inserting %lx (hash %s) into ibf\n",
1053 (void *) op,
1054 (unsigned long) ke->ibf_key.key_val,
1055 GNUNET_h2s (&ke->element->element_hash));
1056 salt_key (&ke->ibf_key,
1057 op->salt_send,
1058 &salted_key);
1059 ibf_insert (op->local_ibf, salted_key);
1060 return GNUNET_YES;
1061}
1062
1063
1064/**
1065 * Is element @a ee part of the set used by @a op?
1066 *
1067 * @param ee element to test
1068 * @param op operation the defines the set and its generation
1069 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
1070 */
1071static int
1072_GSS_is_element_of_operation (struct ElementEntry *ee,
1073 struct Operation *op)
1074{
1075 return ee->generation >= op->generation_created;
1076}
1077
1078
1079/**
1080 * Iterator for initializing the
1081 * key-to-element mapping of a union operation
1082 *
1083 * @param cls the union operation `struct Operation *`
1084 * @param key unused
1085 * @param value the `struct ElementEntry *` to insert
1086 * into the key-to-element mapping
1087 * @return #GNUNET_YES (to continue iterating)
1088 */
1089static int
1090init_key_to_element_iterator (void *cls,
1091 const struct GNUNET_HashCode *key,
1092 void *value)
1093{
1094 struct Operation *op = cls;
1095 struct ElementEntry *ee = value;
1096
1097 /* make sure that the element belongs to the set at the time
1098 * of creating the operation */
1099 if (GNUNET_NO ==
1100 _GSS_is_element_of_operation (ee,
1101 op))
1102 return GNUNET_YES;
1103 GNUNET_assert (GNUNET_NO == ee->remote);
1104 op_register_element (op,
1105 ee,
1106 GNUNET_NO);
1107 return GNUNET_YES;
1108}
1109
1110
1111/**
1112 * Initialize the IBF key to element mapping local to this set operation.
1113 *
1114 * @param op the set union operation
1115 */
1116static void
1117initialize_key_to_element (struct Operation *op)
1118{
1119 unsigned int len;
1120
1121 GNUNET_assert (NULL == op->key_to_element);
1122 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
1123 op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
1124 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1125 &init_key_to_element_iterator,
1126 op);
1127}
1128
1129
1130/**
1131 * Create an ibf with the operation's elements
1132 * of the specified size
1133 *
1134 * @param op the union operation
1135 * @param size size of the ibf to create
1136 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1137 */
1138static int
1139prepare_ibf (struct Operation *op,
1140 uint32_t size)
1141{
1142 GNUNET_assert (NULL != op->key_to_element);
1143
1144 if (NULL != op->local_ibf)
1145 ibf_destroy (op->local_ibf);
1146 op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
1147 if (NULL == op->local_ibf)
1148 {
1149 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1150 "Failed to allocate local IBF\n");
1151 return GNUNET_SYSERR;
1152 }
1153 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
1154 &prepare_ibf_iterator,
1155 op);
1156 return GNUNET_OK;
1157}
1158
1159
1160/**
1161 * Send an ibf of appropriate size.
1162 *
1163 * Fragments the IBF into multiple messages if necessary.
1164 *
1165 * @param op the union operation
1166 * @param ibf_order order of the ibf to send, size=2^order
1167 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1168 */
1169static int
1170send_ibf (struct Operation *op,
1171 uint16_t ibf_order)
1172{
1173 unsigned int buckets_sent = 0;
1174 struct InvertibleBloomFilter *ibf;
1175
1176 if (GNUNET_OK !=
1177 prepare_ibf (op, 1 << ibf_order))
1178 {
1179 /* allocation failed */
1180 return GNUNET_SYSERR;
1181 }
1182
1183 LOG (GNUNET_ERROR_TYPE_DEBUG,
1184 "sending ibf of size %u\n",
1185 1 << ibf_order);
1186
1187 {
1188 char name[64] = { 0 };
1189 snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
1190 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
1191 }
1192
1193 ibf = op->local_ibf;
1194
1195 while (buckets_sent < (1 << ibf_order))
1196 {
1197 unsigned int buckets_in_message;
1198 struct GNUNET_MQ_Envelope *ev;
1199 struct IBFMessage *msg;
1200
1201 buckets_in_message = (1 << ibf_order) - buckets_sent;
1202 /* limit to maximum */
1203 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
1204 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
1205
1206 ev = GNUNET_MQ_msg_extra (msg,
1207 buckets_in_message * IBF_BUCKET_SIZE,
1208 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF);
1209 msg->reserved1 = 0;
1210 msg->reserved2 = 0;
1211 msg->order = ibf_order;
1212 msg->offset = htonl (buckets_sent);
1213 msg->salt = htonl (op->salt_send);
1214 ibf_write_slice (ibf, buckets_sent,
1215 buckets_in_message, &msg[1]);
1216 buckets_sent += buckets_in_message;
1217 LOG (GNUNET_ERROR_TYPE_DEBUG,
1218 "ibf chunk size %u, %u/%u sent\n",
1219 buckets_in_message,
1220 buckets_sent,
1221 1 << ibf_order);
1222 GNUNET_MQ_send (op->mq, ev);
1223 }
1224
1225 /* The other peer must decode the IBF, so
1226 * we're passive. */
1227 op->phase = PHASE_INVENTORY_PASSIVE;
1228 return GNUNET_OK;
1229}
1230
1231
1232/**
1233 * Compute the necessary order of an ibf
1234 * from the size of the symmetric set difference.
1235 *
1236 * @param diff the difference
1237 * @return the required size of the ibf
1238 */
1239static unsigned int
1240get_order_from_difference (unsigned int diff)
1241{
1242 unsigned int ibf_order;
1243
1244 ibf_order = 2;
1245 while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
1246 ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
1247 (ibf_order < MAX_IBF_ORDER))
1248 ibf_order++;
1249 // add one for correction
1250 return ibf_order + 1;
1251}
1252
1253
1254/**
1255 * Send a set element.
1256 *
1257 * @param cls the union operation `struct Operation *`
1258 * @param key unused
1259 * @param value the `struct ElementEntry *` to insert
1260 * into the key-to-element mapping
1261 * @return #GNUNET_YES (to continue iterating)
1262 */
1263static int
1264send_full_element_iterator (void *cls,
1265 const struct GNUNET_HashCode *key,
1266 void *value)
1267{
1268 struct Operation *op = cls;
1269 struct GNUNET_SETU_ElementMessage *emsg;
1270 struct ElementEntry *ee = value;
1271 struct GNUNET_SETU_Element *el = &ee->element;
1272 struct GNUNET_MQ_Envelope *ev;
1273
1274 LOG (GNUNET_ERROR_TYPE_DEBUG,
1275 "Sending element %s\n",
1276 GNUNET_h2s (key));
1277 ev = GNUNET_MQ_msg_extra (emsg,
1278 el->size,
1279 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
1280 emsg->element_type = htons (el->element_type);
1281 GNUNET_memcpy (&emsg[1],
1282 el->data,
1283 el->size);
1284 GNUNET_MQ_send (op->mq,
1285 ev);
1286 return GNUNET_YES;
1287}
1288
1289
1290/**
1291 * Switch to full set transmission for @a op.
1292 *
1293 * @param op operation to switch to full set transmission.
1294 */
1295static void
1296send_full_set (struct Operation *op)
1297{
1298 struct GNUNET_MQ_Envelope *ev;
1299
1300 op->phase = PHASE_FULL_SENDING;
1301 LOG (GNUNET_ERROR_TYPE_DEBUG,
1302 "Dedicing to transmit the full set\n");
1303 /* FIXME: use a more memory-friendly way of doing this with an
1304 iterator, just as we do in the non-full case! */
1305 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1306 &send_full_element_iterator,
1307 op);
1308 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
1309 GNUNET_MQ_send (op->mq,
1310 ev);
1311}
1312
1313
1314/**
1315 * Handle a strata estimator from a remote peer
1316 *
1317 * @param cls the union operation
1318 * @param msg the message
1319 */
1320static int
1321check_union_p2p_strata_estimator (void *cls,
1322 const struct StrataEstimatorMessage *msg)
1323{
1324 struct Operation *op = cls;
1325 int is_compressed;
1326 size_t len;
1327
1328 if (op->phase != PHASE_EXPECT_SE)
1329 {
1330 GNUNET_break (0);
1331 return GNUNET_SYSERR;
1332 }
1333 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1334 msg->header.type));
1335 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1336 if ((GNUNET_NO == is_compressed) &&
1337 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
1338 {
1339 GNUNET_break (0);
1340 return GNUNET_SYSERR;
1341 }
1342 return GNUNET_OK;
1343}
1344
1345
1346/**
1347 * Handle a strata estimator from a remote peer
1348 *
1349 * @param cls the union operation
1350 * @param msg the message
1351 */
1352static void
1353handle_union_p2p_strata_estimator (void *cls,
1354 const struct StrataEstimatorMessage *msg)
1355{
1356 struct Operation *op = cls;
1357 struct StrataEstimator *remote_se;
1358 unsigned int diff;
1359 uint64_t other_size;
1360 size_t len;
1361 int is_compressed;
1362
1363 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1364 msg->header.type));
1365 GNUNET_STATISTICS_update (_GSS_statistics,
1366 "# bytes of SE received",
1367 ntohs (msg->header.size),
1368 GNUNET_NO);
1369 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1370 other_size = GNUNET_ntohll (msg->set_size);
1371 remote_se = strata_estimator_create (SE_STRATA_COUNT,
1372 SE_IBF_SIZE,
1373 SE_IBF_HASH_NUM);
1374 if (NULL == remote_se)
1375 {
1376 /* insufficient resources, fail */
1377 fail_union_operation (op);
1378 return;
1379 }
1380 if (GNUNET_OK !=
1381 strata_estimator_read (&msg[1],
1382 len,
1383 is_compressed,
1384 remote_se))
1385 {
1386 /* decompression failed */
1387 strata_estimator_destroy (remote_se);
1388 fail_union_operation (op);
1389 return;
1390 }
1391 GNUNET_assert (NULL != op->se);
1392 diff = strata_estimator_difference (remote_se,
1393 op->se);
1394
1395 if (diff > 200)
1396 diff = diff * 3 / 2;
1397
1398 strata_estimator_destroy (remote_se);
1399 strata_estimator_destroy (op->se);
1400 op->se = NULL;
1401 LOG (GNUNET_ERROR_TYPE_DEBUG,
1402 "got se diff=%d, using ibf size %d\n",
1403 diff,
1404 1U << get_order_from_difference (diff));
1405
1406 {
1407 char *set_debug;
1408
1409 set_debug = getenv ("GNUNET_SETU_BENCHMARK");
1410 if ((NULL != set_debug) &&
1411 (0 == strcmp (set_debug, "1")))
1412 {
1413 FILE *f = fopen ("set.log", "a");
1414 fprintf (f, "%llu\n", (unsigned long long) diff);
1415 fclose (f);
1416 }
1417 }
1418
1419 if ((GNUNET_YES == op->byzantine) &&
1420 (other_size < op->byzantine_lower_bound))
1421 {
1422 GNUNET_break (0);
1423 fail_union_operation (op);
1424 return;
1425 }
1426
1427 if ((GNUNET_YES == op->force_full) ||
1428 (diff > op->initial_size / 4) ||
1429 (0 == other_size))
1430 {
1431 LOG (GNUNET_ERROR_TYPE_DEBUG,
1432 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
1433 diff,
1434 op->initial_size);
1435 GNUNET_STATISTICS_update (_GSS_statistics,
1436 "# of full sends",
1437 1,
1438 GNUNET_NO);
1439 if ((op->initial_size <= other_size) ||
1440 (0 == other_size))
1441 {
1442 send_full_set (op);
1443 }
1444 else
1445 {
1446 struct GNUNET_MQ_Envelope *ev;
1447
1448 LOG (GNUNET_ERROR_TYPE_DEBUG,
1449 "Telling other peer that we expect its full set\n");
1450 op->phase = PHASE_EXPECT_IBF;
1451 ev = GNUNET_MQ_msg_header (
1452 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
1453 GNUNET_MQ_send (op->mq,
1454 ev);
1455 }
1456 }
1457 else
1458 {
1459 GNUNET_STATISTICS_update (_GSS_statistics,
1460 "# of ibf sends",
1461 1,
1462 GNUNET_NO);
1463 if (GNUNET_OK !=
1464 send_ibf (op,
1465 get_order_from_difference (diff)))
1466 {
1467 /* Internal error, best we can do is shut the connection */
1468 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1469 "Failed to send IBF, closing connection\n");
1470 fail_union_operation (op);
1471 return;
1472 }
1473 }
1474 GNUNET_CADET_receive_done (op->channel);
1475}
1476
1477
1478/**
1479 * Iterator to send elements to a remote peer
1480 *
1481 * @param cls closure with the element key and the union operation
1482 * @param key ignored
1483 * @param value the key entry
1484 */
1485static int
1486send_offers_iterator (void *cls,
1487 uint32_t key,
1488 void *value)
1489{
1490 struct SendElementClosure *sec = cls;
1491 struct Operation *op = sec->op;
1492 struct KeyEntry *ke = value;
1493 struct GNUNET_MQ_Envelope *ev;
1494 struct GNUNET_MessageHeader *mh;
1495
1496 /* Detect 32-bit key collision for the 64-bit IBF keys. */
1497 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1498 return GNUNET_YES;
1499
1500 ev = GNUNET_MQ_msg_header_extra (mh,
1501 sizeof(struct GNUNET_HashCode),
1502 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER);
1503
1504 GNUNET_assert (NULL != ev);
1505 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1506 LOG (GNUNET_ERROR_TYPE_DEBUG,
1507 "[OP %x] sending element offer (%s) to peer\n",
1508 (void *) op,
1509 GNUNET_h2s (&ke->element->element_hash));
1510 GNUNET_MQ_send (op->mq, ev);
1511 return GNUNET_YES;
1512}
1513
1514
1515/**
1516 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1517 *
1518 * @param op union operation
1519 * @param ibf_key IBF key of interest
1520 */
1521static void
1522send_offers_for_key (struct Operation *op,
1523 struct IBF_Key ibf_key)
1524{
1525 struct SendElementClosure send_cls;
1526
1527 send_cls.ibf_key = ibf_key;
1528 send_cls.op = op;
1529 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
1530 op->key_to_element,
1531 (uint32_t) ibf_key.
1532 key_val,
1533 &send_offers_iterator,
1534 &send_cls);
1535}
1536
1537
1538/**
1539 * Decode which elements are missing on each side, and
1540 * send the appropriate offers and inquiries.
1541 *
1542 * @param op union operation
1543 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1544 */
1545static int
1546decode_and_send (struct Operation *op)
1547{
1548 struct IBF_Key key;
1549 struct IBF_Key last_key;
1550 int side;
1551 unsigned int num_decoded;
1552 struct InvertibleBloomFilter *diff_ibf;
1553
1554 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->phase);
1555
1556 if (GNUNET_OK !=
1557 prepare_ibf (op,
1558 op->remote_ibf->size))
1559 {
1560 GNUNET_break (0);
1561 /* allocation failed */
1562 return GNUNET_SYSERR;
1563 }
1564 diff_ibf = ibf_dup (op->local_ibf);
1565 ibf_subtract (diff_ibf,
1566 op->remote_ibf);
1567
1568 ibf_destroy (op->remote_ibf);
1569 op->remote_ibf = NULL;
1570
1571 LOG (GNUNET_ERROR_TYPE_DEBUG,
1572 "decoding IBF (size=%u)\n",
1573 diff_ibf->size);
1574
1575 num_decoded = 0;
1576 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1577
1578 while (1)
1579 {
1580 int res;
1581 int cycle_detected = GNUNET_NO;
1582
1583 last_key = key;
1584
1585 res = ibf_decode (diff_ibf,
1586 &side,
1587 &key);
1588 if (res == GNUNET_OK)
1589 {
1590 LOG (GNUNET_ERROR_TYPE_DEBUG,
1591 "decoded ibf key %lx\n",
1592 (unsigned long) key.key_val);
1593 num_decoded += 1;
1594 if ((num_decoded > diff_ibf->size) ||
1595 ((num_decoded > 1) &&
1596 (last_key.key_val == key.key_val)))
1597 {
1598 LOG (GNUNET_ERROR_TYPE_DEBUG,
1599 "detected cyclic ibf (decoded %u/%u)\n",
1600 num_decoded,
1601 diff_ibf->size);
1602 cycle_detected = GNUNET_YES;
1603 }
1604 }
1605 if ((GNUNET_SYSERR == res) ||
1606 (GNUNET_YES == cycle_detected))
1607 {
1608 int next_order;
1609 next_order = 0;
1610 while (1 << next_order < diff_ibf->size)
1611 next_order++;
1612 next_order++;
1613 if (next_order <= MAX_IBF_ORDER)
1614 {
1615 LOG (GNUNET_ERROR_TYPE_DEBUG,
1616 "decoding failed, sending larger ibf (size %u)\n",
1617 1 << next_order);
1618 GNUNET_STATISTICS_update (_GSS_statistics,
1619 "# of IBF retries",
1620 1,
1621 GNUNET_NO);
1622 op->salt_send++;
1623 if (GNUNET_OK !=
1624 send_ibf (op, next_order))
1625 {
1626 /* Internal error, best we can do is shut the connection */
1627 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1628 "Failed to send IBF, closing connection\n");
1629 fail_union_operation (op);
1630 ibf_destroy (diff_ibf);
1631 return GNUNET_SYSERR;
1632 }
1633 }
1634 else
1635 {
1636 GNUNET_STATISTICS_update (_GSS_statistics,
1637 "# of failed union operations (too large)",
1638 1,
1639 GNUNET_NO);
1640 // XXX: Send the whole set, element-by-element
1641 LOG (GNUNET_ERROR_TYPE_ERROR,
1642 "set union failed: reached ibf limit\n");
1643 fail_union_operation (op);
1644 ibf_destroy (diff_ibf);
1645 return GNUNET_SYSERR;
1646 }
1647 break;
1648 }
1649 if (GNUNET_NO == res)
1650 {
1651 struct GNUNET_MQ_Envelope *ev;
1652
1653 LOG (GNUNET_ERROR_TYPE_DEBUG,
1654 "transmitted all values, sending DONE\n");
1655 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
1656 GNUNET_MQ_send (op->mq, ev);
1657 /* We now wait until we get a DONE message back
1658 * and then wait for our MQ to be flushed and all our
1659 * demands be delivered. */
1660 break;
1661 }
1662 if (1 == side)
1663 {
1664 struct IBF_Key unsalted_key;
1665
1666 unsalt_key (&key,
1667 op->salt_receive,
1668 &unsalted_key);
1669 send_offers_for_key (op,
1670 unsalted_key);
1671 }
1672 else if (-1 == side)
1673 {
1674 struct GNUNET_MQ_Envelope *ev;
1675 struct InquiryMessage *msg;
1676
1677 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1678 * the effort additional complexity. */
1679 ev = GNUNET_MQ_msg_extra (msg,
1680 sizeof(struct IBF_Key),
1681 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY);
1682 msg->salt = htonl (op->salt_receive);
1683 GNUNET_memcpy (&msg[1],
1684 &key,
1685 sizeof(struct IBF_Key));
1686 LOG (GNUNET_ERROR_TYPE_DEBUG,
1687 "sending element inquiry for IBF key %lx\n",
1688 (unsigned long) key.key_val);
1689 GNUNET_MQ_send (op->mq, ev);
1690 }
1691 else
1692 {
1693 GNUNET_assert (0);
1694 }
1695 }
1696 ibf_destroy (diff_ibf);
1697 return GNUNET_OK;
1698}
1699
1700
1701/**
1702 * Check an IBF message from a remote peer.
1703 *
1704 * Reassemble the IBF from multiple pieces, and
1705 * process the whole IBF once possible.
1706 *
1707 * @param cls the union operation
1708 * @param msg the header of the message
1709 * @return #GNUNET_OK if @a msg is well-formed
1710 */
1711static int
1712check_union_p2p_ibf (void *cls,
1713 const struct IBFMessage *msg)
1714{
1715 struct Operation *op = cls;
1716 unsigned int buckets_in_message;
1717
1718 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1719 / IBF_BUCKET_SIZE;
1720 if (0 == buckets_in_message)
1721 {
1722 GNUNET_break_op (0);
1723 return GNUNET_SYSERR;
1724 }
1725 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1726 * IBF_BUCKET_SIZE)
1727 {
1728 GNUNET_break_op (0);
1729 return GNUNET_SYSERR;
1730 }
1731 if (op->phase == PHASE_EXPECT_IBF_CONT)
1732 {
1733 if (ntohl (msg->offset) != op->ibf_buckets_received)
1734 {
1735 GNUNET_break_op (0);
1736 return GNUNET_SYSERR;
1737 }
1738 if (1 << msg->order != op->remote_ibf->size)
1739 {
1740 GNUNET_break_op (0);
1741 return GNUNET_SYSERR;
1742 }
1743 if (ntohl (msg->salt) != op->salt_receive)
1744 {
1745 GNUNET_break_op (0);
1746 return GNUNET_SYSERR;
1747 }
1748 }
1749 else if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
1750 (op->phase != PHASE_EXPECT_IBF))
1751 {
1752 GNUNET_break_op (0);
1753 return GNUNET_SYSERR;
1754 }
1755
1756 return GNUNET_OK;
1757}
1758
1759
1760/**
1761 * Handle an IBF message from a remote peer.
1762 *
1763 * Reassemble the IBF from multiple pieces, and
1764 * process the whole IBF once possible.
1765 *
1766 * @param cls the union operation
1767 * @param msg the header of the message
1768 */
1769static void
1770handle_union_p2p_ibf (void *cls,
1771 const struct IBFMessage *msg)
1772{
1773 struct Operation *op = cls;
1774 unsigned int buckets_in_message;
1775
1776 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1777 / IBF_BUCKET_SIZE;
1778 if ((op->phase == PHASE_INVENTORY_PASSIVE) ||
1779 (op->phase == PHASE_EXPECT_IBF))
1780 {
1781 op->phase = PHASE_EXPECT_IBF_CONT;
1782 GNUNET_assert (NULL == op->remote_ibf);
1783 LOG (GNUNET_ERROR_TYPE_DEBUG,
1784 "Creating new ibf of size %u\n",
1785 1 << msg->order);
1786 op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1787 op->salt_receive = ntohl (msg->salt);
1788 LOG (GNUNET_ERROR_TYPE_DEBUG,
1789 "Receiving new IBF with salt %u\n",
1790 op->salt_receive);
1791 if (NULL == op->remote_ibf)
1792 {
1793 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1794 "Failed to parse remote IBF, closing connection\n");
1795 fail_union_operation (op);
1796 return;
1797 }
1798 op->ibf_buckets_received = 0;
1799 if (0 != ntohl (msg->offset))
1800 {
1801 GNUNET_break_op (0);
1802 fail_union_operation (op);
1803 return;
1804 }
1805 }
1806 else
1807 {
1808 GNUNET_assert (op->phase == PHASE_EXPECT_IBF_CONT);
1809 LOG (GNUNET_ERROR_TYPE_DEBUG,
1810 "Received more of IBF\n");
1811 }
1812 GNUNET_assert (NULL != op->remote_ibf);
1813
1814 ibf_read_slice (&msg[1],
1815 op->ibf_buckets_received,
1816 buckets_in_message,
1817 op->remote_ibf);
1818 op->ibf_buckets_received += buckets_in_message;
1819
1820 if (op->ibf_buckets_received == op->remote_ibf->size)
1821 {
1822 LOG (GNUNET_ERROR_TYPE_DEBUG,
1823 "received full ibf\n");
1824 op->phase = PHASE_INVENTORY_ACTIVE;
1825 if (GNUNET_OK !=
1826 decode_and_send (op))
1827 {
1828 /* Internal error, best we can do is shut down */
1829 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1830 "Failed to decode IBF, closing connection\n");
1831 fail_union_operation (op);
1832 return;
1833 }
1834 }
1835 GNUNET_CADET_receive_done (op->channel);
1836}
1837
1838
1839/**
1840 * Send a result message to the client indicating
1841 * that there is a new element.
1842 *
1843 * @param op union operation
1844 * @param element element to send
1845 * @param status status to send with the new element
1846 */
1847static void
1848send_client_element (struct Operation *op,
1849 const struct GNUNET_SETU_Element *element,
1850 enum GNUNET_SETU_Status status)
1851{
1852 struct GNUNET_MQ_Envelope *ev;
1853 struct GNUNET_SETU_ResultMessage *rm;
1854
1855 LOG (GNUNET_ERROR_TYPE_DEBUG,
1856 "sending element (size %u) to client\n",
1857 element->size);
1858 GNUNET_assert (0 != op->client_request_id);
1859 ev = GNUNET_MQ_msg_extra (rm,
1860 element->size,
1861 GNUNET_MESSAGE_TYPE_SETU_RESULT);
1862 if (NULL == ev)
1863 {
1864 GNUNET_MQ_discard (ev);
1865 GNUNET_break (0);
1866 return;
1867 }
1868 rm->result_status = htons (status);
1869 rm->request_id = htonl (op->client_request_id);
1870 rm->element_type = htons (element->element_type);
1871 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1872 op->key_to_element));
1873 GNUNET_memcpy (&rm[1],
1874 element->data,
1875 element->size);
1876 GNUNET_MQ_send (op->set->cs->mq,
1877 ev);
1878}
1879
1880
1881/**
1882 * Tests if the operation is finished, and if so notify.
1883 *
1884 * @param op operation to check
1885 */
1886static void
1887maybe_finish (struct Operation *op)
1888{
1889 unsigned int num_demanded;
1890
1891 num_demanded = GNUNET_CONTAINER_multihashmap_size (
1892 op->demanded_hashes);
1893
1894 if (PHASE_FINISH_WAITING == op->phase)
1895 {
1896 LOG (GNUNET_ERROR_TYPE_DEBUG,
1897 "In PHASE_FINISH_WAITING, pending %u demands\n",
1898 num_demanded);
1899 if (0 == num_demanded)
1900 {
1901 struct GNUNET_MQ_Envelope *ev;
1902
1903 op->phase = PHASE_DONE;
1904 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
1905 GNUNET_MQ_send (op->mq,
1906 ev);
1907 /* We now wait until the other peer sends P2P_OVER
1908 * after it got all elements from us. */
1909 }
1910 }
1911 if (PHASE_FINISH_CLOSING == op->phase)
1912 {
1913 LOG (GNUNET_ERROR_TYPE_DEBUG,
1914 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1915 num_demanded);
1916 if (0 == num_demanded)
1917 {
1918 op->phase = PHASE_DONE;
1919 send_client_done (op);
1920 _GSS_operation_destroy2 (op);
1921 }
1922 }
1923}
1924
1925
1926/**
1927 * Check an element message from a remote peer.
1928 *
1929 * @param cls the union operation
1930 * @param emsg the message
1931 */
1932static int
1933check_union_p2p_elements (void *cls,
1934 const struct GNUNET_SETU_ElementMessage *emsg)
1935{
1936 struct Operation *op = cls;
1937
1938 if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
1939 {
1940 GNUNET_break_op (0);
1941 return GNUNET_SYSERR;
1942 }
1943 return GNUNET_OK;
1944}
1945
1946
1947/**
1948 * Handle an element message from a remote peer.
1949 * Sent by the other peer either because we decoded an IBF and placed a demand,
1950 * or because the other peer switched to full set transmission.
1951 *
1952 * @param cls the union operation
1953 * @param emsg the message
1954 */
1955static void
1956handle_union_p2p_elements (void *cls,
1957 const struct GNUNET_SETU_ElementMessage *emsg)
1958{
1959 struct Operation *op = cls;
1960 struct ElementEntry *ee;
1961 struct KeyEntry *ke;
1962 uint16_t element_size;
1963
1964 element_size = ntohs (emsg->header.size) - sizeof(struct
1965 GNUNET_SETU_ElementMessage);
1966 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1967 GNUNET_memcpy (&ee[1],
1968 &emsg[1],
1969 element_size);
1970 ee->element.size = element_size;
1971 ee->element.data = &ee[1];
1972 ee->element.element_type = ntohs (emsg->element_type);
1973 ee->remote = GNUNET_YES;
1974 GNUNET_SETU_element_hash (&ee->element,
1975 &ee->element_hash);
1976 if (GNUNET_NO ==
1977 GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
1978 &ee->element_hash,
1979 NULL))
1980 {
1981 /* We got something we didn't demand, since it's not in our map. */
1982 GNUNET_break_op (0);
1983 fail_union_operation (op);
1984 return;
1985 }
1986
1987 LOG (GNUNET_ERROR_TYPE_DEBUG,
1988 "Got element (size %u, hash %s) from peer\n",
1989 (unsigned int) element_size,
1990 GNUNET_h2s (&ee->element_hash));
1991
1992 GNUNET_STATISTICS_update (_GSS_statistics,
1993 "# received elements",
1994 1,
1995 GNUNET_NO);
1996 GNUNET_STATISTICS_update (_GSS_statistics,
1997 "# exchanged elements",
1998 1,
1999 GNUNET_NO);
2000
2001 op->received_total++;
2002
2003 ke = op_get_element (op,
2004 &ee->element_hash);
2005 if (NULL != ke)
2006 {
2007 /* Got repeated element. Should not happen since
2008 * we track demands. */
2009 GNUNET_STATISTICS_update (_GSS_statistics,
2010 "# repeated elements",
2011 1,
2012 GNUNET_NO);
2013 ke->received = GNUNET_YES;
2014 GNUNET_free (ee);
2015 }
2016 else
2017 {
2018 LOG (GNUNET_ERROR_TYPE_DEBUG,
2019 "Registering new element from remote peer\n");
2020 op->received_fresh++;
2021 op_register_element (op, ee, GNUNET_YES);
2022 /* only send results immediately if the client wants it */
2023 send_client_element (op,
2024 &ee->element,
2025 GNUNET_SETU_STATUS_ADD_LOCAL);
2026 }
2027
2028 if ((op->received_total > 8) &&
2029 (op->received_fresh < op->received_total / 3))
2030 {
2031 /* The other peer gave us lots of old elements, there's something wrong. */
2032 GNUNET_break_op (0);
2033 fail_union_operation (op);
2034 return;
2035 }
2036 GNUNET_CADET_receive_done (op->channel);
2037 maybe_finish (op);
2038}
2039
2040
2041/**
2042 * Check a full element message from a remote peer.
2043 *
2044 * @param cls the union operation
2045 * @param emsg the message
2046 */
2047static int
2048check_union_p2p_full_element (void *cls,
2049 const struct GNUNET_SETU_ElementMessage *emsg)
2050{
2051 struct Operation *op = cls;
2052
2053 (void) op;
2054 // FIXME: check that we expect full elements here?
2055 return GNUNET_OK;
2056}
2057
2058
2059/**
2060 * Handle an element message from a remote peer.
2061 *
2062 * @param cls the union operation
2063 * @param emsg the message
2064 */
2065static void
2066handle_union_p2p_full_element (void *cls,
2067 const struct GNUNET_SETU_ElementMessage *emsg)
2068{
2069 struct Operation *op = cls;
2070 struct ElementEntry *ee;
2071 struct KeyEntry *ke;
2072 uint16_t element_size;
2073
2074 element_size = ntohs (emsg->header.size)
2075 - sizeof(struct GNUNET_SETU_ElementMessage);
2076 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2077 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
2078 ee->element.size = element_size;
2079 ee->element.data = &ee[1];
2080 ee->element.element_type = ntohs (emsg->element_type);
2081 ee->remote = GNUNET_YES;
2082 GNUNET_SETU_element_hash (&ee->element,
2083 &ee->element_hash);
2084 LOG (GNUNET_ERROR_TYPE_DEBUG,
2085 "Got element (full diff, size %u, hash %s) from peer\n",
2086 (unsigned int) element_size,
2087 GNUNET_h2s (&ee->element_hash));
2088
2089 GNUNET_STATISTICS_update (_GSS_statistics,
2090 "# received elements",
2091 1,
2092 GNUNET_NO);
2093 GNUNET_STATISTICS_update (_GSS_statistics,
2094 "# exchanged elements",
2095 1,
2096 GNUNET_NO);
2097
2098 op->received_total++;
2099
2100 ke = op_get_element (op,
2101 &ee->element_hash);
2102 if (NULL != ke)
2103 {
2104 /* Got repeated element. Should not happen since
2105 * we track demands. */
2106 GNUNET_STATISTICS_update (_GSS_statistics,
2107 "# repeated elements",
2108 1,
2109 GNUNET_NO);
2110 ke->received = GNUNET_YES;
2111 GNUNET_free (ee);
2112 }
2113 else
2114 {
2115 LOG (GNUNET_ERROR_TYPE_DEBUG,
2116 "Registering new element from remote peer\n");
2117 op->received_fresh++;
2118 op_register_element (op, ee, GNUNET_YES);
2119 /* only send results immediately if the client wants it */
2120 send_client_element (op,
2121 &ee->element,
2122 GNUNET_SETU_STATUS_ADD_LOCAL);
2123 }
2124
2125 if ((GNUNET_YES == op->byzantine) &&
2126 (op->received_total > 384 + op->received_fresh * 4) &&
2127 (op->received_fresh < op->received_total / 6))
2128 {
2129 /* The other peer gave us lots of old elements, there's something wrong. */
2130 LOG (GNUNET_ERROR_TYPE_ERROR,
2131 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
2132 (unsigned long long) op->received_fresh,
2133 (unsigned long long) op->received_total);
2134 GNUNET_break_op (0);
2135 fail_union_operation (op);
2136 return;
2137 }
2138 GNUNET_CADET_receive_done (op->channel);
2139}
2140
2141
2142/**
2143 * Send offers (for GNUNET_Hash-es) in response
2144 * to inquiries (for IBF_Key-s).
2145 *
2146 * @param cls the union operation
2147 * @param msg the message
2148 */
2149static int
2150check_union_p2p_inquiry (void *cls,
2151 const struct InquiryMessage *msg)
2152{
2153 struct Operation *op = cls;
2154 unsigned int num_keys;
2155
2156 if (op->phase != PHASE_INVENTORY_PASSIVE)
2157 {
2158 GNUNET_break_op (0);
2159 return GNUNET_SYSERR;
2160 }
2161 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2162 / sizeof(struct IBF_Key);
2163 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2164 != num_keys * sizeof(struct IBF_Key))
2165 {
2166 GNUNET_break_op (0);
2167 return GNUNET_SYSERR;
2168 }
2169 return GNUNET_OK;
2170}
2171
2172
2173/**
2174 * Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
2175 *
2176 * @param cls the union operation
2177 * @param msg the message
2178 */
2179static void
2180handle_union_p2p_inquiry (void *cls,
2181 const struct InquiryMessage *msg)
2182{
2183 struct Operation *op = cls;
2184 const struct IBF_Key *ibf_key;
2185 unsigned int num_keys;
2186
2187 LOG (GNUNET_ERROR_TYPE_DEBUG,
2188 "Received union inquiry\n");
2189 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2190 / sizeof(struct IBF_Key);
2191 ibf_key = (const struct IBF_Key *) &msg[1];
2192 while (0 != num_keys--)
2193 {
2194 struct IBF_Key unsalted_key;
2195
2196 unsalt_key (ibf_key,
2197 ntohl (msg->salt),
2198 &unsalted_key);
2199 send_offers_for_key (op,
2200 unsalted_key);
2201 ibf_key++;
2202 }
2203 GNUNET_CADET_receive_done (op->channel);
2204}
2205
2206
2207/**
2208 * Iterator over hash map entries, called to destroy the linked list of
2209 * colliding ibf key entries.
2210 *
2211 * @param cls closure
2212 * @param key current key code
2213 * @param value value in the hash map
2214 * @return #GNUNET_YES if we should continue to iterate,
2215 * #GNUNET_NO if not.
2216 */
2217static int
2218send_missing_full_elements_iter (void *cls,
2219 uint32_t key,
2220 void *value)
2221{
2222 struct Operation *op = cls;
2223 struct KeyEntry *ke = value;
2224 struct GNUNET_MQ_Envelope *ev;
2225 struct GNUNET_SETU_ElementMessage *emsg;
2226 struct ElementEntry *ee = ke->element;
2227
2228 if (GNUNET_YES == ke->received)
2229 return GNUNET_YES;
2230 ev = GNUNET_MQ_msg_extra (emsg,
2231 ee->element.size,
2232 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
2233 GNUNET_memcpy (&emsg[1],
2234 ee->element.data,
2235 ee->element.size);
2236 emsg->element_type = htons (ee->element.element_type);
2237 GNUNET_MQ_send (op->mq,
2238 ev);
2239 return GNUNET_YES;
2240}
2241
2242
2243/**
2244 * Handle a request for full set transmission.
2245 *
2246 * @parem cls closure, a set union operation
2247 * @param mh the demand message
2248 */
2249static void
2250handle_union_p2p_request_full (void *cls,
2251 const struct GNUNET_MessageHeader *mh)
2252{
2253 struct Operation *op = cls;
2254
2255 LOG (GNUNET_ERROR_TYPE_DEBUG,
2256 "Received request for full set transmission\n");
2257 if (PHASE_EXPECT_IBF != op->phase)
2258 {
2259 GNUNET_break_op (0);
2260 fail_union_operation (op);
2261 return;
2262 }
2263
2264 // FIXME: we need to check that our set is larger than the
2265 // byzantine_lower_bound by some threshold
2266 send_full_set (op);
2267 GNUNET_CADET_receive_done (op->channel);
2268}
2269
2270
2271/**
2272 * Handle a "full done" message.
2273 *
2274 * @parem cls closure, a set union operation
2275 * @param mh the demand message
2276 */
2277static void
2278handle_union_p2p_full_done (void *cls,
2279 const struct GNUNET_MessageHeader *mh)
2280{
2281 struct Operation *op = cls;
2282
2283 switch (op->phase)
2284 {
2285 case PHASE_EXPECT_IBF:
2286 {
2287 struct GNUNET_MQ_Envelope *ev;
2288
2289 LOG (GNUNET_ERROR_TYPE_DEBUG,
2290 "got FULL DONE, sending elements that other peer is missing\n");
2291
2292 /* send all the elements that did not come from the remote peer */
2293 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
2294 &send_missing_full_elements_iter,
2295 op);
2296 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
2297 GNUNET_MQ_send (op->mq,
2298 ev);
2299 op->phase = PHASE_DONE;
2300 /* we now wait until the other peer sends us the OVER message*/
2301 }
2302 break;
2303
2304 case PHASE_FULL_SENDING:
2305 {
2306 LOG (GNUNET_ERROR_TYPE_DEBUG,
2307 "got FULL DONE, finishing\n");
2308 /* We sent the full set, and got the response for that. We're done. */
2309 op->phase = PHASE_DONE;
2310 GNUNET_CADET_receive_done (op->channel);
2311 send_client_done (op);
2312 _GSS_operation_destroy2 (op);
2313 return;
2314 }
2315 break;
2316
2317 default:
2318 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2319 "Handle full done phase is %u\n",
2320 (unsigned) op->phase);
2321 GNUNET_break_op (0);
2322 fail_union_operation (op);
2323 return;
2324 }
2325 GNUNET_CADET_receive_done (op->channel);
2326}
2327
2328
2329/**
2330 * Check a demand by the other peer for elements based on a list
2331 * of `struct GNUNET_HashCode`s.
2332 *
2333 * @parem cls closure, a set union operation
2334 * @param mh the demand message
2335 * @return #GNUNET_OK if @a mh is well-formed
2336 */
2337static int
2338check_union_p2p_demand (void *cls,
2339 const struct GNUNET_MessageHeader *mh)
2340{
2341 struct Operation *op = cls;
2342 unsigned int num_hashes;
2343
2344 (void) op;
2345 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2346 / sizeof(struct GNUNET_HashCode);
2347 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2348 != num_hashes * sizeof(struct GNUNET_HashCode))
2349 {
2350 GNUNET_break_op (0);
2351 return GNUNET_SYSERR;
2352 }
2353 return GNUNET_OK;
2354}
2355
2356
2357/**
2358 * Handle a demand by the other peer for elements based on a list
2359 * of `struct GNUNET_HashCode`s.
2360 *
2361 * @parem cls closure, a set union operation
2362 * @param mh the demand message
2363 */
2364static void
2365handle_union_p2p_demand (void *cls,
2366 const struct GNUNET_MessageHeader *mh)
2367{
2368 struct Operation *op = cls;
2369 struct ElementEntry *ee;
2370 struct GNUNET_SETU_ElementMessage *emsg;
2371 const struct GNUNET_HashCode *hash;
2372 unsigned int num_hashes;
2373 struct GNUNET_MQ_Envelope *ev;
2374
2375 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2376 / sizeof(struct GNUNET_HashCode);
2377 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2378 num_hashes > 0;
2379 hash++, num_hashes--)
2380 {
2381 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2382 hash);
2383 if (NULL == ee)
2384 {
2385 /* Demand for non-existing element. */
2386 GNUNET_break_op (0);
2387 fail_union_operation (op);
2388 return;
2389 }
2390 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2391 {
2392 /* Probably confused lazily copied sets. */
2393 GNUNET_break_op (0);
2394 fail_union_operation (op);
2395 return;
2396 }
2397 ev = GNUNET_MQ_msg_extra (emsg,
2398 ee->element.size,
2399 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS);
2400 GNUNET_memcpy (&emsg[1],
2401 ee->element.data,
2402 ee->element.size);
2403 emsg->reserved = htons (0);
2404 emsg->element_type = htons (ee->element.element_type);
2405 LOG (GNUNET_ERROR_TYPE_DEBUG,
2406 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2407 (void *) op,
2408 (unsigned int) ee->element.size,
2409 GNUNET_h2s (&ee->element_hash));
2410 GNUNET_MQ_send (op->mq, ev);
2411 GNUNET_STATISTICS_update (_GSS_statistics,
2412 "# exchanged elements",
2413 1,
2414 GNUNET_NO);
2415 if (op->symmetric)
2416 send_client_element (op,
2417 &ee->element,
2418 GNUNET_SET_STATUS_ADD_REMOTE);
2419 }
2420 GNUNET_CADET_receive_done (op->channel);
2421}
2422
2423
2424/**
2425 * Check offer (of `struct GNUNET_HashCode`s).
2426 *
2427 * @param cls the union operation
2428 * @param mh the message
2429 * @return #GNUNET_OK if @a mh is well-formed
2430 */
2431static int
2432check_union_p2p_offer (void *cls,
2433 const struct GNUNET_MessageHeader *mh)
2434{
2435 struct Operation *op = cls;
2436 unsigned int num_hashes;
2437
2438 /* look up elements and send them */
2439 if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
2440 (op->phase != PHASE_INVENTORY_ACTIVE))
2441 {
2442 GNUNET_break_op (0);
2443 return GNUNET_SYSERR;
2444 }
2445 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2446 / sizeof(struct GNUNET_HashCode);
2447 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2448 num_hashes * sizeof(struct GNUNET_HashCode))
2449 {
2450 GNUNET_break_op (0);
2451 return GNUNET_SYSERR;
2452 }
2453 return GNUNET_OK;
2454}
2455
2456
2457/**
2458 * Handle offers (of `struct GNUNET_HashCode`s) and
2459 * respond with demands (of `struct GNUNET_HashCode`s).
2460 *
2461 * @param cls the union operation
2462 * @param mh the message
2463 */
2464static void
2465handle_union_p2p_offer (void *cls,
2466 const struct GNUNET_MessageHeader *mh)
2467{
2468 struct Operation *op = cls;
2469 const struct GNUNET_HashCode *hash;
2470 unsigned int num_hashes;
2471
2472 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2473 / sizeof(struct GNUNET_HashCode);
2474 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2475 num_hashes > 0;
2476 hash++, num_hashes--)
2477 {
2478 struct ElementEntry *ee;
2479 struct GNUNET_MessageHeader *demands;
2480 struct GNUNET_MQ_Envelope *ev;
2481
2482 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2483 hash);
2484 if (NULL != ee)
2485 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2486 continue;
2487
2488 if (GNUNET_YES ==
2489 GNUNET_CONTAINER_multihashmap_contains (op->demanded_hashes,
2490 hash))
2491 {
2492 LOG (GNUNET_ERROR_TYPE_DEBUG,
2493 "Skipped sending duplicate demand\n");
2494 continue;
2495 }
2496
2497 GNUNET_assert (GNUNET_OK ==
2498 GNUNET_CONTAINER_multihashmap_put (
2499 op->demanded_hashes,
2500 hash,
2501 NULL,
2502 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2503
2504 LOG (GNUNET_ERROR_TYPE_DEBUG,
2505 "[OP %x] Requesting element (hash %s)\n",
2506 (void *) op, GNUNET_h2s (hash));
2507 ev = GNUNET_MQ_msg_header_extra (demands,
2508 sizeof(struct GNUNET_HashCode),
2509 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
2510 GNUNET_memcpy (&demands[1],
2511 hash,
2512 sizeof(struct GNUNET_HashCode));
2513 GNUNET_MQ_send (op->mq, ev);
2514 }
2515 GNUNET_CADET_receive_done (op->channel);
2516}
2517
2518
2519/**
2520 * Handle a done message from a remote peer
2521 *
2522 * @param cls the union operation
2523 * @param mh the message
2524 */
2525static void
2526handle_union_p2p_done (void *cls,
2527 const struct GNUNET_MessageHeader *mh)
2528{
2529 struct Operation *op = cls;
2530
2531 switch (op->phase)
2532 {
2533 case PHASE_INVENTORY_PASSIVE:
2534 /* We got all requests, but still have to send our elements in response. */
2535 op->phase = PHASE_FINISH_WAITING;
2536 LOG (GNUNET_ERROR_TYPE_DEBUG,
2537 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2538 /* The active peer is done sending offers
2539 * and inquiries. This means that all
2540 * our responses to that (demands and offers)
2541 * must be in flight (queued or in mesh).
2542 *
2543 * We should notify the active peer once
2544 * all our demands are satisfied, so that the active
2545 * peer can quit if we gave it everything.
2546 */GNUNET_CADET_receive_done (op->channel);
2547 maybe_finish (op);
2548 return;
2549 case PHASE_INVENTORY_ACTIVE:
2550 LOG (GNUNET_ERROR_TYPE_DEBUG,
2551 "got DONE (as active partner), waiting to finish\n");
2552 /* All demands of the other peer are satisfied,
2553 * and we processed all offers, thus we know
2554 * exactly what our demands must be.
2555 *
2556 * We'll close the channel
2557 * to the other peer once our demands are met.
2558 */op->phase = PHASE_FINISH_CLOSING;
2559 GNUNET_CADET_receive_done (op->channel);
2560 maybe_finish (op);
2561 return;
2562 default:
2563 GNUNET_break_op (0);
2564 fail_union_operation (op);
2565 return;
2566 }
2567}
2568
2569
2570/**
2571 * Handle a over message from a remote peer
2572 *
2573 * @param cls the union operation
2574 * @param mh the message
2575 */
2576static void
2577handle_union_p2p_over (void *cls,
2578 const struct GNUNET_MessageHeader *mh)
2579{
2580 send_client_done (cls);
2581}
2582
2583
2584/**
2585 * Get the incoming socket associated with the given id.
2586 *
2587 * @param listener the listener to look in
2588 * @param id id to look for
2589 * @return the incoming socket associated with the id,
2590 * or NULL if there is none
2591 */
2592static struct Operation *
2593get_incoming (uint32_t id)
2594{
2595 for (struct Listener *listener = listener_head;
2596 NULL != listener;
2597 listener = listener->next)
2598 {
2599 for (struct Operation *op = listener->op_head;
2600 NULL != op;
2601 op = op->next)
2602 if (op->suggest_id == id)
2603 return op;
2604 }
2605 return NULL;
2606}
2607
2608
2609/**
2610 * Callback called when a client connects to the service.
2611 *
2612 * @param cls closure for the service
2613 * @param c the new client that connected to the service
2614 * @param mq the message queue used to send messages to the client
2615 * @return @a `struct ClientState`
2616 */
2617static void *
2618client_connect_cb (void *cls,
2619 struct GNUNET_SERVICE_Client *c,
2620 struct GNUNET_MQ_Handle *mq)
2621{
2622 struct ClientState *cs;
2623
2624 num_clients++;
2625 cs = GNUNET_new (struct ClientState);
2626 cs->client = c;
2627 cs->mq = mq;
2628 return cs;
2629}
2630
2631
2632/**
2633 * Iterator over hash map entries to free element entries.
2634 *
2635 * @param cls closure
2636 * @param key current key code
2637 * @param value a `struct ElementEntry *` to be free'd
2638 * @return #GNUNET_YES (continue to iterate)
2639 */
2640static int
2641destroy_elements_iterator (void *cls,
2642 const struct GNUNET_HashCode *key,
2643 void *value)
2644{
2645 struct ElementEntry *ee = value;
2646
2647 GNUNET_free (ee);
2648 return GNUNET_YES;
2649}
2650
2651
2652/**
2653 * Clean up after a client has disconnected
2654 *
2655 * @param cls closure, unused
2656 * @param client the client to clean up after
2657 * @param internal_cls the `struct ClientState`
2658 */
2659static void
2660client_disconnect_cb (void *cls,
2661 struct GNUNET_SERVICE_Client *client,
2662 void *internal_cls)
2663{
2664 struct ClientState *cs = internal_cls;
2665 struct Operation *op;
2666 struct Listener *listener;
2667 struct Set *set;
2668
2669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2670 "Client disconnected, cleaning up\n");
2671 if (NULL != (set = cs->set))
2672 {
2673 struct SetContent *content = set->content;
2674
2675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2676 "Destroying client's set\n");
2677 /* Destroy pending set operations */
2678 while (NULL != set->ops_head)
2679 _GSS_operation_destroy (set->ops_head);
2680
2681 /* Destroy operation-specific state */
2682 if (NULL != set->se)
2683 {
2684 strata_estimator_destroy (set->se);
2685 set->se = NULL;
2686 }
2687 /* free set content (or at least decrement RC) */
2688 set->content = NULL;
2689 GNUNET_assert (0 != content->refcount);
2690 content->refcount--;
2691 if (0 == content->refcount)
2692 {
2693 GNUNET_assert (NULL != content->elements);
2694 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
2695 &destroy_elements_iterator,
2696 NULL);
2697 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
2698 content->elements = NULL;
2699 GNUNET_free (content);
2700 }
2701 GNUNET_free (set);
2702 }
2703
2704 if (NULL != (listener = cs->listener))
2705 {
2706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2707 "Destroying client's listener\n");
2708 GNUNET_CADET_close_port (listener->open_port);
2709 listener->open_port = NULL;
2710 while (NULL != (op = listener->op_head))
2711 {
2712 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2713 "Destroying incoming operation `%u' from peer `%s'\n",
2714 (unsigned int) op->client_request_id,
2715 GNUNET_i2s (&op->peer));
2716 incoming_destroy (op);
2717 }
2718 GNUNET_CONTAINER_DLL_remove (listener_head,
2719 listener_tail,
2720 listener);
2721 GNUNET_free (listener);
2722 }
2723 GNUNET_free (cs);
2724 num_clients--;
2725 if ( (GNUNET_YES == in_shutdown) &&
2726 (0 == num_clients) )
2727 {
2728 if (NULL != cadet)
2729 {
2730 GNUNET_CADET_disconnect (cadet);
2731 cadet = NULL;
2732 }
2733 }
2734}
2735
2736
2737/**
2738 * Check a request for a set operation from another peer.
2739 *
2740 * @param cls the operation state
2741 * @param msg the received message
2742 * @return #GNUNET_OK if the channel should be kept alive,
2743 * #GNUNET_SYSERR to destroy the channel
2744 */
2745static int
2746check_incoming_msg (void *cls,
2747 const struct OperationRequestMessage *msg)
2748{
2749 struct Operation *op = cls;
2750 struct Listener *listener = op->listener;
2751 const struct GNUNET_MessageHeader *nested_context;
2752
2753 /* double operation request */
2754 if (0 != op->suggest_id)
2755 {
2756 GNUNET_break_op (0);
2757 return GNUNET_SYSERR;
2758 }
2759 /* This should be equivalent to the previous condition, but can't hurt to check twice */
2760 if (NULL == listener)
2761 {
2762 GNUNET_break (0);
2763 return GNUNET_SYSERR;
2764 }
2765 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2766 if ((NULL != nested_context) &&
2767 (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
2768 {
2769 GNUNET_break_op (0);
2770 return GNUNET_SYSERR;
2771 }
2772 return GNUNET_OK;
2773}
2774
2775
2776/**
2777 * Handle a request for a set operation from another peer. Checks if we
2778 * have a listener waiting for such a request (and in that case initiates
2779 * asking the listener about accepting the connection). If no listener
2780 * is waiting, we queue the operation request in hope that a listener
2781 * shows up soon (before timeout).
2782 *
2783 * This msg is expected as the first and only msg handled through the
2784 * non-operation bound virtual table, acceptance of this operation replaces
2785 * our virtual table and subsequent msgs would be routed differently (as
2786 * we then know what type of operation this is).
2787 *
2788 * @param cls the operation state
2789 * @param msg the received message
2790 * @return #GNUNET_OK if the channel should be kept alive,
2791 * #GNUNET_SYSERR to destroy the channel
2792 */
2793static void
2794handle_incoming_msg (void *cls,
2795 const struct OperationRequestMessage *msg)
2796{
2797 struct Operation *op = cls;
2798 struct Listener *listener = op->listener;
2799 const struct GNUNET_MessageHeader *nested_context;
2800 struct GNUNET_MQ_Envelope *env;
2801 struct GNUNET_SETU_RequestMessage *cmsg;
2802
2803 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2804 /* Make a copy of the nested_context (application-specific context
2805 information that is opaque to set) so we can pass it to the
2806 listener later on */
2807 if (NULL != nested_context)
2808 op->context_msg = GNUNET_copy_message (nested_context);
2809 op->remote_element_count = ntohl (msg->element_count);
2810 GNUNET_log (
2811 GNUNET_ERROR_TYPE_DEBUG,
2812 "Received P2P operation request (port %s) for active listener\n",
2813 GNUNET_h2s (&op->listener->app_id));
2814 GNUNET_assert (0 == op->suggest_id);
2815 if (0 == suggest_id)
2816 suggest_id++;
2817 op->suggest_id = suggest_id++;
2818 GNUNET_assert (NULL != op->timeout_task);
2819 GNUNET_SCHEDULER_cancel (op->timeout_task);
2820 op->timeout_task = NULL;
2821 env = GNUNET_MQ_msg_nested_mh (cmsg,
2822 GNUNET_MESSAGE_TYPE_SETU_REQUEST,
2823 op->context_msg);
2824 GNUNET_log (
2825 GNUNET_ERROR_TYPE_DEBUG,
2826 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
2827 op->suggest_id,
2828 listener,
2829 listener->cs);
2830 cmsg->accept_id = htonl (op->suggest_id);
2831 cmsg->peer_id = op->peer;
2832 GNUNET_MQ_send (listener->cs->mq,
2833 env);
2834 /* NOTE: GNUNET_CADET_receive_done() will be called in
2835 #handle_client_accept() */
2836}
2837
2838
2839/**
2840 * Called when a client wants to create a new set. This is typically
2841 * the first request from a client, and includes the type of set
2842 * operation to be performed.
2843 *
2844 * @param cls client that sent the message
2845 * @param m message sent by the client
2846 */
2847static void
2848handle_client_create_set (void *cls,
2849 const struct GNUNET_SETU_CreateMessage *msg)
2850{
2851 struct ClientState *cs = cls;
2852 struct Set *set;
2853
2854 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2855 "Client created new set for union operation\n");
2856 if (NULL != cs->set)
2857 {
2858 /* There can only be one set per client */
2859 GNUNET_break (0);
2860 GNUNET_SERVICE_client_drop (cs->client);
2861 return;
2862 }
2863 set = GNUNET_new (struct Set);
2864 {
2865 struct StrataEstimator *se;
2866
2867 se = strata_estimator_create (SE_STRATA_COUNT,
2868 SE_IBF_SIZE,
2869 SE_IBF_HASH_NUM);
2870 if (NULL == se)
2871 {
2872 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2873 "Failed to allocate strata estimator\n");
2874 GNUNET_free (set);
2875 GNUNET_SERVICE_client_drop (cs->client);
2876 return;
2877 }
2878 set->se = se;
2879 }
2880 set->content = GNUNET_new (struct SetContent);
2881 set->content->refcount = 1;
2882 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
2883 GNUNET_YES);
2884 set->cs = cs;
2885 cs->set = set;
2886 GNUNET_SERVICE_client_continue (cs->client);
2887}
2888
2889
2890/**
2891 * Timeout happens iff:
2892 * - we suggested an operation to our listener,
2893 * but did not receive a response in time
2894 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
2895 *
2896 * @param cls channel context
2897 * @param tc context information (why was this task triggered now)
2898 */
2899static void
2900incoming_timeout_cb (void *cls)
2901{
2902 struct Operation *op = cls;
2903
2904 op->timeout_task = NULL;
2905 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2906 "Remote peer's incoming request timed out\n");
2907 incoming_destroy (op);
2908}
2909
2910
2911/**
2912 * Method called whenever another peer has added us to a channel the
2913 * other peer initiated. Only called (once) upon reception of data
2914 * from a channel we listen on.
2915 *
2916 * The channel context represents the operation itself and gets added
2917 * to a DLL, from where it gets looked up when our local listener
2918 * client responds to a proposed/suggested operation or connects and
2919 * associates with this operation.
2920 *
2921 * @param cls closure
2922 * @param channel new handle to the channel
2923 * @param source peer that started the channel
2924 * @return initial channel context for the channel
2925 * returns NULL on error
2926 */
2927static void *
2928channel_new_cb (void *cls,
2929 struct GNUNET_CADET_Channel *channel,
2930 const struct GNUNET_PeerIdentity *source)
2931{
2932 struct Listener *listener = cls;
2933 struct Operation *op;
2934
2935 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2936 "New incoming channel\n");
2937 op = GNUNET_new (struct Operation);
2938 op->listener = listener;
2939 op->peer = *source;
2940 op->channel = channel;
2941 op->mq = GNUNET_CADET_get_mq (op->channel);
2942 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
2943 UINT32_MAX);
2944 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
2945 &incoming_timeout_cb,
2946 op);
2947 GNUNET_CONTAINER_DLL_insert (listener->op_head,
2948 listener->op_tail,
2949 op);
2950 return op;
2951}
2952
2953
2954/**
2955 * Function called whenever a channel is destroyed. Should clean up
2956 * any associated state. It must NOT call
2957 * GNUNET_CADET_channel_destroy() on the channel.
2958 *
2959 * The peer_disconnect function is part of a a virtual table set initially either
2960 * when a peer creates a new channel with us, or once we create
2961 * a new channel ourselves (evaluate).
2962 *
2963 * Once we know the exact type of operation (union/intersection), the vt is
2964 * replaced with an operation specific instance (_GSS_[op]_vt).
2965 *
2966 * @param channel_ctx place where local state associated
2967 * with the channel is stored
2968 * @param channel connection to the other end (henceforth invalid)
2969 */
2970static void
2971channel_end_cb (void *channel_ctx,
2972 const struct GNUNET_CADET_Channel *channel)
2973{
2974 struct Operation *op = channel_ctx;
2975
2976 op->channel = NULL;
2977 _GSS_operation_destroy2 (op);
2978}
2979
2980
2981/**
2982 * Function called whenever an MQ-channel's transmission window size changes.
2983 *
2984 * The first callback in an outgoing channel will be with a non-zero value
2985 * and will mean the channel is connected to the destination.
2986 *
2987 * For an incoming channel it will be called immediately after the
2988 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
2989 *
2990 * @param cls Channel closure.
2991 * @param channel Connection to the other end (henceforth invalid).
2992 * @param window_size New window size. If the is more messages than buffer size
2993 * this value will be negative..
2994 */
2995static void
2996channel_window_cb (void *cls,
2997 const struct GNUNET_CADET_Channel *channel,
2998 int window_size)
2999{
3000 /* FIXME: not implemented, we could do flow control here... */
3001}
3002
3003
3004/**
3005 * Called when a client wants to create a new listener.
3006 *
3007 * @param cls client that sent the message
3008 * @param msg message sent by the client
3009 */
3010static void
3011handle_client_listen (void *cls,
3012 const struct GNUNET_SETU_ListenMessage *msg)
3013{
3014 struct ClientState *cs = cls;
3015 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3016 GNUNET_MQ_hd_var_size (incoming_msg,
3017 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3018 struct OperationRequestMessage,
3019 NULL),
3020 GNUNET_MQ_hd_var_size (union_p2p_ibf,
3021 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
3022 struct IBFMessage,
3023 NULL),
3024 GNUNET_MQ_hd_var_size (union_p2p_elements,
3025 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
3026 struct GNUNET_SETU_ElementMessage,
3027 NULL),
3028 GNUNET_MQ_hd_var_size (union_p2p_offer,
3029 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
3030 struct GNUNET_MessageHeader,
3031 NULL),
3032 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3033 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
3034 struct InquiryMessage,
3035 NULL),
3036 GNUNET_MQ_hd_var_size (union_p2p_demand,
3037 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
3038 struct GNUNET_MessageHeader,
3039 NULL),
3040 GNUNET_MQ_hd_fixed_size (union_p2p_done,
3041 GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
3042 struct GNUNET_MessageHeader,
3043 NULL),
3044 GNUNET_MQ_hd_fixed_size (union_p2p_over,
3045 GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
3046 struct GNUNET_MessageHeader,
3047 NULL),
3048 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3049 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3050 struct GNUNET_MessageHeader,
3051 NULL),
3052 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3053 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3054 struct GNUNET_MessageHeader,
3055 NULL),
3056 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3057 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3058 struct StrataEstimatorMessage,
3059 NULL),
3060 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3061 GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
3062 struct StrataEstimatorMessage,
3063 NULL),
3064 GNUNET_MQ_hd_var_size (union_p2p_full_element,
3065 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3066 struct GNUNET_SETU_ElementMessage,
3067 NULL),
3068 GNUNET_MQ_handler_end ()
3069 };
3070 struct Listener *listener;
3071
3072 if (NULL != cs->listener)
3073 {
3074 /* max. one active listener per client! */
3075 GNUNET_break (0);
3076 GNUNET_SERVICE_client_drop (cs->client);
3077 return;
3078 }
3079 listener = GNUNET_new (struct Listener);
3080 listener->cs = cs;
3081 cs->listener = listener;
3082 listener->app_id = msg->app_id;
3083 GNUNET_CONTAINER_DLL_insert (listener_head,
3084 listener_tail,
3085 listener);
3086 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3087 "New listener created (port %s)\n",
3088 GNUNET_h2s (&listener->app_id));
3089 listener->open_port = GNUNET_CADET_open_port (cadet,
3090 &msg->app_id,
3091 &channel_new_cb,
3092 listener,
3093 &channel_window_cb,
3094 &channel_end_cb,
3095 cadet_handlers);
3096 GNUNET_SERVICE_client_continue (cs->client);
3097}
3098
3099
3100/**
3101 * Called when the listening client rejects an operation
3102 * request by another peer.
3103 *
3104 * @param cls client that sent the message
3105 * @param msg message sent by the client
3106 */
3107static void
3108handle_client_reject (void *cls,
3109 const struct GNUNET_SETU_RejectMessage *msg)
3110{
3111 struct ClientState *cs = cls;
3112 struct Operation *op;
3113
3114 op = get_incoming (ntohl (msg->accept_reject_id));
3115 if (NULL == op)
3116 {
3117 /* no matching incoming operation for this reject;
3118 could be that the other peer already disconnected... */
3119 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3120 "Client rejected unknown operation %u\n",
3121 (unsigned int) ntohl (msg->accept_reject_id));
3122 GNUNET_SERVICE_client_continue (cs->client);
3123 return;
3124 }
3125 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3126 "Peer request (app %s) rejected by client\n",
3127 GNUNET_h2s (&cs->listener->app_id));
3128 _GSS_operation_destroy2 (op);
3129 GNUNET_SERVICE_client_continue (cs->client);
3130}
3131
3132
3133/**
3134 * Called when a client wants to add or remove an element to a set it inhabits.
3135 *
3136 * @param cls client that sent the message
3137 * @param msg message sent by the client
3138 */
3139static int
3140check_client_set_add (void *cls,
3141 const struct GNUNET_SETU_ElementMessage *msg)
3142{
3143 /* NOTE: Technically, we should probably check with the
3144 block library whether the element we are given is well-formed */
3145 return GNUNET_OK;
3146}
3147
3148
3149/**
3150 * Called when a client wants to add or remove an element to a set it inhabits.
3151 *
3152 * @param cls client that sent the message
3153 * @param msg message sent by the client
3154 */
3155static void
3156handle_client_set_add (void *cls,
3157 const struct GNUNET_SETU_ElementMessage *msg)
3158{
3159 struct ClientState *cs = cls;
3160 struct Set *set;
3161 struct GNUNET_SETU_Element el;
3162 struct ElementEntry *ee;
3163 struct GNUNET_HashCode hash;
3164
3165 if (NULL == (set = cs->set))
3166 {
3167 /* client without a set requested an operation */
3168 GNUNET_break (0);
3169 GNUNET_SERVICE_client_drop (cs->client);
3170 return;
3171 }
3172 GNUNET_SERVICE_client_continue (cs->client);
3173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
3174 el.size = ntohs (msg->header.size) - sizeof(*msg);
3175 el.data = &msg[1];
3176 el.element_type = ntohs (msg->element_type);
3177 GNUNET_SETU_element_hash (&el,
3178 &hash);
3179 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
3180 &hash);
3181 if (NULL == ee)
3182 {
3183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3184 "Client inserts element %s of size %u\n",
3185 GNUNET_h2s (&hash),
3186 el.size);
3187 ee = GNUNET_malloc (el.size + sizeof(*ee));
3188 ee->element.size = el.size;
3189 GNUNET_memcpy (&ee[1], el.data, el.size);
3190 ee->element.data = &ee[1];
3191 ee->element.element_type = el.element_type;
3192 ee->remote = GNUNET_NO;
3193 ee->generation = set->current_generation;
3194 ee->element_hash = hash;
3195 GNUNET_break (GNUNET_YES ==
3196 GNUNET_CONTAINER_multihashmap_put (
3197 set->content->elements,
3198 &ee->element_hash,
3199 ee,
3200 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3201 }
3202 else
3203 {
3204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3205 "Client inserted element %s of size %u twice (ignored)\n",
3206 GNUNET_h2s (&hash),
3207 el.size);
3208 /* same element inserted twice */
3209 return;
3210 }
3211 strata_estimator_insert (set->se,
3212 get_ibf_key (&ee->element_hash));
3213}
3214
3215
3216/**
3217 * Advance the current generation of a set,
3218 * adding exclusion ranges if necessary.
3219 *
3220 * @param set the set where we want to advance the generation
3221 */
3222static void
3223advance_generation (struct Set *set)
3224{
3225 set->content->latest_generation++;
3226 set->current_generation++;
3227}
3228
3229
3230/**
3231 * Called when a client wants to initiate a set operation with another
3232 * peer. Initiates the CADET connection to the listener and sends the
3233 * request.
3234 *
3235 * @param cls client that sent the message
3236 * @param msg message sent by the client
3237 * @return #GNUNET_OK if the message is well-formed
3238 */
3239static int
3240check_client_evaluate (void *cls,
3241 const struct GNUNET_SETU_EvaluateMessage *msg)
3242{
3243 /* FIXME: suboptimal, even if the context below could be NULL,
3244 there are malformed messages this does not check for... */
3245 return GNUNET_OK;
3246}
3247
3248
3249/**
3250 * Called when a client wants to initiate a set operation with another
3251 * peer. Initiates the CADET connection to the listener and sends the
3252 * request.
3253 *
3254 * @param cls client that sent the message
3255 * @param msg message sent by the client
3256 */
3257static void
3258handle_client_evaluate (void *cls,
3259 const struct GNUNET_SETU_EvaluateMessage *msg)
3260{
3261 struct ClientState *cs = cls;
3262 struct Operation *op = GNUNET_new (struct Operation);
3263 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3264 GNUNET_MQ_hd_var_size (incoming_msg,
3265 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3266 struct OperationRequestMessage,
3267 op),
3268 GNUNET_MQ_hd_var_size (union_p2p_ibf,
3269 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
3270 struct IBFMessage,
3271 op),
3272 GNUNET_MQ_hd_var_size (union_p2p_elements,
3273 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
3274 struct GNUNET_SETU_ElementMessage,
3275 op),
3276 GNUNET_MQ_hd_var_size (union_p2p_offer,
3277 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
3278 struct GNUNET_MessageHeader,
3279 op),
3280 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3281 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
3282 struct InquiryMessage,
3283 op),
3284 GNUNET_MQ_hd_var_size (union_p2p_demand,
3285 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
3286 struct GNUNET_MessageHeader,
3287 op),
3288 GNUNET_MQ_hd_fixed_size (union_p2p_done,
3289 GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
3290 struct GNUNET_MessageHeader,
3291 op),
3292 GNUNET_MQ_hd_fixed_size (union_p2p_over,
3293 GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
3294 struct GNUNET_MessageHeader,
3295 op),
3296 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3297 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3298 struct GNUNET_MessageHeader,
3299 op),
3300 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3301 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3302 struct GNUNET_MessageHeader,
3303 op),
3304 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3305 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3306 struct StrataEstimatorMessage,
3307 op),
3308 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3309 GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
3310 struct StrataEstimatorMessage,
3311 op),
3312 GNUNET_MQ_hd_var_size (union_p2p_full_element,
3313 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3314 struct GNUNET_SETU_ElementMessage,
3315 op),
3316 GNUNET_MQ_handler_end ()
3317 };
3318 struct Set *set;
3319 const struct GNUNET_MessageHeader *context;
3320
3321 if (NULL == (set = cs->set))
3322 {
3323 GNUNET_break (0);
3324 GNUNET_free (op);
3325 GNUNET_SERVICE_client_drop (cs->client);
3326 return;
3327 }
3328 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
3329 UINT32_MAX);
3330 op->peer = msg->target_peer;
3331 op->client_request_id = ntohl (msg->request_id);
3332 op->byzantine = msg->byzantine;
3333 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3334 op->force_full = msg->force_full;
3335 op->force_delta = msg->force_delta;
3336 op->symmetric = msg->symmetric;
3337 context = GNUNET_MQ_extract_nested_mh (msg);
3338
3339 /* Advance generation values, so that
3340 mutations won't interfer with the running operation. */
3341 op->set = set;
3342 op->generation_created = set->current_generation;
3343 advance_generation (set);
3344 GNUNET_CONTAINER_DLL_insert (set->ops_head,
3345 set->ops_tail,
3346 op);
3347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3348 "Creating new CADET channel to port %s for set union\n",
3349 GNUNET_h2s (&msg->app_id));
3350 op->channel = GNUNET_CADET_channel_create (cadet,
3351 op,
3352 &msg->target_peer,
3353 &msg->app_id,
3354 &channel_window_cb,
3355 &channel_end_cb,
3356 cadet_handlers);
3357 op->mq = GNUNET_CADET_get_mq (op->channel);
3358 {
3359 struct GNUNET_MQ_Envelope *ev;
3360 struct OperationRequestMessage *msg;
3361
3362 ev = GNUNET_MQ_msg_nested_mh (msg,
3363 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3364 context);
3365 if (NULL == ev)
3366 {
3367 /* the context message is too large */
3368 GNUNET_break (0);
3369 GNUNET_SERVICE_client_drop (cs->client);
3370 return;
3371 }
3372 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3373 GNUNET_NO);
3374 /* copy the current generation's strata estimator for this operation */
3375 op->se = strata_estimator_dup (op->set->se);
3376 /* we started the operation, thus we have to send the operation request */
3377 op->phase = PHASE_EXPECT_SE;
3378 op->salt_receive = op->salt_send = 42; // FIXME?????
3379 LOG (GNUNET_ERROR_TYPE_DEBUG,
3380 "Initiating union operation evaluation\n");
3381 GNUNET_STATISTICS_update (_GSS_statistics,
3382 "# of total union operations",
3383 1,
3384 GNUNET_NO);
3385 GNUNET_STATISTICS_update (_GSS_statistics,
3386 "# of initiated union operations",
3387 1,
3388 GNUNET_NO);
3389 GNUNET_MQ_send (op->mq,
3390 ev);
3391 if (NULL != context)
3392 LOG (GNUNET_ERROR_TYPE_DEBUG,
3393 "sent op request with context message\n");
3394 else
3395 LOG (GNUNET_ERROR_TYPE_DEBUG,
3396 "sent op request without context message\n");
3397 initialize_key_to_element (op);
3398 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3399 op->key_to_element);
3400
3401 }
3402 GNUNET_SERVICE_client_continue (cs->client);
3403}
3404
3405
3406/**
3407 * Handle a request from the client to cancel a running set operation.
3408 *
3409 * @param cls the client
3410 * @param msg the message
3411 */
3412static void
3413handle_client_cancel (void *cls,
3414 const struct GNUNET_SETU_CancelMessage *msg)
3415{
3416 struct ClientState *cs = cls;
3417 struct Set *set;
3418 struct Operation *op;
3419 int found;
3420
3421 if (NULL == (set = cs->set))
3422 {
3423 /* client without a set requested an operation */
3424 GNUNET_break (0);
3425 GNUNET_SERVICE_client_drop (cs->client);
3426 return;
3427 }
3428 found = GNUNET_NO;
3429 for (op = set->ops_head; NULL != op; op = op->next)
3430 {
3431 if (op->client_request_id == ntohl (msg->request_id))
3432 {
3433 found = GNUNET_YES;
3434 break;
3435 }
3436 }
3437 if (GNUNET_NO == found)
3438 {
3439 /* It may happen that the operation was already destroyed due to
3440 * the other peer disconnecting. The client may not know about this
3441 * yet and try to cancel the (just barely non-existent) operation.
3442 * So this is not a hard error.
3443 *///
3444 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3445 "Client canceled non-existent op %u\n",
3446 (uint32_t) ntohl (msg->request_id));
3447 }
3448 else
3449 {
3450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3451 "Client requested cancel for op %u\n",
3452 (uint32_t) ntohl (msg->request_id));
3453 _GSS_operation_destroy (op);
3454 }
3455 GNUNET_SERVICE_client_continue (cs->client);
3456}
3457
3458
3459/**
3460 * Handle a request from the client to accept a set operation that
3461 * came from a remote peer. We forward the accept to the associated
3462 * operation for handling
3463 *
3464 * @param cls the client
3465 * @param msg the message
3466 */
3467static void
3468handle_client_accept (void *cls,
3469 const struct GNUNET_SETU_AcceptMessage *msg)
3470{
3471 struct ClientState *cs = cls;
3472 struct Set *set;
3473 struct Operation *op;
3474 struct GNUNET_SETU_ResultMessage *result_message;
3475 struct GNUNET_MQ_Envelope *ev;
3476 struct Listener *listener;
3477
3478 if (NULL == (set = cs->set))
3479 {
3480 /* client without a set requested to accept */
3481 GNUNET_break (0);
3482 GNUNET_SERVICE_client_drop (cs->client);
3483 return;
3484 }
3485 op = get_incoming (ntohl (msg->accept_reject_id));
3486 if (NULL == op)
3487 {
3488 /* It is not an error if the set op does not exist -- it may
3489 * have been destroyed when the partner peer disconnected. */
3490 GNUNET_log (
3491 GNUNET_ERROR_TYPE_INFO,
3492 "Client %p accepted request %u of listener %p that is no longer active\n",
3493 cs,
3494 ntohl (msg->accept_reject_id),
3495 cs->listener);
3496 ev = GNUNET_MQ_msg (result_message,
3497 GNUNET_MESSAGE_TYPE_SETU_RESULT);
3498 result_message->request_id = msg->request_id;
3499 result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
3500 GNUNET_MQ_send (set->cs->mq, ev);
3501 GNUNET_SERVICE_client_continue (cs->client);
3502 return;
3503 }
3504 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3505 "Client accepting request %u\n",
3506 (uint32_t) ntohl (msg->accept_reject_id));
3507 listener = op->listener;
3508 op->listener = NULL;
3509 GNUNET_CONTAINER_DLL_remove (listener->op_head,
3510 listener->op_tail,
3511 op);
3512 op->set = set;
3513 GNUNET_CONTAINER_DLL_insert (set->ops_head,
3514 set->ops_tail,
3515 op);
3516 op->client_request_id = ntohl (msg->request_id);
3517 op->byzantine = msg->byzantine;
3518 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3519 op->force_full = msg->force_full;
3520 op->force_delta = msg->force_delta;
3521 op->symmetric = msg->symmetric;
3522
3523 /* Advance generation values, so that future mutations do not
3524 interfer with the running operation. */
3525 op->generation_created = set->current_generation;
3526 advance_generation (set);
3527 GNUNET_assert (NULL == op->se);
3528
3529 LOG (GNUNET_ERROR_TYPE_DEBUG,
3530 "accepting set union operation\n");
3531 GNUNET_STATISTICS_update (_GSS_statistics,
3532 "# of accepted union operations",
3533 1,
3534 GNUNET_NO);
3535 GNUNET_STATISTICS_update (_GSS_statistics,
3536 "# of total union operations",
3537 1,
3538 GNUNET_NO);
3539 {
3540 const struct StrataEstimator *se;
3541 struct GNUNET_MQ_Envelope *ev;
3542 struct StrataEstimatorMessage *strata_msg;
3543 char *buf;
3544 size_t len;
3545 uint16_t type;
3546
3547 op->se = strata_estimator_dup (op->set->se);
3548 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3549 GNUNET_NO);
3550 op->salt_receive = op->salt_send = 42; // FIXME?????
3551 initialize_key_to_element (op);
3552 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3553 op->key_to_element);
3554
3555 /* kick off the operation */
3556 se = op->se;
3557 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
3558 len = strata_estimator_write (se,
3559 buf);
3560 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
3561 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC;
3562 else
3563 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE;
3564 ev = GNUNET_MQ_msg_extra (strata_msg,
3565 len,
3566 type);
3567 GNUNET_memcpy (&strata_msg[1],
3568 buf,
3569 len);
3570 GNUNET_free (buf);
3571 strata_msg->set_size
3572 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
3573 op->set->content->elements));
3574 GNUNET_MQ_send (op->mq,
3575 ev);
3576 op->phase = PHASE_EXPECT_IBF;
3577 }
3578 /* Now allow CADET to continue, as we did not do this in
3579 #handle_incoming_msg (as we wanted to first see if the
3580 local client would accept the request). */
3581 GNUNET_CADET_receive_done (op->channel);
3582 GNUNET_SERVICE_client_continue (cs->client);
3583}
3584
3585
3586/**
3587 * Called to clean up, after a shutdown has been requested.
3588 *
3589 * @param cls closure, NULL
3590 */
3591static void
3592shutdown_task (void *cls)
3593{
3594 /* Delay actual shutdown to allow service to disconnect clients */
3595 in_shutdown = GNUNET_YES;
3596 if (0 == num_clients)
3597 {
3598 if (NULL != cadet)
3599 {
3600 GNUNET_CADET_disconnect (cadet);
3601 cadet = NULL;
3602 }
3603 }
3604 GNUNET_STATISTICS_destroy (_GSS_statistics,
3605 GNUNET_YES);
3606 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3607 "handled shutdown request\n");
3608}
3609
3610
3611/**
3612 * Function called by the service's run
3613 * method to run service-specific setup code.
3614 *
3615 * @param cls closure
3616 * @param cfg configuration to use
3617 * @param service the initialized service
3618 */
3619static void
3620run (void *cls,
3621 const struct GNUNET_CONFIGURATION_Handle *cfg,
3622 struct GNUNET_SERVICE_Handle *service)
3623{
3624 /* FIXME: need to modify SERVICE (!) API to allow
3625 us to run a shutdown task *after* clients were
3626 forcefully disconnected! */
3627 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3628 NULL);
3629 _GSS_statistics = GNUNET_STATISTICS_create ("setu",
3630 cfg);
3631 cadet = GNUNET_CADET_connect (cfg);
3632 if (NULL == cadet)
3633 {
3634 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3635 _ ("Could not connect to CADET service\n"));
3636 GNUNET_SCHEDULER_shutdown ();
3637 return;
3638 }
3639}
3640
3641
3642/**
3643 * Define "main" method using service macro.
3644 */
3645GNUNET_SERVICE_MAIN (
3646 "set",
3647 GNUNET_SERVICE_OPTION_NONE,
3648 &run,
3649 &client_connect_cb,
3650 &client_disconnect_cb,
3651 NULL,
3652 GNUNET_MQ_hd_fixed_size (client_accept,
3653 GNUNET_MESSAGE_TYPE_SETU_ACCEPT,
3654 struct GNUNET_SETU_AcceptMessage,
3655 NULL),
3656 GNUNET_MQ_hd_var_size (client_set_add,
3657 GNUNET_MESSAGE_TYPE_SETU_ADD,
3658 struct GNUNET_SETU_ElementMessage,
3659 NULL),
3660 GNUNET_MQ_hd_fixed_size (client_create_set,
3661 GNUNET_MESSAGE_TYPE_SETU_CREATE,
3662 struct GNUNET_SETU_CreateMessage,
3663 NULL),
3664 GNUNET_MQ_hd_var_size (client_evaluate,
3665 GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
3666 struct GNUNET_SETU_EvaluateMessage,
3667 NULL),
3668 GNUNET_MQ_hd_fixed_size (client_listen,
3669 GNUNET_MESSAGE_TYPE_SETU_LISTEN,
3670 struct GNUNET_SETU_ListenMessage,
3671 NULL),
3672 GNUNET_MQ_hd_fixed_size (client_reject,
3673 GNUNET_MESSAGE_TYPE_SETU_REJECT,
3674 struct GNUNET_SETU_RejectMessage,
3675 NULL),
3676 GNUNET_MQ_hd_fixed_size (client_cancel,
3677 GNUNET_MESSAGE_TYPE_SETU_CANCEL,
3678 struct GNUNET_SETU_CancelMessage,
3679 NULL),
3680 GNUNET_MQ_handler_end ());
3681
3682
3683/* end of gnunet-service-setu.c */
diff --git a/src/setu/gnunet-service-setu_protocol.h b/src/setu/gnunet-service-setu_protocol.h
new file mode 100644
index 000000000..a2803ee47
--- /dev/null
+++ b/src/setu/gnunet-service-setu_protocol.h
@@ -0,0 +1,226 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2013, 2014 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @author Florian Dold
22 * @author Christian Grothoff
23 * @file set/gnunet-service-set_protocol.h
24 * @brief Peer-to-Peer messages for gnunet set
25 */
26#ifndef SET_PROTOCOL_H
27#define SET_PROTOCOL_H
28
29#include "platform.h"
30#include "gnunet_common.h"
31
32
33GNUNET_NETWORK_STRUCT_BEGIN
34
35struct OperationRequestMessage
36{
37 /**
38 * Type: #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
39 */
40 struct GNUNET_MessageHeader header;
41
42 /**
43 * Operation to request, values from `enum GNUNET_SET_OperationType`
44 */
45 uint32_t operation GNUNET_PACKED;
46
47 /**
48 * For Intersection: my element count
49 */
50 uint32_t element_count GNUNET_PACKED;
51
52 /**
53 * Application-specific identifier of the request.
54 */
55 struct GNUNET_HashCode app_idX;
56
57 /* rest: optional message */
58};
59
60
61/**
62 * Message containing buckets of an invertible bloom filter.
63 *
64 * If an IBF has too many buckets for an IBF message,
65 * it is split into multiple messages.
66 */
67struct IBFMessage
68{
69 /**
70 * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
71 */
72 struct GNUNET_MessageHeader header;
73
74 /**
75 * Order of the whole ibf, where
76 * num_buckets = 2^order
77 */
78 uint8_t order;
79
80 /**
81 * Padding, must be 0.
82 */
83 uint8_t reserved1;
84
85 /**
86 * Padding, must be 0.
87 */
88 uint16_t reserved2 GNUNET_PACKED;
89
90 /**
91 * Offset of the strata in the rest of the message
92 */
93 uint32_t offset GNUNET_PACKED;
94
95 /**
96 * Salt used when hashing elements for this IBF.
97 */
98 uint32_t salt GNUNET_PACKED;
99
100 /* rest: buckets */
101};
102
103
104struct InquiryMessage
105{
106 /**
107 * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
108 */
109 struct GNUNET_MessageHeader header;
110
111 /**
112 * Salt used when hashing elements for this inquiry.
113 */
114 uint32_t salt GNUNET_PACKED;
115
116 /**
117 * Reserved, set to 0.
118 */
119 uint32_t reserved GNUNET_PACKED;
120
121 /* rest: inquiry IBF keys */
122};
123
124
125/**
126 * During intersection, the first (and possibly second) message
127 * send it the number of elements in the set, to allow the peers
128 * to decide who should start with the Bloom filter.
129 */
130struct IntersectionElementInfoMessage
131{
132 /**
133 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO
134 */
135 struct GNUNET_MessageHeader header;
136
137 /**
138 * mutator used with this bloomfilter.
139 */
140 uint32_t sender_element_count GNUNET_PACKED;
141};
142
143
144/**
145 * Bloom filter messages exchanged for set intersection calculation.
146 */
147struct BFMessage
148{
149 /**
150 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
151 */
152 struct GNUNET_MessageHeader header;
153
154 /**
155 * Number of elements the sender still has in the set.
156 */
157 uint32_t sender_element_count GNUNET_PACKED;
158
159 /**
160 * XOR of all hashes over all elements remaining in the set.
161 * Used to determine termination.
162 */
163 struct GNUNET_HashCode element_xor_hash;
164
165 /**
166 * Mutator used with this bloomfilter.
167 */
168 uint32_t sender_mutator GNUNET_PACKED;
169
170 /**
171 * Total length of the bloomfilter data.
172 */
173 uint32_t bloomfilter_total_length GNUNET_PACKED;
174
175 /**
176 * Number of bits (k-value) used in encoding the bloomfilter.
177 */
178 uint32_t bits_per_element GNUNET_PACKED;
179
180 /**
181 * rest: the sender's bloomfilter
182 */
183};
184
185
186/**
187 * Last message, send to confirm the final set. Contains the element
188 * count as it is possible that the peer determined that we were done
189 * by getting the empty set, which in that case also needs to be
190 * communicated.
191 */
192struct IntersectionDoneMessage
193{
194 /**
195 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE
196 */
197 struct GNUNET_MessageHeader header;
198
199 /**
200 * Final number of elements in intersection.
201 */
202 uint32_t final_element_count GNUNET_PACKED;
203
204 /**
205 * XOR of all hashes over all elements remaining in the set.
206 */
207 struct GNUNET_HashCode element_xor_hash;
208};
209
210
211/**
212 * Strata estimator together with the peer's overall set size.
213 */
214struct StrataEstimatorMessage
215{
216 /**
217 * Type: #GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C)
218 */
219 struct GNUNET_MessageHeader header;
220
221 uint64_t set_size;
222};
223
224GNUNET_NETWORK_STRUCT_END
225
226#endif
diff --git a/src/setu/gnunet-service-setu_strata_estimator.c b/src/setu/gnunet-service-setu_strata_estimator.c
new file mode 100644
index 000000000..0fa6a6f17
--- /dev/null
+++ b/src/setu/gnunet-service-setu_strata_estimator.c
@@ -0,0 +1,303 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/gnunet-service-setu_strata_estimator.c
22 * @brief invertible bloom filter
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "ibf.h"
29#include "gnunet-service-setu_strata_estimator.h"
30
31
32/**
33 * Should we try compressing the strata estimator? This will
34 * break compatibility with the 0.10.1-network.
35 */
36#define FAIL_10_1_COMPATIBILTIY 1
37
38
39/**
40 * Write the given strata estimator to the buffer.
41 *
42 * @param se strata estimator to serialize
43 * @param[out] buf buffer to write to, must be of appropriate size
44 * @return number of bytes written to @a buf
45 */
46size_t
47strata_estimator_write (const struct StrataEstimator *se,
48 void *buf)
49{
50 char *sbuf = buf;
51 unsigned int i;
52 size_t osize;
53
54 GNUNET_assert (NULL != se);
55 for (i = 0; i < se->strata_count; i++)
56 {
57 ibf_write_slice (se->strata[i],
58 0,
59 se->ibf_size,
60 &sbuf[se->ibf_size * IBF_BUCKET_SIZE * i]);
61 }
62 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
63#if FAIL_10_1_COMPATIBILTIY
64 {
65 char *cbuf;
66 size_t nsize;
67
68 if (GNUNET_YES ==
69 GNUNET_try_compression (buf,
70 osize,
71 &cbuf,
72 &nsize))
73 {
74 GNUNET_memcpy (buf, cbuf, nsize);
75 osize = nsize;
76 GNUNET_free (cbuf);
77 }
78 }
79#endif
80 return osize;
81}
82
83
84/**
85 * Read strata from the buffer into the given strata
86 * estimator. The strata estimator must already be allocated.
87 *
88 * @param buf buffer to read from
89 * @param buf_len number of bytes in @a buf
90 * @param is_compressed is the data compressed?
91 * @param[out] se strata estimator to write to
92 * @return #GNUNET_OK on success
93 */
94int
95strata_estimator_read (const void *buf,
96 size_t buf_len,
97 int is_compressed,
98 struct StrataEstimator *se)
99{
100 unsigned int i;
101 size_t osize;
102 char *dbuf;
103
104 dbuf = NULL;
105 if (GNUNET_YES == is_compressed)
106 {
107 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
108 dbuf = GNUNET_decompress (buf,
109 buf_len,
110 osize);
111 if (NULL == dbuf)
112 {
113 GNUNET_break_op (0); /* bad compressed input data */
114 return GNUNET_SYSERR;
115 }
116 buf = dbuf;
117 buf_len = osize;
118 }
119
120 if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE)
121 {
122 GNUNET_break (0); /* very odd error */
123 GNUNET_free (dbuf);
124 return GNUNET_SYSERR;
125 }
126
127 for (i = 0; i < se->strata_count; i++)
128 {
129 ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]);
130 buf += se->ibf_size * IBF_BUCKET_SIZE;
131 }
132 GNUNET_free (dbuf);
133 return GNUNET_OK;
134}
135
136
137/**
138 * Add a key to the strata estimator.
139 *
140 * @param se strata estimator to add the key to
141 * @param key key to add
142 */
143void
144strata_estimator_insert (struct StrataEstimator *se,
145 struct IBF_Key key)
146{
147 uint64_t v;
148 unsigned int i;
149
150 v = key.key_val;
151 /* count trailing '1'-bits of v */
152 for (i = 0; v & 1; v >>= 1, i++)
153 /* empty */;
154 ibf_insert (se->strata[i], key);
155}
156
157
158/**
159 * Remove a key from the strata estimator.
160 *
161 * @param se strata estimator to remove the key from
162 * @param key key to remove
163 */
164void
165strata_estimator_remove (struct StrataEstimator *se,
166 struct IBF_Key key)
167{
168 uint64_t v;
169 unsigned int i;
170
171 v = key.key_val;
172 /* count trailing '1'-bits of v */
173 for (i = 0; v & 1; v >>= 1, i++)
174 /* empty */;
175 ibf_remove (se->strata[i], key);
176}
177
178
179/**
180 * Create a new strata estimator with the given parameters.
181 *
182 * @param strata_count number of stratas, that is, number of ibfs in the estimator
183 * @param ibf_size size of each ibf stratum
184 * @param ibf_hashnum hashnum parameter of each ibf
185 * @return a freshly allocated, empty strata estimator, NULL on error
186 */
187struct StrataEstimator *
188strata_estimator_create (unsigned int strata_count,
189 uint32_t ibf_size,
190 uint8_t ibf_hashnum)
191{
192 struct StrataEstimator *se;
193 unsigned int i;
194 unsigned int j;
195
196 se = GNUNET_new (struct StrataEstimator);
197 se->strata_count = strata_count;
198 se->ibf_size = ibf_size;
199 se->strata = GNUNET_new_array (strata_count,
200 struct InvertibleBloomFilter *);
201 for (i = 0; i < strata_count; i++)
202 {
203 se->strata[i] = ibf_create (ibf_size, ibf_hashnum);
204 if (NULL == se->strata[i])
205 {
206 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
207 "Failed to allocate memory for strata estimator\n");
208 for (j = 0; j < i; j++)
209 ibf_destroy (se->strata[i]);
210 GNUNET_free (se);
211 return NULL;
212 }
213 }
214 return se;
215}
216
217
218/**
219 * Estimate set difference with two strata estimators,
220 * i.e. arrays of IBFs.
221 * Does not not modify its arguments.
222 *
223 * @param se1 first strata estimator
224 * @param se2 second strata estimator
225 * @return the estimated difference
226 */
227unsigned int
228strata_estimator_difference (const struct StrataEstimator *se1,
229 const struct StrataEstimator *se2)
230{
231 unsigned int count;
232
233 GNUNET_assert (se1->strata_count == se2->strata_count);
234 count = 0;
235 for (int i = se1->strata_count - 1; i >= 0; i--)
236 {
237 struct InvertibleBloomFilter *diff;
238 /* number of keys decoded from the ibf */
239
240 /* FIXME: implement this without always allocating new IBFs */
241 diff = ibf_dup (se1->strata[i]);
242 ibf_subtract (diff, se2->strata[i]);
243 for (int ibf_count = 0; GNUNET_YES; ibf_count++)
244 {
245 int more;
246
247 more = ibf_decode (diff, NULL, NULL);
248 if (GNUNET_NO == more)
249 {
250 count += ibf_count;
251 break;
252 }
253 /* Estimate if decoding fails or would not terminate */
254 if ((GNUNET_SYSERR == more) || (ibf_count > diff->size))
255 {
256 ibf_destroy (diff);
257 return count * (1 << (i + 1));
258 }
259 }
260 ibf_destroy (diff);
261 }
262 return count;
263}
264
265
266/**
267 * Make a copy of a strata estimator.
268 *
269 * @param se the strata estimator to copy
270 * @return the copy
271 */
272struct StrataEstimator *
273strata_estimator_dup (struct StrataEstimator *se)
274{
275 struct StrataEstimator *c;
276 unsigned int i;
277
278 c = GNUNET_new (struct StrataEstimator);
279 c->strata_count = se->strata_count;
280 c->ibf_size = se->ibf_size;
281 c->strata = GNUNET_new_array (se->strata_count,
282 struct InvertibleBloomFilter *);
283 for (i = 0; i < se->strata_count; i++)
284 c->strata[i] = ibf_dup (se->strata[i]);
285 return c;
286}
287
288
289/**
290 * Destroy a strata estimator, free all of its resources.
291 *
292 * @param se strata estimator to destroy.
293 */
294void
295strata_estimator_destroy (struct StrataEstimator *se)
296{
297 unsigned int i;
298
299 for (i = 0; i < se->strata_count; i++)
300 ibf_destroy (se->strata[i]);
301 GNUNET_free (se->strata);
302 GNUNET_free (se);
303}
diff --git a/src/setu/gnunet-service-setu_strata_estimator.h b/src/setu/gnunet-service-setu_strata_estimator.h
new file mode 100644
index 000000000..afdbcdbbf
--- /dev/null
+++ b/src/setu/gnunet-service-setu_strata_estimator.h
@@ -0,0 +1,169 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/gnunet-service-setu_strata_estimator.h
23 * @brief estimator of set difference
24 * @author Florian Dold
25 */
26
27#ifndef GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
28#define GNUNET_SERVICE_SETU_STRATA_ESTIMATOR_H
29
30#include "platform.h"
31#include "gnunet_common.h"
32#include "gnunet_util_lib.h"
33
34#ifdef __cplusplus
35extern "C"
36{
37#if 0 /* keep Emacsens' auto-indent happy */
38}
39#endif
40#endif
41
42
43/**
44 * A handle to a strata estimator.
45 */
46struct StrataEstimator
47{
48 /**
49 * The IBFs of this strata estimator.
50 */
51 struct InvertibleBloomFilter **strata;
52
53 /**
54 * Size of the IBF array in @e strata
55 */
56 unsigned int strata_count;
57
58 /**
59 * Size of each IBF stratum (in bytes)
60 */
61 unsigned int ibf_size;
62};
63
64
65/**
66 * Write the given strata estimator to the buffer.
67 *
68 * @param se strata estimator to serialize
69 * @param[out] buf buffer to write to, must be of appropriate size
70 * @return number of bytes written to @a buf
71 */
72size_t
73strata_estimator_write (const struct StrataEstimator *se,
74 void *buf);
75
76
77/**
78 * Read strata from the buffer into the given strata
79 * estimator. The strata estimator must already be allocated.
80 *
81 * @param buf buffer to read from
82 * @param buf_len number of bytes in @a buf
83 * @param is_compressed is the data compressed?
84 * @param[out] se strata estimator to write to
85 * @return #GNUNET_OK on success
86 */
87int
88strata_estimator_read (const void *buf,
89 size_t buf_len,
90 int is_compressed,
91 struct StrataEstimator *se);
92
93
94/**
95 * Create a new strata estimator with the given parameters.
96 *
97 * @param strata_count number of stratas, that is, number of ibfs in the estimator
98 * @param ibf_size size of each ibf stratum
99 * @param ibf_hashnum hashnum parameter of each ibf
100 * @return a freshly allocated, empty strata estimator, NULL on error
101 */
102struct StrataEstimator *
103strata_estimator_create (unsigned int strata_count,
104 uint32_t ibf_size,
105 uint8_t ibf_hashnum);
106
107
108/**
109 * Get an estimation of the symmetric difference of the elements
110 * contained in both strata estimators.
111 *
112 * @param se1 first strata estimator
113 * @param se2 second strata estimator
114 * @return abs(|se1| - |se2|)
115 */
116unsigned int
117strata_estimator_difference (const struct StrataEstimator *se1,
118 const struct StrataEstimator *se2);
119
120
121/**
122 * Add a key to the strata estimator.
123 *
124 * @param se strata estimator to add the key to
125 * @param key key to add
126 */
127void
128strata_estimator_insert (struct StrataEstimator *se,
129 struct IBF_Key key);
130
131
132/**
133 * Remove a key from the strata estimator.
134 *
135 * @param se strata estimator to remove the key from
136 * @param key key to remove
137 */
138void
139strata_estimator_remove (struct StrataEstimator *se,
140 struct IBF_Key key);
141
142
143/**
144 * Destroy a strata estimator, free all of its resources.
145 *
146 * @param se strata estimator to destroy.
147 */
148void
149strata_estimator_destroy (struct StrataEstimator *se);
150
151
152/**
153 * Make a copy of a strata estimator.
154 *
155 * @param se the strata estimator to copy
156 * @return the copy
157 */
158struct StrataEstimator *
159strata_estimator_dup (struct StrataEstimator *se);
160
161
162#if 0 /* keep Emacsens' auto-indent happy */
163{
164#endif
165#ifdef __cplusplus
166}
167#endif
168
169#endif
diff --git a/src/setu/gnunet-setu-ibf-profiler.c b/src/setu/gnunet-setu-ibf-profiler.c
new file mode 100644
index 000000000..944b63d30
--- /dev/null
+++ b/src/setu/gnunet-setu-ibf-profiler.c
@@ -0,0 +1,308 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/gnunet-set-ibf-profiler.c
23 * @brief tool for profiling the invertible bloom filter implementation
24 * @author Florian Dold
25 */
26
27#include "platform.h"
28#include "gnunet_util_lib.h"
29
30#include "ibf.h"
31
32static unsigned int asize = 10;
33static unsigned int bsize = 10;
34static unsigned int csize = 10;
35static unsigned int hash_num = 4;
36static unsigned int ibf_size = 80;
37
38/* FIXME: add parameter for this */
39static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
40
41static struct GNUNET_CONTAINER_MultiHashMap *set_a;
42static struct GNUNET_CONTAINER_MultiHashMap *set_b;
43/* common elements in a and b */
44static struct GNUNET_CONTAINER_MultiHashMap *set_c;
45
46static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
47
48static struct InvertibleBloomFilter *ibf_a;
49static struct InvertibleBloomFilter *ibf_b;
50
51
52static void
53register_hashcode (struct GNUNET_HashCode *hash)
54{
55 struct GNUNET_HashCode replicated;
56 struct IBF_Key key;
57
58 key = ibf_key_from_hashcode (hash);
59 ibf_hashcode_from_key (key, &replicated);
60 (void) GNUNET_CONTAINER_multihashmap_put (
61 key_to_hashcode,
62 &replicated,
63 GNUNET_memdup (hash, sizeof *hash),
64 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
65}
66
67
68static void
69iter_hashcodes (struct IBF_Key key,
70 GNUNET_CONTAINER_MulitHashMapIteratorCallback iter,
71 void *cls)
72{
73 struct GNUNET_HashCode replicated;
74
75 ibf_hashcode_from_key (key, &replicated);
76 GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode,
77 &replicated,
78 iter,
79 cls);
80}
81
82
83static int
84insert_iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
85{
86 struct InvertibleBloomFilter *ibf = cls;
87
88 ibf_insert (ibf, ibf_key_from_hashcode (key));
89 return GNUNET_YES;
90}
91
92
93static int
94remove_iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
95{
96 struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
97
98 /* if remove fails, there just was a collision with another key */
99 (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
100 return GNUNET_YES;
101}
102
103
104static void
105run (void *cls,
106 char *const *args,
107 const char *cfgfile,
108 const struct GNUNET_CONFIGURATION_Handle *cfg)
109{
110 struct GNUNET_HashCode id;
111 struct IBF_Key ibf_key;
112 int i;
113 int side;
114 int res;
115 struct GNUNET_TIME_Absolute start_time;
116 struct GNUNET_TIME_Relative delta_time;
117
118 set_a =
119 GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + csize)),
120 GNUNET_NO);
121 set_b =
122 GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + csize)),
123 GNUNET_NO);
124 set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
125 GNUNET_NO);
126
127 key_to_hashcode =
128 GNUNET_CONTAINER_multihashmap_create (((asize + bsize + csize == 0)
129 ? 1
130 : (asize + bsize + csize)),
131 GNUNET_NO);
132
133 printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
134 hash_num,
135 ibf_size,
136 asize,
137 bsize,
138 csize);
139
140 i = 0;
141 while (i < asize)
142 {
143 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
144 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
145 continue;
146 GNUNET_break (GNUNET_OK ==
147 GNUNET_CONTAINER_multihashmap_put (
148 set_a,
149 &id,
150 NULL,
151 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
152 register_hashcode (&id);
153 i++;
154 }
155 i = 0;
156 while (i < bsize)
157 {
158 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
159 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
160 continue;
161 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
162 continue;
163 GNUNET_break (GNUNET_OK ==
164 GNUNET_CONTAINER_multihashmap_put (
165 set_b,
166 &id,
167 NULL,
168 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
169 register_hashcode (&id);
170 i++;
171 }
172 i = 0;
173 while (i < csize)
174 {
175 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
176 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
177 continue;
178 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
179 continue;
180 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
181 continue;
182 GNUNET_break (GNUNET_OK ==
183 GNUNET_CONTAINER_multihashmap_put (
184 set_c,
185 &id,
186 NULL,
187 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
188 register_hashcode (&id);
189 i++;
190 }
191
192 ibf_a = ibf_create (ibf_size, hash_num);
193 ibf_b = ibf_create (ibf_size, hash_num);
194 if ((NULL == ibf_a) || (NULL == ibf_b))
195 {
196 /* insufficient memory */
197 GNUNET_break (0);
198 GNUNET_SCHEDULER_shutdown ();
199 return;
200 }
201
202
203 printf ("generated sets\n");
204
205 start_time = GNUNET_TIME_absolute_get ();
206
207 GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
208 GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b);
209 GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a);
210 GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b);
211
212 delta_time = GNUNET_TIME_absolute_get_duration (start_time);
213
214 printf ("encoded in: %s\n",
215 GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
216
217 ibf_subtract (ibf_a, ibf_b);
218
219
220 start_time = GNUNET_TIME_absolute_get ();
221
222 for (i = 0; i <= asize + bsize; i++)
223 {
224 res = ibf_decode (ibf_a, &side, &ibf_key);
225 if (GNUNET_SYSERR == res)
226 {
227 printf ("decode failed, %u/%u elements left\n",
228 GNUNET_CONTAINER_multihashmap_size (set_a)
229 + GNUNET_CONTAINER_multihashmap_size (set_b),
230 asize + bsize);
231 return;
232 }
233 if (GNUNET_NO == res)
234 {
235 if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) &&
236 (0 == GNUNET_CONTAINER_multihashmap_size (set_a)))
237 {
238 delta_time = GNUNET_TIME_absolute_get_duration (start_time);
239 printf ("decoded successfully in: %s\n",
240 GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
241 }
242 else
243 {
244 printf ("decode missed elements (should never happen)\n");
245 }
246 return;
247 }
248
249 if (side == 1)
250 iter_hashcodes (ibf_key, remove_iterator, set_a);
251 if (side == -1)
252 iter_hashcodes (ibf_key, remove_iterator, set_b);
253 }
254 printf ("cyclic IBF, %u/%u elements left\n",
255 GNUNET_CONTAINER_multihashmap_size (set_a)
256 + GNUNET_CONTAINER_multihashmap_size (set_b),
257 asize + bsize);
258}
259
260
261int
262main (int argc, char **argv)
263{
264 struct GNUNET_GETOPT_CommandLineOption options[] = {
265 GNUNET_GETOPT_option_uint ('A',
266 "asize",
267 NULL,
268 gettext_noop ("number of element in set A-B"),
269 &asize),
270
271 GNUNET_GETOPT_option_uint ('B',
272 "bsize",
273 NULL,
274 gettext_noop ("number of element in set B-A"),
275 &bsize),
276
277 GNUNET_GETOPT_option_uint ('C',
278 "csize",
279 NULL,
280 gettext_noop (
281 "number of common elements in A and B"),
282 &csize),
283
284 GNUNET_GETOPT_option_uint ('k',
285 "hash-num",
286 NULL,
287 gettext_noop ("hash num"),
288 &hash_num),
289
290 GNUNET_GETOPT_option_uint ('s',
291 "ibf-size",
292 NULL,
293 gettext_noop ("ibf size"),
294 &ibf_size),
295
296 GNUNET_GETOPT_OPTION_END
297 };
298
299 GNUNET_PROGRAM_run2 (argc,
300 argv,
301 "gnunet-consensus-ibf",
302 "help",
303 options,
304 &run,
305 NULL,
306 GNUNET_YES);
307 return 0;
308}
diff --git a/src/setu/gnunet-setu-profiler.c b/src/setu/gnunet-setu-profiler.c
new file mode 100644
index 000000000..8d6a2dc8c
--- /dev/null
+++ b/src/setu/gnunet-setu-profiler.c
@@ -0,0 +1,499 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file setu/gnunet-setu-profiler.c
23 * @brief profiling tool for set
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h"
29#include "gnunet_setu_service.h"
30#include "gnunet_testbed_service.h"
31
32
33static int ret;
34
35static unsigned int num_a = 5;
36static unsigned int num_b = 5;
37static unsigned int num_c = 20;
38
39static char *op_str = "union";
40
41const static struct GNUNET_CONFIGURATION_Handle *config;
42
43struct SetInfo
44{
45 char *id;
46 struct GNUNET_SETU_Handle *set;
47 struct GNUNET_SETU_OperationHandle *oh;
48 struct GNUNET_CONTAINER_MultiHashMap *sent;
49 struct GNUNET_CONTAINER_MultiHashMap *received;
50 int done;
51} info1, info2;
52
53static struct GNUNET_CONTAINER_MultiHashMap *common_sent;
54
55static struct GNUNET_HashCode app_id;
56
57static struct GNUNET_PeerIdentity local_peer;
58
59static struct GNUNET_SETU_ListenHandle *set_listener;
60
61static int byzantine;
62static unsigned int force_delta;
63static unsigned int force_full;
64static unsigned int element_size = 32;
65
66/**
67 * Handle to the statistics service.
68 */
69static struct GNUNET_STATISTICS_Handle *statistics;
70
71/**
72 * The profiler will write statistics
73 * for all peers to the file with this name.
74 */
75static char *statistics_filename;
76
77/**
78 * The profiler will write statistics
79 * for all peers to this file.
80 */
81static FILE *statistics_file;
82
83
84static int
85map_remove_iterator (void *cls,
86 const struct GNUNET_HashCode *key,
87 void *value)
88{
89 struct GNUNET_CONTAINER_MultiHashMap *m = cls;
90 int ret;
91
92 GNUNET_assert (NULL != key);
93
94 ret = GNUNET_CONTAINER_multihashmap_remove_all (m, key);
95 if (GNUNET_OK != ret)
96 printf ("spurious element\n");
97 return GNUNET_YES;
98}
99
100
101/**
102 * Callback function to process statistic values.
103 *
104 * @param cls closure
105 * @param subsystem name of subsystem that created the statistic
106 * @param name the name of the datum
107 * @param value the current value
108 * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
109 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
110 */
111static int
112statistics_result (void *cls,
113 const char *subsystem,
114 const char *name,
115 uint64_t value,
116 int is_persistent)
117{
118 if (NULL != statistics_file)
119 {
120 fprintf (statistics_file, "%s\t%s\t%lu\n", subsystem, name, (unsigned
121 long) value);
122 }
123 return GNUNET_OK;
124}
125
126
127static void
128statistics_done (void *cls,
129 int success)
130{
131 GNUNET_assert (GNUNET_YES == success);
132 if (NULL != statistics_file)
133 fclose (statistics_file);
134 GNUNET_SCHEDULER_shutdown ();
135}
136
137
138static void
139check_all_done (void)
140{
141 if ((info1.done == GNUNET_NO) || (info2.done == GNUNET_NO))
142 return;
143
144 GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator,
145 info2.sent);
146 GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator,
147 info1.sent);
148
149 printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
150 info1.sent));
151 printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
152 info2.sent));
153
154 if (NULL == statistics_filename)
155 {
156 GNUNET_SCHEDULER_shutdown ();
157 return;
158 }
159
160 statistics_file = fopen (statistics_filename, "w");
161 GNUNET_STATISTICS_get (statistics, NULL, NULL,
162 &statistics_done,
163 &statistics_result, NULL);
164}
165
166
167static void
168set_result_cb (void *cls,
169 const struct GNUNET_SETU_Element *element,
170 uint64_t current_size,
171 enum GNUNET_SETU_Status status)
172{
173 struct SetInfo *info = cls;
174
175 GNUNET_assert (GNUNET_NO == info->done);
176 switch (status)
177 {
178 case GNUNET_SETU_STATUS_DONE:
179 info->done = GNUNET_YES;
180 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s done\n", info->id);
181 check_all_done ();
182 info->oh = NULL;
183 return;
184
185 case GNUNET_SETU_STATUS_FAILURE:
186 info->oh = NULL;
187 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n");
188 GNUNET_SCHEDULER_shutdown ();
189 return;
190
191 case GNUNET_SETU_STATUS_ADD_LOCAL:
192 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: local element\n", info->id);
193 break;
194 default:
195 GNUNET_assert (0);
196 }
197
198 if (element->size != element_size)
199 {
200 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
201 "wrong element size: %u, expected %u\n",
202 element->size,
203 (unsigned int) sizeof(struct GNUNET_HashCode));
204 GNUNET_assert (0);
205 }
206
207 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: got element (%s)\n",
208 info->id, GNUNET_h2s (element->data));
209 GNUNET_assert (NULL != element->data);
210 struct GNUNET_HashCode data_hash;
211 GNUNET_CRYPTO_hash (element->data, element_size, &data_hash);
212 GNUNET_CONTAINER_multihashmap_put (info->received,
213 &data_hash, NULL,
214 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
215}
216
217
218static void
219set_listen_cb (void *cls,
220 const struct GNUNET_PeerIdentity *other_peer,
221 const struct GNUNET_MessageHeader *context_msg,
222 struct GNUNET_SETU_Request *request)
223{
224 /* max. 2 options plus terminator */
225 struct GNUNET_SETU_Option opts[3] = { { 0 } };
226 unsigned int n_opts = 0;
227
228 if (NULL == request)
229 {
230 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
231 "listener failed\n");
232 return;
233 }
234 GNUNET_assert (NULL == info2.oh);
235 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
236 "set listen cb called\n");
237 if (byzantine)
238 {
239 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
240 GNUNET_SETU_OPTION_BYZANTINE };
241 }
242 GNUNET_assert (! (force_full && force_delta));
243 if (force_full)
244 {
245 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
246 GNUNET_SETU_OPTION_FORCE_FULL };
247 }
248 if (force_delta)
249 {
250 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
251 GNUNET_SETU_OPTION_FORCE_DELTA };
252 }
253
254 opts[n_opts].type = 0;
255 info2.oh = GNUNET_SETU_accept (request,
256 opts,
257 set_result_cb, &info2);
258 GNUNET_SETU_commit (info2.oh, info2.set);
259}
260
261
262static int
263set_insert_iterator (void *cls,
264 const struct GNUNET_HashCode *key,
265 void *value)
266{
267 struct GNUNET_SETU_Handle *set = cls;
268 struct GNUNET_SETU_Element el;
269
270 el.element_type = 0;
271 el.data = value;
272 el.size = element_size;
273 GNUNET_SETU_add_element (set, &el, NULL, NULL);
274 return GNUNET_YES;
275}
276
277
278static void
279handle_shutdown (void *cls)
280{
281 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
282 "Shutting down set profiler\n");
283 if (NULL != set_listener)
284 {
285 GNUNET_SETU_listen_cancel (set_listener);
286 set_listener = NULL;
287 }
288 if (NULL != info1.oh)
289 {
290 GNUNET_SETU_operation_cancel (info1.oh);
291 info1.oh = NULL;
292 }
293 if (NULL != info2.oh)
294 {
295 GNUNET_SETU_operation_cancel (info2.oh);
296 info2.oh = NULL;
297 }
298 if (NULL != info1.set)
299 {
300 GNUNET_SETU_destroy (info1.set);
301 info1.set = NULL;
302 }
303 if (NULL != info2.set)
304 {
305 GNUNET_SETU_destroy (info2.set);
306 info2.set = NULL;
307 }
308 GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
309}
310
311
312static void
313run (void *cls,
314 const struct GNUNET_CONFIGURATION_Handle *cfg,
315 struct GNUNET_TESTING_Peer *peer)
316{
317 unsigned int i;
318 struct GNUNET_HashCode hash;
319 /* max. 2 options plus terminator */
320 struct GNUNET_SETU_Option opts[3] = { { 0 } };
321 unsigned int n_opts = 0;
322
323 config = cfg;
324
325 GNUNET_assert (element_size > 0);
326
327 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &local_peer))
328 {
329 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
330 ret = 0;
331 return;
332 }
333
334 statistics = GNUNET_STATISTICS_create ("set-profiler", cfg);
335
336 GNUNET_SCHEDULER_add_shutdown (&handle_shutdown, NULL);
337
338 info1.id = "a";
339 info2.id = "b";
340
341 info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
342 info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
343 common_sent = GNUNET_CONTAINER_multihashmap_create (num_c + 1, GNUNET_NO);
344
345 info1.received = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
346 info2.received = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
347
348 for (i = 0; i < num_a; i++)
349 {
350 char *data = GNUNET_malloc (element_size);
351 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
352 GNUNET_CRYPTO_hash (data, element_size, &hash);
353 GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, data,
354 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
355 }
356
357 for (i = 0; i < num_b; i++)
358 {
359 char *data = GNUNET_malloc (element_size);
360 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
361 GNUNET_CRYPTO_hash (data, element_size, &hash);
362 GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, data,
363 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
364 }
365
366 for (i = 0; i < num_c; i++)
367 {
368 char *data = GNUNET_malloc (element_size);
369 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
370 GNUNET_CRYPTO_hash (data, element_size, &hash);
371 GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, data,
372 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
373 }
374
375 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id);
376
377 /* FIXME: also implement intersection etc. */
378 info1.set = GNUNET_SETU_create (config);
379 info2.set = GNUNET_SETU_create (config);
380
381 GNUNET_CONTAINER_multihashmap_iterate (info1.sent, set_insert_iterator,
382 info1.set);
383 GNUNET_CONTAINER_multihashmap_iterate (info2.sent, set_insert_iterator,
384 info2.set);
385 GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
386 info1.set);
387 GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
388 info2.set);
389
390 set_listener = GNUNET_SETU_listen (config,
391 &app_id,
392 &set_listen_cb,
393 NULL);
394
395
396 if (byzantine)
397 {
398 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
399 GNUNET_SETU_OPTION_BYZANTINE };
400 }
401 GNUNET_assert (! (force_full && force_delta));
402 if (force_full)
403 {
404 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
405 GNUNET_SETU_OPTION_FORCE_FULL };
406 }
407 if (force_delta)
408 {
409 opts[n_opts++] = (struct GNUNET_SETU_Option) { .type =
410 GNUNET_SETU_OPTION_FORCE_DELTA };
411 }
412
413 opts[n_opts].type = 0;
414
415 info1.oh = GNUNET_SETU_prepare (&local_peer, &app_id, NULL,
416 opts,
417 set_result_cb, &info1);
418 GNUNET_SETU_commit (info1.oh, info1.set);
419 GNUNET_SETU_destroy (info1.set);
420 info1.set = NULL;
421}
422
423
424static void
425pre_run (void *cls, char *const *args, const char *cfgfile,
426 const struct GNUNET_CONFIGURATION_Handle *cfg)
427{
428 if (0 != GNUNET_TESTING_peer_run ("set-profiler",
429 cfgfile,
430 &run, NULL))
431 ret = 2;
432}
433
434
435int
436main (int argc, char **argv)
437{
438 struct GNUNET_GETOPT_CommandLineOption options[] = {
439 GNUNET_GETOPT_option_uint ('A',
440 "num-first",
441 NULL,
442 gettext_noop ("number of values"),
443 &num_a),
444
445 GNUNET_GETOPT_option_uint ('B',
446 "num-second",
447 NULL,
448 gettext_noop ("number of values"),
449 &num_b),
450
451 GNUNET_GETOPT_option_flag ('b',
452 "byzantine",
453 gettext_noop ("use byzantine mode"),
454 &byzantine),
455
456 GNUNET_GETOPT_option_uint ('f',
457 "force-full",
458 NULL,
459 gettext_noop ("force sending full set"),
460 &force_full),
461
462 GNUNET_GETOPT_option_uint ('d',
463 "force-delta",
464 NULL,
465 gettext_noop ("number delta operation"),
466 &force_delta),
467
468 GNUNET_GETOPT_option_uint ('C',
469 "num-common",
470 NULL,
471 gettext_noop ("number of values"),
472 &num_c),
473
474 GNUNET_GETOPT_option_string ('x',
475 "operation",
476 NULL,
477 gettext_noop ("operation to execute"),
478 &op_str),
479
480 GNUNET_GETOPT_option_uint ('w',
481 "element-size",
482 NULL,
483 gettext_noop ("element size"),
484 &element_size),
485
486 GNUNET_GETOPT_option_filename ('s',
487 "statistics",
488 "FILENAME",
489 gettext_noop ("write statistics to file"),
490 &statistics_filename),
491
492 GNUNET_GETOPT_OPTION_END
493 };
494
495 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set-profiler",
496 "help",
497 options, &pre_run, NULL, GNUNET_YES);
498 return ret;
499}
diff --git a/src/setu/ibf.c b/src/setu/ibf.c
new file mode 100644
index 000000000..1532afceb
--- /dev/null
+++ b/src/setu/ibf.c
@@ -0,0 +1,409 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/ibf.c
23 * @brief implementation of the invertible bloom filter
24 * @author Florian Dold
25 */
26
27#include "ibf.h"
28
29/**
30 * Compute the key's hash from the key.
31 * Redefine to use a different hash function.
32 */
33#define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof(struct \
34 IBF_KeyHash)))
35
36/**
37 * Create a key from a hashcode.
38 *
39 * @param hash the hashcode
40 * @return a key
41 */
42struct IBF_Key
43ibf_key_from_hashcode (const struct GNUNET_HashCode *hash)
44{
45 return *(struct IBF_Key *) hash;
46}
47
48
49/**
50 * Create a hashcode from a key, by replicating the key
51 * until the hascode is filled
52 *
53 * @param key the key
54 * @param dst hashcode to store the result in
55 */
56void
57ibf_hashcode_from_key (struct IBF_Key key,
58 struct GNUNET_HashCode *dst)
59{
60 struct IBF_Key *p;
61 unsigned int i;
62 const unsigned int keys_per_hashcode = sizeof(struct GNUNET_HashCode)
63 / sizeof(struct IBF_Key);
64
65 p = (struct IBF_Key *) dst;
66 for (i = 0; i < keys_per_hashcode; i++)
67 *p++ = key;
68}
69
70
71/**
72 * Create an invertible bloom filter.
73 *
74 * @param size number of IBF buckets
75 * @param hash_num number of buckets one element is hashed in
76 * @return the newly created invertible bloom filter, NULL on error
77 */
78struct InvertibleBloomFilter *
79ibf_create (uint32_t size, uint8_t hash_num)
80{
81 struct InvertibleBloomFilter *ibf;
82
83 GNUNET_assert (0 != size);
84
85 ibf = GNUNET_new (struct InvertibleBloomFilter);
86 ibf->count = GNUNET_malloc_large (size * sizeof(uint8_t));
87 if (NULL == ibf->count)
88 {
89 GNUNET_free (ibf);
90 return NULL;
91 }
92 ibf->key_sum = GNUNET_malloc_large (size * sizeof(struct IBF_Key));
93 if (NULL == ibf->key_sum)
94 {
95 GNUNET_free (ibf->count);
96 GNUNET_free (ibf);
97 return NULL;
98 }
99 ibf->key_hash_sum = GNUNET_malloc_large (size * sizeof(struct IBF_KeyHash));
100 if (NULL == ibf->key_hash_sum)
101 {
102 GNUNET_free (ibf->key_sum);
103 GNUNET_free (ibf->count);
104 GNUNET_free (ibf);
105 return NULL;
106 }
107 ibf->size = size;
108 ibf->hash_num = hash_num;
109
110 return ibf;
111}
112
113
114/**
115 * Store unique bucket indices for the specified key in dst.
116 */
117static void
118ibf_get_indices (const struct InvertibleBloomFilter *ibf,
119 struct IBF_Key key,
120 int *dst)
121{
122 uint32_t filled;
123 uint32_t i;
124 uint32_t bucket;
125
126 bucket = GNUNET_CRYPTO_crc32_n (&key, sizeof key);
127 for (i = 0, filled = 0; filled < ibf->hash_num; i++)
128 {
129 unsigned int j;
130 uint64_t x;
131 for (j = 0; j < filled; j++)
132 if (dst[j] == bucket)
133 goto try_next;
134 dst[filled++] = bucket % ibf->size;
135try_next:;
136 x = ((uint64_t) bucket << 32) | i;
137 bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
138 }
139}
140
141
142static void
143ibf_insert_into (struct InvertibleBloomFilter *ibf,
144 struct IBF_Key key,
145 const int *buckets, int side)
146{
147 int i;
148
149 for (i = 0; i < ibf->hash_num; i++)
150 {
151 const int bucket = buckets[i];
152 ibf->count[bucket].count_val += side;
153 ibf->key_sum[bucket].key_val ^= key.key_val;
154 ibf->key_hash_sum[bucket].key_hash_val
155 ^= IBF_KEY_HASH_VAL (key);
156 }
157}
158
159
160/**
161 * Insert a key into an IBF.
162 *
163 * @param ibf the IBF
164 * @param key the element's hash code
165 */
166void
167ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
168{
169 int buckets[ibf->hash_num];
170
171 GNUNET_assert (ibf->hash_num <= ibf->size);
172 ibf_get_indices (ibf, key, buckets);
173 ibf_insert_into (ibf, key, buckets, 1);
174}
175
176
177/**
178 * Remove a key from an IBF.
179 *
180 * @param ibf the IBF
181 * @param key the element's hash code
182 */
183void
184ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
185{
186 int buckets[ibf->hash_num];
187
188 GNUNET_assert (ibf->hash_num <= ibf->size);
189 ibf_get_indices (ibf, key, buckets);
190 ibf_insert_into (ibf, key, buckets, -1);
191}
192
193
194/**
195 * Test is the IBF is empty, i.e. all counts, keys and key hashes are zero.
196 */
197static int
198ibf_is_empty (struct InvertibleBloomFilter *ibf)
199{
200 int i;
201
202 for (i = 0; i < ibf->size; i++)
203 {
204 if (0 != ibf->count[i].count_val)
205 return GNUNET_NO;
206 if (0 != ibf->key_hash_sum[i].key_hash_val)
207 return GNUNET_NO;
208 if (0 != ibf->key_sum[i].key_val)
209 return GNUNET_NO;
210 }
211 return GNUNET_YES;
212}
213
214
215/**
216 * Decode and remove an element from the IBF, if possible.
217 *
218 * @param ibf the invertible bloom filter to decode
219 * @param ret_side sign of the cell's count where the decoded element came from.
220 * A negative sign indicates that the element was recovered
221 * resides in an IBF that was previously subtracted from.
222 * @param ret_id receives the hash code of the decoded element, if successful
223 * @return GNUNET_YES if decoding an element was successful,
224 * GNUNET_NO if the IBF is empty,
225 * GNUNET_SYSERR if the decoding has failed
226 */
227int
228ibf_decode (struct InvertibleBloomFilter *ibf,
229 int *ret_side, struct IBF_Key *ret_id)
230{
231 struct IBF_KeyHash hash;
232 int i;
233 int buckets[ibf->hash_num];
234
235 GNUNET_assert (NULL != ibf);
236
237 for (i = 0; i < ibf->size; i++)
238 {
239 int j;
240 int hit;
241
242 /* we can only decode from pure buckets */
243 if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val))
244 continue;
245
246 hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]);
247
248 /* test if the hash matches the key */
249 if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val)
250 continue;
251
252 /* test if key in bucket hits its own location,
253 * if not, the key hash was subject to collision */
254 hit = GNUNET_NO;
255 ibf_get_indices (ibf, ibf->key_sum[i], buckets);
256 for (j = 0; j < ibf->hash_num; j++)
257 if (buckets[j] == i)
258 hit = GNUNET_YES;
259
260 if (GNUNET_NO == hit)
261 continue;
262
263 if (NULL != ret_side)
264 *ret_side = ibf->count[i].count_val;
265 if (NULL != ret_id)
266 *ret_id = ibf->key_sum[i];
267
268 /* insert on the opposite side, effectively removing the element */
269 ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val);
270
271 return GNUNET_YES;
272 }
273
274 if (GNUNET_YES == ibf_is_empty (ibf))
275 return GNUNET_NO;
276 return GNUNET_SYSERR;
277}
278
279
280/**
281 * Write buckets from an ibf to a buffer.
282 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
283 *
284 * @param ibf the ibf to write
285 * @param start with which bucket to start
286 * @param count how many buckets to write
287 * @param buf buffer to write the data to
288 */
289void
290ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start,
291 uint32_t count, void *buf)
292{
293 struct IBF_Key *key_dst;
294 struct IBF_KeyHash *key_hash_dst;
295 struct IBF_Count *count_dst;
296
297 GNUNET_assert (start + count <= ibf->size);
298
299 /* copy keys */
300 key_dst = (struct IBF_Key *) buf;
301 GNUNET_memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst);
302 key_dst += count;
303 /* copy key hashes */
304 key_hash_dst = (struct IBF_KeyHash *) key_dst;
305 GNUNET_memcpy (key_hash_dst, ibf->key_hash_sum + start, count
306 * sizeof *key_hash_dst);
307 key_hash_dst += count;
308 /* copy counts */
309 count_dst = (struct IBF_Count *) key_hash_dst;
310 GNUNET_memcpy (count_dst, ibf->count + start, count * sizeof *count_dst);
311}
312
313
314/**
315 * Read buckets from a buffer into an ibf.
316 *
317 * @param buf pointer to the buffer to read from
318 * @param start which bucket to start at
319 * @param count how many buckets to read
320 * @param ibf the ibf to read from
321 */
322void
323ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct
324 InvertibleBloomFilter *ibf)
325{
326 struct IBF_Key *key_src;
327 struct IBF_KeyHash *key_hash_src;
328 struct IBF_Count *count_src;
329
330 GNUNET_assert (count > 0);
331 GNUNET_assert (start + count <= ibf->size);
332
333 /* copy keys */
334 key_src = (struct IBF_Key *) buf;
335 GNUNET_memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src);
336 key_src += count;
337 /* copy key hashes */
338 key_hash_src = (struct IBF_KeyHash *) key_src;
339 GNUNET_memcpy (ibf->key_hash_sum + start, key_hash_src, count
340 * sizeof *key_hash_src);
341 key_hash_src += count;
342 /* copy counts */
343 count_src = (struct IBF_Count *) key_hash_src;
344 GNUNET_memcpy (ibf->count + start, count_src, count * sizeof *count_src);
345}
346
347
348/**
349 * Subtract ibf2 from ibf1, storing the result in ibf1.
350 * The two IBF's must have the same parameters size and hash_num.
351 *
352 * @param ibf1 IBF that is subtracted from
353 * @param ibf2 IBF that will be subtracted from ibf1
354 */
355void
356ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct
357 InvertibleBloomFilter *ibf2)
358{
359 int i;
360
361 GNUNET_assert (ibf1->size == ibf2->size);
362 GNUNET_assert (ibf1->hash_num == ibf2->hash_num);
363
364 for (i = 0; i < ibf1->size; i++)
365 {
366 ibf1->count[i].count_val -= ibf2->count[i].count_val;
367 ibf1->key_hash_sum[i].key_hash_val ^= ibf2->key_hash_sum[i].key_hash_val;
368 ibf1->key_sum[i].key_val ^= ibf2->key_sum[i].key_val;
369 }
370}
371
372
373/**
374 * Create a copy of an IBF, the copy has to be destroyed properly.
375 *
376 * @param ibf the IBF to copy
377 */
378struct InvertibleBloomFilter *
379ibf_dup (const struct InvertibleBloomFilter *ibf)
380{
381 struct InvertibleBloomFilter *copy;
382
383 copy = GNUNET_malloc (sizeof *copy);
384 copy->hash_num = ibf->hash_num;
385 copy->size = ibf->size;
386 copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size
387 * sizeof(struct IBF_KeyHash));
388 copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof(struct
389 IBF_Key));
390 copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof(struct
391 IBF_Count));
392 return copy;
393}
394
395
396/**
397 * Destroy all resources associated with the invertible bloom filter.
398 * No more ibf_*-functions may be called on ibf after calling destroy.
399 *
400 * @param ibf the intertible bloom filter to destroy
401 */
402void
403ibf_destroy (struct InvertibleBloomFilter *ibf)
404{
405 GNUNET_free (ibf->key_sum);
406 GNUNET_free (ibf->key_hash_sum);
407 GNUNET_free (ibf->count);
408 GNUNET_free (ibf);
409}
diff --git a/src/setu/ibf.h b/src/setu/ibf.h
new file mode 100644
index 000000000..7c2ab33b1
--- /dev/null
+++ b/src/setu/ibf.h
@@ -0,0 +1,255 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/ibf.h
23 * @brief invertible bloom filter
24 * @author Florian Dold
25 */
26
27#ifndef GNUNET_CONSENSUS_IBF_H
28#define GNUNET_CONSENSUS_IBF_H
29
30#include "platform.h"
31#include "gnunet_util_lib.h"
32
33#ifdef __cplusplus
34extern "C"
35{
36#if 0 /* keep Emacsens' auto-indent happy */
37}
38#endif
39#endif
40
41
42/**
43 * Keys that can be inserted into and removed from an IBF.
44 */
45struct IBF_Key
46{
47 uint64_t key_val;
48};
49
50
51/**
52 * Hash of an IBF key.
53 */
54struct IBF_KeyHash
55{
56 uint32_t key_hash_val;
57};
58
59
60/**
61 * Type of the count field of IBF buckets.
62 */
63struct IBF_Count
64{
65 int8_t count_val;
66};
67
68
69/**
70 * Size of one ibf bucket in bytes
71 */
72#define IBF_BUCKET_SIZE (sizeof(struct IBF_Count) + sizeof(struct IBF_Key) \
73 + sizeof(struct IBF_KeyHash))
74
75
76/**
77 * Invertible bloom filter (IBF).
78 *
79 * An IBF is a counting bloom filter that has the ability to restore
80 * the hashes of its stored elements with high probability.
81 */
82struct InvertibleBloomFilter
83{
84 /**
85 * How many cells does this IBF have?
86 */
87 uint32_t size;
88
89 /**
90 * In how many cells do we hash one element?
91 * Usually 4 or 3.
92 */
93 uint8_t hash_num;
94
95 /**
96 * Xor sums of the elements' keys, used to identify the elements.
97 * Array of 'size' elements.
98 */
99 struct IBF_Key *key_sum;
100
101 /**
102 * Xor sums of the hashes of the keys of inserted elements.
103 * Array of 'size' elements.
104 */
105 struct IBF_KeyHash *key_hash_sum;
106
107 /**
108 * How many times has a bucket been hit?
109 * Can be negative, as a result of IBF subtraction.
110 * Array of 'size' elements.
111 */
112 struct IBF_Count *count;
113};
114
115
116/**
117 * Write buckets from an ibf to a buffer.
118 * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf.
119 *
120 * @param ibf the ibf to write
121 * @param start with which bucket to start
122 * @param count how many buckets to write
123 * @param buf buffer to write the data to
124 */
125void
126ibf_write_slice (const struct InvertibleBloomFilter *ibf,
127 uint32_t start,
128 uint32_t count,
129 void *buf);
130
131
132/**
133 * Read buckets from a buffer into an ibf.
134 *
135 * @param buf pointer to the buffer to read from
136 * @param start which bucket to start at
137 * @param count how many buckets to read
138 * @param ibf the ibf to write to
139 */
140void
141ibf_read_slice (const void *buf,
142 uint32_t start,
143 uint32_t count,
144 struct InvertibleBloomFilter *ibf);
145
146
147/**
148 * Create a key from a hashcode.
149 *
150 * @param hash the hashcode
151 * @return a key
152 */
153struct IBF_Key
154ibf_key_from_hashcode (const struct GNUNET_HashCode *hash);
155
156
157/**
158 * Create a hashcode from a key, by replicating the key
159 * until the hascode is filled
160 *
161 * @param key the key
162 * @param dst hashcode to store the result in
163 */
164void
165ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst);
166
167
168/**
169 * Create an invertible bloom filter.
170 *
171 * @param size number of IBF buckets
172 * @param hash_num number of buckets one element is hashed in, usually 3 or 4
173 * @return the newly created invertible bloom filter, NULL on error
174 */
175struct InvertibleBloomFilter *
176ibf_create (uint32_t size, uint8_t hash_num);
177
178
179/**
180 * Insert a key into an IBF.
181 *
182 * @param ibf the IBF
183 * @param key the element's hash code
184 */
185void
186ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
187
188
189/**
190 * Remove a key from an IBF.
191 *
192 * @param ibf the IBF
193 * @param key the element's hash code
194 */
195void
196ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
197
198
199/**
200 * Subtract ibf2 from ibf1, storing the result in ibf1.
201 * The two IBF's must have the same parameters size and hash_num.
202 *
203 * @param ibf1 IBF that is subtracted from
204 * @param ibf2 IBF that will be subtracted from ibf1
205 */
206void
207ibf_subtract (struct InvertibleBloomFilter *ibf1,
208 const struct InvertibleBloomFilter *ibf2);
209
210
211/**
212 * Decode and remove an element from the IBF, if possible.
213 *
214 * @param ibf the invertible bloom filter to decode
215 * @param ret_side sign of the cell's count where the decoded element came from.
216 * A negative sign indicates that the element was recovered
217 * resides in an IBF that was previously subtracted from.
218 * @param ret_id receives the hash code of the decoded element, if successful
219 * @return #GNUNET_YES if decoding an element was successful,
220 * #GNUNET_NO if the IBF is empty,
221 * #GNUNET_SYSERR if the decoding has failed
222 */
223int
224ibf_decode (struct InvertibleBloomFilter *ibf,
225 int *ret_side,
226 struct IBF_Key *ret_id);
227
228
229/**
230 * Create a copy of an IBF, the copy has to be destroyed properly.
231 *
232 * @param ibf the IBF to copy
233 */
234struct InvertibleBloomFilter *
235ibf_dup (const struct InvertibleBloomFilter *ibf);
236
237
238/**
239 * Destroy all resources associated with the invertible bloom filter.
240 * No more ibf_*-functions may be called on ibf after calling destroy.
241 *
242 * @param ibf the intertible bloom filter to destroy
243 */
244void
245ibf_destroy (struct InvertibleBloomFilter *ibf);
246
247
248#if 0 /* keep Emacsens' auto-indent happy */
249{
250#endif
251#ifdef __cplusplus
252}
253#endif
254
255#endif
diff --git a/src/setu/ibf_sim.c b/src/setu/ibf_sim.c
new file mode 100644
index 000000000..6415d00e1
--- /dev/null
+++ b/src/setu/ibf_sim.c
@@ -0,0 +1,142 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/ibf_sim.c
23 * @brief implementation of simulation for invertible bloom filter
24 * @author Florian Dold
25 *
26 * This code was used for some internal experiments, it is not
27 * build or shipped as part of the GNUnet system.
28 */
29#include <stdlib.h>
30#include <stdio.h>
31#include <string.h>
32
33#define MAX_IBF_DECODE 16
34
35/* report average over how many rounds? */
36#define ROUNDS 100000
37
38/* enable one of the three below */
39// simple fix
40#define FIX1 0
41// possibly slightly better fix for large IBF_DECODE values
42#define FIX2 1
43
44// SIGCOMM algorithm
45#define STRATA 0
46
47// print each value?
48#define VERBOSE 0
49// avoid assembly? (ASM is about 50% faster)
50#define SLOW 0
51
52int
53main (int argc, char **argv)
54{
55 unsigned int round;
56 unsigned int buckets[31]; // max is 2^31 as 'random' returns only between 0 and 2^31
57 unsigned int i;
58 int j;
59 unsigned int r;
60 unsigned int ret;
61 unsigned long long total;
62 unsigned int want;
63 double predict;
64
65 srandom (time (NULL));
66 total = 0;
67 want = atoi (argv[1]);
68 for (round = 0; round < ROUNDS; round++)
69 {
70 memset (buckets, 0, sizeof(buckets));
71 for (i = 0; i < want; i++)
72 {
73 /* FIXME: might want to use 'better' PRNG to avoid
74 PRNG-induced biases */
75 r = random ();
76 if (0 == r)
77 continue;
78#if SLOW
79 for (j = 0; (j < 31) && (0 == (r & (1 << j))); j++)
80 ;
81#else
82 /* use assembly / gcc */
83 j = __builtin_ffs (r) - 1;
84#endif
85 buckets[j]++;
86 }
87 ret = 0;
88 predict = 0.0;
89 for (j = 31; j >= 0; j--)
90 {
91#if FIX1
92 /* improved algorithm, for 1000 elements with IBF-DECODE 8, I
93 get 990/1000 elements on average over 1 million runs; key
94 idea being to stop short of the 'last' possible IBF as
95 otherwise a "lowball" per-chance would unduely influence the
96 result */if ((j > 0) &&
97 (buckets[j - 1] > MAX_IBF_DECODE))
98 {
99 ret *= (1 << (j + 1));
100 break;
101 }
102#endif
103#if FIX2
104 /* another improvement: don't just always cut off the last one,
105 but rather try to predict based on all previous values where
106 that "last" one is; additional prediction can only really
107 work if MAX_IBF_DECODE is sufficiently high */
108 if ((j > 0) &&
109 ((buckets[j - 1] > MAX_IBF_DECODE) ||
110 (predict > MAX_IBF_DECODE)))
111 {
112 ret *= (1 << (j + 1));
113 break;
114 }
115#endif
116#if STRATA
117 /* original algorithm, for 1000 elements with IBF-DECODE 8,
118 I get 920/1000 elements on average over 1 million runs */
119 if (buckets[j] > MAX_IBF_DECODE)
120 {
121 ret *= (1 << (j + 1));
122 break;
123 }
124#endif
125 ret += buckets[j];
126 predict = (buckets[j] + 2.0 * predict) / 2.0;
127 }
128#if VERBOSE
129 fprintf (stderr, "%u ", ret);
130#endif
131 total += ret;
132 }
133 fprintf (stderr, "\n");
134 fprintf (stdout, "average %llu\n", total / ROUNDS);
135 return 0;
136}
137
138
139/* TODO: should calculate stddev of the results to also be able to
140 say something about the stability of the results, outside of
141 large-scale averages -- gaining 8% precision at the expense of
142 50% additional variance might not be worth it... */
diff --git a/src/setu/plugin_block_setu_test.c b/src/setu/plugin_block_setu_test.c
new file mode 100644
index 000000000..fd0c8a680
--- /dev/null
+++ b/src/setu/plugin_block_setu_test.c
@@ -0,0 +1,123 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file setu/plugin_block_setu_test.c
23 * @brief set test block, recognizes elements with non-zero first byte as invalid
24 * @author Christian Grothoff
25 */
26
27#include "platform.h"
28#include "gnunet_block_plugin.h"
29#include "gnunet_block_group_lib.h"
30
31
32/**
33 * Function called to validate a reply or a request. For
34 * request evaluation, simply pass "NULL" for the reply_block.
35 *
36 * @param cls closure
37 * @param ctx block context
38 * @param type block type
39 * @param group block group to use
40 * @param eo control flags
41 * @param query original query (hash)
42 * @param xquery extrended query data (can be NULL, depending on type)
43 * @param xquery_size number of bytes in xquery
44 * @param reply_block response to validate
45 * @param reply_block_size number of bytes in reply block
46 * @return characterization of result
47 */
48static enum GNUNET_BLOCK_EvaluationResult
49block_plugin_setu_test_evaluate (void *cls,
50 struct GNUNET_BLOCK_Context *ctx,
51 enum GNUNET_BLOCK_Type type,
52 struct GNUNET_BLOCK_Group *group,
53 enum GNUNET_BLOCK_EvaluationOptions eo,
54 const struct GNUNET_HashCode *query,
55 const void *xquery,
56 size_t xquery_size,
57 const void *reply_block,
58 size_t reply_block_size)
59{
60 if ((NULL == reply_block) ||
61 (reply_block_size == 0) ||
62 (0 != ((char *) reply_block)[0]))
63 return GNUNET_BLOCK_EVALUATION_RESULT_INVALID;
64 return GNUNET_BLOCK_EVALUATION_OK_MORE;
65}
66
67
68/**
69 * Function called to obtain the key for a block.
70 *
71 * @param cls closure
72 * @param type block type
73 * @param block block to get the key for
74 * @param block_size number of bytes in block
75 * @param key set to the key (query) for the given block
76 * @return #GNUNET_OK on success, #GNUNET_SYSERR if type not supported
77 * (or if extracting a key from a block of this type does not work)
78 */
79static int
80block_plugin_setu_test_get_key (void *cls,
81 enum GNUNET_BLOCK_Type type,
82 const void *block,
83 size_t block_size,
84 struct GNUNET_HashCode *key)
85{
86 return GNUNET_SYSERR;
87}
88
89
90/**
91 * Entry point for the plugin.
92 */
93void *
94libgnunet_plugin_block_setu_test_init (void *cls)
95{
96 static enum GNUNET_BLOCK_Type types[] = {
97 GNUNET_BLOCK_TYPE_SETU_TEST,
98 GNUNET_BLOCK_TYPE_ANY /* end of list */
99 };
100 struct GNUNET_BLOCK_PluginFunctions *api;
101
102 api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions);
103 api->evaluate = &block_plugin_setu_test_evaluate;
104 api->get_key = &block_plugin_setu_test_get_key;
105 api->types = types;
106 return api;
107}
108
109
110/**
111 * Exit point from the plugin.
112 */
113void *
114libgnunet_plugin_block_setu_test_done (void *cls)
115{
116 struct GNUNET_BLOCK_PluginFunctions *api = cls;
117
118 GNUNET_free (api);
119 return NULL;
120}
121
122
123/* end of plugin_block_setu_test.c */
diff --git a/src/setu/setu.conf.in b/src/setu/setu.conf.in
new file mode 100644
index 000000000..6c48f6156
--- /dev/null
+++ b/src/setu/setu.conf.in
@@ -0,0 +1,12 @@
1[setu]
2START_ON_DEMAND = @START_ON_DEMAND@
3@UNIXONLY@PORT = 2106
4HOSTNAME = localhost
5BINARY = gnunet-service-setu
6ACCEPT_FROM = 127.0.0.1;
7ACCEPT_FROM6 = ::1;
8UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-setu.sock
9UNIX_MATCH_UID = YES
10UNIX_MATCH_GID = YES
11
12#PREFIX = valgrind
diff --git a/src/setu/setu.h b/src/setu/setu.h
new file mode 100644
index 000000000..e9a0def95
--- /dev/null
+++ b/src/setu/setu.h
@@ -0,0 +1,315 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2014, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/set.h
22 * @brief messages used for the set union api
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#ifndef SET_H
27#define SET_H
28
29#include "platform.h"
30#include "gnunet_common.h"
31#include "gnunet_set_service.h"
32
33GNUNET_NETWORK_STRUCT_BEGIN
34
35/**
36 * Message sent by the client to the service to ask starting
37 * a new set to perform operations with. Includes the desired
38 * set operation type.
39 */
40struct GNUNET_SETU_CreateMessage
41{
42 /**
43 * Type: #GNUNET_MESSAGE_TYPE_SETU_CREATE
44 */
45 struct GNUNET_MessageHeader header;
46
47};
48
49
50/**
51 * Message sent by the client to the service to start listening for
52 * incoming requests to perform a certain type of set operation for a
53 * certain type of application.
54 */
55struct GNUNET_SETU_ListenMessage
56{
57 /**
58 * Type: #GNUNET_MESSAGE_TYPE_SETU_LISTEN
59 */
60 struct GNUNET_MessageHeader header;
61
62 /**
63 * Always zero.
64 */
65 uint32_t reserved GNUNET_PACKED;
66
67 /**
68 * application id
69 */
70 struct GNUNET_HashCode app_id;
71};
72
73
74/**
75 * Message sent by a listening client to the service to accept
76 * performing the operation with the other peer.
77 */
78struct GNUNET_SETU_AcceptMessage
79{
80 /**
81 * Type: #GNUNET_MESSAGE_TYPE_SETU_ACCEPT
82 */
83 struct GNUNET_MessageHeader header;
84
85 /**
86 * ID of the incoming request we want to accept.
87 */
88 uint32_t accept_reject_id GNUNET_PACKED;
89
90 /**
91 * Request ID to identify responses.
92 */
93 uint32_t request_id GNUNET_PACKED;
94
95 /**
96 * Always use delta operation instead of sending full sets,
97 * even it it's less efficient.
98 */
99 uint8_t force_delta;
100
101 /**
102 * Always send full sets, even if delta operations would
103 * be more efficient.
104 */
105 uint8_t force_full;
106
107 /**
108 * #GNUNET_YES to fail operations where Byzantine faults
109 * are suspected
110 */
111 uint8_t byzantine;
112
113 /**
114 * #GNUNET_YES to also send back set elements we are sending to
115 * the remote peer.
116 */
117 uint8_t symmetric;
118
119 /**
120 * Lower bound for the set size, used only when
121 * byzantine mode is enabled.
122 */
123 uint32_t byzantine_lower_bound;
124};
125
126
127/**
128 * Message sent by a listening client to the service to reject
129 * performing the operation with the other peer.
130 */
131struct GNUNET_SETU_RejectMessage
132{
133 /**
134 * Type: #GNUNET_MESSAGE_TYPE_SETU_REJECT
135 */
136 struct GNUNET_MessageHeader header;
137
138 /**
139 * ID of the incoming request we want to reject.
140 */
141 uint32_t accept_reject_id GNUNET_PACKED;
142};
143
144
145/**
146 * A request for an operation with another client.
147 */
148struct GNUNET_SETU_RequestMessage
149{
150 /**
151 * Type: #GNUNET_MESSAGE_TYPE_SETU_REQUEST.
152 */
153 struct GNUNET_MessageHeader header;
154
155 /**
156 * ID of the to identify the request when accepting or
157 * rejecting it.
158 */
159 uint32_t accept_id GNUNET_PACKED;
160
161 /**
162 * Identity of the requesting peer.
163 */
164 struct GNUNET_PeerIdentity peer_id;
165
166 /* rest: context message, that is, application-specific
167 message to convince listener to pick up */
168};
169
170
171/**
172 * Message sent by client to service to initiate a set operation as a
173 * client (not as listener). A set (which determines the operation
174 * type) must already exist in association with this client.
175 */
176struct GNUNET_SETU_EvaluateMessage
177{
178 /**
179 * Type: #GNUNET_MESSAGE_TYPE_SETU_EVALUATE
180 */
181 struct GNUNET_MessageHeader header;
182
183 /**
184 * Id of our set to evaluate, chosen implicitly by the client when it
185 * calls #GNUNET_SETU_commit().
186 */
187 uint32_t request_id GNUNET_PACKED;
188
189 /**
190 * Peer to evaluate the operation with
191 */
192 struct GNUNET_PeerIdentity target_peer;
193
194 /**
195 * Application id
196 */
197 struct GNUNET_HashCode app_id;
198
199 /**
200 * Always use delta operation instead of sending full sets,
201 * even it it's less efficient.
202 */
203 uint8_t force_delta;
204
205 /**
206 * Always send full sets, even if delta operations would
207 * be more efficient.
208 */
209 uint8_t force_full;
210
211 /**
212 * #GNUNET_YES to fail operations where Byzantine faults
213 * are suspected
214 */
215 uint8_t byzantine;
216
217 /**
218 * Also return set elements we are sending to the remote peer.
219 */
220 uint8_t symmetric;
221
222 /**
223 * Lower bound for the set size, used only when
224 * byzantine mode is enabled.
225 */
226 uint32_t byzantine_lower_bound;
227
228 /* rest: context message, that is, application-specific
229 message to convince listener to pick up */
230};
231
232
233/**
234 * Message sent by the service to the client to indicate an
235 * element that is removed (set intersection) or added
236 * (set union) or part of the final result, depending on
237 * options specified for the operation.
238 */
239struct GNUNET_SETU_ResultMessage
240{
241 /**
242 * Type: #GNUNET_MESSAGE_TYPE_SETU_RESULT
243 */
244 struct GNUNET_MessageHeader header;
245
246 /**
247 * Current set size.
248 */
249 uint64_t current_size;
250
251 /**
252 * id the result belongs to
253 */
254 uint32_t request_id GNUNET_PACKED;
255
256 /**
257 * Was the evaluation successful? Contains
258 * an `enum GNUNET_SETU_Status` in NBO.
259 */
260 uint16_t result_status GNUNET_PACKED;
261
262 /**
263 * Type of the element attachted to the message, if any.
264 */
265 uint16_t element_type GNUNET_PACKED;
266
267 /* rest: the actual element */
268};
269
270
271/**
272 * Message sent by client to the service to add
273 * an element to the set.
274 */
275struct GNUNET_SETU_ElementMessage
276{
277 /**
278 * Type: #GNUNET_MESSAGE_TYPE_SETU_ADD
279 */
280 struct GNUNET_MessageHeader header;
281
282 /**
283 * Type of the element to add or remove.
284 */
285 uint16_t element_type GNUNET_PACKED;
286
287 /**
288 * For alignment, always zero.
289 */
290 uint16_t reserved GNUNET_PACKED;
291
292 /* rest: the actual element */
293};
294
295
296/**
297 * Sent to the service by the client in order to cancel a set operation.
298 */
299struct GNUNET_SETU_CancelMessage
300{
301 /**
302 * Type: #GNUNET_MESSAGE_TYPE_SETU_CANCEL
303 */
304 struct GNUNET_MessageHeader header;
305
306 /**
307 * ID of the request we want to cancel.
308 */
309 uint32_t request_id GNUNET_PACKED;
310};
311
312
313GNUNET_NETWORK_STRUCT_END
314
315#endif
diff --git a/src/setu/setu_api.c b/src/setu/setu_api.c
new file mode 100644
index 000000000..dd3a4a769
--- /dev/null
+++ b/src/setu/setu_api.c
@@ -0,0 +1,897 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2016, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/setu_api.c
22 * @brief api for the set union service
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_protocols.h"
29#include "gnunet_setu_service.h"
30#include "setu.h"
31
32
33#define LOG(kind, ...) GNUNET_log_from (kind, "set-api", __VA_ARGS__)
34
35/**
36 * Opaque handle to a set.
37 */
38struct GNUNET_SETU_Handle
39{
40 /**
41 * Message queue for @e client.
42 */
43 struct GNUNET_MQ_Handle *mq;
44
45 /**
46 * Linked list of operations on the set.
47 */
48 struct GNUNET_SETU_OperationHandle *ops_head;
49
50 /**
51 * Linked list of operations on the set.
52 */
53 struct GNUNET_SETU_OperationHandle *ops_tail;
54
55 /**
56 * Should the set be destroyed once all operations are gone?
57 * #GNUNET_SYSERR if #GNUNET_SETU_destroy() must raise this flag,
58 * #GNUNET_YES if #GNUNET_SETU_destroy() did raise this flag.
59 */
60 int destroy_requested;
61
62 /**
63 * Has the set become invalid (e.g. service died)?
64 */
65 int invalid;
66
67};
68
69
70/**
71 * Handle for a set operation request from another peer.
72 */
73struct GNUNET_SETU_Request
74{
75 /**
76 * Id of the request, used to identify the request when
77 * accepting/rejecting it.
78 */
79 uint32_t accept_id;
80
81 /**
82 * Has the request been accepted already?
83 * #GNUNET_YES/#GNUNET_NO
84 */
85 int accepted;
86};
87
88
89/**
90 * Handle to an operation. Only known to the service after committing
91 * the handle with a set.
92 */
93struct GNUNET_SETU_OperationHandle
94{
95 /**
96 * Function to be called when we have a result,
97 * or an error.
98 */
99 GNUNET_SETU_ResultIterator result_cb;
100
101 /**
102 * Closure for @e result_cb.
103 */
104 void *result_cls;
105
106 /**
107 * Local set used for the operation,
108 * NULL if no set has been provided by conclude yet.
109 */
110 struct GNUNET_SETU_Handle *set;
111
112 /**
113 * Message sent to the server on calling conclude,
114 * NULL if conclude has been called.
115 */
116 struct GNUNET_MQ_Envelope *conclude_mqm;
117
118 /**
119 * Address of the request if in the conclude message,
120 * used to patch the request id into the message when the set is known.
121 */
122 uint32_t *request_id_addr;
123
124 /**
125 * Handles are kept in a linked list.
126 */
127 struct GNUNET_SETU_OperationHandle *prev;
128
129 /**
130 * Handles are kept in a linked list.
131 */
132 struct GNUNET_SETU_OperationHandle *next;
133
134 /**
135 * Request ID to identify the operation within the set.
136 */
137 uint32_t request_id;
138};
139
140
141/**
142 * Opaque handle to a listen operation.
143 */
144struct GNUNET_SETU_ListenHandle
145{
146 /**
147 * Message queue for the client.
148 */
149 struct GNUNET_MQ_Handle*mq;
150
151 /**
152 * Configuration handle for the listener, stored
153 * here to be able to reconnect transparently on
154 * connection failure.
155 */
156 const struct GNUNET_CONFIGURATION_Handle *cfg;
157
158 /**
159 * Function to call on a new incoming request,
160 * or on error.
161 */
162 GNUNET_SETU_ListenCallback listen_cb;
163
164 /**
165 * Closure for @e listen_cb.
166 */
167 void *listen_cls;
168
169 /**
170 * Application ID we listen for.
171 */
172 struct GNUNET_HashCode app_id;
173
174 /**
175 * Time to wait until we try to reconnect on failure.
176 */
177 struct GNUNET_TIME_Relative reconnect_backoff;
178
179 /**
180 * Task for reconnecting when the listener fails.
181 */
182 struct GNUNET_SCHEDULER_Task *reconnect_task;
183
184};
185
186
187/**
188 * Check that the given @a msg is well-formed.
189 *
190 * @param cls closure
191 * @param msg message to check
192 * @return #GNUNET_OK if message is well-formed
193 */
194static int
195check_result (void *cls,
196 const struct GNUNET_SETU_ResultMessage *msg)
197{
198 /* minimum size was already checked, everything else is OK! */
199 return GNUNET_OK;
200}
201
202
203/**
204 * Handle result message for a set operation.
205 *
206 * @param cls the set
207 * @param mh the message
208 */
209static void
210handle_result (void *cls,
211 const struct GNUNET_SETU_ResultMessage *msg)
212{
213 struct GNUNET_SETU_Handle *set = cls;
214 struct GNUNET_SETU_OperationHandle *oh;
215 struct GNUNET_SETU_Element e;
216 enum GNUNET_SETU_Status result_status;
217 int destroy_set;
218
219 GNUNET_assert (NULL != set->mq);
220 result_status = (enum GNUNET_SETU_Status) ntohs (msg->result_status);
221 LOG (GNUNET_ERROR_TYPE_DEBUG,
222 "Got result message with status %d\n",
223 result_status);
224 oh = GNUNET_MQ_assoc_get (set->mq,
225 ntohl (msg->request_id));
226 if (NULL == oh)
227 {
228 /* 'oh' can be NULL if we canceled the operation, but the service
229 did not get the cancel message yet. */
230 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
231 "Ignoring result from canceled operation\n");
232 return;
233 }
234
235 switch (result_status)
236 {
237 case GNUNET_SETU_STATUS_ADD_LOCAL:
238 case GNUNET_SETU_STATUS_ADD_REMOTE:
239 e.data = &msg[1];
240 e.size = ntohs (msg->header.size)
241 - sizeof(struct GNUNET_SETU_ResultMessage);
242 e.element_type = ntohs (msg->element_type);
243 if (NULL != oh->result_cb)
244 oh->result_cb (oh->result_cls,
245 &e,
246 GNUNET_ntohll (msg->current_size),
247 result_status);
248 return;
249 case GNUNET_SETU_STATUS_FAILURE:
250 case GNUNET_SETU_STATUS_DONE:
251 LOG (GNUNET_ERROR_TYPE_DEBUG,
252 "Treating result as final status\n");
253 GNUNET_MQ_assoc_remove (set->mq,
254 ntohl (msg->request_id));
255 GNUNET_CONTAINER_DLL_remove (set->ops_head,
256 set->ops_tail,
257 oh);
258 /* Need to do this calculation _before_ the result callback,
259 as IF the application still has a valid set handle, it
260 may trigger destruction of the set during the callback. */
261 destroy_set = (GNUNET_YES == set->destroy_requested) &&
262 (NULL == set->ops_head);
263 if (NULL != oh->result_cb)
264 {
265 oh->result_cb (oh->result_cls,
266 NULL,
267 GNUNET_ntohll (msg->current_size),
268 result_status);
269 }
270 else
271 {
272 LOG (GNUNET_ERROR_TYPE_DEBUG,
273 "No callback for final status\n");
274 }
275 if (destroy_set)
276 GNUNET_SETU_destroy (set);
277 GNUNET_free (oh);
278 return;
279 }
280}
281
282
283/**
284 * Destroy the given set operation.
285 *
286 * @param oh set operation to destroy
287 */
288static void
289set_operation_destroy (struct GNUNET_SETU_OperationHandle *oh)
290{
291 struct GNUNET_SETU_Handle *set = oh->set;
292 struct GNUNET_SETU_OperationHandle *h_assoc;
293
294 if (NULL != oh->conclude_mqm)
295 GNUNET_MQ_discard (oh->conclude_mqm);
296 /* is the operation already commited? */
297 if (NULL != set)
298 {
299 GNUNET_CONTAINER_DLL_remove (set->ops_head,
300 set->ops_tail,
301 oh);
302 h_assoc = GNUNET_MQ_assoc_remove (set->mq,
303 oh->request_id);
304 GNUNET_assert ((NULL == h_assoc) ||
305 (h_assoc == oh));
306 }
307 GNUNET_free (oh);
308}
309
310
311/**
312 * Cancel the given set operation. We need to send an explicit cancel
313 * message, as all operations one one set communicate using one
314 * handle.
315 *
316 * @param oh set operation to cancel
317 */
318void
319GNUNET_SETU_operation_cancel (struct GNUNET_SETU_OperationHandle *oh)
320{
321 struct GNUNET_SETU_Handle *set = oh->set;
322 struct GNUNET_SETU_CancelMessage *m;
323 struct GNUNET_MQ_Envelope *mqm;
324
325 LOG (GNUNET_ERROR_TYPE_DEBUG,
326 "Cancelling SET operation\n");
327 if (NULL != set)
328 {
329 mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETU_CANCEL);
330 m->request_id = htonl (oh->request_id);
331 GNUNET_MQ_send (set->mq, mqm);
332 }
333 set_operation_destroy (oh);
334 if ((NULL != set) &&
335 (GNUNET_YES == set->destroy_requested) &&
336 (NULL == set->ops_head))
337 {
338 LOG (GNUNET_ERROR_TYPE_DEBUG,
339 "Destroying set after operation cancel\n");
340 GNUNET_SETU_destroy (set);
341 }
342}
343
344
345/**
346 * We encountered an error communicating with the set service while
347 * performing a set operation. Report to the application.
348 *
349 * @param cls the `struct GNUNET_SETU_Handle`
350 * @param error error code
351 */
352static void
353handle_client_set_error (void *cls,
354 enum GNUNET_MQ_Error error)
355{
356 struct GNUNET_SETU_Handle *set = cls;
357
358 LOG (GNUNET_ERROR_TYPE_ERROR,
359 "Handling client set error %d\n",
360 error);
361 while (NULL != set->ops_head)
362 {
363 if ((NULL != set->ops_head->result_cb) &&
364 (GNUNET_NO == set->destroy_requested))
365 set->ops_head->result_cb (set->ops_head->result_cls,
366 NULL,
367 0,
368 GNUNET_SETU_STATUS_FAILURE);
369 set_operation_destroy (set->ops_head);
370 }
371 set->invalid = GNUNET_YES;
372}
373
374
375/**
376 * Create an empty set, supporting the specified operation.
377 *
378 * @param cfg configuration to use for connecting to the
379 * set service
380 * @return a handle to the set
381 */
382struct GNUNET_SETU_Handle *
383GNUNET_SETU_create (const struct GNUNET_CONFIGURATION_Handle *cfg)
384{
385 struct GNUNET_SETU_Handle *set = GNUNET_new (struct GNUNET_SETU_Handle);
386 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
387 GNUNET_MQ_hd_var_size (result,
388 GNUNET_MESSAGE_TYPE_SETU_RESULT,
389 struct GNUNET_SETU_ResultMessage,
390 set),
391 GNUNET_MQ_handler_end ()
392 };
393 struct GNUNET_MQ_Envelope *mqm;
394 struct GNUNET_SETU_CreateMessage *create_msg;
395
396 set->mq = GNUNET_CLIENT_connect (cfg,
397 "setu",
398 mq_handlers,
399 &handle_client_set_error,
400 set);
401 if (NULL == set->mq)
402 {
403 GNUNET_free (set);
404 return NULL;
405 }
406 mqm = GNUNET_MQ_msg (create_msg,
407 GNUNET_MESSAGE_TYPE_SETU_CREATE);
408 GNUNET_MQ_send (set->mq,
409 mqm);
410 return set;
411}
412
413
414/**
415 * Add an element to the given set. After the element has been added
416 * (in the sense of being transmitted to the set service), @a cont
417 * will be called. Multiple calls to GNUNET_SETU_add_element() can be
418 * queued.
419 *
420 * @param set set to add element to
421 * @param element element to add to the set
422 * @param cb continuation called after the element has been added
423 * @param cb_cls closure for @a cb
424 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
425 * set is invalid (e.g. the set service crashed)
426 */
427int
428GNUNET_SETU_add_element (struct GNUNET_SETU_Handle *set,
429 const struct GNUNET_SETU_Element *element,
430 GNUNET_SCHEDULER_TaskCallback cb,
431 void *cb_cls)
432{
433 struct GNUNET_MQ_Envelope *mqm;
434 struct GNUNET_SETU_ElementMessage *msg;
435
436 LOG (GNUNET_ERROR_TYPE_DEBUG,
437 "adding element of type %u to set %p\n",
438 (unsigned int) element->element_type,
439 set);
440 GNUNET_assert (NULL != set);
441 if (GNUNET_YES == set->invalid)
442 {
443 if (NULL != cb)
444 cb (cb_cls);
445 return GNUNET_SYSERR;
446 }
447 mqm = GNUNET_MQ_msg_extra (msg,
448 element->size,
449 GNUNET_MESSAGE_TYPE_SETU_ADD);
450 msg->element_type = htons (element->element_type);
451 GNUNET_memcpy (&msg[1],
452 element->data,
453 element->size);
454 GNUNET_MQ_notify_sent (mqm,
455 cb,
456 cb_cls);
457 GNUNET_MQ_send (set->mq,
458 mqm);
459 return GNUNET_OK;
460}
461
462
463/**
464 * Destroy the set handle if no operations are left, mark the set
465 * for destruction otherwise.
466 *
467 * @param set set handle to destroy
468 */
469void
470GNUNET_SETU_destroy (struct GNUNET_SETU_Handle *set)
471{
472 /* destroying set while iterator is active is currently
473 not supported; we should expand the API to allow
474 clients to explicitly cancel the iteration! */
475 GNUNET_assert (NULL != set);
476 if ((NULL != set->ops_head) ||
477 (GNUNET_SYSERR == set->destroy_requested))
478 {
479 LOG (GNUNET_ERROR_TYPE_DEBUG,
480 "Set operations are pending, delaying set destruction\n");
481 set->destroy_requested = GNUNET_YES;
482 return;
483 }
484 LOG (GNUNET_ERROR_TYPE_DEBUG,
485 "Really destroying set\n");
486 if (NULL != set->mq)
487 {
488 GNUNET_MQ_destroy (set->mq);
489 set->mq = NULL;
490 }
491 GNUNET_free (set);
492}
493
494
495/**
496 * Prepare a set operation to be evaluated with another peer.
497 * The evaluation will not start until the client provides
498 * a local set with #GNUNET_SETU_commit().
499 *
500 * @param other_peer peer with the other set
501 * @param app_id hash for the application using the set
502 * @param context_msg additional information for the request
503 * @param result_cb called on error or success
504 * @param result_cls closure for @e result_cb
505 * @return a handle to cancel the operation
506 */
507struct GNUNET_SETU_OperationHandle *
508GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
509 const struct GNUNET_HashCode *app_id,
510 const struct GNUNET_MessageHeader *context_msg,
511 const struct GNUNET_SETU_Option options[],
512 GNUNET_SETU_ResultIterator result_cb,
513 void *result_cls)
514{
515 struct GNUNET_MQ_Envelope *mqm;
516 struct GNUNET_SETU_OperationHandle *oh;
517 struct GNUNET_SETU_EvaluateMessage *msg;
518
519 LOG (GNUNET_ERROR_TYPE_DEBUG,
520 "Client prepares set union operation\n");
521 oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
522 oh->result_cb = result_cb;
523 oh->result_cls = result_cls;
524 mqm = GNUNET_MQ_msg_nested_mh (msg,
525 GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
526 context_msg);
527 msg->app_id = *app_id;
528 msg->target_peer = *other_peer;
529 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
530 {
531 switch (opt->type)
532 {
533 case GNUNET_SETU_OPTION_BYZANTINE:
534 msg->byzantine = GNUNET_YES;
535 msg->byzantine_lower_bound = htonl (opt->v.num);
536 break;
537 case GNUNET_SETU_OPTION_FORCE_FULL:
538 msg->force_full = GNUNET_YES;
539 break;
540 case GNUNET_SETU_OPTION_FORCE_DELTA:
541 msg->force_delta = GNUNET_YES;
542 break;
543 case GNUNET_SETU_OPTION_SYMMETRIC:
544 msg->symmetric = GNUNET_YES;
545 break;
546 default:
547 LOG (GNUNET_ERROR_TYPE_ERROR,
548 "Option with type %d not recognized\n",
549 (int) opt->type);
550 }
551 }
552 oh->conclude_mqm = mqm;
553 oh->request_id_addr = &msg->request_id;
554 return oh;
555}
556
557
558/**
559 * Connect to the set service in order to listen for requests.
560 *
561 * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
562 */
563static void
564listen_connect (void *cls);
565
566
567/**
568 * Check validity of request message for a listen operation
569 *
570 * @param cls the listen handle
571 * @param msg the message
572 * @return #GNUNET_OK if the message is well-formed
573 */
574static int
575check_request (void *cls,
576 const struct GNUNET_SETU_RequestMessage *msg)
577{
578 const struct GNUNET_MessageHeader *context_msg;
579
580 if (ntohs (msg->header.size) == sizeof(*msg))
581 return GNUNET_OK; /* no context message is OK */
582 context_msg = GNUNET_MQ_extract_nested_mh (msg);
583 if (NULL == context_msg)
584 {
585 /* malformed context message is NOT ok */
586 GNUNET_break_op (0);
587 return GNUNET_SYSERR;
588 }
589 return GNUNET_OK;
590}
591
592
593/**
594 * Handle request message for a listen operation
595 *
596 * @param cls the listen handle
597 * @param msg the message
598 */
599static void
600handle_request (void *cls,
601 const struct GNUNET_SETU_RequestMessage *msg)
602{
603 struct GNUNET_SETU_ListenHandle *lh = cls;
604 struct GNUNET_SETU_Request req;
605 const struct GNUNET_MessageHeader *context_msg;
606 struct GNUNET_MQ_Envelope *mqm;
607 struct GNUNET_SETU_RejectMessage *rmsg;
608
609 LOG (GNUNET_ERROR_TYPE_DEBUG,
610 "Processing incoming operation request with id %u\n",
611 ntohl (msg->accept_id));
612 /* we got another valid request => reset the backoff */
613 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
614 req.accept_id = ntohl (msg->accept_id);
615 req.accepted = GNUNET_NO;
616 context_msg = GNUNET_MQ_extract_nested_mh (msg);
617 /* calling #GNUNET_SETU_accept() in the listen cb will set req->accepted */
618 lh->listen_cb (lh->listen_cls,
619 &msg->peer_id,
620 context_msg,
621 &req);
622 if (GNUNET_YES == req.accepted)
623 return; /* the accept-case is handled in #GNUNET_SETU_accept() */
624 LOG (GNUNET_ERROR_TYPE_DEBUG,
625 "Rejected request %u\n",
626 ntohl (msg->accept_id));
627 mqm = GNUNET_MQ_msg (rmsg,
628 GNUNET_MESSAGE_TYPE_SETU_REJECT);
629 rmsg->accept_reject_id = msg->accept_id;
630 GNUNET_MQ_send (lh->mq,
631 mqm);
632}
633
634
635/**
636 * Our connection with the set service encountered an error,
637 * re-initialize with exponential back-off.
638 *
639 * @param cls the `struct GNUNET_SETU_ListenHandle *`
640 * @param error reason for the disconnect
641 */
642static void
643handle_client_listener_error (void *cls,
644 enum GNUNET_MQ_Error error)
645{
646 struct GNUNET_SETU_ListenHandle *lh = cls;
647
648 LOG (GNUNET_ERROR_TYPE_DEBUG,
649 "Listener broke down (%d), re-connecting\n",
650 (int) error);
651 GNUNET_MQ_destroy (lh->mq);
652 lh->mq = NULL;
653 lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
654 &listen_connect,
655 lh);
656 lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
657}
658
659
660/**
661 * Connect to the set service in order to listen for requests.
662 *
663 * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
664 */
665static void
666listen_connect (void *cls)
667{
668 struct GNUNET_SETU_ListenHandle *lh = cls;
669 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
670 GNUNET_MQ_hd_var_size (request,
671 GNUNET_MESSAGE_TYPE_SETU_REQUEST,
672 struct GNUNET_SETU_RequestMessage,
673 lh),
674 GNUNET_MQ_handler_end ()
675 };
676 struct GNUNET_MQ_Envelope *mqm;
677 struct GNUNET_SETU_ListenMessage *msg;
678
679 lh->reconnect_task = NULL;
680 GNUNET_assert (NULL == lh->mq);
681 lh->mq = GNUNET_CLIENT_connect (lh->cfg,
682 "setu",
683 mq_handlers,
684 &handle_client_listener_error,
685 lh);
686 if (NULL == lh->mq)
687 return;
688 mqm = GNUNET_MQ_msg (msg,
689 GNUNET_MESSAGE_TYPE_SETU_LISTEN);
690 msg->app_id = lh->app_id;
691 GNUNET_MQ_send (lh->mq,
692 mqm);
693}
694
695
696/**
697 * Wait for set operation requests for the given application id
698 *
699 * @param cfg configuration to use for connecting to
700 * the set service, needs to be valid for the lifetime of the listen handle
701 * @param app_id id of the application that handles set operation requests
702 * @param listen_cb called for each incoming request matching the operation
703 * and application id
704 * @param listen_cls handle for @a listen_cb
705 * @return a handle that can be used to cancel the listen operation
706 */
707struct GNUNET_SETU_ListenHandle *
708GNUNET_SETU_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
709 const struct GNUNET_HashCode *app_id,
710 GNUNET_SETU_ListenCallback listen_cb,
711 void *listen_cls)
712{
713 struct GNUNET_SETU_ListenHandle *lh;
714
715 LOG (GNUNET_ERROR_TYPE_DEBUG,
716 "Starting listener for app %s\n",
717 GNUNET_h2s (app_id));
718 lh = GNUNET_new (struct GNUNET_SETU_ListenHandle);
719 lh->listen_cb = listen_cb;
720 lh->listen_cls = listen_cls;
721 lh->cfg = cfg;
722 lh->app_id = *app_id;
723 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
724 listen_connect (lh);
725 if (NULL == lh->mq)
726 {
727 GNUNET_free (lh);
728 return NULL;
729 }
730 return lh;
731}
732
733
734/**
735 * Cancel the given listen operation.
736 *
737 * @param lh handle for the listen operation
738 */
739void
740GNUNET_SETU_listen_cancel (struct GNUNET_SETU_ListenHandle *lh)
741{
742 LOG (GNUNET_ERROR_TYPE_DEBUG,
743 "Canceling listener %s\n",
744 GNUNET_h2s (&lh->app_id));
745 if (NULL != lh->mq)
746 {
747 GNUNET_MQ_destroy (lh->mq);
748 lh->mq = NULL;
749 }
750 if (NULL != lh->reconnect_task)
751 {
752 GNUNET_SCHEDULER_cancel (lh->reconnect_task);
753 lh->reconnect_task = NULL;
754 }
755 GNUNET_free (lh);
756}
757
758
759/**
760 * Accept a request we got via #GNUNET_SETU_listen. Must be called during
761 * #GNUNET_SETU_listen, as the 'struct GNUNET_SETU_Request' becomes invalid
762 * afterwards.
763 * Call #GNUNET_SETU_commit to provide the local set to use for the operation,
764 * and to begin the exchange with the remote peer.
765 *
766 * @param request request to accept
767 * @param result_mode specified how results will be returned,
768 * see `enum GNUNET_SETU_ResultMode`.
769 * @param result_cb callback for the results
770 * @param result_cls closure for @a result_cb
771 * @return a handle to cancel the operation
772 */
773struct GNUNET_SETU_OperationHandle *
774GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
775 const struct GNUNET_SETU_Option options[],
776 GNUNET_SETU_ResultIterator result_cb,
777 void *result_cls)
778{
779 struct GNUNET_MQ_Envelope *mqm;
780 struct GNUNET_SETU_OperationHandle *oh;
781 struct GNUNET_SETU_AcceptMessage *msg;
782
783 GNUNET_assert (GNUNET_NO == request->accepted);
784 LOG (GNUNET_ERROR_TYPE_DEBUG,
785 "Client accepts set union operation with id %u\n",
786 request->accept_id);
787 request->accepted = GNUNET_YES;
788 mqm = GNUNET_MQ_msg (msg,
789 GNUNET_MESSAGE_TYPE_SETU_ACCEPT);
790 msg->accept_reject_id = htonl (request->accept_id);
791 for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
792 {
793 switch (opt->type)
794 {
795 case GNUNET_SETU_OPTION_BYZANTINE:
796 msg->byzantine = GNUNET_YES;
797 msg->byzantine_lower_bound = htonl (opt->v.num);
798 break;
799 case GNUNET_SETU_OPTION_FORCE_FULL:
800 msg->force_full = GNUNET_YES;
801 break;
802 case GNUNET_SETU_OPTION_FORCE_DELTA:
803 msg->force_delta = GNUNET_YES;
804 break;
805 case GNUNET_SETU_OPTION_SYMMETRIC:
806 msg->symmetric = GNUNET_YES;
807 break;
808 default:
809 LOG (GNUNET_ERROR_TYPE_ERROR,
810 "Option with type %d not recognized\n",
811 (int) opt->type);
812 }
813 }
814 oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
815 oh->result_cb = result_cb;
816 oh->result_cls = result_cls;
817 oh->conclude_mqm = mqm;
818 oh->request_id_addr = &msg->request_id;
819 return oh;
820}
821
822
823/**
824 * Commit a set to be used with a set operation.
825 * This function is called once we have fully constructed
826 * the set that we want to use for the operation. At this
827 * time, the P2P protocol can then begin to exchange the
828 * set information and call the result callback with the
829 * result information.
830 *
831 * @param oh handle to the set operation
832 * @param set the set to use for the operation
833 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
834 * set is invalid (e.g. the set service crashed)
835 */
836int
837GNUNET_SETU_commit (struct GNUNET_SETU_OperationHandle *oh,
838 struct GNUNET_SETU_Handle *set)
839{
840 if (NULL != oh->set)
841 {
842 /* Some other set was already committed for this
843 * operation, there is a logic bug in the client of this API */
844 GNUNET_break (0);
845 return GNUNET_OK;
846 }
847 GNUNET_assert (NULL != set);
848 if (GNUNET_YES == set->invalid)
849 return GNUNET_SYSERR;
850 LOG (GNUNET_ERROR_TYPE_DEBUG,
851 "Client commits to SET\n");
852 GNUNET_assert (NULL != oh->conclude_mqm);
853 oh->set = set;
854 GNUNET_CONTAINER_DLL_insert (set->ops_head,
855 set->ops_tail,
856 oh);
857 oh->request_id = GNUNET_MQ_assoc_add (set->mq,
858 oh);
859 *oh->request_id_addr = htonl (oh->request_id);
860 GNUNET_MQ_send (set->mq,
861 oh->conclude_mqm);
862 oh->conclude_mqm = NULL;
863 oh->request_id_addr = NULL;
864 return GNUNET_OK;
865}
866
867
868/**
869 * Hash a set element.
870 *
871 * @param element the element that should be hashed
872 * @param[out] ret_hash a pointer to where the hash of @a element
873 * should be stored
874 */
875void
876GNUNET_SETU_element_hash (const struct GNUNET_SETU_Element *element,
877 struct GNUNET_HashCode *ret_hash)
878{
879 struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
880
881 /* It's not guaranteed that the element data is always after the element header,
882 so we need to hash the chunks separately. */
883 GNUNET_CRYPTO_hash_context_read (ctx,
884 &element->size,
885 sizeof(uint16_t));
886 GNUNET_CRYPTO_hash_context_read (ctx,
887 &element->element_type,
888 sizeof(uint16_t));
889 GNUNET_CRYPTO_hash_context_read (ctx,
890 element->data,
891 element->size);
892 GNUNET_CRYPTO_hash_context_finish (ctx,
893 ret_hash);
894}
895
896
897/* end of setu_api.c */
diff --git a/src/setu/test_setu_api.c b/src/setu/test_setu_api.c
new file mode 100644
index 000000000..95119873c
--- /dev/null
+++ b/src/setu/test_setu_api.c
@@ -0,0 +1,360 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/test_setu_api.c
23 * @brief testcase for setu_api.c
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_testing_lib.h"
29#include "gnunet_setu_service.h"
30
31
32static struct GNUNET_PeerIdentity local_id;
33
34static struct GNUNET_HashCode app_id;
35
36static struct GNUNET_SETU_Handle *set1;
37
38static struct GNUNET_SETU_Handle *set2;
39
40static struct GNUNET_SETU_ListenHandle *listen_handle;
41
42static struct GNUNET_SETU_OperationHandle *oh1;
43
44static struct GNUNET_SETU_OperationHandle *oh2;
45
46static const struct GNUNET_CONFIGURATION_Handle *config;
47
48static int ret;
49
50static struct GNUNET_SCHEDULER_Task *tt;
51
52
53static void
54result_cb_set1 (void *cls,
55 const struct GNUNET_SETU_Element *element,
56 uint64_t size,
57 enum GNUNET_SETU_Status status)
58{
59 switch (status)
60 {
61 case GNUNET_SETU_STATUS_ADD_LOCAL:
62 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: got element\n");
63 break;
64
65 case GNUNET_SETU_STATUS_FAILURE:
66 GNUNET_break (0);
67 oh1 = NULL;
68 fprintf (stderr, "set 1: received failure status!\n");
69 ret = 1;
70 if (NULL != tt)
71 {
72 GNUNET_SCHEDULER_cancel (tt);
73 tt = NULL;
74 }
75 GNUNET_SCHEDULER_shutdown ();
76 break;
77
78 case GNUNET_SETU_STATUS_DONE:
79 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 1: done\n");
80 oh1 = NULL;
81 if (NULL != set1)
82 {
83 GNUNET_SETU_destroy (set1);
84 set1 = NULL;
85 }
86 if (NULL == set2)
87 {
88 GNUNET_SCHEDULER_cancel (tt);
89 tt = NULL;
90 GNUNET_SCHEDULER_shutdown ();
91 }
92 break;
93
94 default:
95 GNUNET_assert (0);
96 }
97}
98
99
100static void
101result_cb_set2 (void *cls,
102 const struct GNUNET_SETU_Element *element,
103 uint64_t size,
104 enum GNUNET_SETU_Status status)
105{
106 switch (status)
107 {
108 case GNUNET_SETU_STATUS_ADD_LOCAL:
109 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: got element\n");
110 break;
111
112 case GNUNET_SETU_STATUS_FAILURE:
113 GNUNET_break (0);
114 oh2 = NULL;
115 fprintf (stderr, "set 2: received failure status\n");
116 GNUNET_SCHEDULER_shutdown ();
117 ret = 1;
118 break;
119
120 case GNUNET_SETU_STATUS_DONE:
121 oh2 = NULL;
122 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "set 2: done\n");
123 GNUNET_SETU_destroy (set2);
124 set2 = NULL;
125 if (NULL == set1)
126 {
127 GNUNET_SCHEDULER_cancel (tt);
128 tt = NULL;
129 GNUNET_SCHEDULER_shutdown ();
130 }
131 break;
132
133 default:
134 GNUNET_assert (0);
135 }
136}
137
138
139static void
140listen_cb (void *cls,
141 const struct GNUNET_PeerIdentity *other_peer,
142 const struct GNUNET_MessageHeader *context_msg,
143 struct GNUNET_SETU_Request *request)
144{
145 GNUNET_assert (NULL != context_msg);
146 GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
147 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listen cb called\n");
148 oh2 = GNUNET_SETU_accept (request,
149 (struct GNUNET_SETU_Option[]){ 0 },
150 &result_cb_set2,
151 NULL);
152 GNUNET_SETU_commit (oh2, set2);
153}
154
155
156/**
157 * Start the set operation.
158 *
159 * @param cls closure, unused
160 */
161static void
162start (void *cls)
163{
164 struct GNUNET_MessageHeader context_msg;
165
166 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting reconciliation\n");
167 context_msg.size = htons (sizeof context_msg);
168 context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
169 listen_handle = GNUNET_SETU_listen (config,
170 &app_id,
171 &listen_cb,
172 NULL);
173 oh1 = GNUNET_SETU_prepare (&local_id,
174 &app_id,
175 &context_msg,
176 (struct GNUNET_SETU_Option[]){ 0 },
177 &result_cb_set1,
178 NULL);
179 GNUNET_SETU_commit (oh1, set1);
180}
181
182
183/**
184 * Initialize the second set, continue
185 *
186 * @param cls closure, unused
187 */
188static void
189init_set2 (void *cls)
190{
191 struct GNUNET_SETU_Element element;
192
193 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
194
195 element.element_type = 0;
196 element.data = "hello";
197 element.size = strlen (element.data);
198 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
199 element.data = "quux";
200 element.size = strlen (element.data);
201 GNUNET_SETU_add_element (set2, &element, NULL, NULL);
202 element.data = "baz";
203 element.size = strlen (element.data);
204 GNUNET_SETU_add_element (set2, &element, &start, NULL);
205}
206
207
208/**
209 * Initialize the first set, continue.
210 */
211static void
212init_set1 (void)
213{
214 struct GNUNET_SETU_Element element;
215
216 element.element_type = 0;
217 element.data = "hello";
218 element.size = strlen (element.data);
219 GNUNET_SETU_add_element (set1, &element, NULL, NULL);
220 element.data = "bar";
221 element.size = strlen (element.data);
222 GNUNET_SETU_add_element (set1, &element, &init_set2, NULL);
223 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
224}
225
226
227/**
228 * Function run on timeout.
229 *
230 * @param cls closure
231 */
232static void
233timeout_fail (void *cls)
234{
235 tt = NULL;
236 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, "Testcase failed with timeout\n");
237 GNUNET_SCHEDULER_shutdown ();
238 ret = 1;
239}
240
241
242/**
243 * Function run on shutdown.
244 *
245 * @param cls closure
246 */
247static void
248do_shutdown (void *cls)
249{
250 if (NULL != tt)
251 {
252 GNUNET_SCHEDULER_cancel (tt);
253 tt = NULL;
254 }
255 if (NULL != oh1)
256 {
257 GNUNET_SETU_operation_cancel (oh1);
258 oh1 = NULL;
259 }
260 if (NULL != oh2)
261 {
262 GNUNET_SETU_operation_cancel (oh2);
263 oh2 = NULL;
264 }
265 if (NULL != set1)
266 {
267 GNUNET_SETU_destroy (set1);
268 set1 = NULL;
269 }
270 if (NULL != set2)
271 {
272 GNUNET_SETU_destroy (set2);
273 set2 = NULL;
274 }
275 if (NULL != listen_handle)
276 {
277 GNUNET_SETU_listen_cancel (listen_handle);
278 listen_handle = NULL;
279 }
280}
281
282
283/**
284 * Signature of the 'main' function for a (single-peer) testcase that
285 * is run using 'GNUNET_TESTING_peer_run'.
286 *
287 * @param cls closure
288 * @param cfg configuration of the peer that was started
289 * @param peer identity of the peer that was created
290 */
291static void
292run (void *cls,
293 const struct GNUNET_CONFIGURATION_Handle *cfg,
294 struct GNUNET_TESTING_Peer *peer)
295{
296 struct GNUNET_SETU_OperationHandle *my_oh;
297
298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
299 "Running preparatory tests\n");
300 tt = GNUNET_SCHEDULER_add_delayed (
301 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
302 &timeout_fail,
303 NULL);
304 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
305
306 config = cfg;
307 GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_get_peer_identity (cfg,
308 &local_id));
309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
310 "my id (from CRYPTO): %s\n",
311 GNUNET_i2s (&local_id));
312 GNUNET_TESTING_peer_get_identity (peer,
313 &local_id);
314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
315 "my id (from TESTING): %s\n",
316 GNUNET_i2s (&local_id));
317 set1 = GNUNET_SETU_create (cfg);
318 set2 = GNUNET_SETU_create (cfg);
319 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
320 "Created sets %p and %p for union operation\n",
321 set1,
322 set2);
323 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
324
325 /* test if canceling an uncommited request works! */
326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
327 "Launching and instantly stopping set operation\n");
328 my_oh = GNUNET_SETU_prepare (&local_id,
329 &app_id,
330 NULL,
331 (struct GNUNET_SETU_Option[]){ 0 },
332 NULL,
333 NULL);
334 GNUNET_SETU_operation_cancel (my_oh);
335
336 /* test the real set reconciliation */
337 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
338 "Running real set-reconciliation\n");
339 init_set1 ();
340}
341
342
343int
344main (int argc, char **argv)
345{
346 GNUNET_log_setup ("test_setu_api",
347 "WARNING",
348 NULL);
349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
350 "Launching peer\n");
351 if (0 !=
352 GNUNET_TESTING_peer_run ("test_setu_api",
353 "test_setu.conf",
354 &run,
355 NULL))
356 {
357 return 1;
358 }
359 return ret;
360}