aboutsummaryrefslogtreecommitdiff
path: root/src/contrib/service/set/gnunet-service-set.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/contrib/service/set/gnunet-service-set.c')
-rw-r--r--src/contrib/service/set/gnunet-service-set.c1983
1 files changed, 1983 insertions, 0 deletions
diff --git a/src/contrib/service/set/gnunet-service-set.c b/src/contrib/service/set/gnunet-service-set.c
new file mode 100644
index 000000000..7c522ec34
--- /dev/null
+++ b/src/contrib/service/set/gnunet-service-set.c
@@ -0,0 +1,1983 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/gnunet-service-set.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet-service-set.h"
28#include "gnunet-service-set_union.h"
29#include "gnunet-service-set_intersection.h"
30#include "gnunet-service-set_protocol.h"
31#include "gnunet_statistics_service.h"
32
33/**
34 * How long do we hold on to an incoming channel if there is
35 * no local listener before giving up?
36 */
37#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
38
39
40/**
41 * Lazy copy requests made by a client.
42 */
43struct LazyCopyRequest
44{
45 /**
46 * Kept in a DLL.
47 */
48 struct LazyCopyRequest *prev;
49
50 /**
51 * Kept in a DLL.
52 */
53 struct LazyCopyRequest *next;
54
55 /**
56 * Which set are we supposed to copy?
57 */
58 struct Set *source_set;
59
60 /**
61 * Cookie identifying the request.
62 */
63 uint32_t cookie;
64};
65
66
67/**
68 * A listener is inhabited by a client, and waits for evaluation
69 * requests from remote peers.
70 */
71struct Listener
72{
73 /**
74 * Listeners are held in a doubly linked list.
75 */
76 struct Listener *next;
77
78 /**
79 * Listeners are held in a doubly linked list.
80 */
81 struct Listener *prev;
82
83 /**
84 * Head of DLL of operations this listener is responsible for.
85 * Once the client has accepted/declined the operation, the
86 * operation is moved to the respective set's operation DLLS.
87 */
88 struct Operation *op_head;
89
90 /**
91 * Tail of DLL of operations this listener is responsible for.
92 * Once the client has accepted/declined the operation, the
93 * operation is moved to the respective set's operation DLLS.
94 */
95 struct Operation *op_tail;
96
97 /**
98 * Client that owns the listener.
99 * Only one client may own a listener.
100 */
101 struct ClientState *cs;
102
103 /**
104 * The port we are listening on with CADET.
105 */
106 struct GNUNET_CADET_Port *open_port;
107
108 /**
109 * Application ID for the operation, used to distinguish
110 * multiple operations of the same type with the same peer.
111 */
112 struct GNUNET_HashCode app_id;
113
114 /**
115 * The type of the operation.
116 */
117 enum GNUNET_SET_OperationType operation;
118};
119
120
121/**
122 * Handle to the cadet service, used to listen for and connect to
123 * remote peers.
124 */
125static struct GNUNET_CADET_Handle *cadet;
126
127/**
128 * DLL of lazy copy requests by this client.
129 */
130static struct LazyCopyRequest *lazy_copy_head;
131
132/**
133 * DLL of lazy copy requests by this client.
134 */
135static struct LazyCopyRequest *lazy_copy_tail;
136
137/**
138 * Generator for unique cookie we set per lazy copy request.
139 */
140static uint32_t lazy_copy_cookie;
141
142/**
143 * Statistics handle.
144 */
145struct GNUNET_STATISTICS_Handle *_GSS_statistics;
146
147/**
148 * Listeners are held in a doubly linked list.
149 */
150static struct Listener *listener_head;
151
152/**
153 * Listeners are held in a doubly linked list.
154 */
155static struct Listener *listener_tail;
156
157/**
158 * Number of active clients.
159 */
160static unsigned int num_clients;
161
162/**
163 * Are we in shutdown? if #GNUNET_YES and the number of clients
164 * drops to zero, disconnect from CADET.
165 */
166static int in_shutdown;
167
168/**
169 * Counter for allocating unique IDs for clients, used to identify
170 * incoming operation requests from remote peers, that the client can
171 * choose to accept or refuse. 0 must not be used (reserved for
172 * uninitialized).
173 */
174static uint32_t suggest_id;
175
176
177/**
178 * Get the incoming socket associated with the given id.
179 *
180 * @param id id to look for
181 * @return the incoming socket associated with the id,
182 * or NULL if there is none
183 */
184static struct Operation *
185get_incoming (uint32_t id)
186{
187 for (struct Listener *listener = listener_head; NULL != listener;
188 listener = listener->next)
189 {
190 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191 if (op->suggest_id == id)
192 return op;
193 }
194 return NULL;
195}
196
197
198/**
199 * Destroy an incoming request from a remote peer
200 *
201 * @param op remote request to destroy
202 */
203static void
204incoming_destroy (struct Operation *op)
205{
206 struct Listener *listener;
207
208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
209 "Destroying incoming operation %p\n",
210 op);
211 if (NULL != (listener = op->listener))
212 {
213 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
214 op->listener = NULL;
215 }
216 if (NULL != op->timeout_task)
217 {
218 GNUNET_SCHEDULER_cancel (op->timeout_task);
219 op->timeout_task = NULL;
220 }
221 _GSS_operation_destroy2 (op);
222}
223
224
225/**
226 * Context for the #garbage_collect_cb().
227 */
228struct GarbageContext
229{
230 /**
231 * Map for which we are garbage collecting removed elements.
232 */
233 struct GNUNET_CONTAINER_MultiHashMap *map;
234
235 /**
236 * Lowest generation for which an operation is still pending.
237 */
238 unsigned int min_op_generation;
239
240 /**
241 * Largest generation for which an operation is still pending.
242 */
243 unsigned int max_op_generation;
244};
245
246
247/**
248 * Function invoked to check if an element can be removed from
249 * the set's history because it is no longer needed.
250 *
251 * @param cls the `struct GarbageContext *`
252 * @param key key of the element in the map
253 * @param value the `struct ElementEntry *`
254 * @return #GNUNET_OK (continue to iterate)
255 */
256static int
257garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
258{
259 // struct GarbageContext *gc = cls;
260 // struct ElementEntry *ee = value;
261
262 // if (GNUNET_YES != ee->removed)
263 // return GNUNET_OK;
264 // if ( (gc->max_op_generation < ee->generation_added) ||
265 // (ee->generation_removed > gc->min_op_generation) )
266 // {
267 // GNUNET_assert (GNUNET_YES ==
268 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
269 // key,
270 // ee));
271 // GNUNET_free (ee);
272 // }
273 return GNUNET_OK;
274}
275
276
277/**
278 * Collect and destroy elements that are not needed anymore, because
279 * their lifetime (as determined by their generation) does not overlap
280 * with any active set operation.
281 *
282 * @param set set to garbage collect
283 */
284static void
285collect_generation_garbage (struct Set *set)
286{
287 struct GarbageContext gc;
288
289 gc.min_op_generation = UINT_MAX;
290 gc.max_op_generation = 0;
291 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
292 {
293 gc.min_op_generation =
294 GNUNET_MIN (gc.min_op_generation, op->generation_created);
295 gc.max_op_generation =
296 GNUNET_MAX (gc.max_op_generation, op->generation_created);
297 }
298 gc.map = set->content->elements;
299 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
300 &garbage_collect_cb,
301 &gc);
302}
303
304
305/**
306 * Is @a generation in the range of exclusions?
307 *
308 * @param generation generation to query
309 * @param excluded array of generations where the element is excluded
310 * @param excluded_size length of the @a excluded array
311 * @return #GNUNET_YES if @a generation is in any of the ranges
312 */
313static int
314is_excluded_generation (unsigned int generation,
315 struct GenerationRange *excluded,
316 unsigned int excluded_size)
317{
318 for (unsigned int i = 0; i < excluded_size; i++)
319 if ((generation >= excluded[i].start) && (generation < excluded[i].end))
320 return GNUNET_YES;
321 return GNUNET_NO;
322}
323
324
325/**
326 * Is element @a ee part of the set during @a query_generation?
327 *
328 * @param ee element to test
329 * @param query_generation generation to query
330 * @param excluded array of generations where the element is excluded
331 * @param excluded_size length of the @a excluded array
332 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
333 */
334static int
335is_element_of_generation (struct ElementEntry *ee,
336 unsigned int query_generation,
337 struct GenerationRange *excluded,
338 unsigned int excluded_size)
339{
340 struct MutationEvent *mut;
341 int is_present;
342
343 GNUNET_assert (NULL != ee->mutations);
344 if (GNUNET_YES ==
345 is_excluded_generation (query_generation, excluded, excluded_size))
346 {
347 GNUNET_break (0);
348 return GNUNET_NO;
349 }
350
351 is_present = GNUNET_NO;
352
353 /* Could be made faster with binary search, but lists
354 are small, so why bother. */
355 for (unsigned int i = 0; i < ee->mutations_size; i++)
356 {
357 mut = &ee->mutations[i];
358
359 if (mut->generation > query_generation)
360 {
361 /* The mutation doesn't apply to our generation
362 anymore. We can'b break here, since mutations aren't
363 sorted by generation. */
364 continue;
365 }
366
367 if (GNUNET_YES ==
368 is_excluded_generation (mut->generation, excluded, excluded_size))
369 {
370 /* The generation is excluded (because it belongs to another
371 fork via a lazy copy) and thus mutations aren't considered
372 for membership testing. */
373 continue;
374 }
375
376 /* This would be an inconsistency in how we manage mutations. */
377 if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
378 GNUNET_assert (0);
379 /* Likewise. */
380 if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
381 GNUNET_assert (0);
382
383 is_present = mut->added;
384 }
385
386 return is_present;
387}
388
389
390/**
391 * Is element @a ee part of the set used by @a op?
392 *
393 * @param ee element to test
394 * @param op operation the defines the set and its generation
395 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
396 */
397int
398_GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op)
399{
400 return is_element_of_generation (ee,
401 op->generation_created,
402 op->set->excluded_generations,
403 op->set->excluded_generations_size);
404}
405
406
407/**
408 * Destroy the given operation. Used for any operation where both
409 * peers were known and that thus actually had a vt and channel. Must
410 * not be used for operations where 'listener' is still set and we do
411 * not know the other peer.
412 *
413 * Call the implementation-specific cancel function of the operation.
414 * Disconnects from the remote peer. Does not disconnect the client,
415 * as there may be multiple operations per set.
416 *
417 * @param op operation to destroy
418 * @param gc #GNUNET_YES to perform garbage collection on the set
419 */
420void
421_GSS_operation_destroy (struct Operation *op, int gc)
422{
423 struct Set *set = op->set;
424 struct GNUNET_CADET_Channel *channel;
425
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
427 GNUNET_assert (NULL == op->listener);
428 if (NULL != op->state)
429 {
430 set->vt->cancel (op);
431 op->state = NULL;
432 }
433 if (NULL != set)
434 {
435 GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op);
436 op->set = NULL;
437 }
438 if (NULL != op->context_msg)
439 {
440 GNUNET_free (op->context_msg);
441 op->context_msg = NULL;
442 }
443 if (NULL != (channel = op->channel))
444 {
445 /* This will free op; called conditionally as this helper function
446 is also called from within the channel disconnect handler. */
447 op->channel = NULL;
448 GNUNET_CADET_channel_destroy (channel);
449 }
450 if ((NULL != set) && (GNUNET_YES == gc))
451 collect_generation_garbage (set);
452 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
453 * there was a channel end handler that will free 'op' on the call stack. */
454}
455
456
457/**
458 * Callback called when a client connects to the service.
459 *
460 * @param cls closure for the service
461 * @param c the new client that connected to the service
462 * @param mq the message queue used to send messages to the client
463 * @return @a `struct ClientState`
464 */
465static void *
466client_connect_cb (void *cls,
467 struct GNUNET_SERVICE_Client *c,
468 struct GNUNET_MQ_Handle *mq)
469{
470 struct ClientState *cs;
471
472 num_clients++;
473 cs = GNUNET_new (struct ClientState);
474 cs->client = c;
475 cs->mq = mq;
476 return cs;
477}
478
479
480/**
481 * Iterator over hash map entries to free element entries.
482 *
483 * @param cls closure
484 * @param key current key code
485 * @param value a `struct ElementEntry *` to be free'd
486 * @return #GNUNET_YES (continue to iterate)
487 */
488static int
489destroy_elements_iterator (void *cls,
490 const struct GNUNET_HashCode *key,
491 void *value)
492{
493 struct ElementEntry *ee = value;
494
495 GNUNET_free (ee->mutations);
496 GNUNET_free (ee);
497 return GNUNET_YES;
498}
499
500
501/**
502 * Clean up after a client has disconnected
503 *
504 * @param cls closure, unused
505 * @param client the client to clean up after
506 * @param internal_cls the `struct ClientState`
507 */
508static void
509client_disconnect_cb (void *cls,
510 struct GNUNET_SERVICE_Client *client,
511 void *internal_cls)
512{
513 struct ClientState *cs = internal_cls;
514 struct Operation *op;
515 struct Listener *listener;
516 struct Set *set;
517
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
519 if (NULL != (set = cs->set))
520 {
521 struct SetContent *content = set->content;
522 struct PendingMutation *pm;
523 struct PendingMutation *pm_current;
524 struct LazyCopyRequest *lcr;
525
526 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
527 /* Destroy pending set operations */
528 while (NULL != set->ops_head)
529 _GSS_operation_destroy (set->ops_head, GNUNET_NO);
530
531 /* Destroy operation-specific state */
532 GNUNET_assert (NULL != set->state);
533 set->vt->destroy_set (set->state);
534 set->state = NULL;
535
536 /* Clean up ongoing iterations */
537 if (NULL != set->iter)
538 {
539 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
540 set->iter = NULL;
541 set->iteration_id++;
542 }
543
544 /* discard any pending mutations that reference this set */
545 pm = content->pending_mutations_head;
546 while (NULL != pm)
547 {
548 pm_current = pm;
549 pm = pm->next;
550 if (pm_current->set == set)
551 {
552 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
553 content->pending_mutations_tail,
554 pm_current);
555 GNUNET_free (pm_current);
556 }
557 }
558
559 /* free set content (or at least decrement RC) */
560 set->content = NULL;
561 GNUNET_assert (0 != content->refcount);
562 content->refcount--;
563 if (0 == content->refcount)
564 {
565 GNUNET_assert (NULL != content->elements);
566 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
567 &destroy_elements_iterator,
568 NULL);
569 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
570 content->elements = NULL;
571 GNUNET_free (content);
572 }
573 GNUNET_free (set->excluded_generations);
574 set->excluded_generations = NULL;
575
576 /* remove set from pending copy requests */
577 lcr = lazy_copy_head;
578 while (NULL != lcr)
579 {
580 struct LazyCopyRequest *lcr_current = lcr;
581
582 lcr = lcr->next;
583 if (lcr_current->source_set == set)
584 {
585 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
586 lazy_copy_tail,
587 lcr_current);
588 GNUNET_free (lcr_current);
589 }
590 }
591 GNUNET_free (set);
592 }
593
594 if (NULL != (listener = cs->listener))
595 {
596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
597 GNUNET_CADET_close_port (listener->open_port);
598 listener->open_port = NULL;
599 while (NULL != (op = listener->op_head))
600 {
601 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
602 "Destroying incoming operation `%u' from peer `%s'\n",
603 (unsigned int) op->client_request_id,
604 GNUNET_i2s (&op->peer));
605 incoming_destroy (op);
606 }
607 GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener);
608 GNUNET_free (listener);
609 }
610 GNUNET_free (cs);
611 num_clients--;
612 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
613 {
614 if (NULL != cadet)
615 {
616 GNUNET_CADET_disconnect (cadet);
617 cadet = NULL;
618 }
619 }
620}
621
622
623/**
624 * Check a request for a set operation from another peer.
625 *
626 * @param cls the operation state
627 * @param msg the received message
628 * @return #GNUNET_OK if the channel should be kept alive,
629 * #GNUNET_SYSERR to destroy the channel
630 */
631static int
632check_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
633{
634 struct Operation *op = cls;
635 struct Listener *listener = op->listener;
636 const struct GNUNET_MessageHeader *nested_context;
637
638 /* double operation request */
639 if (0 != op->suggest_id)
640 {
641 GNUNET_break_op (0);
642 return GNUNET_SYSERR;
643 }
644 /* This should be equivalent to the previous condition, but can't hurt to check twice */
645 if (NULL == op->listener)
646 {
647 GNUNET_break (0);
648 return GNUNET_SYSERR;
649 }
650 if (listener->operation !=
651 (enum GNUNET_SET_OperationType) ntohl (msg->operation))
652 {
653 GNUNET_break_op (0);
654 return GNUNET_SYSERR;
655 }
656 nested_context = GNUNET_MQ_extract_nested_mh (msg);
657 if ((NULL != nested_context) &&
658 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE))
659 {
660 GNUNET_break_op (0);
661 return GNUNET_SYSERR;
662 }
663 return GNUNET_OK;
664}
665
666
667/**
668 * Handle a request for a set operation from another peer. Checks if we
669 * have a listener waiting for such a request (and in that case initiates
670 * asking the listener about accepting the connection). If no listener
671 * is waiting, we queue the operation request in hope that a listener
672 * shows up soon (before timeout).
673 *
674 * This msg is expected as the first and only msg handled through the
675 * non-operation bound virtual table, acceptance of this operation replaces
676 * our virtual table and subsequent msgs would be routed differently (as
677 * we then know what type of operation this is).
678 *
679 * @param cls the operation state
680 * @param msg the received message
681 * @return #GNUNET_OK if the channel should be kept alive,
682 * #GNUNET_SYSERR to destroy the channel
683 */
684static void
685handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
686{
687 struct Operation *op = cls;
688 struct Listener *listener = op->listener;
689 const struct GNUNET_MessageHeader *nested_context;
690 struct GNUNET_MQ_Envelope *env;
691 struct GNUNET_SET_RequestMessage *cmsg;
692
693 nested_context = GNUNET_MQ_extract_nested_mh (msg);
694 /* Make a copy of the nested_context (application-specific context
695 information that is opaque to set) so we can pass it to the
696 listener later on */
697 if (NULL != nested_context)
698 op->context_msg = GNUNET_copy_message (nested_context);
699 op->remote_element_count = ntohl (msg->element_count);
700 GNUNET_log (
701 GNUNET_ERROR_TYPE_DEBUG,
702 "Received P2P operation request (op %u, port %s) for active listener\n",
703 (uint32_t) ntohl (msg->operation),
704 GNUNET_h2s (&op->listener->app_id));
705 GNUNET_assert (0 == op->suggest_id);
706 if (0 == suggest_id)
707 suggest_id++;
708 op->suggest_id = suggest_id++;
709 GNUNET_assert (NULL != op->timeout_task);
710 GNUNET_SCHEDULER_cancel (op->timeout_task);
711 op->timeout_task = NULL;
712 env = GNUNET_MQ_msg_nested_mh (cmsg,
713 GNUNET_MESSAGE_TYPE_SET_REQUEST,
714 op->context_msg);
715 GNUNET_log (
716 GNUNET_ERROR_TYPE_DEBUG,
717 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
718 op->suggest_id,
719 listener,
720 listener->cs);
721 cmsg->accept_id = htonl (op->suggest_id);
722 cmsg->peer_id = op->peer;
723 GNUNET_MQ_send (listener->cs->mq, env);
724 /* NOTE: GNUNET_CADET_receive_done() will be called in
725 #handle_client_accept() */
726}
727
728
729/**
730 * Add an element to @a set as specified by @a msg
731 *
732 * @param set set to manipulate
733 * @param msg message specifying the change
734 */
735static void
736execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
737{
738 struct GNUNET_SET_Element el;
739 struct ElementEntry *ee;
740 struct GNUNET_HashCode hash;
741
742 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
743 el.size = ntohs (msg->header.size) - sizeof(*msg);
744 el.data = &msg[1];
745 el.element_type = ntohs (msg->element_type);
746 GNUNET_SET_element_hash (&el, &hash);
747 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
748 if (NULL == ee)
749 {
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "Client inserts element %s of size %u\n",
752 GNUNET_h2s (&hash),
753 el.size);
754 ee = GNUNET_malloc (el.size + sizeof(*ee));
755 ee->element.size = el.size;
756 GNUNET_memcpy (&ee[1], el.data, el.size);
757 ee->element.data = &ee[1];
758 ee->element.element_type = el.element_type;
759 ee->remote = GNUNET_NO;
760 ee->mutations = NULL;
761 ee->mutations_size = 0;
762 ee->element_hash = hash;
763 GNUNET_break (GNUNET_YES ==
764 GNUNET_CONTAINER_multihashmap_put (
765 set->content->elements,
766 &ee->element_hash,
767 ee,
768 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
769 }
770 else if (GNUNET_YES ==
771 is_element_of_generation (ee,
772 set->current_generation,
773 set->excluded_generations,
774 set->excluded_generations_size))
775 {
776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
777 "Client inserted element %s of size %u twice (ignored)\n",
778 GNUNET_h2s (&hash),
779 el.size);
780
781 /* same element inserted twice */
782 return;
783 }
784
785 {
786 struct MutationEvent mut = { .generation = set->current_generation,
787 .added = GNUNET_YES };
788 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
789 }
790 set->vt->add (set->state, ee);
791}
792
793
794/**
795 * Remove an element from @a set as specified by @a msg
796 *
797 * @param set set to manipulate
798 * @param msg message specifying the change
799 */
800static void
801execute_remove (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
802{
803 struct GNUNET_SET_Element el;
804 struct ElementEntry *ee;
805 struct GNUNET_HashCode hash;
806
807 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
808 el.size = ntohs (msg->header.size) - sizeof(*msg);
809 el.data = &msg[1];
810 el.element_type = ntohs (msg->element_type);
811 GNUNET_SET_element_hash (&el, &hash);
812 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
813 if (NULL == ee)
814 {
815 /* Client tried to remove non-existing element. */
816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817 "Client removes non-existing element of size %u\n",
818 el.size);
819 return;
820 }
821 if (GNUNET_NO == is_element_of_generation (ee,
822 set->current_generation,
823 set->excluded_generations,
824 set->excluded_generations_size))
825 {
826 /* Client tried to remove element twice */
827 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828 "Client removed element of size %u twice (ignored)\n",
829 el.size);
830 return;
831 }
832 else
833 {
834 struct MutationEvent mut = { .generation = set->current_generation,
835 .added = GNUNET_NO };
836
837 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838 "Client removes element of size %u\n",
839 el.size);
840
841 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
842 }
843 set->vt->remove (set->state, ee);
844}
845
846
847/**
848 * Perform a mutation on a set as specified by the @a msg
849 *
850 * @param set the set to mutate
851 * @param msg specification of what to change
852 */
853static void
854execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
855{
856 switch (ntohs (msg->header.type))
857 {
858 case GNUNET_MESSAGE_TYPE_SET_ADD:
859 execute_add (set, msg);
860 break;
861
862 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
863 execute_remove (set, msg);
864 break;
865
866 default:
867 GNUNET_break (0);
868 }
869}
870
871
872/**
873 * Execute mutations that were delayed on a set because of
874 * pending operations.
875 *
876 * @param set the set to execute mutations on
877 */
878static void
879execute_delayed_mutations (struct Set *set)
880{
881 struct PendingMutation *pm;
882
883 if (0 != set->content->iterator_count)
884 return; /* still cannot do this */
885 while (NULL != (pm = set->content->pending_mutations_head))
886 {
887 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
888 set->content->pending_mutations_tail,
889 pm);
890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
891 "Executing pending mutation on %p.\n",
892 pm->set);
893 execute_mutation (pm->set, pm->msg);
894 GNUNET_free (pm->msg);
895 GNUNET_free (pm);
896 }
897}
898
899
900/**
901 * Send the next element of a set to the set's client. The next element is given by
902 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
903 * are no more elements in the set. The caller must ensure that the set's iterator is
904 * valid.
905 *
906 * The client will acknowledge each received element with a
907 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
908 * #handle_client_iter_ack() will then trigger the next transmission.
909 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
910 *
911 * @param set set that should send its next element to its client
912 */
913static void
914send_client_element (struct Set *set)
915{
916 int ret;
917 struct ElementEntry *ee;
918 struct GNUNET_MQ_Envelope *ev;
919 struct GNUNET_SET_IterResponseMessage *msg;
920
921 GNUNET_assert (NULL != set->iter);
922 do
923 {
924 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
925 NULL,
926 (const void **) &ee);
927 if (GNUNET_NO == ret)
928 {
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set);
930 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
931 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
932 set->iter = NULL;
933 set->iteration_id++;
934 GNUNET_assert (set->content->iterator_count > 0);
935 set->content->iterator_count--;
936 execute_delayed_mutations (set);
937 GNUNET_MQ_send (set->cs->mq, ev);
938 return;
939 }
940 GNUNET_assert (NULL != ee);
941 }
942 while (GNUNET_NO ==
943 is_element_of_generation (ee,
944 set->iter_generation,
945 set->excluded_generations,
946 set->excluded_generations_size));
947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948 "Sending iteration element on %p.\n",
949 set);
950 ev = GNUNET_MQ_msg_extra (msg,
951 ee->element.size,
952 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
953 GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size);
954 msg->element_type = htons (ee->element.element_type);
955 msg->iteration_id = htons (set->iteration_id);
956 GNUNET_MQ_send (set->cs->mq, ev);
957}
958
959
960/**
961 * Called when a client wants to iterate the elements of a set.
962 * Checks if we have a set associated with the client and if we
963 * can right now start an iteration. If all checks out, starts
964 * sending the elements of the set to the client.
965 *
966 * @param cls client that sent the message
967 * @param m message sent by the client
968 */
969static void
970handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m)
971{
972 struct ClientState *cs = cls;
973 struct Set *set;
974
975 if (NULL == (set = cs->set))
976 {
977 /* attempt to iterate over a non existing set */
978 GNUNET_break (0);
979 GNUNET_SERVICE_client_drop (cs->client);
980 return;
981 }
982 if (NULL != set->iter)
983 {
984 /* Only one concurrent iterate-action allowed per set */
985 GNUNET_break (0);
986 GNUNET_SERVICE_client_drop (cs->client);
987 return;
988 }
989 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990 "Iterating set %p in gen %u with %u content elements\n",
991 (void *) set,
992 set->current_generation,
993 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
994 GNUNET_SERVICE_client_continue (cs->client);
995 set->content->iterator_count++;
996 set->iter =
997 GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
998 set->iter_generation = set->current_generation;
999 send_client_element (set);
1000}
1001
1002
1003/**
1004 * Called when a client wants to create a new set. This is typically
1005 * the first request from a client, and includes the type of set
1006 * operation to be performed.
1007 *
1008 * @param cls client that sent the message
1009 * @param msg message sent by the client
1010 */
1011static void
1012handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg)
1013{
1014 struct ClientState *cs = cls;
1015 struct Set *set;
1016
1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018 "Client created new set (operation %u)\n",
1019 (uint32_t) ntohl (msg->operation));
1020 if (NULL != cs->set)
1021 {
1022 /* There can only be one set per client */
1023 GNUNET_break (0);
1024 GNUNET_SERVICE_client_drop (cs->client);
1025 return;
1026 }
1027 set = GNUNET_new (struct Set);
1028 switch (ntohl (msg->operation))
1029 {
1030 case GNUNET_SET_OPERATION_INTERSECTION:
1031 set->vt = _GSS_intersection_vt ();
1032 break;
1033
1034 case GNUNET_SET_OPERATION_UNION:
1035 set->vt = _GSS_union_vt ();
1036 break;
1037
1038 default:
1039 GNUNET_free (set);
1040 GNUNET_break (0);
1041 GNUNET_SERVICE_client_drop (cs->client);
1042 return;
1043 }
1044 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1045 set->state = set->vt->create ();
1046 if (NULL == set->state)
1047 {
1048 /* initialization failed (i.e. out of memory) */
1049 GNUNET_free (set);
1050 GNUNET_SERVICE_client_drop (cs->client);
1051 return;
1052 }
1053 set->content = GNUNET_new (struct SetContent);
1054 set->content->refcount = 1;
1055 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1056 set->cs = cs;
1057 cs->set = set;
1058 GNUNET_SERVICE_client_continue (cs->client);
1059}
1060
1061
1062/**
1063 * Timeout happens iff:
1064 * - we suggested an operation to our listener,
1065 * but did not receive a response in time
1066 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1067 *
1068 * @param cls channel context
1069 */
1070static void
1071incoming_timeout_cb (void *cls)
1072{
1073 struct Operation *op = cls;
1074
1075 op->timeout_task = NULL;
1076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077 "Remote peer's incoming request timed out\n");
1078 incoming_destroy (op);
1079}
1080
1081
1082/**
1083 * Method called whenever another peer has added us to a channel the
1084 * other peer initiated. Only called (once) upon reception of data
1085 * from a channel we listen on.
1086 *
1087 * The channel context represents the operation itself and gets added
1088 * to a DLL, from where it gets looked up when our local listener
1089 * client responds to a proposed/suggested operation or connects and
1090 * associates with this operation.
1091 *
1092 * @param cls closure
1093 * @param channel new handle to the channel
1094 * @param source peer that started the channel
1095 * @return initial channel context for the channel
1096 * returns NULL on error
1097 */
1098static void *
1099channel_new_cb (void *cls,
1100 struct GNUNET_CADET_Channel *channel,
1101 const struct GNUNET_PeerIdentity *source)
1102{
1103 struct Listener *listener = cls;
1104 struct Operation *op;
1105
1106 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n");
1107 op = GNUNET_new (struct Operation);
1108 op->listener = listener;
1109 op->peer = *source;
1110 op->channel = channel;
1111 op->mq = GNUNET_CADET_get_mq (op->channel);
1112 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1113 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1114 &incoming_timeout_cb,
1115 op);
1116 GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op);
1117 return op;
1118}
1119
1120
1121/**
1122 * Function called whenever a channel is destroyed. Should clean up
1123 * any associated state. It must NOT call
1124 * GNUNET_CADET_channel_destroy() on the channel.
1125 *
1126 * The peer_disconnect function is part of a a virtual table set initially either
1127 * when a peer creates a new channel with us, or once we create
1128 * a new channel ourselves (evaluate).
1129 *
1130 * Once we know the exact type of operation (union/intersection), the vt is
1131 * replaced with an operation specific instance (_GSS_[op]_vt).
1132 *
1133 * @param channel_ctx place where local state associated
1134 * with the channel is stored
1135 * @param channel connection to the other end (henceforth invalid)
1136 */
1137static void
1138channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
1139{
1140 struct Operation *op = channel_ctx;
1141
1142 op->channel = NULL;
1143 _GSS_operation_destroy2 (op);
1144}
1145
1146
1147/**
1148 * This function probably should not exist
1149 * and be replaced by inlining more specific
1150 * logic in the various places where it is called.
1151 */
1152void
1153_GSS_operation_destroy2 (struct Operation *op)
1154{
1155 struct GNUNET_CADET_Channel *channel;
1156
1157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n");
1158 if (NULL != (channel = op->channel))
1159 {
1160 /* This will free op; called conditionally as this helper function
1161 is also called from within the channel disconnect handler. */
1162 op->channel = NULL;
1163 GNUNET_CADET_channel_destroy (channel);
1164 }
1165 if (NULL != op->listener)
1166 {
1167 incoming_destroy (op);
1168 return;
1169 }
1170 if (NULL != op->set)
1171 op->set->vt->channel_death (op);
1172 else
1173 _GSS_operation_destroy (op, GNUNET_YES);
1174 GNUNET_free (op);
1175}
1176
1177
1178/**
1179 * Function called whenever an MQ-channel's transmission window size changes.
1180 *
1181 * The first callback in an outgoing channel will be with a non-zero value
1182 * and will mean the channel is connected to the destination.
1183 *
1184 * For an incoming channel it will be called immediately after the
1185 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1186 *
1187 * @param cls Channel closure.
1188 * @param channel Connection to the other end (henceforth invalid).
1189 * @param window_size New window size. If the is more messages than buffer size
1190 * this value will be negative..
1191 */
1192static void
1193channel_window_cb (void *cls,
1194 const struct GNUNET_CADET_Channel *channel,
1195 int window_size)
1196{
1197 /* FIXME: not implemented, we could do flow control here... */
1198}
1199
1200
1201/**
1202 * Called when a client wants to create a new listener.
1203 *
1204 * @param cls client that sent the message
1205 * @param msg message sent by the client
1206 */
1207static void
1208handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg)
1209{
1210 struct ClientState *cs = cls;
1211 struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1212 { GNUNET_MQ_hd_var_size (incoming_msg,
1213 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1214 struct OperationRequestMessage,
1215 NULL),
1216 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1217 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1218 struct IBFMessage,
1219 NULL),
1220 GNUNET_MQ_hd_var_size (union_p2p_elements,
1221 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1222 struct GNUNET_SET_ElementMessage,
1223 NULL),
1224 GNUNET_MQ_hd_var_size (union_p2p_offer,
1225 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1226 struct GNUNET_MessageHeader,
1227 NULL),
1228 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1229 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1230 struct InquiryMessage,
1231 NULL),
1232 GNUNET_MQ_hd_var_size (union_p2p_demand,
1233 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1234 struct GNUNET_MessageHeader,
1235 NULL),
1236 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1237 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1238 struct GNUNET_MessageHeader,
1239 NULL),
1240 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1241 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1242 struct GNUNET_MessageHeader,
1243 NULL),
1244 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1245 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1246 struct GNUNET_MessageHeader,
1247 NULL),
1248 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1249 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1250 struct GNUNET_MessageHeader,
1251 NULL),
1252 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1253 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1254 struct StrataEstimatorMessage,
1255 NULL),
1256 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1257 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1258 struct StrataEstimatorMessage,
1259 NULL),
1260 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1261 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1262 struct GNUNET_SET_ElementMessage,
1263 NULL),
1264 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1265 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1266 struct IntersectionElementInfoMessage,
1267 NULL),
1268 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1269 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1270 struct BFMessage,
1271 NULL),
1272 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1273 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1274 struct IntersectionDoneMessage,
1275 NULL),
1276 GNUNET_MQ_handler_end () };
1277 struct Listener *listener;
1278
1279 if (NULL != cs->listener)
1280 {
1281 /* max. one active listener per client! */
1282 GNUNET_break (0);
1283 GNUNET_SERVICE_client_drop (cs->client);
1284 return;
1285 }
1286 listener = GNUNET_new (struct Listener);
1287 listener->cs = cs;
1288 cs->listener = listener;
1289 listener->app_id = msg->app_id;
1290 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1291 GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener);
1292 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1293 "New listener created (op %u, port %s)\n",
1294 listener->operation,
1295 GNUNET_h2s (&listener->app_id));
1296 listener->open_port = GNUNET_CADET_open_port (cadet,
1297 &msg->app_id,
1298 &channel_new_cb,
1299 listener,
1300 &channel_window_cb,
1301 &channel_end_cb,
1302 cadet_handlers);
1303 GNUNET_SERVICE_client_continue (cs->client);
1304}
1305
1306
1307/**
1308 * Called when the listening client rejects an operation
1309 * request by another peer.
1310 *
1311 * @param cls client that sent the message
1312 * @param msg message sent by the client
1313 */
1314static void
1315handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg)
1316{
1317 struct ClientState *cs = cls;
1318 struct Operation *op;
1319
1320 op = get_incoming (ntohl (msg->accept_reject_id));
1321 if (NULL == op)
1322 {
1323 /* no matching incoming operation for this reject;
1324 could be that the other peer already disconnected... */
1325 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1326 "Client rejected unknown operation %u\n",
1327 (unsigned int) ntohl (msg->accept_reject_id));
1328 GNUNET_SERVICE_client_continue (cs->client);
1329 return;
1330 }
1331 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1332 "Peer request (op %u, app %s) rejected by client\n",
1333 op->listener->operation,
1334 GNUNET_h2s (&cs->listener->app_id));
1335 _GSS_operation_destroy2 (op);
1336 GNUNET_SERVICE_client_continue (cs->client);
1337}
1338
1339
1340/**
1341 * Called when a client wants to add or remove an element to a set it inhabits.
1342 *
1343 * @param cls client that sent the message
1344 * @param msg message sent by the client
1345 */
1346static int
1347check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
1348{
1349 /* NOTE: Technically, we should probably check with the
1350 block library whether the element we are given is well-formed */
1351 return GNUNET_OK;
1352}
1353
1354
1355/**
1356 * Called when a client wants to add or remove an element to a set it inhabits.
1357 *
1358 * @param cls client that sent the message
1359 * @param msg message sent by the client
1360 */
1361static void
1362handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
1363{
1364 struct ClientState *cs = cls;
1365 struct Set *set;
1366
1367 if (NULL == (set = cs->set))
1368 {
1369 /* client without a set requested an operation */
1370 GNUNET_break (0);
1371 GNUNET_SERVICE_client_drop (cs->client);
1372 return;
1373 }
1374 GNUNET_SERVICE_client_continue (cs->client);
1375
1376 if (0 != set->content->iterator_count)
1377 {
1378 struct PendingMutation *pm;
1379
1380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n");
1381 pm = GNUNET_new (struct PendingMutation);
1382 pm->msg =
1383 (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1384 pm->set = set;
1385 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1386 set->content->pending_mutations_tail,
1387 pm);
1388 return;
1389 }
1390 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
1391 execute_mutation (set, msg);
1392}
1393
1394
1395/**
1396 * Advance the current generation of a set,
1397 * adding exclusion ranges if necessary.
1398 *
1399 * @param set the set where we want to advance the generation
1400 */
1401static void
1402advance_generation (struct Set *set)
1403{
1404 struct GenerationRange r;
1405
1406 if (set->current_generation == set->content->latest_generation)
1407 {
1408 set->content->latest_generation++;
1409 set->current_generation++;
1410 return;
1411 }
1412
1413 GNUNET_assert (set->current_generation < set->content->latest_generation);
1414
1415 r.start = set->current_generation + 1;
1416 r.end = set->content->latest_generation + 1;
1417 set->content->latest_generation = r.end;
1418 set->current_generation = r.end;
1419 GNUNET_array_append (set->excluded_generations,
1420 set->excluded_generations_size,
1421 r);
1422}
1423
1424
1425/**
1426 * Called when a client wants to initiate a set operation with another
1427 * peer. Initiates the CADET connection to the listener and sends the
1428 * request.
1429 *
1430 * @param cls client that sent the message
1431 * @param msg message sent by the client
1432 * @return #GNUNET_OK if the message is well-formed
1433 */
1434static int
1435check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1436{
1437 /* FIXME: suboptimal, even if the context below could be NULL,
1438 there are malformed messages this does not check for... */
1439 return GNUNET_OK;
1440}
1441
1442
1443/**
1444 * Called when a client wants to initiate a set operation with another
1445 * peer. Initiates the CADET connection to the listener and sends the
1446 * request.
1447 *
1448 * @param cls client that sent the message
1449 * @param msg message sent by the client
1450 */
1451static void
1452handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1453{
1454 struct ClientState *cs = cls;
1455 struct Operation *op = GNUNET_new (struct Operation);
1456 const struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1457 { GNUNET_MQ_hd_var_size (incoming_msg,
1458 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1459 struct OperationRequestMessage,
1460 op),
1461 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1462 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1463 struct IBFMessage,
1464 op),
1465 GNUNET_MQ_hd_var_size (union_p2p_elements,
1466 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1467 struct GNUNET_SET_ElementMessage,
1468 op),
1469 GNUNET_MQ_hd_var_size (union_p2p_offer,
1470 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1471 struct GNUNET_MessageHeader,
1472 op),
1473 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1474 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1475 struct InquiryMessage,
1476 op),
1477 GNUNET_MQ_hd_var_size (union_p2p_demand,
1478 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1479 struct GNUNET_MessageHeader,
1480 op),
1481 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1482 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1483 struct GNUNET_MessageHeader,
1484 op),
1485 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1486 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1487 struct GNUNET_MessageHeader,
1488 op),
1489 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1490 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1491 struct GNUNET_MessageHeader,
1492 op),
1493 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1494 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1495 struct GNUNET_MessageHeader,
1496 op),
1497 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1498 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1499 struct StrataEstimatorMessage,
1500 op),
1501 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1502 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1503 struct StrataEstimatorMessage,
1504 op),
1505 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1506 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1507 struct GNUNET_SET_ElementMessage,
1508 op),
1509 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1510 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1511 struct IntersectionElementInfoMessage,
1512 op),
1513 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1514 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1515 struct BFMessage,
1516 op),
1517 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1518 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1519 struct IntersectionDoneMessage,
1520 op),
1521 GNUNET_MQ_handler_end () };
1522 struct Set *set;
1523 const struct GNUNET_MessageHeader *context;
1524
1525 if (NULL == (set = cs->set))
1526 {
1527 GNUNET_break (0);
1528 GNUNET_free (op);
1529 GNUNET_SERVICE_client_drop (cs->client);
1530 return;
1531 }
1532 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1533 op->peer = msg->target_peer;
1534 op->result_mode = ntohl (msg->result_mode);
1535 op->client_request_id = ntohl (msg->request_id);
1536 op->byzantine = msg->byzantine;
1537 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1538 op->force_full = msg->force_full;
1539 op->force_delta = msg->force_delta;
1540 context = GNUNET_MQ_extract_nested_mh (msg);
1541
1542 /* Advance generation values, so that
1543 mutations won't interfer with the running operation. */
1544 op->set = set;
1545 op->generation_created = set->current_generation;
1546 advance_generation (set);
1547 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1548 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1549 "Creating new CADET channel to port %s for set operation type %u\n",
1550 GNUNET_h2s (&msg->app_id),
1551 set->operation);
1552 op->channel = GNUNET_CADET_channel_create (cadet,
1553 op,
1554 &msg->target_peer,
1555 &msg->app_id,
1556 &channel_window_cb,
1557 &channel_end_cb,
1558 cadet_handlers);
1559 op->mq = GNUNET_CADET_get_mq (op->channel);
1560 op->state = set->vt->evaluate (op, context);
1561 if (NULL == op->state)
1562 {
1563 GNUNET_break (0);
1564 GNUNET_SERVICE_client_drop (cs->client);
1565 return;
1566 }
1567 GNUNET_SERVICE_client_continue (cs->client);
1568}
1569
1570
1571/**
1572 * Handle an ack from a client, and send the next element. Note
1573 * that we only expect acks for set elements, not after the
1574 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1575 *
1576 * @param cls client the client
1577 * @param ack the message
1578 */
1579static void
1580handle_client_iter_ack (void *cls, const struct GNUNET_SET_IterAckMessage *ack)
1581{
1582 struct ClientState *cs = cls;
1583 struct Set *set;
1584
1585 if (NULL == (set = cs->set))
1586 {
1587 /* client without a set acknowledged receiving a value */
1588 GNUNET_break (0);
1589 GNUNET_SERVICE_client_drop (cs->client);
1590 return;
1591 }
1592 if (NULL == set->iter)
1593 {
1594 /* client sent an ack, but we were not expecting one (as
1595 set iteration has finished) */
1596 GNUNET_break (0);
1597 GNUNET_SERVICE_client_drop (cs->client);
1598 return;
1599 }
1600 GNUNET_SERVICE_client_continue (cs->client);
1601 if (ntohl (ack->send_more))
1602 {
1603 send_client_element (set);
1604 }
1605 else
1606 {
1607 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1608 set->iter = NULL;
1609 set->iteration_id++;
1610 }
1611}
1612
1613
1614/**
1615 * Handle a request from the client to copy a set.
1616 *
1617 * @param cls the client
1618 * @param mh the message
1619 */
1620static void
1621handle_client_copy_lazy_prepare (void *cls,
1622 const struct GNUNET_MessageHeader *mh)
1623{
1624 struct ClientState *cs = cls;
1625 struct Set *set;
1626 struct LazyCopyRequest *cr;
1627 struct GNUNET_MQ_Envelope *ev;
1628 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1629
1630 if (NULL == (set = cs->set))
1631 {
1632 /* client without a set requested an operation */
1633 GNUNET_break (0);
1634 GNUNET_SERVICE_client_drop (cs->client);
1635 return;
1636 }
1637 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1638 "Client requested creation of lazy copy\n");
1639 cr = GNUNET_new (struct LazyCopyRequest);
1640 cr->cookie = ++lazy_copy_cookie;
1641 cr->source_set = set;
1642 GNUNET_CONTAINER_DLL_insert (lazy_copy_head, lazy_copy_tail, cr);
1643 ev = GNUNET_MQ_msg (resp_msg, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1644 resp_msg->cookie = cr->cookie;
1645 GNUNET_MQ_send (set->cs->mq, ev);
1646 GNUNET_SERVICE_client_continue (cs->client);
1647}
1648
1649
1650/**
1651 * Handle a request from the client to connect to a copy of a set.
1652 *
1653 * @param cls the client
1654 * @param msg the message
1655 */
1656static void
1657handle_client_copy_lazy_connect (
1658 void *cls,
1659 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1660{
1661 struct ClientState *cs = cls;
1662 struct LazyCopyRequest *cr;
1663 struct Set *set;
1664 int found;
1665
1666 if (NULL != cs->set)
1667 {
1668 /* There can only be one set per client */
1669 GNUNET_break (0);
1670 GNUNET_SERVICE_client_drop (cs->client);
1671 return;
1672 }
1673 found = GNUNET_NO;
1674 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1675 {
1676 if (cr->cookie == msg->cookie)
1677 {
1678 found = GNUNET_YES;
1679 break;
1680 }
1681 }
1682 if (GNUNET_NO == found)
1683 {
1684 /* client asked for copy with cookie we don't know */
1685 GNUNET_break (0);
1686 GNUNET_SERVICE_client_drop (cs->client);
1687 return;
1688 }
1689 GNUNET_CONTAINER_DLL_remove (lazy_copy_head, lazy_copy_tail, cr);
1690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1691 "Client %p requested use of lazy copy\n",
1692 cs);
1693 set = GNUNET_new (struct Set);
1694 switch (cr->source_set->operation)
1695 {
1696 case GNUNET_SET_OPERATION_INTERSECTION:
1697 set->vt = _GSS_intersection_vt ();
1698 break;
1699
1700 case GNUNET_SET_OPERATION_UNION:
1701 set->vt = _GSS_union_vt ();
1702 break;
1703
1704 default:
1705 GNUNET_assert (0);
1706 return;
1707 }
1708
1709 if (NULL == set->vt->copy_state)
1710 {
1711 /* Lazy copy not supported for this set operation */
1712 GNUNET_break (0);
1713 GNUNET_free (set);
1714 GNUNET_free (cr);
1715 GNUNET_SERVICE_client_drop (cs->client);
1716 return;
1717 }
1718
1719 set->operation = cr->source_set->operation;
1720 set->state = set->vt->copy_state (cr->source_set->state);
1721 set->content = cr->source_set->content;
1722 set->content->refcount++;
1723
1724 set->current_generation = cr->source_set->current_generation;
1725 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1726 set->excluded_generations =
1727 GNUNET_memdup (cr->source_set->excluded_generations,
1728 set->excluded_generations_size
1729 * sizeof(struct GenerationRange));
1730
1731 /* Advance the generation of the new set, so that mutations to the
1732 of the cloned set and the source set are independent. */
1733 advance_generation (set);
1734 set->cs = cs;
1735 cs->set = set;
1736 GNUNET_free (cr);
1737 GNUNET_SERVICE_client_continue (cs->client);
1738}
1739
1740
1741/**
1742 * Handle a request from the client to cancel a running set operation.
1743 *
1744 * @param cls the client
1745 * @param msg the message
1746 */
1747static void
1748handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg)
1749{
1750 struct ClientState *cs = cls;
1751 struct Set *set;
1752 struct Operation *op;
1753 int found;
1754
1755 if (NULL == (set = cs->set))
1756 {
1757 /* client without a set requested an operation */
1758 GNUNET_break (0);
1759 GNUNET_SERVICE_client_drop (cs->client);
1760 return;
1761 }
1762 found = GNUNET_NO;
1763 for (op = set->ops_head; NULL != op; op = op->next)
1764 {
1765 if (op->client_request_id == ntohl (msg->request_id))
1766 {
1767 found = GNUNET_YES;
1768 break;
1769 }
1770 }
1771 if (GNUNET_NO == found)
1772 {
1773 /* It may happen that the operation was already destroyed due to
1774 * the other peer disconnecting. The client may not know about this
1775 * yet and try to cancel the (just barely non-existent) operation.
1776 * So this is not a hard error.
1777 */GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1778 "Client canceled non-existent op %u\n",
1779 (uint32_t) ntohl (msg->request_id));
1780 }
1781 else
1782 {
1783 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1784 "Client requested cancel for op %u\n",
1785 (uint32_t) ntohl (msg->request_id));
1786 _GSS_operation_destroy (op, GNUNET_YES);
1787 }
1788 GNUNET_SERVICE_client_continue (cs->client);
1789}
1790
1791
1792/**
1793 * Handle a request from the client to accept a set operation that
1794 * came from a remote peer. We forward the accept to the associated
1795 * operation for handling
1796 *
1797 * @param cls the client
1798 * @param msg the message
1799 */
1800static void
1801handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg)
1802{
1803 struct ClientState *cs = cls;
1804 struct Set *set;
1805 struct Operation *op;
1806 struct GNUNET_SET_ResultMessage *result_message;
1807 struct GNUNET_MQ_Envelope *ev;
1808 struct Listener *listener;
1809
1810 if (NULL == (set = cs->set))
1811 {
1812 /* client without a set requested to accept */
1813 GNUNET_break (0);
1814 GNUNET_SERVICE_client_drop (cs->client);
1815 return;
1816 }
1817 op = get_incoming (ntohl (msg->accept_reject_id));
1818 if (NULL == op)
1819 {
1820 /* It is not an error if the set op does not exist -- it may
1821 * have been destroyed when the partner peer disconnected. */
1822 GNUNET_log (
1823 GNUNET_ERROR_TYPE_INFO,
1824 "Client %p accepted request %u of listener %p that is no longer active\n",
1825 cs,
1826 ntohl (msg->accept_reject_id),
1827 cs->listener);
1828 ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT);
1829 result_message->request_id = msg->request_id;
1830 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1831 GNUNET_MQ_send (set->cs->mq, ev);
1832 GNUNET_SERVICE_client_continue (cs->client);
1833 return;
1834 }
1835 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1836 "Client accepting request %u\n",
1837 (uint32_t) ntohl (msg->accept_reject_id));
1838 listener = op->listener;
1839 op->listener = NULL;
1840 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
1841 op->set = set;
1842 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1843 op->client_request_id = ntohl (msg->request_id);
1844 op->result_mode = ntohl (msg->result_mode);
1845 op->byzantine = msg->byzantine;
1846 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1847 op->force_full = msg->force_full;
1848 op->force_delta = msg->force_delta;
1849
1850 /* Advance generation values, so that future mutations do not
1851 interfer with the running operation. */
1852 op->generation_created = set->current_generation;
1853 advance_generation (set);
1854 GNUNET_assert (NULL == op->state);
1855 op->state = set->vt->accept (op);
1856 if (NULL == op->state)
1857 {
1858 GNUNET_break (0);
1859 GNUNET_SERVICE_client_drop (cs->client);
1860 return;
1861 }
1862 /* Now allow CADET to continue, as we did not do this in
1863 #handle_incoming_msg (as we wanted to first see if the
1864 local client would accept the request). */
1865 GNUNET_CADET_receive_done (op->channel);
1866 GNUNET_SERVICE_client_continue (cs->client);
1867}
1868
1869
1870/**
1871 * Called to clean up, after a shutdown has been requested.
1872 *
1873 * @param cls closure, NULL
1874 */
1875static void
1876shutdown_task (void *cls)
1877{
1878 /* Delay actual shutdown to allow service to disconnect clients */
1879 in_shutdown = GNUNET_YES;
1880 if (0 == num_clients)
1881 {
1882 if (NULL != cadet)
1883 {
1884 GNUNET_CADET_disconnect (cadet);
1885 cadet = NULL;
1886 }
1887 }
1888 GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
1889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1890}
1891
1892
1893/**
1894 * Function called by the service's run
1895 * method to run service-specific setup code.
1896 *
1897 * @param cls closure
1898 * @param cfg configuration to use
1899 * @param service the initialized service
1900 */
1901static void
1902run (void *cls,
1903 const struct GNUNET_CONFIGURATION_Handle *cfg,
1904 struct GNUNET_SERVICE_Handle *service)
1905{
1906 /* FIXME: need to modify SERVICE (!) API to allow
1907 us to run a shutdown task *after* clients were
1908 forcefully disconnected! */
1909 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
1910 _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
1911 cadet = GNUNET_CADET_connect (cfg);
1912 if (NULL == cadet)
1913 {
1914 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1915 _ ("Could not connect to CADET service\n"));
1916 GNUNET_SCHEDULER_shutdown ();
1917 return;
1918 }
1919}
1920
1921
1922/**
1923 * Define "main" method using service macro.
1924 */
1925GNUNET_SERVICE_MAIN (
1926 "set",
1927 GNUNET_SERVICE_OPTION_NONE,
1928 &run,
1929 &client_connect_cb,
1930 &client_disconnect_cb,
1931 NULL,
1932 GNUNET_MQ_hd_fixed_size (client_accept,
1933 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1934 struct GNUNET_SET_AcceptMessage,
1935 NULL),
1936 GNUNET_MQ_hd_fixed_size (client_iter_ack,
1937 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1938 struct GNUNET_SET_IterAckMessage,
1939 NULL),
1940 GNUNET_MQ_hd_var_size (client_mutation,
1941 GNUNET_MESSAGE_TYPE_SET_ADD,
1942 struct GNUNET_SET_ElementMessage,
1943 NULL),
1944 GNUNET_MQ_hd_fixed_size (client_create_set,
1945 GNUNET_MESSAGE_TYPE_SET_CREATE,
1946 struct GNUNET_SET_CreateMessage,
1947 NULL),
1948 GNUNET_MQ_hd_fixed_size (client_iterate,
1949 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1950 struct GNUNET_MessageHeader,
1951 NULL),
1952 GNUNET_MQ_hd_var_size (client_evaluate,
1953 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1954 struct GNUNET_SET_EvaluateMessage,
1955 NULL),
1956 GNUNET_MQ_hd_fixed_size (client_listen,
1957 GNUNET_MESSAGE_TYPE_SET_LISTEN,
1958 struct GNUNET_SET_ListenMessage,
1959 NULL),
1960 GNUNET_MQ_hd_fixed_size (client_reject,
1961 GNUNET_MESSAGE_TYPE_SET_REJECT,
1962 struct GNUNET_SET_RejectMessage,
1963 NULL),
1964 GNUNET_MQ_hd_var_size (client_mutation,
1965 GNUNET_MESSAGE_TYPE_SET_REMOVE,
1966 struct GNUNET_SET_ElementMessage,
1967 NULL),
1968 GNUNET_MQ_hd_fixed_size (client_cancel,
1969 GNUNET_MESSAGE_TYPE_SET_CANCEL,
1970 struct GNUNET_SET_CancelMessage,
1971 NULL),
1972 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
1973 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1974 struct GNUNET_MessageHeader,
1975 NULL),
1976 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
1977 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1978 struct GNUNET_SET_CopyLazyConnectMessage,
1979 NULL),
1980 GNUNET_MQ_handler_end ());
1981
1982
1983/* end of gnunet-service-set.c */