aboutsummaryrefslogtreecommitdiff
path: root/src/service/seti/gnunet-service-seti.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/seti/gnunet-service-seti.c')
-rw-r--r--src/service/seti/gnunet-service-seti.c2516
1 files changed, 2516 insertions, 0 deletions
diff --git a/src/service/seti/gnunet-service-seti.c b/src/service/seti/gnunet-service-seti.c
new file mode 100644
index 000000000..6db24a5b6
--- /dev/null
+++ b/src/service/seti/gnunet-service-seti.c
@@ -0,0 +1,2516 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/gnunet-service-seti.c
22 * @brief two-peer set intersection operations
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet-service-seti_protocol.h"
28#include "gnunet_statistics_service.h"
29#include "gnunet_cadet_service.h"
30#include "gnunet_seti_service.h"
31#include "gnunet_block_lib.h"
32#include "seti.h"
33
34/**
35 * How long do we hold on to an incoming channel if there is
36 * no local listener before giving up?
37 */
38#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
39
40
41/**
42 * Current phase we are in for a intersection operation.
43 */
44enum IntersectionOperationPhase
45{
46 /**
47 * We are just starting.
48 */
49 PHASE_INITIAL,
50
51 /**
52 * We have send the number of our elements to the other
53 * peer, but did not setup our element set yet.
54 */
55 PHASE_COUNT_SENT,
56
57 /**
58 * We have initialized our set and are now reducing it by exchanging
59 * Bloom filters until one party notices the their element hashes
60 * are equal.
61 */
62 PHASE_BF_EXCHANGE,
63
64 /**
65 * We must next send the P2P DONE message (after finishing mostly
66 * with the local client). Then we will wait for the channel to close.
67 */
68 PHASE_MUST_SEND_DONE,
69
70 /**
71 * We have received the P2P DONE message, and must finish with the
72 * local client before terminating the channel.
73 */
74 PHASE_DONE_RECEIVED,
75
76 /**
77 * The protocol is over. Results may still have to be sent to the
78 * client.
79 */
80 PHASE_FINISHED
81};
82
83
84/**
85 * A set that supports a specific operation with other peers.
86 */
87struct Set;
88
89/**
90 * Information about an element element in the set. All elements are
91 * stored in a hash-table from their hash-code to their 'struct
92 * Element', so that the remove and add operations are reasonably
93 * fast.
94 */
95struct ElementEntry;
96
97/**
98 * Operation context used to execute a set operation.
99 */
100struct Operation;
101
102
103/**
104 * Information about an element element in the set. All elements are
105 * stored in a hash-table from their hash-code to their `struct
106 * Element`, so that the remove and add operations are reasonably
107 * fast.
108 */
109struct ElementEntry
110{
111 /**
112 * The actual element. The data for the element
113 * should be allocated at the end of this struct.
114 */
115 struct GNUNET_SETI_Element element;
116
117 /**
118 * Hash of the element. For set union: Will be used to derive the
119 * different IBF keys for different salts.
120 */
121 struct GNUNET_HashCode element_hash;
122
123 /**
124 * Generation in which the element was added.
125 */
126 unsigned int generation_added;
127
128 /**
129 * #GNUNET_YES if the element is a remote element, and does not belong
130 * to the operation's set.
131 */
132 int remote;
133};
134
135
136/**
137 * A listener is inhabited by a client, and waits for evaluation
138 * requests from remote peers.
139 */
140struct Listener;
141
142
143/**
144 * State we keep per client.
145 */
146struct ClientState
147{
148 /**
149 * Set, if associated with the client, otherwise NULL.
150 */
151 struct Set *set;
152
153 /**
154 * Listener, if associated with the client, otherwise NULL.
155 */
156 struct Listener *listener;
157
158 /**
159 * Client handle.
160 */
161 struct GNUNET_SERVICE_Client *client;
162
163 /**
164 * Message queue.
165 */
166 struct GNUNET_MQ_Handle *mq;
167};
168
169
170/**
171 * Operation context used to execute a set operation.
172 */
173struct Operation
174{
175 /**
176 * The identity of the requesting peer. Needs to
177 * be stored here as the op spec might not have been created yet.
178 */
179 struct GNUNET_PeerIdentity peer;
180
181 /**
182 * XOR of the keys of all of the elements (remaining) in my set.
183 * Always updated when elements are added or removed to
184 * @e my_elements.
185 */
186 struct GNUNET_HashCode my_xor;
187
188 /**
189 * XOR of the keys of all of the elements (remaining) in
190 * the other peer's set. Updated when we receive the
191 * other peer's Bloom filter.
192 */
193 struct GNUNET_HashCode other_xor;
194
195 /**
196 * Kept in a DLL of the listener, if @e listener is non-NULL.
197 */
198 struct Operation *next;
199
200 /**
201 * Kept in a DLL of the listener, if @e listener is non-NULL.
202 */
203 struct Operation *prev;
204
205 /**
206 * Channel to the peer.
207 */
208 struct GNUNET_CADET_Channel *channel;
209
210 /**
211 * Port this operation runs on.
212 */
213 struct Listener *listener;
214
215 /**
216 * Message queue for the channel.
217 */
218 struct GNUNET_MQ_Handle *mq;
219
220 /**
221 * Context message, may be NULL.
222 */
223 struct GNUNET_MessageHeader *context_msg;
224
225 /**
226 * Set associated with the operation, NULL until the spec has been
227 * associated with a set.
228 */
229 struct Set *set;
230
231 /**
232 * The bf we currently receive
233 */
234 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
235
236 /**
237 * BF of the set's element.
238 */
239 struct GNUNET_CONTAINER_BloomFilter *local_bf;
240
241 /**
242 * Remaining elements in the intersection operation.
243 * Maps element-id-hashes to 'elements in our set'.
244 */
245 struct GNUNET_CONTAINER_MultiHashMap *my_elements;
246
247 /**
248 * Iterator for sending the final set of @e my_elements to the client.
249 */
250 struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
251
252 /**
253 * For multipart BF transmissions, we have to store the
254 * bloomfilter-data until we fully received it.
255 */
256 char *bf_data;
257
258 /**
259 * Timeout task, if the incoming peer has not been accepted
260 * after the timeout, it will be disconnected.
261 */
262 struct GNUNET_SCHEDULER_Task *timeout_task;
263
264 /**
265 * How many bytes of @e bf_data are valid?
266 */
267 uint32_t bf_data_offset;
268
269 /**
270 * Current element count contained within @e my_elements.
271 * (May differ briefly during initialization.)
272 */
273 uint32_t my_element_count;
274
275 /**
276 * size of the bloomfilter in @e bf_data.
277 */
278 uint32_t bf_data_size;
279
280 /**
281 * size of the bloomfilter
282 */
283 uint32_t bf_bits_per_element;
284
285 /**
286 * Salt currently used for BF construction (by us or the other peer,
287 * depending on where we are in the code).
288 */
289 uint32_t salt;
290
291 /**
292 * Current state of the operation.
293 */
294 enum IntersectionOperationPhase phase;
295
296 /**
297 * Generation in which the operation handle was created.
298 */
299 unsigned int generation_created;
300
301 /**
302 * Did we send the client that we are done?
303 */
304 int client_done_sent;
305
306 /**
307 * Set whenever we reach the state where the death of the
308 * channel is perfectly find and should NOT result in the
309 * operation being cancelled.
310 */
311 int channel_death_expected;
312
313 /**
314 * Remote peers element count
315 */
316 uint32_t remote_element_count;
317
318 /**
319 * ID used to identify an operation between service and client
320 */
321 uint32_t client_request_id;
322
323 /**
324 * When are elements sent to the client, and which elements are sent?
325 */
326 int return_intersection;
327
328 /**
329 * Unique request id for the request from a remote peer, sent to the
330 * client, which will accept or reject the request. Set to '0' iff
331 * the request has not been suggested yet.
332 */
333 uint32_t suggest_id;
334
335};
336
337
338/**
339 * SetContent stores the actual set elements, which may be shared by
340 * multiple generations derived from one set.
341 */
342struct SetContent
343{
344 /**
345 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
346 */
347 struct GNUNET_CONTAINER_MultiHashMap *elements;
348
349 /**
350 * Number of references to the content.
351 */
352 unsigned int refcount;
353
354 /**
355 * FIXME: document!
356 */
357 unsigned int latest_generation;
358
359 /**
360 * Number of concurrently active iterators.
361 */
362 int iterator_count;
363};
364
365
366/**
367 * A set that supports a specific operation with other peers.
368 */
369struct Set
370{
371 /**
372 * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
373 */
374 struct Set *next;
375
376 /**
377 * Sets are held in a doubly linked list.
378 */
379 struct Set *prev;
380
381 /**
382 * Client that owns the set. Only one client may own a set,
383 * and there can only be one set per client.
384 */
385 struct ClientState *cs;
386
387 /**
388 * Content, possibly shared by multiple sets,
389 * and thus reference counted.
390 */
391 struct SetContent *content;
392
393 /**
394 * Number of currently valid elements in the set which have not been
395 * removed.
396 */
397 uint32_t current_set_element_count;
398
399 /**
400 * Evaluate operations are held in a linked list.
401 */
402 struct Operation *ops_head;
403
404 /**
405 * Evaluate operations are held in a linked list.
406 */
407 struct Operation *ops_tail;
408
409 /**
410 * Current generation, that is, number of previously executed
411 * operations and lazy copies on the underlying set content.
412 */
413 unsigned int current_generation;
414
415};
416
417
418/**
419 * A listener is inhabited by a client, and waits for evaluation
420 * requests from remote peers.
421 */
422struct Listener
423{
424 /**
425 * Listeners are held in a doubly linked list.
426 */
427 struct Listener *next;
428
429 /**
430 * Listeners are held in a doubly linked list.
431 */
432 struct Listener *prev;
433
434 /**
435 * Head of DLL of operations this listener is responsible for.
436 * Once the client has accepted/declined the operation, the
437 * operation is moved to the respective set's operation DLLS.
438 */
439 struct Operation *op_head;
440
441 /**
442 * Tail of DLL of operations this listener is responsible for.
443 * Once the client has accepted/declined the operation, the
444 * operation is moved to the respective set's operation DLLS.
445 */
446 struct Operation *op_tail;
447
448 /**
449 * Client that owns the listener.
450 * Only one client may own a listener.
451 */
452 struct ClientState *cs;
453
454 /**
455 * The port we are listening on with CADET.
456 */
457 struct GNUNET_CADET_Port *open_port;
458
459 /**
460 * Application ID for the operation, used to distinguish
461 * multiple operations of the same type with the same peer.
462 */
463 struct GNUNET_HashCode app_id;
464
465};
466
467
468/**
469 * Handle to the cadet service, used to listen for and connect to
470 * remote peers.
471 */
472static struct GNUNET_CADET_Handle *cadet;
473
474/**
475 * Statistics handle.
476 */
477static struct GNUNET_STATISTICS_Handle *_GSS_statistics;
478
479/**
480 * Listeners are held in a doubly linked list.
481 */
482static struct Listener *listener_head;
483
484/**
485 * Listeners are held in a doubly linked list.
486 */
487static struct Listener *listener_tail;
488
489/**
490 * Number of active clients.
491 */
492static unsigned int num_clients;
493
494/**
495 * Are we in shutdown? if #GNUNET_YES and the number of clients
496 * drops to zero, disconnect from CADET.
497 */
498static int in_shutdown;
499
500/**
501 * Counter for allocating unique IDs for clients, used to identify
502 * incoming operation requests from remote peers, that the client can
503 * choose to accept or refuse. 0 must not be used (reserved for
504 * uninitialized).
505 */
506static uint32_t suggest_id;
507
508
509/**
510 * If applicable in the current operation mode, send a result message
511 * to the client indicating we removed an element.
512 *
513 * @param op intersection operation
514 * @param element element to send
515 */
516static void
517send_client_removed_element (struct Operation *op,
518 struct GNUNET_SETI_Element *element)
519{
520 struct GNUNET_MQ_Envelope *ev;
521 struct GNUNET_SETI_ResultMessage *rm;
522
523 if (GNUNET_YES == op->return_intersection)
524 {
525 GNUNET_break (0);
526 return; /* Wrong mode for transmitting removed elements */
527 }
528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529 "Sending removed element (size %u) to client\n",
530 element->size);
531 GNUNET_STATISTICS_update (_GSS_statistics,
532 "# Element removed messages sent",
533 1,
534 GNUNET_NO);
535 GNUNET_assert (0 != op->client_request_id);
536 ev = GNUNET_MQ_msg_extra (rm,
537 element->size,
538 GNUNET_MESSAGE_TYPE_SETI_RESULT);
539 if (NULL == ev)
540 {
541 GNUNET_break (0);
542 return;
543 }
544 rm->result_status = htons (GNUNET_SETI_STATUS_DEL_LOCAL);
545 rm->request_id = htonl (op->client_request_id);
546 rm->element_type = element->element_type;
547 GNUNET_memcpy (&rm[1],
548 element->data,
549 element->size);
550 GNUNET_MQ_send (op->set->cs->mq,
551 ev);
552}
553
554
555/**
556 * Is element @a ee part of the set used by @a op?
557 *
558 * @param ee element to test
559 * @param op operation the defines the set and its generation
560 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
561 */
562static int
563_GSS_is_element_of_operation (struct ElementEntry *ee,
564 struct Operation *op)
565{
566 return op->generation_created >= ee->generation_added;
567}
568
569
570/**
571 * Fills the "my_elements" hashmap with all relevant elements.
572 *
573 * @param cls the `struct Operation *` we are performing
574 * @param key current key code
575 * @param value the `struct ElementEntry *` from the hash map
576 * @return #GNUNET_YES (we should continue to iterate)
577 */
578static int
579filtered_map_initialization (void *cls,
580 const struct GNUNET_HashCode *key,
581 void *value)
582{
583 struct Operation *op = cls;
584 struct ElementEntry *ee = value;
585 struct GNUNET_HashCode mutated_hash;
586
587 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588 "FIMA called for %s:%u\n",
589 GNUNET_h2s (&ee->element_hash),
590 ee->element.size);
591
592 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
593 {
594 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
596 GNUNET_h2s (&ee->element_hash),
597 ee->element.size);
598 return GNUNET_YES; /* element not valid in our operation's generation */
599 }
600
601 /* Test if element is in other peer's bloomfilter */
602 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
603 op->salt,
604 &mutated_hash);
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606 "Testing mingled hash %s with salt %u\n",
607 GNUNET_h2s (&mutated_hash),
608 op->salt);
609 if (GNUNET_NO ==
610 GNUNET_CONTAINER_bloomfilter_test (op->remote_bf,
611 &mutated_hash))
612 {
613 /* remove this element */
614 send_client_removed_element (op,
615 &ee->element);
616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
617 "Reduced initialization, not starting with %s:%u\n",
618 GNUNET_h2s (&ee->element_hash),
619 ee->element.size);
620 return GNUNET_YES;
621 }
622 op->my_element_count++;
623 GNUNET_CRYPTO_hash_xor (&op->my_xor,
624 &ee->element_hash,
625 &op->my_xor);
626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
627 "Filtered initialization of my_elements, adding %s:%u\n",
628 GNUNET_h2s (&ee->element_hash),
629 ee->element.size);
630 GNUNET_break (GNUNET_YES ==
631 GNUNET_CONTAINER_multihashmap_put (op->my_elements,
632 &ee->element_hash,
633 ee,
634 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
635
636 return GNUNET_YES;
637}
638
639
640/**
641 * Removes elements from our hashmap if they are not contained within the
642 * provided remote bloomfilter.
643 *
644 * @param cls closure with the `struct Operation *`
645 * @param key current key code
646 * @param value value in the hash map
647 * @return #GNUNET_YES (we should continue to iterate)
648 */
649static int
650iterator_bf_reduce (void *cls,
651 const struct GNUNET_HashCode *key,
652 void *value)
653{
654 struct Operation *op = cls;
655 struct ElementEntry *ee = value;
656 struct GNUNET_HashCode mutated_hash;
657
658 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
659 op->salt,
660 &mutated_hash);
661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662 "Testing mingled hash %s with salt %u\n",
663 GNUNET_h2s (&mutated_hash),
664 op->salt);
665 if (GNUNET_NO ==
666 GNUNET_CONTAINER_bloomfilter_test (op->remote_bf,
667 &mutated_hash))
668 {
669 GNUNET_break (0 < op->my_element_count);
670 op->my_element_count--;
671 GNUNET_CRYPTO_hash_xor (&op->my_xor,
672 &ee->element_hash,
673 &op->my_xor);
674 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
675 "Bloom filter reduction of my_elements, removing %s:%u\n",
676 GNUNET_h2s (&ee->element_hash),
677 ee->element.size);
678 GNUNET_assert (GNUNET_YES ==
679 GNUNET_CONTAINER_multihashmap_remove (op->my_elements,
680 &ee->element_hash,
681 ee));
682 send_client_removed_element (op,
683 &ee->element);
684 }
685 else
686 {
687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688 "Bloom filter reduction of my_elements, keeping %s:%u\n",
689 GNUNET_h2s (&ee->element_hash),
690 ee->element.size);
691 }
692 return GNUNET_YES;
693}
694
695
696/**
697 * Create initial bloomfilter based on all the elements given.
698 *
699 * @param cls the `struct Operation *`
700 * @param key current key code
701 * @param value the `struct ElementEntry` to process
702 * @return #GNUNET_YES (we should continue to iterate)
703 */
704static int
705iterator_bf_create (void *cls,
706 const struct GNUNET_HashCode *key,
707 void *value)
708{
709 struct Operation *op = cls;
710 struct ElementEntry *ee = value;
711 struct GNUNET_HashCode mutated_hash;
712
713 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
714 op->salt,
715 &mutated_hash);
716 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
717 "Initializing BF with hash %s with salt %u\n",
718 GNUNET_h2s (&mutated_hash),
719 op->salt);
720 GNUNET_CONTAINER_bloomfilter_add (op->local_bf,
721 &mutated_hash);
722 return GNUNET_YES;
723}
724
725
726/**
727 * Destroy the given operation. Used for any operation where both
728 * peers were known and that thus actually had a vt and channel. Must
729 * not be used for operations where 'listener' is still set and we do
730 * not know the other peer.
731 *
732 * Call the implementation-specific cancel function of the operation.
733 * Disconnects from the remote peer. Does not disconnect the client,
734 * as there may be multiple operations per set.
735 *
736 * @param op operation to destroy
737 */
738static void
739_GSS_operation_destroy (struct Operation *op)
740{
741 struct Set *set = op->set;
742 struct GNUNET_CADET_Channel *channel;
743
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
745 GNUNET_assert (NULL == op->listener);
746 if (NULL != op->remote_bf)
747 {
748 GNUNET_CONTAINER_bloomfilter_free (op->remote_bf);
749 op->remote_bf = NULL;
750 }
751 if (NULL != op->local_bf)
752 {
753 GNUNET_CONTAINER_bloomfilter_free (op->local_bf);
754 op->local_bf = NULL;
755 }
756 if (NULL != op->my_elements)
757 {
758 GNUNET_CONTAINER_multihashmap_destroy (op->my_elements);
759 op->my_elements = NULL;
760 }
761 if (NULL != op->full_result_iter)
762 {
763 GNUNET_CONTAINER_multihashmap_iterator_destroy (
764 op->full_result_iter);
765 op->full_result_iter = NULL;
766 }
767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
768 "Destroying intersection op state done\n");
769 if (NULL != set)
770 {
771 GNUNET_CONTAINER_DLL_remove (set->ops_head,
772 set->ops_tail,
773 op);
774 op->set = NULL;
775 }
776 if (NULL != op->context_msg)
777 {
778 GNUNET_free (op->context_msg);
779 op->context_msg = NULL;
780 }
781 if (NULL != (channel = op->channel))
782 {
783 /* This will free op; called conditionally as this helper function
784 is also called from within the channel disconnect handler. */
785 op->channel = NULL;
786 GNUNET_CADET_channel_destroy (channel);
787 }
788 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
789 * there was a channel end handler that will free 'op' on the call stack. */
790}
791
792
793/**
794 * This function probably should not exist
795 * and be replaced by inlining more specific
796 * logic in the various places where it is called.
797 */
798static void
799_GSS_operation_destroy2 (struct Operation *op);
800
801
802/**
803 * Destroy an incoming request from a remote peer
804 *
805 * @param op remote request to destroy
806 */
807static void
808incoming_destroy (struct Operation *op)
809{
810 struct Listener *listener;
811
812 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813 "Destroying incoming operation %p\n",
814 op);
815 if (NULL != (listener = op->listener))
816 {
817 GNUNET_CONTAINER_DLL_remove (listener->op_head,
818 listener->op_tail,
819 op);
820 op->listener = NULL;
821 }
822 if (NULL != op->timeout_task)
823 {
824 GNUNET_SCHEDULER_cancel (op->timeout_task);
825 op->timeout_task = NULL;
826 }
827 _GSS_operation_destroy2 (op);
828}
829
830
831/**
832 * Signal to the client that the operation has finished and
833 * destroy the operation.
834 *
835 * @param cls operation to destroy
836 */
837static void
838send_client_done_and_destroy (void *cls)
839{
840 struct Operation *op = cls;
841 struct GNUNET_MQ_Envelope *ev;
842 struct GNUNET_SETI_ResultMessage *rm;
843
844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
845 "Intersection succeeded, sending DONE to local client\n");
846 GNUNET_STATISTICS_update (_GSS_statistics,
847 "# Intersection operations succeeded",
848 1,
849 GNUNET_NO);
850 ev = GNUNET_MQ_msg (rm,
851 GNUNET_MESSAGE_TYPE_SETI_RESULT);
852 rm->request_id = htonl (op->client_request_id);
853 rm->result_status = htons (GNUNET_SETI_STATUS_DONE);
854 rm->element_type = htons (0);
855 GNUNET_MQ_send (op->set->cs->mq,
856 ev);
857 _GSS_operation_destroy (op);
858}
859
860
861/**
862 * This function probably should not exist
863 * and be replaced by inlining more specific
864 * logic in the various places where it is called.
865 */
866static void
867_GSS_operation_destroy2 (struct Operation *op)
868{
869 struct GNUNET_CADET_Channel *channel;
870
871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872 "channel_end_cb called\n");
873 if (NULL != (channel = op->channel))
874 {
875 /* This will free op; called conditionally as this helper function
876 is also called from within the channel disconnect handler. */
877 op->channel = NULL;
878 GNUNET_CADET_channel_destroy (channel);
879 }
880 if (NULL != op->listener)
881 {
882 incoming_destroy (op);
883 return;
884 }
885 if (NULL != op->set)
886 {
887 if (GNUNET_YES == op->channel_death_expected)
888 {
889 /* oh goodie, we are done! */
890 send_client_done_and_destroy (op);
891 }
892 else
893 {
894 /* sorry, channel went down early, too bad. */
895 _GSS_operation_destroy (op);
896 }
897 }
898 else
899 _GSS_operation_destroy (op);
900 GNUNET_free (op);
901}
902
903
904/**
905 * Inform the client that the intersection operation has failed,
906 * and proceed to destroy the evaluate operation.
907 *
908 * @param op the intersection operation to fail
909 */
910static void
911fail_intersection_operation (struct Operation *op)
912{
913 struct GNUNET_MQ_Envelope *ev;
914 struct GNUNET_SETI_ResultMessage *msg;
915
916 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
917 "Intersection operation failed\n");
918 GNUNET_STATISTICS_update (_GSS_statistics,
919 "# Intersection operations failed",
920 1,
921 GNUNET_NO);
922 if (NULL != op->my_elements)
923 {
924 GNUNET_CONTAINER_multihashmap_destroy (op->my_elements);
925 op->my_elements = NULL;
926 }
927 ev = GNUNET_MQ_msg (msg,
928 GNUNET_MESSAGE_TYPE_SETI_RESULT);
929 msg->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
930 msg->request_id = htonl (op->client_request_id);
931 msg->element_type = htons (0);
932 GNUNET_MQ_send (op->set->cs->mq,
933 ev);
934 _GSS_operation_destroy (op);
935}
936
937
938/**
939 * Send a bloomfilter to our peer. After the result done message has
940 * been sent to the client, destroy the evaluate operation.
941 *
942 * @param op intersection operation
943 */
944static void
945send_bloomfilter (struct Operation *op)
946{
947 struct GNUNET_MQ_Envelope *ev;
948 struct BFMessage *msg;
949 uint32_t bf_size;
950 uint32_t bf_elementbits;
951 uint32_t chunk_size;
952 char *bf_data;
953 uint32_t offset;
954
955 /* We consider the ratio of the set sizes to determine
956 the number of bits per element, as the smaller set
957 should use more bits to maximize its set reduction
958 potential and minimize overall bandwidth consumption. */
959 bf_elementbits = 2 + ceil (log2 ((double)
960 (op->remote_element_count
961 / (double) op->my_element_count)));
962 if (bf_elementbits < 1)
963 bf_elementbits = 1; /* make sure k is not 0 */
964 /* optimize BF-size to ~50% of bits set */
965 bf_size = ceil ((double) (op->my_element_count
966 * bf_elementbits / log (2)));
967 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968 "Sending Bloom filter (%u) of size %u bytes\n",
969 (unsigned int) bf_elementbits,
970 (unsigned int) bf_size);
971 op->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
972 bf_size,
973 bf_elementbits);
974 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
975 UINT32_MAX);
976 GNUNET_CONTAINER_multihashmap_iterate (op->my_elements,
977 &iterator_bf_create,
978 op);
979
980 /* send our Bloom filter */
981 GNUNET_STATISTICS_update (_GSS_statistics,
982 "# Intersection Bloom filters sent",
983 1,
984 GNUNET_NO);
985 chunk_size = 60 * 1024 - sizeof(struct BFMessage);
986 if (bf_size <= chunk_size)
987 {
988 /* singlepart */
989 chunk_size = bf_size;
990 ev = GNUNET_MQ_msg_extra (msg,
991 chunk_size,
992 GNUNET_MESSAGE_TYPE_SETI_P2P_BF);
993 GNUNET_assert (GNUNET_SYSERR !=
994 GNUNET_CONTAINER_bloomfilter_get_raw_data (
995 op->local_bf,
996 (char *) &msg[1],
997 bf_size));
998 msg->sender_element_count = htonl (op->my_element_count);
999 msg->bloomfilter_total_length = htonl (bf_size);
1000 msg->bits_per_element = htonl (bf_elementbits);
1001 msg->sender_mutator = htonl (op->salt);
1002 msg->element_xor_hash = op->my_xor;
1003 GNUNET_MQ_send (op->mq, ev);
1004 }
1005 else
1006 {
1007 /* multipart */
1008 bf_data = GNUNET_malloc (bf_size);
1009 GNUNET_assert (GNUNET_SYSERR !=
1010 GNUNET_CONTAINER_bloomfilter_get_raw_data (
1011 op->local_bf,
1012 bf_data,
1013 bf_size));
1014 offset = 0;
1015 while (offset < bf_size)
1016 {
1017 if (bf_size - chunk_size < offset)
1018 chunk_size = bf_size - offset;
1019 ev = GNUNET_MQ_msg_extra (msg,
1020 chunk_size,
1021 GNUNET_MESSAGE_TYPE_SETI_P2P_BF);
1022 GNUNET_memcpy (&msg[1],
1023 &bf_data[offset],
1024 chunk_size);
1025 offset += chunk_size;
1026 msg->sender_element_count = htonl (op->my_element_count);
1027 msg->bloomfilter_total_length = htonl (bf_size);
1028 msg->bits_per_element = htonl (bf_elementbits);
1029 msg->sender_mutator = htonl (op->salt);
1030 msg->element_xor_hash = op->my_xor;
1031 GNUNET_MQ_send (op->mq, ev);
1032 }
1033 GNUNET_free (bf_data);
1034 }
1035 GNUNET_CONTAINER_bloomfilter_free (op->local_bf);
1036 op->local_bf = NULL;
1037}
1038
1039
1040/**
1041 * Remember that we are done dealing with the local client
1042 * AND have sent the other peer our message that we are done,
1043 * so we are not just waiting for the channel to die before
1044 * telling the local client that we are done as our last act.
1045 *
1046 * @param cls the `struct Operation`.
1047 */
1048static void
1049finished_local_operations (void *cls)
1050{
1051 struct Operation *op = cls;
1052
1053 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054 "DONE sent to other peer, now waiting for other end to close the channel\n");
1055 op->phase = PHASE_FINISHED;
1056 op->channel_death_expected = GNUNET_YES;
1057}
1058
1059
1060/**
1061 * Notify the other peer that we are done. Once this message
1062 * is out, we still need to notify the local client that we
1063 * are done.
1064 *
1065 * @param op operation to notify for.
1066 */
1067static void
1068send_p2p_done (struct Operation *op)
1069{
1070 struct GNUNET_MQ_Envelope *ev;
1071 struct IntersectionDoneMessage *idm;
1072
1073 GNUNET_assert (PHASE_MUST_SEND_DONE == op->phase);
1074 GNUNET_assert (GNUNET_NO == op->channel_death_expected);
1075 ev = GNUNET_MQ_msg (idm,
1076 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE);
1077 idm->final_element_count = htonl (op->my_element_count);
1078 idm->element_xor_hash = op->my_xor;
1079 GNUNET_MQ_notify_sent (ev,
1080 &finished_local_operations,
1081 op);
1082 GNUNET_MQ_send (op->mq,
1083 ev);
1084}
1085
1086
1087/**
1088 * Send all elements in the full result iterator.
1089 *
1090 * @param cls the `struct Operation *`
1091 */
1092static void
1093send_remaining_elements (void *cls)
1094{
1095 struct Operation *op = cls;
1096 const void *nxt;
1097 const struct ElementEntry *ee;
1098 struct GNUNET_MQ_Envelope *ev;
1099 struct GNUNET_SETI_ResultMessage *rm;
1100 const struct GNUNET_SETI_Element *element;
1101 int res;
1102
1103 if (GNUNET_NO == op->return_intersection)
1104 {
1105 GNUNET_break (0);
1106 return; /* Wrong mode for transmitting removed elements */
1107 }
1108 res = GNUNET_CONTAINER_multihashmap_iterator_next (
1109 op->full_result_iter,
1110 NULL,
1111 &nxt);
1112 if (GNUNET_NO == res)
1113 {
1114 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115 "Sending done and destroy because iterator ran out\n");
1116 GNUNET_CONTAINER_multihashmap_iterator_destroy (
1117 op->full_result_iter);
1118 op->full_result_iter = NULL;
1119 if (PHASE_DONE_RECEIVED == op->phase)
1120 {
1121 op->phase = PHASE_FINISHED;
1122 send_client_done_and_destroy (op);
1123 }
1124 else if (PHASE_MUST_SEND_DONE == op->phase)
1125 {
1126 send_p2p_done (op);
1127 }
1128 else
1129 {
1130 GNUNET_assert (0);
1131 }
1132 return;
1133 }
1134 ee = nxt;
1135 element = &ee->element;
1136 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137 "Sending element %s:%u to client (full set)\n",
1138 GNUNET_h2s (&ee->element_hash),
1139 element->size);
1140 GNUNET_assert (0 != op->client_request_id);
1141 ev = GNUNET_MQ_msg_extra (rm,
1142 element->size,
1143 GNUNET_MESSAGE_TYPE_SETI_RESULT);
1144 GNUNET_assert (NULL != ev);
1145 rm->result_status = htons (GNUNET_SETI_STATUS_ADD_LOCAL);
1146 rm->request_id = htonl (op->client_request_id);
1147 rm->element_type = element->element_type;
1148 GNUNET_memcpy (&rm[1],
1149 element->data,
1150 element->size);
1151 GNUNET_MQ_notify_sent (ev,
1152 &send_remaining_elements,
1153 op);
1154 GNUNET_MQ_send (op->set->cs->mq,
1155 ev);
1156}
1157
1158
1159/**
1160 * Fills the "my_elements" hashmap with the initial set of
1161 * (non-deleted) elements from the set of the specification.
1162 *
1163 * @param cls closure with the `struct Operation *`
1164 * @param key current key code for the element
1165 * @param value value in the hash map with the `struct ElementEntry *`
1166 * @return #GNUNET_YES (we should continue to iterate)
1167 */
1168static int
1169initialize_map_unfiltered (void *cls,
1170 const struct GNUNET_HashCode *key,
1171 void *value)
1172{
1173 struct ElementEntry *ee = value;
1174 struct Operation *op = cls;
1175
1176 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1177 return GNUNET_YES; /* element not live in operation's generation */
1178 GNUNET_CRYPTO_hash_xor (&op->my_xor,
1179 &ee->element_hash,
1180 &op->my_xor);
1181 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1182 "Initial full initialization of my_elements, adding %s:%u\n",
1183 GNUNET_h2s (&ee->element_hash),
1184 ee->element.size);
1185 GNUNET_break (GNUNET_YES ==
1186 GNUNET_CONTAINER_multihashmap_put (op->my_elements,
1187 &ee->element_hash,
1188 ee,
1189 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1190 return GNUNET_YES;
1191}
1192
1193
1194/**
1195 * Send our element count to the peer, in case our element count is
1196 * lower than theirs.
1197 *
1198 * @param op intersection operation
1199 */
1200static void
1201send_element_count (struct Operation *op)
1202{
1203 struct GNUNET_MQ_Envelope *ev;
1204 struct IntersectionElementInfoMessage *msg;
1205
1206 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1207 "Sending our element count (%u)\n",
1208 op->my_element_count);
1209 ev = GNUNET_MQ_msg (msg,
1210 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO);
1211 msg->sender_element_count = htonl (op->my_element_count);
1212 GNUNET_MQ_send (op->mq, ev);
1213}
1214
1215
1216/**
1217 * We go first, initialize our map with all elements and
1218 * send the first Bloom filter.
1219 *
1220 * @param op operation to start exchange for
1221 */
1222static void
1223begin_bf_exchange (struct Operation *op)
1224{
1225 op->phase = PHASE_BF_EXCHANGE;
1226 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1227 &initialize_map_unfiltered,
1228 op);
1229 send_bloomfilter (op);
1230}
1231
1232
1233/**
1234 * Handle the initial `struct IntersectionElementInfoMessage` from a
1235 * remote peer.
1236 *
1237 * @param cls the intersection operation
1238 * @param mh the header of the message
1239 */
1240static void
1241handle_intersection_p2p_element_info (void *cls,
1242 const struct
1243 IntersectionElementInfoMessage *msg)
1244{
1245 struct Operation *op = cls;
1246
1247 op->remote_element_count = ntohl (msg->sender_element_count);
1248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1249 "Received remote element count (%u), I have %u\n",
1250 op->remote_element_count,
1251 op->my_element_count);
1252 if (((PHASE_INITIAL != op->phase) &&
1253 (PHASE_COUNT_SENT != op->phase)) ||
1254 (op->my_element_count > op->remote_element_count) ||
1255 (0 == op->my_element_count) ||
1256 (0 == op->remote_element_count))
1257 {
1258 GNUNET_break_op (0);
1259 fail_intersection_operation (op);
1260 return;
1261 }
1262 GNUNET_break (NULL == op->remote_bf);
1263 begin_bf_exchange (op);
1264 GNUNET_CADET_receive_done (op->channel);
1265}
1266
1267
1268/**
1269 * Process a Bloomfilter once we got all the chunks.
1270 *
1271 * @param op the intersection operation
1272 */
1273static void
1274process_bf (struct Operation *op)
1275{
1276 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1277 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
1278 op->phase,
1279 op->remote_element_count,
1280 op->my_element_count,
1281 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
1282 switch (op->phase)
1283 {
1284 case PHASE_INITIAL:
1285 GNUNET_break_op (0);
1286 fail_intersection_operation (op);
1287 return;
1288 case PHASE_COUNT_SENT:
1289 /* This is the first BF being sent, build our initial map with
1290 filtering in place */
1291 op->my_element_count = 0;
1292 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1293 &filtered_map_initialization,
1294 op);
1295 break;
1296 case PHASE_BF_EXCHANGE:
1297 /* Update our set by reduction */
1298 GNUNET_CONTAINER_multihashmap_iterate (op->my_elements,
1299 &iterator_bf_reduce,
1300 op);
1301 break;
1302 case PHASE_MUST_SEND_DONE:
1303 GNUNET_break_op (0);
1304 fail_intersection_operation (op);
1305 return;
1306 case PHASE_DONE_RECEIVED:
1307 GNUNET_break_op (0);
1308 fail_intersection_operation (op);
1309 return;
1310 case PHASE_FINISHED:
1311 GNUNET_break_op (0);
1312 fail_intersection_operation (op);
1313 return;
1314 }
1315 GNUNET_CONTAINER_bloomfilter_free (op->remote_bf);
1316 op->remote_bf = NULL;
1317
1318 if ((0 == op->my_element_count) || /* fully disjoint */
1319 ((op->my_element_count == op->remote_element_count) &&
1320 (0 == GNUNET_memcmp (&op->my_xor,
1321 &op->other_xor))))
1322 {
1323 /* we are done */
1324 op->phase = PHASE_MUST_SEND_DONE;
1325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326 "Intersection succeeded, sending DONE to other peer\n");
1327 GNUNET_CONTAINER_bloomfilter_free (op->local_bf);
1328 op->local_bf = NULL;
1329 if (GNUNET_YES == op->return_intersection)
1330 {
1331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1332 "Sending full result set (%u elements)\n",
1333 GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1334 op->full_result_iter
1335 = GNUNET_CONTAINER_multihashmap_iterator_create (
1336 op->my_elements);
1337 send_remaining_elements (op);
1338 return;
1339 }
1340 send_p2p_done (op);
1341 return;
1342 }
1343 op->phase = PHASE_BF_EXCHANGE;
1344 send_bloomfilter (op);
1345}
1346
1347
1348/**
1349 * Check an BF message from a remote peer.
1350 *
1351 * @param cls the intersection operation
1352 * @param msg the header of the message
1353 * @return #GNUNET_OK if @a msg is well-formed
1354 */
1355static int
1356check_intersection_p2p_bf (void *cls,
1357 const struct BFMessage *msg)
1358{
1359 struct Operation *op = cls;
1360
1361 (void) op;
1362 return GNUNET_OK;
1363}
1364
1365
1366/**
1367 * Handle an BF message from a remote peer.
1368 *
1369 * @param cls the intersection operation
1370 * @param msg the header of the message
1371 */
1372static void
1373handle_intersection_p2p_bf (void *cls,
1374 const struct BFMessage *msg)
1375{
1376 struct Operation *op = cls;
1377 uint32_t bf_size;
1378 uint32_t chunk_size;
1379 uint32_t bf_bits_per_element;
1380
1381 switch (op->phase)
1382 {
1383 case PHASE_INITIAL:
1384 GNUNET_break_op (0);
1385 fail_intersection_operation (op);
1386 return;
1387
1388 case PHASE_COUNT_SENT:
1389 case PHASE_BF_EXCHANGE:
1390 bf_size = ntohl (msg->bloomfilter_total_length);
1391 bf_bits_per_element = ntohl (msg->bits_per_element);
1392 chunk_size = htons (msg->header.size) - sizeof(struct BFMessage);
1393 op->other_xor = msg->element_xor_hash;
1394 if (bf_size == chunk_size)
1395 {
1396 if (NULL != op->bf_data)
1397 {
1398 GNUNET_break_op (0);
1399 fail_intersection_operation (op);
1400 return;
1401 }
1402 /* single part, done here immediately */
1403 op->remote_bf
1404 = GNUNET_CONTAINER_bloomfilter_init ((const char *) &msg[1],
1405 bf_size,
1406 bf_bits_per_element);
1407 op->salt = ntohl (msg->sender_mutator);
1408 op->remote_element_count = ntohl (msg->sender_element_count);
1409 process_bf (op);
1410 break;
1411 }
1412 /* multipart chunk */
1413 if (NULL == op->bf_data)
1414 {
1415 /* first chunk, initialize */
1416 op->bf_data = GNUNET_malloc (bf_size);
1417 op->bf_data_size = bf_size;
1418 op->bf_bits_per_element = bf_bits_per_element;
1419 op->bf_data_offset = 0;
1420 op->salt = ntohl (msg->sender_mutator);
1421 op->remote_element_count = ntohl (msg->sender_element_count);
1422 }
1423 else
1424 {
1425 /* increment */
1426 if ((op->bf_data_size != bf_size) ||
1427 (op->bf_bits_per_element != bf_bits_per_element) ||
1428 (op->bf_data_offset + chunk_size > bf_size) ||
1429 (op->salt != ntohl (msg->sender_mutator)) ||
1430 (op->remote_element_count != ntohl (msg->sender_element_count)))
1431 {
1432 GNUNET_break_op (0);
1433 fail_intersection_operation (op);
1434 return;
1435 }
1436 }
1437 GNUNET_memcpy (&op->bf_data[op->bf_data_offset],
1438 (const char *) &msg[1],
1439 chunk_size);
1440 op->bf_data_offset += chunk_size;
1441 if (op->bf_data_offset == bf_size)
1442 {
1443 /* last chunk, run! */
1444 op->remote_bf
1445 = GNUNET_CONTAINER_bloomfilter_init (op->bf_data,
1446 bf_size,
1447 bf_bits_per_element);
1448 GNUNET_free (op->bf_data);
1449 op->bf_data = NULL;
1450 op->bf_data_size = 0;
1451 process_bf (op);
1452 }
1453 break;
1454
1455 default:
1456 GNUNET_break_op (0);
1457 fail_intersection_operation (op);
1458 return;
1459 }
1460 GNUNET_CADET_receive_done (op->channel);
1461}
1462
1463
1464/**
1465 * Remove all elements from our hashmap.
1466 *
1467 * @param cls closure with the `struct Operation *`
1468 * @param key current key code
1469 * @param value value in the hash map
1470 * @return #GNUNET_YES (we should continue to iterate)
1471 */
1472static int
1473filter_all (void *cls,
1474 const struct GNUNET_HashCode *key,
1475 void *value)
1476{
1477 struct Operation *op = cls;
1478 struct ElementEntry *ee = value;
1479
1480 GNUNET_break (0 < op->my_element_count);
1481 op->my_element_count--;
1482 GNUNET_CRYPTO_hash_xor (&op->my_xor,
1483 &ee->element_hash,
1484 &op->my_xor);
1485 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1486 "Final reduction of my_elements, removing %s:%u\n",
1487 GNUNET_h2s (&ee->element_hash),
1488 ee->element.size);
1489 GNUNET_assert (GNUNET_YES ==
1490 GNUNET_CONTAINER_multihashmap_remove (op->my_elements,
1491 &ee->element_hash,
1492 ee));
1493 send_client_removed_element (op,
1494 &ee->element);
1495 return GNUNET_YES;
1496}
1497
1498
1499/**
1500 * Handle a done message from a remote peer
1501 *
1502 * @param cls the intersection operation
1503 * @param mh the message
1504 */
1505static void
1506handle_intersection_p2p_done (void *cls,
1507 const struct IntersectionDoneMessage *idm)
1508{
1509 struct Operation *op = cls;
1510
1511 if (PHASE_BF_EXCHANGE != op->phase)
1512 {
1513 /* wrong phase to conclude? FIXME: Or should we allow this
1514 if the other peer has _initially_ already an empty set? */
1515 GNUNET_break_op (0);
1516 fail_intersection_operation (op);
1517 return;
1518 }
1519 if (0 == ntohl (idm->final_element_count))
1520 {
1521 /* other peer determined empty set is the intersection,
1522 remove all elements */
1523 GNUNET_CONTAINER_multihashmap_iterate (op->my_elements,
1524 &filter_all,
1525 op);
1526 }
1527 if ((op->my_element_count != ntohl (idm->final_element_count)) ||
1528 (0 != GNUNET_memcmp (&op->my_xor,
1529 &idm->element_xor_hash)))
1530 {
1531 /* Other peer thinks we are done, but we disagree on the result! */
1532 GNUNET_break_op (0);
1533 fail_intersection_operation (op);
1534 return;
1535 }
1536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1537 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1538 op->my_element_count);
1539 op->phase = PHASE_DONE_RECEIVED;
1540 GNUNET_CADET_receive_done (op->channel);
1541
1542 GNUNET_assert (GNUNET_NO == op->client_done_sent);
1543 if (GNUNET_YES == op->return_intersection)
1544 {
1545 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546 "Sending full result set to client (%u elements)\n",
1547 GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1548 op->full_result_iter
1549 = GNUNET_CONTAINER_multihashmap_iterator_create (op->my_elements);
1550 send_remaining_elements (op);
1551 return;
1552 }
1553 op->phase = PHASE_FINISHED;
1554 send_client_done_and_destroy (op);
1555}
1556
1557
1558/**
1559 * Get the incoming socket associated with the given id.
1560 *
1561 * @param listener the listener to look in
1562 * @param id id to look for
1563 * @return the incoming socket associated with the id,
1564 * or NULL if there is none
1565 */
1566static struct Operation *
1567get_incoming (uint32_t id)
1568{
1569 for (struct Listener *listener = listener_head; NULL != listener;
1570 listener = listener->next)
1571 {
1572 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
1573 if (op->suggest_id == id)
1574 return op;
1575 }
1576 return NULL;
1577}
1578
1579
1580/**
1581 * Callback called when a client connects to the service.
1582 *
1583 * @param cls closure for the service
1584 * @param c the new client that connected to the service
1585 * @param mq the message queue used to send messages to the client
1586 * @return @a `struct ClientState`
1587 */
1588static void *
1589client_connect_cb (void *cls,
1590 struct GNUNET_SERVICE_Client *c,
1591 struct GNUNET_MQ_Handle *mq)
1592{
1593 struct ClientState *cs;
1594
1595 num_clients++;
1596 cs = GNUNET_new (struct ClientState);
1597 cs->client = c;
1598 cs->mq = mq;
1599 return cs;
1600}
1601
1602
1603/**
1604 * Iterator over hash map entries to free element entries.
1605 *
1606 * @param cls closure
1607 * @param key current key code
1608 * @param value a `struct ElementEntry *` to be free'd
1609 * @return #GNUNET_YES (continue to iterate)
1610 */
1611static int
1612destroy_elements_iterator (void *cls,
1613 const struct GNUNET_HashCode *key,
1614 void *value)
1615{
1616 struct ElementEntry *ee = value;
1617
1618 GNUNET_free (ee);
1619 return GNUNET_YES;
1620}
1621
1622
1623/**
1624 * Clean up after a client has disconnected
1625 *
1626 * @param cls closure, unused
1627 * @param client the client to clean up after
1628 * @param internal_cls the `struct ClientState`
1629 */
1630static void
1631client_disconnect_cb (void *cls,
1632 struct GNUNET_SERVICE_Client *client,
1633 void *internal_cls)
1634{
1635 struct ClientState *cs = internal_cls;
1636 struct Operation *op;
1637 struct Listener *listener;
1638 struct Set *set;
1639
1640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
1641 if (NULL != (set = cs->set))
1642 {
1643 struct SetContent *content = set->content;
1644
1645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
1646 /* Destroy pending set operations */
1647 while (NULL != set->ops_head)
1648 _GSS_operation_destroy (set->ops_head);
1649
1650 /* free set content (or at least decrement RC) */
1651 set->content = NULL;
1652 GNUNET_assert (0 != content->refcount);
1653 content->refcount--;
1654 if (0 == content->refcount)
1655 {
1656 GNUNET_assert (NULL != content->elements);
1657 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
1658 &destroy_elements_iterator,
1659 NULL);
1660 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
1661 content->elements = NULL;
1662 GNUNET_free (content);
1663 }
1664 GNUNET_free (set);
1665 }
1666
1667 if (NULL != (listener = cs->listener))
1668 {
1669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
1670 GNUNET_CADET_close_port (listener->open_port);
1671 listener->open_port = NULL;
1672 while (NULL != (op = listener->op_head))
1673 {
1674 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1675 "Destroying incoming operation `%u' from peer `%s'\n",
1676 (unsigned int) op->client_request_id,
1677 GNUNET_i2s (&op->peer));
1678 incoming_destroy (op);
1679 }
1680 GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener);
1681 GNUNET_free (listener);
1682 }
1683 GNUNET_free (cs);
1684 num_clients--;
1685 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
1686 {
1687 if (NULL != cadet)
1688 {
1689 GNUNET_CADET_disconnect (cadet);
1690 cadet = NULL;
1691 }
1692 }
1693}
1694
1695
1696/**
1697 * Check a request for a set operation from another peer.
1698 *
1699 * @param cls the operation state
1700 * @param msg the received message
1701 * @return #GNUNET_OK if the channel should be kept alive,
1702 * #GNUNET_SYSERR to destroy the channel
1703 */
1704static int
1705check_incoming_msg (void *cls,
1706 const struct OperationRequestMessage *msg)
1707{
1708 struct Operation *op = cls;
1709 struct Listener *listener = op->listener;
1710 const struct GNUNET_MessageHeader *nested_context;
1711
1712 /* double operation request */
1713 if (0 != op->suggest_id)
1714 {
1715 GNUNET_break_op (0);
1716 return GNUNET_SYSERR;
1717 }
1718 /* This should be equivalent to the previous condition, but can't hurt to check twice */
1719 if (NULL == listener)
1720 {
1721 GNUNET_break (0);
1722 return GNUNET_SYSERR;
1723 }
1724 nested_context = GNUNET_MQ_extract_nested_mh (msg);
1725 if ((NULL != nested_context) &&
1726 (ntohs (nested_context->size) > GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE))
1727 {
1728 GNUNET_break_op (0);
1729 return GNUNET_SYSERR;
1730 }
1731 return GNUNET_OK;
1732}
1733
1734
1735/**
1736 * Handle a request for a set operation from another peer. Checks if we
1737 * have a listener waiting for such a request (and in that case initiates
1738 * asking the listener about accepting the connection). If no listener
1739 * is waiting, we queue the operation request in hope that a listener
1740 * shows up soon (before timeout).
1741 *
1742 * This msg is expected as the first and only msg handled through the
1743 * non-operation bound virtual table, acceptance of this operation replaces
1744 * our virtual table and subsequent msgs would be routed differently (as
1745 * we then know what type of operation this is).
1746 *
1747 * @param cls the operation state
1748 * @param msg the received message
1749 * @return #GNUNET_OK if the channel should be kept alive,
1750 * #GNUNET_SYSERR to destroy the channel
1751 */
1752static void
1753handle_incoming_msg (void *cls,
1754 const struct OperationRequestMessage *msg)
1755{
1756 struct Operation *op = cls;
1757 struct Listener *listener = op->listener;
1758 const struct GNUNET_MessageHeader *nested_context;
1759 struct GNUNET_MQ_Envelope *env;
1760 struct GNUNET_SETI_RequestMessage *cmsg;
1761
1762 nested_context = GNUNET_MQ_extract_nested_mh (msg);
1763 /* Make a copy of the nested_context (application-specific context
1764 information that is opaque to set) so we can pass it to the
1765 listener later on */
1766 if (NULL != nested_context)
1767 op->context_msg = GNUNET_copy_message (nested_context);
1768 op->remote_element_count = ntohl (msg->element_count);
1769 GNUNET_log (
1770 GNUNET_ERROR_TYPE_DEBUG,
1771 "Received P2P operation request (port %s) for active listener\n",
1772 GNUNET_h2s (&op->listener->app_id));
1773 GNUNET_assert (0 == op->suggest_id);
1774 if (0 == suggest_id)
1775 suggest_id++;
1776 op->suggest_id = suggest_id++;
1777 GNUNET_assert (NULL != op->timeout_task);
1778 GNUNET_SCHEDULER_cancel (op->timeout_task);
1779 op->timeout_task = NULL;
1780 env = GNUNET_MQ_msg_nested_mh (cmsg,
1781 GNUNET_MESSAGE_TYPE_SETI_REQUEST,
1782 op->context_msg);
1783 GNUNET_log (
1784 GNUNET_ERROR_TYPE_DEBUG,
1785 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
1786 op->suggest_id,
1787 listener,
1788 listener->cs);
1789 cmsg->accept_id = htonl (op->suggest_id);
1790 cmsg->peer_id = op->peer;
1791 GNUNET_MQ_send (listener->cs->mq, env);
1792 /* NOTE: GNUNET_CADET_receive_done() will be called in
1793 #handle_client_accept() */
1794}
1795
1796
1797/**
1798 * Called when a client wants to create a new set. This is typically
1799 * the first request from a client, and includes the type of set
1800 * operation to be performed.
1801 *
1802 * @param cls client that sent the message
1803 * @param m message sent by the client
1804 */
1805static void
1806handle_client_create_set (void *cls,
1807 const struct GNUNET_SETI_CreateMessage *msg)
1808{
1809 struct ClientState *cs = cls;
1810 struct Set *set;
1811
1812 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1813 "Client created new intersection set\n");
1814 if (NULL != cs->set)
1815 {
1816 /* There can only be one set per client */
1817 GNUNET_break (0);
1818 GNUNET_SERVICE_client_drop (cs->client);
1819 return;
1820 }
1821 set = GNUNET_new (struct Set);
1822 set->content = GNUNET_new (struct SetContent);
1823 set->content->refcount = 1;
1824 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1825 GNUNET_YES);
1826 set->cs = cs;
1827 cs->set = set;
1828 GNUNET_SERVICE_client_continue (cs->client);
1829}
1830
1831
1832/**
1833 * Timeout happens iff:
1834 * - we suggested an operation to our listener,
1835 * but did not receive a response in time
1836 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST
1837 *
1838 * @param cls channel context
1839 * @param tc context information (why was this task triggered now)
1840 */
1841static void
1842incoming_timeout_cb (void *cls)
1843{
1844 struct Operation *op = cls;
1845
1846 op->timeout_task = NULL;
1847 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1848 "Remote peer's incoming request timed out\n");
1849 incoming_destroy (op);
1850}
1851
1852
1853/**
1854 * Method called whenever another peer has added us to a channel the
1855 * other peer initiated. Only called (once) upon reception of data
1856 * from a channel we listen on.
1857 *
1858 * The channel context represents the operation itself and gets added
1859 * to a DLL, from where it gets looked up when our local listener
1860 * client responds to a proposed/suggested operation or connects and
1861 * associates with this operation.
1862 *
1863 * @param cls closure
1864 * @param channel new handle to the channel
1865 * @param source peer that started the channel
1866 * @return initial channel context for the channel
1867 * returns NULL on error
1868 */
1869static void *
1870channel_new_cb (void *cls,
1871 struct GNUNET_CADET_Channel *channel,
1872 const struct GNUNET_PeerIdentity *source)
1873{
1874 struct Listener *listener = cls;
1875 struct Operation *op;
1876
1877 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1878 "New incoming channel\n");
1879 op = GNUNET_new (struct Operation);
1880 op->listener = listener;
1881 op->peer = *source;
1882 op->channel = channel;
1883 op->mq = GNUNET_CADET_get_mq (op->channel);
1884 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1885 UINT32_MAX);
1886 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1887 &incoming_timeout_cb,
1888 op);
1889 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1890 listener->op_tail,
1891 op);
1892 return op;
1893}
1894
1895
1896/**
1897 * Function called whenever a channel is destroyed. Should clean up
1898 * any associated state. It must NOT call
1899 * GNUNET_CADET_channel_destroy() on the channel.
1900 *
1901 * The peer_disconnect function is part of a a virtual table set initially either
1902 * when a peer creates a new channel with us, or once we create
1903 * a new channel ourselves (evaluate).
1904 *
1905 * Once we know the exact type of operation (union/intersection), the vt is
1906 * replaced with an operation specific instance (_GSS_[op]_vt).
1907 *
1908 * @param channel_ctx place where local state associated
1909 * with the channel is stored
1910 * @param channel connection to the other end (henceforth invalid)
1911 */
1912static void
1913channel_end_cb (void *channel_ctx,
1914 const struct GNUNET_CADET_Channel *channel)
1915{
1916 struct Operation *op = channel_ctx;
1917
1918 op->channel = NULL;
1919 _GSS_operation_destroy2 (op);
1920}
1921
1922
1923/**
1924 * Function called whenever an MQ-channel's transmission window size changes.
1925 *
1926 * The first callback in an outgoing channel will be with a non-zero value
1927 * and will mean the channel is connected to the destination.
1928 *
1929 * For an incoming channel it will be called immediately after the
1930 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1931 *
1932 * @param cls Channel closure.
1933 * @param channel Connection to the other end (henceforth invalid).
1934 * @param window_size New window size. If the is more messages than buffer size
1935 * this value will be negative..
1936 */
1937static void
1938channel_window_cb (void *cls,
1939 const struct GNUNET_CADET_Channel *channel,
1940 int window_size)
1941{
1942 /* FIXME: not implemented, we could do flow control here... */
1943}
1944
1945
1946/**
1947 * Called when a client wants to create a new listener.
1948 *
1949 * @param cls client that sent the message
1950 * @param msg message sent by the client
1951 */
1952static void
1953handle_client_listen (void *cls,
1954 const struct GNUNET_SETI_ListenMessage *msg)
1955{
1956 struct ClientState *cs = cls;
1957 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1958 GNUNET_MQ_hd_var_size (incoming_msg,
1959 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
1960 struct OperationRequestMessage,
1961 NULL),
1962 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1963 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO,
1964 struct IntersectionElementInfoMessage,
1965 NULL),
1966 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1967 GNUNET_MESSAGE_TYPE_SETI_P2P_BF,
1968 struct BFMessage,
1969 NULL),
1970 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1971 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE,
1972 struct IntersectionDoneMessage,
1973 NULL),
1974 GNUNET_MQ_handler_end ()
1975 };
1976 struct Listener *listener;
1977
1978 if (NULL != cs->listener)
1979 {
1980 /* max. one active listener per client! */
1981 GNUNET_break (0);
1982 GNUNET_SERVICE_client_drop (cs->client);
1983 return;
1984 }
1985 listener = GNUNET_new (struct Listener);
1986 listener->cs = cs;
1987 cs->listener = listener;
1988 listener->app_id = msg->app_id;
1989 GNUNET_CONTAINER_DLL_insert (listener_head,
1990 listener_tail,
1991 listener);
1992 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1993 "New listener for set intersection created (port %s)\n",
1994 GNUNET_h2s (&listener->app_id));
1995 listener->open_port = GNUNET_CADET_open_port (cadet,
1996 &msg->app_id,
1997 &channel_new_cb,
1998 listener,
1999 &channel_window_cb,
2000 &channel_end_cb,
2001 cadet_handlers);
2002 GNUNET_SERVICE_client_continue (cs->client);
2003}
2004
2005
2006/**
2007 * Called when the listening client rejects an operation
2008 * request by another peer.
2009 *
2010 * @param cls client that sent the message
2011 * @param msg message sent by the client
2012 */
2013static void
2014handle_client_reject (void *cls,
2015 const struct GNUNET_SETI_RejectMessage *msg)
2016{
2017 struct ClientState *cs = cls;
2018 struct Operation *op;
2019
2020 op = get_incoming (ntohl (msg->accept_reject_id));
2021 if (NULL == op)
2022 {
2023 /* no matching incoming operation for this reject;
2024 could be that the other peer already disconnected... */
2025 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2026 "Client rejected unknown operation %u\n",
2027 (unsigned int) ntohl (msg->accept_reject_id));
2028 GNUNET_SERVICE_client_continue (cs->client);
2029 return;
2030 }
2031 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2032 "Peer request (app %s) rejected by client\n",
2033 GNUNET_h2s (&cs->listener->app_id));
2034 _GSS_operation_destroy2 (op);
2035 GNUNET_SERVICE_client_continue (cs->client);
2036}
2037
2038
2039/**
2040 * Called when a client wants to add or remove an element to a set it inhabits.
2041 *
2042 * @param cls client that sent the message
2043 * @param msg message sent by the client
2044 */
2045static int
2046check_client_set_add (void *cls,
2047 const struct GNUNET_SETI_ElementMessage *msg)
2048{
2049 /* NOTE: Technically, we should probably check with the
2050 block library whether the element we are given is well-formed */
2051 return GNUNET_OK;
2052}
2053
2054
2055/**
2056 * Called when a client wants to add an element to a set it inhabits.
2057 *
2058 * @param cls client that sent the message
2059 * @param msg message sent by the client
2060 */
2061static void
2062handle_client_set_add (void *cls,
2063 const struct GNUNET_SETI_ElementMessage *msg)
2064{
2065 struct ClientState *cs = cls;
2066 struct Set *set;
2067 struct GNUNET_SETI_Element el;
2068 struct ElementEntry *ee;
2069 struct GNUNET_HashCode hash;
2070
2071 if (NULL == (set = cs->set))
2072 {
2073 /* client without a set requested an operation */
2074 GNUNET_break (0);
2075 GNUNET_SERVICE_client_drop (cs->client);
2076 return;
2077 }
2078 GNUNET_SERVICE_client_continue (cs->client);
2079 el.size = ntohs (msg->header.size) - sizeof(*msg);
2080 el.data = &msg[1];
2081 el.element_type = ntohs (msg->element_type);
2082 GNUNET_SETI_element_hash (&el,
2083 &hash);
2084 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
2085 &hash);
2086 if (NULL == ee)
2087 {
2088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2089 "Client inserts element %s of size %u\n",
2090 GNUNET_h2s (&hash),
2091 el.size);
2092 ee = GNUNET_malloc (el.size + sizeof(*ee));
2093 ee->element.size = el.size;
2094 GNUNET_memcpy (&ee[1], el.data, el.size);
2095 ee->element.data = &ee[1];
2096 ee->element.element_type = el.element_type;
2097 ee->remote = GNUNET_NO;
2098 ee->element_hash = hash;
2099 GNUNET_break (GNUNET_YES ==
2100 GNUNET_CONTAINER_multihashmap_put (
2101 set->content->elements,
2102 &ee->element_hash,
2103 ee,
2104 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2105 }
2106 else
2107 {
2108 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2109 "Client inserted element %s of size %u twice (ignored)\n",
2110 GNUNET_h2s (&hash),
2111 el.size);
2112 /* same element inserted twice */
2113 return;
2114 }
2115 set->current_set_element_count++;
2116}
2117
2118
2119/**
2120 * Advance the current generation of a set,
2121 * adding exclusion ranges if necessary.
2122 *
2123 * @param set the set where we want to advance the generation
2124 */
2125static void
2126advance_generation (struct Set *set)
2127{
2128 if (set->current_generation == set->content->latest_generation)
2129 {
2130 set->content->latest_generation++;
2131 set->current_generation++;
2132 return;
2133 }
2134 GNUNET_assert (set->current_generation < set->content->latest_generation);
2135}
2136
2137
2138/**
2139 * Called when a client wants to initiate a set operation with another
2140 * peer. Initiates the CADET connection to the listener and sends the
2141 * request.
2142 *
2143 * @param cls client that sent the message
2144 * @param msg message sent by the client
2145 * @return #GNUNET_OK if the message is well-formed
2146 */
2147static int
2148check_client_evaluate (void *cls,
2149 const struct GNUNET_SETI_EvaluateMessage *msg)
2150{
2151 /* FIXME: suboptimal, even if the context below could be NULL,
2152 there are malformed messages this does not check for... */
2153 return GNUNET_OK;
2154}
2155
2156
2157/**
2158 * Called when a client wants to initiate a set operation with another
2159 * peer. Initiates the CADET connection to the listener and sends the
2160 * request.
2161 *
2162 * @param cls client that sent the message
2163 * @param msg message sent by the client
2164 */
2165static void
2166handle_client_evaluate (void *cls,
2167 const struct GNUNET_SETI_EvaluateMessage *msg)
2168{
2169 struct ClientState *cs = cls;
2170 struct Operation *op = GNUNET_new (struct Operation);
2171 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2172 GNUNET_MQ_hd_var_size (incoming_msg,
2173 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
2174 struct OperationRequestMessage,
2175 op),
2176 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
2177 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO,
2178 struct IntersectionElementInfoMessage,
2179 op),
2180 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
2181 GNUNET_MESSAGE_TYPE_SETI_P2P_BF,
2182 struct BFMessage,
2183 op),
2184 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
2185 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE,
2186 struct IntersectionDoneMessage,
2187 op),
2188 GNUNET_MQ_handler_end ()
2189 };
2190 struct Set *set;
2191 const struct GNUNET_MessageHeader *context;
2192
2193 if (NULL == (set = cs->set))
2194 {
2195 GNUNET_break (0);
2196 GNUNET_free (op);
2197 GNUNET_SERVICE_client_drop (cs->client);
2198 return;
2199 }
2200 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
2201 UINT32_MAX);
2202 op->peer = msg->target_peer;
2203 op->return_intersection = htonl (msg->return_intersection);
2204 fprintf (stderr,
2205 "Return intersection for evaluate is %d\n",
2206 op->return_intersection);
2207 op->client_request_id = ntohl (msg->request_id);
2208 context = GNUNET_MQ_extract_nested_mh (msg);
2209
2210 /* Advance generation values, so that
2211 mutations won't interfer with the running operation. */
2212 op->set = set;
2213 op->generation_created = set->current_generation;
2214 advance_generation (set);
2215 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2216 set->ops_tail,
2217 op);
2218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2219 "Creating new CADET channel to port %s for set intersection\n",
2220 GNUNET_h2s (&msg->app_id));
2221 op->channel = GNUNET_CADET_channel_create (cadet,
2222 op,
2223 &msg->target_peer,
2224 &msg->app_id,
2225 &channel_window_cb,
2226 &channel_end_cb,
2227 cadet_handlers);
2228 op->mq = GNUNET_CADET_get_mq (op->channel);
2229 {
2230 struct GNUNET_MQ_Envelope *ev;
2231 struct OperationRequestMessage *msg;
2232
2233 ev = GNUNET_MQ_msg_nested_mh (msg,
2234 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
2235 context);
2236 if (NULL == ev)
2237 {
2238 /* the context message is too large!? */
2239 GNUNET_break (0);
2240 GNUNET_SERVICE_client_drop (cs->client);
2241 return;
2242 }
2243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2244 "Initiating intersection operation evaluation\n");
2245 /* we started the operation, thus we have to send the operation request */
2246 op->phase = PHASE_INITIAL;
2247 op->my_element_count = op->set->current_set_element_count;
2248 op->my_elements
2249 = GNUNET_CONTAINER_multihashmap_create (op->my_element_count,
2250 GNUNET_YES);
2251
2252 msg->element_count = htonl (op->my_element_count);
2253 GNUNET_MQ_send (op->mq,
2254 ev);
2255 op->phase = PHASE_COUNT_SENT;
2256 if (NULL != context)
2257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2258 "Sent op request with context message\n");
2259 else
2260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2261 "Sent op request without context message\n");
2262 }
2263 GNUNET_SERVICE_client_continue (cs->client);
2264}
2265
2266
2267/**
2268 * Handle a request from the client to cancel a running set operation.
2269 *
2270 * @param cls the client
2271 * @param msg the message
2272 */
2273static void
2274handle_client_cancel (void *cls,
2275 const struct GNUNET_SETI_CancelMessage *msg)
2276{
2277 struct ClientState *cs = cls;
2278 struct Set *set;
2279 struct Operation *op;
2280 int found;
2281
2282 if (NULL == (set = cs->set))
2283 {
2284 /* client without a set requested an operation */
2285 GNUNET_break (0);
2286 GNUNET_SERVICE_client_drop (cs->client);
2287 return;
2288 }
2289 found = GNUNET_NO;
2290 for (op = set->ops_head; NULL != op; op = op->next)
2291 {
2292 if (op->client_request_id == ntohl (msg->request_id))
2293 {
2294 found = GNUNET_YES;
2295 break;
2296 }
2297 }
2298 if (GNUNET_NO == found)
2299 {
2300 /* It may happen that the operation was already destroyed due to
2301 * the other peer disconnecting. The client may not know about this
2302 * yet and try to cancel the (just barely non-existent) operation.
2303 * So this is not a hard error.
2304 *///
2305 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2306 "Client canceled non-existent op %u\n",
2307 (uint32_t) ntohl (msg->request_id));
2308 }
2309 else
2310 {
2311 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2312 "Client requested cancel for op %u\n",
2313 (uint32_t) ntohl (msg->request_id));
2314 _GSS_operation_destroy (op);
2315 }
2316 GNUNET_SERVICE_client_continue (cs->client);
2317}
2318
2319
2320/**
2321 * Handle a request from the client to accept a set operation that
2322 * came from a remote peer. We forward the accept to the associated
2323 * operation for handling
2324 *
2325 * @param cls the client
2326 * @param msg the message
2327 */
2328static void
2329handle_client_accept (void *cls,
2330 const struct GNUNET_SETI_AcceptMessage *msg)
2331{
2332 struct ClientState *cs = cls;
2333 struct Set *set;
2334 struct Operation *op;
2335 struct GNUNET_SETI_ResultMessage *result_message;
2336 struct GNUNET_MQ_Envelope *ev;
2337 struct Listener *listener;
2338
2339 if (NULL == (set = cs->set))
2340 {
2341 /* client without a set requested to accept */
2342 GNUNET_break (0);
2343 GNUNET_SERVICE_client_drop (cs->client);
2344 return;
2345 }
2346 op = get_incoming (ntohl (msg->accept_reject_id));
2347 if (NULL == op)
2348 {
2349 /* It is not an error if the set op does not exist -- it may
2350 * have been destroyed when the partner peer disconnected. */
2351 GNUNET_log (
2352 GNUNET_ERROR_TYPE_INFO,
2353 "Client %p accepted request %u of listener %p that is no longer active\n",
2354 cs,
2355 ntohl (msg->accept_reject_id),
2356 cs->listener);
2357 ev = GNUNET_MQ_msg (result_message,
2358 GNUNET_MESSAGE_TYPE_SETI_RESULT);
2359 result_message->request_id = msg->request_id;
2360 result_message->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
2361 GNUNET_MQ_send (set->cs->mq, ev);
2362 GNUNET_SERVICE_client_continue (cs->client);
2363 return;
2364 }
2365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2366 "Client accepting request %u\n",
2367 (uint32_t) ntohl (msg->accept_reject_id));
2368 listener = op->listener;
2369 op->listener = NULL;
2370 op->return_intersection = htonl (msg->return_intersection);
2371 fprintf (stderr,
2372 "Return intersection for accept is %d\n",
2373 op->return_intersection);
2374 GNUNET_CONTAINER_DLL_remove (listener->op_head,
2375 listener->op_tail,
2376 op);
2377 op->set = set;
2378 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2379 set->ops_tail,
2380 op);
2381 op->client_request_id = ntohl (msg->request_id);
2382
2383 /* Advance generation values, so that future mutations do not
2384 interfer with the running operation. */
2385 op->generation_created = set->current_generation;
2386 advance_generation (set);
2387 {
2388 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2389 "Accepting set intersection operation\n");
2390 op->phase = PHASE_INITIAL;
2391 op->my_element_count
2392 = op->set->current_set_element_count;
2393 op->my_elements
2394 = GNUNET_CONTAINER_multihashmap_create (
2395 GNUNET_MIN (op->my_element_count,
2396 op->remote_element_count),
2397 GNUNET_YES);
2398 if (op->remote_element_count < op->my_element_count)
2399 {
2400 /* If the other peer (Alice) has fewer elements than us (Bob),
2401 we just send the count as Alice should send the first BF */
2402 send_element_count (op);
2403 op->phase = PHASE_COUNT_SENT;
2404 }
2405 else
2406 {
2407 /* We have fewer elements, so we start with the BF */
2408 begin_bf_exchange (op);
2409 }
2410 }
2411 /* Now allow CADET to continue, as we did not do this in
2412 #handle_incoming_msg (as we wanted to first see if the
2413 local client would accept the request). */
2414 GNUNET_CADET_receive_done (op->channel);
2415 GNUNET_SERVICE_client_continue (cs->client);
2416}
2417
2418
2419/**
2420 * Called to clean up, after a shutdown has been requested.
2421 *
2422 * @param cls closure, NULL
2423 */
2424static void
2425shutdown_task (void *cls)
2426{
2427 /* Delay actual shutdown to allow service to disconnect clients */
2428 in_shutdown = GNUNET_YES;
2429 if (0 == num_clients)
2430 {
2431 if (NULL != cadet)
2432 {
2433 GNUNET_CADET_disconnect (cadet);
2434 cadet = NULL;
2435 }
2436 }
2437 GNUNET_STATISTICS_destroy (_GSS_statistics,
2438 GNUNET_YES);
2439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2440 "handled shutdown request\n");
2441}
2442
2443
2444/**
2445 * Function called by the service's run
2446 * method to run service-specific setup code.
2447 *
2448 * @param cls closure
2449 * @param cfg configuration to use
2450 * @param service the initialized service
2451 */
2452static void
2453run (void *cls,
2454 const struct GNUNET_CONFIGURATION_Handle *cfg,
2455 struct GNUNET_SERVICE_Handle *service)
2456{
2457 /* FIXME: need to modify SERVICE (!) API to allow
2458 us to run a shutdown task *after* clients were
2459 forcefully disconnected! */
2460 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2461 NULL);
2462 _GSS_statistics = GNUNET_STATISTICS_create ("seti",
2463 cfg);
2464 cadet = GNUNET_CADET_connect (cfg);
2465 if (NULL == cadet)
2466 {
2467 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2468 _ ("Could not connect to CADET service\n"));
2469 GNUNET_SCHEDULER_shutdown ();
2470 return;
2471 }
2472}
2473
2474
2475/**
2476 * Define "main" method using service macro.
2477 */
2478GNUNET_SERVICE_MAIN (
2479 "seti",
2480 GNUNET_SERVICE_OPTION_NONE,
2481 &run,
2482 &client_connect_cb,
2483 &client_disconnect_cb,
2484 NULL,
2485 GNUNET_MQ_hd_fixed_size (client_accept,
2486 GNUNET_MESSAGE_TYPE_SETI_ACCEPT,
2487 struct GNUNET_SETI_AcceptMessage,
2488 NULL),
2489 GNUNET_MQ_hd_var_size (client_set_add,
2490 GNUNET_MESSAGE_TYPE_SETI_ADD,
2491 struct GNUNET_SETI_ElementMessage,
2492 NULL),
2493 GNUNET_MQ_hd_fixed_size (client_create_set,
2494 GNUNET_MESSAGE_TYPE_SETI_CREATE,
2495 struct GNUNET_SETI_CreateMessage,
2496 NULL),
2497 GNUNET_MQ_hd_var_size (client_evaluate,
2498 GNUNET_MESSAGE_TYPE_SETI_EVALUATE,
2499 struct GNUNET_SETI_EvaluateMessage,
2500 NULL),
2501 GNUNET_MQ_hd_fixed_size (client_listen,
2502 GNUNET_MESSAGE_TYPE_SETI_LISTEN,
2503 struct GNUNET_SETI_ListenMessage,
2504 NULL),
2505 GNUNET_MQ_hd_fixed_size (client_reject,
2506 GNUNET_MESSAGE_TYPE_SETI_REJECT,
2507 struct GNUNET_SETI_RejectMessage,
2508 NULL),
2509 GNUNET_MQ_hd_fixed_size (client_cancel,
2510 GNUNET_MESSAGE_TYPE_SETI_CANCEL,
2511 struct GNUNET_SETI_CancelMessage,
2512 NULL),
2513 GNUNET_MQ_handler_end ());
2514
2515
2516/* end of gnunet-service-seti.c */