aboutsummaryrefslogtreecommitdiff
path: root/src/setu/gnunet-service-setu.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2020-08-16 20:46:39 +0200
committerChristian Grothoff <christian@grothoff.org>2020-08-16 20:46:39 +0200
commitbe0475f2a583d203465d3091ff933806a5ace613 (patch)
treecbb1ad1a27cd91a6ff1ccb8025595cbe4c15972a /src/setu/gnunet-service-setu.c
parentf1f40feb2beb5c036da0e2b93c433b09b920e0b4 (diff)
downloadgnunet-be0475f2a583d203465d3091ff933806a5ace613.tar.gz
gnunet-be0475f2a583d203465d3091ff933806a5ace613.zip
split of set union from set service (preliminary)
Diffstat (limited to 'src/setu/gnunet-service-setu.c')
-rw-r--r--src/setu/gnunet-service-setu.c3482
1 files changed, 3482 insertions, 0 deletions
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
new file mode 100644
index 000000000..88edc622f
--- /dev/null
+++ b/src/setu/gnunet-service-setu.c
@@ -0,0 +1,3482 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file setu/gnunet-service-setu.c
22 * @brief set union operation
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h"
29#include "gnunet-service-setu.h"
30#include "ibf.h"
31#include "gnunet-service-setu_strata_estimator.h"
32#include "gnunet-service-setu_protocol.h"
33#include "gnunet_statistics_service.h"
34#include <gcrypt.h>
35
36
37#define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__)
38
39/**
40 * How long do we hold on to an incoming channel if there is
41 * no local listener before giving up?
42 */
43#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
44
45/**
46 * Number of IBFs in a strata estimator.
47 */
48#define SE_STRATA_COUNT 32
49
50/**
51 * Size of the IBFs in the strata estimator.
52 */
53#define SE_IBF_SIZE 80
54
55/**
56 * The hash num parameter for the difference digests and strata estimators.
57 */
58#define SE_IBF_HASH_NUM 4
59
60/**
61 * Number of buckets that can be transmitted in one message.
62 */
63#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
64
65/**
66 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
67 * Choose this value so that computing the IBF is still cheaper
68 * than transmitting all values.
69 */
70#define MAX_IBF_ORDER (20)
71
72/**
73 * Number of buckets used in the ibf per estimated
74 * difference.
75 */
76#define IBF_ALPHA 4
77
78
79/**
80 * Current phase we are in for a union operation.
81 */
82enum UnionOperationPhase
83{
84 /**
85 * We sent the request message, and expect a strata estimator.
86 */
87 PHASE_EXPECT_SE,
88
89 /**
90 * We sent the strata estimator, and expect an IBF. This phase is entered once
91 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
92 *
93 * XXX: could use better wording.
94 * XXX: repurposed to also expect a "request full set" message, should be renamed
95 *
96 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
97 */
98 PHASE_EXPECT_IBF,
99
100 /**
101 * Continuation for multi part IBFs.
102 */
103 PHASE_EXPECT_IBF_CONT,
104
105 /**
106 * We are decoding an IBF.
107 */
108 PHASE_INVENTORY_ACTIVE,
109
110 /**
111 * The other peer is decoding the IBF we just sent.
112 */
113 PHASE_INVENTORY_PASSIVE,
114
115 /**
116 * The protocol is almost finished, but we still have to flush our message
117 * queue and/or expect some elements.
118 */
119 PHASE_FINISH_CLOSING,
120
121 /**
122 * In the penultimate phase,
123 * we wait until all our demands
124 * are satisfied. Then we send a done
125 * message, and wait for another done message.
126 */
127 PHASE_FINISH_WAITING,
128
129 /**
130 * In the ultimate phase, we wait until
131 * our demands are satisfied and then
132 * quit (sending another DONE message).
133 */
134 PHASE_DONE,
135
136 /**
137 * After sending the full set, wait for responses with the elements
138 * that the local peer is missing.
139 */
140 PHASE_FULL_SENDING,
141};
142
143
144/**
145 * State of an evaluate operation with another peer.
146 */
147struct OperationState
148{
149 /**
150 * Copy of the set's strata estimator at the time of
151 * creation of this operation.
152 */
153 struct StrataEstimator *se;
154
155 /**
156 * The IBF we currently receive.
157 */
158 struct InvertibleBloomFilter *remote_ibf;
159
160 /**
161 * The IBF with the local set's element.
162 */
163 struct InvertibleBloomFilter *local_ibf;
164
165 /**
166 * Maps unsalted IBF-Keys to elements.
167 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
168 * Colliding IBF-Keys are linked.
169 */
170 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
171
172 /**
173 * Current state of the operation.
174 */
175 enum UnionOperationPhase phase;
176
177 /**
178 * Did we send the client that we are done?
179 */
180 int client_done_sent;
181
182 /**
183 * Number of ibf buckets already received into the @a remote_ibf.
184 */
185 unsigned int ibf_buckets_received;
186
187 /**
188 * Hashes for elements that we have demanded from the other peer.
189 */
190 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
191
192 /**
193 * Salt that we're using for sending IBFs
194 */
195 uint32_t salt_send;
196
197 /**
198 * Salt for the IBF we've received and that we're currently decoding.
199 */
200 uint32_t salt_receive;
201
202 /**
203 * Number of elements we received from the other peer
204 * that were not in the local set yet.
205 */
206 uint32_t received_fresh;
207
208 /**
209 * Total number of elements received from the other peer.
210 */
211 uint32_t received_total;
212
213 /**
214 * Initial size of our set, just before
215 * the operation started.
216 */
217 uint64_t initial_size;
218};
219
220
221/**
222 * The key entry is used to associate an ibf key with an element.
223 */
224struct KeyEntry
225{
226 /**
227 * IBF key for the entry, derived from the current salt.
228 */
229 struct IBF_Key ibf_key;
230
231 /**
232 * The actual element associated with the key.
233 *
234 * Only owned by the union operation if element->operation
235 * is #GNUNET_YES.
236 */
237 struct ElementEntry *element;
238
239 /**
240 * Did we receive this element?
241 * Even if element->is_foreign is false, we might
242 * have received the element, so this indicates that
243 * the other peer has it.
244 */
245 int received;
246};
247
248
249/**
250 * Used as a closure for sending elements
251 * with a specific IBF key.
252 */
253struct SendElementClosure
254{
255 /**
256 * The IBF key whose matching elements should be
257 * sent.
258 */
259 struct IBF_Key ibf_key;
260
261 /**
262 * Operation for which the elements
263 * should be sent.
264 */
265 struct Operation *op;
266};
267
268
269/**
270 * Extra state required for efficient set union.
271 */
272struct SetState
273{
274 /**
275 * The strata estimator is only generated once for
276 * each set.
277 * The IBF keys are derived from the element hashes with
278 * salt=0.
279 */
280 struct StrataEstimator *se;
281};
282
283
284/**
285 * A listener is inhabited by a client, and waits for evaluation
286 * requests from remote peers.
287 */
288struct Listener
289{
290 /**
291 * Listeners are held in a doubly linked list.
292 */
293 struct Listener *next;
294
295 /**
296 * Listeners are held in a doubly linked list.
297 */
298 struct Listener *prev;
299
300 /**
301 * Head of DLL of operations this listener is responsible for.
302 * Once the client has accepted/declined the operation, the
303 * operation is moved to the respective set's operation DLLS.
304 */
305 struct Operation *op_head;
306
307 /**
308 * Tail of DLL of operations this listener is responsible for.
309 * Once the client has accepted/declined the operation, the
310 * operation is moved to the respective set's operation DLLS.
311 */
312 struct Operation *op_tail;
313
314 /**
315 * Client that owns the listener.
316 * Only one client may own a listener.
317 */
318 struct ClientState *cs;
319
320 /**
321 * The port we are listening on with CADET.
322 */
323 struct GNUNET_CADET_Port *open_port;
324
325 /**
326 * Application ID for the operation, used to distinguish
327 * multiple operations of the same type with the same peer.
328 */
329 struct GNUNET_HashCode app_id;
330
331};
332
333
334/**
335 * Handle to the cadet service, used to listen for and connect to
336 * remote peers.
337 */
338static struct GNUNET_CADET_Handle *cadet;
339
340/**
341 * Statistics handle.
342 */
343struct GNUNET_STATISTICS_Handle *_GSS_statistics;
344
345/**
346 * Listeners are held in a doubly linked list.
347 */
348static struct Listener *listener_head;
349
350/**
351 * Listeners are held in a doubly linked list.
352 */
353static struct Listener *listener_tail;
354
355/**
356 * Number of active clients.
357 */
358static unsigned int num_clients;
359
360/**
361 * Are we in shutdown? if #GNUNET_YES and the number of clients
362 * drops to zero, disconnect from CADET.
363 */
364static int in_shutdown;
365
366/**
367 * Counter for allocating unique IDs for clients, used to identify
368 * incoming operation requests from remote peers, that the client can
369 * choose to accept or refuse. 0 must not be used (reserved for
370 * uninitialized).
371 */
372static uint32_t suggest_id;
373
374
375/**
376 * Iterator over hash map entries, called to
377 * destroy the linked list of colliding ibf key entries.
378 *
379 * @param cls closure
380 * @param key current key code
381 * @param value value in the hash map
382 * @return #GNUNET_YES if we should continue to iterate,
383 * #GNUNET_NO if not.
384 */
385static int
386destroy_key_to_element_iter (void *cls,
387 uint32_t key,
388 void *value)
389{
390 struct KeyEntry *k = value;
391
392 GNUNET_assert (NULL != k);
393 if (GNUNET_YES == k->element->remote)
394 {
395 GNUNET_free (k->element);
396 k->element = NULL;
397 }
398 GNUNET_free (k);
399 return GNUNET_YES;
400}
401
402
403/**
404 * Destroy the union operation. Only things specific to the union
405 * operation are destroyed.
406 *
407 * @param op union operation to destroy
408 */
409static void
410union_op_cancel (struct Operation *op)
411{
412 LOG (GNUNET_ERROR_TYPE_DEBUG,
413 "destroying union op\n");
414 /* check if the op was canceled twice */
415 GNUNET_assert (NULL != op->state);
416 if (NULL != op->state->remote_ibf)
417 {
418 ibf_destroy (op->state->remote_ibf);
419 op->state->remote_ibf = NULL;
420 }
421 if (NULL != op->state->demanded_hashes)
422 {
423 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
424 op->state->demanded_hashes = NULL;
425 }
426 if (NULL != op->state->local_ibf)
427 {
428 ibf_destroy (op->state->local_ibf);
429 op->state->local_ibf = NULL;
430 }
431 if (NULL != op->state->se)
432 {
433 strata_estimator_destroy (op->state->se);
434 op->state->se = NULL;
435 }
436 if (NULL != op->state->key_to_element)
437 {
438 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
439 &destroy_key_to_element_iter,
440 NULL);
441 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
442 op->state->key_to_element = NULL;
443 }
444 GNUNET_free (op->state);
445 op->state = NULL;
446 LOG (GNUNET_ERROR_TYPE_DEBUG,
447 "destroying union op done\n");
448}
449
450
451/**
452 * Inform the client that the union operation has failed,
453 * and proceed to destroy the evaluate operation.
454 *
455 * @param op the union operation to fail
456 */
457static void
458fail_union_operation (struct Operation *op)
459{
460 struct GNUNET_MQ_Envelope *ev;
461 struct GNUNET_SETU_ResultMessage *msg;
462
463 LOG (GNUNET_ERROR_TYPE_WARNING,
464 "union operation failed\n");
465 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
466 msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
467 msg->request_id = htonl (op->client_request_id);
468 msg->element_type = htons (0);
469 GNUNET_MQ_send (op->set->cs->mq,
470 ev);
471 _GSS_operation_destroy (op);
472}
473
474
475/**
476 * Derive the IBF key from a hash code and
477 * a salt.
478 *
479 * @param src the hash code
480 * @return the derived IBF key
481 */
482static struct IBF_Key
483get_ibf_key (const struct GNUNET_HashCode *src)
484{
485 struct IBF_Key key;
486 uint16_t salt = 0;
487
488 GNUNET_assert (GNUNET_OK ==
489 GNUNET_CRYPTO_kdf (&key, sizeof(key),
490 src, sizeof *src,
491 &salt, sizeof(salt),
492 NULL, 0));
493 return key;
494}
495
496
497/**
498 * Context for #op_get_element_iterator
499 */
500struct GetElementContext
501{
502 /**
503 * FIXME.
504 */
505 struct GNUNET_HashCode hash;
506
507 /**
508 * FIXME.
509 */
510 struct KeyEntry *k;
511};
512
513
514/**
515 * Iterator over the mapping from IBF keys to element entries. Checks if we
516 * have an element with a given GNUNET_HashCode.
517 *
518 * @param cls closure
519 * @param key current key code
520 * @param value value in the hash map
521 * @return #GNUNET_YES if we should search further,
522 * #GNUNET_NO if we've found the element.
523 */
524static int
525op_get_element_iterator (void *cls,
526 uint32_t key,
527 void *value)
528{
529 struct GetElementContext *ctx = cls;
530 struct KeyEntry *k = value;
531
532 GNUNET_assert (NULL != k);
533 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
534 &ctx->hash))
535 {
536 ctx->k = k;
537 return GNUNET_NO;
538 }
539 return GNUNET_YES;
540}
541
542
543/**
544 * Determine whether the given element is already in the operation's element
545 * set.
546 *
547 * @param op operation that should be tested for 'element_hash'
548 * @param element_hash hash of the element to look for
549 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
550 */
551static struct KeyEntry *
552op_get_element (struct Operation *op,
553 const struct GNUNET_HashCode *element_hash)
554{
555 int ret;
556 struct IBF_Key ibf_key;
557 struct GetElementContext ctx = { { { 0 } }, 0 };
558
559 ctx.hash = *element_hash;
560
561 ibf_key = get_ibf_key (element_hash);
562 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
563 (uint32_t) ibf_key.key_val,
564 op_get_element_iterator,
565 &ctx);
566
567 /* was the iteration aborted because we found the element? */
568 if (GNUNET_SYSERR == ret)
569 {
570 GNUNET_assert (NULL != ctx.k);
571 return ctx.k;
572 }
573 return NULL;
574}
575
576
577/**
578 * Insert an element into the union operation's
579 * key-to-element mapping. Takes ownership of 'ee'.
580 * Note that this does not insert the element in the set,
581 * only in the operation's key-element mapping.
582 * This is done to speed up re-tried operations, if some elements
583 * were transmitted, and then the IBF fails to decode.
584 *
585 * XXX: clarify ownership, doesn't sound right.
586 *
587 * @param op the union operation
588 * @param ee the element entry
589 * @parem received was this element received from the remote peer?
590 */
591static void
592op_register_element (struct Operation *op,
593 struct ElementEntry *ee,
594 int received)
595{
596 struct IBF_Key ibf_key;
597 struct KeyEntry *k;
598
599 ibf_key = get_ibf_key (&ee->element_hash);
600 k = GNUNET_new (struct KeyEntry);
601 k->element = ee;
602 k->ibf_key = ibf_key;
603 k->received = received;
604 GNUNET_assert (GNUNET_OK ==
605 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
606 (uint32_t) ibf_key.key_val,
607 k,
608 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
609}
610
611
612/**
613 * FIXME.
614 */
615static void
616salt_key (const struct IBF_Key *k_in,
617 uint32_t salt,
618 struct IBF_Key *k_out)
619{
620 int s = salt % 64;
621 uint64_t x = k_in->key_val;
622
623 /* rotate ibf key */
624 x = (x >> s) | (x << (64 - s));
625 k_out->key_val = x;
626}
627
628
629/**
630 * FIXME.
631 */
632static void
633unsalt_key (const struct IBF_Key *k_in,
634 uint32_t salt,
635 struct IBF_Key *k_out)
636{
637 int s = salt % 64;
638 uint64_t x = k_in->key_val;
639
640 x = (x << s) | (x >> (64 - s));
641 k_out->key_val = x;
642}
643
644
645/**
646 * Insert a key into an ibf.
647 *
648 * @param cls the ibf
649 * @param key unused
650 * @param value the key entry to get the key from
651 */
652static int
653prepare_ibf_iterator (void *cls,
654 uint32_t key,
655 void *value)
656{
657 struct Operation *op = cls;
658 struct KeyEntry *ke = value;
659 struct IBF_Key salted_key;
660
661 LOG (GNUNET_ERROR_TYPE_DEBUG,
662 "[OP %x] inserting %lx (hash %s) into ibf\n",
663 (void *) op,
664 (unsigned long) ke->ibf_key.key_val,
665 GNUNET_h2s (&ke->element->element_hash));
666 salt_key (&ke->ibf_key,
667 op->state->salt_send,
668 &salted_key);
669 ibf_insert (op->state->local_ibf, salted_key);
670 return GNUNET_YES;
671}
672
673
674/**
675 * Iterator for initializing the
676 * key-to-element mapping of a union operation
677 *
678 * @param cls the union operation `struct Operation *`
679 * @param key unused
680 * @param value the `struct ElementEntry *` to insert
681 * into the key-to-element mapping
682 * @return #GNUNET_YES (to continue iterating)
683 */
684static int
685init_key_to_element_iterator (void *cls,
686 const struct GNUNET_HashCode *key,
687 void *value)
688{
689 struct Operation *op = cls;
690 struct ElementEntry *ee = value;
691
692 /* make sure that the element belongs to the set at the time
693 * of creating the operation */
694 if (GNUNET_NO ==
695 _GSS_is_element_of_operation (ee,
696 op))
697 return GNUNET_YES;
698 GNUNET_assert (GNUNET_NO == ee->remote);
699 op_register_element (op,
700 ee,
701 GNUNET_NO);
702 return GNUNET_YES;
703}
704
705
706/**
707 * Initialize the IBF key to element mapping local to this set
708 * operation.
709 *
710 * @param op the set union operation
711 */
712static void
713initialize_key_to_element (struct Operation *op)
714{
715 unsigned int len;
716
717 GNUNET_assert (NULL == op->state->key_to_element);
718 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
719 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
720 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
721 &init_key_to_element_iterator,
722 op);
723}
724
725
726/**
727 * Create an ibf with the operation's elements
728 * of the specified size
729 *
730 * @param op the union operation
731 * @param size size of the ibf to create
732 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
733 */
734static int
735prepare_ibf (struct Operation *op,
736 uint32_t size)
737{
738 GNUNET_assert (NULL != op->state->key_to_element);
739
740 if (NULL != op->state->local_ibf)
741 ibf_destroy (op->state->local_ibf);
742 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
743 if (NULL == op->state->local_ibf)
744 {
745 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
746 "Failed to allocate local IBF\n");
747 return GNUNET_SYSERR;
748 }
749 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
750 &prepare_ibf_iterator,
751 op);
752 return GNUNET_OK;
753}
754
755
756/**
757 * Send an ibf of appropriate size.
758 *
759 * Fragments the IBF into multiple messages if necessary.
760 *
761 * @param op the union operation
762 * @param ibf_order order of the ibf to send, size=2^order
763 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
764 */
765static int
766send_ibf (struct Operation *op,
767 uint16_t ibf_order)
768{
769 unsigned int buckets_sent = 0;
770 struct InvertibleBloomFilter *ibf;
771
772 if (GNUNET_OK !=
773 prepare_ibf (op, 1 << ibf_order))
774 {
775 /* allocation failed */
776 return GNUNET_SYSERR;
777 }
778
779 LOG (GNUNET_ERROR_TYPE_DEBUG,
780 "sending ibf of size %u\n",
781 1 << ibf_order);
782
783 {
784 char name[64] = { 0 };
785 snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
786 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
787 }
788
789 ibf = op->state->local_ibf;
790
791 while (buckets_sent < (1 << ibf_order))
792 {
793 unsigned int buckets_in_message;
794 struct GNUNET_MQ_Envelope *ev;
795 struct IBFMessage *msg;
796
797 buckets_in_message = (1 << ibf_order) - buckets_sent;
798 /* limit to maximum */
799 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
800 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
801
802 ev = GNUNET_MQ_msg_extra (msg,
803 buckets_in_message * IBF_BUCKET_SIZE,
804 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
805 msg->reserved1 = 0;
806 msg->reserved2 = 0;
807 msg->order = ibf_order;
808 msg->offset = htonl (buckets_sent);
809 msg->salt = htonl (op->state->salt_send);
810 ibf_write_slice (ibf, buckets_sent,
811 buckets_in_message, &msg[1]);
812 buckets_sent += buckets_in_message;
813 LOG (GNUNET_ERROR_TYPE_DEBUG,
814 "ibf chunk size %u, %u/%u sent\n",
815 buckets_in_message,
816 buckets_sent,
817 1 << ibf_order);
818 GNUNET_MQ_send (op->mq, ev);
819 }
820
821 /* The other peer must decode the IBF, so
822 * we're passive. */
823 op->state->phase = PHASE_INVENTORY_PASSIVE;
824 return GNUNET_OK;
825}
826
827
828/**
829 * Compute the necessary order of an ibf
830 * from the size of the symmetric set difference.
831 *
832 * @param diff the difference
833 * @return the required size of the ibf
834 */
835static unsigned int
836get_order_from_difference (unsigned int diff)
837{
838 unsigned int ibf_order;
839
840 ibf_order = 2;
841 while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
842 ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
843 (ibf_order < MAX_IBF_ORDER))
844 ibf_order++;
845 // add one for correction
846 return ibf_order + 1;
847}
848
849
850/**
851 * Send a set element.
852 *
853 * @param cls the union operation `struct Operation *`
854 * @param key unused
855 * @param value the `struct ElementEntry *` to insert
856 * into the key-to-element mapping
857 * @return #GNUNET_YES (to continue iterating)
858 */
859static int
860send_full_element_iterator (void *cls,
861 const struct GNUNET_HashCode *key,
862 void *value)
863{
864 struct Operation *op = cls;
865 struct GNUNET_SETU_ElementMessage *emsg;
866 struct ElementEntry *ee = value;
867 struct GNUNET_SETU_Element *el = &ee->element;
868 struct GNUNET_MQ_Envelope *ev;
869
870 LOG (GNUNET_ERROR_TYPE_DEBUG,
871 "Sending element %s\n",
872 GNUNET_h2s (key));
873 ev = GNUNET_MQ_msg_extra (emsg,
874 el->size,
875 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
876 emsg->element_type = htons (el->element_type);
877 GNUNET_memcpy (&emsg[1],
878 el->data,
879 el->size);
880 GNUNET_MQ_send (op->mq,
881 ev);
882 return GNUNET_YES;
883}
884
885
886/**
887 * Switch to full set transmission for @a op.
888 *
889 * @param op operation to switch to full set transmission.
890 */
891static void
892send_full_set (struct Operation *op)
893{
894 struct GNUNET_MQ_Envelope *ev;
895
896 op->state->phase = PHASE_FULL_SENDING;
897 LOG (GNUNET_ERROR_TYPE_DEBUG,
898 "Dedicing to transmit the full set\n");
899 /* FIXME: use a more memory-friendly way of doing this with an
900 iterator, just as we do in the non-full case! */
901 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
902 &send_full_element_iterator,
903 op);
904 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
905 GNUNET_MQ_send (op->mq,
906 ev);
907}
908
909
910/**
911 * Handle a strata estimator from a remote peer
912 *
913 * @param cls the union operation
914 * @param msg the message
915 */
916static int
917check_union_p2p_strata_estimator (void *cls,
918 const struct StrataEstimatorMessage *msg)
919{
920 struct Operation *op = cls;
921 int is_compressed;
922 size_t len;
923
924 if (op->state->phase != PHASE_EXPECT_SE)
925 {
926 GNUNET_break (0);
927 return GNUNET_SYSERR;
928 }
929 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
930 msg->header.type));
931 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
932 if ((GNUNET_NO == is_compressed) &&
933 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
934 {
935 GNUNET_break (0);
936 return GNUNET_SYSERR;
937 }
938 return GNUNET_OK;
939}
940
941
942/**
943 * Handle a strata estimator from a remote peer
944 *
945 * @param cls the union operation
946 * @param msg the message
947 */
948static void
949handle_union_p2p_strata_estimator (void *cls,
950 const struct StrataEstimatorMessage *msg)
951{
952 struct Operation *op = cls;
953 struct StrataEstimator *remote_se;
954 unsigned int diff;
955 uint64_t other_size;
956 size_t len;
957 int is_compressed;
958
959 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
960 msg->header.type));
961 GNUNET_STATISTICS_update (_GSS_statistics,
962 "# bytes of SE received",
963 ntohs (msg->header.size),
964 GNUNET_NO);
965 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
966 other_size = GNUNET_ntohll (msg->set_size);
967 remote_se = strata_estimator_create (SE_STRATA_COUNT,
968 SE_IBF_SIZE,
969 SE_IBF_HASH_NUM);
970 if (NULL == remote_se)
971 {
972 /* insufficient resources, fail */
973 fail_union_operation (op);
974 return;
975 }
976 if (GNUNET_OK !=
977 strata_estimator_read (&msg[1],
978 len,
979 is_compressed,
980 remote_se))
981 {
982 /* decompression failed */
983 strata_estimator_destroy (remote_se);
984 fail_union_operation (op);
985 return;
986 }
987 GNUNET_assert (NULL != op->state->se);
988 diff = strata_estimator_difference (remote_se,
989 op->state->se);
990
991 if (diff > 200)
992 diff = diff * 3 / 2;
993
994 strata_estimator_destroy (remote_se);
995 strata_estimator_destroy (op->state->se);
996 op->state->se = NULL;
997 LOG (GNUNET_ERROR_TYPE_DEBUG,
998 "got se diff=%d, using ibf size %d\n",
999 diff,
1000 1U << get_order_from_difference (diff));
1001
1002 {
1003 char *set_debug;
1004
1005 set_debug = getenv ("GNUNET_SETU_BENCHMARK");
1006 if ((NULL != set_debug) &&
1007 (0 == strcmp (set_debug, "1")))
1008 {
1009 FILE *f = fopen ("set.log", "a");
1010 fprintf (f, "%llu\n", (unsigned long long) diff);
1011 fclose (f);
1012 }
1013 }
1014
1015 if ((GNUNET_YES == op->byzantine) &&
1016 (other_size < op->byzantine_lower_bound))
1017 {
1018 GNUNET_break (0);
1019 fail_union_operation (op);
1020 return;
1021 }
1022
1023 if ((GNUNET_YES == op->force_full) ||
1024 (diff > op->state->initial_size / 4) ||
1025 (0 == other_size))
1026 {
1027 LOG (GNUNET_ERROR_TYPE_DEBUG,
1028 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
1029 diff,
1030 op->state->initial_size);
1031 GNUNET_STATISTICS_update (_GSS_statistics,
1032 "# of full sends",
1033 1,
1034 GNUNET_NO);
1035 if ((op->state->initial_size <= other_size) ||
1036 (0 == other_size))
1037 {
1038 send_full_set (op);
1039 }
1040 else
1041 {
1042 struct GNUNET_MQ_Envelope *ev;
1043
1044 LOG (GNUNET_ERROR_TYPE_DEBUG,
1045 "Telling other peer that we expect its full set\n");
1046 op->state->phase = PHASE_EXPECT_IBF;
1047 ev = GNUNET_MQ_msg_header (
1048 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
1049 GNUNET_MQ_send (op->mq,
1050 ev);
1051 }
1052 }
1053 else
1054 {
1055 GNUNET_STATISTICS_update (_GSS_statistics,
1056 "# of ibf sends",
1057 1,
1058 GNUNET_NO);
1059 if (GNUNET_OK !=
1060 send_ibf (op,
1061 get_order_from_difference (diff)))
1062 {
1063 /* Internal error, best we can do is shut the connection */
1064 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1065 "Failed to send IBF, closing connection\n");
1066 fail_union_operation (op);
1067 return;
1068 }
1069 }
1070 GNUNET_CADET_receive_done (op->channel);
1071}
1072
1073
1074/**
1075 * Iterator to send elements to a remote peer
1076 *
1077 * @param cls closure with the element key and the union operation
1078 * @param key ignored
1079 * @param value the key entry
1080 */
1081static int
1082send_offers_iterator (void *cls,
1083 uint32_t key,
1084 void *value)
1085{
1086 struct SendElementClosure *sec = cls;
1087 struct Operation *op = sec->op;
1088 struct KeyEntry *ke = value;
1089 struct GNUNET_MQ_Envelope *ev;
1090 struct GNUNET_MessageHeader *mh;
1091
1092 /* Detect 32-bit key collision for the 64-bit IBF keys. */
1093 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1094 return GNUNET_YES;
1095
1096 ev = GNUNET_MQ_msg_header_extra (mh,
1097 sizeof(struct GNUNET_HashCode),
1098 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
1099
1100 GNUNET_assert (NULL != ev);
1101 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1102 LOG (GNUNET_ERROR_TYPE_DEBUG,
1103 "[OP %x] sending element offer (%s) to peer\n",
1104 (void *) op,
1105 GNUNET_h2s (&ke->element->element_hash));
1106 GNUNET_MQ_send (op->mq, ev);
1107 return GNUNET_YES;
1108}
1109
1110
1111/**
1112 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
1113 *
1114 * @param op union operation
1115 * @param ibf_key IBF key of interest
1116 */
1117static void
1118send_offers_for_key (struct Operation *op,
1119 struct IBF_Key ibf_key)
1120{
1121 struct SendElementClosure send_cls;
1122
1123 send_cls.ibf_key = ibf_key;
1124 send_cls.op = op;
1125 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
1126 op->state->key_to_element,
1127 (uint32_t) ibf_key.
1128 key_val,
1129 &send_offers_iterator,
1130 &send_cls);
1131}
1132
1133
1134/**
1135 * Decode which elements are missing on each side, and
1136 * send the appropriate offers and inquiries.
1137 *
1138 * @param op union operation
1139 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
1140 */
1141static int
1142decode_and_send (struct Operation *op)
1143{
1144 struct IBF_Key key;
1145 struct IBF_Key last_key;
1146 int side;
1147 unsigned int num_decoded;
1148 struct InvertibleBloomFilter *diff_ibf;
1149
1150 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1151
1152 if (GNUNET_OK !=
1153 prepare_ibf (op,
1154 op->state->remote_ibf->size))
1155 {
1156 GNUNET_break (0);
1157 /* allocation failed */
1158 return GNUNET_SYSERR;
1159 }
1160 diff_ibf = ibf_dup (op->state->local_ibf);
1161 ibf_subtract (diff_ibf,
1162 op->state->remote_ibf);
1163
1164 ibf_destroy (op->state->remote_ibf);
1165 op->state->remote_ibf = NULL;
1166
1167 LOG (GNUNET_ERROR_TYPE_DEBUG,
1168 "decoding IBF (size=%u)\n",
1169 diff_ibf->size);
1170
1171 num_decoded = 0;
1172 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1173
1174 while (1)
1175 {
1176 int res;
1177 int cycle_detected = GNUNET_NO;
1178
1179 last_key = key;
1180
1181 res = ibf_decode (diff_ibf, &side, &key);
1182 if (res == GNUNET_OK)
1183 {
1184 LOG (GNUNET_ERROR_TYPE_DEBUG,
1185 "decoded ibf key %lx\n",
1186 (unsigned long) key.key_val);
1187 num_decoded += 1;
1188 if ((num_decoded > diff_ibf->size) ||
1189 ((num_decoded > 1) &&
1190 (last_key.key_val == key.key_val)))
1191 {
1192 LOG (GNUNET_ERROR_TYPE_DEBUG,
1193 "detected cyclic ibf (decoded %u/%u)\n",
1194 num_decoded,
1195 diff_ibf->size);
1196 cycle_detected = GNUNET_YES;
1197 }
1198 }
1199 if ((GNUNET_SYSERR == res) ||
1200 (GNUNET_YES == cycle_detected))
1201 {
1202 int next_order;
1203 next_order = 0;
1204 while (1 << next_order < diff_ibf->size)
1205 next_order++;
1206 next_order++;
1207 if (next_order <= MAX_IBF_ORDER)
1208 {
1209 LOG (GNUNET_ERROR_TYPE_DEBUG,
1210 "decoding failed, sending larger ibf (size %u)\n",
1211 1 << next_order);
1212 GNUNET_STATISTICS_update (_GSS_statistics,
1213 "# of IBF retries",
1214 1,
1215 GNUNET_NO);
1216 op->state->salt_send++;
1217 if (GNUNET_OK !=
1218 send_ibf (op, next_order))
1219 {
1220 /* Internal error, best we can do is shut the connection */
1221 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1222 "Failed to send IBF, closing connection\n");
1223 fail_union_operation (op);
1224 ibf_destroy (diff_ibf);
1225 return GNUNET_SYSERR;
1226 }
1227 }
1228 else
1229 {
1230 GNUNET_STATISTICS_update (_GSS_statistics,
1231 "# of failed union operations (too large)",
1232 1,
1233 GNUNET_NO);
1234 // XXX: Send the whole set, element-by-element
1235 LOG (GNUNET_ERROR_TYPE_ERROR,
1236 "set union failed: reached ibf limit\n");
1237 fail_union_operation (op);
1238 ibf_destroy (diff_ibf);
1239 return GNUNET_SYSERR;
1240 }
1241 break;
1242 }
1243 if (GNUNET_NO == res)
1244 {
1245 struct GNUNET_MQ_Envelope *ev;
1246
1247 LOG (GNUNET_ERROR_TYPE_DEBUG,
1248 "transmitted all values, sending DONE\n");
1249 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1250 GNUNET_MQ_send (op->mq, ev);
1251 /* We now wait until we get a DONE message back
1252 * and then wait for our MQ to be flushed and all our
1253 * demands be delivered. */
1254 break;
1255 }
1256 if (1 == side)
1257 {
1258 struct IBF_Key unsalted_key;
1259
1260 unsalt_key (&key,
1261 op->state->salt_receive,
1262 &unsalted_key);
1263 send_offers_for_key (op,
1264 unsalted_key);
1265 }
1266 else if (-1 == side)
1267 {
1268 struct GNUNET_MQ_Envelope *ev;
1269 struct InquiryMessage *msg;
1270
1271 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1272 * the effort additional complexity. */
1273 ev = GNUNET_MQ_msg_extra (msg,
1274 sizeof(struct IBF_Key),
1275 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
1276 msg->salt = htonl (op->state->salt_receive);
1277 GNUNET_memcpy (&msg[1],
1278 &key,
1279 sizeof(struct IBF_Key));
1280 LOG (GNUNET_ERROR_TYPE_DEBUG,
1281 "sending element inquiry for IBF key %lx\n",
1282 (unsigned long) key.key_val);
1283 GNUNET_MQ_send (op->mq, ev);
1284 }
1285 else
1286 {
1287 GNUNET_assert (0);
1288 }
1289 }
1290 ibf_destroy (diff_ibf);
1291 return GNUNET_OK;
1292}
1293
1294
1295/**
1296 * Check an IBF message from a remote peer.
1297 *
1298 * Reassemble the IBF from multiple pieces, and
1299 * process the whole IBF once possible.
1300 *
1301 * @param cls the union operation
1302 * @param msg the header of the message
1303 * @return #GNUNET_OK if @a msg is well-formed
1304 */
1305static int
1306check_union_p2p_ibf (void *cls,
1307 const struct IBFMessage *msg)
1308{
1309 struct Operation *op = cls;
1310 unsigned int buckets_in_message;
1311
1312 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1313 / IBF_BUCKET_SIZE;
1314 if (0 == buckets_in_message)
1315 {
1316 GNUNET_break_op (0);
1317 return GNUNET_SYSERR;
1318 }
1319 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1320 * IBF_BUCKET_SIZE)
1321 {
1322 GNUNET_break_op (0);
1323 return GNUNET_SYSERR;
1324 }
1325 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1326 {
1327 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1328 {
1329 GNUNET_break_op (0);
1330 return GNUNET_SYSERR;
1331 }
1332 if (1 << msg->order != op->state->remote_ibf->size)
1333 {
1334 GNUNET_break_op (0);
1335 return GNUNET_SYSERR;
1336 }
1337 if (ntohl (msg->salt) != op->state->salt_receive)
1338 {
1339 GNUNET_break_op (0);
1340 return GNUNET_SYSERR;
1341 }
1342 }
1343 else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1344 (op->state->phase != PHASE_EXPECT_IBF))
1345 {
1346 GNUNET_break_op (0);
1347 return GNUNET_SYSERR;
1348 }
1349
1350 return GNUNET_OK;
1351}
1352
1353
1354/**
1355 * Handle an IBF message from a remote peer.
1356 *
1357 * Reassemble the IBF from multiple pieces, and
1358 * process the whole IBF once possible.
1359 *
1360 * @param cls the union operation
1361 * @param msg the header of the message
1362 */
1363static void
1364handle_union_p2p_ibf (void *cls,
1365 const struct IBFMessage *msg)
1366{
1367 struct Operation *op = cls;
1368 unsigned int buckets_in_message;
1369
1370 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1371 / IBF_BUCKET_SIZE;
1372 if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1373 (op->state->phase == PHASE_EXPECT_IBF))
1374 {
1375 op->state->phase = PHASE_EXPECT_IBF_CONT;
1376 GNUNET_assert (NULL == op->state->remote_ibf);
1377 LOG (GNUNET_ERROR_TYPE_DEBUG,
1378 "Creating new ibf of size %u\n",
1379 1 << msg->order);
1380 op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1381 op->state->salt_receive = ntohl (msg->salt);
1382 LOG (GNUNET_ERROR_TYPE_DEBUG,
1383 "Receiving new IBF with salt %u\n",
1384 op->state->salt_receive);
1385 if (NULL == op->state->remote_ibf)
1386 {
1387 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1388 "Failed to parse remote IBF, closing connection\n");
1389 fail_union_operation (op);
1390 return;
1391 }
1392 op->state->ibf_buckets_received = 0;
1393 if (0 != ntohl (msg->offset))
1394 {
1395 GNUNET_break_op (0);
1396 fail_union_operation (op);
1397 return;
1398 }
1399 }
1400 else
1401 {
1402 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1403 LOG (GNUNET_ERROR_TYPE_DEBUG,
1404 "Received more of IBF\n");
1405 }
1406 GNUNET_assert (NULL != op->state->remote_ibf);
1407
1408 ibf_read_slice (&msg[1],
1409 op->state->ibf_buckets_received,
1410 buckets_in_message,
1411 op->state->remote_ibf);
1412 op->state->ibf_buckets_received += buckets_in_message;
1413
1414 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1415 {
1416 LOG (GNUNET_ERROR_TYPE_DEBUG,
1417 "received full ibf\n");
1418 op->state->phase = PHASE_INVENTORY_ACTIVE;
1419 if (GNUNET_OK !=
1420 decode_and_send (op))
1421 {
1422 /* Internal error, best we can do is shut down */
1423 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1424 "Failed to decode IBF, closing connection\n");
1425 fail_union_operation (op);
1426 return;
1427 }
1428 }
1429 GNUNET_CADET_receive_done (op->channel);
1430}
1431
1432
1433/**
1434 * Send a result message to the client indicating
1435 * that there is a new element.
1436 *
1437 * @param op union operation
1438 * @param element element to send
1439 * @param status status to send with the new element
1440 */
1441static void
1442send_client_element (struct Operation *op,
1443 const struct GNUNET_SETU_Element *element,
1444 int status)
1445{
1446 struct GNUNET_MQ_Envelope *ev;
1447 struct GNUNET_SETU_ResultMessage *rm;
1448
1449 LOG (GNUNET_ERROR_TYPE_DEBUG,
1450 "sending element (size %u) to client\n",
1451 element->size);
1452 GNUNET_assert (0 != op->client_request_id);
1453 ev = GNUNET_MQ_msg_extra (rm,
1454 element->size,
1455 GNUNET_MESSAGE_TYPE_SET_RESULT);
1456 if (NULL == ev)
1457 {
1458 GNUNET_MQ_discard (ev);
1459 GNUNET_break (0);
1460 return;
1461 }
1462 rm->result_status = htons (status);
1463 rm->request_id = htonl (op->client_request_id);
1464 rm->element_type = htons (element->element_type);
1465 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1466 op->state->key_to_element));
1467 GNUNET_memcpy (&rm[1],
1468 element->data,
1469 element->size);
1470 GNUNET_MQ_send (op->set->cs->mq,
1471 ev);
1472}
1473
1474
1475/**
1476 * Signal to the client that the operation has finished and
1477 * destroy the operation.
1478 *
1479 * @param cls operation to destroy
1480 */
1481static void
1482send_client_done (void *cls)
1483{
1484 struct Operation *op = cls;
1485 struct GNUNET_MQ_Envelope *ev;
1486 struct GNUNET_SETU_ResultMessage *rm;
1487
1488 if (GNUNET_YES == op->state->client_done_sent)
1489 {
1490 return;
1491 }
1492
1493 if (PHASE_DONE != op->state->phase)
1494 {
1495 LOG (GNUNET_ERROR_TYPE_WARNING,
1496 "Union operation failed\n");
1497 GNUNET_STATISTICS_update (_GSS_statistics,
1498 "# Union operations failed",
1499 1,
1500 GNUNET_NO);
1501 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1502 rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1503 rm->request_id = htonl (op->client_request_id);
1504 rm->element_type = htons (0);
1505 GNUNET_MQ_send (op->set->cs->mq,
1506 ev);
1507 return;
1508 }
1509
1510 op->state->client_done_sent = GNUNET_YES;
1511
1512 GNUNET_STATISTICS_update (_GSS_statistics,
1513 "# Union operations succeeded",
1514 1,
1515 GNUNET_NO);
1516 LOG (GNUNET_ERROR_TYPE_INFO,
1517 "Signalling client that union operation is done\n");
1518 ev = GNUNET_MQ_msg (rm,
1519 GNUNET_MESSAGE_TYPE_SET_RESULT);
1520 rm->request_id = htonl (op->client_request_id);
1521 rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
1522 rm->element_type = htons (0);
1523 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1524 op->state->key_to_element));
1525 GNUNET_MQ_send (op->set->cs->mq,
1526 ev);
1527}
1528
1529
1530/**
1531 * Tests if the operation is finished, and if so notify.
1532 *
1533 * @param op operation to check
1534 */
1535static void
1536maybe_finish (struct Operation *op)
1537{
1538 unsigned int num_demanded;
1539
1540 num_demanded = GNUNET_CONTAINER_multihashmap_size (
1541 op->state->demanded_hashes);
1542
1543 if (PHASE_FINISH_WAITING == op->state->phase)
1544 {
1545 LOG (GNUNET_ERROR_TYPE_DEBUG,
1546 "In PHASE_FINISH_WAITING, pending %u demands\n",
1547 num_demanded);
1548 if (0 == num_demanded)
1549 {
1550 struct GNUNET_MQ_Envelope *ev;
1551
1552 op->state->phase = PHASE_DONE;
1553 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1554 GNUNET_MQ_send (op->mq,
1555 ev);
1556 /* We now wait until the other peer sends P2P_OVER
1557 * after it got all elements from us. */
1558 }
1559 }
1560 if (PHASE_FINISH_CLOSING == op->state->phase)
1561 {
1562 LOG (GNUNET_ERROR_TYPE_DEBUG,
1563 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1564 num_demanded);
1565 if (0 == num_demanded)
1566 {
1567 op->state->phase = PHASE_DONE;
1568 send_client_done (op);
1569 _GSS_operation_destroy2 (op);
1570 }
1571 }
1572}
1573
1574
1575/**
1576 * Check an element message from a remote peer.
1577 *
1578 * @param cls the union operation
1579 * @param emsg the message
1580 */
1581static int
1582check_union_p2p_elements (void *cls,
1583 const struct GNUNET_SETU_ElementMessage *emsg)
1584{
1585 struct Operation *op = cls;
1586
1587 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1588 {
1589 GNUNET_break_op (0);
1590 return GNUNET_SYSERR;
1591 }
1592 return GNUNET_OK;
1593}
1594
1595
1596/**
1597 * Handle an element message from a remote peer.
1598 * Sent by the other peer either because we decoded an IBF and placed a demand,
1599 * or because the other peer switched to full set transmission.
1600 *
1601 * @param cls the union operation
1602 * @param emsg the message
1603 */
1604static void
1605handle_union_p2p_elements (void *cls,
1606 const struct GNUNET_SETU_ElementMessage *emsg)
1607{
1608 struct Operation *op = cls;
1609 struct ElementEntry *ee;
1610 struct KeyEntry *ke;
1611 uint16_t element_size;
1612
1613 element_size = ntohs (emsg->header.size) - sizeof(struct
1614 GNUNET_SETU_ElementMessage);
1615 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1616 GNUNET_memcpy (&ee[1],
1617 &emsg[1],
1618 element_size);
1619 ee->element.size = element_size;
1620 ee->element.data = &ee[1];
1621 ee->element.element_type = ntohs (emsg->element_type);
1622 ee->remote = GNUNET_YES;
1623 GNUNET_SETU_element_hash (&ee->element,
1624 &ee->element_hash);
1625 if (GNUNET_NO ==
1626 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1627 &ee->element_hash,
1628 NULL))
1629 {
1630 /* We got something we didn't demand, since it's not in our map. */
1631 GNUNET_break_op (0);
1632 fail_union_operation (op);
1633 return;
1634 }
1635
1636 LOG (GNUNET_ERROR_TYPE_DEBUG,
1637 "Got element (size %u, hash %s) from peer\n",
1638 (unsigned int) element_size,
1639 GNUNET_h2s (&ee->element_hash));
1640
1641 GNUNET_STATISTICS_update (_GSS_statistics,
1642 "# received elements",
1643 1,
1644 GNUNET_NO);
1645 GNUNET_STATISTICS_update (_GSS_statistics,
1646 "# exchanged elements",
1647 1,
1648 GNUNET_NO);
1649
1650 op->state->received_total++;
1651
1652 ke = op_get_element (op, &ee->element_hash);
1653 if (NULL != ke)
1654 {
1655 /* Got repeated element. Should not happen since
1656 * we track demands. */
1657 GNUNET_STATISTICS_update (_GSS_statistics,
1658 "# repeated elements",
1659 1,
1660 GNUNET_NO);
1661 ke->received = GNUNET_YES;
1662 GNUNET_free (ee);
1663 }
1664 else
1665 {
1666 LOG (GNUNET_ERROR_TYPE_DEBUG,
1667 "Registering new element from remote peer\n");
1668 op->state->received_fresh++;
1669 op_register_element (op, ee, GNUNET_YES);
1670 /* only send results immediately if the client wants it */
1671 send_client_element (op,
1672 &ee->element,
1673 GNUNET_SETU_STATUS_ADD_LOCAL);
1674 }
1675
1676 if ((op->state->received_total > 8) &&
1677 (op->state->received_fresh < op->state->received_total / 3))
1678 {
1679 /* The other peer gave us lots of old elements, there's something wrong. */
1680 GNUNET_break_op (0);
1681 fail_union_operation (op);
1682 return;
1683 }
1684 GNUNET_CADET_receive_done (op->channel);
1685 maybe_finish (op);
1686}
1687
1688
1689/**
1690 * Check a full element message from a remote peer.
1691 *
1692 * @param cls the union operation
1693 * @param emsg the message
1694 */
1695static int
1696check_union_p2p_full_element (void *cls,
1697 const struct GNUNET_SETU_ElementMessage *emsg)
1698{
1699 struct Operation *op = cls;
1700
1701 (void) op;
1702 // FIXME: check that we expect full elements here?
1703 return GNUNET_OK;
1704}
1705
1706
1707/**
1708 * Handle an element message from a remote peer.
1709 *
1710 * @param cls the union operation
1711 * @param emsg the message
1712 */
1713static void
1714handle_union_p2p_full_element (void *cls,
1715 const struct GNUNET_SETU_ElementMessage *emsg)
1716{
1717 struct Operation *op = cls;
1718 struct ElementEntry *ee;
1719 struct KeyEntry *ke;
1720 uint16_t element_size;
1721
1722 element_size = ntohs (emsg->header.size)
1723 - sizeof(struct GNUNET_SETU_ElementMessage);
1724 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1725 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1726 ee->element.size = element_size;
1727 ee->element.data = &ee[1];
1728 ee->element.element_type = ntohs (emsg->element_type);
1729 ee->remote = GNUNET_YES;
1730 GNUNET_SETU_element_hash (&ee->element, &ee->element_hash);
1731
1732 LOG (GNUNET_ERROR_TYPE_DEBUG,
1733 "Got element (full diff, size %u, hash %s) from peer\n",
1734 (unsigned int) element_size,
1735 GNUNET_h2s (&ee->element_hash));
1736
1737 GNUNET_STATISTICS_update (_GSS_statistics,
1738 "# received elements",
1739 1,
1740 GNUNET_NO);
1741 GNUNET_STATISTICS_update (_GSS_statistics,
1742 "# exchanged elements",
1743 1,
1744 GNUNET_NO);
1745
1746 op->state->received_total++;
1747
1748 ke = op_get_element (op, &ee->element_hash);
1749 if (NULL != ke)
1750 {
1751 /* Got repeated element. Should not happen since
1752 * we track demands. */
1753 GNUNET_STATISTICS_update (_GSS_statistics,
1754 "# repeated elements",
1755 1,
1756 GNUNET_NO);
1757 ke->received = GNUNET_YES;
1758 GNUNET_free (ee);
1759 }
1760 else
1761 {
1762 LOG (GNUNET_ERROR_TYPE_DEBUG,
1763 "Registering new element from remote peer\n");
1764 op->state->received_fresh++;
1765 op_register_element (op, ee, GNUNET_YES);
1766 /* only send results immediately if the client wants it */
1767 send_client_element (op,
1768 &ee->element,
1769 GNUNET_SETU_STATUS_ADD_LOCAL);
1770 }
1771
1772 if ((GNUNET_YES == op->byzantine) &&
1773 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1774 (op->state->received_fresh < op->state->received_total / 6))
1775 {
1776 /* The other peer gave us lots of old elements, there's something wrong. */
1777 LOG (GNUNET_ERROR_TYPE_ERROR,
1778 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1779 (unsigned long long) op->state->received_fresh,
1780 (unsigned long long) op->state->received_total);
1781 GNUNET_break_op (0);
1782 fail_union_operation (op);
1783 return;
1784 }
1785 GNUNET_CADET_receive_done (op->channel);
1786}
1787
1788
1789/**
1790 * Send offers (for GNUNET_Hash-es) in response
1791 * to inquiries (for IBF_Key-s).
1792 *
1793 * @param cls the union operation
1794 * @param msg the message
1795 */
1796static int
1797check_union_p2p_inquiry (void *cls,
1798 const struct InquiryMessage *msg)
1799{
1800 struct Operation *op = cls;
1801 unsigned int num_keys;
1802
1803 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1804 {
1805 GNUNET_break_op (0);
1806 return GNUNET_SYSERR;
1807 }
1808 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1809 / sizeof(struct IBF_Key);
1810 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1811 != num_keys * sizeof(struct IBF_Key))
1812 {
1813 GNUNET_break_op (0);
1814 return GNUNET_SYSERR;
1815 }
1816 return GNUNET_OK;
1817}
1818
1819
1820/**
1821 * Send offers (for GNUNET_Hash-es) in response
1822 * to inquiries (for IBF_Key-s).
1823 *
1824 * @param cls the union operation
1825 * @param msg the message
1826 */
1827static void
1828handle_union_p2p_inquiry (void *cls,
1829 const struct InquiryMessage *msg)
1830{
1831 struct Operation *op = cls;
1832 const struct IBF_Key *ibf_key;
1833 unsigned int num_keys;
1834
1835 LOG (GNUNET_ERROR_TYPE_DEBUG,
1836 "Received union inquiry\n");
1837 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1838 / sizeof(struct IBF_Key);
1839 ibf_key = (const struct IBF_Key *) &msg[1];
1840 while (0 != num_keys--)
1841 {
1842 struct IBF_Key unsalted_key;
1843
1844 unsalt_key (ibf_key,
1845 ntohl (msg->salt),
1846 &unsalted_key);
1847 send_offers_for_key (op,
1848 unsalted_key);
1849 ibf_key++;
1850 }
1851 GNUNET_CADET_receive_done (op->channel);
1852}
1853
1854
1855/**
1856 * Iterator over hash map entries, called to
1857 * destroy the linked list of colliding ibf key entries.
1858 *
1859 * @param cls closure
1860 * @param key current key code
1861 * @param value value in the hash map
1862 * @return #GNUNET_YES if we should continue to iterate,
1863 * #GNUNET_NO if not.
1864 */
1865static int
1866send_missing_full_elements_iter (void *cls,
1867 uint32_t key,
1868 void *value)
1869{
1870 struct Operation *op = cls;
1871 struct KeyEntry *ke = value;
1872 struct GNUNET_MQ_Envelope *ev;
1873 struct GNUNET_SETU_ElementMessage *emsg;
1874 struct ElementEntry *ee = ke->element;
1875
1876 if (GNUNET_YES == ke->received)
1877 return GNUNET_YES;
1878 ev = GNUNET_MQ_msg_extra (emsg,
1879 ee->element.size,
1880 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1881 GNUNET_memcpy (&emsg[1],
1882 ee->element.data,
1883 ee->element.size);
1884 emsg->element_type = htons (ee->element.element_type);
1885 GNUNET_MQ_send (op->mq,
1886 ev);
1887 return GNUNET_YES;
1888}
1889
1890
1891/**
1892 * Handle a request for full set transmission.
1893 *
1894 * @parem cls closure, a set union operation
1895 * @param mh the demand message
1896 */
1897static void
1898handle_union_p2p_request_full (void *cls,
1899 const struct GNUNET_MessageHeader *mh)
1900{
1901 struct Operation *op = cls;
1902
1903 LOG (GNUNET_ERROR_TYPE_DEBUG,
1904 "Received request for full set transmission\n");
1905 if (PHASE_EXPECT_IBF != op->state->phase)
1906 {
1907 GNUNET_break_op (0);
1908 fail_union_operation (op);
1909 return;
1910 }
1911
1912 // FIXME: we need to check that our set is larger than the
1913 // byzantine_lower_bound by some threshold
1914 send_full_set (op);
1915 GNUNET_CADET_receive_done (op->channel);
1916}
1917
1918
1919/**
1920 * Handle a "full done" message.
1921 *
1922 * @parem cls closure, a set union operation
1923 * @param mh the demand message
1924 */
1925static void
1926handle_union_p2p_full_done (void *cls,
1927 const struct GNUNET_MessageHeader *mh)
1928{
1929 struct Operation *op = cls;
1930
1931 switch (op->state->phase)
1932 {
1933 case PHASE_EXPECT_IBF:
1934 {
1935 struct GNUNET_MQ_Envelope *ev;
1936
1937 LOG (GNUNET_ERROR_TYPE_DEBUG,
1938 "got FULL DONE, sending elements that other peer is missing\n");
1939
1940 /* send all the elements that did not come from the remote peer */
1941 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1942 &send_missing_full_elements_iter,
1943 op);
1944
1945 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1946 GNUNET_MQ_send (op->mq,
1947 ev);
1948 op->state->phase = PHASE_DONE;
1949 /* we now wait until the other peer sends us the OVER message*/
1950 }
1951 break;
1952
1953 case PHASE_FULL_SENDING:
1954 {
1955 LOG (GNUNET_ERROR_TYPE_DEBUG,
1956 "got FULL DONE, finishing\n");
1957 /* We sent the full set, and got the response for that. We're done. */
1958 op->state->phase = PHASE_DONE;
1959 GNUNET_CADET_receive_done (op->channel);
1960 send_client_done (op);
1961 _GSS_operation_destroy2 (op);
1962 return;
1963 }
1964 break;
1965
1966 default:
1967 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1968 "Handle full done phase is %u\n",
1969 (unsigned) op->state->phase);
1970 GNUNET_break_op (0);
1971 fail_union_operation (op);
1972 return;
1973 }
1974 GNUNET_CADET_receive_done (op->channel);
1975}
1976
1977
1978/**
1979 * Check a demand by the other peer for elements based on a list
1980 * of `struct GNUNET_HashCode`s.
1981 *
1982 * @parem cls closure, a set union operation
1983 * @param mh the demand message
1984 * @return #GNUNET_OK if @a mh is well-formed
1985 */
1986static int
1987check_union_p2p_demand (void *cls,
1988 const struct GNUNET_MessageHeader *mh)
1989{
1990 struct Operation *op = cls;
1991 unsigned int num_hashes;
1992
1993 (void) op;
1994 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1995 / sizeof(struct GNUNET_HashCode);
1996 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1997 != num_hashes * sizeof(struct GNUNET_HashCode))
1998 {
1999 GNUNET_break_op (0);
2000 return GNUNET_SYSERR;
2001 }
2002 return GNUNET_OK;
2003}
2004
2005
2006/**
2007 * Handle a demand by the other peer for elements based on a list
2008 * of `struct GNUNET_HashCode`s.
2009 *
2010 * @parem cls closure, a set union operation
2011 * @param mh the demand message
2012 */
2013static void
2014handle_union_p2p_demand (void *cls,
2015 const struct GNUNET_MessageHeader *mh)
2016{
2017 struct Operation *op = cls;
2018 struct ElementEntry *ee;
2019 struct GNUNET_SETU_ElementMessage *emsg;
2020 const struct GNUNET_HashCode *hash;
2021 unsigned int num_hashes;
2022 struct GNUNET_MQ_Envelope *ev;
2023
2024 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2025 / sizeof(struct GNUNET_HashCode);
2026 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2027 num_hashes > 0;
2028 hash++, num_hashes--)
2029 {
2030 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2031 hash);
2032 if (NULL == ee)
2033 {
2034 /* Demand for non-existing element. */
2035 GNUNET_break_op (0);
2036 fail_union_operation (op);
2037 return;
2038 }
2039 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2040 {
2041 /* Probably confused lazily copied sets. */
2042 GNUNET_break_op (0);
2043 fail_union_operation (op);
2044 return;
2045 }
2046 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size,
2047 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
2048 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
2049 emsg->reserved = htons (0);
2050 emsg->element_type = htons (ee->element.element_type);
2051 LOG (GNUNET_ERROR_TYPE_DEBUG,
2052 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
2053 (void *) op,
2054 (unsigned int) ee->element.size,
2055 GNUNET_h2s (&ee->element_hash));
2056 GNUNET_MQ_send (op->mq, ev);
2057 GNUNET_STATISTICS_update (_GSS_statistics,
2058 "# exchanged elements",
2059 1,
2060 GNUNET_NO);
2061 }
2062 GNUNET_CADET_receive_done (op->channel);
2063}
2064
2065
2066/**
2067 * Check offer (of `struct GNUNET_HashCode`s).
2068 *
2069 * @param cls the union operation
2070 * @param mh the message
2071 * @return #GNUNET_OK if @a mh is well-formed
2072 */
2073static int
2074check_union_p2p_offer (void *cls,
2075 const struct GNUNET_MessageHeader *mh)
2076{
2077 struct Operation *op = cls;
2078 unsigned int num_hashes;
2079
2080 /* look up elements and send them */
2081 if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2082 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2083 {
2084 GNUNET_break_op (0);
2085 return GNUNET_SYSERR;
2086 }
2087 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2088 / sizeof(struct GNUNET_HashCode);
2089 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2090 num_hashes * sizeof(struct GNUNET_HashCode))
2091 {
2092 GNUNET_break_op (0);
2093 return GNUNET_SYSERR;
2094 }
2095 return GNUNET_OK;
2096}
2097
2098
2099/**
2100 * Handle offers (of `struct GNUNET_HashCode`s) and
2101 * respond with demands (of `struct GNUNET_HashCode`s).
2102 *
2103 * @param cls the union operation
2104 * @param mh the message
2105 */
2106static void
2107handle_union_p2p_offer (void *cls,
2108 const struct GNUNET_MessageHeader *mh)
2109{
2110 struct Operation *op = cls;
2111 const struct GNUNET_HashCode *hash;
2112 unsigned int num_hashes;
2113
2114 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2115 / sizeof(struct GNUNET_HashCode);
2116 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2117 num_hashes > 0;
2118 hash++, num_hashes--)
2119 {
2120 struct ElementEntry *ee;
2121 struct GNUNET_MessageHeader *demands;
2122 struct GNUNET_MQ_Envelope *ev;
2123
2124 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2125 hash);
2126 if (NULL != ee)
2127 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2128 continue;
2129
2130 if (GNUNET_YES ==
2131 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2132 hash))
2133 {
2134 LOG (GNUNET_ERROR_TYPE_DEBUG,
2135 "Skipped sending duplicate demand\n");
2136 continue;
2137 }
2138
2139 GNUNET_assert (GNUNET_OK ==
2140 GNUNET_CONTAINER_multihashmap_put (
2141 op->state->demanded_hashes,
2142 hash,
2143 NULL,
2144 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
2145
2146 LOG (GNUNET_ERROR_TYPE_DEBUG,
2147 "[OP %x] Requesting element (hash %s)\n",
2148 (void *) op, GNUNET_h2s (hash));
2149 ev = GNUNET_MQ_msg_header_extra (demands,
2150 sizeof(struct GNUNET_HashCode),
2151 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2152 GNUNET_memcpy (&demands[1],
2153 hash,
2154 sizeof(struct GNUNET_HashCode));
2155 GNUNET_MQ_send (op->mq, ev);
2156 }
2157 GNUNET_CADET_receive_done (op->channel);
2158}
2159
2160
2161/**
2162 * Handle a done message from a remote peer
2163 *
2164 * @param cls the union operation
2165 * @param mh the message
2166 */
2167static void
2168handle_union_p2p_done (void *cls,
2169 const struct GNUNET_MessageHeader *mh)
2170{
2171 struct Operation *op = cls;
2172
2173 switch (op->state->phase)
2174 {
2175 case PHASE_INVENTORY_PASSIVE:
2176 /* We got all requests, but still have to send our elements in response. */
2177 op->state->phase = PHASE_FINISH_WAITING;
2178
2179 LOG (GNUNET_ERROR_TYPE_DEBUG,
2180 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2181 /* The active peer is done sending offers
2182 * and inquiries. This means that all
2183 * our responses to that (demands and offers)
2184 * must be in flight (queued or in mesh).
2185 *
2186 * We should notify the active peer once
2187 * all our demands are satisfied, so that the active
2188 * peer can quit if we gave it everything.
2189 */GNUNET_CADET_receive_done (op->channel);
2190 maybe_finish (op);
2191 return;
2192
2193 case PHASE_INVENTORY_ACTIVE:
2194 LOG (GNUNET_ERROR_TYPE_DEBUG,
2195 "got DONE (as active partner), waiting to finish\n");
2196 /* All demands of the other peer are satisfied,
2197 * and we processed all offers, thus we know
2198 * exactly what our demands must be.
2199 *
2200 * We'll close the channel
2201 * to the other peer once our demands are met.
2202 */op->state->phase = PHASE_FINISH_CLOSING;
2203 GNUNET_CADET_receive_done (op->channel);
2204 maybe_finish (op);
2205 return;
2206
2207 default:
2208 GNUNET_break_op (0);
2209 fail_union_operation (op);
2210 return;
2211 }
2212}
2213
2214
2215/**
2216 * Handle a over message from a remote peer
2217 *
2218 * @param cls the union operation
2219 * @param mh the message
2220 */
2221static void
2222handle_union_p2p_over (void *cls,
2223 const struct GNUNET_MessageHeader *mh)
2224{
2225 send_client_done (cls);
2226}
2227
2228
2229/**
2230 * Initiate operation to evaluate a set union with a remote peer.
2231 *
2232 * @param op operation to perform (to be initialized)
2233 * @param opaque_context message to be transmitted to the listener
2234 * to convince it to accept, may be NULL
2235 */
2236static struct OperationState *
2237union_evaluate (struct Operation *op,
2238 const struct GNUNET_MessageHeader *opaque_context)
2239{
2240 struct OperationState *state;
2241 struct GNUNET_MQ_Envelope *ev;
2242 struct OperationRequestMessage *msg;
2243
2244 ev = GNUNET_MQ_msg_nested_mh (msg,
2245 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2246 opaque_context);
2247 if (NULL == ev)
2248 {
2249 /* the context message is too large */
2250 GNUNET_break (0);
2251 return NULL;
2252 }
2253 state = GNUNET_new (struct OperationState);
2254 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2255 GNUNET_NO);
2256 /* copy the current generation's strata estimator for this operation */
2257 state->se = strata_estimator_dup (op->set->state->se);
2258 /* we started the operation, thus we have to send the operation request */
2259 state->phase = PHASE_EXPECT_SE;
2260 state->salt_receive = state->salt_send = 42; // FIXME?????
2261 LOG (GNUNET_ERROR_TYPE_DEBUG,
2262 "Initiating union operation evaluation\n");
2263 GNUNET_STATISTICS_update (_GSS_statistics,
2264 "# of total union operations",
2265 1,
2266 GNUNET_NO);
2267 GNUNET_STATISTICS_update (_GSS_statistics,
2268 "# of initiated union operations",
2269 1,
2270 GNUNET_NO);
2271 GNUNET_MQ_send (op->mq,
2272 ev);
2273
2274 if (NULL != opaque_context)
2275 LOG (GNUNET_ERROR_TYPE_DEBUG,
2276 "sent op request with context message\n");
2277 else
2278 LOG (GNUNET_ERROR_TYPE_DEBUG,
2279 "sent op request without context message\n");
2280
2281 op->state = state;
2282 initialize_key_to_element (op);
2283 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
2284 state->key_to_element);
2285 return state;
2286}
2287
2288
2289/**
2290 * Get the incoming socket associated with the given id.
2291 *
2292 * @param listener the listener to look in
2293 * @param id id to look for
2294 * @return the incoming socket associated with the id,
2295 * or NULL if there is none
2296 */
2297static struct Operation *
2298get_incoming (uint32_t id)
2299{
2300 for (struct Listener *listener = listener_head;
2301 NULL != listener;
2302 listener = listener->next)
2303 {
2304 for (struct Operation *op = listener->op_head;
2305 NULL != op;
2306 op = op->next)
2307 if (op->suggest_id == id)
2308 return op;
2309 }
2310 return NULL;
2311}
2312
2313
2314/**
2315 * Destroy an incoming request from a remote peer
2316 *
2317 * @param op remote request to destroy
2318 */
2319static void
2320incoming_destroy (struct Operation *op)
2321{
2322 struct Listener *listener;
2323
2324 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325 "Destroying incoming operation %p\n",
2326 op);
2327 if (NULL != (listener = op->listener))
2328 {
2329 GNUNET_CONTAINER_DLL_remove (listener->op_head,
2330 listener->op_tail,
2331 op);
2332 op->listener = NULL;
2333 }
2334 if (NULL != op->timeout_task)
2335 {
2336 GNUNET_SCHEDULER_cancel (op->timeout_task);
2337 op->timeout_task = NULL;
2338 }
2339 _GSS_operation_destroy2 (op);
2340}
2341
2342
2343/**
2344 * Is element @a ee part of the set used by @a op?
2345 *
2346 * @param ee element to test
2347 * @param op operation the defines the set and its generation
2348 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
2349 */
2350int
2351_GSS_is_element_of_operation (struct ElementEntry *ee,
2352 struct Operation *op)
2353{
2354 return ee->generation >= op->generation_created;
2355}
2356
2357
2358/**
2359 * Destroy the given operation. Used for any operation where both
2360 * peers were known and that thus actually had a vt and channel. Must
2361 * not be used for operations where 'listener' is still set and we do
2362 * not know the other peer.
2363 *
2364 * Call the implementation-specific cancel function of the operation.
2365 * Disconnects from the remote peer. Does not disconnect the client,
2366 * as there may be multiple operations per set.
2367 *
2368 * @param op operation to destroy
2369 */
2370void
2371_GSS_operation_destroy (struct Operation *op)
2372{
2373 struct Set *set = op->set;
2374 struct GNUNET_CADET_Channel *channel;
2375
2376 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2377 "Destroying operation %p\n", op);
2378 GNUNET_assert (NULL == op->listener);
2379 if (NULL != op->state)
2380 {
2381 union_op_cancel (op);
2382 op->state = NULL;
2383 }
2384 if (NULL != set)
2385 {
2386 GNUNET_CONTAINER_DLL_remove (set->ops_head,
2387 set->ops_tail,
2388 op);
2389 op->set = NULL;
2390 }
2391 if (NULL != op->context_msg)
2392 {
2393 GNUNET_free (op->context_msg);
2394 op->context_msg = NULL;
2395 }
2396 if (NULL != (channel = op->channel))
2397 {
2398 /* This will free op; called conditionally as this helper function
2399 is also called from within the channel disconnect handler. */
2400 op->channel = NULL;
2401 GNUNET_CADET_channel_destroy (channel);
2402 }
2403 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
2404 * there was a channel end handler that will free 'op' on the call stack. */
2405}
2406
2407
2408/**
2409 * Callback called when a client connects to the service.
2410 *
2411 * @param cls closure for the service
2412 * @param c the new client that connected to the service
2413 * @param mq the message queue used to send messages to the client
2414 * @return @a `struct ClientState`
2415 */
2416static void *
2417client_connect_cb (void *cls,
2418 struct GNUNET_SERVICE_Client *c,
2419 struct GNUNET_MQ_Handle *mq)
2420{
2421 struct ClientState *cs;
2422
2423 num_clients++;
2424 cs = GNUNET_new (struct ClientState);
2425 cs->client = c;
2426 cs->mq = mq;
2427 return cs;
2428}
2429
2430
2431/**
2432 * Iterator over hash map entries to free element entries.
2433 *
2434 * @param cls closure
2435 * @param key current key code
2436 * @param value a `struct ElementEntry *` to be free'd
2437 * @return #GNUNET_YES (continue to iterate)
2438 */
2439static int
2440destroy_elements_iterator (void *cls,
2441 const struct GNUNET_HashCode *key,
2442 void *value)
2443{
2444 struct ElementEntry *ee = value;
2445
2446 GNUNET_free (ee);
2447 return GNUNET_YES;
2448}
2449
2450
2451/**
2452 * Clean up after a client has disconnected
2453 *
2454 * @param cls closure, unused
2455 * @param client the client to clean up after
2456 * @param internal_cls the `struct ClientState`
2457 */
2458static void
2459client_disconnect_cb (void *cls,
2460 struct GNUNET_SERVICE_Client *client,
2461 void *internal_cls)
2462{
2463 struct ClientState *cs = internal_cls;
2464 struct Operation *op;
2465 struct Listener *listener;
2466 struct Set *set;
2467
2468 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2469 "Client disconnected, cleaning up\n");
2470 if (NULL != (set = cs->set))
2471 {
2472 struct SetContent *content = set->content;
2473
2474 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2475 "Destroying client's set\n");
2476 /* Destroy pending set operations */
2477 while (NULL != set->ops_head)
2478 _GSS_operation_destroy (set->ops_head);
2479
2480 /* Destroy operation-specific state */
2481 GNUNET_assert (NULL != set->state);
2482 if (NULL != set->state->se)
2483 {
2484 strata_estimator_destroy (set->state->se);
2485 set->state->se = NULL;
2486 }
2487 GNUNET_free (set->state);
2488
2489 /* free set content (or at least decrement RC) */
2490 set->content = NULL;
2491 GNUNET_assert (0 != content->refcount);
2492 content->refcount--;
2493 if (0 == content->refcount)
2494 {
2495 GNUNET_assert (NULL != content->elements);
2496 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
2497 &destroy_elements_iterator,
2498 NULL);
2499 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
2500 content->elements = NULL;
2501 GNUNET_free (content);
2502 }
2503 GNUNET_free (set);
2504 }
2505
2506 if (NULL != (listener = cs->listener))
2507 {
2508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2509 "Destroying client's listener\n");
2510 GNUNET_CADET_close_port (listener->open_port);
2511 listener->open_port = NULL;
2512 while (NULL != (op = listener->op_head))
2513 {
2514 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2515 "Destroying incoming operation `%u' from peer `%s'\n",
2516 (unsigned int) op->client_request_id,
2517 GNUNET_i2s (&op->peer));
2518 incoming_destroy (op);
2519 }
2520 GNUNET_CONTAINER_DLL_remove (listener_head,
2521 listener_tail,
2522 listener);
2523 GNUNET_free (listener);
2524 }
2525 GNUNET_free (cs);
2526 num_clients--;
2527 if ( (GNUNET_YES == in_shutdown) &&
2528 (0 == num_clients) )
2529 {
2530 if (NULL != cadet)
2531 {
2532 GNUNET_CADET_disconnect (cadet);
2533 cadet = NULL;
2534 }
2535 }
2536}
2537
2538
2539/**
2540 * Check a request for a set operation from another peer.
2541 *
2542 * @param cls the operation state
2543 * @param msg the received message
2544 * @return #GNUNET_OK if the channel should be kept alive,
2545 * #GNUNET_SYSERR to destroy the channel
2546 */
2547static int
2548check_incoming_msg (void *cls,
2549 const struct OperationRequestMessage *msg)
2550{
2551 struct Operation *op = cls;
2552 struct Listener *listener = op->listener;
2553 const struct GNUNET_MessageHeader *nested_context;
2554
2555 /* double operation request */
2556 if (0 != op->suggest_id)
2557 {
2558 GNUNET_break_op (0);
2559 return GNUNET_SYSERR;
2560 }
2561 /* This should be equivalent to the previous condition, but can't hurt to check twice */
2562 if (NULL == listener)
2563 {
2564 GNUNET_break (0);
2565 return GNUNET_SYSERR;
2566 }
2567 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2568 if ((NULL != nested_context) &&
2569 (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
2570 {
2571 GNUNET_break_op (0);
2572 return GNUNET_SYSERR;
2573 }
2574 return GNUNET_OK;
2575}
2576
2577
2578/**
2579 * Handle a request for a set operation from another peer. Checks if we
2580 * have a listener waiting for such a request (and in that case initiates
2581 * asking the listener about accepting the connection). If no listener
2582 * is waiting, we queue the operation request in hope that a listener
2583 * shows up soon (before timeout).
2584 *
2585 * This msg is expected as the first and only msg handled through the
2586 * non-operation bound virtual table, acceptance of this operation replaces
2587 * our virtual table and subsequent msgs would be routed differently (as
2588 * we then know what type of operation this is).
2589 *
2590 * @param cls the operation state
2591 * @param msg the received message
2592 * @return #GNUNET_OK if the channel should be kept alive,
2593 * #GNUNET_SYSERR to destroy the channel
2594 */
2595static void
2596handle_incoming_msg (void *cls,
2597 const struct OperationRequestMessage *msg)
2598{
2599 struct Operation *op = cls;
2600 struct Listener *listener = op->listener;
2601 const struct GNUNET_MessageHeader *nested_context;
2602 struct GNUNET_MQ_Envelope *env;
2603 struct GNUNET_SETU_RequestMessage *cmsg;
2604
2605 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2606 /* Make a copy of the nested_context (application-specific context
2607 information that is opaque to set) so we can pass it to the
2608 listener later on */
2609 if (NULL != nested_context)
2610 op->context_msg = GNUNET_copy_message (nested_context);
2611 op->remote_element_count = ntohl (msg->element_count);
2612 GNUNET_log (
2613 GNUNET_ERROR_TYPE_DEBUG,
2614 "Received P2P operation request (port %s) for active listener\n",
2615 GNUNET_h2s (&op->listener->app_id));
2616 GNUNET_assert (0 == op->suggest_id);
2617 if (0 == suggest_id)
2618 suggest_id++;
2619 op->suggest_id = suggest_id++;
2620 GNUNET_assert (NULL != op->timeout_task);
2621 GNUNET_SCHEDULER_cancel (op->timeout_task);
2622 op->timeout_task = NULL;
2623 env = GNUNET_MQ_msg_nested_mh (cmsg,
2624 GNUNET_MESSAGE_TYPE_SETU_REQUEST,
2625 op->context_msg);
2626 GNUNET_log (
2627 GNUNET_ERROR_TYPE_DEBUG,
2628 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
2629 op->suggest_id,
2630 listener,
2631 listener->cs);
2632 cmsg->accept_id = htonl (op->suggest_id);
2633 cmsg->peer_id = op->peer;
2634 GNUNET_MQ_send (listener->cs->mq,
2635 env);
2636 /* NOTE: GNUNET_CADET_receive_done() will be called in
2637 #handle_client_accept() */
2638}
2639
2640
2641/**
2642 * Called when a client wants to create a new set. This is typically
2643 * the first request from a client, and includes the type of set
2644 * operation to be performed.
2645 *
2646 * @param cls client that sent the message
2647 * @param m message sent by the client
2648 */
2649static void
2650handle_client_create_set (void *cls,
2651 const struct GNUNET_SETU_CreateMessage *msg)
2652{
2653 struct ClientState *cs = cls;
2654 struct Set *set;
2655
2656 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2657 "Client created new set for union operation\n");
2658 if (NULL != cs->set)
2659 {
2660 /* There can only be one set per client */
2661 GNUNET_break (0);
2662 GNUNET_SERVICE_client_drop (cs->client);
2663 return;
2664 }
2665 set = GNUNET_new (struct Set);
2666 {
2667 struct SetState *set_state;
2668
2669 set_state = GNUNET_new (struct SetState); // FIXME: avoid this malloc, merge structs!
2670 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
2671 SE_IBF_SIZE, SE_IBF_HASH_NUM);
2672 if (NULL == set_state->se)
2673 {
2674 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2675 "Failed to allocate strata estimator\n");
2676 GNUNET_free (set);
2677 GNUNET_SERVICE_client_drop (cs->client);
2678 return;
2679 }
2680 set->state = set_state;
2681 }
2682 set->content = GNUNET_new (struct SetContent);
2683 set->content->refcount = 1;
2684 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
2685 GNUNET_YES);
2686 set->cs = cs;
2687 cs->set = set;
2688 GNUNET_SERVICE_client_continue (cs->client);
2689}
2690
2691
2692/**
2693 * Timeout happens iff:
2694 * - we suggested an operation to our listener,
2695 * but did not receive a response in time
2696 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
2697 *
2698 * @param cls channel context
2699 * @param tc context information (why was this task triggered now)
2700 */
2701static void
2702incoming_timeout_cb (void *cls)
2703{
2704 struct Operation *op = cls;
2705
2706 op->timeout_task = NULL;
2707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2708 "Remote peer's incoming request timed out\n");
2709 incoming_destroy (op);
2710}
2711
2712
2713/**
2714 * Method called whenever another peer has added us to a channel the
2715 * other peer initiated. Only called (once) upon reception of data
2716 * from a channel we listen on.
2717 *
2718 * The channel context represents the operation itself and gets added
2719 * to a DLL, from where it gets looked up when our local listener
2720 * client responds to a proposed/suggested operation or connects and
2721 * associates with this operation.
2722 *
2723 * @param cls closure
2724 * @param channel new handle to the channel
2725 * @param source peer that started the channel
2726 * @return initial channel context for the channel
2727 * returns NULL on error
2728 */
2729static void *
2730channel_new_cb (void *cls,
2731 struct GNUNET_CADET_Channel *channel,
2732 const struct GNUNET_PeerIdentity *source)
2733{
2734 struct Listener *listener = cls;
2735 struct Operation *op;
2736
2737 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2738 "New incoming channel\n");
2739 op = GNUNET_new (struct Operation);
2740 op->listener = listener;
2741 op->peer = *source;
2742 op->channel = channel;
2743 op->mq = GNUNET_CADET_get_mq (op->channel);
2744 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
2745 UINT32_MAX);
2746 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
2747 &incoming_timeout_cb,
2748 op);
2749 GNUNET_CONTAINER_DLL_insert (listener->op_head,
2750 listener->op_tail,
2751 op);
2752 return op;
2753}
2754
2755
2756/**
2757 * Function called whenever a channel is destroyed. Should clean up
2758 * any associated state. It must NOT call
2759 * GNUNET_CADET_channel_destroy() on the channel.
2760 *
2761 * The peer_disconnect function is part of a a virtual table set initially either
2762 * when a peer creates a new channel with us, or once we create
2763 * a new channel ourselves (evaluate).
2764 *
2765 * Once we know the exact type of operation (union/intersection), the vt is
2766 * replaced with an operation specific instance (_GSS_[op]_vt).
2767 *
2768 * @param channel_ctx place where local state associated
2769 * with the channel is stored
2770 * @param channel connection to the other end (henceforth invalid)
2771 */
2772static void
2773channel_end_cb (void *channel_ctx,
2774 const struct GNUNET_CADET_Channel *channel)
2775{
2776 struct Operation *op = channel_ctx;
2777
2778 op->channel = NULL;
2779 _GSS_operation_destroy2 (op);
2780}
2781
2782
2783/**
2784 * This function probably should not exist
2785 * and be replaced by inlining more specific
2786 * logic in the various places where it is called.
2787 */
2788void
2789_GSS_operation_destroy2 (struct Operation *op)
2790{
2791 struct GNUNET_CADET_Channel *channel;
2792
2793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2794 "channel_end_cb called\n");
2795 if (NULL != (channel = op->channel))
2796 {
2797 /* This will free op; called conditionally as this helper function
2798 is also called from within the channel disconnect handler. */
2799 op->channel = NULL;
2800 GNUNET_CADET_channel_destroy (channel);
2801 }
2802 if (NULL != op->listener)
2803 {
2804 incoming_destroy (op);
2805 return;
2806 }
2807 if (NULL != op->set)
2808 send_client_done (op);
2809 _GSS_operation_destroy (op);
2810 GNUNET_free (op);
2811}
2812
2813
2814/**
2815 * Function called whenever an MQ-channel's transmission window size changes.
2816 *
2817 * The first callback in an outgoing channel will be with a non-zero value
2818 * and will mean the channel is connected to the destination.
2819 *
2820 * For an incoming channel it will be called immediately after the
2821 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
2822 *
2823 * @param cls Channel closure.
2824 * @param channel Connection to the other end (henceforth invalid).
2825 * @param window_size New window size. If the is more messages than buffer size
2826 * this value will be negative..
2827 */
2828static void
2829channel_window_cb (void *cls,
2830 const struct GNUNET_CADET_Channel *channel,
2831 int window_size)
2832{
2833 /* FIXME: not implemented, we could do flow control here... */
2834}
2835
2836
2837/**
2838 * Called when a client wants to create a new listener.
2839 *
2840 * @param cls client that sent the message
2841 * @param msg message sent by the client
2842 */
2843static void
2844handle_client_listen (void *cls,
2845 const struct GNUNET_SETU_ListenMessage *msg)
2846{
2847 struct ClientState *cs = cls;
2848 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2849 GNUNET_MQ_hd_var_size (incoming_msg,
2850 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
2851 struct OperationRequestMessage,
2852 NULL),
2853 GNUNET_MQ_hd_var_size (union_p2p_ibf,
2854 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
2855 struct IBFMessage,
2856 NULL),
2857 GNUNET_MQ_hd_var_size (union_p2p_elements,
2858 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
2859 struct GNUNET_SETU_ElementMessage,
2860 NULL),
2861 GNUNET_MQ_hd_var_size (union_p2p_offer,
2862 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
2863 struct GNUNET_MessageHeader,
2864 NULL),
2865 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
2866 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
2867 struct InquiryMessage,
2868 NULL),
2869 GNUNET_MQ_hd_var_size (union_p2p_demand,
2870 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
2871 struct GNUNET_MessageHeader,
2872 NULL),
2873 GNUNET_MQ_hd_fixed_size (union_p2p_done,
2874 GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
2875 struct GNUNET_MessageHeader,
2876 NULL),
2877 GNUNET_MQ_hd_fixed_size (union_p2p_over,
2878 GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
2879 struct GNUNET_MessageHeader,
2880 NULL),
2881 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
2882 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
2883 struct GNUNET_MessageHeader,
2884 NULL),
2885 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
2886 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
2887 struct GNUNET_MessageHeader,
2888 NULL),
2889 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
2890 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
2891 struct StrataEstimatorMessage,
2892 NULL),
2893 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
2894 GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
2895 struct StrataEstimatorMessage,
2896 NULL),
2897 GNUNET_MQ_hd_var_size (union_p2p_full_element,
2898 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
2899 struct GNUNET_SETU_ElementMessage,
2900 NULL),
2901 GNUNET_MQ_handler_end ()
2902 };
2903 struct Listener *listener;
2904
2905 if (NULL != cs->listener)
2906 {
2907 /* max. one active listener per client! */
2908 GNUNET_break (0);
2909 GNUNET_SERVICE_client_drop (cs->client);
2910 return;
2911 }
2912 listener = GNUNET_new (struct Listener);
2913 listener->cs = cs;
2914 cs->listener = listener;
2915 listener->app_id = msg->app_id;
2916 GNUNET_CONTAINER_DLL_insert (listener_head,
2917 listener_tail,
2918 listener);
2919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2920 "New listener created (port %s)\n",
2921 GNUNET_h2s (&listener->app_id));
2922 listener->open_port = GNUNET_CADET_open_port (cadet,
2923 &msg->app_id,
2924 &channel_new_cb,
2925 listener,
2926 &channel_window_cb,
2927 &channel_end_cb,
2928 cadet_handlers);
2929 GNUNET_SERVICE_client_continue (cs->client);
2930}
2931
2932
2933/**
2934 * Called when the listening client rejects an operation
2935 * request by another peer.
2936 *
2937 * @param cls client that sent the message
2938 * @param msg message sent by the client
2939 */
2940static void
2941handle_client_reject (void *cls,
2942 const struct GNUNET_SETU_RejectMessage *msg)
2943{
2944 struct ClientState *cs = cls;
2945 struct Operation *op;
2946
2947 op = get_incoming (ntohl (msg->accept_reject_id));
2948 if (NULL == op)
2949 {
2950 /* no matching incoming operation for this reject;
2951 could be that the other peer already disconnected... */
2952 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2953 "Client rejected unknown operation %u\n",
2954 (unsigned int) ntohl (msg->accept_reject_id));
2955 GNUNET_SERVICE_client_continue (cs->client);
2956 return;
2957 }
2958 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2959 "Peer request (app %s) rejected by client\n",
2960 GNUNET_h2s (&cs->listener->app_id));
2961 _GSS_operation_destroy2 (op);
2962 GNUNET_SERVICE_client_continue (cs->client);
2963}
2964
2965
2966/**
2967 * Called when a client wants to add or remove an element to a set it inhabits.
2968 *
2969 * @param cls client that sent the message
2970 * @param msg message sent by the client
2971 */
2972static int
2973check_client_set_add (void *cls,
2974 const struct GNUNET_SETU_ElementMessage *msg)
2975{
2976 /* NOTE: Technically, we should probably check with the
2977 block library whether the element we are given is well-formed */
2978 return GNUNET_OK;
2979}
2980
2981
2982/**
2983 * Called when a client wants to add or remove an element to a set it inhabits.
2984 *
2985 * @param cls client that sent the message
2986 * @param msg message sent by the client
2987 */
2988static void
2989handle_client_set_add (void *cls,
2990 const struct GNUNET_SETU_ElementMessage *msg)
2991{
2992 struct ClientState *cs = cls;
2993 struct Set *set;
2994 struct GNUNET_SETU_Element el;
2995 struct ElementEntry *ee;
2996 struct GNUNET_HashCode hash;
2997
2998 if (NULL == (set = cs->set))
2999 {
3000 /* client without a set requested an operation */
3001 GNUNET_break (0);
3002 GNUNET_SERVICE_client_drop (cs->client);
3003 return;
3004 }
3005 GNUNET_SERVICE_client_continue (cs->client);
3006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
3007 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
3008 el.size = ntohs (msg->header.size) - sizeof(*msg);
3009 el.data = &msg[1];
3010 el.element_type = ntohs (msg->element_type);
3011 GNUNET_SETU_element_hash (&el,
3012 &hash);
3013 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
3014 &hash);
3015 if (NULL == ee)
3016 {
3017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3018 "Client inserts element %s of size %u\n",
3019 GNUNET_h2s (&hash),
3020 el.size);
3021 ee = GNUNET_malloc (el.size + sizeof(*ee));
3022 ee->element.size = el.size;
3023 GNUNET_memcpy (&ee[1], el.data, el.size);
3024 ee->element.data = &ee[1];
3025 ee->element.element_type = el.element_type;
3026 ee->remote = GNUNET_NO;
3027 ee->generation = set->current_generation;
3028 ee->element_hash = hash;
3029 GNUNET_break (GNUNET_YES ==
3030 GNUNET_CONTAINER_multihashmap_put (
3031 set->content->elements,
3032 &ee->element_hash,
3033 ee,
3034 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3035 }
3036 else
3037 {
3038 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3039 "Client inserted element %s of size %u twice (ignored)\n",
3040 GNUNET_h2s (&hash),
3041 el.size);
3042 /* same element inserted twice */
3043 return;
3044 }
3045 strata_estimator_insert (set->state->se,
3046 get_ibf_key (&ee->element_hash));
3047}
3048
3049
3050/**
3051 * Advance the current generation of a set,
3052 * adding exclusion ranges if necessary.
3053 *
3054 * @param set the set where we want to advance the generation
3055 */
3056static void
3057advance_generation (struct Set *set)
3058{
3059 set->content->latest_generation++;
3060 set->current_generation++;
3061}
3062
3063
3064/**
3065 * Called when a client wants to initiate a set operation with another
3066 * peer. Initiates the CADET connection to the listener and sends the
3067 * request.
3068 *
3069 * @param cls client that sent the message
3070 * @param msg message sent by the client
3071 * @return #GNUNET_OK if the message is well-formed
3072 */
3073static int
3074check_client_evaluate (void *cls,
3075 const struct GNUNET_SETU_EvaluateMessage *msg)
3076{
3077 /* FIXME: suboptimal, even if the context below could be NULL,
3078 there are malformed messages this does not check for... */
3079 return GNUNET_OK;
3080}
3081
3082
3083/**
3084 * Called when a client wants to initiate a set operation with another
3085 * peer. Initiates the CADET connection to the listener and sends the
3086 * request.
3087 *
3088 * @param cls client that sent the message
3089 * @param msg message sent by the client
3090 */
3091static void
3092handle_client_evaluate (void *cls,
3093 const struct GNUNET_SETU_EvaluateMessage *msg)
3094{
3095 struct ClientState *cs = cls;
3096 struct Operation *op = GNUNET_new (struct Operation);
3097 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3098 GNUNET_MQ_hd_var_size (incoming_msg,
3099 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
3100 struct OperationRequestMessage,
3101 op),
3102 GNUNET_MQ_hd_var_size (union_p2p_ibf,
3103 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
3104 struct IBFMessage,
3105 op),
3106 GNUNET_MQ_hd_var_size (union_p2p_elements,
3107 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
3108 struct GNUNET_SETU_ElementMessage,
3109 op),
3110 GNUNET_MQ_hd_var_size (union_p2p_offer,
3111 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
3112 struct GNUNET_MessageHeader,
3113 op),
3114 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3115 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
3116 struct InquiryMessage,
3117 op),
3118 GNUNET_MQ_hd_var_size (union_p2p_demand,
3119 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
3120 struct GNUNET_MessageHeader,
3121 op),
3122 GNUNET_MQ_hd_fixed_size (union_p2p_done,
3123 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
3124 struct GNUNET_MessageHeader,
3125 op),
3126 GNUNET_MQ_hd_fixed_size (union_p2p_over,
3127 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
3128 struct GNUNET_MessageHeader,
3129 op),
3130 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3131 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
3132 struct GNUNET_MessageHeader,
3133 op),
3134 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3135 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
3136 struct GNUNET_MessageHeader,
3137 op),
3138 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3139 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
3140 struct StrataEstimatorMessage,
3141 op),
3142 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3143 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
3144 struct StrataEstimatorMessage,
3145 op),
3146 GNUNET_MQ_hd_var_size (union_p2p_full_element,
3147 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
3148 struct GNUNET_SETU_ElementMessage,
3149 op),
3150 GNUNET_MQ_handler_end ()
3151 };
3152 struct Set *set;
3153 const struct GNUNET_MessageHeader *context;
3154
3155 if (NULL == (set = cs->set))
3156 {
3157 GNUNET_break (0);
3158 GNUNET_free (op);
3159 GNUNET_SERVICE_client_drop (cs->client);
3160 return;
3161 }
3162 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
3163 UINT32_MAX);
3164 op->peer = msg->target_peer;
3165 op->client_request_id = ntohl (msg->request_id);
3166 op->byzantine = msg->byzantine;
3167 op->byzantine_lower_bound = msg->byzantine_lower_bound;
3168 op->force_full = msg->force_full;
3169 op->force_delta = msg->force_delta;
3170 context = GNUNET_MQ_extract_nested_mh (msg);
3171
3172 /* Advance generation values, so that
3173 mutations won't interfer with the running operation. */
3174 op->set = set;
3175 op->generation_created = set->current_generation;
3176 advance_generation (set);
3177 GNUNET_CONTAINER_DLL_insert (set->ops_head,
3178 set->ops_tail,
3179 op);
3180 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3181 "Creating new CADET channel to port %s for set union\n",
3182 GNUNET_h2s (&msg->app_id));
3183 op->channel = GNUNET_CADET_channel_create (cadet,
3184 op,
3185 &msg->target_peer,
3186 &msg->app_id,
3187 &channel_window_cb,
3188 &channel_end_cb,
3189 cadet_handlers);
3190 op->mq = GNUNET_CADET_get_mq (op->channel);
3191 op->state = union_evaluate (op, context);
3192 if (NULL == op->state)
3193 {
3194 GNUNET_break (0);
3195 GNUNET_SERVICE_client_drop (cs->client);
3196 return;
3197 }
3198 GNUNET_SERVICE_client_continue (cs->client);
3199}
3200
3201
3202/**
3203 * Handle a request from the client to cancel a running set operation.
3204 *
3205 * @param cls the client
3206 * @param msg the message
3207 */
3208static void
3209handle_client_cancel (void *cls,
3210 const struct GNUNET_SETU_CancelMessage *msg)
3211{
3212 struct ClientState *cs = cls;
3213 struct Set *set;
3214 struct Operation *op;
3215 int found;
3216
3217 if (NULL == (set = cs->set))
3218 {
3219 /* client without a set requested an operation */
3220 GNUNET_break (0);
3221 GNUNET_SERVICE_client_drop (cs->client);
3222 return;
3223 }
3224 found = GNUNET_NO;
3225 for (op = set->ops_head; NULL != op; op = op->next)
3226 {
3227 if (op->client_request_id == ntohl (msg->request_id))
3228 {
3229 found = GNUNET_YES;
3230 break;
3231 }
3232 }
3233 if (GNUNET_NO == found)
3234 {
3235 /* It may happen that the operation was already destroyed due to
3236 * the other peer disconnecting. The client may not know about this
3237 * yet and try to cancel the (just barely non-existent) operation.
3238 * So this is not a hard error.
3239 *///
3240 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3241 "Client canceled non-existent op %u\n",
3242 (uint32_t) ntohl (msg->request_id));
3243 }
3244 else
3245 {
3246 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3247 "Client requested cancel for op %u\n",
3248 (uint32_t) ntohl (msg->request_id));
3249 _GSS_operation_destroy (op);
3250 }
3251 GNUNET_SERVICE_client_continue (cs->client);
3252}
3253
3254
3255/**
3256 * Handle a request from the client to accept a set operation that
3257 * came from a remote peer. We forward the accept to the associated
3258 * operation for handling
3259 *
3260 * @param cls the client
3261 * @param msg the message
3262 */
3263static void
3264handle_client_accept (void *cls,
3265 const struct GNUNET_SETU_AcceptMessage *msg)
3266{
3267 struct ClientState *cs = cls;
3268 struct Set *set;
3269 struct Operation *op;
3270 struct GNUNET_SETU_ResultMessage *result_message;
3271 struct GNUNET_MQ_Envelope *ev;
3272 struct Listener *listener;
3273
3274 if (NULL == (set = cs->set))
3275 {
3276 /* client without a set requested to accept */
3277 GNUNET_break (0);
3278 GNUNET_SERVICE_client_drop (cs->client);
3279 return;
3280 }
3281 op = get_incoming (ntohl (msg->accept_reject_id));
3282 if (NULL == op)
3283 {
3284 /* It is not an error if the set op does not exist -- it may
3285 * have been destroyed when the partner peer disconnected. */
3286 GNUNET_log (
3287 GNUNET_ERROR_TYPE_INFO,
3288 "Client %p accepted request %u of listener %p that is no longer active\n",
3289 cs,
3290 ntohl (msg->accept_reject_id),
3291 cs->listener);
3292 ev = GNUNET_MQ_msg (result_message,
3293 GNUNET_MESSAGE_TYPE_SET_RESULT);
3294 result_message->request_id = msg->request_id;
3295 result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
3296 GNUNET_MQ_send (set->cs->mq, ev);
3297 GNUNET_SERVICE_client_continue (cs->client);
3298 return;
3299 }
3300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3301 "Client accepting request %u\n",
3302 (uint32_t) ntohl (msg->accept_reject_id));
3303 listener = op->listener;
3304 op->listener = NULL;
3305 GNUNET_CONTAINER_DLL_remove (listener->op_head,
3306 listener->op_tail,
3307 op);
3308 op->set = set;
3309 GNUNET_CONTAINER_DLL_insert (set->ops_head,
3310 set->ops_tail,
3311 op);
3312 op->client_request_id = ntohl (msg->request_id);
3313 op->byzantine = msg->byzantine;
3314 op->byzantine_lower_bound = msg->byzantine_lower_bound;
3315 op->force_full = msg->force_full;
3316 op->force_delta = msg->force_delta;
3317
3318 /* Advance generation values, so that future mutations do not
3319 interfer with the running operation. */
3320 op->generation_created = set->current_generation;
3321 advance_generation (set);
3322 GNUNET_assert (NULL == op->state);
3323
3324 LOG (GNUNET_ERROR_TYPE_DEBUG,
3325 "accepting set union operation\n");
3326 GNUNET_STATISTICS_update (_GSS_statistics,
3327 "# of accepted union operations",
3328 1,
3329 GNUNET_NO);
3330 GNUNET_STATISTICS_update (_GSS_statistics,
3331 "# of total union operations",
3332 1,
3333 GNUNET_NO);
3334 {
3335 struct OperationState *state;
3336 const struct StrataEstimator *se;
3337 struct GNUNET_MQ_Envelope *ev;
3338 struct StrataEstimatorMessage *strata_msg;
3339 char *buf;
3340 size_t len;
3341 uint16_t type;
3342
3343 state = GNUNET_new (struct OperationState); // FIXME: merge with 'op' to avoid malloc!
3344 state->se = strata_estimator_dup (op->set->state->se);
3345 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
3346 GNUNET_NO);
3347 state->salt_receive = state->salt_send = 42; // FIXME?????
3348 op->state = state;
3349 initialize_key_to_element (op);
3350 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (
3351 state->key_to_element);
3352
3353 /* kick off the operation */
3354 se = state->se;
3355 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
3356 len = strata_estimator_write (se,
3357 buf);
3358 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
3359 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
3360 else
3361 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
3362 ev = GNUNET_MQ_msg_extra (strata_msg,
3363 len,
3364 type);
3365 GNUNET_memcpy (&strata_msg[1],
3366 buf,
3367 len);
3368 GNUNET_free (buf);
3369 strata_msg->set_size
3370 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
3371 op->set->content->elements));
3372 GNUNET_MQ_send (op->mq,
3373 ev);
3374 state->phase = PHASE_EXPECT_IBF;
3375
3376 op->state = state;
3377 }
3378 /* Now allow CADET to continue, as we did not do this in
3379 #handle_incoming_msg (as we wanted to first see if the
3380 local client would accept the request). */
3381 GNUNET_CADET_receive_done (op->channel);
3382 GNUNET_SERVICE_client_continue (cs->client);
3383}
3384
3385
3386/**
3387 * Called to clean up, after a shutdown has been requested.
3388 *
3389 * @param cls closure, NULL
3390 */
3391static void
3392shutdown_task (void *cls)
3393{
3394 /* Delay actual shutdown to allow service to disconnect clients */
3395 in_shutdown = GNUNET_YES;
3396 if (0 == num_clients)
3397 {
3398 if (NULL != cadet)
3399 {
3400 GNUNET_CADET_disconnect (cadet);
3401 cadet = NULL;
3402 }
3403 }
3404 GNUNET_STATISTICS_destroy (_GSS_statistics,
3405 GNUNET_YES);
3406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3407 "handled shutdown request\n");
3408}
3409
3410
3411/**
3412 * Function called by the service's run
3413 * method to run service-specific setup code.
3414 *
3415 * @param cls closure
3416 * @param cfg configuration to use
3417 * @param service the initialized service
3418 */
3419static void
3420run (void *cls,
3421 const struct GNUNET_CONFIGURATION_Handle *cfg,
3422 struct GNUNET_SERVICE_Handle *service)
3423{
3424 /* FIXME: need to modify SERVICE (!) API to allow
3425 us to run a shutdown task *after* clients were
3426 forcefully disconnected! */
3427 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3428 NULL);
3429 _GSS_statistics = GNUNET_STATISTICS_create ("setu", cfg);
3430 cadet = GNUNET_CADET_connect (cfg);
3431 if (NULL == cadet)
3432 {
3433 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3434 _ ("Could not connect to CADET service\n"));
3435 GNUNET_SCHEDULER_shutdown ();
3436 return;
3437 }
3438}
3439
3440
3441/**
3442 * Define "main" method using service macro.
3443 */
3444GNUNET_SERVICE_MAIN (
3445 "set",
3446 GNUNET_SERVICE_OPTION_NONE,
3447 &run,
3448 &client_connect_cb,
3449 &client_disconnect_cb,
3450 NULL,
3451 GNUNET_MQ_hd_fixed_size (client_accept,
3452 GNUNET_MESSAGE_TYPE_SETU_ACCEPT,
3453 struct GNUNET_SETU_AcceptMessage,
3454 NULL),
3455 GNUNET_MQ_hd_var_size (client_set_add,
3456 GNUNET_MESSAGE_TYPE_SETU_ADD,
3457 struct GNUNET_SETU_ElementMessage,
3458 NULL),
3459 GNUNET_MQ_hd_fixed_size (client_create_set,
3460 GNUNET_MESSAGE_TYPE_SETU_CREATE,
3461 struct GNUNET_SETU_CreateMessage,
3462 NULL),
3463 GNUNET_MQ_hd_var_size (client_evaluate,
3464 GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
3465 struct GNUNET_SETU_EvaluateMessage,
3466 NULL),
3467 GNUNET_MQ_hd_fixed_size (client_listen,
3468 GNUNET_MESSAGE_TYPE_SETU_LISTEN,
3469 struct GNUNET_SETU_ListenMessage,
3470 NULL),
3471 GNUNET_MQ_hd_fixed_size (client_reject,
3472 GNUNET_MESSAGE_TYPE_SETU_REJECT,
3473 struct GNUNET_SETU_RejectMessage,
3474 NULL),
3475 GNUNET_MQ_hd_fixed_size (client_cancel,
3476 GNUNET_MESSAGE_TYPE_SETU_CANCEL,
3477 struct GNUNET_SETU_CancelMessage,
3478 NULL),
3479 GNUNET_MQ_handler_end ());
3480
3481
3482/* end of gnunet-service-setu.c */