aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set.c')
-rw-r--r--src/set/gnunet-service-set.c1984
1 files changed, 0 insertions, 1984 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
deleted file mode 100644
index 2b859d81a..000000000
--- a/src/set/gnunet-service-set.c
+++ /dev/null
@@ -1,1984 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/gnunet-service-set.c
22 * @brief two-peer set operations
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "gnunet-service-set.h"
27#include "gnunet-service-set_union.h"
28#include "gnunet-service-set_intersection.h"
29#include "gnunet-service-set_protocol.h"
30#include "gnunet_statistics_service.h"
31
32/**
33 * How long do we hold on to an incoming channel if there is
34 * no local listener before giving up?
35 */
36#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38
39/**
40 * Lazy copy requests made by a client.
41 */
42struct LazyCopyRequest
43{
44 /**
45 * Kept in a DLL.
46 */
47 struct LazyCopyRequest *prev;
48
49 /**
50 * Kept in a DLL.
51 */
52 struct LazyCopyRequest *next;
53
54 /**
55 * Which set are we supposed to copy?
56 */
57 struct Set *source_set;
58
59 /**
60 * Cookie identifying the request.
61 */
62 uint32_t cookie;
63};
64
65
66/**
67 * A listener is inhabited by a client, and waits for evaluation
68 * requests from remote peers.
69 */
70struct Listener
71{
72 /**
73 * Listeners are held in a doubly linked list.
74 */
75 struct Listener *next;
76
77 /**
78 * Listeners are held in a doubly linked list.
79 */
80 struct Listener *prev;
81
82 /**
83 * Head of DLL of operations this listener is responsible for.
84 * Once the client has accepted/declined the operation, the
85 * operation is moved to the respective set's operation DLLS.
86 */
87 struct Operation *op_head;
88
89 /**
90 * Tail of DLL of operations this listener is responsible for.
91 * Once the client has accepted/declined the operation, the
92 * operation is moved to the respective set's operation DLLS.
93 */
94 struct Operation *op_tail;
95
96 /**
97 * Client that owns the listener.
98 * Only one client may own a listener.
99 */
100 struct ClientState *cs;
101
102 /**
103 * The port we are listening on with CADET.
104 */
105 struct GNUNET_CADET_Port *open_port;
106
107 /**
108 * Application ID for the operation, used to distinguish
109 * multiple operations of the same type with the same peer.
110 */
111 struct GNUNET_HashCode app_id;
112
113 /**
114 * The type of the operation.
115 */
116 enum GNUNET_SET_OperationType operation;
117};
118
119
120/**
121 * Handle to the cadet service, used to listen for and connect to
122 * remote peers.
123 */
124static struct GNUNET_CADET_Handle *cadet;
125
126/**
127 * DLL of lazy copy requests by this client.
128 */
129static struct LazyCopyRequest *lazy_copy_head;
130
131/**
132 * DLL of lazy copy requests by this client.
133 */
134static struct LazyCopyRequest *lazy_copy_tail;
135
136/**
137 * Generator for unique cookie we set per lazy copy request.
138 */
139static uint32_t lazy_copy_cookie;
140
141/**
142 * Statistics handle.
143 */
144struct GNUNET_STATISTICS_Handle *_GSS_statistics;
145
146/**
147 * Listeners are held in a doubly linked list.
148 */
149static struct Listener *listener_head;
150
151/**
152 * Listeners are held in a doubly linked list.
153 */
154static struct Listener *listener_tail;
155
156/**
157 * Number of active clients.
158 */
159static unsigned int num_clients;
160
161/**
162 * Are we in shutdown? if #GNUNET_YES and the number of clients
163 * drops to zero, disconnect from CADET.
164 */
165static int in_shutdown;
166
167/**
168 * Counter for allocating unique IDs for clients, used to identify
169 * incoming operation requests from remote peers, that the client can
170 * choose to accept or refuse. 0 must not be used (reserved for
171 * uninitialized).
172 */
173static uint32_t suggest_id;
174
175
176/**
177 * Get the incoming socket associated with the given id.
178 *
179 * @param listener the listener to look in
180 * @param id id to look for
181 * @return the incoming socket associated with the id,
182 * or NULL if there is none
183 */
184static struct Operation *
185get_incoming (uint32_t id)
186{
187 for (struct Listener *listener = listener_head; NULL != listener;
188 listener = listener->next)
189 {
190 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191 if (op->suggest_id == id)
192 return op;
193 }
194 return NULL;
195}
196
197
198/**
199 * Destroy an incoming request from a remote peer
200 *
201 * @param op remote request to destroy
202 */
203static void
204incoming_destroy (struct Operation *op)
205{
206 struct Listener *listener;
207
208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
209 "Destroying incoming operation %p\n",
210 op);
211 if (NULL != (listener = op->listener))
212 {
213 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
214 op->listener = NULL;
215 }
216 if (NULL != op->timeout_task)
217 {
218 GNUNET_SCHEDULER_cancel (op->timeout_task);
219 op->timeout_task = NULL;
220 }
221 _GSS_operation_destroy2 (op);
222}
223
224
225/**
226 * Context for the #garbage_collect_cb().
227 */
228struct GarbageContext
229{
230 /**
231 * Map for which we are garbage collecting removed elements.
232 */
233 struct GNUNET_CONTAINER_MultiHashMap *map;
234
235 /**
236 * Lowest generation for which an operation is still pending.
237 */
238 unsigned int min_op_generation;
239
240 /**
241 * Largest generation for which an operation is still pending.
242 */
243 unsigned int max_op_generation;
244};
245
246
247/**
248 * Function invoked to check if an element can be removed from
249 * the set's history because it is no longer needed.
250 *
251 * @param cls the `struct GarbageContext *`
252 * @param key key of the element in the map
253 * @param value the `struct ElementEntry *`
254 * @return #GNUNET_OK (continue to iterate)
255 */
256static int
257garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
258{
259 // struct GarbageContext *gc = cls;
260 // struct ElementEntry *ee = value;
261
262 // if (GNUNET_YES != ee->removed)
263 // return GNUNET_OK;
264 // if ( (gc->max_op_generation < ee->generation_added) ||
265 // (ee->generation_removed > gc->min_op_generation) )
266 // {
267 // GNUNET_assert (GNUNET_YES ==
268 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
269 // key,
270 // ee));
271 // GNUNET_free (ee);
272 // }
273 return GNUNET_OK;
274}
275
276
277/**
278 * Collect and destroy elements that are not needed anymore, because
279 * their lifetime (as determined by their generation) does not overlap
280 * with any active set operation.
281 *
282 * @param set set to garbage collect
283 */
284static void
285collect_generation_garbage (struct Set *set)
286{
287 struct GarbageContext gc;
288
289 gc.min_op_generation = UINT_MAX;
290 gc.max_op_generation = 0;
291 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
292 {
293 gc.min_op_generation =
294 GNUNET_MIN (gc.min_op_generation, op->generation_created);
295 gc.max_op_generation =
296 GNUNET_MAX (gc.max_op_generation, op->generation_created);
297 }
298 gc.map = set->content->elements;
299 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
300 &garbage_collect_cb,
301 &gc);
302}
303
304
305/**
306 * Is @a generation in the range of exclusions?
307 *
308 * @param generation generation to query
309 * @param excluded array of generations where the element is excluded
310 * @param excluded_size length of the @a excluded array
311 * @return #GNUNET_YES if @a generation is in any of the ranges
312 */
313static int
314is_excluded_generation (unsigned int generation,
315 struct GenerationRange *excluded,
316 unsigned int excluded_size)
317{
318 for (unsigned int i = 0; i < excluded_size; i++)
319 if ((generation >= excluded[i].start) && (generation < excluded[i].end))
320 return GNUNET_YES;
321 return GNUNET_NO;
322}
323
324
325/**
326 * Is element @a ee part of the set during @a query_generation?
327 *
328 * @param ee element to test
329 * @param query_generation generation to query
330 * @param excluded array of generations where the element is excluded
331 * @param excluded_size length of the @a excluded array
332 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
333 */
334static int
335is_element_of_generation (struct ElementEntry *ee,
336 unsigned int query_generation,
337 struct GenerationRange *excluded,
338 unsigned int excluded_size)
339{
340 struct MutationEvent *mut;
341 int is_present;
342
343 GNUNET_assert (NULL != ee->mutations);
344 if (GNUNET_YES ==
345 is_excluded_generation (query_generation, excluded, excluded_size))
346 {
347 GNUNET_break (0);
348 return GNUNET_NO;
349 }
350
351 is_present = GNUNET_NO;
352
353 /* Could be made faster with binary search, but lists
354 are small, so why bother. */
355 for (unsigned int i = 0; i < ee->mutations_size; i++)
356 {
357 mut = &ee->mutations[i];
358
359 if (mut->generation > query_generation)
360 {
361 /* The mutation doesn't apply to our generation
362 anymore. We can'b break here, since mutations aren't
363 sorted by generation. */
364 continue;
365 }
366
367 if (GNUNET_YES ==
368 is_excluded_generation (mut->generation, excluded, excluded_size))
369 {
370 /* The generation is excluded (because it belongs to another
371 fork via a lazy copy) and thus mutations aren't considered
372 for membership testing. */
373 continue;
374 }
375
376 /* This would be an inconsistency in how we manage mutations. */
377 if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
378 GNUNET_assert (0);
379 /* Likewise. */
380 if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
381 GNUNET_assert (0);
382
383 is_present = mut->added;
384 }
385
386 return is_present;
387}
388
389
390/**
391 * Is element @a ee part of the set used by @a op?
392 *
393 * @param ee element to test
394 * @param op operation the defines the set and its generation
395 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
396 */
397int
398_GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op)
399{
400 return is_element_of_generation (ee,
401 op->generation_created,
402 op->set->excluded_generations,
403 op->set->excluded_generations_size);
404}
405
406
407/**
408 * Destroy the given operation. Used for any operation where both
409 * peers were known and that thus actually had a vt and channel. Must
410 * not be used for operations where 'listener' is still set and we do
411 * not know the other peer.
412 *
413 * Call the implementation-specific cancel function of the operation.
414 * Disconnects from the remote peer. Does not disconnect the client,
415 * as there may be multiple operations per set.
416 *
417 * @param op operation to destroy
418 * @param gc #GNUNET_YES to perform garbage collection on the set
419 */
420void
421_GSS_operation_destroy (struct Operation *op, int gc)
422{
423 struct Set *set = op->set;
424 struct GNUNET_CADET_Channel *channel;
425
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
427 GNUNET_assert (NULL == op->listener);
428 if (NULL != op->state)
429 {
430 set->vt->cancel (op);
431 op->state = NULL;
432 }
433 if (NULL != set)
434 {
435 GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op);
436 op->set = NULL;
437 }
438 if (NULL != op->context_msg)
439 {
440 GNUNET_free (op->context_msg);
441 op->context_msg = NULL;
442 }
443 if (NULL != (channel = op->channel))
444 {
445 /* This will free op; called conditionally as this helper function
446 is also called from within the channel disconnect handler. */
447 op->channel = NULL;
448 GNUNET_CADET_channel_destroy (channel);
449 }
450 if ((NULL != set) && (GNUNET_YES == gc))
451 collect_generation_garbage (set);
452 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
453 * there was a channel end handler that will free 'op' on the call stack. */
454}
455
456
457/**
458 * Callback called when a client connects to the service.
459 *
460 * @param cls closure for the service
461 * @param c the new client that connected to the service
462 * @param mq the message queue used to send messages to the client
463 * @return @a `struct ClientState`
464 */
465static void *
466client_connect_cb (void *cls,
467 struct GNUNET_SERVICE_Client *c,
468 struct GNUNET_MQ_Handle *mq)
469{
470 struct ClientState *cs;
471
472 num_clients++;
473 cs = GNUNET_new (struct ClientState);
474 cs->client = c;
475 cs->mq = mq;
476 return cs;
477}
478
479
480/**
481 * Iterator over hash map entries to free element entries.
482 *
483 * @param cls closure
484 * @param key current key code
485 * @param value a `struct ElementEntry *` to be free'd
486 * @return #GNUNET_YES (continue to iterate)
487 */
488static int
489destroy_elements_iterator (void *cls,
490 const struct GNUNET_HashCode *key,
491 void *value)
492{
493 struct ElementEntry *ee = value;
494
495 GNUNET_free (ee->mutations);
496 GNUNET_free (ee);
497 return GNUNET_YES;
498}
499
500
501/**
502 * Clean up after a client has disconnected
503 *
504 * @param cls closure, unused
505 * @param client the client to clean up after
506 * @param internal_cls the `struct ClientState`
507 */
508static void
509client_disconnect_cb (void *cls,
510 struct GNUNET_SERVICE_Client *client,
511 void *internal_cls)
512{
513 struct ClientState *cs = internal_cls;
514 struct Operation *op;
515 struct Listener *listener;
516 struct Set *set;
517
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
519 if (NULL != (set = cs->set))
520 {
521 struct SetContent *content = set->content;
522 struct PendingMutation *pm;
523 struct PendingMutation *pm_current;
524 struct LazyCopyRequest *lcr;
525
526 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
527 /* Destroy pending set operations */
528 while (NULL != set->ops_head)
529 _GSS_operation_destroy (set->ops_head, GNUNET_NO);
530
531 /* Destroy operation-specific state */
532 GNUNET_assert (NULL != set->state);
533 set->vt->destroy_set (set->state);
534 set->state = NULL;
535
536 /* Clean up ongoing iterations */
537 if (NULL != set->iter)
538 {
539 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
540 set->iter = NULL;
541 set->iteration_id++;
542 }
543
544 /* discard any pending mutations that reference this set */
545 pm = content->pending_mutations_head;
546 while (NULL != pm)
547 {
548 pm_current = pm;
549 pm = pm->next;
550 if (pm_current->set == set)
551 {
552 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
553 content->pending_mutations_tail,
554 pm_current);
555 GNUNET_free (pm_current);
556 }
557 }
558
559 /* free set content (or at least decrement RC) */
560 set->content = NULL;
561 GNUNET_assert (0 != content->refcount);
562 content->refcount--;
563 if (0 == content->refcount)
564 {
565 GNUNET_assert (NULL != content->elements);
566 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
567 &destroy_elements_iterator,
568 NULL);
569 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
570 content->elements = NULL;
571 GNUNET_free (content);
572 }
573 GNUNET_free (set->excluded_generations);
574 set->excluded_generations = NULL;
575
576 /* remove set from pending copy requests */
577 lcr = lazy_copy_head;
578 while (NULL != lcr)
579 {
580 struct LazyCopyRequest *lcr_current = lcr;
581
582 lcr = lcr->next;
583 if (lcr_current->source_set == set)
584 {
585 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
586 lazy_copy_tail,
587 lcr_current);
588 GNUNET_free (lcr_current);
589 }
590 }
591 GNUNET_free (set);
592 }
593
594 if (NULL != (listener = cs->listener))
595 {
596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
597 GNUNET_CADET_close_port (listener->open_port);
598 listener->open_port = NULL;
599 while (NULL != (op = listener->op_head))
600 {
601 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
602 "Destroying incoming operation `%u' from peer `%s'\n",
603 (unsigned int) op->client_request_id,
604 GNUNET_i2s (&op->peer));
605 incoming_destroy (op);
606 }
607 GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener);
608 GNUNET_free (listener);
609 }
610 GNUNET_free (cs);
611 num_clients--;
612 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
613 {
614 if (NULL != cadet)
615 {
616 GNUNET_CADET_disconnect (cadet);
617 cadet = NULL;
618 }
619 }
620}
621
622
623/**
624 * Check a request for a set operation from another peer.
625 *
626 * @param cls the operation state
627 * @param msg the received message
628 * @return #GNUNET_OK if the channel should be kept alive,
629 * #GNUNET_SYSERR to destroy the channel
630 */
631static int
632check_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
633{
634 struct Operation *op = cls;
635 struct Listener *listener = op->listener;
636 const struct GNUNET_MessageHeader *nested_context;
637
638 /* double operation request */
639 if (0 != op->suggest_id)
640 {
641 GNUNET_break_op (0);
642 return GNUNET_SYSERR;
643 }
644 /* This should be equivalent to the previous condition, but can't hurt to check twice */
645 if (NULL == op->listener)
646 {
647 GNUNET_break (0);
648 return GNUNET_SYSERR;
649 }
650 if (listener->operation !=
651 (enum GNUNET_SET_OperationType) ntohl (msg->operation))
652 {
653 GNUNET_break_op (0);
654 return GNUNET_SYSERR;
655 }
656 nested_context = GNUNET_MQ_extract_nested_mh (msg);
657 if ((NULL != nested_context) &&
658 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE))
659 {
660 GNUNET_break_op (0);
661 return GNUNET_SYSERR;
662 }
663 return GNUNET_OK;
664}
665
666
667/**
668 * Handle a request for a set operation from another peer. Checks if we
669 * have a listener waiting for such a request (and in that case initiates
670 * asking the listener about accepting the connection). If no listener
671 * is waiting, we queue the operation request in hope that a listener
672 * shows up soon (before timeout).
673 *
674 * This msg is expected as the first and only msg handled through the
675 * non-operation bound virtual table, acceptance of this operation replaces
676 * our virtual table and subsequent msgs would be routed differently (as
677 * we then know what type of operation this is).
678 *
679 * @param cls the operation state
680 * @param msg the received message
681 * @return #GNUNET_OK if the channel should be kept alive,
682 * #GNUNET_SYSERR to destroy the channel
683 */
684static void
685handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
686{
687 struct Operation *op = cls;
688 struct Listener *listener = op->listener;
689 const struct GNUNET_MessageHeader *nested_context;
690 struct GNUNET_MQ_Envelope *env;
691 struct GNUNET_SET_RequestMessage *cmsg;
692
693 nested_context = GNUNET_MQ_extract_nested_mh (msg);
694 /* Make a copy of the nested_context (application-specific context
695 information that is opaque to set) so we can pass it to the
696 listener later on */
697 if (NULL != nested_context)
698 op->context_msg = GNUNET_copy_message (nested_context);
699 op->remote_element_count = ntohl (msg->element_count);
700 GNUNET_log (
701 GNUNET_ERROR_TYPE_DEBUG,
702 "Received P2P operation request (op %u, port %s) for active listener\n",
703 (uint32_t) ntohl (msg->operation),
704 GNUNET_h2s (&op->listener->app_id));
705 GNUNET_assert (0 == op->suggest_id);
706 if (0 == suggest_id)
707 suggest_id++;
708 op->suggest_id = suggest_id++;
709 GNUNET_assert (NULL != op->timeout_task);
710 GNUNET_SCHEDULER_cancel (op->timeout_task);
711 op->timeout_task = NULL;
712 env = GNUNET_MQ_msg_nested_mh (cmsg,
713 GNUNET_MESSAGE_TYPE_SET_REQUEST,
714 op->context_msg);
715 GNUNET_log (
716 GNUNET_ERROR_TYPE_DEBUG,
717 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
718 op->suggest_id,
719 listener,
720 listener->cs);
721 cmsg->accept_id = htonl (op->suggest_id);
722 cmsg->peer_id = op->peer;
723 GNUNET_MQ_send (listener->cs->mq, env);
724 /* NOTE: GNUNET_CADET_receive_done() will be called in
725 #handle_client_accept() */
726}
727
728
729/**
730 * Add an element to @a set as specified by @a msg
731 *
732 * @param set set to manipulate
733 * @param msg message specifying the change
734 */
735static void
736execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
737{
738 struct GNUNET_SET_Element el;
739 struct ElementEntry *ee;
740 struct GNUNET_HashCode hash;
741
742 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
743 el.size = ntohs (msg->header.size) - sizeof(*msg);
744 el.data = &msg[1];
745 el.element_type = ntohs (msg->element_type);
746 GNUNET_SET_element_hash (&el, &hash);
747 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
748 if (NULL == ee)
749 {
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "Client inserts element %s of size %u\n",
752 GNUNET_h2s (&hash),
753 el.size);
754 ee = GNUNET_malloc (el.size + sizeof(*ee));
755 ee->element.size = el.size;
756 GNUNET_memcpy (&ee[1], el.data, el.size);
757 ee->element.data = &ee[1];
758 ee->element.element_type = el.element_type;
759 ee->remote = GNUNET_NO;
760 ee->mutations = NULL;
761 ee->mutations_size = 0;
762 ee->element_hash = hash;
763 GNUNET_break (GNUNET_YES ==
764 GNUNET_CONTAINER_multihashmap_put (
765 set->content->elements,
766 &ee->element_hash,
767 ee,
768 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
769 }
770 else if (GNUNET_YES ==
771 is_element_of_generation (ee,
772 set->current_generation,
773 set->excluded_generations,
774 set->excluded_generations_size))
775 {
776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
777 "Client inserted element %s of size %u twice (ignored)\n",
778 GNUNET_h2s (&hash),
779 el.size);
780
781 /* same element inserted twice */
782 return;
783 }
784
785 {
786 struct MutationEvent mut = { .generation = set->current_generation,
787 .added = GNUNET_YES };
788 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
789 }
790 set->vt->add (set->state, ee);
791}
792
793
794/**
795 * Remove an element from @a set as specified by @a msg
796 *
797 * @param set set to manipulate
798 * @param msg message specifying the change
799 */
800static void
801execute_remove (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
802{
803 struct GNUNET_SET_Element el;
804 struct ElementEntry *ee;
805 struct GNUNET_HashCode hash;
806
807 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
808 el.size = ntohs (msg->header.size) - sizeof(*msg);
809 el.data = &msg[1];
810 el.element_type = ntohs (msg->element_type);
811 GNUNET_SET_element_hash (&el, &hash);
812 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
813 if (NULL == ee)
814 {
815 /* Client tried to remove non-existing element. */
816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817 "Client removes non-existing element of size %u\n",
818 el.size);
819 return;
820 }
821 if (GNUNET_NO == is_element_of_generation (ee,
822 set->current_generation,
823 set->excluded_generations,
824 set->excluded_generations_size))
825 {
826 /* Client tried to remove element twice */
827 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828 "Client removed element of size %u twice (ignored)\n",
829 el.size);
830 return;
831 }
832 else
833 {
834 struct MutationEvent mut = { .generation = set->current_generation,
835 .added = GNUNET_NO };
836
837 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838 "Client removes element of size %u\n",
839 el.size);
840
841 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
842 }
843 set->vt->remove (set->state, ee);
844}
845
846
847/**
848 * Perform a mutation on a set as specified by the @a msg
849 *
850 * @param set the set to mutate
851 * @param msg specification of what to change
852 */
853static void
854execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
855{
856 switch (ntohs (msg->header.type))
857 {
858 case GNUNET_MESSAGE_TYPE_SET_ADD:
859 execute_add (set, msg);
860 break;
861
862 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
863 execute_remove (set, msg);
864 break;
865
866 default:
867 GNUNET_break (0);
868 }
869}
870
871
872/**
873 * Execute mutations that were delayed on a set because of
874 * pending operations.
875 *
876 * @param set the set to execute mutations on
877 */
878static void
879execute_delayed_mutations (struct Set *set)
880{
881 struct PendingMutation *pm;
882
883 if (0 != set->content->iterator_count)
884 return; /* still cannot do this */
885 while (NULL != (pm = set->content->pending_mutations_head))
886 {
887 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
888 set->content->pending_mutations_tail,
889 pm);
890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
891 "Executing pending mutation on %p.\n",
892 pm->set);
893 execute_mutation (pm->set, pm->msg);
894 GNUNET_free (pm->msg);
895 GNUNET_free (pm);
896 }
897}
898
899
900/**
901 * Send the next element of a set to the set's client. The next element is given by
902 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
903 * are no more elements in the set. The caller must ensure that the set's iterator is
904 * valid.
905 *
906 * The client will acknowledge each received element with a
907 * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
908 * #handle_client_iter_ack() will then trigger the next transmission.
909 * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
910 *
911 * @param set set that should send its next element to its client
912 */
913static void
914send_client_element (struct Set *set)
915{
916 int ret;
917 struct ElementEntry *ee;
918 struct GNUNET_MQ_Envelope *ev;
919 struct GNUNET_SET_IterResponseMessage *msg;
920
921 GNUNET_assert (NULL != set->iter);
922 do
923 {
924 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
925 NULL,
926 (const void **) &ee);
927 if (GNUNET_NO == ret)
928 {
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set);
930 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
931 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
932 set->iter = NULL;
933 set->iteration_id++;
934 GNUNET_assert (set->content->iterator_count > 0);
935 set->content->iterator_count--;
936 execute_delayed_mutations (set);
937 GNUNET_MQ_send (set->cs->mq, ev);
938 return;
939 }
940 GNUNET_assert (NULL != ee);
941 }
942 while (GNUNET_NO ==
943 is_element_of_generation (ee,
944 set->iter_generation,
945 set->excluded_generations,
946 set->excluded_generations_size));
947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948 "Sending iteration element on %p.\n",
949 set);
950 ev = GNUNET_MQ_msg_extra (msg,
951 ee->element.size,
952 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
953 GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size);
954 msg->element_type = htons (ee->element.element_type);
955 msg->iteration_id = htons (set->iteration_id);
956 GNUNET_MQ_send (set->cs->mq, ev);
957}
958
959
960/**
961 * Called when a client wants to iterate the elements of a set.
962 * Checks if we have a set associated with the client and if we
963 * can right now start an iteration. If all checks out, starts
964 * sending the elements of the set to the client.
965 *
966 * @param cls client that sent the message
967 * @param m message sent by the client
968 */
969static void
970handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m)
971{
972 struct ClientState *cs = cls;
973 struct Set *set;
974
975 if (NULL == (set = cs->set))
976 {
977 /* attempt to iterate over a non existing set */
978 GNUNET_break (0);
979 GNUNET_SERVICE_client_drop (cs->client);
980 return;
981 }
982 if (NULL != set->iter)
983 {
984 /* Only one concurrent iterate-action allowed per set */
985 GNUNET_break (0);
986 GNUNET_SERVICE_client_drop (cs->client);
987 return;
988 }
989 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990 "Iterating set %p in gen %u with %u content elements\n",
991 (void *) set,
992 set->current_generation,
993 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
994 GNUNET_SERVICE_client_continue (cs->client);
995 set->content->iterator_count++;
996 set->iter =
997 GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
998 set->iter_generation = set->current_generation;
999 send_client_element (set);
1000}
1001
1002
1003/**
1004 * Called when a client wants to create a new set. This is typically
1005 * the first request from a client, and includes the type of set
1006 * operation to be performed.
1007 *
1008 * @param cls client that sent the message
1009 * @param m message sent by the client
1010 */
1011static void
1012handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg)
1013{
1014 struct ClientState *cs = cls;
1015 struct Set *set;
1016
1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018 "Client created new set (operation %u)\n",
1019 (uint32_t) ntohl (msg->operation));
1020 if (NULL != cs->set)
1021 {
1022 /* There can only be one set per client */
1023 GNUNET_break (0);
1024 GNUNET_SERVICE_client_drop (cs->client);
1025 return;
1026 }
1027 set = GNUNET_new (struct Set);
1028 switch (ntohl (msg->operation))
1029 {
1030 case GNUNET_SET_OPERATION_INTERSECTION:
1031 set->vt = _GSS_intersection_vt ();
1032 break;
1033
1034 case GNUNET_SET_OPERATION_UNION:
1035 set->vt = _GSS_union_vt ();
1036 break;
1037
1038 default:
1039 GNUNET_free (set);
1040 GNUNET_break (0);
1041 GNUNET_SERVICE_client_drop (cs->client);
1042 return;
1043 }
1044 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1045 set->state = set->vt->create ();
1046 if (NULL == set->state)
1047 {
1048 /* initialization failed (i.e. out of memory) */
1049 GNUNET_free (set);
1050 GNUNET_SERVICE_client_drop (cs->client);
1051 return;
1052 }
1053 set->content = GNUNET_new (struct SetContent);
1054 set->content->refcount = 1;
1055 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1056 set->cs = cs;
1057 cs->set = set;
1058 GNUNET_SERVICE_client_continue (cs->client);
1059}
1060
1061
1062/**
1063 * Timeout happens iff:
1064 * - we suggested an operation to our listener,
1065 * but did not receive a response in time
1066 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
1067 *
1068 * @param cls channel context
1069 * @param tc context information (why was this task triggered now)
1070 */
1071static void
1072incoming_timeout_cb (void *cls)
1073{
1074 struct Operation *op = cls;
1075
1076 op->timeout_task = NULL;
1077 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078 "Remote peer's incoming request timed out\n");
1079 incoming_destroy (op);
1080}
1081
1082
1083/**
1084 * Method called whenever another peer has added us to a channel the
1085 * other peer initiated. Only called (once) upon reception of data
1086 * from a channel we listen on.
1087 *
1088 * The channel context represents the operation itself and gets added
1089 * to a DLL, from where it gets looked up when our local listener
1090 * client responds to a proposed/suggested operation or connects and
1091 * associates with this operation.
1092 *
1093 * @param cls closure
1094 * @param channel new handle to the channel
1095 * @param source peer that started the channel
1096 * @return initial channel context for the channel
1097 * returns NULL on error
1098 */
1099static void *
1100channel_new_cb (void *cls,
1101 struct GNUNET_CADET_Channel *channel,
1102 const struct GNUNET_PeerIdentity *source)
1103{
1104 struct Listener *listener = cls;
1105 struct Operation *op;
1106
1107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n");
1108 op = GNUNET_new (struct Operation);
1109 op->listener = listener;
1110 op->peer = *source;
1111 op->channel = channel;
1112 op->mq = GNUNET_CADET_get_mq (op->channel);
1113 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1114 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1115 &incoming_timeout_cb,
1116 op);
1117 GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op);
1118 return op;
1119}
1120
1121
1122/**
1123 * Function called whenever a channel is destroyed. Should clean up
1124 * any associated state. It must NOT call
1125 * GNUNET_CADET_channel_destroy() on the channel.
1126 *
1127 * The peer_disconnect function is part of a a virtual table set initially either
1128 * when a peer creates a new channel with us, or once we create
1129 * a new channel ourselves (evaluate).
1130 *
1131 * Once we know the exact type of operation (union/intersection), the vt is
1132 * replaced with an operation specific instance (_GSS_[op]_vt).
1133 *
1134 * @param channel_ctx place where local state associated
1135 * with the channel is stored
1136 * @param channel connection to the other end (henceforth invalid)
1137 */
1138static void
1139channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
1140{
1141 struct Operation *op = channel_ctx;
1142
1143 op->channel = NULL;
1144 _GSS_operation_destroy2 (op);
1145}
1146
1147
1148/**
1149 * This function probably should not exist
1150 * and be replaced by inlining more specific
1151 * logic in the various places where it is called.
1152 */
1153void
1154_GSS_operation_destroy2 (struct Operation *op)
1155{
1156 struct GNUNET_CADET_Channel *channel;
1157
1158 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n");
1159 if (NULL != (channel = op->channel))
1160 {
1161 /* This will free op; called conditionally as this helper function
1162 is also called from within the channel disconnect handler. */
1163 op->channel = NULL;
1164 GNUNET_CADET_channel_destroy (channel);
1165 }
1166 if (NULL != op->listener)
1167 {
1168 incoming_destroy (op);
1169 return;
1170 }
1171 if (NULL != op->set)
1172 op->set->vt->channel_death (op);
1173 else
1174 _GSS_operation_destroy (op, GNUNET_YES);
1175 GNUNET_free (op);
1176}
1177
1178
1179/**
1180 * Function called whenever an MQ-channel's transmission window size changes.
1181 *
1182 * The first callback in an outgoing channel will be with a non-zero value
1183 * and will mean the channel is connected to the destination.
1184 *
1185 * For an incoming channel it will be called immediately after the
1186 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
1187 *
1188 * @param cls Channel closure.
1189 * @param channel Connection to the other end (henceforth invalid).
1190 * @param window_size New window size. If the is more messages than buffer size
1191 * this value will be negative..
1192 */
1193static void
1194channel_window_cb (void *cls,
1195 const struct GNUNET_CADET_Channel *channel,
1196 int window_size)
1197{
1198 /* FIXME: not implemented, we could do flow control here... */
1199}
1200
1201
1202/**
1203 * Called when a client wants to create a new listener.
1204 *
1205 * @param cls client that sent the message
1206 * @param msg message sent by the client
1207 */
1208static void
1209handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg)
1210{
1211 struct ClientState *cs = cls;
1212 struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1213 { GNUNET_MQ_hd_var_size (incoming_msg,
1214 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1215 struct OperationRequestMessage,
1216 NULL),
1217 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1218 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1219 struct IBFMessage,
1220 NULL),
1221 GNUNET_MQ_hd_var_size (union_p2p_elements,
1222 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1223 struct GNUNET_SET_ElementMessage,
1224 NULL),
1225 GNUNET_MQ_hd_var_size (union_p2p_offer,
1226 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1227 struct GNUNET_MessageHeader,
1228 NULL),
1229 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1230 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1231 struct InquiryMessage,
1232 NULL),
1233 GNUNET_MQ_hd_var_size (union_p2p_demand,
1234 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1235 struct GNUNET_MessageHeader,
1236 NULL),
1237 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1238 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1239 struct GNUNET_MessageHeader,
1240 NULL),
1241 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1242 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1243 struct GNUNET_MessageHeader,
1244 NULL),
1245 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1246 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1247 struct GNUNET_MessageHeader,
1248 NULL),
1249 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1250 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1251 struct GNUNET_MessageHeader,
1252 NULL),
1253 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1254 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1255 struct StrataEstimatorMessage,
1256 NULL),
1257 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1258 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1259 struct StrataEstimatorMessage,
1260 NULL),
1261 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1262 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1263 struct GNUNET_SET_ElementMessage,
1264 NULL),
1265 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1266 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1267 struct IntersectionElementInfoMessage,
1268 NULL),
1269 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1270 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1271 struct BFMessage,
1272 NULL),
1273 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1274 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1275 struct IntersectionDoneMessage,
1276 NULL),
1277 GNUNET_MQ_handler_end () };
1278 struct Listener *listener;
1279
1280 if (NULL != cs->listener)
1281 {
1282 /* max. one active listener per client! */
1283 GNUNET_break (0);
1284 GNUNET_SERVICE_client_drop (cs->client);
1285 return;
1286 }
1287 listener = GNUNET_new (struct Listener);
1288 listener->cs = cs;
1289 cs->listener = listener;
1290 listener->app_id = msg->app_id;
1291 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1292 GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener);
1293 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294 "New listener created (op %u, port %s)\n",
1295 listener->operation,
1296 GNUNET_h2s (&listener->app_id));
1297 listener->open_port = GNUNET_CADET_open_port (cadet,
1298 &msg->app_id,
1299 &channel_new_cb,
1300 listener,
1301 &channel_window_cb,
1302 &channel_end_cb,
1303 cadet_handlers);
1304 GNUNET_SERVICE_client_continue (cs->client);
1305}
1306
1307
1308/**
1309 * Called when the listening client rejects an operation
1310 * request by another peer.
1311 *
1312 * @param cls client that sent the message
1313 * @param msg message sent by the client
1314 */
1315static void
1316handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg)
1317{
1318 struct ClientState *cs = cls;
1319 struct Operation *op;
1320
1321 op = get_incoming (ntohl (msg->accept_reject_id));
1322 if (NULL == op)
1323 {
1324 /* no matching incoming operation for this reject;
1325 could be that the other peer already disconnected... */
1326 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1327 "Client rejected unknown operation %u\n",
1328 (unsigned int) ntohl (msg->accept_reject_id));
1329 GNUNET_SERVICE_client_continue (cs->client);
1330 return;
1331 }
1332 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1333 "Peer request (op %u, app %s) rejected by client\n",
1334 op->listener->operation,
1335 GNUNET_h2s (&cs->listener->app_id));
1336 _GSS_operation_destroy2 (op);
1337 GNUNET_SERVICE_client_continue (cs->client);
1338}
1339
1340
1341/**
1342 * Called when a client wants to add or remove an element to a set it inhabits.
1343 *
1344 * @param cls client that sent the message
1345 * @param msg message sent by the client
1346 */
1347static int
1348check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
1349{
1350 /* NOTE: Technically, we should probably check with the
1351 block library whether the element we are given is well-formed */
1352 return GNUNET_OK;
1353}
1354
1355
1356/**
1357 * Called when a client wants to add or remove an element to a set it inhabits.
1358 *
1359 * @param cls client that sent the message
1360 * @param msg message sent by the client
1361 */
1362static void
1363handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
1364{
1365 struct ClientState *cs = cls;
1366 struct Set *set;
1367
1368 if (NULL == (set = cs->set))
1369 {
1370 /* client without a set requested an operation */
1371 GNUNET_break (0);
1372 GNUNET_SERVICE_client_drop (cs->client);
1373 return;
1374 }
1375 GNUNET_SERVICE_client_continue (cs->client);
1376
1377 if (0 != set->content->iterator_count)
1378 {
1379 struct PendingMutation *pm;
1380
1381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n");
1382 pm = GNUNET_new (struct PendingMutation);
1383 pm->msg =
1384 (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1385 pm->set = set;
1386 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1387 set->content->pending_mutations_tail,
1388 pm);
1389 return;
1390 }
1391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
1392 execute_mutation (set, msg);
1393}
1394
1395
1396/**
1397 * Advance the current generation of a set,
1398 * adding exclusion ranges if necessary.
1399 *
1400 * @param set the set where we want to advance the generation
1401 */
1402static void
1403advance_generation (struct Set *set)
1404{
1405 struct GenerationRange r;
1406
1407 if (set->current_generation == set->content->latest_generation)
1408 {
1409 set->content->latest_generation++;
1410 set->current_generation++;
1411 return;
1412 }
1413
1414 GNUNET_assert (set->current_generation < set->content->latest_generation);
1415
1416 r.start = set->current_generation + 1;
1417 r.end = set->content->latest_generation + 1;
1418 set->content->latest_generation = r.end;
1419 set->current_generation = r.end;
1420 GNUNET_array_append (set->excluded_generations,
1421 set->excluded_generations_size,
1422 r);
1423}
1424
1425
1426/**
1427 * Called when a client wants to initiate a set operation with another
1428 * peer. Initiates the CADET connection to the listener and sends the
1429 * request.
1430 *
1431 * @param cls client that sent the message
1432 * @param msg message sent by the client
1433 * @return #GNUNET_OK if the message is well-formed
1434 */
1435static int
1436check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1437{
1438 /* FIXME: suboptimal, even if the context below could be NULL,
1439 there are malformed messages this does not check for... */
1440 return GNUNET_OK;
1441}
1442
1443
1444/**
1445 * Called when a client wants to initiate a set operation with another
1446 * peer. Initiates the CADET connection to the listener and sends the
1447 * request.
1448 *
1449 * @param cls client that sent the message
1450 * @param msg message sent by the client
1451 */
1452static void
1453handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
1454{
1455 struct ClientState *cs = cls;
1456 struct Operation *op = GNUNET_new (struct Operation);
1457 const struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1458 { GNUNET_MQ_hd_var_size (incoming_msg,
1459 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1460 struct OperationRequestMessage,
1461 op),
1462 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1463 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1464 struct IBFMessage,
1465 op),
1466 GNUNET_MQ_hd_var_size (union_p2p_elements,
1467 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1468 struct GNUNET_SET_ElementMessage,
1469 op),
1470 GNUNET_MQ_hd_var_size (union_p2p_offer,
1471 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1472 struct GNUNET_MessageHeader,
1473 op),
1474 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1475 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1476 struct InquiryMessage,
1477 op),
1478 GNUNET_MQ_hd_var_size (union_p2p_demand,
1479 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1480 struct GNUNET_MessageHeader,
1481 op),
1482 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1483 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1484 struct GNUNET_MessageHeader,
1485 op),
1486 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1487 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
1488 struct GNUNET_MessageHeader,
1489 op),
1490 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1491 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1492 struct GNUNET_MessageHeader,
1493 op),
1494 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1495 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1496 struct GNUNET_MessageHeader,
1497 op),
1498 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1499 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1500 struct StrataEstimatorMessage,
1501 op),
1502 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1503 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1504 struct StrataEstimatorMessage,
1505 op),
1506 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1507 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1508 struct GNUNET_SET_ElementMessage,
1509 op),
1510 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1511 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1512 struct IntersectionElementInfoMessage,
1513 op),
1514 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1515 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1516 struct BFMessage,
1517 op),
1518 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1519 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1520 struct IntersectionDoneMessage,
1521 op),
1522 GNUNET_MQ_handler_end () };
1523 struct Set *set;
1524 const struct GNUNET_MessageHeader *context;
1525
1526 if (NULL == (set = cs->set))
1527 {
1528 GNUNET_break (0);
1529 GNUNET_free (op);
1530 GNUNET_SERVICE_client_drop (cs->client);
1531 return;
1532 }
1533 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1534 op->peer = msg->target_peer;
1535 op->result_mode = ntohl (msg->result_mode);
1536 op->client_request_id = ntohl (msg->request_id);
1537 op->byzantine = msg->byzantine;
1538 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1539 op->force_full = msg->force_full;
1540 op->force_delta = msg->force_delta;
1541 context = GNUNET_MQ_extract_nested_mh (msg);
1542
1543 /* Advance generation values, so that
1544 mutations won't interfer with the running operation. */
1545 op->set = set;
1546 op->generation_created = set->current_generation;
1547 advance_generation (set);
1548 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1550 "Creating new CADET channel to port %s for set operation type %u\n",
1551 GNUNET_h2s (&msg->app_id),
1552 set->operation);
1553 op->channel = GNUNET_CADET_channel_create (cadet,
1554 op,
1555 &msg->target_peer,
1556 &msg->app_id,
1557 &channel_window_cb,
1558 &channel_end_cb,
1559 cadet_handlers);
1560 op->mq = GNUNET_CADET_get_mq (op->channel);
1561 op->state = set->vt->evaluate (op, context);
1562 if (NULL == op->state)
1563 {
1564 GNUNET_break (0);
1565 GNUNET_SERVICE_client_drop (cs->client);
1566 return;
1567 }
1568 GNUNET_SERVICE_client_continue (cs->client);
1569}
1570
1571
1572/**
1573 * Handle an ack from a client, and send the next element. Note
1574 * that we only expect acks for set elements, not after the
1575 * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
1576 *
1577 * @param cls client the client
1578 * @param ack the message
1579 */
1580static void
1581handle_client_iter_ack (void *cls, const struct GNUNET_SET_IterAckMessage *ack)
1582{
1583 struct ClientState *cs = cls;
1584 struct Set *set;
1585
1586 if (NULL == (set = cs->set))
1587 {
1588 /* client without a set acknowledged receiving a value */
1589 GNUNET_break (0);
1590 GNUNET_SERVICE_client_drop (cs->client);
1591 return;
1592 }
1593 if (NULL == set->iter)
1594 {
1595 /* client sent an ack, but we were not expecting one (as
1596 set iteration has finished) */
1597 GNUNET_break (0);
1598 GNUNET_SERVICE_client_drop (cs->client);
1599 return;
1600 }
1601 GNUNET_SERVICE_client_continue (cs->client);
1602 if (ntohl (ack->send_more))
1603 {
1604 send_client_element (set);
1605 }
1606 else
1607 {
1608 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1609 set->iter = NULL;
1610 set->iteration_id++;
1611 }
1612}
1613
1614
1615/**
1616 * Handle a request from the client to copy a set.
1617 *
1618 * @param cls the client
1619 * @param mh the message
1620 */
1621static void
1622handle_client_copy_lazy_prepare (void *cls,
1623 const struct GNUNET_MessageHeader *mh)
1624{
1625 struct ClientState *cs = cls;
1626 struct Set *set;
1627 struct LazyCopyRequest *cr;
1628 struct GNUNET_MQ_Envelope *ev;
1629 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1630
1631 if (NULL == (set = cs->set))
1632 {
1633 /* client without a set requested an operation */
1634 GNUNET_break (0);
1635 GNUNET_SERVICE_client_drop (cs->client);
1636 return;
1637 }
1638 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1639 "Client requested creation of lazy copy\n");
1640 cr = GNUNET_new (struct LazyCopyRequest);
1641 cr->cookie = ++lazy_copy_cookie;
1642 cr->source_set = set;
1643 GNUNET_CONTAINER_DLL_insert (lazy_copy_head, lazy_copy_tail, cr);
1644 ev = GNUNET_MQ_msg (resp_msg, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1645 resp_msg->cookie = cr->cookie;
1646 GNUNET_MQ_send (set->cs->mq, ev);
1647 GNUNET_SERVICE_client_continue (cs->client);
1648}
1649
1650
1651/**
1652 * Handle a request from the client to connect to a copy of a set.
1653 *
1654 * @param cls the client
1655 * @param msg the message
1656 */
1657static void
1658handle_client_copy_lazy_connect (
1659 void *cls,
1660 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1661{
1662 struct ClientState *cs = cls;
1663 struct LazyCopyRequest *cr;
1664 struct Set *set;
1665 int found;
1666
1667 if (NULL != cs->set)
1668 {
1669 /* There can only be one set per client */
1670 GNUNET_break (0);
1671 GNUNET_SERVICE_client_drop (cs->client);
1672 return;
1673 }
1674 found = GNUNET_NO;
1675 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1676 {
1677 if (cr->cookie == msg->cookie)
1678 {
1679 found = GNUNET_YES;
1680 break;
1681 }
1682 }
1683 if (GNUNET_NO == found)
1684 {
1685 /* client asked for copy with cookie we don't know */
1686 GNUNET_break (0);
1687 GNUNET_SERVICE_client_drop (cs->client);
1688 return;
1689 }
1690 GNUNET_CONTAINER_DLL_remove (lazy_copy_head, lazy_copy_tail, cr);
1691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1692 "Client %p requested use of lazy copy\n",
1693 cs);
1694 set = GNUNET_new (struct Set);
1695 switch (cr->source_set->operation)
1696 {
1697 case GNUNET_SET_OPERATION_INTERSECTION:
1698 set->vt = _GSS_intersection_vt ();
1699 break;
1700
1701 case GNUNET_SET_OPERATION_UNION:
1702 set->vt = _GSS_union_vt ();
1703 break;
1704
1705 default:
1706 GNUNET_assert (0);
1707 return;
1708 }
1709
1710 if (NULL == set->vt->copy_state)
1711 {
1712 /* Lazy copy not supported for this set operation */
1713 GNUNET_break (0);
1714 GNUNET_free (set);
1715 GNUNET_free (cr);
1716 GNUNET_SERVICE_client_drop (cs->client);
1717 return;
1718 }
1719
1720 set->operation = cr->source_set->operation;
1721 set->state = set->vt->copy_state (cr->source_set->state);
1722 set->content = cr->source_set->content;
1723 set->content->refcount++;
1724
1725 set->current_generation = cr->source_set->current_generation;
1726 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1727 set->excluded_generations =
1728 GNUNET_memdup (cr->source_set->excluded_generations,
1729 set->excluded_generations_size
1730 * sizeof(struct GenerationRange));
1731
1732 /* Advance the generation of the new set, so that mutations to the
1733 of the cloned set and the source set are independent. */
1734 advance_generation (set);
1735 set->cs = cs;
1736 cs->set = set;
1737 GNUNET_free (cr);
1738 GNUNET_SERVICE_client_continue (cs->client);
1739}
1740
1741
1742/**
1743 * Handle a request from the client to cancel a running set operation.
1744 *
1745 * @param cls the client
1746 * @param msg the message
1747 */
1748static void
1749handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg)
1750{
1751 struct ClientState *cs = cls;
1752 struct Set *set;
1753 struct Operation *op;
1754 int found;
1755
1756 if (NULL == (set = cs->set))
1757 {
1758 /* client without a set requested an operation */
1759 GNUNET_break (0);
1760 GNUNET_SERVICE_client_drop (cs->client);
1761 return;
1762 }
1763 found = GNUNET_NO;
1764 for (op = set->ops_head; NULL != op; op = op->next)
1765 {
1766 if (op->client_request_id == ntohl (msg->request_id))
1767 {
1768 found = GNUNET_YES;
1769 break;
1770 }
1771 }
1772 if (GNUNET_NO == found)
1773 {
1774 /* It may happen that the operation was already destroyed due to
1775 * the other peer disconnecting. The client may not know about this
1776 * yet and try to cancel the (just barely non-existent) operation.
1777 * So this is not a hard error.
1778 */GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1779 "Client canceled non-existent op %u\n",
1780 (uint32_t) ntohl (msg->request_id));
1781 }
1782 else
1783 {
1784 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1785 "Client requested cancel for op %u\n",
1786 (uint32_t) ntohl (msg->request_id));
1787 _GSS_operation_destroy (op, GNUNET_YES);
1788 }
1789 GNUNET_SERVICE_client_continue (cs->client);
1790}
1791
1792
1793/**
1794 * Handle a request from the client to accept a set operation that
1795 * came from a remote peer. We forward the accept to the associated
1796 * operation for handling
1797 *
1798 * @param cls the client
1799 * @param msg the message
1800 */
1801static void
1802handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg)
1803{
1804 struct ClientState *cs = cls;
1805 struct Set *set;
1806 struct Operation *op;
1807 struct GNUNET_SET_ResultMessage *result_message;
1808 struct GNUNET_MQ_Envelope *ev;
1809 struct Listener *listener;
1810
1811 if (NULL == (set = cs->set))
1812 {
1813 /* client without a set requested to accept */
1814 GNUNET_break (0);
1815 GNUNET_SERVICE_client_drop (cs->client);
1816 return;
1817 }
1818 op = get_incoming (ntohl (msg->accept_reject_id));
1819 if (NULL == op)
1820 {
1821 /* It is not an error if the set op does not exist -- it may
1822 * have been destroyed when the partner peer disconnected. */
1823 GNUNET_log (
1824 GNUNET_ERROR_TYPE_INFO,
1825 "Client %p accepted request %u of listener %p that is no longer active\n",
1826 cs,
1827 ntohl (msg->accept_reject_id),
1828 cs->listener);
1829 ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT);
1830 result_message->request_id = msg->request_id;
1831 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1832 GNUNET_MQ_send (set->cs->mq, ev);
1833 GNUNET_SERVICE_client_continue (cs->client);
1834 return;
1835 }
1836 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1837 "Client accepting request %u\n",
1838 (uint32_t) ntohl (msg->accept_reject_id));
1839 listener = op->listener;
1840 op->listener = NULL;
1841 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
1842 op->set = set;
1843 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1844 op->client_request_id = ntohl (msg->request_id);
1845 op->result_mode = ntohl (msg->result_mode);
1846 op->byzantine = msg->byzantine;
1847 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1848 op->force_full = msg->force_full;
1849 op->force_delta = msg->force_delta;
1850
1851 /* Advance generation values, so that future mutations do not
1852 interfer with the running operation. */
1853 op->generation_created = set->current_generation;
1854 advance_generation (set);
1855 GNUNET_assert (NULL == op->state);
1856 op->state = set->vt->accept (op);
1857 if (NULL == op->state)
1858 {
1859 GNUNET_break (0);
1860 GNUNET_SERVICE_client_drop (cs->client);
1861 return;
1862 }
1863 /* Now allow CADET to continue, as we did not do this in
1864 #handle_incoming_msg (as we wanted to first see if the
1865 local client would accept the request). */
1866 GNUNET_CADET_receive_done (op->channel);
1867 GNUNET_SERVICE_client_continue (cs->client);
1868}
1869
1870
1871/**
1872 * Called to clean up, after a shutdown has been requested.
1873 *
1874 * @param cls closure, NULL
1875 */
1876static void
1877shutdown_task (void *cls)
1878{
1879 /* Delay actual shutdown to allow service to disconnect clients */
1880 in_shutdown = GNUNET_YES;
1881 if (0 == num_clients)
1882 {
1883 if (NULL != cadet)
1884 {
1885 GNUNET_CADET_disconnect (cadet);
1886 cadet = NULL;
1887 }
1888 }
1889 GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
1890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1891}
1892
1893
1894/**
1895 * Function called by the service's run
1896 * method to run service-specific setup code.
1897 *
1898 * @param cls closure
1899 * @param cfg configuration to use
1900 * @param service the initialized service
1901 */
1902static void
1903run (void *cls,
1904 const struct GNUNET_CONFIGURATION_Handle *cfg,
1905 struct GNUNET_SERVICE_Handle *service)
1906{
1907 /* FIXME: need to modify SERVICE (!) API to allow
1908 us to run a shutdown task *after* clients were
1909 forcefully disconnected! */
1910 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
1911 _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
1912 cadet = GNUNET_CADET_connect (cfg);
1913 if (NULL == cadet)
1914 {
1915 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1916 _ ("Could not connect to CADET service\n"));
1917 GNUNET_SCHEDULER_shutdown ();
1918 return;
1919 }
1920}
1921
1922
1923/**
1924 * Define "main" method using service macro.
1925 */
1926GNUNET_SERVICE_MAIN (
1927 "set",
1928 GNUNET_SERVICE_OPTION_NONE,
1929 &run,
1930 &client_connect_cb,
1931 &client_disconnect_cb,
1932 NULL,
1933 GNUNET_MQ_hd_fixed_size (client_accept,
1934 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1935 struct GNUNET_SET_AcceptMessage,
1936 NULL),
1937 GNUNET_MQ_hd_fixed_size (client_iter_ack,
1938 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1939 struct GNUNET_SET_IterAckMessage,
1940 NULL),
1941 GNUNET_MQ_hd_var_size (client_mutation,
1942 GNUNET_MESSAGE_TYPE_SET_ADD,
1943 struct GNUNET_SET_ElementMessage,
1944 NULL),
1945 GNUNET_MQ_hd_fixed_size (client_create_set,
1946 GNUNET_MESSAGE_TYPE_SET_CREATE,
1947 struct GNUNET_SET_CreateMessage,
1948 NULL),
1949 GNUNET_MQ_hd_fixed_size (client_iterate,
1950 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1951 struct GNUNET_MessageHeader,
1952 NULL),
1953 GNUNET_MQ_hd_var_size (client_evaluate,
1954 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1955 struct GNUNET_SET_EvaluateMessage,
1956 NULL),
1957 GNUNET_MQ_hd_fixed_size (client_listen,
1958 GNUNET_MESSAGE_TYPE_SET_LISTEN,
1959 struct GNUNET_SET_ListenMessage,
1960 NULL),
1961 GNUNET_MQ_hd_fixed_size (client_reject,
1962 GNUNET_MESSAGE_TYPE_SET_REJECT,
1963 struct GNUNET_SET_RejectMessage,
1964 NULL),
1965 GNUNET_MQ_hd_var_size (client_mutation,
1966 GNUNET_MESSAGE_TYPE_SET_REMOVE,
1967 struct GNUNET_SET_ElementMessage,
1968 NULL),
1969 GNUNET_MQ_hd_fixed_size (client_cancel,
1970 GNUNET_MESSAGE_TYPE_SET_CANCEL,
1971 struct GNUNET_SET_CancelMessage,
1972 NULL),
1973 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
1974 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1975 struct GNUNET_MessageHeader,
1976 NULL),
1977 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
1978 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1979 struct GNUNET_SET_CopyLazyConnectMessage,
1980 NULL),
1981 GNUNET_MQ_handler_end ());
1982
1983
1984/* end of gnunet-service-set.c */