aboutsummaryrefslogtreecommitdiff
path: root/src/service/seti
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/seti')
-rw-r--r--src/service/seti/.gitignore3
-rw-r--r--src/service/seti/Makefile.am59
-rw-r--r--src/service/seti/gnunet-service-seti.c2516
-rw-r--r--src/service/seti/gnunet-service-seti_protocol.h144
-rw-r--r--src/service/seti/meson.build41
-rw-r--r--src/service/seti/seti.conf.in12
-rw-r--r--src/service/seti/seti.h267
-rw-r--r--src/service/seti/seti_api.c870
-rw-r--r--src/service/seti/test_seti.conf32
-rw-r--r--src/service/seti/test_seti_api.c345
10 files changed, 4289 insertions, 0 deletions
diff --git a/src/service/seti/.gitignore b/src/service/seti/.gitignore
new file mode 100644
index 000000000..5f234a4c2
--- /dev/null
+++ b/src/service/seti/.gitignore
@@ -0,0 +1,3 @@
1gnunet-seti-profiler
2gnunet-service-seti
3test_seti_api
diff --git a/src/service/seti/Makefile.am b/src/service/seti/Makefile.am
new file mode 100644
index 000000000..3c0c66ed8
--- /dev/null
+++ b/src/service/seti/Makefile.am
@@ -0,0 +1,59 @@
1# This Makefile.am is in the public domain
2AM_CPPFLAGS = -I$(top_srcdir)/src/include
3
4pkgcfgdir= $(pkgdatadir)/config.d/
5
6libexecdir= $(pkglibdir)/libexec/
7
8plugindir = $(libdir)/gnunet
9
10pkgcfg_DATA = \
11 seti.conf
12
13if USE_COVERAGE
14 AM_CFLAGS = -fprofile-arcs -ftest-coverage
15endif
16
17libexec_PROGRAMS = \
18 gnunet-service-seti
19
20lib_LTLIBRARIES = \
21 libgnunetseti.la
22
23gnunet_service_seti_SOURCES = \
24 gnunet-service-seti.c \
25 gnunet-service-seti_protocol.h
26gnunet_service_seti_LDADD = \
27 $(top_builddir)/src/lib/util/libgnunetutil.la \
28 $(top_builddir)/src/service/statistics/libgnunetstatistics.la \
29 $(top_builddir)/src/service/core/libgnunetcore.la \
30 $(top_builddir)/src/service/cadet/libgnunetcadet.la \
31 $(top_builddir)/src/lib/block/libgnunetblock.la \
32 libgnunetseti.la \
33 $(GN_LIBINTL)
34
35libgnunetseti_la_SOURCES = \
36 seti_api.c seti.h
37libgnunetseti_la_LIBADD = \
38 $(top_builddir)/src/lib/util/libgnunetutil.la \
39 $(LTLIBINTL)
40libgnunetseti_la_LDFLAGS = \
41 $(GN_LIB_LDFLAGS)
42
43check_PROGRAMS = \
44 # test_seti_api
45
46if ENABLE_TEST_RUN
47AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME;
48TESTS = $(check_PROGRAMS)
49endif
50
51test_seti_api_SOURCES = \
52 test_seti_api.c
53test_seti_api_LDADD = \
54 $(top_builddir)/src/lib/util/libgnunetutil.la \
55 $(top_builddir)/src/service/testing/libgnunettesting.la \
56 libgnunetseti.la
57
58EXTRA_DIST = \
59 test_seti.conf
diff --git a/src/service/seti/gnunet-service-seti.c b/src/service/seti/gnunet-service-seti.c
new file mode 100644
index 000000000..6db24a5b6
--- /dev/null
+++ b/src/service/seti/gnunet-service-seti.c
@@ -0,0 +1,2516 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/gnunet-service-seti.c
22 * @brief two-peer set intersection operations
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet-service-seti_protocol.h"
28#include "gnunet_statistics_service.h"
29#include "gnunet_cadet_service.h"
30#include "gnunet_seti_service.h"
31#include "gnunet_block_lib.h"
32#include "seti.h"
33
34/**
35 * How long do we hold on to an incoming channel if there is
36 * no local listener before giving up?
37 */
38#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
39
40
41/**
42 * Current phase we are in for a intersection operation.
43 */
44enum IntersectionOperationPhase
45{
46 /**
47 * We are just starting.
48 */
49 PHASE_INITIAL,
50
51 /**
52 * We have send the number of our elements to the other
53 * peer, but did not setup our element set yet.
54 */
55 PHASE_COUNT_SENT,
56
57 /**
58 * We have initialized our set and are now reducing it by exchanging
59 * Bloom filters until one party notices the their element hashes
60 * are equal.
61 */
62 PHASE_BF_EXCHANGE,
63
64 /**
65 * We must next send the P2P DONE message (after finishing mostly
66 * with the local client). Then we will wait for the channel to close.
67 */
68 PHASE_MUST_SEND_DONE,
69
70 /**
71 * We have received the P2P DONE message, and must finish with the
72 * local client before terminating the channel.
73 */
74 PHASE_DONE_RECEIVED,
75
76 /**
77 * The protocol is over. Results may still have to be sent to the
78 * client.
79 */
80 PHASE_FINISHED
81};
82
83
84/**
85 * A set that supports a specific operation with other peers.
86 */
87struct Set;
88
89/**
90 * Information about an element element in the set. All elements are
91 * stored in a hash-table from their hash-code to their 'struct
92 * Element', so that the remove and add operations are reasonably
93 * fast.
94 */
95struct ElementEntry;
96
97/**
98 * Operation context used to execute a set operation.
99 */
100struct Operation;
101
102
103/**
104 * Information about an element element in the set. All elements are
105 * stored in a hash-table from their hash-code to their `struct
106 * Element`, so that the remove and add operations are reasonably
107 * fast.
108 */
109struct ElementEntry
110{
111 /**
112 * The actual element. The data for the element
113 * should be allocated at the end of this struct.
114 */
115 struct GNUNET_SETI_Element element;
116
117 /**
118 * Hash of the element. For set union: Will be used to derive the
119 * different IBF keys for different salts.
120 */
121 struct GNUNET_HashCode element_hash;
122
123 /**
124 * Generation in which the element was added.
125 */
126 unsigned int generation_added;
127
128 /**
129 * #GNUNET_YES if the element is a remote element, and does not belong
130 * to the operation's set.
131 */
132 int remote;
133};
134
135
136/**
137 * A listener is inhabited by a client, and waits for evaluation
138 * requests from remote peers.
139 */
140struct Listener;
141
142
143/**
144 * State we keep per client.
145 */
146struct ClientState
147{
148 /**
149 * Set, if associated with the client, otherwise NULL.
150 */
151 struct Set *set;
152
153 /**
154 * Listener, if associated with the client, otherwise NULL.
155 */
156 struct Listener *listener;
157
158 /**
159 * Client handle.
160 */
161 struct GNUNET_SERVICE_Client *client;
162
163 /**
164 * Message queue.
165 */
166 struct GNUNET_MQ_Handle *mq;
167};
168
169
170/**
171 * Operation context used to execute a set operation.
172 */
173struct Operation
174{
175 /**
176 * The identity of the requesting peer. Needs to
177 * be stored here as the op spec might not have been created yet.
178 */
179 struct GNUNET_PeerIdentity peer;
180
181 /**
182 * XOR of the keys of all of the elements (remaining) in my set.
183 * Always updated when elements are added or removed to
184 * @e my_elements.
185 */
186 struct GNUNET_HashCode my_xor;
187
188 /**
189 * XOR of the keys of all of the elements (remaining) in
190 * the other peer's set. Updated when we receive the
191 * other peer's Bloom filter.
192 */
193 struct GNUNET_HashCode other_xor;
194
195 /**
196 * Kept in a DLL of the listener, if @e listener is non-NULL.
197 */
198 struct Operation *next;
199
200 /**
201 * Kept in a DLL of the listener, if @e listener is non-NULL.
202 */
203 struct Operation *prev;
204
205 /**
206 * Channel to the peer.
207 */
208 struct GNUNET_CADET_Channel *channel;
209
210 /**
211 * Port this operation runs on.
212 */
213 struct Listener *listener;
214
215 /**
216 * Message queue for the channel.
217 */
218 struct GNUNET_MQ_Handle *mq;
219
220 /**
221 * Context message, may be NULL.
222 */
223 struct GNUNET_MessageHeader *context_msg;
224
225 /**
226 * Set associated with the operation, NULL until the spec has been
227 * associated with a set.
228 */
229 struct Set *set;
230
231 /**
232 * The bf we currently receive
233 */
234 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
235
236 /**
237 * BF of the set's element.
238 */
239 struct GNUNET_CONTAINER_BloomFilter *local_bf;
240
241 /**
242 * Remaining elements in the intersection operation.
243 * Maps element-id-hashes to 'elements in our set'.
244 */
245 struct GNUNET_CONTAINER_MultiHashMap *my_elements;
246
247 /**
248 * Iterator for sending the final set of @e my_elements to the client.
249 */
250 struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
251
252 /**
253 * For multipart BF transmissions, we have to store the
254 * bloomfilter-data until we fully received it.
255 */
256 char *bf_data;
257
258 /**
259 * Timeout task, if the incoming peer has not been accepted
260 * after the timeout, it will be disconnected.
261 */
262 struct GNUNET_SCHEDULER_Task *timeout_task;
263
264 /**
265 * How many bytes of @e bf_data are valid?
266 */
267 uint32_t bf_data_offset;
268
269 /**
270 * Current element count contained within @e my_elements.
271 * (May differ briefly during initialization.)
272 */
273 uint32_t my_element_count;
274
275 /**
276 * size of the bloomfilter in @e bf_data.
277 */
278 uint32_t bf_data_size;
279
280 /**
281 * size of the bloomfilter
282 */
283 uint32_t bf_bits_per_element;
284
285 /**
286 * Salt currently used for BF construction (by us or the other peer,
287 * depending on where we are in the code).
288 */
289 uint32_t salt;
290
291 /**
292 * Current state of the operation.
293 */
294 enum IntersectionOperationPhase phase;
295
296 /**
297 * Generation in which the operation handle was created.
298 */
299 unsigned int generation_created;
300
301 /**
302 * Did we send the client that we are done?
303 */
304 int client_done_sent;
305
306 /**
307 * Set whenever we reach the state where the death of the
308 * channel is perfectly find and should NOT result in the
309 * operation being cancelled.
310 */
311 int channel_death_expected;
312
313 /**
314 * Remote peers element count
315 */
316 uint32_t remote_element_count;
317
318 /**
319 * ID used to identify an operation between service and client
320 */
321 uint32_t client_request_id;
322
323 /**
324 * When are elements sent to the client, and which elements are sent?
325 */
326 int return_intersection;
327
328 /**
329 * Unique request id for the request from a remote peer, sent to the
330 * client, which will accept or reject the request. Set to '0' iff
331 * the request has not been suggested yet.
332 */
333 uint32_t suggest_id;
334
335};
336
337
338/**
339 * SetContent stores the actual set elements, which may be shared by
340 * multiple generations derived from one set.
341 */
342struct SetContent
343{
344 /**
345 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
346 */
347 struct GNUNET_CONTAINER_MultiHashMap *elements;
348
349 /**
350 * Number of references to the content.
351 */
352 unsigned int refcount;
353
354 /**
355 * FIXME: document!
356 */
357 unsigned int latest_generation;
358
359 /**
360 * Number of concurrently active iterators.
361 */
362 int iterator_count;
363};
364
365
366/**
367 * A set that supports a specific operation with other peers.
368 */
369struct Set
370{
371 /**
372 * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
373 */
374 struct Set *next;
375
376 /**
377 * Sets are held in a doubly linked list.
378 */
379 struct Set *prev;
380
381 /**
382 * Client that owns the set. Only one client may own a set,
383 * and there can only be one set per client.
384 */
385 struct ClientState *cs;
386
387 /**
388 * Content, possibly shared by multiple sets,
389 * and thus reference counted.
390 */
391 struct SetContent *content;
392
393 /**
394 * Number of currently valid elements in the set which have not been
395 * removed.
396 */
397 uint32_t current_set_element_count;
398
399 /**
400 * Evaluate operations are held in a linked list.
401 */
402 struct Operation *ops_head;
403
404 /**
405 * Evaluate operations are held in a linked list.
406 */
407 struct Operation *ops_tail;
408
409 /**
410 * Current generation, that is, number of previously executed
411 * operations and lazy copies on the underlying set content.
412 */
413 unsigned int current_generation;
414
415};
416
417
418/**
419 * A listener is inhabited by a client, and waits for evaluation
420 * requests from remote peers.
421 */
422struct Listener
423{
424 /**
425 * Listeners are held in a doubly linked list.
426 */
427 struct Listener *next;
428
429 /**
430 * Listeners are held in a doubly linked list.
431 */
432 struct Listener *prev;
433
434 /**
435 * Head of DLL of operations this listener is responsible for.
436 * Once the client has accepted/declined the operation, the
437 * operation is moved to the respective set's operation DLLS.
438 */
439 struct Operation *op_head;
440
441 /**
442 * Tail of DLL of operations this listener is responsible for.
443 * Once the client has accepted/declined the operation, the
444 * operation is moved to the respective set's operation DLLS.
445 */
446 struct Operation *op_tail;
447
448 /**
449 * Client that owns the listener.
450 * Only one client may own a listener.
451 */
452 struct ClientState *cs;
453
454 /**
455 * The port we are listening on with CADET.
456 */
457 struct GNUNET_CADET_Port *open_port;
458
459 /**
460 * Application ID for the operation, used to distinguish
461 * multiple operations of the same type with the same peer.
462 */
463 struct GNUNET_HashCode app_id;
464
465};
466
467
468/**
469 * Handle to the cadet service, used to listen for and connect to
470 * remote peers.
471 */
472static struct GNUNET_CADET_Handle *cadet;
473
474/**
475 * Statistics handle.
476 */
477static struct GNUNET_STATISTICS_Handle *_GSS_statistics;
478
479/**
480 * Listeners are held in a doubly linked list.
481 */
482static struct Listener *listener_head;
483
484/**
485 * Listeners are held in a doubly linked list.
486 */
487static struct Listener *listener_tail;
488
489/**
490 * Number of active clients.
491 */
492static unsigned int num_clients;
493
494/**
495 * Are we in shutdown? if #GNUNET_YES and the number of clients
496 * drops to zero, disconnect from CADET.
497 */
498static int in_shutdown;
499
500/**
501 * Counter for allocating unique IDs for clients, used to identify
502 * incoming operation requests from remote peers, that the client can
503 * choose to accept or refuse. 0 must not be used (reserved for
504 * uninitialized).
505 */
506static uint32_t suggest_id;
507
508
509/**
510 * If applicable in the current operation mode, send a result message
511 * to the client indicating we removed an element.
512 *
513 * @param op intersection operation
514 * @param element element to send
515 */
516static void
517send_client_removed_element (struct Operation *op,
518 struct GNUNET_SETI_Element *element)
519{
520 struct GNUNET_MQ_Envelope *ev;
521 struct GNUNET_SETI_ResultMessage *rm;
522
523 if (GNUNET_YES == op->return_intersection)
524 {
525 GNUNET_break (0);
526 return; /* Wrong mode for transmitting removed elements */
527 }
528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529 "Sending removed element (size %u) to client\n",
530 element->size);
531 GNUNET_STATISTICS_update (_GSS_statistics,
532 "# Element removed messages sent",
533 1,
534 GNUNET_NO);
535 GNUNET_assert (0 != op->client_request_id);
536 ev = GNUNET_MQ_msg_extra (rm,
537 element->size,
538 GNUNET_MESSAGE_TYPE_SETI_RESULT);
539 if (NULL == ev)
540 {
541 GNUNET_break (0);
542 return;
543 }
544 rm->result_status = htons (GNUNET_SETI_STATUS_DEL_LOCAL);
545 rm->request_id = htonl (op->client_request_id);
546 rm->element_type = element->element_type;
547 GNUNET_memcpy (&rm[1],
548 element->data,
549 element->size);
550 GNUNET_MQ_send (op->set->cs->mq,
551 ev);
552}
553
554
555/**
556 * Is element @a ee part of the set used by @a op?
557 *
558 * @param ee element to test
559 * @param op operation the defines the set and its generation
560 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
561 */
562static int
563_GSS_is_element_of_operation (struct ElementEntry *ee,
564 struct Operation *op)
565{
566 return op->generation_created >= ee->generation_added;
567}
568
569
570/**
571 * Fills the "my_elements" hashmap with all relevant elements.
572 *
573 * @param cls the `struct Operation *` we are performing
574 * @param key current key code
575 * @param value the `struct ElementEntry *` from the hash map
576 * @return #GNUNET_YES (we should continue to iterate)
577 */
578static int
579filtered_map_initialization (void *cls,
580 const struct GNUNET_HashCode *key,
581 void *value)
582{
583 struct Operation *op = cls;
584 struct ElementEntry *ee = value;
585 struct GNUNET_HashCode mutated_hash;
586
587 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588 "FIMA called for %s:%u\n",
589 GNUNET_h2s (&ee->element_hash),
590 ee->element.size);
591
592 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
593 {
594 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
596 GNUNET_h2s (&ee->element_hash),
597 ee->element.size);
598 return GNUNET_YES; /* element not valid in our operation's generation */
599 }
600
601 /* Test if element is in other peer's bloomfilter */
602 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
603 op->salt,
604 &mutated_hash);
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606 "Testing mingled hash %s with salt %u\n",
607 GNUNET_h2s (&mutated_hash),
608 op->salt);
609 if (GNUNET_NO ==
610 GNUNET_CONTAINER_bloomfilter_test (op->remote_bf,
611 &mutated_hash))
612 {
613 /* remove this element */
614 send_client_removed_element (op,
615 &ee->element);
616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
617 "Reduced initialization, not starting with %s:%u\n",
618 GNUNET_h2s (&ee->element_hash),
619 ee->element.size);
620 return GNUNET_YES;
621 }
622 op->my_element_count++;
623 GNUNET_CRYPTO_hash_xor (&op->my_xor,
624 &ee->element_hash,
625 &op->my_xor);
626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
627 "Filtered initialization of my_elements, adding %s:%u\n",
628 GNUNET_h2s (&ee->element_hash),
629 ee->element.size);
630 GNUNET_break (GNUNET_YES ==
631 GNUNET_CONTAINER_multihashmap_put (op->my_elements,
632 &ee->element_hash,
633 ee,
634 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
635
636 return GNUNET_YES;
637}
638
639
640/**
641 * Removes elements from our hashmap if they are not contained within the
642 * provided remote bloomfilter.
643 *
644 * @param cls closure with the `struct Operation *`
645 * @param key current key code
646 * @param value value in the hash map
647 * @return #GNUNET_YES (we should continue to iterate)
648 */
649static int
650iterator_bf_reduce (void *cls,
651 const struct GNUNET_HashCode *key,
652 void *value)
653{
654 struct Operation *op = cls;
655 struct ElementEntry *ee = value;
656 struct GNUNET_HashCode mutated_hash;
657
658 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
659 op->salt,
660 &mutated_hash);
661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662 "Testing mingled hash %s with salt %u\n",
663 GNUNET_h2s (&mutated_hash),
664 op->salt);
665 if (GNUNET_NO ==
666 GNUNET_CONTAINER_bloomfilter_test (op->remote_bf,
667 &mutated_hash))
668 {
669 GNUNET_break (0 < op->my_element_count);
670 op->my_element_count--;
671 GNUNET_CRYPTO_hash_xor (&op->my_xor,
672 &ee->element_hash,
673 &op->my_xor);
674 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
675 "Bloom filter reduction of my_elements, removing %s:%u\n",
676 GNUNET_h2s (&ee->element_hash),
677 ee->element.size);
678 GNUNET_assert (GNUNET_YES ==
679 GNUNET_CONTAINER_multihashmap_remove (op->my_elements,
680 &ee->element_hash,
681 ee));
682 send_client_removed_element (op,
683 &ee->element);
684 }
685 else
686 {
687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688 "Bloom filter reduction of my_elements, keeping %s:%u\n",
689 GNUNET_h2s (&ee->element_hash),
690 ee->element.size);
691 }
692 return GNUNET_YES;
693}
694
695
696/**
697 * Create initial bloomfilter based on all the elements given.
698 *
699 * @param cls the `struct Operation *`
700 * @param key current key code
701 * @param value the `struct ElementEntry` to process
702 * @return #GNUNET_YES (we should continue to iterate)
703 */
704static int
705iterator_bf_create (void *cls,
706 const struct GNUNET_HashCode *key,
707 void *value)
708{
709 struct Operation *op = cls;
710 struct ElementEntry *ee = value;
711 struct GNUNET_HashCode mutated_hash;
712
713 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
714 op->salt,
715 &mutated_hash);
716 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
717 "Initializing BF with hash %s with salt %u\n",
718 GNUNET_h2s (&mutated_hash),
719 op->salt);
720 GNUNET_CONTAINER_bloomfilter_add (op->local_bf,
721 &mutated_hash);
722 return GNUNET_YES;
723}
724
725
726/**
727 * Destroy the given operation. Used for any operation where both
728 * peers were known and that thus actually had a vt and channel. Must
729 * not be used for operations where 'listener' is still set and we do
730 * not know the other peer.
731 *
732 * Call the implementation-specific cancel function of the operation.
733 * Disconnects from the remote peer. Does not disconnect the client,
734 * as there may be multiple operations per set.
735 *
736 * @param op operation to destroy
737 */
738static void
739_GSS_operation_destroy (struct Operation *op)
740{
741 struct Set *set = op->set;
742 struct GNUNET_CADET_Channel *channel;
743
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
745 GNUNET_assert (NULL == op->listener);
746 if (NULL != op->remote_bf)
747 {
748 GNUNET_CONTAINER_bloomfilter_free (op->remote_bf);
749 op->remote_bf = NULL;
750 }
751 if (NULL != op->local_bf)
752 {
753 GNUNET_CONTAINER_bloomfilter_free (op->local_bf);
754 op->local_bf = NULL;
755 }
756 if (NULL != op->my_elements)
757 {
758 GNUNET_CONTAINER_multihashmap_destroy (op->my_elements);
759 op->my_elements = NULL;
760 }
761 if (NULL != op->full_result_iter)
762 {
763 GNUNET_CONTAINER_multihashmap_iterator_destroy (
764 op->full_result_iter);
765 op->full_result_iter = NULL;
766 }
767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
768 "Destroying intersection op state done\n");
769 if (NULL != set)
770 {
771 GNUNET_CONTAINER_DLL_remove (set->ops_head,
772 set->ops_tail,
773 op);
774 op->set = NULL;
775 }
776 if (NULL != op->context_msg)
777 {
778 GNUNET_free (op->context_msg);
779 op->context_msg = NULL;
780 }
781 if (NULL != (channel = op->channel))
782 {
783 /* This will free op; called conditionally as this helper function
784 is also called from within the channel disconnect handler. */
785 op->channel = NULL;
786 GNUNET_CADET_channel_destroy (channel);
787 }
788 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
789 * there was a channel end handler that will free 'op' on the call stack. */
790}
791
792
793/**
794 * This function probably should not exist
795 * and be replaced by inlining more specific
796 * logic in the various places where it is called.
797 */
798static void
799_GSS_operation_destroy2 (struct Operation *op);
800
801
802/**
803 * Destroy an incoming request from a remote peer
804 *
805 * @param op remote request to destroy
806 */
807static void
808incoming_destroy (struct Operation *op)
809{
810 struct Listener *listener;
811
812 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813 "Destroying incoming operation %p\n",
814 op);
815 if (NULL != (listener = op->listener))
816 {
817 GNUNET_CONTAINER_DLL_remove (listener->op_head,
818 listener->op_tail,
819 op);
820 op->listener = NULL;
821 }
822 if (NULL != op->timeout_task)
823 {
824 GNUNET_SCHEDULER_cancel (op->timeout_task);
825 op->timeout_task = NULL;
826 }
827 _GSS_operation_destroy2 (op);
828}
829
830
831/**
832 * Signal to the client that the operation has finished and
833 * destroy the operation.
834 *
835 * @param cls operation to destroy
836 */
837static void
838send_client_done_and_destroy (void *cls)
839{
840 struct Operation *op = cls;
841 struct GNUNET_MQ_Envelope *ev;
842 struct GNUNET_SETI_ResultMessage *rm;
843
844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
845 "Intersection succeeded, sending DONE to local client\n");
846 GNUNET_STATISTICS_update (_GSS_statistics,
847 "# Intersection operations succeeded",
848 1,
849 GNUNET_NO);
850 ev = GNUNET_MQ_msg (rm,
851 GNUNET_MESSAGE_TYPE_SETI_RESULT);
852 rm->request_id = htonl (op->client_request_id);
853 rm->result_status = htons (GNUNET_SETI_STATUS_DONE);
854 rm->element_type = htons (0);
855 GNUNET_MQ_send (op->set->cs->mq,
856 ev);
857 _GSS_operation_destroy (op);
858}
859
860
861/**
862 * This function probably should not exist
863 * and be replaced by inlining more specific
864 * logic in the various places where it is called.
865 */
866static void
867_GSS_operation_destroy2 (struct Operation *op)
868{
869 struct GNUNET_CADET_Channel *channel;
870
871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872 "channel_end_cb called\n");
873 if (NULL != (channel = op->channel))
874 {
875 /* This will free op; called conditionally as this helper function
876 is also called from within the channel disconnect handler. */
877 op->channel = NULL;
878 GNUNET_CADET_channel_destroy (channel);
879 }
880 if (NULL != op->listener)
881 {
882 incoming_destroy (op);
883 return;
884 }
885 if (NULL != op->set)
886 {
887 if (GNUNET_YES == op->channel_death_expected)
888 {
889 /* oh goodie, we are done! */
890 send_client_done_and_destroy (op);
891 }
892 else
893 {
894 /* sorry, channel went down early, too bad. */
895 _GSS_operation_destroy (op);
896 }
897 }
898 else
899 _GSS_operation_destroy (op);
900 GNUNET_free (op);
901}
902
903
904/**
905 * Inform the client that the intersection operation has failed,
906 * and proceed to destroy the evaluate operation.
907 *
908 * @param op the intersection operation to fail
909 */
910static void
911fail_intersection_operation (struct Operation *op)
912{
913 struct GNUNET_MQ_Envelope *ev;
914 struct GNUNET_SETI_ResultMessage *msg;
915
916 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
917 "Intersection operation failed\n");
918 GNUNET_STATISTICS_update (_GSS_statistics,
919 "# Intersection operations failed",
920 1,
921 GNUNET_NO);
922 if (NULL != op->my_elements)
923 {
924 GNUNET_CONTAINER_multihashmap_destroy (op->my_elements);
925 op->my_elements = NULL;
926 }
927 ev = GNUNET_MQ_msg (msg,
928 GNUNET_MESSAGE_TYPE_SETI_RESULT);
929 msg->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
930 msg->request_id = htonl (op->client_request_id);
931 msg->element_type = htons (0);
932 GNUNET_MQ_send (op->set->cs->mq,
933 ev);
934 _GSS_operation_destroy (op);
935}
936
937
938/**
939 * Send a bloomfilter to our peer. After the result done message has
940 * been sent to the client, destroy the evaluate operation.
941 *
942 * @param op intersection operation
943 */
944static void
945send_bloomfilter (struct Operation *op)
946{
947 struct GNUNET_MQ_Envelope *ev;
948 struct BFMessage *msg;
949 uint32_t bf_size;
950 uint32_t bf_elementbits;
951 uint32_t chunk_size;
952 char *bf_data;
953 uint32_t offset;
954
955 /* We consider the ratio of the set sizes to determine
956 the number of bits per element, as the smaller set
957 should use more bits to maximize its set reduction
958 potential and minimize overall bandwidth consumption. */
959 bf_elementbits = 2 + ceil (log2 ((double)
960 (op->remote_element_count
961 / (double) op->my_element_count)));
962 if (bf_elementbits < 1)
963 bf_elementbits = 1; /* make sure k is not 0 */
964 /* optimize BF-size to ~50% of bits set */
965 bf_size = ceil ((double) (op->my_element_count
966 * bf_elementbits / log (2)));
967 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968 "Sending Bloom filter (%u) of size %u bytes\n",
969 (unsigned int) bf_elementbits,
970 (unsigned int) bf_size);
971 op->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
972 bf_size,
973 bf_elementbits);
974 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
975 UINT32_MAX);
976 GNUNET_CONTAINER_multihashmap_iterate (op->my_elements,
977 &iterator_bf_create,
978 op);
979
980 /* send our Bloom filter */
981 GNUNET_STATISTICS_update (_GSS_statistics,
982 "# Intersection Bloom filters sent",
983 1,
984 GNUNET_NO);
985 chunk_size = 60 * 1024 - sizeof(struct BFMessage);
986 if (bf_size <= chunk_size)
987 {
988 /* singlepart */
989 chunk_size = bf_size;
990 ev = GNUNET_MQ_msg_extra (msg,
991 chunk_size,
992 GNUNET_MESSAGE_TYPE_SETI_P2P_BF);
993 GNUNET_assert (GNUNET_SYSERR !=
994 GNUNET_CONTAINER_bloomfilter_get_raw_data (
995 op->local_bf,
996 (char *) &msg[1],
997 bf_size));
998 msg->sender_element_count = htonl (op->my_element_count);
999 msg->bloomfilter_total_length = htonl (bf_size);
1000 msg->bits_per_element = htonl (bf_elementbits);
1001 msg->sender_mutator = htonl (op->salt);
1002 msg->element_xor_hash = op->my_xor;
1003 GNUNET_MQ_send (op->mq, ev);
1004 }
1005 else
1006 {
1007 /* multipart */
1008 bf_data = GNUNET_malloc (bf_size);
1009 GNUNET_assert (GNUNET_SYSERR !=
1010 GNUNET_CONTAINER_bloomfilter_get_raw_data (
1011 op->local_bf,
1012 bf_data,
1013 bf_size));
1014 offset = 0;
1015 while (offset < bf_size)
1016 {
1017 if (bf_size - chunk_size < offset)
1018 chunk_size = bf_size - offset;
1019 ev = GNUNET_MQ_msg_extra (msg,
1020 chunk_size,
1021 GNUNET_MESSAGE_TYPE_SETI_P2P_BF);
1022 GNUNET_memcpy (&msg[1],
1023 &bf_data[offset],
1024 chunk_size);
1025 offset += chunk_size;
1026 msg->sender_element_count = htonl (op->my_element_count);
1027 msg->bloomfilter_total_length = htonl (bf_size);
1028 msg->bits_per_element = htonl (bf_elementbits);
1029 msg->sender_mutator = htonl (op->salt);
1030 msg->element_xor_hash = op->my_xor;
1031 GNUNET_MQ_send (op->mq, ev);
1032 }
1033 GNUNET_free (bf_data);
1034 }
1035 GNUNET_CONTAINER_bloomfilter_free (op->local_bf);
1036 op->local_bf = NULL;
1037}
1038
1039
1040/**
1041 * Remember that we are done dealing with the local client
1042 * AND have sent the other peer our message that we are done,
1043 * so we are not just waiting for the channel to die before
1044 * telling the local client that we are done as our last act.
1045 *
1046 * @param cls the `struct Operation`.
1047 */
1048static void
1049finished_local_operations (void *cls)
1050{
1051 struct Operation *op = cls;
1052
1053 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054 "DONE sent to other peer, now waiting for other end to close the channel\n");
1055 op->phase = PHASE_FINISHED;
1056 op->channel_death_expected = GNUNET_YES;
1057}
1058
1059
1060/**
1061 * Notify the other peer that we are done. Once this message
1062 * is out, we still need to notify the local client that we
1063 * are done.
1064 *
1065 * @param op operation to notify for.
1066 */
1067static void
1068send_p2p_done (struct Operation *op)
1069{
1070 struct GNUNET_MQ_Envelope *ev;
1071 struct IntersectionDoneMessage *idm;
1072
1073 GNUNET_assert (PHASE_MUST_SEND_DONE == op->phase);
1074 GNUNET_assert (GNUNET_NO == op->channel_death_expected);
1075 ev = GNUNET_MQ_msg (idm,
1076 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE);
1077 idm->final_element_count = htonl (op->my_element_count);
1078 idm->element_xor_hash = op->my_xor;
1079 GNUNET_MQ_notify_sent (ev,
1080 &finished_local_operations,
1081 op);
1082 GNUNET_MQ_send (op->mq,
1083 ev);
1084}
1085
1086
1087/**
1088 * Send all elements in the full result iterator.
1089 *
1090 * @param cls the `struct Operation *`
1091 */
1092static void
1093send_remaining_elements (void *cls)
1094{
1095 struct Operation *op = cls;
1096 const void *nxt;
1097 const struct ElementEntry *ee;
1098 struct GNUNET_MQ_Envelope *ev;
1099 struct GNUNET_SETI_ResultMessage *rm;
1100 const struct GNUNET_SETI_Element *element;
1101 int res;
1102
1103 if (GNUNET_NO == op->return_intersection)
1104 {
1105 GNUNET_break (0);
1106 return; /* Wrong mode for transmitting removed elements */
1107 }
1108 res = GNUNET_CONTAINER_multihashmap_iterator_next (
1109 op->full_result_iter,
1110 NULL,
1111 &nxt);
1112 if (GNUNET_NO == res)
1113 {
1114 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115 "Sending done and destroy because iterator ran out\n");
1116 GNUNET_CONTAINER_multihashmap_iterator_destroy (
1117 op->full_result_iter);
1118 op->full_result_iter = NULL;
1119 if (PHASE_DONE_RECEIVED == op->phase)
1120 {
1121 op->phase = PHASE_FINISHED;
1122 send_client_done_and_destroy (op);
1123 }
1124 else if (PHASE_MUST_SEND_DONE == op->phase)
1125 {
1126 send_p2p_done (op);
1127 }
1128 else
1129 {
1130 GNUNET_assert (0);
1131 }
1132 return;
1133 }
1134 ee = nxt;
1135 element = &ee->element;
1136 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137 "Sending element %s:%u to client (full set)\n",
1138 GNUNET_h2s (&ee->element_hash),
1139 element->size);
1140 GNUNET_assert (0 != op->client_request_id);
1141 ev = GNUNET_MQ_msg_extra (rm,
1142 element->size,
1143 GNUNET_MESSAGE_TYPE_SETI_RESULT);
1144 GNUNET_assert (NULL != ev);
1145 rm->result_status = htons (GNUNET_SETI_STATUS_ADD_LOCAL);
1146 rm->request_id = htonl (op->client_request_id);
1147 rm->element_type = element->element_type;
1148 GNUNET_memcpy (&rm[1],
1149 element->data,
1150 element->size);
1151 GNUNET_MQ_notify_sent (ev,
1152 &send_remaining_elements,
1153 op);
1154 GNUNET_MQ_send (op->set->cs->mq,
1155 ev);
1156}
1157
1158
1159/**
1160 * Fills the "my_elements" hashmap with the initial set of
1161 * (non-deleted) elements from the set of the specification.
1162 *
1163 * @param cls closure with the `struct Operation *`
1164 * @param key current key code for the element
1165 * @param value value in the hash map with the `struct ElementEntry *`
1166 * @return #GNUNET_YES (we should continue to iterate)
1167 */
1168static int
1169initialize_map_unfiltered (void *cls,
1170 const struct GNUNET_HashCode *key,
1171 void *value)
1172{
1173 struct ElementEntry *ee = value;
1174 struct Operation *op = cls;
1175
1176 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1177 return GNUNET_YES; /* element not live in operation's generation */
1178 GNUNET_CRYPTO_hash_xor (&op->my_xor,
1179 &ee->element_hash,
1180 &op->my_xor);
1181 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1182 "Initial full initialization of my_elements, adding %s:%u\n",
1183 GNUNET_h2s (&ee->element_hash),
1184 ee->element.size);
1185 GNUNET_break (GNUNET_YES ==
1186 GNUNET_CONTAINER_multihashmap_put (op->my_elements,
1187 &ee->element_hash,
1188 ee,
1189 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1190 return GNUNET_YES;
1191}
1192
1193
1194/**
1195 * Send our element count to the peer, in case our element count is
1196 * lower than theirs.
1197 *
1198 * @param op intersection operation
1199 */
1200static void
1201send_element_count (struct Operation *op)
1202{
1203 struct GNUNET_MQ_Envelope *ev;
1204 struct IntersectionElementInfoMessage *msg;
1205
1206 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1207 "Sending our element count (%u)\n",
1208 op->my_element_count);
1209 ev = GNUNET_MQ_msg (msg,
1210 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO);
1211 msg->sender_element_count = htonl (op->my_element_count);
1212 GNUNET_MQ_send (op->mq, ev);
1213}
1214
1215
1216/**
1217 * We go first, initialize our map with all elements and
1218 * send the first Bloom filter.
1219 *
1220 * @param op operation to start exchange for
1221 */
1222static void
1223begin_bf_exchange (struct Operation *op)
1224{
1225 op->phase = PHASE_BF_EXCHANGE;
1226 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1227 &initialize_map_unfiltered,
1228 op);
1229 send_bloomfilter (op);
1230}
1231
1232
1233/**
1234 * Handle the initial `struct IntersectionElementInfoMessage` from a
1235 * remote peer.
1236 *
1237 * @param cls the intersection operation
1238 * @param mh the header of the message
1239 */
1240static void
1241handle_intersection_p2p_element_info (void *cls,
1242 const struct
1243 IntersectionElementInfoMessage *msg)
1244{
1245 struct Operation *op = cls;
1246
1247 op->remote_element_count = ntohl (msg->sender_element_count);
1248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1249 "Received remote element count (%u), I have %u\n",
1250 op->remote_element_count,
1251 op->my_element_count);
1252 if (((PHASE_INITIAL != op->phase) &&
1253 (PHASE_COUNT_SENT != op->phase)) ||
1254 (op->my_element_count > op->remote_element_count) ||
1255 (0 == op->my_element_count) ||
1256 (0 == op->remote_element_count))
1257 {
1258 GNUNET_break_op (0);
1259 fail_intersection_operation (op);
1260 return;
1261 }
1262 GNUNET_break (NULL == op->remote_bf);
1263 begin_bf_exchange (op);
1264 GNUNET_CADET_receive_done (op->channel);
1265}
1266
1267
1268/**
1269 * Process a Bloomfilter once we got all the chunks.
1270 *
1271 * @param op the intersection operation
1272 */
1273static void
1274process_bf (struct Operation *op)
1275{
1276 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1277 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
1278 op->phase,
1279 op->remote_element_count,
1280 op->my_element_count,
1281 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
1282 switch (op->phase)
1283 {
1284 case PHASE_INITIAL:
1285 GNUNET_break_op (0);
1286 fail_intersection_operation (op);
1287 return;
1288 case PHASE_COUNT_SENT:
1289 /* This is the first BF being sent, build our initial map with
1290 filtering in place */
1291 op->my_element_count = 0;
1292 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1293 &filtered_map_initialization,
1294 op);
1295 break;
1296 case PHASE_BF_EXCHANGE:
1297 /* Update our set by reduction */
1298 GNUNET_CONTAINER_multihashmap_iterate (op->my_elements,
1299 &iterator_bf_reduce,
1300 op);
1301 break;
1302 case PHASE_MUST_SEND_DONE:
1303 GNUNET_break_op (0);
1304 fail_intersection_operation (op);
1305 return;
1306 case PHASE_DONE_RECEIVED:
1307 GNUNET_break_op (0);
1308 fail_intersection_operation (op);
1309 return;
1310 case PHASE_FINISHED:
1311 GNUNET_break_op (0);
1312 fail_intersection_operation (op);
1313 return;
1314 }
1315 GNUNET_CONTAINER_bloomfilter_free (op->remote_bf);
1316 op->remote_bf = NULL;
1317
1318 if ((0 == op->my_element_count) || /* fully disjoint */
1319 ((op->my_element_count == op->remote_element_count) &&
1320 (0 == GNUNET_memcmp (&op->my_xor,
1321 &op->other_xor))))
1322 {
1323 /* we are done */
1324 op->phase = PHASE_MUST_SEND_DONE;
1325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326 "Intersection succeeded, sending DONE to other peer\n");
1327 GNUNET_CONTAINER_bloomfilter_free (op->local_bf);
1328 op->local_bf = NULL;
1329 if (GNUNET_YES == op->return_intersection)
1330 {
1331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1332 "Sending full result set (%u elements)\n",
1333 GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1334 op->full_result_iter
1335 = GNUNET_CONTAINER_multihashmap_iterator_create (
1336 op->my_elements);
1337 send_remaining_elements (op);
1338 return;
1339 }
1340 send_p2p_done (op);
1341 return;
1342 }
1343 op->phase = PHASE_BF_EXCHANGE;
1344 send_bloomfilter (op);
1345}
1346
1347
1348/**
1349 * Check an BF message from a remote peer.
1350 *
1351 * @param cls the intersection operation
1352 * @param msg the header of the message
1353 * @return #GNUNET_OK if @a msg is well-formed
1354 */
1355static int
1356check_intersection_p2p_bf (void *cls,
1357 const struct BFMessage *msg)
1358{
1359 struct Operation *op = cls;
1360
1361 (void) op;
1362 return GNUNET_OK;
1363}
1364
1365
1366/**
1367 * Handle an BF message from a remote peer.
1368 *
1369 * @param cls the intersection operation
1370 * @param msg the header of the message
1371 */
1372static void
1373handle_intersection_p2p_bf (void *cls,
1374 const struct BFMessage *msg)
1375{
1376 struct Operation *op = cls;
1377 uint32_t bf_size;
1378 uint32_t chunk_size;
1379 uint32_t bf_bits_per_element;
1380
1381 switch (op->phase)
1382 {
1383 case PHASE_INITIAL:
1384 GNUNET_break_op (0);
1385 fail_intersection_operation (op);
1386 return;
1387
1388 case PHASE_COUNT_SENT:
1389 case PHASE_BF_EXCHANGE:
1390 bf_size = ntohl (msg->bloomfilter_total_length);
1391 bf_bits_per_element = ntohl (msg->bits_per_element);
1392 chunk_size = htons (msg->header.size) - sizeof(struct BFMessage);
1393 op->other_xor = msg->element_xor_hash;
1394 if (bf_size == chunk_size)
1395 {
1396 if (NULL != op->bf_data)
1397 {
1398 GNUNET_break_op (0);
1399 fail_intersection_operation (op);
1400 return;
1401 }
1402 /* single part, done here immediately */
1403 op->remote_bf
1404 = GNUNET_CONTAINER_bloomfilter_init ((const char *) &msg[1],
1405 bf_size,
1406 bf_bits_per_element);
1407 op->salt = ntohl (msg->sender_mutator);
1408 op->remote_element_count = ntohl (msg->sender_element_count);
1409 process_bf (op);
1410 break;
1411 }
1412 /* multipart chunk */
1413 if (NULL == op->bf_data)
1414 {
1415 /* first chunk, initialize */
1416 op->bf_data = GNUNET_malloc (bf_size);
1417 op->bf_data_size = bf_size;
1418 op->bf_bits_per_element = bf_bits_per_element;
1419 op->bf_data_offset = 0;
1420 op->salt = ntohl (msg->sender_mutator);
1421 op->remote_element_count = ntohl (msg->sender_element_count);
1422 }
1423 else
1424 {
1425 /* increment */
1426 if ((op->bf_data_size != bf_size) ||
1427 (op->bf_bits_per_element != bf_bits_per_element) ||
1428 (op->bf_data_offset + chunk_size > bf_size) ||
1429 (op->salt != ntohl (msg->sender_mutator)) ||
1430 (op->remote_element_count != ntohl (msg->sender_element_count)))
1431 {
1432 GNUNET_break_op (0);
1433 fail_intersection_operation (op);
1434 return;
1435 }
1436 }
1437 GNUNET_memcpy (&op->bf_data[op->bf_data_offset],
1438 (const char *) &msg[1],
1439 chunk_size);
1440 op->bf_data_offset += chunk_size;
1441 if (op->bf_data_offset == bf_size)
1442 {
1443 /* last chunk, run! */
1444 op->remote_bf
1445 = GNUNET_CONTAINER_bloomfilter_init (op->bf_data,
1446 bf_size,
1447 bf_bits_per_element);
1448 GNUNET_free (op->bf_data);
1449 op->bf_data = NULL;
1450 op->bf_data_size = 0;
1451 process_bf (op);
1452 }
1453 break;
1454
1455 default:
1456 GNUNET_break_op (0);
1457 fail_intersection_operation (op);
1458 return;
1459 }
1460 GNUNET_CADET_receive_done (op->channel);
1461}
1462
1463
1464/**
1465 * Remove all elements from our hashmap.
1466 *
1467 * @param cls closure with the `struct Operation *`
1468 * @param key current key code
1469 * @param value value in the hash map
1470 * @return #GNUNET_YES (we should continue to iterate)
1471 */
1472static int
1473filter_all (void *cls,
1474 const struct GNUNET_HashCode *key,
1475 void *value)
1476{
1477 struct Operation *op = cls;
1478 struct ElementEntry *ee = value;
1479
1480 GNUNET_break (0 < op->my_element_count);
1481 op->my_element_count--;
1482 GNUNET_CRYPTO_hash_xor (&op->my_xor,
1483 &ee->element_hash,
1484 &op->my_xor);
1485 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1486 "Final reduction of my_elements, removing %s:%u\n",
1487 GNUNET_h2s (&ee->element_hash),
1488 ee->element.size);
1489 GNUNET_assert (GNUNET_YES ==
1490 GNUNET_CONTAINER_multihashmap_remove (op->my_elements,
1491 &ee->element_hash,
1492 ee));
1493 send_client_removed_element (op,
1494 &ee->element);
1495 return GNUNET_YES;
1496}
1497
1498
1499/**
1500 * Handle a done message from a remote peer
1501 *
1502 * @param cls the intersection operation
1503 * @param mh the message
1504 */
1505static void
1506handle_intersection_p2p_done (void *cls,
1507 const struct IntersectionDoneMessage *idm)
1508{
1509 struct Operation *op = cls;
1510
1511 if (PHASE_BF_EXCHANGE != op->phase)
1512 {
1513 /* wrong phase to conclude? FIXME: Or should we allow this
1514 if the other peer has _initially_ already an empty set? */
1515 GNUNET_break_op (0);
1516 fail_intersection_operation (op);
1517 return;
1518 }
1519 if (0 == ntohl (idm->final_element_count))
1520 {
1521 /* other peer determined empty set is the intersection,
1522 remove all elements */
1523 GNUNET_CONTAINER_multihashmap_iterate (op->my_elements,
1524 &filter_all,
1525 op);
1526 }
1527 if ((op->my_element_count != ntohl (idm->final_element_count)) ||
1528 (0 != GNUNET_memcmp (&op->my_xor,
1529 &idm->element_xor_hash)))
1530 {
1531 /* Other peer thinks we are done, but we disagree on the result! */
1532 GNUNET_break_op (0);
1533 fail_intersection_operation (op);
1534 return;
1535 }
1536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1537 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1538 op->my_element_count);
1539 op->phase = PHASE_DONE_RECEIVED;
1540 GNUNET_CADET_receive_done (op->channel);
1541
1542 GNUNET_assert (GNUNET_NO == op->client_done_sent);
1543 if (GNUNET_YES == op->return_intersection)
1544 {
1545 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546 "Sending full result set to client (%u elements)\n",
1547 GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1548 op->full_result_iter
1549 = GNUNET_CONTAINER_multihashmap_iterator_create (op->my_elements);
1550 send_remaining_elements (op);
1551 return;
1552 }
1553 op->phase = PHASE_FINISHED;
1554 send_client_done_and_destroy (op);
1555}
1556
1557
1558/**
1559 * Get the incoming socket associated with the given id.
1560 *
1561 * @param listener the listener to look in
1562 * @param id id to look for
1563 * @return the incoming socket associated with the id,
1564 * or NULL if there is none
1565 */
1566static struct Operation *
1567get_incoming (uint32_t id)
1568{
1569 for (struct Listener *listener = listener_head; NULL != listener;
1570 listener = listener->next)
1571 {
1572 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
1573 if (op->suggest_id == id)
1574 return op;
1575 }
1576 return NULL;
1577}
1578
1579
1580/**
1581 * Callback called when a client connects to the service.
1582 *
1583 * @param cls closure for the service
1584 * @param c the new client that connected to the service
1585 * @param mq the message queue used to send messages to the client
1586 * @return @a `struct ClientState`
1587 */
1588static void *
1589client_connect_cb (void *cls,
1590 struct GNUNET_SERVICE_Client *c,
1591 struct GNUNET_MQ_Handle *mq)
1592{
1593 struct ClientState *cs;
1594
1595 num_clients++;
1596 cs = GNUNET_new (struct ClientState);
1597 cs->client = c;
1598 cs->mq = mq;
1599 return cs;
1600}
1601
1602
1603/**
1604 * Iterator over hash map entries to free element entries.
1605 *
1606 * @param cls closure
1607 * @param key current key code
1608 * @param value a `struct ElementEntry *` to be free'd
1609 * @return #GNUNET_YES (continue to iterate)
1610 */
1611static int
1612destroy_elements_iterator (void *cls,
1613 const struct GNUNET_HashCode *key,
1614 void *value)
1615{
1616 struct ElementEntry *ee = value;
1617
1618 GNUNET_free (ee);
1619 return GNUNET_YES;
1620}
1621
1622
1623/**
1624 * Clean up after a client has disconnected
1625 *
1626 * @param cls closure, unused
1627 * @param client the client to clean up after
1628 * @param internal_cls the `struct ClientState`
1629 */
1630static void
1631client_disconnect_cb (void *cls,
1632 struct GNUNET_SERVICE_Client *client,
1633 void *internal_cls)
1634{
1635 struct ClientState *cs = internal_cls;
1636 struct Operation *op;
1637 struct Listener *listener;
1638 struct Set *set;
1639
1640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
1641 if (NULL != (set = cs->set))
1642 {
1643 struct SetContent *content = set->content;
1644
1645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
1646 /* Destroy pending set operations */
1647 while (NULL != set->ops_head)
1648 _GSS_operation_destroy (set->ops_head);
1649
1650 /* free set content (or at least decrement RC) */
1651 set->content = NULL;
1652 GNUNET_assert (0 != content->refcount);
1653 content->refcount--;
1654 if (0 == content->refcount)
1655 {
1656 GNUNET_assert (NULL != content->elements);
1657 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
1658 &destroy_elements_iterator,
1659 NULL);
1660 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
1661 content->elements = NULL;
1662 GNUNET_free (content);
1663 }
1664 GNUNET_free (set);
1665 }
1666
1667 if (NULL != (listener = cs->listener))
1668 {
1669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
1670 GNUNET_CADET_close_port (listener->open_port);
1671 listener->open_port = NULL;
1672 while (NULL != (op = listener->op_head))
1673 {
1674 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1675 "Destroying incoming operation `%u' from peer `%s'\n",
1676 (unsigned int) op->client_request_id,
1677 GNUNET_i2s (&op->peer));
1678 incoming_destroy (op);
1679 }
1680 GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener);
1681 GNUNET_free (listener);
1682 }
1683 GNUNET_free (cs);
1684 num_clients--;
1685 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
1686 {
1687 if (NULL != cadet)
1688 {
1689 GNUNET_CADET_disconnect (cadet);
1690 cadet = NULL;
1691 }
1692 }
1693}
1694
1695
1696/**
1697 * Check a request for a set operation from another peer.
1698 *
1699 * @param cls the operation state
1700 * @param msg the received message
1701 * @return #GNUNET_OK if the channel should be kept alive,
1702 * #GNUNET_SYSERR to destroy the channel
1703 */
1704static int
1705check_incoming_msg (void *cls,
1706 const struct OperationRequestMessage *msg)
1707{
1708 struct Operation *op = cls;
1709 struct Listener *listener = op->listener;
1710 const struct GNUNET_MessageHeader *nested_context;
1711
1712 /* double operation request */
1713 if (0 != op->suggest_id)
1714 {
1715 GNUNET_break_op (0);
1716 return GNUNET_SYSERR;
1717 }
1718 /* This should be equivalent to the previous condition, but can't hurt to check twice */
1719 if (NULL == listener)
1720 {
1721 GNUNET_break (0);
1722 return GNUNET_SYSERR;
1723 }
1724 nested_context = GNUNET_MQ_extract_nested_mh (msg);
1725 if ((NULL != nested_context) &&
1726 (ntohs (nested_context->size) > GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE))
1727 {
1728 GNUNET_break_op (0);
1729 return GNUNET_SYSERR;
1730 }
1731 return GNUNET_OK;
1732}
1733
1734
1735/**
1736 * Handle a request for a set operation from another peer. Checks if we
1737 * have a listener waiting for such a request (and in that case initiates
1738 * asking the listener about accepting the connection). If no listener
1739 * is waiting, we queue the operation request in hope that a listener
1740 * shows up soon (before timeout).
1741 *
1742 * This msg is expected as the first and only msg handled through the
1743 * non-operation bound virtual table, acceptance of this operation replaces
1744 * our virtual table and subsequent msgs would be routed differently (as
1745 * we then know what type of operation this is).
1746 *
1747 * @param cls the operation state
1748 * @param msg the received message
1749 * @return #GNUNET_OK if the channel should be kept alive,
1750 * #GNUNET_SYSERR to destroy the channel
1751 */
1752static void
1753handle_incoming_msg (void *cls,
1754 const struct OperationRequestMessage *msg)
1755{
1756 struct Operation *op = cls;
1757 struct Listener *listener = op->listener;
1758 const struct GNUNET_MessageHeader *nested_context;
1759 struct GNUNET_MQ_Envelope *env;
1760 struct GNUNET_SETI_RequestMessage *cmsg;
1761
1762 nested_context = GNUNET_MQ_extract_nested_mh (msg);
1763 /* Make a copy of the nested_context (application-specific context
1764 information that is opaque to set) so we can pass it to the
1765 listener later on */
1766 if (NULL != nested_context)
1767 op->context_msg = GNUNET_copy_message (nested_context);
1768 op->remote_element_count = ntohl (msg->element_count);
1769 GNUNET_log (
1770 GNUNET_ERROR_TYPE_DEBUG,
1771 "Received P2P operation request (port %s) for active listener\n",
1772 GNUNET_h2s (&op->listener->app_id));
1773 GNUNET_assert (0 == op->suggest_id);
1774 if (0 == suggest_id)
1775 suggest_id++;
1776 op->suggest_id = suggest_id++;
1777 GNUNET_assert (NULL != op->timeout_task);
1778 GNUNET_SCHEDULER_cancel (op->timeout_task);
1779 op->timeout_task = NULL;
1780 env = GNUNET_MQ_msg_nested_mh (cmsg,
1781 GNUNET_MESSAGE_TYPE_SETI_REQUEST,
1782 op->context_msg);
1783 GNUNET_log (
1784 GNUNET_ERROR_TYPE_DEBUG,
1785 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
1786 op->suggest_id,
1787 listener,
1788 listener->cs);
1789 cmsg->accept_id = htonl (op->suggest_id);
1790 cmsg->peer_id = op->peer;
1791 GNUNET_MQ_send (listener->cs->mq, env);
1792 /* NOTE: GNUNET_CADET_receive_done() will be called in
1793 #handle_client_accept() */
1794}
1795
1796
1797/**
1798 * Called when a client wants to create a new set. This is typically
1799 * the first request from a client, and includes the type of set
1800 * operation to be performed.
1801 *
1802 * @param cls client that sent the message
1803 * @param m message sent by the client
1804 */
1805static void
1806handle_client_create_set (void *cls,
1807 const struct GNUNET_SETI_CreateMessage *msg)
1808{
1809 struct ClientState *cs = cls;
1810 struct Set *set;
1811
1812 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1813 "Client created new intersection set\n");
1814 if (NULL != cs->set)
1815 {
1816 /* There can only be one set per client */
1817 GNUNET_break (0);
1818 GNUNET_SERVICE_client_drop (cs->client);
1819 return;
1820 }
1821 set = GNUNET_new (struct Set);
1822 set->content = GNUNET_new (struct SetContent);
1823 set->content->refcount = 1;
1824 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1825 GNUNET_YES);
1826 set->cs = cs;
1827 cs->set = set;
1828 GNUNET_SERVICE_client_continue (cs->client);
1829}
1830
1831
1832/**
1833 * Timeout happens iff:
1834 * - we suggested an operation to our listener,
1835 * but did not receive a response in time
1836 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST
1837 *
1838 * @param cls channel context
1839 * @param tc context information (why was this task triggered now)
1840 */
1841static void
1842incoming_timeout_cb (void *cls)
1843{
1844 struct Operation *op = cls;
1845
1846 op->timeout_task = NULL;
1847 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1848 "Remote peer's incoming request timed out\n");
1849 incoming_destroy (op);
1850}
1851
1852
1853/**
1854 * Method called whenever another peer has added us to a channel the
1855 * other peer initiated. Only called (once) upon reception of data
1856 * from a channel we listen on.
1857 *
1858 * The channel context represents the operation itself and gets added
1859 * to a DLL, from where it gets looked up when our local listener
1860 * client responds to a proposed/suggested operation or connects and
1861 * associates with this operation.
1862 *
1863 * @param cls closure
1864 * @param channel new handle to the channel
1865 * @param source peer that started the channel
1866 * @return initial channel context for the channel
1867 * returns NULL on error
1868 */
1869static void *
1870channel_new_cb (void *cls,
1871 struct GNUNET_CADET_Channel *channel,
1872 const struct GNUNET_PeerIdentity *source)
1873{
1874 struct Listener *listener = cls;
1875 struct Operation *op;
1876
1877 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1878 "New incoming channel\n");
1879 op = GNUNET_new (struct Operation);
1880 op->listener = listener;
1881 op->peer = *source;
1882 op->channel = channel;
1883 op->mq = GNUNET_CADET_get_mq (op->channel);
1884 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1885 UINT32_MAX);
1886 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1887 &incoming_timeout_cb,
1888 op);
1889 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1890 listener->op_tail,
1891 op);
1892 return op;
1893}
1894
1895
1896/**
1897 * Function called whenever a channel is destroyed. Should clean up
1898 * any associated state. It must NOT call
1899 * GNUNET_CADET_channel_destroy() on the channel.
1900 *
1901 * The peer_disconnect function is part of a a virtual table set initially either
1902 * when a peer creates a new channel with us, or once we create
1903 * a new channel ourselves (evaluate).
1904 *
1905 * Once we know the exact type of operation (union/intersection), the vt is
1906 * replaced with an operation specific instance (_GSS_[op]_vt).
1907 *
1908 * @param channel_ctx place where local state associated
1909 * with the channel is stored
1910 * @param channel connection to the other end (henceforth invalid)
1911 */
1912static void
1913channel_end_cb (void *channel_ctx,
1914 const struct GNUNET_CADET_Channel *channel)
1915{
1916 struct Operation *op = channel_ctx;
1917
1918 op->channel = NULL;
1919 _GSS_operation_destroy2 (op);
1920}
1921
1922
1923/**
1924 * Function called whenever an MQ-channel's transmission window size changes.
1925 *
1926 * The first callback in an outgoing channel will be with a non-zero value
1927 * and will mean the channel is connected to the destination.
1928 *
1929 * For an incoming channel it will be called immediately after the
1930 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1931 *
1932 * @param cls Channel closure.
1933 * @param channel Connection to the other end (henceforth invalid).
1934 * @param window_size New window size. If the is more messages than buffer size
1935 * this value will be negative..
1936 */
1937static void
1938channel_window_cb (void *cls,
1939 const struct GNUNET_CADET_Channel *channel,
1940 int window_size)
1941{
1942 /* FIXME: not implemented, we could do flow control here... */
1943}
1944
1945
1946/**
1947 * Called when a client wants to create a new listener.
1948 *
1949 * @param cls client that sent the message
1950 * @param msg message sent by the client
1951 */
1952static void
1953handle_client_listen (void *cls,
1954 const struct GNUNET_SETI_ListenMessage *msg)
1955{
1956 struct ClientState *cs = cls;
1957 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1958 GNUNET_MQ_hd_var_size (incoming_msg,
1959 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
1960 struct OperationRequestMessage,
1961 NULL),
1962 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1963 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO,
1964 struct IntersectionElementInfoMessage,
1965 NULL),
1966 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1967 GNUNET_MESSAGE_TYPE_SETI_P2P_BF,
1968 struct BFMessage,
1969 NULL),
1970 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1971 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE,
1972 struct IntersectionDoneMessage,
1973 NULL),
1974 GNUNET_MQ_handler_end ()
1975 };
1976 struct Listener *listener;
1977
1978 if (NULL != cs->listener)
1979 {
1980 /* max. one active listener per client! */
1981 GNUNET_break (0);
1982 GNUNET_SERVICE_client_drop (cs->client);
1983 return;
1984 }
1985 listener = GNUNET_new (struct Listener);
1986 listener->cs = cs;
1987 cs->listener = listener;
1988 listener->app_id = msg->app_id;
1989 GNUNET_CONTAINER_DLL_insert (listener_head,
1990 listener_tail,
1991 listener);
1992 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1993 "New listener for set intersection created (port %s)\n",
1994 GNUNET_h2s (&listener->app_id));
1995 listener->open_port = GNUNET_CADET_open_port (cadet,
1996 &msg->app_id,
1997 &channel_new_cb,
1998 listener,
1999 &channel_window_cb,
2000 &channel_end_cb,
2001 cadet_handlers);
2002 GNUNET_SERVICE_client_continue (cs->client);
2003}
2004
2005
2006/**
2007 * Called when the listening client rejects an operation
2008 * request by another peer.
2009 *
2010 * @param cls client that sent the message
2011 * @param msg message sent by the client
2012 */
2013static void
2014handle_client_reject (void *cls,
2015 const struct GNUNET_SETI_RejectMessage *msg)
2016{
2017 struct ClientState *cs = cls;
2018 struct Operation *op;
2019
2020 op = get_incoming (ntohl (msg->accept_reject_id));
2021 if (NULL == op)
2022 {
2023 /* no matching incoming operation for this reject;
2024 could be that the other peer already disconnected... */
2025 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2026 "Client rejected unknown operation %u\n",
2027 (unsigned int) ntohl (msg->accept_reject_id));
2028 GNUNET_SERVICE_client_continue (cs->client);
2029 return;
2030 }
2031 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2032 "Peer request (app %s) rejected by client\n",
2033 GNUNET_h2s (&cs->listener->app_id));
2034 _GSS_operation_destroy2 (op);
2035 GNUNET_SERVICE_client_continue (cs->client);
2036}
2037
2038
2039/**
2040 * Called when a client wants to add or remove an element to a set it inhabits.
2041 *
2042 * @param cls client that sent the message
2043 * @param msg message sent by the client
2044 */
2045static int
2046check_client_set_add (void *cls,
2047 const struct GNUNET_SETI_ElementMessage *msg)
2048{
2049 /* NOTE: Technically, we should probably check with the
2050 block library whether the element we are given is well-formed */
2051 return GNUNET_OK;
2052}
2053
2054
2055/**
2056 * Called when a client wants to add an element to a set it inhabits.
2057 *
2058 * @param cls client that sent the message
2059 * @param msg message sent by the client
2060 */
2061static void
2062handle_client_set_add (void *cls,
2063 const struct GNUNET_SETI_ElementMessage *msg)
2064{
2065 struct ClientState *cs = cls;
2066 struct Set *set;
2067 struct GNUNET_SETI_Element el;
2068 struct ElementEntry *ee;
2069 struct GNUNET_HashCode hash;
2070
2071 if (NULL == (set = cs->set))
2072 {
2073 /* client without a set requested an operation */
2074 GNUNET_break (0);
2075 GNUNET_SERVICE_client_drop (cs->client);
2076 return;
2077 }
2078 GNUNET_SERVICE_client_continue (cs->client);
2079 el.size = ntohs (msg->header.size) - sizeof(*msg);
2080 el.data = &msg[1];
2081 el.element_type = ntohs (msg->element_type);
2082 GNUNET_SETI_element_hash (&el,
2083 &hash);
2084 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
2085 &hash);
2086 if (NULL == ee)
2087 {
2088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2089 "Client inserts element %s of size %u\n",
2090 GNUNET_h2s (&hash),
2091 el.size);
2092 ee = GNUNET_malloc (el.size + sizeof(*ee));
2093 ee->element.size = el.size;
2094 GNUNET_memcpy (&ee[1], el.data, el.size);
2095 ee->element.data = &ee[1];
2096 ee->element.element_type = el.element_type;
2097 ee->remote = GNUNET_NO;
2098 ee->element_hash = hash;
2099 GNUNET_break (GNUNET_YES ==
2100 GNUNET_CONTAINER_multihashmap_put (
2101 set->content->elements,
2102 &ee->element_hash,
2103 ee,
2104 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2105 }
2106 else
2107 {
2108 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2109 "Client inserted element %s of size %u twice (ignored)\n",
2110 GNUNET_h2s (&hash),
2111 el.size);
2112 /* same element inserted twice */
2113 return;
2114 }
2115 set->current_set_element_count++;
2116}
2117
2118
2119/**
2120 * Advance the current generation of a set,
2121 * adding exclusion ranges if necessary.
2122 *
2123 * @param set the set where we want to advance the generation
2124 */
2125static void
2126advance_generation (struct Set *set)
2127{
2128 if (set->current_generation == set->content->latest_generation)
2129 {
2130 set->content->latest_generation++;
2131 set->current_generation++;
2132 return;
2133 }
2134 GNUNET_assert (set->current_generation < set->content->latest_generation);
2135}
2136
2137
2138/**
2139 * Called when a client wants to initiate a set operation with another
2140 * peer. Initiates the CADET connection to the listener and sends the
2141 * request.
2142 *
2143 * @param cls client that sent the message
2144 * @param msg message sent by the client
2145 * @return #GNUNET_OK if the message is well-formed
2146 */
2147static int
2148check_client_evaluate (void *cls,
2149 const struct GNUNET_SETI_EvaluateMessage *msg)
2150{
2151 /* FIXME: suboptimal, even if the context below could be NULL,
2152 there are malformed messages this does not check for... */
2153 return GNUNET_OK;
2154}
2155
2156
2157/**
2158 * Called when a client wants to initiate a set operation with another
2159 * peer. Initiates the CADET connection to the listener and sends the
2160 * request.
2161 *
2162 * @param cls client that sent the message
2163 * @param msg message sent by the client
2164 */
2165static void
2166handle_client_evaluate (void *cls,
2167 const struct GNUNET_SETI_EvaluateMessage *msg)
2168{
2169 struct ClientState *cs = cls;
2170 struct Operation *op = GNUNET_new (struct Operation);
2171 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2172 GNUNET_MQ_hd_var_size (incoming_msg,
2173 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
2174 struct OperationRequestMessage,
2175 op),
2176 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
2177 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO,
2178 struct IntersectionElementInfoMessage,
2179 op),
2180 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
2181 GNUNET_MESSAGE_TYPE_SETI_P2P_BF,
2182 struct BFMessage,
2183 op),
2184 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
2185 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE,
2186 struct IntersectionDoneMessage,
2187 op),
2188 GNUNET_MQ_handler_end ()
2189 };
2190 struct Set *set;
2191 const struct GNUNET_MessageHeader *context;
2192
2193 if (NULL == (set = cs->set))
2194 {
2195 GNUNET_break (0);
2196 GNUNET_free (op);
2197 GNUNET_SERVICE_client_drop (cs->client);
2198 return;
2199 }
2200 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
2201 UINT32_MAX);
2202 op->peer = msg->target_peer;
2203 op->return_intersection = htonl (msg->return_intersection);
2204 fprintf (stderr,
2205 "Return intersection for evaluate is %d\n",
2206 op->return_intersection);
2207 op->client_request_id = ntohl (msg->request_id);
2208 context = GNUNET_MQ_extract_nested_mh (msg);
2209
2210 /* Advance generation values, so that
2211 mutations won't interfer with the running operation. */
2212 op->set = set;
2213 op->generation_created = set->current_generation;
2214 advance_generation (set);
2215 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2216 set->ops_tail,
2217 op);
2218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2219 "Creating new CADET channel to port %s for set intersection\n",
2220 GNUNET_h2s (&msg->app_id));
2221 op->channel = GNUNET_CADET_channel_create (cadet,
2222 op,
2223 &msg->target_peer,
2224 &msg->app_id,
2225 &channel_window_cb,
2226 &channel_end_cb,
2227 cadet_handlers);
2228 op->mq = GNUNET_CADET_get_mq (op->channel);
2229 {
2230 struct GNUNET_MQ_Envelope *ev;
2231 struct OperationRequestMessage *msg;
2232
2233 ev = GNUNET_MQ_msg_nested_mh (msg,
2234 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
2235 context);
2236 if (NULL == ev)
2237 {
2238 /* the context message is too large!? */
2239 GNUNET_break (0);
2240 GNUNET_SERVICE_client_drop (cs->client);
2241 return;
2242 }
2243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2244 "Initiating intersection operation evaluation\n");
2245 /* we started the operation, thus we have to send the operation request */
2246 op->phase = PHASE_INITIAL;
2247 op->my_element_count = op->set->current_set_element_count;
2248 op->my_elements
2249 = GNUNET_CONTAINER_multihashmap_create (op->my_element_count,
2250 GNUNET_YES);
2251
2252 msg->element_count = htonl (op->my_element_count);
2253 GNUNET_MQ_send (op->mq,
2254 ev);
2255 op->phase = PHASE_COUNT_SENT;
2256 if (NULL != context)
2257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2258 "Sent op request with context message\n");
2259 else
2260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2261 "Sent op request without context message\n");
2262 }
2263 GNUNET_SERVICE_client_continue (cs->client);
2264}
2265
2266
2267/**
2268 * Handle a request from the client to cancel a running set operation.
2269 *
2270 * @param cls the client
2271 * @param msg the message
2272 */
2273static void
2274handle_client_cancel (void *cls,
2275 const struct GNUNET_SETI_CancelMessage *msg)
2276{
2277 struct ClientState *cs = cls;
2278 struct Set *set;
2279 struct Operation *op;
2280 int found;
2281
2282 if (NULL == (set = cs->set))
2283 {
2284 /* client without a set requested an operation */
2285 GNUNET_break (0);
2286 GNUNET_SERVICE_client_drop (cs->client);
2287 return;
2288 }
2289 found = GNUNET_NO;
2290 for (op = set->ops_head; NULL != op; op = op->next)
2291 {
2292 if (op->client_request_id == ntohl (msg->request_id))
2293 {
2294 found = GNUNET_YES;
2295 break;
2296 }
2297 }
2298 if (GNUNET_NO == found)
2299 {
2300 /* It may happen that the operation was already destroyed due to
2301 * the other peer disconnecting. The client may not know about this
2302 * yet and try to cancel the (just barely non-existent) operation.
2303 * So this is not a hard error.
2304 *///
2305 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2306 "Client canceled non-existent op %u\n",
2307 (uint32_t) ntohl (msg->request_id));
2308 }
2309 else
2310 {
2311 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2312 "Client requested cancel for op %u\n",
2313 (uint32_t) ntohl (msg->request_id));
2314 _GSS_operation_destroy (op);
2315 }
2316 GNUNET_SERVICE_client_continue (cs->client);
2317}
2318
2319
2320/**
2321 * Handle a request from the client to accept a set operation that
2322 * came from a remote peer. We forward the accept to the associated
2323 * operation for handling
2324 *
2325 * @param cls the client
2326 * @param msg the message
2327 */
2328static void
2329handle_client_accept (void *cls,
2330 const struct GNUNET_SETI_AcceptMessage *msg)
2331{
2332 struct ClientState *cs = cls;
2333 struct Set *set;
2334 struct Operation *op;
2335 struct GNUNET_SETI_ResultMessage *result_message;
2336 struct GNUNET_MQ_Envelope *ev;
2337 struct Listener *listener;
2338
2339 if (NULL == (set = cs->set))
2340 {
2341 /* client without a set requested to accept */
2342 GNUNET_break (0);
2343 GNUNET_SERVICE_client_drop (cs->client);
2344 return;
2345 }
2346 op = get_incoming (ntohl (msg->accept_reject_id));
2347 if (NULL == op)
2348 {
2349 /* It is not an error if the set op does not exist -- it may
2350 * have been destroyed when the partner peer disconnected. */
2351 GNUNET_log (
2352 GNUNET_ERROR_TYPE_INFO,
2353 "Client %p accepted request %u of listener %p that is no longer active\n",
2354 cs,
2355 ntohl (msg->accept_reject_id),
2356 cs->listener);
2357 ev = GNUNET_MQ_msg (result_message,
2358 GNUNET_MESSAGE_TYPE_SETI_RESULT);
2359 result_message->request_id = msg->request_id;
2360 result_message->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
2361 GNUNET_MQ_send (set->cs->mq, ev);
2362 GNUNET_SERVICE_client_continue (cs->client);
2363 return;
2364 }
2365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2366 "Client accepting request %u\n",
2367 (uint32_t) ntohl (msg->accept_reject_id));
2368 listener = op->listener;
2369 op->listener = NULL;
2370 op->return_intersection = htonl (msg->return_intersection);
2371 fprintf (stderr,
2372 "Return intersection for accept is %d\n",
2373 op->return_intersection);
2374 GNUNET_CONTAINER_DLL_remove (listener->op_head,
2375 listener->op_tail,
2376 op);
2377 op->set = set;
2378 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2379 set->ops_tail,
2380 op);
2381 op->client_request_id = ntohl (msg->request_id);
2382
2383 /* Advance generation values, so that future mutations do not
2384 interfer with the running operation. */
2385 op->generation_created = set->current_generation;
2386 advance_generation (set);
2387 {
2388 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2389 "Accepting set intersection operation\n");
2390 op->phase = PHASE_INITIAL;
2391 op->my_element_count
2392 = op->set->current_set_element_count;
2393 op->my_elements
2394 = GNUNET_CONTAINER_multihashmap_create (
2395 GNUNET_MIN (op->my_element_count,
2396 op->remote_element_count),
2397 GNUNET_YES);
2398 if (op->remote_element_count < op->my_element_count)
2399 {
2400 /* If the other peer (Alice) has fewer elements than us (Bob),
2401 we just send the count as Alice should send the first BF */
2402 send_element_count (op);
2403 op->phase = PHASE_COUNT_SENT;
2404 }
2405 else
2406 {
2407 /* We have fewer elements, so we start with the BF */
2408 begin_bf_exchange (op);
2409 }
2410 }
2411 /* Now allow CADET to continue, as we did not do this in
2412 #handle_incoming_msg (as we wanted to first see if the
2413 local client would accept the request). */
2414 GNUNET_CADET_receive_done (op->channel);
2415 GNUNET_SERVICE_client_continue (cs->client);
2416}
2417
2418
2419/**
2420 * Called to clean up, after a shutdown has been requested.
2421 *
2422 * @param cls closure, NULL
2423 */
2424static void
2425shutdown_task (void *cls)
2426{
2427 /* Delay actual shutdown to allow service to disconnect clients */
2428 in_shutdown = GNUNET_YES;
2429 if (0 == num_clients)
2430 {
2431 if (NULL != cadet)
2432 {
2433 GNUNET_CADET_disconnect (cadet);
2434 cadet = NULL;
2435 }
2436 }
2437 GNUNET_STATISTICS_destroy (_GSS_statistics,
2438 GNUNET_YES);
2439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2440 "handled shutdown request\n");
2441}
2442
2443
2444/**
2445 * Function called by the service's run
2446 * method to run service-specific setup code.
2447 *
2448 * @param cls closure
2449 * @param cfg configuration to use
2450 * @param service the initialized service
2451 */
2452static void
2453run (void *cls,
2454 const struct GNUNET_CONFIGURATION_Handle *cfg,
2455 struct GNUNET_SERVICE_Handle *service)
2456{
2457 /* FIXME: need to modify SERVICE (!) API to allow
2458 us to run a shutdown task *after* clients were
2459 forcefully disconnected! */
2460 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2461 NULL);
2462 _GSS_statistics = GNUNET_STATISTICS_create ("seti",
2463 cfg);
2464 cadet = GNUNET_CADET_connect (cfg);
2465 if (NULL == cadet)
2466 {
2467 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2468 _ ("Could not connect to CADET service\n"));
2469 GNUNET_SCHEDULER_shutdown ();
2470 return;
2471 }
2472}
2473
2474
2475/**
2476 * Define "main" method using service macro.
2477 */
2478GNUNET_SERVICE_MAIN (
2479 "seti",
2480 GNUNET_SERVICE_OPTION_NONE,
2481 &run,
2482 &client_connect_cb,
2483 &client_disconnect_cb,
2484 NULL,
2485 GNUNET_MQ_hd_fixed_size (client_accept,
2486 GNUNET_MESSAGE_TYPE_SETI_ACCEPT,
2487 struct GNUNET_SETI_AcceptMessage,
2488 NULL),
2489 GNUNET_MQ_hd_var_size (client_set_add,
2490 GNUNET_MESSAGE_TYPE_SETI_ADD,
2491 struct GNUNET_SETI_ElementMessage,
2492 NULL),
2493 GNUNET_MQ_hd_fixed_size (client_create_set,
2494 GNUNET_MESSAGE_TYPE_SETI_CREATE,
2495 struct GNUNET_SETI_CreateMessage,
2496 NULL),
2497 GNUNET_MQ_hd_var_size (client_evaluate,
2498 GNUNET_MESSAGE_TYPE_SETI_EVALUATE,
2499 struct GNUNET_SETI_EvaluateMessage,
2500 NULL),
2501 GNUNET_MQ_hd_fixed_size (client_listen,
2502 GNUNET_MESSAGE_TYPE_SETI_LISTEN,
2503 struct GNUNET_SETI_ListenMessage,
2504 NULL),
2505 GNUNET_MQ_hd_fixed_size (client_reject,
2506 GNUNET_MESSAGE_TYPE_SETI_REJECT,
2507 struct GNUNET_SETI_RejectMessage,
2508 NULL),
2509 GNUNET_MQ_hd_fixed_size (client_cancel,
2510 GNUNET_MESSAGE_TYPE_SETI_CANCEL,
2511 struct GNUNET_SETI_CancelMessage,
2512 NULL),
2513 GNUNET_MQ_handler_end ());
2514
2515
2516/* end of gnunet-service-seti.c */
diff --git a/src/service/seti/gnunet-service-seti_protocol.h b/src/service/seti/gnunet-service-seti_protocol.h
new file mode 100644
index 000000000..51968376e
--- /dev/null
+++ b/src/service/seti/gnunet-service-seti_protocol.h
@@ -0,0 +1,144 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2013, 2014, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @author Florian Dold
22 * @author Christian Grothoff
23 * @file seti/gnunet-service-seti_protocol.h
24 * @brief Peer-to-Peer messages for gnunet set
25 */
26#ifndef SETI_PROTOCOL_H
27#define SETI_PROTOCOL_H
28
29#include "platform.h"
30#include "gnunet_common.h"
31
32
33GNUNET_NETWORK_STRUCT_BEGIN
34
35struct OperationRequestMessage
36{
37 /**
38 * Type: #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
39 */
40 struct GNUNET_MessageHeader header;
41
42 /**
43 * For Intersection: my element count
44 */
45 uint32_t element_count GNUNET_PACKED;
46
47 /**
48 * Application-specific identifier of the request.
49 */
50 struct GNUNET_HashCode app_idX;
51
52 /* rest: optional message */
53};
54
55
56/**
57 * During intersection, the first (and possibly second) message
58 * send it the number of elements in the set, to allow the peers
59 * to decide who should start with the Bloom filter.
60 */
61struct IntersectionElementInfoMessage
62{
63 /**
64 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO
65 */
66 struct GNUNET_MessageHeader header;
67
68 /**
69 * mutator used with this bloomfilter.
70 */
71 uint32_t sender_element_count GNUNET_PACKED;
72};
73
74
75/**
76 * Bloom filter messages exchanged for set intersection calculation.
77 */
78struct BFMessage
79{
80 /**
81 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
82 */
83 struct GNUNET_MessageHeader header;
84
85 /**
86 * Number of elements the sender still has in the set.
87 */
88 uint32_t sender_element_count GNUNET_PACKED;
89
90 /**
91 * XOR of all hashes over all elements remaining in the set.
92 * Used to determine termination.
93 */
94 struct GNUNET_HashCode element_xor_hash;
95
96 /**
97 * Mutator used with this bloomfilter.
98 */
99 uint32_t sender_mutator GNUNET_PACKED;
100
101 /**
102 * Total length of the bloomfilter data.
103 */
104 uint32_t bloomfilter_total_length GNUNET_PACKED;
105
106 /**
107 * Number of bits (k-value) used in encoding the bloomfilter.
108 */
109 uint32_t bits_per_element GNUNET_PACKED;
110
111 /**
112 * rest: the sender's bloomfilter
113 */
114};
115
116
117/**
118 * Last message, send to confirm the final set. Contains the element
119 * count as it is possible that the peer determined that we were done
120 * by getting the empty set, which in that case also needs to be
121 * communicated.
122 */
123struct IntersectionDoneMessage
124{
125 /**
126 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE
127 */
128 struct GNUNET_MessageHeader header;
129
130 /**
131 * Final number of elements in intersection.
132 */
133 uint32_t final_element_count GNUNET_PACKED;
134
135 /**
136 * XOR of all hashes over all elements remaining in the set.
137 */
138 struct GNUNET_HashCode element_xor_hash;
139};
140
141
142GNUNET_NETWORK_STRUCT_END
143
144#endif
diff --git a/src/service/seti/meson.build b/src/service/seti/meson.build
new file mode 100644
index 000000000..1743e327d
--- /dev/null
+++ b/src/service/seti/meson.build
@@ -0,0 +1,41 @@
1libgnunetseti_src = ['seti_api.c']
2
3gnunetserviceseti_src = ['gnunet-service-seti.c']
4
5configure_file(input : 'seti.conf.in',
6 output : 'seti.conf',
7 configuration : cdata,
8 install: true,
9 install_dir: pkgcfgdir)
10
11
12if get_option('monolith')
13 foreach p : libgnunetseti_src + gnunetserviceseti_src
14 gnunet_src += 'seti/' + p
15 endforeach
16endif
17
18libgnunetseti = library('gnunetseti',
19 libgnunetseti_src,
20 soversion: '0',
21 version: '0.0.0',
22 dependencies: libgnunetutil_dep,
23 include_directories: [incdir, configuration_inc],
24 install: true,
25 install_dir: get_option('libdir'))
26pkg.generate(libgnunetseti, url: 'https://www.gnunet.org',
27 description : 'Provides API for accessing the set intersection service')
28libgnunetseti_dep = declare_dependency(link_with : libgnunetseti)
29executable ('gnunet-service-seti',
30 gnunetserviceseti_src,
31 dependencies: [libgnunetseti_dep,
32 libgnunetutil_dep,
33 m_dep,
34 libgnunetstatistics_dep,
35 libgnunetcore_dep,
36 libgnunetcadet_dep,
37 libgnunetblock_dep],
38 include_directories: [incdir, configuration_inc],
39 install: true,
40 install_dir: get_option('libdir') / 'gnunet' / 'libexec')
41
diff --git a/src/service/seti/seti.conf.in b/src/service/seti/seti.conf.in
new file mode 100644
index 000000000..e4f7b60b5
--- /dev/null
+++ b/src/service/seti/seti.conf.in
@@ -0,0 +1,12 @@
1[seti]
2START_ON_DEMAND = @START_ON_DEMAND@
3@UNIXONLY@PORT = 2106
4HOSTNAME = localhost
5BINARY = gnunet-service-seti
6ACCEPT_FROM = 127.0.0.1;
7ACCEPT_FROM6 = ::1;
8UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-seti.sock
9UNIX_MATCH_UID = YES
10UNIX_MATCH_GID = YES
11
12#PREFIX = valgrind
diff --git a/src/service/seti/seti.h b/src/service/seti/seti.h
new file mode 100644
index 000000000..e780b1061
--- /dev/null
+++ b/src/service/seti/seti.h
@@ -0,0 +1,267 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2014, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/seti.h
22 * @brief messages used for the set intersection api
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#ifndef SETI_H
27#define SETI_H
28
29#include "platform.h"
30#include "gnunet_common.h"
31#include "gnunet_set_service.h"
32
33GNUNET_NETWORK_STRUCT_BEGIN
34
35/**
36 * Message sent by the client to the service to ask starting
37 * a new set to perform operations with.
38 */
39struct GNUNET_SETI_CreateMessage
40{
41 /**
42 * Type: #GNUNET_MESSAGE_TYPE_SETI_CREATE
43 */
44 struct GNUNET_MessageHeader header;
45};
46
47
48/**
49 * Message sent by the client to the service to start listening for
50 * incoming requests to perform a certain type of set operation for a
51 * certain type of application.
52 */
53struct GNUNET_SETI_ListenMessage
54{
55 /**
56 * Type: #GNUNET_MESSAGE_TYPE_SETI_LISTEN
57 */
58 struct GNUNET_MessageHeader header;
59
60 /**
61 * Operation type, values of `enum GNUNET_SETI_OperationType`
62 */
63 uint32_t operation GNUNET_PACKED;
64
65 /**
66 * application id
67 */
68 struct GNUNET_HashCode app_id;
69};
70
71
72/**
73 * Message sent by a listening client to the service to accept
74 * performing the operation with the other peer.
75 */
76struct GNUNET_SETI_AcceptMessage
77{
78 /**
79 * Type: #GNUNET_MESSAGE_TYPE_SETI_ACCEPT
80 */
81 struct GNUNET_MessageHeader header;
82
83 /**
84 * ID of the incoming request we want to accept.
85 */
86 uint32_t accept_reject_id GNUNET_PACKED;
87
88 /**
89 * Request ID to identify responses.
90 */
91 uint32_t request_id GNUNET_PACKED;
92
93 /**
94 * Return the intersection (1), instead of the elements to
95 * remove / the delta (0), in NBO.
96 */
97 uint32_t return_intersection;
98
99};
100
101
102/**
103 * Message sent by a listening client to the service to reject
104 * performing the operation with the other peer.
105 */
106struct GNUNET_SETI_RejectMessage
107{
108 /**
109 * Type: #GNUNET_MESSAGE_TYPE_SETI_REJECT
110 */
111 struct GNUNET_MessageHeader header;
112
113 /**
114 * ID of the incoming request we want to reject.
115 */
116 uint32_t accept_reject_id GNUNET_PACKED;
117};
118
119
120/**
121 * A request for an operation with another client.
122 */
123struct GNUNET_SETI_RequestMessage
124{
125 /**
126 * Type: #GNUNET_MESSAGE_TYPE_SETI_REQUEST.
127 */
128 struct GNUNET_MessageHeader header;
129
130 /**
131 * ID of the to identify the request when accepting or
132 * rejecting it.
133 */
134 uint32_t accept_id GNUNET_PACKED;
135
136 /**
137 * Identity of the requesting peer.
138 */
139 struct GNUNET_PeerIdentity peer_id;
140
141 /* rest: context message, that is, application-specific
142 message to convince listener to pick up */
143};
144
145
146/**
147 * Message sent by client to service to initiate a set operation as a
148 * client (not as listener). A set (which determines the operation
149 * type) must already exist in association with this client.
150 */
151struct GNUNET_SETI_EvaluateMessage
152{
153 /**
154 * Type: #GNUNET_MESSAGE_TYPE_SETI_EVALUATE
155 */
156 struct GNUNET_MessageHeader header;
157
158 /**
159 * Id of our set to evaluate, chosen implicitly by the client when it
160 * calls #GNUNET_SETI_commit().
161 */
162 uint32_t request_id GNUNET_PACKED;
163
164 /**
165 * Peer to evaluate the operation with
166 */
167 struct GNUNET_PeerIdentity target_peer;
168
169 /**
170 * Application id
171 */
172 struct GNUNET_HashCode app_id;
173
174 /**
175 * Return the intersection (1), instead of the elements to
176 * remove / the delta (0), in NBO.
177 */
178 uint32_t return_intersection;
179
180 /* rest: context message, that is, application-specific
181 message to convince listener to pick up */
182};
183
184
185/**
186 * Message sent by the service to the client to indicate an
187 * element that is removed (set intersection) or added
188 * (set union) or part of the final result, depending on
189 * options specified for the operation.
190 */
191struct GNUNET_SETI_ResultMessage
192{
193 /**
194 * Type: #GNUNET_MESSAGE_TYPE_SETI_RESULT
195 */
196 struct GNUNET_MessageHeader header;
197
198 /**
199 * Current set size.
200 */
201 uint64_t current_size;
202
203 /**
204 * id the result belongs to
205 */
206 uint32_t request_id GNUNET_PACKED;
207
208 /**
209 * Was the evaluation successful? Contains
210 * an `enum GNUNET_SETI_Status` in NBO.
211 */
212 uint16_t result_status GNUNET_PACKED;
213
214 /**
215 * Type of the element attached to the message, if any.
216 */
217 uint16_t element_type GNUNET_PACKED;
218
219 /* rest: the actual element */
220};
221
222
223/**
224 * Message sent by client to the service to add an element to the set.
225 */
226struct GNUNET_SETI_ElementMessage
227{
228 /**
229 * Type: #GNUNET_MESSAGE_TYPE_SETI_ADD.
230 */
231 struct GNUNET_MessageHeader header;
232
233 /**
234 * Type of the element to add or remove.
235 */
236 uint16_t element_type GNUNET_PACKED;
237
238 /**
239 * For alignment, always zero.
240 */
241 uint16_t reserved GNUNET_PACKED;
242
243 /* rest: the actual element */
244};
245
246
247/**
248 * Sent to the service by the client
249 * in order to cancel a set operation.
250 */
251struct GNUNET_SETI_CancelMessage
252{
253 /**
254 * Type: #GNUNET_MESSAGE_TYPE_SETI_CANCEL
255 */
256 struct GNUNET_MessageHeader header;
257
258 /**
259 * ID of the request we want to cancel.
260 */
261 uint32_t request_id GNUNET_PACKED;
262};
263
264
265GNUNET_NETWORK_STRUCT_END
266
267#endif
diff --git a/src/service/seti/seti_api.c b/src/service/seti/seti_api.c
new file mode 100644
index 000000000..6522236c6
--- /dev/null
+++ b/src/service/seti/seti_api.c
@@ -0,0 +1,870 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2016, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file seti/seti_api.c
22 * @brief api for the set service
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_protocols.h"
29#include "gnunet_seti_service.h"
30#include "seti.h"
31
32
33#define LOG(kind, ...) GNUNET_log_from (kind, "seti-api", __VA_ARGS__)
34
35
36/**
37 * Opaque handle to a set.
38 */
39struct GNUNET_SETI_Handle
40{
41 /**
42 * Message queue for @e client.
43 */
44 struct GNUNET_MQ_Handle *mq;
45
46 /**
47 * Linked list of operations on the set.
48 */
49 struct GNUNET_SETI_OperationHandle *ops_head;
50
51 /**
52 * Linked list of operations on the set.
53 */
54 struct GNUNET_SETI_OperationHandle *ops_tail;
55
56 /**
57 * Configuration, needed when creating (lazy) copies.
58 */
59 const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61 /**
62 * Should the set be destroyed once all operations are gone?
63 * #GNUNET_SYSERR if #GNUNET_SETI_destroy() must raise this flag,
64 * #GNUNET_YES if #GNUNET_SETI_destroy() did raise this flag.
65 */
66 int destroy_requested;
67
68 /**
69 * Has the set become invalid (e.g. service died)?
70 */
71 int invalid;
72
73 /**
74 * Both client and service count the number of iterators
75 * created so far to match replies with iterators.
76 */
77 uint16_t iteration_id;
78
79};
80
81
82/**
83 * Handle for a set operation request from another peer.
84 */
85struct GNUNET_SETI_Request
86{
87 /**
88 * Id of the request, used to identify the request when
89 * accepting/rejecting it.
90 */
91 uint32_t accept_id;
92
93 /**
94 * Has the request been accepted already?
95 * #GNUNET_YES/#GNUNET_NO
96 */
97 int accepted;
98};
99
100
101/**
102 * Handle to an operation. Only known to the service after committing
103 * the handle with a set.
104 */
105struct GNUNET_SETI_OperationHandle
106{
107 /**
108 * Function to be called when we have a result,
109 * or an error.
110 */
111 GNUNET_SETI_ResultIterator result_cb;
112
113 /**
114 * Closure for @e result_cb.
115 */
116 void *result_cls;
117
118 /**
119 * Local set used for the operation,
120 * NULL if no set has been provided by conclude yet.
121 */
122 struct GNUNET_SETI_Handle *set;
123
124 /**
125 * Message sent to the server on calling conclude,
126 * NULL if conclude has been called.
127 */
128 struct GNUNET_MQ_Envelope *conclude_mqm;
129
130 /**
131 * Address of the request if in the conclude message,
132 * used to patch the request id into the message when the set is known.
133 */
134 uint32_t *request_id_addr;
135
136 /**
137 * Handles are kept in a linked list.
138 */
139 struct GNUNET_SETI_OperationHandle *prev;
140
141 /**
142 * Handles are kept in a linked list.
143 */
144 struct GNUNET_SETI_OperationHandle *next;
145
146 /**
147 * Request ID to identify the operation within the set.
148 */
149 uint32_t request_id;
150
151 /**
152 * Should we return the resulting intersection (ADD) or
153 * the elements to remove (DEL)?
154 */
155 int return_intersection;
156};
157
158
159/**
160 * Opaque handle to a listen operation.
161 */
162struct GNUNET_SETI_ListenHandle
163{
164 /**
165 * Message queue for the client.
166 */
167 struct GNUNET_MQ_Handle*mq;
168
169 /**
170 * Configuration handle for the listener, stored
171 * here to be able to reconnect transparently on
172 * connection failure.
173 */
174 const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176 /**
177 * Function to call on a new incoming request,
178 * or on error.
179 */
180 GNUNET_SETI_ListenCallback listen_cb;
181
182 /**
183 * Closure for @e listen_cb.
184 */
185 void *listen_cls;
186
187 /**
188 * Task for reconnecting when the listener fails.
189 */
190 struct GNUNET_SCHEDULER_Task *reconnect_task;
191
192 /**
193 * Application ID we listen for.
194 */
195 struct GNUNET_HashCode app_id;
196
197 /**
198 * Time to wait until we try to reconnect on failure.
199 */
200 struct GNUNET_TIME_Relative reconnect_backoff;
201
202};
203
204
205/**
206 * Check that the given @a msg is well-formed.
207 *
208 * @param cls closure
209 * @param msg message to check
210 * @return #GNUNET_OK if message is well-formed
211 */
212static int
213check_result (void *cls,
214 const struct GNUNET_SETI_ResultMessage *msg)
215{
216 /* minimum size was already checked, everything else is OK! */
217 return GNUNET_OK;
218}
219
220
221/**
222 * Handle result message for a set operation.
223 *
224 * @param cls the set
225 * @param mh the message
226 */
227static void
228handle_result (void *cls,
229 const struct GNUNET_SETI_ResultMessage *msg)
230{
231 struct GNUNET_SETI_Handle *set = cls;
232 struct GNUNET_SETI_OperationHandle *oh;
233 struct GNUNET_SETI_Element e;
234 enum GNUNET_SETI_Status result_status;
235 int destroy_set;
236
237 GNUNET_assert (NULL != set->mq);
238 result_status = (enum GNUNET_SETI_Status) ntohs (msg->result_status);
239 LOG (GNUNET_ERROR_TYPE_DEBUG,
240 "Got result message with status %d\n",
241 result_status);
242 oh = GNUNET_MQ_assoc_get (set->mq,
243 ntohl (msg->request_id));
244 if (NULL == oh)
245 {
246 /* 'oh' can be NULL if we canceled the operation, but the service
247 did not get the cancel message yet. */
248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249 "Ignoring result from canceled operation\n");
250 return;
251 }
252
253 switch (result_status)
254 {
255 case GNUNET_SETI_STATUS_ADD_LOCAL:
256 case GNUNET_SETI_STATUS_DEL_LOCAL:
257 e.data = &msg[1];
258 e.size = ntohs (msg->header.size)
259 - sizeof(struct GNUNET_SETI_ResultMessage);
260 e.element_type = ntohs (msg->element_type);
261 if (NULL != oh->result_cb)
262 oh->result_cb (oh->result_cls,
263 &e,
264 GNUNET_ntohll (msg->current_size),
265 result_status);
266 return;
267 case GNUNET_SETI_STATUS_FAILURE:
268 case GNUNET_SETI_STATUS_DONE:
269 GNUNET_MQ_assoc_remove (set->mq,
270 ntohl (msg->request_id));
271 GNUNET_CONTAINER_DLL_remove (set->ops_head,
272 set->ops_tail,
273 oh);
274 /* Need to do this calculation _before_ the result callback,
275 as IF the application still has a valid set handle, it
276 may trigger destruction of the set during the callback. */
277 destroy_set = (GNUNET_YES == set->destroy_requested) &&
278 (NULL == set->ops_head);
279 if (NULL != oh->result_cb)
280 {
281 oh->result_cb (oh->result_cls,
282 NULL,
283 GNUNET_ntohll (msg->current_size),
284 result_status);
285 }
286 else
287 {
288 LOG (GNUNET_ERROR_TYPE_DEBUG,
289 "No callback for final status\n");
290 }
291 if (destroy_set)
292 GNUNET_SETI_destroy (set);
293 GNUNET_free (oh);
294 return;
295 }
296}
297
298
299/**
300 * Destroy the given set operation.
301 *
302 * @param oh set operation to destroy
303 */
304static void
305set_operation_destroy (struct GNUNET_SETI_OperationHandle *oh)
306{
307 struct GNUNET_SETI_Handle *set = oh->set;
308 struct GNUNET_SETI_OperationHandle *h_assoc;
309
310 if (NULL != oh->conclude_mqm)
311 GNUNET_MQ_discard (oh->conclude_mqm);
312 /* is the operation already committed? */
313 if (NULL != set)
314 {
315 GNUNET_CONTAINER_DLL_remove (set->ops_head,
316 set->ops_tail,
317 oh);
318 h_assoc = GNUNET_MQ_assoc_remove (set->mq,
319 oh->request_id);
320 GNUNET_assert ((NULL == h_assoc) ||
321 (h_assoc == oh));
322 }
323 GNUNET_free (oh);
324}
325
326
327/**
328 * Cancel the given set operation. We need to send an explicit cancel
329 * message, as all operations one one set communicate using one
330 * handle.
331 *
332 * @param oh set operation to cancel
333 */
334void
335GNUNET_SETI_operation_cancel (struct GNUNET_SETI_OperationHandle *oh)
336{
337 struct GNUNET_SETI_Handle *set = oh->set;
338 struct GNUNET_SETI_CancelMessage *m;
339 struct GNUNET_MQ_Envelope *mqm;
340
341 LOG (GNUNET_ERROR_TYPE_DEBUG,
342 "Cancelling SET operation\n");
343 if (NULL != set)
344 {
345 mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETI_CANCEL);
346 m->request_id = htonl (oh->request_id);
347 GNUNET_MQ_send (set->mq, mqm);
348 }
349 set_operation_destroy (oh);
350 if ((NULL != set) &&
351 (GNUNET_YES == set->destroy_requested) &&
352 (NULL == set->ops_head))
353 {
354 LOG (GNUNET_ERROR_TYPE_DEBUG,
355 "Destroying set after operation cancel\n");
356 GNUNET_SETI_destroy (set);
357 }
358}
359
360
361/**
362 * We encountered an error communicating with the set service while
363 * performing a set operation. Report to the application.
364 *
365 * @param cls the `struct GNUNET_SETI_Handle`
366 * @param error error code
367 */
368static void
369handle_client_set_error (void *cls,
370 enum GNUNET_MQ_Error error)
371{
372 struct GNUNET_SETI_Handle *set = cls;
373
374 LOG (GNUNET_ERROR_TYPE_ERROR,
375 "Handling client set error %d\n",
376 error);
377 while (NULL != set->ops_head)
378 {
379 if ((NULL != set->ops_head->result_cb) &&
380 (GNUNET_NO == set->destroy_requested))
381 set->ops_head->result_cb (set->ops_head->result_cls,
382 NULL,
383 0,
384 GNUNET_SETI_STATUS_FAILURE);
385 set_operation_destroy (set->ops_head);
386 }
387 set->invalid = GNUNET_YES;
388}
389
390
391/**
392 * Create an empty set.
393 *
394 * @param cfg configuration to use for connecting to the
395 * set service
396 * @return a handle to the set
397 */
398struct GNUNET_SETI_Handle *
399GNUNET_SETI_create (const struct GNUNET_CONFIGURATION_Handle *cfg)
400{
401 struct GNUNET_SETI_Handle *set = GNUNET_new (struct GNUNET_SETI_Handle);
402 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
403 GNUNET_MQ_hd_var_size (result,
404 GNUNET_MESSAGE_TYPE_SETI_RESULT,
405 struct GNUNET_SETI_ResultMessage,
406 set),
407 GNUNET_MQ_handler_end ()
408 };
409 struct GNUNET_MQ_Envelope *mqm;
410 struct GNUNET_SETI_CreateMessage *create_msg;
411
412 set->cfg = cfg;
413 set->mq = GNUNET_CLIENT_connect (cfg,
414 "seti",
415 mq_handlers,
416 &handle_client_set_error,
417 set);
418 if (NULL == set->mq)
419 {
420 GNUNET_free (set);
421 return NULL;
422 }
423 LOG (GNUNET_ERROR_TYPE_DEBUG,
424 "Creating new intersection set\n");
425 mqm = GNUNET_MQ_msg (create_msg,
426 GNUNET_MESSAGE_TYPE_SETI_CREATE);
427 GNUNET_MQ_send (set->mq,
428 mqm);
429 return set;
430}
431
432
433/**
434 * Add an element to the given set. After the element has been added
435 * (in the sense of being transmitted to the set service), @a cont
436 * will be called. Multiple calls to GNUNET_SETI_add_element() can be
437 * queued.
438 *
439 * @param set set to add element to
440 * @param element element to add to the set
441 * @param cb continuation called after the element has been added
442 * @param cb_cls closure for @a cont
443 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
444 * set is invalid (e.g. the set service crashed)
445 */
446int
447GNUNET_SETI_add_element (struct GNUNET_SETI_Handle *set,
448 const struct GNUNET_SETI_Element *element,
449 GNUNET_SCHEDULER_TaskCallback cb,
450 void *cb_cls)
451{
452 struct GNUNET_MQ_Envelope *mqm;
453 struct GNUNET_SETI_ElementMessage *msg;
454
455 LOG (GNUNET_ERROR_TYPE_DEBUG,
456 "adding element of type %u to set %p\n",
457 (unsigned int) element->element_type,
458 set);
459 if (GNUNET_YES == set->invalid)
460 {
461 if (NULL != cb)
462 cb (cb_cls);
463 return GNUNET_SYSERR;
464 }
465 mqm = GNUNET_MQ_msg_extra (msg,
466 element->size,
467 GNUNET_MESSAGE_TYPE_SETI_ADD);
468 msg->element_type = htons (element->element_type);
469 GNUNET_memcpy (&msg[1],
470 element->data,
471 element->size);
472 GNUNET_MQ_notify_sent (mqm,
473 cb,
474 cb_cls);
475 GNUNET_MQ_send (set->mq,
476 mqm);
477 return GNUNET_OK;
478}
479
480
481/**
482 * Destroy the set handle if no operations are left, mark the set
483 * for destruction otherwise.
484 *
485 * @param set set handle to destroy
486 */
487void
488GNUNET_SETI_destroy (struct GNUNET_SETI_Handle *set)
489{
490 /* destroying set while iterator is active is currently
491 not supported; we should expand the API to allow
492 clients to explicitly cancel the iteration! */
493 if ((NULL != set->ops_head) ||
494 (GNUNET_SYSERR == set->destroy_requested))
495 {
496 LOG (GNUNET_ERROR_TYPE_DEBUG,
497 "Set operations are pending, delaying set destruction\n");
498 set->destroy_requested = GNUNET_YES;
499 return;
500 }
501 LOG (GNUNET_ERROR_TYPE_DEBUG,
502 "Really destroying set\n");
503 if (NULL != set->mq)
504 {
505 GNUNET_MQ_destroy (set->mq);
506 set->mq = NULL;
507 }
508 GNUNET_free (set);
509}
510
511
512struct GNUNET_SETI_OperationHandle *
513GNUNET_SETI_prepare (const struct GNUNET_PeerIdentity *other_peer,
514 const struct GNUNET_HashCode *app_id,
515 const struct GNUNET_MessageHeader *context_msg,
516 const struct GNUNET_SETI_Option options[],
517 GNUNET_SETI_ResultIterator result_cb,
518 void *result_cls)
519{
520 struct GNUNET_MQ_Envelope *mqm;
521 struct GNUNET_SETI_OperationHandle *oh;
522 struct GNUNET_SETI_EvaluateMessage *msg;
523
524 oh = GNUNET_new (struct GNUNET_SETI_OperationHandle);
525 oh->result_cb = result_cb;
526 oh->result_cls = result_cls;
527 mqm = GNUNET_MQ_msg_nested_mh (msg,
528 GNUNET_MESSAGE_TYPE_SETI_EVALUATE,
529 context_msg);
530 msg->app_id = *app_id;
531 msg->target_peer = *other_peer;
532 for (const struct GNUNET_SETI_Option *opt = options;
533 GNUNET_SETI_OPTION_END != opt->type;
534 opt++)
535 {
536 switch (opt->type)
537 {
538 case GNUNET_SETI_OPTION_RETURN_INTERSECTION:
539 msg->return_intersection = htonl (GNUNET_YES);
540 break;
541 default:
542 LOG (GNUNET_ERROR_TYPE_ERROR,
543 "Option with type %d not recognized\n",
544 (int) opt->type);
545 }
546 }
547 oh->conclude_mqm = mqm;
548 oh->request_id_addr = &msg->request_id;
549 return oh;
550}
551
552
553/**
554 * Connect to the set service in order to listen for requests.
555 *
556 * @param cls the `struct GNUNET_SETI_ListenHandle *` to connect
557 */
558static void
559listen_connect (void *cls);
560
561
562/**
563 * Check validity of request message for a listen operation
564 *
565 * @param cls the listen handle
566 * @param msg the message
567 * @return #GNUNET_OK if the message is well-formed
568 */
569static int
570check_request (void *cls,
571 const struct GNUNET_SETI_RequestMessage *msg)
572{
573 const struct GNUNET_MessageHeader *context_msg;
574
575 if (ntohs (msg->header.size) == sizeof(*msg))
576 return GNUNET_OK; /* no context message is OK */
577 context_msg = GNUNET_MQ_extract_nested_mh (msg);
578 if (NULL == context_msg)
579 {
580 /* malformed context message is NOT ok */
581 GNUNET_break_op (0);
582 return GNUNET_SYSERR;
583 }
584 return GNUNET_OK;
585}
586
587
588/**
589 * Handle request message for a listen operation
590 *
591 * @param cls the listen handle
592 * @param msg the message
593 */
594static void
595handle_request (void *cls,
596 const struct GNUNET_SETI_RequestMessage *msg)
597{
598 struct GNUNET_SETI_ListenHandle *lh = cls;
599 struct GNUNET_SETI_Request req;
600 const struct GNUNET_MessageHeader *context_msg;
601 struct GNUNET_MQ_Envelope *mqm;
602 struct GNUNET_SETI_RejectMessage *rmsg;
603
604 LOG (GNUNET_ERROR_TYPE_DEBUG,
605 "Processing incoming operation request with id %u\n",
606 ntohl (msg->accept_id));
607 /* we got another valid request => reset the backoff */
608 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
609 req.accept_id = ntohl (msg->accept_id);
610 req.accepted = GNUNET_NO;
611 context_msg = GNUNET_MQ_extract_nested_mh (msg);
612 /* calling #GNUNET_SETI_accept() in the listen cb will set req->accepted */
613 lh->listen_cb (lh->listen_cls,
614 &msg->peer_id,
615 context_msg,
616 &req);
617 if (GNUNET_YES == req.accepted)
618 return; /* the accept-case is handled in #GNUNET_SETI_accept() */
619 LOG (GNUNET_ERROR_TYPE_DEBUG,
620 "Rejected request %u\n",
621 ntohl (msg->accept_id));
622 mqm = GNUNET_MQ_msg (rmsg,
623 GNUNET_MESSAGE_TYPE_SETI_REJECT);
624 rmsg->accept_reject_id = msg->accept_id;
625 GNUNET_MQ_send (lh->mq,
626 mqm);
627}
628
629
630/**
631 * Our connection with the set service encountered an error,
632 * re-initialize with exponential back-off.
633 *
634 * @param cls the `struct GNUNET_SETI_ListenHandle *`
635 * @param error reason for the disconnect
636 */
637static void
638handle_client_listener_error (void *cls,
639 enum GNUNET_MQ_Error error)
640{
641 struct GNUNET_SETI_ListenHandle *lh = cls;
642
643 LOG (GNUNET_ERROR_TYPE_DEBUG,
644 "Listener broke down (%d), re-connecting\n",
645 (int) error);
646 GNUNET_MQ_destroy (lh->mq);
647 lh->mq = NULL;
648 lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
649 &listen_connect,
650 lh);
651 lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
652}
653
654
655/**
656 * Connect to the set service in order to listen for requests.
657 *
658 * @param cls the `struct GNUNET_SETI_ListenHandle *` to connect
659 */
660static void
661listen_connect (void *cls)
662{
663 struct GNUNET_SETI_ListenHandle *lh = cls;
664 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
665 GNUNET_MQ_hd_var_size (request,
666 GNUNET_MESSAGE_TYPE_SETI_REQUEST,
667 struct GNUNET_SETI_RequestMessage,
668 lh),
669 GNUNET_MQ_handler_end ()
670 };
671 struct GNUNET_MQ_Envelope *mqm;
672 struct GNUNET_SETI_ListenMessage *msg;
673
674 lh->reconnect_task = NULL;
675 GNUNET_assert (NULL == lh->mq);
676 lh->mq = GNUNET_CLIENT_connect (lh->cfg,
677 "seti",
678 mq_handlers,
679 &handle_client_listener_error,
680 lh);
681 if (NULL == lh->mq)
682 return;
683 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETI_LISTEN);
684 msg->app_id = lh->app_id;
685 GNUNET_MQ_send (lh->mq,
686 mqm);
687}
688
689
690/**
691 * Wait for set operation requests for the given application id
692 *
693 * @param cfg configuration to use for connecting to
694 * the set service, needs to be valid for the lifetime of the listen handle
695 * @param app_id id of the application that handles set operation requests
696 * @param listen_cb called for each incoming request matching the operation
697 * and application id
698 * @param listen_cls handle for @a listen_cb
699 * @return a handle that can be used to cancel the listen operation
700 */
701struct GNUNET_SETI_ListenHandle *
702GNUNET_SETI_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
703 const struct GNUNET_HashCode *app_id,
704 GNUNET_SETI_ListenCallback listen_cb,
705 void *listen_cls)
706{
707 struct GNUNET_SETI_ListenHandle *lh;
708
709 LOG (GNUNET_ERROR_TYPE_DEBUG,
710 "Starting listener for app %s\n",
711 GNUNET_h2s (app_id));
712 lh = GNUNET_new (struct GNUNET_SETI_ListenHandle);
713 lh->listen_cb = listen_cb;
714 lh->listen_cls = listen_cls;
715 lh->cfg = cfg;
716 lh->app_id = *app_id;
717 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
718 listen_connect (lh);
719 if (NULL == lh->mq)
720 {
721 GNUNET_free (lh);
722 return NULL;
723 }
724 return lh;
725}
726
727
728/**
729 * Cancel the given listen operation.
730 *
731 * @param lh handle for the listen operation
732 */
733void
734GNUNET_SETI_listen_cancel (struct GNUNET_SETI_ListenHandle *lh)
735{
736 LOG (GNUNET_ERROR_TYPE_DEBUG,
737 "Canceling listener %s\n",
738 GNUNET_h2s (&lh->app_id));
739 if (NULL != lh->mq)
740 {
741 GNUNET_MQ_destroy (lh->mq);
742 lh->mq = NULL;
743 }
744 if (NULL != lh->reconnect_task)
745 {
746 GNUNET_SCHEDULER_cancel (lh->reconnect_task);
747 lh->reconnect_task = NULL;
748 }
749 GNUNET_free (lh);
750}
751
752
753struct GNUNET_SETI_OperationHandle *
754GNUNET_SETI_accept (struct GNUNET_SETI_Request *request,
755 const struct GNUNET_SETI_Option options[],
756 GNUNET_SETI_ResultIterator result_cb,
757 void *result_cls)
758{
759 struct GNUNET_MQ_Envelope *mqm;
760 struct GNUNET_SETI_OperationHandle *oh;
761 struct GNUNET_SETI_AcceptMessage *msg;
762
763 GNUNET_assert (GNUNET_NO == request->accepted);
764 LOG (GNUNET_ERROR_TYPE_DEBUG,
765 "Client accepts set intersection operation with id %u\n",
766 request->accept_id);
767 request->accepted = GNUNET_YES;
768 mqm = GNUNET_MQ_msg (msg,
769 GNUNET_MESSAGE_TYPE_SETI_ACCEPT);
770 msg->accept_reject_id = htonl (request->accept_id);
771 oh = GNUNET_new (struct GNUNET_SETI_OperationHandle);
772 oh->result_cb = result_cb;
773 oh->result_cls = result_cls;
774 oh->conclude_mqm = mqm;
775 oh->request_id_addr = &msg->request_id;
776 for (const struct GNUNET_SETI_Option *opt = options;
777 GNUNET_SETI_OPTION_END != opt->type;
778 opt++)
779 {
780 switch (opt->type)
781 {
782 case GNUNET_SETI_OPTION_RETURN_INTERSECTION:
783 oh->return_intersection = GNUNET_YES;
784 msg->return_intersection = htonl (GNUNET_YES);
785 break;
786 default:
787 LOG (GNUNET_ERROR_TYPE_ERROR,
788 "Option with type %d not recognized\n",
789 (int) opt->type);
790 }
791 }
792 return oh;
793}
794
795
796/**
797 * Commit a set to be used with a set operation.
798 * This function is called once we have fully constructed
799 * the set that we want to use for the operation. At this
800 * time, the P2P protocol can then begin to exchange the
801 * set information and call the result callback with the
802 * result information.
803 *
804 * @param oh handle to the set operation
805 * @param set the set to use for the operation
806 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
807 * set is invalid (e.g. the set service crashed)
808 */
809int
810GNUNET_SETI_commit (struct GNUNET_SETI_OperationHandle *oh,
811 struct GNUNET_SETI_Handle *set)
812{
813 if (NULL != oh->set)
814 {
815 /* Some other set was already committed for this
816 * operation, there is a logic bug in the client of this API */
817 GNUNET_break (0);
818 return GNUNET_OK;
819 }
820 GNUNET_assert (NULL != set);
821 if (GNUNET_YES == set->invalid)
822 return GNUNET_SYSERR;
823 LOG (GNUNET_ERROR_TYPE_DEBUG,
824 "Client commits to SET\n");
825 GNUNET_assert (NULL != oh->conclude_mqm);
826 oh->set = set;
827 GNUNET_CONTAINER_DLL_insert (set->ops_head,
828 set->ops_tail,
829 oh);
830 oh->request_id = GNUNET_MQ_assoc_add (set->mq,
831 oh);
832 *oh->request_id_addr = htonl (oh->request_id);
833 GNUNET_MQ_send (set->mq,
834 oh->conclude_mqm);
835 oh->conclude_mqm = NULL;
836 oh->request_id_addr = NULL;
837 return GNUNET_OK;
838}
839
840
841/**
842 * Hash a set element.
843 *
844 * @param element the element that should be hashed
845 * @param[out] ret_hash a pointer to where the hash of @a element
846 * should be stored
847 */
848void
849GNUNET_SETI_element_hash (const struct GNUNET_SETI_Element *element,
850 struct GNUNET_HashCode *ret_hash)
851{
852 struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
853
854 /* It's not guaranteed that the element data is always after the element header,
855 so we need to hash the chunks separately. */
856 GNUNET_CRYPTO_hash_context_read (ctx,
857 &element->size,
858 sizeof(uint16_t));
859 GNUNET_CRYPTO_hash_context_read (ctx,
860 &element->element_type,
861 sizeof(uint16_t));
862 GNUNET_CRYPTO_hash_context_read (ctx,
863 element->data,
864 element->size);
865 GNUNET_CRYPTO_hash_context_finish (ctx,
866 ret_hash);
867}
868
869
870/* end of seti_api.c */
diff --git a/src/service/seti/test_seti.conf b/src/service/seti/test_seti.conf
new file mode 100644
index 000000000..506b3d169
--- /dev/null
+++ b/src/service/seti/test_seti.conf
@@ -0,0 +1,32 @@
1@INLINE@ ../../../contrib/conf/gnunet/no_forcestart.conf
2
3[PATHS]
4GNUNET_TEST_HOME = $GNUNET_TMP/test-gnunet-set/
5
6[seti]
7START_ON_DEMAND = YES
8#PREFIX = valgrind --leak-check=full
9#PREFIX = gdbserver :1234
10OPTIONS = -L INFO
11
12[transport]
13PLUGINS = unix
14OPTIONS = -LERROR
15
16[nat]
17RETURN_LOCAL_ADDRESSES = YES
18DISABLEV6 = YES
19USE_LOCALADDR = YES
20
21[peerinfo]
22NO_IO = YES
23
24[nat]
25# Use addresses from the local network interfaces (including loopback, but also others)
26USE_LOCALADDR = YES
27
28# Disable IPv6 support
29DISABLEV6 = NO
30
31# Do we use addresses from localhost address ranges? (::1, 127.0.0.0/8)
32RETURN_LOCAL_ADDRESSES = YES
diff --git a/src/service/seti/test_seti_api.c b/src/service/seti/test_seti_api.c
new file mode 100644
index 000000000..9074fab41
--- /dev/null
+++ b/src/service/seti/test_seti_api.c
@@ -0,0 +1,345 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2014, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/test_seti_api.c
23 * @brief testcase for full result mode of the intersection set operation
24 * @author Christian Fuchs
25 * @author Christian Grothoff
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_testing_lib.h"
30#include "gnunet_seti_service.h"
31
32
33static int ret;
34
35static struct GNUNET_PeerIdentity local_id;
36
37static struct GNUNET_HashCode app_id;
38
39static struct GNUNET_SETI_Handle *set1;
40
41static struct GNUNET_SETI_Handle *set2;
42
43static struct GNUNET_SETI_ListenHandle *listen_handle;
44
45static const struct GNUNET_CONFIGURATION_Handle *config;
46
47static struct GNUNET_SCHEDULER_Task *tt;
48
49static struct GNUNET_SETI_OperationHandle *oh1;
50
51static struct GNUNET_SETI_OperationHandle *oh2;
52
53
54static void
55result_cb_set1 (void *cls,
56 const struct GNUNET_SETI_Element *element,
57 uint64_t current_size,
58 enum GNUNET_SETI_Status status)
59{
60 static int count;
61
62 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
63 "Processing result set 1 (%d)\n",
64 status);
65 switch (status)
66 {
67 case GNUNET_SETI_STATUS_ADD_LOCAL:
68 count++;
69 break;
70 case GNUNET_SETI_STATUS_FAILURE:
71 oh1 = NULL;
72 ret = 1;
73 break;
74 case GNUNET_SETI_STATUS_DONE:
75 oh1 = NULL;
76 GNUNET_assert (1 == count);
77 GNUNET_SETI_destroy (set1);
78 set1 = NULL;
79 if (NULL == set2)
80 GNUNET_SCHEDULER_shutdown ();
81 break;
82
83 default:
84 GNUNET_assert (0);
85 }
86}
87
88
89static void
90result_cb_set2 (void *cls,
91 const struct GNUNET_SETI_Element *element,
92 uint64_t current_size,
93 enum GNUNET_SETI_Status status)
94{
95 static int count;
96
97 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
98 "Processing result set 2 (%d)\n",
99 status);
100 switch (status)
101 {
102 case GNUNET_SETI_STATUS_ADD_LOCAL:
103 count++;
104 break;
105 case GNUNET_SETI_STATUS_FAILURE:
106 oh2 = NULL;
107 ret = 1;
108 break;
109 case GNUNET_SETI_STATUS_DONE:
110 oh2 = NULL;
111 GNUNET_break (1 == count);
112 if (1 != count)
113 ret |= 2;
114 GNUNET_SETI_destroy (set2);
115 set2 = NULL;
116 if (NULL == set1)
117 GNUNET_SCHEDULER_shutdown ();
118 break;
119 case GNUNET_SETI_STATUS_DEL_LOCAL:
120 /* unexpected! */
121 ret = 1;
122 break;
123 }
124}
125
126
127static void
128listen_cb (void *cls,
129 const struct GNUNET_PeerIdentity *other_peer,
130 const struct GNUNET_MessageHeader *context_msg,
131 struct GNUNET_SETI_Request *request)
132{
133 struct GNUNET_SETI_Option opts[] = {
134 { .type = GNUNET_SETI_OPTION_RETURN_INTERSECTION },
135 { .type = GNUNET_SETI_OPTION_END }
136 };
137
138 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
139 "starting intersection by accepting and committing\n");
140 GNUNET_assert (NULL != context_msg);
141 GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
142 oh2 = GNUNET_SETI_accept (request,
143 opts,
144 &result_cb_set2,
145 NULL);
146 GNUNET_SETI_commit (oh2,
147 set2);
148}
149
150
151/**
152 * Start the set operation.
153 *
154 * @param cls closure, unused
155 */
156static void
157start (void *cls)
158{
159 struct GNUNET_MessageHeader context_msg;
160 struct GNUNET_SETI_Option opts[] = {
161 { .type = GNUNET_SETI_OPTION_RETURN_INTERSECTION },
162 { .type = GNUNET_SETI_OPTION_END }
163 };
164
165 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
166 "starting listener\n");
167 context_msg.size = htons (sizeof (context_msg));
168 context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
169 listen_handle = GNUNET_SETI_listen (config,
170 &app_id,
171 &listen_cb,
172 NULL);
173 oh1 = GNUNET_SETI_prepare (&local_id,
174 &app_id,
175 &context_msg,
176 opts,
177 &result_cb_set1,
178 NULL);
179 GNUNET_SETI_commit (oh1,
180 set1);
181}
182
183
184/**
185 * Initialize the second set, continue
186 *
187 * @param cls closure, unused
188 */
189static void
190init_set2 (void *cls)
191{
192 struct GNUNET_SETI_Element element;
193
194 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
195 "initializing set 2\n");
196 element.element_type = 0;
197 element.data = "hello";
198 element.size = strlen (element.data);
199 GNUNET_SETI_add_element (set2,
200 &element,
201 NULL,
202 NULL);
203 element.data = "quux";
204 element.size = strlen (element.data);
205 GNUNET_SETI_add_element (set2,
206 &element,
207 NULL,
208 NULL);
209 element.data = "baz";
210 element.size = strlen (element.data);
211 GNUNET_SETI_add_element (set2,
212 &element,
213 &start,
214 NULL);
215}
216
217
218/**
219 * Initialize the first set, continue.
220 */
221static void
222init_set1 (void)
223{
224 struct GNUNET_SETI_Element element;
225
226 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
227 "initializing set 1\n");
228 element.element_type = 0;
229 element.data = "hello";
230 element.size = strlen (element.data);
231 GNUNET_SETI_add_element (set1,
232 &element,
233 NULL,
234 NULL);
235 element.data = "bar";
236 element.size = strlen (element.data);
237 GNUNET_SETI_add_element (set1,
238 &element,
239 &init_set2,
240 NULL);
241}
242
243
244/**
245 * Function run on shutdown.
246 *
247 * @param cls closure
248 */
249static void
250do_shutdown (void *cls)
251{
252 if (NULL != tt)
253 {
254 GNUNET_SCHEDULER_cancel (tt);
255 tt = NULL;
256 }
257 if (NULL != oh1)
258 {
259 GNUNET_SETI_operation_cancel (oh1);
260 oh1 = NULL;
261 }
262 if (NULL != oh2)
263 {
264 GNUNET_SETI_operation_cancel (oh2);
265 oh2 = NULL;
266 }
267 if (NULL != set1)
268 {
269 GNUNET_SETI_destroy (set1);
270 set1 = NULL;
271 }
272 if (NULL != set2)
273 {
274 GNUNET_SETI_destroy (set2);
275 set2 = NULL;
276 }
277 if (NULL != listen_handle)
278 {
279 GNUNET_SETI_listen_cancel (listen_handle);
280 listen_handle = NULL;
281 }
282}
283
284
285/**
286 * Function run on timeout.
287 *
288 * @param cls closure
289 */
290static void
291timeout_fail (void *cls)
292{
293 tt = NULL;
294 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
295 "Testcase failed with timeout\n");
296 GNUNET_SCHEDULER_shutdown ();
297 ret = 1;
298}
299
300
301/**
302 * Signature of the 'main' function for a (single-peer) testcase that
303 * is run using 'GNUNET_TESTING_peer_run'.
304 *
305 * @param cls closure
306 * @param cfg configuration of the peer that was started
307 * @param peer identity of the peer that was created
308 */
309static void
310run (void *cls,
311 const struct GNUNET_CONFIGURATION_Handle *cfg,
312 struct GNUNET_TESTING_Peer *peer)
313{
314 config = cfg;
315 GNUNET_TESTING_peer_get_identity (peer,
316 &local_id);
317 tt = GNUNET_SCHEDULER_add_delayed (
318 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
319 5),
320 &timeout_fail,
321 NULL);
322 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
323 NULL);
324
325 set1 = GNUNET_SETI_create (cfg);
326 set2 = GNUNET_SETI_create (cfg);
327 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK,
328 &app_id);
329
330 /* test the real set reconciliation */
331 init_set1 ();
332}
333
334
335int
336main (int argc,
337 char **argv)
338{
339 if (0 != GNUNET_TESTING_peer_run ("test_seti_api",
340 "test_seti.conf",
341 &run,
342 NULL))
343 return 1;
344 return ret;
345}