aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-03-13 01:24:22 +0100
committerChristian Grothoff <christian@grothoff.org>2017-03-13 01:24:34 +0100
commitbf6f552fdefe75425635f66343f98995e2f602f6 (patch)
treeadd6ea146823579a137763b78e89839ff97b3902 /src/set/gnunet-service-set.c
parenta9a5994e518ded483edb87513d5197b6539ed4ff (diff)
downloadgnunet-bf6f552fdefe75425635f66343f98995e2f602f6.tar.gz
gnunet-bf6f552fdefe75425635f66343f98995e2f602f6.zip
major clean up and bugfixes of SET
Diffstat (limited to 'src/set/gnunet-service-set.c')
-rw-r--r--src/set/gnunet-service-set.c1319
1 files changed, 598 insertions, 721 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index b80c1f2fd..752253411 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2013, 2014, 2017 GNUnet e.V. 3 Copyright (C) 2013-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -35,6 +35,35 @@
35 */ 35 */
36#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES 36#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 37
38
39/**
40 * Lazy copy requests made by a client.
41 */
42struct LazyCopyRequest
43{
44 /**
45 * Kept in a DLL.
46 */
47 struct LazyCopyRequest *prev;
48
49 /**
50 * Kept in a DLL.
51 */
52 struct LazyCopyRequest *next;
53
54 /**
55 * Which set are we supposed to copy?
56 */
57 struct Set *source_set;
58
59 /**
60 * Cookie identifying the request.
61 */
62 uint32_t cookie;
63
64};
65
66
38/** 67/**
39 * A listener is inhabited by a client, and waits for evaluation 68 * A listener is inhabited by a client, and waits for evaluation
40 * requests from remote peers. 69 * requests from remote peers.
@@ -52,21 +81,24 @@ struct Listener
52 struct Listener *prev; 81 struct Listener *prev;
53 82
54 /** 83 /**
55 * Client that owns the listener. 84 * Head of DLL of operations this listener is responsible for.
56 * Only one client may own a listener. 85 * Once the client has accepted/declined the operation, the
86 * operation is moved to the respective set's operation DLLS.
57 */ 87 */
58 struct GNUNET_SERVICE_Client *client; 88 struct Operation *op_head;
59 89
60 /** 90 /**
61 * Message queue for the client 91 * Tail of DLL of operations this listener is responsible for.
92 * Once the client has accepted/declined the operation, the
93 * operation is moved to the respective set's operation DLLS.
62 */ 94 */
63 struct GNUNET_MQ_Handle *client_mq; 95 struct Operation *op_tail;
64 96
65 /** 97 /**
66 * Application ID for the operation, used to distinguish 98 * Client that owns the listener.
67 * multiple operations of the same type with the same peer. 99 * Only one client may own a listener.
68 */ 100 */
69 struct GNUNET_HashCode app_id; 101 struct ClientState *cs;
70 102
71 /** 103 /**
72 * The port we are listening on with CADET. 104 * The port we are listening on with CADET.
@@ -74,27 +106,18 @@ struct Listener
74 struct GNUNET_CADET_Port *open_port; 106 struct GNUNET_CADET_Port *open_port;
75 107
76 /** 108 /**
109 * Application ID for the operation, used to distinguish
110 * multiple operations of the same type with the same peer.
111 */
112 struct GNUNET_HashCode app_id;
113
114 /**
77 * The type of the operation. 115 * The type of the operation.
78 */ 116 */
79 enum GNUNET_SET_OperationType operation; 117 enum GNUNET_SET_OperationType operation;
80}; 118};
81 119
82 120
83struct LazyCopyRequest
84{
85 struct Set *source_set;
86 uint32_t cookie;
87
88 struct LazyCopyRequest *prev;
89 struct LazyCopyRequest *next;
90};
91
92
93/**
94 * Configuration of our local peer.
95 */
96static const struct GNUNET_CONFIGURATION_Handle *configuration;
97
98/** 121/**
99 * Handle to the cadet service, used to listen for and connect to 122 * Handle to the cadet service, used to listen for and connect to
100 * remote peers. 123 * remote peers.
@@ -102,94 +125,48 @@ static const struct GNUNET_CONFIGURATION_Handle *configuration;
102static struct GNUNET_CADET_Handle *cadet; 125static struct GNUNET_CADET_Handle *cadet;
103 126
104/** 127/**
105 * Sets are held in a doubly linked list. 128 * DLL of lazy copy requests by this client.
106 */ 129 */
107static struct Set *sets_head; 130static struct LazyCopyRequest *lazy_copy_head;
108 131
109/** 132/**
110 * Sets are held in a doubly linked list. 133 * DLL of lazy copy requests by this client.
111 */ 134 */
112static struct Set *sets_tail; 135static struct LazyCopyRequest *lazy_copy_tail;
113 136
114/** 137/**
115 * Listeners are held in a doubly linked list. 138 * Generator for unique cookie we set per lazy copy request.
116 */ 139 */
117static struct Listener *listeners_head; 140static uint32_t lazy_copy_cookie;
118 141
119/** 142/**
120 * Listeners are held in a doubly linked list. 143 * Statistics handle.
121 */ 144 */
122static struct Listener *listeners_tail; 145struct GNUNET_STATISTICS_Handle *_GSS_statistics;
123 146
124/** 147/**
125 * Incoming sockets from remote peers are held in a doubly linked 148 * Listeners are held in a doubly linked list.
126 * list.
127 */ 149 */
128static struct Operation *incoming_head; 150static struct Listener *listener_head;
129 151
130/** 152/**
131 * Incoming sockets from remote peers are held in a doubly linked 153 * Listeners are held in a doubly linked list.
132 * list.
133 */ 154 */
134static struct Operation *incoming_tail; 155static struct Listener *listener_tail;
135
136static struct LazyCopyRequest *lazy_copy_head;
137static struct LazyCopyRequest *lazy_copy_tail;
138
139static uint32_t lazy_copy_cookie = 1;
140 156
141/** 157/**
142 * Counter for allocating unique IDs for clients, used to identify 158 * Counter for allocating unique IDs for clients, used to identify
143 * incoming operation requests from remote peers, that the client can 159 * incoming operation requests from remote peers, that the client can
144 * choose to accept or refuse. 160 * choose to accept or refuse. 0 must not be used (reserved for
145 */ 161 * uninitialized).
146static uint32_t suggest_id = 1;
147
148/**
149 * Statistics handle.
150 */ 162 */
151struct GNUNET_STATISTICS_Handle *_GSS_statistics; 163static uint32_t suggest_id;
152
153
154/**
155 * Get set that is owned by the given client, if any.
156 *
157 * @param client client to look for
158 * @return set that the client owns, NULL if the client
159 * does not own a set
160 */
161static struct Set *
162set_get (struct GNUNET_SERVICE_Client *client)
163{
164 for (struct Set *set = sets_head; NULL != set; set = set->next)
165 if (set->client == client)
166 return set;
167 return NULL;
168}
169
170
171/**
172 * Get the listener associated with the given client, if any.
173 *
174 * @param client the client
175 * @return listener associated with the client, NULL
176 * if there isn't any
177 */
178static struct Listener *
179listener_get (struct GNUNET_SERVICE_Client *client)
180{
181 for (struct Listener *listener = listeners_head;
182 NULL != listener;
183 listener = listener->next)
184 if (listener->client == client)
185 return listener;
186 return NULL;
187}
188 164
189 165
190/** 166/**
191 * Get the incoming socket associated with the given id. 167 * Get the incoming socket associated with the given id.
192 * 168 *
169 * @param listener the listener to look in
193 * @param id id to look for 170 * @param id id to look for
194 * @return the incoming socket associated with the id, 171 * @return the incoming socket associated with the id,
195 * or NULL if there is none 172 * or NULL if there is none
@@ -197,44 +174,49 @@ listener_get (struct GNUNET_SERVICE_Client *client)
197static struct Operation * 174static struct Operation *
198get_incoming (uint32_t id) 175get_incoming (uint32_t id)
199{ 176{
200 for (struct Operation *op = incoming_head; NULL != op; op = op->next) 177 for (struct Listener *listener = listener_head;
201 if (op->suggest_id == id) 178 NULL != listener;
202 { 179 listener = listener->next)
203 GNUNET_assert (GNUNET_YES == op->is_incoming); 180 {
204 return op; 181 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
205 } 182 if (op->suggest_id == id)
183 return op;
184 }
206 return NULL; 185 return NULL;
207} 186}
208 187
209 188
210/** 189/**
211 * Destroy a listener, free all resources associated with it. 190 * Destroy an incoming request from a remote peer
212 * 191 *
213 * @param listener listener to destroy 192 * @param op remote request to destroy
214 */ 193 */
215static void 194static void
216listener_destroy (struct Listener *listener) 195incoming_destroy (struct Operation *op)
217{ 196{
218 /* If the client is not dead yet, destroy it. 197 struct Listener *listener;
219 * The client's destroy callback will destroy the listener again. */ 198 struct GNUNET_CADET_Channel *channel;
220 if (NULL != listener->client)
221 {
222 struct GNUNET_SERVICE_Client *client = listener->client;
223
224 GNUNET_MQ_destroy (listener->client_mq);
225 listener->client_mq = NULL;
226 199
227 listener->client = NULL; 200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 201 "Destroying incoming operation %p\n",
229 "Disconnecting listener client\n"); 202 op);
230 GNUNET_SERVICE_client_drop (client); 203 if (NULL != (listener = op->listener))
231 return; 204 {
205 GNUNET_CONTAINER_DLL_remove (listener->op_head,
206 listener->op_tail,
207 op);
208 op->listener = NULL;
209 }
210 if (NULL != op->timeout_task)
211 {
212 GNUNET_SCHEDULER_cancel (op->timeout_task);
213 op->timeout_task = NULL;
214 }
215 if (NULL != (channel = op->channel))
216 {
217 op->channel = NULL;
218 GNUNET_CADET_channel_destroy (channel);
232 } 219 }
233 GNUNET_CADET_close_port (listener->open_port);
234 GNUNET_CONTAINER_DLL_remove (listeners_head,
235 listeners_tail,
236 listener);
237 GNUNET_free (listener);
238} 220}
239 221
240 222
@@ -304,12 +286,11 @@ garbage_collect_cb (void *cls,
304static void 286static void
305collect_generation_garbage (struct Set *set) 287collect_generation_garbage (struct Set *set)
306{ 288{
307 struct Operation *op;
308 struct GarbageContext gc; 289 struct GarbageContext gc;
309 290
310 gc.min_op_generation = UINT_MAX; 291 gc.min_op_generation = UINT_MAX;
311 gc.max_op_generation = 0; 292 gc.max_op_generation = 0;
312 for (op = set->ops_head; NULL != op; op = op->next) 293 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
313 { 294 {
314 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation, 295 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
315 op->generation_created); 296 op->generation_created);
@@ -323,23 +304,36 @@ collect_generation_garbage (struct Set *set)
323} 304}
324 305
325 306
307/**
308 * Is @a generation in the range of exclusions?
309 *
310 * @param generation generation to query
311 * @param excluded array of generations where the element is excluded
312 * @param excluded_size length of the @a excluded array
313 * @return #GNUNET_YES if @a generation is in any of the ranges
314 */
326static int 315static int
327is_excluded_generation (unsigned int generation, 316is_excluded_generation (unsigned int generation,
328 struct GenerationRange *excluded, 317 struct GenerationRange *excluded,
329 unsigned int excluded_size) 318 unsigned int excluded_size)
330{ 319{
331 unsigned int i; 320 for (unsigned int i = 0; i < excluded_size; i++)
332 321 if ( (generation >= excluded[i].start) &&
333 for (i = 0; i < excluded_size; i++) 322 (generation < excluded[i].end) )
334 {
335 if ( (generation >= excluded[i].start) && (generation < excluded[i].end) )
336 return GNUNET_YES; 323 return GNUNET_YES;
337 }
338
339 return GNUNET_NO; 324 return GNUNET_NO;
340} 325}
341 326
342 327
328/**
329 * Is element @a ee part of the set during @a query_generation?
330 *
331 * @param ee element to test
332 * @param query_generation generation to query
333 * @param excluded array of generations where the element is excluded
334 * @param excluded_size length of the @a excluded array
335 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
336 */
343static int 337static int
344is_element_of_generation (struct ElementEntry *ee, 338is_element_of_generation (struct ElementEntry *ee,
345 unsigned int query_generation, 339 unsigned int query_generation,
@@ -348,11 +342,12 @@ is_element_of_generation (struct ElementEntry *ee,
348{ 342{
349 struct MutationEvent *mut; 343 struct MutationEvent *mut;
350 int is_present; 344 int is_present;
351 unsigned int i;
352 345
353 GNUNET_assert (NULL != ee->mutations); 346 GNUNET_assert (NULL != ee->mutations);
354 347 if (GNUNET_YES ==
355 if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size)) 348 is_excluded_generation (query_generation,
349 excluded,
350 excluded_size))
356 { 351 {
357 GNUNET_break (0); 352 GNUNET_break (0);
358 return GNUNET_NO; 353 return GNUNET_NO;
@@ -362,7 +357,7 @@ is_element_of_generation (struct ElementEntry *ee,
362 357
363 /* Could be made faster with binary search, but lists 358 /* Could be made faster with binary search, but lists
364 are small, so why bother. */ 359 are small, so why bother. */
365 for (i = 0; i < ee->mutations_size; i++) 360 for (unsigned int i = 0; i < ee->mutations_size; i++)
366 { 361 {
367 mut = &ee->mutations[i]; 362 mut = &ee->mutations[i];
368 363
@@ -374,7 +369,10 @@ is_element_of_generation (struct ElementEntry *ee,
374 continue; 369 continue;
375 } 370 }
376 371
377 if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size)) 372 if (GNUNET_YES ==
373 is_excluded_generation (mut->generation,
374 excluded,
375 excluded_size))
378 { 376 {
379 /* The generation is excluded (because it belongs to another 377 /* The generation is excluded (because it belongs to another
380 fork via a lazy copy) and thus mutations aren't considered 378 fork via a lazy copy) and thus mutations aren't considered
@@ -383,11 +381,12 @@ is_element_of_generation (struct ElementEntry *ee,
383 } 381 }
384 382
385 /* This would be an inconsistency in how we manage mutations. */ 383 /* This would be an inconsistency in how we manage mutations. */
386 if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) ) 384 if ( (GNUNET_YES == is_present) &&
385 (GNUNET_YES == mut->added) )
387 GNUNET_assert (0); 386 GNUNET_assert (0);
388
389 /* Likewise. */ 387 /* Likewise. */
390 if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) ) 388 if ( (GNUNET_NO == is_present) &&
389 (GNUNET_NO == mut->added) )
391 GNUNET_assert (0); 390 GNUNET_assert (0);
392 391
393 is_present = mut->added; 392 is_present = mut->added;
@@ -397,44 +396,33 @@ is_element_of_generation (struct ElementEntry *ee,
397} 396}
398 397
399 398
400int 399/**
401_GSS_is_element_of_set (struct ElementEntry *ee, 400 * Is element @a ee part of the set used by @a op?
402 struct Set *set) 401 *
403{ 402 * @param ee element to test
404 return is_element_of_generation (ee, 403 * @param op operation the defines the set and its generation
405 set->current_generation, 404 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
406 set->excluded_generations, 405 */
407 set->excluded_generations_size);
408}
409
410
411static int
412is_element_of_iteration (struct ElementEntry *ee,
413 struct Set *set)
414{
415 return is_element_of_generation (ee,
416 set->iter_generation,
417 set->excluded_generations,
418 set->excluded_generations_size);
419}
420
421
422int 406int
423_GSS_is_element_of_operation (struct ElementEntry *ee, 407_GSS_is_element_of_operation (struct ElementEntry *ee,
424 struct Operation *op) 408 struct Operation *op)
425{ 409{
426 return is_element_of_generation (ee, 410 return is_element_of_generation (ee,
427 op->generation_created, 411 op->generation_created,
428 op->spec->set->excluded_generations, 412 op->set->excluded_generations,
429 op->spec->set->excluded_generations_size); 413 op->set->excluded_generations_size);
430} 414}
431 415
432 416
433/** 417/**
434 * Destroy the given operation. Call the implementation-specific 418 * Destroy the given operation. Used for any operation where both
435 * cancel function of the operation. Disconnects from the remote 419 * peers were known and that thus actually had a vt and channel. Must
436 * peer. Does not disconnect the client, as there may be multiple 420 * not be used for operations where 'listener' is still set and we do
437 * operations per set. 421 * not know the other peer.
422 *
423 * Call the implementation-specific cancel function of the operation.
424 * Disconnects from the remote peer. Does not disconnect the client,
425 * as there may be multiple operations per set.
438 * 426 *
439 * @param op operation to destroy 427 * @param op operation to destroy
440 * @param gc #GNUNET_YES to perform garbage collection on the set 428 * @param gc #GNUNET_YES to perform garbage collection on the set
@@ -443,39 +431,39 @@ void
443_GSS_operation_destroy (struct Operation *op, 431_GSS_operation_destroy (struct Operation *op,
444 int gc) 432 int gc)
445{ 433{
446 struct Set *set; 434 struct Set *set = op->set;
447 struct GNUNET_CADET_Channel *channel; 435 struct GNUNET_CADET_Channel *channel;
448 436
449 if (NULL == op->vt) 437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438 "Destroying operation %p\n",
439 op);
440 GNUNET_assert (NULL == op->listener);
441 if (NULL != op->state)
450 { 442 {
451 /* already in #_GSS_operation_destroy() */ 443 set->vt->cancel (op);
452 return; 444 op->state = NULL;
453 } 445 }
454 GNUNET_assert (GNUNET_NO == op->is_incoming); 446 if (NULL != set)
455 GNUNET_assert (NULL != op->spec);
456 set = op->spec->set;
457 GNUNET_CONTAINER_DLL_remove (set->ops_head,
458 set->ops_tail,
459 op);
460 op->vt->cancel (op);
461 op->vt = NULL;
462 if (NULL != op->spec)
463 { 447 {
464 if (NULL != op->spec->context_msg) 448 GNUNET_CONTAINER_DLL_remove (set->ops_head,
465 { 449 set->ops_tail,
466 GNUNET_free (op->spec->context_msg); 450 op);
467 op->spec->context_msg = NULL; 451 op->set = NULL;
468 } 452 }
469 GNUNET_free (op->spec); 453 if (NULL != op->context_msg)
470 op->spec = NULL; 454 {
455 GNUNET_free (op->context_msg);
456 op->context_msg = NULL;
471 } 457 }
472 if (NULL != (channel = op->channel)) 458 if (NULL != (channel = op->channel))
473 { 459 {
460 /* This will free op; called conditionally as this helper function
461 is also called from within the channel disconnect handler. */
474 op->channel = NULL; 462 op->channel = NULL;
475 GNUNET_CADET_channel_destroy (channel); 463 GNUNET_CADET_channel_destroy (channel);
476 } 464 }
477 465 if ( (NULL != set) &&
478 if (GNUNET_YES == gc) 466 (GNUNET_YES == gc) )
479 collect_generation_garbage (set); 467 collect_generation_garbage (set);
480 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, 468 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
481 * there was a channel end handler that will free 'op' on the call stack. */ 469 * there was a channel end handler that will free 'op' on the call stack. */
@@ -483,6 +471,28 @@ _GSS_operation_destroy (struct Operation *op,
483 471
484 472
485/** 473/**
474 * Callback called when a client connects to the service.
475 *
476 * @param cls closure for the service
477 * @param c the new client that connected to the service
478 * @param mq the message queue used to send messages to the client
479 * @return @a `struct ClientState`
480 */
481static void *
482client_connect_cb (void *cls,
483 struct GNUNET_SERVICE_Client *c,
484 struct GNUNET_MQ_Handle *mq)
485{
486 struct ClientState *cs;
487
488 cs = GNUNET_new (struct ClientState);
489 cs->client = c;
490 cs->mq = mq;
491 return cs;
492}
493
494
495/**
486 * Iterator over hash map entries to free element entries. 496 * Iterator over hash map entries to free element entries.
487 * 497 *
488 * @param cls closure 498 * @param cls closure
@@ -498,66 +508,76 @@ destroy_elements_iterator (void *cls,
498 struct ElementEntry *ee = value; 508 struct ElementEntry *ee = value;
499 509
500 GNUNET_free_non_null (ee->mutations); 510 GNUNET_free_non_null (ee->mutations);
501
502 GNUNET_free (ee); 511 GNUNET_free (ee);
503 return GNUNET_YES; 512 return GNUNET_YES;
504} 513}
505 514
506 515
507/** 516/**
508 * Destroy a set, and free all resources and operations associated with it. 517 * Clean up after a client has disconnected
509 * 518 *
510 * @param set the set to destroy 519 * @param cls closure, unused
520 * @param client the client to clean up after
521 * @param internal_cls the `struct ClientState`
511 */ 522 */
512static void 523static void
513set_destroy (struct Set *set) 524client_disconnect_cb (void *cls,
525 struct GNUNET_SERVICE_Client *client,
526 void *internal_cls)
514{ 527{
515 if (NULL != set->client) 528 struct ClientState *cs = internal_cls;
516 { 529 struct Operation *op;
517 /* If the client is not dead yet, destroy it. The client's destroy 530 struct Listener *listener;
518 * callback will call `set_destroy()` again in this case. We do 531 struct Set *set;
519 * this so that the channel end handler still has a valid set handle 532
520 * to destroy. */ 533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521 struct GNUNET_SERVICE_Client *client = set->client; 534 "Client disconnected, cleaning up\n");
522 535 if (NULL != (set = cs->set))
523 set->client = NULL;
524 GNUNET_SERVICE_client_drop (client);
525 return;
526 }
527 GNUNET_assert (NULL != set->state);
528 while (NULL != set->ops_head)
529 _GSS_operation_destroy (set->ops_head, GNUNET_NO);
530 set->vt->destroy_set (set->state);
531 set->state = NULL;
532 if (NULL != set->iter)
533 {
534 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
535 set->iter = NULL;
536 set->iteration_id++;
537 }
538 { 536 {
539 struct SetContent *content; 537 struct SetContent *content = set->content;
540 struct PendingMutation *pm; 538 struct PendingMutation *pm;
541 struct PendingMutation *pm_current; 539 struct PendingMutation *pm_current;
540 struct LazyCopyRequest *lcr;
542 541
543 content = set->content; 542 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543 "Destroying client's set\n");
544 /* Destroy pending set operations */
545 while (NULL != set->ops_head)
546 _GSS_operation_destroy (set->ops_head,
547 GNUNET_NO);
548
549 /* Destroy operation-specific state */
550 GNUNET_assert (NULL != set->state);
551 set->vt->destroy_set (set->state);
552 set->state = NULL;
553
554 /* Clean up ongoing iterations */
555 if (NULL != set->iter)
556 {
557 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
558 set->iter = NULL;
559 set->iteration_id++;
560 }
544 561
545 // discard any pending mutations that reference this set 562 /* discard any pending mutations that reference this set */
546 pm = content->pending_mutations_head; 563 pm = content->pending_mutations_head;
547 while (NULL != pm) 564 while (NULL != pm)
548 { 565 {
549 pm_current = pm; 566 pm_current = pm;
550 pm = pm->next; 567 pm = pm->next;
551 if (pm_current-> set == set) 568 if (pm_current->set == set)
569 {
552 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, 570 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
553 content->pending_mutations_tail, 571 content->pending_mutations_tail,
554 pm_current); 572 pm_current);
555 573 GNUNET_free (pm_current);
574 }
556 } 575 }
557 576
577 /* free set content (or at least decrement RC) */
558 set->content = NULL; 578 set->content = NULL;
559 GNUNET_assert (0 != content->refcount); 579 GNUNET_assert (0 != content->refcount);
560 content->refcount -= 1; 580 content->refcount--;
561 if (0 == content->refcount) 581 if (0 == content->refcount)
562 { 582 {
563 GNUNET_assert (NULL != content->elements); 583 GNUNET_assert (NULL != content->elements);
@@ -568,166 +588,41 @@ set_destroy (struct Set *set)
568 content->elements = NULL; 588 content->elements = NULL;
569 GNUNET_free (content); 589 GNUNET_free (content);
570 } 590 }
571 } 591 GNUNET_free_non_null (set->excluded_generations);
572 GNUNET_free_non_null (set->excluded_generations); 592 set->excluded_generations = NULL;
573 set->excluded_generations = NULL;
574 GNUNET_CONTAINER_DLL_remove (sets_head,
575 sets_tail,
576 set);
577 593
578 // remove set from pending copy requests 594 /* remove set from pending copy requests */
579 {
580 struct LazyCopyRequest *lcr;
581 lcr = lazy_copy_head; 595 lcr = lazy_copy_head;
582 while (NULL != lcr) 596 while (NULL != lcr)
583 { 597 {
584 struct LazyCopyRequest *lcr_current; 598 struct LazyCopyRequest *lcr_current = lcr;
585 lcr_current = lcr; 599
586 lcr = lcr->next; 600 lcr = lcr->next;
587 if (lcr_current->source_set == set) 601 if (lcr_current->source_set == set)
602 {
588 GNUNET_CONTAINER_DLL_remove (lazy_copy_head, 603 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
589 lazy_copy_tail, 604 lazy_copy_tail,
590 lcr_current); 605 lcr_current);
606 GNUNET_free (lcr_current);
607 }
591 } 608 }
609 GNUNET_free (set);
592 } 610 }
593 611
594 GNUNET_free (set); 612 if (NULL != (listener = cs->listener))
595}
596
597
598/**
599 * Callback called when a client connects to the service.
600 *
601 * @param cls closure for the service
602 * @param c the new client that connected to the service
603 * @param mq the message queue used to send messages to the client
604 * @return @a c
605 */
606static void *
607client_connect_cb (void *cls,
608 struct GNUNET_SERVICE_Client *c,
609 struct GNUNET_MQ_Handle *mq)
610{
611 return c;
612}
613
614
615/**
616 * Destroy an incoming request from a remote peer
617 *
618 * @param incoming remote request to destroy
619 */
620static void
621incoming_destroy (struct Operation *incoming)
622{
623 struct GNUNET_CADET_Channel *channel;
624
625 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
626 GNUNET_CONTAINER_DLL_remove (incoming_head,
627 incoming_tail,
628 incoming);
629 if (NULL != incoming->timeout_task)
630 {
631 GNUNET_SCHEDULER_cancel (incoming->timeout_task);
632 incoming->timeout_task = NULL;
633 }
634 /* make sure that the tunnel end handler will not destroy us again */
635 incoming->vt = NULL;
636 if (NULL != incoming->spec)
637 {
638 GNUNET_free (incoming->spec);
639 incoming->spec = NULL;
640 }
641 if (NULL != (channel = incoming->channel))
642 {
643 incoming->channel = NULL;
644 GNUNET_CADET_channel_destroy (channel);
645 }
646}
647
648
649/**
650 * Clean up after a client has disconnected
651 *
652 * @param cls closure, unused
653 * @param client the client to clean up after
654 * @param internal_cls our client-specific internal data structure
655 */
656static void
657client_disconnect_cb (void *cls,
658 struct GNUNET_SERVICE_Client *client,
659 void *internal_cls)
660{
661 struct Set *set;
662
663 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
664 "client disconnected, cleaning up\n");
665 set = set_get (client);
666 if (NULL != set)
667 { 613 {
668 set->client = NULL;
669 set_destroy (set);
670 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 614 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
671 "Client's set destroyed\n"); 615 "Destroying client's listener\n");
672 } 616 GNUNET_CADET_close_port (listener->open_port);
673 struct Listener *listener = listener_get (client); 617 listener->open_port = NULL;
674 if (NULL != listener) 618 while (NULL != (op = listener->op_head))
675 { 619 incoming_destroy (op);
676 /* destroy all incoming operations whose client just 620 GNUNET_CONTAINER_DLL_remove (listener_head,
677 * got destroyed */ 621 listener_tail,
678 //struct Operation *op = incoming_head; 622 listener);
679 /* 623 GNUNET_free (listener);
680 while (NULL != op)
681 {
682 struct Operation *curr = op;
683 op = op->next;
684 if ( (GNUNET_YES == curr->is_incoming) &&
685 (curr->listener == listener) )
686 incoming_destroy (curr);
687 }
688 */
689 listener->client = NULL;
690 listener_destroy (listener);
691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
692 "Client's listener destroyed\n");
693 } 624 }
694} 625 GNUNET_free (cs);
695
696
697/**
698 * Suggest the given request to the listener. The listening client can
699 * then accept or reject the remote request.
700 *
701 * @param incoming the incoming peer with the request to suggest
702 * @param listener the listener to suggest the request to
703 */
704static void
705incoming_suggest (struct Operation *incoming,
706 struct Listener *listener)
707{
708 struct GNUNET_MQ_Envelope *mqm;
709 struct GNUNET_SET_RequestMessage *cmsg;
710
711 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
712 GNUNET_assert (NULL != incoming->spec);
713 GNUNET_assert (0 == incoming->suggest_id);
714 incoming->suggest_id = suggest_id++;
715 if (0 == suggest_id)
716 suggest_id++;
717 GNUNET_assert (NULL != incoming->timeout_task);
718 GNUNET_SCHEDULER_cancel (incoming->timeout_task);
719 incoming->timeout_task = NULL;
720 mqm = GNUNET_MQ_msg_nested_mh (cmsg,
721 GNUNET_MESSAGE_TYPE_SET_REQUEST,
722 incoming->spec->context_msg);
723 GNUNET_assert (NULL != mqm);
724 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
725 "Suggesting incoming request with accept id %u to listener\n",
726 incoming->suggest_id);
727 cmsg->accept_id = htonl (incoming->suggest_id);
728 cmsg->peer_id = incoming->spec->peer;
729 GNUNET_MQ_send (listener->client_mq,
730 mqm);
731} 626}
732 627
733 628
@@ -744,10 +639,22 @@ check_incoming_msg (void *cls,
744 const struct OperationRequestMessage *msg) 639 const struct OperationRequestMessage *msg)
745{ 640{
746 struct Operation *op = cls; 641 struct Operation *op = cls;
642 struct Listener *listener = op->listener;
747 const struct GNUNET_MessageHeader *nested_context; 643 const struct GNUNET_MessageHeader *nested_context;
748 644
749 /* double operation request */ 645 /* double operation request */
750 if (NULL != op->spec) 646 if (0 != op->suggest_id)
647 {
648 GNUNET_break_op (0);
649 return GNUNET_SYSERR;
650 }
651 /* This should be equivalent to the previous condition, but can't hurt to check twice */
652 if (NULL == op->listener)
653 {
654 GNUNET_break (0);
655 return GNUNET_SYSERR;
656 }
657 if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
751 { 658 {
752 GNUNET_break_op (0); 659 GNUNET_break_op (0);
753 return GNUNET_SYSERR; 660 return GNUNET_SYSERR;
@@ -786,61 +693,74 @@ handle_incoming_msg (void *cls,
786{ 693{
787 struct Operation *op = cls; 694 struct Operation *op = cls;
788 struct Listener *listener = op->listener; 695 struct Listener *listener = op->listener;
789 struct OperationSpecification *spec;
790 const struct GNUNET_MessageHeader *nested_context; 696 const struct GNUNET_MessageHeader *nested_context;
697 struct GNUNET_MQ_Envelope *env;
698 struct GNUNET_SET_RequestMessage *cmsg;
791 699
792 GNUNET_assert (GNUNET_YES == op->is_incoming);
793 spec = GNUNET_new (struct OperationSpecification);
794 nested_context = GNUNET_MQ_extract_nested_mh (msg); 700 nested_context = GNUNET_MQ_extract_nested_mh (msg);
795 /* Make a copy of the nested_context (application-specific context 701 /* Make a copy of the nested_context (application-specific context
796 information that is opaque to set) so we can pass it to the 702 information that is opaque to set) so we can pass it to the
797 listener later on */ 703 listener later on */
798 if (NULL != nested_context) 704 if (NULL != nested_context)
799 spec->context_msg = GNUNET_copy_message (nested_context); 705 op->context_msg = GNUNET_copy_message (nested_context);
800 spec->operation = ntohl (msg->operation); 706 op->remote_element_count = ntohl (msg->element_count);
801 spec->app_id = listener->app_id;
802 spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
803 UINT32_MAX);
804 spec->peer = op->peer;
805 spec->remote_element_count = ntohl (msg->element_count);
806 op->spec = spec;
807 listener = op->listener;
808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809 "Received P2P operation request (op %u, port %s) for active listener\n", 708 "Received P2P operation request (op %u, port %s) for active listener\n",
810 (uint32_t) ntohl (msg->operation), 709 (uint32_t) ntohl (msg->operation),
811 GNUNET_h2s (&listener->app_id)); 710 GNUNET_h2s (&op->listener->app_id));
812 incoming_suggest (op, 711 GNUNET_assert (0 == op->suggest_id);
813 listener); 712 if (0 == suggest_id)
713 suggest_id++;
714 op->suggest_id = suggest_id++;
715 GNUNET_assert (NULL != op->timeout_task);
716 GNUNET_SCHEDULER_cancel (op->timeout_task);
717 op->timeout_task = NULL;
718 env = GNUNET_MQ_msg_nested_mh (cmsg,
719 GNUNET_MESSAGE_TYPE_SET_REQUEST,
720 op->context_msg);
721 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
722 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
723 op->suggest_id,
724 listener,
725 listener->cs);
726 cmsg->accept_id = htonl (op->suggest_id);
727 cmsg->peer_id = op->peer;
728 GNUNET_MQ_send (listener->cs->mq,
729 env);
730 /* NOTE: GNUNET_CADET_receive_done() will be called in
731 #handle_client_accept() */
814} 732}
815 733
816 734
735/**
736 * Add an element to @a set as specified by @a msg
737 *
738 * @param set set to manipulate
739 * @param msg message specifying the change
740 */
817static void 741static void
818execute_add (struct Set *set, 742execute_add (struct Set *set,
819 const struct GNUNET_MessageHeader *m) 743 const struct GNUNET_SET_ElementMessage *msg)
820{ 744{
821 const struct GNUNET_SET_ElementMessage *msg;
822 struct GNUNET_SET_Element el; 745 struct GNUNET_SET_Element el;
823 struct ElementEntry *ee; 746 struct ElementEntry *ee;
824 struct GNUNET_HashCode hash; 747 struct GNUNET_HashCode hash;
825 748
826 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type)); 749 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
827 750 el.size = ntohs (msg->header.size) - sizeof (*msg);
828 msg = (const struct GNUNET_SET_ElementMessage *) m;
829 el.size = ntohs (m->size) - sizeof *msg;
830 el.data = &msg[1]; 751 el.data = &msg[1];
831 el.element_type = ntohs (msg->element_type); 752 el.element_type = ntohs (msg->element_type);
832 GNUNET_SET_element_hash (&el, &hash); 753 GNUNET_SET_element_hash (&el,
833 754 &hash);
834 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, 755 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
835 &hash); 756 &hash);
836
837 if (NULL == ee) 757 if (NULL == ee)
838 { 758 {
839 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
840 "Client inserts element %s of size %u\n", 760 "Client inserts element %s of size %u\n",
841 GNUNET_h2s (&hash), 761 GNUNET_h2s (&hash),
842 el.size); 762 el.size);
843 ee = GNUNET_malloc (el.size + sizeof *ee); 763 ee = GNUNET_malloc (el.size + sizeof (*ee));
844 ee->element.size = el.size; 764 ee->element.size = el.size;
845 GNUNET_memcpy (&ee[1], 765 GNUNET_memcpy (&ee[1],
846 el.data, 766 el.data,
@@ -857,7 +777,11 @@ execute_add (struct Set *set,
857 ee, 777 ee,
858 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 778 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
859 } 779 }
860 else if (GNUNET_YES == _GSS_is_element_of_set (ee, set)) 780 else if (GNUNET_YES ==
781 is_element_of_generation (ee,
782 set->current_generation,
783 set->excluded_generations,
784 set->excluded_generations_size))
861 { 785 {
862 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
863 "Client inserted element %s of size %u twice (ignored)\n", 787 "Client inserted element %s of size %u twice (ignored)\n",
@@ -877,24 +801,27 @@ execute_add (struct Set *set,
877 ee->mutations_size, 801 ee->mutations_size,
878 mut); 802 mut);
879 } 803 }
880 804 set->vt->add (set->state,
881 set->vt->add (set->state, ee); 805 ee);
882} 806}
883 807
884 808
809/**
810 * Remove an element from @a set as specified by @a msg
811 *
812 * @param set set to manipulate
813 * @param msg message specifying the change
814 */
885static void 815static void
886execute_remove (struct Set *set, 816execute_remove (struct Set *set,
887 const struct GNUNET_MessageHeader *m) 817 const struct GNUNET_SET_ElementMessage *msg)
888{ 818{
889 const struct GNUNET_SET_ElementMessage *msg;
890 struct GNUNET_SET_Element el; 819 struct GNUNET_SET_Element el;
891 struct ElementEntry *ee; 820 struct ElementEntry *ee;
892 struct GNUNET_HashCode hash; 821 struct GNUNET_HashCode hash;
893 822
894 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type)); 823 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
895 824 el.size = ntohs (msg->header.size) - sizeof (*msg);
896 msg = (const struct GNUNET_SET_ElementMessage *) m;
897 el.size = ntohs (m->size) - sizeof *msg;
898 el.data = &msg[1]; 825 el.data = &msg[1];
899 el.element_type = ntohs (msg->element_type); 826 el.element_type = ntohs (msg->element_type);
900 GNUNET_SET_element_hash (&el, &hash); 827 GNUNET_SET_element_hash (&el, &hash);
@@ -908,7 +835,11 @@ execute_remove (struct Set *set,
908 el.size); 835 el.size);
909 return; 836 return;
910 } 837 }
911 if (GNUNET_NO == _GSS_is_element_of_set (ee, set)) 838 if (GNUNET_NO ==
839 is_element_of_generation (ee,
840 set->current_generation,
841 set->excluded_generations,
842 set->excluded_generations_size))
912 { 843 {
913 /* Client tried to remove element twice */ 844 /* Client tried to remove element twice */
914 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 845 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -931,22 +862,28 @@ execute_remove (struct Set *set,
931 ee->mutations_size, 862 ee->mutations_size,
932 mut); 863 mut);
933 } 864 }
934 set->vt->remove (set->state, ee); 865 set->vt->remove (set->state,
866 ee);
935} 867}
936 868
937 869
938 870/**
871 * Perform a mutation on a set as specified by the @a msg
872 *
873 * @param set the set to mutate
874 * @param msg specification of what to change
875 */
939static void 876static void
940execute_mutation (struct Set *set, 877execute_mutation (struct Set *set,
941 const struct GNUNET_MessageHeader *m) 878 const struct GNUNET_SET_ElementMessage *msg)
942{ 879{
943 switch (ntohs (m->type)) 880 switch (ntohs (msg->header.type))
944 { 881 {
945 case GNUNET_MESSAGE_TYPE_SET_ADD: 882 case GNUNET_MESSAGE_TYPE_SET_ADD:
946 execute_add (set, m); 883 execute_add (set, msg);
947 break; 884 break;
948 case GNUNET_MESSAGE_TYPE_SET_REMOVE: 885 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
949 execute_remove (set, m); 886 execute_remove (set, msg);
950 break; 887 break;
951 default: 888 default:
952 GNUNET_break (0); 889 GNUNET_break (0);
@@ -954,6 +891,34 @@ execute_mutation (struct Set *set,
954} 891}
955 892
956 893
894/**
895 * Execute mutations that were delayed on a set because of
896 * pending operations.
897 *
898 * @param set the set to execute mutations on
899 */
900static void
901execute_delayed_mutations (struct Set *set)
902{
903 struct PendingMutation *pm;
904
905 if (0 != set->content->iterator_count)
906 return; /* still cannot do this */
907 while (NULL != (pm = set->content->pending_mutations_head))
908 {
909 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
910 set->content->pending_mutations_tail,
911 pm);
912 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913 "Executing pending mutation on %p.\n",
914 pm->set);
915 execute_mutation (pm->set,
916 pm->msg);
917 GNUNET_free (pm->msg);
918 GNUNET_free (pm);
919 }
920}
921
957 922
958/** 923/**
959 * Send the next element of a set to the set's client. The next element is given by 924 * Send the next element of a set to the set's client. The next element is given by
@@ -977,65 +942,45 @@ send_client_element (struct Set *set)
977 struct GNUNET_SET_IterResponseMessage *msg; 942 struct GNUNET_SET_IterResponseMessage *msg;
978 943
979 GNUNET_assert (NULL != set->iter); 944 GNUNET_assert (NULL != set->iter);
980 945 do {
981again: 946 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
982 947 NULL,
983 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, 948 (const void **) &ee);
984 NULL, 949 if (GNUNET_NO == ret)
985 (const void **) &ee);
986 if (GNUNET_NO == ret)
987 {
988 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989 "Iteration on %p done.\n",
990 (void *) set);
991 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
992 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
993 set->iter = NULL;
994 set->iteration_id++;
995
996 GNUNET_assert (set->content->iterator_count > 0);
997 set->content->iterator_count -= 1;
998
999 if (0 == set->content->iterator_count)
1000 { 950 {
1001 while (NULL != set->content->pending_mutations_head) 951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002 { 952 "Iteration on %p done.\n",
1003 struct PendingMutation *pm; 953 set);
1004 954 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
1005 pm = set->content->pending_mutations_head; 955 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1006 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, 956 set->iter = NULL;
1007 set->content->pending_mutations_tail, 957 set->iteration_id++;
1008 pm); 958 GNUNET_assert (set->content->iterator_count > 0);
1009 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 959 set->content->iterator_count--;
1010 "Executing pending mutation on %p.\n", 960 execute_delayed_mutations (set);
1011 (void *) pm->set); 961 GNUNET_MQ_send (set->cs->mq,
1012 execute_mutation (pm->set, pm->mutation_message); 962 ev);
1013 GNUNET_free (pm->mutation_message); 963 return;
1014 GNUNET_free (pm);
1015 }
1016 } 964 }
1017
1018 }
1019 else
1020 {
1021 GNUNET_assert (NULL != ee); 965 GNUNET_assert (NULL != ee);
1022 966 } while (GNUNET_NO ==
1023 if (GNUNET_NO == is_element_of_iteration (ee, set)) 967 is_element_of_generation (ee,
1024 goto again; 968 set->iter_generation,
1025 969 set->excluded_generations,
1026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 970 set->excluded_generations_size));
1027 "Sending iteration element on %p.\n", 971 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1028 (void *) set); 972 "Sending iteration element on %p.\n",
1029 ev = GNUNET_MQ_msg_extra (msg, 973 set);
1030 ee->element.size, 974 ev = GNUNET_MQ_msg_extra (msg,
1031 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); 975 ee->element.size,
1032 GNUNET_memcpy (&msg[1], 976 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1033 ee->element.data, 977 GNUNET_memcpy (&msg[1],
1034 ee->element.size); 978 ee->element.data,
1035 msg->element_type = htons (ee->element.element_type); 979 ee->element.size);
1036 msg->iteration_id = htons (set->iteration_id); 980 msg->element_type = htons (ee->element.element_type);
1037 } 981 msg->iteration_id = htons (set->iteration_id);
1038 GNUNET_MQ_send (set->client_mq, ev); 982 GNUNET_MQ_send (set->cs->mq,
983 ev);
1039} 984}
1040 985
1041 986
@@ -1052,22 +997,21 @@ static void
1052handle_client_iterate (void *cls, 997handle_client_iterate (void *cls,
1053 const struct GNUNET_MessageHeader *m) 998 const struct GNUNET_MessageHeader *m)
1054{ 999{
1055 struct GNUNET_SERVICE_Client *client = cls; 1000 struct ClientState *cs = cls;
1056 struct Set *set; 1001 struct Set *set;
1057 1002
1058 set = set_get (client); 1003 if (NULL == (set = cs->set))
1059 if (NULL == set)
1060 { 1004 {
1061 /* attempt to iterate over a non existing set */ 1005 /* attempt to iterate over a non existing set */
1062 GNUNET_break (0); 1006 GNUNET_break (0);
1063 GNUNET_SERVICE_client_drop (client); 1007 GNUNET_SERVICE_client_drop (cs->client);
1064 return; 1008 return;
1065 } 1009 }
1066 if (NULL != set->iter) 1010 if (NULL != set->iter)
1067 { 1011 {
1068 /* Only one concurrent iterate-action allowed per set */ 1012 /* Only one concurrent iterate-action allowed per set */
1069 GNUNET_break (0); 1013 GNUNET_break (0);
1070 GNUNET_SERVICE_client_drop (client); 1014 GNUNET_SERVICE_client_drop (cs->client);
1071 return; 1015 return;
1072 } 1016 }
1073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1075,8 +1019,8 @@ handle_client_iterate (void *cls,
1075 (void *) set, 1019 (void *) set,
1076 set->current_generation, 1020 set->current_generation,
1077 GNUNET_CONTAINER_multihashmap_size (set->content->elements)); 1021 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1078 GNUNET_SERVICE_client_continue (client); 1022 GNUNET_SERVICE_client_continue (cs->client);
1079 set->content->iterator_count += 1; 1023 set->content->iterator_count++;
1080 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); 1024 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1081 set->iter_generation = set->current_generation; 1025 set->iter_generation = set->current_generation;
1082 send_client_element (set); 1026 send_client_element (set);
@@ -1095,17 +1039,17 @@ static void
1095handle_client_create_set (void *cls, 1039handle_client_create_set (void *cls,
1096 const struct GNUNET_SET_CreateMessage *msg) 1040 const struct GNUNET_SET_CreateMessage *msg)
1097{ 1041{
1098 struct GNUNET_SERVICE_Client *client = cls; 1042 struct ClientState *cs = cls;
1099 struct Set *set; 1043 struct Set *set;
1100 1044
1101 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1045 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1102 "Client created new set (operation %u)\n", 1046 "Client created new set (operation %u)\n",
1103 (uint32_t) ntohl (msg->operation)); 1047 (uint32_t) ntohl (msg->operation));
1104 if (NULL != set_get (client)) 1048 if (NULL != cs->set)
1105 { 1049 {
1106 /* There can only be one set per client */ 1050 /* There can only be one set per client */
1107 GNUNET_break (0); 1051 GNUNET_break (0);
1108 GNUNET_SERVICE_client_drop (client); 1052 GNUNET_SERVICE_client_drop (cs->client);
1109 return; 1053 return;
1110 } 1054 }
1111 set = GNUNET_new (struct Set); 1055 set = GNUNET_new (struct Set);
@@ -1120,7 +1064,7 @@ handle_client_create_set (void *cls,
1120 default: 1064 default:
1121 GNUNET_free (set); 1065 GNUNET_free (set);
1122 GNUNET_break (0); 1066 GNUNET_break (0);
1123 GNUNET_SERVICE_client_drop (client); 1067 GNUNET_SERVICE_client_drop (cs->client);
1124 return; 1068 return;
1125 } 1069 }
1126 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); 1070 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
@@ -1129,18 +1073,16 @@ handle_client_create_set (void *cls,
1129 { 1073 {
1130 /* initialization failed (i.e. out of memory) */ 1074 /* initialization failed (i.e. out of memory) */
1131 GNUNET_free (set); 1075 GNUNET_free (set);
1132 GNUNET_SERVICE_client_drop (client); 1076 GNUNET_SERVICE_client_drop (cs->client);
1133 return; 1077 return;
1134 } 1078 }
1135 set->content = GNUNET_new (struct SetContent); 1079 set->content = GNUNET_new (struct SetContent);
1136 set->content->refcount = 1; 1080 set->content->refcount = 1;
1137 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1081 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1138 set->client = client; 1082 GNUNET_YES);
1139 set->client_mq = GNUNET_SERVICE_client_get_mq (client); 1083 set->cs = cs;
1140 GNUNET_CONTAINER_DLL_insert (sets_head, 1084 cs->set = set;
1141 sets_tail, 1085 GNUNET_SERVICE_client_continue (cs->client);
1142 set);
1143 GNUNET_SERVICE_client_continue (client);
1144} 1086}
1145 1087
1146 1088
@@ -1156,31 +1098,12 @@ handle_client_create_set (void *cls,
1156static void 1098static void
1157incoming_timeout_cb (void *cls) 1099incoming_timeout_cb (void *cls)
1158{ 1100{
1159 struct Operation *incoming = cls; 1101 struct Operation *op = cls;
1160 1102
1161 incoming->timeout_task = NULL; 1103 op->timeout_task = NULL;
1162 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
1163 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164 "Remote peer's incoming request timed out\n"); 1105 "Remote peer's incoming request timed out\n");
1165 incoming_destroy (incoming);
1166}
1167
1168
1169/**
1170 * Terminates an incoming operation in case we have not yet received an
1171 * operation request. Called by the channel destruction handler.
1172 *
1173 * @param op the channel context
1174 */
1175static void
1176handle_incoming_disconnect (struct Operation *op)
1177{
1178 GNUNET_assert (GNUNET_YES == op->is_incoming);
1179 /* channel is already dead, incoming_destroy must not
1180 * destroy it ... */
1181 op->channel = NULL;
1182 incoming_destroy (op); 1106 incoming_destroy (op);
1183 op->vt = NULL;
1184} 1107}
1185 1108
1186 1109
@@ -1205,31 +1128,26 @@ channel_new_cb (void *cls,
1205 struct GNUNET_CADET_Channel *channel, 1128 struct GNUNET_CADET_Channel *channel,
1206 const struct GNUNET_PeerIdentity *source) 1129 const struct GNUNET_PeerIdentity *source)
1207{ 1130{
1208 static const struct SetVT incoming_vt = {
1209 .peer_disconnect = &handle_incoming_disconnect
1210 };
1211 struct Listener *listener = cls; 1131 struct Listener *listener = cls;
1212 struct Operation *incoming; 1132 struct Operation *op;
1213 1133
1214 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1134 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215 "New incoming channel\n"); 1135 "New incoming channel\n");
1216 incoming = GNUNET_new (struct Operation); 1136 op = GNUNET_new (struct Operation);
1217 incoming->listener = listener; 1137 op->listener = listener;
1218 incoming->is_incoming = GNUNET_YES; 1138 op->peer = *source;
1219 incoming->peer = *source; 1139 op->channel = channel;
1220 incoming->channel = channel; 1140 op->mq = GNUNET_CADET_get_mq (op->channel);
1221 incoming->mq = GNUNET_CADET_get_mq (incoming->channel); 1141 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1222 incoming->vt = &incoming_vt; 1142 UINT32_MAX);
1223 incoming->timeout_task 1143 op->timeout_task
1224 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, 1144 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1225 &incoming_timeout_cb, 1145 &incoming_timeout_cb,
1226 incoming); 1146 op);
1227 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, 1147 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1228 incoming_tail, 1148 listener->op_tail,
1229 incoming); 1149 op);
1230 // incoming_suggest (incoming, 1150 return op;
1231 // listener);
1232 return incoming;
1233} 1151}
1234 1152
1235 1153
@@ -1258,22 +1176,14 @@ channel_end_cb (void *channel_ctx,
1258 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259 "channel_end_cb called\n"); 1177 "channel_end_cb called\n");
1260 op->channel = NULL; 1178 op->channel = NULL;
1261 op->keep++; 1179 if (NULL != op->listener)
1262 /* the vt can be null if a client already requested canceling op. */ 1180 incoming_destroy (op);
1263 if (NULL != op->vt) 1181 else if (NULL != op->set)
1264 { 1182 op->set->vt->channel_death (op);
1265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1183 else
1266 "calling peer disconnect due to channel end\n"); 1184 _GSS_operation_destroy (op,
1267 op->vt->peer_disconnect (op); 1185 GNUNET_YES);
1268 } 1186 GNUNET_free (op);
1269 op->keep--;
1270 if (0 == op->keep)
1271 {
1272 /* cadet will never call us with the context again! */
1273 GNUNET_free (op);
1274 }
1275 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1276 "channel_end_cb finished\n");
1277} 1187}
1278 1188
1279 1189
@@ -1310,7 +1220,7 @@ static void
1310handle_client_listen (void *cls, 1220handle_client_listen (void *cls,
1311 const struct GNUNET_SET_ListenMessage *msg) 1221 const struct GNUNET_SET_ListenMessage *msg)
1312{ 1222{
1313 struct GNUNET_SERVICE_Client *client = cls; 1223 struct ClientState *cs = cls;
1314 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 1224 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1315 GNUNET_MQ_hd_var_size (incoming_msg, 1225 GNUNET_MQ_hd_var_size (incoming_msg,
1316 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1226 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
@@ -1376,50 +1286,33 @@ handle_client_listen (void *cls,
1376 }; 1286 };
1377 struct Listener *listener; 1287 struct Listener *listener;
1378 1288
1379 if (NULL != listener_get (client)) 1289 if (NULL != cs->listener)
1380 { 1290 {
1381 /* max. one active listener per client! */ 1291 /* max. one active listener per client! */
1382 GNUNET_break (0); 1292 GNUNET_break (0);
1383 GNUNET_SERVICE_client_drop (client); 1293 GNUNET_SERVICE_client_drop (cs->client);
1384 return; 1294 return;
1385 } 1295 }
1386 listener = GNUNET_new (struct Listener); 1296 listener = GNUNET_new (struct Listener);
1387 listener->client = client; 1297 listener->cs = cs;
1388 listener->client_mq = GNUNET_SERVICE_client_get_mq (client);
1389 listener->app_id = msg->app_id; 1298 listener->app_id = msg->app_id;
1390 listener->operation = ntohl (msg->operation); 1299 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1391 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, 1300 GNUNET_CONTAINER_DLL_insert (listener_head,
1392 listeners_tail, 1301 listener_tail,
1393 listener); 1302 listener);
1394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1395 "New listener created (op %u, port %s)\n", 1304 "New listener created (op %u, port %s)\n",
1396 listener->operation, 1305 listener->operation,
1397 GNUNET_h2s (&listener->app_id)); 1306 GNUNET_h2s (&listener->app_id));
1398 listener->open_port = GNUNET_CADET_open_porT (cadet, 1307 listener->open_port
1399 &msg->app_id, 1308 = GNUNET_CADET_open_porT (cadet,
1400 &channel_new_cb, 1309 &msg->app_id,
1401 listener, 1310 &channel_new_cb,
1402 &channel_window_cb, 1311 listener,
1403 &channel_end_cb, 1312 &channel_window_cb,
1404 cadet_handlers); 1313 &channel_end_cb,
1405 /* check for existing incoming requests the listener might be interested in */ 1314 cadet_handlers);
1406 for (struct Operation *op = incoming_head; NULL != op; op = op->next) 1315 GNUNET_SERVICE_client_continue (cs->client);
1407 {
1408 if (NULL == op->spec)
1409 continue; /* no details available yet */
1410 if (0 != op->suggest_id)
1411 continue; /* this one has been already suggested to a listener */
1412 if (listener->operation != op->spec->operation)
1413 continue; /* incompatible operation */
1414 if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
1415 &op->spec->app_id))
1416 continue; /* incompatible appliation */
1417 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1418 "Found matching existing request\n");
1419 incoming_suggest (op,
1420 listener);
1421 }
1422 GNUNET_SERVICE_client_continue (client);
1423} 1316}
1424 1317
1425 1318
@@ -1434,26 +1327,26 @@ static void
1434handle_client_reject (void *cls, 1327handle_client_reject (void *cls,
1435 const struct GNUNET_SET_RejectMessage *msg) 1328 const struct GNUNET_SET_RejectMessage *msg)
1436{ 1329{
1437 struct GNUNET_SERVICE_Client *client = cls; 1330 struct ClientState *cs = cls;
1438 struct Operation *incoming; 1331 struct Operation *op;
1439 1332
1440 incoming = get_incoming (ntohl (msg->accept_reject_id)); 1333 op = get_incoming (ntohl (msg->accept_reject_id));
1441 if (NULL == incoming) 1334 if (NULL == op)
1442 { 1335 {
1443 /* no matching incoming operation for this reject; 1336 /* no matching incoming operation for this reject;
1444 could be that the other peer already disconnected... */ 1337 could be that the other peer already disconnected... */
1445 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1338 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1446 "Client rejected unknown operation %u\n", 1339 "Client rejected unknown operation %u\n",
1447 (unsigned int) ntohl (msg->accept_reject_id)); 1340 (unsigned int) ntohl (msg->accept_reject_id));
1448 GNUNET_SERVICE_client_continue (client); 1341 GNUNET_SERVICE_client_continue (cs->client);
1449 return; 1342 return;
1450 } 1343 }
1451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1344 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1452 "Peer request (op %u, app %s) rejected by client\n", 1345 "Peer request (op %u, app %s) rejected by client\n",
1453 incoming->spec->operation, 1346 op->listener->operation,
1454 GNUNET_h2s (&incoming->spec->app_id)); 1347 GNUNET_h2s (&cs->listener->app_id));
1455 GNUNET_CADET_channel_destroy (incoming->channel); 1348 GNUNET_CADET_channel_destroy (op->channel);
1456 GNUNET_SERVICE_client_continue (client); 1349 GNUNET_SERVICE_client_continue (cs->client);
1457} 1350}
1458 1351
1459 1352
@@ -1461,13 +1354,14 @@ handle_client_reject (void *cls,
1461 * Called when a client wants to add or remove an element to a set it inhabits. 1354 * Called when a client wants to add or remove an element to a set it inhabits.
1462 * 1355 *
1463 * @param cls client that sent the message 1356 * @param cls client that sent the message
1464 * @param m message sent by the client 1357 * @param msg message sent by the client
1465 */ 1358 */
1466static int 1359static int
1467check_client_mutation (void *cls, 1360check_client_mutation (void *cls,
1468 const struct GNUNET_MessageHeader *m) 1361 const struct GNUNET_SET_ElementMessage *msg)
1469{ 1362{
1470 /* FIXME: any check we might want to do here? */ 1363 /* NOTE: Technically, we should probably check with the
1364 block library whether the element we are given is well-formed */
1471 return GNUNET_OK; 1365 return GNUNET_OK;
1472} 1366}
1473 1367
@@ -1476,24 +1370,23 @@ check_client_mutation (void *cls,
1476 * Called when a client wants to add or remove an element to a set it inhabits. 1370 * Called when a client wants to add or remove an element to a set it inhabits.
1477 * 1371 *
1478 * @param cls client that sent the message 1372 * @param cls client that sent the message
1479 * @param m message sent by the client 1373 * @param msg message sent by the client
1480 */ 1374 */
1481static void 1375static void
1482handle_client_mutation (void *cls, 1376handle_client_mutation (void *cls,
1483 const struct GNUNET_MessageHeader *m) 1377 const struct GNUNET_SET_ElementMessage *msg)
1484{ 1378{
1485 struct GNUNET_SERVICE_Client *client = cls; 1379 struct ClientState *cs = cls;
1486 struct Set *set; 1380 struct Set *set;
1487 1381
1488 set = set_get (client); 1382 if (NULL == (set = cs->set))
1489 if (NULL == set)
1490 { 1383 {
1491 /* client without a set requested an operation */ 1384 /* client without a set requested an operation */
1492 GNUNET_break (0); 1385 GNUNET_break (0);
1493 GNUNET_SERVICE_client_drop (client); 1386 GNUNET_SERVICE_client_drop (cs->client);
1494 return; 1387 return;
1495 } 1388 }
1496 GNUNET_SERVICE_client_continue (client); 1389 GNUNET_SERVICE_client_continue (cs->client);
1497 1390
1498 if (0 != set->content->iterator_count) 1391 if (0 != set->content->iterator_count)
1499 { 1392 {
@@ -1502,7 +1395,7 @@ handle_client_mutation (void *cls,
1502 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1503 "Scheduling mutation on set\n"); 1396 "Scheduling mutation on set\n");
1504 pm = GNUNET_new (struct PendingMutation); 1397 pm = GNUNET_new (struct PendingMutation);
1505 pm->mutation_message = GNUNET_copy_message (m); 1398 pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1506 pm->set = set; 1399 pm->set = set;
1507 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, 1400 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1508 set->content->pending_mutations_tail, 1401 set->content->pending_mutations_tail,
@@ -1512,7 +1405,7 @@ handle_client_mutation (void *cls,
1512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1513 "Executing mutation on set\n"); 1406 "Executing mutation on set\n");
1514 execute_mutation (set, 1407 execute_mutation (set,
1515 m); 1408 msg);
1516} 1409}
1517 1410
1518 1411
@@ -1577,7 +1470,7 @@ static void
1577handle_client_evaluate (void *cls, 1470handle_client_evaluate (void *cls,
1578 const struct GNUNET_SET_EvaluateMessage *msg) 1471 const struct GNUNET_SET_EvaluateMessage *msg)
1579{ 1472{
1580 struct GNUNET_SERVICE_Client *client = cls; 1473 struct ClientState *cs = cls;
1581 struct Operation *op = GNUNET_new (struct Operation); 1474 struct Operation *op = GNUNET_new (struct Operation);
1582 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 1475 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1583 GNUNET_MQ_hd_var_size (incoming_msg, 1476 GNUNET_MQ_hd_var_size (incoming_msg,
@@ -1643,45 +1536,38 @@ handle_client_evaluate (void *cls,
1643 GNUNET_MQ_handler_end () 1536 GNUNET_MQ_handler_end ()
1644 }; 1537 };
1645 struct Set *set; 1538 struct Set *set;
1646 struct OperationSpecification *spec;
1647 const struct GNUNET_MessageHeader *context; 1539 const struct GNUNET_MessageHeader *context;
1648 1540
1649 set = set_get (client); 1541 if (NULL == (set = cs->set))
1650 if (NULL == set)
1651 { 1542 {
1652 GNUNET_break (0); 1543 GNUNET_break (0);
1653 GNUNET_free (op); 1544 GNUNET_free (op);
1654 GNUNET_SERVICE_client_drop (client); 1545 GNUNET_SERVICE_client_drop (cs->client);
1655 return; 1546 return;
1656 } 1547 }
1657 spec = GNUNET_new (struct OperationSpecification); 1548 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1658 spec->operation = set->operation; 1549 UINT32_MAX);
1659 spec->app_id = msg->app_id; 1550 op->peer = msg->target_peer;
1660 spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 1551 op->result_mode = ntohl (msg->result_mode);
1661 UINT32_MAX); 1552 op->client_request_id = ntohl (msg->request_id);
1662 spec->peer = msg->target_peer; 1553 op->byzantine = msg->byzantine;
1663 spec->set = set; 1554 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1664 spec->result_mode = ntohl (msg->result_mode); 1555 op->force_full = msg->force_full;
1665 spec->client_request_id = ntohl (msg->request_id); 1556 op->force_delta = msg->force_delta;
1666 spec->byzantine = msg->byzantine;
1667 spec->byzantine_lower_bound = msg->byzantine_lower_bound;
1668 spec->force_full = msg->force_full;
1669 spec->force_delta = msg->force_delta;
1670 context = GNUNET_MQ_extract_nested_mh (msg); 1557 context = GNUNET_MQ_extract_nested_mh (msg);
1671 op->spec = spec;
1672 1558
1673 // Advance generation values, so that 1559 /* Advance generation values, so that
1674 // mutations won't interfer with the running operation. 1560 mutations won't interfer with the running operation. */
1561 op->set = set;
1675 op->generation_created = set->current_generation; 1562 op->generation_created = set->current_generation;
1676 advance_generation (set); 1563 advance_generation (set);
1677 op->operation = set->operation;
1678 op->vt = set->vt;
1679 GNUNET_CONTAINER_DLL_insert (set->ops_head, 1564 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1680 set->ops_tail, 1565 set->ops_tail,
1681 op); 1566 op);
1682 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1567 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683 "Creating new CADET channel to port %s\n", 1568 "Creating new CADET channel to port %s for set operation type %u\n",
1684 GNUNET_h2s (&msg->app_id)); 1569 GNUNET_h2s (&msg->app_id),
1570 set->operation);
1685 op->channel = GNUNET_CADET_channel_creatE (cadet, 1571 op->channel = GNUNET_CADET_channel_creatE (cadet,
1686 op, 1572 op,
1687 &msg->target_peer, 1573 &msg->target_peer,
@@ -1691,9 +1577,15 @@ handle_client_evaluate (void *cls,
1691 &channel_end_cb, 1577 &channel_end_cb,
1692 cadet_handlers); 1578 cadet_handlers);
1693 op->mq = GNUNET_CADET_get_mq (op->channel); 1579 op->mq = GNUNET_CADET_get_mq (op->channel);
1694 set->vt->evaluate (op, 1580 op->state = set->vt->evaluate (op,
1695 context); 1581 context);
1696 GNUNET_SERVICE_client_continue (client); 1582 if (NULL == op->state)
1583 {
1584 GNUNET_break (0);
1585 GNUNET_SERVICE_client_drop (cs->client);
1586 return;
1587 }
1588 GNUNET_SERVICE_client_continue (cs->client);
1697} 1589}
1698 1590
1699 1591
@@ -1709,15 +1601,14 @@ static void
1709handle_client_iter_ack (void *cls, 1601handle_client_iter_ack (void *cls,
1710 const struct GNUNET_SET_IterAckMessage *ack) 1602 const struct GNUNET_SET_IterAckMessage *ack)
1711{ 1603{
1712 struct GNUNET_SERVICE_Client *client = cls; 1604 struct ClientState *cs = cls;
1713 struct Set *set; 1605 struct Set *set;
1714 1606
1715 set = set_get (client); 1607 if (NULL == (set = cs->set))
1716 if (NULL == set)
1717 { 1608 {
1718 /* client without a set acknowledged receiving a value */ 1609 /* client without a set acknowledged receiving a value */
1719 GNUNET_break (0); 1610 GNUNET_break (0);
1720 GNUNET_SERVICE_client_drop (client); 1611 GNUNET_SERVICE_client_drop (cs->client);
1721 return; 1612 return;
1722 } 1613 }
1723 if (NULL == set->iter) 1614 if (NULL == set->iter)
@@ -1725,10 +1616,10 @@ handle_client_iter_ack (void *cls,
1725 /* client sent an ack, but we were not expecting one (as 1616 /* client sent an ack, but we were not expecting one (as
1726 set iteration has finished) */ 1617 set iteration has finished) */
1727 GNUNET_break (0); 1618 GNUNET_break (0);
1728 GNUNET_SERVICE_client_drop (client); 1619 GNUNET_SERVICE_client_drop (cs->client);
1729 return; 1620 return;
1730 } 1621 }
1731 GNUNET_SERVICE_client_continue (client); 1622 GNUNET_SERVICE_client_continue (cs->client);
1732 if (ntohl (ack->send_more)) 1623 if (ntohl (ack->send_more))
1733 { 1624 {
1734 send_client_element (set); 1625 send_client_element (set);
@@ -1752,42 +1643,33 @@ static void
1752handle_client_copy_lazy_prepare (void *cls, 1643handle_client_copy_lazy_prepare (void *cls,
1753 const struct GNUNET_MessageHeader *mh) 1644 const struct GNUNET_MessageHeader *mh)
1754{ 1645{
1755 struct GNUNET_SERVICE_Client *client = cls; 1646 struct ClientState *cs = cls;
1756 struct Set *set; 1647 struct Set *set;
1757 struct LazyCopyRequest *cr; 1648 struct LazyCopyRequest *cr;
1758 struct GNUNET_MQ_Envelope *ev; 1649 struct GNUNET_MQ_Envelope *ev;
1759 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg; 1650 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1760 1651
1761 set = set_get (client); 1652 if (NULL == (set = cs->set))
1762 if (NULL == set)
1763 { 1653 {
1764 /* client without a set requested an operation */ 1654 /* client without a set requested an operation */
1765 GNUNET_break (0); 1655 GNUNET_break (0);
1766 GNUNET_SERVICE_client_drop (client); 1656 GNUNET_SERVICE_client_drop (cs->client);
1767 return; 1657 return;
1768 } 1658 }
1769 1659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1660 "Client requested creation of lazy copy\n");
1770 cr = GNUNET_new (struct LazyCopyRequest); 1661 cr = GNUNET_new (struct LazyCopyRequest);
1771 1662 cr->cookie = ++lazy_copy_cookie;
1772 cr->cookie = lazy_copy_cookie;
1773 lazy_copy_cookie += 1;
1774 cr->source_set = set; 1663 cr->source_set = set;
1775
1776 GNUNET_CONTAINER_DLL_insert (lazy_copy_head, 1664 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1777 lazy_copy_tail, 1665 lazy_copy_tail,
1778 cr); 1666 cr);
1779
1780
1781 ev = GNUNET_MQ_msg (resp_msg, 1667 ev = GNUNET_MQ_msg (resp_msg,
1782 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE); 1668 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1783 resp_msg->cookie = cr->cookie; 1669 resp_msg->cookie = cr->cookie;
1784 GNUNET_MQ_send (set->client_mq, ev); 1670 GNUNET_MQ_send (set->cs->mq,
1785 1671 ev);
1786 1672 GNUNET_SERVICE_client_continue (cs->client);
1787 GNUNET_SERVICE_client_continue (client);
1788
1789 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1790 "Client requested lazy copy\n");
1791} 1673}
1792 1674
1793 1675
@@ -1801,21 +1683,19 @@ static void
1801handle_client_copy_lazy_connect (void *cls, 1683handle_client_copy_lazy_connect (void *cls,
1802 const struct GNUNET_SET_CopyLazyConnectMessage *msg) 1684 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1803{ 1685{
1804 struct GNUNET_SERVICE_Client *client = cls; 1686 struct ClientState *cs = cls;
1805 struct LazyCopyRequest *cr; 1687 struct LazyCopyRequest *cr;
1806 struct Set *set; 1688 struct Set *set;
1807 int found; 1689 int found;
1808 1690
1809 if (NULL != set_get (client)) 1691 if (NULL != cs->set)
1810 { 1692 {
1811 /* There can only be one set per client */ 1693 /* There can only be one set per client */
1812 GNUNET_break (0); 1694 GNUNET_break (0);
1813 GNUNET_SERVICE_client_drop (client); 1695 GNUNET_SERVICE_client_drop (cs->client);
1814 return; 1696 return;
1815 } 1697 }
1816
1817 found = GNUNET_NO; 1698 found = GNUNET_NO;
1818
1819 for (cr = lazy_copy_head; NULL != cr; cr = cr->next) 1699 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1820 { 1700 {
1821 if (cr->cookie == msg->cookie) 1701 if (cr->cookie == msg->cookie)
@@ -1824,21 +1704,20 @@ handle_client_copy_lazy_connect (void *cls,
1824 break; 1704 break;
1825 } 1705 }
1826 } 1706 }
1827
1828 if (GNUNET_NO == found) 1707 if (GNUNET_NO == found)
1829 { 1708 {
1830 /* client asked for copy with cookie we don't know */ 1709 /* client asked for copy with cookie we don't know */
1831 GNUNET_break (0); 1710 GNUNET_break (0);
1832 GNUNET_SERVICE_client_drop (client); 1711 GNUNET_SERVICE_client_drop (cs->client);
1833 return; 1712 return;
1834 } 1713 }
1835
1836 GNUNET_CONTAINER_DLL_remove (lazy_copy_head, 1714 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1837 lazy_copy_tail, 1715 lazy_copy_tail,
1838 cr); 1716 cr);
1839 1717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1718 "Client %p requested use of lazy copy\n",
1719 cs);
1840 set = GNUNET_new (struct Set); 1720 set = GNUNET_new (struct Set);
1841
1842 switch (cr->source_set->operation) 1721 switch (cr->source_set->operation)
1843 { 1722 {
1844 case GNUNET_SET_OPERATION_INTERSECTION: 1723 case GNUNET_SET_OPERATION_INTERSECTION:
@@ -1858,37 +1737,28 @@ handle_client_copy_lazy_connect (void *cls,
1858 GNUNET_break (0); 1737 GNUNET_break (0);
1859 GNUNET_free (set); 1738 GNUNET_free (set);
1860 GNUNET_free (cr); 1739 GNUNET_free (cr);
1861 GNUNET_SERVICE_client_drop (client); 1740 GNUNET_SERVICE_client_drop (cs->client);
1862 return; 1741 return;
1863 } 1742 }
1864 1743
1865 set->operation = cr->source_set->operation; 1744 set->operation = cr->source_set->operation;
1866 set->state = set->vt->copy_state (cr->source_set); 1745 set->state = set->vt->copy_state (cr->source_set->state);
1867 set->content = cr->source_set->content; 1746 set->content = cr->source_set->content;
1868 set->content->refcount += 1; 1747 set->content->refcount++;
1869 1748
1870 set->current_generation = cr->source_set->current_generation; 1749 set->current_generation = cr->source_set->current_generation;
1871 set->excluded_generations_size = cr->source_set->excluded_generations_size; 1750 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1872 set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations, 1751 set->excluded_generations
1873 set->excluded_generations_size * sizeof (struct GenerationRange)); 1752 = GNUNET_memdup (cr->source_set->excluded_generations,
1753 set->excluded_generations_size * sizeof (struct GenerationRange));
1874 1754
1875 /* Advance the generation of the new set, so that mutations to the 1755 /* Advance the generation of the new set, so that mutations to the
1876 of the cloned set and the source set are independent. */ 1756 of the cloned set and the source set are independent. */
1877 advance_generation (set); 1757 advance_generation (set);
1878 1758 set->cs = cs;
1879 1759 cs->set = set;
1880 set->client = client;
1881 set->client_mq = GNUNET_SERVICE_client_get_mq (client);
1882 GNUNET_CONTAINER_DLL_insert (sets_head,
1883 sets_tail,
1884 set);
1885
1886 GNUNET_free (cr); 1760 GNUNET_free (cr);
1887 1761 GNUNET_SERVICE_client_continue (cs->client);
1888 GNUNET_SERVICE_client_continue (client);
1889
1890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1891 "Client connected to lazy set\n");
1892} 1762}
1893 1763
1894 1764
@@ -1902,26 +1772,22 @@ static void
1902handle_client_cancel (void *cls, 1772handle_client_cancel (void *cls,
1903 const struct GNUNET_SET_CancelMessage *msg) 1773 const struct GNUNET_SET_CancelMessage *msg)
1904{ 1774{
1905 struct GNUNET_SERVICE_Client *client = cls; 1775 struct ClientState *cs = cls;
1906 struct Set *set; 1776 struct Set *set;
1907 struct Operation *op; 1777 struct Operation *op;
1908 int found; 1778 int found;
1909 1779
1910 set = set_get (client); 1780 if (NULL == (set = cs->set))
1911 if (NULL == set)
1912 { 1781 {
1913 /* client without a set requested an operation */ 1782 /* client without a set requested an operation */
1914 GNUNET_break (0); 1783 GNUNET_break (0);
1915 GNUNET_SERVICE_client_drop (client); 1784 GNUNET_SERVICE_client_drop (cs->client);
1916 return; 1785 return;
1917 } 1786 }
1918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919 "Client requested cancel for op %u\n",
1920 (uint32_t) ntohl (msg->request_id));
1921 found = GNUNET_NO; 1787 found = GNUNET_NO;
1922 for (op = set->ops_head; NULL != op; op = op->next) 1788 for (op = set->ops_head; NULL != op; op = op->next)
1923 { 1789 {
1924 if (op->spec->client_request_id == ntohl (msg->request_id)) 1790 if (op->client_request_id == ntohl (msg->request_id))
1925 { 1791 {
1926 found = GNUNET_YES; 1792 found = GNUNET_YES;
1927 break; 1793 break;
@@ -1934,15 +1800,19 @@ handle_client_cancel (void *cls,
1934 * yet and try to cancel the (just barely non-existent) operation. 1800 * yet and try to cancel the (just barely non-existent) operation.
1935 * So this is not a hard error. 1801 * So this is not a hard error.
1936 */ 1802 */
1937 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1803 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1938 "Client canceled non-existent op\n"); 1804 "Client canceled non-existent op %u\n",
1805 (uint32_t) ntohl (msg->request_id));
1939 } 1806 }
1940 else 1807 else
1941 { 1808 {
1809 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1810 "Client requested cancel for op %u\n",
1811 (uint32_t) ntohl (msg->request_id));
1942 _GSS_operation_destroy (op, 1812 _GSS_operation_destroy (op,
1943 GNUNET_YES); 1813 GNUNET_YES);
1944 } 1814 }
1945 GNUNET_SERVICE_client_continue (client); 1815 GNUNET_SERVICE_client_continue (cs->client);
1946} 1816}
1947 1817
1948 1818
@@ -1958,18 +1828,18 @@ static void
1958handle_client_accept (void *cls, 1828handle_client_accept (void *cls,
1959 const struct GNUNET_SET_AcceptMessage *msg) 1829 const struct GNUNET_SET_AcceptMessage *msg)
1960{ 1830{
1961 struct GNUNET_SERVICE_Client *client = cls; 1831 struct ClientState *cs = cls;
1962 struct Set *set; 1832 struct Set *set;
1963 struct Operation *op; 1833 struct Operation *op;
1964 struct GNUNET_SET_ResultMessage *result_message; 1834 struct GNUNET_SET_ResultMessage *result_message;
1965 struct GNUNET_MQ_Envelope *ev; 1835 struct GNUNET_MQ_Envelope *ev;
1836 struct Listener *listener;
1966 1837
1967 set = set_get (client); 1838 if (NULL == (set = cs->set))
1968 if (NULL == set)
1969 { 1839 {
1970 /* client without a set requested to accept */ 1840 /* client without a set requested to accept */
1971 GNUNET_break (0); 1841 GNUNET_break (0);
1972 GNUNET_SERVICE_client_drop (client); 1842 GNUNET_SERVICE_client_drop (cs->client);
1973 return; 1843 return;
1974 } 1844 }
1975 op = get_incoming (ntohl (msg->accept_reject_id)); 1845 op = get_incoming (ntohl (msg->accept_reject_id));
@@ -1977,72 +1847,75 @@ handle_client_accept (void *cls,
1977 { 1847 {
1978 /* It is not an error if the set op does not exist -- it may 1848 /* It is not an error if the set op does not exist -- it may
1979 * have been destroyed when the partner peer disconnected. */ 1849 * have been destroyed when the partner peer disconnected. */
1980 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1850 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1981 "Client accepted request that is no longer active\n"); 1851 "Client %p accepted request %u of listener %p that is no longer active\n",
1852 cs,
1853 ntohl (msg->accept_reject_id),
1854 cs->listener);
1982 ev = GNUNET_MQ_msg (result_message, 1855 ev = GNUNET_MQ_msg (result_message,
1983 GNUNET_MESSAGE_TYPE_SET_RESULT); 1856 GNUNET_MESSAGE_TYPE_SET_RESULT);
1984 result_message->request_id = msg->request_id; 1857 result_message->request_id = msg->request_id;
1985 result_message->element_type = 0;
1986 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); 1858 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1987 GNUNET_MQ_send (set->client_mq, ev); 1859 GNUNET_MQ_send (set->cs->mq,
1988 GNUNET_SERVICE_client_continue (client); 1860 ev);
1861 GNUNET_SERVICE_client_continue (cs->client);
1989 return; 1862 return;
1990 } 1863 }
1991
1992 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1864 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1993 "Client accepting request %u\n", 1865 "Client accepting request %u\n",
1994 (uint32_t) ntohl (msg->accept_reject_id)); 1866 (uint32_t) ntohl (msg->accept_reject_id));
1995 GNUNET_assert (GNUNET_YES == op->is_incoming); 1867 listener = op->listener;
1996 op->is_incoming = GNUNET_NO; 1868 op->listener = NULL;
1997 GNUNET_CONTAINER_DLL_remove (incoming_head, 1869 GNUNET_CONTAINER_DLL_remove (listener->op_head,
1998 incoming_tail, 1870 listener->op_tail,
1999 op); 1871 op);
2000 op->spec->set = set; 1872 op->set = set;
2001 GNUNET_CONTAINER_DLL_insert (set->ops_head, 1873 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2002 set->ops_tail, 1874 set->ops_tail,
2003 op); 1875 op);
2004 op->spec->client_request_id = ntohl (msg->request_id); 1876 op->client_request_id = ntohl (msg->request_id);
2005 op->spec->result_mode = ntohl (msg->result_mode); 1877 op->result_mode = ntohl (msg->result_mode);
2006 op->spec->byzantine = msg->byzantine; 1878 op->byzantine = msg->byzantine;
2007 op->spec->byzantine_lower_bound = msg->byzantine_lower_bound; 1879 op->byzantine_lower_bound = msg->byzantine_lower_bound;
2008 op->spec->force_full = msg->force_full; 1880 op->force_full = msg->force_full;
2009 op->spec->force_delta = msg->force_delta; 1881 op->force_delta = msg->force_delta;
2010 1882
2011 // Advance generation values, so that 1883 /* Advance generation values, so that future mutations do not
2012 // mutations won't interfer with the running operation. 1884 interfer with the running operation. */
2013 op->generation_created = set->current_generation; 1885 op->generation_created = set->current_generation;
2014 advance_generation (set); 1886 advance_generation (set);
2015 1887 GNUNET_assert (NULL == op->state);
2016 op->vt = set->vt; 1888 op->state = set->vt->accept (op);
2017 op->operation = set->operation; 1889 if (NULL == op->state)
2018 op->vt->accept (op); 1890 {
2019 GNUNET_SERVICE_client_continue (client); 1891 GNUNET_break (0);
1892 GNUNET_SERVICE_client_drop (cs->client);
1893 return;
1894 }
1895 /* Now allow CADET to continue, as we did not do this in
1896 #handle_incoming_msg (as we wanted to first see if the
1897 local client would accept the request). */
1898 GNUNET_CADET_receive_done (op->channel);
1899 GNUNET_SERVICE_client_continue (cs->client);
2020} 1900}
2021 1901
2022 1902
2023/** 1903/**
2024 * Called to clean up, after a shutdown has been requested. 1904 * Called to clean up, after a shutdown has been requested.
2025 * 1905 *
2026 * @param cls closure 1906 * @param cls closure, NULL
2027 */ 1907 */
2028static void 1908static void
2029shutdown_task (void *cls) 1909shutdown_task (void *cls)
2030{ 1910{
2031 while (NULL != incoming_head) 1911 /* Delay actual shutdown to allow service to disconnect clients */
2032 incoming_destroy (incoming_head);
2033 while (NULL != listeners_head)
2034 listener_destroy (listeners_head);
2035 while (NULL != sets_head)
2036 set_destroy (sets_head);
2037
2038 /* it's important to destroy cadet at the end, as all channels
2039 * must be destroyed before the cadet handle! */
2040 if (NULL != cadet) 1912 if (NULL != cadet)
2041 { 1913 {
2042 GNUNET_CADET_disconnect (cadet); 1914 GNUNET_CADET_disconnect (cadet);
2043 cadet = NULL; 1915 cadet = NULL;
2044 } 1916 }
2045 GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); 1917 GNUNET_STATISTICS_destroy (_GSS_statistics,
1918 GNUNET_YES);
2046 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2047 "handled shutdown request\n"); 1920 "handled shutdown request\n");
2048} 1921}
@@ -2061,15 +1934,19 @@ run (void *cls,
2061 const struct GNUNET_CONFIGURATION_Handle *cfg, 1934 const struct GNUNET_CONFIGURATION_Handle *cfg,
2062 struct GNUNET_SERVICE_Handle *service) 1935 struct GNUNET_SERVICE_Handle *service)
2063{ 1936{
2064 configuration = cfg; 1937 /* FIXME: need to modify SERVICE (!) API to allow
1938 us to run a shutdown task *after* clients were
1939 forcefully disconnected! */
2065 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1940 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2066 NULL); 1941 NULL);
2067 _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); 1942 _GSS_statistics = GNUNET_STATISTICS_create ("set",
1943 cfg);
2068 cadet = GNUNET_CADET_connecT (cfg); 1944 cadet = GNUNET_CADET_connecT (cfg);
2069 if (NULL == cadet) 1945 if (NULL == cadet)
2070 { 1946 {
2071 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1947 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2072 _("Could not connect to CADET service\n")); 1948 _("Could not connect to CADET service\n"));
1949 GNUNET_SCHEDULER_shutdown ();
2073 return; 1950 return;
2074 } 1951 }
2075} 1952}
@@ -2095,7 +1972,7 @@ GNUNET_SERVICE_MAIN
2095 NULL), 1972 NULL),
2096 GNUNET_MQ_hd_var_size (client_mutation, 1973 GNUNET_MQ_hd_var_size (client_mutation,
2097 GNUNET_MESSAGE_TYPE_SET_ADD, 1974 GNUNET_MESSAGE_TYPE_SET_ADD,
2098 struct GNUNET_MessageHeader, 1975 struct GNUNET_SET_ElementMessage,
2099 NULL), 1976 NULL),
2100 GNUNET_MQ_hd_fixed_size (client_create_set, 1977 GNUNET_MQ_hd_fixed_size (client_create_set,
2101 GNUNET_MESSAGE_TYPE_SET_CREATE, 1978 GNUNET_MESSAGE_TYPE_SET_CREATE,
@@ -2119,7 +1996,7 @@ GNUNET_SERVICE_MAIN
2119 NULL), 1996 NULL),
2120 GNUNET_MQ_hd_var_size (client_mutation, 1997 GNUNET_MQ_hd_var_size (client_mutation,
2121 GNUNET_MESSAGE_TYPE_SET_REMOVE, 1998 GNUNET_MESSAGE_TYPE_SET_REMOVE,
2122 struct GNUNET_MessageHeader, 1999 struct GNUNET_SET_ElementMessage,
2123 NULL), 2000 NULL),
2124 GNUNET_MQ_hd_fixed_size (client_cancel, 2001 GNUNET_MQ_hd_fixed_size (client_cancel,
2125 GNUNET_MESSAGE_TYPE_SET_CANCEL, 2002 GNUNET_MESSAGE_TYPE_SET_CANCEL,