aboutsummaryrefslogtreecommitdiff
path: root/src/setu/gnunet-service-setu.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/setu/gnunet-service-setu.c')
-rw-r--r--src/setu/gnunet-service-setu.c5437
1 files changed, 0 insertions, 5437 deletions
diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
deleted file mode 100644
index 339d347f8..000000000
--- a/src/setu/gnunet-service-setu.c
+++ /dev/null
@@ -1,5437 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file setu/gnunet-service-setu.c
22 * @brief set union operation
23 * @author Florian Dold
24 * @author Christian Grothoff
25 * @author Elias Summermatter
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_statistics_service.h"
30#include "ibf.h"
31#include "gnunet_protocols.h"
32#include "gnunet_applications.h"
33#include "gnunet_cadet_service.h"
34#include "gnunet-service-setu_strata_estimator.h"
35#include "gnunet-service-setu_protocol.h"
36#include "gnunet_statistics_service.h"
37#include <gcrypt.h>
38#include "gnunet_setu_service.h"
39#include "setu.h"
40
41#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
42
43/**
44 * How long do we hold on to an incoming channel if there is
45 * no local listener before giving up?
46 */
47#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
48
49/**
50 * Number of IBFs in a strata estimator.
51 */
52#define SE_STRATA_COUNT 32
53
54
55/**
56 * Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348
57 * Based on the bsc thesis of Elias Summermatter (2021)
58 */
59#define SE_IBFS_TOTAL_SIZE 632
60
61/**
62 * The hash num parameter for the difference digests and strata estimators.
63 */
64#define SE_IBF_HASH_NUM 3
65
66/**
67 * Number of buckets that can be transmitted in one message.
68 */
69#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
70
71/**
72 * The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
73 * Choose this value so that computing the IBF is still cheaper
74 * than transmitting all values.
75 */
76#define MAX_IBF_SIZE 1048576
77
78
79/**
80 * Minimal size of an ibf
81 * Based on the bsc thesis of Elias Summermatter (2021)
82 */
83#define IBF_MIN_SIZE 37
84
85/**
86 * AVG RTT for differential sync when k=2 and Factor = 2
87 * Based on the bsc thesis of Elias Summermatter (2021)
88 */
89#define DIFFERENTIAL_RTT_MEAN 3.65145
90
91/**
92 * Security level used for byzantine checks (2^80)
93 */
94
95#define SECURITY_LEVEL 80
96
97/**
98 * Is the estimated probability for a new round this values
99 * is based on the bsc thesis of Elias Summermatter (2021)
100 */
101
102#define PROBABILITY_FOR_NEW_ROUND 0.15
103
104/**
105 * Measure the performance in a csv
106 */
107
108#define MEASURE_PERFORMANCE 0
109
110
111/**
112 * Current phase we are in for a union operation.
113 */
114enum UnionOperationPhase
115{
116 /**
117 * We sent the request message, and expect a strata estimator.
118 */
119 PHASE_EXPECT_SE,
120
121 /**
122 * We sent the strata estimator, and expect an IBF. This phase is entered once
123 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
124 *
125 * XXX: could use better wording.
126 * XXX: repurposed to also expect a "request full set" message, should be renamed
127 *
128 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
129 */
130 PHASE_EXPECT_IBF,
131
132 /**
133 * Continuation for multi part IBFs.
134 */
135 PHASE_EXPECT_IBF_LAST,
136
137 /**
138 * We are decoding an IBF.
139 */
140 PHASE_ACTIVE_DECODING,
141
142 /**
143 * The other peer is decoding the IBF we just sent.
144 */
145 PHASE_PASSIVE_DECODING,
146
147 /**
148 * The protocol is almost finished, but we still have to flush our message
149 * queue and/or expect some elements.
150 */
151 PHASE_FINISH_CLOSING,
152
153 /**
154 * In the penultimate phase, we wait until all our demands are satisfied.
155 * Then we send a done message, and wait for another done message.
156 */
157 PHASE_FINISH_WAITING,
158
159 /**
160 * In the ultimate phase, we wait until our demands are satisfied and then
161 * quit (sending another DONE message).
162 */
163 PHASE_FINISHED,
164
165 /**
166 * After sending the full set, wait for responses with the elements
167 * that the local peer is missing.
168 */
169 PHASE_FULL_SENDING,
170
171 /**
172 * Phase that receives full set first and then sends elements that are
173 * the local peer missing
174 */
175 PHASE_FULL_RECEIVING
176};
177
178/**
179 * Different modes of operations
180 */
181
182enum MODE_OF_OPERATION
183{
184 /**
185 * Mode just synchronizes the difference between sets
186 */
187 DIFFERENTIAL_SYNC,
188
189 /**
190 * Mode send full set sending local set first
191 */
192 FULL_SYNC_LOCAL_SENDING_FIRST,
193
194 /**
195 * Mode request full set from remote peer
196 */
197 FULL_SYNC_REMOTE_SENDING_FIRST
198};
199
200
201/**
202 * Information about an element element in the set. All elements are
203 * stored in a hash-table from their hash-code to their `struct
204 * Element`, so that the remove and add operations are reasonably
205 * fast.
206 */
207struct ElementEntry
208{
209 /**
210 * The actual element. The data for the element
211 * should be allocated at the end of this struct.
212 */
213 struct GNUNET_SETU_Element element;
214
215 /**
216 * Hash of the element. For set union: Will be used to derive the
217 * different IBF keys for different salts.
218 */
219 struct GNUNET_HashCode element_hash;
220
221 /**
222 * First generation that includes this element.
223 */
224 unsigned int generation;
225
226 /**
227 * #GNUNET_YES if the element is a remote element, and does not belong
228 * to the operation's set.
229 */
230 int remote;
231};
232
233
234/**
235 * A listener is inhabited by a client, and waits for evaluation
236 * requests from remote peers.
237 */
238struct Listener;
239
240
241/**
242 * A set that supports a specific operation with other peers.
243 */
244struct Set;
245
246
247/**
248 * State we keep per client.
249 */
250struct ClientState
251{
252 /**
253 * Set, if associated with the client, otherwise NULL.
254 */
255 struct Set *set;
256
257 /**
258 * Listener, if associated with the client, otherwise NULL.
259 */
260 struct Listener *listener;
261
262 /**
263 * Client handle.
264 */
265 struct GNUNET_SERVICE_Client *client;
266
267 /**
268 * Message queue.
269 */
270 struct GNUNET_MQ_Handle *mq;
271};
272
273
274/**
275 * Operation context used to execute a set operation.
276 */
277struct Operation
278{
279
280 /**
281 * The identity of the requesting peer. Needs to
282 * be stored here as the op spec might not have been created yet.
283 */
284 struct GNUNET_PeerIdentity peer;
285
286 /**
287 * Initial size of our set, just before the operation started.
288 */
289 uint64_t initial_size;
290
291 /**
292 * Kept in a DLL of the listener, if @e listener is non-NULL.
293 */
294 struct Operation *next;
295
296 /**
297 * Kept in a DLL of the listener, if @e listener is non-NULL.
298 */
299 struct Operation *prev;
300
301 /**
302 * Channel to the peer.
303 */
304 struct GNUNET_CADET_Channel *channel;
305
306 /**
307 * Port this operation runs on.
308 */
309 struct Listener *listener;
310
311 /**
312 * Message queue for the channel.
313 */
314 struct GNUNET_MQ_Handle *mq;
315
316 /**
317 * Context message, may be NULL.
318 */
319 struct GNUNET_MessageHeader *context_msg;
320
321 /**
322 * Set associated with the operation, NULL until the spec has been
323 * associated with a set.
324 */
325 struct Set *set;
326
327 /**
328 * Copy of the set's strata estimator at the time of
329 * creation of this operation.
330 */
331 struct MultiStrataEstimator *se;
332
333 /**
334 * The IBF we currently receive.
335 */
336 struct InvertibleBloomFilter *remote_ibf;
337
338 /**
339 * The IBF with the local set's element.
340 */
341 struct InvertibleBloomFilter *local_ibf;
342
343 /**
344 * Maps unsalted IBF-Keys to elements.
345 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
346 * Colliding IBF-Keys are linked.
347 */
348 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
349
350 /**
351 * Timeout task, if the incoming peer has not been accepted
352 * after the timeout, it will be disconnected.
353 */
354 struct GNUNET_SCHEDULER_Task *timeout_task;
355
356 /**
357 * Hashes for elements that we have demanded from the other peer.
358 */
359 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
360
361 /**
362 * Current state of the operation.
363 */
364 enum UnionOperationPhase phase;
365
366 /**
367 * Did we send the client that we are done?
368 */
369 int client_done_sent;
370
371 /**
372 * Number of ibf buckets already received into the @a remote_ibf.
373 */
374 uint64_t ibf_buckets_received;
375
376 /**
377 * Salt that we're using for sending IBFs
378 */
379 uint32_t salt_send;
380
381 /**
382 * Salt for the IBF we've received and that we're currently decoding.
383 */
384 uint32_t salt_receive;
385
386 /**
387 * Number of elements we received from the other peer
388 * that were not in the local set yet.
389 */
390 uint32_t received_fresh;
391
392 /**
393 * Total number of elements received from the other peer.
394 */
395 uint32_t received_total;
396
397 /**
398 * Salt to use for the operation.
399 */
400 uint32_t salt;
401
402 /**
403 * Remote peers element count
404 */
405 uint32_t remote_element_count;
406
407 /**
408 * ID used to identify an operation between service and client
409 */
410 uint32_t client_request_id;
411
412 /**
413 * Always use delta operation instead of sending full sets,
414 * even it it's less efficient.
415 */
416 int force_delta;
417
418 /**
419 * Always send full sets, even if delta operations would
420 * be more efficient.
421 */
422 int force_full;
423
424 /**
425 * #GNUNET_YES to fail operations where Byzantine faults
426 * are suspected
427 */
428 int byzantine;
429
430 /**
431 * #GNUNET_YES to also send back set elements we are sending to
432 * the remote peer.
433 */
434 int symmetric;
435
436 /**
437 * Lower bound for the set size, used only when
438 * byzantine mode is enabled.
439 */
440 uint64_t byzantine_lower_bound;
441
442 /**
443 * Unique request id for the request from a remote peer, sent to the
444 * client, which will accept or reject the request. Set to '0' iff
445 * the request has not been suggested yet.
446 */
447 uint32_t suggest_id;
448
449 /**
450 * Generation in which the operation handle
451 * was created.
452 */
453 unsigned int generation_created;
454
455
456 /**
457 * User defined Bandwidth Round Trips Tradeoff
458 */
459 uint64_t rtt_bandwidth_tradeoff;
460
461
462 /**
463 * Number of Element per bucket in IBF
464 */
465 uint8_t ibf_number_buckets_per_element;
466
467
468 /**
469 * Set difference is multiplied with this factor
470 * to gennerate large enough IBF
471 */
472 uint8_t ibf_bucket_number_factor;
473
474 /**
475 * Defines which site a client is
476 * 0 = Initiating peer
477 * 1 = Receiving peer
478 */
479 uint8_t peer_site;
480
481
482 /**
483 * Local peer element count
484 */
485 uint64_t local_element_count;
486
487 /**
488 * Mode of operation that was chosen by the algorithm
489 */
490 uint8_t mode_of_operation;
491
492 /**
493 * Hashmap to keep track of the send/received messages
494 */
495 struct GNUNET_CONTAINER_MultiHashMap *message_control_flow;
496
497 /**
498 * Hashmap to keep track of the send/received inquiries (ibf keys)
499 */
500 struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent;
501
502
503 /**
504 * Total size of local set
505 */
506 uint64_t total_elements_size_local;
507
508 /**
509 * Limit of number of elements in set
510 */
511 uint64_t byzantine_upper_bound;
512
513 /**
514 * is the count of already passed differential sync iterations
515 */
516 uint8_t differential_sync_iterations;
517
518 /**
519 * Estimated or committed set difference at the start
520 */
521 uint64_t remote_set_diff;
522
523 /**
524 * Estimated or committed set difference at the start
525 */
526 uint64_t local_set_diff;
527
528 /**
529 * Boolean to enforce an active passive switch
530 */
531 bool active_passive_switch_required;
532};
533
534
535/**
536 * SetContent stores the actual set elements, which may be shared by
537 * multiple generations derived from one set.
538 */
539struct SetContent
540{
541 /**
542 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
543 */
544 struct GNUNET_CONTAINER_MultiHashMap *elements;
545
546 /**
547 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *` randomized.
548 */
549 struct GNUNET_CONTAINER_MultiHashMap *elements_randomized;
550
551 /**
552 * Salt to construct the randomized element map
553 */
554 uint64_t elements_randomized_salt;
555
556 /**
557 * Number of references to the content.
558 */
559 unsigned int refcount;
560
561 /**
562 * FIXME: document!
563 */
564 unsigned int latest_generation;
565
566 /**
567 * Number of concurrently active iterators.
568 */
569 int iterator_count;
570};
571
572
573/**
574 * A set that supports a specific operation with other peers.
575 */
576struct Set
577{
578 /**
579 * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
580 */
581 struct Set *next;
582
583 /**
584 * Sets are held in a doubly linked list.
585 */
586 struct Set *prev;
587
588 /**
589 * Client that owns the set. Only one client may own a set,
590 * and there can only be one set per client.
591 */
592 struct ClientState *cs;
593
594 /**
595 * Content, possibly shared by multiple sets,
596 * and thus reference counted.
597 */
598 struct SetContent *content;
599
600 /**
601 * The strata estimator is only generated once for each set. The IBF keys
602 * are derived from the element hashes with salt=0.
603 */
604 struct MultiStrataEstimator *se;
605
606 /**
607 * Evaluate operations are held in a linked list.
608 */
609 struct Operation *ops_head;
610
611 /**
612 * Evaluate operations are held in a linked list.
613 */
614 struct Operation *ops_tail;
615
616 /**
617 * Current generation, that is, number of previously executed
618 * operations and lazy copies on the underlying set content.
619 */
620 unsigned int current_generation;
621
622};
623
624
625/**
626 * The key entry is used to associate an ibf key with an element.
627 */
628struct KeyEntry
629{
630 /**
631 * IBF key for the entry, derived from the current salt.
632 */
633 struct IBF_Key ibf_key;
634
635 /**
636 * The actual element associated with the key.
637 *
638 * Only owned by the union operation if element->operation
639 * is #GNUNET_YES.
640 */
641 struct ElementEntry *element;
642
643 /**
644 * Did we receive this element? Even if element->is_foreign is false, we
645 * might have received the element, so this indicates that the other peer
646 * has it.
647 */
648 int received;
649};
650
651
652/**
653 * Used as a closure for sending elements
654 * with a specific IBF key.
655 */
656struct SendElementClosure
657{
658 /**
659 * The IBF key whose matching elements should be
660 * sent.
661 */
662 struct IBF_Key ibf_key;
663
664 /**
665 * Operation for which the elements
666 * should be sent.
667 */
668 struct Operation *op;
669};
670
671
672/**
673 * A listener is inhabited by a client, and waits for evaluation
674 * requests from remote peers.
675 */
676struct Listener
677{
678 /**
679 * Listeners are held in a doubly linked list.
680 */
681 struct Listener *next;
682
683 /**
684 * Listeners are held in a doubly linked list.
685 */
686 struct Listener *prev;
687
688 /**
689 * Head of DLL of operations this listener is responsible for.
690 * Once the client has accepted/declined the operation, the
691 * operation is moved to the respective set's operation DLLS.
692 */
693 struct Operation *op_head;
694
695 /**
696 * Tail of DLL of operations this listener is responsible for.
697 * Once the client has accepted/declined the operation, the
698 * operation is moved to the respective set's operation DLLS.
699 */
700 struct Operation *op_tail;
701
702 /**
703 * Client that owns the listener.
704 * Only one client may own a listener.
705 */
706 struct ClientState *cs;
707
708 /**
709 * The port we are listening on with CADET.
710 */
711 struct GNUNET_CADET_Port *open_port;
712
713 /**
714 * Application ID for the operation, used to distinguish
715 * multiple operations of the same type with the same peer.
716 */
717 struct GNUNET_HashCode app_id;
718
719};
720
721
722/**
723 * Handle to the cadet service, used to listen for and connect to
724 * remote peers.
725 */
726static struct GNUNET_CADET_Handle *cadet;
727
728/**
729 * Statistics handle.
730 */
731static struct GNUNET_STATISTICS_Handle *_GSS_statistics;
732
733/**
734 * Listeners are held in a doubly linked list.
735 */
736static struct Listener *listener_head;
737
738/**
739 * Listeners are held in a doubly linked list.
740 */
741static struct Listener *listener_tail;
742
743/**
744 * Number of active clients.
745 */
746static unsigned int num_clients;
747
748/**
749 * Are we in shutdown? if #GNUNET_YES and the number of clients
750 * drops to zero, disconnect from CADET.
751 */
752static int in_shutdown;
753
754/**
755 * Counter for allocating unique IDs for clients, used to identify incoming
756 * operation requests from remote peers, that the client can choose to accept
757 * or refuse. 0 must not be used (reserved for uninitialized).
758 */
759static uint32_t suggest_id;
760
761#if MEASURE_PERFORMANCE
762/**
763 * Handles configuration file for setu performance test
764 *
765 */
766static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
767
768
769/**
770 * Stores the performance data for induvidual message
771 */
772
773
774struct perf_num_send_received_msg
775{
776 uint64_t sent;
777 uint64_t sent_var_bytes;
778 uint64_t received;
779 uint64_t received_var_bytes;
780};
781
782/**
783 * Main struct to measure performance (data/rtts)
784 */
785struct per_store_struct
786{
787 struct perf_num_send_received_msg operation_request;
788 struct perf_num_send_received_msg se;
789 struct perf_num_send_received_msg request_full;
790 struct perf_num_send_received_msg element_full;
791 struct perf_num_send_received_msg full_done;
792 struct perf_num_send_received_msg ibf;
793 struct perf_num_send_received_msg inquery;
794 struct perf_num_send_received_msg element;
795 struct perf_num_send_received_msg demand;
796 struct perf_num_send_received_msg offer;
797 struct perf_num_send_received_msg done;
798 struct perf_num_send_received_msg over;
799 uint64_t se_diff;
800 uint64_t se_diff_remote;
801 uint64_t se_diff_local;
802 uint64_t active_passive_switches;
803 uint8_t mode_of_operation;
804};
805
806struct per_store_struct perf_store;
807#endif
808
809/**
810 * Different states to control the messages flow in differential mode
811 */
812
813enum MESSAGE_CONTROL_FLOW_STATE
814{
815 /**
816 * Initial message state
817 */
818 MSG_CFS_UNINITIALIZED,
819
820 /**
821 * Track that a message has been sent
822 */
823 MSG_CFS_SENT,
824
825 /**
826 * Track that receiving this message is expected
827 */
828 MSG_CFS_EXPECTED,
829
830 /**
831 * Track that message has been received
832 */
833 MSG_CFS_RECEIVED,
834};
835
836/**
837 * Message types to track in message control flow
838 */
839
840enum MESSAGE_TYPE
841{
842 /**
843 * Offer message type
844 */
845 OFFER_MESSAGE,
846
847 /**
848 * Demand message type
849 */
850 DEMAND_MESSAGE,
851
852 /**
853 * Element message type
854 */
855 ELEMENT_MESSAGE,
856};
857
858
859/**
860 * Struct to tracked messages in message control flow
861 */
862struct messageControlFlowElement
863{
864 /**
865 * Track the message control state of the offer message
866 */
867 enum MESSAGE_CONTROL_FLOW_STATE offer;
868 /**
869 * Track the message control state of the demand message
870 */
871 enum MESSAGE_CONTROL_FLOW_STATE demand;
872 /**
873 * Track the message control state of the element message
874 */
875 enum MESSAGE_CONTROL_FLOW_STATE element;
876};
877
878
879#if MEASURE_PERFORMANCE
880
881/**
882 * Loads different configuration to perform performance tests
883 *
884 * @param op operation handle
885 */
886static void
887load_config (struct Operation *op)
888{
889 long long number;
890 float fl;
891
892 setu_cfg = GNUNET_CONFIGURATION_create ();
893 GNUNET_CONFIGURATION_load (setu_cfg,
894 "perf_setu.conf");
895 GNUNET_CONFIGURATION_get_value_float (setu_cfg,
896 "IBF",
897 "BUCKET_NUMBER_FACTOR",
898 &fl);
899 op->ibf_bucket_number_factor = fl;
900 GNUNET_CONFIGURATION_get_value_number (setu_cfg,
901 "IBF",
902 "NUMBER_PER_BUCKET",
903 &number);
904 op->ibf_number_buckets_per_element = number;
905 GNUNET_CONFIGURATION_get_value_number (setu_cfg,
906 "PERFORMANCE",
907 "TRADEOFF",
908 &number);
909 op->rtt_bandwidth_tradeoff = number;
910 GNUNET_CONFIGURATION_get_value_number (setu_cfg,
911 "BOUNDARIES",
912 "UPPER_ELEMENT",
913 &number);
914 op->byzantine_upper_bound = number;
915 op->peer_site = 0;
916}
917
918
919/**
920 * Function to calculate total bytes used for performance measurement
921 * @param size
922 * @param perf_num_send_received_msg
923 * @return bytes used
924 */
925static int
926sum_sent_received_bytes (uint64_t size,
927 struct perf_num_send_received_msg
928 perf_num_send_received_msg)
929{
930 return (size * perf_num_send_received_msg.sent)
931 + (size * perf_num_send_received_msg.received)
932 + perf_num_send_received_msg.sent_var_bytes
933 + perf_num_send_received_msg.received_var_bytes;
934}
935
936
937/**
938 * Function that calculates the perfmance values and writes them down
939 */
940static void
941calculate_perf_store ()
942{
943
944 /**
945 * Calculate RTT of init phase normally always 1
946 */
947 float rtt = 1;
948 int bytes_transmitted = 0;
949
950 /**
951 * Calculate RGNUNET_SETU_AcceptMessageRT of Fullsync normally 1 or 1.5 depending
952 */
953 if ((perf_store.element_full.received != 0) ||
954 (perf_store.element_full.sent != 0)
955 )
956 rtt += 1;
957
958 if ((perf_store.request_full.received != 0) ||
959 (perf_store.request_full.sent != 0)
960 )
961 rtt += 0.5;
962
963 /**
964 * In case of a differential sync 3 rtt's are needed.
965 * for every active/passive switch additional 3.5 rtt's are used
966 */
967 if ((perf_store.element.received != 0) ||
968 (perf_store.element.sent != 0))
969 {
970 int iterations = perf_store.active_passive_switches;
971
972 if (iterations > 0)
973 rtt += iterations * 0.5;
974 rtt += 2.5;
975 }
976
977
978 /**
979 * Calculate data sended size
980 */
981 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
982 GNUNET_SETU_ResultMessage),
983 perf_store.request_full);
984
985 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
986 GNUNET_SETU_ElementMessage),
987 perf_store.element_full);
988 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
989 GNUNET_SETU_ElementMessage),
990 perf_store.element);
991 // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
992 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
993 StrataEstimatorMessage),
994 perf_store.se);
995 bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
996 bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
997 perf_store.ibf);
998 bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
999 perf_store.inquery);
1000 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1001 GNUNET_MessageHeader),
1002 perf_store.demand);
1003 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1004 GNUNET_MessageHeader),
1005 perf_store.offer);
1006 bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1007
1008 /**
1009 * Write IBF failure rate for different BUCKET_NUMBER_FACTOR
1010 */
1011 float factor;
1012 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1013 &factor);
1014 long long num_per_bucket;
1015 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1016 &num_per_bucket);
1017
1018
1019 int decoded = 0;
1020 if (perf_store.active_passive_switches == 0)
1021 decoded = 1;
1022 int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1023 IBFMessage),
1024 perf_store.ibf);
1025
1026 FILE *out1 = fopen ("perf_data.csv", "a");
1027 fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1028 decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1029 bytes_transmitted,
1030 perf_store.se_diff_local,perf_store.se_diff_remote,
1031 perf_store.mode_of_operation);
1032 fclose (out1);
1033
1034}
1035
1036
1037#endif
1038/**
1039 * Function that chooses the optimal mode of operation depending on
1040 * operation parameters.
1041 * @param avg_element_size
1042 * @param local_set_size
1043 * @param remote_set_size
1044 * @param est_set_diff_remote
1045 * @param est_set_diff_local
1046 * @param bandwith_latency_tradeoff
1047 * @param ibf_bucket_number_factor
1048 * @return calcuated mode of operation
1049 */
1050static uint8_t
1051estimate_best_mode_of_operation (uint64_t avg_element_size,
1052 uint64_t local_set_size,
1053 uint64_t remote_set_size,
1054 uint64_t est_set_diff_remote,
1055 uint64_t est_set_diff_local,
1056 uint64_t bandwith_latency_tradeoff,
1057 uint64_t ibf_bucket_number_factor)
1058{
1059
1060 /*
1061 * In case of initial sync fall to predefined states
1062 */
1063
1064 if (0 == local_set_size)
1065 return FULL_SYNC_REMOTE_SENDING_FIRST;
1066 if (0 == remote_set_size)
1067 return FULL_SYNC_LOCAL_SENDING_FIRST;
1068
1069 /*
1070 * Calculate bytes for full Sync
1071 */
1072
1073 uint8_t sizeof_full_done_header = 4;
1074 uint8_t sizeof_done_header = 4;
1075 uint8_t rtt_min_full = 2;
1076 uint8_t sizeof_request_full = 4;
1077 uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1078
1079 /* Estimate byte required if we send first */
1080 uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1081 + local_set_size;
1082
1083 uint64_t total_bytes_full_local_send_first = (avg_element_size
1084 *
1085 total_elements_to_send_local_send_first) \
1086 + (
1087 total_elements_to_send_local_send_first * sizeof(struct
1088 GNUNET_SETU_ElementMessage)) \
1089 + (sizeof_full_done_header * 2) \
1090 + rtt_min_full
1091 * bandwith_latency_tradeoff;
1092
1093 /* Estimate bytes required if we request from remote peer */
1094 uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1095 + remote_set_size;
1096
1097 uint64_t total_bytes_full_remote_send_first = (avg_element_size
1098 *
1099 total_elements_to_send_remote_send_first) \
1100 + (
1101 total_elements_to_send_remote_send_first * sizeof(struct
1102 GNUNET_SETU_ElementMessage)) \
1103 + (sizeof_full_done_header * 2) \
1104 + (rtt_min_full + 0.5)
1105 * bandwith_latency_tradeoff \
1106 + sizeof_request_full;
1107
1108 /*
1109 * Calculate bytes for differential Sync
1110 */
1111
1112 /* Estimate bytes required by IBF transmission*/
1113
1114 long double ibf_bucket_count = estimated_total_diff
1115 * ibf_bucket_number_factor;
1116
1117 if (ibf_bucket_count <= IBF_MIN_SIZE)
1118 {
1119 ibf_bucket_count = IBF_MIN_SIZE;
1120 }
1121 uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1122 / ((float) MAX_BUCKETS_PER_MESSAGE));
1123
1124 uint64_t estimated_counter_size = ceil (
1125 MIN (2 * log2l (((float) local_set_size)
1126 / ((float) ibf_bucket_count)),
1127 log2l (local_set_size)));
1128
1129 long double counter_bytes = (float) estimated_counter_size / 8;
1130
1131 uint64_t ibf_bytes = ceil ((sizeof (struct IBFMessage) * ibf_message_count)
1132 * 1.2 \
1133 + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
1134 + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
1135 * 1.2 \
1136 + (ibf_bucket_count * counter_bytes) * 1.2);
1137
1138 /* Estimate full byte count for differential sync */
1139 uint64_t element_size = (avg_element_size
1140 + sizeof (struct GNUNET_SETU_ElementMessage)) \
1141 * estimated_total_diff;
1142 uint64_t done_size = sizeof_done_header;
1143 uint64_t inquery_size = (sizeof (struct IBF_Key)
1144 + sizeof (struct InquiryMessage))
1145 * estimated_total_diff;
1146 uint64_t demand_size =
1147 (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1148 * estimated_total_diff;
1149 uint64_t offer_size = (sizeof (struct GNUNET_HashCode)
1150 + sizeof (struct GNUNET_MessageHeader))
1151 * estimated_total_diff;
1152
1153 uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1154 + demand_size + offer_size + ibf_bytes) \
1155 + (DIFFERENTIAL_RTT_MEAN
1156 * bandwith_latency_tradeoff);
1157
1158 uint64_t full_min = MIN (total_bytes_full_local_send_first,
1159 total_bytes_full_remote_send_first);
1160
1161 /* Decide between full and differential sync */
1162
1163 if (full_min < total_bytes_diff)
1164 {
1165 /* Decide between sending all element first or receiving all elements */
1166 if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
1167 {
1168 return FULL_SYNC_LOCAL_SENDING_FIRST;
1169 }
1170 else
1171 {
1172 return FULL_SYNC_REMOTE_SENDING_FIRST;
1173 }
1174 }
1175 else
1176 {
1177 return DIFFERENTIAL_SYNC;
1178 }
1179}
1180
1181
1182/**
1183 * Validates the if a message is received in a correct phase
1184 * @param allowed_phases
1185 * @param size_phases
1186 * @param op
1187 * @return #GNUNET_YES if message permitted in phase and #GNUNET_NO if not permitted in given
1188 * phase
1189 */
1190static enum GNUNET_GenericReturnValue
1191check_valid_phase (const uint8_t allowed_phases[],
1192 size_t size_phases,
1193 struct Operation *op)
1194{
1195 /**
1196 * Iterate over allowed phases
1197 */
1198 for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1199 {
1200 uint8_t phase = allowed_phases[phase_ctr];
1201 if (phase == op->phase)
1202 {
1203 LOG (GNUNET_ERROR_TYPE_DEBUG,
1204 "Message received in valid phase\n");
1205 return GNUNET_YES;
1206 }
1207 }
1208 LOG (GNUNET_ERROR_TYPE_ERROR,
1209 "Received message in invalid phase: %u\n", op->phase);
1210 return GNUNET_NO;
1211}
1212
1213
1214/**
1215 * Function to update, track and validate message received in differential
1216 * sync. This function tracks states of messages and check it against different
1217 * constraints as described in Summermatter's BSc Thesis (2021)
1218 * @param hash_map: Hashmap to store message control flow
1219 * @param new_mcfs: The new message control flow state an given message type should be set to
1220 * @param hash_code: Hash code of the element
1221 * @param mt: The message type for which the message control flow state should be set
1222 * @return GNUNET_YES message is valid in message control flow GNUNET_NO when message is not valid
1223 * at this point in message flow
1224 */
1225static int
1226update_message_control_flow (struct GNUNET_CONTAINER_MultiHashMap *hash_map,
1227 enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1228 const struct GNUNET_HashCode *hash_code,
1229 enum MESSAGE_TYPE mt)
1230{
1231 struct messageControlFlowElement *cfe = NULL;
1232 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1233
1234 /**
1235 * Check logic for forbidden messages
1236 */
1237
1238 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1239 if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
1240 {
1241 if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1242 {
1243 LOG (GNUNET_ERROR_TYPE_ERROR,
1244 "Received an element without sent offer!\n");
1245 return GNUNET_NO;
1246 }
1247 /* Check that only requested elements are received! */
1248 if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1249 MSG_CFS_SENT))
1250 {
1251 LOG (GNUNET_ERROR_TYPE_ERROR,
1252 "Received an element that was not demanded\n");
1253 return GNUNET_NO;
1254 }
1255 }
1256
1257 /**
1258 * In case the element hash is not in the hashmap create a new entry
1259 */
1260
1261 if (NULL == cfe)
1262 {
1263 cfe = GNUNET_new (struct messageControlFlowElement);
1264 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1265 cfe,
1266 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1267 {
1268 GNUNET_free (cfe);
1269 return GNUNET_SYSERR;
1270 }
1271 }
1272
1273 /**
1274 * Set state of message
1275 */
1276
1277 if (OFFER_MESSAGE == mt)
1278 {
1279 mcfs = &cfe->offer;
1280 }
1281 else if (DEMAND_MESSAGE == mt)
1282 {
1283 mcfs = &cfe->demand;
1284 }
1285 else if (ELEMENT_MESSAGE == mt)
1286 {
1287 mcfs = &cfe->element;
1288 }
1289 else
1290 {
1291 return GNUNET_SYSERR;
1292 }
1293
1294 /**
1295 * Check if state is allowed
1296 */
1297
1298 if (new_mcfs <= *mcfs)
1299 {
1300 return GNUNET_NO;
1301 }
1302
1303 *mcfs = new_mcfs;
1304 return GNUNET_YES;
1305}
1306
1307
1308/**
1309 * Validate if a message in differential sync si already received before.
1310 * @param hash_map
1311 * @param hash_code
1312 * @param mt
1313 * @return GNUNET_YES when message is already in store if message is not in store return GNUNET_NO
1314 */
1315static int
1316is_message_in_message_control_flow (struct
1317 GNUNET_CONTAINER_MultiHashMap *hash_map,
1318 struct GNUNET_HashCode *hash_code,
1319 enum MESSAGE_TYPE mt)
1320{
1321 struct messageControlFlowElement *cfe = NULL;
1322 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1323
1324 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1325
1326 /**
1327 * Set state of message
1328 */
1329
1330 if (cfe != NULL)
1331 {
1332 if (OFFER_MESSAGE == mt)
1333 {
1334 mcfs = &cfe->offer;
1335 }
1336 else if (DEMAND_MESSAGE == mt)
1337 {
1338 mcfs = &cfe->demand;
1339 }
1340 else if (ELEMENT_MESSAGE == mt)
1341 {
1342 mcfs = &cfe->element;
1343 }
1344 else
1345 {
1346 return GNUNET_SYSERR;
1347 }
1348
1349 /**
1350 * Evaluate if set is in message
1351 */
1352 if (*mcfs != MSG_CFS_UNINITIALIZED)
1353 {
1354 return GNUNET_YES;
1355 }
1356 }
1357 return GNUNET_NO;
1358}
1359
1360
1361/**
1362 * Iterator for determining if all demands have been
1363 * satisfied
1364 *
1365 * @param cls the union operation `struct Operation *`
1366 * @param key unused
1367 * @param value the `struct ElementEntry *` to insert
1368 * into the key-to-element mapping
1369 * @return #GNUNET_YES (to continue iterating)
1370 */
1371static int
1372determinate_done_message_iterator (void *cls,
1373 const struct GNUNET_HashCode *key,
1374 void *value)
1375{
1376 struct messageControlFlowElement *mcfe = value;
1377
1378 if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1379 {
1380 return GNUNET_YES;
1381 }
1382 return GNUNET_NO;
1383}
1384
1385
1386/**
1387 * Iterator for determining average size
1388 *
1389 * @param cls the union operation `struct Operation *`
1390 * @param key unused
1391 * @param value the `struct ElementEntry *` to insert
1392 * into the key-to-element mapping
1393 * @return #GNUNET_YES (to continue iterating)
1394 */
1395static int
1396determinate_avg_element_size_iterator (void *cls,
1397 const struct GNUNET_HashCode *key,
1398 void *value)
1399{
1400 struct Operation *op = cls;
1401 struct GNUNET_SETU_Element *element = value;
1402 op->total_elements_size_local += element->size;
1403 return GNUNET_YES;
1404}
1405
1406
1407/**
1408 * Create randomized element hashmap for full sending
1409 *
1410 * @param cls the union operation `struct Operation *`
1411 * @param key unused
1412 * @param value the `struct ElementEntry *` to insert
1413 * into the key-to-element mapping
1414 * @return #GNUNET_YES (to continue iterating)
1415 */
1416static int
1417create_randomized_element_iterator (void *cls,
1418 const struct GNUNET_HashCode *key,
1419 void *value)
1420{
1421 struct Operation *op = cls;
1422
1423 struct GNUNET_HashContext *hashed_key_context =
1424 GNUNET_CRYPTO_hash_context_start ();
1425 struct GNUNET_HashCode new_key;
1426
1427 /**
1428 * Hash element with new salt to randomize hashmap
1429 */
1430 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1431 &key,
1432 sizeof(struct IBF_Key));
1433 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1434 &op->set->content->elements_randomized_salt,
1435 sizeof(uint32_t));
1436 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1437 &new_key);
1438 GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1439 &new_key,value,
1440 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1441 return GNUNET_YES;
1442}
1443
1444
1445/**
1446 * Iterator over hash map entries, called to
1447 * destroy the linked list of colliding ibf key entries.
1448 *
1449 * @param cls closure
1450 * @param key current key code
1451 * @param value value in the hash map
1452 * @return #GNUNET_YES if we should continue to iterate,
1453 * #GNUNET_NO if not.
1454 */
1455static int
1456destroy_key_to_element_iter (void *cls,
1457 uint32_t key,
1458 void *value)
1459{
1460 struct KeyEntry *k = value;
1461
1462 GNUNET_assert (NULL != k);
1463 if (GNUNET_YES == k->element->remote)
1464 {
1465 GNUNET_free (k->element);
1466 k->element = NULL;
1467 }
1468 GNUNET_free (k);
1469 return GNUNET_YES;
1470}
1471
1472
1473/**
1474 * Signal to the client that the operation has finished and
1475 * destroy the operation.
1476 *
1477 * @param cls operation to destroy
1478 */
1479static void
1480send_client_done (void *cls)
1481{
1482 struct Operation *op = cls;
1483 struct GNUNET_MQ_Envelope *ev;
1484 struct GNUNET_SETU_ResultMessage *rm;
1485
1486 if (GNUNET_YES == op->client_done_sent)
1487 return;
1488 if (PHASE_FINISHED != op->phase)
1489 {
1490 LOG (GNUNET_ERROR_TYPE_WARNING,
1491 "Union operation failed\n");
1492 GNUNET_STATISTICS_update (_GSS_statistics,
1493 "# Union operations failed",
1494 1,
1495 GNUNET_NO);
1496 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SETU_RESULT);
1497 rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1498 rm->request_id = htonl (op->client_request_id);
1499 rm->element_type = htons (0);
1500 GNUNET_MQ_send (op->set->cs->mq,
1501 ev);
1502 return;
1503 }
1504
1505 op->client_done_sent = GNUNET_YES;
1506
1507 GNUNET_STATISTICS_update (_GSS_statistics,
1508 "# Union operations succeeded",
1509 1,
1510 GNUNET_NO);
1511 LOG (GNUNET_ERROR_TYPE_INFO,
1512 "Signalling client that union operation is done\n");
1513 ev = GNUNET_MQ_msg (rm,
1514 GNUNET_MESSAGE_TYPE_SETU_RESULT);
1515 rm->request_id = htonl (op->client_request_id);
1516 rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
1517 rm->element_type = htons (0);
1518 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
1519 op->key_to_element));
1520 GNUNET_MQ_send (op->set->cs->mq,
1521 ev);
1522}
1523
1524
1525/**
1526 * Check if all given byzantine parameters are in given boundaries
1527 * @param op
1528 * @return indicator if all given byzantine parameters are in given boundaries
1529 */
1530
1531static int
1532check_byzantine_bounds (struct Operation *op)
1533{
1534 if (op->byzantine != GNUNET_YES)
1535 return GNUNET_OK;
1536
1537 /**
1538 * Check upper byzantine bounds
1539 */
1540 if (op->remote_element_count + op->remote_set_diff >
1541 op->byzantine_upper_bound)
1542 return GNUNET_SYSERR;
1543 if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1544 return GNUNET_SYSERR;
1545
1546 /**
1547 * Check lower byzantine bounds
1548 */
1549 if (op->remote_element_count < op->byzantine_lower_bound)
1550 return GNUNET_SYSERR;
1551 return GNUNET_OK;
1552}
1553
1554
1555/* FIXME: the destroy logic is a mess and should be cleaned up! */
1556
1557/**
1558 * Destroy the given operation. Used for any operation where both
1559 * peers were known and that thus actually had a vt and channel. Must
1560 * not be used for operations where 'listener' is still set and we do
1561 * not know the other peer.
1562 *
1563 * Call the implementation-specific cancel function of the operation.
1564 * Disconnects from the remote peer. Does not disconnect the client,
1565 * as there may be multiple operations per set.
1566 *
1567 * @param op operation to destroy
1568 */
1569static void
1570_GSS_operation_destroy (struct Operation *op)
1571{
1572 struct Set *set = op->set;
1573 struct GNUNET_CADET_Channel *channel;
1574
1575 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1576 "Destroying union operation %p\n",
1577 op);
1578 GNUNET_assert (NULL == op->listener);
1579 /* check if the op was canceled twice */
1580 if (NULL != op->remote_ibf)
1581 {
1582 ibf_destroy (op->remote_ibf);
1583 op->remote_ibf = NULL;
1584 }
1585 if (NULL != op->demanded_hashes)
1586 {
1587 GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
1588 op->demanded_hashes = NULL;
1589 }
1590 if (NULL != op->local_ibf)
1591 {
1592 ibf_destroy (op->local_ibf);
1593 op->local_ibf = NULL;
1594 }
1595 if (NULL != op->se)
1596 {
1597 strata_estimator_destroy (op->se);
1598 op->se = NULL;
1599 }
1600 if (NULL != op->key_to_element)
1601 {
1602 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
1603 &destroy_key_to_element_iter,
1604 NULL);
1605 GNUNET_CONTAINER_multihashmap32_destroy (op->key_to_element);
1606 op->key_to_element = NULL;
1607 }
1608 if (NULL != set)
1609 {
1610 GNUNET_CONTAINER_DLL_remove (set->ops_head,
1611 set->ops_tail,
1612 op);
1613 op->set = NULL;
1614 }
1615 if (NULL != op->context_msg)
1616 {
1617 GNUNET_free (op->context_msg);
1618 op->context_msg = NULL;
1619 }
1620 if (NULL != (channel = op->channel))
1621 {
1622 /* This will free op; called conditionally as this helper function
1623 is also called from within the channel disconnect handler. */
1624 op->channel = NULL;
1625 GNUNET_CADET_channel_destroy (channel);
1626 }
1627 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
1628 * there was a channel end handler that will free 'op' on the call stack. */
1629}
1630
1631
1632/**
1633 * This function probably should not exist
1634 * and be replaced by inlining more specific
1635 * logic in the various places where it is called.
1636 */
1637static void
1638_GSS_operation_destroy2 (struct Operation *op);
1639
1640
1641/**
1642 * Destroy an incoming request from a remote peer
1643 *
1644 * @param op remote request to destroy
1645 */
1646static void
1647incoming_destroy (struct Operation *op)
1648{
1649 struct Listener *listener;
1650
1651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1652 "Destroying incoming operation %p\n",
1653 op);
1654 if (NULL != (listener = op->listener))
1655 {
1656 GNUNET_CONTAINER_DLL_remove (listener->op_head,
1657 listener->op_tail,
1658 op);
1659 op->listener = NULL;
1660 }
1661 if (NULL != op->timeout_task)
1662 {
1663 GNUNET_SCHEDULER_cancel (op->timeout_task);
1664 op->timeout_task = NULL;
1665 }
1666 _GSS_operation_destroy2 (op);
1667}
1668
1669
1670/**
1671 * This function probably should not exist
1672 * and be replaced by inlining more specific
1673 * logic in the various places where it is called.
1674 */
1675static void
1676_GSS_operation_destroy2 (struct Operation *op)
1677{
1678 struct GNUNET_CADET_Channel *channel;
1679
1680 if (NULL != (channel = op->channel))
1681 {
1682 /* This will free op; called conditionally as this helper function
1683 is also called from within the channel disconnect handler. */
1684 op->channel = NULL;
1685 GNUNET_CADET_channel_destroy (channel);
1686 }
1687 if (NULL != op->listener)
1688 {
1689 incoming_destroy (op);
1690 return;
1691 }
1692 if (NULL != op->set)
1693 send_client_done (op);
1694 _GSS_operation_destroy (op);
1695 GNUNET_free (op);
1696}
1697
1698
1699/**
1700 * Inform the client that the union operation has failed,
1701 * and proceed to destroy the evaluate operation.
1702 *
1703 * @param op the union operation to fail
1704 */
1705static void
1706fail_union_operation (struct Operation *op)
1707{
1708 struct GNUNET_MQ_Envelope *ev;
1709 struct GNUNET_SETU_ResultMessage *msg;
1710
1711 LOG (GNUNET_ERROR_TYPE_WARNING,
1712 "union operation failed\n");
1713 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETU_RESULT);
1714 msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1715 msg->request_id = htonl (op->client_request_id);
1716 msg->element_type = htons (0);
1717 GNUNET_MQ_send (op->set->cs->mq,
1718 ev);
1719 _GSS_operation_destroy (op);
1720}
1721
1722
1723/**
1724 * Function that checks if full sync is plausible
1725 * @param initial_local_elements_in_set
1726 * @param estimated_set_difference
1727 * @param repeated_elements
1728 * @param fresh_elements
1729 * @param op
1730 * @return GNUNET_OK if
1731 */
1732
1733static void
1734full_sync_plausibility_check (struct Operation *op)
1735{
1736 if (GNUNET_YES != op->byzantine)
1737 return;
1738
1739 int security_level_lb = -1 * SECURITY_LEVEL;
1740 uint64_t duplicates = op->received_fresh - op->received_total;
1741
1742 /*
1743 * Protect full sync from receiving double element when in FULL SENDING
1744 */
1745 if (PHASE_FULL_SENDING == op->phase)
1746 {
1747 if (duplicates > 0)
1748 {
1749 LOG (GNUNET_ERROR_TYPE_ERROR,
1750 "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1751 "mode of operation this is not allowed! Duplicates: %llu\n",
1752 (unsigned long long) duplicates);
1753 GNUNET_break_op (0);
1754 fail_union_operation (op);
1755 return;
1756 }
1757
1758 }
1759
1760 /*
1761 * Protect full sync with probabilistic algorithm
1762 */
1763 if (PHASE_FULL_RECEIVING == op->phase)
1764 {
1765 if (0 == op->remote_set_diff)
1766 op->remote_set_diff = 1;
1767
1768 long double base = (1 - (long double) (op->remote_set_diff
1769 / (long double) (op->initial_size
1770 + op->
1771 remote_set_diff)));
1772 long double exponent = (op->received_total - (op->received_fresh * ((long
1773 double)
1774 op->
1775 initial_size
1776 / (long
1777 double)
1778 op->
1779 remote_set_diff)));
1780 long double value = exponent * (log2l (base) / log2l (2));
1781 if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1782 {
1783 LOG (GNUNET_ERROR_TYPE_ERROR,
1784 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1785 "to many duplicated full element : %LF\n",
1786 value);
1787 GNUNET_break_op (0);
1788 fail_union_operation (op);
1789 return;
1790 }
1791 }
1792}
1793
1794
1795/**
1796 * Limit active passive switches in differential sync to configured security level
1797 * @param op
1798 */
1799static void
1800check_max_differential_rounds (struct Operation *op)
1801{
1802 double probability = op->differential_sync_iterations * (log2l (
1803 PROBABILITY_FOR_NEW_ROUND)
1804 / log2l (2));
1805 if ((-1 * SECURITY_LEVEL) > probability)
1806 {
1807 LOG (GNUNET_ERROR_TYPE_ERROR,
1808 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1809 "switches in differential sync: %u\n",
1810 op->differential_sync_iterations);
1811 GNUNET_break_op (0);
1812 fail_union_operation (op);
1813 return;
1814 }
1815}
1816
1817
1818/**
1819 * Derive the IBF key from a hash code and
1820 * a salt.
1821 *
1822 * @param src the hash code
1823 * @return the derived IBF key
1824 */
1825static struct IBF_Key
1826get_ibf_key (const struct GNUNET_HashCode *src)
1827{
1828 struct IBF_Key key;
1829 uint16_t salt = 0;
1830
1831 GNUNET_assert (GNUNET_OK ==
1832 GNUNET_CRYPTO_kdf (&key, sizeof(key),
1833 src, sizeof *src,
1834 &salt, sizeof(salt),
1835 NULL, 0));
1836 return key;
1837}
1838
1839
1840/**
1841 * Context for #op_get_element_iterator
1842 */
1843struct GetElementContext
1844{
1845 /**
1846 * Gnunet hash code in context
1847 */
1848 struct GNUNET_HashCode hash;
1849
1850 /**
1851 * Pointer to the key entry
1852 */
1853 struct KeyEntry *k;
1854};
1855
1856
1857/**
1858 * Iterator over the mapping from IBF keys to element entries. Checks if we
1859 * have an element with a given GNUNET_HashCode.
1860 *
1861 * @param cls closure
1862 * @param key current key code
1863 * @param value value in the hash map
1864 * @return #GNUNET_YES if we should search further,
1865 * #GNUNET_NO if we've found the element.
1866 */
1867static int
1868op_get_element_iterator (void *cls,
1869 uint32_t key,
1870 void *value)
1871{
1872 struct GetElementContext *ctx = cls;
1873 struct KeyEntry *k = value;
1874
1875 GNUNET_assert (NULL != k);
1876 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
1877 &ctx->hash))
1878 {
1879 ctx->k = k;
1880 return GNUNET_NO;
1881 }
1882 return GNUNET_YES;
1883}
1884
1885
1886/**
1887 * Determine whether the given element is already in the operation's element
1888 * set.
1889 *
1890 * @param op operation that should be tested for 'element_hash'
1891 * @param element_hash hash of the element to look for
1892 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
1893 */
1894static struct KeyEntry *
1895op_get_element (struct Operation *op,
1896 const struct GNUNET_HashCode *element_hash)
1897{
1898 int ret;
1899 struct IBF_Key ibf_key;
1900 struct GetElementContext ctx = { { { 0 } }, 0 };
1901
1902 ctx.hash = *element_hash;
1903
1904 ibf_key = get_ibf_key (element_hash);
1905 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->key_to_element,
1906 (uint32_t) ibf_key.key_val,
1907 &op_get_element_iterator,
1908 &ctx);
1909
1910 /* was the iteration aborted because we found the element? */
1911 if (GNUNET_SYSERR == ret)
1912 {
1913 GNUNET_assert (NULL != ctx.k);
1914 return ctx.k;
1915 }
1916 return NULL;
1917}
1918
1919
1920/**
1921 * Insert an element into the union operation's
1922 * key-to-element mapping. Takes ownership of 'ee'.
1923 * Note that this does not insert the element in the set,
1924 * only in the operation's key-element mapping.
1925 * This is done to speed up re-tried operations, if some elements
1926 * were transmitted, and then the IBF fails to decode.
1927 *
1928 * XXX: clarify ownership, doesn't sound right.
1929 *
1930 * @param op the union operation
1931 * @param ee the element entry
1932 * @param received was this element received from the remote peer?
1933 */
1934static void
1935op_register_element (struct Operation *op,
1936 struct ElementEntry *ee,
1937 int received)
1938{
1939 struct IBF_Key ibf_key;
1940 struct KeyEntry *k;
1941
1942 ibf_key = get_ibf_key (&ee->element_hash);
1943 k = GNUNET_new (struct KeyEntry);
1944 k->element = ee;
1945 k->ibf_key = ibf_key;
1946 k->received = received;
1947 GNUNET_assert (GNUNET_OK ==
1948 GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
1949 (uint32_t) ibf_key.key_val,
1950 k,
1951 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1952}
1953
1954
1955/**
1956 * Modify an IBF key @a k_in based on the @a salt, returning a
1957 * salted key in @a k_out.
1958 */
1959static void
1960salt_key (const struct IBF_Key *k_in,
1961 uint32_t salt,
1962 struct IBF_Key *k_out)
1963{
1964 int s = (salt * 7) % 64;
1965 uint64_t x = k_in->key_val;
1966
1967 /* rotate ibf key */
1968 x = (x >> s) | (x << (64 - s));
1969 k_out->key_val = x;
1970}
1971
1972
1973/**
1974 * Reverse modification done in the salt_key function
1975 */
1976static void
1977unsalt_key (const struct IBF_Key *k_in,
1978 uint32_t salt,
1979 struct IBF_Key *k_out)
1980{
1981 int s = (salt * 7) % 64;
1982 uint64_t x = k_in->key_val;
1983
1984 x = (x << s) | (x >> (64 - s));
1985 k_out->key_val = x;
1986}
1987
1988
1989/**
1990 * Insert a key into an ibf.
1991 *
1992 * @param cls the ibf
1993 * @param key unused
1994 * @param value the key entry to get the key from
1995 */
1996static int
1997prepare_ibf_iterator (void *cls,
1998 uint32_t key,
1999 void *value)
2000{
2001 struct Operation *op = cls;
2002 struct KeyEntry *ke = value;
2003 struct IBF_Key salted_key;
2004
2005 LOG (GNUNET_ERROR_TYPE_DEBUG,
2006 "[OP %p] inserting %lx (hash %s) into ibf\n",
2007 op,
2008 (unsigned long) ke->ibf_key.key_val,
2009 GNUNET_h2s (&ke->element->element_hash));
2010 salt_key (&ke->ibf_key,
2011 op->salt_send,
2012 &salted_key);
2013 ibf_insert (op->local_ibf, salted_key);
2014 return GNUNET_YES;
2015}
2016
2017
2018/**
2019 * Is element @a ee part of the set used by @a op?
2020 *
2021 * @param ee element to test
2022 * @param op operation the defines the set and its generation
2023 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
2024 */
2025static int
2026_GSS_is_element_of_operation (struct ElementEntry *ee,
2027 struct Operation *op)
2028{
2029 return ee->generation >= op->generation_created;
2030}
2031
2032
2033/**
2034 * Iterator for initializing the
2035 * key-to-element mapping of a union operation
2036 *
2037 * @param cls the union operation `struct Operation *`
2038 * @param key unused
2039 * @param value the `struct ElementEntry *` to insert
2040 * into the key-to-element mapping
2041 * @return #GNUNET_YES (to continue iterating)
2042 */
2043static int
2044init_key_to_element_iterator (void *cls,
2045 const struct GNUNET_HashCode *key,
2046 void *value)
2047{
2048 struct Operation *op = cls;
2049 struct ElementEntry *ee = value;
2050
2051 /* make sure that the element belongs to the set at the time
2052 * of creating the operation */
2053 if (GNUNET_NO ==
2054 _GSS_is_element_of_operation (ee,
2055 op))
2056 return GNUNET_YES;
2057 GNUNET_assert (GNUNET_NO == ee->remote);
2058 op_register_element (op,
2059 ee,
2060 GNUNET_NO);
2061 return GNUNET_YES;
2062}
2063
2064
2065/**
2066 * Initialize the IBF key to element mapping local to this set operation.
2067 *
2068 * @param op the set union operation
2069 */
2070static void
2071initialize_key_to_element (struct Operation *op)
2072{
2073 unsigned int len;
2074
2075 GNUNET_assert (NULL == op->key_to_element);
2076 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
2077 op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
2078 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2079 &init_key_to_element_iterator,
2080 op);
2081}
2082
2083
2084/**
2085 * Create an ibf with the operation's elements
2086 * of the specified size
2087 *
2088 * @param op the union operation
2089 * @param size size of the ibf to create
2090 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
2091 */
2092static int
2093prepare_ibf (struct Operation *op,
2094 uint32_t size)
2095{
2096 GNUNET_assert (NULL != op->key_to_element);
2097
2098 if (NULL != op->local_ibf)
2099 ibf_destroy (op->local_ibf);
2100 // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2101 op->local_ibf = ibf_create (size,
2102 ((uint8_t) op->ibf_number_buckets_per_element));
2103 if (NULL == op->local_ibf)
2104 {
2105 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2106 "Failed to allocate local IBF\n");
2107 return GNUNET_SYSERR;
2108 }
2109 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
2110 &prepare_ibf_iterator,
2111 op);
2112 return GNUNET_OK;
2113}
2114
2115
2116/**
2117 * Send an ibf of appropriate size.
2118 *
2119 * Fragments the IBF into multiple messages if necessary.
2120 *
2121 * @param op the union operation
2122 * @param ibf_order order of the ibf to send, size=2^order
2123 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
2124 */
2125static int
2126send_ibf (struct Operation *op,
2127 uint32_t ibf_size)
2128{
2129 uint64_t buckets_sent = 0;
2130 struct InvertibleBloomFilter *ibf;
2131 op->differential_sync_iterations++;
2132
2133 /**
2134 * Enforce min size of IBF
2135 */
2136 uint32_t ibf_min_size = IBF_MIN_SIZE;
2137
2138 if (ibf_size < ibf_min_size)
2139 {
2140 ibf_size = ibf_min_size;
2141 }
2142 if (GNUNET_OK !=
2143 prepare_ibf (op, ibf_size))
2144 {
2145 /* allocation failed */
2146 return GNUNET_SYSERR;
2147 }
2148
2149 LOG (GNUNET_ERROR_TYPE_DEBUG,
2150 "sending ibf of size %u\n",
2151 (unsigned int) ibf_size);
2152
2153 {
2154 char name[64];
2155
2156 GNUNET_snprintf (name,
2157 sizeof(name),
2158 "# sent IBF (order %u)",
2159 ibf_size);
2160 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
2161 }
2162
2163 ibf = op->local_ibf;
2164
2165 while (buckets_sent < ibf_size)
2166 {
2167 unsigned int buckets_in_message;
2168 struct GNUNET_MQ_Envelope *ev;
2169 struct IBFMessage *msg;
2170
2171 buckets_in_message = ibf_size - buckets_sent;
2172 /* limit to maximum */
2173 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
2174 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
2175
2176#if MEASURE_PERFORMANCE
2177 perf_store.ibf.sent += 1;
2178 perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2179#endif
2180 ev = GNUNET_MQ_msg_extra (msg,
2181 buckets_in_message * IBF_BUCKET_SIZE,
2182 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF);
2183 msg->ibf_size = ibf_size;
2184 msg->offset = htonl (buckets_sent);
2185 msg->salt = htonl (op->salt_send);
2186 msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2187
2188
2189 ibf_write_slice (ibf, buckets_sent,
2190 buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
2191 buckets_sent += buckets_in_message;
2192 LOG (GNUNET_ERROR_TYPE_DEBUG,
2193 "ibf chunk size %u, %llu/%u sent\n",
2194 (unsigned int) buckets_in_message,
2195 (unsigned long long) buckets_sent,
2196 (unsigned int) ibf_size);
2197 GNUNET_MQ_send (op->mq, ev);
2198 }
2199
2200 /* The other peer must decode the IBF, so
2201 * we're passive. */
2202 op->phase = PHASE_PASSIVE_DECODING;
2203 return GNUNET_OK;
2204}
2205
2206
2207/**
2208 * Compute the necessary order of an ibf
2209 * from the size of the symmetric set difference.
2210 *
2211 * @param diff the difference
2212 * @return the required size of the ibf
2213 */
2214static unsigned int
2215get_size_from_difference (unsigned int diff, int number_buckets_per_element,
2216 float ibf_bucket_number_factor)
2217{
2218 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2219 * Elias Summermatter (2021) in section 3.11 **/
2220 return (((int) (diff * ibf_bucket_number_factor)) | 1);
2221
2222}
2223
2224
2225static unsigned int
2226get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
2227 decoded_elements, unsigned int last_ibf_size)
2228{
2229 unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
2230 - (ibf_bucket_number_factor
2231 * decoded_elements));
2232 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2233 * Elias Summermatter (2021) in section 3.11 **/
2234 return next_size | 1;
2235}
2236
2237
2238/**
2239 * Send a set element.
2240 *
2241 * @param cls the union operation `struct Operation *`
2242 * @param key unused
2243 * @param value the `struct ElementEntry *` to insert
2244 * into the key-to-element mapping
2245 * @return #GNUNET_YES (to continue iterating)
2246 */
2247static int
2248send_full_element_iterator (void *cls,
2249 const struct GNUNET_HashCode *key,
2250 void *value)
2251{
2252 struct Operation *op = cls;
2253 struct GNUNET_SETU_ElementMessage *emsg;
2254 struct ElementEntry *ee = value;
2255 struct GNUNET_SETU_Element *el = &ee->element;
2256 struct GNUNET_MQ_Envelope *ev;
2257
2258 LOG (GNUNET_ERROR_TYPE_DEBUG,
2259 "Sending element %s\n",
2260 GNUNET_h2s (key));
2261#if MEASURE_PERFORMANCE
2262 perf_store.element_full.received += 1;
2263 perf_store.element_full.received_var_bytes += el->size;
2264#endif
2265 ev = GNUNET_MQ_msg_extra (emsg,
2266 el->size,
2267 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
2268 emsg->element_type = htons (el->element_type);
2269 GNUNET_memcpy (&emsg[1],
2270 el->data,
2271 el->size);
2272 GNUNET_MQ_send (op->mq,
2273 ev);
2274 return GNUNET_YES;
2275}
2276
2277
2278/**
2279 * Switch to full set transmission for @a op.
2280 *
2281 * @param op operation to switch to full set transmission.
2282 */
2283static void
2284send_full_set (struct Operation *op)
2285{
2286 struct GNUNET_MQ_Envelope *ev;
2287
2288 op->phase = PHASE_FULL_SENDING;
2289 LOG (GNUNET_ERROR_TYPE_DEBUG,
2290 "Dedicing to transmit the full set\n");
2291 /* FIXME: use a more memory-friendly way of doing this with an
2292 iterator, just as we do in the non-full case! */
2293
2294 // Randomize Elements to send
2295 op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2296 32,GNUNET_NO);
2297 op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2298 GNUNET_CRYPTO_QUALITY_NONCE,
2299 UINT64_MAX);
2300 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2301 &
2302 create_randomized_element_iterator,
2303 op);
2304
2305 (void) GNUNET_CONTAINER_multihashmap_iterate (
2306 op->set->content->elements_randomized,
2307 &send_full_element_iterator,
2308 op);
2309#if MEASURE_PERFORMANCE
2310 perf_store.full_done.sent += 1;
2311#endif
2312 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
2313 GNUNET_MQ_send (op->mq,
2314 ev);
2315}
2316
2317
2318/**
2319 * Handle a strata estimator from a remote peer
2320 *
2321 * @param cls the union operation
2322 * @param msg the message
2323 */
2324static int
2325check_union_p2p_strata_estimator (void *cls,
2326 const struct StrataEstimatorMessage *msg)
2327{
2328 struct Operation *op = cls;
2329 int is_compressed;
2330 size_t len;
2331
2332 if (op->phase != PHASE_EXPECT_SE)
2333 {
2334 GNUNET_break (0);
2335 return GNUNET_SYSERR;
2336 }
2337 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2338 msg->header.type));
2339 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2340 if ((GNUNET_NO == is_compressed) &&
2341 (len != SE_STRATA_COUNT * SE_IBFS_TOTAL_SIZE * IBF_BUCKET_SIZE))
2342 {
2343 GNUNET_break (0);
2344 return GNUNET_SYSERR;
2345 }
2346 return GNUNET_OK;
2347}
2348
2349
2350/**
2351 * Handle a strata estimator from a remote peer
2352 *
2353 * @param cls the union operation
2354 * @param msg the message
2355 */
2356static void
2357handle_union_p2p_strata_estimator (void *cls,
2358 const struct StrataEstimatorMessage *msg)
2359{
2360#if MEASURE_PERFORMANCE
2361 perf_store.se.received += 1;
2362 perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2363 StrataEstimatorMessage);
2364#endif
2365 struct Operation *op = cls;
2366 struct MultiStrataEstimator *remote_se;
2367 unsigned int diff;
2368 uint64_t other_size;
2369 size_t len;
2370 int is_compressed;
2371 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2372 op->set->content->elements);
2373 // Setting peer site to receiving peer
2374 op->peer_site = 1;
2375
2376 /**
2377 * Check that the message is received only in supported phase
2378 */
2379 uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2380 if (GNUNET_OK !=
2381 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2382 {
2383 GNUNET_break (0);
2384 fail_union_operation (op);
2385 return;
2386 }
2387
2388 /** Only allow 1,2,4,8 SEs **/
2389 if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2390 {
2391 LOG (GNUNET_ERROR_TYPE_ERROR,
2392 "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2393 msg->se_count);
2394 GNUNET_break_op (0);
2395 fail_union_operation (op);
2396 return;
2397 }
2398
2399 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2400 msg->header.type));
2401 GNUNET_STATISTICS_update (_GSS_statistics,
2402 "# bytes of SE received",
2403 ntohs (msg->header.size),
2404 GNUNET_NO);
2405 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2406 other_size = GNUNET_ntohll (msg->set_size);
2407 op->remote_element_count = other_size;
2408
2409 if (op->byzantine_upper_bound < op->remote_element_count)
2410 {
2411 LOG (GNUNET_ERROR_TYPE_ERROR,
2412 "Exceeded configured upper bound <%lu> of element: %u\n",
2413 op->byzantine_upper_bound,
2414 op->remote_element_count);
2415 fail_union_operation (op);
2416 return;
2417 }
2418
2419 remote_se = strata_estimator_create (SE_STRATA_COUNT,
2420 SE_IBFS_TOTAL_SIZE,
2421 SE_IBF_HASH_NUM);
2422 if (NULL == remote_se)
2423 {
2424 /* insufficient resources, fail */
2425 fail_union_operation (op);
2426 return;
2427 }
2428 if (GNUNET_OK !=
2429 strata_estimator_read (&msg[1],
2430 len,
2431 is_compressed,
2432 msg->se_count,
2433 SE_IBFS_TOTAL_SIZE,
2434 remote_se))
2435 {
2436 /* decompression failed */
2437 strata_estimator_destroy (remote_se);
2438 fail_union_operation (op);
2439 return;
2440 }
2441 GNUNET_assert (NULL != op->se);
2442 strata_estimator_difference (remote_se,
2443 op->se);
2444
2445 /* Calculate remote local diff */
2446 long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2447 long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2448
2449 /* Prevent estimations from overshooting max element */
2450 if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2451 diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2452 if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2453 diff_local = op->byzantine_upper_bound - op->local_element_count;
2454 if ((diff_remote < 0) || (diff_local < 0))
2455 {
2456 strata_estimator_destroy (remote_se);
2457 LOG (GNUNET_ERROR_TYPE_ERROR,
2458 "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2459 "malicious: remote diff %ld, local diff: %ld\n",
2460 diff_remote, diff_local);
2461 GNUNET_break_op (0);
2462 fail_union_operation (op);
2463 return;
2464 }
2465
2466 /* Make estimation more precise in initial sync cases */
2467 if (0 == op->remote_element_count)
2468 {
2469 diff_remote = 0;
2470 diff_local = op->local_element_count;
2471 }
2472 if (0 == op->local_element_count)
2473 {
2474 diff_local = 0;
2475 diff_remote = op->remote_element_count;
2476 }
2477
2478 diff = diff_remote + diff_local;
2479 op->remote_set_diff = diff_remote;
2480
2481 /** Calculate avg element size if not initial sync **/
2482 uint64_t avg_element_size = 0;
2483 if (0 < op->local_element_count)
2484 {
2485 op->total_elements_size_local = 0;
2486 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2487 &
2488 determinate_avg_element_size_iterator,
2489 op);
2490 avg_element_size = op->total_elements_size_local / op->local_element_count;
2491 }
2492
2493 op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2494 GNUNET_CONTAINER_multihashmap_size (
2495 op->set->content->
2496 elements),
2497 op->
2498 remote_element_count,
2499 diff_remote,
2500 diff_local,
2501 op->
2502 rtt_bandwidth_tradeoff,
2503 op->
2504 ibf_bucket_number_factor);
2505
2506#if MEASURE_PERFORMANCE
2507 perf_store.se_diff_local = diff_local;
2508 perf_store.se_diff_remote = diff_remote;
2509 perf_store.se_diff = diff;
2510 perf_store.mode_of_operation = op->mode_of_operation;
2511#endif
2512
2513 strata_estimator_destroy (remote_se);
2514 strata_estimator_destroy (op->se);
2515 op->se = NULL;
2516 LOG (GNUNET_ERROR_TYPE_DEBUG,
2517 "got se diff=%d, using ibf size %d\n",
2518 diff,
2519 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element,
2520 op->ibf_bucket_number_factor));
2521
2522 {
2523 char *set_debug;
2524
2525 set_debug = getenv ("GNUNET_SETU_BENCHMARK");
2526 if ((NULL != set_debug) &&
2527 (0 == strcmp (set_debug, "1")))
2528 {
2529 FILE *f = fopen ("set.log", "a");
2530 fprintf (f, "%llu\n", (unsigned long long) diff);
2531 fclose (f);
2532 }
2533 }
2534
2535 if ((GNUNET_YES == op->byzantine) &&
2536 (other_size < op->byzantine_lower_bound))
2537 {
2538 GNUNET_break (0);
2539 fail_union_operation (op);
2540 return;
2541 }
2542
2543 if ((GNUNET_YES == op->force_full) ||
2544 (op->mode_of_operation != DIFFERENTIAL_SYNC))
2545 {
2546 LOG (GNUNET_ERROR_TYPE_DEBUG,
2547 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
2548 diff,
2549 (unsigned long long) op->initial_size);
2550 GNUNET_STATISTICS_update (_GSS_statistics,
2551 "# of full sends",
2552 1,
2553 GNUNET_NO);
2554 if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
2555 {
2556 struct TransmitFullMessage *signal_msg;
2557 struct GNUNET_MQ_Envelope *ev;
2558 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2559 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL);
2560 signal_msg->remote_set_difference = htonl (diff_local);
2561 signal_msg->remote_set_size = htonl (op->local_element_count);
2562 signal_msg->local_set_difference = htonl (diff_remote);
2563 GNUNET_MQ_send (op->mq,
2564 ev);
2565 send_full_set (op);
2566 }
2567 else
2568 {
2569 struct GNUNET_MQ_Envelope *ev;
2570
2571 LOG (GNUNET_ERROR_TYPE_DEBUG,
2572 "Telling other peer that we expect its full set\n");
2573 op->phase = PHASE_FULL_RECEIVING;
2574#if MEASURE_PERFORMANCE
2575 perf_store.request_full.sent += 1;
2576#endif
2577 struct TransmitFullMessage *signal_msg;
2578 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2579 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
2580 signal_msg->remote_set_difference = htonl (diff_local);
2581 signal_msg->remote_set_size = htonl (op->local_element_count);
2582 signal_msg->local_set_difference = htonl (diff_remote);
2583 GNUNET_MQ_send (op->mq,
2584 ev);
2585 }
2586 }
2587 else
2588 {
2589 GNUNET_STATISTICS_update (_GSS_statistics,
2590 "# of ibf sends",
2591 1,
2592 GNUNET_NO);
2593 if (GNUNET_OK !=
2594 send_ibf (op,
2595 get_size_from_difference (diff,
2596 op->ibf_number_buckets_per_element,
2597 op->ibf_bucket_number_factor)))
2598 {
2599 /* Internal error, best we can do is shut the connection */
2600 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2601 "Failed to send IBF, closing connection\n");
2602 fail_union_operation (op);
2603 return;
2604 }
2605 }
2606 GNUNET_CADET_receive_done (op->channel);
2607}
2608
2609
2610/**
2611 * Iterator to send elements to a remote peer
2612 *
2613 * @param cls closure with the element key and the union operation
2614 * @param key ignored
2615 * @param value the key entry
2616 */
2617static int
2618send_offers_iterator (void *cls,
2619 uint32_t key,
2620 void *value)
2621{
2622 struct SendElementClosure *sec = cls;
2623 struct Operation *op = sec->op;
2624 struct KeyEntry *ke = value;
2625 struct GNUNET_MQ_Envelope *ev;
2626 struct GNUNET_MessageHeader *mh;
2627
2628 /* Detect 32-bit key collision for the 64-bit IBF keys. */
2629 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2630 {
2631 op->active_passive_switch_required = true;
2632 return GNUNET_YES;
2633 }
2634
2635 /* Prevent implementation from sending a offer multiple times in case of roll switch */
2636 if (GNUNET_YES ==
2637 is_message_in_message_control_flow (
2638 op->message_control_flow,
2639 &ke->element->element_hash,
2640 OFFER_MESSAGE)
2641 )
2642 {
2643 LOG (GNUNET_ERROR_TYPE_DEBUG,
2644 "Skipping already sent processed element offer!\n");
2645 return GNUNET_YES;
2646 }
2647
2648 /* Save send offer message for message control */
2649 if (GNUNET_YES !=
2650 update_message_control_flow (
2651 op->message_control_flow,
2652 MSG_CFS_SENT,
2653 &ke->element->element_hash,
2654 OFFER_MESSAGE)
2655 )
2656 {
2657 LOG (GNUNET_ERROR_TYPE_ERROR,
2658 "Double offer message sent found!\n");
2659 GNUNET_break (0);
2660 fail_union_operation (op);
2661 return GNUNET_NO;
2662 }
2663 ;
2664
2665 /* Mark element to be expected to received */
2666 if (GNUNET_YES !=
2667 update_message_control_flow (
2668 op->message_control_flow,
2669 MSG_CFS_EXPECTED,
2670 &ke->element->element_hash,
2671 DEMAND_MESSAGE)
2672 )
2673 {
2674 LOG (GNUNET_ERROR_TYPE_ERROR,
2675 "Double demand received found!\n");
2676 GNUNET_break (0);
2677 fail_union_operation (op);
2678 return GNUNET_NO;
2679 }
2680 ;
2681#if MEASURE_PERFORMANCE
2682 perf_store.offer.sent += 1;
2683 perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2684#endif
2685 ev = GNUNET_MQ_msg_header_extra (mh,
2686 sizeof(struct GNUNET_HashCode),
2687 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER);
2688 GNUNET_assert (NULL != ev);
2689 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
2690 LOG (GNUNET_ERROR_TYPE_DEBUG,
2691 "[OP %p] sending element offer (%s) to peer\n",
2692 op,
2693 GNUNET_h2s (&ke->element->element_hash));
2694 GNUNET_MQ_send (op->mq, ev);
2695 return GNUNET_YES;
2696}
2697
2698
2699/**
2700 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
2701 *
2702 * @param op union operation
2703 * @param ibf_key IBF key of interest
2704 */
2705void
2706send_offers_for_key (struct Operation *op,
2707 struct IBF_Key ibf_key)
2708{
2709 struct SendElementClosure send_cls;
2710
2711 send_cls.ibf_key = ibf_key;
2712 send_cls.op = op;
2713 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
2714 op->key_to_element,
2715 (uint32_t) ibf_key.
2716 key_val,
2717 &send_offers_iterator,
2718 &send_cls);
2719}
2720
2721
2722/**
2723 * Decode which elements are missing on each side, and
2724 * send the appropriate offers and inquiries.
2725 *
2726 * @param op union operation
2727 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
2728 */
2729static int
2730decode_and_send (struct Operation *op)
2731{
2732 struct IBF_Key key;
2733 struct IBF_Key last_key;
2734 int side;
2735 unsigned int num_decoded;
2736 struct InvertibleBloomFilter *diff_ibf;
2737
2738 GNUNET_assert (PHASE_ACTIVE_DECODING == op->phase);
2739
2740 if (GNUNET_OK !=
2741 prepare_ibf (op,
2742 op->remote_ibf->size))
2743 {
2744 GNUNET_break (0);
2745 /* allocation failed */
2746 return GNUNET_SYSERR;
2747 }
2748
2749 diff_ibf = ibf_dup (op->local_ibf);
2750 ibf_subtract (diff_ibf,
2751 op->remote_ibf);
2752
2753 ibf_destroy (op->remote_ibf);
2754 op->remote_ibf = NULL;
2755
2756 LOG (GNUNET_ERROR_TYPE_DEBUG,
2757 "decoding IBF (size=%u)\n",
2758 diff_ibf->size);
2759
2760 num_decoded = 0;
2761 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
2762
2763 while (1)
2764 {
2765 int res;
2766 int cycle_detected = GNUNET_NO;
2767
2768 last_key = key;
2769
2770 res = ibf_decode (diff_ibf,
2771 &side,
2772 &key);
2773 if (res == GNUNET_OK)
2774 {
2775 LOG (GNUNET_ERROR_TYPE_DEBUG,
2776 "decoded ibf key %lx\n",
2777 (unsigned long) key.key_val);
2778 num_decoded += 1;
2779 if ((num_decoded > diff_ibf->size) ||
2780 ((num_decoded > 1) &&
2781 (last_key.key_val == key.key_val)))
2782 {
2783 LOG (GNUNET_ERROR_TYPE_DEBUG,
2784 "detected cyclic ibf (decoded %u/%u)\n",
2785 num_decoded,
2786 diff_ibf->size);
2787 cycle_detected = GNUNET_YES;
2788 }
2789 }
2790 if ((GNUNET_SYSERR == res) ||
2791 (GNUNET_YES == cycle_detected))
2792 {
2793 uint32_t next_size;
2794 /** Enforce odd ibf size **/
2795
2796 next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
2797 diff_ibf->size);
2798 /** Make ibf estimation size odd reasoning can be found in BSc Thesis of
2799 * Elias Summermatter (2021) in section 3.11 **/
2800 uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2801
2802 if (next_size<ibf_min_size)
2803 next_size = ibf_min_size;
2804
2805
2806 if (next_size <= MAX_IBF_SIZE)
2807 {
2808 LOG (GNUNET_ERROR_TYPE_DEBUG,
2809 "decoding failed, sending larger ibf (size %u)\n",
2810 next_size);
2811 GNUNET_STATISTICS_update (_GSS_statistics,
2812 "# of IBF retries",
2813 1,
2814 GNUNET_NO);
2815#if MEASURE_PERFORMANCE
2816 perf_store.active_passive_switches += 1;
2817#endif
2818
2819 op->salt_send = op->salt_receive++;
2820
2821 if (GNUNET_OK !=
2822 send_ibf (op, next_size))
2823 {
2824 /* Internal error, best we can do is shut the connection */
2825 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2826 "Failed to send IBF, closing connection\n");
2827 fail_union_operation (op);
2828 ibf_destroy (diff_ibf);
2829 return GNUNET_SYSERR;
2830 }
2831 }
2832 else
2833 {
2834 GNUNET_STATISTICS_update (_GSS_statistics,
2835 "# of failed union operations (too large)",
2836 1,
2837 GNUNET_NO);
2838 // XXX: Send the whole set, element-by-element
2839 LOG (GNUNET_ERROR_TYPE_ERROR,
2840 "set union failed: reached ibf limit\n");
2841 fail_union_operation (op);
2842 ibf_destroy (diff_ibf);
2843 return GNUNET_SYSERR;
2844 }
2845 break;
2846 }
2847 if (GNUNET_NO == res)
2848 {
2849 struct GNUNET_MQ_Envelope *ev;
2850
2851 LOG (GNUNET_ERROR_TYPE_DEBUG,
2852 "transmitted all values, sending DONE\n");
2853
2854#if MEASURE_PERFORMANCE
2855 perf_store.done.sent += 1;
2856#endif
2857 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
2858 GNUNET_MQ_send (op->mq, ev);
2859 /* We now wait until we get a DONE message back
2860 * and then wait for our MQ to be flushed and all our
2861 * demands be delivered. */
2862 break;
2863 }
2864 if (1 == side)
2865 {
2866 struct IBF_Key unsalted_key;
2867 unsalt_key (&key,
2868 op->salt_receive,
2869 &unsalted_key);
2870 send_offers_for_key (op,
2871 unsalted_key);
2872 }
2873 else if (-1 == side)
2874 {
2875 struct GNUNET_MQ_Envelope *ev;
2876 struct InquiryMessage *msg;
2877
2878#if MEASURE_PERFORMANCE
2879 perf_store.inquery.sent += 1;
2880 perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2881#endif
2882
2883 /** Add sent inquiries to hashmap for flow control **/
2884 struct GNUNET_HashContext *hashed_key_context =
2885 GNUNET_CRYPTO_hash_context_start ();
2886 struct GNUNET_HashCode *hashed_key = (struct
2887 GNUNET_HashCode*) GNUNET_malloc (
2888 sizeof(struct GNUNET_HashCode));
2889 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_SENT;
2890 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2891 &key,
2892 sizeof(struct IBF_Key));
2893 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2894 hashed_key);
2895 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2896 hashed_key,
2897 &mcfs,
2898 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
2899 );
2900
2901 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
2902 * the effort additional complexity. */
2903 ev = GNUNET_MQ_msg_extra (msg,
2904 sizeof(struct IBF_Key),
2905 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY);
2906 msg->salt = htonl (op->salt_receive);
2907 GNUNET_memcpy (&msg[1],
2908 &key,
2909 sizeof(struct IBF_Key));
2910 LOG (GNUNET_ERROR_TYPE_DEBUG,
2911 "sending element inquiry for IBF key %lx\n",
2912 (unsigned long) key.key_val);
2913 GNUNET_MQ_send (op->mq, ev);
2914 }
2915 else
2916 {
2917 GNUNET_assert (0);
2918 }
2919 }
2920 ibf_destroy (diff_ibf);
2921 return GNUNET_OK;
2922}
2923
2924
2925/**
2926 * Check send full message received from other peer
2927 * @param cls
2928 * @param msg
2929 * @return
2930 */
2931
2932static int
2933check_union_p2p_send_full (void *cls,
2934 const struct TransmitFullMessage *msg)
2935{
2936 return GNUNET_OK;
2937}
2938
2939
2940/**
2941 * Handle send full message received from other peer
2942 *
2943 * @param cls
2944 * @param msg
2945 */
2946static void
2947handle_union_p2p_send_full (void *cls,
2948 const struct TransmitFullMessage *msg)
2949{
2950 struct Operation *op = cls;
2951
2952 /**
2953 * Check that the message is received only in supported phase
2954 */
2955 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2956 if (GNUNET_OK !=
2957 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2958 {
2959 GNUNET_break (0);
2960 fail_union_operation (op);
2961 return;
2962 }
2963
2964 /** write received values to operator**/
2965 op->remote_element_count = ntohl (msg->remote_set_size);
2966 op->remote_set_diff = ntohl (msg->remote_set_difference);
2967 op->local_set_diff = ntohl (msg->local_set_difference);
2968
2969 /** Check byzantine limits **/
2970 if (check_byzantine_bounds (op) != GNUNET_OK)
2971 {
2972 LOG (GNUNET_ERROR_TYPE_ERROR,
2973 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
2974 "criteria\n");
2975 GNUNET_break_op (0);
2976 fail_union_operation (op);
2977 return;
2978 }
2979
2980 /** Calculate avg element size if not initial sync **/
2981 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2982 op->set->content->elements);
2983 uint64_t avg_element_size = 0;
2984 if (0 < op->local_element_count)
2985 {
2986 op->total_elements_size_local = 0;
2987 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2988 &
2989 determinate_avg_element_size_iterator,
2990 op);
2991 avg_element_size = op->total_elements_size_local / op->local_element_count;
2992 }
2993
2994 /** Validate mode of operation **/
2995 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2996 op->
2997 remote_element_count,
2998 op->
2999 local_element_count,
3000 op->local_set_diff,
3001 op->remote_set_diff,
3002 op->
3003 rtt_bandwidth_tradeoff,
3004 op->
3005 ibf_bucket_number_factor);
3006 if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation)
3007 {
3008 LOG (GNUNET_ERROR_TYPE_ERROR,
3009 "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
3010 " : %d\n", mode_of_operation);
3011 GNUNET_break_op (0);
3012 fail_union_operation (op);
3013 return;
3014 }
3015 op->phase = PHASE_FULL_RECEIVING;
3016}
3017
3018
3019/**
3020 * Check an IBF message from a remote peer.
3021 *
3022 * Reassemble the IBF from multiple pieces, and
3023 * process the whole IBF once possible.
3024 *
3025 * @param cls the union operation
3026 * @param msg the header of the message
3027 * @return #GNUNET_OK if @a msg is well-formed
3028 */
3029static int
3030check_union_p2p_ibf (void *cls,
3031 const struct IBFMessage *msg)
3032{
3033 struct Operation *op = cls;
3034 unsigned int buckets_in_message;
3035
3036 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3037 / IBF_BUCKET_SIZE;
3038 if (0 == buckets_in_message)
3039 {
3040 GNUNET_break_op (0);
3041 return GNUNET_SYSERR;
3042 }
3043 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
3044 * IBF_BUCKET_SIZE)
3045 {
3046 GNUNET_break_op (0);
3047 return GNUNET_SYSERR;
3048 }
3049 if (op->phase == PHASE_EXPECT_IBF_LAST)
3050 {
3051 if (ntohl (msg->offset) != op->ibf_buckets_received)
3052 {
3053 GNUNET_break_op (0);
3054 return GNUNET_SYSERR;
3055 }
3056
3057 if (msg->ibf_size != op->remote_ibf->size)
3058 {
3059 GNUNET_break_op (0);
3060 return GNUNET_SYSERR;
3061 }
3062 if (ntohl (msg->salt) != op->salt_receive)
3063 {
3064 GNUNET_break_op (0);
3065 return GNUNET_SYSERR;
3066 }
3067 }
3068 else if ((op->phase != PHASE_PASSIVE_DECODING) &&
3069 (op->phase != PHASE_EXPECT_IBF))
3070 {
3071 GNUNET_break_op (0);
3072 return GNUNET_SYSERR;
3073 }
3074
3075 return GNUNET_OK;
3076}
3077
3078
3079/**
3080 * Handle an IBF message from a remote peer.
3081 *
3082 * Reassemble the IBF from multiple pieces, and
3083 * process the whole IBF once possible.
3084 *
3085 * @param cls the union operation
3086 * @param msg the header of the message
3087 */
3088static void
3089handle_union_p2p_ibf (void *cls,
3090 const struct IBFMessage *msg)
3091{
3092 struct Operation *op = cls;
3093 unsigned int buckets_in_message;
3094 /**
3095 * Check that the message is received only in supported phase
3096 */
3097 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3098 PHASE_PASSIVE_DECODING};
3099 if (GNUNET_OK !=
3100 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3101 {
3102 GNUNET_break (0);
3103 fail_union_operation (op);
3104 return;
3105 }
3106 op->differential_sync_iterations++;
3107 check_max_differential_rounds (op);
3108 op->active_passive_switch_required = false;
3109
3110#if MEASURE_PERFORMANCE
3111 perf_store.ibf.received += 1;
3112 perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3113#endif
3114
3115 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3116 / IBF_BUCKET_SIZE;
3117 if ((op->phase == PHASE_PASSIVE_DECODING) ||
3118 (op->phase == PHASE_EXPECT_IBF))
3119 {
3120 op->phase = PHASE_EXPECT_IBF_LAST;
3121 GNUNET_assert (NULL == op->remote_ibf);
3122 LOG (GNUNET_ERROR_TYPE_DEBUG,
3123 "Creating new ibf of size %u\n",
3124 ntohl (msg->ibf_size));
3125 // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
3126 op->remote_ibf = ibf_create (msg->ibf_size,
3127 ((uint8_t) op->ibf_number_buckets_per_element));
3128 op->salt_receive = ntohl (msg->salt);
3129 LOG (GNUNET_ERROR_TYPE_DEBUG,
3130 "Receiving new IBF with salt %u\n",
3131 op->salt_receive);
3132 if (NULL == op->remote_ibf)
3133 {
3134 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3135 "Failed to parse remote IBF, closing connection\n");
3136 fail_union_operation (op);
3137 return;
3138 }
3139 op->ibf_buckets_received = 0;
3140 if (0 != ntohl (msg->offset))
3141 {
3142 GNUNET_break_op (0);
3143 fail_union_operation (op);
3144 return;
3145 }
3146 }
3147 else
3148 {
3149 GNUNET_assert (op->phase == PHASE_EXPECT_IBF_LAST);
3150 LOG (GNUNET_ERROR_TYPE_DEBUG,
3151 "Received more of IBF\n");
3152 }
3153 GNUNET_assert (NULL != op->remote_ibf);
3154
3155 ibf_read_slice (&msg[1],
3156 op->ibf_buckets_received,
3157 buckets_in_message,
3158 op->remote_ibf, msg->ibf_counter_bit_length);
3159 op->ibf_buckets_received += buckets_in_message;
3160
3161 if (op->ibf_buckets_received == op->remote_ibf->size)
3162 {
3163 LOG (GNUNET_ERROR_TYPE_DEBUG,
3164 "received full ibf\n");
3165 op->phase = PHASE_ACTIVE_DECODING;
3166 if (GNUNET_OK !=
3167 decode_and_send (op))
3168 {
3169 /* Internal error, best we can do is shut down */
3170 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3171 "Failed to decode IBF, closing connection\n");
3172 fail_union_operation (op);
3173 return;
3174 }
3175 }
3176 GNUNET_CADET_receive_done (op->channel);
3177}
3178
3179
3180/**
3181 * Send a result message to the client indicating
3182 * that there is a new element.
3183 *
3184 * @param op union operation
3185 * @param element element to send
3186 * @param status status to send with the new element
3187 */
3188static void
3189send_client_element (struct Operation *op,
3190 const struct GNUNET_SETU_Element *element,
3191 enum GNUNET_SETU_Status status)
3192{
3193 struct GNUNET_MQ_Envelope *ev;
3194 struct GNUNET_SETU_ResultMessage *rm;
3195
3196 LOG (GNUNET_ERROR_TYPE_DEBUG,
3197 "sending element (size %u) to client\n",
3198 element->size);
3199 GNUNET_assert (0 != op->client_request_id);
3200 ev = GNUNET_MQ_msg_extra (rm,
3201 element->size,
3202 GNUNET_MESSAGE_TYPE_SETU_RESULT);
3203 if (NULL == ev)
3204 {
3205 GNUNET_MQ_discard (ev);
3206 GNUNET_break (0);
3207 return;
3208 }
3209 rm->result_status = htons (status);
3210 rm->request_id = htonl (op->client_request_id);
3211 rm->element_type = htons (element->element_type);
3212 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
3213 op->key_to_element));
3214 GNUNET_memcpy (&rm[1],
3215 element->data,
3216 element->size);
3217 GNUNET_MQ_send (op->set->cs->mq,
3218 ev);
3219}
3220
3221
3222/**
3223 * Tests if the operation is finished, and if so notify.
3224 *
3225 * @param op operation to check
3226 */
3227static void
3228maybe_finish (struct Operation *op)
3229{
3230 unsigned int num_demanded;
3231
3232 num_demanded = GNUNET_CONTAINER_multihashmap_size (
3233 op->demanded_hashes);
3234 int send_done = GNUNET_CONTAINER_multihashmap_iterate (
3235 op->message_control_flow,
3236 &
3237 determinate_done_message_iterator,
3238 op);
3239 if (PHASE_FINISH_WAITING == op->phase)
3240 {
3241 LOG (GNUNET_ERROR_TYPE_DEBUG,
3242 "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
3243 num_demanded, op->peer_site);
3244 if (-1 != send_done)
3245 {
3246 struct GNUNET_MQ_Envelope *ev;
3247
3248 op->phase = PHASE_FINISHED;
3249#if MEASURE_PERFORMANCE
3250 perf_store.done.sent += 1;
3251#endif
3252 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
3253 GNUNET_MQ_send (op->mq,
3254 ev);
3255 /* We now wait until the other peer sends P2P_OVER
3256 * after it got all elements from us. */
3257 }
3258 }
3259 if (PHASE_FINISH_CLOSING == op->phase)
3260 {
3261 LOG (GNUNET_ERROR_TYPE_DEBUG,
3262 "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
3263 num_demanded, op->peer_site);
3264 if (-1 != send_done)
3265 {
3266 op->phase = PHASE_FINISHED;
3267 send_client_done (op);
3268 _GSS_operation_destroy2 (op);
3269 }
3270 }
3271}
3272
3273
3274/**
3275 * Check an element message from a remote peer.
3276 *
3277 * @param cls the union operation
3278 * @param emsg the message
3279 */
3280static int
3281check_union_p2p_elements (void *cls,
3282 const struct GNUNET_SETU_ElementMessage *emsg)
3283{
3284 struct Operation *op = cls;
3285
3286 if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
3287 {
3288 GNUNET_break_op (0);
3289 return GNUNET_SYSERR;
3290 }
3291 return GNUNET_OK;
3292}
3293
3294
3295/**
3296 * Handle an element message from a remote peer.
3297 * Sent by the other peer either because we decoded an IBF and placed a demand,
3298 * or because the other peer switched to full set transmission.
3299 *
3300 * @param cls the union operation
3301 * @param emsg the message
3302 */
3303static void
3304handle_union_p2p_elements (void *cls,
3305 const struct GNUNET_SETU_ElementMessage *emsg)
3306{
3307 struct Operation *op = cls;
3308 struct ElementEntry *ee;
3309 struct KeyEntry *ke;
3310 uint16_t element_size;
3311
3312 /**
3313 * Check that the message is received only in supported phase
3314 */
3315 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3316 PHASE_FINISH_WAITING, PHASE_FINISH_CLOSING};
3317 if (GNUNET_OK !=
3318 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3319 {
3320 GNUNET_break (0);
3321 fail_union_operation (op);
3322 return;
3323 }
3324
3325 element_size = ntohs (emsg->header.size) - sizeof(struct
3326 GNUNET_SETU_ElementMessage);
3327#if MEASURE_PERFORMANCE
3328 perf_store.element.received += 1;
3329 perf_store.element.received_var_bytes += element_size;
3330#endif
3331
3332 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3333 GNUNET_memcpy (&ee[1],
3334 &emsg[1],
3335 element_size);
3336 ee->element.size = element_size;
3337 ee->element.data = &ee[1];
3338 ee->element.element_type = ntohs (emsg->element_type);
3339 ee->remote = GNUNET_YES;
3340 GNUNET_SETU_element_hash (&ee->element,
3341 &ee->element_hash);
3342 if (GNUNET_NO ==
3343 GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
3344 &ee->element_hash,
3345 NULL))
3346 {
3347 /* We got something we didn't demand, since it's not in our map. */
3348 GNUNET_break_op (0);
3349 fail_union_operation (op);
3350 return;
3351 }
3352
3353 if (GNUNET_OK !=
3354 update_message_control_flow (
3355 op->message_control_flow,
3356 MSG_CFS_RECEIVED,
3357 &ee->element_hash,
3358 ELEMENT_MESSAGE)
3359 )
3360 {
3361 LOG (GNUNET_ERROR_TYPE_ERROR,
3362 "An element has been received more than once!\n");
3363 GNUNET_break (0);
3364 fail_union_operation (op);
3365 return;
3366 }
3367
3368 LOG (GNUNET_ERROR_TYPE_DEBUG,
3369 "Got element (size %u, hash %s) from peer\n",
3370 (unsigned int) element_size,
3371 GNUNET_h2s (&ee->element_hash));
3372
3373 GNUNET_STATISTICS_update (_GSS_statistics,
3374 "# received elements",
3375 1,
3376 GNUNET_NO);
3377 GNUNET_STATISTICS_update (_GSS_statistics,
3378 "# exchanged elements",
3379 1,
3380 GNUNET_NO);
3381
3382 op->received_total++;
3383
3384 ke = op_get_element (op,
3385 &ee->element_hash);
3386 if (NULL != ke)
3387 {
3388 /* Got repeated element. Should not happen since
3389 * we track demands. */
3390 GNUNET_STATISTICS_update (_GSS_statistics,
3391 "# repeated elements",
3392 1,
3393 GNUNET_NO);
3394 ke->received = GNUNET_YES;
3395 GNUNET_free (ee);
3396 }
3397 else
3398 {
3399 LOG (GNUNET_ERROR_TYPE_DEBUG,
3400 "Registering new element from remote peer\n");
3401 op->received_fresh++;
3402 op_register_element (op, ee, GNUNET_YES);
3403 /* only send results immediately if the client wants it */
3404 send_client_element (op,
3405 &ee->element,
3406 GNUNET_SETU_STATUS_ADD_LOCAL);
3407 }
3408
3409 if ((op->received_total > 8) &&
3410 (op->received_fresh < op->received_total / 3))
3411 {
3412 /* The other peer gave us lots of old elements, there's something wrong. */
3413 GNUNET_break_op (0);
3414 fail_union_operation (op);
3415 return;
3416 }
3417 GNUNET_CADET_receive_done (op->channel);
3418 maybe_finish (op);
3419}
3420
3421
3422/**
3423 * Check a full element message from a remote peer.
3424 *
3425 * @param cls the union operation
3426 * @param emsg the message
3427 */
3428static int
3429check_union_p2p_full_element (void *cls,
3430 const struct GNUNET_SETU_ElementMessage *emsg)
3431{
3432 struct Operation *op = cls;
3433
3434 (void) op;
3435
3436 // FIXME: check that we expect full elements here?
3437 return GNUNET_OK;
3438}
3439
3440
3441/**
3442 * Handle an element message from a remote peer.
3443 *
3444 * @param cls the union operation
3445 * @param emsg the message
3446 */
3447static void
3448handle_union_p2p_full_element (void *cls,
3449 const struct GNUNET_SETU_ElementMessage *emsg)
3450{
3451 struct Operation *op = cls;
3452 struct ElementEntry *ee;
3453 struct KeyEntry *ke;
3454 uint16_t element_size;
3455
3456 /**
3457 * Check that the message is received only in supported phase
3458 */
3459 uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
3460 if (GNUNET_OK !=
3461 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3462 {
3463 GNUNET_break (0);
3464 fail_union_operation (op);
3465 return;
3466 }
3467
3468 element_size = ntohs (emsg->header.size)
3469 - sizeof(struct GNUNET_SETU_ElementMessage);
3470
3471#if MEASURE_PERFORMANCE
3472 perf_store.element_full.received += 1;
3473 perf_store.element_full.received_var_bytes += element_size;
3474#endif
3475
3476 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3477 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
3478 ee->element.size = element_size;
3479 ee->element.data = &ee[1];
3480 ee->element.element_type = ntohs (emsg->element_type);
3481 ee->remote = GNUNET_YES;
3482 GNUNET_SETU_element_hash (&ee->element,
3483 &ee->element_hash);
3484 LOG (GNUNET_ERROR_TYPE_DEBUG,
3485 "Got element (full diff, size %u, hash %s) from peer\n",
3486 (unsigned int) element_size,
3487 GNUNET_h2s (&ee->element_hash));
3488
3489 GNUNET_STATISTICS_update (_GSS_statistics,
3490 "# received elements",
3491 1,
3492 GNUNET_NO);
3493 GNUNET_STATISTICS_update (_GSS_statistics,
3494 "# exchanged elements",
3495 1,
3496 GNUNET_NO);
3497
3498 op->received_total++;
3499 ke = op_get_element (op,
3500 &ee->element_hash);
3501 if (NULL != ke)
3502 {
3503 GNUNET_STATISTICS_update (_GSS_statistics,
3504 "# repeated elements",
3505 1,
3506 GNUNET_NO);
3507 full_sync_plausibility_check (op);
3508 ke->received = GNUNET_YES;
3509 GNUNET_free (ee);
3510 }
3511 else
3512 {
3513 LOG (GNUNET_ERROR_TYPE_DEBUG,
3514 "Registering new element from remote peer\n");
3515 op->received_fresh++;
3516 op_register_element (op, ee, GNUNET_YES);
3517 /* only send results immediately if the client wants it */
3518 send_client_element (op,
3519 &ee->element,
3520 GNUNET_SETU_STATUS_ADD_LOCAL);
3521 }
3522
3523
3524 if ((GNUNET_YES == op->byzantine) &&
3525 (op->received_total > op->remote_element_count) )
3526 {
3527 /* The other peer gave us lots of old elements, there's something wrong. */
3528 LOG (GNUNET_ERROR_TYPE_ERROR,
3529 "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
3530 (unsigned long long) op->received_total,
3531 (unsigned long long) op->remote_element_count);
3532 GNUNET_break_op (0);
3533 fail_union_operation (op);
3534 return;
3535 }
3536 GNUNET_CADET_receive_done (op->channel);
3537}
3538
3539
3540/**
3541 * Send offers (for GNUNET_Hash-es) in response
3542 * to inquiries (for IBF_Key-s).
3543 *
3544 * @param cls the union operation
3545 * @param msg the message
3546 */
3547static int
3548check_union_p2p_inquiry (void *cls,
3549 const struct InquiryMessage *msg)
3550{
3551 struct Operation *op = cls;
3552 unsigned int num_keys;
3553
3554 if (op->phase != PHASE_PASSIVE_DECODING)
3555 {
3556 GNUNET_break_op (0);
3557 return GNUNET_SYSERR;
3558 }
3559 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3560 / sizeof(struct IBF_Key);
3561 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3562 != num_keys * sizeof(struct IBF_Key))
3563 {
3564 GNUNET_break_op (0);
3565 return GNUNET_SYSERR;
3566 }
3567 return GNUNET_OK;
3568}
3569
3570
3571/**
3572 * Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
3573 *
3574 * @param cls the union operation
3575 * @param msg the message
3576 */
3577static void
3578handle_union_p2p_inquiry (void *cls,
3579 const struct InquiryMessage *msg)
3580{
3581 struct Operation *op = cls;
3582 const struct IBF_Key *ibf_key;
3583 unsigned int num_keys;
3584
3585 /**
3586 * Check that the message is received only in supported phase
3587 */
3588 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3589 if (GNUNET_OK !=
3590 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3591 {
3592 GNUNET_break (0);
3593 fail_union_operation (op);
3594 return;
3595 }
3596
3597#if MEASURE_PERFORMANCE
3598 perf_store.inquery.received += 1;
3599 perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3600 - sizeof(struct InquiryMessage));
3601#endif
3602
3603 LOG (GNUNET_ERROR_TYPE_DEBUG,
3604 "Received union inquiry\n");
3605 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3606 / sizeof(struct IBF_Key);
3607 ibf_key = (const struct IBF_Key *) &msg[1];
3608
3609 /** Add received inquiries to hashmap for flow control **/
3610 struct GNUNET_HashContext *hashed_key_context =
3611 GNUNET_CRYPTO_hash_context_start ();
3612 struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
3613 sizeof(struct GNUNET_HashCode));;
3614 enum MESSAGE_CONTROL_FLOW_STATE mcfs = MSG_CFS_RECEIVED;
3615 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3616 &ibf_key,
3617 sizeof(struct IBF_Key));
3618 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3619 hashed_key);
3620 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3621 hashed_key,
3622 &mcfs,
3623 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
3624 );
3625
3626 while (0 != num_keys--)
3627 {
3628 struct IBF_Key unsalted_key;
3629 unsalt_key (ibf_key,
3630 ntohl (msg->salt),
3631 &unsalted_key);
3632 send_offers_for_key (op,
3633 unsalted_key);
3634 ibf_key++;
3635 }
3636 GNUNET_CADET_receive_done (op->channel);
3637}
3638
3639
3640/**
3641 * Iterator over hash map entries, called to destroy the linked list of
3642 * colliding ibf key entries.
3643 *
3644 * @param cls closure
3645 * @param key current key code
3646 * @param value value in the hash map
3647 * @return #GNUNET_YES if we should continue to iterate,
3648 * #GNUNET_NO if not.
3649 */
3650static int
3651send_missing_full_elements_iter (void *cls,
3652 uint32_t key,
3653 void *value)
3654{
3655 struct Operation *op = cls;
3656 struct KeyEntry *ke = value;
3657 struct GNUNET_MQ_Envelope *ev;
3658 struct GNUNET_SETU_ElementMessage *emsg;
3659 struct ElementEntry *ee = ke->element;
3660
3661 if (GNUNET_YES == ke->received)
3662 return GNUNET_YES;
3663#if MEASURE_PERFORMANCE
3664 perf_store.element_full.received += 1;
3665#endif
3666 ev = GNUNET_MQ_msg_extra (emsg,
3667 ee->element.size,
3668 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
3669 GNUNET_memcpy (&emsg[1],
3670 ee->element.data,
3671 ee->element.size);
3672 emsg->element_type = htons (ee->element.element_type);
3673 GNUNET_MQ_send (op->mq,
3674 ev);
3675 return GNUNET_YES;
3676}
3677
3678
3679/**
3680 * Handle a request for full set transmission.
3681 *
3682 * @param cls closure, a set union operation
3683 * @param mh the demand message
3684 */
3685static int
3686check_union_p2p_request_full (void *cls,
3687 const struct TransmitFullMessage *mh)
3688{
3689 return GNUNET_OK;
3690}
3691
3692
3693static void
3694handle_union_p2p_request_full (void *cls,
3695 const struct TransmitFullMessage *msg)
3696{
3697 struct Operation *op = cls;
3698
3699 /**
3700 * Check that the message is received only in supported phase
3701 */
3702 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3703 if (GNUNET_OK !=
3704 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3705 {
3706 GNUNET_break (0);
3707 fail_union_operation (op);
3708 return;
3709 }
3710
3711 op->remote_element_count = ntohl (msg->remote_set_size);
3712 op->remote_set_diff = ntohl (msg->remote_set_difference);
3713 op->local_set_diff = ntohl (msg->local_set_difference);
3714
3715
3716 if (check_byzantine_bounds (op) != GNUNET_OK)
3717 {
3718 LOG (GNUNET_ERROR_TYPE_ERROR,
3719 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3720 "criteria\n");
3721 GNUNET_break_op (0);
3722 fail_union_operation (op);
3723 return;
3724 }
3725
3726#if MEASURE_PERFORMANCE
3727 perf_store.request_full.received += 1;
3728#endif
3729
3730 LOG (GNUNET_ERROR_TYPE_DEBUG,
3731 "Received request for full set transmission\n");
3732
3733 /** Calculate avg element size if not initial sync **/
3734 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3735 op->set->content->elements);
3736 uint64_t avg_element_size = 0;
3737 if (0 < op->local_element_count)
3738 {
3739 op->total_elements_size_local = 0;
3740 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3741 &
3742 determinate_avg_element_size_iterator,
3743 op);
3744 avg_element_size = op->total_elements_size_local / op->local_element_count;
3745 }
3746
3747 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3748 op->
3749 remote_element_count,
3750 op->
3751 local_element_count,
3752 op->local_set_diff,
3753 op->remote_set_diff,
3754 op->
3755 rtt_bandwidth_tradeoff,
3756 op->
3757 ibf_bucket_number_factor);
3758 if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation)
3759 {
3760 LOG (GNUNET_ERROR_TYPE_ERROR,
3761 "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3762 " : %d\n", mode_of_operation);
3763 GNUNET_break_op (0);
3764 fail_union_operation (op);
3765 return;
3766 }
3767
3768 // FIXME: we need to check that our set is larger than the
3769 // byzantine_lower_bound by some threshold
3770 send_full_set (op);
3771 GNUNET_CADET_receive_done (op->channel);
3772}
3773
3774
3775/**
3776 * Handle a "full done" message.
3777 *
3778 * @param cls closure, a set union operation
3779 * @param mh the demand message
3780 */
3781static void
3782handle_union_p2p_full_done (void *cls,
3783 const struct GNUNET_MessageHeader *mh)
3784{
3785 struct Operation *op = cls;
3786
3787 /**
3788 * Check that the message is received only in supported phase
3789 */
3790 uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3791 if (GNUNET_OK !=
3792 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3793 {
3794 GNUNET_break (0);
3795 fail_union_operation (op);
3796 return;
3797 }
3798
3799#if MEASURE_PERFORMANCE
3800 perf_store.full_done.received += 1;
3801#endif
3802
3803 switch (op->phase)
3804 {
3805 case PHASE_FULL_RECEIVING:
3806 {
3807 struct GNUNET_MQ_Envelope *ev;
3808
3809 if ((GNUNET_YES == op->byzantine) &&
3810 (op->received_total != op->remote_element_count) )
3811 {
3812 /* The other peer gave not enough elements before sending full done, there's something wrong. */
3813 LOG (GNUNET_ERROR_TYPE_ERROR,
3814 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3815 (unsigned long long) op->received_total,
3816 (unsigned long long) op->remote_element_count);
3817 GNUNET_break_op (0);
3818 fail_union_operation (op);
3819 return;
3820 }
3821
3822 LOG (GNUNET_ERROR_TYPE_DEBUG,
3823 "got FULL DONE, sending elements that other peer is missing\n");
3824
3825 /* send all the elements that did not come from the remote peer */
3826 GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
3827 &send_missing_full_elements_iter,
3828 op);
3829#if MEASURE_PERFORMANCE
3830 perf_store.full_done.sent += 1;
3831#endif
3832 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
3833 GNUNET_MQ_send (op->mq,
3834 ev);
3835 op->phase = PHASE_FINISHED;
3836 /* we now wait until the other peer sends us the OVER message*/
3837 }
3838 break;
3839
3840 case PHASE_FULL_SENDING:
3841 {
3842 LOG (GNUNET_ERROR_TYPE_DEBUG,
3843 "got FULL DONE, finishing\n");
3844 /* We sent the full set, and got the response for that. We're done. */
3845 op->phase = PHASE_FINISHED;
3846 GNUNET_CADET_receive_done (op->channel);
3847 send_client_done (op);
3848 _GSS_operation_destroy2 (op);
3849 return;
3850 }
3851
3852 default:
3853 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3854 "Handle full done phase is %u\n",
3855 (unsigned) op->phase);
3856 GNUNET_break_op (0);
3857 fail_union_operation (op);
3858 return;
3859 }
3860 GNUNET_CADET_receive_done (op->channel);
3861}
3862
3863
3864/**
3865 * Check a demand by the other peer for elements based on a list
3866 * of `struct GNUNET_HashCode`s.
3867 *
3868 * @param cls closure, a set union operation
3869 * @param mh the demand message
3870 * @return #GNUNET_OK if @a mh is well-formed
3871 */
3872static int
3873check_union_p2p_demand (void *cls,
3874 const struct GNUNET_MessageHeader *mh)
3875{
3876 struct Operation *op = cls;
3877 unsigned int num_hashes;
3878
3879 (void) op;
3880 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3881 / sizeof(struct GNUNET_HashCode);
3882 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3883 != num_hashes * sizeof(struct GNUNET_HashCode))
3884 {
3885 GNUNET_break_op (0);
3886 return GNUNET_SYSERR;
3887 }
3888 return GNUNET_OK;
3889}
3890
3891
3892/**
3893 * Handle a demand by the other peer for elements based on a list
3894 * of `struct GNUNET_HashCode`s.
3895 *
3896 * @param cls closure, a set union operation
3897 * @param mh the demand message
3898 */
3899static void
3900handle_union_p2p_demand (void *cls,
3901 const struct GNUNET_MessageHeader *mh)
3902{
3903 struct Operation *op = cls;
3904 struct ElementEntry *ee;
3905 struct GNUNET_SETU_ElementMessage *emsg;
3906 const struct GNUNET_HashCode *hash;
3907 unsigned int num_hashes;
3908 struct GNUNET_MQ_Envelope *ev;
3909
3910 /**
3911 * Check that the message is received only in supported phase
3912 */
3913 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3914 PHASE_FINISH_WAITING};
3915 if (GNUNET_OK !=
3916 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3917 {
3918 GNUNET_break (0);
3919 fail_union_operation (op);
3920 return;
3921 }
3922#if MEASURE_PERFORMANCE
3923 perf_store.demand.received += 1;
3924 perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3925 GNUNET_MessageHeader));
3926#endif
3927
3928 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3929 / sizeof(struct GNUNET_HashCode);
3930 for (hash = (const struct GNUNET_HashCode *) &mh[1];
3931 num_hashes > 0;
3932 hash++, num_hashes--)
3933 {
3934 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
3935 hash);
3936 if (NULL == ee)
3937 {
3938 /* Demand for non-existing element. */
3939 GNUNET_break_op (0);
3940 fail_union_operation (op);
3941 return;
3942 }
3943
3944 /* Save send demand message for message control */
3945 if (GNUNET_YES !=
3946 update_message_control_flow (
3947 op->message_control_flow,
3948 MSG_CFS_RECEIVED,
3949 &ee->element_hash,
3950 DEMAND_MESSAGE)
3951 )
3952 {
3953 LOG (GNUNET_ERROR_TYPE_ERROR,
3954 "Double demand message received found!\n");
3955 GNUNET_break (0);
3956 fail_union_operation (op);
3957 return;
3958 }
3959 ;
3960
3961 /* Mark element to be expected to received */
3962 if (GNUNET_YES !=
3963 update_message_control_flow (
3964 op->message_control_flow,
3965 MSG_CFS_SENT,
3966 &ee->element_hash,
3967 ELEMENT_MESSAGE)
3968 )
3969 {
3970 LOG (GNUNET_ERROR_TYPE_ERROR,
3971 "Double element message sent found!\n");
3972 GNUNET_break (0);
3973 fail_union_operation (op);
3974 return;
3975 }
3976 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
3977 {
3978 /* Probably confused lazily copied sets. */
3979 GNUNET_break_op (0);
3980 fail_union_operation (op);
3981 return;
3982 }
3983#if MEASURE_PERFORMANCE
3984 perf_store.element.sent += 1;
3985 perf_store.element.sent_var_bytes += ee->element.size;
3986#endif
3987 ev = GNUNET_MQ_msg_extra (emsg,
3988 ee->element.size,
3989 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS);
3990 GNUNET_memcpy (&emsg[1],
3991 ee->element.data,
3992 ee->element.size);
3993 emsg->reserved = htons (0);
3994 emsg->element_type = htons (ee->element.element_type);
3995 LOG (GNUNET_ERROR_TYPE_DEBUG,
3996 "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
3997 op,
3998 (unsigned int) ee->element.size,
3999 GNUNET_h2s (&ee->element_hash));
4000 GNUNET_MQ_send (op->mq, ev);
4001 GNUNET_STATISTICS_update (_GSS_statistics,
4002 "# exchanged elements",
4003 1,
4004 GNUNET_NO);
4005 if (op->symmetric)
4006 send_client_element (op,
4007 &ee->element,
4008 GNUNET_SETU_STATUS_ADD_REMOTE);
4009 }
4010 GNUNET_CADET_receive_done (op->channel);
4011 maybe_finish (op);
4012}
4013
4014
4015/**
4016 * Check offer (of `struct GNUNET_HashCode`s).
4017 *
4018 * @param cls the union operation
4019 * @param mh the message
4020 * @return #GNUNET_OK if @a mh is well-formed
4021 */
4022static int
4023check_union_p2p_offer (void *cls,
4024 const struct GNUNET_MessageHeader *mh)
4025{
4026 struct Operation *op = cls;
4027 unsigned int num_hashes;
4028
4029 /* look up elements and send them */
4030 if ((op->phase != PHASE_PASSIVE_DECODING) &&
4031 (op->phase != PHASE_ACTIVE_DECODING))
4032 {
4033 GNUNET_break_op (0);
4034 return GNUNET_SYSERR;
4035 }
4036 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4037 / sizeof(struct GNUNET_HashCode);
4038 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
4039 num_hashes * sizeof(struct GNUNET_HashCode))
4040 {
4041 GNUNET_break_op (0);
4042 return GNUNET_SYSERR;
4043 }
4044 return GNUNET_OK;
4045}
4046
4047
4048/**
4049 * Handle offers (of `struct GNUNET_HashCode`s) and
4050 * respond with demands (of `struct GNUNET_HashCode`s).
4051 *
4052 * @param cls the union operation
4053 * @param mh the message
4054 */
4055static void
4056handle_union_p2p_offer (void *cls,
4057 const struct GNUNET_MessageHeader *mh)
4058{
4059 struct Operation *op = cls;
4060 const struct GNUNET_HashCode *hash;
4061 unsigned int num_hashes;
4062 /**
4063 * Check that the message is received only in supported phase
4064 */
4065 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4066 if (GNUNET_OK !=
4067 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4068 {
4069 GNUNET_break (0);
4070 fail_union_operation (op);
4071 return;
4072 }
4073
4074#if MEASURE_PERFORMANCE
4075 perf_store.offer.received += 1;
4076 perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4077 GNUNET_MessageHeader));
4078#endif
4079
4080 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4081 / sizeof(struct GNUNET_HashCode);
4082 for (hash = (const struct GNUNET_HashCode *) &mh[1];
4083 num_hashes > 0;
4084 hash++, num_hashes--)
4085 {
4086 struct ElementEntry *ee;
4087 struct GNUNET_MessageHeader *demands;
4088 struct GNUNET_MQ_Envelope *ev;
4089
4090 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
4091 hash);
4092 if (NULL != ee)
4093 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
4094 continue;
4095
4096 if (GNUNET_YES ==
4097 GNUNET_CONTAINER_multihashmap_contains (op->demanded_hashes,
4098 hash))
4099 {
4100 LOG (GNUNET_ERROR_TYPE_DEBUG,
4101 "Skipped sending duplicate demand\n");
4102 continue;
4103 }
4104
4105 GNUNET_assert (GNUNET_OK ==
4106 GNUNET_CONTAINER_multihashmap_put (
4107 op->demanded_hashes,
4108 hash,
4109 NULL,
4110 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
4111
4112 LOG (GNUNET_ERROR_TYPE_DEBUG,
4113 "[OP %p] Requesting element (hash %s)\n",
4114 op, GNUNET_h2s (hash));
4115
4116#if MEASURE_PERFORMANCE
4117 perf_store.demand.sent += 1;
4118 perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4119#endif
4120 /* Save send demand message for message control */
4121 if (GNUNET_YES !=
4122 update_message_control_flow (
4123 op->message_control_flow,
4124 MSG_CFS_SENT,
4125 hash,
4126 DEMAND_MESSAGE))
4127 {
4128 LOG (GNUNET_ERROR_TYPE_ERROR,
4129 "Double demand message sent found!\n");
4130 GNUNET_break (0);
4131 fail_union_operation (op);
4132 return;
4133 }
4134
4135 /* Mark offer as received received */
4136 if (GNUNET_YES !=
4137 update_message_control_flow (
4138 op->message_control_flow,
4139 MSG_CFS_RECEIVED,
4140 hash,
4141 OFFER_MESSAGE))
4142 {
4143 LOG (GNUNET_ERROR_TYPE_ERROR,
4144 "Double offer message received found!\n");
4145 GNUNET_break (0);
4146 fail_union_operation (op);
4147 return;
4148 }
4149 /* Mark element to be expected to received */
4150 if (GNUNET_YES !=
4151 update_message_control_flow (
4152 op->message_control_flow,
4153 MSG_CFS_EXPECTED,
4154 hash,
4155 ELEMENT_MESSAGE))
4156 {
4157 LOG (GNUNET_ERROR_TYPE_ERROR,
4158 "Element already expected!\n");
4159 GNUNET_break (0);
4160 fail_union_operation (op);
4161 return;
4162 }
4163 ev = GNUNET_MQ_msg_header_extra (demands,
4164 sizeof(struct GNUNET_HashCode),
4165 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
4166 GNUNET_memcpy (&demands[1],
4167 hash,
4168 sizeof(struct GNUNET_HashCode));
4169 GNUNET_MQ_send (op->mq, ev);
4170 }
4171 GNUNET_CADET_receive_done (op->channel);
4172}
4173
4174
4175/**
4176 * Handle a done message from a remote peer
4177 *
4178 * @param cls the union operation
4179 * @param mh the message
4180 */
4181static void
4182handle_union_p2p_done (void *cls,
4183 const struct GNUNET_MessageHeader *mh)
4184{
4185 struct Operation *op = cls;
4186
4187 /**
4188 * Check that the message is received only in supported phase
4189 */
4190 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4191 if (GNUNET_OK !=
4192 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4193 {
4194 GNUNET_break (0);
4195 fail_union_operation (op);
4196 return;
4197 }
4198
4199 if (op->active_passive_switch_required)
4200 {
4201 LOG (GNUNET_ERROR_TYPE_ERROR,
4202 "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4203 GNUNET_break (0);
4204 fail_union_operation (op);
4205 return;
4206 }
4207
4208#if MEASURE_PERFORMANCE
4209 perf_store.done.received += 1;
4210#endif
4211 switch (op->phase)
4212 {
4213 case PHASE_PASSIVE_DECODING:
4214 /* We got all requests, but still have to send our elements in response. */
4215 op->phase = PHASE_FINISH_WAITING;
4216 LOG (GNUNET_ERROR_TYPE_DEBUG,
4217 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
4218 /* The active peer is done sending offers
4219 * and inquiries. This means that all
4220 * our responses to that (demands and offers)
4221 * must be in flight (queued or in mesh).
4222 *
4223 * We should notify the active peer once
4224 * all our demands are satisfied, so that the active
4225 * peer can quit if we gave it everything.
4226 */GNUNET_CADET_receive_done (op->channel);
4227 maybe_finish (op);
4228 return;
4229 case PHASE_ACTIVE_DECODING:
4230 LOG (GNUNET_ERROR_TYPE_DEBUG,
4231 "got DONE (as active partner), waiting to finish\n");
4232 /* All demands of the other peer are satisfied,
4233 * and we processed all offers, thus we know
4234 * exactly what our demands must be.
4235 *
4236 * We'll close the channel
4237 * to the other peer once our demands are met.
4238 */op->phase = PHASE_FINISH_CLOSING;
4239 GNUNET_CADET_receive_done (op->channel);
4240 maybe_finish (op);
4241 return;
4242 default:
4243 GNUNET_break_op (0);
4244 fail_union_operation (op);
4245 return;
4246 }
4247}
4248
4249
4250/**
4251 * Handle a over message from a remote peer
4252 *
4253 * @param cls the union operation
4254 * @param mh the message
4255 */
4256static void
4257handle_union_p2p_over (void *cls,
4258 const struct GNUNET_MessageHeader *mh)
4259{
4260#if MEASURE_PERFORMANCE
4261 perf_store.over.received += 1;
4262#endif
4263 send_client_done (cls);
4264}
4265
4266
4267/**
4268 * Get the incoming socket associated with the given id.
4269 *
4270 * @param listener the listener to look in
4271 * @param id id to look for
4272 * @return the incoming socket associated with the id,
4273 * or NULL if there is none
4274 */
4275static struct Operation *
4276get_incoming (uint32_t id)
4277{
4278 for (struct Listener *listener = listener_head;
4279 NULL != listener;
4280 listener = listener->next)
4281 {
4282 for (struct Operation *op = listener->op_head;
4283 NULL != op;
4284 op = op->next)
4285 if (op->suggest_id == id)
4286 return op;
4287 }
4288 return NULL;
4289}
4290
4291
4292/**
4293 * Callback called when a client connects to the service.
4294 *
4295 * @param cls closure for the service
4296 * @param c the new client that connected to the service
4297 * @param mq the message queue used to send messages to the client
4298 * @return @a `struct ClientState`
4299 */
4300static void *
4301client_connect_cb (void *cls,
4302 struct GNUNET_SERVICE_Client *c,
4303 struct GNUNET_MQ_Handle *mq)
4304{
4305 struct ClientState *cs;
4306
4307 num_clients++;
4308 cs = GNUNET_new (struct ClientState);
4309 cs->client = c;
4310 cs->mq = mq;
4311 return cs;
4312}
4313
4314
4315/**
4316 * Iterator over hash map entries to free element entries.
4317 *
4318 * @param cls closure
4319 * @param key current key code
4320 * @param value a `struct ElementEntry *` to be free'd
4321 * @return #GNUNET_YES (continue to iterate)
4322 */
4323static int
4324destroy_elements_iterator (void *cls,
4325 const struct GNUNET_HashCode *key,
4326 void *value)
4327{
4328 struct ElementEntry *ee = value;
4329
4330 GNUNET_free (ee);
4331 return GNUNET_YES;
4332}
4333
4334
4335/**
4336 * Clean up after a client has disconnected
4337 *
4338 * @param cls closure, unused
4339 * @param client the client to clean up after
4340 * @param internal_cls the `struct ClientState`
4341 */
4342static void
4343client_disconnect_cb (void *cls,
4344 struct GNUNET_SERVICE_Client *client,
4345 void *internal_cls)
4346{
4347 struct ClientState *cs = internal_cls;
4348 struct Operation *op;
4349 struct Listener *listener;
4350 struct Set *set;
4351
4352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4353 "Client disconnected, cleaning up\n");
4354 if (NULL != (set = cs->set))
4355 {
4356 struct SetContent *content = set->content;
4357
4358 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4359 "Destroying client's set\n");
4360 /* Destroy pending set operations */
4361 while (NULL != set->ops_head)
4362 _GSS_operation_destroy (set->ops_head);
4363
4364 /* Destroy operation-specific state */
4365 if (NULL != set->se)
4366 {
4367 strata_estimator_destroy (set->se);
4368 set->se = NULL;
4369 }
4370 /* free set content (or at least decrement RC) */
4371 set->content = NULL;
4372 GNUNET_assert (0 != content->refcount);
4373 content->refcount--;
4374 if (0 == content->refcount)
4375 {
4376 GNUNET_assert (NULL != content->elements);
4377 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
4378 &destroy_elements_iterator,
4379 NULL);
4380 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
4381 content->elements = NULL;
4382 GNUNET_free (content);
4383 }
4384 GNUNET_free (set);
4385 }
4386
4387 if (NULL != (listener = cs->listener))
4388 {
4389 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4390 "Destroying client's listener\n");
4391 GNUNET_CADET_close_port (listener->open_port);
4392 listener->open_port = NULL;
4393 while (NULL != (op = listener->op_head))
4394 {
4395 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4396 "Destroying incoming operation `%u' from peer `%s'\n",
4397 (unsigned int) op->client_request_id,
4398 GNUNET_i2s (&op->peer));
4399 incoming_destroy (op);
4400 }
4401 GNUNET_CONTAINER_DLL_remove (listener_head,
4402 listener_tail,
4403 listener);
4404 GNUNET_free (listener);
4405 }
4406 GNUNET_free (cs);
4407 num_clients--;
4408 if ( (GNUNET_YES == in_shutdown) &&
4409 (0 == num_clients) )
4410 {
4411 if (NULL != cadet)
4412 {
4413 GNUNET_CADET_disconnect (cadet);
4414 cadet = NULL;
4415 }
4416 }
4417}
4418
4419
4420/**
4421 * Check a request for a set operation from another peer.
4422 *
4423 * @param cls the operation state
4424 * @param msg the received message
4425 * @return #GNUNET_OK if the channel should be kept alive,
4426 * #GNUNET_SYSERR to destroy the channel
4427 */
4428static int
4429check_incoming_msg (void *cls,
4430 const struct OperationRequestMessage *msg)
4431{
4432 struct Operation *op = cls;
4433 struct Listener *listener = op->listener;
4434 const struct GNUNET_MessageHeader *nested_context;
4435
4436 /* double operation request */
4437 if (0 != op->suggest_id)
4438 {
4439 GNUNET_break_op (0);
4440 return GNUNET_SYSERR;
4441 }
4442 /* This should be equivalent to the previous condition, but can't hurt to check twice */
4443 if (NULL == listener)
4444 {
4445 GNUNET_break (0);
4446 return GNUNET_SYSERR;
4447 }
4448 nested_context = GNUNET_MQ_extract_nested_mh (msg);
4449 if ((NULL != nested_context) &&
4450 (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
4451 {
4452 GNUNET_break_op (0);
4453 return GNUNET_SYSERR;
4454 }
4455 return GNUNET_OK;
4456}
4457
4458
4459/**
4460 * Handle a request for a set operation from another peer. Checks if we
4461 * have a listener waiting for such a request (and in that case initiates
4462 * asking the listener about accepting the connection). If no listener
4463 * is waiting, we queue the operation request in hope that a listener
4464 * shows up soon (before timeout).
4465 *
4466 * This msg is expected as the first and only msg handled through the
4467 * non-operation bound virtual table, acceptance of this operation replaces
4468 * our virtual table and subsequent msgs would be routed differently (as
4469 * we then know what type of operation this is).
4470 *
4471 * @param cls the operation state
4472 * @param msg the received message
4473 */
4474static void
4475handle_incoming_msg (void *cls,
4476 const struct OperationRequestMessage *msg)
4477{
4478 struct Operation *op = cls;
4479 struct Listener *listener = op->listener;
4480 const struct GNUNET_MessageHeader *nested_context;
4481 struct GNUNET_MQ_Envelope *env;
4482 struct GNUNET_SETU_RequestMessage *cmsg;
4483
4484 nested_context = GNUNET_MQ_extract_nested_mh (msg);
4485 /* Make a copy of the nested_context (application-specific context
4486 information that is opaque to set) so we can pass it to the
4487 listener later on */
4488 if (NULL != nested_context)
4489 op->context_msg = GNUNET_copy_message (nested_context);
4490 op->remote_element_count = ntohl (msg->element_count);
4491 GNUNET_log (
4492 GNUNET_ERROR_TYPE_DEBUG,
4493 "Received P2P operation request (port %s) for active listener\n",
4494 GNUNET_h2s (&op->listener->app_id));
4495 GNUNET_assert (0 == op->suggest_id);
4496 if (0 == suggest_id)
4497 suggest_id++;
4498 op->suggest_id = suggest_id++;
4499 GNUNET_assert (NULL != op->timeout_task);
4500 GNUNET_SCHEDULER_cancel (op->timeout_task);
4501 op->timeout_task = NULL;
4502 env = GNUNET_MQ_msg_nested_mh (cmsg,
4503 GNUNET_MESSAGE_TYPE_SETU_REQUEST,
4504 op->context_msg);
4505 GNUNET_log (
4506 GNUNET_ERROR_TYPE_DEBUG,
4507 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
4508 op->suggest_id,
4509 listener,
4510 listener->cs);
4511 cmsg->accept_id = htonl (op->suggest_id);
4512 cmsg->peer_id = op->peer;
4513 GNUNET_MQ_send (listener->cs->mq,
4514 env);
4515 /* NOTE: GNUNET_CADET_receive_done() will be called in
4516 #handle_client_accept() */
4517}
4518
4519
4520/**
4521 * Called when a client wants to create a new set. This is typically
4522 * the first request from a client, and includes the type of set
4523 * operation to be performed.
4524 *
4525 * @param cls client that sent the message
4526 * @param m message sent by the client
4527 */
4528static void
4529handle_client_create_set (void *cls,
4530 const struct GNUNET_SETU_CreateMessage *msg)
4531{
4532 struct ClientState *cs = cls;
4533 struct Set *set;
4534
4535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4536 "Client created new set for union operation\n");
4537 if (NULL != cs->set)
4538 {
4539 /* There can only be one set per client */
4540 GNUNET_break (0);
4541 GNUNET_SERVICE_client_drop (cs->client);
4542 return;
4543 }
4544 set = GNUNET_new (struct Set);
4545 {
4546 struct MultiStrataEstimator *se;
4547
4548 se = strata_estimator_create (SE_STRATA_COUNT,
4549 SE_IBFS_TOTAL_SIZE,
4550 SE_IBF_HASH_NUM);
4551 if (NULL == se)
4552 {
4553 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4554 "Failed to allocate strata estimator\n");
4555 GNUNET_free (set);
4556 GNUNET_SERVICE_client_drop (cs->client);
4557 return;
4558 }
4559 set->se = se;
4560 }
4561 set->content = GNUNET_new (struct SetContent);
4562 set->content->refcount = 1;
4563 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
4564 GNUNET_YES);
4565 set->cs = cs;
4566 cs->set = set;
4567 GNUNET_SERVICE_client_continue (cs->client);
4568}
4569
4570
4571/**
4572 * Timeout happens iff:
4573 * - we suggested an operation to our listener,
4574 * but did not receive a response in time
4575 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
4576 *
4577 * @param cls channel context
4578 * @param tc context information (why was this task triggered now)
4579 */
4580static void
4581incoming_timeout_cb (void *cls)
4582{
4583 struct Operation *op = cls;
4584
4585 op->timeout_task = NULL;
4586 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4587 "Remote peer's incoming request timed out\n");
4588 incoming_destroy (op);
4589}
4590
4591
4592/**
4593 * Method called whenever another peer has added us to a channel the
4594 * other peer initiated. Only called (once) upon reception of data
4595 * from a channel we listen on.
4596 *
4597 * The channel context represents the operation itself and gets added
4598 * to a DLL, from where it gets looked up when our local listener
4599 * client responds to a proposed/suggested operation or connects and
4600 * associates with this operation.
4601 *
4602 * @param cls closure
4603 * @param channel new handle to the channel
4604 * @param source peer that started the channel
4605 * @return initial channel context for the channel
4606 * returns NULL on error
4607 */
4608static void *
4609channel_new_cb (void *cls,
4610 struct GNUNET_CADET_Channel *channel,
4611 const struct GNUNET_PeerIdentity *source)
4612{
4613 struct Listener *listener = cls;
4614 struct Operation *op;
4615
4616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4617 "New incoming channel\n");
4618 op = GNUNET_new (struct Operation);
4619 op->listener = listener;
4620 op->peer = *source;
4621 op->channel = channel;
4622 op->mq = GNUNET_CADET_get_mq (op->channel);
4623 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
4624 UINT32_MAX);
4625 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
4626 &incoming_timeout_cb,
4627 op);
4628 GNUNET_CONTAINER_DLL_insert (listener->op_head,
4629 listener->op_tail,
4630 op);
4631 return op;
4632}
4633
4634
4635/**
4636 * Function called whenever a channel is destroyed. Should clean up
4637 * any associated state. It must NOT call
4638 * GNUNET_CADET_channel_destroy() on the channel.
4639 *
4640 * The peer_disconnect function is part of a a virtual table set initially either
4641 * when a peer creates a new channel with us, or once we create
4642 * a new channel ourselves (evaluate).
4643 *
4644 * Once we know the exact type of operation (union/intersection), the vt is
4645 * replaced with an operation specific instance (_GSS_[op]_vt).
4646 *
4647 * @param channel_ctx place where local state associated
4648 * with the channel is stored
4649 * @param channel connection to the other end (henceforth invalid)
4650 */
4651static void
4652channel_end_cb (void *channel_ctx,
4653 const struct GNUNET_CADET_Channel *channel)
4654{
4655 struct Operation *op = channel_ctx;
4656
4657 op->channel = NULL;
4658 _GSS_operation_destroy2 (op);
4659}
4660
4661
4662/**
4663 * Function called whenever an MQ-channel's transmission window size changes.
4664 *
4665 * The first callback in an outgoing channel will be with a non-zero value
4666 * and will mean the channel is connected to the destination.
4667 *
4668 * For an incoming channel it will be called immediately after the
4669 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
4670 *
4671 * @param cls Channel closure.
4672 * @param channel Connection to the other end (henceforth invalid).
4673 * @param window_size New window size. If the is more messages than buffer size
4674 * this value will be negative..
4675 */
4676static void
4677channel_window_cb (void *cls,
4678 const struct GNUNET_CADET_Channel *channel,
4679 int window_size)
4680{
4681 /* FIXME: not implemented, we could do flow control here... */
4682}
4683
4684
4685/**
4686 * Called when a client wants to create a new listener.
4687 *
4688 * @param cls client that sent the message
4689 * @param msg message sent by the client
4690 */
4691
4692static void
4693handle_client_listen (void *cls,
4694 const struct GNUNET_SETU_ListenMessage *msg)
4695{
4696 struct ClientState *cs = cls;
4697 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4698 GNUNET_MQ_hd_var_size (incoming_msg,
4699 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
4700 struct OperationRequestMessage,
4701 NULL),
4702 GNUNET_MQ_hd_var_size (union_p2p_ibf,
4703 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
4704 struct IBFMessage,
4705 NULL),
4706 GNUNET_MQ_hd_var_size (union_p2p_elements,
4707 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
4708 struct GNUNET_SETU_ElementMessage,
4709 NULL),
4710 GNUNET_MQ_hd_var_size (union_p2p_offer,
4711 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
4712 struct GNUNET_MessageHeader,
4713 NULL),
4714 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4715 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
4716 struct InquiryMessage,
4717 NULL),
4718 GNUNET_MQ_hd_var_size (union_p2p_demand,
4719 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
4720 struct GNUNET_MessageHeader,
4721 NULL),
4722 GNUNET_MQ_hd_fixed_size (union_p2p_done,
4723 GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
4724 struct GNUNET_MessageHeader,
4725 NULL),
4726 GNUNET_MQ_hd_fixed_size (union_p2p_over,
4727 GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
4728 struct GNUNET_MessageHeader,
4729 NULL),
4730 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4731 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
4732 struct GNUNET_MessageHeader,
4733 NULL),
4734 GNUNET_MQ_hd_var_size (union_p2p_request_full,
4735 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
4736 struct TransmitFullMessage,
4737 NULL),
4738 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4739 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
4740 struct StrataEstimatorMessage,
4741 NULL),
4742 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4743 GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
4744 struct StrataEstimatorMessage,
4745 NULL),
4746 GNUNET_MQ_hd_var_size (union_p2p_full_element,
4747 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
4748 struct GNUNET_SETU_ElementMessage,
4749 NULL),
4750 GNUNET_MQ_hd_var_size (union_p2p_send_full,
4751 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
4752 struct TransmitFullMessage,
4753 NULL),
4754 GNUNET_MQ_handler_end ()
4755 };
4756 struct Listener *listener;
4757
4758 if (NULL != cs->listener)
4759 {
4760 /* max. one active listener per client! */
4761 GNUNET_break (0);
4762 GNUNET_SERVICE_client_drop (cs->client);
4763 return;
4764 }
4765 listener = GNUNET_new (struct Listener);
4766 listener->cs = cs;
4767 cs->listener = listener;
4768 listener->app_id = msg->app_id;
4769 GNUNET_CONTAINER_DLL_insert (listener_head,
4770 listener_tail,
4771 listener);
4772 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4773 "New listener created (port %s)\n",
4774 GNUNET_h2s (&listener->app_id));
4775 listener->open_port = GNUNET_CADET_open_port (cadet,
4776 &msg->app_id,
4777 &channel_new_cb,
4778 listener,
4779 &channel_window_cb,
4780 &channel_end_cb,
4781 cadet_handlers);
4782 GNUNET_SERVICE_client_continue (cs->client);
4783}
4784
4785
4786/**
4787 * Called when the listening client rejects an operation
4788 * request by another peer.
4789 *
4790 * @param cls client that sent the message
4791 * @param msg message sent by the client
4792 */
4793static void
4794handle_client_reject (void *cls,
4795 const struct GNUNET_SETU_RejectMessage *msg)
4796{
4797 struct ClientState *cs = cls;
4798 struct Operation *op;
4799
4800 op = get_incoming (ntohl (msg->accept_reject_id));
4801 if (NULL == op)
4802 {
4803 /* no matching incoming operation for this reject;
4804 could be that the other peer already disconnected... */
4805 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4806 "Client rejected unknown operation %u\n",
4807 (unsigned int) ntohl (msg->accept_reject_id));
4808 GNUNET_SERVICE_client_continue (cs->client);
4809 return;
4810 }
4811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4812 "Peer request (app %s) rejected by client\n",
4813 GNUNET_h2s (&cs->listener->app_id));
4814 _GSS_operation_destroy2 (op);
4815 GNUNET_SERVICE_client_continue (cs->client);
4816}
4817
4818
4819/**
4820 * Called when a client wants to add or remove an element to a set it inhabits.
4821 *
4822 * @param cls client that sent the message
4823 * @param msg message sent by the client
4824 */
4825static int
4826check_client_set_add (void *cls,
4827 const struct GNUNET_SETU_ElementMessage *msg)
4828{
4829 /* NOTE: Technically, we should probably check with the
4830 block library whether the element we are given is well-formed */
4831 return GNUNET_OK;
4832}
4833
4834
4835/**
4836 * Called when a client wants to add or remove an element to a set it inhabits.
4837 *
4838 * @param cls client that sent the message
4839 * @param msg message sent by the client
4840 */
4841static void
4842handle_client_set_add (void *cls,
4843 const struct GNUNET_SETU_ElementMessage *msg)
4844{
4845 struct ClientState *cs = cls;
4846 struct Set *set;
4847 struct GNUNET_SETU_Element el;
4848 struct ElementEntry *ee;
4849 struct GNUNET_HashCode hash;
4850
4851 if (NULL == (set = cs->set))
4852 {
4853 /* client without a set requested an operation */
4854 GNUNET_break (0);
4855 GNUNET_SERVICE_client_drop (cs->client);
4856 return;
4857 }
4858 GNUNET_SERVICE_client_continue (cs->client);
4859 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
4860 el.size = ntohs (msg->header.size) - sizeof(*msg);
4861 el.data = &msg[1];
4862 el.element_type = ntohs (msg->element_type);
4863 GNUNET_SETU_element_hash (&el,
4864 &hash);
4865 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
4866 &hash);
4867 if (NULL == ee)
4868 {
4869 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4870 "Client inserts element %s of size %u\n",
4871 GNUNET_h2s (&hash),
4872 el.size);
4873 ee = GNUNET_malloc (el.size + sizeof(*ee));
4874 ee->element.size = el.size;
4875 GNUNET_memcpy (&ee[1], el.data, el.size);
4876 ee->element.data = &ee[1];
4877 ee->element.element_type = el.element_type;
4878 ee->remote = GNUNET_NO;
4879 ee->generation = set->current_generation;
4880 ee->element_hash = hash;
4881 GNUNET_break (GNUNET_YES ==
4882 GNUNET_CONTAINER_multihashmap_put (
4883 set->content->elements,
4884 &ee->element_hash,
4885 ee,
4886 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4887 }
4888 else
4889 {
4890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4891 "Client inserted element %s of size %u twice (ignored)\n",
4892 GNUNET_h2s (&hash),
4893 el.size);
4894 /* same element inserted twice */
4895 return;
4896 }
4897 strata_estimator_insert (set->se,
4898 get_ibf_key (&ee->element_hash));
4899}
4900
4901
4902/**
4903 * Advance the current generation of a set,
4904 * adding exclusion ranges if necessary.
4905 *
4906 * @param set the set where we want to advance the generation
4907 */
4908static void
4909advance_generation (struct Set *set)
4910{
4911 set->content->latest_generation++;
4912 set->current_generation++;
4913}
4914
4915
4916/**
4917 * Called when a client wants to initiate a set operation with another
4918 * peer. Initiates the CADET connection to the listener and sends the
4919 * request.
4920 *
4921 * @param cls client that sent the message
4922 * @param msg message sent by the client
4923 * @return #GNUNET_OK if the message is well-formed
4924 */
4925static int
4926check_client_evaluate (void *cls,
4927 const struct GNUNET_SETU_EvaluateMessage *msg)
4928{
4929 /* FIXME: suboptimal, even if the context below could be NULL,
4930 there are malformed messages this does not check for... */
4931 return GNUNET_OK;
4932}
4933
4934
4935/**
4936 * Called when a client wants to initiate a set operation with another
4937 * peer. Initiates the CADET connection to the listener and sends the
4938 * request.
4939 *
4940 * @param cls client that sent the message
4941 * @param msg message sent by the client
4942 */
4943static void
4944handle_client_evaluate (void *cls,
4945 const struct GNUNET_SETU_EvaluateMessage *msg)
4946{
4947 struct ClientState *cs = cls;
4948 struct Operation *op = GNUNET_new (struct Operation);
4949
4950 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4951 GNUNET_MQ_hd_var_size (incoming_msg,
4952 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
4953 struct OperationRequestMessage,
4954 op),
4955 GNUNET_MQ_hd_var_size (union_p2p_ibf,
4956 GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
4957 struct IBFMessage,
4958 op),
4959 GNUNET_MQ_hd_var_size (union_p2p_elements,
4960 GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
4961 struct GNUNET_SETU_ElementMessage,
4962 op),
4963 GNUNET_MQ_hd_var_size (union_p2p_offer,
4964 GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
4965 struct GNUNET_MessageHeader,
4966 op),
4967 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4968 GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
4969 struct InquiryMessage,
4970 op),
4971 GNUNET_MQ_hd_var_size (union_p2p_demand,
4972 GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
4973 struct GNUNET_MessageHeader,
4974 op),
4975 GNUNET_MQ_hd_fixed_size (union_p2p_done,
4976 GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
4977 struct GNUNET_MessageHeader,
4978 op),
4979 GNUNET_MQ_hd_fixed_size (union_p2p_over,
4980 GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
4981 struct GNUNET_MessageHeader,
4982 op),
4983 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4984 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
4985 struct GNUNET_MessageHeader,
4986 op),
4987 GNUNET_MQ_hd_var_size (union_p2p_request_full,
4988 GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
4989 struct TransmitFullMessage,
4990 op),
4991 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4992 GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
4993 struct StrataEstimatorMessage,
4994 op),
4995 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4996 GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
4997 struct StrataEstimatorMessage,
4998 op),
4999 GNUNET_MQ_hd_var_size (union_p2p_full_element,
5000 GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
5001 struct GNUNET_SETU_ElementMessage,
5002 op),
5003 GNUNET_MQ_hd_var_size (union_p2p_send_full,
5004 GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL,
5005 struct TransmitFullMessage,
5006 NULL),
5007 GNUNET_MQ_handler_end ()
5008 };
5009 struct Set *set;
5010 const struct GNUNET_MessageHeader *context;
5011
5012 if (NULL == (set = cs->set))
5013 {
5014 GNUNET_break (0);
5015 GNUNET_free (op);
5016 GNUNET_SERVICE_client_drop (cs->client);
5017 return;
5018 }
5019 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
5020 UINT32_MAX);
5021 op->peer = msg->target_peer;
5022 op->client_request_id = ntohl (msg->request_id);
5023 op->byzantine = msg->byzantine;
5024 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5025 op->force_full = msg->force_full;
5026 op->force_delta = msg->force_delta;
5027 op->symmetric = msg->symmetric;
5028 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5029 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5030 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5031 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5032 op->active_passive_switch_required = false;
5033 context = GNUNET_MQ_extract_nested_mh (msg);
5034
5035 /* create hashmap for message control */
5036 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5037 GNUNET_NO);
5038 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5039
5040#if MEASURE_PERFORMANCE
5041 /* load config */
5042 load_config (op);
5043#endif
5044
5045 /* Advance generation values, so that
5046 mutations won't interfer with the running operation. */
5047 op->set = set;
5048 op->generation_created = set->current_generation;
5049 advance_generation (set);
5050 GNUNET_CONTAINER_DLL_insert (set->ops_head,
5051 set->ops_tail,
5052 op);
5053 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5054 "Creating new CADET channel to port %s for set union\n",
5055 GNUNET_h2s (&msg->app_id));
5056 op->channel = GNUNET_CADET_channel_create (cadet,
5057 op,
5058 &msg->target_peer,
5059 &msg->app_id,
5060 &channel_window_cb,
5061 &channel_end_cb,
5062 cadet_handlers);
5063 op->mq = GNUNET_CADET_get_mq (op->channel);
5064 {
5065 struct GNUNET_MQ_Envelope *ev;
5066 struct OperationRequestMessage *msg;
5067
5068#if MEASURE_PERFORMANCE
5069 perf_store.operation_request.sent += 1;
5070#endif
5071 ev = GNUNET_MQ_msg_nested_mh (msg,
5072 GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
5073 context);
5074 if (NULL == ev)
5075 {
5076 /* the context message is too large */
5077 GNUNET_break (0);
5078 GNUNET_SERVICE_client_drop (cs->client);
5079 return;
5080 }
5081 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5082 GNUNET_NO);
5083 /* copy the current generation's strata estimator for this operation */
5084 op->se = strata_estimator_dup (op->set->se);
5085 /* we started the operation, thus we have to send the operation request */
5086 op->phase = PHASE_EXPECT_SE;
5087
5088 op->salt_receive = (op->peer_site + 1) % 2;
5089 op->salt_send = op->peer_site; // FIXME?????
5090
5091
5092 LOG (GNUNET_ERROR_TYPE_DEBUG,
5093 "Initiating union operation evaluation\n");
5094 GNUNET_STATISTICS_update (_GSS_statistics,
5095 "# of total union operations",
5096 1,
5097 GNUNET_NO);
5098 GNUNET_STATISTICS_update (_GSS_statistics,
5099 "# of initiated union operations",
5100 1,
5101 GNUNET_NO);
5102 GNUNET_MQ_send (op->mq,
5103 ev);
5104 if (NULL != context)
5105 LOG (GNUNET_ERROR_TYPE_DEBUG,
5106 "sent op request with context message\n");
5107 else
5108 LOG (GNUNET_ERROR_TYPE_DEBUG,
5109 "sent op request without context message\n");
5110 initialize_key_to_element (op);
5111 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
5112 op->key_to_element);
5113
5114 }
5115 GNUNET_SERVICE_client_continue (cs->client);
5116}
5117
5118
5119/**
5120 * Handle a request from the client to cancel a running set operation.
5121 *
5122 * @param cls the client
5123 * @param msg the message
5124 */
5125static void
5126handle_client_cancel (void *cls,
5127 const struct GNUNET_SETU_CancelMessage *msg)
5128{
5129 struct ClientState *cs = cls;
5130 struct Set *set;
5131 struct Operation *op;
5132 int found;
5133
5134 if (NULL == (set = cs->set))
5135 {
5136 /* client without a set requested an operation */
5137 GNUNET_break (0);
5138 GNUNET_SERVICE_client_drop (cs->client);
5139 return;
5140 }
5141 found = GNUNET_NO;
5142 for (op = set->ops_head; NULL != op; op = op->next)
5143 {
5144 if (op->client_request_id == ntohl (msg->request_id))
5145 {
5146 found = GNUNET_YES;
5147 break;
5148 }
5149 }
5150 if (GNUNET_NO == found)
5151 {
5152 /* It may happen that the operation was already destroyed due to
5153 * the other peer disconnecting. The client may not know about this
5154 * yet and try to cancel the (just barely non-existent) operation.
5155 * So this is not a hard error.
5156 *///
5157 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
5158 "Client canceled non-existent op %u\n",
5159 (uint32_t) ntohl (msg->request_id));
5160 }
5161 else
5162 {
5163 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5164 "Client requested cancel for op %u\n",
5165 (uint32_t) ntohl (msg->request_id));
5166 _GSS_operation_destroy (op);
5167 }
5168 GNUNET_SERVICE_client_continue (cs->client);
5169}
5170
5171
5172/**
5173 * Handle a request from the client to accept a set operation that
5174 * came from a remote peer. We forward the accept to the associated
5175 * operation for handling
5176 *
5177 * @param cls the client
5178 * @param msg the message
5179 */
5180static void
5181handle_client_accept (void *cls,
5182 const struct GNUNET_SETU_AcceptMessage *msg)
5183{
5184 struct ClientState *cs = cls;
5185 struct Set *set;
5186 struct Operation *op;
5187 struct GNUNET_SETU_ResultMessage *result_message;
5188 struct GNUNET_MQ_Envelope *ev;
5189 struct Listener *listener;
5190
5191 if (NULL == (set = cs->set))
5192 {
5193 /* client without a set requested to accept */
5194 GNUNET_break (0);
5195 GNUNET_SERVICE_client_drop (cs->client);
5196 return;
5197 }
5198 op = get_incoming (ntohl (msg->accept_reject_id));
5199 if (NULL == op)
5200 {
5201 /* It is not an error if the set op does not exist -- it may
5202 * have been destroyed when the partner peer disconnected. */
5203 GNUNET_log (
5204 GNUNET_ERROR_TYPE_INFO,
5205 "Client %p accepted request %u of listener %p that is no longer active\n",
5206 cs,
5207 ntohl (msg->accept_reject_id),
5208 cs->listener);
5209 ev = GNUNET_MQ_msg (result_message,
5210 GNUNET_MESSAGE_TYPE_SETU_RESULT);
5211 result_message->request_id = msg->request_id;
5212 result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
5213 GNUNET_MQ_send (set->cs->mq, ev);
5214 GNUNET_SERVICE_client_continue (cs->client);
5215 return;
5216 }
5217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5218 "Client accepting request %u\n",
5219 (uint32_t) ntohl (msg->accept_reject_id));
5220 listener = op->listener;
5221 op->listener = NULL;
5222 GNUNET_CONTAINER_DLL_remove (listener->op_head,
5223 listener->op_tail,
5224 op);
5225 op->set = set;
5226 GNUNET_CONTAINER_DLL_insert (set->ops_head,
5227 set->ops_tail,
5228 op);
5229 op->client_request_id = ntohl (msg->request_id);
5230 op->byzantine = msg->byzantine;
5231 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5232 op->force_full = msg->force_full;
5233 op->force_delta = msg->force_delta;
5234 op->symmetric = msg->symmetric;
5235 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5236 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5237 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5238 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5239 op->active_passive_switch_required = false;
5240 /* create hashmap for message control */
5241 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5242 GNUNET_NO);
5243 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5244
5245#if MEASURE_PERFORMANCE
5246 /* load config */
5247 load_config (op);
5248#endif
5249
5250 /* Advance generation values, so that future mutations do not
5251 interfer with the running operation. */
5252 op->generation_created = set->current_generation;
5253 advance_generation (set);
5254 GNUNET_assert (NULL == op->se);
5255
5256 LOG (GNUNET_ERROR_TYPE_DEBUG,
5257 "accepting set union operation\n");
5258 GNUNET_STATISTICS_update (_GSS_statistics,
5259 "# of accepted union operations",
5260 1,
5261 GNUNET_NO);
5262 GNUNET_STATISTICS_update (_GSS_statistics,
5263 "# of total union operations",
5264 1,
5265 GNUNET_NO);
5266 {
5267 struct MultiStrataEstimator *se;
5268 struct GNUNET_MQ_Envelope *ev;
5269 struct StrataEstimatorMessage *strata_msg;
5270 char *buf;
5271 size_t len;
5272 uint16_t type;
5273
5274 op->se = strata_estimator_dup (op->set->se);
5275 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5276 GNUNET_NO);
5277 op->salt_receive = (op->peer_site + 1) % 2;
5278 op->salt_send = op->peer_site; // FIXME?????
5279 initialize_key_to_element (op);
5280 op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
5281 op->key_to_element);
5282
5283 /* kick off the operation */
5284 se = op->se;
5285
5286 uint8_t se_count = 1;
5287 if (op->initial_size > 0)
5288 {
5289 op->total_elements_size_local = 0;
5290 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5291 &
5292 determinate_avg_element_size_iterator,
5293 op);
5294 se_count = determine_strata_count (
5295 op->total_elements_size_local / op->initial_size,
5296 op->initial_size);
5297 }
5298 buf = GNUNET_malloc (se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5299 * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
5300 len = strata_estimator_write (se,
5301 SE_IBFS_TOTAL_SIZE,
5302 se_count,
5303 buf);
5304#if MEASURE_PERFORMANCE
5305 perf_store.se.sent += 1;
5306 perf_store.se.sent_var_bytes += len;
5307#endif
5308
5309 if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5310 * SE_IBFS_TOTAL_SIZE)
5311 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC;
5312 else
5313 type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE;
5314 ev = GNUNET_MQ_msg_extra (strata_msg,
5315 len,
5316 type);
5317 GNUNET_memcpy (&strata_msg[1],
5318 buf,
5319 len);
5320 GNUNET_free (buf);
5321 strata_msg->set_size
5322 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
5323 op->set->content->elements));
5324 strata_msg->se_count = se_count;
5325 GNUNET_MQ_send (op->mq,
5326 ev);
5327 op->phase = PHASE_EXPECT_IBF;
5328 }
5329 /* Now allow CADET to continue, as we did not do this in
5330 #handle_incoming_msg (as we wanted to first see if the
5331 local client would accept the request). */
5332 GNUNET_CADET_receive_done (op->channel);
5333 GNUNET_SERVICE_client_continue (cs->client);
5334}
5335
5336
5337/**
5338 * Called to clean up, after a shutdown has been requested.
5339 *
5340 * @param cls closure, NULL
5341 */
5342static void
5343shutdown_task (void *cls)
5344{
5345 /* Delay actual shutdown to allow service to disconnect clients */
5346 in_shutdown = GNUNET_YES;
5347 if (0 == num_clients)
5348 {
5349 if (NULL != cadet)
5350 {
5351 GNUNET_CADET_disconnect (cadet);
5352 cadet = NULL;
5353 }
5354 }
5355 GNUNET_STATISTICS_destroy (_GSS_statistics,
5356 GNUNET_YES);
5357 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5358 "handled shutdown request\n");
5359#if MEASURE_PERFORMANCE
5360 calculate_perf_store ();
5361#endif
5362}
5363
5364
5365/**
5366 * Function called by the service's run
5367 * method to run service-specific setup code.
5368 *
5369 * @param cls closure
5370 * @param cfg configuration to use
5371 * @param service the initialized service
5372 */
5373static void
5374run (void *cls,
5375 const struct GNUNET_CONFIGURATION_Handle *cfg,
5376 struct GNUNET_SERVICE_Handle *service)
5377{
5378 /* FIXME: need to modify SERVICE (!) API to allow
5379 us to run a shutdown task *after* clients were
5380 forcefully disconnected! */
5381 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
5382 NULL);
5383 _GSS_statistics = GNUNET_STATISTICS_create ("setu",
5384 cfg);
5385 cadet = GNUNET_CADET_connect (cfg);
5386 if (NULL == cadet)
5387 {
5388 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
5389 _ ("Could not connect to CADET service\n"));
5390 GNUNET_SCHEDULER_shutdown ();
5391 return;
5392 }
5393}
5394
5395
5396/**
5397 * Define "main" method using service macro.
5398 */
5399GNUNET_SERVICE_MAIN (
5400 "set",
5401 GNUNET_SERVICE_OPTION_NONE,
5402 &run,
5403 &client_connect_cb,
5404 &client_disconnect_cb,
5405 NULL,
5406 GNUNET_MQ_hd_fixed_size (client_accept,
5407 GNUNET_MESSAGE_TYPE_SETU_ACCEPT,
5408 struct GNUNET_SETU_AcceptMessage,
5409 NULL),
5410 GNUNET_MQ_hd_var_size (client_set_add,
5411 GNUNET_MESSAGE_TYPE_SETU_ADD,
5412 struct GNUNET_SETU_ElementMessage,
5413 NULL),
5414 GNUNET_MQ_hd_fixed_size (client_create_set,
5415 GNUNET_MESSAGE_TYPE_SETU_CREATE,
5416 struct GNUNET_SETU_CreateMessage,
5417 NULL),
5418 GNUNET_MQ_hd_var_size (client_evaluate,
5419 GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
5420 struct GNUNET_SETU_EvaluateMessage,
5421 NULL),
5422 GNUNET_MQ_hd_fixed_size (client_listen,
5423 GNUNET_MESSAGE_TYPE_SETU_LISTEN,
5424 struct GNUNET_SETU_ListenMessage,
5425 NULL),
5426 GNUNET_MQ_hd_fixed_size (client_reject,
5427 GNUNET_MESSAGE_TYPE_SETU_REJECT,
5428 struct GNUNET_SETU_RejectMessage,
5429 NULL),
5430 GNUNET_MQ_hd_fixed_size (client_cancel,
5431 GNUNET_MESSAGE_TYPE_SETU_CANCEL,
5432 struct GNUNET_SETU_CancelMessage,
5433 NULL),
5434 GNUNET_MQ_handler_end ());
5435
5436
5437/* end of gnunet-service-setu.c */