diff options
Diffstat (limited to 'src/set/gnunet-service-set.c')
-rw-r--r-- | src/set/gnunet-service-set.c | 1984 |
1 files changed, 0 insertions, 1984 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c deleted file mode 100644 index 2b859d81a..000000000 --- a/src/set/gnunet-service-set.c +++ /dev/null | |||
@@ -1,1984 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | Copyright (C) 2013-2017 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | /** | ||
21 | * @file set/gnunet-service-set.c | ||
22 | * @brief two-peer set operations | ||
23 | * @author Florian Dold | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #include "gnunet-service-set.h" | ||
27 | #include "gnunet-service-set_union.h" | ||
28 | #include "gnunet-service-set_intersection.h" | ||
29 | #include "gnunet-service-set_protocol.h" | ||
30 | #include "gnunet_statistics_service.h" | ||
31 | |||
32 | /** | ||
33 | * How long do we hold on to an incoming channel if there is | ||
34 | * no local listener before giving up? | ||
35 | */ | ||
36 | #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES | ||
37 | |||
38 | |||
39 | /** | ||
40 | * Lazy copy requests made by a client. | ||
41 | */ | ||
42 | struct LazyCopyRequest | ||
43 | { | ||
44 | /** | ||
45 | * Kept in a DLL. | ||
46 | */ | ||
47 | struct LazyCopyRequest *prev; | ||
48 | |||
49 | /** | ||
50 | * Kept in a DLL. | ||
51 | */ | ||
52 | struct LazyCopyRequest *next; | ||
53 | |||
54 | /** | ||
55 | * Which set are we supposed to copy? | ||
56 | */ | ||
57 | struct Set *source_set; | ||
58 | |||
59 | /** | ||
60 | * Cookie identifying the request. | ||
61 | */ | ||
62 | uint32_t cookie; | ||
63 | }; | ||
64 | |||
65 | |||
66 | /** | ||
67 | * A listener is inhabited by a client, and waits for evaluation | ||
68 | * requests from remote peers. | ||
69 | */ | ||
70 | struct Listener | ||
71 | { | ||
72 | /** | ||
73 | * Listeners are held in a doubly linked list. | ||
74 | */ | ||
75 | struct Listener *next; | ||
76 | |||
77 | /** | ||
78 | * Listeners are held in a doubly linked list. | ||
79 | */ | ||
80 | struct Listener *prev; | ||
81 | |||
82 | /** | ||
83 | * Head of DLL of operations this listener is responsible for. | ||
84 | * Once the client has accepted/declined the operation, the | ||
85 | * operation is moved to the respective set's operation DLLS. | ||
86 | */ | ||
87 | struct Operation *op_head; | ||
88 | |||
89 | /** | ||
90 | * Tail of DLL of operations this listener is responsible for. | ||
91 | * Once the client has accepted/declined the operation, the | ||
92 | * operation is moved to the respective set's operation DLLS. | ||
93 | */ | ||
94 | struct Operation *op_tail; | ||
95 | |||
96 | /** | ||
97 | * Client that owns the listener. | ||
98 | * Only one client may own a listener. | ||
99 | */ | ||
100 | struct ClientState *cs; | ||
101 | |||
102 | /** | ||
103 | * The port we are listening on with CADET. | ||
104 | */ | ||
105 | struct GNUNET_CADET_Port *open_port; | ||
106 | |||
107 | /** | ||
108 | * Application ID for the operation, used to distinguish | ||
109 | * multiple operations of the same type with the same peer. | ||
110 | */ | ||
111 | struct GNUNET_HashCode app_id; | ||
112 | |||
113 | /** | ||
114 | * The type of the operation. | ||
115 | */ | ||
116 | enum GNUNET_SET_OperationType operation; | ||
117 | }; | ||
118 | |||
119 | |||
120 | /** | ||
121 | * Handle to the cadet service, used to listen for and connect to | ||
122 | * remote peers. | ||
123 | */ | ||
124 | static struct GNUNET_CADET_Handle *cadet; | ||
125 | |||
126 | /** | ||
127 | * DLL of lazy copy requests by this client. | ||
128 | */ | ||
129 | static struct LazyCopyRequest *lazy_copy_head; | ||
130 | |||
131 | /** | ||
132 | * DLL of lazy copy requests by this client. | ||
133 | */ | ||
134 | static struct LazyCopyRequest *lazy_copy_tail; | ||
135 | |||
136 | /** | ||
137 | * Generator for unique cookie we set per lazy copy request. | ||
138 | */ | ||
139 | static uint32_t lazy_copy_cookie; | ||
140 | |||
141 | /** | ||
142 | * Statistics handle. | ||
143 | */ | ||
144 | struct GNUNET_STATISTICS_Handle *_GSS_statistics; | ||
145 | |||
146 | /** | ||
147 | * Listeners are held in a doubly linked list. | ||
148 | */ | ||
149 | static struct Listener *listener_head; | ||
150 | |||
151 | /** | ||
152 | * Listeners are held in a doubly linked list. | ||
153 | */ | ||
154 | static struct Listener *listener_tail; | ||
155 | |||
156 | /** | ||
157 | * Number of active clients. | ||
158 | */ | ||
159 | static unsigned int num_clients; | ||
160 | |||
161 | /** | ||
162 | * Are we in shutdown? if #GNUNET_YES and the number of clients | ||
163 | * drops to zero, disconnect from CADET. | ||
164 | */ | ||
165 | static int in_shutdown; | ||
166 | |||
167 | /** | ||
168 | * Counter for allocating unique IDs for clients, used to identify | ||
169 | * incoming operation requests from remote peers, that the client can | ||
170 | * choose to accept or refuse. 0 must not be used (reserved for | ||
171 | * uninitialized). | ||
172 | */ | ||
173 | static uint32_t suggest_id; | ||
174 | |||
175 | |||
176 | /** | ||
177 | * Get the incoming socket associated with the given id. | ||
178 | * | ||
179 | * @param listener the listener to look in | ||
180 | * @param id id to look for | ||
181 | * @return the incoming socket associated with the id, | ||
182 | * or NULL if there is none | ||
183 | */ | ||
184 | static struct Operation * | ||
185 | get_incoming (uint32_t id) | ||
186 | { | ||
187 | for (struct Listener *listener = listener_head; NULL != listener; | ||
188 | listener = listener->next) | ||
189 | { | ||
190 | for (struct Operation *op = listener->op_head; NULL != op; op = op->next) | ||
191 | if (op->suggest_id == id) | ||
192 | return op; | ||
193 | } | ||
194 | return NULL; | ||
195 | } | ||
196 | |||
197 | |||
198 | /** | ||
199 | * Destroy an incoming request from a remote peer | ||
200 | * | ||
201 | * @param op remote request to destroy | ||
202 | */ | ||
203 | static void | ||
204 | incoming_destroy (struct Operation *op) | ||
205 | { | ||
206 | struct Listener *listener; | ||
207 | |||
208 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
209 | "Destroying incoming operation %p\n", | ||
210 | op); | ||
211 | if (NULL != (listener = op->listener)) | ||
212 | { | ||
213 | GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op); | ||
214 | op->listener = NULL; | ||
215 | } | ||
216 | if (NULL != op->timeout_task) | ||
217 | { | ||
218 | GNUNET_SCHEDULER_cancel (op->timeout_task); | ||
219 | op->timeout_task = NULL; | ||
220 | } | ||
221 | _GSS_operation_destroy2 (op); | ||
222 | } | ||
223 | |||
224 | |||
225 | /** | ||
226 | * Context for the #garbage_collect_cb(). | ||
227 | */ | ||
228 | struct GarbageContext | ||
229 | { | ||
230 | /** | ||
231 | * Map for which we are garbage collecting removed elements. | ||
232 | */ | ||
233 | struct GNUNET_CONTAINER_MultiHashMap *map; | ||
234 | |||
235 | /** | ||
236 | * Lowest generation for which an operation is still pending. | ||
237 | */ | ||
238 | unsigned int min_op_generation; | ||
239 | |||
240 | /** | ||
241 | * Largest generation for which an operation is still pending. | ||
242 | */ | ||
243 | unsigned int max_op_generation; | ||
244 | }; | ||
245 | |||
246 | |||
247 | /** | ||
248 | * Function invoked to check if an element can be removed from | ||
249 | * the set's history because it is no longer needed. | ||
250 | * | ||
251 | * @param cls the `struct GarbageContext *` | ||
252 | * @param key key of the element in the map | ||
253 | * @param value the `struct ElementEntry *` | ||
254 | * @return #GNUNET_OK (continue to iterate) | ||
255 | */ | ||
256 | static int | ||
257 | garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
258 | { | ||
259 | // struct GarbageContext *gc = cls; | ||
260 | // struct ElementEntry *ee = value; | ||
261 | |||
262 | // if (GNUNET_YES != ee->removed) | ||
263 | // return GNUNET_OK; | ||
264 | // if ( (gc->max_op_generation < ee->generation_added) || | ||
265 | // (ee->generation_removed > gc->min_op_generation) ) | ||
266 | // { | ||
267 | // GNUNET_assert (GNUNET_YES == | ||
268 | // GNUNET_CONTAINER_multihashmap_remove (gc->map, | ||
269 | // key, | ||
270 | // ee)); | ||
271 | // GNUNET_free (ee); | ||
272 | // } | ||
273 | return GNUNET_OK; | ||
274 | } | ||
275 | |||
276 | |||
277 | /** | ||
278 | * Collect and destroy elements that are not needed anymore, because | ||
279 | * their lifetime (as determined by their generation) does not overlap | ||
280 | * with any active set operation. | ||
281 | * | ||
282 | * @param set set to garbage collect | ||
283 | */ | ||
284 | static void | ||
285 | collect_generation_garbage (struct Set *set) | ||
286 | { | ||
287 | struct GarbageContext gc; | ||
288 | |||
289 | gc.min_op_generation = UINT_MAX; | ||
290 | gc.max_op_generation = 0; | ||
291 | for (struct Operation *op = set->ops_head; NULL != op; op = op->next) | ||
292 | { | ||
293 | gc.min_op_generation = | ||
294 | GNUNET_MIN (gc.min_op_generation, op->generation_created); | ||
295 | gc.max_op_generation = | ||
296 | GNUNET_MAX (gc.max_op_generation, op->generation_created); | ||
297 | } | ||
298 | gc.map = set->content->elements; | ||
299 | GNUNET_CONTAINER_multihashmap_iterate (set->content->elements, | ||
300 | &garbage_collect_cb, | ||
301 | &gc); | ||
302 | } | ||
303 | |||
304 | |||
305 | /** | ||
306 | * Is @a generation in the range of exclusions? | ||
307 | * | ||
308 | * @param generation generation to query | ||
309 | * @param excluded array of generations where the element is excluded | ||
310 | * @param excluded_size length of the @a excluded array | ||
311 | * @return #GNUNET_YES if @a generation is in any of the ranges | ||
312 | */ | ||
313 | static int | ||
314 | is_excluded_generation (unsigned int generation, | ||
315 | struct GenerationRange *excluded, | ||
316 | unsigned int excluded_size) | ||
317 | { | ||
318 | for (unsigned int i = 0; i < excluded_size; i++) | ||
319 | if ((generation >= excluded[i].start) && (generation < excluded[i].end)) | ||
320 | return GNUNET_YES; | ||
321 | return GNUNET_NO; | ||
322 | } | ||
323 | |||
324 | |||
325 | /** | ||
326 | * Is element @a ee part of the set during @a query_generation? | ||
327 | * | ||
328 | * @param ee element to test | ||
329 | * @param query_generation generation to query | ||
330 | * @param excluded array of generations where the element is excluded | ||
331 | * @param excluded_size length of the @a excluded array | ||
332 | * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not | ||
333 | */ | ||
334 | static int | ||
335 | is_element_of_generation (struct ElementEntry *ee, | ||
336 | unsigned int query_generation, | ||
337 | struct GenerationRange *excluded, | ||
338 | unsigned int excluded_size) | ||
339 | { | ||
340 | struct MutationEvent *mut; | ||
341 | int is_present; | ||
342 | |||
343 | GNUNET_assert (NULL != ee->mutations); | ||
344 | if (GNUNET_YES == | ||
345 | is_excluded_generation (query_generation, excluded, excluded_size)) | ||
346 | { | ||
347 | GNUNET_break (0); | ||
348 | return GNUNET_NO; | ||
349 | } | ||
350 | |||
351 | is_present = GNUNET_NO; | ||
352 | |||
353 | /* Could be made faster with binary search, but lists | ||
354 | are small, so why bother. */ | ||
355 | for (unsigned int i = 0; i < ee->mutations_size; i++) | ||
356 | { | ||
357 | mut = &ee->mutations[i]; | ||
358 | |||
359 | if (mut->generation > query_generation) | ||
360 | { | ||
361 | /* The mutation doesn't apply to our generation | ||
362 | anymore. We can'b break here, since mutations aren't | ||
363 | sorted by generation. */ | ||
364 | continue; | ||
365 | } | ||
366 | |||
367 | if (GNUNET_YES == | ||
368 | is_excluded_generation (mut->generation, excluded, excluded_size)) | ||
369 | { | ||
370 | /* The generation is excluded (because it belongs to another | ||
371 | fork via a lazy copy) and thus mutations aren't considered | ||
372 | for membership testing. */ | ||
373 | continue; | ||
374 | } | ||
375 | |||
376 | /* This would be an inconsistency in how we manage mutations. */ | ||
377 | if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added)) | ||
378 | GNUNET_assert (0); | ||
379 | /* Likewise. */ | ||
380 | if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added)) | ||
381 | GNUNET_assert (0); | ||
382 | |||
383 | is_present = mut->added; | ||
384 | } | ||
385 | |||
386 | return is_present; | ||
387 | } | ||
388 | |||
389 | |||
390 | /** | ||
391 | * Is element @a ee part of the set used by @a op? | ||
392 | * | ||
393 | * @param ee element to test | ||
394 | * @param op operation the defines the set and its generation | ||
395 | * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not | ||
396 | */ | ||
397 | int | ||
398 | _GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op) | ||
399 | { | ||
400 | return is_element_of_generation (ee, | ||
401 | op->generation_created, | ||
402 | op->set->excluded_generations, | ||
403 | op->set->excluded_generations_size); | ||
404 | } | ||
405 | |||
406 | |||
407 | /** | ||
408 | * Destroy the given operation. Used for any operation where both | ||
409 | * peers were known and that thus actually had a vt and channel. Must | ||
410 | * not be used for operations where 'listener' is still set and we do | ||
411 | * not know the other peer. | ||
412 | * | ||
413 | * Call the implementation-specific cancel function of the operation. | ||
414 | * Disconnects from the remote peer. Does not disconnect the client, | ||
415 | * as there may be multiple operations per set. | ||
416 | * | ||
417 | * @param op operation to destroy | ||
418 | * @param gc #GNUNET_YES to perform garbage collection on the set | ||
419 | */ | ||
420 | void | ||
421 | _GSS_operation_destroy (struct Operation *op, int gc) | ||
422 | { | ||
423 | struct Set *set = op->set; | ||
424 | struct GNUNET_CADET_Channel *channel; | ||
425 | |||
426 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op); | ||
427 | GNUNET_assert (NULL == op->listener); | ||
428 | if (NULL != op->state) | ||
429 | { | ||
430 | set->vt->cancel (op); | ||
431 | op->state = NULL; | ||
432 | } | ||
433 | if (NULL != set) | ||
434 | { | ||
435 | GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op); | ||
436 | op->set = NULL; | ||
437 | } | ||
438 | if (NULL != op->context_msg) | ||
439 | { | ||
440 | GNUNET_free (op->context_msg); | ||
441 | op->context_msg = NULL; | ||
442 | } | ||
443 | if (NULL != (channel = op->channel)) | ||
444 | { | ||
445 | /* This will free op; called conditionally as this helper function | ||
446 | is also called from within the channel disconnect handler. */ | ||
447 | op->channel = NULL; | ||
448 | GNUNET_CADET_channel_destroy (channel); | ||
449 | } | ||
450 | if ((NULL != set) && (GNUNET_YES == gc)) | ||
451 | collect_generation_garbage (set); | ||
452 | /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, | ||
453 | * there was a channel end handler that will free 'op' on the call stack. */ | ||
454 | } | ||
455 | |||
456 | |||
457 | /** | ||
458 | * Callback called when a client connects to the service. | ||
459 | * | ||
460 | * @param cls closure for the service | ||
461 | * @param c the new client that connected to the service | ||
462 | * @param mq the message queue used to send messages to the client | ||
463 | * @return @a `struct ClientState` | ||
464 | */ | ||
465 | static void * | ||
466 | client_connect_cb (void *cls, | ||
467 | struct GNUNET_SERVICE_Client *c, | ||
468 | struct GNUNET_MQ_Handle *mq) | ||
469 | { | ||
470 | struct ClientState *cs; | ||
471 | |||
472 | num_clients++; | ||
473 | cs = GNUNET_new (struct ClientState); | ||
474 | cs->client = c; | ||
475 | cs->mq = mq; | ||
476 | return cs; | ||
477 | } | ||
478 | |||
479 | |||
480 | /** | ||
481 | * Iterator over hash map entries to free element entries. | ||
482 | * | ||
483 | * @param cls closure | ||
484 | * @param key current key code | ||
485 | * @param value a `struct ElementEntry *` to be free'd | ||
486 | * @return #GNUNET_YES (continue to iterate) | ||
487 | */ | ||
488 | static int | ||
489 | destroy_elements_iterator (void *cls, | ||
490 | const struct GNUNET_HashCode *key, | ||
491 | void *value) | ||
492 | { | ||
493 | struct ElementEntry *ee = value; | ||
494 | |||
495 | GNUNET_free (ee->mutations); | ||
496 | GNUNET_free (ee); | ||
497 | return GNUNET_YES; | ||
498 | } | ||
499 | |||
500 | |||
501 | /** | ||
502 | * Clean up after a client has disconnected | ||
503 | * | ||
504 | * @param cls closure, unused | ||
505 | * @param client the client to clean up after | ||
506 | * @param internal_cls the `struct ClientState` | ||
507 | */ | ||
508 | static void | ||
509 | client_disconnect_cb (void *cls, | ||
510 | struct GNUNET_SERVICE_Client *client, | ||
511 | void *internal_cls) | ||
512 | { | ||
513 | struct ClientState *cs = internal_cls; | ||
514 | struct Operation *op; | ||
515 | struct Listener *listener; | ||
516 | struct Set *set; | ||
517 | |||
518 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n"); | ||
519 | if (NULL != (set = cs->set)) | ||
520 | { | ||
521 | struct SetContent *content = set->content; | ||
522 | struct PendingMutation *pm; | ||
523 | struct PendingMutation *pm_current; | ||
524 | struct LazyCopyRequest *lcr; | ||
525 | |||
526 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n"); | ||
527 | /* Destroy pending set operations */ | ||
528 | while (NULL != set->ops_head) | ||
529 | _GSS_operation_destroy (set->ops_head, GNUNET_NO); | ||
530 | |||
531 | /* Destroy operation-specific state */ | ||
532 | GNUNET_assert (NULL != set->state); | ||
533 | set->vt->destroy_set (set->state); | ||
534 | set->state = NULL; | ||
535 | |||
536 | /* Clean up ongoing iterations */ | ||
537 | if (NULL != set->iter) | ||
538 | { | ||
539 | GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); | ||
540 | set->iter = NULL; | ||
541 | set->iteration_id++; | ||
542 | } | ||
543 | |||
544 | /* discard any pending mutations that reference this set */ | ||
545 | pm = content->pending_mutations_head; | ||
546 | while (NULL != pm) | ||
547 | { | ||
548 | pm_current = pm; | ||
549 | pm = pm->next; | ||
550 | if (pm_current->set == set) | ||
551 | { | ||
552 | GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, | ||
553 | content->pending_mutations_tail, | ||
554 | pm_current); | ||
555 | GNUNET_free (pm_current); | ||
556 | } | ||
557 | } | ||
558 | |||
559 | /* free set content (or at least decrement RC) */ | ||
560 | set->content = NULL; | ||
561 | GNUNET_assert (0 != content->refcount); | ||
562 | content->refcount--; | ||
563 | if (0 == content->refcount) | ||
564 | { | ||
565 | GNUNET_assert (NULL != content->elements); | ||
566 | GNUNET_CONTAINER_multihashmap_iterate (content->elements, | ||
567 | &destroy_elements_iterator, | ||
568 | NULL); | ||
569 | GNUNET_CONTAINER_multihashmap_destroy (content->elements); | ||
570 | content->elements = NULL; | ||
571 | GNUNET_free (content); | ||
572 | } | ||
573 | GNUNET_free (set->excluded_generations); | ||
574 | set->excluded_generations = NULL; | ||
575 | |||
576 | /* remove set from pending copy requests */ | ||
577 | lcr = lazy_copy_head; | ||
578 | while (NULL != lcr) | ||
579 | { | ||
580 | struct LazyCopyRequest *lcr_current = lcr; | ||
581 | |||
582 | lcr = lcr->next; | ||
583 | if (lcr_current->source_set == set) | ||
584 | { | ||
585 | GNUNET_CONTAINER_DLL_remove (lazy_copy_head, | ||
586 | lazy_copy_tail, | ||
587 | lcr_current); | ||
588 | GNUNET_free (lcr_current); | ||
589 | } | ||
590 | } | ||
591 | GNUNET_free (set); | ||
592 | } | ||
593 | |||
594 | if (NULL != (listener = cs->listener)) | ||
595 | { | ||
596 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n"); | ||
597 | GNUNET_CADET_close_port (listener->open_port); | ||
598 | listener->open_port = NULL; | ||
599 | while (NULL != (op = listener->op_head)) | ||
600 | { | ||
601 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
602 | "Destroying incoming operation `%u' from peer `%s'\n", | ||
603 | (unsigned int) op->client_request_id, | ||
604 | GNUNET_i2s (&op->peer)); | ||
605 | incoming_destroy (op); | ||
606 | } | ||
607 | GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener); | ||
608 | GNUNET_free (listener); | ||
609 | } | ||
610 | GNUNET_free (cs); | ||
611 | num_clients--; | ||
612 | if ((GNUNET_YES == in_shutdown) && (0 == num_clients)) | ||
613 | { | ||
614 | if (NULL != cadet) | ||
615 | { | ||
616 | GNUNET_CADET_disconnect (cadet); | ||
617 | cadet = NULL; | ||
618 | } | ||
619 | } | ||
620 | } | ||
621 | |||
622 | |||
623 | /** | ||
624 | * Check a request for a set operation from another peer. | ||
625 | * | ||
626 | * @param cls the operation state | ||
627 | * @param msg the received message | ||
628 | * @return #GNUNET_OK if the channel should be kept alive, | ||
629 | * #GNUNET_SYSERR to destroy the channel | ||
630 | */ | ||
631 | static int | ||
632 | check_incoming_msg (void *cls, const struct OperationRequestMessage *msg) | ||
633 | { | ||
634 | struct Operation *op = cls; | ||
635 | struct Listener *listener = op->listener; | ||
636 | const struct GNUNET_MessageHeader *nested_context; | ||
637 | |||
638 | /* double operation request */ | ||
639 | if (0 != op->suggest_id) | ||
640 | { | ||
641 | GNUNET_break_op (0); | ||
642 | return GNUNET_SYSERR; | ||
643 | } | ||
644 | /* This should be equivalent to the previous condition, but can't hurt to check twice */ | ||
645 | if (NULL == op->listener) | ||
646 | { | ||
647 | GNUNET_break (0); | ||
648 | return GNUNET_SYSERR; | ||
649 | } | ||
650 | if (listener->operation != | ||
651 | (enum GNUNET_SET_OperationType) ntohl (msg->operation)) | ||
652 | { | ||
653 | GNUNET_break_op (0); | ||
654 | return GNUNET_SYSERR; | ||
655 | } | ||
656 | nested_context = GNUNET_MQ_extract_nested_mh (msg); | ||
657 | if ((NULL != nested_context) && | ||
658 | (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE)) | ||
659 | { | ||
660 | GNUNET_break_op (0); | ||
661 | return GNUNET_SYSERR; | ||
662 | } | ||
663 | return GNUNET_OK; | ||
664 | } | ||
665 | |||
666 | |||
667 | /** | ||
668 | * Handle a request for a set operation from another peer. Checks if we | ||
669 | * have a listener waiting for such a request (and in that case initiates | ||
670 | * asking the listener about accepting the connection). If no listener | ||
671 | * is waiting, we queue the operation request in hope that a listener | ||
672 | * shows up soon (before timeout). | ||
673 | * | ||
674 | * This msg is expected as the first and only msg handled through the | ||
675 | * non-operation bound virtual table, acceptance of this operation replaces | ||
676 | * our virtual table and subsequent msgs would be routed differently (as | ||
677 | * we then know what type of operation this is). | ||
678 | * | ||
679 | * @param cls the operation state | ||
680 | * @param msg the received message | ||
681 | * @return #GNUNET_OK if the channel should be kept alive, | ||
682 | * #GNUNET_SYSERR to destroy the channel | ||
683 | */ | ||
684 | static void | ||
685 | handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg) | ||
686 | { | ||
687 | struct Operation *op = cls; | ||
688 | struct Listener *listener = op->listener; | ||
689 | const struct GNUNET_MessageHeader *nested_context; | ||
690 | struct GNUNET_MQ_Envelope *env; | ||
691 | struct GNUNET_SET_RequestMessage *cmsg; | ||
692 | |||
693 | nested_context = GNUNET_MQ_extract_nested_mh (msg); | ||
694 | /* Make a copy of the nested_context (application-specific context | ||
695 | information that is opaque to set) so we can pass it to the | ||
696 | listener later on */ | ||
697 | if (NULL != nested_context) | ||
698 | op->context_msg = GNUNET_copy_message (nested_context); | ||
699 | op->remote_element_count = ntohl (msg->element_count); | ||
700 | GNUNET_log ( | ||
701 | GNUNET_ERROR_TYPE_DEBUG, | ||
702 | "Received P2P operation request (op %u, port %s) for active listener\n", | ||
703 | (uint32_t) ntohl (msg->operation), | ||
704 | GNUNET_h2s (&op->listener->app_id)); | ||
705 | GNUNET_assert (0 == op->suggest_id); | ||
706 | if (0 == suggest_id) | ||
707 | suggest_id++; | ||
708 | op->suggest_id = suggest_id++; | ||
709 | GNUNET_assert (NULL != op->timeout_task); | ||
710 | GNUNET_SCHEDULER_cancel (op->timeout_task); | ||
711 | op->timeout_task = NULL; | ||
712 | env = GNUNET_MQ_msg_nested_mh (cmsg, | ||
713 | GNUNET_MESSAGE_TYPE_SET_REQUEST, | ||
714 | op->context_msg); | ||
715 | GNUNET_log ( | ||
716 | GNUNET_ERROR_TYPE_DEBUG, | ||
717 | "Suggesting incoming request with accept id %u to listener %p of client %p\n", | ||
718 | op->suggest_id, | ||
719 | listener, | ||
720 | listener->cs); | ||
721 | cmsg->accept_id = htonl (op->suggest_id); | ||
722 | cmsg->peer_id = op->peer; | ||
723 | GNUNET_MQ_send (listener->cs->mq, env); | ||
724 | /* NOTE: GNUNET_CADET_receive_done() will be called in | ||
725 | #handle_client_accept() */ | ||
726 | } | ||
727 | |||
728 | |||
729 | /** | ||
730 | * Add an element to @a set as specified by @a msg | ||
731 | * | ||
732 | * @param set set to manipulate | ||
733 | * @param msg message specifying the change | ||
734 | */ | ||
735 | static void | ||
736 | execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) | ||
737 | { | ||
738 | struct GNUNET_SET_Element el; | ||
739 | struct ElementEntry *ee; | ||
740 | struct GNUNET_HashCode hash; | ||
741 | |||
742 | GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type)); | ||
743 | el.size = ntohs (msg->header.size) - sizeof(*msg); | ||
744 | el.data = &msg[1]; | ||
745 | el.element_type = ntohs (msg->element_type); | ||
746 | GNUNET_SET_element_hash (&el, &hash); | ||
747 | ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash); | ||
748 | if (NULL == ee) | ||
749 | { | ||
750 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
751 | "Client inserts element %s of size %u\n", | ||
752 | GNUNET_h2s (&hash), | ||
753 | el.size); | ||
754 | ee = GNUNET_malloc (el.size + sizeof(*ee)); | ||
755 | ee->element.size = el.size; | ||
756 | GNUNET_memcpy (&ee[1], el.data, el.size); | ||
757 | ee->element.data = &ee[1]; | ||
758 | ee->element.element_type = el.element_type; | ||
759 | ee->remote = GNUNET_NO; | ||
760 | ee->mutations = NULL; | ||
761 | ee->mutations_size = 0; | ||
762 | ee->element_hash = hash; | ||
763 | GNUNET_break (GNUNET_YES == | ||
764 | GNUNET_CONTAINER_multihashmap_put ( | ||
765 | set->content->elements, | ||
766 | &ee->element_hash, | ||
767 | ee, | ||
768 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
769 | } | ||
770 | else if (GNUNET_YES == | ||
771 | is_element_of_generation (ee, | ||
772 | set->current_generation, | ||
773 | set->excluded_generations, | ||
774 | set->excluded_generations_size)) | ||
775 | { | ||
776 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
777 | "Client inserted element %s of size %u twice (ignored)\n", | ||
778 | GNUNET_h2s (&hash), | ||
779 | el.size); | ||
780 | |||
781 | /* same element inserted twice */ | ||
782 | return; | ||
783 | } | ||
784 | |||
785 | { | ||
786 | struct MutationEvent mut = { .generation = set->current_generation, | ||
787 | .added = GNUNET_YES }; | ||
788 | GNUNET_array_append (ee->mutations, ee->mutations_size, mut); | ||
789 | } | ||
790 | set->vt->add (set->state, ee); | ||
791 | } | ||
792 | |||
793 | |||
794 | /** | ||
795 | * Remove an element from @a set as specified by @a msg | ||
796 | * | ||
797 | * @param set set to manipulate | ||
798 | * @param msg message specifying the change | ||
799 | */ | ||
800 | static void | ||
801 | execute_remove (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) | ||
802 | { | ||
803 | struct GNUNET_SET_Element el; | ||
804 | struct ElementEntry *ee; | ||
805 | struct GNUNET_HashCode hash; | ||
806 | |||
807 | GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type)); | ||
808 | el.size = ntohs (msg->header.size) - sizeof(*msg); | ||
809 | el.data = &msg[1]; | ||
810 | el.element_type = ntohs (msg->element_type); | ||
811 | GNUNET_SET_element_hash (&el, &hash); | ||
812 | ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash); | ||
813 | if (NULL == ee) | ||
814 | { | ||
815 | /* Client tried to remove non-existing element. */ | ||
816 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
817 | "Client removes non-existing element of size %u\n", | ||
818 | el.size); | ||
819 | return; | ||
820 | } | ||
821 | if (GNUNET_NO == is_element_of_generation (ee, | ||
822 | set->current_generation, | ||
823 | set->excluded_generations, | ||
824 | set->excluded_generations_size)) | ||
825 | { | ||
826 | /* Client tried to remove element twice */ | ||
827 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
828 | "Client removed element of size %u twice (ignored)\n", | ||
829 | el.size); | ||
830 | return; | ||
831 | } | ||
832 | else | ||
833 | { | ||
834 | struct MutationEvent mut = { .generation = set->current_generation, | ||
835 | .added = GNUNET_NO }; | ||
836 | |||
837 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
838 | "Client removes element of size %u\n", | ||
839 | el.size); | ||
840 | |||
841 | GNUNET_array_append (ee->mutations, ee->mutations_size, mut); | ||
842 | } | ||
843 | set->vt->remove (set->state, ee); | ||
844 | } | ||
845 | |||
846 | |||
847 | /** | ||
848 | * Perform a mutation on a set as specified by the @a msg | ||
849 | * | ||
850 | * @param set the set to mutate | ||
851 | * @param msg specification of what to change | ||
852 | */ | ||
853 | static void | ||
854 | execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) | ||
855 | { | ||
856 | switch (ntohs (msg->header.type)) | ||
857 | { | ||
858 | case GNUNET_MESSAGE_TYPE_SET_ADD: | ||
859 | execute_add (set, msg); | ||
860 | break; | ||
861 | |||
862 | case GNUNET_MESSAGE_TYPE_SET_REMOVE: | ||
863 | execute_remove (set, msg); | ||
864 | break; | ||
865 | |||
866 | default: | ||
867 | GNUNET_break (0); | ||
868 | } | ||
869 | } | ||
870 | |||
871 | |||
872 | /** | ||
873 | * Execute mutations that were delayed on a set because of | ||
874 | * pending operations. | ||
875 | * | ||
876 | * @param set the set to execute mutations on | ||
877 | */ | ||
878 | static void | ||
879 | execute_delayed_mutations (struct Set *set) | ||
880 | { | ||
881 | struct PendingMutation *pm; | ||
882 | |||
883 | if (0 != set->content->iterator_count) | ||
884 | return; /* still cannot do this */ | ||
885 | while (NULL != (pm = set->content->pending_mutations_head)) | ||
886 | { | ||
887 | GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, | ||
888 | set->content->pending_mutations_tail, | ||
889 | pm); | ||
890 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
891 | "Executing pending mutation on %p.\n", | ||
892 | pm->set); | ||
893 | execute_mutation (pm->set, pm->msg); | ||
894 | GNUNET_free (pm->msg); | ||
895 | GNUNET_free (pm); | ||
896 | } | ||
897 | } | ||
898 | |||
899 | |||
900 | /** | ||
901 | * Send the next element of a set to the set's client. The next element is given by | ||
902 | * the set's current hashmap iterator. The set's iterator will be set to NULL if there | ||
903 | * are no more elements in the set. The caller must ensure that the set's iterator is | ||
904 | * valid. | ||
905 | * | ||
906 | * The client will acknowledge each received element with a | ||
907 | * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our | ||
908 | * #handle_client_iter_ack() will then trigger the next transmission. | ||
909 | * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged. | ||
910 | * | ||
911 | * @param set set that should send its next element to its client | ||
912 | */ | ||
913 | static void | ||
914 | send_client_element (struct Set *set) | ||
915 | { | ||
916 | int ret; | ||
917 | struct ElementEntry *ee; | ||
918 | struct GNUNET_MQ_Envelope *ev; | ||
919 | struct GNUNET_SET_IterResponseMessage *msg; | ||
920 | |||
921 | GNUNET_assert (NULL != set->iter); | ||
922 | do | ||
923 | { | ||
924 | ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, | ||
925 | NULL, | ||
926 | (const void **) &ee); | ||
927 | if (GNUNET_NO == ret) | ||
928 | { | ||
929 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set); | ||
930 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); | ||
931 | GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); | ||
932 | set->iter = NULL; | ||
933 | set->iteration_id++; | ||
934 | GNUNET_assert (set->content->iterator_count > 0); | ||
935 | set->content->iterator_count--; | ||
936 | execute_delayed_mutations (set); | ||
937 | GNUNET_MQ_send (set->cs->mq, ev); | ||
938 | return; | ||
939 | } | ||
940 | GNUNET_assert (NULL != ee); | ||
941 | } | ||
942 | while (GNUNET_NO == | ||
943 | is_element_of_generation (ee, | ||
944 | set->iter_generation, | ||
945 | set->excluded_generations, | ||
946 | set->excluded_generations_size)); | ||
947 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
948 | "Sending iteration element on %p.\n", | ||
949 | set); | ||
950 | ev = GNUNET_MQ_msg_extra (msg, | ||
951 | ee->element.size, | ||
952 | GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); | ||
953 | GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size); | ||
954 | msg->element_type = htons (ee->element.element_type); | ||
955 | msg->iteration_id = htons (set->iteration_id); | ||
956 | GNUNET_MQ_send (set->cs->mq, ev); | ||
957 | } | ||
958 | |||
959 | |||
960 | /** | ||
961 | * Called when a client wants to iterate the elements of a set. | ||
962 | * Checks if we have a set associated with the client and if we | ||
963 | * can right now start an iteration. If all checks out, starts | ||
964 | * sending the elements of the set to the client. | ||
965 | * | ||
966 | * @param cls client that sent the message | ||
967 | * @param m message sent by the client | ||
968 | */ | ||
969 | static void | ||
970 | handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m) | ||
971 | { | ||
972 | struct ClientState *cs = cls; | ||
973 | struct Set *set; | ||
974 | |||
975 | if (NULL == (set = cs->set)) | ||
976 | { | ||
977 | /* attempt to iterate over a non existing set */ | ||
978 | GNUNET_break (0); | ||
979 | GNUNET_SERVICE_client_drop (cs->client); | ||
980 | return; | ||
981 | } | ||
982 | if (NULL != set->iter) | ||
983 | { | ||
984 | /* Only one concurrent iterate-action allowed per set */ | ||
985 | GNUNET_break (0); | ||
986 | GNUNET_SERVICE_client_drop (cs->client); | ||
987 | return; | ||
988 | } | ||
989 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
990 | "Iterating set %p in gen %u with %u content elements\n", | ||
991 | (void *) set, | ||
992 | set->current_generation, | ||
993 | GNUNET_CONTAINER_multihashmap_size (set->content->elements)); | ||
994 | GNUNET_SERVICE_client_continue (cs->client); | ||
995 | set->content->iterator_count++; | ||
996 | set->iter = | ||
997 | GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); | ||
998 | set->iter_generation = set->current_generation; | ||
999 | send_client_element (set); | ||
1000 | } | ||
1001 | |||
1002 | |||
1003 | /** | ||
1004 | * Called when a client wants to create a new set. This is typically | ||
1005 | * the first request from a client, and includes the type of set | ||
1006 | * operation to be performed. | ||
1007 | * | ||
1008 | * @param cls client that sent the message | ||
1009 | * @param m message sent by the client | ||
1010 | */ | ||
1011 | static void | ||
1012 | handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) | ||
1013 | { | ||
1014 | struct ClientState *cs = cls; | ||
1015 | struct Set *set; | ||
1016 | |||
1017 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1018 | "Client created new set (operation %u)\n", | ||
1019 | (uint32_t) ntohl (msg->operation)); | ||
1020 | if (NULL != cs->set) | ||
1021 | { | ||
1022 | /* There can only be one set per client */ | ||
1023 | GNUNET_break (0); | ||
1024 | GNUNET_SERVICE_client_drop (cs->client); | ||
1025 | return; | ||
1026 | } | ||
1027 | set = GNUNET_new (struct Set); | ||
1028 | switch (ntohl (msg->operation)) | ||
1029 | { | ||
1030 | case GNUNET_SET_OPERATION_INTERSECTION: | ||
1031 | set->vt = _GSS_intersection_vt (); | ||
1032 | break; | ||
1033 | |||
1034 | case GNUNET_SET_OPERATION_UNION: | ||
1035 | set->vt = _GSS_union_vt (); | ||
1036 | break; | ||
1037 | |||
1038 | default: | ||
1039 | GNUNET_free (set); | ||
1040 | GNUNET_break (0); | ||
1041 | GNUNET_SERVICE_client_drop (cs->client); | ||
1042 | return; | ||
1043 | } | ||
1044 | set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); | ||
1045 | set->state = set->vt->create (); | ||
1046 | if (NULL == set->state) | ||
1047 | { | ||
1048 | /* initialization failed (i.e. out of memory) */ | ||
1049 | GNUNET_free (set); | ||
1050 | GNUNET_SERVICE_client_drop (cs->client); | ||
1051 | return; | ||
1052 | } | ||
1053 | set->content = GNUNET_new (struct SetContent); | ||
1054 | set->content->refcount = 1; | ||
1055 | set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | ||
1056 | set->cs = cs; | ||
1057 | cs->set = set; | ||
1058 | GNUNET_SERVICE_client_continue (cs->client); | ||
1059 | } | ||
1060 | |||
1061 | |||
1062 | /** | ||
1063 | * Timeout happens iff: | ||
1064 | * - we suggested an operation to our listener, | ||
1065 | * but did not receive a response in time | ||
1066 | * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST | ||
1067 | * | ||
1068 | * @param cls channel context | ||
1069 | * @param tc context information (why was this task triggered now) | ||
1070 | */ | ||
1071 | static void | ||
1072 | incoming_timeout_cb (void *cls) | ||
1073 | { | ||
1074 | struct Operation *op = cls; | ||
1075 | |||
1076 | op->timeout_task = NULL; | ||
1077 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1078 | "Remote peer's incoming request timed out\n"); | ||
1079 | incoming_destroy (op); | ||
1080 | } | ||
1081 | |||
1082 | |||
1083 | /** | ||
1084 | * Method called whenever another peer has added us to a channel the | ||
1085 | * other peer initiated. Only called (once) upon reception of data | ||
1086 | * from a channel we listen on. | ||
1087 | * | ||
1088 | * The channel context represents the operation itself and gets added | ||
1089 | * to a DLL, from where it gets looked up when our local listener | ||
1090 | * client responds to a proposed/suggested operation or connects and | ||
1091 | * associates with this operation. | ||
1092 | * | ||
1093 | * @param cls closure | ||
1094 | * @param channel new handle to the channel | ||
1095 | * @param source peer that started the channel | ||
1096 | * @return initial channel context for the channel | ||
1097 | * returns NULL on error | ||
1098 | */ | ||
1099 | static void * | ||
1100 | channel_new_cb (void *cls, | ||
1101 | struct GNUNET_CADET_Channel *channel, | ||
1102 | const struct GNUNET_PeerIdentity *source) | ||
1103 | { | ||
1104 | struct Listener *listener = cls; | ||
1105 | struct Operation *op; | ||
1106 | |||
1107 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n"); | ||
1108 | op = GNUNET_new (struct Operation); | ||
1109 | op->listener = listener; | ||
1110 | op->peer = *source; | ||
1111 | op->channel = channel; | ||
1112 | op->mq = GNUNET_CADET_get_mq (op->channel); | ||
1113 | op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); | ||
1114 | op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, | ||
1115 | &incoming_timeout_cb, | ||
1116 | op); | ||
1117 | GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op); | ||
1118 | return op; | ||
1119 | } | ||
1120 | |||
1121 | |||
1122 | /** | ||
1123 | * Function called whenever a channel is destroyed. Should clean up | ||
1124 | * any associated state. It must NOT call | ||
1125 | * GNUNET_CADET_channel_destroy() on the channel. | ||
1126 | * | ||
1127 | * The peer_disconnect function is part of a a virtual table set initially either | ||
1128 | * when a peer creates a new channel with us, or once we create | ||
1129 | * a new channel ourselves (evaluate). | ||
1130 | * | ||
1131 | * Once we know the exact type of operation (union/intersection), the vt is | ||
1132 | * replaced with an operation specific instance (_GSS_[op]_vt). | ||
1133 | * | ||
1134 | * @param channel_ctx place where local state associated | ||
1135 | * with the channel is stored | ||
1136 | * @param channel connection to the other end (henceforth invalid) | ||
1137 | */ | ||
1138 | static void | ||
1139 | channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel) | ||
1140 | { | ||
1141 | struct Operation *op = channel_ctx; | ||
1142 | |||
1143 | op->channel = NULL; | ||
1144 | _GSS_operation_destroy2 (op); | ||
1145 | } | ||
1146 | |||
1147 | |||
1148 | /** | ||
1149 | * This function probably should not exist | ||
1150 | * and be replaced by inlining more specific | ||
1151 | * logic in the various places where it is called. | ||
1152 | */ | ||
1153 | void | ||
1154 | _GSS_operation_destroy2 (struct Operation *op) | ||
1155 | { | ||
1156 | struct GNUNET_CADET_Channel *channel; | ||
1157 | |||
1158 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n"); | ||
1159 | if (NULL != (channel = op->channel)) | ||
1160 | { | ||
1161 | /* This will free op; called conditionally as this helper function | ||
1162 | is also called from within the channel disconnect handler. */ | ||
1163 | op->channel = NULL; | ||
1164 | GNUNET_CADET_channel_destroy (channel); | ||
1165 | } | ||
1166 | if (NULL != op->listener) | ||
1167 | { | ||
1168 | incoming_destroy (op); | ||
1169 | return; | ||
1170 | } | ||
1171 | if (NULL != op->set) | ||
1172 | op->set->vt->channel_death (op); | ||
1173 | else | ||
1174 | _GSS_operation_destroy (op, GNUNET_YES); | ||
1175 | GNUNET_free (op); | ||
1176 | } | ||
1177 | |||
1178 | |||
1179 | /** | ||
1180 | * Function called whenever an MQ-channel's transmission window size changes. | ||
1181 | * | ||
1182 | * The first callback in an outgoing channel will be with a non-zero value | ||
1183 | * and will mean the channel is connected to the destination. | ||
1184 | * | ||
1185 | * For an incoming channel it will be called immediately after the | ||
1186 | * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. | ||
1187 | * | ||
1188 | * @param cls Channel closure. | ||
1189 | * @param channel Connection to the other end (henceforth invalid). | ||
1190 | * @param window_size New window size. If the is more messages than buffer size | ||
1191 | * this value will be negative.. | ||
1192 | */ | ||
1193 | static void | ||
1194 | channel_window_cb (void *cls, | ||
1195 | const struct GNUNET_CADET_Channel *channel, | ||
1196 | int window_size) | ||
1197 | { | ||
1198 | /* FIXME: not implemented, we could do flow control here... */ | ||
1199 | } | ||
1200 | |||
1201 | |||
1202 | /** | ||
1203 | * Called when a client wants to create a new listener. | ||
1204 | * | ||
1205 | * @param cls client that sent the message | ||
1206 | * @param msg message sent by the client | ||
1207 | */ | ||
1208 | static void | ||
1209 | handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) | ||
1210 | { | ||
1211 | struct ClientState *cs = cls; | ||
1212 | struct GNUNET_MQ_MessageHandler cadet_handlers[] = | ||
1213 | { GNUNET_MQ_hd_var_size (incoming_msg, | ||
1214 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | ||
1215 | struct OperationRequestMessage, | ||
1216 | NULL), | ||
1217 | GNUNET_MQ_hd_var_size (union_p2p_ibf, | ||
1218 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, | ||
1219 | struct IBFMessage, | ||
1220 | NULL), | ||
1221 | GNUNET_MQ_hd_var_size (union_p2p_elements, | ||
1222 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, | ||
1223 | struct GNUNET_SET_ElementMessage, | ||
1224 | NULL), | ||
1225 | GNUNET_MQ_hd_var_size (union_p2p_offer, | ||
1226 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, | ||
1227 | struct GNUNET_MessageHeader, | ||
1228 | NULL), | ||
1229 | GNUNET_MQ_hd_var_size (union_p2p_inquiry, | ||
1230 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, | ||
1231 | struct InquiryMessage, | ||
1232 | NULL), | ||
1233 | GNUNET_MQ_hd_var_size (union_p2p_demand, | ||
1234 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, | ||
1235 | struct GNUNET_MessageHeader, | ||
1236 | NULL), | ||
1237 | GNUNET_MQ_hd_fixed_size (union_p2p_done, | ||
1238 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, | ||
1239 | struct GNUNET_MessageHeader, | ||
1240 | NULL), | ||
1241 | GNUNET_MQ_hd_fixed_size (union_p2p_over, | ||
1242 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER, | ||
1243 | struct GNUNET_MessageHeader, | ||
1244 | NULL), | ||
1245 | GNUNET_MQ_hd_fixed_size (union_p2p_full_done, | ||
1246 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, | ||
1247 | struct GNUNET_MessageHeader, | ||
1248 | NULL), | ||
1249 | GNUNET_MQ_hd_fixed_size (union_p2p_request_full, | ||
1250 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, | ||
1251 | struct GNUNET_MessageHeader, | ||
1252 | NULL), | ||
1253 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, | ||
1254 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, | ||
1255 | struct StrataEstimatorMessage, | ||
1256 | NULL), | ||
1257 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, | ||
1258 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, | ||
1259 | struct StrataEstimatorMessage, | ||
1260 | NULL), | ||
1261 | GNUNET_MQ_hd_var_size (union_p2p_full_element, | ||
1262 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, | ||
1263 | struct GNUNET_SET_ElementMessage, | ||
1264 | NULL), | ||
1265 | GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, | ||
1266 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, | ||
1267 | struct IntersectionElementInfoMessage, | ||
1268 | NULL), | ||
1269 | GNUNET_MQ_hd_var_size (intersection_p2p_bf, | ||
1270 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, | ||
1271 | struct BFMessage, | ||
1272 | NULL), | ||
1273 | GNUNET_MQ_hd_fixed_size (intersection_p2p_done, | ||
1274 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, | ||
1275 | struct IntersectionDoneMessage, | ||
1276 | NULL), | ||
1277 | GNUNET_MQ_handler_end () }; | ||
1278 | struct Listener *listener; | ||
1279 | |||
1280 | if (NULL != cs->listener) | ||
1281 | { | ||
1282 | /* max. one active listener per client! */ | ||
1283 | GNUNET_break (0); | ||
1284 | GNUNET_SERVICE_client_drop (cs->client); | ||
1285 | return; | ||
1286 | } | ||
1287 | listener = GNUNET_new (struct Listener); | ||
1288 | listener->cs = cs; | ||
1289 | cs->listener = listener; | ||
1290 | listener->app_id = msg->app_id; | ||
1291 | listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); | ||
1292 | GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener); | ||
1293 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1294 | "New listener created (op %u, port %s)\n", | ||
1295 | listener->operation, | ||
1296 | GNUNET_h2s (&listener->app_id)); | ||
1297 | listener->open_port = GNUNET_CADET_open_port (cadet, | ||
1298 | &msg->app_id, | ||
1299 | &channel_new_cb, | ||
1300 | listener, | ||
1301 | &channel_window_cb, | ||
1302 | &channel_end_cb, | ||
1303 | cadet_handlers); | ||
1304 | GNUNET_SERVICE_client_continue (cs->client); | ||
1305 | } | ||
1306 | |||
1307 | |||
1308 | /** | ||
1309 | * Called when the listening client rejects an operation | ||
1310 | * request by another peer. | ||
1311 | * | ||
1312 | * @param cls client that sent the message | ||
1313 | * @param msg message sent by the client | ||
1314 | */ | ||
1315 | static void | ||
1316 | handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg) | ||
1317 | { | ||
1318 | struct ClientState *cs = cls; | ||
1319 | struct Operation *op; | ||
1320 | |||
1321 | op = get_incoming (ntohl (msg->accept_reject_id)); | ||
1322 | if (NULL == op) | ||
1323 | { | ||
1324 | /* no matching incoming operation for this reject; | ||
1325 | could be that the other peer already disconnected... */ | ||
1326 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1327 | "Client rejected unknown operation %u\n", | ||
1328 | (unsigned int) ntohl (msg->accept_reject_id)); | ||
1329 | GNUNET_SERVICE_client_continue (cs->client); | ||
1330 | return; | ||
1331 | } | ||
1332 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1333 | "Peer request (op %u, app %s) rejected by client\n", | ||
1334 | op->listener->operation, | ||
1335 | GNUNET_h2s (&cs->listener->app_id)); | ||
1336 | _GSS_operation_destroy2 (op); | ||
1337 | GNUNET_SERVICE_client_continue (cs->client); | ||
1338 | } | ||
1339 | |||
1340 | |||
1341 | /** | ||
1342 | * Called when a client wants to add or remove an element to a set it inhabits. | ||
1343 | * | ||
1344 | * @param cls client that sent the message | ||
1345 | * @param msg message sent by the client | ||
1346 | */ | ||
1347 | static int | ||
1348 | check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) | ||
1349 | { | ||
1350 | /* NOTE: Technically, we should probably check with the | ||
1351 | block library whether the element we are given is well-formed */ | ||
1352 | return GNUNET_OK; | ||
1353 | } | ||
1354 | |||
1355 | |||
1356 | /** | ||
1357 | * Called when a client wants to add or remove an element to a set it inhabits. | ||
1358 | * | ||
1359 | * @param cls client that sent the message | ||
1360 | * @param msg message sent by the client | ||
1361 | */ | ||
1362 | static void | ||
1363 | handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) | ||
1364 | { | ||
1365 | struct ClientState *cs = cls; | ||
1366 | struct Set *set; | ||
1367 | |||
1368 | if (NULL == (set = cs->set)) | ||
1369 | { | ||
1370 | /* client without a set requested an operation */ | ||
1371 | GNUNET_break (0); | ||
1372 | GNUNET_SERVICE_client_drop (cs->client); | ||
1373 | return; | ||
1374 | } | ||
1375 | GNUNET_SERVICE_client_continue (cs->client); | ||
1376 | |||
1377 | if (0 != set->content->iterator_count) | ||
1378 | { | ||
1379 | struct PendingMutation *pm; | ||
1380 | |||
1381 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n"); | ||
1382 | pm = GNUNET_new (struct PendingMutation); | ||
1383 | pm->msg = | ||
1384 | (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header); | ||
1385 | pm->set = set; | ||
1386 | GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, | ||
1387 | set->content->pending_mutations_tail, | ||
1388 | pm); | ||
1389 | return; | ||
1390 | } | ||
1391 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n"); | ||
1392 | execute_mutation (set, msg); | ||
1393 | } | ||
1394 | |||
1395 | |||
1396 | /** | ||
1397 | * Advance the current generation of a set, | ||
1398 | * adding exclusion ranges if necessary. | ||
1399 | * | ||
1400 | * @param set the set where we want to advance the generation | ||
1401 | */ | ||
1402 | static void | ||
1403 | advance_generation (struct Set *set) | ||
1404 | { | ||
1405 | struct GenerationRange r; | ||
1406 | |||
1407 | if (set->current_generation == set->content->latest_generation) | ||
1408 | { | ||
1409 | set->content->latest_generation++; | ||
1410 | set->current_generation++; | ||
1411 | return; | ||
1412 | } | ||
1413 | |||
1414 | GNUNET_assert (set->current_generation < set->content->latest_generation); | ||
1415 | |||
1416 | r.start = set->current_generation + 1; | ||
1417 | r.end = set->content->latest_generation + 1; | ||
1418 | set->content->latest_generation = r.end; | ||
1419 | set->current_generation = r.end; | ||
1420 | GNUNET_array_append (set->excluded_generations, | ||
1421 | set->excluded_generations_size, | ||
1422 | r); | ||
1423 | } | ||
1424 | |||
1425 | |||
1426 | /** | ||
1427 | * Called when a client wants to initiate a set operation with another | ||
1428 | * peer. Initiates the CADET connection to the listener and sends the | ||
1429 | * request. | ||
1430 | * | ||
1431 | * @param cls client that sent the message | ||
1432 | * @param msg message sent by the client | ||
1433 | * @return #GNUNET_OK if the message is well-formed | ||
1434 | */ | ||
1435 | static int | ||
1436 | check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) | ||
1437 | { | ||
1438 | /* FIXME: suboptimal, even if the context below could be NULL, | ||
1439 | there are malformed messages this does not check for... */ | ||
1440 | return GNUNET_OK; | ||
1441 | } | ||
1442 | |||
1443 | |||
1444 | /** | ||
1445 | * Called when a client wants to initiate a set operation with another | ||
1446 | * peer. Initiates the CADET connection to the listener and sends the | ||
1447 | * request. | ||
1448 | * | ||
1449 | * @param cls client that sent the message | ||
1450 | * @param msg message sent by the client | ||
1451 | */ | ||
1452 | static void | ||
1453 | handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) | ||
1454 | { | ||
1455 | struct ClientState *cs = cls; | ||
1456 | struct Operation *op = GNUNET_new (struct Operation); | ||
1457 | const struct GNUNET_MQ_MessageHandler cadet_handlers[] = | ||
1458 | { GNUNET_MQ_hd_var_size (incoming_msg, | ||
1459 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | ||
1460 | struct OperationRequestMessage, | ||
1461 | op), | ||
1462 | GNUNET_MQ_hd_var_size (union_p2p_ibf, | ||
1463 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, | ||
1464 | struct IBFMessage, | ||
1465 | op), | ||
1466 | GNUNET_MQ_hd_var_size (union_p2p_elements, | ||
1467 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, | ||
1468 | struct GNUNET_SET_ElementMessage, | ||
1469 | op), | ||
1470 | GNUNET_MQ_hd_var_size (union_p2p_offer, | ||
1471 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, | ||
1472 | struct GNUNET_MessageHeader, | ||
1473 | op), | ||
1474 | GNUNET_MQ_hd_var_size (union_p2p_inquiry, | ||
1475 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, | ||
1476 | struct InquiryMessage, | ||
1477 | op), | ||
1478 | GNUNET_MQ_hd_var_size (union_p2p_demand, | ||
1479 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, | ||
1480 | struct GNUNET_MessageHeader, | ||
1481 | op), | ||
1482 | GNUNET_MQ_hd_fixed_size (union_p2p_done, | ||
1483 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, | ||
1484 | struct GNUNET_MessageHeader, | ||
1485 | op), | ||
1486 | GNUNET_MQ_hd_fixed_size (union_p2p_over, | ||
1487 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER, | ||
1488 | struct GNUNET_MessageHeader, | ||
1489 | op), | ||
1490 | GNUNET_MQ_hd_fixed_size (union_p2p_full_done, | ||
1491 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, | ||
1492 | struct GNUNET_MessageHeader, | ||
1493 | op), | ||
1494 | GNUNET_MQ_hd_fixed_size (union_p2p_request_full, | ||
1495 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, | ||
1496 | struct GNUNET_MessageHeader, | ||
1497 | op), | ||
1498 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, | ||
1499 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, | ||
1500 | struct StrataEstimatorMessage, | ||
1501 | op), | ||
1502 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, | ||
1503 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, | ||
1504 | struct StrataEstimatorMessage, | ||
1505 | op), | ||
1506 | GNUNET_MQ_hd_var_size (union_p2p_full_element, | ||
1507 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, | ||
1508 | struct GNUNET_SET_ElementMessage, | ||
1509 | op), | ||
1510 | GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, | ||
1511 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, | ||
1512 | struct IntersectionElementInfoMessage, | ||
1513 | op), | ||
1514 | GNUNET_MQ_hd_var_size (intersection_p2p_bf, | ||
1515 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, | ||
1516 | struct BFMessage, | ||
1517 | op), | ||
1518 | GNUNET_MQ_hd_fixed_size (intersection_p2p_done, | ||
1519 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, | ||
1520 | struct IntersectionDoneMessage, | ||
1521 | op), | ||
1522 | GNUNET_MQ_handler_end () }; | ||
1523 | struct Set *set; | ||
1524 | const struct GNUNET_MessageHeader *context; | ||
1525 | |||
1526 | if (NULL == (set = cs->set)) | ||
1527 | { | ||
1528 | GNUNET_break (0); | ||
1529 | GNUNET_free (op); | ||
1530 | GNUNET_SERVICE_client_drop (cs->client); | ||
1531 | return; | ||
1532 | } | ||
1533 | op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); | ||
1534 | op->peer = msg->target_peer; | ||
1535 | op->result_mode = ntohl (msg->result_mode); | ||
1536 | op->client_request_id = ntohl (msg->request_id); | ||
1537 | op->byzantine = msg->byzantine; | ||
1538 | op->byzantine_lower_bound = msg->byzantine_lower_bound; | ||
1539 | op->force_full = msg->force_full; | ||
1540 | op->force_delta = msg->force_delta; | ||
1541 | context = GNUNET_MQ_extract_nested_mh (msg); | ||
1542 | |||
1543 | /* Advance generation values, so that | ||
1544 | mutations won't interfer with the running operation. */ | ||
1545 | op->set = set; | ||
1546 | op->generation_created = set->current_generation; | ||
1547 | advance_generation (set); | ||
1548 | GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); | ||
1549 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1550 | "Creating new CADET channel to port %s for set operation type %u\n", | ||
1551 | GNUNET_h2s (&msg->app_id), | ||
1552 | set->operation); | ||
1553 | op->channel = GNUNET_CADET_channel_create (cadet, | ||
1554 | op, | ||
1555 | &msg->target_peer, | ||
1556 | &msg->app_id, | ||
1557 | &channel_window_cb, | ||
1558 | &channel_end_cb, | ||
1559 | cadet_handlers); | ||
1560 | op->mq = GNUNET_CADET_get_mq (op->channel); | ||
1561 | op->state = set->vt->evaluate (op, context); | ||
1562 | if (NULL == op->state) | ||
1563 | { | ||
1564 | GNUNET_break (0); | ||
1565 | GNUNET_SERVICE_client_drop (cs->client); | ||
1566 | return; | ||
1567 | } | ||
1568 | GNUNET_SERVICE_client_continue (cs->client); | ||
1569 | } | ||
1570 | |||
1571 | |||
1572 | /** | ||
1573 | * Handle an ack from a client, and send the next element. Note | ||
1574 | * that we only expect acks for set elements, not after the | ||
1575 | * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message. | ||
1576 | * | ||
1577 | * @param cls client the client | ||
1578 | * @param ack the message | ||
1579 | */ | ||
1580 | static void | ||
1581 | handle_client_iter_ack (void *cls, const struct GNUNET_SET_IterAckMessage *ack) | ||
1582 | { | ||
1583 | struct ClientState *cs = cls; | ||
1584 | struct Set *set; | ||
1585 | |||
1586 | if (NULL == (set = cs->set)) | ||
1587 | { | ||
1588 | /* client without a set acknowledged receiving a value */ | ||
1589 | GNUNET_break (0); | ||
1590 | GNUNET_SERVICE_client_drop (cs->client); | ||
1591 | return; | ||
1592 | } | ||
1593 | if (NULL == set->iter) | ||
1594 | { | ||
1595 | /* client sent an ack, but we were not expecting one (as | ||
1596 | set iteration has finished) */ | ||
1597 | GNUNET_break (0); | ||
1598 | GNUNET_SERVICE_client_drop (cs->client); | ||
1599 | return; | ||
1600 | } | ||
1601 | GNUNET_SERVICE_client_continue (cs->client); | ||
1602 | if (ntohl (ack->send_more)) | ||
1603 | { | ||
1604 | send_client_element (set); | ||
1605 | } | ||
1606 | else | ||
1607 | { | ||
1608 | GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); | ||
1609 | set->iter = NULL; | ||
1610 | set->iteration_id++; | ||
1611 | } | ||
1612 | } | ||
1613 | |||
1614 | |||
1615 | /** | ||
1616 | * Handle a request from the client to copy a set. | ||
1617 | * | ||
1618 | * @param cls the client | ||
1619 | * @param mh the message | ||
1620 | */ | ||
1621 | static void | ||
1622 | handle_client_copy_lazy_prepare (void *cls, | ||
1623 | const struct GNUNET_MessageHeader *mh) | ||
1624 | { | ||
1625 | struct ClientState *cs = cls; | ||
1626 | struct Set *set; | ||
1627 | struct LazyCopyRequest *cr; | ||
1628 | struct GNUNET_MQ_Envelope *ev; | ||
1629 | struct GNUNET_SET_CopyLazyResponseMessage *resp_msg; | ||
1630 | |||
1631 | if (NULL == (set = cs->set)) | ||
1632 | { | ||
1633 | /* client without a set requested an operation */ | ||
1634 | GNUNET_break (0); | ||
1635 | GNUNET_SERVICE_client_drop (cs->client); | ||
1636 | return; | ||
1637 | } | ||
1638 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1639 | "Client requested creation of lazy copy\n"); | ||
1640 | cr = GNUNET_new (struct LazyCopyRequest); | ||
1641 | cr->cookie = ++lazy_copy_cookie; | ||
1642 | cr->source_set = set; | ||
1643 | GNUNET_CONTAINER_DLL_insert (lazy_copy_head, lazy_copy_tail, cr); | ||
1644 | ev = GNUNET_MQ_msg (resp_msg, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE); | ||
1645 | resp_msg->cookie = cr->cookie; | ||
1646 | GNUNET_MQ_send (set->cs->mq, ev); | ||
1647 | GNUNET_SERVICE_client_continue (cs->client); | ||
1648 | } | ||
1649 | |||
1650 | |||
1651 | /** | ||
1652 | * Handle a request from the client to connect to a copy of a set. | ||
1653 | * | ||
1654 | * @param cls the client | ||
1655 | * @param msg the message | ||
1656 | */ | ||
1657 | static void | ||
1658 | handle_client_copy_lazy_connect ( | ||
1659 | void *cls, | ||
1660 | const struct GNUNET_SET_CopyLazyConnectMessage *msg) | ||
1661 | { | ||
1662 | struct ClientState *cs = cls; | ||
1663 | struct LazyCopyRequest *cr; | ||
1664 | struct Set *set; | ||
1665 | int found; | ||
1666 | |||
1667 | if (NULL != cs->set) | ||
1668 | { | ||
1669 | /* There can only be one set per client */ | ||
1670 | GNUNET_break (0); | ||
1671 | GNUNET_SERVICE_client_drop (cs->client); | ||
1672 | return; | ||
1673 | } | ||
1674 | found = GNUNET_NO; | ||
1675 | for (cr = lazy_copy_head; NULL != cr; cr = cr->next) | ||
1676 | { | ||
1677 | if (cr->cookie == msg->cookie) | ||
1678 | { | ||
1679 | found = GNUNET_YES; | ||
1680 | break; | ||
1681 | } | ||
1682 | } | ||
1683 | if (GNUNET_NO == found) | ||
1684 | { | ||
1685 | /* client asked for copy with cookie we don't know */ | ||
1686 | GNUNET_break (0); | ||
1687 | GNUNET_SERVICE_client_drop (cs->client); | ||
1688 | return; | ||
1689 | } | ||
1690 | GNUNET_CONTAINER_DLL_remove (lazy_copy_head, lazy_copy_tail, cr); | ||
1691 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1692 | "Client %p requested use of lazy copy\n", | ||
1693 | cs); | ||
1694 | set = GNUNET_new (struct Set); | ||
1695 | switch (cr->source_set->operation) | ||
1696 | { | ||
1697 | case GNUNET_SET_OPERATION_INTERSECTION: | ||
1698 | set->vt = _GSS_intersection_vt (); | ||
1699 | break; | ||
1700 | |||
1701 | case GNUNET_SET_OPERATION_UNION: | ||
1702 | set->vt = _GSS_union_vt (); | ||
1703 | break; | ||
1704 | |||
1705 | default: | ||
1706 | GNUNET_assert (0); | ||
1707 | return; | ||
1708 | } | ||
1709 | |||
1710 | if (NULL == set->vt->copy_state) | ||
1711 | { | ||
1712 | /* Lazy copy not supported for this set operation */ | ||
1713 | GNUNET_break (0); | ||
1714 | GNUNET_free (set); | ||
1715 | GNUNET_free (cr); | ||
1716 | GNUNET_SERVICE_client_drop (cs->client); | ||
1717 | return; | ||
1718 | } | ||
1719 | |||
1720 | set->operation = cr->source_set->operation; | ||
1721 | set->state = set->vt->copy_state (cr->source_set->state); | ||
1722 | set->content = cr->source_set->content; | ||
1723 | set->content->refcount++; | ||
1724 | |||
1725 | set->current_generation = cr->source_set->current_generation; | ||
1726 | set->excluded_generations_size = cr->source_set->excluded_generations_size; | ||
1727 | set->excluded_generations = | ||
1728 | GNUNET_memdup (cr->source_set->excluded_generations, | ||
1729 | set->excluded_generations_size | ||
1730 | * sizeof(struct GenerationRange)); | ||
1731 | |||
1732 | /* Advance the generation of the new set, so that mutations to the | ||
1733 | of the cloned set and the source set are independent. */ | ||
1734 | advance_generation (set); | ||
1735 | set->cs = cs; | ||
1736 | cs->set = set; | ||
1737 | GNUNET_free (cr); | ||
1738 | GNUNET_SERVICE_client_continue (cs->client); | ||
1739 | } | ||
1740 | |||
1741 | |||
1742 | /** | ||
1743 | * Handle a request from the client to cancel a running set operation. | ||
1744 | * | ||
1745 | * @param cls the client | ||
1746 | * @param msg the message | ||
1747 | */ | ||
1748 | static void | ||
1749 | handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) | ||
1750 | { | ||
1751 | struct ClientState *cs = cls; | ||
1752 | struct Set *set; | ||
1753 | struct Operation *op; | ||
1754 | int found; | ||
1755 | |||
1756 | if (NULL == (set = cs->set)) | ||
1757 | { | ||
1758 | /* client without a set requested an operation */ | ||
1759 | GNUNET_break (0); | ||
1760 | GNUNET_SERVICE_client_drop (cs->client); | ||
1761 | return; | ||
1762 | } | ||
1763 | found = GNUNET_NO; | ||
1764 | for (op = set->ops_head; NULL != op; op = op->next) | ||
1765 | { | ||
1766 | if (op->client_request_id == ntohl (msg->request_id)) | ||
1767 | { | ||
1768 | found = GNUNET_YES; | ||
1769 | break; | ||
1770 | } | ||
1771 | } | ||
1772 | if (GNUNET_NO == found) | ||
1773 | { | ||
1774 | /* It may happen that the operation was already destroyed due to | ||
1775 | * the other peer disconnecting. The client may not know about this | ||
1776 | * yet and try to cancel the (just barely non-existent) operation. | ||
1777 | * So this is not a hard error. | ||
1778 | */GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1779 | "Client canceled non-existent op %u\n", | ||
1780 | (uint32_t) ntohl (msg->request_id)); | ||
1781 | } | ||
1782 | else | ||
1783 | { | ||
1784 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1785 | "Client requested cancel for op %u\n", | ||
1786 | (uint32_t) ntohl (msg->request_id)); | ||
1787 | _GSS_operation_destroy (op, GNUNET_YES); | ||
1788 | } | ||
1789 | GNUNET_SERVICE_client_continue (cs->client); | ||
1790 | } | ||
1791 | |||
1792 | |||
1793 | /** | ||
1794 | * Handle a request from the client to accept a set operation that | ||
1795 | * came from a remote peer. We forward the accept to the associated | ||
1796 | * operation for handling | ||
1797 | * | ||
1798 | * @param cls the client | ||
1799 | * @param msg the message | ||
1800 | */ | ||
1801 | static void | ||
1802 | handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) | ||
1803 | { | ||
1804 | struct ClientState *cs = cls; | ||
1805 | struct Set *set; | ||
1806 | struct Operation *op; | ||
1807 | struct GNUNET_SET_ResultMessage *result_message; | ||
1808 | struct GNUNET_MQ_Envelope *ev; | ||
1809 | struct Listener *listener; | ||
1810 | |||
1811 | if (NULL == (set = cs->set)) | ||
1812 | { | ||
1813 | /* client without a set requested to accept */ | ||
1814 | GNUNET_break (0); | ||
1815 | GNUNET_SERVICE_client_drop (cs->client); | ||
1816 | return; | ||
1817 | } | ||
1818 | op = get_incoming (ntohl (msg->accept_reject_id)); | ||
1819 | if (NULL == op) | ||
1820 | { | ||
1821 | /* It is not an error if the set op does not exist -- it may | ||
1822 | * have been destroyed when the partner peer disconnected. */ | ||
1823 | GNUNET_log ( | ||
1824 | GNUNET_ERROR_TYPE_INFO, | ||
1825 | "Client %p accepted request %u of listener %p that is no longer active\n", | ||
1826 | cs, | ||
1827 | ntohl (msg->accept_reject_id), | ||
1828 | cs->listener); | ||
1829 | ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
1830 | result_message->request_id = msg->request_id; | ||
1831 | result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); | ||
1832 | GNUNET_MQ_send (set->cs->mq, ev); | ||
1833 | GNUNET_SERVICE_client_continue (cs->client); | ||
1834 | return; | ||
1835 | } | ||
1836 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1837 | "Client accepting request %u\n", | ||
1838 | (uint32_t) ntohl (msg->accept_reject_id)); | ||
1839 | listener = op->listener; | ||
1840 | op->listener = NULL; | ||
1841 | GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op); | ||
1842 | op->set = set; | ||
1843 | GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); | ||
1844 | op->client_request_id = ntohl (msg->request_id); | ||
1845 | op->result_mode = ntohl (msg->result_mode); | ||
1846 | op->byzantine = msg->byzantine; | ||
1847 | op->byzantine_lower_bound = msg->byzantine_lower_bound; | ||
1848 | op->force_full = msg->force_full; | ||
1849 | op->force_delta = msg->force_delta; | ||
1850 | |||
1851 | /* Advance generation values, so that future mutations do not | ||
1852 | interfer with the running operation. */ | ||
1853 | op->generation_created = set->current_generation; | ||
1854 | advance_generation (set); | ||
1855 | GNUNET_assert (NULL == op->state); | ||
1856 | op->state = set->vt->accept (op); | ||
1857 | if (NULL == op->state) | ||
1858 | { | ||
1859 | GNUNET_break (0); | ||
1860 | GNUNET_SERVICE_client_drop (cs->client); | ||
1861 | return; | ||
1862 | } | ||
1863 | /* Now allow CADET to continue, as we did not do this in | ||
1864 | #handle_incoming_msg (as we wanted to first see if the | ||
1865 | local client would accept the request). */ | ||
1866 | GNUNET_CADET_receive_done (op->channel); | ||
1867 | GNUNET_SERVICE_client_continue (cs->client); | ||
1868 | } | ||
1869 | |||
1870 | |||
1871 | /** | ||
1872 | * Called to clean up, after a shutdown has been requested. | ||
1873 | * | ||
1874 | * @param cls closure, NULL | ||
1875 | */ | ||
1876 | static void | ||
1877 | shutdown_task (void *cls) | ||
1878 | { | ||
1879 | /* Delay actual shutdown to allow service to disconnect clients */ | ||
1880 | in_shutdown = GNUNET_YES; | ||
1881 | if (0 == num_clients) | ||
1882 | { | ||
1883 | if (NULL != cadet) | ||
1884 | { | ||
1885 | GNUNET_CADET_disconnect (cadet); | ||
1886 | cadet = NULL; | ||
1887 | } | ||
1888 | } | ||
1889 | GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); | ||
1890 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); | ||
1891 | } | ||
1892 | |||
1893 | |||
1894 | /** | ||
1895 | * Function called by the service's run | ||
1896 | * method to run service-specific setup code. | ||
1897 | * | ||
1898 | * @param cls closure | ||
1899 | * @param cfg configuration to use | ||
1900 | * @param service the initialized service | ||
1901 | */ | ||
1902 | static void | ||
1903 | run (void *cls, | ||
1904 | const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
1905 | struct GNUNET_SERVICE_Handle *service) | ||
1906 | { | ||
1907 | /* FIXME: need to modify SERVICE (!) API to allow | ||
1908 | us to run a shutdown task *after* clients were | ||
1909 | forcefully disconnected! */ | ||
1910 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); | ||
1911 | _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); | ||
1912 | cadet = GNUNET_CADET_connect (cfg); | ||
1913 | if (NULL == cadet) | ||
1914 | { | ||
1915 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1916 | _ ("Could not connect to CADET service\n")); | ||
1917 | GNUNET_SCHEDULER_shutdown (); | ||
1918 | return; | ||
1919 | } | ||
1920 | } | ||
1921 | |||
1922 | |||
1923 | /** | ||
1924 | * Define "main" method using service macro. | ||
1925 | */ | ||
1926 | GNUNET_SERVICE_MAIN ( | ||
1927 | "set", | ||
1928 | GNUNET_SERVICE_OPTION_NONE, | ||
1929 | &run, | ||
1930 | &client_connect_cb, | ||
1931 | &client_disconnect_cb, | ||
1932 | NULL, | ||
1933 | GNUNET_MQ_hd_fixed_size (client_accept, | ||
1934 | GNUNET_MESSAGE_TYPE_SET_ACCEPT, | ||
1935 | struct GNUNET_SET_AcceptMessage, | ||
1936 | NULL), | ||
1937 | GNUNET_MQ_hd_fixed_size (client_iter_ack, | ||
1938 | GNUNET_MESSAGE_TYPE_SET_ITER_ACK, | ||
1939 | struct GNUNET_SET_IterAckMessage, | ||
1940 | NULL), | ||
1941 | GNUNET_MQ_hd_var_size (client_mutation, | ||
1942 | GNUNET_MESSAGE_TYPE_SET_ADD, | ||
1943 | struct GNUNET_SET_ElementMessage, | ||
1944 | NULL), | ||
1945 | GNUNET_MQ_hd_fixed_size (client_create_set, | ||
1946 | GNUNET_MESSAGE_TYPE_SET_CREATE, | ||
1947 | struct GNUNET_SET_CreateMessage, | ||
1948 | NULL), | ||
1949 | GNUNET_MQ_hd_fixed_size (client_iterate, | ||
1950 | GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST, | ||
1951 | struct GNUNET_MessageHeader, | ||
1952 | NULL), | ||
1953 | GNUNET_MQ_hd_var_size (client_evaluate, | ||
1954 | GNUNET_MESSAGE_TYPE_SET_EVALUATE, | ||
1955 | struct GNUNET_SET_EvaluateMessage, | ||
1956 | NULL), | ||
1957 | GNUNET_MQ_hd_fixed_size (client_listen, | ||
1958 | GNUNET_MESSAGE_TYPE_SET_LISTEN, | ||
1959 | struct GNUNET_SET_ListenMessage, | ||
1960 | NULL), | ||
1961 | GNUNET_MQ_hd_fixed_size (client_reject, | ||
1962 | GNUNET_MESSAGE_TYPE_SET_REJECT, | ||
1963 | struct GNUNET_SET_RejectMessage, | ||
1964 | NULL), | ||
1965 | GNUNET_MQ_hd_var_size (client_mutation, | ||
1966 | GNUNET_MESSAGE_TYPE_SET_REMOVE, | ||
1967 | struct GNUNET_SET_ElementMessage, | ||
1968 | NULL), | ||
1969 | GNUNET_MQ_hd_fixed_size (client_cancel, | ||
1970 | GNUNET_MESSAGE_TYPE_SET_CANCEL, | ||
1971 | struct GNUNET_SET_CancelMessage, | ||
1972 | NULL), | ||
1973 | GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare, | ||
1974 | GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE, | ||
1975 | struct GNUNET_MessageHeader, | ||
1976 | NULL), | ||
1977 | GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect, | ||
1978 | GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT, | ||
1979 | struct GNUNET_SET_CopyLazyConnectMessage, | ||
1980 | NULL), | ||
1981 | GNUNET_MQ_handler_end ()); | ||
1982 | |||
1983 | |||
1984 | /* end of gnunet-service-set.c */ | ||