aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set.c')
-rw-r--r--src/set/gnunet-service-set.c1574
1 files changed, 712 insertions, 862 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 30d43e8a1..12af653c1 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2013, 2014, 2017 GNUnet e.V. 3 Copyright (C) 2013-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -24,6 +24,8 @@
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 */ 25 */
26#include "gnunet-service-set.h" 26#include "gnunet-service-set.h"
27#include "gnunet-service-set_union.h"
28#include "gnunet-service-set_intersection.h"
27#include "gnunet-service-set_protocol.h" 29#include "gnunet-service-set_protocol.h"
28#include "gnunet_statistics_service.h" 30#include "gnunet_statistics_service.h"
29 31
@@ -33,6 +35,35 @@
33 */ 35 */
34#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES 36#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
35 37
38
39/**
40 * Lazy copy requests made by a client.
41 */
42struct LazyCopyRequest
43{
44 /**
45 * Kept in a DLL.
46 */
47 struct LazyCopyRequest *prev;
48
49 /**
50 * Kept in a DLL.
51 */
52 struct LazyCopyRequest *next;
53
54 /**
55 * Which set are we supposed to copy?
56 */
57 struct Set *source_set;
58
59 /**
60 * Cookie identifying the request.
61 */
62 uint32_t cookie;
63
64};
65
66
36/** 67/**
37 * A listener is inhabited by a client, and waits for evaluation 68 * A listener is inhabited by a client, and waits for evaluation
38 * requests from remote peers. 69 * requests from remote peers.
@@ -50,21 +81,24 @@ struct Listener
50 struct Listener *prev; 81 struct Listener *prev;
51 82
52 /** 83 /**
53 * Client that owns the listener. 84 * Head of DLL of operations this listener is responsible for.
54 * Only one client may own a listener. 85 * Once the client has accepted/declined the operation, the
86 * operation is moved to the respective set's operation DLLS.
55 */ 87 */
56 struct GNUNET_SERVICE_Client *client; 88 struct Operation *op_head;
57 89
58 /** 90 /**
59 * Message queue for the client 91 * Tail of DLL of operations this listener is responsible for.
92 * Once the client has accepted/declined the operation, the
93 * operation is moved to the respective set's operation DLLS.
60 */ 94 */
61 struct GNUNET_MQ_Handle *client_mq; 95 struct Operation *op_tail;
62 96
63 /** 97 /**
64 * Application ID for the operation, used to distinguish 98 * Client that owns the listener.
65 * multiple operations of the same type with the same peer. 99 * Only one client may own a listener.
66 */ 100 */
67 struct GNUNET_HashCode app_id; 101 struct ClientState *cs;
68 102
69 /** 103 /**
70 * The port we are listening on with CADET. 104 * The port we are listening on with CADET.
@@ -72,27 +106,18 @@ struct Listener
72 struct GNUNET_CADET_Port *open_port; 106 struct GNUNET_CADET_Port *open_port;
73 107
74 /** 108 /**
109 * Application ID for the operation, used to distinguish
110 * multiple operations of the same type with the same peer.
111 */
112 struct GNUNET_HashCode app_id;
113
114 /**
75 * The type of the operation. 115 * The type of the operation.
76 */ 116 */
77 enum GNUNET_SET_OperationType operation; 117 enum GNUNET_SET_OperationType operation;
78}; 118};
79 119
80 120
81struct LazyCopyRequest
82{
83 struct Set *source_set;
84 uint32_t cookie;
85
86 struct LazyCopyRequest *prev;
87 struct LazyCopyRequest *next;
88};
89
90
91/**
92 * Configuration of our local peer.
93 */
94static const struct GNUNET_CONFIGURATION_Handle *configuration;
95
96/** 121/**
97 * Handle to the cadet service, used to listen for and connect to 122 * Handle to the cadet service, used to listen for and connect to
98 * remote peers. 123 * remote peers.
@@ -100,96 +125,48 @@ static const struct GNUNET_CONFIGURATION_Handle *configuration;
100static struct GNUNET_CADET_Handle *cadet; 125static struct GNUNET_CADET_Handle *cadet;
101 126
102/** 127/**
103 * Sets are held in a doubly linked list. 128 * DLL of lazy copy requests by this client.
104 */ 129 */
105static struct Set *sets_head; 130static struct LazyCopyRequest *lazy_copy_head;
106 131
107/** 132/**
108 * Sets are held in a doubly linked list. 133 * DLL of lazy copy requests by this client.
109 */ 134 */
110static struct Set *sets_tail; 135static struct LazyCopyRequest *lazy_copy_tail;
111 136
112/** 137/**
113 * Listeners are held in a doubly linked list. 138 * Generator for unique cookie we set per lazy copy request.
114 */ 139 */
115static struct Listener *listeners_head; 140static uint32_t lazy_copy_cookie;
116 141
117/** 142/**
118 * Listeners are held in a doubly linked list. 143 * Statistics handle.
119 */ 144 */
120static struct Listener *listeners_tail; 145struct GNUNET_STATISTICS_Handle *_GSS_statistics;
121 146
122/** 147/**
123 * Incoming sockets from remote peers are held in a doubly linked 148 * Listeners are held in a doubly linked list.
124 * list.
125 */ 149 */
126static struct Operation *incoming_head; 150static struct Listener *listener_head;
127 151
128/** 152/**
129 * Incoming sockets from remote peers are held in a doubly linked 153 * Listeners are held in a doubly linked list.
130 * list.
131 */ 154 */
132static struct Operation *incoming_tail; 155static struct Listener *listener_tail;
133
134static struct LazyCopyRequest *lazy_copy_head;
135static struct LazyCopyRequest *lazy_copy_tail;
136
137static uint32_t lazy_copy_cookie = 1;
138 156
139/** 157/**
140 * Counter for allocating unique IDs for clients, used to identify 158 * Counter for allocating unique IDs for clients, used to identify
141 * incoming operation requests from remote peers, that the client can 159 * incoming operation requests from remote peers, that the client can
142 * choose to accept or refuse. 160 * choose to accept or refuse. 0 must not be used (reserved for
161 * uninitialized).
143 */ 162 */
144static uint32_t suggest_id = 1; 163static uint32_t suggest_id;
145
146/**
147 * Statistics handle.
148 */
149struct GNUNET_STATISTICS_Handle *_GSS_statistics;
150
151
152/**
153 * Get set that is owned by the given client, if any.
154 *
155 * @param client client to look for
156 * @return set that the client owns, NULL if the client
157 * does not own a set
158 */
159static struct Set *
160set_get (struct GNUNET_SERVICE_Client *client)
161{
162 struct Set *set;
163
164 for (set = sets_head; NULL != set; set = set->next)
165 if (set->client == client)
166 return set;
167 return NULL;
168}
169
170
171/**
172 * Get the listener associated with the given client, if any.
173 *
174 * @param client the client
175 * @return listener associated with the client, NULL
176 * if there isn't any
177 */
178static struct Listener *
179listener_get (struct GNUNET_SERVICE_Client *client)
180{
181 struct Listener *listener;
182
183 for (listener = listeners_head; NULL != listener; listener = listener->next)
184 if (listener->client == client)
185 return listener;
186 return NULL;
187}
188 164
189 165
190/** 166/**
191 * Get the incoming socket associated with the given id. 167 * Get the incoming socket associated with the given id.
192 * 168 *
169 * @param listener the listener to look in
193 * @param id id to look for 170 * @param id id to look for
194 * @return the incoming socket associated with the id, 171 * @return the incoming socket associated with the id,
195 * or NULL if there is none 172 * or NULL if there is none
@@ -197,43 +174,49 @@ listener_get (struct GNUNET_SERVICE_Client *client)
197static struct Operation * 174static struct Operation *
198get_incoming (uint32_t id) 175get_incoming (uint32_t id)
199{ 176{
200 struct Operation *op; 177 for (struct Listener *listener = listener_head;
201 178 NULL != listener;
202 for (op = incoming_head; NULL != op; op = op->next) 179 listener = listener->next)
203 if (op->suggest_id == id) 180 {
204 { 181 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
205 GNUNET_assert (GNUNET_YES == op->is_incoming); 182 if (op->suggest_id == id)
206 return op; 183 return op;
207 } 184 }
208 return NULL; 185 return NULL;
209} 186}
210 187
211 188
212/** 189/**
213 * Destroy a listener, free all resources associated with it. 190 * Destroy an incoming request from a remote peer
214 * 191 *
215 * @param listener listener to destroy 192 * @param op remote request to destroy
216 */ 193 */
217static void 194static void
218listener_destroy (struct Listener *listener) 195incoming_destroy (struct Operation *op)
219{ 196{
220 /* If the client is not dead yet, destroy it. 197 struct Listener *listener;
221 * The client's destroy callback will destroy the listener again. */ 198 struct GNUNET_CADET_Channel *channel;
222 if (NULL != listener->client)
223 {
224 struct GNUNET_SERVICE_Client *client = listener->client;
225 199
226 listener->client = NULL; 200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
227 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 201 "Destroying incoming operation %p\n",
228 "Disconnecting listener client\n"); 202 op);
229 GNUNET_SERVICE_client_drop (client); 203 if (NULL != (listener = op->listener))
230 return; 204 {
205 GNUNET_CONTAINER_DLL_remove (listener->op_head,
206 listener->op_tail,
207 op);
208 op->listener = NULL;
209 }
210 if (NULL != op->timeout_task)
211 {
212 GNUNET_SCHEDULER_cancel (op->timeout_task);
213 op->timeout_task = NULL;
214 }
215 if (NULL != (channel = op->channel))
216 {
217 op->channel = NULL;
218 GNUNET_CADET_channel_destroy (channel);
231 } 219 }
232 GNUNET_CADET_close_port (listener->open_port);
233 GNUNET_CONTAINER_DLL_remove (listeners_head,
234 listeners_tail,
235 listener);
236 GNUNET_free (listener);
237} 220}
238 221
239 222
@@ -303,12 +286,11 @@ garbage_collect_cb (void *cls,
303static void 286static void
304collect_generation_garbage (struct Set *set) 287collect_generation_garbage (struct Set *set)
305{ 288{
306 struct Operation *op;
307 struct GarbageContext gc; 289 struct GarbageContext gc;
308 290
309 gc.min_op_generation = UINT_MAX; 291 gc.min_op_generation = UINT_MAX;
310 gc.max_op_generation = 0; 292 gc.max_op_generation = 0;
311 for (op = set->ops_head; NULL != op; op = op->next) 293 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
312 { 294 {
313 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation, 295 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
314 op->generation_created); 296 op->generation_created);
@@ -322,23 +304,36 @@ collect_generation_garbage (struct Set *set)
322} 304}
323 305
324 306
307/**
308 * Is @a generation in the range of exclusions?
309 *
310 * @param generation generation to query
311 * @param excluded array of generations where the element is excluded
312 * @param excluded_size length of the @a excluded array
313 * @return #GNUNET_YES if @a generation is in any of the ranges
314 */
325static int 315static int
326is_excluded_generation (unsigned int generation, 316is_excluded_generation (unsigned int generation,
327 struct GenerationRange *excluded, 317 struct GenerationRange *excluded,
328 unsigned int excluded_size) 318 unsigned int excluded_size)
329{ 319{
330 unsigned int i; 320 for (unsigned int i = 0; i < excluded_size; i++)
331 321 if ( (generation >= excluded[i].start) &&
332 for (i = 0; i < excluded_size; i++) 322 (generation < excluded[i].end) )
333 {
334 if ( (generation >= excluded[i].start) && (generation < excluded[i].end) )
335 return GNUNET_YES; 323 return GNUNET_YES;
336 }
337
338 return GNUNET_NO; 324 return GNUNET_NO;
339} 325}
340 326
341 327
328/**
329 * Is element @a ee part of the set during @a query_generation?
330 *
331 * @param ee element to test
332 * @param query_generation generation to query
333 * @param excluded array of generations where the element is excluded
334 * @param excluded_size length of the @a excluded array
335 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
336 */
342static int 337static int
343is_element_of_generation (struct ElementEntry *ee, 338is_element_of_generation (struct ElementEntry *ee,
344 unsigned int query_generation, 339 unsigned int query_generation,
@@ -347,11 +342,12 @@ is_element_of_generation (struct ElementEntry *ee,
347{ 342{
348 struct MutationEvent *mut; 343 struct MutationEvent *mut;
349 int is_present; 344 int is_present;
350 unsigned int i;
351 345
352 GNUNET_assert (NULL != ee->mutations); 346 GNUNET_assert (NULL != ee->mutations);
353 347 if (GNUNET_YES ==
354 if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size)) 348 is_excluded_generation (query_generation,
349 excluded,
350 excluded_size))
355 { 351 {
356 GNUNET_break (0); 352 GNUNET_break (0);
357 return GNUNET_NO; 353 return GNUNET_NO;
@@ -361,7 +357,7 @@ is_element_of_generation (struct ElementEntry *ee,
361 357
362 /* Could be made faster with binary search, but lists 358 /* Could be made faster with binary search, but lists
363 are small, so why bother. */ 359 are small, so why bother. */
364 for (i = 0; i < ee->mutations_size; i++) 360 for (unsigned int i = 0; i < ee->mutations_size; i++)
365 { 361 {
366 mut = &ee->mutations[i]; 362 mut = &ee->mutations[i];
367 363
@@ -373,7 +369,10 @@ is_element_of_generation (struct ElementEntry *ee,
373 continue; 369 continue;
374 } 370 }
375 371
376 if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size)) 372 if (GNUNET_YES ==
373 is_excluded_generation (mut->generation,
374 excluded,
375 excluded_size))
377 { 376 {
378 /* The generation is excluded (because it belongs to another 377 /* The generation is excluded (because it belongs to another
379 fork via a lazy copy) and thus mutations aren't considered 378 fork via a lazy copy) and thus mutations aren't considered
@@ -382,11 +381,12 @@ is_element_of_generation (struct ElementEntry *ee,
382 } 381 }
383 382
384 /* This would be an inconsistency in how we manage mutations. */ 383 /* This would be an inconsistency in how we manage mutations. */
385 if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) ) 384 if ( (GNUNET_YES == is_present) &&
385 (GNUNET_YES == mut->added) )
386 GNUNET_assert (0); 386 GNUNET_assert (0);
387
388 /* Likewise. */ 387 /* Likewise. */
389 if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) ) 388 if ( (GNUNET_NO == is_present) &&
389 (GNUNET_NO == mut->added) )
390 GNUNET_assert (0); 390 GNUNET_assert (0);
391 391
392 is_present = mut->added; 392 is_present = mut->added;
@@ -396,44 +396,33 @@ is_element_of_generation (struct ElementEntry *ee,
396} 396}
397 397
398 398
399int 399/**
400_GSS_is_element_of_set (struct ElementEntry *ee, 400 * Is element @a ee part of the set used by @a op?
401 struct Set *set) 401 *
402{ 402 * @param ee element to test
403 return is_element_of_generation (ee, 403 * @param op operation the defines the set and its generation
404 set->current_generation, 404 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
405 set->excluded_generations, 405 */
406 set->excluded_generations_size);
407}
408
409
410static int
411is_element_of_iteration (struct ElementEntry *ee,
412 struct Set *set)
413{
414 return is_element_of_generation (ee,
415 set->iter_generation,
416 set->excluded_generations,
417 set->excluded_generations_size);
418}
419
420
421int 406int
422_GSS_is_element_of_operation (struct ElementEntry *ee, 407_GSS_is_element_of_operation (struct ElementEntry *ee,
423 struct Operation *op) 408 struct Operation *op)
424{ 409{
425 return is_element_of_generation (ee, 410 return is_element_of_generation (ee,
426 op->generation_created, 411 op->generation_created,
427 op->spec->set->excluded_generations, 412 op->set->excluded_generations,
428 op->spec->set->excluded_generations_size); 413 op->set->excluded_generations_size);
429} 414}
430 415
431 416
432/** 417/**
433 * Destroy the given operation. Call the implementation-specific 418 * Destroy the given operation. Used for any operation where both
434 * cancel function of the operation. Disconnects from the remote 419 * peers were known and that thus actually had a vt and channel. Must
435 * peer. Does not disconnect the client, as there may be multiple 420 * not be used for operations where 'listener' is still set and we do
436 * operations per set. 421 * not know the other peer.
422 *
423 * Call the implementation-specific cancel function of the operation.
424 * Disconnects from the remote peer. Does not disconnect the client,
425 * as there may be multiple operations per set.
437 * 426 *
438 * @param op operation to destroy 427 * @param op operation to destroy
439 * @param gc #GNUNET_YES to perform garbage collection on the set 428 * @param gc #GNUNET_YES to perform garbage collection on the set
@@ -442,38 +431,39 @@ void
442_GSS_operation_destroy (struct Operation *op, 431_GSS_operation_destroy (struct Operation *op,
443 int gc) 432 int gc)
444{ 433{
445 struct Set *set; 434 struct Set *set = op->set;
446 struct GNUNET_CADET_Channel *channel; 435 struct GNUNET_CADET_Channel *channel;
447 436
448 if (NULL == op->vt) 437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438 "Destroying operation %p\n",
439 op);
440 GNUNET_assert (NULL == op->listener);
441 if (NULL != op->state)
449 { 442 {
450 /* already in #_GSS_operation_destroy() */ 443 set->vt->cancel (op);
451 return; 444 op->state = NULL;
452 } 445 }
453 GNUNET_assert (GNUNET_NO == op->is_incoming); 446 if (NULL != set)
454 GNUNET_assert (NULL != op->spec);
455 set = op->spec->set;
456 GNUNET_CONTAINER_DLL_remove (set->ops_head,
457 set->ops_tail,
458 op);
459 op->vt->cancel (op);
460 op->vt = NULL;
461 if (NULL != op->spec)
462 { 447 {
463 if (NULL != op->spec->context_msg) 448 GNUNET_CONTAINER_DLL_remove (set->ops_head,
464 { 449 set->ops_tail,
465 GNUNET_free (op->spec->context_msg); 450 op);
466 op->spec->context_msg = NULL; 451 op->set = NULL;
467 } 452 }
468 GNUNET_free (op->spec); 453 if (NULL != op->context_msg)
469 op->spec = NULL; 454 {
455 GNUNET_free (op->context_msg);
456 op->context_msg = NULL;
470 } 457 }
471 if (NULL != (channel = op->channel)) 458 if (NULL != (channel = op->channel))
472 { 459 {
460 /* This will free op; called conditionally as this helper function
461 is also called from within the channel disconnect handler. */
473 op->channel = NULL; 462 op->channel = NULL;
474 GNUNET_CADET_channel_destroy (channel); 463 GNUNET_CADET_channel_destroy (channel);
475 } 464 }
476 if (GNUNET_YES == gc) 465 if ( (NULL != set) &&
466 (GNUNET_YES == gc) )
477 collect_generation_garbage (set); 467 collect_generation_garbage (set);
478 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, 468 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
479 * there was a channel end handler that will free 'op' on the call stack. */ 469 * there was a channel end handler that will free 'op' on the call stack. */
@@ -481,6 +471,28 @@ _GSS_operation_destroy (struct Operation *op,
481 471
482 472
483/** 473/**
474 * Callback called when a client connects to the service.
475 *
476 * @param cls closure for the service
477 * @param c the new client that connected to the service
478 * @param mq the message queue used to send messages to the client
479 * @return @a `struct ClientState`
480 */
481static void *
482client_connect_cb (void *cls,
483 struct GNUNET_SERVICE_Client *c,
484 struct GNUNET_MQ_Handle *mq)
485{
486 struct ClientState *cs;
487
488 cs = GNUNET_new (struct ClientState);
489 cs->client = c;
490 cs->mq = mq;
491 return cs;
492}
493
494
495/**
484 * Iterator over hash map entries to free element entries. 496 * Iterator over hash map entries to free element entries.
485 * 497 *
486 * @param cls closure 498 * @param cls closure
@@ -496,66 +508,76 @@ destroy_elements_iterator (void *cls,
496 struct ElementEntry *ee = value; 508 struct ElementEntry *ee = value;
497 509
498 GNUNET_free_non_null (ee->mutations); 510 GNUNET_free_non_null (ee->mutations);
499
500 GNUNET_free (ee); 511 GNUNET_free (ee);
501 return GNUNET_YES; 512 return GNUNET_YES;
502} 513}
503 514
504 515
505/** 516/**
506 * Destroy a set, and free all resources and operations associated with it. 517 * Clean up after a client has disconnected
507 * 518 *
508 * @param set the set to destroy 519 * @param cls closure, unused
520 * @param client the client to clean up after
521 * @param internal_cls the `struct ClientState`
509 */ 522 */
510static void 523static void
511set_destroy (struct Set *set) 524client_disconnect_cb (void *cls,
525 struct GNUNET_SERVICE_Client *client,
526 void *internal_cls)
512{ 527{
513 if (NULL != set->client) 528 struct ClientState *cs = internal_cls;
514 { 529 struct Operation *op;
515 /* If the client is not dead yet, destroy it. The client's destroy 530 struct Listener *listener;
516 * callback will call `set_destroy()` again in this case. We do 531 struct Set *set;
517 * this so that the channel end handler still has a valid set handle 532
518 * to destroy. */ 533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519 struct GNUNET_SERVICE_Client *client = set->client; 534 "Client disconnected, cleaning up\n");
520 535 if (NULL != (set = cs->set))
521 set->client = NULL;
522 GNUNET_SERVICE_client_drop (client);
523 return;
524 }
525 GNUNET_assert (NULL != set->state);
526 while (NULL != set->ops_head)
527 _GSS_operation_destroy (set->ops_head, GNUNET_NO);
528 set->vt->destroy_set (set->state);
529 set->state = NULL;
530 if (NULL != set->iter)
531 {
532 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
533 set->iter = NULL;
534 set->iteration_id++;
535 }
536 { 536 {
537 struct SetContent *content; 537 struct SetContent *content = set->content;
538 struct PendingMutation *pm; 538 struct PendingMutation *pm;
539 struct PendingMutation *pm_current; 539 struct PendingMutation *pm_current;
540 struct LazyCopyRequest *lcr;
540 541
541 content = set->content; 542 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543 "Destroying client's set\n");
544 /* Destroy pending set operations */
545 while (NULL != set->ops_head)
546 _GSS_operation_destroy (set->ops_head,
547 GNUNET_NO);
548
549 /* Destroy operation-specific state */
550 GNUNET_assert (NULL != set->state);
551 set->vt->destroy_set (set->state);
552 set->state = NULL;
553
554 /* Clean up ongoing iterations */
555 if (NULL != set->iter)
556 {
557 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
558 set->iter = NULL;
559 set->iteration_id++;
560 }
542 561
543 // discard any pending mutations that reference this set 562 /* discard any pending mutations that reference this set */
544 pm = content->pending_mutations_head; 563 pm = content->pending_mutations_head;
545 while (NULL != pm) 564 while (NULL != pm)
546 { 565 {
547 pm_current = pm; 566 pm_current = pm;
548 pm = pm->next; 567 pm = pm->next;
549 if (pm_current-> set == set) 568 if (pm_current->set == set)
569 {
550 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, 570 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
551 content->pending_mutations_tail, 571 content->pending_mutations_tail,
552 pm_current); 572 pm_current);
553 573 GNUNET_free (pm_current);
574 }
554 } 575 }
555 576
577 /* free set content (or at least decrement RC) */
556 set->content = NULL; 578 set->content = NULL;
557 GNUNET_assert (0 != content->refcount); 579 GNUNET_assert (0 != content->refcount);
558 content->refcount -= 1; 580 content->refcount--;
559 if (0 == content->refcount) 581 if (0 == content->refcount)
560 { 582 {
561 GNUNET_assert (NULL != content->elements); 583 GNUNET_assert (NULL != content->elements);
@@ -566,154 +588,85 @@ set_destroy (struct Set *set)
566 content->elements = NULL; 588 content->elements = NULL;
567 GNUNET_free (content); 589 GNUNET_free (content);
568 } 590 }
569 } 591 GNUNET_free_non_null (set->excluded_generations);
570 GNUNET_free_non_null (set->excluded_generations); 592 set->excluded_generations = NULL;
571 set->excluded_generations = NULL;
572 GNUNET_CONTAINER_DLL_remove (sets_head,
573 sets_tail,
574 set);
575 593
576 // remove set from pending copy requests 594 /* remove set from pending copy requests */
577 {
578 struct LazyCopyRequest *lcr;
579 lcr = lazy_copy_head; 595 lcr = lazy_copy_head;
580 while (NULL != lcr) 596 while (NULL != lcr)
581 { 597 {
582 struct LazyCopyRequest *lcr_current; 598 struct LazyCopyRequest *lcr_current = lcr;
583 lcr_current = lcr; 599
584 lcr = lcr->next; 600 lcr = lcr->next;
585 if (lcr_current->source_set == set) 601 if (lcr_current->source_set == set)
602 {
586 GNUNET_CONTAINER_DLL_remove (lazy_copy_head, 603 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
587 lazy_copy_tail, 604 lazy_copy_tail,
588 lcr_current); 605 lcr_current);
606 GNUNET_free (lcr_current);
607 }
589 } 608 }
609 GNUNET_free (set);
590 } 610 }
591 611
592 GNUNET_free (set); 612 if (NULL != (listener = cs->listener))
593}
594
595
596/**
597 * Callback called when a client connects to the service.
598 *
599 * @param cls closure for the service
600 * @param c the new client that connected to the service
601 * @param mq the message queue used to send messages to the client
602 * @return @a c
603 */
604static void *
605client_connect_cb (void *cls,
606 struct GNUNET_SERVICE_Client *c,
607 struct GNUNET_MQ_Handle *mq)
608{
609 return c;
610}
611
612
613/**
614 * Clean up after a client has disconnected
615 *
616 * @param cls closure, unused
617 * @param client the client to clean up after
618 * @param internal_cls our client-specific internal data structure
619 */
620static void
621client_disconnect_cb (void *cls,
622 struct GNUNET_SERVICE_Client *client,
623 void *internal_cls)
624{
625 struct Listener *listener;
626 struct Set *set;
627
628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629 "client disconnected, cleaning up\n");
630 set = set_get (client);
631 if (NULL != set)
632 { 613 {
633 set->client = NULL;
634 set_destroy (set);
635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 614 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636 "Client's set destroyed\n"); 615 "Destroying client's listener\n");
637 } 616 GNUNET_CADET_close_port (listener->open_port);
638 listener = listener_get (client); 617 listener->open_port = NULL;
639 if (NULL != listener) 618 while (NULL != (op = listener->op_head))
640 { 619 incoming_destroy (op);
641 listener->client = NULL; 620 GNUNET_CONTAINER_DLL_remove (listener_head,
642 listener_destroy (listener); 621 listener_tail,
643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 622 listener);
644 "Client's listener destroyed\n"); 623 GNUNET_free (listener);
645 } 624 }
625 GNUNET_free (cs);
646} 626}
647 627
648 628
649/** 629/**
650 * Destroy an incoming request from a remote peer 630 * Check a request for a set operation from another peer.
651 * 631 *
652 * @param incoming remote request to destroy 632 * @param cls the operation state
633 * @param msg the received message
634 * @return #GNUNET_OK if the channel should be kept alive,
635 * #GNUNET_SYSERR to destroy the channel
653 */ 636 */
654static void 637static int
655incoming_destroy (struct Operation *incoming) 638check_incoming_msg (void *cls,
639 const struct OperationRequestMessage *msg)
656{ 640{
657 struct GNUNET_CADET_Channel *channel; 641 struct Operation *op = cls;
642 struct Listener *listener = op->listener;
643 const struct GNUNET_MessageHeader *nested_context;
658 644
659 GNUNET_assert (GNUNET_YES == incoming->is_incoming); 645 /* double operation request */
660 GNUNET_CONTAINER_DLL_remove (incoming_head, 646 if (0 != op->suggest_id)
661 incoming_tail,
662 incoming);
663 if (NULL != incoming->timeout_task)
664 { 647 {
665 GNUNET_SCHEDULER_cancel (incoming->timeout_task); 648 GNUNET_break_op (0);
666 incoming->timeout_task = NULL; 649 return GNUNET_SYSERR;
667 } 650 }
668 /* make sure that the tunnel end handler will not destroy us again */ 651 /* This should be equivalent to the previous condition, but can't hurt to check twice */
669 incoming->vt = NULL; 652 if (NULL == op->listener)
670 if (NULL != incoming->spec)
671 { 653 {
672 GNUNET_free (incoming->spec); 654 GNUNET_break (0);
673 incoming->spec = NULL; 655 return GNUNET_SYSERR;
674 } 656 }
675 if (NULL != (channel = incoming->channel)) 657 if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
676 { 658 {
677 incoming->channel = NULL; 659 GNUNET_break_op (0);
678 GNUNET_CADET_channel_destroy (channel); 660 return GNUNET_SYSERR;
679 } 661 }
680} 662 nested_context = GNUNET_MQ_extract_nested_mh (msg);
681 663 if ( (NULL != nested_context) &&
682 664 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
683/** 665 {
684 * Suggest the given request to the listener. The listening client can 666 GNUNET_break_op (0);
685 * then accept or reject the remote request. 667 return GNUNET_SYSERR;
686 * 668 }
687 * @param incoming the incoming peer with the request to suggest 669 return GNUNET_OK;
688 * @param listener the listener to suggest the request to
689 */
690static void
691incoming_suggest (struct Operation *incoming,
692 struct Listener *listener)
693{
694 struct GNUNET_MQ_Envelope *mqm;
695 struct GNUNET_SET_RequestMessage *cmsg;
696
697 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
698 GNUNET_assert (NULL != incoming->spec);
699 GNUNET_assert (0 == incoming->suggest_id);
700 incoming->suggest_id = suggest_id++;
701 if (0 == suggest_id)
702 suggest_id++;
703 GNUNET_assert (NULL != incoming->timeout_task);
704 GNUNET_SCHEDULER_cancel (incoming->timeout_task);
705 incoming->timeout_task = NULL;
706 mqm = GNUNET_MQ_msg_nested_mh (cmsg,
707 GNUNET_MESSAGE_TYPE_SET_REQUEST,
708 incoming->spec->context_msg);
709 GNUNET_assert (NULL != mqm);
710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
711 "Suggesting incoming request with accept id %u to listener\n",
712 incoming->suggest_id);
713 cmsg->accept_id = htonl (incoming->suggest_id);
714 cmsg->peer_id = incoming->spec->peer;
715 GNUNET_MQ_send (listener->client_mq,
716 mqm);
717} 670}
718 671
719 672
@@ -729,93 +682,85 @@ incoming_suggest (struct Operation *incoming,
729 * our virtual table and subsequent msgs would be routed differently (as 682 * our virtual table and subsequent msgs would be routed differently (as
730 * we then know what type of operation this is). 683 * we then know what type of operation this is).
731 * 684 *
732 * @param op the operation state 685 * @param cls the operation state
733 * @param mh the received message 686 * @param msg the received message
734 * @return #GNUNET_OK if the channel should be kept alive, 687 * @return #GNUNET_OK if the channel should be kept alive,
735 * #GNUNET_SYSERR to destroy the channel 688 * #GNUNET_SYSERR to destroy the channel
736 */ 689 */
737static int 690static void
738handle_incoming_msg (struct Operation *op, 691handle_incoming_msg (void *cls,
739 const struct GNUNET_MessageHeader *mh) 692 const struct OperationRequestMessage *msg)
740{ 693{
741 const struct OperationRequestMessage *msg; 694 struct Operation *op = cls;
742 struct Listener *listener = op->listener; 695 struct Listener *listener = op->listener;
743 struct OperationSpecification *spec;
744 const struct GNUNET_MessageHeader *nested_context; 696 const struct GNUNET_MessageHeader *nested_context;
697 struct GNUNET_MQ_Envelope *env;
698 struct GNUNET_SET_RequestMessage *cmsg;
745 699
746 msg = (const struct OperationRequestMessage *) mh;
747 GNUNET_assert (GNUNET_YES == op->is_incoming);
748 if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
749 {
750 GNUNET_break_op (0);
751 return GNUNET_SYSERR;
752 }
753 /* double operation request */
754 if (NULL != op->spec)
755 {
756 GNUNET_break_op (0);
757 return GNUNET_SYSERR;
758 }
759 spec = GNUNET_new (struct OperationSpecification);
760 nested_context = GNUNET_MQ_extract_nested_mh (msg); 700 nested_context = GNUNET_MQ_extract_nested_mh (msg);
761 if ( (NULL != nested_context) &&
762 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
763 {
764 GNUNET_break_op (0);
765 GNUNET_free (spec);
766 return GNUNET_SYSERR;
767 }
768 /* Make a copy of the nested_context (application-specific context 701 /* Make a copy of the nested_context (application-specific context
769 information that is opaque to set) so we can pass it to the 702 information that is opaque to set) so we can pass it to the
770 listener later on */ 703 listener later on */
771 if (NULL != nested_context) 704 if (NULL != nested_context)
772 spec->context_msg = GNUNET_copy_message (nested_context); 705 op->context_msg = GNUNET_copy_message (nested_context);
773 spec->operation = ntohl (msg->operation); 706 op->remote_element_count = ntohl (msg->element_count);
774 spec->app_id = listener->app_id;
775 spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
776 UINT32_MAX);
777 spec->peer = op->peer;
778 spec->remote_element_count = ntohl (msg->element_count);
779 op->spec = spec;
780
781 listener = op->listener;
782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783 "Received P2P operation request (op %u, port %s) for active listener\n", 708 "Received P2P operation request (op %u, port %s) for active listener\n",
784 (uint32_t) ntohl (msg->operation), 709 (uint32_t) ntohl (msg->operation),
785 GNUNET_h2s (&listener->app_id)); 710 GNUNET_h2s (&op->listener->app_id));
786 incoming_suggest (op, 711 GNUNET_assert (0 == op->suggest_id);
787 listener); 712 if (0 == suggest_id)
788 return GNUNET_OK; 713 suggest_id++;
714 op->suggest_id = suggest_id++;
715 GNUNET_assert (NULL != op->timeout_task);
716 GNUNET_SCHEDULER_cancel (op->timeout_task);
717 op->timeout_task = NULL;
718 env = GNUNET_MQ_msg_nested_mh (cmsg,
719 GNUNET_MESSAGE_TYPE_SET_REQUEST,
720 op->context_msg);
721 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
722 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
723 op->suggest_id,
724 listener,
725 listener->cs);
726 cmsg->accept_id = htonl (op->suggest_id);
727 cmsg->peer_id = op->peer;
728 GNUNET_MQ_send (listener->cs->mq,
729 env);
730 /* NOTE: GNUNET_CADET_receive_done() will be called in
731 #handle_client_accept() */
789} 732}
790 733
791 734
735/**
736 * Add an element to @a set as specified by @a msg
737 *
738 * @param set set to manipulate
739 * @param msg message specifying the change
740 */
792static void 741static void
793execute_add (struct Set *set, 742execute_add (struct Set *set,
794 const struct GNUNET_MessageHeader *m) 743 const struct GNUNET_SET_ElementMessage *msg)
795{ 744{
796 const struct GNUNET_SET_ElementMessage *msg;
797 struct GNUNET_SET_Element el; 745 struct GNUNET_SET_Element el;
798 struct ElementEntry *ee; 746 struct ElementEntry *ee;
799 struct GNUNET_HashCode hash; 747 struct GNUNET_HashCode hash;
800 748
801 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type)); 749 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
802 750 el.size = ntohs (msg->header.size) - sizeof (*msg);
803 msg = (const struct GNUNET_SET_ElementMessage *) m;
804 el.size = ntohs (m->size) - sizeof *msg;
805 el.data = &msg[1]; 751 el.data = &msg[1];
806 el.element_type = ntohs (msg->element_type); 752 el.element_type = ntohs (msg->element_type);
807 GNUNET_SET_element_hash (&el, &hash); 753 GNUNET_SET_element_hash (&el,
808 754 &hash);
809 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, 755 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
810 &hash); 756 &hash);
811
812 if (NULL == ee) 757 if (NULL == ee)
813 { 758 {
814 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
815 "Client inserts element %s of size %u\n", 760 "Client inserts element %s of size %u\n",
816 GNUNET_h2s (&hash), 761 GNUNET_h2s (&hash),
817 el.size); 762 el.size);
818 ee = GNUNET_malloc (el.size + sizeof *ee); 763 ee = GNUNET_malloc (el.size + sizeof (*ee));
819 ee->element.size = el.size; 764 ee->element.size = el.size;
820 GNUNET_memcpy (&ee[1], 765 GNUNET_memcpy (&ee[1],
821 el.data, 766 el.data,
@@ -832,7 +777,11 @@ execute_add (struct Set *set,
832 ee, 777 ee,
833 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 778 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
834 } 779 }
835 else if (GNUNET_YES == _GSS_is_element_of_set (ee, set)) 780 else if (GNUNET_YES ==
781 is_element_of_generation (ee,
782 set->current_generation,
783 set->excluded_generations,
784 set->excluded_generations_size))
836 { 785 {
837 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838 "Client inserted element %s of size %u twice (ignored)\n", 787 "Client inserted element %s of size %u twice (ignored)\n",
@@ -852,24 +801,27 @@ execute_add (struct Set *set,
852 ee->mutations_size, 801 ee->mutations_size,
853 mut); 802 mut);
854 } 803 }
855 804 set->vt->add (set->state,
856 set->vt->add (set->state, ee); 805 ee);
857} 806}
858 807
859 808
809/**
810 * Remove an element from @a set as specified by @a msg
811 *
812 * @param set set to manipulate
813 * @param msg message specifying the change
814 */
860static void 815static void
861execute_remove (struct Set *set, 816execute_remove (struct Set *set,
862 const struct GNUNET_MessageHeader *m) 817 const struct GNUNET_SET_ElementMessage *msg)
863{ 818{
864 const struct GNUNET_SET_ElementMessage *msg;
865 struct GNUNET_SET_Element el; 819 struct GNUNET_SET_Element el;
866 struct ElementEntry *ee; 820 struct ElementEntry *ee;
867 struct GNUNET_HashCode hash; 821 struct GNUNET_HashCode hash;
868 822
869 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type)); 823 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
870 824 el.size = ntohs (msg->header.size) - sizeof (*msg);
871 msg = (const struct GNUNET_SET_ElementMessage *) m;
872 el.size = ntohs (m->size) - sizeof *msg;
873 el.data = &msg[1]; 825 el.data = &msg[1];
874 el.element_type = ntohs (msg->element_type); 826 el.element_type = ntohs (msg->element_type);
875 GNUNET_SET_element_hash (&el, &hash); 827 GNUNET_SET_element_hash (&el, &hash);
@@ -883,7 +835,11 @@ execute_remove (struct Set *set,
883 el.size); 835 el.size);
884 return; 836 return;
885 } 837 }
886 if (GNUNET_NO == _GSS_is_element_of_set (ee, set)) 838 if (GNUNET_NO ==
839 is_element_of_generation (ee,
840 set->current_generation,
841 set->excluded_generations,
842 set->excluded_generations_size))
887 { 843 {
888 /* Client tried to remove element twice */ 844 /* Client tried to remove element twice */
889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 845 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -906,22 +862,28 @@ execute_remove (struct Set *set,
906 ee->mutations_size, 862 ee->mutations_size,
907 mut); 863 mut);
908 } 864 }
909 set->vt->remove (set->state, ee); 865 set->vt->remove (set->state,
866 ee);
910} 867}
911 868
912 869
913 870/**
871 * Perform a mutation on a set as specified by the @a msg
872 *
873 * @param set the set to mutate
874 * @param msg specification of what to change
875 */
914static void 876static void
915execute_mutation (struct Set *set, 877execute_mutation (struct Set *set,
916 const struct GNUNET_MessageHeader *m) 878 const struct GNUNET_SET_ElementMessage *msg)
917{ 879{
918 switch (ntohs (m->type)) 880 switch (ntohs (msg->header.type))
919 { 881 {
920 case GNUNET_MESSAGE_TYPE_SET_ADD: 882 case GNUNET_MESSAGE_TYPE_SET_ADD:
921 execute_add (set, m); 883 execute_add (set, msg);
922 break; 884 break;
923 case GNUNET_MESSAGE_TYPE_SET_REMOVE: 885 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
924 execute_remove (set, m); 886 execute_remove (set, msg);
925 break; 887 break;
926 default: 888 default:
927 GNUNET_break (0); 889 GNUNET_break (0);
@@ -929,6 +891,34 @@ execute_mutation (struct Set *set,
929} 891}
930 892
931 893
894/**
895 * Execute mutations that were delayed on a set because of
896 * pending operations.
897 *
898 * @param set the set to execute mutations on
899 */
900static void
901execute_delayed_mutations (struct Set *set)
902{
903 struct PendingMutation *pm;
904
905 if (0 != set->content->iterator_count)
906 return; /* still cannot do this */
907 while (NULL != (pm = set->content->pending_mutations_head))
908 {
909 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
910 set->content->pending_mutations_tail,
911 pm);
912 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913 "Executing pending mutation on %p.\n",
914 pm->set);
915 execute_mutation (pm->set,
916 pm->msg);
917 GNUNET_free (pm->msg);
918 GNUNET_free (pm);
919 }
920}
921
932 922
933/** 923/**
934 * Send the next element of a set to the set's client. The next element is given by 924 * Send the next element of a set to the set's client. The next element is given by
@@ -952,65 +942,45 @@ send_client_element (struct Set *set)
952 struct GNUNET_SET_IterResponseMessage *msg; 942 struct GNUNET_SET_IterResponseMessage *msg;
953 943
954 GNUNET_assert (NULL != set->iter); 944 GNUNET_assert (NULL != set->iter);
955 945 do {
956again: 946 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
957 947 NULL,
958 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, 948 (const void **) &ee);
959 NULL, 949 if (GNUNET_NO == ret)
960 (const void **) &ee);
961 if (GNUNET_NO == ret)
962 {
963 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
964 "Iteration on %p done.\n",
965 (void *) set);
966 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
967 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
968 set->iter = NULL;
969 set->iteration_id++;
970
971 GNUNET_assert (set->content->iterator_count > 0);
972 set->content->iterator_count -= 1;
973
974 if (0 == set->content->iterator_count)
975 { 950 {
976 while (NULL != set->content->pending_mutations_head) 951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977 { 952 "Iteration on %p done.\n",
978 struct PendingMutation *pm; 953 set);
979 954 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
980 pm = set->content->pending_mutations_head; 955 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
981 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, 956 set->iter = NULL;
982 set->content->pending_mutations_tail, 957 set->iteration_id++;
983 pm); 958 GNUNET_assert (set->content->iterator_count > 0);
984 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 959 set->content->iterator_count--;
985 "Executing pending mutation on %p.\n", 960 execute_delayed_mutations (set);
986 (void *) pm->set); 961 GNUNET_MQ_send (set->cs->mq,
987 execute_mutation (pm->set, pm->mutation_message); 962 ev);
988 GNUNET_free (pm->mutation_message); 963 return;
989 GNUNET_free (pm);
990 }
991 } 964 }
992
993 }
994 else
995 {
996 GNUNET_assert (NULL != ee); 965 GNUNET_assert (NULL != ee);
997 966 } while (GNUNET_NO ==
998 if (GNUNET_NO == is_element_of_iteration (ee, set)) 967 is_element_of_generation (ee,
999 goto again; 968 set->iter_generation,
1000 969 set->excluded_generations,
1001 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 970 set->excluded_generations_size));
1002 "Sending iteration element on %p.\n", 971 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1003 (void *) set); 972 "Sending iteration element on %p.\n",
1004 ev = GNUNET_MQ_msg_extra (msg, 973 set);
1005 ee->element.size, 974 ev = GNUNET_MQ_msg_extra (msg,
1006 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); 975 ee->element.size,
1007 GNUNET_memcpy (&msg[1], 976 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1008 ee->element.data, 977 GNUNET_memcpy (&msg[1],
1009 ee->element.size); 978 ee->element.data,
1010 msg->element_type = htons (ee->element.element_type); 979 ee->element.size);
1011 msg->iteration_id = htons (set->iteration_id); 980 msg->element_type = htons (ee->element.element_type);
1012 } 981 msg->iteration_id = htons (set->iteration_id);
1013 GNUNET_MQ_send (set->client_mq, ev); 982 GNUNET_MQ_send (set->cs->mq,
983 ev);
1014} 984}
1015 985
1016 986
@@ -1027,22 +997,21 @@ static void
1027handle_client_iterate (void *cls, 997handle_client_iterate (void *cls,
1028 const struct GNUNET_MessageHeader *m) 998 const struct GNUNET_MessageHeader *m)
1029{ 999{
1030 struct GNUNET_SERVICE_Client *client = cls; 1000 struct ClientState *cs = cls;
1031 struct Set *set; 1001 struct Set *set;
1032 1002
1033 set = set_get (client); 1003 if (NULL == (set = cs->set))
1034 if (NULL == set)
1035 { 1004 {
1036 /* attempt to iterate over a non existing set */ 1005 /* attempt to iterate over a non existing set */
1037 GNUNET_break (0); 1006 GNUNET_break (0);
1038 GNUNET_SERVICE_client_drop (client); 1007 GNUNET_SERVICE_client_drop (cs->client);
1039 return; 1008 return;
1040 } 1009 }
1041 if (NULL != set->iter) 1010 if (NULL != set->iter)
1042 { 1011 {
1043 /* Only one concurrent iterate-action allowed per set */ 1012 /* Only one concurrent iterate-action allowed per set */
1044 GNUNET_break (0); 1013 GNUNET_break (0);
1045 GNUNET_SERVICE_client_drop (client); 1014 GNUNET_SERVICE_client_drop (cs->client);
1046 return; 1015 return;
1047 } 1016 }
1048 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1050,8 +1019,8 @@ handle_client_iterate (void *cls,
1050 (void *) set, 1019 (void *) set,
1051 set->current_generation, 1020 set->current_generation,
1052 GNUNET_CONTAINER_multihashmap_size (set->content->elements)); 1021 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1053 GNUNET_SERVICE_client_continue (client); 1022 GNUNET_SERVICE_client_continue (cs->client);
1054 set->content->iterator_count += 1; 1023 set->content->iterator_count++;
1055 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); 1024 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1056 set->iter_generation = set->current_generation; 1025 set->iter_generation = set->current_generation;
1057 send_client_element (set); 1026 send_client_element (set);
@@ -1070,17 +1039,17 @@ static void
1070handle_client_create_set (void *cls, 1039handle_client_create_set (void *cls,
1071 const struct GNUNET_SET_CreateMessage *msg) 1040 const struct GNUNET_SET_CreateMessage *msg)
1072{ 1041{
1073 struct GNUNET_SERVICE_Client *client = cls; 1042 struct ClientState *cs = cls;
1074 struct Set *set; 1043 struct Set *set;
1075 1044
1076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1045 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077 "Client created new set (operation %u)\n", 1046 "Client created new set (operation %u)\n",
1078 (uint32_t) ntohl (msg->operation)); 1047 (uint32_t) ntohl (msg->operation));
1079 if (NULL != set_get (client)) 1048 if (NULL != cs->set)
1080 { 1049 {
1081 /* There can only be one set per client */ 1050 /* There can only be one set per client */
1082 GNUNET_break (0); 1051 GNUNET_break (0);
1083 GNUNET_SERVICE_client_drop (client); 1052 GNUNET_SERVICE_client_drop (cs->client);
1084 return; 1053 return;
1085 } 1054 }
1086 set = GNUNET_new (struct Set); 1055 set = GNUNET_new (struct Set);
@@ -1095,27 +1064,25 @@ handle_client_create_set (void *cls,
1095 default: 1064 default:
1096 GNUNET_free (set); 1065 GNUNET_free (set);
1097 GNUNET_break (0); 1066 GNUNET_break (0);
1098 GNUNET_SERVICE_client_drop (client); 1067 GNUNET_SERVICE_client_drop (cs->client);
1099 return; 1068 return;
1100 } 1069 }
1101 set->operation = ntohl (msg->operation); 1070 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1102 set->state = set->vt->create (); 1071 set->state = set->vt->create ();
1103 if (NULL == set->state) 1072 if (NULL == set->state)
1104 { 1073 {
1105 /* initialization failed (i.e. out of memory) */ 1074 /* initialization failed (i.e. out of memory) */
1106 GNUNET_free (set); 1075 GNUNET_free (set);
1107 GNUNET_SERVICE_client_drop (client); 1076 GNUNET_SERVICE_client_drop (cs->client);
1108 return; 1077 return;
1109 } 1078 }
1110 set->content = GNUNET_new (struct SetContent); 1079 set->content = GNUNET_new (struct SetContent);
1111 set->content->refcount = 1; 1080 set->content->refcount = 1;
1112 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1081 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1113 set->client = client; 1082 GNUNET_YES);
1114 set->client_mq = GNUNET_SERVICE_client_get_mq (client); 1083 set->cs = cs;
1115 GNUNET_CONTAINER_DLL_insert (sets_head, 1084 cs->set = set;
1116 sets_tail, 1085 GNUNET_SERVICE_client_continue (cs->client);
1117 set);
1118 GNUNET_SERVICE_client_continue (client);
1119} 1086}
1120 1087
1121 1088
@@ -1131,31 +1098,12 @@ handle_client_create_set (void *cls,
1131static void 1098static void
1132incoming_timeout_cb (void *cls) 1099incoming_timeout_cb (void *cls)
1133{ 1100{
1134 struct Operation *incoming = cls; 1101 struct Operation *op = cls;
1135 1102
1136 incoming->timeout_task = NULL; 1103 op->timeout_task = NULL;
1137 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139 "Remote peer's incoming request timed out\n"); 1105 "Remote peer's incoming request timed out\n");
1140 incoming_destroy (incoming);
1141}
1142
1143
1144/**
1145 * Terminates an incoming operation in case we have not yet received an
1146 * operation request. Called by the channel destruction handler.
1147 *
1148 * @param op the channel context
1149 */
1150static void
1151handle_incoming_disconnect (struct Operation *op)
1152{
1153 GNUNET_assert (GNUNET_YES == op->is_incoming);
1154 /* channel is already dead, incoming_destroy must not
1155 * destroy it ... */
1156 op->channel = NULL;
1157 incoming_destroy (op); 1106 incoming_destroy (op);
1158 op->vt = NULL;
1159} 1107}
1160 1108
1161 1109
@@ -1180,32 +1128,26 @@ channel_new_cb (void *cls,
1180 struct GNUNET_CADET_Channel *channel, 1128 struct GNUNET_CADET_Channel *channel,
1181 const struct GNUNET_PeerIdentity *source) 1129 const struct GNUNET_PeerIdentity *source)
1182{ 1130{
1183 static const struct SetVT incoming_vt = {
1184 .msg_handler = &handle_incoming_msg,
1185 .peer_disconnect = &handle_incoming_disconnect
1186 };
1187 struct Listener *listener = cls; 1131 struct Listener *listener = cls;
1188 struct Operation *incoming; 1132 struct Operation *op;
1189 1133
1190 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1134 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191 "New incoming channel\n"); 1135 "New incoming channel\n");
1192 incoming = GNUNET_new (struct Operation); 1136 op = GNUNET_new (struct Operation);
1193 incoming->listener = listener; 1137 op->listener = listener;
1194 incoming->is_incoming = GNUNET_YES; 1138 op->peer = *source;
1195 incoming->peer = *source; 1139 op->channel = channel;
1196 incoming->channel = channel; 1140 op->mq = GNUNET_CADET_get_mq (op->channel);
1197 incoming->mq = GNUNET_CADET_get_mq (incoming->channel); 1141 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1198 incoming->vt = &incoming_vt; 1142 UINT32_MAX);
1199 incoming->timeout_task 1143 op->timeout_task
1200 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, 1144 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1201 &incoming_timeout_cb, 1145 &incoming_timeout_cb,
1202 incoming); 1146 op);
1203 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, 1147 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1204 incoming_tail, 1148 listener->op_tail,
1205 incoming); 1149 op);
1206 // incoming_suggest (incoming, 1150 return op;
1207 // listener);
1208 return incoming;
1209} 1151}
1210 1152
1211 1153
@@ -1234,22 +1176,14 @@ channel_end_cb (void *channel_ctx,
1234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235 "channel_end_cb called\n"); 1177 "channel_end_cb called\n");
1236 op->channel = NULL; 1178 op->channel = NULL;
1237 op->keep++; 1179 if (NULL != op->listener)
1238 /* the vt can be null if a client already requested canceling op. */ 1180 incoming_destroy (op);
1239 if (NULL != op->vt) 1181 else if (NULL != op->set)
1240 { 1182 op->set->vt->channel_death (op);
1241 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1183 else
1242 "calling peer disconnect due to channel end\n"); 1184 _GSS_operation_destroy (op,
1243 op->vt->peer_disconnect (op); 1185 GNUNET_YES);
1244 } 1186 GNUNET_free (op);
1245 op->keep--;
1246 if (0 == op->keep)
1247 {
1248 /* cadet will never call us with the context again! */
1249 GNUNET_free (op);
1250 }
1251 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1252 "channel_end_cb finished\n");
1253} 1187}
1254 1188
1255 1189
@@ -1275,60 +1209,6 @@ channel_window_cb (void *cls,
1275 /* FIXME: not implemented, we could do flow control here... */ 1209 /* FIXME: not implemented, we could do flow control here... */
1276} 1210}
1277 1211
1278/**
1279 * FIXME: hack-job. Migrate to proper handler array use!
1280 *
1281 * @param cls local state associated with the channel.
1282 * @param message The actual message.
1283 */
1284static int
1285check_p2p_message (void *cls,
1286 const struct GNUNET_MessageHeader *message)
1287{
1288 return GNUNET_OK;
1289}
1290
1291
1292/**
1293 * FIXME: hack-job. Migrate to proper handler array use!
1294 *
1295 * Functions with this signature are called whenever a message is
1296 * received via a cadet channel.
1297 *
1298 * The msg_handler is a virtual table set in initially either when a peer
1299 * creates a new channel with us, or once we create a new channel
1300 * ourselves (evaluate).
1301 *
1302 * Once we know the exact type of operation (union/intersection), the vt is
1303 * replaced with an operation specific instance (_GSS_[op]_vt).
1304 *
1305 * @param cls local state associated with the channel.
1306 * @param message The actual message.
1307 */
1308static void
1309handle_p2p_message (void *cls,
1310 const struct GNUNET_MessageHeader *message)
1311{
1312 struct Operation *op = cls;
1313 int ret;
1314
1315 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1316 "Dispatching cadet message (type: %u)\n",
1317 ntohs (message->type));
1318 /* do this before the handler, as the handler might kill the channel */
1319 GNUNET_CADET_receive_done (op->channel);
1320 if (NULL != op->vt)
1321 ret = op->vt->msg_handler (op,
1322 message);
1323 else
1324 ret = GNUNET_SYSERR;
1325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326 "Handled cadet message (type: %u)\n",
1327 ntohs (message->type));
1328 if (GNUNET_OK != ret)
1329 GNUNET_CADET_channel_destroy (op->channel);
1330}
1331
1332 1212
1333/** 1213/**
1334 * Called when a client wants to create a new listener. 1214 * Called when a client wants to create a new listener.
@@ -1340,116 +1220,99 @@ static void
1340handle_client_listen (void *cls, 1220handle_client_listen (void *cls,
1341 const struct GNUNET_SET_ListenMessage *msg) 1221 const struct GNUNET_SET_ListenMessage *msg)
1342{ 1222{
1343 struct GNUNET_SERVICE_Client *client = cls; 1223 struct ClientState *cs = cls;
1344 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 1224 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1345 GNUNET_MQ_hd_var_size (p2p_message, 1225 GNUNET_MQ_hd_var_size (incoming_msg,
1346 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1226 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1347 struct GNUNET_MessageHeader, 1227 struct OperationRequestMessage,
1348 NULL), 1228 NULL),
1349 GNUNET_MQ_hd_var_size (p2p_message, 1229 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1350 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 1230 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1351 struct GNUNET_MessageHeader, 1231 struct IBFMessage,
1352 NULL), 1232 NULL),
1353 GNUNET_MQ_hd_var_size (p2p_message, 1233 GNUNET_MQ_hd_var_size (union_p2p_elements,
1354 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 1234 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1355 struct GNUNET_MessageHeader, 1235 struct GNUNET_SET_ElementMessage,
1356 NULL), 1236 NULL),
1357 GNUNET_MQ_hd_var_size (p2p_message, 1237 GNUNET_MQ_hd_var_size (union_p2p_offer,
1358 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 1238 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1359 struct GNUNET_MessageHeader, 1239 struct GNUNET_MessageHeader,
1360 NULL), 1240 NULL),
1361 GNUNET_MQ_hd_var_size (p2p_message, 1241 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1362 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 1242 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1363 struct GNUNET_MessageHeader, 1243 struct InquiryMessage,
1364 NULL), 1244 NULL),
1365 GNUNET_MQ_hd_var_size (p2p_message, 1245 GNUNET_MQ_hd_var_size (union_p2p_demand,
1366 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 1246 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1367 struct GNUNET_MessageHeader, 1247 struct GNUNET_MessageHeader,
1368 NULL), 1248 NULL),
1369 GNUNET_MQ_hd_var_size (p2p_message, 1249 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1370 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 1250 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1371 struct GNUNET_MessageHeader, 1251 struct GNUNET_MessageHeader,
1372 NULL), 1252 NULL),
1373 GNUNET_MQ_hd_var_size (p2p_message, 1253 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1374 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, 1254 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1375 struct GNUNET_MessageHeader, 1255 struct GNUNET_MessageHeader,
1376 NULL), 1256 NULL),
1377 GNUNET_MQ_hd_var_size (p2p_message, 1257 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1378 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, 1258 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1379 struct GNUNET_MessageHeader, 1259 struct GNUNET_MessageHeader,
1380 NULL), 1260 NULL),
1381 GNUNET_MQ_hd_var_size (p2p_message, 1261 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1382 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 1262 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1383 struct GNUNET_MessageHeader, 1263 struct StrataEstimatorMessage,
1384 NULL), 1264 NULL),
1385 GNUNET_MQ_hd_var_size (p2p_message, 1265 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1386 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 1266 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1387 struct GNUNET_MessageHeader, 1267 struct StrataEstimatorMessage,
1388 NULL), 1268 NULL),
1389 GNUNET_MQ_hd_var_size (p2p_message, 1269 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1390 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, 1270 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1391 struct GNUNET_MessageHeader, 1271 struct GNUNET_SET_ElementMessage,
1392 NULL),
1393 GNUNET_MQ_hd_var_size (p2p_message,
1394 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1395 struct GNUNET_MessageHeader,
1396 NULL), 1272 NULL),
1397 GNUNET_MQ_hd_var_size (p2p_message, 1273 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1274 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1275 struct IntersectionElementInfoMessage,
1276 NULL),
1277 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1398 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 1278 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1399 struct GNUNET_MessageHeader, 1279 struct BFMessage,
1400 NULL),
1401 GNUNET_MQ_hd_var_size (p2p_message,
1402 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1403 struct GNUNET_MessageHeader,
1404 NULL), 1280 NULL),
1281 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1282 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1283 struct IntersectionDoneMessage,
1284 NULL),
1405 GNUNET_MQ_handler_end () 1285 GNUNET_MQ_handler_end ()
1406 }; 1286 };
1407 struct Listener *listener; 1287 struct Listener *listener;
1408 1288
1409 if (NULL != listener_get (client)) 1289 if (NULL != cs->listener)
1410 { 1290 {
1411 /* max. one active listener per client! */ 1291 /* max. one active listener per client! */
1412 GNUNET_break (0); 1292 GNUNET_break (0);
1413 GNUNET_SERVICE_client_drop (client); 1293 GNUNET_SERVICE_client_drop (cs->client);
1414 return; 1294 return;
1415 } 1295 }
1416 listener = GNUNET_new (struct Listener); 1296 listener = GNUNET_new (struct Listener);
1417 listener->client = client; 1297 listener->cs = cs;
1418 listener->client_mq = GNUNET_SERVICE_client_get_mq (client);
1419 listener->app_id = msg->app_id; 1298 listener->app_id = msg->app_id;
1420 listener->operation = ntohl (msg->operation); 1299 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1421 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, 1300 GNUNET_CONTAINER_DLL_insert (listener_head,
1422 listeners_tail, 1301 listener_tail,
1423 listener); 1302 listener);
1424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425 "New listener created (op %u, port %s)\n", 1304 "New listener created (op %u, port %s)\n",
1426 listener->operation, 1305 listener->operation,
1427 GNUNET_h2s (&listener->app_id)); 1306 GNUNET_h2s (&listener->app_id));
1428 listener->open_port = GNUNET_CADET_open_porT (cadet, 1307 listener->open_port
1429 &msg->app_id, 1308 = GNUNET_CADET_open_port (cadet,
1430 &channel_new_cb, 1309 &msg->app_id,
1431 listener, 1310 &channel_new_cb,
1432 &channel_window_cb, 1311 listener,
1433 &channel_end_cb, 1312 &channel_window_cb,
1434 cadet_handlers); 1313 &channel_end_cb,
1435 /* check for existing incoming requests the listener might be interested in */ 1314 cadet_handlers);
1436 for (struct Operation *op = incoming_head; NULL != op; op = op->next) 1315 GNUNET_SERVICE_client_continue (cs->client);
1437 {
1438 if (NULL == op->spec)
1439 continue; /* no details available yet */
1440 if (0 != op->suggest_id)
1441 continue; /* this one has been already suggested to a listener */
1442 if (listener->operation != op->spec->operation)
1443 continue; /* incompatible operation */
1444 if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
1445 &op->spec->app_id))
1446 continue; /* incompatible appliation */
1447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1448 "Found matching existing request\n");
1449 incoming_suggest (op,
1450 listener);
1451 }
1452 GNUNET_SERVICE_client_continue (client);
1453} 1316}
1454 1317
1455 1318
@@ -1464,23 +1327,26 @@ static void
1464handle_client_reject (void *cls, 1327handle_client_reject (void *cls,
1465 const struct GNUNET_SET_RejectMessage *msg) 1328 const struct GNUNET_SET_RejectMessage *msg)
1466{ 1329{
1467 struct GNUNET_SERVICE_Client *client = cls; 1330 struct ClientState *cs = cls;
1468 struct Operation *incoming; 1331 struct Operation *op;
1469 1332
1470 incoming = get_incoming (ntohl (msg->accept_reject_id)); 1333 op = get_incoming (ntohl (msg->accept_reject_id));
1471 if (NULL == incoming) 1334 if (NULL == op)
1472 { 1335 {
1473 /* no matching incoming operation for this reject */ 1336 /* no matching incoming operation for this reject;
1474 GNUNET_break (0); 1337 could be that the other peer already disconnected... */
1475 GNUNET_SERVICE_client_drop (client); 1338 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1339 "Client rejected unknown operation %u\n",
1340 (unsigned int) ntohl (msg->accept_reject_id));
1341 GNUNET_SERVICE_client_continue (cs->client);
1476 return; 1342 return;
1477 } 1343 }
1478 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1344 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1479 "Peer request (op %u, app %s) rejected by client\n", 1345 "Peer request (op %u, app %s) rejected by client\n",
1480 incoming->spec->operation, 1346 op->listener->operation,
1481 GNUNET_h2s (&incoming->spec->app_id)); 1347 GNUNET_h2s (&cs->listener->app_id));
1482 GNUNET_CADET_channel_destroy (incoming->channel); 1348 GNUNET_CADET_channel_destroy (op->channel);
1483 GNUNET_SERVICE_client_continue (client); 1349 GNUNET_SERVICE_client_continue (cs->client);
1484} 1350}
1485 1351
1486 1352
@@ -1488,13 +1354,14 @@ handle_client_reject (void *cls,
1488 * Called when a client wants to add or remove an element to a set it inhabits. 1354 * Called when a client wants to add or remove an element to a set it inhabits.
1489 * 1355 *
1490 * @param cls client that sent the message 1356 * @param cls client that sent the message
1491 * @param m message sent by the client 1357 * @param msg message sent by the client
1492 */ 1358 */
1493static int 1359static int
1494check_client_mutation (void *cls, 1360check_client_mutation (void *cls,
1495 const struct GNUNET_MessageHeader *m) 1361 const struct GNUNET_SET_ElementMessage *msg)
1496{ 1362{
1497 /* FIXME: any check we might want to do here? */ 1363 /* NOTE: Technically, we should probably check with the
1364 block library whether the element we are given is well-formed */
1498 return GNUNET_OK; 1365 return GNUNET_OK;
1499} 1366}
1500 1367
@@ -1503,25 +1370,23 @@ check_client_mutation (void *cls,
1503 * Called when a client wants to add or remove an element to a set it inhabits. 1370 * Called when a client wants to add or remove an element to a set it inhabits.
1504 * 1371 *
1505 * @param cls client that sent the message 1372 * @param cls client that sent the message
1506 * @param m message sent by the client 1373 * @param msg message sent by the client
1507 */ 1374 */
1508static void 1375static void
1509handle_client_mutation (void *cls, 1376handle_client_mutation (void *cls,
1510 const struct GNUNET_MessageHeader *m) 1377 const struct GNUNET_SET_ElementMessage *msg)
1511{ 1378{
1512 struct GNUNET_SERVICE_Client *client = cls; 1379 struct ClientState *cs = cls;
1513 struct Set *set; 1380 struct Set *set;
1514 1381
1515 set = set_get (client); 1382 if (NULL == (set = cs->set))
1516 if (NULL == set)
1517 { 1383 {
1518 /* client without a set requested an operation */ 1384 /* client without a set requested an operation */
1519 GNUNET_break (0); 1385 GNUNET_break (0);
1520 GNUNET_SERVICE_client_drop (client); 1386 GNUNET_SERVICE_client_drop (cs->client);
1521 return; 1387 return;
1522 } 1388 }
1523 1389 GNUNET_SERVICE_client_continue (cs->client);
1524 GNUNET_SERVICE_client_continue (client);
1525 1390
1526 if (0 != set->content->iterator_count) 1391 if (0 != set->content->iterator_count)
1527 { 1392 {
@@ -1529,16 +1394,18 @@ handle_client_mutation (void *cls,
1529 1394
1530 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1531 "Scheduling mutation on set\n"); 1396 "Scheduling mutation on set\n");
1532
1533 pm = GNUNET_new (struct PendingMutation); 1397 pm = GNUNET_new (struct PendingMutation);
1534 pm->mutation_message = GNUNET_copy_message (m); 1398 pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1535 pm->set = set; 1399 pm->set = set;
1536 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, 1400 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1537 set->content->pending_mutations_tail, 1401 set->content->pending_mutations_tail,
1538 pm); 1402 pm);
1539 return; 1403 return;
1540 } 1404 }
1541 execute_mutation (set, m); 1405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1406 "Executing mutation on set\n");
1407 execute_mutation (set,
1408 msg);
1542} 1409}
1543 1410
1544 1411
@@ -1555,8 +1422,8 @@ advance_generation (struct Set *set)
1555 1422
1556 if (set->current_generation == set->content->latest_generation) 1423 if (set->current_generation == set->content->latest_generation)
1557 { 1424 {
1558 set->content->latest_generation += 1; 1425 set->content->latest_generation++;
1559 set->current_generation += 1; 1426 set->current_generation++;
1560 return; 1427 return;
1561 } 1428 }
1562 1429
@@ -1564,10 +1431,8 @@ advance_generation (struct Set *set)
1564 1431
1565 r.start = set->current_generation + 1; 1432 r.start = set->current_generation + 1;
1566 r.end = set->content->latest_generation + 1; 1433 r.end = set->content->latest_generation + 1;
1567
1568 set->content->latest_generation = r.end; 1434 set->content->latest_generation = r.end;
1569 set->current_generation = r.end; 1435 set->current_generation = r.end;
1570
1571 GNUNET_array_append (set->excluded_generations, 1436 GNUNET_array_append (set->excluded_generations,
1572 set->excluded_generations_size, 1437 set->excluded_generations_size,
1573 r); 1438 r);
@@ -1605,112 +1470,105 @@ static void
1605handle_client_evaluate (void *cls, 1470handle_client_evaluate (void *cls,
1606 const struct GNUNET_SET_EvaluateMessage *msg) 1471 const struct GNUNET_SET_EvaluateMessage *msg)
1607{ 1472{
1608 struct GNUNET_SERVICE_Client *client = cls; 1473 struct ClientState *cs = cls;
1609 struct Operation *op = GNUNET_new (struct Operation); 1474 struct Operation *op = GNUNET_new (struct Operation);
1610 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 1475 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1611 GNUNET_MQ_hd_var_size (p2p_message, 1476 GNUNET_MQ_hd_var_size (incoming_msg,
1612 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1477 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1613 struct GNUNET_MessageHeader, 1478 struct OperationRequestMessage,
1614 op), 1479 op),
1615 GNUNET_MQ_hd_var_size (p2p_message, 1480 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1616 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 1481 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1617 struct GNUNET_MessageHeader, 1482 struct IBFMessage,
1618 op), 1483 op),
1619 GNUNET_MQ_hd_var_size (p2p_message, 1484 GNUNET_MQ_hd_var_size (union_p2p_elements,
1620 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 1485 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1621 struct GNUNET_MessageHeader, 1486 struct GNUNET_SET_ElementMessage,
1622 op), 1487 op),
1623 GNUNET_MQ_hd_var_size (p2p_message, 1488 GNUNET_MQ_hd_var_size (union_p2p_offer,
1624 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 1489 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1625 struct GNUNET_MessageHeader, 1490 struct GNUNET_MessageHeader,
1626 op), 1491 op),
1627 GNUNET_MQ_hd_var_size (p2p_message, 1492 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1628 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 1493 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1629 struct GNUNET_MessageHeader, 1494 struct InquiryMessage,
1630 op), 1495 op),
1631 GNUNET_MQ_hd_var_size (p2p_message, 1496 GNUNET_MQ_hd_var_size (union_p2p_demand,
1632 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 1497 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1633 struct GNUNET_MessageHeader, 1498 struct GNUNET_MessageHeader,
1634 op), 1499 op),
1635 GNUNET_MQ_hd_var_size (p2p_message, 1500 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1636 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 1501 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1637 struct GNUNET_MessageHeader, 1502 struct GNUNET_MessageHeader,
1638 op), 1503 op),
1639 GNUNET_MQ_hd_var_size (p2p_message, 1504 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1505 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1506 struct GNUNET_MessageHeader,
1507 op),
1508 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1509 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1510 struct GNUNET_MessageHeader,
1511 op),
1512 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1640 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 1513 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1641 struct GNUNET_MessageHeader, 1514 struct StrataEstimatorMessage,
1642 op), 1515 op),
1643 GNUNET_MQ_hd_var_size (p2p_message, 1516 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1644 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 1517 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1645 struct GNUNET_MessageHeader, 1518 struct StrataEstimatorMessage,
1646 op),
1647 GNUNET_MQ_hd_var_size (p2p_message,
1648 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1649 struct GNUNET_MessageHeader,
1650 op), 1519 op),
1651 GNUNET_MQ_hd_var_size (p2p_message, 1520 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1652 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1653 struct GNUNET_MessageHeader,
1654 op),
1655 GNUNET_MQ_hd_var_size (p2p_message,
1656 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, 1521 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1657 struct GNUNET_MessageHeader, 1522 struct GNUNET_SET_ElementMessage,
1658 op), 1523 op),
1659 GNUNET_MQ_hd_var_size (p2p_message, 1524 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1660 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 1525 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1661 struct GNUNET_MessageHeader, 1526 struct IntersectionElementInfoMessage,
1662 op), 1527 op),
1663 GNUNET_MQ_hd_var_size (p2p_message, 1528 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1664 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 1529 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1665 struct GNUNET_MessageHeader, 1530 struct BFMessage,
1666 op),
1667 GNUNET_MQ_hd_var_size (p2p_message,
1668 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1669 struct GNUNET_MessageHeader,
1670 op), 1531 op),
1532 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1533 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1534 struct IntersectionDoneMessage,
1535 op),
1671 GNUNET_MQ_handler_end () 1536 GNUNET_MQ_handler_end ()
1672 }; 1537 };
1673 struct Set *set; 1538 struct Set *set;
1674 struct OperationSpecification *spec;
1675 const struct GNUNET_MessageHeader *context; 1539 const struct GNUNET_MessageHeader *context;
1676 1540
1677 set = set_get (client); 1541 if (NULL == (set = cs->set))
1678 if (NULL == set)
1679 { 1542 {
1680 GNUNET_break (0); 1543 GNUNET_break (0);
1681 GNUNET_free (op); 1544 GNUNET_free (op);
1682 GNUNET_SERVICE_client_drop (client); 1545 GNUNET_SERVICE_client_drop (cs->client);
1683 return; 1546 return;
1684 } 1547 }
1685 spec = GNUNET_new (struct OperationSpecification); 1548 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1686 spec->operation = set->operation; 1549 UINT32_MAX);
1687 spec->app_id = msg->app_id; 1550 op->peer = msg->target_peer;
1688 spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 1551 op->result_mode = ntohl (msg->result_mode);
1689 UINT32_MAX); 1552 op->client_request_id = ntohl (msg->request_id);
1690 spec->peer = msg->target_peer; 1553 op->byzantine = msg->byzantine;
1691 spec->set = set; 1554 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1692 spec->result_mode = ntohl (msg->result_mode); 1555 op->force_full = msg->force_full;
1693 spec->client_request_id = ntohl (msg->request_id); 1556 op->force_delta = msg->force_delta;
1694 spec->byzantine = msg->byzantine;
1695 spec->byzantine_lower_bound = msg->byzantine_lower_bound;
1696 spec->force_full = msg->force_full;
1697 spec->force_delta = msg->force_delta;
1698 context = GNUNET_MQ_extract_nested_mh (msg); 1557 context = GNUNET_MQ_extract_nested_mh (msg);
1699 op->spec = spec;
1700 1558
1701 // Advance generation values, so that 1559 /* Advance generation values, so that
1702 // mutations won't interfer with the running operation. 1560 mutations won't interfer with the running operation. */
1561 op->set = set;
1703 op->generation_created = set->current_generation; 1562 op->generation_created = set->current_generation;
1704 advance_generation (set); 1563 advance_generation (set);
1705
1706 op->vt = set->vt;
1707 GNUNET_CONTAINER_DLL_insert (set->ops_head, 1564 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1708 set->ops_tail, 1565 set->ops_tail,
1709 op); 1566 op);
1710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1567 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1711 "Creating new CADET channel to port %s\n", 1568 "Creating new CADET channel to port %s for set operation type %u\n",
1712 GNUNET_h2s (&msg->app_id)); 1569 GNUNET_h2s (&msg->app_id),
1713 op->channel = GNUNET_CADET_channel_creatE (cadet, 1570 set->operation);
1571 op->channel = GNUNET_CADET_channel_create (cadet,
1714 op, 1572 op,
1715 &msg->target_peer, 1573 &msg->target_peer,
1716 &msg->app_id, 1574 &msg->app_id,
@@ -1719,9 +1577,15 @@ handle_client_evaluate (void *cls,
1719 &channel_end_cb, 1577 &channel_end_cb,
1720 cadet_handlers); 1578 cadet_handlers);
1721 op->mq = GNUNET_CADET_get_mq (op->channel); 1579 op->mq = GNUNET_CADET_get_mq (op->channel);
1722 set->vt->evaluate (op, 1580 op->state = set->vt->evaluate (op,
1723 context); 1581 context);
1724 GNUNET_SERVICE_client_continue (client); 1582 if (NULL == op->state)
1583 {
1584 GNUNET_break (0);
1585 GNUNET_SERVICE_client_drop (cs->client);
1586 return;
1587 }
1588 GNUNET_SERVICE_client_continue (cs->client);
1725} 1589}
1726 1590
1727 1591
@@ -1737,15 +1601,14 @@ static void
1737handle_client_iter_ack (void *cls, 1601handle_client_iter_ack (void *cls,
1738 const struct GNUNET_SET_IterAckMessage *ack) 1602 const struct GNUNET_SET_IterAckMessage *ack)
1739{ 1603{
1740 struct GNUNET_SERVICE_Client *client = cls; 1604 struct ClientState *cs = cls;
1741 struct Set *set; 1605 struct Set *set;
1742 1606
1743 set = set_get (client); 1607 if (NULL == (set = cs->set))
1744 if (NULL == set)
1745 { 1608 {
1746 /* client without a set acknowledged receiving a value */ 1609 /* client without a set acknowledged receiving a value */
1747 GNUNET_break (0); 1610 GNUNET_break (0);
1748 GNUNET_SERVICE_client_drop (client); 1611 GNUNET_SERVICE_client_drop (cs->client);
1749 return; 1612 return;
1750 } 1613 }
1751 if (NULL == set->iter) 1614 if (NULL == set->iter)
@@ -1753,10 +1616,10 @@ handle_client_iter_ack (void *cls,
1753 /* client sent an ack, but we were not expecting one (as 1616 /* client sent an ack, but we were not expecting one (as
1754 set iteration has finished) */ 1617 set iteration has finished) */
1755 GNUNET_break (0); 1618 GNUNET_break (0);
1756 GNUNET_SERVICE_client_drop (client); 1619 GNUNET_SERVICE_client_drop (cs->client);
1757 return; 1620 return;
1758 } 1621 }
1759 GNUNET_SERVICE_client_continue (client); 1622 GNUNET_SERVICE_client_continue (cs->client);
1760 if (ntohl (ack->send_more)) 1623 if (ntohl (ack->send_more))
1761 { 1624 {
1762 send_client_element (set); 1625 send_client_element (set);
@@ -1780,42 +1643,33 @@ static void
1780handle_client_copy_lazy_prepare (void *cls, 1643handle_client_copy_lazy_prepare (void *cls,
1781 const struct GNUNET_MessageHeader *mh) 1644 const struct GNUNET_MessageHeader *mh)
1782{ 1645{
1783 struct GNUNET_SERVICE_Client *client = cls; 1646 struct ClientState *cs = cls;
1784 struct Set *set; 1647 struct Set *set;
1785 struct LazyCopyRequest *cr; 1648 struct LazyCopyRequest *cr;
1786 struct GNUNET_MQ_Envelope *ev; 1649 struct GNUNET_MQ_Envelope *ev;
1787 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg; 1650 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1788 1651
1789 set = set_get (client); 1652 if (NULL == (set = cs->set))
1790 if (NULL == set)
1791 { 1653 {
1792 /* client without a set requested an operation */ 1654 /* client without a set requested an operation */
1793 GNUNET_break (0); 1655 GNUNET_break (0);
1794 GNUNET_SERVICE_client_drop (client); 1656 GNUNET_SERVICE_client_drop (cs->client);
1795 return; 1657 return;
1796 } 1658 }
1797 1659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1660 "Client requested creation of lazy copy\n");
1798 cr = GNUNET_new (struct LazyCopyRequest); 1661 cr = GNUNET_new (struct LazyCopyRequest);
1799 1662 cr->cookie = ++lazy_copy_cookie;
1800 cr->cookie = lazy_copy_cookie;
1801 lazy_copy_cookie += 1;
1802 cr->source_set = set; 1663 cr->source_set = set;
1803
1804 GNUNET_CONTAINER_DLL_insert (lazy_copy_head, 1664 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1805 lazy_copy_tail, 1665 lazy_copy_tail,
1806 cr); 1666 cr);
1807
1808
1809 ev = GNUNET_MQ_msg (resp_msg, 1667 ev = GNUNET_MQ_msg (resp_msg,
1810 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE); 1668 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1811 resp_msg->cookie = cr->cookie; 1669 resp_msg->cookie = cr->cookie;
1812 GNUNET_MQ_send (set->client_mq, ev); 1670 GNUNET_MQ_send (set->cs->mq,
1813 1671 ev);
1814 1672 GNUNET_SERVICE_client_continue (cs->client);
1815 GNUNET_SERVICE_client_continue (client);
1816
1817 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1818 "Client requested lazy copy\n");
1819} 1673}
1820 1674
1821 1675
@@ -1829,21 +1683,19 @@ static void
1829handle_client_copy_lazy_connect (void *cls, 1683handle_client_copy_lazy_connect (void *cls,
1830 const struct GNUNET_SET_CopyLazyConnectMessage *msg) 1684 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1831{ 1685{
1832 struct GNUNET_SERVICE_Client *client = cls; 1686 struct ClientState *cs = cls;
1833 struct LazyCopyRequest *cr; 1687 struct LazyCopyRequest *cr;
1834 struct Set *set; 1688 struct Set *set;
1835 int found; 1689 int found;
1836 1690
1837 if (NULL != set_get (client)) 1691 if (NULL != cs->set)
1838 { 1692 {
1839 /* There can only be one set per client */ 1693 /* There can only be one set per client */
1840 GNUNET_break (0); 1694 GNUNET_break (0);
1841 GNUNET_SERVICE_client_drop (client); 1695 GNUNET_SERVICE_client_drop (cs->client);
1842 return; 1696 return;
1843 } 1697 }
1844
1845 found = GNUNET_NO; 1698 found = GNUNET_NO;
1846
1847 for (cr = lazy_copy_head; NULL != cr; cr = cr->next) 1699 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1848 { 1700 {
1849 if (cr->cookie == msg->cookie) 1701 if (cr->cookie == msg->cookie)
@@ -1852,21 +1704,20 @@ handle_client_copy_lazy_connect (void *cls,
1852 break; 1704 break;
1853 } 1705 }
1854 } 1706 }
1855
1856 if (GNUNET_NO == found) 1707 if (GNUNET_NO == found)
1857 { 1708 {
1858 /* client asked for copy with cookie we don't know */ 1709 /* client asked for copy with cookie we don't know */
1859 GNUNET_break (0); 1710 GNUNET_break (0);
1860 GNUNET_SERVICE_client_drop (client); 1711 GNUNET_SERVICE_client_drop (cs->client);
1861 return; 1712 return;
1862 } 1713 }
1863
1864 GNUNET_CONTAINER_DLL_remove (lazy_copy_head, 1714 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1865 lazy_copy_tail, 1715 lazy_copy_tail,
1866 cr); 1716 cr);
1867 1717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1718 "Client %p requested use of lazy copy\n",
1719 cs);
1868 set = GNUNET_new (struct Set); 1720 set = GNUNET_new (struct Set);
1869
1870 switch (cr->source_set->operation) 1721 switch (cr->source_set->operation)
1871 { 1722 {
1872 case GNUNET_SET_OPERATION_INTERSECTION: 1723 case GNUNET_SET_OPERATION_INTERSECTION:
@@ -1886,37 +1737,28 @@ handle_client_copy_lazy_connect (void *cls,
1886 GNUNET_break (0); 1737 GNUNET_break (0);
1887 GNUNET_free (set); 1738 GNUNET_free (set);
1888 GNUNET_free (cr); 1739 GNUNET_free (cr);
1889 GNUNET_SERVICE_client_drop (client); 1740 GNUNET_SERVICE_client_drop (cs->client);
1890 return; 1741 return;
1891 } 1742 }
1892 1743
1893 set->operation = cr->source_set->operation; 1744 set->operation = cr->source_set->operation;
1894 set->state = set->vt->copy_state (cr->source_set); 1745 set->state = set->vt->copy_state (cr->source_set->state);
1895 set->content = cr->source_set->content; 1746 set->content = cr->source_set->content;
1896 set->content->refcount += 1; 1747 set->content->refcount++;
1897 1748
1898 set->current_generation = cr->source_set->current_generation; 1749 set->current_generation = cr->source_set->current_generation;
1899 set->excluded_generations_size = cr->source_set->excluded_generations_size; 1750 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1900 set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations, 1751 set->excluded_generations
1901 set->excluded_generations_size * sizeof (struct GenerationRange)); 1752 = GNUNET_memdup (cr->source_set->excluded_generations,
1753 set->excluded_generations_size * sizeof (struct GenerationRange));
1902 1754
1903 /* Advance the generation of the new set, so that mutations to the 1755 /* Advance the generation of the new set, so that mutations to the
1904 of the cloned set and the source set are independent. */ 1756 of the cloned set and the source set are independent. */
1905 advance_generation (set); 1757 advance_generation (set);
1906 1758 set->cs = cs;
1907 1759 cs->set = set;
1908 set->client = client;
1909 set->client_mq = GNUNET_SERVICE_client_get_mq (client);
1910 GNUNET_CONTAINER_DLL_insert (sets_head,
1911 sets_tail,
1912 set);
1913
1914 GNUNET_free (cr); 1760 GNUNET_free (cr);
1915 1761 GNUNET_SERVICE_client_continue (cs->client);
1916 GNUNET_SERVICE_client_continue (client);
1917
1918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919 "Client connected to lazy set\n");
1920} 1762}
1921 1763
1922 1764
@@ -1930,26 +1772,22 @@ static void
1930handle_client_cancel (void *cls, 1772handle_client_cancel (void *cls,
1931 const struct GNUNET_SET_CancelMessage *msg) 1773 const struct GNUNET_SET_CancelMessage *msg)
1932{ 1774{
1933 struct GNUNET_SERVICE_Client *client = cls; 1775 struct ClientState *cs = cls;
1934 struct Set *set; 1776 struct Set *set;
1935 struct Operation *op; 1777 struct Operation *op;
1936 int found; 1778 int found;
1937 1779
1938 set = set_get (client); 1780 if (NULL == (set = cs->set))
1939 if (NULL == set)
1940 { 1781 {
1941 /* client without a set requested an operation */ 1782 /* client without a set requested an operation */
1942 GNUNET_break (0); 1783 GNUNET_break (0);
1943 GNUNET_SERVICE_client_drop (client); 1784 GNUNET_SERVICE_client_drop (cs->client);
1944 return; 1785 return;
1945 } 1786 }
1946 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1947 "Client requested cancel for op %u\n",
1948 (uint32_t) ntohl (msg->request_id));
1949 found = GNUNET_NO; 1787 found = GNUNET_NO;
1950 for (op = set->ops_head; NULL != op; op = op->next) 1788 for (op = set->ops_head; NULL != op; op = op->next)
1951 { 1789 {
1952 if (op->spec->client_request_id == ntohl (msg->request_id)) 1790 if (op->client_request_id == ntohl (msg->request_id))
1953 { 1791 {
1954 found = GNUNET_YES; 1792 found = GNUNET_YES;
1955 break; 1793 break;
@@ -1962,15 +1800,19 @@ handle_client_cancel (void *cls,
1962 * yet and try to cancel the (just barely non-existent) operation. 1800 * yet and try to cancel the (just barely non-existent) operation.
1963 * So this is not a hard error. 1801 * So this is not a hard error.
1964 */ 1802 */
1965 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1803 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1966 "Client canceled non-existent op\n"); 1804 "Client canceled non-existent op %u\n",
1805 (uint32_t) ntohl (msg->request_id));
1967 } 1806 }
1968 else 1807 else
1969 { 1808 {
1809 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1810 "Client requested cancel for op %u\n",
1811 (uint32_t) ntohl (msg->request_id));
1970 _GSS_operation_destroy (op, 1812 _GSS_operation_destroy (op,
1971 GNUNET_YES); 1813 GNUNET_YES);
1972 } 1814 }
1973 GNUNET_SERVICE_client_continue (client); 1815 GNUNET_SERVICE_client_continue (cs->client);
1974} 1816}
1975 1817
1976 1818
@@ -1986,18 +1828,18 @@ static void
1986handle_client_accept (void *cls, 1828handle_client_accept (void *cls,
1987 const struct GNUNET_SET_AcceptMessage *msg) 1829 const struct GNUNET_SET_AcceptMessage *msg)
1988{ 1830{
1989 struct GNUNET_SERVICE_Client *client = cls; 1831 struct ClientState *cs = cls;
1990 struct Set *set; 1832 struct Set *set;
1991 struct Operation *op; 1833 struct Operation *op;
1992 struct GNUNET_SET_ResultMessage *result_message; 1834 struct GNUNET_SET_ResultMessage *result_message;
1993 struct GNUNET_MQ_Envelope *ev; 1835 struct GNUNET_MQ_Envelope *ev;
1836 struct Listener *listener;
1994 1837
1995 set = set_get (client); 1838 if (NULL == (set = cs->set))
1996 if (NULL == set)
1997 { 1839 {
1998 /* client without a set requested to accept */ 1840 /* client without a set requested to accept */
1999 GNUNET_break (0); 1841 GNUNET_break (0);
2000 GNUNET_SERVICE_client_drop (client); 1842 GNUNET_SERVICE_client_drop (cs->client);
2001 return; 1843 return;
2002 } 1844 }
2003 op = get_incoming (ntohl (msg->accept_reject_id)); 1845 op = get_incoming (ntohl (msg->accept_reject_id));
@@ -2005,71 +1847,75 @@ handle_client_accept (void *cls,
2005 { 1847 {
2006 /* It is not an error if the set op does not exist -- it may 1848 /* It is not an error if the set op does not exist -- it may
2007 * have been destroyed when the partner peer disconnected. */ 1849 * have been destroyed when the partner peer disconnected. */
2008 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1850 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2009 "Client accepted request that is no longer active\n"); 1851 "Client %p accepted request %u of listener %p that is no longer active\n",
1852 cs,
1853 ntohl (msg->accept_reject_id),
1854 cs->listener);
2010 ev = GNUNET_MQ_msg (result_message, 1855 ev = GNUNET_MQ_msg (result_message,
2011 GNUNET_MESSAGE_TYPE_SET_RESULT); 1856 GNUNET_MESSAGE_TYPE_SET_RESULT);
2012 result_message->request_id = msg->request_id; 1857 result_message->request_id = msg->request_id;
2013 result_message->element_type = 0;
2014 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); 1858 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2015 GNUNET_MQ_send (set->client_mq, ev); 1859 GNUNET_MQ_send (set->cs->mq,
2016 GNUNET_SERVICE_client_continue (client); 1860 ev);
1861 GNUNET_SERVICE_client_continue (cs->client);
2017 return; 1862 return;
2018 } 1863 }
2019
2020 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1864 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2021 "Client accepting request %u\n", 1865 "Client accepting request %u\n",
2022 (uint32_t) ntohl (msg->accept_reject_id)); 1866 (uint32_t) ntohl (msg->accept_reject_id));
2023 GNUNET_assert (GNUNET_YES == op->is_incoming); 1867 listener = op->listener;
2024 op->is_incoming = GNUNET_NO; 1868 op->listener = NULL;
2025 GNUNET_CONTAINER_DLL_remove (incoming_head, 1869 GNUNET_CONTAINER_DLL_remove (listener->op_head,
2026 incoming_tail, 1870 listener->op_tail,
2027 op); 1871 op);
2028 op->spec->set = set; 1872 op->set = set;
2029 GNUNET_CONTAINER_DLL_insert (set->ops_head, 1873 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2030 set->ops_tail, 1874 set->ops_tail,
2031 op); 1875 op);
2032 op->spec->client_request_id = ntohl (msg->request_id); 1876 op->client_request_id = ntohl (msg->request_id);
2033 op->spec->result_mode = ntohl (msg->result_mode); 1877 op->result_mode = ntohl (msg->result_mode);
2034 op->spec->byzantine = msg->byzantine; 1878 op->byzantine = msg->byzantine;
2035 op->spec->byzantine_lower_bound = msg->byzantine_lower_bound; 1879 op->byzantine_lower_bound = msg->byzantine_lower_bound;
2036 op->spec->force_full = msg->force_full; 1880 op->force_full = msg->force_full;
2037 op->spec->force_delta = msg->force_delta; 1881 op->force_delta = msg->force_delta;
2038 1882
2039 // Advance generation values, so that 1883 /* Advance generation values, so that future mutations do not
2040 // mutations won't interfer with the running operation. 1884 interfer with the running operation. */
2041 op->generation_created = set->current_generation; 1885 op->generation_created = set->current_generation;
2042 advance_generation (set); 1886 advance_generation (set);
2043 1887 GNUNET_assert (NULL == op->state);
2044 op->vt = set->vt; 1888 op->state = set->vt->accept (op);
2045 op->vt->accept (op); 1889 if (NULL == op->state)
2046 GNUNET_SERVICE_client_continue (client); 1890 {
1891 GNUNET_break (0);
1892 GNUNET_SERVICE_client_drop (cs->client);
1893 return;
1894 }
1895 /* Now allow CADET to continue, as we did not do this in
1896 #handle_incoming_msg (as we wanted to first see if the
1897 local client would accept the request). */
1898 GNUNET_CADET_receive_done (op->channel);
1899 GNUNET_SERVICE_client_continue (cs->client);
2047} 1900}
2048 1901
2049 1902
2050/** 1903/**
2051 * Called to clean up, after a shutdown has been requested. 1904 * Called to clean up, after a shutdown has been requested.
2052 * 1905 *
2053 * @param cls closure 1906 * @param cls closure, NULL
2054 */ 1907 */
2055static void 1908static void
2056shutdown_task (void *cls) 1909shutdown_task (void *cls)
2057{ 1910{
2058 while (NULL != incoming_head) 1911 /* Delay actual shutdown to allow service to disconnect clients */
2059 incoming_destroy (incoming_head);
2060 while (NULL != listeners_head)
2061 listener_destroy (listeners_head);
2062 while (NULL != sets_head)
2063 set_destroy (sets_head);
2064
2065 /* it's important to destroy cadet at the end, as all channels
2066 * must be destroyed before the cadet handle! */
2067 if (NULL != cadet) 1912 if (NULL != cadet)
2068 { 1913 {
2069 GNUNET_CADET_disconnect (cadet); 1914 GNUNET_CADET_disconnect (cadet);
2070 cadet = NULL; 1915 cadet = NULL;
2071 } 1916 }
2072 GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); 1917 GNUNET_STATISTICS_destroy (_GSS_statistics,
1918 GNUNET_YES);
2073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2074 "handled shutdown request\n"); 1920 "handled shutdown request\n");
2075} 1921}
@@ -2088,15 +1934,19 @@ run (void *cls,
2088 const struct GNUNET_CONFIGURATION_Handle *cfg, 1934 const struct GNUNET_CONFIGURATION_Handle *cfg,
2089 struct GNUNET_SERVICE_Handle *service) 1935 struct GNUNET_SERVICE_Handle *service)
2090{ 1936{
2091 configuration = cfg; 1937 /* FIXME: need to modify SERVICE (!) API to allow
1938 us to run a shutdown task *after* clients were
1939 forcefully disconnected! */
2092 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1940 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2093 NULL); 1941 NULL);
2094 _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); 1942 _GSS_statistics = GNUNET_STATISTICS_create ("set",
2095 cadet = GNUNET_CADET_connecT (cfg); 1943 cfg);
1944 cadet = GNUNET_CADET_connect (cfg);
2096 if (NULL == cadet) 1945 if (NULL == cadet)
2097 { 1946 {
2098 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1947 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2099 _("Could not connect to CADET service\n")); 1948 _("Could not connect to CADET service\n"));
1949 GNUNET_SCHEDULER_shutdown ();
2100 return; 1950 return;
2101 } 1951 }
2102} 1952}
@@ -2122,7 +1972,7 @@ GNUNET_SERVICE_MAIN
2122 NULL), 1972 NULL),
2123 GNUNET_MQ_hd_var_size (client_mutation, 1973 GNUNET_MQ_hd_var_size (client_mutation,
2124 GNUNET_MESSAGE_TYPE_SET_ADD, 1974 GNUNET_MESSAGE_TYPE_SET_ADD,
2125 struct GNUNET_MessageHeader, 1975 struct GNUNET_SET_ElementMessage,
2126 NULL), 1976 NULL),
2127 GNUNET_MQ_hd_fixed_size (client_create_set, 1977 GNUNET_MQ_hd_fixed_size (client_create_set,
2128 GNUNET_MESSAGE_TYPE_SET_CREATE, 1978 GNUNET_MESSAGE_TYPE_SET_CREATE,
@@ -2146,7 +1996,7 @@ GNUNET_SERVICE_MAIN
2146 NULL), 1996 NULL),
2147 GNUNET_MQ_hd_var_size (client_mutation, 1997 GNUNET_MQ_hd_var_size (client_mutation,
2148 GNUNET_MESSAGE_TYPE_SET_REMOVE, 1998 GNUNET_MESSAGE_TYPE_SET_REMOVE,
2149 struct GNUNET_MessageHeader, 1999 struct GNUNET_SET_ElementMessage,
2150 NULL), 2000 NULL),
2151 GNUNET_MQ_hd_fixed_size (client_cancel, 2001 GNUNET_MQ_hd_fixed_size (client_cancel,
2152 GNUNET_MESSAGE_TYPE_SET_CANCEL, 2002 GNUNET_MESSAGE_TYPE_SET_CANCEL,