aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-03-13 01:24:22 +0100
committerChristian Grothoff <christian@grothoff.org>2017-03-13 01:24:34 +0100
commitbf6f552fdefe75425635f66343f98995e2f602f6 (patch)
treeadd6ea146823579a137763b78e89839ff97b3902 /src/set
parenta9a5994e518ded483edb87513d5197b6539ed4ff (diff)
downloadgnunet-bf6f552fdefe75425635f66343f98995e2f602f6.tar.gz
gnunet-bf6f552fdefe75425635f66343f98995e2f602f6.zip
major clean up and bugfixes of SET
Diffstat (limited to 'src/set')
-rw-r--r--src/set/gnunet-service-set.c1319
-rw-r--r--src/set/gnunet-service-set.h360
-rw-r--r--src/set/gnunet-service-set_intersection.c542
-rw-r--r--src/set/gnunet-service-set_union.c363
4 files changed, 1283 insertions, 1301 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index b80c1f2fd..752253411 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2013, 2014, 2017 GNUnet e.V. 3 Copyright (C) 2013-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -35,6 +35,35 @@
35 */ 35 */
36#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES 36#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 37
38
39/**
40 * Lazy copy requests made by a client.
41 */
42struct LazyCopyRequest
43{
44 /**
45 * Kept in a DLL.
46 */
47 struct LazyCopyRequest *prev;
48
49 /**
50 * Kept in a DLL.
51 */
52 struct LazyCopyRequest *next;
53
54 /**
55 * Which set are we supposed to copy?
56 */
57 struct Set *source_set;
58
59 /**
60 * Cookie identifying the request.
61 */
62 uint32_t cookie;
63
64};
65
66
38/** 67/**
39 * A listener is inhabited by a client, and waits for evaluation 68 * A listener is inhabited by a client, and waits for evaluation
40 * requests from remote peers. 69 * requests from remote peers.
@@ -52,21 +81,24 @@ struct Listener
52 struct Listener *prev; 81 struct Listener *prev;
53 82
54 /** 83 /**
55 * Client that owns the listener. 84 * Head of DLL of operations this listener is responsible for.
56 * Only one client may own a listener. 85 * Once the client has accepted/declined the operation, the
86 * operation is moved to the respective set's operation DLLS.
57 */ 87 */
58 struct GNUNET_SERVICE_Client *client; 88 struct Operation *op_head;
59 89
60 /** 90 /**
61 * Message queue for the client 91 * Tail of DLL of operations this listener is responsible for.
92 * Once the client has accepted/declined the operation, the
93 * operation is moved to the respective set's operation DLLS.
62 */ 94 */
63 struct GNUNET_MQ_Handle *client_mq; 95 struct Operation *op_tail;
64 96
65 /** 97 /**
66 * Application ID for the operation, used to distinguish 98 * Client that owns the listener.
67 * multiple operations of the same type with the same peer. 99 * Only one client may own a listener.
68 */ 100 */
69 struct GNUNET_HashCode app_id; 101 struct ClientState *cs;
70 102
71 /** 103 /**
72 * The port we are listening on with CADET. 104 * The port we are listening on with CADET.
@@ -74,27 +106,18 @@ struct Listener
74 struct GNUNET_CADET_Port *open_port; 106 struct GNUNET_CADET_Port *open_port;
75 107
76 /** 108 /**
109 * Application ID for the operation, used to distinguish
110 * multiple operations of the same type with the same peer.
111 */
112 struct GNUNET_HashCode app_id;
113
114 /**
77 * The type of the operation. 115 * The type of the operation.
78 */ 116 */
79 enum GNUNET_SET_OperationType operation; 117 enum GNUNET_SET_OperationType operation;
80}; 118};
81 119
82 120
83struct LazyCopyRequest
84{
85 struct Set *source_set;
86 uint32_t cookie;
87
88 struct LazyCopyRequest *prev;
89 struct LazyCopyRequest *next;
90};
91
92
93/**
94 * Configuration of our local peer.
95 */
96static const struct GNUNET_CONFIGURATION_Handle *configuration;
97
98/** 121/**
99 * Handle to the cadet service, used to listen for and connect to 122 * Handle to the cadet service, used to listen for and connect to
100 * remote peers. 123 * remote peers.
@@ -102,94 +125,48 @@ static const struct GNUNET_CONFIGURATION_Handle *configuration;
102static struct GNUNET_CADET_Handle *cadet; 125static struct GNUNET_CADET_Handle *cadet;
103 126
104/** 127/**
105 * Sets are held in a doubly linked list. 128 * DLL of lazy copy requests by this client.
106 */ 129 */
107static struct Set *sets_head; 130static struct LazyCopyRequest *lazy_copy_head;
108 131
109/** 132/**
110 * Sets are held in a doubly linked list. 133 * DLL of lazy copy requests by this client.
111 */ 134 */
112static struct Set *sets_tail; 135static struct LazyCopyRequest *lazy_copy_tail;
113 136
114/** 137/**
115 * Listeners are held in a doubly linked list. 138 * Generator for unique cookie we set per lazy copy request.
116 */ 139 */
117static struct Listener *listeners_head; 140static uint32_t lazy_copy_cookie;
118 141
119/** 142/**
120 * Listeners are held in a doubly linked list. 143 * Statistics handle.
121 */ 144 */
122static struct Listener *listeners_tail; 145struct GNUNET_STATISTICS_Handle *_GSS_statistics;
123 146
124/** 147/**
125 * Incoming sockets from remote peers are held in a doubly linked 148 * Listeners are held in a doubly linked list.
126 * list.
127 */ 149 */
128static struct Operation *incoming_head; 150static struct Listener *listener_head;
129 151
130/** 152/**
131 * Incoming sockets from remote peers are held in a doubly linked 153 * Listeners are held in a doubly linked list.
132 * list.
133 */ 154 */
134static struct Operation *incoming_tail; 155static struct Listener *listener_tail;
135
136static struct LazyCopyRequest *lazy_copy_head;
137static struct LazyCopyRequest *lazy_copy_tail;
138
139static uint32_t lazy_copy_cookie = 1;
140 156
141/** 157/**
142 * Counter for allocating unique IDs for clients, used to identify 158 * Counter for allocating unique IDs for clients, used to identify
143 * incoming operation requests from remote peers, that the client can 159 * incoming operation requests from remote peers, that the client can
144 * choose to accept or refuse. 160 * choose to accept or refuse. 0 must not be used (reserved for
145 */ 161 * uninitialized).
146static uint32_t suggest_id = 1;
147
148/**
149 * Statistics handle.
150 */ 162 */
151struct GNUNET_STATISTICS_Handle *_GSS_statistics; 163static uint32_t suggest_id;
152
153
154/**
155 * Get set that is owned by the given client, if any.
156 *
157 * @param client client to look for
158 * @return set that the client owns, NULL if the client
159 * does not own a set
160 */
161static struct Set *
162set_get (struct GNUNET_SERVICE_Client *client)
163{
164 for (struct Set *set = sets_head; NULL != set; set = set->next)
165 if (set->client == client)
166 return set;
167 return NULL;
168}
169
170
171/**
172 * Get the listener associated with the given client, if any.
173 *
174 * @param client the client
175 * @return listener associated with the client, NULL
176 * if there isn't any
177 */
178static struct Listener *
179listener_get (struct GNUNET_SERVICE_Client *client)
180{
181 for (struct Listener *listener = listeners_head;
182 NULL != listener;
183 listener = listener->next)
184 if (listener->client == client)
185 return listener;
186 return NULL;
187}
188 164
189 165
190/** 166/**
191 * Get the incoming socket associated with the given id. 167 * Get the incoming socket associated with the given id.
192 * 168 *
169 * @param listener the listener to look in
193 * @param id id to look for 170 * @param id id to look for
194 * @return the incoming socket associated with the id, 171 * @return the incoming socket associated with the id,
195 * or NULL if there is none 172 * or NULL if there is none
@@ -197,44 +174,49 @@ listener_get (struct GNUNET_SERVICE_Client *client)
197static struct Operation * 174static struct Operation *
198get_incoming (uint32_t id) 175get_incoming (uint32_t id)
199{ 176{
200 for (struct Operation *op = incoming_head; NULL != op; op = op->next) 177 for (struct Listener *listener = listener_head;
201 if (op->suggest_id == id) 178 NULL != listener;
202 { 179 listener = listener->next)
203 GNUNET_assert (GNUNET_YES == op->is_incoming); 180 {
204 return op; 181 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
205 } 182 if (op->suggest_id == id)
183 return op;
184 }
206 return NULL; 185 return NULL;
207} 186}
208 187
209 188
210/** 189/**
211 * Destroy a listener, free all resources associated with it. 190 * Destroy an incoming request from a remote peer
212 * 191 *
213 * @param listener listener to destroy 192 * @param op remote request to destroy
214 */ 193 */
215static void 194static void
216listener_destroy (struct Listener *listener) 195incoming_destroy (struct Operation *op)
217{ 196{
218 /* If the client is not dead yet, destroy it. 197 struct Listener *listener;
219 * The client's destroy callback will destroy the listener again. */ 198 struct GNUNET_CADET_Channel *channel;
220 if (NULL != listener->client)
221 {
222 struct GNUNET_SERVICE_Client *client = listener->client;
223
224 GNUNET_MQ_destroy (listener->client_mq);
225 listener->client_mq = NULL;
226 199
227 listener->client = NULL; 200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 201 "Destroying incoming operation %p\n",
229 "Disconnecting listener client\n"); 202 op);
230 GNUNET_SERVICE_client_drop (client); 203 if (NULL != (listener = op->listener))
231 return; 204 {
205 GNUNET_CONTAINER_DLL_remove (listener->op_head,
206 listener->op_tail,
207 op);
208 op->listener = NULL;
209 }
210 if (NULL != op->timeout_task)
211 {
212 GNUNET_SCHEDULER_cancel (op->timeout_task);
213 op->timeout_task = NULL;
214 }
215 if (NULL != (channel = op->channel))
216 {
217 op->channel = NULL;
218 GNUNET_CADET_channel_destroy (channel);
232 } 219 }
233 GNUNET_CADET_close_port (listener->open_port);
234 GNUNET_CONTAINER_DLL_remove (listeners_head,
235 listeners_tail,
236 listener);
237 GNUNET_free (listener);
238} 220}
239 221
240 222
@@ -304,12 +286,11 @@ garbage_collect_cb (void *cls,
304static void 286static void
305collect_generation_garbage (struct Set *set) 287collect_generation_garbage (struct Set *set)
306{ 288{
307 struct Operation *op;
308 struct GarbageContext gc; 289 struct GarbageContext gc;
309 290
310 gc.min_op_generation = UINT_MAX; 291 gc.min_op_generation = UINT_MAX;
311 gc.max_op_generation = 0; 292 gc.max_op_generation = 0;
312 for (op = set->ops_head; NULL != op; op = op->next) 293 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
313 { 294 {
314 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation, 295 gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
315 op->generation_created); 296 op->generation_created);
@@ -323,23 +304,36 @@ collect_generation_garbage (struct Set *set)
323} 304}
324 305
325 306
307/**
308 * Is @a generation in the range of exclusions?
309 *
310 * @param generation generation to query
311 * @param excluded array of generations where the element is excluded
312 * @param excluded_size length of the @a excluded array
313 * @return #GNUNET_YES if @a generation is in any of the ranges
314 */
326static int 315static int
327is_excluded_generation (unsigned int generation, 316is_excluded_generation (unsigned int generation,
328 struct GenerationRange *excluded, 317 struct GenerationRange *excluded,
329 unsigned int excluded_size) 318 unsigned int excluded_size)
330{ 319{
331 unsigned int i; 320 for (unsigned int i = 0; i < excluded_size; i++)
332 321 if ( (generation >= excluded[i].start) &&
333 for (i = 0; i < excluded_size; i++) 322 (generation < excluded[i].end) )
334 {
335 if ( (generation >= excluded[i].start) && (generation < excluded[i].end) )
336 return GNUNET_YES; 323 return GNUNET_YES;
337 }
338
339 return GNUNET_NO; 324 return GNUNET_NO;
340} 325}
341 326
342 327
328/**
329 * Is element @a ee part of the set during @a query_generation?
330 *
331 * @param ee element to test
332 * @param query_generation generation to query
333 * @param excluded array of generations where the element is excluded
334 * @param excluded_size length of the @a excluded array
335 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
336 */
343static int 337static int
344is_element_of_generation (struct ElementEntry *ee, 338is_element_of_generation (struct ElementEntry *ee,
345 unsigned int query_generation, 339 unsigned int query_generation,
@@ -348,11 +342,12 @@ is_element_of_generation (struct ElementEntry *ee,
348{ 342{
349 struct MutationEvent *mut; 343 struct MutationEvent *mut;
350 int is_present; 344 int is_present;
351 unsigned int i;
352 345
353 GNUNET_assert (NULL != ee->mutations); 346 GNUNET_assert (NULL != ee->mutations);
354 347 if (GNUNET_YES ==
355 if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size)) 348 is_excluded_generation (query_generation,
349 excluded,
350 excluded_size))
356 { 351 {
357 GNUNET_break (0); 352 GNUNET_break (0);
358 return GNUNET_NO; 353 return GNUNET_NO;
@@ -362,7 +357,7 @@ is_element_of_generation (struct ElementEntry *ee,
362 357
363 /* Could be made faster with binary search, but lists 358 /* Could be made faster with binary search, but lists
364 are small, so why bother. */ 359 are small, so why bother. */
365 for (i = 0; i < ee->mutations_size; i++) 360 for (unsigned int i = 0; i < ee->mutations_size; i++)
366 { 361 {
367 mut = &ee->mutations[i]; 362 mut = &ee->mutations[i];
368 363
@@ -374,7 +369,10 @@ is_element_of_generation (struct ElementEntry *ee,
374 continue; 369 continue;
375 } 370 }
376 371
377 if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size)) 372 if (GNUNET_YES ==
373 is_excluded_generation (mut->generation,
374 excluded,
375 excluded_size))
378 { 376 {
379 /* The generation is excluded (because it belongs to another 377 /* The generation is excluded (because it belongs to another
380 fork via a lazy copy) and thus mutations aren't considered 378 fork via a lazy copy) and thus mutations aren't considered
@@ -383,11 +381,12 @@ is_element_of_generation (struct ElementEntry *ee,
383 } 381 }
384 382
385 /* This would be an inconsistency in how we manage mutations. */ 383 /* This would be an inconsistency in how we manage mutations. */
386 if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) ) 384 if ( (GNUNET_YES == is_present) &&
385 (GNUNET_YES == mut->added) )
387 GNUNET_assert (0); 386 GNUNET_assert (0);
388
389 /* Likewise. */ 387 /* Likewise. */
390 if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) ) 388 if ( (GNUNET_NO == is_present) &&
389 (GNUNET_NO == mut->added) )
391 GNUNET_assert (0); 390 GNUNET_assert (0);
392 391
393 is_present = mut->added; 392 is_present = mut->added;
@@ -397,44 +396,33 @@ is_element_of_generation (struct ElementEntry *ee,
397} 396}
398 397
399 398
400int 399/**
401_GSS_is_element_of_set (struct ElementEntry *ee, 400 * Is element @a ee part of the set used by @a op?
402 struct Set *set) 401 *
403{ 402 * @param ee element to test
404 return is_element_of_generation (ee, 403 * @param op operation the defines the set and its generation
405 set->current_generation, 404 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
406 set->excluded_generations, 405 */
407 set->excluded_generations_size);
408}
409
410
411static int
412is_element_of_iteration (struct ElementEntry *ee,
413 struct Set *set)
414{
415 return is_element_of_generation (ee,
416 set->iter_generation,
417 set->excluded_generations,
418 set->excluded_generations_size);
419}
420
421
422int 406int
423_GSS_is_element_of_operation (struct ElementEntry *ee, 407_GSS_is_element_of_operation (struct ElementEntry *ee,
424 struct Operation *op) 408 struct Operation *op)
425{ 409{
426 return is_element_of_generation (ee, 410 return is_element_of_generation (ee,
427 op->generation_created, 411 op->generation_created,
428 op->spec->set->excluded_generations, 412 op->set->excluded_generations,
429 op->spec->set->excluded_generations_size); 413 op->set->excluded_generations_size);
430} 414}
431 415
432 416
433/** 417/**
434 * Destroy the given operation. Call the implementation-specific 418 * Destroy the given operation. Used for any operation where both
435 * cancel function of the operation. Disconnects from the remote 419 * peers were known and that thus actually had a vt and channel. Must
436 * peer. Does not disconnect the client, as there may be multiple 420 * not be used for operations where 'listener' is still set and we do
437 * operations per set. 421 * not know the other peer.
422 *
423 * Call the implementation-specific cancel function of the operation.
424 * Disconnects from the remote peer. Does not disconnect the client,
425 * as there may be multiple operations per set.
438 * 426 *
439 * @param op operation to destroy 427 * @param op operation to destroy
440 * @param gc #GNUNET_YES to perform garbage collection on the set 428 * @param gc #GNUNET_YES to perform garbage collection on the set
@@ -443,39 +431,39 @@ void
443_GSS_operation_destroy (struct Operation *op, 431_GSS_operation_destroy (struct Operation *op,
444 int gc) 432 int gc)
445{ 433{
446 struct Set *set; 434 struct Set *set = op->set;
447 struct GNUNET_CADET_Channel *channel; 435 struct GNUNET_CADET_Channel *channel;
448 436
449 if (NULL == op->vt) 437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438 "Destroying operation %p\n",
439 op);
440 GNUNET_assert (NULL == op->listener);
441 if (NULL != op->state)
450 { 442 {
451 /* already in #_GSS_operation_destroy() */ 443 set->vt->cancel (op);
452 return; 444 op->state = NULL;
453 } 445 }
454 GNUNET_assert (GNUNET_NO == op->is_incoming); 446 if (NULL != set)
455 GNUNET_assert (NULL != op->spec);
456 set = op->spec->set;
457 GNUNET_CONTAINER_DLL_remove (set->ops_head,
458 set->ops_tail,
459 op);
460 op->vt->cancel (op);
461 op->vt = NULL;
462 if (NULL != op->spec)
463 { 447 {
464 if (NULL != op->spec->context_msg) 448 GNUNET_CONTAINER_DLL_remove (set->ops_head,
465 { 449 set->ops_tail,
466 GNUNET_free (op->spec->context_msg); 450 op);
467 op->spec->context_msg = NULL; 451 op->set = NULL;
468 } 452 }
469 GNUNET_free (op->spec); 453 if (NULL != op->context_msg)
470 op->spec = NULL; 454 {
455 GNUNET_free (op->context_msg);
456 op->context_msg = NULL;
471 } 457 }
472 if (NULL != (channel = op->channel)) 458 if (NULL != (channel = op->channel))
473 { 459 {
460 /* This will free op; called conditionally as this helper function
461 is also called from within the channel disconnect handler. */
474 op->channel = NULL; 462 op->channel = NULL;
475 GNUNET_CADET_channel_destroy (channel); 463 GNUNET_CADET_channel_destroy (channel);
476 } 464 }
477 465 if ( (NULL != set) &&
478 if (GNUNET_YES == gc) 466 (GNUNET_YES == gc) )
479 collect_generation_garbage (set); 467 collect_generation_garbage (set);
480 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, 468 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
481 * there was a channel end handler that will free 'op' on the call stack. */ 469 * there was a channel end handler that will free 'op' on the call stack. */
@@ -483,6 +471,28 @@ _GSS_operation_destroy (struct Operation *op,
483 471
484 472
485/** 473/**
474 * Callback called when a client connects to the service.
475 *
476 * @param cls closure for the service
477 * @param c the new client that connected to the service
478 * @param mq the message queue used to send messages to the client
479 * @return @a `struct ClientState`
480 */
481static void *
482client_connect_cb (void *cls,
483 struct GNUNET_SERVICE_Client *c,
484 struct GNUNET_MQ_Handle *mq)
485{
486 struct ClientState *cs;
487
488 cs = GNUNET_new (struct ClientState);
489 cs->client = c;
490 cs->mq = mq;
491 return cs;
492}
493
494
495/**
486 * Iterator over hash map entries to free element entries. 496 * Iterator over hash map entries to free element entries.
487 * 497 *
488 * @param cls closure 498 * @param cls closure
@@ -498,66 +508,76 @@ destroy_elements_iterator (void *cls,
498 struct ElementEntry *ee = value; 508 struct ElementEntry *ee = value;
499 509
500 GNUNET_free_non_null (ee->mutations); 510 GNUNET_free_non_null (ee->mutations);
501
502 GNUNET_free (ee); 511 GNUNET_free (ee);
503 return GNUNET_YES; 512 return GNUNET_YES;
504} 513}
505 514
506 515
507/** 516/**
508 * Destroy a set, and free all resources and operations associated with it. 517 * Clean up after a client has disconnected
509 * 518 *
510 * @param set the set to destroy 519 * @param cls closure, unused
520 * @param client the client to clean up after
521 * @param internal_cls the `struct ClientState`
511 */ 522 */
512static void 523static void
513set_destroy (struct Set *set) 524client_disconnect_cb (void *cls,
525 struct GNUNET_SERVICE_Client *client,
526 void *internal_cls)
514{ 527{
515 if (NULL != set->client) 528 struct ClientState *cs = internal_cls;
516 { 529 struct Operation *op;
517 /* If the client is not dead yet, destroy it. The client's destroy 530 struct Listener *listener;
518 * callback will call `set_destroy()` again in this case. We do 531 struct Set *set;
519 * this so that the channel end handler still has a valid set handle 532
520 * to destroy. */ 533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521 struct GNUNET_SERVICE_Client *client = set->client; 534 "Client disconnected, cleaning up\n");
522 535 if (NULL != (set = cs->set))
523 set->client = NULL;
524 GNUNET_SERVICE_client_drop (client);
525 return;
526 }
527 GNUNET_assert (NULL != set->state);
528 while (NULL != set->ops_head)
529 _GSS_operation_destroy (set->ops_head, GNUNET_NO);
530 set->vt->destroy_set (set->state);
531 set->state = NULL;
532 if (NULL != set->iter)
533 {
534 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
535 set->iter = NULL;
536 set->iteration_id++;
537 }
538 { 536 {
539 struct SetContent *content; 537 struct SetContent *content = set->content;
540 struct PendingMutation *pm; 538 struct PendingMutation *pm;
541 struct PendingMutation *pm_current; 539 struct PendingMutation *pm_current;
540 struct LazyCopyRequest *lcr;
542 541
543 content = set->content; 542 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543 "Destroying client's set\n");
544 /* Destroy pending set operations */
545 while (NULL != set->ops_head)
546 _GSS_operation_destroy (set->ops_head,
547 GNUNET_NO);
548
549 /* Destroy operation-specific state */
550 GNUNET_assert (NULL != set->state);
551 set->vt->destroy_set (set->state);
552 set->state = NULL;
553
554 /* Clean up ongoing iterations */
555 if (NULL != set->iter)
556 {
557 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
558 set->iter = NULL;
559 set->iteration_id++;
560 }
544 561
545 // discard any pending mutations that reference this set 562 /* discard any pending mutations that reference this set */
546 pm = content->pending_mutations_head; 563 pm = content->pending_mutations_head;
547 while (NULL != pm) 564 while (NULL != pm)
548 { 565 {
549 pm_current = pm; 566 pm_current = pm;
550 pm = pm->next; 567 pm = pm->next;
551 if (pm_current-> set == set) 568 if (pm_current->set == set)
569 {
552 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, 570 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
553 content->pending_mutations_tail, 571 content->pending_mutations_tail,
554 pm_current); 572 pm_current);
555 573 GNUNET_free (pm_current);
574 }
556 } 575 }
557 576
577 /* free set content (or at least decrement RC) */
558 set->content = NULL; 578 set->content = NULL;
559 GNUNET_assert (0 != content->refcount); 579 GNUNET_assert (0 != content->refcount);
560 content->refcount -= 1; 580 content->refcount--;
561 if (0 == content->refcount) 581 if (0 == content->refcount)
562 { 582 {
563 GNUNET_assert (NULL != content->elements); 583 GNUNET_assert (NULL != content->elements);
@@ -568,166 +588,41 @@ set_destroy (struct Set *set)
568 content->elements = NULL; 588 content->elements = NULL;
569 GNUNET_free (content); 589 GNUNET_free (content);
570 } 590 }
571 } 591 GNUNET_free_non_null (set->excluded_generations);
572 GNUNET_free_non_null (set->excluded_generations); 592 set->excluded_generations = NULL;
573 set->excluded_generations = NULL;
574 GNUNET_CONTAINER_DLL_remove (sets_head,
575 sets_tail,
576 set);
577 593
578 // remove set from pending copy requests 594 /* remove set from pending copy requests */
579 {
580 struct LazyCopyRequest *lcr;
581 lcr = lazy_copy_head; 595 lcr = lazy_copy_head;
582 while (NULL != lcr) 596 while (NULL != lcr)
583 { 597 {
584 struct LazyCopyRequest *lcr_current; 598 struct LazyCopyRequest *lcr_current = lcr;
585 lcr_current = lcr; 599
586 lcr = lcr->next; 600 lcr = lcr->next;
587 if (lcr_current->source_set == set) 601 if (lcr_current->source_set == set)
602 {
588 GNUNET_CONTAINER_DLL_remove (lazy_copy_head, 603 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
589 lazy_copy_tail, 604 lazy_copy_tail,
590 lcr_current); 605 lcr_current);
606 GNUNET_free (lcr_current);
607 }
591 } 608 }
609 GNUNET_free (set);
592 } 610 }
593 611
594 GNUNET_free (set); 612 if (NULL != (listener = cs->listener))
595}
596
597
598/**
599 * Callback called when a client connects to the service.
600 *
601 * @param cls closure for the service
602 * @param c the new client that connected to the service
603 * @param mq the message queue used to send messages to the client
604 * @return @a c
605 */
606static void *
607client_connect_cb (void *cls,
608 struct GNUNET_SERVICE_Client *c,
609 struct GNUNET_MQ_Handle *mq)
610{
611 return c;
612}
613
614
615/**
616 * Destroy an incoming request from a remote peer
617 *
618 * @param incoming remote request to destroy
619 */
620static void
621incoming_destroy (struct Operation *incoming)
622{
623 struct GNUNET_CADET_Channel *channel;
624
625 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
626 GNUNET_CONTAINER_DLL_remove (incoming_head,
627 incoming_tail,
628 incoming);
629 if (NULL != incoming->timeout_task)
630 {
631 GNUNET_SCHEDULER_cancel (incoming->timeout_task);
632 incoming->timeout_task = NULL;
633 }
634 /* make sure that the tunnel end handler will not destroy us again */
635 incoming->vt = NULL;
636 if (NULL != incoming->spec)
637 {
638 GNUNET_free (incoming->spec);
639 incoming->spec = NULL;
640 }
641 if (NULL != (channel = incoming->channel))
642 {
643 incoming->channel = NULL;
644 GNUNET_CADET_channel_destroy (channel);
645 }
646}
647
648
649/**
650 * Clean up after a client has disconnected
651 *
652 * @param cls closure, unused
653 * @param client the client to clean up after
654 * @param internal_cls our client-specific internal data structure
655 */
656static void
657client_disconnect_cb (void *cls,
658 struct GNUNET_SERVICE_Client *client,
659 void *internal_cls)
660{
661 struct Set *set;
662
663 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
664 "client disconnected, cleaning up\n");
665 set = set_get (client);
666 if (NULL != set)
667 { 613 {
668 set->client = NULL;
669 set_destroy (set);
670 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 614 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
671 "Client's set destroyed\n"); 615 "Destroying client's listener\n");
672 } 616 GNUNET_CADET_close_port (listener->open_port);
673 struct Listener *listener = listener_get (client); 617 listener->open_port = NULL;
674 if (NULL != listener) 618 while (NULL != (op = listener->op_head))
675 { 619 incoming_destroy (op);
676 /* destroy all incoming operations whose client just 620 GNUNET_CONTAINER_DLL_remove (listener_head,
677 * got destroyed */ 621 listener_tail,
678 //struct Operation *op = incoming_head; 622 listener);
679 /* 623 GNUNET_free (listener);
680 while (NULL != op)
681 {
682 struct Operation *curr = op;
683 op = op->next;
684 if ( (GNUNET_YES == curr->is_incoming) &&
685 (curr->listener == listener) )
686 incoming_destroy (curr);
687 }
688 */
689 listener->client = NULL;
690 listener_destroy (listener);
691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
692 "Client's listener destroyed\n");
693 } 624 }
694} 625 GNUNET_free (cs);
695
696
697/**
698 * Suggest the given request to the listener. The listening client can
699 * then accept or reject the remote request.
700 *
701 * @param incoming the incoming peer with the request to suggest
702 * @param listener the listener to suggest the request to
703 */
704static void
705incoming_suggest (struct Operation *incoming,
706 struct Listener *listener)
707{
708 struct GNUNET_MQ_Envelope *mqm;
709 struct GNUNET_SET_RequestMessage *cmsg;
710
711 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
712 GNUNET_assert (NULL != incoming->spec);
713 GNUNET_assert (0 == incoming->suggest_id);
714 incoming->suggest_id = suggest_id++;
715 if (0 == suggest_id)
716 suggest_id++;
717 GNUNET_assert (NULL != incoming->timeout_task);
718 GNUNET_SCHEDULER_cancel (incoming->timeout_task);
719 incoming->timeout_task = NULL;
720 mqm = GNUNET_MQ_msg_nested_mh (cmsg,
721 GNUNET_MESSAGE_TYPE_SET_REQUEST,
722 incoming->spec->context_msg);
723 GNUNET_assert (NULL != mqm);
724 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
725 "Suggesting incoming request with accept id %u to listener\n",
726 incoming->suggest_id);
727 cmsg->accept_id = htonl (incoming->suggest_id);
728 cmsg->peer_id = incoming->spec->peer;
729 GNUNET_MQ_send (listener->client_mq,
730 mqm);
731} 626}
732 627
733 628
@@ -744,10 +639,22 @@ check_incoming_msg (void *cls,
744 const struct OperationRequestMessage *msg) 639 const struct OperationRequestMessage *msg)
745{ 640{
746 struct Operation *op = cls; 641 struct Operation *op = cls;
642 struct Listener *listener = op->listener;
747 const struct GNUNET_MessageHeader *nested_context; 643 const struct GNUNET_MessageHeader *nested_context;
748 644
749 /* double operation request */ 645 /* double operation request */
750 if (NULL != op->spec) 646 if (0 != op->suggest_id)
647 {
648 GNUNET_break_op (0);
649 return GNUNET_SYSERR;
650 }
651 /* This should be equivalent to the previous condition, but can't hurt to check twice */
652 if (NULL == op->listener)
653 {
654 GNUNET_break (0);
655 return GNUNET_SYSERR;
656 }
657 if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
751 { 658 {
752 GNUNET_break_op (0); 659 GNUNET_break_op (0);
753 return GNUNET_SYSERR; 660 return GNUNET_SYSERR;
@@ -786,61 +693,74 @@ handle_incoming_msg (void *cls,
786{ 693{
787 struct Operation *op = cls; 694 struct Operation *op = cls;
788 struct Listener *listener = op->listener; 695 struct Listener *listener = op->listener;
789 struct OperationSpecification *spec;
790 const struct GNUNET_MessageHeader *nested_context; 696 const struct GNUNET_MessageHeader *nested_context;
697 struct GNUNET_MQ_Envelope *env;
698 struct GNUNET_SET_RequestMessage *cmsg;
791 699
792 GNUNET_assert (GNUNET_YES == op->is_incoming);
793 spec = GNUNET_new (struct OperationSpecification);
794 nested_context = GNUNET_MQ_extract_nested_mh (msg); 700 nested_context = GNUNET_MQ_extract_nested_mh (msg);
795 /* Make a copy of the nested_context (application-specific context 701 /* Make a copy of the nested_context (application-specific context
796 information that is opaque to set) so we can pass it to the 702 information that is opaque to set) so we can pass it to the
797 listener later on */ 703 listener later on */
798 if (NULL != nested_context) 704 if (NULL != nested_context)
799 spec->context_msg = GNUNET_copy_message (nested_context); 705 op->context_msg = GNUNET_copy_message (nested_context);
800 spec->operation = ntohl (msg->operation); 706 op->remote_element_count = ntohl (msg->element_count);
801 spec->app_id = listener->app_id;
802 spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
803 UINT32_MAX);
804 spec->peer = op->peer;
805 spec->remote_element_count = ntohl (msg->element_count);
806 op->spec = spec;
807 listener = op->listener;
808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809 "Received P2P operation request (op %u, port %s) for active listener\n", 708 "Received P2P operation request (op %u, port %s) for active listener\n",
810 (uint32_t) ntohl (msg->operation), 709 (uint32_t) ntohl (msg->operation),
811 GNUNET_h2s (&listener->app_id)); 710 GNUNET_h2s (&op->listener->app_id));
812 incoming_suggest (op, 711 GNUNET_assert (0 == op->suggest_id);
813 listener); 712 if (0 == suggest_id)
713 suggest_id++;
714 op->suggest_id = suggest_id++;
715 GNUNET_assert (NULL != op->timeout_task);
716 GNUNET_SCHEDULER_cancel (op->timeout_task);
717 op->timeout_task = NULL;
718 env = GNUNET_MQ_msg_nested_mh (cmsg,
719 GNUNET_MESSAGE_TYPE_SET_REQUEST,
720 op->context_msg);
721 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
722 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
723 op->suggest_id,
724 listener,
725 listener->cs);
726 cmsg->accept_id = htonl (op->suggest_id);
727 cmsg->peer_id = op->peer;
728 GNUNET_MQ_send (listener->cs->mq,
729 env);
730 /* NOTE: GNUNET_CADET_receive_done() will be called in
731 #handle_client_accept() */
814} 732}
815 733
816 734
735/**
736 * Add an element to @a set as specified by @a msg
737 *
738 * @param set set to manipulate
739 * @param msg message specifying the change
740 */
817static void 741static void
818execute_add (struct Set *set, 742execute_add (struct Set *set,
819 const struct GNUNET_MessageHeader *m) 743 const struct GNUNET_SET_ElementMessage *msg)
820{ 744{
821 const struct GNUNET_SET_ElementMessage *msg;
822 struct GNUNET_SET_Element el; 745 struct GNUNET_SET_Element el;
823 struct ElementEntry *ee; 746 struct ElementEntry *ee;
824 struct GNUNET_HashCode hash; 747 struct GNUNET_HashCode hash;
825 748
826 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type)); 749 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
827 750 el.size = ntohs (msg->header.size) - sizeof (*msg);
828 msg = (const struct GNUNET_SET_ElementMessage *) m;
829 el.size = ntohs (m->size) - sizeof *msg;
830 el.data = &msg[1]; 751 el.data = &msg[1];
831 el.element_type = ntohs (msg->element_type); 752 el.element_type = ntohs (msg->element_type);
832 GNUNET_SET_element_hash (&el, &hash); 753 GNUNET_SET_element_hash (&el,
833 754 &hash);
834 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, 755 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
835 &hash); 756 &hash);
836
837 if (NULL == ee) 757 if (NULL == ee)
838 { 758 {
839 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
840 "Client inserts element %s of size %u\n", 760 "Client inserts element %s of size %u\n",
841 GNUNET_h2s (&hash), 761 GNUNET_h2s (&hash),
842 el.size); 762 el.size);
843 ee = GNUNET_malloc (el.size + sizeof *ee); 763 ee = GNUNET_malloc (el.size + sizeof (*ee));
844 ee->element.size = el.size; 764 ee->element.size = el.size;
845 GNUNET_memcpy (&ee[1], 765 GNUNET_memcpy (&ee[1],
846 el.data, 766 el.data,
@@ -857,7 +777,11 @@ execute_add (struct Set *set,
857 ee, 777 ee,
858 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 778 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
859 } 779 }
860 else if (GNUNET_YES == _GSS_is_element_of_set (ee, set)) 780 else if (GNUNET_YES ==
781 is_element_of_generation (ee,
782 set->current_generation,
783 set->excluded_generations,
784 set->excluded_generations_size))
861 { 785 {
862 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
863 "Client inserted element %s of size %u twice (ignored)\n", 787 "Client inserted element %s of size %u twice (ignored)\n",
@@ -877,24 +801,27 @@ execute_add (struct Set *set,
877 ee->mutations_size, 801 ee->mutations_size,
878 mut); 802 mut);
879 } 803 }
880 804 set->vt->add (set->state,
881 set->vt->add (set->state, ee); 805 ee);
882} 806}
883 807
884 808
809/**
810 * Remove an element from @a set as specified by @a msg
811 *
812 * @param set set to manipulate
813 * @param msg message specifying the change
814 */
885static void 815static void
886execute_remove (struct Set *set, 816execute_remove (struct Set *set,
887 const struct GNUNET_MessageHeader *m) 817 const struct GNUNET_SET_ElementMessage *msg)
888{ 818{
889 const struct GNUNET_SET_ElementMessage *msg;
890 struct GNUNET_SET_Element el; 819 struct GNUNET_SET_Element el;
891 struct ElementEntry *ee; 820 struct ElementEntry *ee;
892 struct GNUNET_HashCode hash; 821 struct GNUNET_HashCode hash;
893 822
894 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type)); 823 GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
895 824 el.size = ntohs (msg->header.size) - sizeof (*msg);
896 msg = (const struct GNUNET_SET_ElementMessage *) m;
897 el.size = ntohs (m->size) - sizeof *msg;
898 el.data = &msg[1]; 825 el.data = &msg[1];
899 el.element_type = ntohs (msg->element_type); 826 el.element_type = ntohs (msg->element_type);
900 GNUNET_SET_element_hash (&el, &hash); 827 GNUNET_SET_element_hash (&el, &hash);
@@ -908,7 +835,11 @@ execute_remove (struct Set *set,
908 el.size); 835 el.size);
909 return; 836 return;
910 } 837 }
911 if (GNUNET_NO == _GSS_is_element_of_set (ee, set)) 838 if (GNUNET_NO ==
839 is_element_of_generation (ee,
840 set->current_generation,
841 set->excluded_generations,
842 set->excluded_generations_size))
912 { 843 {
913 /* Client tried to remove element twice */ 844 /* Client tried to remove element twice */
914 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 845 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -931,22 +862,28 @@ execute_remove (struct Set *set,
931 ee->mutations_size, 862 ee->mutations_size,
932 mut); 863 mut);
933 } 864 }
934 set->vt->remove (set->state, ee); 865 set->vt->remove (set->state,
866 ee);
935} 867}
936 868
937 869
938 870/**
871 * Perform a mutation on a set as specified by the @a msg
872 *
873 * @param set the set to mutate
874 * @param msg specification of what to change
875 */
939static void 876static void
940execute_mutation (struct Set *set, 877execute_mutation (struct Set *set,
941 const struct GNUNET_MessageHeader *m) 878 const struct GNUNET_SET_ElementMessage *msg)
942{ 879{
943 switch (ntohs (m->type)) 880 switch (ntohs (msg->header.type))
944 { 881 {
945 case GNUNET_MESSAGE_TYPE_SET_ADD: 882 case GNUNET_MESSAGE_TYPE_SET_ADD:
946 execute_add (set, m); 883 execute_add (set, msg);
947 break; 884 break;
948 case GNUNET_MESSAGE_TYPE_SET_REMOVE: 885 case GNUNET_MESSAGE_TYPE_SET_REMOVE:
949 execute_remove (set, m); 886 execute_remove (set, msg);
950 break; 887 break;
951 default: 888 default:
952 GNUNET_break (0); 889 GNUNET_break (0);
@@ -954,6 +891,34 @@ execute_mutation (struct Set *set,
954} 891}
955 892
956 893
894/**
895 * Execute mutations that were delayed on a set because of
896 * pending operations.
897 *
898 * @param set the set to execute mutations on
899 */
900static void
901execute_delayed_mutations (struct Set *set)
902{
903 struct PendingMutation *pm;
904
905 if (0 != set->content->iterator_count)
906 return; /* still cannot do this */
907 while (NULL != (pm = set->content->pending_mutations_head))
908 {
909 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
910 set->content->pending_mutations_tail,
911 pm);
912 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913 "Executing pending mutation on %p.\n",
914 pm->set);
915 execute_mutation (pm->set,
916 pm->msg);
917 GNUNET_free (pm->msg);
918 GNUNET_free (pm);
919 }
920}
921
957 922
958/** 923/**
959 * Send the next element of a set to the set's client. The next element is given by 924 * Send the next element of a set to the set's client. The next element is given by
@@ -977,65 +942,45 @@ send_client_element (struct Set *set)
977 struct GNUNET_SET_IterResponseMessage *msg; 942 struct GNUNET_SET_IterResponseMessage *msg;
978 943
979 GNUNET_assert (NULL != set->iter); 944 GNUNET_assert (NULL != set->iter);
980 945 do {
981again: 946 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
982 947 NULL,
983 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, 948 (const void **) &ee);
984 NULL, 949 if (GNUNET_NO == ret)
985 (const void **) &ee);
986 if (GNUNET_NO == ret)
987 {
988 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989 "Iteration on %p done.\n",
990 (void *) set);
991 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
992 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
993 set->iter = NULL;
994 set->iteration_id++;
995
996 GNUNET_assert (set->content->iterator_count > 0);
997 set->content->iterator_count -= 1;
998
999 if (0 == set->content->iterator_count)
1000 { 950 {
1001 while (NULL != set->content->pending_mutations_head) 951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002 { 952 "Iteration on %p done.\n",
1003 struct PendingMutation *pm; 953 set);
1004 954 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
1005 pm = set->content->pending_mutations_head; 955 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1006 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, 956 set->iter = NULL;
1007 set->content->pending_mutations_tail, 957 set->iteration_id++;
1008 pm); 958 GNUNET_assert (set->content->iterator_count > 0);
1009 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 959 set->content->iterator_count--;
1010 "Executing pending mutation on %p.\n", 960 execute_delayed_mutations (set);
1011 (void *) pm->set); 961 GNUNET_MQ_send (set->cs->mq,
1012 execute_mutation (pm->set, pm->mutation_message); 962 ev);
1013 GNUNET_free (pm->mutation_message); 963 return;
1014 GNUNET_free (pm);
1015 }
1016 } 964 }
1017
1018 }
1019 else
1020 {
1021 GNUNET_assert (NULL != ee); 965 GNUNET_assert (NULL != ee);
1022 966 } while (GNUNET_NO ==
1023 if (GNUNET_NO == is_element_of_iteration (ee, set)) 967 is_element_of_generation (ee,
1024 goto again; 968 set->iter_generation,
1025 969 set->excluded_generations,
1026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 970 set->excluded_generations_size));
1027 "Sending iteration element on %p.\n", 971 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1028 (void *) set); 972 "Sending iteration element on %p.\n",
1029 ev = GNUNET_MQ_msg_extra (msg, 973 set);
1030 ee->element.size, 974 ev = GNUNET_MQ_msg_extra (msg,
1031 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); 975 ee->element.size,
1032 GNUNET_memcpy (&msg[1], 976 GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
1033 ee->element.data, 977 GNUNET_memcpy (&msg[1],
1034 ee->element.size); 978 ee->element.data,
1035 msg->element_type = htons (ee->element.element_type); 979 ee->element.size);
1036 msg->iteration_id = htons (set->iteration_id); 980 msg->element_type = htons (ee->element.element_type);
1037 } 981 msg->iteration_id = htons (set->iteration_id);
1038 GNUNET_MQ_send (set->client_mq, ev); 982 GNUNET_MQ_send (set->cs->mq,
983 ev);
1039} 984}
1040 985
1041 986
@@ -1052,22 +997,21 @@ static void
1052handle_client_iterate (void *cls, 997handle_client_iterate (void *cls,
1053 const struct GNUNET_MessageHeader *m) 998 const struct GNUNET_MessageHeader *m)
1054{ 999{
1055 struct GNUNET_SERVICE_Client *client = cls; 1000 struct ClientState *cs = cls;
1056 struct Set *set; 1001 struct Set *set;
1057 1002
1058 set = set_get (client); 1003 if (NULL == (set = cs->set))
1059 if (NULL == set)
1060 { 1004 {
1061 /* attempt to iterate over a non existing set */ 1005 /* attempt to iterate over a non existing set */
1062 GNUNET_break (0); 1006 GNUNET_break (0);
1063 GNUNET_SERVICE_client_drop (client); 1007 GNUNET_SERVICE_client_drop (cs->client);
1064 return; 1008 return;
1065 } 1009 }
1066 if (NULL != set->iter) 1010 if (NULL != set->iter)
1067 { 1011 {
1068 /* Only one concurrent iterate-action allowed per set */ 1012 /* Only one concurrent iterate-action allowed per set */
1069 GNUNET_break (0); 1013 GNUNET_break (0);
1070 GNUNET_SERVICE_client_drop (client); 1014 GNUNET_SERVICE_client_drop (cs->client);
1071 return; 1015 return;
1072 } 1016 }
1073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1075,8 +1019,8 @@ handle_client_iterate (void *cls,
1075 (void *) set, 1019 (void *) set,
1076 set->current_generation, 1020 set->current_generation,
1077 GNUNET_CONTAINER_multihashmap_size (set->content->elements)); 1021 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
1078 GNUNET_SERVICE_client_continue (client); 1022 GNUNET_SERVICE_client_continue (cs->client);
1079 set->content->iterator_count += 1; 1023 set->content->iterator_count++;
1080 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); 1024 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
1081 set->iter_generation = set->current_generation; 1025 set->iter_generation = set->current_generation;
1082 send_client_element (set); 1026 send_client_element (set);
@@ -1095,17 +1039,17 @@ static void
1095handle_client_create_set (void *cls, 1039handle_client_create_set (void *cls,
1096 const struct GNUNET_SET_CreateMessage *msg) 1040 const struct GNUNET_SET_CreateMessage *msg)
1097{ 1041{
1098 struct GNUNET_SERVICE_Client *client = cls; 1042 struct ClientState *cs = cls;
1099 struct Set *set; 1043 struct Set *set;
1100 1044
1101 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1045 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1102 "Client created new set (operation %u)\n", 1046 "Client created new set (operation %u)\n",
1103 (uint32_t) ntohl (msg->operation)); 1047 (uint32_t) ntohl (msg->operation));
1104 if (NULL != set_get (client)) 1048 if (NULL != cs->set)
1105 { 1049 {
1106 /* There can only be one set per client */ 1050 /* There can only be one set per client */
1107 GNUNET_break (0); 1051 GNUNET_break (0);
1108 GNUNET_SERVICE_client_drop (client); 1052 GNUNET_SERVICE_client_drop (cs->client);
1109 return; 1053 return;
1110 } 1054 }
1111 set = GNUNET_new (struct Set); 1055 set = GNUNET_new (struct Set);
@@ -1120,7 +1064,7 @@ handle_client_create_set (void *cls,
1120 default: 1064 default:
1121 GNUNET_free (set); 1065 GNUNET_free (set);
1122 GNUNET_break (0); 1066 GNUNET_break (0);
1123 GNUNET_SERVICE_client_drop (client); 1067 GNUNET_SERVICE_client_drop (cs->client);
1124 return; 1068 return;
1125 } 1069 }
1126 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); 1070 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
@@ -1129,18 +1073,16 @@ handle_client_create_set (void *cls,
1129 { 1073 {
1130 /* initialization failed (i.e. out of memory) */ 1074 /* initialization failed (i.e. out of memory) */
1131 GNUNET_free (set); 1075 GNUNET_free (set);
1132 GNUNET_SERVICE_client_drop (client); 1076 GNUNET_SERVICE_client_drop (cs->client);
1133 return; 1077 return;
1134 } 1078 }
1135 set->content = GNUNET_new (struct SetContent); 1079 set->content = GNUNET_new (struct SetContent);
1136 set->content->refcount = 1; 1080 set->content->refcount = 1;
1137 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1081 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
1138 set->client = client; 1082 GNUNET_YES);
1139 set->client_mq = GNUNET_SERVICE_client_get_mq (client); 1083 set->cs = cs;
1140 GNUNET_CONTAINER_DLL_insert (sets_head, 1084 cs->set = set;
1141 sets_tail, 1085 GNUNET_SERVICE_client_continue (cs->client);
1142 set);
1143 GNUNET_SERVICE_client_continue (client);
1144} 1086}
1145 1087
1146 1088
@@ -1156,31 +1098,12 @@ handle_client_create_set (void *cls,
1156static void 1098static void
1157incoming_timeout_cb (void *cls) 1099incoming_timeout_cb (void *cls)
1158{ 1100{
1159 struct Operation *incoming = cls; 1101 struct Operation *op = cls;
1160 1102
1161 incoming->timeout_task = NULL; 1103 op->timeout_task = NULL;
1162 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
1163 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164 "Remote peer's incoming request timed out\n"); 1105 "Remote peer's incoming request timed out\n");
1165 incoming_destroy (incoming);
1166}
1167
1168
1169/**
1170 * Terminates an incoming operation in case we have not yet received an
1171 * operation request. Called by the channel destruction handler.
1172 *
1173 * @param op the channel context
1174 */
1175static void
1176handle_incoming_disconnect (struct Operation *op)
1177{
1178 GNUNET_assert (GNUNET_YES == op->is_incoming);
1179 /* channel is already dead, incoming_destroy must not
1180 * destroy it ... */
1181 op->channel = NULL;
1182 incoming_destroy (op); 1106 incoming_destroy (op);
1183 op->vt = NULL;
1184} 1107}
1185 1108
1186 1109
@@ -1205,31 +1128,26 @@ channel_new_cb (void *cls,
1205 struct GNUNET_CADET_Channel *channel, 1128 struct GNUNET_CADET_Channel *channel,
1206 const struct GNUNET_PeerIdentity *source) 1129 const struct GNUNET_PeerIdentity *source)
1207{ 1130{
1208 static const struct SetVT incoming_vt = {
1209 .peer_disconnect = &handle_incoming_disconnect
1210 };
1211 struct Listener *listener = cls; 1131 struct Listener *listener = cls;
1212 struct Operation *incoming; 1132 struct Operation *op;
1213 1133
1214 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1134 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215 "New incoming channel\n"); 1135 "New incoming channel\n");
1216 incoming = GNUNET_new (struct Operation); 1136 op = GNUNET_new (struct Operation);
1217 incoming->listener = listener; 1137 op->listener = listener;
1218 incoming->is_incoming = GNUNET_YES; 1138 op->peer = *source;
1219 incoming->peer = *source; 1139 op->channel = channel;
1220 incoming->channel = channel; 1140 op->mq = GNUNET_CADET_get_mq (op->channel);
1221 incoming->mq = GNUNET_CADET_get_mq (incoming->channel); 1141 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1222 incoming->vt = &incoming_vt; 1142 UINT32_MAX);
1223 incoming->timeout_task 1143 op->timeout_task
1224 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, 1144 = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
1225 &incoming_timeout_cb, 1145 &incoming_timeout_cb,
1226 incoming); 1146 op);
1227 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, 1147 GNUNET_CONTAINER_DLL_insert (listener->op_head,
1228 incoming_tail, 1148 listener->op_tail,
1229 incoming); 1149 op);
1230 // incoming_suggest (incoming, 1150 return op;
1231 // listener);
1232 return incoming;
1233} 1151}
1234 1152
1235 1153
@@ -1258,22 +1176,14 @@ channel_end_cb (void *channel_ctx,
1258 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259 "channel_end_cb called\n"); 1177 "channel_end_cb called\n");
1260 op->channel = NULL; 1178 op->channel = NULL;
1261 op->keep++; 1179 if (NULL != op->listener)
1262 /* the vt can be null if a client already requested canceling op. */ 1180 incoming_destroy (op);
1263 if (NULL != op->vt) 1181 else if (NULL != op->set)
1264 { 1182 op->set->vt->channel_death (op);
1265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1183 else
1266 "calling peer disconnect due to channel end\n"); 1184 _GSS_operation_destroy (op,
1267 op->vt->peer_disconnect (op); 1185 GNUNET_YES);
1268 } 1186 GNUNET_free (op);
1269 op->keep--;
1270 if (0 == op->keep)
1271 {
1272 /* cadet will never call us with the context again! */
1273 GNUNET_free (op);
1274 }
1275 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1276 "channel_end_cb finished\n");
1277} 1187}
1278 1188
1279 1189
@@ -1310,7 +1220,7 @@ static void
1310handle_client_listen (void *cls, 1220handle_client_listen (void *cls,
1311 const struct GNUNET_SET_ListenMessage *msg) 1221 const struct GNUNET_SET_ListenMessage *msg)
1312{ 1222{
1313 struct GNUNET_SERVICE_Client *client = cls; 1223 struct ClientState *cs = cls;
1314 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 1224 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1315 GNUNET_MQ_hd_var_size (incoming_msg, 1225 GNUNET_MQ_hd_var_size (incoming_msg,
1316 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1226 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
@@ -1376,50 +1286,33 @@ handle_client_listen (void *cls,
1376 }; 1286 };
1377 struct Listener *listener; 1287 struct Listener *listener;
1378 1288
1379 if (NULL != listener_get (client)) 1289 if (NULL != cs->listener)
1380 { 1290 {
1381 /* max. one active listener per client! */ 1291 /* max. one active listener per client! */
1382 GNUNET_break (0); 1292 GNUNET_break (0);
1383 GNUNET_SERVICE_client_drop (client); 1293 GNUNET_SERVICE_client_drop (cs->client);
1384 return; 1294 return;
1385 } 1295 }
1386 listener = GNUNET_new (struct Listener); 1296 listener = GNUNET_new (struct Listener);
1387 listener->client = client; 1297 listener->cs = cs;
1388 listener->client_mq = GNUNET_SERVICE_client_get_mq (client);
1389 listener->app_id = msg->app_id; 1298 listener->app_id = msg->app_id;
1390 listener->operation = ntohl (msg->operation); 1299 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1391 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, 1300 GNUNET_CONTAINER_DLL_insert (listener_head,
1392 listeners_tail, 1301 listener_tail,
1393 listener); 1302 listener);
1394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1395 "New listener created (op %u, port %s)\n", 1304 "New listener created (op %u, port %s)\n",
1396 listener->operation, 1305 listener->operation,
1397 GNUNET_h2s (&listener->app_id)); 1306 GNUNET_h2s (&listener->app_id));
1398 listener->open_port = GNUNET_CADET_open_porT (cadet, 1307 listener->open_port
1399 &msg->app_id, 1308 = GNUNET_CADET_open_porT (cadet,
1400 &channel_new_cb, 1309 &msg->app_id,
1401 listener, 1310 &channel_new_cb,
1402 &channel_window_cb, 1311 listener,
1403 &channel_end_cb, 1312 &channel_window_cb,
1404 cadet_handlers); 1313 &channel_end_cb,
1405 /* check for existing incoming requests the listener might be interested in */ 1314 cadet_handlers);
1406 for (struct Operation *op = incoming_head; NULL != op; op = op->next) 1315 GNUNET_SERVICE_client_continue (cs->client);
1407 {
1408 if (NULL == op->spec)
1409 continue; /* no details available yet */
1410 if (0 != op->suggest_id)
1411 continue; /* this one has been already suggested to a listener */
1412 if (listener->operation != op->spec->operation)
1413 continue; /* incompatible operation */
1414 if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
1415 &op->spec->app_id))
1416 continue; /* incompatible appliation */
1417 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1418 "Found matching existing request\n");
1419 incoming_suggest (op,
1420 listener);
1421 }
1422 GNUNET_SERVICE_client_continue (client);
1423} 1316}
1424 1317
1425 1318
@@ -1434,26 +1327,26 @@ static void
1434handle_client_reject (void *cls, 1327handle_client_reject (void *cls,
1435 const struct GNUNET_SET_RejectMessage *msg) 1328 const struct GNUNET_SET_RejectMessage *msg)
1436{ 1329{
1437 struct GNUNET_SERVICE_Client *client = cls; 1330 struct ClientState *cs = cls;
1438 struct Operation *incoming; 1331 struct Operation *op;
1439 1332
1440 incoming = get_incoming (ntohl (msg->accept_reject_id)); 1333 op = get_incoming (ntohl (msg->accept_reject_id));
1441 if (NULL == incoming) 1334 if (NULL == op)
1442 { 1335 {
1443 /* no matching incoming operation for this reject; 1336 /* no matching incoming operation for this reject;
1444 could be that the other peer already disconnected... */ 1337 could be that the other peer already disconnected... */
1445 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1338 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1446 "Client rejected unknown operation %u\n", 1339 "Client rejected unknown operation %u\n",
1447 (unsigned int) ntohl (msg->accept_reject_id)); 1340 (unsigned int) ntohl (msg->accept_reject_id));
1448 GNUNET_SERVICE_client_continue (client); 1341 GNUNET_SERVICE_client_continue (cs->client);
1449 return; 1342 return;
1450 } 1343 }
1451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1344 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1452 "Peer request (op %u, app %s) rejected by client\n", 1345 "Peer request (op %u, app %s) rejected by client\n",
1453 incoming->spec->operation, 1346 op->listener->operation,
1454 GNUNET_h2s (&incoming->spec->app_id)); 1347 GNUNET_h2s (&cs->listener->app_id));
1455 GNUNET_CADET_channel_destroy (incoming->channel); 1348 GNUNET_CADET_channel_destroy (op->channel);
1456 GNUNET_SERVICE_client_continue (client); 1349 GNUNET_SERVICE_client_continue (cs->client);
1457} 1350}
1458 1351
1459 1352
@@ -1461,13 +1354,14 @@ handle_client_reject (void *cls,
1461 * Called when a client wants to add or remove an element to a set it inhabits. 1354 * Called when a client wants to add or remove an element to a set it inhabits.
1462 * 1355 *
1463 * @param cls client that sent the message 1356 * @param cls client that sent the message
1464 * @param m message sent by the client 1357 * @param msg message sent by the client
1465 */ 1358 */
1466static int 1359static int
1467check_client_mutation (void *cls, 1360check_client_mutation (void *cls,
1468 const struct GNUNET_MessageHeader *m) 1361 const struct GNUNET_SET_ElementMessage *msg)
1469{ 1362{
1470 /* FIXME: any check we might want to do here? */ 1363 /* NOTE: Technically, we should probably check with the
1364 block library whether the element we are given is well-formed */
1471 return GNUNET_OK; 1365 return GNUNET_OK;
1472} 1366}
1473 1367
@@ -1476,24 +1370,23 @@ check_client_mutation (void *cls,
1476 * Called when a client wants to add or remove an element to a set it inhabits. 1370 * Called when a client wants to add or remove an element to a set it inhabits.
1477 * 1371 *
1478 * @param cls client that sent the message 1372 * @param cls client that sent the message
1479 * @param m message sent by the client 1373 * @param msg message sent by the client
1480 */ 1374 */
1481static void 1375static void
1482handle_client_mutation (void *cls, 1376handle_client_mutation (void *cls,
1483 const struct GNUNET_MessageHeader *m) 1377 const struct GNUNET_SET_ElementMessage *msg)
1484{ 1378{
1485 struct GNUNET_SERVICE_Client *client = cls; 1379 struct ClientState *cs = cls;
1486 struct Set *set; 1380 struct Set *set;
1487 1381
1488 set = set_get (client); 1382 if (NULL == (set = cs->set))
1489 if (NULL == set)
1490 { 1383 {
1491 /* client without a set requested an operation */ 1384 /* client without a set requested an operation */
1492 GNUNET_break (0); 1385 GNUNET_break (0);
1493 GNUNET_SERVICE_client_drop (client); 1386 GNUNET_SERVICE_client_drop (cs->client);
1494 return; 1387 return;
1495 } 1388 }
1496 GNUNET_SERVICE_client_continue (client); 1389 GNUNET_SERVICE_client_continue (cs->client);
1497 1390
1498 if (0 != set->content->iterator_count) 1391 if (0 != set->content->iterator_count)
1499 { 1392 {
@@ -1502,7 +1395,7 @@ handle_client_mutation (void *cls,
1502 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1503 "Scheduling mutation on set\n"); 1396 "Scheduling mutation on set\n");
1504 pm = GNUNET_new (struct PendingMutation); 1397 pm = GNUNET_new (struct PendingMutation);
1505 pm->mutation_message = GNUNET_copy_message (m); 1398 pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1506 pm->set = set; 1399 pm->set = set;
1507 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, 1400 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
1508 set->content->pending_mutations_tail, 1401 set->content->pending_mutations_tail,
@@ -1512,7 +1405,7 @@ handle_client_mutation (void *cls,
1512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1513 "Executing mutation on set\n"); 1406 "Executing mutation on set\n");
1514 execute_mutation (set, 1407 execute_mutation (set,
1515 m); 1408 msg);
1516} 1409}
1517 1410
1518 1411
@@ -1577,7 +1470,7 @@ static void
1577handle_client_evaluate (void *cls, 1470handle_client_evaluate (void *cls,
1578 const struct GNUNET_SET_EvaluateMessage *msg) 1471 const struct GNUNET_SET_EvaluateMessage *msg)
1579{ 1472{
1580 struct GNUNET_SERVICE_Client *client = cls; 1473 struct ClientState *cs = cls;
1581 struct Operation *op = GNUNET_new (struct Operation); 1474 struct Operation *op = GNUNET_new (struct Operation);
1582 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 1475 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1583 GNUNET_MQ_hd_var_size (incoming_msg, 1476 GNUNET_MQ_hd_var_size (incoming_msg,
@@ -1643,45 +1536,38 @@ handle_client_evaluate (void *cls,
1643 GNUNET_MQ_handler_end () 1536 GNUNET_MQ_handler_end ()
1644 }; 1537 };
1645 struct Set *set; 1538 struct Set *set;
1646 struct OperationSpecification *spec;
1647 const struct GNUNET_MessageHeader *context; 1539 const struct GNUNET_MessageHeader *context;
1648 1540
1649 set = set_get (client); 1541 if (NULL == (set = cs->set))
1650 if (NULL == set)
1651 { 1542 {
1652 GNUNET_break (0); 1543 GNUNET_break (0);
1653 GNUNET_free (op); 1544 GNUNET_free (op);
1654 GNUNET_SERVICE_client_drop (client); 1545 GNUNET_SERVICE_client_drop (cs->client);
1655 return; 1546 return;
1656 } 1547 }
1657 spec = GNUNET_new (struct OperationSpecification); 1548 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
1658 spec->operation = set->operation; 1549 UINT32_MAX);
1659 spec->app_id = msg->app_id; 1550 op->peer = msg->target_peer;
1660 spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, 1551 op->result_mode = ntohl (msg->result_mode);
1661 UINT32_MAX); 1552 op->client_request_id = ntohl (msg->request_id);
1662 spec->peer = msg->target_peer; 1553 op->byzantine = msg->byzantine;
1663 spec->set = set; 1554 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1664 spec->result_mode = ntohl (msg->result_mode); 1555 op->force_full = msg->force_full;
1665 spec->client_request_id = ntohl (msg->request_id); 1556 op->force_delta = msg->force_delta;
1666 spec->byzantine = msg->byzantine;
1667 spec->byzantine_lower_bound = msg->byzantine_lower_bound;
1668 spec->force_full = msg->force_full;
1669 spec->force_delta = msg->force_delta;
1670 context = GNUNET_MQ_extract_nested_mh (msg); 1557 context = GNUNET_MQ_extract_nested_mh (msg);
1671 op->spec = spec;
1672 1558
1673 // Advance generation values, so that 1559 /* Advance generation values, so that
1674 // mutations won't interfer with the running operation. 1560 mutations won't interfer with the running operation. */
1561 op->set = set;
1675 op->generation_created = set->current_generation; 1562 op->generation_created = set->current_generation;
1676 advance_generation (set); 1563 advance_generation (set);
1677 op->operation = set->operation;
1678 op->vt = set->vt;
1679 GNUNET_CONTAINER_DLL_insert (set->ops_head, 1564 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1680 set->ops_tail, 1565 set->ops_tail,
1681 op); 1566 op);
1682 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1567 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683 "Creating new CADET channel to port %s\n", 1568 "Creating new CADET channel to port %s for set operation type %u\n",
1684 GNUNET_h2s (&msg->app_id)); 1569 GNUNET_h2s (&msg->app_id),
1570 set->operation);
1685 op->channel = GNUNET_CADET_channel_creatE (cadet, 1571 op->channel = GNUNET_CADET_channel_creatE (cadet,
1686 op, 1572 op,
1687 &msg->target_peer, 1573 &msg->target_peer,
@@ -1691,9 +1577,15 @@ handle_client_evaluate (void *cls,
1691 &channel_end_cb, 1577 &channel_end_cb,
1692 cadet_handlers); 1578 cadet_handlers);
1693 op->mq = GNUNET_CADET_get_mq (op->channel); 1579 op->mq = GNUNET_CADET_get_mq (op->channel);
1694 set->vt->evaluate (op, 1580 op->state = set->vt->evaluate (op,
1695 context); 1581 context);
1696 GNUNET_SERVICE_client_continue (client); 1582 if (NULL == op->state)
1583 {
1584 GNUNET_break (0);
1585 GNUNET_SERVICE_client_drop (cs->client);
1586 return;
1587 }
1588 GNUNET_SERVICE_client_continue (cs->client);
1697} 1589}
1698 1590
1699 1591
@@ -1709,15 +1601,14 @@ static void
1709handle_client_iter_ack (void *cls, 1601handle_client_iter_ack (void *cls,
1710 const struct GNUNET_SET_IterAckMessage *ack) 1602 const struct GNUNET_SET_IterAckMessage *ack)
1711{ 1603{
1712 struct GNUNET_SERVICE_Client *client = cls; 1604 struct ClientState *cs = cls;
1713 struct Set *set; 1605 struct Set *set;
1714 1606
1715 set = set_get (client); 1607 if (NULL == (set = cs->set))
1716 if (NULL == set)
1717 { 1608 {
1718 /* client without a set acknowledged receiving a value */ 1609 /* client without a set acknowledged receiving a value */
1719 GNUNET_break (0); 1610 GNUNET_break (0);
1720 GNUNET_SERVICE_client_drop (client); 1611 GNUNET_SERVICE_client_drop (cs->client);
1721 return; 1612 return;
1722 } 1613 }
1723 if (NULL == set->iter) 1614 if (NULL == set->iter)
@@ -1725,10 +1616,10 @@ handle_client_iter_ack (void *cls,
1725 /* client sent an ack, but we were not expecting one (as 1616 /* client sent an ack, but we were not expecting one (as
1726 set iteration has finished) */ 1617 set iteration has finished) */
1727 GNUNET_break (0); 1618 GNUNET_break (0);
1728 GNUNET_SERVICE_client_drop (client); 1619 GNUNET_SERVICE_client_drop (cs->client);
1729 return; 1620 return;
1730 } 1621 }
1731 GNUNET_SERVICE_client_continue (client); 1622 GNUNET_SERVICE_client_continue (cs->client);
1732 if (ntohl (ack->send_more)) 1623 if (ntohl (ack->send_more))
1733 { 1624 {
1734 send_client_element (set); 1625 send_client_element (set);
@@ -1752,42 +1643,33 @@ static void
1752handle_client_copy_lazy_prepare (void *cls, 1643handle_client_copy_lazy_prepare (void *cls,
1753 const struct GNUNET_MessageHeader *mh) 1644 const struct GNUNET_MessageHeader *mh)
1754{ 1645{
1755 struct GNUNET_SERVICE_Client *client = cls; 1646 struct ClientState *cs = cls;
1756 struct Set *set; 1647 struct Set *set;
1757 struct LazyCopyRequest *cr; 1648 struct LazyCopyRequest *cr;
1758 struct GNUNET_MQ_Envelope *ev; 1649 struct GNUNET_MQ_Envelope *ev;
1759 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg; 1650 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1760 1651
1761 set = set_get (client); 1652 if (NULL == (set = cs->set))
1762 if (NULL == set)
1763 { 1653 {
1764 /* client without a set requested an operation */ 1654 /* client without a set requested an operation */
1765 GNUNET_break (0); 1655 GNUNET_break (0);
1766 GNUNET_SERVICE_client_drop (client); 1656 GNUNET_SERVICE_client_drop (cs->client);
1767 return; 1657 return;
1768 } 1658 }
1769 1659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1660 "Client requested creation of lazy copy\n");
1770 cr = GNUNET_new (struct LazyCopyRequest); 1661 cr = GNUNET_new (struct LazyCopyRequest);
1771 1662 cr->cookie = ++lazy_copy_cookie;
1772 cr->cookie = lazy_copy_cookie;
1773 lazy_copy_cookie += 1;
1774 cr->source_set = set; 1663 cr->source_set = set;
1775
1776 GNUNET_CONTAINER_DLL_insert (lazy_copy_head, 1664 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1777 lazy_copy_tail, 1665 lazy_copy_tail,
1778 cr); 1666 cr);
1779
1780
1781 ev = GNUNET_MQ_msg (resp_msg, 1667 ev = GNUNET_MQ_msg (resp_msg,
1782 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE); 1668 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
1783 resp_msg->cookie = cr->cookie; 1669 resp_msg->cookie = cr->cookie;
1784 GNUNET_MQ_send (set->client_mq, ev); 1670 GNUNET_MQ_send (set->cs->mq,
1785 1671 ev);
1786 1672 GNUNET_SERVICE_client_continue (cs->client);
1787 GNUNET_SERVICE_client_continue (client);
1788
1789 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1790 "Client requested lazy copy\n");
1791} 1673}
1792 1674
1793 1675
@@ -1801,21 +1683,19 @@ static void
1801handle_client_copy_lazy_connect (void *cls, 1683handle_client_copy_lazy_connect (void *cls,
1802 const struct GNUNET_SET_CopyLazyConnectMessage *msg) 1684 const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1803{ 1685{
1804 struct GNUNET_SERVICE_Client *client = cls; 1686 struct ClientState *cs = cls;
1805 struct LazyCopyRequest *cr; 1687 struct LazyCopyRequest *cr;
1806 struct Set *set; 1688 struct Set *set;
1807 int found; 1689 int found;
1808 1690
1809 if (NULL != set_get (client)) 1691 if (NULL != cs->set)
1810 { 1692 {
1811 /* There can only be one set per client */ 1693 /* There can only be one set per client */
1812 GNUNET_break (0); 1694 GNUNET_break (0);
1813 GNUNET_SERVICE_client_drop (client); 1695 GNUNET_SERVICE_client_drop (cs->client);
1814 return; 1696 return;
1815 } 1697 }
1816
1817 found = GNUNET_NO; 1698 found = GNUNET_NO;
1818
1819 for (cr = lazy_copy_head; NULL != cr; cr = cr->next) 1699 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1820 { 1700 {
1821 if (cr->cookie == msg->cookie) 1701 if (cr->cookie == msg->cookie)
@@ -1824,21 +1704,20 @@ handle_client_copy_lazy_connect (void *cls,
1824 break; 1704 break;
1825 } 1705 }
1826 } 1706 }
1827
1828 if (GNUNET_NO == found) 1707 if (GNUNET_NO == found)
1829 { 1708 {
1830 /* client asked for copy with cookie we don't know */ 1709 /* client asked for copy with cookie we don't know */
1831 GNUNET_break (0); 1710 GNUNET_break (0);
1832 GNUNET_SERVICE_client_drop (client); 1711 GNUNET_SERVICE_client_drop (cs->client);
1833 return; 1712 return;
1834 } 1713 }
1835
1836 GNUNET_CONTAINER_DLL_remove (lazy_copy_head, 1714 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1837 lazy_copy_tail, 1715 lazy_copy_tail,
1838 cr); 1716 cr);
1839 1717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1718 "Client %p requested use of lazy copy\n",
1719 cs);
1840 set = GNUNET_new (struct Set); 1720 set = GNUNET_new (struct Set);
1841
1842 switch (cr->source_set->operation) 1721 switch (cr->source_set->operation)
1843 { 1722 {
1844 case GNUNET_SET_OPERATION_INTERSECTION: 1723 case GNUNET_SET_OPERATION_INTERSECTION:
@@ -1858,37 +1737,28 @@ handle_client_copy_lazy_connect (void *cls,
1858 GNUNET_break (0); 1737 GNUNET_break (0);
1859 GNUNET_free (set); 1738 GNUNET_free (set);
1860 GNUNET_free (cr); 1739 GNUNET_free (cr);
1861 GNUNET_SERVICE_client_drop (client); 1740 GNUNET_SERVICE_client_drop (cs->client);
1862 return; 1741 return;
1863 } 1742 }
1864 1743
1865 set->operation = cr->source_set->operation; 1744 set->operation = cr->source_set->operation;
1866 set->state = set->vt->copy_state (cr->source_set); 1745 set->state = set->vt->copy_state (cr->source_set->state);
1867 set->content = cr->source_set->content; 1746 set->content = cr->source_set->content;
1868 set->content->refcount += 1; 1747 set->content->refcount++;
1869 1748
1870 set->current_generation = cr->source_set->current_generation; 1749 set->current_generation = cr->source_set->current_generation;
1871 set->excluded_generations_size = cr->source_set->excluded_generations_size; 1750 set->excluded_generations_size = cr->source_set->excluded_generations_size;
1872 set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations, 1751 set->excluded_generations
1873 set->excluded_generations_size * sizeof (struct GenerationRange)); 1752 = GNUNET_memdup (cr->source_set->excluded_generations,
1753 set->excluded_generations_size * sizeof (struct GenerationRange));
1874 1754
1875 /* Advance the generation of the new set, so that mutations to the 1755 /* Advance the generation of the new set, so that mutations to the
1876 of the cloned set and the source set are independent. */ 1756 of the cloned set and the source set are independent. */
1877 advance_generation (set); 1757 advance_generation (set);
1878 1758 set->cs = cs;
1879 1759 cs->set = set;
1880 set->client = client;
1881 set->client_mq = GNUNET_SERVICE_client_get_mq (client);
1882 GNUNET_CONTAINER_DLL_insert (sets_head,
1883 sets_tail,
1884 set);
1885
1886 GNUNET_free (cr); 1760 GNUNET_free (cr);
1887 1761 GNUNET_SERVICE_client_continue (cs->client);
1888 GNUNET_SERVICE_client_continue (client);
1889
1890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1891 "Client connected to lazy set\n");
1892} 1762}
1893 1763
1894 1764
@@ -1902,26 +1772,22 @@ static void
1902handle_client_cancel (void *cls, 1772handle_client_cancel (void *cls,
1903 const struct GNUNET_SET_CancelMessage *msg) 1773 const struct GNUNET_SET_CancelMessage *msg)
1904{ 1774{
1905 struct GNUNET_SERVICE_Client *client = cls; 1775 struct ClientState *cs = cls;
1906 struct Set *set; 1776 struct Set *set;
1907 struct Operation *op; 1777 struct Operation *op;
1908 int found; 1778 int found;
1909 1779
1910 set = set_get (client); 1780 if (NULL == (set = cs->set))
1911 if (NULL == set)
1912 { 1781 {
1913 /* client without a set requested an operation */ 1782 /* client without a set requested an operation */
1914 GNUNET_break (0); 1783 GNUNET_break (0);
1915 GNUNET_SERVICE_client_drop (client); 1784 GNUNET_SERVICE_client_drop (cs->client);
1916 return; 1785 return;
1917 } 1786 }
1918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919 "Client requested cancel for op %u\n",
1920 (uint32_t) ntohl (msg->request_id));
1921 found = GNUNET_NO; 1787 found = GNUNET_NO;
1922 for (op = set->ops_head; NULL != op; op = op->next) 1788 for (op = set->ops_head; NULL != op; op = op->next)
1923 { 1789 {
1924 if (op->spec->client_request_id == ntohl (msg->request_id)) 1790 if (op->client_request_id == ntohl (msg->request_id))
1925 { 1791 {
1926 found = GNUNET_YES; 1792 found = GNUNET_YES;
1927 break; 1793 break;
@@ -1934,15 +1800,19 @@ handle_client_cancel (void *cls,
1934 * yet and try to cancel the (just barely non-existent) operation. 1800 * yet and try to cancel the (just barely non-existent) operation.
1935 * So this is not a hard error. 1801 * So this is not a hard error.
1936 */ 1802 */
1937 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1803 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1938 "Client canceled non-existent op\n"); 1804 "Client canceled non-existent op %u\n",
1805 (uint32_t) ntohl (msg->request_id));
1939 } 1806 }
1940 else 1807 else
1941 { 1808 {
1809 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1810 "Client requested cancel for op %u\n",
1811 (uint32_t) ntohl (msg->request_id));
1942 _GSS_operation_destroy (op, 1812 _GSS_operation_destroy (op,
1943 GNUNET_YES); 1813 GNUNET_YES);
1944 } 1814 }
1945 GNUNET_SERVICE_client_continue (client); 1815 GNUNET_SERVICE_client_continue (cs->client);
1946} 1816}
1947 1817
1948 1818
@@ -1958,18 +1828,18 @@ static void
1958handle_client_accept (void *cls, 1828handle_client_accept (void *cls,
1959 const struct GNUNET_SET_AcceptMessage *msg) 1829 const struct GNUNET_SET_AcceptMessage *msg)
1960{ 1830{
1961 struct GNUNET_SERVICE_Client *client = cls; 1831 struct ClientState *cs = cls;
1962 struct Set *set; 1832 struct Set *set;
1963 struct Operation *op; 1833 struct Operation *op;
1964 struct GNUNET_SET_ResultMessage *result_message; 1834 struct GNUNET_SET_ResultMessage *result_message;
1965 struct GNUNET_MQ_Envelope *ev; 1835 struct GNUNET_MQ_Envelope *ev;
1836 struct Listener *listener;
1966 1837
1967 set = set_get (client); 1838 if (NULL == (set = cs->set))
1968 if (NULL == set)
1969 { 1839 {
1970 /* client without a set requested to accept */ 1840 /* client without a set requested to accept */
1971 GNUNET_break (0); 1841 GNUNET_break (0);
1972 GNUNET_SERVICE_client_drop (client); 1842 GNUNET_SERVICE_client_drop (cs->client);
1973 return; 1843 return;
1974 } 1844 }
1975 op = get_incoming (ntohl (msg->accept_reject_id)); 1845 op = get_incoming (ntohl (msg->accept_reject_id));
@@ -1977,72 +1847,75 @@ handle_client_accept (void *cls,
1977 { 1847 {
1978 /* It is not an error if the set op does not exist -- it may 1848 /* It is not an error if the set op does not exist -- it may
1979 * have been destroyed when the partner peer disconnected. */ 1849 * have been destroyed when the partner peer disconnected. */
1980 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1850 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1981 "Client accepted request that is no longer active\n"); 1851 "Client %p accepted request %u of listener %p that is no longer active\n",
1852 cs,
1853 ntohl (msg->accept_reject_id),
1854 cs->listener);
1982 ev = GNUNET_MQ_msg (result_message, 1855 ev = GNUNET_MQ_msg (result_message,
1983 GNUNET_MESSAGE_TYPE_SET_RESULT); 1856 GNUNET_MESSAGE_TYPE_SET_RESULT);
1984 result_message->request_id = msg->request_id; 1857 result_message->request_id = msg->request_id;
1985 result_message->element_type = 0;
1986 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); 1858 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1987 GNUNET_MQ_send (set->client_mq, ev); 1859 GNUNET_MQ_send (set->cs->mq,
1988 GNUNET_SERVICE_client_continue (client); 1860 ev);
1861 GNUNET_SERVICE_client_continue (cs->client);
1989 return; 1862 return;
1990 } 1863 }
1991
1992 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1864 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1993 "Client accepting request %u\n", 1865 "Client accepting request %u\n",
1994 (uint32_t) ntohl (msg->accept_reject_id)); 1866 (uint32_t) ntohl (msg->accept_reject_id));
1995 GNUNET_assert (GNUNET_YES == op->is_incoming); 1867 listener = op->listener;
1996 op->is_incoming = GNUNET_NO; 1868 op->listener = NULL;
1997 GNUNET_CONTAINER_DLL_remove (incoming_head, 1869 GNUNET_CONTAINER_DLL_remove (listener->op_head,
1998 incoming_tail, 1870 listener->op_tail,
1999 op); 1871 op);
2000 op->spec->set = set; 1872 op->set = set;
2001 GNUNET_CONTAINER_DLL_insert (set->ops_head, 1873 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2002 set->ops_tail, 1874 set->ops_tail,
2003 op); 1875 op);
2004 op->spec->client_request_id = ntohl (msg->request_id); 1876 op->client_request_id = ntohl (msg->request_id);
2005 op->spec->result_mode = ntohl (msg->result_mode); 1877 op->result_mode = ntohl (msg->result_mode);
2006 op->spec->byzantine = msg->byzantine; 1878 op->byzantine = msg->byzantine;
2007 op->spec->byzantine_lower_bound = msg->byzantine_lower_bound; 1879 op->byzantine_lower_bound = msg->byzantine_lower_bound;
2008 op->spec->force_full = msg->force_full; 1880 op->force_full = msg->force_full;
2009 op->spec->force_delta = msg->force_delta; 1881 op->force_delta = msg->force_delta;
2010 1882
2011 // Advance generation values, so that 1883 /* Advance generation values, so that future mutations do not
2012 // mutations won't interfer with the running operation. 1884 interfer with the running operation. */
2013 op->generation_created = set->current_generation; 1885 op->generation_created = set->current_generation;
2014 advance_generation (set); 1886 advance_generation (set);
2015 1887 GNUNET_assert (NULL == op->state);
2016 op->vt = set->vt; 1888 op->state = set->vt->accept (op);
2017 op->operation = set->operation; 1889 if (NULL == op->state)
2018 op->vt->accept (op); 1890 {
2019 GNUNET_SERVICE_client_continue (client); 1891 GNUNET_break (0);
1892 GNUNET_SERVICE_client_drop (cs->client);
1893 return;
1894 }
1895 /* Now allow CADET to continue, as we did not do this in
1896 #handle_incoming_msg (as we wanted to first see if the
1897 local client would accept the request). */
1898 GNUNET_CADET_receive_done (op->channel);
1899 GNUNET_SERVICE_client_continue (cs->client);
2020} 1900}
2021 1901
2022 1902
2023/** 1903/**
2024 * Called to clean up, after a shutdown has been requested. 1904 * Called to clean up, after a shutdown has been requested.
2025 * 1905 *
2026 * @param cls closure 1906 * @param cls closure, NULL
2027 */ 1907 */
2028static void 1908static void
2029shutdown_task (void *cls) 1909shutdown_task (void *cls)
2030{ 1910{
2031 while (NULL != incoming_head) 1911 /* Delay actual shutdown to allow service to disconnect clients */
2032 incoming_destroy (incoming_head);
2033 while (NULL != listeners_head)
2034 listener_destroy (listeners_head);
2035 while (NULL != sets_head)
2036 set_destroy (sets_head);
2037
2038 /* it's important to destroy cadet at the end, as all channels
2039 * must be destroyed before the cadet handle! */
2040 if (NULL != cadet) 1912 if (NULL != cadet)
2041 { 1913 {
2042 GNUNET_CADET_disconnect (cadet); 1914 GNUNET_CADET_disconnect (cadet);
2043 cadet = NULL; 1915 cadet = NULL;
2044 } 1916 }
2045 GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); 1917 GNUNET_STATISTICS_destroy (_GSS_statistics,
1918 GNUNET_YES);
2046 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2047 "handled shutdown request\n"); 1920 "handled shutdown request\n");
2048} 1921}
@@ -2061,15 +1934,19 @@ run (void *cls,
2061 const struct GNUNET_CONFIGURATION_Handle *cfg, 1934 const struct GNUNET_CONFIGURATION_Handle *cfg,
2062 struct GNUNET_SERVICE_Handle *service) 1935 struct GNUNET_SERVICE_Handle *service)
2063{ 1936{
2064 configuration = cfg; 1937 /* FIXME: need to modify SERVICE (!) API to allow
1938 us to run a shutdown task *after* clients were
1939 forcefully disconnected! */
2065 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1940 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2066 NULL); 1941 NULL);
2067 _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); 1942 _GSS_statistics = GNUNET_STATISTICS_create ("set",
1943 cfg);
2068 cadet = GNUNET_CADET_connecT (cfg); 1944 cadet = GNUNET_CADET_connecT (cfg);
2069 if (NULL == cadet) 1945 if (NULL == cadet)
2070 { 1946 {
2071 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1947 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2072 _("Could not connect to CADET service\n")); 1948 _("Could not connect to CADET service\n"));
1949 GNUNET_SCHEDULER_shutdown ();
2073 return; 1950 return;
2074 } 1951 }
2075} 1952}
@@ -2095,7 +1972,7 @@ GNUNET_SERVICE_MAIN
2095 NULL), 1972 NULL),
2096 GNUNET_MQ_hd_var_size (client_mutation, 1973 GNUNET_MQ_hd_var_size (client_mutation,
2097 GNUNET_MESSAGE_TYPE_SET_ADD, 1974 GNUNET_MESSAGE_TYPE_SET_ADD,
2098 struct GNUNET_MessageHeader, 1975 struct GNUNET_SET_ElementMessage,
2099 NULL), 1976 NULL),
2100 GNUNET_MQ_hd_fixed_size (client_create_set, 1977 GNUNET_MQ_hd_fixed_size (client_create_set,
2101 GNUNET_MESSAGE_TYPE_SET_CREATE, 1978 GNUNET_MESSAGE_TYPE_SET_CREATE,
@@ -2119,7 +1996,7 @@ GNUNET_SERVICE_MAIN
2119 NULL), 1996 NULL),
2120 GNUNET_MQ_hd_var_size (client_mutation, 1997 GNUNET_MQ_hd_var_size (client_mutation,
2121 GNUNET_MESSAGE_TYPE_SET_REMOVE, 1998 GNUNET_MESSAGE_TYPE_SET_REMOVE,
2122 struct GNUNET_MessageHeader, 1999 struct GNUNET_SET_ElementMessage,
2123 NULL), 2000 NULL),
2124 GNUNET_MQ_hd_fixed_size (client_cancel, 2001 GNUNET_MQ_hd_fixed_size (client_cancel,
2125 GNUNET_MESSAGE_TYPE_SET_CANCEL, 2002 GNUNET_MESSAGE_TYPE_SET_CANCEL,
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 86313d179..19413fd30 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2013, 2014 GNUnet e.V. 3 Copyright (C) 2013-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -68,92 +68,13 @@ struct Operation;
68 68
69 69
70/** 70/**
71 * Detail information about an operation.
72 */
73struct OperationSpecification
74{
75
76 /**
77 * The remove peer we evaluate the operation with.
78 */
79 struct GNUNET_PeerIdentity peer;
80
81 /**
82 * Application ID for the operation, used to distinguish
83 * multiple operations of the same type with the same peer.
84 */
85 struct GNUNET_HashCode app_id;
86
87 /**
88 * Context message, may be NULL.
89 */
90 struct GNUNET_MessageHeader *context_msg;
91
92 /**
93 * Set associated with the operation, NULL until the spec has been
94 * associated with a set.
95 */
96 struct Set *set;
97
98 /**
99 * Salt to use for the operation.
100 */
101 uint32_t salt;
102
103 /**
104 * Remote peers element count
105 */
106 uint32_t remote_element_count;
107
108 /**
109 * ID used to identify an operation between service and client
110 */
111 uint32_t client_request_id;
112
113 /**
114 * The type of the operation.
115 */
116 enum GNUNET_SET_OperationType operation;
117
118 /**
119 * When are elements sent to the client, and which elements are sent?
120 */
121 enum GNUNET_SET_ResultMode result_mode;
122
123 /**
124 * Always use delta operation instead of sending full sets,
125 * even it it's less efficient.
126 */
127 int force_delta;
128
129 /**
130 * Always send full sets, even if delta operations would
131 * be more efficient.
132 */
133 int force_full;
134
135 /**
136 * #GNUNET_YES to fail operations where Byzantine faults
137 * are suspected
138 */
139 int byzantine;
140
141 /**
142 * Lower bound for the set size, used only when
143 * byzantine mode is enabled.
144 */
145 int byzantine_lower_bound;
146};
147
148
149/**
150 * Signature of functions that create the implementation-specific 71 * Signature of functions that create the implementation-specific
151 * state for a set supporting a specific operation. 72 * state for a set supporting a specific operation.
152 * 73 *
153 * @return a set state specific to the supported operation, NULL on error 74 * @return a set state specific to the supported operation, NULL on error
154 */ 75 */
155typedef struct SetState * 76typedef struct SetState *
156(*CreateImpl) (void); 77(*SetCreateImpl) (void);
157 78
158 79
159/** 80/**
@@ -164,18 +85,18 @@ typedef struct SetState *
164 * @param ee element message from the client 85 * @param ee element message from the client
165 */ 86 */
166typedef void 87typedef void
167(*AddRemoveImpl) (struct SetState *state, 88(*SetAddRemoveImpl) (struct SetState *state,
168 struct ElementEntry *ee); 89 struct ElementEntry *ee);
169 90
170 91
171/** 92/**
172 * Signature of functions that handle disconnection of the remote 93 * Make a copy of a set's internal state.
173 * peer.
174 * 94 *
175 * @param op the set operation, contains implementation-specific data 95 * @param state set state to copy
96 * @return copy of the internal state
176 */ 97 */
177typedef void 98typedef struct SetState *
178(*PeerDisconnectImpl) (struct Operation *op); 99(*SetCopyStateImpl) (struct SetState *state);
179 100
180 101
181/** 102/**
@@ -185,7 +106,7 @@ typedef void
185 * @param state the set state, contains implementation-specific data 106 * @param state the set state, contains implementation-specific data
186 */ 107 */
187typedef void 108typedef void
188(*DestroySetImpl) (struct SetState *state); 109(*SetDestroyImpl) (struct SetState *state);
189 110
190 111
191/** 112/**
@@ -193,8 +114,9 @@ typedef void
193 * 114 *
194 * @param op operation that is created by accepting the operation, 115 * @param op operation that is created by accepting the operation,
195 * should be initialized by the implementation 116 * should be initialized by the implementation
117 * @return operation-specific state to keep in @a op
196 */ 118 */
197typedef void 119typedef struct OperationState *
198(*OpAcceptImpl) (struct Operation *op); 120(*OpAcceptImpl) (struct Operation *op);
199 121
200 122
@@ -206,23 +128,31 @@ typedef void
206 * begin the evaluation 128 * begin the evaluation
207 * @param opaque_context message to be transmitted to the listener 129 * @param opaque_context message to be transmitted to the listener
208 * to convince him to accept, may be NULL 130 * to convince him to accept, may be NULL
131 * @return operation-specific state to keep in @a op
209 */ 132 */
210typedef void 133typedef struct OperationState *
211(*OpEvaluateImpl) (struct Operation *op, 134(*OpEvaluateImpl) (struct Operation *op,
212 const struct GNUNET_MessageHeader *opaque_context); 135 const struct GNUNET_MessageHeader *opaque_context);
213 136
214
215/** 137/**
216 * Signature of functions that implement operation cancellation 138 * Signature of functions that implement operation cancelation.
139 * This includes notifying the client about the operation's final
140 * state.
217 * 141 *
218 * @param op operation state 142 * @param op operation state
219 */ 143 */
220typedef void 144typedef void
221(*CancelImpl) (struct Operation *op); 145(*OpCancelImpl) (struct Operation *op);
222 146
223 147
224typedef struct SetState * 148/**
225(*CopyStateImpl) (struct Set *op); 149 * Signature of functions called when the CADET channel died.
150 *
151 * @param op operation state
152 */
153typedef void
154(*OpChannelDeathImpl) (struct Operation *op);
155
226 156
227 157
228/** 158/**
@@ -234,17 +164,27 @@ struct SetVT
234 /** 164 /**
235 * Callback for the set creation. 165 * Callback for the set creation.
236 */ 166 */
237 CreateImpl create; 167 SetCreateImpl create;
238 168
239 /** 169 /**
240 * Callback for element insertion 170 * Callback for element insertion
241 */ 171 */
242 AddRemoveImpl add; 172 SetAddRemoveImpl add;
243 173
244 /** 174 /**
245 * Callback for element removal. 175 * Callback for element removal.
246 */ 176 */
247 AddRemoveImpl remove; 177 SetAddRemoveImpl remove;
178
179 /**
180 * Callback for making a copy of a set's internal state.
181 */
182 SetCopyStateImpl copy_state;
183
184 /**
185 * Callback for destruction of the set state.
186 */
187 SetDestroyImpl destroy_set;
248 188
249 /** 189 /**
250 * Callback for accepting a set operation request 190 * Callback for accepting a set operation request
@@ -257,21 +197,15 @@ struct SetVT
257 OpEvaluateImpl evaluate; 197 OpEvaluateImpl evaluate;
258 198
259 /** 199 /**
260 * Callback for destruction of the set state. 200 * Callback for canceling an operation.
261 */
262 DestroySetImpl destroy_set;
263
264 /**
265 * Callback for handling the remote peer's disconnect.
266 */ 201 */
267 PeerDisconnectImpl peer_disconnect; 202 OpCancelImpl cancel;
268 203
269 /** 204 /**
270 * Callback for canceling an operation by its ID. 205 * Callback called in case the CADET channel died.
271 */ 206 */
272 CancelImpl cancel; 207 OpChannelDeathImpl channel_death;
273 208
274 CopyStateImpl copy_state;
275}; 209};
276 210
277 211
@@ -341,20 +275,56 @@ struct ElementEntry
341}; 275};
342 276
343 277
278/**
279 * A listener is inhabited by a client, and waits for evaluation
280 * requests from remote peers.
281 */
344struct Listener; 282struct Listener;
345 283
346 284
347/** 285/**
286 * State we keep per client.
287 */
288struct ClientState
289{
290 /**
291 * Set, if associated with the client, otherwise NULL.
292 */
293 struct Set *set;
294
295 /**
296 * Listener, if associated with the client, otherwise NULL.
297 */
298 struct Listener *listener;
299
300 /**
301 * Client handle.
302 */
303 struct GNUNET_SERVICE_Client *client;
304
305 /**
306 * Message queue.
307 */
308 struct GNUNET_MQ_Handle *mq;
309
310};
311
312
313/**
348 * Operation context used to execute a set operation. 314 * Operation context used to execute a set operation.
349 */ 315 */
350struct Operation 316struct Operation
351{ 317{
318
352 /** 319 /**
353 * V-Table for the operation belonging to the tunnel contest. 320 * Kept in a DLL of the listener, if @e listener is non-NULL.
354 *
355 * Used for all operation specific operations after receiving the ops request
356 */ 321 */
357 const struct SetVT *vt; 322 struct Operation *next;
323
324 /**
325 * Kept in a DLL of the listener, if @e listener is non-NULL.
326 */
327 struct Operation *prev;
358 328
359 /** 329 /**
360 * Channel to the peer. 330 * Channel to the peer.
@@ -372,11 +342,15 @@ struct Operation
372 struct GNUNET_MQ_Handle *mq; 342 struct GNUNET_MQ_Handle *mq;
373 343
374 /** 344 /**
375 * Detail information about the set operation, including the set to 345 * Context message, may be NULL.
376 * use. When 'spec' is NULL, the operation is not yet entirely 346 */
377 * initialized. 347 struct GNUNET_MessageHeader *context_msg;
348
349 /**
350 * Set associated with the operation, NULL until the spec has been
351 * associated with a set.
378 */ 352 */
379 struct OperationSpecification *spec; 353 struct Set *set;
380 354
381 /** 355 /**
382 * Operation-specific operation state. Note that the exact 356 * Operation-specific operation state. Note that the exact
@@ -386,16 +360,6 @@ struct Operation
386 struct OperationState *state; 360 struct OperationState *state;
387 361
388 /** 362 /**
389 * Evaluate operations are held in a linked list.
390 */
391 struct Operation *next;
392
393 /**
394 * Evaluate operations are held in a linked list.
395 */
396 struct Operation *prev;
397
398 /**
399 * The identity of the requesting peer. Needs to 363 * The identity of the requesting peer. Needs to
400 * be stored here as the op spec might not have been created yet. 364 * be stored here as the op spec might not have been created yet.
401 */ 365 */
@@ -408,9 +372,48 @@ struct Operation
408 struct GNUNET_SCHEDULER_Task *timeout_task; 372 struct GNUNET_SCHEDULER_Task *timeout_task;
409 373
410 /** 374 /**
411 * The type of the operation. 375 * Salt to use for the operation.
412 */ 376 */
413 enum GNUNET_SET_OperationType operation; 377 uint32_t salt;
378
379 /**
380 * Remote peers element count
381 */
382 uint32_t remote_element_count;
383
384 /**
385 * ID used to identify an operation between service and client
386 */
387 uint32_t client_request_id;
388
389 /**
390 * When are elements sent to the client, and which elements are sent?
391 */
392 enum GNUNET_SET_ResultMode result_mode;
393
394 /**
395 * Always use delta operation instead of sending full sets,
396 * even it it's less efficient.
397 */
398 int force_delta;
399
400 /**
401 * Always send full sets, even if delta operations would
402 * be more efficient.
403 */
404 int force_full;
405
406 /**
407 * #GNUNET_YES to fail operations where Byzantine faults
408 * are suspected
409 */
410 int byzantine;
411
412 /**
413 * Lower bound for the set size, used only when
414 * byzantine mode is enabled.
415 */
416 int byzantine_lower_bound;
414 417
415 /** 418 /**
416 * Unique request id for the request from a remote peer, sent to the 419 * Unique request id for the request from a remote peer, sent to the
@@ -420,45 +423,26 @@ struct Operation
420 uint32_t suggest_id; 423 uint32_t suggest_id;
421 424
422 /** 425 /**
423 * #GNUNET_YES if this is not a "real" set operation yet, and we still
424 * need to wait for the other peer to give us more details.
425 */
426 int is_incoming;
427
428 /**
429 * Generation in which the operation handle 426 * Generation in which the operation handle
430 * was created. 427 * was created.
431 */ 428 */
432 unsigned int generation_created; 429 unsigned int generation_created;
433 430
434 /**
435 * Incremented whenever (during shutdown) some component still
436 * needs to do something with this before the operation is freed.
437 * (Used as a reference counter, but only during termination.)
438 */
439 unsigned int keep;
440}; 431};
441 432
442 433
443/** 434/**
444 * SetContent stores the actual set elements, 435 * SetContent stores the actual set elements, which may be shared by
445 * which may be shared by multiple generations derived 436 * multiple generations derived from one set.
446 * from one set.
447 */ 437 */
448struct SetContent 438struct SetContent
449{ 439{
450 /**
451 * Number of references to the content.
452 */
453 unsigned int refcount;
454 440
455 /** 441 /**
456 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`. 442 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
457 */ 443 */
458 struct GNUNET_CONTAINER_MultiHashMap *elements; 444 struct GNUNET_CONTAINER_MultiHashMap *elements;
459 445
460 unsigned int latest_generation;
461
462 /** 446 /**
463 * Mutations requested by the client that we're 447 * Mutations requested by the client that we're
464 * unable to execute right now because we're iterating 448 * unable to execute right now because we're iterating
@@ -474,6 +458,16 @@ struct SetContent
474 struct PendingMutation *pending_mutations_tail; 458 struct PendingMutation *pending_mutations_tail;
475 459
476 /** 460 /**
461 * Number of references to the content.
462 */
463 unsigned int refcount;
464
465 /**
466 * FIXME: document!
467 */
468 unsigned int latest_generation;
469
470 /**
477 * Number of concurrently active iterators. 471 * Number of concurrently active iterators.
478 */ 472 */
479 int iterator_count; 473 int iterator_count;
@@ -494,11 +488,24 @@ struct GenerationRange
494}; 488};
495 489
496 490
491/**
492 * Information about a mutation to apply to a set.
493 */
497struct PendingMutation 494struct PendingMutation
498{ 495{
496 /**
497 * Mutations are kept in a DLL.
498 */
499 struct PendingMutation *prev; 499 struct PendingMutation *prev;
500
501 /**
502 * Mutations are kept in a DLL.
503 */
500 struct PendingMutation *next; 504 struct PendingMutation *next;
501 505
506 /**
507 * Set this mutation is about.
508 */
502 struct Set *set; 509 struct Set *set;
503 510
504 /** 511 /**
@@ -506,7 +513,7 @@ struct PendingMutation
506 * May only be a #GNUNET_MESSAGE_TYPE_SET_ADD or 513 * May only be a #GNUNET_MESSAGE_TYPE_SET_ADD or
507 * #GNUNET_MESSAGE_TYPE_SET_REMOVE. 514 * #GNUNET_MESSAGE_TYPE_SET_REMOVE.
508 */ 515 */
509 struct GNUNET_MessageHeader *mutation_message; 516 struct GNUNET_SET_ElementMessage *msg;
510}; 517};
511 518
512 519
@@ -530,12 +537,13 @@ struct Set
530 * Client that owns the set. Only one client may own a set, 537 * Client that owns the set. Only one client may own a set,
531 * and there can only be one set per client. 538 * and there can only be one set per client.
532 */ 539 */
533 struct GNUNET_SERVICE_Client *client; 540 struct ClientState *cs;
534 541
535 /** 542 /**
536 * Message queue for the client. 543 * Content, possibly shared by multiple sets,
544 * and thus reference counted.
537 */ 545 */
538 struct GNUNET_MQ_Handle *client_mq; 546 struct SetContent *content;
539 547
540 /** 548 /**
541 * Virtual table for this set. Determined by the operation type of 549 * Virtual table for this set. Determined by the operation type of
@@ -568,15 +576,15 @@ struct Set
568 struct Operation *ops_tail; 576 struct Operation *ops_tail;
569 577
570 /** 578 /**
571 * Current generation, that is, number of previously executed 579 * List of generations we have to exclude, due to lazy copies.
572 * operations and lazy copies on the underlying set content.
573 */ 580 */
574 unsigned int current_generation; 581 struct GenerationRange *excluded_generations;
575 582
576 /** 583 /**
577 * List of generations we have to exclude, due to lazy copies. 584 * Current generation, that is, number of previously executed
585 * operations and lazy copies on the underlying set content.
578 */ 586 */
579 struct GenerationRange *excluded_generations; 587 unsigned int current_generation;
580 588
581 /** 589 /**
582 * Number of elements in array @a excluded_generations. 590 * Number of elements in array @a excluded_generations.
@@ -589,21 +597,16 @@ struct Set
589 enum GNUNET_SET_OperationType operation; 597 enum GNUNET_SET_OperationType operation;
590 598
591 /** 599 /**
592 * Each @e iter is assigned a unique number, so that the client
593 * can distinguish iterations.
594 */
595 uint16_t iteration_id;
596
597 /**
598 * Generation we're currently iteration over. 600 * Generation we're currently iteration over.
599 */ 601 */
600 unsigned int iter_generation; 602 unsigned int iter_generation;
601 603
602 /** 604 /**
603 * Content, possibly shared by multiple sets, 605 * Each @e iter is assigned a unique number, so that the client
604 * and thus reference counted. 606 * can distinguish iterations.
605 */ 607 */
606 struct SetContent *content; 608 uint16_t iteration_id;
609
607}; 610};
608 611
609 612
@@ -611,10 +614,14 @@ extern struct GNUNET_STATISTICS_Handle *_GSS_statistics;
611 614
612 615
613/** 616/**
614 * Destroy the given operation. Call the implementation-specific 617 * Destroy the given operation. Used for any operation where both
615 * cancel function of the operation. Disconnects from the remote 618 * peers were known and that thus actually had a vt and channel. Must
616 * peer. Does not disconnect the client, as there may be multiple 619 * not be used for operations where 'listener' is still set and we do
617 * operations per set. 620 * not know the other peer.
621 *
622 * Call the implementation-specific cancel function of the operation.
623 * Disconnects from the remote peer. Does not disconnect the client,
624 * as there may be multiple operations per set.
618 * 625 *
619 * @param op operation to destroy 626 * @param op operation to destroy
620 * @param gc #GNUNET_YES to perform garbage collection on the set 627 * @param gc #GNUNET_YES to perform garbage collection on the set
@@ -642,10 +649,13 @@ const struct SetVT *
642_GSS_intersection_vt (void); 649_GSS_intersection_vt (void);
643 650
644 651
645int 652/**
646_GSS_is_element_of_set (struct ElementEntry *ee, 653 * Is element @a ee part of the set used by @a op?
647 struct Set *set); 654 *
648 655 * @param ee element to test
656 * @param op operation the defines the set and its generation
657 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
658 */
649int 659int
650_GSS_is_element_of_operation (struct ElementEntry *ee, 660_GSS_is_element_of_operation (struct ElementEntry *ee,
651 struct Operation *op); 661 struct Operation *op);
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 8307672b9..9dc421792 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -56,6 +56,18 @@ enum IntersectionOperationPhase
56 PHASE_BF_EXCHANGE, 56 PHASE_BF_EXCHANGE,
57 57
58 /** 58 /**
59 * We must next send the P2P DONE message (after finishing mostly
60 * with the local client). Then we will wait for the channel to close.
61 */
62 PHASE_MUST_SEND_DONE,
63
64 /**
65 * We have received the P2P DONE message, and must finish with the
66 * local client before terminating the channel.
67 */
68 PHASE_DONE_RECEIVED,
69
70 /**
59 * The protocol is over. Results may still have to be sent to the 71 * The protocol is over. Results may still have to be sent to the
60 * client. 72 * client.
61 */ 73 */
@@ -162,6 +174,13 @@ struct OperationState
162 * Did we send the client that we are done? 174 * Did we send the client that we are done?
163 */ 175 */
164 int client_done_sent; 176 int client_done_sent;
177
178 /**
179 * Set whenever we reach the state where the death of the
180 * channel is perfectly find and should NOT result in the
181 * operation being cancelled.
182 */
183 int channel_death_expected;
165}; 184};
166 185
167 186
@@ -193,12 +212,12 @@ send_client_removed_element (struct Operation *op,
193 struct GNUNET_MQ_Envelope *ev; 212 struct GNUNET_MQ_Envelope *ev;
194 struct GNUNET_SET_ResultMessage *rm; 213 struct GNUNET_SET_ResultMessage *rm;
195 214
196 if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode) 215 if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
197 return; /* Wrong mode for transmitting removed elements */ 216 return; /* Wrong mode for transmitting removed elements */
198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
199 "Sending removed element (size %u) to client\n", 218 "Sending removed element (size %u) to client\n",
200 element->size); 219 element->size);
201 GNUNET_assert (0 != op->spec->client_request_id); 220 GNUNET_assert (0 != op->client_request_id);
202 ev = GNUNET_MQ_msg_extra (rm, 221 ev = GNUNET_MQ_msg_extra (rm,
203 element->size, 222 element->size,
204 GNUNET_MESSAGE_TYPE_SET_RESULT); 223 GNUNET_MESSAGE_TYPE_SET_RESULT);
@@ -208,12 +227,12 @@ send_client_removed_element (struct Operation *op,
208 return; 227 return;
209 } 228 }
210 rm->result_status = htons (GNUNET_SET_STATUS_OK); 229 rm->result_status = htons (GNUNET_SET_STATUS_OK);
211 rm->request_id = htonl (op->spec->client_request_id); 230 rm->request_id = htonl (op->client_request_id);
212 rm->element_type = element->element_type; 231 rm->element_type = element->element_type;
213 GNUNET_memcpy (&rm[1], 232 GNUNET_memcpy (&rm[1],
214 element->data, 233 element->data,
215 element->size); 234 element->size);
216 GNUNET_MQ_send (op->spec->set->client_mq, 235 GNUNET_MQ_send (op->set->cs->mq,
217 ev); 236 ev);
218} 237}
219 238
@@ -397,9 +416,9 @@ fail_intersection_operation (struct Operation *op)
397 ev = GNUNET_MQ_msg (msg, 416 ev = GNUNET_MQ_msg (msg,
398 GNUNET_MESSAGE_TYPE_SET_RESULT); 417 GNUNET_MESSAGE_TYPE_SET_RESULT);
399 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 418 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
400 msg->request_id = htonl (op->spec->client_request_id); 419 msg->request_id = htonl (op->client_request_id);
401 msg->element_type = htons (0); 420 msg->element_type = htons (0);
402 GNUNET_MQ_send (op->spec->set->client_mq, 421 GNUNET_MQ_send (op->set->cs->mq,
403 ev); 422 ev);
404 _GSS_operation_destroy (op, 423 _GSS_operation_destroy (op,
405 GNUNET_YES); 424 GNUNET_YES);
@@ -428,8 +447,8 @@ send_bloomfilter (struct Operation *op)
428 should use more bits to maximize its set reduction 447 should use more bits to maximize its set reduction
429 potential and minimize overall bandwidth consumption. */ 448 potential and minimize overall bandwidth consumption. */
430 bf_elementbits = 2 + ceil (log2((double) 449 bf_elementbits = 2 + ceil (log2((double)
431 (op->spec->remote_element_count / 450 (op->remote_element_count /
432 (double) op->state->my_element_count))); 451 (double) op->state->my_element_count)));
433 if (bf_elementbits < 1) 452 if (bf_elementbits < 1)
434 bf_elementbits = 1; /* make sure k is not 0 */ 453 bf_elementbits = 1; /* make sure k is not 0 */
435 /* optimize BF-size to ~50% of bits set */ 454 /* optimize BF-size to ~50% of bits set */
@@ -515,12 +534,14 @@ send_client_done_and_destroy (void *cls)
515 struct GNUNET_MQ_Envelope *ev; 534 struct GNUNET_MQ_Envelope *ev;
516 struct GNUNET_SET_ResultMessage *rm; 535 struct GNUNET_SET_ResultMessage *rm;
517 536
537 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538 "Intersection succeeded, sending DONE to local client\n");
518 ev = GNUNET_MQ_msg (rm, 539 ev = GNUNET_MQ_msg (rm,
519 GNUNET_MESSAGE_TYPE_SET_RESULT); 540 GNUNET_MESSAGE_TYPE_SET_RESULT);
520 rm->request_id = htonl (op->spec->client_request_id); 541 rm->request_id = htonl (op->client_request_id);
521 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 542 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
522 rm->element_type = htons (0); 543 rm->element_type = htons (0);
523 GNUNET_MQ_send (op->spec->set->client_mq, 544 GNUNET_MQ_send (op->set->cs->mq,
524 ev); 545 ev);
525 _GSS_operation_destroy (op, 546 _GSS_operation_destroy (op,
526 GNUNET_YES); 547 GNUNET_YES);
@@ -528,6 +549,53 @@ send_client_done_and_destroy (void *cls)
528 549
529 550
530/** 551/**
552 * Remember that we are done dealing with the local client
553 * AND have sent the other peer our message that we are done,
554 * so we are not just waiting for the channel to die before
555 * telling the local client that we are done as our last act.
556 *
557 * @param cls the `struct Operation`.
558 */
559static void
560finished_local_operations (void *cls)
561{
562 struct Operation *op = cls;
563
564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565 "DONE sent to other peer, now waiting for other end to close the channel\n");
566 op->state->phase = PHASE_FINISHED;
567 op->state->channel_death_expected = GNUNET_YES;
568}
569
570
571/**
572 * Notify the other peer that we are done. Once this message
573 * is out, we still need to notify the local client that we
574 * are done.
575 *
576 * @param op operation to notify for.
577 */
578static void
579send_p2p_done (struct Operation *op)
580{
581 struct GNUNET_MQ_Envelope *ev;
582 struct IntersectionDoneMessage *idm;
583
584 GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
585 GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
586 ev = GNUNET_MQ_msg (idm,
587 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
588 idm->final_element_count = htonl (op->state->my_element_count);
589 idm->element_xor_hash = op->state->my_xor;
590 GNUNET_MQ_notify_sent (ev,
591 &finished_local_operations,
592 op);
593 GNUNET_MQ_send (op->mq,
594 ev);
595}
596
597
598/**
531 * Send all elements in the full result iterator. 599 * Send all elements in the full result iterator.
532 * 600 *
533 * @param cls the `struct Operation *` 601 * @param cls the `struct Operation *`
@@ -550,10 +618,21 @@ send_remaining_elements (void *cls)
550 { 618 {
551 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
552 "Sending done and destroy because iterator ran out\n"); 620 "Sending done and destroy because iterator ran out\n");
553 op->keep--;
554 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter); 621 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
555 op->state->full_result_iter = NULL; 622 op->state->full_result_iter = NULL;
556 send_client_done_and_destroy (op); 623 if (PHASE_DONE_RECEIVED == op->state->phase)
624 {
625 op->state->phase = PHASE_FINISHED;
626 send_client_done_and_destroy (op);
627 }
628 else if (PHASE_MUST_SEND_DONE == op->state->phase)
629 {
630 send_p2p_done (op);
631 }
632 else
633 {
634 GNUNET_assert (0);
635 }
557 return; 636 return;
558 } 637 }
559 ee = nxt; 638 ee = nxt;
@@ -562,48 +641,136 @@ send_remaining_elements (void *cls)
562 "Sending element %s:%u to client (full set)\n", 641 "Sending element %s:%u to client (full set)\n",
563 GNUNET_h2s (&ee->element_hash), 642 GNUNET_h2s (&ee->element_hash),
564 element->size); 643 element->size);
565 GNUNET_assert (0 != op->spec->client_request_id); 644 GNUNET_assert (0 != op->client_request_id);
566 ev = GNUNET_MQ_msg_extra (rm, 645 ev = GNUNET_MQ_msg_extra (rm,
567 element->size, 646 element->size,
568 GNUNET_MESSAGE_TYPE_SET_RESULT); 647 GNUNET_MESSAGE_TYPE_SET_RESULT);
569 GNUNET_assert (NULL != ev); 648 GNUNET_assert (NULL != ev);
570 rm->result_status = htons (GNUNET_SET_STATUS_OK); 649 rm->result_status = htons (GNUNET_SET_STATUS_OK);
571 rm->request_id = htonl (op->spec->client_request_id); 650 rm->request_id = htonl (op->client_request_id);
572 rm->element_type = element->element_type; 651 rm->element_type = element->element_type;
573 GNUNET_memcpy (&rm[1], 652 GNUNET_memcpy (&rm[1],
574 element->data, 653 element->data,
575 element->size); 654 element->size);
576 GNUNET_MQ_notify_sent (ev, 655 GNUNET_MQ_notify_sent (ev,
577 &send_remaining_elements, 656 &send_remaining_elements,
578 op); 657 op);
579 GNUNET_MQ_send (op->spec->set->client_mq, 658 GNUNET_MQ_send (op->set->cs->mq,
580 ev); 659 ev);
581} 660}
582 661
583 662
584/** 663/**
585 * Inform the peer that this operation is complete. 664 * Fills the "my_elements" hashmap with the initial set of
665 * (non-deleted) elements from the set of the specification.
586 * 666 *
587 * @param op the intersection operation to fail 667 * @param cls closure with the `struct Operation *`
668 * @param key current key code for the element
669 * @param value value in the hash map with the `struct ElementEntry *`
670 * @return #GNUNET_YES (we should continue to iterate)
671 */
672static int
673initialize_map_unfiltered (void *cls,
674 const struct GNUNET_HashCode *key,
675 void *value)
676{
677 struct ElementEntry *ee = value;
678 struct Operation *op = cls;
679
680 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
681 return GNUNET_YES; /* element not live in operation's generation */
682 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
683 &ee->element_hash,
684 &op->state->my_xor);
685 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686 "Initial full initialization of my_elements, adding %s:%u\n",
687 GNUNET_h2s (&ee->element_hash),
688 ee->element.size);
689 GNUNET_break (GNUNET_YES ==
690 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
691 &ee->element_hash,
692 ee,
693 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
694 return GNUNET_YES;
695}
696
697
698/**
699 * Send our element count to the peer, in case our element count is
700 * lower than his.
701 *
702 * @param op intersection operation
588 */ 703 */
589static void 704static void
590send_peer_done (struct Operation *op) 705send_element_count (struct Operation *op)
591{ 706{
592 struct GNUNET_MQ_Envelope *ev; 707 struct GNUNET_MQ_Envelope *ev;
593 struct IntersectionDoneMessage *idm; 708 struct IntersectionElementInfoMessage *msg;
594 709
595 op->state->phase = PHASE_FINISHED;
596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
597 "Intersection succeeded, sending DONE\n"); 711 "Sending our element count (%u)\n",
598 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); 712 op->state->my_element_count);
599 op->state->local_bf = NULL; 713 ev = GNUNET_MQ_msg (msg,
714 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
715 msg->sender_element_count = htonl (op->state->my_element_count);
716 GNUNET_MQ_send (op->mq, ev);
717}
600 718
601 ev = GNUNET_MQ_msg (idm, 719
602 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); 720/**
603 idm->final_element_count = htonl (op->state->my_element_count); 721 * We go first, initialize our map with all elements and
604 idm->element_xor_hash = op->state->my_xor; 722 * send the first Bloom filter.
605 GNUNET_MQ_send (op->mq, 723 *
606 ev); 724 * @param op operation to start exchange for
725 */
726static void
727begin_bf_exchange (struct Operation *op)
728{
729 op->state->phase = PHASE_BF_EXCHANGE;
730 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
731 &initialize_map_unfiltered,
732 op);
733 send_bloomfilter (op);
734}
735
736
737/**
738 * Handle the initial `struct IntersectionElementInfoMessage` from a
739 * remote peer.
740 *
741 * @param cls the intersection operation
742 * @param mh the header of the message
743 */
744void
745handle_intersection_p2p_element_info (void *cls,
746 const struct IntersectionElementInfoMessage *msg)
747{
748 struct Operation *op = cls;
749
750 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
751 {
752 GNUNET_break_op (0);
753 fail_intersection_operation(op);
754 return;
755 }
756 op->remote_element_count = ntohl (msg->sender_element_count);
757 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
758 "Received remote element count (%u), I have %u\n",
759 op->remote_element_count,
760 op->state->my_element_count);
761 if ( ( (PHASE_INITIAL != op->state->phase) &&
762 (PHASE_COUNT_SENT != op->state->phase) ) ||
763 (op->state->my_element_count > op->remote_element_count) ||
764 (0 == op->state->my_element_count) ||
765 (0 == op->remote_element_count) )
766 {
767 GNUNET_break_op (0);
768 fail_intersection_operation(op);
769 return;
770 }
771 GNUNET_break (NULL == op->state->remote_bf);
772 begin_bf_exchange (op);
773 GNUNET_CADET_receive_done (op->channel);
607} 774}
608 775
609 776
@@ -618,9 +785,9 @@ process_bf (struct Operation *op)
618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", 786 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
620 op->state->phase, 787 op->state->phase,
621 op->spec->remote_element_count, 788 op->remote_element_count,
622 op->state->my_element_count, 789 op->state->my_element_count,
623 GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); 790 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
624 switch (op->state->phase) 791 switch (op->state->phase)
625 { 792 {
626 case PHASE_INITIAL: 793 case PHASE_INITIAL:
@@ -631,7 +798,7 @@ process_bf (struct Operation *op)
631 /* This is the first BF being sent, build our initial map with 798 /* This is the first BF being sent, build our initial map with
632 filtering in place */ 799 filtering in place */
633 op->state->my_element_count = 0; 800 op->state->my_element_count = 0;
634 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, 801 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
635 &filtered_map_initialization, 802 &filtered_map_initialization,
636 op); 803 op);
637 break; 804 break;
@@ -641,6 +808,14 @@ process_bf (struct Operation *op)
641 &iterator_bf_reduce, 808 &iterator_bf_reduce,
642 op); 809 op);
643 break; 810 break;
811 case PHASE_MUST_SEND_DONE:
812 GNUNET_break_op (0);
813 fail_intersection_operation(op);
814 return;
815 case PHASE_DONE_RECEIVED:
816 GNUNET_break_op (0);
817 fail_intersection_operation(op);
818 return;
644 case PHASE_FINISHED: 819 case PHASE_FINISHED:
645 GNUNET_break_op (0); 820 GNUNET_break_op (0);
646 fail_intersection_operation(op); 821 fail_intersection_operation(op);
@@ -650,13 +825,28 @@ process_bf (struct Operation *op)
650 op->state->remote_bf = NULL; 825 op->state->remote_bf = NULL;
651 826
652 if ( (0 == op->state->my_element_count) || /* fully disjoint */ 827 if ( (0 == op->state->my_element_count) || /* fully disjoint */
653 ( (op->state->my_element_count == op->spec->remote_element_count) && 828 ( (op->state->my_element_count == op->remote_element_count) &&
654 (0 == memcmp (&op->state->my_xor, 829 (0 == memcmp (&op->state->my_xor,
655 &op->state->other_xor, 830 &op->state->other_xor,
656 sizeof (struct GNUNET_HashCode))) ) ) 831 sizeof (struct GNUNET_HashCode))) ) )
657 { 832 {
658 /* we are done */ 833 /* we are done */
659 send_peer_done (op); 834 op->state->phase = PHASE_MUST_SEND_DONE;
835 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836 "Intersection succeeded, sending DONE to other peer\n");
837 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
838 op->state->local_bf = NULL;
839 if (GNUNET_SET_RESULT_FULL == op->result_mode)
840 {
841 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842 "Sending full result set (%u elements)\n",
843 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
844 op->state->full_result_iter
845 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
846 send_remaining_elements (op);
847 return;
848 }
849 send_p2p_done (op);
660 return; 850 return;
661 } 851 }
662 op->state->phase = PHASE_BF_EXCHANGE; 852 op->state->phase = PHASE_BF_EXCHANGE;
@@ -677,7 +867,7 @@ check_intersection_p2p_bf (void *cls,
677{ 867{
678 struct Operation *op = cls; 868 struct Operation *op = cls;
679 869
680 if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) 870 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
681 { 871 {
682 GNUNET_break_op (0); 872 GNUNET_break_op (0);
683 return GNUNET_SYSERR; 873 return GNUNET_SYSERR;
@@ -727,7 +917,7 @@ handle_intersection_p2p_bf (void *cls,
727 bf_size, 917 bf_size,
728 bf_bits_per_element); 918 bf_bits_per_element);
729 op->state->salt = ntohl (msg->sender_mutator); 919 op->state->salt = ntohl (msg->sender_mutator);
730 op->spec->remote_element_count = ntohl (msg->sender_element_count); 920 op->remote_element_count = ntohl (msg->sender_element_count);
731 process_bf (op); 921 process_bf (op);
732 break; 922 break;
733 } 923 }
@@ -740,7 +930,7 @@ handle_intersection_p2p_bf (void *cls,
740 op->state->bf_bits_per_element = bf_bits_per_element; 930 op->state->bf_bits_per_element = bf_bits_per_element;
741 op->state->bf_data_offset = 0; 931 op->state->bf_data_offset = 0;
742 op->state->salt = ntohl (msg->sender_mutator); 932 op->state->salt = ntohl (msg->sender_mutator);
743 op->spec->remote_element_count = ntohl (msg->sender_element_count); 933 op->remote_element_count = ntohl (msg->sender_element_count);
744 } 934 }
745 else 935 else
746 { 936 {
@@ -749,7 +939,7 @@ handle_intersection_p2p_bf (void *cls,
749 (op->state->bf_bits_per_element != bf_bits_per_element) || 939 (op->state->bf_bits_per_element != bf_bits_per_element) ||
750 (op->state->bf_data_offset + chunk_size > bf_size) || 940 (op->state->bf_data_offset + chunk_size > bf_size) ||
751 (op->state->salt != ntohl (msg->sender_mutator)) || 941 (op->state->salt != ntohl (msg->sender_mutator)) ||
752 (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) 942 (op->remote_element_count != ntohl (msg->sender_element_count)) )
753 { 943 {
754 GNUNET_break_op (0); 944 GNUNET_break_op (0);
755 fail_intersection_operation (op); 945 fail_intersection_operation (op);
@@ -783,147 +973,6 @@ handle_intersection_p2p_bf (void *cls,
783 973
784 974
785/** 975/**
786 * Fills the "my_elements" hashmap with the initial set of
787 * (non-deleted) elements from the set of the specification.
788 *
789 * @param cls closure with the `struct Operation *`
790 * @param key current key code for the element
791 * @param value value in the hash map with the `struct ElementEntry *`
792 * @return #GNUNET_YES (we should continue to iterate)
793 */
794static int
795initialize_map_unfiltered (void *cls,
796 const struct GNUNET_HashCode *key,
797 void *value)
798{
799 struct ElementEntry *ee = value;
800 struct Operation *op = cls;
801
802 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
803 return GNUNET_YES; /* element not live in operation's generation */
804 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
805 &ee->element_hash,
806 &op->state->my_xor);
807 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808 "Initial full initialization of my_elements, adding %s:%u\n",
809 GNUNET_h2s (&ee->element_hash),
810 ee->element.size);
811 GNUNET_break (GNUNET_YES ==
812 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
813 &ee->element_hash,
814 ee,
815 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
816 return GNUNET_YES;
817}
818
819
820/**
821 * Send our element count to the peer, in case our element count is
822 * lower than his.
823 *
824 * @param op intersection operation
825 */
826static void
827send_element_count (struct Operation *op)
828{
829 struct GNUNET_MQ_Envelope *ev;
830 struct IntersectionElementInfoMessage *msg;
831
832 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
833 "Sending our element count (%u)\n",
834 op->state->my_element_count);
835 ev = GNUNET_MQ_msg (msg,
836 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
837 msg->sender_element_count = htonl (op->state->my_element_count);
838 GNUNET_MQ_send (op->mq, ev);
839}
840
841
842/**
843 * We go first, initialize our map with all elements and
844 * send the first Bloom filter.
845 *
846 * @param op operation to start exchange for
847 */
848static void
849begin_bf_exchange (struct Operation *op)
850{
851 op->state->phase = PHASE_BF_EXCHANGE;
852 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
853 &initialize_map_unfiltered,
854 op);
855 send_bloomfilter (op);
856}
857
858
859/**
860 * Handle the initial `struct IntersectionElementInfoMessage` from a
861 * remote peer.
862 *
863 * @param cls the intersection operation
864 * @param mh the header of the message
865 */
866void
867handle_intersection_p2p_element_info (void *cls,
868 const struct IntersectionElementInfoMessage *msg)
869{
870 struct Operation *op = cls;
871
872 if (GNUNET_SET_OPERATION_INTERSECTION != op->operation)
873 {
874 GNUNET_break_op (0);
875 fail_intersection_operation(op);
876 return;
877 }
878 op->spec->remote_element_count = ntohl (msg->sender_element_count);
879 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880 "Received remote element count (%u), I have %u\n",
881 op->spec->remote_element_count,
882 op->state->my_element_count);
883 if ( ( (PHASE_INITIAL != op->state->phase) &&
884 (PHASE_COUNT_SENT != op->state->phase) ) ||
885 (op->state->my_element_count > op->spec->remote_element_count) ||
886 (0 == op->state->my_element_count) ||
887 (0 == op->spec->remote_element_count) )
888 {
889 GNUNET_break_op (0);
890 fail_intersection_operation(op);
891 return;
892 }
893 GNUNET_break (NULL == op->state->remote_bf);
894 begin_bf_exchange (op);
895 GNUNET_CADET_receive_done (op->channel);
896}
897
898
899/**
900 * Send a result message to the client indicating that the operation
901 * is over. After the result done message has been sent to the
902 * client, destroy the evaluate operation.
903 *
904 * @param op intersection operation
905 */
906static void
907finish_and_destroy (struct Operation *op)
908{
909 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
910
911 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
912 {
913 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914 "Sending full result set (%u elements)\n",
915 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
916 op->state->full_result_iter
917 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
918 op->keep++;
919 send_remaining_elements (op);
920 return;
921 }
922 send_client_done_and_destroy (op);
923}
924
925
926/**
927 * Remove all elements from our hashmap. 976 * Remove all elements from our hashmap.
928 * 977 *
929 * @param cls closure with the `struct Operation *` 978 * @param cls closure with the `struct Operation *`
@@ -970,10 +1019,10 @@ handle_intersection_p2p_done (void *cls,
970{ 1019{
971 struct Operation *op = cls; 1020 struct Operation *op = cls;
972 1021
973 if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) 1022 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
974 { 1023 {
975 GNUNET_break_op (0); 1024 GNUNET_break_op (0);
976 fail_intersection_operation(op); 1025 fail_intersection_operation (op);
977 return; 1026 return;
978 } 1027 }
979 if (PHASE_BF_EXCHANGE != op->state->phase) 1028 if (PHASE_BF_EXCHANGE != op->state->phase)
@@ -1005,9 +1054,22 @@ handle_intersection_p2p_done (void *cls,
1005 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1054 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006 "Got IntersectionDoneMessage, have %u elements in intersection\n", 1055 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1007 op->state->my_element_count); 1056 op->state->my_element_count);
1008 op->state->phase = PHASE_FINISHED; 1057 op->state->phase = PHASE_DONE_RECEIVED;
1009 finish_and_destroy (op);
1010 GNUNET_CADET_receive_done (op->channel); 1058 GNUNET_CADET_receive_done (op->channel);
1059
1060 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1061 if (GNUNET_SET_RESULT_FULL == op->result_mode)
1062 {
1063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064 "Sending full result set to client (%u elements)\n",
1065 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1066 op->state->full_result_iter
1067 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
1068 send_remaining_elements (op);
1069 return;
1070 }
1071 op->state->phase = PHASE_FINISHED;
1072 send_client_done_and_destroy (op);
1011} 1073}
1012 1074
1013 1075
@@ -1018,24 +1080,16 @@ handle_intersection_p2p_done (void *cls,
1018 * begin the evaluation 1080 * begin the evaluation
1019 * @param opaque_context message to be transmitted to the listener 1081 * @param opaque_context message to be transmitted to the listener
1020 * to convince him to accept, may be NULL 1082 * to convince him to accept, may be NULL
1083 * @return operation-specific state to keep in @a op
1021 */ 1084 */
1022static void 1085static struct OperationState *
1023intersection_evaluate (struct Operation *op, 1086intersection_evaluate (struct Operation *op,
1024 const struct GNUNET_MessageHeader *opaque_context) 1087 const struct GNUNET_MessageHeader *opaque_context)
1025{ 1088{
1089 struct OperationState *state;
1026 struct GNUNET_MQ_Envelope *ev; 1090 struct GNUNET_MQ_Envelope *ev;
1027 struct OperationRequestMessage *msg; 1091 struct OperationRequestMessage *msg;
1028 1092
1029 op->state = GNUNET_new (struct OperationState);
1030 /* we started the operation, thus we have to send the operation request */
1031 op->state->phase = PHASE_INITIAL;
1032 op->state->my_element_count = op->spec->set->state->current_set_element_count;
1033 op->state->my_elements
1034 = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
1035 GNUNET_YES);
1036
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "Initiating intersection operation evaluation\n");
1039 ev = GNUNET_MQ_msg_nested_mh (msg, 1093 ev = GNUNET_MQ_msg_nested_mh (msg,
1040 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1094 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1041 opaque_context); 1095 opaque_context);
@@ -1043,20 +1097,30 @@ intersection_evaluate (struct Operation *op,
1043 { 1097 {
1044 /* the context message is too large!? */ 1098 /* the context message is too large!? */
1045 GNUNET_break (0); 1099 GNUNET_break (0);
1046 GNUNET_SERVICE_client_drop (op->spec->set->client); 1100 return NULL;
1047 return;
1048 } 1101 }
1102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103 "Initiating intersection operation evaluation\n");
1104 state = GNUNET_new (struct OperationState);
1105 /* we started the operation, thus we have to send the operation request */
1106 state->phase = PHASE_INITIAL;
1107 state->my_element_count = op->set->state->current_set_element_count;
1108 state->my_elements
1109 = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1110 GNUNET_YES);
1111
1049 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); 1112 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1050 msg->element_count = htonl (op->state->my_element_count); 1113 msg->element_count = htonl (state->my_element_count);
1051 GNUNET_MQ_send (op->mq, 1114 GNUNET_MQ_send (op->mq,
1052 ev); 1115 ev);
1053 op->state->phase = PHASE_COUNT_SENT; 1116 state->phase = PHASE_COUNT_SENT;
1054 if (NULL != opaque_context) 1117 if (NULL != opaque_context)
1055 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1118 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056 "Sent op request with context message\n"); 1119 "Sent op request with context message\n");
1057 else 1120 else
1058 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1121 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059 "Sent op request without context message\n"); 1122 "Sent op request without context message\n");
1123 return state;
1060} 1124}
1061 1125
1062 1126
@@ -1066,53 +1130,33 @@ intersection_evaluate (struct Operation *op,
1066 * 1130 *
1067 * @param op operation that will be accepted as an intersection operation 1131 * @param op operation that will be accepted as an intersection operation
1068 */ 1132 */
1069static void 1133static struct OperationState *
1070intersection_accept (struct Operation *op) 1134intersection_accept (struct Operation *op)
1071{ 1135{
1136 struct OperationState *state;
1137
1072 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073 "Accepting set intersection operation\n"); 1139 "Accepting set intersection operation\n");
1074 op->state = GNUNET_new (struct OperationState); 1140 state = GNUNET_new (struct OperationState);
1075 op->state->phase = PHASE_INITIAL; 1141 state->phase = PHASE_INITIAL;
1076 op->state->my_element_count 1142 state->my_element_count
1077 = op->spec->set->state->current_set_element_count; 1143 = op->set->state->current_set_element_count;
1078 GNUNET_assert (NULL == op->state->my_elements); 1144 state->my_elements
1079 op->state->my_elements 1145 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1080 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count, 1146 op->remote_element_count),
1081 op->spec->remote_element_count),
1082 GNUNET_YES); 1147 GNUNET_YES);
1083 if (op->spec->remote_element_count < op->state->my_element_count) 1148 op->state = state;
1149 if (op->remote_element_count < state->my_element_count)
1084 { 1150 {
1085 /* If the other peer (Alice) has fewer elements than us (Bob), 1151 /* If the other peer (Alice) has fewer elements than us (Bob),
1086 we just send the count as Alice should send the first BF */ 1152 we just send the count as Alice should send the first BF */
1087 send_element_count (op); 1153 send_element_count (op);
1088 op->state->phase = PHASE_COUNT_SENT; 1154 state->phase = PHASE_COUNT_SENT;
1089 return; 1155 return state;
1090 } 1156 }
1091 /* We have fewer elements, so we start with the BF */ 1157 /* We have fewer elements, so we start with the BF */
1092 begin_bf_exchange (op); 1158 begin_bf_exchange (op);
1093} 1159 return state;
1094
1095
1096/**
1097 * Handler for peer-disconnects, notifies the client about the aborted
1098 * operation. If we did not expect anything from the other peer, we
1099 * gracefully terminate the operation.
1100 *
1101 * @param op the destroyed operation
1102 */
1103static void
1104intersection_peer_disconnect (struct Operation *op)
1105{
1106 if (PHASE_FINISHED != op->state->phase)
1107 {
1108 fail_intersection_operation (op);
1109 return;
1110 }
1111 /* the session has already been concluded */
1112 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113 "Other peer disconnected (finished)\n");
1114 if (GNUNET_NO == op->state->client_done_sent)
1115 finish_and_destroy (op);
1116} 1160}
1117 1161
1118 1162
@@ -1215,6 +1259,28 @@ intersection_remove (struct SetState *set_state,
1215 1259
1216 1260
1217/** 1261/**
1262 * Callback for channel death for the intersection operation.
1263 *
1264 * @param op operation that lost the channel
1265 */
1266static void
1267intersection_channel_death (struct Operation *op)
1268{
1269 if (GNUNET_YES == op->state->channel_death_expected)
1270 {
1271 /* oh goodie, we are done! */
1272 send_client_done_and_destroy (op);
1273 }
1274 else
1275 {
1276 /* sorry, channel went down early, too bad. */
1277 _GSS_operation_destroy (op,
1278 GNUNET_YES);
1279 }
1280}
1281
1282
1283/**
1218 * Get the table with implementing functions for set intersection. 1284 * Get the table with implementing functions for set intersection.
1219 * 1285 *
1220 * @return the operation specific VTable 1286 * @return the operation specific VTable
@@ -1229,8 +1295,8 @@ _GSS_intersection_vt ()
1229 .destroy_set = &intersection_set_destroy, 1295 .destroy_set = &intersection_set_destroy,
1230 .evaluate = &intersection_evaluate, 1296 .evaluate = &intersection_evaluate,
1231 .accept = &intersection_accept, 1297 .accept = &intersection_accept,
1232 .peer_disconnect = &intersection_peer_disconnect,
1233 .cancel = &intersection_op_cancel, 1298 .cancel = &intersection_op_cancel,
1299 .channel_death = &intersection_channel_death,
1234 }; 1300 };
1235 1301
1236 return &intersection_vt; 1302 return &intersection_vt;
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 9eaf12fef..fc7e578e6 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -368,9 +368,10 @@ fail_union_operation (struct Operation *op)
368 "union operation failed\n"); 368 "union operation failed\n");
369 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 369 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371 msg->request_id = htonl (op->spec->client_request_id); 371 msg->request_id = htonl (op->client_request_id);
372 msg->element_type = htons (0); 372 msg->element_type = htons (0);
373 GNUNET_MQ_send (op->spec->set->client_mq, ev); 373 GNUNET_MQ_send (op->set->cs->mq,
374 ev);
374 _GSS_operation_destroy (op, GNUNET_YES); 375 _GSS_operation_destroy (op, GNUNET_YES);
375} 376}
376 377
@@ -401,7 +402,14 @@ get_ibf_key (const struct GNUNET_HashCode *src)
401 */ 402 */
402struct GetElementContext 403struct GetElementContext
403{ 404{
405 /**
406 * FIXME.
407 */
404 struct GNUNET_HashCode hash; 408 struct GNUNET_HashCode hash;
409
410 /**
411 * FIXME.
412 */
405 struct KeyEntry *k; 413 struct KeyEntry *k;
406}; 414};
407 415
@@ -504,6 +512,9 @@ op_register_element (struct Operation *op,
504} 512}
505 513
506 514
515/**
516 * FIXME.
517 */
507static void 518static void
508salt_key (const struct IBF_Key *k_in, 519salt_key (const struct IBF_Key *k_in,
509 uint32_t salt, 520 uint32_t salt,
@@ -517,6 +528,9 @@ salt_key (const struct IBF_Key *k_in,
517} 528}
518 529
519 530
531/**
532 * FIXME.
533 */
520static void 534static void
521unsalt_key (const struct IBF_Key *k_in, 535unsalt_key (const struct IBF_Key *k_in,
522 uint32_t salt, 536 uint32_t salt,
@@ -550,7 +564,9 @@ prepare_ibf_iterator (void *cls,
550 (void *) op, 564 (void *) op,
551 (unsigned long) ke->ibf_key.key_val, 565 (unsigned long) ke->ibf_key.key_val,
552 GNUNET_h2s (&ke->element->element_hash)); 566 GNUNET_h2s (&ke->element->element_hash));
553 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key); 567 salt_key (&ke->ibf_key,
568 op->state->salt_send,
569 &salted_key);
554 ibf_insert (op->state->local_ibf, salted_key); 570 ibf_insert (op->state->local_ibf, salted_key);
555 return GNUNET_YES; 571 return GNUNET_YES;
556} 572}
@@ -576,12 +592,14 @@ init_key_to_element_iterator (void *cls,
576 592
577 /* make sure that the element belongs to the set at the time 593 /* make sure that the element belongs to the set at the time
578 * of creating the operation */ 594 * of creating the operation */
579 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) 595 if (GNUNET_NO ==
596 _GSS_is_element_of_operation (ee,
597 op))
580 return GNUNET_YES; 598 return GNUNET_YES;
581
582 GNUNET_assert (GNUNET_NO == ee->remote); 599 GNUNET_assert (GNUNET_NO == ee->remote);
583 600 op_register_element (op,
584 op_register_element (op, ee, GNUNET_NO); 601 ee,
602 GNUNET_NO);
585 return GNUNET_YES; 603 return GNUNET_YES;
586} 604}
587 605
@@ -598,9 +616,11 @@ initialize_key_to_element (struct Operation *op)
598 unsigned int len; 616 unsigned int len;
599 617
600 GNUNET_assert (NULL == op->state->key_to_element); 618 GNUNET_assert (NULL == op->state->key_to_element);
601 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); 619 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
602 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); 620 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
603 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); 621 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
622 &init_key_to_element_iterator,
623 op);
604} 624}
605 625
606 626
@@ -707,44 +727,6 @@ send_ibf (struct Operation *op,
707 727
708 728
709/** 729/**
710 * Send a strata estimator to the remote peer.
711 *
712 * @param op the union operation with the remote peer
713 */
714static void
715send_strata_estimator (struct Operation *op)
716{
717 const struct StrataEstimator *se = op->state->se;
718 struct GNUNET_MQ_Envelope *ev;
719 struct StrataEstimatorMessage *strata_msg;
720 char *buf;
721 size_t len;
722 uint16_t type;
723
724 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
725 len = strata_estimator_write (op->state->se,
726 buf);
727 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
728 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
729 else
730 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
731 ev = GNUNET_MQ_msg_extra (strata_msg,
732 len,
733 type);
734 GNUNET_memcpy (&strata_msg[1],
735 buf,
736 len);
737 GNUNET_free (buf);
738 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
739 GNUNET_MQ_send (op->mq,
740 ev);
741 op->state->phase = PHASE_EXPECT_IBF;
742 LOG (GNUNET_ERROR_TYPE_DEBUG,
743 "sent SE, expecting IBF\n");
744}
745
746
747/**
748 * Compute the necessary order of an ibf 730 * Compute the necessary order of an ibf
749 * from the size of the symmetric set difference. 731 * from the size of the symmetric set difference.
750 * 732 *
@@ -777,7 +759,7 @@ get_order_from_difference (unsigned int diff)
777 * @return #GNUNET_YES (to continue iterating) 759 * @return #GNUNET_YES (to continue iterating)
778 */ 760 */
779static int 761static int
780send_element_iterator (void *cls, 762send_full_element_iterator (void *cls,
781 const struct GNUNET_HashCode *key, 763 const struct GNUNET_HashCode *key,
782 void *value) 764 void *value)
783{ 765{
@@ -803,16 +785,23 @@ send_element_iterator (void *cls,
803} 785}
804 786
805 787
788/**
789 * Switch to full set transmission for @a op.
790 *
791 * @param op operation to switch to full set transmission.
792 */
806static void 793static void
807send_full_set (struct Operation *op) 794send_full_set (struct Operation *op)
808{ 795{
809 struct GNUNET_MQ_Envelope *ev; 796 struct GNUNET_MQ_Envelope *ev;
810 797
811 op->state->phase = PHASE_FULL_SENDING; 798 op->state->phase = PHASE_FULL_SENDING;
799 LOG (GNUNET_ERROR_TYPE_INFO,
800 "Dedicing to transmit the full set\n");
812 /* FIXME: use a more memory-friendly way of doing this with an 801 /* FIXME: use a more memory-friendly way of doing this with an
813 iterator, just as we do in the non-full case! */ 802 iterator, just as we do in the non-full case! */
814 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, 803 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
815 &send_element_iterator, 804 &send_full_element_iterator,
816 op); 805 op);
817 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 806 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
818 GNUNET_MQ_send (op->mq, 807 GNUNET_MQ_send (op->mq,
@@ -923,15 +912,15 @@ handle_union_p2p_strata_estimator (void *cls,
923 } 912 }
924 } 913 }
925 914
926 if ( (GNUNET_YES == op->spec->byzantine) && 915 if ( (GNUNET_YES == op->byzantine) &&
927 (other_size < op->spec->byzantine_lower_bound) ) 916 (other_size < op->byzantine_lower_bound) )
928 { 917 {
929 GNUNET_break (0); 918 GNUNET_break (0);
930 fail_union_operation (op); 919 fail_union_operation (op);
931 return; 920 return;
932 } 921 }
933 922
934 if ( (GNUNET_YES == op->spec->force_full) || 923 if ( (GNUNET_YES == op->force_full) ||
935 (diff > op->state->initial_size / 4) || 924 (diff > op->state->initial_size / 4) ||
936 (0 == other_size) ) 925 (0 == other_size) )
937 { 926 {
@@ -1058,14 +1047,16 @@ decode_and_send (struct Operation *op)
1058 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); 1047 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1059 1048
1060 if (GNUNET_OK != 1049 if (GNUNET_OK !=
1061 prepare_ibf (op, op->state->remote_ibf->size)) 1050 prepare_ibf (op,
1051 op->state->remote_ibf->size))
1062 { 1052 {
1063 GNUNET_break (0); 1053 GNUNET_break (0);
1064 /* allocation failed */ 1054 /* allocation failed */
1065 return GNUNET_SYSERR; 1055 return GNUNET_SYSERR;
1066 } 1056 }
1067 diff_ibf = ibf_dup (op->state->local_ibf); 1057 diff_ibf = ibf_dup (op->state->local_ibf);
1068 ibf_subtract (diff_ibf, op->state->remote_ibf); 1058 ibf_subtract (diff_ibf,
1059 op->state->remote_ibf);
1069 1060
1070 ibf_destroy (op->state->remote_ibf); 1061 ibf_destroy (op->state->remote_ibf);
1071 op->state->remote_ibf = NULL; 1062 op->state->remote_ibf = NULL;
@@ -1162,8 +1153,12 @@ decode_and_send (struct Operation *op)
1162 if (1 == side) 1153 if (1 == side)
1163 { 1154 {
1164 struct IBF_Key unsalted_key; 1155 struct IBF_Key unsalted_key;
1165 unsalt_key (&key, op->state->salt_receive, &unsalted_key); 1156
1166 send_offers_for_key (op, unsalted_key); 1157 unsalt_key (&key,
1158 op->state->salt_receive,
1159 &unsalted_key);
1160 send_offers_for_key (op,
1161 unsalted_key);
1167 } 1162 }
1168 else if (-1 == side) 1163 else if (-1 == side)
1169 { 1164 {
@@ -1211,7 +1206,7 @@ check_union_p2p_ibf (void *cls,
1211 struct Operation *op = cls; 1206 struct Operation *op = cls;
1212 unsigned int buckets_in_message; 1207 unsigned int buckets_in_message;
1213 1208
1214 if (GNUNET_SET_OPERATION_UNION != op->operation) 1209 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1215 { 1210 {
1216 GNUNET_break_op (0); 1211 GNUNET_break_op (0);
1217 return GNUNET_SYSERR; 1212 return GNUNET_SYSERR;
@@ -1304,6 +1299,8 @@ handle_union_p2p_ibf (void *cls,
1304 else 1299 else
1305 { 1300 {
1306 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); 1301 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1302 LOG (GNUNET_ERROR_TYPE_INFO,
1303 "Received more of IBF\n");
1307 } 1304 }
1308 GNUNET_assert (NULL != op->state->remote_ibf); 1305 GNUNET_assert (NULL != op->state->remote_ibf);
1309 1306
@@ -1351,7 +1348,7 @@ send_client_element (struct Operation *op,
1351 LOG (GNUNET_ERROR_TYPE_DEBUG, 1348 LOG (GNUNET_ERROR_TYPE_DEBUG,
1352 "sending element (size %u) to client\n", 1349 "sending element (size %u) to client\n",
1353 element->size); 1350 element->size);
1354 GNUNET_assert (0 != op->spec->client_request_id); 1351 GNUNET_assert (0 != op->client_request_id);
1355 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); 1352 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1356 if (NULL == ev) 1353 if (NULL == ev)
1357 { 1354 {
@@ -1360,11 +1357,14 @@ send_client_element (struct Operation *op,
1360 return; 1357 return;
1361 } 1358 }
1362 rm->result_status = htons (status); 1359 rm->result_status = htons (status);
1363 rm->request_id = htonl (op->spec->client_request_id); 1360 rm->request_id = htonl (op->client_request_id);
1364 rm->element_type = htons (element->element_type); 1361 rm->element_type = htons (element->element_type);
1365 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); 1362 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1366 GNUNET_memcpy (&rm[1], element->data, element->size); 1363 GNUNET_memcpy (&rm[1],
1367 GNUNET_MQ_send (op->spec->set->client_mq, ev); 1364 element->data,
1365 element->size);
1366 GNUNET_MQ_send (op->set->cs->mq,
1367 ev);
1368} 1368}
1369 1369
1370 1370
@@ -1381,14 +1381,19 @@ send_done_and_destroy (void *cls)
1381 struct GNUNET_MQ_Envelope *ev; 1381 struct GNUNET_MQ_Envelope *ev;
1382 struct GNUNET_SET_ResultMessage *rm; 1382 struct GNUNET_SET_ResultMessage *rm;
1383 1383
1384 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 1384 LOG (GNUNET_ERROR_TYPE_INFO,
1385 rm->request_id = htonl (op->spec->client_request_id); 1385 "Signalling client that union operation is done\n");
1386 ev = GNUNET_MQ_msg (rm,
1387 GNUNET_MESSAGE_TYPE_SET_RESULT);
1388 rm->request_id = htonl (op->client_request_id);
1386 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1389 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1387 rm->element_type = htons (0); 1390 rm->element_type = htons (0);
1388 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); 1391 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1389 GNUNET_MQ_send (op->spec->set->client_mq, ev); 1392 GNUNET_MQ_send (op->set->cs->mq,
1393 ev);
1390 /* Will also call the union-specific cancel function. */ 1394 /* Will also call the union-specific cancel function. */
1391 _GSS_operation_destroy (op, GNUNET_YES); 1395 _GSS_operation_destroy (op,
1396 GNUNET_YES);
1392} 1397}
1393 1398
1394 1399
@@ -1415,8 +1420,8 @@ maybe_finish (struct Operation *op)
1415 1420
1416 op->state->phase = PHASE_DONE; 1421 op->state->phase = PHASE_DONE;
1417 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); 1422 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1418 GNUNET_MQ_send (op->mq, ev); 1423 GNUNET_MQ_send (op->mq,
1419 1424 ev);
1420 /* We now wait until the other peer closes the channel 1425 /* We now wait until the other peer closes the channel
1421 * after it got all elements from us. */ 1426 * after it got all elements from us. */
1422 } 1427 }
@@ -1447,7 +1452,7 @@ check_union_p2p_elements (void *cls,
1447{ 1452{
1448 struct Operation *op = cls; 1453 struct Operation *op = cls;
1449 1454
1450 if (GNUNET_SET_OPERATION_UNION != op->operation) 1455 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1451 { 1456 {
1452 GNUNET_break_op (0); 1457 GNUNET_break_op (0);
1453 return GNUNET_SYSERR; 1458 return GNUNET_SYSERR;
@@ -1535,7 +1540,7 @@ handle_union_p2p_elements (void *cls,
1535 op->state->received_fresh++; 1540 op->state->received_fresh++;
1536 op_register_element (op, ee, GNUNET_YES); 1541 op_register_element (op, ee, GNUNET_YES);
1537 /* only send results immediately if the client wants it */ 1542 /* only send results immediately if the client wants it */
1538 switch (op->spec->result_mode) 1543 switch (op->result_mode)
1539 { 1544 {
1540 case GNUNET_SET_RESULT_ADDED: 1545 case GNUNET_SET_RESULT_ADDED:
1541 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); 1546 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
@@ -1575,7 +1580,7 @@ check_union_p2p_full_element (void *cls,
1575{ 1580{
1576 struct Operation *op = cls; 1581 struct Operation *op = cls;
1577 1582
1578 if (GNUNET_SET_OPERATION_UNION != op->operation) 1583 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1579 { 1584 {
1580 GNUNET_break_op (0); 1585 GNUNET_break_op (0);
1581 return GNUNET_SYSERR; 1586 return GNUNET_SYSERR;
@@ -1644,7 +1649,7 @@ handle_union_p2p_full_element (void *cls,
1644 op->state->received_fresh++; 1649 op->state->received_fresh++;
1645 op_register_element (op, ee, GNUNET_YES); 1650 op_register_element (op, ee, GNUNET_YES);
1646 /* only send results immediately if the client wants it */ 1651 /* only send results immediately if the client wants it */
1647 switch (op->spec->result_mode) 1652 switch (op->result_mode)
1648 { 1653 {
1649 case GNUNET_SET_RESULT_ADDED: 1654 case GNUNET_SET_RESULT_ADDED:
1650 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); 1655 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
@@ -1659,7 +1664,7 @@ handle_union_p2p_full_element (void *cls,
1659 } 1664 }
1660 } 1665 }
1661 1666
1662 if ( (GNUNET_YES == op->spec->byzantine) && 1667 if ( (GNUNET_YES == op->byzantine) &&
1663 (op->state->received_total > 384 + op->state->received_fresh * 4) && 1668 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1664 (op->state->received_fresh < op->state->received_total / 6) ) 1669 (op->state->received_fresh < op->state->received_total / 6) )
1665 { 1670 {
@@ -1690,7 +1695,7 @@ check_union_p2p_inquiry (void *cls,
1690 struct Operation *op = cls; 1695 struct Operation *op = cls;
1691 unsigned int num_keys; 1696 unsigned int num_keys;
1692 1697
1693 if (GNUNET_SET_OPERATION_UNION != op->operation) 1698 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1694 { 1699 {
1695 GNUNET_break_op (0); 1700 GNUNET_break_op (0);
1696 return GNUNET_SYSERR; 1701 return GNUNET_SYSERR;
@@ -1727,6 +1732,8 @@ handle_union_p2p_inquiry (void *cls,
1727 const struct IBF_Key *ibf_key; 1732 const struct IBF_Key *ibf_key;
1728 unsigned int num_keys; 1733 unsigned int num_keys;
1729 1734
1735 LOG (GNUNET_ERROR_TYPE_INFO,
1736 "Received union inquiry\n");
1730 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) 1737 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1731 / sizeof (struct IBF_Key); 1738 / sizeof (struct IBF_Key);
1732 ibf_key = (const struct IBF_Key *) &msg[1]; 1739 ibf_key = (const struct IBF_Key *) &msg[1];
@@ -1734,8 +1741,11 @@ handle_union_p2p_inquiry (void *cls,
1734 { 1741 {
1735 struct IBF_Key unsalted_key; 1742 struct IBF_Key unsalted_key;
1736 1743
1737 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); 1744 unsalt_key (ibf_key,
1738 send_offers_for_key (op, unsalted_key); 1745 ntohl (msg->salt),
1746 &unsalted_key);
1747 send_offers_for_key (op,
1748 unsalted_key);
1739 ibf_key++; 1749 ibf_key++;
1740 } 1750 }
1741 GNUNET_CADET_receive_done (op->channel); 1751 GNUNET_CADET_receive_done (op->channel);
@@ -1753,9 +1763,9 @@ handle_union_p2p_inquiry (void *cls,
1753 * #GNUNET_NO if not. 1763 * #GNUNET_NO if not.
1754 */ 1764 */
1755static int 1765static int
1756send_missing_elements_iter (void *cls, 1766send_missing_full_elements_iter (void *cls,
1757 uint32_t key, 1767 uint32_t key,
1758 void *value) 1768 void *value)
1759{ 1769{
1760 struct Operation *op = cls; 1770 struct Operation *op = cls;
1761 struct KeyEntry *ke = value; 1771 struct KeyEntry *ke = value;
@@ -1765,13 +1775,15 @@ send_missing_elements_iter (void *cls,
1765 1775
1766 if (GNUNET_YES == ke->received) 1776 if (GNUNET_YES == ke->received)
1767 return GNUNET_YES; 1777 return GNUNET_YES;
1768 1778 ev = GNUNET_MQ_msg_extra (emsg,
1769 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); 1779 ee->element.size,
1770 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); 1780 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1771 emsg->reserved = htons (0); 1781 GNUNET_memcpy (&emsg[1],
1782 ee->element.data,
1783 ee->element.size);
1772 emsg->element_type = htons (ee->element.element_type); 1784 emsg->element_type = htons (ee->element.element_type);
1773 GNUNET_MQ_send (op->mq, ev); 1785 GNUNET_MQ_send (op->mq,
1774 1786 ev);
1775 return GNUNET_YES; 1787 return GNUNET_YES;
1776} 1788}
1777 1789
@@ -1790,7 +1802,7 @@ handle_union_p2p_request_full (void *cls,
1790 1802
1791 LOG (GNUNET_ERROR_TYPE_INFO, 1803 LOG (GNUNET_ERROR_TYPE_INFO,
1792 "Received request for full set transmission\n"); 1804 "Received request for full set transmission\n");
1793 if (GNUNET_SET_OPERATION_UNION != op->operation) 1805 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1794 { 1806 {
1795 GNUNET_break_op (0); 1807 GNUNET_break_op (0);
1796 fail_union_operation (op); 1808 fail_union_operation (op);
@@ -1833,11 +1845,15 @@ handle_union_p2p_full_done (void *cls,
1833 1845
1834 /* send all the elements that did not come from the remote peer */ 1846 /* send all the elements that did not come from the remote peer */
1835 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 1847 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1836 &send_missing_elements_iter, 1848 &send_missing_full_elements_iter,
1837 op); 1849 op);
1838 1850
1839 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 1851 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1840 GNUNET_MQ_send (op->mq, ev); 1852 GNUNET_MQ_notify_sent (ev,
1853 &send_done_and_destroy,
1854 op);
1855 GNUNET_MQ_send (op->mq,
1856 ev);
1841 op->state->phase = PHASE_DONE; 1857 op->state->phase = PHASE_DONE;
1842 /* we now wait until the other peer shuts the tunnel down*/ 1858 /* we now wait until the other peer shuts the tunnel down*/
1843 } 1859 }
@@ -1880,7 +1896,7 @@ check_union_p2p_demand (void *cls,
1880 struct Operation *op = cls; 1896 struct Operation *op = cls;
1881 unsigned int num_hashes; 1897 unsigned int num_hashes;
1882 1898
1883 if (GNUNET_SET_OPERATION_UNION != op->operation) 1899 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1884 { 1900 {
1885 GNUNET_break_op (0); 1901 GNUNET_break_op (0);
1886 return GNUNET_SYSERR; 1902 return GNUNET_SYSERR;
@@ -1921,7 +1937,7 @@ handle_union_p2p_demand (void *cls,
1921 num_hashes > 0; 1937 num_hashes > 0;
1922 hash++, num_hashes--) 1938 hash++, num_hashes--)
1923 { 1939 {
1924 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, 1940 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1925 hash); 1941 hash);
1926 if (NULL == ee) 1942 if (NULL == ee)
1927 { 1943 {
@@ -1952,7 +1968,7 @@ handle_union_p2p_demand (void *cls,
1952 1, 1968 1,
1953 GNUNET_NO); 1969 GNUNET_NO);
1954 1970
1955 switch (op->spec->result_mode) 1971 switch (op->result_mode)
1956 { 1972 {
1957 case GNUNET_SET_RESULT_ADDED: 1973 case GNUNET_SET_RESULT_ADDED:
1958 /* Nothing to do. */ 1974 /* Nothing to do. */
@@ -1984,7 +2000,7 @@ check_union_p2p_offer (void *cls,
1984 struct Operation *op = cls; 2000 struct Operation *op = cls;
1985 unsigned int num_hashes; 2001 unsigned int num_hashes;
1986 2002
1987 if (GNUNET_SET_OPERATION_UNION != op->operation) 2003 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1988 { 2004 {
1989 GNUNET_break_op (0); 2005 GNUNET_break_op (0);
1990 return GNUNET_SYSERR; 2006 return GNUNET_SYSERR;
@@ -1998,8 +2014,8 @@ check_union_p2p_offer (void *cls,
1998 } 2014 }
1999 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 2015 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2000 / sizeof (struct GNUNET_HashCode); 2016 / sizeof (struct GNUNET_HashCode);
2001 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 2017 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2002 != num_hashes * sizeof (struct GNUNET_HashCode)) 2018 num_hashes * sizeof (struct GNUNET_HashCode))
2003 { 2019 {
2004 GNUNET_break_op (0); 2020 GNUNET_break_op (0);
2005 return GNUNET_SYSERR; 2021 return GNUNET_SYSERR;
@@ -2033,7 +2049,7 @@ handle_union_p2p_offer (void *cls,
2033 struct GNUNET_MessageHeader *demands; 2049 struct GNUNET_MessageHeader *demands;
2034 struct GNUNET_MQ_Envelope *ev; 2050 struct GNUNET_MQ_Envelope *ev;
2035 2051
2036 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, 2052 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2037 hash); 2053 hash);
2038 if (NULL != ee) 2054 if (NULL != ee)
2039 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) 2055 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
@@ -2060,7 +2076,9 @@ handle_union_p2p_offer (void *cls,
2060 ev = GNUNET_MQ_msg_header_extra (demands, 2076 ev = GNUNET_MQ_msg_header_extra (demands,
2061 sizeof (struct GNUNET_HashCode), 2077 sizeof (struct GNUNET_HashCode),
2062 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); 2078 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2063 *(struct GNUNET_HashCode *) &demands[1] = *hash; 2079 GNUNET_memcpy (&demands[1],
2080 hash,
2081 sizeof (struct GNUNET_HashCode));
2064 GNUNET_MQ_send (op->mq, ev); 2082 GNUNET_MQ_send (op->mq, ev);
2065 } 2083 }
2066 GNUNET_CADET_receive_done (op->channel); 2084 GNUNET_CADET_receive_done (op->channel);
@@ -2079,7 +2097,7 @@ handle_union_p2p_done (void *cls,
2079{ 2097{
2080 struct Operation *op = cls; 2098 struct Operation *op = cls;
2081 2099
2082 if (GNUNET_SET_OPERATION_UNION != op->operation) 2100 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2083 { 2101 {
2084 GNUNET_break_op (0); 2102 GNUNET_break_op (0);
2085 fail_union_operation (op); 2103 fail_union_operation (op);
@@ -2134,21 +2152,31 @@ handle_union_p2p_done (void *cls,
2134 * @param opaque_context message to be transmitted to the listener 2152 * @param opaque_context message to be transmitted to the listener
2135 * to convince him to accept, may be NULL 2153 * to convince him to accept, may be NULL
2136 */ 2154 */
2137static void 2155static struct OperationState *
2138union_evaluate (struct Operation *op, 2156union_evaluate (struct Operation *op,
2139 const struct GNUNET_MessageHeader *opaque_context) 2157 const struct GNUNET_MessageHeader *opaque_context)
2140{ 2158{
2159 struct OperationState *state;
2141 struct GNUNET_MQ_Envelope *ev; 2160 struct GNUNET_MQ_Envelope *ev;
2142 struct OperationRequestMessage *msg; 2161 struct OperationRequestMessage *msg;
2143 2162
2144 GNUNET_assert (NULL == op->state); 2163 ev = GNUNET_MQ_msg_nested_mh (msg,
2145 op->state = GNUNET_new (struct OperationState); 2164 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2146 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); 2165 opaque_context);
2166 if (NULL == ev)
2167 {
2168 /* the context message is too large */
2169 GNUNET_break (0);
2170 return NULL;
2171 }
2172 state = GNUNET_new (struct OperationState);
2173 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2174 GNUNET_NO);
2147 /* copy the current generation's strata estimator for this operation */ 2175 /* copy the current generation's strata estimator for this operation */
2148 op->state->se = strata_estimator_dup (op->spec->set->state->se); 2176 state->se = strata_estimator_dup (op->set->state->se);
2149 /* we started the operation, thus we have to send the operation request */ 2177 /* we started the operation, thus we have to send the operation request */
2150 op->state->phase = PHASE_EXPECT_SE; 2178 state->phase = PHASE_EXPECT_SE;
2151 op->state->salt_receive = op->state->salt_send = 42; 2179 state->salt_receive = state->salt_send = 42; // FIXME?????
2152 LOG (GNUNET_ERROR_TYPE_DEBUG, 2180 LOG (GNUNET_ERROR_TYPE_DEBUG,
2153 "Initiating union operation evaluation\n"); 2181 "Initiating union operation evaluation\n");
2154 GNUNET_STATISTICS_update (_GSS_statistics, 2182 GNUNET_STATISTICS_update (_GSS_statistics,
@@ -2159,16 +2187,6 @@ union_evaluate (struct Operation *op,
2159 "# of initiated union operations", 2187 "# of initiated union operations",
2160 1, 2188 1,
2161 GNUNET_NO); 2189 GNUNET_NO);
2162 ev = GNUNET_MQ_msg_nested_mh (msg,
2163 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2164 opaque_context);
2165 if (NULL == ev)
2166 {
2167 /* the context message is too large */
2168 GNUNET_break (0);
2169 GNUNET_SERVICE_client_drop (op->spec->set->client);
2170 return;
2171 }
2172 msg->operation = htonl (GNUNET_SET_OPERATION_UNION); 2190 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2173 GNUNET_MQ_send (op->mq, 2191 GNUNET_MQ_send (op->mq,
2174 ev); 2192 ev);
@@ -2180,8 +2198,10 @@ union_evaluate (struct Operation *op,
2180 LOG (GNUNET_ERROR_TYPE_DEBUG, 2198 LOG (GNUNET_ERROR_TYPE_DEBUG,
2181 "sent op request without context message\n"); 2199 "sent op request without context message\n");
2182 2200
2201 op->state = state;
2183 initialize_key_to_element (op); 2202 initialize_key_to_element (op);
2184 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); 2203 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2204 return state;
2185} 2205}
2186 2206
2187 2207
@@ -2191,13 +2211,19 @@ union_evaluate (struct Operation *op,
2191 * 2211 *
2192 * @param op operation that will be accepted as a union operation 2212 * @param op operation that will be accepted as a union operation
2193 */ 2213 */
2194static void 2214static struct OperationState *
2195union_accept (struct Operation *op) 2215union_accept (struct Operation *op)
2196{ 2216{
2217 struct OperationState *state;
2218 const struct StrataEstimator *se;
2219 struct GNUNET_MQ_Envelope *ev;
2220 struct StrataEstimatorMessage *strata_msg;
2221 char *buf;
2222 size_t len;
2223 uint16_t type;
2224
2197 LOG (GNUNET_ERROR_TYPE_DEBUG, 2225 LOG (GNUNET_ERROR_TYPE_DEBUG,
2198 "accepting set union operation\n"); 2226 "accepting set union operation\n");
2199 GNUNET_assert (NULL == op->state);
2200
2201 GNUNET_STATISTICS_update (_GSS_statistics, 2227 GNUNET_STATISTICS_update (_GSS_statistics,
2202 "# of accepted union operations", 2228 "# of accepted union operations",
2203 1, 2229 1,
@@ -2207,14 +2233,37 @@ union_accept (struct Operation *op)
2207 1, 2233 1,
2208 GNUNET_NO); 2234 GNUNET_NO);
2209 2235
2210 op->state = GNUNET_new (struct OperationState); 2236 state = GNUNET_new (struct OperationState);
2211 op->state->se = strata_estimator_dup (op->spec->set->state->se); 2237 state->se = strata_estimator_dup (op->set->state->se);
2212 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); 2238 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2213 op->state->salt_receive = op->state->salt_send = 42; 2239 GNUNET_NO);
2240 state->salt_receive = state->salt_send = 42; // FIXME?????
2241 op->state = state;
2214 initialize_key_to_element (op); 2242 initialize_key_to_element (op);
2215 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); 2243 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2244
2216 /* kick off the operation */ 2245 /* kick off the operation */
2217 send_strata_estimator (op); 2246 se = state->se;
2247 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2248 len = strata_estimator_write (se,
2249 buf);
2250 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2251 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2252 else
2253 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2254 ev = GNUNET_MQ_msg_extra (strata_msg,
2255 len,
2256 type);
2257 GNUNET_memcpy (&strata_msg[1],
2258 buf,
2259 len);
2260 GNUNET_free (buf);
2261 strata_msg->set_size
2262 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2263 GNUNET_MQ_send (op->mq,
2264 ev);
2265 state->phase = PHASE_EXPECT_IBF;
2266 return state;
2218} 2267}
2219 2268
2220 2269
@@ -2254,7 +2303,8 @@ union_set_create (void)
2254 * @param ee the element to add to the set 2303 * @param ee the element to add to the set
2255 */ 2304 */
2256static void 2305static void
2257union_add (struct SetState *set_state, struct ElementEntry *ee) 2306union_add (struct SetState *set_state,
2307 struct ElementEntry *ee)
2258{ 2308{
2259 strata_estimator_insert (set_state->se, 2309 strata_estimator_insert (set_state->se,
2260 get_ibf_key (&ee->element_hash)); 2310 get_ibf_key (&ee->element_hash));
@@ -2269,7 +2319,8 @@ union_add (struct SetState *set_state, struct ElementEntry *ee)
2269 * @param ee set element to remove 2319 * @param ee set element to remove
2270 */ 2320 */
2271static void 2321static void
2272union_remove (struct SetState *set_state, struct ElementEntry *ee) 2322union_remove (struct SetState *set_state,
2323 struct ElementEntry *ee)
2273{ 2324{
2274 strata_estimator_remove (set_state->se, 2325 strata_estimator_remove (set_state->se,
2275 get_ibf_key (&ee->element_hash)); 2326 get_ibf_key (&ee->element_hash));
@@ -2294,61 +2345,39 @@ union_set_destroy (struct SetState *set_state)
2294 2345
2295 2346
2296/** 2347/**
2297 * Handler for peer-disconnects, notifies the client
2298 * about the aborted operation in case the op was not concluded.
2299 *
2300 * @param op the destroyed operation
2301 */
2302static void
2303union_peer_disconnect (struct Operation *op)
2304{
2305 if (PHASE_DONE != op->state->phase)
2306 {
2307 struct GNUNET_MQ_Envelope *ev;
2308 struct GNUNET_SET_ResultMessage *msg;
2309
2310 ev = GNUNET_MQ_msg (msg,
2311 GNUNET_MESSAGE_TYPE_SET_RESULT);
2312 msg->request_id = htonl (op->spec->client_request_id);
2313 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2314 msg->element_type = htons (0);
2315 GNUNET_MQ_send (op->spec->set->client_mq,
2316 ev);
2317 LOG (GNUNET_ERROR_TYPE_WARNING,
2318 "other peer disconnected prematurely, phase %u\n",
2319 op->state->phase);
2320 _GSS_operation_destroy (op,
2321 GNUNET_YES);
2322 return;
2323 }
2324 // else: the session has already been concluded
2325 LOG (GNUNET_ERROR_TYPE_DEBUG,
2326 "other peer disconnected (finished)\n");
2327 if (GNUNET_NO == op->state->client_done_sent)
2328 send_done_and_destroy (op);
2329}
2330
2331
2332/**
2333 * Copy union-specific set state. 2348 * Copy union-specific set state.
2334 * 2349 *
2335 * @param set source set for copying the union state 2350 * @param state source state for copying the union state
2336 * @return a copy of the union-specific set state 2351 * @return a copy of the union-specific set state
2337 */ 2352 */
2338static struct SetState * 2353static struct SetState *
2339union_copy_state (struct Set *set) 2354union_copy_state (struct SetState *state)
2340{ 2355{
2341 struct SetState *new_state; 2356 struct SetState *new_state;
2342 2357
2358 GNUNET_assert ( (NULL != state) &&
2359 (NULL != state->se) );
2343 new_state = GNUNET_new (struct SetState); 2360 new_state = GNUNET_new (struct SetState);
2344 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) ); 2361 new_state->se = strata_estimator_dup (state->se);
2345 new_state->se = strata_estimator_dup (set->state->se);
2346 2362
2347 return new_state; 2363 return new_state;
2348} 2364}
2349 2365
2350 2366
2351/** 2367/**
2368 * Handle case where channel went down for an operation.
2369 *
2370 * @param op operation that lost the channel
2371 */
2372static void
2373union_channel_death (struct Operation *op)
2374{
2375 _GSS_operation_destroy (op,
2376 GNUNET_YES);
2377}
2378
2379
2380/**
2352 * Get the table with implementing functions for 2381 * Get the table with implementing functions for
2353 * set union. 2382 * set union.
2354 * 2383 *
@@ -2364,9 +2393,9 @@ _GSS_union_vt ()
2364 .destroy_set = &union_set_destroy, 2393 .destroy_set = &union_set_destroy,
2365 .evaluate = &union_evaluate, 2394 .evaluate = &union_evaluate,
2366 .accept = &union_accept, 2395 .accept = &union_accept,
2367 .peer_disconnect = &union_peer_disconnect,
2368 .cancel = &union_op_cancel, 2396 .cancel = &union_op_cancel,
2369 .copy_state = &union_copy_state, 2397 .copy_state = &union_copy_state,
2398 .channel_death = &union_channel_death
2370 }; 2399 };
2371 2400
2372 return &union_vt; 2401 return &union_vt;