aboutsummaryrefslogtreecommitdiff
path: root/src/seti/gnunet-service-seti.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/seti/gnunet-service-seti.c')
-rw-r--r--src/seti/gnunet-service-seti.c2515
1 files changed, 2515 insertions, 0 deletions
diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c
new file mode 100644
index 000000000..af478233b
--- /dev/null
+++ b/src/seti/gnunet-service-seti.c
@@ -0,0 +1,2515 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/gnunet-service-seti.c
22 * @brief two-peer set intersection operations
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "gnunet-service-seti_protocol.h"
27#include "gnunet_statistics_service.h"
28#include "gnunet_cadet_service.h"
29#include "gnunet_seti_service.h"
30#include "gnunet_block_lib.h"
31#include "seti.h"
32
33/**
34 * How long do we hold on to an incoming channel if there is
35 * no local listener before giving up?
36 */
37#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
38
39
40/**
41 * Current phase we are in for a intersection operation.
42 */
43enum IntersectionOperationPhase
44{
45 /**
46 * We are just starting.
47 */
48 PHASE_INITIAL,
49
50 /**
51 * We have send the number of our elements to the other
52 * peer, but did not setup our element set yet.
53 */
54 PHASE_COUNT_SENT,
55
56 /**
57 * We have initialized our set and are now reducing it by exchanging
58 * Bloom filters until one party notices the their element hashes
59 * are equal.
60 */
61 PHASE_BF_EXCHANGE,
62
63 /**
64 * We must next send the P2P DONE message (after finishing mostly
65 * with the local client). Then we will wait for the channel to close.
66 */
67 PHASE_MUST_SEND_DONE,
68
69 /**
70 * We have received the P2P DONE message, and must finish with the
71 * local client before terminating the channel.
72 */
73 PHASE_DONE_RECEIVED,
74
75 /**
76 * The protocol is over. Results may still have to be sent to the
77 * client.
78 */
79 PHASE_FINISHED
80};
81
82
83/**
84 * A set that supports a specific operation with other peers.
85 */
86struct Set;
87
88/**
89 * Information about an element element in the set. All elements are
90 * stored in a hash-table from their hash-code to their 'struct
91 * Element', so that the remove and add operations are reasonably
92 * fast.
93 */
94struct ElementEntry;
95
96/**
97 * Operation context used to execute a set operation.
98 */
99struct Operation;
100
101
102/**
103 * Information about an element element in the set. All elements are
104 * stored in a hash-table from their hash-code to their `struct
105 * Element`, so that the remove and add operations are reasonably
106 * fast.
107 */
108struct ElementEntry
109{
110 /**
111 * The actual element. The data for the element
112 * should be allocated at the end of this struct.
113 */
114 struct GNUNET_SETI_Element element;
115
116 /**
117 * Hash of the element. For set union: Will be used to derive the
118 * different IBF keys for different salts.
119 */
120 struct GNUNET_HashCode element_hash;
121
122 /**
123 * Generation in which the element was added.
124 */
125 unsigned int generation_added;
126
127 /**
128 * #GNUNET_YES if the element is a remote element, and does not belong
129 * to the operation's set.
130 */
131 int remote;
132};
133
134
135/**
136 * A listener is inhabited by a client, and waits for evaluation
137 * requests from remote peers.
138 */
139struct Listener;
140
141
142/**
143 * State we keep per client.
144 */
145struct ClientState
146{
147 /**
148 * Set, if associated with the client, otherwise NULL.
149 */
150 struct Set *set;
151
152 /**
153 * Listener, if associated with the client, otherwise NULL.
154 */
155 struct Listener *listener;
156
157 /**
158 * Client handle.
159 */
160 struct GNUNET_SERVICE_Client *client;
161
162 /**
163 * Message queue.
164 */
165 struct GNUNET_MQ_Handle *mq;
166};
167
168
169/**
170 * Operation context used to execute a set operation.
171 */
172struct Operation
173{
174 /**
175 * The identity of the requesting peer. Needs to
176 * be stored here as the op spec might not have been created yet.
177 */
178 struct GNUNET_PeerIdentity peer;
179
180 /**
181 * XOR of the keys of all of the elements (remaining) in my set.
182 * Always updated when elements are added or removed to
183 * @e my_elements.
184 */
185 struct GNUNET_HashCode my_xor;
186
187 /**
188 * XOR of the keys of all of the elements (remaining) in
189 * the other peer's set. Updated when we receive the
190 * other peer's Bloom filter.
191 */
192 struct GNUNET_HashCode other_xor;
193
194 /**
195 * Kept in a DLL of the listener, if @e listener is non-NULL.
196 */
197 struct Operation *next;
198
199 /**
200 * Kept in a DLL of the listener, if @e listener is non-NULL.
201 */
202 struct Operation *prev;
203
204 /**
205 * Channel to the peer.
206 */
207 struct GNUNET_CADET_Channel *channel;
208
209 /**
210 * Port this operation runs on.
211 */
212 struct Listener *listener;
213
214 /**
215 * Message queue for the channel.
216 */
217 struct GNUNET_MQ_Handle *mq;
218
219 /**
220 * Context message, may be NULL.
221 */
222 struct GNUNET_MessageHeader *context_msg;
223
224 /**
225 * Set associated with the operation, NULL until the spec has been
226 * associated with a set.
227 */
228 struct Set *set;
229
230 /**
231 * The bf we currently receive
232 */
233 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
234
235 /**
236 * BF of the set's element.
237 */
238 struct GNUNET_CONTAINER_BloomFilter *local_bf;
239
240 /**
241 * Remaining elements in the intersection operation.
242 * Maps element-id-hashes to 'elements in our set'.
243 */
244 struct GNUNET_CONTAINER_MultiHashMap *my_elements;
245
246 /**
247 * Iterator for sending the final set of @e my_elements to the client.
248 */
249 struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
250
251 /**
252 * For multipart BF transmissions, we have to store the
253 * bloomfilter-data until we fully received it.
254 */
255 char *bf_data;
256
257 /**
258 * Timeout task, if the incoming peer has not been accepted
259 * after the timeout, it will be disconnected.
260 */
261 struct GNUNET_SCHEDULER_Task *timeout_task;
262
263 /**
264 * How many bytes of @e bf_data are valid?
265 */
266 uint32_t bf_data_offset;
267
268 /**
269 * Current element count contained within @e my_elements.
270 * (May differ briefly during initialization.)
271 */
272 uint32_t my_element_count;
273
274 /**
275 * size of the bloomfilter in @e bf_data.
276 */
277 uint32_t bf_data_size;
278
279 /**
280 * size of the bloomfilter
281 */
282 uint32_t bf_bits_per_element;
283
284 /**
285 * Salt currently used for BF construction (by us or the other peer,
286 * depending on where we are in the code).
287 */
288 uint32_t salt;
289
290 /**
291 * Current state of the operation.
292 */
293 enum IntersectionOperationPhase phase;
294
295 /**
296 * Generation in which the operation handle was created.
297 */
298 unsigned int generation_created;
299
300 /**
301 * Did we send the client that we are done?
302 */
303 int client_done_sent;
304
305 /**
306 * Set whenever we reach the state where the death of the
307 * channel is perfectly find and should NOT result in the
308 * operation being cancelled.
309 */
310 int channel_death_expected;
311
312 /**
313 * Remote peers element count
314 */
315 uint32_t remote_element_count;
316
317 /**
318 * ID used to identify an operation between service and client
319 */
320 uint32_t client_request_id;
321
322 /**
323 * When are elements sent to the client, and which elements are sent?
324 */
325 int return_intersection;
326
327 /**
328 * Unique request id for the request from a remote peer, sent to the
329 * client, which will accept or reject the request. Set to '0' iff
330 * the request has not been suggested yet.
331 */
332 uint32_t suggest_id;
333
334};
335
336
337/**
338 * SetContent stores the actual set elements, which may be shared by
339 * multiple generations derived from one set.
340 */
341struct SetContent
342{
343 /**
344 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
345 */
346 struct GNUNET_CONTAINER_MultiHashMap *elements;
347
348 /**
349 * Number of references to the content.
350 */
351 unsigned int refcount;
352
353 /**
354 * FIXME: document!
355 */
356 unsigned int latest_generation;
357
358 /**
359 * Number of concurrently active iterators.
360 */
361 int iterator_count;
362};
363
364
365/**
366 * A set that supports a specific operation with other peers.
367 */
368struct Set
369{
370 /**
371 * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
372 */
373 struct Set *next;
374
375 /**
376 * Sets are held in a doubly linked list.
377 */
378 struct Set *prev;
379
380 /**
381 * Client that owns the set. Only one client may own a set,
382 * and there can only be one set per client.
383 */
384 struct ClientState *cs;
385
386 /**
387 * Content, possibly shared by multiple sets,
388 * and thus reference counted.
389 */
390 struct SetContent *content;
391
392 /**
393 * Number of currently valid elements in the set which have not been
394 * removed.
395 */
396 uint32_t current_set_element_count;
397
398 /**
399 * Evaluate operations are held in a linked list.
400 */
401 struct Operation *ops_head;
402
403 /**
404 * Evaluate operations are held in a linked list.
405 */
406 struct Operation *ops_tail;
407
408 /**
409 * Current generation, that is, number of previously executed
410 * operations and lazy copies on the underlying set content.
411 */
412 unsigned int current_generation;
413
414};
415
416
417/**
418 * A listener is inhabited by a client, and waits for evaluation
419 * requests from remote peers.
420 */
421struct Listener
422{
423 /**
424 * Listeners are held in a doubly linked list.
425 */
426 struct Listener *next;
427
428 /**
429 * Listeners are held in a doubly linked list.
430 */
431 struct Listener *prev;
432
433 /**
434 * Head of DLL of operations this listener is responsible for.
435 * Once the client has accepted/declined the operation, the
436 * operation is moved to the respective set's operation DLLS.
437 */
438 struct Operation *op_head;
439
440 /**
441 * Tail of DLL of operations this listener is responsible for.
442 * Once the client has accepted/declined the operation, the
443 * operation is moved to the respective set's operation DLLS.
444 */
445 struct Operation *op_tail;
446
447 /**
448 * Client that owns the listener.
449 * Only one client may own a listener.
450 */
451 struct ClientState *cs;
452
453 /**
454 * The port we are listening on with CADET.
455 */
456 struct GNUNET_CADET_Port *open_port;
457
458 /**
459 * Application ID for the operation, used to distinguish
460 * multiple operations of the same type with the same peer.
461 */
462 struct GNUNET_HashCode app_id;
463
464};
465
466
467/**
468 * Handle to the cadet service, used to listen for and connect to
469 * remote peers.
470 */
471static struct GNUNET_CADET_Handle *cadet;
472
473/**
474 * Statistics handle.
475 */
476static struct GNUNET_STATISTICS_Handle *_GSS_statistics;
477
478/**
479 * Listeners are held in a doubly linked list.
480 */
481static struct Listener *listener_head;
482
483/**
484 * Listeners are held in a doubly linked list.
485 */
486static struct Listener *listener_tail;
487
488/**
489 * Number of active clients.
490 */
491static unsigned int num_clients;
492
493/**
494 * Are we in shutdown? if #GNUNET_YES and the number of clients
495 * drops to zero, disconnect from CADET.
496 */
497static int in_shutdown;
498
499/**
500 * Counter for allocating unique IDs for clients, used to identify
501 * incoming operation requests from remote peers, that the client can
502 * choose to accept or refuse. 0 must not be used (reserved for
503 * uninitialized).
504 */
505static uint32_t suggest_id;
506
507
508/**
509 * If applicable in the current operation mode, send a result message
510 * to the client indicating we removed an element.
511 *
512 * @param op intersection operation
513 * @param element element to send
514 */
515static void
516send_client_removed_element (struct Operation *op,
517 struct GNUNET_SETI_Element *element)
518{
519 struct GNUNET_MQ_Envelope *ev;
520 struct GNUNET_SETI_ResultMessage *rm;
521
522 if (GNUNET_YES == op->return_intersection)
523 {
524 GNUNET_break (0);
525 return; /* Wrong mode for transmitting removed elements */
526 }
527 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
528 "Sending removed element (size %u) to client\n",
529 element->size);
530 GNUNET_STATISTICS_update (_GSS_statistics,
531 "# Element removed messages sent",
532 1,
533 GNUNET_NO);
534 GNUNET_assert (0 != op->client_request_id);
535 ev = GNUNET_MQ_msg_extra (rm,
536 element->size,
537 GNUNET_MESSAGE_TYPE_SETI_RESULT);
538 if (NULL == ev)
539 {
540 GNUNET_break (0);
541 return;
542 }
543 rm->result_status = htons (GNUNET_SETI_STATUS_DEL_LOCAL);
544 rm->request_id = htonl (op->client_request_id);
545 rm->element_type = element->element_type;
546 GNUNET_memcpy (&rm[1],
547 element->data,
548 element->size);
549 GNUNET_MQ_send (op->set->cs->mq,
550 ev);
551}
552
553
554/**
555 * Is element @a ee part of the set used by @a op?
556 *
557 * @param ee element to test
558 * @param op operation the defines the set and its generation
559 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
560 */
561static int
562_GSS_is_element_of_operation (struct ElementEntry *ee,
563 struct Operation *op)
564{
565 return op->generation_created >= ee->generation_added;
566}
567
568
569/**
570 * Fills the "my_elements" hashmap with all relevant elements.
571 *
572 * @param cls the `struct Operation *` we are performing
573 * @param key current key code
574 * @param value the `struct ElementEntry *` from the hash map
575 * @return #GNUNET_YES (we should continue to iterate)
576 */
577static int
578filtered_map_initialization (void *cls,
579 const struct GNUNET_HashCode *key,
580 void *value)
581{
582 struct Operation *op = cls;
583 struct ElementEntry *ee = value;
584 struct GNUNET_HashCode mutated_hash;
585
586 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
587 "FIMA called for %s:%u\n",
588 GNUNET_h2s (&ee->element_hash),
589 ee->element.size);
590
591 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
592 {
593 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
595 GNUNET_h2s (&ee->element_hash),
596 ee->element.size);
597 return GNUNET_YES; /* element not valid in our operation's generation */
598 }
599
600 /* Test if element is in other peer's bloomfilter */
601 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
602 op->salt,
603 &mutated_hash);
604 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
605 "Testing mingled hash %s with salt %u\n",
606 GNUNET_h2s (&mutated_hash),
607 op->salt);
608 if (GNUNET_NO ==
609 GNUNET_CONTAINER_bloomfilter_test (op->remote_bf,
610 &mutated_hash))
611 {
612 /* remove this element */
613 send_client_removed_element (op,
614 &ee->element);
615 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616 "Reduced initialization, not starting with %s:%u\n",
617 GNUNET_h2s (&ee->element_hash),
618 ee->element.size);
619 return GNUNET_YES;
620 }
621 op->my_element_count++;
622 GNUNET_CRYPTO_hash_xor (&op->my_xor,
623 &ee->element_hash,
624 &op->my_xor);
625 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
626 "Filtered initialization of my_elements, adding %s:%u\n",
627 GNUNET_h2s (&ee->element_hash),
628 ee->element.size);
629 GNUNET_break (GNUNET_YES ==
630 GNUNET_CONTAINER_multihashmap_put (op->my_elements,
631 &ee->element_hash,
632 ee,
633 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
634
635 return GNUNET_YES;
636}
637
638
639/**
640 * Removes elements from our hashmap if they are not contained within the
641 * provided remote bloomfilter.
642 *
643 * @param cls closure with the `struct Operation *`
644 * @param key current key code
645 * @param value value in the hash map
646 * @return #GNUNET_YES (we should continue to iterate)
647 */
648static int
649iterator_bf_reduce (void *cls,
650 const struct GNUNET_HashCode *key,
651 void *value)
652{
653 struct Operation *op = cls;
654 struct ElementEntry *ee = value;
655 struct GNUNET_HashCode mutated_hash;
656
657 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
658 op->salt,
659 &mutated_hash);
660 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
661 "Testing mingled hash %s with salt %u\n",
662 GNUNET_h2s (&mutated_hash),
663 op->salt);
664 if (GNUNET_NO ==
665 GNUNET_CONTAINER_bloomfilter_test (op->remote_bf,
666 &mutated_hash))
667 {
668 GNUNET_break (0 < op->my_element_count);
669 op->my_element_count--;
670 GNUNET_CRYPTO_hash_xor (&op->my_xor,
671 &ee->element_hash,
672 &op->my_xor);
673 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674 "Bloom filter reduction of my_elements, removing %s:%u\n",
675 GNUNET_h2s (&ee->element_hash),
676 ee->element.size);
677 GNUNET_assert (GNUNET_YES ==
678 GNUNET_CONTAINER_multihashmap_remove (op->my_elements,
679 &ee->element_hash,
680 ee));
681 send_client_removed_element (op,
682 &ee->element);
683 }
684 else
685 {
686 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687 "Bloom filter reduction of my_elements, keeping %s:%u\n",
688 GNUNET_h2s (&ee->element_hash),
689 ee->element.size);
690 }
691 return GNUNET_YES;
692}
693
694
695/**
696 * Create initial bloomfilter based on all the elements given.
697 *
698 * @param cls the `struct Operation *`
699 * @param key current key code
700 * @param value the `struct ElementEntry` to process
701 * @return #GNUNET_YES (we should continue to iterate)
702 */
703static int
704iterator_bf_create (void *cls,
705 const struct GNUNET_HashCode *key,
706 void *value)
707{
708 struct Operation *op = cls;
709 struct ElementEntry *ee = value;
710 struct GNUNET_HashCode mutated_hash;
711
712 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
713 op->salt,
714 &mutated_hash);
715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
716 "Initializing BF with hash %s with salt %u\n",
717 GNUNET_h2s (&mutated_hash),
718 op->salt);
719 GNUNET_CONTAINER_bloomfilter_add (op->local_bf,
720 &mutated_hash);
721 return GNUNET_YES;
722}
723
724
725/**
726 * Destroy the given operation. Used for any operation where both
727 * peers were known and that thus actually had a vt and channel. Must
728 * not be used for operations where 'listener' is still set and we do
729 * not know the other peer.
730 *
731 * Call the implementation-specific cancel function of the operation.
732 * Disconnects from the remote peer. Does not disconnect the client,
733 * as there may be multiple operations per set.
734 *
735 * @param op operation to destroy
736 */
737static void
738_GSS_operation_destroy (struct Operation *op)
739{
740 struct Set *set = op->set;
741 struct GNUNET_CADET_Channel *channel;
742
743 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
744 GNUNET_assert (NULL == op->listener);
745 if (NULL != op->remote_bf)
746 {
747 GNUNET_CONTAINER_bloomfilter_free (op->remote_bf);
748 op->remote_bf = NULL;
749 }
750 if (NULL != op->local_bf)
751 {
752 GNUNET_CONTAINER_bloomfilter_free (op->local_bf);
753 op->local_bf = NULL;
754 }
755 if (NULL != op->my_elements)
756 {
757 GNUNET_CONTAINER_multihashmap_destroy (op->my_elements);
758 op->my_elements = NULL;
759 }
760 if (NULL != op->full_result_iter)
761 {
762 GNUNET_CONTAINER_multihashmap_iterator_destroy (
763 op->full_result_iter);
764 op->full_result_iter = NULL;
765 }
766 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767 "Destroying intersection op state done\n");
768 if (NULL != set)
769 {
770 GNUNET_CONTAINER_DLL_remove (set->ops_head,
771 set->ops_tail,
772 op);
773 op->set = NULL;
774 }
775 if (NULL != op->context_msg)
776 {
777 GNUNET_free (op->context_msg);
778 op->context_msg = NULL;
779 }
780 if (NULL != (channel = op->channel))
781 {
782 /* This will free op; called conditionally as this helper function
783 is also called from within the channel disconnect handler. */
784 op->channel = NULL;
785 GNUNET_CADET_channel_destroy (channel);
786 }
787 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
788 * there was a channel end handler that will free 'op' on the call stack. */
789}
790
791
792/**
793 * This function probably should not exist
794 * and be replaced by inlining more specific
795 * logic in the various places where it is called.
796 */
797static void
798_GSS_operation_destroy2 (struct Operation *op);
799
800
801/**
802 * Destroy an incoming request from a remote peer
803 *
804 * @param op remote request to destroy
805 */
806static void
807incoming_destroy (struct Operation *op)
808{
809 struct Listener *listener;
810
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "Destroying incoming operation %p\n",
813 op);
814 if (NULL != (listener = op->listener))
815 {
816 GNUNET_CONTAINER_DLL_remove (listener->op_head,
817 listener->op_tail,
818 op);
819 op->listener = NULL;
820 }
821 if (NULL != op->timeout_task)
822 {
823 GNUNET_SCHEDULER_cancel (op->timeout_task);
824 op->timeout_task = NULL;
825 }
826 _GSS_operation_destroy2 (op);
827}
828
829
830/**
831 * Signal to the client that the operation has finished and
832 * destroy the operation.
833 *
834 * @param cls operation to destroy
835 */
836static void
837send_client_done_and_destroy (void *cls)
838{
839 struct Operation *op = cls;
840 struct GNUNET_MQ_Envelope *ev;
841 struct GNUNET_SETI_ResultMessage *rm;
842
843 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844 "Intersection succeeded, sending DONE to local client\n");
845 GNUNET_STATISTICS_update (_GSS_statistics,
846 "# Intersection operations succeeded",
847 1,
848 GNUNET_NO);
849 ev = GNUNET_MQ_msg (rm,
850 GNUNET_MESSAGE_TYPE_SETI_RESULT);
851 rm->request_id = htonl (op->client_request_id);
852 rm->result_status = htons (GNUNET_SETI_STATUS_DONE);
853 rm->element_type = htons (0);
854 GNUNET_MQ_send (op->set->cs->mq,
855 ev);
856 _GSS_operation_destroy (op);
857}
858
859
860/**
861 * This function probably should not exist
862 * and be replaced by inlining more specific
863 * logic in the various places where it is called.
864 */
865static void
866_GSS_operation_destroy2 (struct Operation *op)
867{
868 struct GNUNET_CADET_Channel *channel;
869
870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871 "channel_end_cb called\n");
872 if (NULL != (channel = op->channel))
873 {
874 /* This will free op; called conditionally as this helper function
875 is also called from within the channel disconnect handler. */
876 op->channel = NULL;
877 GNUNET_CADET_channel_destroy (channel);
878 }
879 if (NULL != op->listener)
880 {
881 incoming_destroy (op);
882 return;
883 }
884 if (NULL != op->set)
885 {
886 if (GNUNET_YES == op->channel_death_expected)
887 {
888 /* oh goodie, we are done! */
889 send_client_done_and_destroy (op);
890 }
891 else
892 {
893 /* sorry, channel went down early, too bad. */
894 _GSS_operation_destroy (op);
895 }
896 }
897 else
898 _GSS_operation_destroy (op);
899 GNUNET_free (op);
900}
901
902
903/**
904 * Inform the client that the intersection operation has failed,
905 * and proceed to destroy the evaluate operation.
906 *
907 * @param op the intersection operation to fail
908 */
909static void
910fail_intersection_operation (struct Operation *op)
911{
912 struct GNUNET_MQ_Envelope *ev;
913 struct GNUNET_SETI_ResultMessage *msg;
914
915 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
916 "Intersection operation failed\n");
917 GNUNET_STATISTICS_update (_GSS_statistics,
918 "# Intersection operations failed",
919 1,
920 GNUNET_NO);
921 if (NULL != op->my_elements)
922 {
923 GNUNET_CONTAINER_multihashmap_destroy (op->my_elements);
924 op->my_elements = NULL;
925 }
926 ev = GNUNET_MQ_msg (msg,
927 GNUNET_MESSAGE_TYPE_SETI_RESULT);
928 msg->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
929 msg->request_id = htonl (op->client_request_id);
930 msg->element_type = htons (0);
931 GNUNET_MQ_send (op->set->cs->mq,
932 ev);
933 _GSS_operation_destroy (op);
934}
935
936
937/**
938 * Send a bloomfilter to our peer. After the result done message has
939 * been sent to the client, destroy the evaluate operation.
940 *
941 * @param op intersection operation
942 */
943static void
944send_bloomfilter (struct Operation *op)
945{
946 struct GNUNET_MQ_Envelope *ev;
947 struct BFMessage *msg;
948 uint32_t bf_size;
949 uint32_t bf_elementbits;
950 uint32_t chunk_size;
951 char *bf_data;
952 uint32_t offset;
953
954 /* We consider the ratio of the set sizes to determine
955 the number of bits per element, as the smaller set
956 should use more bits to maximize its set reduction
957 potential and minimize overall bandwidth consumption. */
958 bf_elementbits = 2 + ceil (log2 ((double)
959 (op->remote_element_count
960 / (double) op->my_element_count)));
961 if (bf_elementbits < 1)
962 bf_elementbits = 1; /* make sure k is not 0 */
963 /* optimize BF-size to ~50% of bits set */
964 bf_size = ceil ((double) (op->my_element_count
965 * bf_elementbits / log (2)));
966 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
967 "Sending Bloom filter (%u) of size %u bytes\n",
968 (unsigned int) bf_elementbits,
969 (unsigned int) bf_size);
970 op->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
971 bf_size,
972 bf_elementbits);
973 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
974 UINT32_MAX);
975 GNUNET_CONTAINER_multihashmap_iterate (op->my_elements,
976 &iterator_bf_create,
977 op);
978
979 /* send our Bloom filter */
980 GNUNET_STATISTICS_update (_GSS_statistics,
981 "# Intersection Bloom filters sent",
982 1,
983 GNUNET_NO);
984 chunk_size = 60 * 1024 - sizeof(struct BFMessage);
985 if (bf_size <= chunk_size)
986 {
987 /* singlepart */
988 chunk_size = bf_size;
989 ev = GNUNET_MQ_msg_extra (msg,
990 chunk_size,
991 GNUNET_MESSAGE_TYPE_SETI_P2P_BF);
992 GNUNET_assert (GNUNET_SYSERR !=
993 GNUNET_CONTAINER_bloomfilter_get_raw_data (
994 op->local_bf,
995 (char *) &msg[1],
996 bf_size));
997 msg->sender_element_count = htonl (op->my_element_count);
998 msg->bloomfilter_total_length = htonl (bf_size);
999 msg->bits_per_element = htonl (bf_elementbits);
1000 msg->sender_mutator = htonl (op->salt);
1001 msg->element_xor_hash = op->my_xor;
1002 GNUNET_MQ_send (op->mq, ev);
1003 }
1004 else
1005 {
1006 /* multipart */
1007 bf_data = GNUNET_malloc (bf_size);
1008 GNUNET_assert (GNUNET_SYSERR !=
1009 GNUNET_CONTAINER_bloomfilter_get_raw_data (
1010 op->local_bf,
1011 bf_data,
1012 bf_size));
1013 offset = 0;
1014 while (offset < bf_size)
1015 {
1016 if (bf_size - chunk_size < offset)
1017 chunk_size = bf_size - offset;
1018 ev = GNUNET_MQ_msg_extra (msg,
1019 chunk_size,
1020 GNUNET_MESSAGE_TYPE_SETI_P2P_BF);
1021 GNUNET_memcpy (&msg[1],
1022 &bf_data[offset],
1023 chunk_size);
1024 offset += chunk_size;
1025 msg->sender_element_count = htonl (op->my_element_count);
1026 msg->bloomfilter_total_length = htonl (bf_size);
1027 msg->bits_per_element = htonl (bf_elementbits);
1028 msg->sender_mutator = htonl (op->salt);
1029 msg->element_xor_hash = op->my_xor;
1030 GNUNET_MQ_send (op->mq, ev);
1031 }
1032 GNUNET_free (bf_data);
1033 }
1034 GNUNET_CONTAINER_bloomfilter_free (op->local_bf);
1035 op->local_bf = NULL;
1036}
1037
1038
1039/**
1040 * Remember that we are done dealing with the local client
1041 * AND have sent the other peer our message that we are done,
1042 * so we are not just waiting for the channel to die before
1043 * telling the local client that we are done as our last act.
1044 *
1045 * @param cls the `struct Operation`.
1046 */
1047static void
1048finished_local_operations (void *cls)
1049{
1050 struct Operation *op = cls;
1051
1052 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1053 "DONE sent to other peer, now waiting for other end to close the channel\n");
1054 op->phase = PHASE_FINISHED;
1055 op->channel_death_expected = GNUNET_YES;
1056}
1057
1058
1059/**
1060 * Notify the other peer that we are done. Once this message
1061 * is out, we still need to notify the local client that we
1062 * are done.
1063 *
1064 * @param op operation to notify for.
1065 */
1066static void
1067send_p2p_done (struct Operation *op)
1068{
1069 struct GNUNET_MQ_Envelope *ev;
1070 struct IntersectionDoneMessage *idm;
1071
1072 GNUNET_assert (PHASE_MUST_SEND_DONE == op->phase);
1073 GNUNET_assert (GNUNET_NO == op->channel_death_expected);
1074 ev = GNUNET_MQ_msg (idm,
1075 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE);
1076 idm->final_element_count = htonl (op->my_element_count);
1077 idm->element_xor_hash = op->my_xor;
1078 GNUNET_MQ_notify_sent (ev,
1079 &finished_local_operations,
1080 op);
1081 GNUNET_MQ_send (op->mq,
1082 ev);
1083}
1084
1085
1086/**
1087 * Send all elements in the full result iterator.
1088 *
1089 * @param cls the `struct Operation *`
1090 */
1091static void
1092send_remaining_elements (void *cls)
1093{
1094 struct Operation *op = cls;
1095 const void *nxt;
1096 const struct ElementEntry *ee;
1097 struct GNUNET_MQ_Envelope *ev;
1098 struct GNUNET_SETI_ResultMessage *rm;
1099 const struct GNUNET_SETI_Element *element;
1100 int res;
1101
1102 if (GNUNET_NO == op->return_intersection)
1103 {
1104 GNUNET_break (0);
1105 return; /* Wrong mode for transmitting removed elements */
1106 }
1107 res = GNUNET_CONTAINER_multihashmap_iterator_next (
1108 op->full_result_iter,
1109 NULL,
1110 &nxt);
1111 if (GNUNET_NO == res)
1112 {
1113 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1114 "Sending done and destroy because iterator ran out\n");
1115 GNUNET_CONTAINER_multihashmap_iterator_destroy (
1116 op->full_result_iter);
1117 op->full_result_iter = NULL;
1118 if (PHASE_DONE_RECEIVED == op->phase)
1119 {
1120 op->phase = PHASE_FINISHED;
1121 send_client_done_and_destroy (op);
1122 }
1123 else if (PHASE_MUST_SEND_DONE == op->phase)
1124 {
1125 send_p2p_done (op);
1126 }
1127 else
1128 {
1129 GNUNET_assert (0);
1130 }
1131 return;
1132 }
1133 ee = nxt;
1134 element = &ee->element;
1135 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136 "Sending element %s:%u to client (full set)\n",
1137 GNUNET_h2s (&ee->element_hash),
1138 element->size);
1139 GNUNET_assert (0 != op->client_request_id);
1140 ev = GNUNET_MQ_msg_extra (rm,
1141 element->size,
1142 GNUNET_MESSAGE_TYPE_SETI_RESULT);
1143 GNUNET_assert (NULL != ev);
1144 rm->result_status = htons (GNUNET_SETI_STATUS_ADD_LOCAL);
1145 rm->request_id = htonl (op->client_request_id);
1146 rm->element_type = element->element_type;
1147 GNUNET_memcpy (&rm[1],
1148 element->data,
1149 element->size);
1150 GNUNET_MQ_notify_sent (ev,
1151 &send_remaining_elements,
1152 op);
1153 GNUNET_MQ_send (op->set->cs->mq,
1154 ev);
1155}
1156
1157
1158/**
1159 * Fills the "my_elements" hashmap with the initial set of
1160 * (non-deleted) elements from the set of the specification.
1161 *
1162 * @param cls closure with the `struct Operation *`
1163 * @param key current key code for the element
1164 * @param value value in the hash map with the `struct ElementEntry *`
1165 * @return #GNUNET_YES (we should continue to iterate)
1166 */
1167static int
1168initialize_map_unfiltered (void *cls,
1169 const struct GNUNET_HashCode *key,
1170 void *value)
1171{
1172 struct ElementEntry *ee = value;
1173 struct Operation *op = cls;
1174
1175 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1176 return GNUNET_YES; /* element not live in operation's generation */
1177 GNUNET_CRYPTO_hash_xor (&op->my_xor,
1178 &ee->element_hash,
1179 &op->my_xor);
1180 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181 "Initial full initialization of my_elements, adding %s:%u\n",
1182 GNUNET_h2s (&ee->element_hash),
1183 ee->element.size);
1184 GNUNET_break (GNUNET_YES ==
1185 GNUNET_CONTAINER_multihashmap_put (op->my_elements,
1186 &ee->element_hash,
1187 ee,
1188 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1189 return GNUNET_YES;
1190}
1191
1192
1193/**
1194 * Send our element count to the peer, in case our element count is
1195 * lower than theirs.
1196 *
1197 * @param op intersection operation
1198 */
1199static void
1200send_element_count (struct Operation *op)
1201{
1202 struct GNUNET_MQ_Envelope *ev;
1203 struct IntersectionElementInfoMessage *msg;
1204
1205 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1206 "Sending our element count (%u)\n",
1207 op->my_element_count);
1208 ev = GNUNET_MQ_msg (msg,
1209 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO);
1210 msg->sender_element_count = htonl (op->my_element_count);
1211 GNUNET_MQ_send (op->mq, ev);
1212}
1213
1214
1215/**
1216 * We go first, initialize our map with all elements and
1217 * send the first Bloom filter.
1218 *
1219 * @param op operation to start exchange for
1220 */
1221static void
1222begin_bf_exchange (struct Operation *op)
1223{
1224 op->phase = PHASE_BF_EXCHANGE;
1225 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1226 &initialize_map_unfiltered,
1227 op);
1228 send_bloomfilter (op);
1229}
1230
1231
1232/**
1233 * Handle the initial `struct IntersectionElementInfoMessage` from a
1234 * remote peer.
1235 *
1236 * @param cls the intersection operation
1237 * @param mh the header of the message
1238 */
1239static void
1240handle_intersection_p2p_element_info (void *cls,
1241 const struct
1242 IntersectionElementInfoMessage *msg)
1243{
1244 struct Operation *op = cls;
1245
1246 op->remote_element_count = ntohl (msg->sender_element_count);
1247 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1248 "Received remote element count (%u), I have %u\n",
1249 op->remote_element_count,
1250 op->my_element_count);
1251 if (((PHASE_INITIAL != op->phase) &&
1252 (PHASE_COUNT_SENT != op->phase)) ||
1253 (op->my_element_count > op->remote_element_count) ||
1254 (0 == op->my_element_count) ||
1255 (0 == op->remote_element_count))
1256 {
1257 GNUNET_break_op (0);
1258 fail_intersection_operation (op);
1259 return;
1260 }
1261 GNUNET_break (NULL == op->remote_bf);
1262 begin_bf_exchange (op);
1263 GNUNET_CADET_receive_done (op->channel);
1264}
1265
1266
1267/**
1268 * Process a Bloomfilter once we got all the chunks.
1269 *
1270 * @param op the intersection operation
1271 */
1272static void
1273process_bf (struct Operation *op)
1274{
1275 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1276 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
1277 op->phase,
1278 op->remote_element_count,
1279 op->my_element_count,
1280 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
1281 switch (op->phase)
1282 {
1283 case PHASE_INITIAL:
1284 GNUNET_break_op (0);
1285 fail_intersection_operation (op);
1286 return;
1287 case PHASE_COUNT_SENT:
1288 /* This is the first BF being sent, build our initial map with
1289 filtering in place */
1290 op->my_element_count = 0;
1291 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1292 &filtered_map_initialization,
1293 op);
1294 break;
1295 case PHASE_BF_EXCHANGE:
1296 /* Update our set by reduction */
1297 GNUNET_CONTAINER_multihashmap_iterate (op->my_elements,
1298 &iterator_bf_reduce,
1299 op);
1300 break;
1301 case PHASE_MUST_SEND_DONE:
1302 GNUNET_break_op (0);
1303 fail_intersection_operation (op);
1304 return;
1305 case PHASE_DONE_RECEIVED:
1306 GNUNET_break_op (0);
1307 fail_intersection_operation (op);
1308 return;
1309 case PHASE_FINISHED:
1310 GNUNET_break_op (0);
1311 fail_intersection_operation (op);
1312 return;
1313 }
1314 GNUNET_CONTAINER_bloomfilter_free (op->remote_bf);
1315 op->remote_bf = NULL;
1316
1317 if ((0 == op->my_element_count) || /* fully disjoint */
1318 ((op->my_element_count == op->remote_element_count) &&
1319 (0 == GNUNET_memcmp (&op->my_xor,
1320 &op->other_xor))))
1321 {
1322 /* we are done */
1323 op->phase = PHASE_MUST_SEND_DONE;
1324 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1325 "Intersection succeeded, sending DONE to other peer\n");
1326 GNUNET_CONTAINER_bloomfilter_free (op->local_bf);
1327 op->local_bf = NULL;
1328 if (GNUNET_YES == op->return_intersection)
1329 {
1330 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331 "Sending full result set (%u elements)\n",
1332 GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1333 op->full_result_iter
1334 = GNUNET_CONTAINER_multihashmap_iterator_create (
1335 op->my_elements);
1336 send_remaining_elements (op);
1337 return;
1338 }
1339 send_p2p_done (op);
1340 return;
1341 }
1342 op->phase = PHASE_BF_EXCHANGE;
1343 send_bloomfilter (op);
1344}
1345
1346
1347/**
1348 * Check an BF message from a remote peer.
1349 *
1350 * @param cls the intersection operation
1351 * @param msg the header of the message
1352 * @return #GNUNET_OK if @a msg is well-formed
1353 */
1354static int
1355check_intersection_p2p_bf (void *cls,
1356 const struct BFMessage *msg)
1357{
1358 struct Operation *op = cls;
1359
1360 (void) op;
1361 return GNUNET_OK;
1362}
1363
1364
1365/**
1366 * Handle an BF message from a remote peer.
1367 *
1368 * @param cls the intersection operation
1369 * @param msg the header of the message
1370 */
1371static void
1372handle_intersection_p2p_bf (void *cls,
1373 const struct BFMessage *msg)
1374{
1375 struct Operation *op = cls;
1376 uint32_t bf_size;
1377 uint32_t chunk_size;
1378 uint32_t bf_bits_per_element;
1379
1380 switch (op->phase)
1381 {
1382 case PHASE_INITIAL:
1383 GNUNET_break_op (0);
1384 fail_intersection_operation (op);
1385 return;
1386
1387 case PHASE_COUNT_SENT:
1388 case PHASE_BF_EXCHANGE:
1389 bf_size = ntohl (msg->bloomfilter_total_length);
1390 bf_bits_per_element = ntohl (msg->bits_per_element);
1391 chunk_size = htons (msg->header.size) - sizeof(struct BFMessage);
1392 op->other_xor = msg->element_xor_hash;
1393 if (bf_size == chunk_size)
1394 {
1395 if (NULL != op->bf_data)
1396 {
1397 GNUNET_break_op (0);
1398 fail_intersection_operation (op);
1399 return;
1400 }
1401 /* single part, done here immediately */
1402 op->remote_bf
1403 = GNUNET_CONTAINER_bloomfilter_init ((const char *) &msg[1],
1404 bf_size,
1405 bf_bits_per_element);
1406 op->salt = ntohl (msg->sender_mutator);
1407 op->remote_element_count = ntohl (msg->sender_element_count);
1408 process_bf (op);
1409 break;
1410 }
1411 /* multipart chunk */
1412 if (NULL == op->bf_data)
1413 {
1414 /* first chunk, initialize */
1415 op->bf_data = GNUNET_malloc (bf_size);
1416 op->bf_data_size = bf_size;
1417 op->bf_bits_per_element = bf_bits_per_element;
1418 op->bf_data_offset = 0;
1419 op->salt = ntohl (msg->sender_mutator);
1420 op->remote_element_count = ntohl (msg->sender_element_count);
1421 }
1422 else
1423 {
1424 /* increment */
1425 if ((op->bf_data_size != bf_size) ||
1426 (op->bf_bits_per_element != bf_bits_per_element) ||
1427 (op->bf_data_offset + chunk_size > bf_size) ||
1428 (op->salt != ntohl (msg->sender_mutator)) ||
1429 (op->remote_element_count != ntohl (msg->sender_element_count)))
1430 {
1431 GNUNET_break_op (0);
1432 fail_intersection_operation (op);
1433 return;
1434 }
1435 }
1436 GNUNET_memcpy (&op->bf_data[op->bf_data_offset],
1437 (const char *) &msg[1],
1438 chunk_size);
1439 op->bf_data_offset += chunk_size;
1440 if (op->bf_data_offset == bf_size)
1441 {
1442 /* last chunk, run! */
1443 op->remote_bf
1444 = GNUNET_CONTAINER_bloomfilter_init (op->bf_data,
1445 bf_size,
1446 bf_bits_per_element);
1447 GNUNET_free (op->bf_data);
1448 op->bf_data = NULL;
1449 op->bf_data_size = 0;
1450 process_bf (op);
1451 }
1452 break;
1453
1454 default:
1455 GNUNET_break_op (0);
1456 fail_intersection_operation (op);
1457 return;
1458 }
1459 GNUNET_CADET_receive_done (op->channel);
1460}
1461
1462
1463/**
1464 * Remove all elements from our hashmap.
1465 *
1466 * @param cls closure with the `struct Operation *`
1467 * @param key current key code
1468 * @param value value in the hash map
1469 * @return #GNUNET_YES (we should continue to iterate)
1470 */
1471static int
1472filter_all (void *cls,
1473 const struct GNUNET_HashCode *key,
1474 void *value)
1475{
1476 struct Operation *op = cls;
1477 struct ElementEntry *ee = value;
1478
1479 GNUNET_break (0 < op->my_element_count);
1480 op->my_element_count--;
1481 GNUNET_CRYPTO_hash_xor (&op->my_xor,
1482 &ee->element_hash,
1483 &op->my_xor);
1484 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1485 "Final reduction of my_elements, removing %s:%u\n",
1486 GNUNET_h2s (&ee->element_hash),
1487 ee->element.size);
1488 GNUNET_assert (GNUNET_YES ==
1489 GNUNET_CONTAINER_multihashmap_remove (op->my_elements,
1490 &ee->element_hash,
1491 ee));
1492 send_client_removed_element (op,
1493 &ee->element);
1494 return GNUNET_YES;
1495}
1496
1497
1498/**
1499 * Handle a done message from a remote peer
1500 *
1501 * @param cls the intersection operation
1502 * @param mh the message
1503 */
1504static void
1505handle_intersection_p2p_done (void *cls,
1506 const struct IntersectionDoneMessage *idm)
1507{
1508 struct Operation *op = cls;
1509
1510 if (PHASE_BF_EXCHANGE != op->phase)
1511 {
1512 /* wrong phase to conclude? FIXME: Or should we allow this
1513 if the other peer has _initially_ already an empty set? */
1514 GNUNET_break_op (0);
1515 fail_intersection_operation (op);
1516 return;
1517 }
1518 if (0 == ntohl (idm->final_element_count))
1519 {
1520 /* other peer determined empty set is the intersection,
1521 remove all elements */
1522 GNUNET_CONTAINER_multihashmap_iterate (op->my_elements,
1523 &filter_all,
1524 op);
1525 }
1526 if ((op->my_element_count != ntohl (idm->final_element_count)) ||
1527 (0 != GNUNET_memcmp (&op->my_xor,
1528 &idm->element_xor_hash)))
1529 {
1530 /* Other peer thinks we are done, but we disagree on the result! */
1531 GNUNET_break_op (0);
1532 fail_intersection_operation (op);
1533 return;
1534 }
1535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1537 op->my_element_count);
1538 op->phase = PHASE_DONE_RECEIVED;
1539 GNUNET_CADET_receive_done (op->channel);
1540
1541 GNUNET_assert (GNUNET_NO == op->client_done_sent);
1542 if (GNUNET_YES == op->return_intersection)
1543 {
1544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1545 "Sending full result set to client (%u elements)\n",
1546 GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1547 op->full_result_iter
1548 = GNUNET_CONTAINER_multihashmap_iterator_create (op->my_elements);
1549 send_remaining_elements (op);
1550 return;
1551 }
1552 op->phase = PHASE_FINISHED;
1553 send_client_done_and_destroy (op);
1554}
1555
1556
1557/**
1558 * Get the incoming socket associated with the given id.
1559 *
1560 * @param listener the listener to look in
1561 * @param id id to look for
1562 * @return the incoming socket associated with the id,
1563 * or NULL if there is none
1564 */
1565static struct Operation *
1566get_incoming (uint32_t id)
1567{
1568 for (struct Listener *listener = listener_head; NULL != listener;
1569 listener = listener->next)
1570 {
1571 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
1572 if (op->suggest_id == id)
1573 return op;
1574 }
1575 return NULL;
1576}
1577
1578
1579/**
1580 * Callback called when a client connects to the service.
1581 *
1582 * @param cls closure for the service
1583 * @param c the new client that connected to the service
1584 * @param mq the message queue used to send messages to the client
1585 * @return @a `struct ClientState`
1586 */
1587static void *
1588client_connect_cb (void *cls,
1589 struct GNUNET_SERVICE_Client *c,
1590 struct GNUNET_MQ_Handle *mq)
1591{
1592 struct ClientState *cs;
1593
1594 num_clients++;
1595 cs = GNUNET_new (struct ClientState);
1596 cs->client = c;
1597 cs->mq = mq;
1598 return cs;
1599}
1600
1601
1602/**
1603 * Iterator over hash map entries to free element entries.
1604 *
1605 * @param cls closure
1606 * @param key current key code
1607 * @param value a `struct ElementEntry *` to be free'd
1608 * @return #GNUNET_YES (continue to iterate)
1609 */
1610static int
1611destroy_elements_iterator (void *cls,
1612 const struct GNUNET_HashCode *key,
1613 void *value)
1614{
1615 struct ElementEntry *ee = value;
1616
1617 GNUNET_free (ee);
1618 return GNUNET_YES;
1619}
1620
1621
1622/**
1623 * Clean up after a client has disconnected
1624 *
1625 * @param cls closure, unused
1626 * @param client the client to clean up after
1627 * @param internal_cls the `struct ClientState`
1628 */
1629static void
1630client_disconnect_cb (void *cls,
1631 struct GNUNET_SERVICE_Client *client,
1632 void *internal_cls)
1633{
1634 struct ClientState *cs = internal_cls;
1635 struct Operation *op;
1636 struct Listener *listener;
1637 struct Set *set;
1638
1639 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
1640 if (NULL != (set = cs->set))
1641 {
1642 struct SetContent *content = set->content;
1643
1644 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
1645 /* Destroy pending set operations */
1646 while (NULL != set->ops_head)
1647 _GSS_operation_destroy (set->ops_head);
1648
1649 /* free set content (or at least decrement RC) */
1650 set->content = NULL;
1651 GNUNET_assert (0 != content->refcount);
1652 content->refcount--;
1653 if (0 == content->refcount)
1654 {
1655 GNUNET_assert (NULL != content->elements);
1656 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
1657 &destroy_elements_iterator,
1658 NULL);
1659 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
1660 content->elements = NULL;
1661 GNUNET_free (content);
1662 }
1663 GNUNET_free (set);
1664 }
1665
1666 if (NULL != (listener = cs->listener))
1667 {
1668 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
1669 GNUNET_CADET_close_port (listener->open_port);
1670 listener->open_port = NULL;
1671 while (NULL != (op = listener->op_head))
1672 {
1673 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1674 "Destroying incoming operation `%u' from peer `%s'\n",
1675 (unsigned int) op->client_request_id,
1676 GNUNET_i2s (&op->peer));
1677 incoming_destroy (op);
1678 }
1679 GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener);
1680 GNUNET_free (listener);
1681 }
1682 GNUNET_free (cs);
1683 num_clients--;
1684 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
1685 {
1686 if (NULL != cadet)
1687 {
1688 GNUNET_CADET_disconnect (cadet);
1689 cadet = NULL;
1690 }
1691 }
1692}
1693
1694
1695/**
1696 * Check a request for a set operation from another peer.
1697 *
1698 * @param cls the operation state
1699 * @param msg the received message
1700 * @return #GNUNET_OK if the channel should be kept alive,
1701 * #GNUNET_SYSERR to destroy the channel
1702 */
1703static int
1704check_incoming_msg (void *cls,
1705 const struct OperationRequestMessage *msg)
1706{
1707 struct Operation *op = cls;
1708 struct Listener *listener = op->listener;
1709 const struct GNUNET_MessageHeader *nested_context;
1710
1711 /* double operation request */
1712 if (0 != op->suggest_id)
1713 {
1714 GNUNET_break_op (0);
1715 return GNUNET_SYSERR;
1716 }
1717 /* This should be equivalent to the previous condition, but can't hurt to check twice */
1718 if (NULL == listener)
1719 {
1720 GNUNET_break (0);
1721 return GNUNET_SYSERR;
1722 }
1723 nested_context = GNUNET_MQ_extract_nested_mh (msg);
1724 if ((NULL != nested_context) &&
1725 (ntohs (nested_context->size) > GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE))
1726 {
1727 GNUNET_break_op (0);
1728 return GNUNET_SYSERR;
1729 }
1730 return GNUNET_OK;
1731}
1732
1733
1734/**
1735 * Handle a request for a set operation from another peer. Checks if we
1736 * have a listener waiting for such a request (and in that case initiates
1737 * asking the listener about accepting the connection). If no listener
1738 * is waiting, we queue the operation request in hope that a listener
1739 * shows up soon (before timeout).
1740 *
1741 * This msg is expected as the first and only msg handled through the
1742 * non-operation bound virtual table, acceptance of this operation replaces
1743 * our virtual table and subsequent msgs would be routed differently (as
1744 * we then know what type of operation this is).
1745 *
1746 * @param cls the operation state
1747 * @param msg the received message
1748 * @return #GNUNET_OK if the channel should be kept alive,
1749 * #GNUNET_SYSERR to destroy the channel
1750 */
1751static void
1752handle_incoming_msg (void *cls,
1753 const struct OperationRequestMessage *msg)
1754{
1755 struct Operation *op = cls;
1756 struct Listener *listener = op->listener;
1757 const struct GNUNET_MessageHeader *nested_context;
1758 struct GNUNET_MQ_Envelope *env;
1759 struct GNUNET_SETI_RequestMessage *cmsg;
1760
1761 nested_context = GNUNET_MQ_extract_nested_mh (msg);
1762 /* Make a copy of the nested_context (application-specific context
1763 information that is opaque to set) so we can pass it to the
1764 listener later on */
1765 if (NULL != nested_context)
1766 op->context_msg = GNUNET_copy_message (nested_context);
1767 op->remote_element_count = ntohl (msg->element_count);
1768 GNUNET_log (
1769 GNUNET_ERROR_TYPE_DEBUG,
1770 "Received P2P operation request (port %s) for active listener\n",
1771 GNUNET_h2s (&op->listener->app_id));
1772 GNUNET_assert (0 == op->suggest_id);
1773 if (0 == suggest_id)
1774 suggest_id++;
1775 op->suggest_id = suggest_id++;
1776 GNUNET_assert (NULL != op->timeout_task);
1777 GNUNET_SCHEDULER_cancel (op->timeout_task);
1778 op->timeout_task = NULL;
1779 env = GNUNET_MQ_msg_nested_mh (cmsg,
1780 GNUNET_MESSAGE_TYPE_SETI_REQUEST,
1781 op->context_msg);
1782 GNUNET_log (
1783 GNUNET_ERROR_TYPE_DEBUG,
1784 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
1785 op->suggest_id,
1786 listener,
1787 listener->cs);
1788 cmsg->accept_id = htonl (op->suggest_id);
1789 cmsg->peer_id = op->peer;
1790 GNUNET_MQ_send (listener->cs->mq, env);
1791 /* NOTE: GNUNET_CADET_receive_done() will be called in
1792 #handle_client_accept() */
1793}
1794
1795
1796/**
1797 * Called when a client wants to create a new set. This is typically
1798 * the first request from a client, and includes the type of set
1799 * operation to be performed.
1800 *
1801 * @param cls client that sent the message
1802 * @param m message sent by the client
1803 */
1804static void
1805handle_client_create_set (void *cls,
1806 const struct GNUNET_SETI_CreateMessage *msg)
1807{
1808 struct ClientState *cs = cls;
1809 struct Set *set;
1810
1811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1812 "Client created new intersection set\n");
1813 if (NULL != cs->set)
1814 {
1815 /* There can only be one set per client */
1816 GNUNET_break (0);
1817 GNUNET_SERVICE_client_drop (cs->client);
1818 return;
1819 }
1820 set = GNUNET_new (struct Set);
1821 set->content = GNUNET_new (struct SetContent);
1822 set->content->refcount = 1;
1823 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1824 GNUNET_YES);
1825 set->cs = cs;
1826 cs->set = set;
1827 GNUNET_SERVICE_client_continue (cs->client);
1828}
1829
1830
1831/**
1832 * Timeout happens iff:
1833 * - we suggested an operation to our listener,
1834 * but did not receive a response in time
1835 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST
1836 *
1837 * @param cls channel context
1838 * @param tc context information (why was this task triggered now)
1839 */
1840static void
1841incoming_timeout_cb (void *cls)
1842{
1843 struct Operation *op = cls;
1844
1845 op->timeout_task = NULL;
1846 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1847 "Remote peer's incoming request timed out\n");
1848 incoming_destroy (op);
1849}
1850
1851
1852/**
1853 * Method called whenever another peer has added us to a channel the
1854 * other peer initiated. Only called (once) upon reception of data
1855 * from a channel we listen on.
1856 *
1857 * The channel context represents the operation itself and gets added
1858 * to a DLL, from where it gets looked up when our local listener
1859 * client responds to a proposed/suggested operation or connects and
1860 * associates with this operation.
1861 *
1862 * @param cls closure
1863 * @param channel new handle to the channel
1864 * @param source peer that started the channel
1865 * @return initial channel context for the channel
1866 * returns NULL on error
1867 */
1868static void *
1869channel_new_cb (void *cls,
1870 struct GNUNET_CADET_Channel *channel,
1871 const struct GNUNET_PeerIdentity *source)
1872{
1873 struct Listener *listener = cls;
1874 struct Operation *op;
1875
1876 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1877 "New incoming channel\n");
1878 op = GNUNET_new (struct Operation);
1879 op->listener = listener;
1880 op->peer = *source;
1881 op->channel = channel;
1882 op->mq = GNUNET_CADET_get_mq (op->channel);
1883 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1884 UINT32_MAX);
1885 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1886 &incoming_timeout_cb,
1887 op);
1888 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1889 listener->op_tail,
1890 op);
1891 return op;
1892}
1893
1894
1895/**
1896 * Function called whenever a channel is destroyed. Should clean up
1897 * any associated state. It must NOT call
1898 * GNUNET_CADET_channel_destroy() on the channel.
1899 *
1900 * The peer_disconnect function is part of a a virtual table set initially either
1901 * when a peer creates a new channel with us, or once we create
1902 * a new channel ourselves (evaluate).
1903 *
1904 * Once we know the exact type of operation (union/intersection), the vt is
1905 * replaced with an operation specific instance (_GSS_[op]_vt).
1906 *
1907 * @param channel_ctx place where local state associated
1908 * with the channel is stored
1909 * @param channel connection to the other end (henceforth invalid)
1910 */
1911static void
1912channel_end_cb (void *channel_ctx,
1913 const struct GNUNET_CADET_Channel *channel)
1914{
1915 struct Operation *op = channel_ctx;
1916
1917 op->channel = NULL;
1918 _GSS_operation_destroy2 (op);
1919}
1920
1921
1922/**
1923 * Function called whenever an MQ-channel's transmission window size changes.
1924 *
1925 * The first callback in an outgoing channel will be with a non-zero value
1926 * and will mean the channel is connected to the destination.
1927 *
1928 * For an incoming channel it will be called immediately after the
1929 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1930 *
1931 * @param cls Channel closure.
1932 * @param channel Connection to the other end (henceforth invalid).
1933 * @param window_size New window size. If the is more messages than buffer size
1934 * this value will be negative..
1935 */
1936static void
1937channel_window_cb (void *cls,
1938 const struct GNUNET_CADET_Channel *channel,
1939 int window_size)
1940{
1941 /* FIXME: not implemented, we could do flow control here... */
1942}
1943
1944
1945/**
1946 * Called when a client wants to create a new listener.
1947 *
1948 * @param cls client that sent the message
1949 * @param msg message sent by the client
1950 */
1951static void
1952handle_client_listen (void *cls,
1953 const struct GNUNET_SETI_ListenMessage *msg)
1954{
1955 struct ClientState *cs = cls;
1956 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1957 GNUNET_MQ_hd_var_size (incoming_msg,
1958 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
1959 struct OperationRequestMessage,
1960 NULL),
1961 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1962 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO,
1963 struct IntersectionElementInfoMessage,
1964 NULL),
1965 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1966 GNUNET_MESSAGE_TYPE_SETI_P2P_BF,
1967 struct BFMessage,
1968 NULL),
1969 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1970 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE,
1971 struct IntersectionDoneMessage,
1972 NULL),
1973 GNUNET_MQ_handler_end ()
1974 };
1975 struct Listener *listener;
1976
1977 if (NULL != cs->listener)
1978 {
1979 /* max. one active listener per client! */
1980 GNUNET_break (0);
1981 GNUNET_SERVICE_client_drop (cs->client);
1982 return;
1983 }
1984 listener = GNUNET_new (struct Listener);
1985 listener->cs = cs;
1986 cs->listener = listener;
1987 listener->app_id = msg->app_id;
1988 GNUNET_CONTAINER_DLL_insert (listener_head,
1989 listener_tail,
1990 listener);
1991 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1992 "New listener for set intersection created (port %s)\n",
1993 GNUNET_h2s (&listener->app_id));
1994 listener->open_port = GNUNET_CADET_open_port (cadet,
1995 &msg->app_id,
1996 &channel_new_cb,
1997 listener,
1998 &channel_window_cb,
1999 &channel_end_cb,
2000 cadet_handlers);
2001 GNUNET_SERVICE_client_continue (cs->client);
2002}
2003
2004
2005/**
2006 * Called when the listening client rejects an operation
2007 * request by another peer.
2008 *
2009 * @param cls client that sent the message
2010 * @param msg message sent by the client
2011 */
2012static void
2013handle_client_reject (void *cls,
2014 const struct GNUNET_SETI_RejectMessage *msg)
2015{
2016 struct ClientState *cs = cls;
2017 struct Operation *op;
2018
2019 op = get_incoming (ntohl (msg->accept_reject_id));
2020 if (NULL == op)
2021 {
2022 /* no matching incoming operation for this reject;
2023 could be that the other peer already disconnected... */
2024 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2025 "Client rejected unknown operation %u\n",
2026 (unsigned int) ntohl (msg->accept_reject_id));
2027 GNUNET_SERVICE_client_continue (cs->client);
2028 return;
2029 }
2030 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2031 "Peer request (app %s) rejected by client\n",
2032 GNUNET_h2s (&cs->listener->app_id));
2033 _GSS_operation_destroy2 (op);
2034 GNUNET_SERVICE_client_continue (cs->client);
2035}
2036
2037
2038/**
2039 * Called when a client wants to add or remove an element to a set it inhabits.
2040 *
2041 * @param cls client that sent the message
2042 * @param msg message sent by the client
2043 */
2044static int
2045check_client_set_add (void *cls,
2046 const struct GNUNET_SETI_ElementMessage *msg)
2047{
2048 /* NOTE: Technically, we should probably check with the
2049 block library whether the element we are given is well-formed */
2050 return GNUNET_OK;
2051}
2052
2053
2054/**
2055 * Called when a client wants to add an element to a set it inhabits.
2056 *
2057 * @param cls client that sent the message
2058 * @param msg message sent by the client
2059 */
2060static void
2061handle_client_set_add (void *cls,
2062 const struct GNUNET_SETI_ElementMessage *msg)
2063{
2064 struct ClientState *cs = cls;
2065 struct Set *set;
2066 struct GNUNET_SETI_Element el;
2067 struct ElementEntry *ee;
2068 struct GNUNET_HashCode hash;
2069
2070 if (NULL == (set = cs->set))
2071 {
2072 /* client without a set requested an operation */
2073 GNUNET_break (0);
2074 GNUNET_SERVICE_client_drop (cs->client);
2075 return;
2076 }
2077 GNUNET_SERVICE_client_continue (cs->client);
2078 el.size = ntohs (msg->header.size) - sizeof(*msg);
2079 el.data = &msg[1];
2080 el.element_type = ntohs (msg->element_type);
2081 GNUNET_SETI_element_hash (&el,
2082 &hash);
2083 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
2084 &hash);
2085 if (NULL == ee)
2086 {
2087 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2088 "Client inserts element %s of size %u\n",
2089 GNUNET_h2s (&hash),
2090 el.size);
2091 ee = GNUNET_malloc (el.size + sizeof(*ee));
2092 ee->element.size = el.size;
2093 GNUNET_memcpy (&ee[1], el.data, el.size);
2094 ee->element.data = &ee[1];
2095 ee->element.element_type = el.element_type;
2096 ee->remote = GNUNET_NO;
2097 ee->element_hash = hash;
2098 GNUNET_break (GNUNET_YES ==
2099 GNUNET_CONTAINER_multihashmap_put (
2100 set->content->elements,
2101 &ee->element_hash,
2102 ee,
2103 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2104 }
2105 else
2106 {
2107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2108 "Client inserted element %s of size %u twice (ignored)\n",
2109 GNUNET_h2s (&hash),
2110 el.size);
2111 /* same element inserted twice */
2112 return;
2113 }
2114 set->current_set_element_count++;
2115}
2116
2117
2118/**
2119 * Advance the current generation of a set,
2120 * adding exclusion ranges if necessary.
2121 *
2122 * @param set the set where we want to advance the generation
2123 */
2124static void
2125advance_generation (struct Set *set)
2126{
2127 if (set->current_generation == set->content->latest_generation)
2128 {
2129 set->content->latest_generation++;
2130 set->current_generation++;
2131 return;
2132 }
2133 GNUNET_assert (set->current_generation < set->content->latest_generation);
2134}
2135
2136
2137/**
2138 * Called when a client wants to initiate a set operation with another
2139 * peer. Initiates the CADET connection to the listener and sends the
2140 * request.
2141 *
2142 * @param cls client that sent the message
2143 * @param msg message sent by the client
2144 * @return #GNUNET_OK if the message is well-formed
2145 */
2146static int
2147check_client_evaluate (void *cls,
2148 const struct GNUNET_SETI_EvaluateMessage *msg)
2149{
2150 /* FIXME: suboptimal, even if the context below could be NULL,
2151 there are malformed messages this does not check for... */
2152 return GNUNET_OK;
2153}
2154
2155
2156/**
2157 * Called when a client wants to initiate a set operation with another
2158 * peer. Initiates the CADET connection to the listener and sends the
2159 * request.
2160 *
2161 * @param cls client that sent the message
2162 * @param msg message sent by the client
2163 */
2164static void
2165handle_client_evaluate (void *cls,
2166 const struct GNUNET_SETI_EvaluateMessage *msg)
2167{
2168 struct ClientState *cs = cls;
2169 struct Operation *op = GNUNET_new (struct Operation);
2170 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2171 GNUNET_MQ_hd_var_size (incoming_msg,
2172 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
2173 struct OperationRequestMessage,
2174 op),
2175 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
2176 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO,
2177 struct IntersectionElementInfoMessage,
2178 op),
2179 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
2180 GNUNET_MESSAGE_TYPE_SETI_P2P_BF,
2181 struct BFMessage,
2182 op),
2183 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
2184 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE,
2185 struct IntersectionDoneMessage,
2186 op),
2187 GNUNET_MQ_handler_end ()
2188 };
2189 struct Set *set;
2190 const struct GNUNET_MessageHeader *context;
2191
2192 if (NULL == (set = cs->set))
2193 {
2194 GNUNET_break (0);
2195 GNUNET_free (op);
2196 GNUNET_SERVICE_client_drop (cs->client);
2197 return;
2198 }
2199 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
2200 UINT32_MAX);
2201 op->peer = msg->target_peer;
2202 op->return_intersection = htonl (msg->return_intersection);
2203 fprintf (stderr,
2204 "Return intersection for evaluate is %d\n",
2205 op->return_intersection);
2206 op->client_request_id = ntohl (msg->request_id);
2207 context = GNUNET_MQ_extract_nested_mh (msg);
2208
2209 /* Advance generation values, so that
2210 mutations won't interfer with the running operation. */
2211 op->set = set;
2212 op->generation_created = set->current_generation;
2213 advance_generation (set);
2214 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2215 set->ops_tail,
2216 op);
2217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2218 "Creating new CADET channel to port %s for set intersection\n",
2219 GNUNET_h2s (&msg->app_id));
2220 op->channel = GNUNET_CADET_channel_create (cadet,
2221 op,
2222 &msg->target_peer,
2223 &msg->app_id,
2224 &channel_window_cb,
2225 &channel_end_cb,
2226 cadet_handlers);
2227 op->mq = GNUNET_CADET_get_mq (op->channel);
2228 {
2229 struct GNUNET_MQ_Envelope *ev;
2230 struct OperationRequestMessage *msg;
2231
2232 ev = GNUNET_MQ_msg_nested_mh (msg,
2233 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
2234 context);
2235 if (NULL == ev)
2236 {
2237 /* the context message is too large!? */
2238 GNUNET_break (0);
2239 GNUNET_SERVICE_client_drop (cs->client);
2240 return;
2241 }
2242 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2243 "Initiating intersection operation evaluation\n");
2244 /* we started the operation, thus we have to send the operation request */
2245 op->phase = PHASE_INITIAL;
2246 op->my_element_count = op->set->current_set_element_count;
2247 op->my_elements
2248 = GNUNET_CONTAINER_multihashmap_create (op->my_element_count,
2249 GNUNET_YES);
2250
2251 msg->element_count = htonl (op->my_element_count);
2252 GNUNET_MQ_send (op->mq,
2253 ev);
2254 op->phase = PHASE_COUNT_SENT;
2255 if (NULL != context)
2256 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2257 "Sent op request with context message\n");
2258 else
2259 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2260 "Sent op request without context message\n");
2261 }
2262 GNUNET_SERVICE_client_continue (cs->client);
2263}
2264
2265
2266/**
2267 * Handle a request from the client to cancel a running set operation.
2268 *
2269 * @param cls the client
2270 * @param msg the message
2271 */
2272static void
2273handle_client_cancel (void *cls,
2274 const struct GNUNET_SETI_CancelMessage *msg)
2275{
2276 struct ClientState *cs = cls;
2277 struct Set *set;
2278 struct Operation *op;
2279 int found;
2280
2281 if (NULL == (set = cs->set))
2282 {
2283 /* client without a set requested an operation */
2284 GNUNET_break (0);
2285 GNUNET_SERVICE_client_drop (cs->client);
2286 return;
2287 }
2288 found = GNUNET_NO;
2289 for (op = set->ops_head; NULL != op; op = op->next)
2290 {
2291 if (op->client_request_id == ntohl (msg->request_id))
2292 {
2293 found = GNUNET_YES;
2294 break;
2295 }
2296 }
2297 if (GNUNET_NO == found)
2298 {
2299 /* It may happen that the operation was already destroyed due to
2300 * the other peer disconnecting. The client may not know about this
2301 * yet and try to cancel the (just barely non-existent) operation.
2302 * So this is not a hard error.
2303 *///
2304 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2305 "Client canceled non-existent op %u\n",
2306 (uint32_t) ntohl (msg->request_id));
2307 }
2308 else
2309 {
2310 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2311 "Client requested cancel for op %u\n",
2312 (uint32_t) ntohl (msg->request_id));
2313 _GSS_operation_destroy (op);
2314 }
2315 GNUNET_SERVICE_client_continue (cs->client);
2316}
2317
2318
2319/**
2320 * Handle a request from the client to accept a set operation that
2321 * came from a remote peer. We forward the accept to the associated
2322 * operation for handling
2323 *
2324 * @param cls the client
2325 * @param msg the message
2326 */
2327static void
2328handle_client_accept (void *cls,
2329 const struct GNUNET_SETI_AcceptMessage *msg)
2330{
2331 struct ClientState *cs = cls;
2332 struct Set *set;
2333 struct Operation *op;
2334 struct GNUNET_SETI_ResultMessage *result_message;
2335 struct GNUNET_MQ_Envelope *ev;
2336 struct Listener *listener;
2337
2338 if (NULL == (set = cs->set))
2339 {
2340 /* client without a set requested to accept */
2341 GNUNET_break (0);
2342 GNUNET_SERVICE_client_drop (cs->client);
2343 return;
2344 }
2345 op = get_incoming (ntohl (msg->accept_reject_id));
2346 if (NULL == op)
2347 {
2348 /* It is not an error if the set op does not exist -- it may
2349 * have been destroyed when the partner peer disconnected. */
2350 GNUNET_log (
2351 GNUNET_ERROR_TYPE_INFO,
2352 "Client %p accepted request %u of listener %p that is no longer active\n",
2353 cs,
2354 ntohl (msg->accept_reject_id),
2355 cs->listener);
2356 ev = GNUNET_MQ_msg (result_message,
2357 GNUNET_MESSAGE_TYPE_SETI_RESULT);
2358 result_message->request_id = msg->request_id;
2359 result_message->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
2360 GNUNET_MQ_send (set->cs->mq, ev);
2361 GNUNET_SERVICE_client_continue (cs->client);
2362 return;
2363 }
2364 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2365 "Client accepting request %u\n",
2366 (uint32_t) ntohl (msg->accept_reject_id));
2367 listener = op->listener;
2368 op->listener = NULL;
2369 op->return_intersection = htonl (msg->return_intersection);
2370 fprintf (stderr,
2371 "Return intersection for accept is %d\n",
2372 op->return_intersection);
2373 GNUNET_CONTAINER_DLL_remove (listener->op_head,
2374 listener->op_tail,
2375 op);
2376 op->set = set;
2377 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2378 set->ops_tail,
2379 op);
2380 op->client_request_id = ntohl (msg->request_id);
2381
2382 /* Advance generation values, so that future mutations do not
2383 interfer with the running operation. */
2384 op->generation_created = set->current_generation;
2385 advance_generation (set);
2386 {
2387 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2388 "Accepting set intersection operation\n");
2389 op->phase = PHASE_INITIAL;
2390 op->my_element_count
2391 = op->set->current_set_element_count;
2392 op->my_elements
2393 = GNUNET_CONTAINER_multihashmap_create (
2394 GNUNET_MIN (op->my_element_count,
2395 op->remote_element_count),
2396 GNUNET_YES);
2397 if (op->remote_element_count < op->my_element_count)
2398 {
2399 /* If the other peer (Alice) has fewer elements than us (Bob),
2400 we just send the count as Alice should send the first BF */
2401 send_element_count (op);
2402 op->phase = PHASE_COUNT_SENT;
2403 }
2404 else
2405 {
2406 /* We have fewer elements, so we start with the BF */
2407 begin_bf_exchange (op);
2408 }
2409 }
2410 /* Now allow CADET to continue, as we did not do this in
2411 #handle_incoming_msg (as we wanted to first see if the
2412 local client would accept the request). */
2413 GNUNET_CADET_receive_done (op->channel);
2414 GNUNET_SERVICE_client_continue (cs->client);
2415}
2416
2417
2418/**
2419 * Called to clean up, after a shutdown has been requested.
2420 *
2421 * @param cls closure, NULL
2422 */
2423static void
2424shutdown_task (void *cls)
2425{
2426 /* Delay actual shutdown to allow service to disconnect clients */
2427 in_shutdown = GNUNET_YES;
2428 if (0 == num_clients)
2429 {
2430 if (NULL != cadet)
2431 {
2432 GNUNET_CADET_disconnect (cadet);
2433 cadet = NULL;
2434 }
2435 }
2436 GNUNET_STATISTICS_destroy (_GSS_statistics,
2437 GNUNET_YES);
2438 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2439 "handled shutdown request\n");
2440}
2441
2442
2443/**
2444 * Function called by the service's run
2445 * method to run service-specific setup code.
2446 *
2447 * @param cls closure
2448 * @param cfg configuration to use
2449 * @param service the initialized service
2450 */
2451static void
2452run (void *cls,
2453 const struct GNUNET_CONFIGURATION_Handle *cfg,
2454 struct GNUNET_SERVICE_Handle *service)
2455{
2456 /* FIXME: need to modify SERVICE (!) API to allow
2457 us to run a shutdown task *after* clients were
2458 forcefully disconnected! */
2459 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2460 NULL);
2461 _GSS_statistics = GNUNET_STATISTICS_create ("seti",
2462 cfg);
2463 cadet = GNUNET_CADET_connect (cfg);
2464 if (NULL == cadet)
2465 {
2466 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2467 _ ("Could not connect to CADET service\n"));
2468 GNUNET_SCHEDULER_shutdown ();
2469 return;
2470 }
2471}
2472
2473
2474/**
2475 * Define "main" method using service macro.
2476 */
2477GNUNET_SERVICE_MAIN (
2478 "seti",
2479 GNUNET_SERVICE_OPTION_NONE,
2480 &run,
2481 &client_connect_cb,
2482 &client_disconnect_cb,
2483 NULL,
2484 GNUNET_MQ_hd_fixed_size (client_accept,
2485 GNUNET_MESSAGE_TYPE_SETI_ACCEPT,
2486 struct GNUNET_SETI_AcceptMessage,
2487 NULL),
2488 GNUNET_MQ_hd_var_size (client_set_add,
2489 GNUNET_MESSAGE_TYPE_SETI_ADD,
2490 struct GNUNET_SETI_ElementMessage,
2491 NULL),
2492 GNUNET_MQ_hd_fixed_size (client_create_set,
2493 GNUNET_MESSAGE_TYPE_SETI_CREATE,
2494 struct GNUNET_SETI_CreateMessage,
2495 NULL),
2496 GNUNET_MQ_hd_var_size (client_evaluate,
2497 GNUNET_MESSAGE_TYPE_SETI_EVALUATE,
2498 struct GNUNET_SETI_EvaluateMessage,
2499 NULL),
2500 GNUNET_MQ_hd_fixed_size (client_listen,
2501 GNUNET_MESSAGE_TYPE_SETI_LISTEN,
2502 struct GNUNET_SETI_ListenMessage,
2503 NULL),
2504 GNUNET_MQ_hd_fixed_size (client_reject,
2505 GNUNET_MESSAGE_TYPE_SETI_REJECT,
2506 struct GNUNET_SETI_RejectMessage,
2507 NULL),
2508 GNUNET_MQ_hd_fixed_size (client_cancel,
2509 GNUNET_MESSAGE_TYPE_SETI_CANCEL,
2510 struct GNUNET_SETI_CancelMessage,
2511 NULL),
2512 GNUNET_MQ_handler_end ());
2513
2514
2515/* end of gnunet-service-seti.c */