aboutsummaryrefslogtreecommitdiff
path: root/src/setu/gnunet-service-setu.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/setu/gnunet-service-setu.c')
-rw-r--r--src/setu/gnunet-service-setu.c3683
1 files changed, 3683 insertions, 0 deletions
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
new file mode 100644
index 000000000..326589186
--- /dev/null
+++ b/src/setu/gnunet-service-setu.c
@@ -0,0 +1,3683 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file setu/gnunet-service-setu.c
22 * @brief set union operation
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h"
29#include "ibf.h"
30#include "gnunet_protocols.h"
31#include "gnunet_applications.h"
32#include "gnunet_cadet_service.h"
33#include "gnunet-service-setu_strata_estimator.h"
34#include "gnunet-service-setu_protocol.h"
35#include "gnunet_statistics_service.h"
36#include <gcrypt.h>
37#include "gnunet_setu_service.h"
38#include "setu.h"
39
40#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
41
42/**
43 * How long do we hold on to an incoming channel if there is
44 * no local listener before giving up?
45 */
46#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
47
48/**
49 * Number of IBFs in a strata estimator.
50 */
51#define SE_STRATA_COUNT 32
52
53/**
54 * Size of the IBFs in the strata estimator.
55 */
56#define SE_IBF_SIZE 80
57
58/**
59 * The hash num parameter for the difference digests and strata estimators.
60 */
61#define SE_IBF_HASH_NUM 4
62
63/**
64 * Number of buckets that can be transmitted in one message.
65 */
66#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
67
68/**
69 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
70 * Choose this value so that computing the IBF is still cheaper
71 * than transmitting all values.
72 */
73#define MAX_IBF_ORDER (20)
74
75/**
76 * Number of buckets used in the ibf per estimated
77 * difference.
78 */
79#define IBF_ALPHA 4
80
81
82/**
83 * Current phase we are in for a union operation.
84 */
85enum UnionOperationPhase
86{
87 /**
88 * We sent the request message, and expect a strata estimator.
89 */
90 PHASE_EXPECT_SE,
91
92 /**
93 * We sent the strata estimator, and expect an IBF. This phase is entered once
94 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
95 *
96 * XXX: could use better wording.
97 * XXX: repurposed to also expect a "request full set" message, should be renamed
98 *
99 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
100 */
101 PHASE_EXPECT_IBF,
102
103 /**
104 * Continuation for multi part IBFs.
105 */
106 PHASE_EXPECT_IBF_CONT,
107
108 /**
109 * We are decoding an IBF.
110 */
111 PHASE_INVENTORY_ACTIVE,
112
113 /**
114 * The other peer is decoding the IBF we just sent.
115 */
116 PHASE_INVENTORY_PASSIVE,
117
118 /**
119 * The protocol is almost finished, but we still have to flush our message
120 * queue and/or expect some elements.
121 */
122 PHASE_FINISH_CLOSING,
123
124 /**
125 * In the penultimate phase, we wait until all our demands are satisfied.
126 * Then we send a done message, and wait for another done message.
127 */
128 PHASE_FINISH_WAITING,
129
130 /**
131 * In the ultimate phase, we wait until our demands are satisfied and then
132 * quit (sending another DONE message).
133 */
134 PHASE_DONE,
135
136 /**
137 * After sending the full set, wait for responses with the elements
138 * that the local peer is missing.
139 */
140 PHASE_FULL_SENDING,
141};
142
143
144/**
145 * Information about an element element in the set. All elements are
146 * stored in a hash-table from their hash-code to their `struct
147 * Element`, so that the remove and add operations are reasonably
148 * fast.
149 */
150struct ElementEntry
151{
152 /**
153 * The actual element. The data for the element
154 * should be allocated at the end of this struct.
155 */
156 struct GNUNET_SETU_Element element;
157
158 /**
159 * Hash of the element. For set union: Will be used to derive the
160 * different IBF keys for different salts.
161 */
162 struct GNUNET_HashCode element_hash;
163
164 /**
165 * First generation that includes this element.
166 */
167 unsigned int generation;
168
169 /**
170 * #GNUNET_YES if the element is a remote element, and does not belong
171 * to the operation's set.
172 */
173 int remote;
174};
175
176
177/**
178 * A listener is inhabited by a client, and waits for evaluation
179 * requests from remote peers.
180 */
181struct Listener;
182
183
184/**
185 * A set that supports a specific operation with other peers.
186 */
187struct Set;
188
189
190/**
191 * State we keep per client.
192 */
193struct ClientState
194{
195 /**
196 * Set, if associated with the client, otherwise NULL.
197 */
198 struct Set *set;
199
200 /**
201 * Listener, if associated with the client, otherwise NULL.
202 */
203 struct Listener *listener;
204
205 /**
206 * Client handle.
207 */
208 struct GNUNET_SERVICE_Client *client;
209
210 /**
211 * Message queue.
212 */
213 struct GNUNET_MQ_Handle *mq;
214};
215
216
217/**
218 * Operation context used to execute a set operation.
219 */
220struct Operation
221{
222
223 /**
224 * The identity of the requesting peer. Needs to
225 * be stored here as the op spec might not have been created yet.
226 */
227 struct GNUNET_PeerIdentity peer;
228
229 /**
230 * Initial size of our set, just before the operation started.
231 */
232 uint64_t initial_size;
233
234 /**
235 * Kept in a DLL of the listener, if @e listener is non-NULL.
236 */
237 struct Operation *next;
238
239 /**
240 * Kept in a DLL of the listener, if @e listener is non-NULL.
241 */
242 struct Operation *prev;
243
244 /**
245 * Channel to the peer.
246 */
247 struct GNUNET_CADET_Channel *channel;
248
249 /**
250 * Port this operation runs on.
251 */
252 struct Listener *listener;
253
254 /**
255 * Message queue for the channel.
256 */
257 struct GNUNET_MQ_Handle *mq;
258
259 /**
260 * Context message, may be NULL.
261 */
262 struct GNUNET_MessageHeader *context_msg;
263
264 /**
265 * Set associated with the operation, NULL until the spec has been
266 * associated with a set.
267 */
268 struct Set *set;
269
270 /**
271 * Copy of the set's strata estimator at the time of
272 * creation of this operation.
273 */
274 struct StrataEstimator *se;
275
276 /**
277 * The IBF we currently receive.
278 */
279 struct InvertibleBloomFilter *remote_ibf;
280
281 /**
282 * The IBF with the local set's element.
283 */
284 struct InvertibleBloomFilter *local_ibf;
285
286 /**
287 * Maps unsalted IBF-Keys to elements.
288 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
289 * Colliding IBF-Keys are linked.
290 */
291 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
292
293 /**
294 * Timeout task, if the incoming peer has not been accepted
295 * after the timeout, it will be disconnected.
296 */
297 struct GNUNET_SCHEDULER_Task *timeout_task;
298
299 /**
300 * Hashes for elements that we have demanded from the other peer.
301 */
302 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
303
304 /**
305 * Current state of the operation.
306 */
307 enum UnionOperationPhase phase;
308
309 /**
310 * Did we send the client that we are done?
311 */
312 int client_done_sent;
313
314 /**
315 * Number of ibf buckets already received into the @a remote_ibf.
316 */
317 unsigned int ibf_buckets_received;
318
319 /**
320 * Salt that we're using for sending IBFs
321 */
322 uint32_t salt_send;
323
324 /**
325 * Salt for the IBF we've received and that we're currently decoding.
326 */
327 uint32_t salt_receive;
328
329 /**
330 * Number of elements we received from the other peer
331 * that were not in the local set yet.
332 */
333 uint32_t received_fresh;
334
335 /**
336 * Total number of elements received from the other peer.
337 */
338 uint32_t received_total;
339
340 /**
341 * Salt to use for the operation.
342 */
343 uint32_t salt;
344
345 /**
346 * Remote peers element count
347 */
348 uint32_t remote_element_count;
349
350 /**
351 * ID used to identify an operation between service and client
352 */
353 uint32_t client_request_id;
354
355 /**
356 * Always use delta operation instead of sending full sets,
357 * even it it's less efficient.
358 */
359 int force_delta;
360
361 /**
362 * Always send full sets, even if delta operations would
363 * be more efficient.
364 */
365 int force_full;
366
367 /**
368 * #GNUNET_YES to fail operations where Byzantine faults
369 * are suspected
370 */
371 int byzantine;
372
373 /**
374 * #GNUNET_YES to also send back set elements we are sending to
375 * the remote peer.
376 */
377 int symmetric;
378
379 /**
380 * Lower bound for the set size, used only when
381 * byzantine mode is enabled.
382 */
383 int byzantine_lower_bound;
384
385 /**
386 * Unique request id for the request from a remote peer, sent to the
387 * client, which will accept or reject the request. Set to '0' iff
388 * the request has not been suggested yet.
389 */
390 uint32_t suggest_id;
391
392 /**
393 * Generation in which the operation handle
394 * was created.
395 */
396 unsigned int generation_created;
397};
398
399
400/**
401 * SetContent stores the actual set elements, which may be shared by
402 * multiple generations derived from one set.
403 */
404struct SetContent
405{
406 /**
407 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
408 */
409 struct GNUNET_CONTAINER_MultiHashMap *elements;
410
411 /**
412 * Number of references to the content.
413 */
414 unsigned int refcount;
415
416 /**
417 * FIXME: document!
418 */
419 unsigned int latest_generation;
420
421 /**
422 * Number of concurrently active iterators.
423 */
424 int iterator_count;
425};
426
427
428/**
429 * A set that supports a specific operation with other peers.
430 */
431struct Set
432{
433 /**
434 * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
435 */
436 struct Set *next;
437
438 /**
439 * Sets are held in a doubly linked list.
440 */
441 struct Set *prev;
442
443 /**
444 * Client that owns the set. Only one client may own a set,
445 * and there can only be one set per client.
446 */
447 struct ClientState *cs;
448
449 /**
450 * Content, possibly shared by multiple sets,
451 * and thus reference counted.
452 */
453 struct SetContent *content;
454
455 /**
456 * The strata estimator is only generated once for each set. The IBF keys
457 * are derived from the element hashes with salt=0.
458 */
459 struct StrataEstimator *se;
460
461 /**
462 * Evaluate operations are held in a linked list.
463 */
464 struct Operation *ops_head;
465
466 /**
467 * Evaluate operations are held in a linked list.
468 */
469 struct Operation *ops_tail;
470
471 /**
472 * Current generation, that is, number of previously executed
473 * operations and lazy copies on the underlying set content.
474 */
475 unsigned int current_generation;
476
477};
478
479
480/**
481 * The key entry is used to associate an ibf key with an element.
482 */
483struct KeyEntry
484{
485 /**
486 * IBF key for the entry, derived from the current salt.
487 */
488 struct IBF_Key ibf_key;
489
490 /**
491 * The actual element associated with the key.
492 *
493 * Only owned by the union operation if element->operation
494 * is #GNUNET_YES.
495 */
496 struct ElementEntry *element;
497
498 /**
499 * Did we receive this element? Even if element->is_foreign is false, we
500 * might have received the element, so this indicates that the other peer
501 * has it.
502 */
503 int received;
504};
505
506
507/**
508 * Used as a closure for sending elements
509 * with a specific IBF key.
510 */
511struct SendElementClosure
512{
513 /**
514 * The IBF key whose matching elements should be
515 * sent.
516 */
517 struct IBF_Key ibf_key;
518
519 /**
520 * Operation for which the elements
521 * should be sent.
522 */
523 struct Operation *op;
524};
525
526
527/**
528 * A listener is inhabited by a client, and waits for evaluation
529 * requests from remote peers.
530 */
531struct Listener
532{
533 /**
534 * Listeners are held in a doubly linked list.
535 */
536 struct Listener *next;
537
538 /**
539 * Listeners are held in a doubly linked list.
540 */
541 struct Listener *prev;
542
543 /**
544 * Head of DLL of operations this listener is responsible for.
545 * Once the client has accepted/declined the operation, the
546 * operation is moved to the respective set's operation DLLS.
547 */
548 struct Operation *op_head;
549
550 /**
551 * Tail of DLL of operations this listener is responsible for.
552 * Once the client has accepted/declined the operation, the
553 * operation is moved to the respective set's operation DLLS.
554 */
555 struct Operation *op_tail;
556
557 /**
558 * Client that owns the listener.
559 * Only one client may own a listener.
560 */
561 struct ClientState *cs;
562
563 /**
564 * The port we are listening on with CADET.
565 */
566 struct GNUNET_CADET_Port *open_port;
567
568 /**
569 * Application ID for the operation, used to distinguish
570 * multiple operations of the same type with the same peer.
571 */
572 struct GNUNET_HashCode app_id;
573
574};
575
576
577/**
578 * Handle to the cadet service, used to listen for and connect to
579 * remote peers.
580 */
581static struct GNUNET_CADET_Handle *cadet;
582
583/**
584 * Statistics handle.
585 */
586static struct GNUNET_STATISTICS_Handle *_GSS_statistics;
587
588/**
589 * Listeners are held in a doubly linked list.
590 */
591static struct Listener *listener_head;
592
593/**
594 * Listeners are held in a doubly linked list.
595 */
596static struct Listener *listener_tail;
597
598/**
599 * Number of active clients.
600 */
601static unsigned int num_clients;
602
603/**
604 * Are we in shutdown? if #GNUNET_YES and the number of clients
605 * drops to zero, disconnect from CADET.
606 */
607static int in_shutdown;
608
609/**
610 * Counter for allocating unique IDs for clients, used to identify incoming
611 * operation requests from remote peers, that the client can choose to accept
612 * or refuse. 0 must not be used (reserved for uninitialized).
613 */
614static uint32_t suggest_id;
615
616
617/**
618 * Iterator over hash map entries, called to
619 * destroy the linked list of colliding ibf key entries.
620 *
621 * @param cls closure
622 * @param key current key code
623 * @param value value in the hash map
624 * @return #GNUNET_YES if we should continue to iterate,
625 * #GNUNET_NO if not.
626 */
627static int
628destroy_key_to_element_iter (void *cls,
629 uint32_t key,
630 void *value)
631{
632 struct KeyEntry *k = value;
633
634 GNUNET_assert (NULL != k);
635 if (GNUNET_YES == k->element->remote)
636 {
637 GNUNET_free (k->element);
638 k->element = NULL;
639 }
640 GNUNET_free (k);
641 return GNUNET_YES;
642}
643
644
645/**
646 * Signal to the client that the operation has finished and
647 * destroy the operation.
648 *
649 * @param cls operation to destroy
650 */
651static void
652send_client_done (void *cls)
653{
654 struct Operation *op = cls;
655 struct GNUNET_MQ_Envelope *ev;
656 struct GNUNET_SETU_ResultMessage *rm;
657
658 if (GNUNET_YES == op->client_done_sent)
659 return;
660 if (PHASE_DONE != op->phase)
661 {
662 LOG (GNUNET_ERROR_TYPE_WARNING,
663 "Union operation failed\n");
664 GNUNET_STATISTICS_update (_GSS_statistics,
665 "# Union operations failed",
666 1,
667 GNUNET_NO);
668 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SETU_RESULT);
669 rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
670 rm->request_id = htonl (op->client_request_id);
671 rm->element_type = htons (0);
672 GNUNET_MQ_send (op->set->cs->mq,
673 ev);
674 return;
675 }
676
677 op->client_done_sent = GNUNET_YES;
678
679 GNUNET_STATISTICS_update (_GSS_statistics,
680 "# Union operations succeeded",
681 1,
682 GNUNET_NO);
683 LOG (GNUNET_ERROR_TYPE_INFO,
684 "Signalling client that union operation is done\n");
685 ev = GNUNET_MQ_msg (rm,
686 GNUNET_MESSAGE_TYPE_SETU_RESULT);
687 rm->request_id = htonl (op->client_request_id);
688 rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
689 rm->element_type = htons (0);
690 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
691 op->key_to_element));
692 GNUNET_MQ_send (op->set->cs->mq,
693 ev);
694}
695
696
697/* FIXME: the destroy logic is a mess and should be cleaned up! */
698
699/**
700 * Destroy the given operation. Used for any operation where both
701 * peers were known and that thus actually had a vt and channel. Must
702 * not be used for operations where 'listener' is still set and we do
703 * not know the other peer.
704 *
705 * Call the implementation-specific cancel function of the operation.
706 * Disconnects from the remote peer. Does not disconnect the client,
707 * as there may be multiple operations per set.
708 *
709 * @param op operation to destroy
710 */
711static void
712_GSS_operation_destroy (struct Operation *op)
713{
714 struct Set *set = op->set;
715 struct GNUNET_CADET_Channel *channel;
716
717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718 "Destroying union operation %p\n",
719 op);
720 GNUNET_assert (NULL == op->listener);
721 /* check if the op was canceled twice */
722 if (NULL != op->remote_ibf)
723 {
724 ibf_destroy (op->remote_ibf);
725 op->remote_ibf = NULL;
726 }
727 if (NULL != op->demanded_hashes)
728 {
729 GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
730 op->demanded_hashes = NULL;
731 }
732 if (NULL != op->local_ibf)
733 {
734 ibf_destroy (op->local_ibf);
735 op->local_ibf = NULL;
736 }
737 if (NULL != op->se)
738 {
739 strata_estimator_destroy (op->se);
740 op->se = NULL;
741 }
742 if (NULL != op->key_to_element)
743 {
744 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
745 &destroy_key_to_element_iter,
746 NULL);
747 GNUNET_CONTAINER_multihashmap32_destroy (op->key_to_element);
748 op->key_to_element = NULL;
749 }
750 if (NULL != set)
751 {
752 GNUNET_CONTAINER_DLL_remove (set->ops_head,
753 set->ops_tail,
754 op);
755 op->set = NULL;
756 }
757 if (NULL != op->context_msg)
758 {
759 GNUNET_free (op->context_msg);
760 op->context_msg = NULL;
761 }
762 if (NULL != (channel = op->channel))
763 {
764 /* This will free op; called conditionally as this helper function
765 is also called from within the channel disconnect handler. */
766 op->channel = NULL;
767 GNUNET_CADET_channel_destroy (channel);
768 }
769 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
770 * there was a channel end handler that will free 'op' on the call stack. */
771}
772
773
774/**
775 * This function probably should not exist
776 * and be replaced by inlining more specific
777 * logic in the various places where it is called.
778 */
779static void
780_GSS_operation_destroy2 (struct Operation *op);
781
782
783/**
784 * Destroy an incoming request from a remote peer
785 *
786 * @param op remote request to destroy
787 */
788static void
789incoming_destroy (struct Operation *op)
790{
791 struct Listener *listener;
792
793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794 "Destroying incoming operation %p\n",
795 op);
796 if (NULL != (listener = op->listener))
797 {
798 GNUNET_CONTAINER_DLL_remove (listener->op_head,
799 listener->op_tail,
800 op);
801 op->listener = NULL;
802 }
803 if (NULL != op->timeout_task)
804 {
805 GNUNET_SCHEDULER_cancel (op->timeout_task);
806 op->timeout_task = NULL;
807 }
808 _GSS_operation_destroy2 (op);
809}
810
811
812/**
813 * This function probably should not exist
814 * and be replaced by inlining more specific
815 * logic in the various places where it is called.
816 */
817static void
818_GSS_operation_destroy2 (struct Operation *op)
819{
820 struct GNUNET_CADET_Channel *channel;
821
822 if (NULL != (channel = op->channel))
823 {
824 /* This will free op; called conditionally as this helper function
825 is also called from within the channel disconnect handler. */
826 op->channel = NULL;
827 GNUNET_CADET_channel_destroy (channel);
828 }
829 if (NULL != op->listener)
830 {
831 incoming_destroy (op);
832 return;
833 }
834 if (NULL != op->set)
835 send_client_done (op);
836 _GSS_operation_destroy (op);
837 GNUNET_free (op);
838}
839
840
841/**
842 * Inform the client that the union operation has failed,
843 * and proceed to destroy the evaluate operation.
844 *
845 * @param op the union operation to fail
846 */
847static void
848fail_union_operation (struct Operation *op)
849{
850 struct GNUNET_MQ_Envelope *ev;
851 struct GNUNET_SETU_ResultMessage *msg;
852
853 LOG (GNUNET_ERROR_TYPE_WARNING,
854 "union operation failed\n");
855 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETU_RESULT);
856 msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
857 msg->request_id = htonl (op->client_request_id);
858 msg->element_type = htons (0);
859 GNUNET_MQ_send (op->set->cs->mq,
860 ev);
861 _GSS_operation_destroy (op);
862}
863
864
865/**
866 * Derive the IBF key from a hash code and
867 * a salt.
868 *
869 * @param src the hash code
870 * @return the derived IBF key
871 */
872static struct IBF_Key
873get_ibf_key (const struct GNUNET_HashCode *src)
874{
875 struct IBF_Key key;
876 uint16_t salt = 0;
877
878 GNUNET_assert (GNUNET_OK ==
879 GNUNET_CRYPTO_kdf (&key, sizeof(key),
880 src, sizeof *src,
881 &salt, sizeof(salt),
882 NULL, 0));
883 return key;
884}
885
886
887/**
888 * Context for #op_get_element_iterator
889 */
890struct GetElementContext
891{
892 /**
893 * FIXME.
894 */
895 struct GNUNET_HashCode hash;
896
897 /**
898 * FIXME.
899 */
900 struct KeyEntry *k;
901};
902
903
904/**
905 * Iterator over the mapping from IBF keys to element entries. Checks if we
906 * have an element with a given GNUNET_HashCode.
907 *
908 * @param cls closure
909 * @param key current key code
910 * @param value value in the hash map
911 * @return #GNUNET_YES if we should search further,
912 * #GNUNET_NO if we've found the element.
913 */
914static int
915op_get_element_iterator (void *cls,
916 uint32_t key,
917 void *value)
918{
919 struct GetElementContext *ctx = cls;
920 struct KeyEntry *k = value;
921
922 GNUNET_assert (NULL != k);
923 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
924 &ctx->hash))
925 {
926 ctx->k = k;
927 return GNUNET_NO;
928 }
929 return GNUNET_YES;
930}
931
932
933/**
934 * Determine whether the given element is already in the operation's element
935 * set.
936 *
937 * @param op operation that should be tested for 'element_hash'
938 * @param element_hash hash of the element to look for
939 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
940 */
941static struct KeyEntry *
942op_get_element (struct Operation *op,
943 const struct GNUNET_HashCode *element_hash)
944{
945 int ret;
946 struct IBF_Key ibf_key;
947 struct GetElementContext ctx = { { { 0 } }, 0 };
948
949 ctx.hash = *element_hash;
950
951 ibf_key = get_ibf_key (element_hash);
952 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->key_to_element,
953 (uint32_t) ibf_key.key_val,
954 &op_get_element_iterator,
955 &ctx);
956
957 /* was the iteration aborted because we found the element? */
958 if (GNUNET_SYSERR == ret)
959 {
960 GNUNET_assert (NULL != ctx.k);
961 return ctx.k;
962 }
963 return NULL;
964}
965
966
967/**
968 * Insert an element into the union operation's
969 * key-to-element mapping. Takes ownership of 'ee'.
970 * Note that this does not insert the element in the set,
971 * only in the operation's key-element mapping.
972 * This is done to speed up re-tried operations, if some elements
973 * were transmitted, and then the IBF fails to decode.
974 *
975 * XXX: clarify ownership, doesn't sound right.
976 *
977 * @param op the union operation
978 * @param ee the element entry
979 * @parem received was this element received from the remote peer?
980 */
981static void
982op_register_element (struct Operation *op,
983 struct ElementEntry *ee,
984 int received)
985{
986 struct IBF_Key ibf_key;
987 struct KeyEntry *k;
988
989 ibf_key = get_ibf_key (&ee->element_hash);
990 k = GNUNET_new (struct KeyEntry);
991 k->element = ee;
992 k->ibf_key = ibf_key;
993 k->received = received;
994 GNUNET_assert (GNUNET_OK ==
995 GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
996 (uint32_t) ibf_key.key_val,
997 k,
998 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
999}
1000
1001
1002/**
1003 * FIXME.
1004 */
1005static void
1006salt_key (const struct IBF_Key *k_in,
1007 uint32_t salt,
1008 struct IBF_Key *k_out)
1009{
1010 int s = salt % 64;
1011 uint64_t x = k_in->key_val;
1012
1013 /* rotate ibf key */
1014 x = (x >> s) | (x << (64 - s));
1015 k_out->key_val = x;
1016}
1017
1018
1019/**
1020 * FIXME.
1021 */
1022static void
1023unsalt_key (const struct IBF_Key *k_in,
1024 uint32_t salt,
1025 struct IBF_Key *k_out)
1026{
1027 int s = salt % 64;
1028 uint64_t x = k_in->key_val;
1029
1030 x = (x << s) | (x >> (64 - s));
1031 k_out->key_val = x;
1032}
1033
1034
1035/**
1036 * Insert a key into an ibf.
1037 *
1038 * @param cls the ibf
1039 * @param key unused
1040 * @param value the key entry to get the key from
1041 */
1042static int
1043prepare_ibf_iterator (void *cls,
1044 uint32_t key,
1045 void *value)
1046{
1047 struct Operation *op = cls;
1048 struct KeyEntry *ke = value;
1049 struct IBF_Key salted_key;
1050
1051 LOG (GNUNET_ERROR_TYPE_DEBUG,
1052 "[OP %x] inserting %lx (hash %s) into ibf\n",
1053 (void *) op,
1054 (unsigned long) ke->ibf_key.key_val,
1055 GNUNET_h2s (&ke->element->element_hash));
1056 salt_key (&ke->ibf_key,
1057 op->salt_send,
1058 &salted_key);
1059 ibf_insert (op->local_ibf, salted_key);
1060 return GNUNET_YES;
1061}
1062
1063
1064/**
1065 * Is element @a ee part of the set used by @a op?
1066 *
1067 * @param ee element to test
1068 * @param op operation the defines the set and its generation
1069 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
1070 */
1071static int
1072_GSS_is_element_of_operation (struct ElementEntry *ee,
1073 struct Operation *op)
1074{
1075 return ee->generation >= op->generation_created;
1076}
1077
1078
1079/**
1080 * Iterator for initializing the
1081 * key-to-element mapping of a union operation
1082 *
1083 * @param cls the union operation `struct Operation *`
1084 * @param key unused
1085 * @param value the `struct ElementEntry *` to insert
1086 * into the key-to-element mapping
1087 * @return #GNUNET_YES (to continue iterating)
1088 */
1089static int
1090init_key_to_element_iterator (void *cls,
1091 const struct GNUNET_HashCode *key,
1092 void *value)
1093{
1094 struct Operation *op = cls;
1095 struct ElementEntry *ee = value;
1096
1097 /* make sure that the element belongs to the set at the time
1098 * of creating the operation */
1099 if (GNUNET_NO ==
1100 _GSS_is_element_of_operation (ee,
1101 op))
1102 return GNUNET_YES;
1103 GNUNET_assert (GNUNET_NO == ee->remote);
1104 op_register_element (op,
1105 ee,
1106 GNUNET_NO);
1107 return GNUNET_YES;
1108}
1109
1110
1111/**
1112 * Initialize the IBF key to element mapping local to this set operation.
1113 *
1114 * @param op the set union operation
1115 */
1116static void
1117initialize_key_to_element (struct Operation *op)
1118{
1119 unsigned int len;
1120
1121 GNUNET_assert (NULL == op->key_to_element);
1122 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
1123 op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
1124 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1125 &init_key_to_element_iterator,
1126 op);
1127}
1128
1129
1130/**
1131 * Create an ibf with the operation's elements
1132 * of the specified size
1133 *
1134 * @param op the union operation
1135 * @param size size of the ibf to create
1136 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1137 */
1138static int
1139prepare_ibf (struct Operation *op,
1140 uint32_t size)
1141{
1142 GNUNET_assert (NULL != op->key_to_element);
1143
1144 if (NULL != op->local_ibf)
1145 ibf_destroy (op->local_ibf);
1146 op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
1147 if (NULL == op->local_ibf)
1148 {
1149 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1150 "Failed to allocate local IBF\n");
1151 return GNUNET_SYSERR;
1152 }
1153 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
1154 &prepare_ibf_iterator,
1155 op);
1156 return GNUNET_OK;
1157}
1158
1159
1160/**
1161 * Send an ibf of appropriate size.
1162 *
1163 * Fragments the IBF into multiple messages if necessary.
1164 *
1165 * @param op the union operation
1166 * @param ibf_order order of the ibf to send, size=2^order
1167 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1168 */
1169static int
1170send_ibf (struct Operation *op,
1171 uint16_t ibf_order)
1172{
1173 unsigned int buckets_sent = 0;
1174 struct InvertibleBloomFilter *ibf;
1175
1176 if (GNUNET_OK !=
1177 prepare_ibf (op, 1 << ibf_order))
1178 {
1179 /* allocation failed */
1180 return GNUNET_SYSERR;
1181 }
1182
1183 LOG (GNUNET_ERROR_TYPE_DEBUG,
1184 "sending ibf of size %u\n",
1185 1 << ibf_order);
1186
1187 {
1188 char name[64] = { 0 };
1189 snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
1190 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
1191 }
1192
1193 ibf = op->local_ibf;
1194
1195 while (buckets_sent < (1 << ibf_order))
1196 {
1197 unsigned int buckets_in_message;
1198 struct GNUNET_MQ_Envelope *ev;
1199 struct IBFMessage *msg;
1200
1201 buckets_in_message = (1 << ibf_order) - buckets_sent;
1202 /* limit to maximum */
1203 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
1204 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
1205
1206 ev = GNUNET_MQ_msg_extra (msg,
1207 buckets_in_message * IBF_BUCKET_SIZE,
1208 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF);
1209 msg->reserved1 = 0;
1210 msg->reserved2 = 0;
1211 msg->order = ibf_order;
1212 msg->offset = htonl (buckets_sent);
1213 msg->salt = htonl (op->salt_send);
1214 ibf_write_slice (ibf, buckets_sent,
1215 buckets_in_message, &msg[1]);
1216 buckets_sent += buckets_in_message;
1217 LOG (GNUNET_ERROR_TYPE_DEBUG,
1218 "ibf chunk size %u, %u/%u sent\n",
1219 buckets_in_message,
1220 buckets_sent,
1221 1 << ibf_order);
1222 GNUNET_MQ_send (op->mq, ev);
1223 }
1224
1225 /* The other peer must decode the IBF, so
1226 * we're passive. */
1227 op->phase = PHASE_INVENTORY_PASSIVE;
1228 return GNUNET_OK;
1229}
1230
1231
1232/**
1233 * Compute the necessary order of an ibf
1234 * from the size of the symmetric set difference.
1235 *
1236 * @param diff the difference
1237 * @return the required size of the ibf
1238 */
1239static unsigned int
1240get_order_from_difference (unsigned int diff)
1241{
1242 unsigned int ibf_order;
1243
1244 ibf_order = 2;
1245 while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
1246 ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
1247 (ibf_order < MAX_IBF_ORDER))
1248 ibf_order++;
1249 // add one for correction
1250 return ibf_order + 1;
1251}
1252
1253
1254/**
1255 * Send a set element.
1256 *
1257 * @param cls the union operation `struct Operation *`
1258 * @param key unused
1259 * @param value the `struct ElementEntry *` to insert
1260 * into the key-to-element mapping
1261 * @return #GNUNET_YES (to continue iterating)
1262 */
1263static int
1264send_full_element_iterator (void *cls,
1265 const struct GNUNET_HashCode *key,
1266 void *value)
1267{
1268 struct Operation *op = cls;
1269 struct GNUNET_SETU_ElementMessage *emsg;
1270 struct ElementEntry *ee = value;
1271 struct GNUNET_SETU_Element *el = &ee->element;
1272 struct GNUNET_MQ_Envelope *ev;
1273
1274 LOG (GNUNET_ERROR_TYPE_DEBUG,
1275 "Sending element %s\n",
1276 GNUNET_h2s (key));
1277 ev = GNUNET_MQ_msg_extra (emsg,
1278 el->size,
1279 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
1280 emsg->element_type = htons (el->element_type);
1281 GNUNET_memcpy (&emsg[1],
1282 el->data,
1283 el->size);
1284 GNUNET_MQ_send (op->mq,
1285 ev);
1286 return GNUNET_YES;
1287}
1288
1289
1290/**
1291 * Switch to full set transmission for @a op.
1292 *
1293 * @param op operation to switch to full set transmission.
1294 */
1295static void
1296send_full_set (struct Operation *op)
1297{
1298 struct GNUNET_MQ_Envelope *ev;
1299
1300 op->phase = PHASE_FULL_SENDING;
1301 LOG (GNUNET_ERROR_TYPE_DEBUG,
1302 "Dedicing to transmit the full set\n");
1303 /* FIXME: use a more memory-friendly way of doing this with an
1304 iterator, just as we do in the non-full case! */
1305 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1306 &send_full_element_iterator,
1307 op);
1308 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
1309 GNUNET_MQ_send (op->mq,
1310 ev);
1311}
1312
1313
1314/**
1315 * Handle a strata estimator from a remote peer
1316 *
1317 * @param cls the union operation
1318 * @param msg the message
1319 */
1320static int
1321check_union_p2p_strata_estimator (void *cls,
1322 const struct StrataEstimatorMessage *msg)
1323{
1324 struct Operation *op = cls;
1325 int is_compressed;
1326 size_t len;
1327
1328 if (op->phase != PHASE_EXPECT_SE)
1329 {
1330 GNUNET_break (0);
1331 return GNUNET_SYSERR;
1332 }
1333 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1334 msg->header.type));
1335 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1336 if ((GNUNET_NO == is_compressed) &&
1337 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
1338 {
1339 GNUNET_break (0);
1340 return GNUNET_SYSERR;
1341 }
1342 return GNUNET_OK;
1343}
1344
1345
1346/**
1347 * Handle a strata estimator from a remote peer
1348 *
1349 * @param cls the union operation
1350 * @param msg the message
1351 */
1352static void
1353handle_union_p2p_strata_estimator (void *cls,
1354 const struct StrataEstimatorMessage *msg)
1355{
1356 struct Operation *op = cls;
1357 struct StrataEstimator *remote_se;
1358 unsigned int diff;
1359 uint64_t other_size;
1360 size_t len;
1361 int is_compressed;
1362
1363 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1364 msg->header.type));
1365 GNUNET_STATISTICS_update (_GSS_statistics,
1366 "# bytes of SE received",
1367 ntohs (msg->header.size),
1368 GNUNET_NO);
1369 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1370 other_size = GNUNET_ntohll (msg->set_size);
1371 remote_se = strata_estimator_create (SE_STRATA_COUNT,
1372 SE_IBF_SIZE,
1373 SE_IBF_HASH_NUM);
1374 if (NULL == remote_se)
1375 {
1376 /* insufficient resources, fail */
1377 fail_union_operation (op);
1378 return;
1379 }
1380 if (GNUNET_OK !=
1381 strata_estimator_read (&msg[1],
1382 len,
1383 is_compressed,
1384 remote_se))
1385 {
1386 /* decompression failed */
1387 strata_estimator_destroy (remote_se);
1388 fail_union_operation (op);
1389 return;
1390 }
1391 GNUNET_assert (NULL != op->se);
1392 diff = strata_estimator_difference (remote_se,
1393 op->se);
1394
1395 if (diff > 200)
1396 diff = diff * 3 / 2;
1397
1398 strata_estimator_destroy (remote_se);
1399 strata_estimator_destroy (op->se);
1400 op->se = NULL;
1401 LOG (GNUNET_ERROR_TYPE_DEBUG,
1402 "got se diff=%d, using ibf size %d\n",
1403 diff,
1404 1U << get_order_from_difference (diff));
1405
1406 {
1407 char *set_debug;
1408
1409 set_debug = getenv ("GNUNET_SETU_BENCHMARK");
1410 if ((NULL != set_debug) &&
1411 (0 == strcmp (set_debug, "1")))
1412 {
1413 FILE *f = fopen ("set.log", "a");
1414 fprintf (f, "%llu\n", (unsigned long long) diff);
1415 fclose (f);
1416 }
1417 }
1418
1419 if ((GNUNET_YES == op->byzantine) &&
1420 (other_size < op->byzantine_lower_bound))
1421 {
1422 GNUNET_break (0);
1423 fail_union_operation (op);
1424 return;
1425 }
1426
1427 if ((GNUNET_YES == op->force_full) ||
1428 (diff > op->initial_size / 4) ||
1429 (0 == other_size))
1430 {
1431 LOG (GNUNET_ERROR_TYPE_DEBUG,
1432 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
1433 diff,
1434 op->initial_size);
1435 GNUNET_STATISTICS_update (_GSS_statistics,
1436 "# of full sends",
1437 1,
1438 GNUNET_NO);
1439 if ((op->initial_size <= other_size) ||
1440 (0 == other_size))
1441 {
1442 send_full_set (op);
1443 }
1444 else
1445 {
1446 struct GNUNET_MQ_Envelope *ev;
1447
1448 LOG (GNUNET_ERROR_TYPE_DEBUG,
1449 "Telling other peer that we expect its full set\n");
1450 op->phase = PHASE_EXPECT_IBF;
1451 ev = GNUNET_MQ_msg_header (
1452 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
1453 GNUNET_MQ_send (op->mq,
1454 ev);
1455 }
1456 }
1457 else
1458 {
1459 GNUNET_STATISTICS_update (_GSS_statistics,
1460 "# of ibf sends",
1461 1,
1462 GNUNET_NO);
1463 if (GNUNET_OK !=
1464 send_ibf (op,
1465 get_order_from_difference (diff)))
1466 {
1467 /* Internal error, best we can do is shut the connection */
1468 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1469 "Failed to send IBF, closing connection\n");
1470 fail_union_operation (op);
1471 return;
1472 }
1473 }
1474 GNUNET_CADET_receive_done (op->channel);
1475}
1476
1477
1478/**
1479 * Iterator to send elements to a remote peer
1480 *
1481 * @param cls closure with the element key and the union operation
1482 * @param key ignored
1483 * @param value the key entry
1484 */
1485static int
1486send_offers_iterator (void *cls,
1487 uint32_t key,
1488 void *value)
1489{
1490 struct SendElementClosure *sec = cls;
1491 struct Operation *op = sec->op;
1492 struct KeyEntry *ke = value;
1493 struct GNUNET_MQ_Envelope *ev;
1494 struct GNUNET_MessageHeader *mh;
1495
1496 /* Detect 32-bit key collision for the 64-bit IBF keys. */
1497 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1498 return GNUNET_YES;
1499
1500 ev = GNUNET_MQ_msg_header_extra (mh,
1501 sizeof(struct GNUNET_HashCode),
1502 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER);
1503
1504 GNUNET_assert (NULL != ev);
1505 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1506 LOG (GNUNET_ERROR_TYPE_DEBUG,
1507 "[OP %x] sending element offer (%s) to peer\n",
1508 (void *) op,
1509 GNUNET_h2s (&ke->element->element_hash));
1510 GNUNET_MQ_send (op->mq, ev);
1511 return GNUNET_YES;
1512}
1513
1514
1515/**
1516 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1517 *
1518 * @param op union operation
1519 * @param ibf_key IBF key of interest
1520 */
1521static void
1522send_offers_for_key (struct Operation *op,
1523 struct IBF_Key ibf_key)
1524{
1525 struct SendElementClosure send_cls;
1526
1527 send_cls.ibf_key = ibf_key;
1528 send_cls.op = op;
1529 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
1530 op->key_to_element,
1531 (uint32_t) ibf_key.
1532 key_val,
1533 &send_offers_iterator,
1534 &send_cls);
1535}
1536
1537
1538/**
1539 * Decode which elements are missing on each side, and
1540 * send the appropriate offers and inquiries.
1541 *
1542 * @param op union operation
1543 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1544 */
1545static int
1546decode_and_send (struct Operation *op)
1547{
1548 struct IBF_Key key;
1549 struct IBF_Key last_key;
1550 int side;
1551 unsigned int num_decoded;
1552 struct InvertibleBloomFilter *diff_ibf;
1553
1554 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->phase);
1555
1556 if (GNUNET_OK !=
1557 prepare_ibf (op,
1558 op->remote_ibf->size))
1559 {
1560 GNUNET_break (0);
1561 /* allocation failed */
1562 return GNUNET_SYSERR;
1563 }
1564 diff_ibf = ibf_dup (op->local_ibf);
1565 ibf_subtract (diff_ibf,
1566 op->remote_ibf);
1567
1568 ibf_destroy (op->remote_ibf);
1569 op->remote_ibf = NULL;
1570
1571 LOG (GNUNET_ERROR_TYPE_DEBUG,
1572 "decoding IBF (size=%u)\n",
1573 diff_ibf->size);
1574
1575 num_decoded = 0;
1576 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1577
1578 while (1)
1579 {
1580 int res;
1581 int cycle_detected = GNUNET_NO;
1582
1583 last_key = key;
1584
1585 res = ibf_decode (diff_ibf,
1586 &side,
1587 &key);
1588 if (res == GNUNET_OK)
1589 {
1590 LOG (GNUNET_ERROR_TYPE_DEBUG,
1591 "decoded ibf key %lx\n",
1592 (unsigned long) key.key_val);
1593 num_decoded += 1;
1594 if ((num_decoded > diff_ibf->size) ||
1595 ((num_decoded > 1) &&
1596 (last_key.key_val == key.key_val)))
1597 {
1598 LOG (GNUNET_ERROR_TYPE_DEBUG,
1599 "detected cyclic ibf (decoded %u/%u)\n",
1600 num_decoded,
1601 diff_ibf->size);
1602 cycle_detected = GNUNET_YES;
1603 }
1604 }
1605 if ((GNUNET_SYSERR == res) ||
1606 (GNUNET_YES == cycle_detected))
1607 {
1608 int next_order;
1609 next_order = 0;
1610 while (1 << next_order < diff_ibf->size)
1611 next_order++;
1612 next_order++;
1613 if (next_order <= MAX_IBF_ORDER)
1614 {
1615 LOG (GNUNET_ERROR_TYPE_DEBUG,
1616 "decoding failed, sending larger ibf (size %u)\n",
1617 1 << next_order);
1618 GNUNET_STATISTICS_update (_GSS_statistics,
1619 "# of IBF retries",
1620 1,
1621 GNUNET_NO);
1622 op->salt_send++;
1623 if (GNUNET_OK !=
1624 send_ibf (op, next_order))
1625 {
1626 /* Internal error, best we can do is shut the connection */
1627 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1628 "Failed to send IBF, closing connection\n");
1629 fail_union_operation (op);
1630 ibf_destroy (diff_ibf);
1631 return GNUNET_SYSERR;
1632 }
1633 }
1634 else
1635 {
1636 GNUNET_STATISTICS_update (_GSS_statistics,
1637 "# of failed union operations (too large)",
1638 1,
1639 GNUNET_NO);
1640 // XXX: Send the whole set, element-by-element
1641 LOG (GNUNET_ERROR_TYPE_ERROR,
1642 "set union failed: reached ibf limit\n");
1643 fail_union_operation (op);
1644 ibf_destroy (diff_ibf);
1645 return GNUNET_SYSERR;
1646 }
1647 break;
1648 }
1649 if (GNUNET_NO == res)
1650 {
1651 struct GNUNET_MQ_Envelope *ev;
1652
1653 LOG (GNUNET_ERROR_TYPE_DEBUG,
1654 "transmitted all values, sending DONE\n");
1655 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
1656 GNUNET_MQ_send (op->mq, ev);
1657 /* We now wait until we get a DONE message back
1658 * and then wait for our MQ to be flushed and all our
1659 * demands be delivered. */
1660 break;
1661 }
1662 if (1 == side)
1663 {
1664 struct IBF_Key unsalted_key;
1665
1666 unsalt_key (&key,
1667 op->salt_receive,
1668 &unsalted_key);
1669 send_offers_for_key (op,
1670 unsalted_key);
1671 }
1672 else if (-1 == side)
1673 {
1674 struct GNUNET_MQ_Envelope *ev;
1675 struct InquiryMessage *msg;
1676
1677 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1678 * the effort additional complexity. */
1679 ev = GNUNET_MQ_msg_extra (msg,
1680 sizeof(struct IBF_Key),
1681 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY);
1682 msg->salt = htonl (op->salt_receive);
1683 GNUNET_memcpy (&msg[1],
1684 &key,
1685 sizeof(struct IBF_Key));
1686 LOG (GNUNET_ERROR_TYPE_DEBUG,
1687 "sending element inquiry for IBF key %lx\n",
1688 (unsigned long) key.key_val);
1689 GNUNET_MQ_send (op->mq, ev);
1690 }
1691 else
1692 {
1693 GNUNET_assert (0);
1694 }
1695 }
1696 ibf_destroy (diff_ibf);
1697 return GNUNET_OK;
1698}
1699
1700
1701/**
1702 * Check an IBF message from a remote peer.
1703 *
1704 * Reassemble the IBF from multiple pieces, and
1705 * process the whole IBF once possible.
1706 *
1707 * @param cls the union operation
1708 * @param msg the header of the message
1709 * @return #GNUNET_OK if @a msg is well-formed
1710 */
1711static int
1712check_union_p2p_ibf (void *cls,
1713 const struct IBFMessage *msg)
1714{
1715 struct Operation *op = cls;
1716 unsigned int buckets_in_message;
1717
1718 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1719 / IBF_BUCKET_SIZE;
1720 if (0 == buckets_in_message)
1721 {
1722 GNUNET_break_op (0);
1723 return GNUNET_SYSERR;
1724 }
1725 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1726 * IBF_BUCKET_SIZE)
1727 {
1728 GNUNET_break_op (0);
1729 return GNUNET_SYSERR;
1730 }
1731 if (op->phase == PHASE_EXPECT_IBF_CONT)
1732 {
1733 if (ntohl (msg->offset) != op->ibf_buckets_received)
1734 {
1735 GNUNET_break_op (0);
1736 return GNUNET_SYSERR;
1737 }
1738 if (1 << msg->order != op->remote_ibf->size)
1739 {
1740 GNUNET_break_op (0);
1741 return GNUNET_SYSERR;
1742 }
1743 if (ntohl (msg->salt) != op->salt_receive)
1744 {
1745 GNUNET_break_op (0);
1746 return GNUNET_SYSERR;
1747 }
1748 }
1749 else if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
1750 (op->phase != PHASE_EXPECT_IBF))
1751 {
1752 GNUNET_break_op (0);
1753 return GNUNET_SYSERR;
1754 }
1755
1756 return GNUNET_OK;
1757}
1758
1759
1760/**
1761 * Handle an IBF message from a remote peer.
1762 *
1763 * Reassemble the IBF from multiple pieces, and
1764 * process the whole IBF once possible.
1765 *
1766 * @param cls the union operation
1767 * @param msg the header of the message
1768 */
1769static void
1770handle_union_p2p_ibf (void *cls,
1771 const struct IBFMessage *msg)
1772{
1773 struct Operation *op = cls;
1774 unsigned int buckets_in_message;
1775
1776 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1777 / IBF_BUCKET_SIZE;
1778 if ((op->phase == PHASE_INVENTORY_PASSIVE) ||
1779 (op->phase == PHASE_EXPECT_IBF))
1780 {
1781 op->phase = PHASE_EXPECT_IBF_CONT;
1782 GNUNET_assert (NULL == op->remote_ibf);
1783 LOG (GNUNET_ERROR_TYPE_DEBUG,
1784 "Creating new ibf of size %u\n",
1785 1 << msg->order);
1786 op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1787 op->salt_receive = ntohl (msg->salt);
1788 LOG (GNUNET_ERROR_TYPE_DEBUG,
1789 "Receiving new IBF with salt %u\n",
1790 op->salt_receive);
1791 if (NULL == op->remote_ibf)
1792 {
1793 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1794 "Failed to parse remote IBF, closing connection\n");
1795 fail_union_operation (op);
1796 return;
1797 }
1798 op->ibf_buckets_received = 0;
1799 if (0 != ntohl (msg->offset))
1800 {
1801 GNUNET_break_op (0);
1802 fail_union_operation (op);
1803 return;
1804 }
1805 }
1806 else
1807 {
1808 GNUNET_assert (op->phase == PHASE_EXPECT_IBF_CONT);
1809 LOG (GNUNET_ERROR_TYPE_DEBUG,
1810 "Received more of IBF\n");
1811 }
1812 GNUNET_assert (NULL != op->remote_ibf);
1813
1814 ibf_read_slice (&msg[1],
1815 op->ibf_buckets_received,
1816 buckets_in_message,
1817 op->remote_ibf);
1818 op->ibf_buckets_received += buckets_in_message;
1819
1820 if (op->ibf_buckets_received == op->remote_ibf->size)
1821 {
1822 LOG (GNUNET_ERROR_TYPE_DEBUG,
1823 "received full ibf\n");
1824 op->phase = PHASE_INVENTORY_ACTIVE;
1825 if (GNUNET_OK !=
1826 decode_and_send (op))
1827 {
1828 /* Internal error, best we can do is shut down */
1829 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1830 "Failed to decode IBF, closing connection\n");
1831 fail_union_operation (op);
1832 return;
1833 }
1834 }
1835 GNUNET_CADET_receive_done (op->channel);
1836}
1837
1838
1839/**
1840 * Send a result message to the client indicating
1841 * that there is a new element.
1842 *
1843 * @param op union operation
1844 * @param element element to send
1845 * @param status status to send with the new element
1846 */
1847static void
1848send_client_element (struct Operation *op,
1849 const struct GNUNET_SETU_Element *element,
1850 enum GNUNET_SETU_Status status)
1851{
1852 struct GNUNET_MQ_Envelope *ev;
1853 struct GNUNET_SETU_ResultMessage *rm;
1854
1855 LOG (GNUNET_ERROR_TYPE_DEBUG,
1856 "sending element (size %u) to client\n",
1857 element->size);
1858 GNUNET_assert (0 != op->client_request_id);
1859 ev = GNUNET_MQ_msg_extra (rm,
1860 element->size,
1861 GNUNET_MESSAGE_TYPE_SETU_RESULT);
1862 if (NULL == ev)
1863 {
1864 GNUNET_MQ_discard (ev);
1865 GNUNET_break (0);
1866 return;
1867 }
1868 rm->result_status = htons (status);
1869 rm->request_id = htonl (op->client_request_id);
1870 rm->element_type = htons (element->element_type);
1871 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1872 op->key_to_element));
1873 GNUNET_memcpy (&rm[1],
1874 element->data,
1875 element->size);
1876 GNUNET_MQ_send (op->set->cs->mq,
1877 ev);
1878}
1879
1880
1881/**
1882 * Tests if the operation is finished, and if so notify.
1883 *
1884 * @param op operation to check
1885 */
1886static void
1887maybe_finish (struct Operation *op)
1888{
1889 unsigned int num_demanded;
1890
1891 num_demanded = GNUNET_CONTAINER_multihashmap_size (
1892 op->demanded_hashes);
1893
1894 if (PHASE_FINISH_WAITING == op->phase)
1895 {
1896 LOG (GNUNET_ERROR_TYPE_DEBUG,
1897 "In PHASE_FINISH_WAITING, pending %u demands\n",
1898 num_demanded);
1899 if (0 == num_demanded)
1900 {
1901 struct GNUNET_MQ_Envelope *ev;
1902
1903 op->phase = PHASE_DONE;
1904 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
1905 GNUNET_MQ_send (op->mq,
1906 ev);
1907 /* We now wait until the other peer sends P2P_OVER
1908 * after it got all elements from us. */
1909 }
1910 }
1911 if (PHASE_FINISH_CLOSING == op->phase)
1912 {
1913 LOG (GNUNET_ERROR_TYPE_DEBUG,
1914 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1915 num_demanded);
1916 if (0 == num_demanded)
1917 {
1918 op->phase = PHASE_DONE;
1919 send_client_done (op);
1920 _GSS_operation_destroy2 (op);
1921 }
1922 }
1923}
1924
1925
1926/**
1927 * Check an element message from a remote peer.
1928 *
1929 * @param cls the union operation
1930 * @param emsg the message
1931 */
1932static int
1933check_union_p2p_elements (void *cls,
1934 const struct GNUNET_SETU_ElementMessage *emsg)
1935{
1936 struct Operation *op = cls;
1937
1938 if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
1939 {
1940 GNUNET_break_op (0);
1941 return GNUNET_SYSERR;
1942 }
1943 return GNUNET_OK;
1944}
1945
1946
1947/**
1948 * Handle an element message from a remote peer.
1949 * Sent by the other peer either because we decoded an IBF and placed a demand,
1950 * or because the other peer switched to full set transmission.
1951 *
1952 * @param cls the union operation
1953 * @param emsg the message
1954 */
1955static void
1956handle_union_p2p_elements (void *cls,
1957 const struct GNUNET_SETU_ElementMessage *emsg)
1958{
1959 struct Operation *op = cls;
1960 struct ElementEntry *ee;
1961 struct KeyEntry *ke;
1962 uint16_t element_size;
1963
1964 element_size = ntohs (emsg->header.size) - sizeof(struct
1965 GNUNET_SETU_ElementMessage);
1966 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1967 GNUNET_memcpy (&ee[1],
1968 &emsg[1],
1969 element_size);
1970 ee->element.size = element_size;
1971 ee->element.data = &ee[1];
1972 ee->element.element_type = ntohs (emsg->element_type);
1973 ee->remote = GNUNET_YES;
1974 GNUNET_SETU_element_hash (&ee->element,
1975 &ee->element_hash);
1976 if (GNUNET_NO ==
1977 GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
1978 &ee->element_hash,
1979 NULL))
1980 {
1981 /* We got something we didn't demand, since it's not in our map. */
1982 GNUNET_break_op (0);
1983 fail_union_operation (op);
1984 return;
1985 }
1986
1987 LOG (GNUNET_ERROR_TYPE_DEBUG,
1988 "Got element (size %u, hash %s) from peer\n",
1989 (unsigned int) element_size,
1990 GNUNET_h2s (&ee->element_hash));
1991
1992 GNUNET_STATISTICS_update (_GSS_statistics,
1993 "# received elements",
1994 1,
1995 GNUNET_NO);
1996 GNUNET_STATISTICS_update (_GSS_statistics,
1997 "# exchanged elements",
1998 1,
1999 GNUNET_NO);
2000
2001 op->received_total++;
2002
2003 ke = op_get_element (op,
2004 &ee->element_hash);
2005 if (NULL != ke)
2006 {
2007 /* Got repeated element. Should not happen since
2008 * we track demands. */
2009 GNUNET_STATISTICS_update (_GSS_statistics,
2010 "# repeated elements",
2011 1,
2012 GNUNET_NO);
2013 ke->received = GNUNET_YES;
2014 GNUNET_free (ee);
2015 }
2016 else
2017 {
2018 LOG (GNUNET_ERROR_TYPE_DEBUG,
2019 "Registering new element from remote peer\n");
2020 op->received_fresh++;
2021 op_register_element (op, ee, GNUNET_YES);
2022 /* only send results immediately if the client wants it */
2023 send_client_element (op,
2024 &ee->element,
2025 GNUNET_SETU_STATUS_ADD_LOCAL);
2026 }
2027
2028 if ((op->received_total > 8) &&
2029 (op->received_fresh < op->received_total / 3))
2030 {
2031 /* The other peer gave us lots of old elements, there's something wrong. */
2032 GNUNET_break_op (0);
2033 fail_union_operation (op);
2034 return;
2035 }
2036 GNUNET_CADET_receive_done (op->channel);
2037 maybe_finish (op);
2038}
2039
2040
2041/**
2042 * Check a full element message from a remote peer.
2043 *
2044 * @param cls the union operation
2045 * @param emsg the message
2046 */
2047static int
2048check_union_p2p_full_element (void *cls,
2049 const struct GNUNET_SETU_ElementMessage *emsg)
2050{
2051 struct Operation *op = cls;
2052
2053 (void) op;
2054 // FIXME: check that we expect full elements here?
2055 return GNUNET_OK;
2056}
2057
2058
2059/**
2060 * Handle an element message from a remote peer.
2061 *
2062 * @param cls the union operation
2063 * @param emsg the message
2064 */
2065static void
2066handle_union_p2p_full_element (void *cls,
2067 const struct GNUNET_SETU_ElementMessage *emsg)
2068{
2069 struct Operation *op = cls;
2070 struct ElementEntry *ee;
2071 struct KeyEntry *ke;
2072 uint16_t element_size;
2073
2074 element_size = ntohs (emsg->header.size)
2075 - sizeof(struct GNUNET_SETU_ElementMessage);
2076 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2077 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
2078 ee->element.size = element_size;
2079 ee->element.data = &ee[1];
2080 ee->element.element_type = ntohs (emsg->element_type);
2081 ee->remote = GNUNET_YES;
2082 GNUNET_SETU_element_hash (&ee->element,
2083 &ee->element_hash);
2084 LOG (GNUNET_ERROR_TYPE_DEBUG,
2085 "Got element (full diff, size %u, hash %s) from peer\n",
2086 (unsigned int) element_size,
2087 GNUNET_h2s (&ee->element_hash));
2088
2089 GNUNET_STATISTICS_update (_GSS_statistics,
2090 "# received elements",
2091 1,
2092 GNUNET_NO);
2093 GNUNET_STATISTICS_update (_GSS_statistics,
2094 "# exchanged elements",
2095 1,
2096 GNUNET_NO);
2097
2098 op->received_total++;
2099
2100 ke = op_get_element (op,
2101 &ee->element_hash);
2102 if (NULL != ke)
2103 {
2104 /* Got repeated element. Should not happen since
2105 * we track demands. */
2106 GNUNET_STATISTICS_update (_GSS_statistics,
2107 "# repeated elements",
2108 1,
2109 GNUNET_NO);
2110 ke->received = GNUNET_YES;
2111 GNUNET_free (ee);
2112 }
2113 else
2114 {
2115 LOG (GNUNET_ERROR_TYPE_DEBUG,
2116 "Registering new element from remote peer\n");
2117 op->received_fresh++;
2118 op_register_element (op, ee, GNUNET_YES);
2119 /* only send results immediately if the client wants it */
2120 send_client_element (op,
2121 &ee->element,
2122 GNUNET_SETU_STATUS_ADD_LOCAL);
2123 }
2124
2125 if ((GNUNET_YES == op->byzantine) &&
2126 (op->received_total > 384 + op->received_fresh * 4) &&
2127 (op->received_fresh < op->received_total / 6))
2128 {
2129 /* The other peer gave us lots of old elements, there's something wrong. */
2130 LOG (GNUNET_ERROR_TYPE_ERROR,
2131 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
2132 (unsigned long long) op->received_fresh,
2133 (unsigned long long) op->received_total);
2134 GNUNET_break_op (0);
2135 fail_union_operation (op);
2136 return;
2137 }
2138 GNUNET_CADET_receive_done (op->channel);
2139}
2140
2141
2142/**
2143 * Send offers (for GNUNET_Hash-es) in response
2144 * to inquiries (for IBF_Key-s).
2145 *
2146 * @param cls the union operation
2147 * @param msg the message
2148 */
2149static int
2150check_union_p2p_inquiry (void *cls,
2151 const struct InquiryMessage *msg)
2152{
2153 struct Operation *op = cls;
2154 unsigned int num_keys;
2155
2156 if (op->phase != PHASE_INVENTORY_PASSIVE)
2157 {
2158 GNUNET_break_op (0);
2159 return GNUNET_SYSERR;
2160 }
2161 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2162 / sizeof(struct IBF_Key);
2163 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2164 != num_keys * sizeof(struct IBF_Key))
2165 {
2166 GNUNET_break_op (0);
2167 return GNUNET_SYSERR;
2168 }
2169 return GNUNET_OK;
2170}
2171
2172
2173/**
2174 * Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
2175 *
2176 * @param cls the union operation
2177 * @param msg the message
2178 */
2179static void
2180handle_union_p2p_inquiry (void *cls,
2181 const struct InquiryMessage *msg)
2182{
2183 struct Operation *op = cls;
2184 const struct IBF_Key *ibf_key;
2185 unsigned int num_keys;
2186
2187 LOG (GNUNET_ERROR_TYPE_DEBUG,
2188 "Received union inquiry\n");
2189 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2190 / sizeof(struct IBF_Key);
2191 ibf_key = (const struct IBF_Key *) &msg[1];
2192 while (0 != num_keys--)
2193 {
2194 struct IBF_Key unsalted_key;
2195
2196 unsalt_key (ibf_key,
2197 ntohl (msg->salt),
2198 &unsalted_key);
2199 send_offers_for_key (op,
2200 unsalted_key);
2201 ibf_key++;
2202 }
2203 GNUNET_CADET_receive_done (op->channel);
2204}
2205
2206
2207/**
2208 * Iterator over hash map entries, called to destroy the linked list of
2209 * colliding ibf key entries.
2210 *
2211 * @param cls closure
2212 * @param key current key code
2213 * @param value value in the hash map
2214 * @return #GNUNET_YES if we should continue to iterate,
2215 * #GNUNET_NO if not.
2216 */
2217static int
2218send_missing_full_elements_iter (void *cls,
2219 uint32_t key,
2220 void *value)
2221{
2222 struct Operation *op = cls;
2223 struct KeyEntry *ke = value;
2224 struct GNUNET_MQ_Envelope *ev;
2225 struct GNUNET_SETU_ElementMessage *emsg;
2226 struct ElementEntry *ee = ke->element;
2227
2228 if (GNUNET_YES == ke->received)
2229 return GNUNET_YES;
2230 ev = GNUNET_MQ_msg_extra (emsg,
2231 ee->element.size,
2232 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
2233 GNUNET_memcpy (&emsg[1],
2234 ee->element.data,
2235 ee->element.size);
2236 emsg->element_type = htons (ee->element.element_type);
2237 GNUNET_MQ_send (op->mq,
2238 ev);
2239 return GNUNET_YES;
2240}
2241
2242
2243/**
2244 * Handle a request for full set transmission.
2245 *
2246 * @parem cls closure, a set union operation
2247 * @param mh the demand message
2248 */
2249static void
2250handle_union_p2p_request_full (void *cls,
2251 const struct GNUNET_MessageHeader *mh)
2252{
2253 struct Operation *op = cls;
2254
2255 LOG (GNUNET_ERROR_TYPE_DEBUG,
2256 "Received request for full set transmission\n");
2257 if (PHASE_EXPECT_IBF != op->phase)
2258 {
2259 GNUNET_break_op (0);
2260 fail_union_operation (op);
2261 return;
2262 }
2263
2264 // FIXME: we need to check that our set is larger than the
2265 // byzantine_lower_bound by some threshold
2266 send_full_set (op);
2267 GNUNET_CADET_receive_done (op->channel);
2268}
2269
2270
2271/**
2272 * Handle a "full done" message.
2273 *
2274 * @parem cls closure, a set union operation
2275 * @param mh the demand message
2276 */
2277static void
2278handle_union_p2p_full_done (void *cls,
2279 const struct GNUNET_MessageHeader *mh)
2280{
2281 struct Operation *op = cls;
2282
2283 switch (op->phase)
2284 {
2285 case PHASE_EXPECT_IBF:
2286 {
2287 struct GNUNET_MQ_Envelope *ev;
2288
2289 LOG (GNUNET_ERROR_TYPE_DEBUG,
2290 "got FULL DONE, sending elements that other peer is missing\n");
2291
2292 /* send all the elements that did not come from the remote peer */
2293 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
2294 &send_missing_full_elements_iter,
2295 op);
2296 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
2297 GNUNET_MQ_send (op->mq,
2298 ev);
2299 op->phase = PHASE_DONE;
2300 /* we now wait until the other peer sends us the OVER message*/
2301 }
2302 break;
2303
2304 case PHASE_FULL_SENDING:
2305 {
2306 LOG (GNUNET_ERROR_TYPE_DEBUG,
2307 "got FULL DONE, finishing\n");
2308 /* We sent the full set, and got the response for that. We're done. */
2309 op->phase = PHASE_DONE;
2310 GNUNET_CADET_receive_done (op->channel);
2311 send_client_done (op);
2312 _GSS_operation_destroy2 (op);
2313 return;
2314 }
2315 break;
2316
2317 default:
2318 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2319 "Handle full done phase is %u\n",
2320 (unsigned) op->phase);
2321 GNUNET_break_op (0);
2322 fail_union_operation (op);
2323 return;
2324 }
2325 GNUNET_CADET_receive_done (op->channel);
2326}
2327
2328
2329/**
2330 * Check a demand by the other peer for elements based on a list
2331 * of `struct GNUNET_HashCode`s.
2332 *
2333 * @parem cls closure, a set union operation
2334 * @param mh the demand message
2335 * @return #GNUNET_OK if @a mh is well-formed
2336 */
2337static int
2338check_union_p2p_demand (void *cls,
2339 const struct GNUNET_MessageHeader *mh)
2340{
2341 struct Operation *op = cls;
2342 unsigned int num_hashes;
2343
2344 (void) op;
2345 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2346 / sizeof(struct GNUNET_HashCode);
2347 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2348 != num_hashes * sizeof(struct GNUNET_HashCode))
2349 {
2350 GNUNET_break_op (0);
2351 return GNUNET_SYSERR;
2352 }
2353 return GNUNET_OK;
2354}
2355
2356
2357/**
2358 * Handle a demand by the other peer for elements based on a list
2359 * of `struct GNUNET_HashCode`s.
2360 *
2361 * @parem cls closure, a set union operation
2362 * @param mh the demand message
2363 */
2364static void
2365handle_union_p2p_demand (void *cls,
2366 const struct GNUNET_MessageHeader *mh)
2367{
2368 struct Operation *op = cls;
2369 struct ElementEntry *ee;
2370 struct GNUNET_SETU_ElementMessage *emsg;
2371 const struct GNUNET_HashCode *hash;
2372 unsigned int num_hashes;
2373 struct GNUNET_MQ_Envelope *ev;
2374
2375 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2376 / sizeof(struct GNUNET_HashCode);
2377 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2378 num_hashes > 0;
2379 hash++, num_hashes--)
2380 {
2381 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2382 hash);
2383 if (NULL == ee)
2384 {
2385 /* Demand for non-existing element. */
2386 GNUNET_break_op (0);
2387 fail_union_operation (op);
2388 return;
2389 }
2390 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2391 {
2392 /* Probably confused lazily copied sets. */
2393 GNUNET_break_op (0);
2394 fail_union_operation (op);
2395 return;
2396 }
2397 ev = GNUNET_MQ_msg_extra (emsg,
2398 ee->element.size,
2399 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS);
2400 GNUNET_memcpy (&emsg[1],
2401 ee->element.data,
2402 ee->element.size);
2403 emsg->reserved = htons (0);
2404 emsg->element_type = htons (ee->element.element_type);
2405 LOG (GNUNET_ERROR_TYPE_DEBUG,
2406 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2407 (void *) op,
2408 (unsigned int) ee->element.size,
2409 GNUNET_h2s (&ee->element_hash));
2410 GNUNET_MQ_send (op->mq, ev);
2411 GNUNET_STATISTICS_update (_GSS_statistics,
2412 "# exchanged elements",
2413 1,
2414 GNUNET_NO);
2415 if (op->symmetric)
2416 send_client_element (op,
2417 &ee->element,
2418 GNUNET_SET_STATUS_ADD_REMOTE);
2419 }
2420 GNUNET_CADET_receive_done (op->channel);
2421}
2422
2423
2424/**
2425 * Check offer (of `struct GNUNET_HashCode`s).
2426 *
2427 * @param cls the union operation
2428 * @param mh the message
2429 * @return #GNUNET_OK if @a mh is well-formed
2430 */
2431static int
2432check_union_p2p_offer (void *cls,
2433 const struct GNUNET_MessageHeader *mh)
2434{
2435 struct Operation *op = cls;
2436 unsigned int num_hashes;
2437
2438 /* look up elements and send them */
2439 if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
2440 (op->phase != PHASE_INVENTORY_ACTIVE))
2441 {
2442 GNUNET_break_op (0);
2443 return GNUNET_SYSERR;
2444 }
2445 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2446 / sizeof(struct GNUNET_HashCode);
2447 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2448 num_hashes * sizeof(struct GNUNET_HashCode))
2449 {
2450 GNUNET_break_op (0);
2451 return GNUNET_SYSERR;
2452 }
2453 return GNUNET_OK;
2454}
2455
2456
2457/**
2458 * Handle offers (of `struct GNUNET_HashCode`s) and
2459 * respond with demands (of `struct GNUNET_HashCode`s).
2460 *
2461 * @param cls the union operation
2462 * @param mh the message
2463 */
2464static void
2465handle_union_p2p_offer (void *cls,
2466 const struct GNUNET_MessageHeader *mh)
2467{
2468 struct Operation *op = cls;
2469 const struct GNUNET_HashCode *hash;
2470 unsigned int num_hashes;
2471
2472 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2473 / sizeof(struct GNUNET_HashCode);
2474 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2475 num_hashes > 0;
2476 hash++, num_hashes--)
2477 {
2478 struct ElementEntry *ee;
2479 struct GNUNET_MessageHeader *demands;
2480 struct GNUNET_MQ_Envelope *ev;
2481
2482 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2483 hash);
2484 if (NULL != ee)
2485 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2486 continue;
2487
2488 if (GNUNET_YES ==
2489 GNUNET_CONTAINER_multihashmap_contains (op->demanded_hashes,
2490 hash))
2491 {
2492 LOG (GNUNET_ERROR_TYPE_DEBUG,
2493 "Skipped sending duplicate demand\n");
2494 continue;
2495 }
2496
2497 GNUNET_assert (GNUNET_OK ==
2498 GNUNET_CONTAINER_multihashmap_put (
2499 op->demanded_hashes,
2500 hash,
2501 NULL,
2502 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2503
2504 LOG (GNUNET_ERROR_TYPE_DEBUG,
2505 "[OP %x] Requesting element (hash %s)\n",
2506 (void *) op, GNUNET_h2s (hash));
2507 ev = GNUNET_MQ_msg_header_extra (demands,
2508 sizeof(struct GNUNET_HashCode),
2509 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
2510 GNUNET_memcpy (&demands[1],
2511 hash,
2512 sizeof(struct GNUNET_HashCode));
2513 GNUNET_MQ_send (op->mq, ev);
2514 }
2515 GNUNET_CADET_receive_done (op->channel);
2516}
2517
2518
2519/**
2520 * Handle a done message from a remote peer
2521 *
2522 * @param cls the union operation
2523 * @param mh the message
2524 */
2525static void
2526handle_union_p2p_done (void *cls,
2527 const struct GNUNET_MessageHeader *mh)
2528{
2529 struct Operation *op = cls;
2530
2531 switch (op->phase)
2532 {
2533 case PHASE_INVENTORY_PASSIVE:
2534 /* We got all requests, but still have to send our elements in response. */
2535 op->phase = PHASE_FINISH_WAITING;
2536 LOG (GNUNET_ERROR_TYPE_DEBUG,
2537 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2538 /* The active peer is done sending offers
2539 * and inquiries. This means that all
2540 * our responses to that (demands and offers)
2541 * must be in flight (queued or in mesh).
2542 *
2543 * We should notify the active peer once
2544 * all our demands are satisfied, so that the active
2545 * peer can quit if we gave it everything.
2546 */GNUNET_CADET_receive_done (op->channel);
2547 maybe_finish (op);
2548 return;
2549 case PHASE_INVENTORY_ACTIVE:
2550 LOG (GNUNET_ERROR_TYPE_DEBUG,
2551 "got DONE (as active partner), waiting to finish\n");
2552 /* All demands of the other peer are satisfied,
2553 * and we processed all offers, thus we know
2554 * exactly what our demands must be.
2555 *
2556 * We'll close the channel
2557 * to the other peer once our demands are met.
2558 */op->phase = PHASE_FINISH_CLOSING;
2559 GNUNET_CADET_receive_done (op->channel);
2560 maybe_finish (op);
2561 return;
2562 default:
2563 GNUNET_break_op (0);
2564 fail_union_operation (op);
2565 return;
2566 }
2567}
2568
2569
2570/**
2571 * Handle a over message from a remote peer
2572 *
2573 * @param cls the union operation
2574 * @param mh the message
2575 */
2576static void
2577handle_union_p2p_over (void *cls,
2578 const struct GNUNET_MessageHeader *mh)
2579{
2580 send_client_done (cls);
2581}
2582
2583
2584/**
2585 * Get the incoming socket associated with the given id.
2586 *
2587 * @param listener the listener to look in
2588 * @param id id to look for
2589 * @return the incoming socket associated with the id,
2590 * or NULL if there is none
2591 */
2592static struct Operation *
2593get_incoming (uint32_t id)
2594{
2595 for (struct Listener *listener = listener_head;
2596 NULL != listener;
2597 listener = listener->next)
2598 {
2599 for (struct Operation *op = listener->op_head;
2600 NULL != op;
2601 op = op->next)
2602 if (op->suggest_id == id)
2603 return op;
2604 }
2605 return NULL;
2606}
2607
2608
2609/**
2610 * Callback called when a client connects to the service.
2611 *
2612 * @param cls closure for the service
2613 * @param c the new client that connected to the service
2614 * @param mq the message queue used to send messages to the client
2615 * @return @a `struct ClientState`
2616 */
2617static void *
2618client_connect_cb (void *cls,
2619 struct GNUNET_SERVICE_Client *c,
2620 struct GNUNET_MQ_Handle *mq)
2621{
2622 struct ClientState *cs;
2623
2624 num_clients++;
2625 cs = GNUNET_new (struct ClientState);
2626 cs->client = c;
2627 cs->mq = mq;
2628 return cs;
2629}
2630
2631
2632/**
2633 * Iterator over hash map entries to free element entries.
2634 *
2635 * @param cls closure
2636 * @param key current key code
2637 * @param value a `struct ElementEntry *` to be free'd
2638 * @return #GNUNET_YES (continue to iterate)
2639 */
2640static int
2641destroy_elements_iterator (void *cls,
2642 const struct GNUNET_HashCode *key,
2643 void *value)
2644{
2645 struct ElementEntry *ee = value;
2646
2647 GNUNET_free (ee);
2648 return GNUNET_YES;
2649}
2650
2651
2652/**
2653 * Clean up after a client has disconnected
2654 *
2655 * @param cls closure, unused
2656 * @param client the client to clean up after
2657 * @param internal_cls the `struct ClientState`
2658 */
2659static void
2660client_disconnect_cb (void *cls,
2661 struct GNUNET_SERVICE_Client *client,
2662 void *internal_cls)
2663{
2664 struct ClientState *cs = internal_cls;
2665 struct Operation *op;
2666 struct Listener *listener;
2667 struct Set *set;
2668
2669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2670 "Client disconnected, cleaning up\n");
2671 if (NULL != (set = cs->set))
2672 {
2673 struct SetContent *content = set->content;
2674
2675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2676 "Destroying client's set\n");
2677 /* Destroy pending set operations */
2678 while (NULL != set->ops_head)
2679 _GSS_operation_destroy (set->ops_head);
2680
2681 /* Destroy operation-specific state */
2682 if (NULL != set->se)
2683 {
2684 strata_estimator_destroy (set->se);
2685 set->se = NULL;
2686 }
2687 /* free set content (or at least decrement RC) */
2688 set->content = NULL;
2689 GNUNET_assert (0 != content->refcount);
2690 content->refcount--;
2691 if (0 == content->refcount)
2692 {
2693 GNUNET_assert (NULL != content->elements);
2694 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
2695 &destroy_elements_iterator,
2696 NULL);
2697 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
2698 content->elements = NULL;
2699 GNUNET_free (content);
2700 }
2701 GNUNET_free (set);
2702 }
2703
2704 if (NULL != (listener = cs->listener))
2705 {
2706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2707 "Destroying client's listener\n");
2708 GNUNET_CADET_close_port (listener->open_port);
2709 listener->open_port = NULL;
2710 while (NULL != (op = listener->op_head))
2711 {
2712 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2713 "Destroying incoming operation `%u' from peer `%s'\n",
2714 (unsigned int) op->client_request_id,
2715 GNUNET_i2s (&op->peer));
2716 incoming_destroy (op);
2717 }
2718 GNUNET_CONTAINER_DLL_remove (listener_head,
2719 listener_tail,
2720 listener);
2721 GNUNET_free (listener);
2722 }
2723 GNUNET_free (cs);
2724 num_clients--;
2725 if ( (GNUNET_YES == in_shutdown) &&
2726 (0 == num_clients) )
2727 {
2728 if (NULL != cadet)
2729 {
2730 GNUNET_CADET_disconnect (cadet);
2731 cadet = NULL;
2732 }
2733 }
2734}
2735
2736
2737/**
2738 * Check a request for a set operation from another peer.
2739 *
2740 * @param cls the operation state
2741 * @param msg the received message
2742 * @return #GNUNET_OK if the channel should be kept alive,
2743 * #GNUNET_SYSERR to destroy the channel
2744 */
2745static int
2746check_incoming_msg (void *cls,
2747 const struct OperationRequestMessage *msg)
2748{
2749 struct Operation *op = cls;
2750 struct Listener *listener = op->listener;
2751 const struct GNUNET_MessageHeader *nested_context;
2752
2753 /* double operation request */
2754 if (0 != op->suggest_id)
2755 {
2756 GNUNET_break_op (0);
2757 return GNUNET_SYSERR;
2758 }
2759 /* This should be equivalent to the previous condition, but can't hurt to check twice */
2760 if (NULL == listener)
2761 {
2762 GNUNET_break (0);
2763 return GNUNET_SYSERR;
2764 }
2765 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2766 if ((NULL != nested_context) &&
2767 (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
2768 {
2769 GNUNET_break_op (0);
2770 return GNUNET_SYSERR;
2771 }
2772 return GNUNET_OK;
2773}
2774
2775
2776/**
2777 * Handle a request for a set operation from another peer. Checks if we
2778 * have a listener waiting for such a request (and in that case initiates
2779 * asking the listener about accepting the connection). If no listener
2780 * is waiting, we queue the operation request in hope that a listener
2781 * shows up soon (before timeout).
2782 *
2783 * This msg is expected as the first and only msg handled through the
2784 * non-operation bound virtual table, acceptance of this operation replaces
2785 * our virtual table and subsequent msgs would be routed differently (as
2786 * we then know what type of operation this is).
2787 *
2788 * @param cls the operation state
2789 * @param msg the received message
2790 * @return #GNUNET_OK if the channel should be kept alive,
2791 * #GNUNET_SYSERR to destroy the channel
2792 */
2793static void
2794handle_incoming_msg (void *cls,
2795 const struct OperationRequestMessage *msg)
2796{
2797 struct Operation *op = cls;
2798 struct Listener *listener = op->listener;
2799 const struct GNUNET_MessageHeader *nested_context;
2800 struct GNUNET_MQ_Envelope *env;
2801 struct GNUNET_SETU_RequestMessage *cmsg;
2802
2803 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2804 /* Make a copy of the nested_context (application-specific context
2805 information that is opaque to set) so we can pass it to the
2806 listener later on */
2807 if (NULL != nested_context)
2808 op->context_msg = GNUNET_copy_message (nested_context);
2809 op->remote_element_count = ntohl (msg->element_count);
2810 GNUNET_log (
2811 GNUNET_ERROR_TYPE_DEBUG,
2812 "Received P2P operation request (port %s) for active listener\n",
2813 GNUNET_h2s (&op->listener->app_id));
2814 GNUNET_assert (0 == op->suggest_id);
2815 if (0 == suggest_id)
2816 suggest_id++;
2817 op->suggest_id = suggest_id++;
2818 GNUNET_assert (NULL != op->timeout_task);
2819 GNUNET_SCHEDULER_cancel (op->timeout_task);
2820 op->timeout_task = NULL;
2821 env = GNUNET_MQ_msg_nested_mh (cmsg,
2822 GNUNET_MESSAGE_TYPE_SETU_REQUEST,
2823 op->context_msg);
2824 GNUNET_log (
2825 GNUNET_ERROR_TYPE_DEBUG,
2826 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
2827 op->suggest_id,
2828 listener,
2829 listener->cs);
2830 cmsg->accept_id = htonl (op->suggest_id);
2831 cmsg->peer_id = op->peer;
2832 GNUNET_MQ_send (listener->cs->mq,
2833 env);
2834 /* NOTE: GNUNET_CADET_receive_done() will be called in
2835 #handle_client_accept() */
2836}
2837
2838
2839/**
2840 * Called when a client wants to create a new set. This is typically
2841 * the first request from a client, and includes the type of set
2842 * operation to be performed.
2843 *
2844 * @param cls client that sent the message
2845 * @param m message sent by the client
2846 */
2847static void
2848handle_client_create_set (void *cls,
2849 const struct GNUNET_SETU_CreateMessage *msg)
2850{
2851 struct ClientState *cs = cls;
2852 struct Set *set;
2853
2854 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2855 "Client created new set for union operation\n");
2856 if (NULL != cs->set)
2857 {
2858 /* There can only be one set per client */
2859 GNUNET_break (0);
2860 GNUNET_SERVICE_client_drop (cs->client);
2861 return;
2862 }
2863 set = GNUNET_new (struct Set);
2864 {
2865 struct StrataEstimator *se;
2866
2867 se = strata_estimator_create (SE_STRATA_COUNT,
2868 SE_IBF_SIZE,
2869 SE_IBF_HASH_NUM);
2870 if (NULL == se)
2871 {
2872 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2873 "Failed to allocate strata estimator\n");
2874 GNUNET_free (set);
2875 GNUNET_SERVICE_client_drop (cs->client);
2876 return;
2877 }
2878 set->se = se;
2879 }
2880 set->content = GNUNET_new (struct SetContent);
2881 set->content->refcount = 1;
2882 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
2883 GNUNET_YES);
2884 set->cs = cs;
2885 cs->set = set;
2886 GNUNET_SERVICE_client_continue (cs->client);
2887}
2888
2889
2890/**
2891 * Timeout happens iff:
2892 * - we suggested an operation to our listener,
2893 * but did not receive a response in time
2894 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
2895 *
2896 * @param cls channel context
2897 * @param tc context information (why was this task triggered now)
2898 */
2899static void
2900incoming_timeout_cb (void *cls)
2901{
2902 struct Operation *op = cls;
2903
2904 op->timeout_task = NULL;
2905 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2906 "Remote peer's incoming request timed out\n");
2907 incoming_destroy (op);
2908}
2909
2910
2911/**
2912 * Method called whenever another peer has added us to a channel the
2913 * other peer initiated. Only called (once) upon reception of data
2914 * from a channel we listen on.
2915 *
2916 * The channel context represents the operation itself and gets added
2917 * to a DLL, from where it gets looked up when our local listener
2918 * client responds to a proposed/suggested operation or connects and
2919 * associates with this operation.
2920 *
2921 * @param cls closure
2922 * @param channel new handle to the channel
2923 * @param source peer that started the channel
2924 * @return initial channel context for the channel
2925 * returns NULL on error
2926 */
2927static void *
2928channel_new_cb (void *cls,
2929 struct GNUNET_CADET_Channel *channel,
2930 const struct GNUNET_PeerIdentity *source)
2931{
2932 struct Listener *listener = cls;
2933 struct Operation *op;
2934
2935 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2936 "New incoming channel\n");
2937 op = GNUNET_new (struct Operation);
2938 op->listener = listener;
2939 op->peer = *source;
2940 op->channel = channel;
2941 op->mq = GNUNET_CADET_get_mq (op->channel);
2942 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
2943 UINT32_MAX);
2944 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
2945 &incoming_timeout_cb,
2946 op);
2947 GNUNET_CONTAINER_DLL_insert (listener->op_head,
2948 listener->op_tail,
2949 op);
2950 return op;
2951}
2952
2953
2954/**
2955 * Function called whenever a channel is destroyed. Should clean up
2956 * any associated state. It must NOT call
2957 * GNUNET_CADET_channel_destroy() on the channel.
2958 *
2959 * The peer_disconnect function is part of a a virtual table set initially either
2960 * when a peer creates a new channel with us, or once we create
2961 * a new channel ourselves (evaluate).
2962 *
2963 * Once we know the exact type of operation (union/intersection), the vt is
2964 * replaced with an operation specific instance (_GSS_[op]_vt).
2965 *
2966 * @param channel_ctx place where local state associated
2967 * with the channel is stored
2968 * @param channel connection to the other end (henceforth invalid)
2969 */
2970static void
2971channel_end_cb (void *channel_ctx,
2972 const struct GNUNET_CADET_Channel *channel)
2973{
2974 struct Operation *op = channel_ctx;
2975
2976 op->channel = NULL;
2977 _GSS_operation_destroy2 (op);
2978}
2979
2980
2981/**
2982 * Function called whenever an MQ-channel's transmission window size changes.
2983 *
2984 * The first callback in an outgoing channel will be with a non-zero value
2985 * and will mean the channel is connected to the destination.
2986 *
2987 * For an incoming channel it will be called immediately after the
2988 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
2989 *
2990 * @param cls Channel closure.
2991 * @param channel Connection to the other end (henceforth invalid).
2992 * @param window_size New window size. If the is more messages than buffer size
2993 * this value will be negative..
2994 */
2995static void
2996channel_window_cb (void *cls,
2997 const struct GNUNET_CADET_Channel *channel,
2998 int window_size)
2999{
3000 /* FIXME: not implemented, we could do flow control here... */
3001}
3002
3003
3004/**
3005 * Called when a client wants to create a new listener.
3006 *
3007 * @param cls client that sent the message
3008 * @param msg message sent by the client
3009 */
3010static void
3011handle_client_listen (void *cls,
3012 const struct GNUNET_SETU_ListenMessage *msg)
3013{
3014 struct ClientState *cs = cls;
3015 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3016 GNUNET_MQ_hd_var_size (incoming_msg,
3017 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3018 struct OperationRequestMessage,
3019 NULL),
3020 GNUNET_MQ_hd_var_size (union_p2p_ibf,
3021 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
3022 struct IBFMessage,
3023 NULL),
3024 GNUNET_MQ_hd_var_size (union_p2p_elements,
3025 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
3026 struct GNUNET_SETU_ElementMessage,
3027 NULL),
3028 GNUNET_MQ_hd_var_size (union_p2p_offer,
3029 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
3030 struct GNUNET_MessageHeader,
3031 NULL),
3032 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3033 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
3034 struct InquiryMessage,
3035 NULL),
3036 GNUNET_MQ_hd_var_size (union_p2p_demand,
3037 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
3038 struct GNUNET_MessageHeader,
3039 NULL),
3040 GNUNET_MQ_hd_fixed_size (union_p2p_done,
3041 GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
3042 struct GNUNET_MessageHeader,
3043 NULL),
3044 GNUNET_MQ_hd_fixed_size (union_p2p_over,
3045 GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
3046 struct GNUNET_MessageHeader,
3047 NULL),
3048 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3049 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3050 struct GNUNET_MessageHeader,
3051 NULL),
3052 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3053 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3054 struct GNUNET_MessageHeader,
3055 NULL),
3056 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3057 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3058 struct StrataEstimatorMessage,
3059 NULL),
3060 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3061 GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
3062 struct StrataEstimatorMessage,
3063 NULL),
3064 GNUNET_MQ_hd_var_size (union_p2p_full_element,
3065 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3066 struct GNUNET_SETU_ElementMessage,
3067 NULL),
3068 GNUNET_MQ_handler_end ()
3069 };
3070 struct Listener *listener;
3071
3072 if (NULL != cs->listener)
3073 {
3074 /* max. one active listener per client! */
3075 GNUNET_break (0);
3076 GNUNET_SERVICE_client_drop (cs->client);
3077 return;
3078 }
3079 listener = GNUNET_new (struct Listener);
3080 listener->cs = cs;
3081 cs->listener = listener;
3082 listener->app_id = msg->app_id;
3083 GNUNET_CONTAINER_DLL_insert (listener_head,
3084 listener_tail,
3085 listener);
3086 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3087 "New listener created (port %s)\n",
3088 GNUNET_h2s (&listener->app_id));
3089 listener->open_port = GNUNET_CADET_open_port (cadet,
3090 &msg->app_id,
3091 &channel_new_cb,
3092 listener,
3093 &channel_window_cb,
3094 &channel_end_cb,
3095 cadet_handlers);
3096 GNUNET_SERVICE_client_continue (cs->client);
3097}
3098
3099
3100/**
3101 * Called when the listening client rejects an operation
3102 * request by another peer.
3103 *
3104 * @param cls client that sent the message
3105 * @param msg message sent by the client
3106 */
3107static void
3108handle_client_reject (void *cls,
3109 const struct GNUNET_SETU_RejectMessage *msg)
3110{
3111 struct ClientState *cs = cls;
3112 struct Operation *op;
3113
3114 op = get_incoming (ntohl (msg->accept_reject_id));
3115 if (NULL == op)
3116 {
3117 /* no matching incoming operation for this reject;
3118 could be that the other peer already disconnected... */
3119 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3120 "Client rejected unknown operation %u\n",
3121 (unsigned int) ntohl (msg->accept_reject_id));
3122 GNUNET_SERVICE_client_continue (cs->client);
3123 return;
3124 }
3125 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3126 "Peer request (app %s) rejected by client\n",
3127 GNUNET_h2s (&cs->listener->app_id));
3128 _GSS_operation_destroy2 (op);
3129 GNUNET_SERVICE_client_continue (cs->client);
3130}
3131
3132
3133/**
3134 * Called when a client wants to add or remove an element to a set it inhabits.
3135 *
3136 * @param cls client that sent the message
3137 * @param msg message sent by the client
3138 */
3139static int
3140check_client_set_add (void *cls,
3141 const struct GNUNET_SETU_ElementMessage *msg)
3142{
3143 /* NOTE: Technically, we should probably check with the
3144 block library whether the element we are given is well-formed */
3145 return GNUNET_OK;
3146}
3147
3148
3149/**
3150 * Called when a client wants to add or remove an element to a set it inhabits.
3151 *
3152 * @param cls client that sent the message
3153 * @param msg message sent by the client
3154 */
3155static void
3156handle_client_set_add (void *cls,
3157 const struct GNUNET_SETU_ElementMessage *msg)
3158{
3159 struct ClientState *cs = cls;
3160 struct Set *set;
3161 struct GNUNET_SETU_Element el;
3162 struct ElementEntry *ee;
3163 struct GNUNET_HashCode hash;
3164
3165 if (NULL == (set = cs->set))
3166 {
3167 /* client without a set requested an operation */
3168 GNUNET_break (0);
3169 GNUNET_SERVICE_client_drop (cs->client);
3170 return;
3171 }
3172 GNUNET_SERVICE_client_continue (cs->client);
3173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
3174 el.size = ntohs (msg->header.size) - sizeof(*msg);
3175 el.data = &msg[1];
3176 el.element_type = ntohs (msg->element_type);
3177 GNUNET_SETU_element_hash (&el,
3178 &hash);
3179 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
3180 &hash);
3181 if (NULL == ee)
3182 {
3183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3184 "Client inserts element %s of size %u\n",
3185 GNUNET_h2s (&hash),
3186 el.size);
3187 ee = GNUNET_malloc (el.size + sizeof(*ee));
3188 ee->element.size = el.size;
3189 GNUNET_memcpy (&ee[1], el.data, el.size);
3190 ee->element.data = &ee[1];
3191 ee->element.element_type = el.element_type;
3192 ee->remote = GNUNET_NO;
3193 ee->generation = set->current_generation;
3194 ee->element_hash = hash;
3195 GNUNET_break (GNUNET_YES ==
3196 GNUNET_CONTAINER_multihashmap_put (
3197 set->content->elements,
3198 &ee->element_hash,
3199 ee,
3200 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3201 }
3202 else
3203 {
3204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3205 "Client inserted element %s of size %u twice (ignored)\n",
3206 GNUNET_h2s (&hash),
3207 el.size);
3208 /* same element inserted twice */
3209 return;
3210 }
3211 strata_estimator_insert (set->se,
3212 get_ibf_key (&ee->element_hash));
3213}
3214
3215
3216/**
3217 * Advance the current generation of a set,
3218 * adding exclusion ranges if necessary.
3219 *
3220 * @param set the set where we want to advance the generation
3221 */
3222static void
3223advance_generation (struct Set *set)
3224{
3225 set->content->latest_generation++;
3226 set->current_generation++;
3227}
3228
3229
3230/**
3231 * Called when a client wants to initiate a set operation with another
3232 * peer. Initiates the CADET connection to the listener and sends the
3233 * request.
3234 *
3235 * @param cls client that sent the message
3236 * @param msg message sent by the client
3237 * @return #GNUNET_OK if the message is well-formed
3238 */
3239static int
3240check_client_evaluate (void *cls,
3241 const struct GNUNET_SETU_EvaluateMessage *msg)
3242{
3243 /* FIXME: suboptimal, even if the context below could be NULL,
3244 there are malformed messages this does not check for... */
3245 return GNUNET_OK;
3246}
3247
3248
3249/**
3250 * Called when a client wants to initiate a set operation with another
3251 * peer. Initiates the CADET connection to the listener and sends the
3252 * request.
3253 *
3254 * @param cls client that sent the message
3255 * @param msg message sent by the client
3256 */
3257static void
3258handle_client_evaluate (void *cls,
3259 const struct GNUNET_SETU_EvaluateMessage *msg)
3260{
3261 struct ClientState *cs = cls;
3262 struct Operation *op = GNUNET_new (struct Operation);
3263 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3264 GNUNET_MQ_hd_var_size (incoming_msg,
3265 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3266 struct OperationRequestMessage,
3267 op),
3268 GNUNET_MQ_hd_var_size (union_p2p_ibf,
3269 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
3270 struct IBFMessage,
3271 op),
3272 GNUNET_MQ_hd_var_size (union_p2p_elements,
3273 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
3274 struct GNUNET_SETU_ElementMessage,
3275 op),
3276 GNUNET_MQ_hd_var_size (union_p2p_offer,
3277 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
3278 struct GNUNET_MessageHeader,
3279 op),
3280 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3281 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
3282 struct InquiryMessage,
3283 op),
3284 GNUNET_MQ_hd_var_size (union_p2p_demand,
3285 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
3286 struct GNUNET_MessageHeader,
3287 op),
3288 GNUNET_MQ_hd_fixed_size (union_p2p_done,
3289 GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
3290 struct GNUNET_MessageHeader,
3291 op),
3292 GNUNET_MQ_hd_fixed_size (union_p2p_over,
3293 GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
3294 struct GNUNET_MessageHeader,
3295 op),
3296 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3297 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
3298 struct GNUNET_MessageHeader,
3299 op),
3300 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3301 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
3302 struct GNUNET_MessageHeader,
3303 op),
3304 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3305 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
3306 struct StrataEstimatorMessage,
3307 op),
3308 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3309 GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
3310 struct StrataEstimatorMessage,
3311 op),
3312 GNUNET_MQ_hd_var_size (union_p2p_full_element,
3313 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
3314 struct GNUNET_SETU_ElementMessage,
3315 op),
3316 GNUNET_MQ_handler_end ()
3317 };
3318 struct Set *set;
3319 const struct GNUNET_MessageHeader *context;
3320
3321 if (NULL == (set = cs->set))
3322 {
3323 GNUNET_break (0);
3324 GNUNET_free (op);
3325 GNUNET_SERVICE_client_drop (cs->client);
3326 return;
3327 }
3328 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
3329 UINT32_MAX);
3330 op->peer = msg->target_peer;
3331 op->client_request_id = ntohl (msg->request_id);
3332 op->byzantine = msg->byzantine;
3333 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3334 op->force_full = msg->force_full;
3335 op->force_delta = msg->force_delta;
3336 op->symmetric = msg->symmetric;
3337 context = GNUNET_MQ_extract_nested_mh (msg);
3338
3339 /* Advance generation values, so that
3340 mutations won't interfer with the running operation. */
3341 op->set = set;
3342 op->generation_created = set->current_generation;
3343 advance_generation (set);
3344 GNUNET_CONTAINER_DLL_insert (set->ops_head,
3345 set->ops_tail,
3346 op);
3347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3348 "Creating new CADET channel to port %s for set union\n",
3349 GNUNET_h2s (&msg->app_id));
3350 op->channel = GNUNET_CADET_channel_create (cadet,
3351 op,
3352 &msg->target_peer,
3353 &msg->app_id,
3354 &channel_window_cb,
3355 &channel_end_cb,
3356 cadet_handlers);
3357 op->mq = GNUNET_CADET_get_mq (op->channel);
3358 {
3359 struct GNUNET_MQ_Envelope *ev;
3360 struct OperationRequestMessage *msg;
3361
3362 ev = GNUNET_MQ_msg_nested_mh (msg,
3363 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
3364 context);
3365 if (NULL == ev)
3366 {
3367 /* the context message is too large */
3368 GNUNET_break (0);
3369 GNUNET_SERVICE_client_drop (cs->client);
3370 return;
3371 }
3372 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3373 GNUNET_NO);
3374 /* copy the current generation's strata estimator for this operation */
3375 op->se = strata_estimator_dup (op->set->se);
3376 /* we started the operation, thus we have to send the operation request */
3377 op->phase = PHASE_EXPECT_SE;
3378 op->salt_receive = op->salt_send = 42; // FIXME?????
3379 LOG (GNUNET_ERROR_TYPE_DEBUG,
3380 "Initiating union operation evaluation\n");
3381 GNUNET_STATISTICS_update (_GSS_statistics,
3382 "# of total union operations",
3383 1,
3384 GNUNET_NO);
3385 GNUNET_STATISTICS_update (_GSS_statistics,
3386 "# of initiated union operations",
3387 1,
3388 GNUNET_NO);
3389 GNUNET_MQ_send (op->mq,
3390 ev);
3391 if (NULL != context)
3392 LOG (GNUNET_ERROR_TYPE_DEBUG,
3393 "sent op request with context message\n");
3394 else
3395 LOG (GNUNET_ERROR_TYPE_DEBUG,
3396 "sent op request without context message\n");
3397 initialize_key_to_element (op);
3398 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3399 op->key_to_element);
3400
3401 }
3402 GNUNET_SERVICE_client_continue (cs->client);
3403}
3404
3405
3406/**
3407 * Handle a request from the client to cancel a running set operation.
3408 *
3409 * @param cls the client
3410 * @param msg the message
3411 */
3412static void
3413handle_client_cancel (void *cls,
3414 const struct GNUNET_SETU_CancelMessage *msg)
3415{
3416 struct ClientState *cs = cls;
3417 struct Set *set;
3418 struct Operation *op;
3419 int found;
3420
3421 if (NULL == (set = cs->set))
3422 {
3423 /* client without a set requested an operation */
3424 GNUNET_break (0);
3425 GNUNET_SERVICE_client_drop (cs->client);
3426 return;
3427 }
3428 found = GNUNET_NO;
3429 for (op = set->ops_head; NULL != op; op = op->next)
3430 {
3431 if (op->client_request_id == ntohl (msg->request_id))
3432 {
3433 found = GNUNET_YES;
3434 break;
3435 }
3436 }
3437 if (GNUNET_NO == found)
3438 {
3439 /* It may happen that the operation was already destroyed due to
3440 * the other peer disconnecting. The client may not know about this
3441 * yet and try to cancel the (just barely non-existent) operation.
3442 * So this is not a hard error.
3443 *///
3444 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3445 "Client canceled non-existent op %u\n",
3446 (uint32_t) ntohl (msg->request_id));
3447 }
3448 else
3449 {
3450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3451 "Client requested cancel for op %u\n",
3452 (uint32_t) ntohl (msg->request_id));
3453 _GSS_operation_destroy (op);
3454 }
3455 GNUNET_SERVICE_client_continue (cs->client);
3456}
3457
3458
3459/**
3460 * Handle a request from the client to accept a set operation that
3461 * came from a remote peer. We forward the accept to the associated
3462 * operation for handling
3463 *
3464 * @param cls the client
3465 * @param msg the message
3466 */
3467static void
3468handle_client_accept (void *cls,
3469 const struct GNUNET_SETU_AcceptMessage *msg)
3470{
3471 struct ClientState *cs = cls;
3472 struct Set *set;
3473 struct Operation *op;
3474 struct GNUNET_SETU_ResultMessage *result_message;
3475 struct GNUNET_MQ_Envelope *ev;
3476 struct Listener *listener;
3477
3478 if (NULL == (set = cs->set))
3479 {
3480 /* client without a set requested to accept */
3481 GNUNET_break (0);
3482 GNUNET_SERVICE_client_drop (cs->client);
3483 return;
3484 }
3485 op = get_incoming (ntohl (msg->accept_reject_id));
3486 if (NULL == op)
3487 {
3488 /* It is not an error if the set op does not exist -- it may
3489 * have been destroyed when the partner peer disconnected. */
3490 GNUNET_log (
3491 GNUNET_ERROR_TYPE_INFO,
3492 "Client %p accepted request %u of listener %p that is no longer active\n",
3493 cs,
3494 ntohl (msg->accept_reject_id),
3495 cs->listener);
3496 ev = GNUNET_MQ_msg (result_message,
3497 GNUNET_MESSAGE_TYPE_SETU_RESULT);
3498 result_message->request_id = msg->request_id;
3499 result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
3500 GNUNET_MQ_send (set->cs->mq, ev);
3501 GNUNET_SERVICE_client_continue (cs->client);
3502 return;
3503 }
3504 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3505 "Client accepting request %u\n",
3506 (uint32_t) ntohl (msg->accept_reject_id));
3507 listener = op->listener;
3508 op->listener = NULL;
3509 GNUNET_CONTAINER_DLL_remove (listener->op_head,
3510 listener->op_tail,
3511 op);
3512 op->set = set;
3513 GNUNET_CONTAINER_DLL_insert (set->ops_head,
3514 set->ops_tail,
3515 op);
3516 op->client_request_id = ntohl (msg->request_id);
3517 op->byzantine = msg->byzantine;
3518 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3519 op->force_full = msg->force_full;
3520 op->force_delta = msg->force_delta;
3521 op->symmetric = msg->symmetric;
3522
3523 /* Advance generation values, so that future mutations do not
3524 interfer with the running operation. */
3525 op->generation_created = set->current_generation;
3526 advance_generation (set);
3527 GNUNET_assert (NULL == op->se);
3528
3529 LOG (GNUNET_ERROR_TYPE_DEBUG,
3530 "accepting set union operation\n");
3531 GNUNET_STATISTICS_update (_GSS_statistics,
3532 "# of accepted union operations",
3533 1,
3534 GNUNET_NO);
3535 GNUNET_STATISTICS_update (_GSS_statistics,
3536 "# of total union operations",
3537 1,
3538 GNUNET_NO);
3539 {
3540 const struct StrataEstimator *se;
3541 struct GNUNET_MQ_Envelope *ev;
3542 struct StrataEstimatorMessage *strata_msg;
3543 char *buf;
3544 size_t len;
3545 uint16_t type;
3546
3547 op->se = strata_estimator_dup (op->set->se);
3548 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3549 GNUNET_NO);
3550 op->salt_receive = op->salt_send = 42; // FIXME?????
3551 initialize_key_to_element (op);
3552 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3553 op->key_to_element);
3554
3555 /* kick off the operation */
3556 se = op->se;
3557 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
3558 len = strata_estimator_write (se,
3559 buf);
3560 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
3561 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC;
3562 else
3563 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE;
3564 ev = GNUNET_MQ_msg_extra (strata_msg,
3565 len,
3566 type);
3567 GNUNET_memcpy (&strata_msg[1],
3568 buf,
3569 len);
3570 GNUNET_free (buf);
3571 strata_msg->set_size
3572 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
3573 op->set->content->elements));
3574 GNUNET_MQ_send (op->mq,
3575 ev);
3576 op->phase = PHASE_EXPECT_IBF;
3577 }
3578 /* Now allow CADET to continue, as we did not do this in
3579 #handle_incoming_msg (as we wanted to first see if the
3580 local client would accept the request). */
3581 GNUNET_CADET_receive_done (op->channel);
3582 GNUNET_SERVICE_client_continue (cs->client);
3583}
3584
3585
3586/**
3587 * Called to clean up, after a shutdown has been requested.
3588 *
3589 * @param cls closure, NULL
3590 */
3591static void
3592shutdown_task (void *cls)
3593{
3594 /* Delay actual shutdown to allow service to disconnect clients */
3595 in_shutdown = GNUNET_YES;
3596 if (0 == num_clients)
3597 {
3598 if (NULL != cadet)
3599 {
3600 GNUNET_CADET_disconnect (cadet);
3601 cadet = NULL;
3602 }
3603 }
3604 GNUNET_STATISTICS_destroy (_GSS_statistics,
3605 GNUNET_YES);
3606 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3607 "handled shutdown request\n");
3608}
3609
3610
3611/**
3612 * Function called by the service's run
3613 * method to run service-specific setup code.
3614 *
3615 * @param cls closure
3616 * @param cfg configuration to use
3617 * @param service the initialized service
3618 */
3619static void
3620run (void *cls,
3621 const struct GNUNET_CONFIGURATION_Handle *cfg,
3622 struct GNUNET_SERVICE_Handle *service)
3623{
3624 /* FIXME: need to modify SERVICE (!) API to allow
3625 us to run a shutdown task *after* clients were
3626 forcefully disconnected! */
3627 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3628 NULL);
3629 _GSS_statistics = GNUNET_STATISTICS_create ("setu",
3630 cfg);
3631 cadet = GNUNET_CADET_connect (cfg);
3632 if (NULL == cadet)
3633 {
3634 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3635 _ ("Could not connect to CADET service\n"));
3636 GNUNET_SCHEDULER_shutdown ();
3637 return;
3638 }
3639}
3640
3641
3642/**
3643 * Define "main" method using service macro.
3644 */
3645GNUNET_SERVICE_MAIN (
3646 "set",
3647 GNUNET_SERVICE_OPTION_NONE,
3648 &run,
3649 &client_connect_cb,
3650 &client_disconnect_cb,
3651 NULL,
3652 GNUNET_MQ_hd_fixed_size (client_accept,
3653 GNUNET_MESSAGE_TYPE_SETU_ACCEPT,
3654 struct GNUNET_SETU_AcceptMessage,
3655 NULL),
3656 GNUNET_MQ_hd_var_size (client_set_add,
3657 GNUNET_MESSAGE_TYPE_SETU_ADD,
3658 struct GNUNET_SETU_ElementMessage,
3659 NULL),
3660 GNUNET_MQ_hd_fixed_size (client_create_set,
3661 GNUNET_MESSAGE_TYPE_SETU_CREATE,
3662 struct GNUNET_SETU_CreateMessage,
3663 NULL),
3664 GNUNET_MQ_hd_var_size (client_evaluate,
3665 GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
3666 struct GNUNET_SETU_EvaluateMessage,
3667 NULL),
3668 GNUNET_MQ_hd_fixed_size (client_listen,
3669 GNUNET_MESSAGE_TYPE_SETU_LISTEN,
3670 struct GNUNET_SETU_ListenMessage,
3671 NULL),
3672 GNUNET_MQ_hd_fixed_size (client_reject,
3673 GNUNET_MESSAGE_TYPE_SETU_REJECT,
3674 struct GNUNET_SETU_RejectMessage,
3675 NULL),
3676 GNUNET_MQ_hd_fixed_size (client_cancel,
3677 GNUNET_MESSAGE_TYPE_SETU_CANCEL,
3678 struct GNUNET_SETU_CancelMessage,
3679 NULL),
3680 GNUNET_MQ_handler_end ());
3681
3682
3683/* end of gnunet-service-setu.c */