diff options
Diffstat (limited to 'src/set/gnunet-service-set.c')
-rw-r--r-- | src/set/gnunet-service-set.c | 1574 |
1 files changed, 712 insertions, 862 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 30d43e8a1..12af653c1 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet | 2 | This file is part of GNUnet |
3 | Copyright (C) 2013, 2014, 2017 GNUnet e.V. | 3 | Copyright (C) 2013-2017 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -24,6 +24,8 @@ | |||
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | */ | 25 | */ |
26 | #include "gnunet-service-set.h" | 26 | #include "gnunet-service-set.h" |
27 | #include "gnunet-service-set_union.h" | ||
28 | #include "gnunet-service-set_intersection.h" | ||
27 | #include "gnunet-service-set_protocol.h" | 29 | #include "gnunet-service-set_protocol.h" |
28 | #include "gnunet_statistics_service.h" | 30 | #include "gnunet_statistics_service.h" |
29 | 31 | ||
@@ -33,6 +35,35 @@ | |||
33 | */ | 35 | */ |
34 | #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES | 36 | #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES |
35 | 37 | ||
38 | |||
39 | /** | ||
40 | * Lazy copy requests made by a client. | ||
41 | */ | ||
42 | struct LazyCopyRequest | ||
43 | { | ||
44 | /** | ||
45 | * Kept in a DLL. | ||
46 | */ | ||
47 | struct LazyCopyRequest *prev; | ||
48 | |||
49 | /** | ||
50 | * Kept in a DLL. | ||
51 | */ | ||
52 | struct LazyCopyRequest *next; | ||
53 | |||
54 | /** | ||
55 | * Which set are we supposed to copy? | ||
56 | */ | ||
57 | struct Set *source_set; | ||
58 | |||
59 | /** | ||
60 | * Cookie identifying the request. | ||
61 | */ | ||
62 | uint32_t cookie; | ||
63 | |||
64 | }; | ||
65 | |||
66 | |||
36 | /** | 67 | /** |
37 | * A listener is inhabited by a client, and waits for evaluation | 68 | * A listener is inhabited by a client, and waits for evaluation |
38 | * requests from remote peers. | 69 | * requests from remote peers. |
@@ -50,21 +81,24 @@ struct Listener | |||
50 | struct Listener *prev; | 81 | struct Listener *prev; |
51 | 82 | ||
52 | /** | 83 | /** |
53 | * Client that owns the listener. | 84 | * Head of DLL of operations this listener is responsible for. |
54 | * Only one client may own a listener. | 85 | * Once the client has accepted/declined the operation, the |
86 | * operation is moved to the respective set's operation DLLS. | ||
55 | */ | 87 | */ |
56 | struct GNUNET_SERVICE_Client *client; | 88 | struct Operation *op_head; |
57 | 89 | ||
58 | /** | 90 | /** |
59 | * Message queue for the client | 91 | * Tail of DLL of operations this listener is responsible for. |
92 | * Once the client has accepted/declined the operation, the | ||
93 | * operation is moved to the respective set's operation DLLS. | ||
60 | */ | 94 | */ |
61 | struct GNUNET_MQ_Handle *client_mq; | 95 | struct Operation *op_tail; |
62 | 96 | ||
63 | /** | 97 | /** |
64 | * Application ID for the operation, used to distinguish | 98 | * Client that owns the listener. |
65 | * multiple operations of the same type with the same peer. | 99 | * Only one client may own a listener. |
66 | */ | 100 | */ |
67 | struct GNUNET_HashCode app_id; | 101 | struct ClientState *cs; |
68 | 102 | ||
69 | /** | 103 | /** |
70 | * The port we are listening on with CADET. | 104 | * The port we are listening on with CADET. |
@@ -72,27 +106,18 @@ struct Listener | |||
72 | struct GNUNET_CADET_Port *open_port; | 106 | struct GNUNET_CADET_Port *open_port; |
73 | 107 | ||
74 | /** | 108 | /** |
109 | * Application ID for the operation, used to distinguish | ||
110 | * multiple operations of the same type with the same peer. | ||
111 | */ | ||
112 | struct GNUNET_HashCode app_id; | ||
113 | |||
114 | /** | ||
75 | * The type of the operation. | 115 | * The type of the operation. |
76 | */ | 116 | */ |
77 | enum GNUNET_SET_OperationType operation; | 117 | enum GNUNET_SET_OperationType operation; |
78 | }; | 118 | }; |
79 | 119 | ||
80 | 120 | ||
81 | struct LazyCopyRequest | ||
82 | { | ||
83 | struct Set *source_set; | ||
84 | uint32_t cookie; | ||
85 | |||
86 | struct LazyCopyRequest *prev; | ||
87 | struct LazyCopyRequest *next; | ||
88 | }; | ||
89 | |||
90 | |||
91 | /** | ||
92 | * Configuration of our local peer. | ||
93 | */ | ||
94 | static const struct GNUNET_CONFIGURATION_Handle *configuration; | ||
95 | |||
96 | /** | 121 | /** |
97 | * Handle to the cadet service, used to listen for and connect to | 122 | * Handle to the cadet service, used to listen for and connect to |
98 | * remote peers. | 123 | * remote peers. |
@@ -100,96 +125,48 @@ static const struct GNUNET_CONFIGURATION_Handle *configuration; | |||
100 | static struct GNUNET_CADET_Handle *cadet; | 125 | static struct GNUNET_CADET_Handle *cadet; |
101 | 126 | ||
102 | /** | 127 | /** |
103 | * Sets are held in a doubly linked list. | 128 | * DLL of lazy copy requests by this client. |
104 | */ | 129 | */ |
105 | static struct Set *sets_head; | 130 | static struct LazyCopyRequest *lazy_copy_head; |
106 | 131 | ||
107 | /** | 132 | /** |
108 | * Sets are held in a doubly linked list. | 133 | * DLL of lazy copy requests by this client. |
109 | */ | 134 | */ |
110 | static struct Set *sets_tail; | 135 | static struct LazyCopyRequest *lazy_copy_tail; |
111 | 136 | ||
112 | /** | 137 | /** |
113 | * Listeners are held in a doubly linked list. | 138 | * Generator for unique cookie we set per lazy copy request. |
114 | */ | 139 | */ |
115 | static struct Listener *listeners_head; | 140 | static uint32_t lazy_copy_cookie; |
116 | 141 | ||
117 | /** | 142 | /** |
118 | * Listeners are held in a doubly linked list. | 143 | * Statistics handle. |
119 | */ | 144 | */ |
120 | static struct Listener *listeners_tail; | 145 | struct GNUNET_STATISTICS_Handle *_GSS_statistics; |
121 | 146 | ||
122 | /** | 147 | /** |
123 | * Incoming sockets from remote peers are held in a doubly linked | 148 | * Listeners are held in a doubly linked list. |
124 | * list. | ||
125 | */ | 149 | */ |
126 | static struct Operation *incoming_head; | 150 | static struct Listener *listener_head; |
127 | 151 | ||
128 | /** | 152 | /** |
129 | * Incoming sockets from remote peers are held in a doubly linked | 153 | * Listeners are held in a doubly linked list. |
130 | * list. | ||
131 | */ | 154 | */ |
132 | static struct Operation *incoming_tail; | 155 | static struct Listener *listener_tail; |
133 | |||
134 | static struct LazyCopyRequest *lazy_copy_head; | ||
135 | static struct LazyCopyRequest *lazy_copy_tail; | ||
136 | |||
137 | static uint32_t lazy_copy_cookie = 1; | ||
138 | 156 | ||
139 | /** | 157 | /** |
140 | * Counter for allocating unique IDs for clients, used to identify | 158 | * Counter for allocating unique IDs for clients, used to identify |
141 | * incoming operation requests from remote peers, that the client can | 159 | * incoming operation requests from remote peers, that the client can |
142 | * choose to accept or refuse. | 160 | * choose to accept or refuse. 0 must not be used (reserved for |
161 | * uninitialized). | ||
143 | */ | 162 | */ |
144 | static uint32_t suggest_id = 1; | 163 | static uint32_t suggest_id; |
145 | |||
146 | /** | ||
147 | * Statistics handle. | ||
148 | */ | ||
149 | struct GNUNET_STATISTICS_Handle *_GSS_statistics; | ||
150 | |||
151 | |||
152 | /** | ||
153 | * Get set that is owned by the given client, if any. | ||
154 | * | ||
155 | * @param client client to look for | ||
156 | * @return set that the client owns, NULL if the client | ||
157 | * does not own a set | ||
158 | */ | ||
159 | static struct Set * | ||
160 | set_get (struct GNUNET_SERVICE_Client *client) | ||
161 | { | ||
162 | struct Set *set; | ||
163 | |||
164 | for (set = sets_head; NULL != set; set = set->next) | ||
165 | if (set->client == client) | ||
166 | return set; | ||
167 | return NULL; | ||
168 | } | ||
169 | |||
170 | |||
171 | /** | ||
172 | * Get the listener associated with the given client, if any. | ||
173 | * | ||
174 | * @param client the client | ||
175 | * @return listener associated with the client, NULL | ||
176 | * if there isn't any | ||
177 | */ | ||
178 | static struct Listener * | ||
179 | listener_get (struct GNUNET_SERVICE_Client *client) | ||
180 | { | ||
181 | struct Listener *listener; | ||
182 | |||
183 | for (listener = listeners_head; NULL != listener; listener = listener->next) | ||
184 | if (listener->client == client) | ||
185 | return listener; | ||
186 | return NULL; | ||
187 | } | ||
188 | 164 | ||
189 | 165 | ||
190 | /** | 166 | /** |
191 | * Get the incoming socket associated with the given id. | 167 | * Get the incoming socket associated with the given id. |
192 | * | 168 | * |
169 | * @param listener the listener to look in | ||
193 | * @param id id to look for | 170 | * @param id id to look for |
194 | * @return the incoming socket associated with the id, | 171 | * @return the incoming socket associated with the id, |
195 | * or NULL if there is none | 172 | * or NULL if there is none |
@@ -197,43 +174,49 @@ listener_get (struct GNUNET_SERVICE_Client *client) | |||
197 | static struct Operation * | 174 | static struct Operation * |
198 | get_incoming (uint32_t id) | 175 | get_incoming (uint32_t id) |
199 | { | 176 | { |
200 | struct Operation *op; | 177 | for (struct Listener *listener = listener_head; |
201 | 178 | NULL != listener; | |
202 | for (op = incoming_head; NULL != op; op = op->next) | 179 | listener = listener->next) |
203 | if (op->suggest_id == id) | 180 | { |
204 | { | 181 | for (struct Operation *op = listener->op_head; NULL != op; op = op->next) |
205 | GNUNET_assert (GNUNET_YES == op->is_incoming); | 182 | if (op->suggest_id == id) |
206 | return op; | 183 | return op; |
207 | } | 184 | } |
208 | return NULL; | 185 | return NULL; |
209 | } | 186 | } |
210 | 187 | ||
211 | 188 | ||
212 | /** | 189 | /** |
213 | * Destroy a listener, free all resources associated with it. | 190 | * Destroy an incoming request from a remote peer |
214 | * | 191 | * |
215 | * @param listener listener to destroy | 192 | * @param op remote request to destroy |
216 | */ | 193 | */ |
217 | static void | 194 | static void |
218 | listener_destroy (struct Listener *listener) | 195 | incoming_destroy (struct Operation *op) |
219 | { | 196 | { |
220 | /* If the client is not dead yet, destroy it. | 197 | struct Listener *listener; |
221 | * The client's destroy callback will destroy the listener again. */ | 198 | struct GNUNET_CADET_Channel *channel; |
222 | if (NULL != listener->client) | ||
223 | { | ||
224 | struct GNUNET_SERVICE_Client *client = listener->client; | ||
225 | 199 | ||
226 | listener->client = NULL; | 200 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
227 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 201 | "Destroying incoming operation %p\n", |
228 | "Disconnecting listener client\n"); | 202 | op); |
229 | GNUNET_SERVICE_client_drop (client); | 203 | if (NULL != (listener = op->listener)) |
230 | return; | 204 | { |
205 | GNUNET_CONTAINER_DLL_remove (listener->op_head, | ||
206 | listener->op_tail, | ||
207 | op); | ||
208 | op->listener = NULL; | ||
209 | } | ||
210 | if (NULL != op->timeout_task) | ||
211 | { | ||
212 | GNUNET_SCHEDULER_cancel (op->timeout_task); | ||
213 | op->timeout_task = NULL; | ||
214 | } | ||
215 | if (NULL != (channel = op->channel)) | ||
216 | { | ||
217 | op->channel = NULL; | ||
218 | GNUNET_CADET_channel_destroy (channel); | ||
231 | } | 219 | } |
232 | GNUNET_CADET_close_port (listener->open_port); | ||
233 | GNUNET_CONTAINER_DLL_remove (listeners_head, | ||
234 | listeners_tail, | ||
235 | listener); | ||
236 | GNUNET_free (listener); | ||
237 | } | 220 | } |
238 | 221 | ||
239 | 222 | ||
@@ -303,12 +286,11 @@ garbage_collect_cb (void *cls, | |||
303 | static void | 286 | static void |
304 | collect_generation_garbage (struct Set *set) | 287 | collect_generation_garbage (struct Set *set) |
305 | { | 288 | { |
306 | struct Operation *op; | ||
307 | struct GarbageContext gc; | 289 | struct GarbageContext gc; |
308 | 290 | ||
309 | gc.min_op_generation = UINT_MAX; | 291 | gc.min_op_generation = UINT_MAX; |
310 | gc.max_op_generation = 0; | 292 | gc.max_op_generation = 0; |
311 | for (op = set->ops_head; NULL != op; op = op->next) | 293 | for (struct Operation *op = set->ops_head; NULL != op; op = op->next) |
312 | { | 294 | { |
313 | gc.min_op_generation = GNUNET_MIN (gc.min_op_generation, | 295 | gc.min_op_generation = GNUNET_MIN (gc.min_op_generation, |
314 | op->generation_created); | 296 | op->generation_created); |
@@ -322,23 +304,36 @@ collect_generation_garbage (struct Set *set) | |||
322 | } | 304 | } |
323 | 305 | ||
324 | 306 | ||
307 | /** | ||
308 | * Is @a generation in the range of exclusions? | ||
309 | * | ||
310 | * @param generation generation to query | ||
311 | * @param excluded array of generations where the element is excluded | ||
312 | * @param excluded_size length of the @a excluded array | ||
313 | * @return #GNUNET_YES if @a generation is in any of the ranges | ||
314 | */ | ||
325 | static int | 315 | static int |
326 | is_excluded_generation (unsigned int generation, | 316 | is_excluded_generation (unsigned int generation, |
327 | struct GenerationRange *excluded, | 317 | struct GenerationRange *excluded, |
328 | unsigned int excluded_size) | 318 | unsigned int excluded_size) |
329 | { | 319 | { |
330 | unsigned int i; | 320 | for (unsigned int i = 0; i < excluded_size; i++) |
331 | 321 | if ( (generation >= excluded[i].start) && | |
332 | for (i = 0; i < excluded_size; i++) | 322 | (generation < excluded[i].end) ) |
333 | { | ||
334 | if ( (generation >= excluded[i].start) && (generation < excluded[i].end) ) | ||
335 | return GNUNET_YES; | 323 | return GNUNET_YES; |
336 | } | ||
337 | |||
338 | return GNUNET_NO; | 324 | return GNUNET_NO; |
339 | } | 325 | } |
340 | 326 | ||
341 | 327 | ||
328 | /** | ||
329 | * Is element @a ee part of the set during @a query_generation? | ||
330 | * | ||
331 | * @param ee element to test | ||
332 | * @param query_generation generation to query | ||
333 | * @param excluded array of generations where the element is excluded | ||
334 | * @param excluded_size length of the @a excluded array | ||
335 | * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not | ||
336 | */ | ||
342 | static int | 337 | static int |
343 | is_element_of_generation (struct ElementEntry *ee, | 338 | is_element_of_generation (struct ElementEntry *ee, |
344 | unsigned int query_generation, | 339 | unsigned int query_generation, |
@@ -347,11 +342,12 @@ is_element_of_generation (struct ElementEntry *ee, | |||
347 | { | 342 | { |
348 | struct MutationEvent *mut; | 343 | struct MutationEvent *mut; |
349 | int is_present; | 344 | int is_present; |
350 | unsigned int i; | ||
351 | 345 | ||
352 | GNUNET_assert (NULL != ee->mutations); | 346 | GNUNET_assert (NULL != ee->mutations); |
353 | 347 | if (GNUNET_YES == | |
354 | if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size)) | 348 | is_excluded_generation (query_generation, |
349 | excluded, | ||
350 | excluded_size)) | ||
355 | { | 351 | { |
356 | GNUNET_break (0); | 352 | GNUNET_break (0); |
357 | return GNUNET_NO; | 353 | return GNUNET_NO; |
@@ -361,7 +357,7 @@ is_element_of_generation (struct ElementEntry *ee, | |||
361 | 357 | ||
362 | /* Could be made faster with binary search, but lists | 358 | /* Could be made faster with binary search, but lists |
363 | are small, so why bother. */ | 359 | are small, so why bother. */ |
364 | for (i = 0; i < ee->mutations_size; i++) | 360 | for (unsigned int i = 0; i < ee->mutations_size; i++) |
365 | { | 361 | { |
366 | mut = &ee->mutations[i]; | 362 | mut = &ee->mutations[i]; |
367 | 363 | ||
@@ -373,7 +369,10 @@ is_element_of_generation (struct ElementEntry *ee, | |||
373 | continue; | 369 | continue; |
374 | } | 370 | } |
375 | 371 | ||
376 | if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size)) | 372 | if (GNUNET_YES == |
373 | is_excluded_generation (mut->generation, | ||
374 | excluded, | ||
375 | excluded_size)) | ||
377 | { | 376 | { |
378 | /* The generation is excluded (because it belongs to another | 377 | /* The generation is excluded (because it belongs to another |
379 | fork via a lazy copy) and thus mutations aren't considered | 378 | fork via a lazy copy) and thus mutations aren't considered |
@@ -382,11 +381,12 @@ is_element_of_generation (struct ElementEntry *ee, | |||
382 | } | 381 | } |
383 | 382 | ||
384 | /* This would be an inconsistency in how we manage mutations. */ | 383 | /* This would be an inconsistency in how we manage mutations. */ |
385 | if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) ) | 384 | if ( (GNUNET_YES == is_present) && |
385 | (GNUNET_YES == mut->added) ) | ||
386 | GNUNET_assert (0); | 386 | GNUNET_assert (0); |
387 | |||
388 | /* Likewise. */ | 387 | /* Likewise. */ |
389 | if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) ) | 388 | if ( (GNUNET_NO == is_present) && |
389 | (GNUNET_NO == mut->added) ) | ||
390 | GNUNET_assert (0); | 390 | GNUNET_assert (0); |
391 | 391 | ||
392 | is_present = mut->added; | 392 | is_present = mut->added; |
@@ -396,44 +396,33 @@ is_element_of_generation (struct ElementEntry *ee, | |||
396 | } | 396 | } |
397 | 397 | ||
398 | 398 | ||
399 | int | 399 | /** |
400 | _GSS_is_element_of_set (struct ElementEntry *ee, | 400 | * Is element @a ee part of the set used by @a op? |
401 | struct Set *set) | 401 | * |
402 | { | 402 | * @param ee element to test |
403 | return is_element_of_generation (ee, | 403 | * @param op operation the defines the set and its generation |
404 | set->current_generation, | 404 | * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not |
405 | set->excluded_generations, | 405 | */ |
406 | set->excluded_generations_size); | ||
407 | } | ||
408 | |||
409 | |||
410 | static int | ||
411 | is_element_of_iteration (struct ElementEntry *ee, | ||
412 | struct Set *set) | ||
413 | { | ||
414 | return is_element_of_generation (ee, | ||
415 | set->iter_generation, | ||
416 | set->excluded_generations, | ||
417 | set->excluded_generations_size); | ||
418 | } | ||
419 | |||
420 | |||
421 | int | 406 | int |
422 | _GSS_is_element_of_operation (struct ElementEntry *ee, | 407 | _GSS_is_element_of_operation (struct ElementEntry *ee, |
423 | struct Operation *op) | 408 | struct Operation *op) |
424 | { | 409 | { |
425 | return is_element_of_generation (ee, | 410 | return is_element_of_generation (ee, |
426 | op->generation_created, | 411 | op->generation_created, |
427 | op->spec->set->excluded_generations, | 412 | op->set->excluded_generations, |
428 | op->spec->set->excluded_generations_size); | 413 | op->set->excluded_generations_size); |
429 | } | 414 | } |
430 | 415 | ||
431 | 416 | ||
432 | /** | 417 | /** |
433 | * Destroy the given operation. Call the implementation-specific | 418 | * Destroy the given operation. Used for any operation where both |
434 | * cancel function of the operation. Disconnects from the remote | 419 | * peers were known and that thus actually had a vt and channel. Must |
435 | * peer. Does not disconnect the client, as there may be multiple | 420 | * not be used for operations where 'listener' is still set and we do |
436 | * operations per set. | 421 | * not know the other peer. |
422 | * | ||
423 | * Call the implementation-specific cancel function of the operation. | ||
424 | * Disconnects from the remote peer. Does not disconnect the client, | ||
425 | * as there may be multiple operations per set. | ||
437 | * | 426 | * |
438 | * @param op operation to destroy | 427 | * @param op operation to destroy |
439 | * @param gc #GNUNET_YES to perform garbage collection on the set | 428 | * @param gc #GNUNET_YES to perform garbage collection on the set |
@@ -442,38 +431,39 @@ void | |||
442 | _GSS_operation_destroy (struct Operation *op, | 431 | _GSS_operation_destroy (struct Operation *op, |
443 | int gc) | 432 | int gc) |
444 | { | 433 | { |
445 | struct Set *set; | 434 | struct Set *set = op->set; |
446 | struct GNUNET_CADET_Channel *channel; | 435 | struct GNUNET_CADET_Channel *channel; |
447 | 436 | ||
448 | if (NULL == op->vt) | 437 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
438 | "Destroying operation %p\n", | ||
439 | op); | ||
440 | GNUNET_assert (NULL == op->listener); | ||
441 | if (NULL != op->state) | ||
449 | { | 442 | { |
450 | /* already in #_GSS_operation_destroy() */ | 443 | set->vt->cancel (op); |
451 | return; | 444 | op->state = NULL; |
452 | } | 445 | } |
453 | GNUNET_assert (GNUNET_NO == op->is_incoming); | 446 | if (NULL != set) |
454 | GNUNET_assert (NULL != op->spec); | ||
455 | set = op->spec->set; | ||
456 | GNUNET_CONTAINER_DLL_remove (set->ops_head, | ||
457 | set->ops_tail, | ||
458 | op); | ||
459 | op->vt->cancel (op); | ||
460 | op->vt = NULL; | ||
461 | if (NULL != op->spec) | ||
462 | { | 447 | { |
463 | if (NULL != op->spec->context_msg) | 448 | GNUNET_CONTAINER_DLL_remove (set->ops_head, |
464 | { | 449 | set->ops_tail, |
465 | GNUNET_free (op->spec->context_msg); | 450 | op); |
466 | op->spec->context_msg = NULL; | 451 | op->set = NULL; |
467 | } | 452 | } |
468 | GNUNET_free (op->spec); | 453 | if (NULL != op->context_msg) |
469 | op->spec = NULL; | 454 | { |
455 | GNUNET_free (op->context_msg); | ||
456 | op->context_msg = NULL; | ||
470 | } | 457 | } |
471 | if (NULL != (channel = op->channel)) | 458 | if (NULL != (channel = op->channel)) |
472 | { | 459 | { |
460 | /* This will free op; called conditionally as this helper function | ||
461 | is also called from within the channel disconnect handler. */ | ||
473 | op->channel = NULL; | 462 | op->channel = NULL; |
474 | GNUNET_CADET_channel_destroy (channel); | 463 | GNUNET_CADET_channel_destroy (channel); |
475 | } | 464 | } |
476 | if (GNUNET_YES == gc) | 465 | if ( (NULL != set) && |
466 | (GNUNET_YES == gc) ) | ||
477 | collect_generation_garbage (set); | 467 | collect_generation_garbage (set); |
478 | /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, | 468 | /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, |
479 | * there was a channel end handler that will free 'op' on the call stack. */ | 469 | * there was a channel end handler that will free 'op' on the call stack. */ |
@@ -481,6 +471,28 @@ _GSS_operation_destroy (struct Operation *op, | |||
481 | 471 | ||
482 | 472 | ||
483 | /** | 473 | /** |
474 | * Callback called when a client connects to the service. | ||
475 | * | ||
476 | * @param cls closure for the service | ||
477 | * @param c the new client that connected to the service | ||
478 | * @param mq the message queue used to send messages to the client | ||
479 | * @return @a `struct ClientState` | ||
480 | */ | ||
481 | static void * | ||
482 | client_connect_cb (void *cls, | ||
483 | struct GNUNET_SERVICE_Client *c, | ||
484 | struct GNUNET_MQ_Handle *mq) | ||
485 | { | ||
486 | struct ClientState *cs; | ||
487 | |||
488 | cs = GNUNET_new (struct ClientState); | ||
489 | cs->client = c; | ||
490 | cs->mq = mq; | ||
491 | return cs; | ||
492 | } | ||
493 | |||
494 | |||
495 | /** | ||
484 | * Iterator over hash map entries to free element entries. | 496 | * Iterator over hash map entries to free element entries. |
485 | * | 497 | * |
486 | * @param cls closure | 498 | * @param cls closure |
@@ -496,66 +508,76 @@ destroy_elements_iterator (void *cls, | |||
496 | struct ElementEntry *ee = value; | 508 | struct ElementEntry *ee = value; |
497 | 509 | ||
498 | GNUNET_free_non_null (ee->mutations); | 510 | GNUNET_free_non_null (ee->mutations); |
499 | |||
500 | GNUNET_free (ee); | 511 | GNUNET_free (ee); |
501 | return GNUNET_YES; | 512 | return GNUNET_YES; |
502 | } | 513 | } |
503 | 514 | ||
504 | 515 | ||
505 | /** | 516 | /** |
506 | * Destroy a set, and free all resources and operations associated with it. | 517 | * Clean up after a client has disconnected |
507 | * | 518 | * |
508 | * @param set the set to destroy | 519 | * @param cls closure, unused |
520 | * @param client the client to clean up after | ||
521 | * @param internal_cls the `struct ClientState` | ||
509 | */ | 522 | */ |
510 | static void | 523 | static void |
511 | set_destroy (struct Set *set) | 524 | client_disconnect_cb (void *cls, |
525 | struct GNUNET_SERVICE_Client *client, | ||
526 | void *internal_cls) | ||
512 | { | 527 | { |
513 | if (NULL != set->client) | 528 | struct ClientState *cs = internal_cls; |
514 | { | 529 | struct Operation *op; |
515 | /* If the client is not dead yet, destroy it. The client's destroy | 530 | struct Listener *listener; |
516 | * callback will call `set_destroy()` again in this case. We do | 531 | struct Set *set; |
517 | * this so that the channel end handler still has a valid set handle | 532 | |
518 | * to destroy. */ | 533 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
519 | struct GNUNET_SERVICE_Client *client = set->client; | 534 | "Client disconnected, cleaning up\n"); |
520 | 535 | if (NULL != (set = cs->set)) | |
521 | set->client = NULL; | ||
522 | GNUNET_SERVICE_client_drop (client); | ||
523 | return; | ||
524 | } | ||
525 | GNUNET_assert (NULL != set->state); | ||
526 | while (NULL != set->ops_head) | ||
527 | _GSS_operation_destroy (set->ops_head, GNUNET_NO); | ||
528 | set->vt->destroy_set (set->state); | ||
529 | set->state = NULL; | ||
530 | if (NULL != set->iter) | ||
531 | { | ||
532 | GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); | ||
533 | set->iter = NULL; | ||
534 | set->iteration_id++; | ||
535 | } | ||
536 | { | 536 | { |
537 | struct SetContent *content; | 537 | struct SetContent *content = set->content; |
538 | struct PendingMutation *pm; | 538 | struct PendingMutation *pm; |
539 | struct PendingMutation *pm_current; | 539 | struct PendingMutation *pm_current; |
540 | struct LazyCopyRequest *lcr; | ||
540 | 541 | ||
541 | content = set->content; | 542 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
543 | "Destroying client's set\n"); | ||
544 | /* Destroy pending set operations */ | ||
545 | while (NULL != set->ops_head) | ||
546 | _GSS_operation_destroy (set->ops_head, | ||
547 | GNUNET_NO); | ||
548 | |||
549 | /* Destroy operation-specific state */ | ||
550 | GNUNET_assert (NULL != set->state); | ||
551 | set->vt->destroy_set (set->state); | ||
552 | set->state = NULL; | ||
553 | |||
554 | /* Clean up ongoing iterations */ | ||
555 | if (NULL != set->iter) | ||
556 | { | ||
557 | GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); | ||
558 | set->iter = NULL; | ||
559 | set->iteration_id++; | ||
560 | } | ||
542 | 561 | ||
543 | // discard any pending mutations that reference this set | 562 | /* discard any pending mutations that reference this set */ |
544 | pm = content->pending_mutations_head; | 563 | pm = content->pending_mutations_head; |
545 | while (NULL != pm) | 564 | while (NULL != pm) |
546 | { | 565 | { |
547 | pm_current = pm; | 566 | pm_current = pm; |
548 | pm = pm->next; | 567 | pm = pm->next; |
549 | if (pm_current-> set == set) | 568 | if (pm_current->set == set) |
569 | { | ||
550 | GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, | 570 | GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, |
551 | content->pending_mutations_tail, | 571 | content->pending_mutations_tail, |
552 | pm_current); | 572 | pm_current); |
553 | 573 | GNUNET_free (pm_current); | |
574 | } | ||
554 | } | 575 | } |
555 | 576 | ||
577 | /* free set content (or at least decrement RC) */ | ||
556 | set->content = NULL; | 578 | set->content = NULL; |
557 | GNUNET_assert (0 != content->refcount); | 579 | GNUNET_assert (0 != content->refcount); |
558 | content->refcount -= 1; | 580 | content->refcount--; |
559 | if (0 == content->refcount) | 581 | if (0 == content->refcount) |
560 | { | 582 | { |
561 | GNUNET_assert (NULL != content->elements); | 583 | GNUNET_assert (NULL != content->elements); |
@@ -566,154 +588,85 @@ set_destroy (struct Set *set) | |||
566 | content->elements = NULL; | 588 | content->elements = NULL; |
567 | GNUNET_free (content); | 589 | GNUNET_free (content); |
568 | } | 590 | } |
569 | } | 591 | GNUNET_free_non_null (set->excluded_generations); |
570 | GNUNET_free_non_null (set->excluded_generations); | 592 | set->excluded_generations = NULL; |
571 | set->excluded_generations = NULL; | ||
572 | GNUNET_CONTAINER_DLL_remove (sets_head, | ||
573 | sets_tail, | ||
574 | set); | ||
575 | 593 | ||
576 | // remove set from pending copy requests | 594 | /* remove set from pending copy requests */ |
577 | { | ||
578 | struct LazyCopyRequest *lcr; | ||
579 | lcr = lazy_copy_head; | 595 | lcr = lazy_copy_head; |
580 | while (NULL != lcr) | 596 | while (NULL != lcr) |
581 | { | 597 | { |
582 | struct LazyCopyRequest *lcr_current; | 598 | struct LazyCopyRequest *lcr_current = lcr; |
583 | lcr_current = lcr; | 599 | |
584 | lcr = lcr->next; | 600 | lcr = lcr->next; |
585 | if (lcr_current->source_set == set) | 601 | if (lcr_current->source_set == set) |
602 | { | ||
586 | GNUNET_CONTAINER_DLL_remove (lazy_copy_head, | 603 | GNUNET_CONTAINER_DLL_remove (lazy_copy_head, |
587 | lazy_copy_tail, | 604 | lazy_copy_tail, |
588 | lcr_current); | 605 | lcr_current); |
606 | GNUNET_free (lcr_current); | ||
607 | } | ||
589 | } | 608 | } |
609 | GNUNET_free (set); | ||
590 | } | 610 | } |
591 | 611 | ||
592 | GNUNET_free (set); | 612 | if (NULL != (listener = cs->listener)) |
593 | } | ||
594 | |||
595 | |||
596 | /** | ||
597 | * Callback called when a client connects to the service. | ||
598 | * | ||
599 | * @param cls closure for the service | ||
600 | * @param c the new client that connected to the service | ||
601 | * @param mq the message queue used to send messages to the client | ||
602 | * @return @a c | ||
603 | */ | ||
604 | static void * | ||
605 | client_connect_cb (void *cls, | ||
606 | struct GNUNET_SERVICE_Client *c, | ||
607 | struct GNUNET_MQ_Handle *mq) | ||
608 | { | ||
609 | return c; | ||
610 | } | ||
611 | |||
612 | |||
613 | /** | ||
614 | * Clean up after a client has disconnected | ||
615 | * | ||
616 | * @param cls closure, unused | ||
617 | * @param client the client to clean up after | ||
618 | * @param internal_cls our client-specific internal data structure | ||
619 | */ | ||
620 | static void | ||
621 | client_disconnect_cb (void *cls, | ||
622 | struct GNUNET_SERVICE_Client *client, | ||
623 | void *internal_cls) | ||
624 | { | ||
625 | struct Listener *listener; | ||
626 | struct Set *set; | ||
627 | |||
628 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
629 | "client disconnected, cleaning up\n"); | ||
630 | set = set_get (client); | ||
631 | if (NULL != set) | ||
632 | { | 613 | { |
633 | set->client = NULL; | ||
634 | set_destroy (set); | ||
635 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 614 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
636 | "Client's set destroyed\n"); | 615 | "Destroying client's listener\n"); |
637 | } | 616 | GNUNET_CADET_close_port (listener->open_port); |
638 | listener = listener_get (client); | 617 | listener->open_port = NULL; |
639 | if (NULL != listener) | 618 | while (NULL != (op = listener->op_head)) |
640 | { | 619 | incoming_destroy (op); |
641 | listener->client = NULL; | 620 | GNUNET_CONTAINER_DLL_remove (listener_head, |
642 | listener_destroy (listener); | 621 | listener_tail, |
643 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 622 | listener); |
644 | "Client's listener destroyed\n"); | 623 | GNUNET_free (listener); |
645 | } | 624 | } |
625 | GNUNET_free (cs); | ||
646 | } | 626 | } |
647 | 627 | ||
648 | 628 | ||
649 | /** | 629 | /** |
650 | * Destroy an incoming request from a remote peer | 630 | * Check a request for a set operation from another peer. |
651 | * | 631 | * |
652 | * @param incoming remote request to destroy | 632 | * @param cls the operation state |
633 | * @param msg the received message | ||
634 | * @return #GNUNET_OK if the channel should be kept alive, | ||
635 | * #GNUNET_SYSERR to destroy the channel | ||
653 | */ | 636 | */ |
654 | static void | 637 | static int |
655 | incoming_destroy (struct Operation *incoming) | 638 | check_incoming_msg (void *cls, |
639 | const struct OperationRequestMessage *msg) | ||
656 | { | 640 | { |
657 | struct GNUNET_CADET_Channel *channel; | 641 | struct Operation *op = cls; |
642 | struct Listener *listener = op->listener; | ||
643 | const struct GNUNET_MessageHeader *nested_context; | ||
658 | 644 | ||
659 | GNUNET_assert (GNUNET_YES == incoming->is_incoming); | 645 | /* double operation request */ |
660 | GNUNET_CONTAINER_DLL_remove (incoming_head, | 646 | if (0 != op->suggest_id) |
661 | incoming_tail, | ||
662 | incoming); | ||
663 | if (NULL != incoming->timeout_task) | ||
664 | { | 647 | { |
665 | GNUNET_SCHEDULER_cancel (incoming->timeout_task); | 648 | GNUNET_break_op (0); |
666 | incoming->timeout_task = NULL; | 649 | return GNUNET_SYSERR; |
667 | } | 650 | } |
668 | /* make sure that the tunnel end handler will not destroy us again */ | 651 | /* This should be equivalent to the previous condition, but can't hurt to check twice */ |
669 | incoming->vt = NULL; | 652 | if (NULL == op->listener) |
670 | if (NULL != incoming->spec) | ||
671 | { | 653 | { |
672 | GNUNET_free (incoming->spec); | 654 | GNUNET_break (0); |
673 | incoming->spec = NULL; | 655 | return GNUNET_SYSERR; |
674 | } | 656 | } |
675 | if (NULL != (channel = incoming->channel)) | 657 | if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation)) |
676 | { | 658 | { |
677 | incoming->channel = NULL; | 659 | GNUNET_break_op (0); |
678 | GNUNET_CADET_channel_destroy (channel); | 660 | return GNUNET_SYSERR; |
679 | } | 661 | } |
680 | } | 662 | nested_context = GNUNET_MQ_extract_nested_mh (msg); |
681 | 663 | if ( (NULL != nested_context) && | |
682 | 664 | (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) | |
683 | /** | 665 | { |
684 | * Suggest the given request to the listener. The listening client can | 666 | GNUNET_break_op (0); |
685 | * then accept or reject the remote request. | 667 | return GNUNET_SYSERR; |
686 | * | 668 | } |
687 | * @param incoming the incoming peer with the request to suggest | 669 | return GNUNET_OK; |
688 | * @param listener the listener to suggest the request to | ||
689 | */ | ||
690 | static void | ||
691 | incoming_suggest (struct Operation *incoming, | ||
692 | struct Listener *listener) | ||
693 | { | ||
694 | struct GNUNET_MQ_Envelope *mqm; | ||
695 | struct GNUNET_SET_RequestMessage *cmsg; | ||
696 | |||
697 | GNUNET_assert (GNUNET_YES == incoming->is_incoming); | ||
698 | GNUNET_assert (NULL != incoming->spec); | ||
699 | GNUNET_assert (0 == incoming->suggest_id); | ||
700 | incoming->suggest_id = suggest_id++; | ||
701 | if (0 == suggest_id) | ||
702 | suggest_id++; | ||
703 | GNUNET_assert (NULL != incoming->timeout_task); | ||
704 | GNUNET_SCHEDULER_cancel (incoming->timeout_task); | ||
705 | incoming->timeout_task = NULL; | ||
706 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, | ||
707 | GNUNET_MESSAGE_TYPE_SET_REQUEST, | ||
708 | incoming->spec->context_msg); | ||
709 | GNUNET_assert (NULL != mqm); | ||
710 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
711 | "Suggesting incoming request with accept id %u to listener\n", | ||
712 | incoming->suggest_id); | ||
713 | cmsg->accept_id = htonl (incoming->suggest_id); | ||
714 | cmsg->peer_id = incoming->spec->peer; | ||
715 | GNUNET_MQ_send (listener->client_mq, | ||
716 | mqm); | ||
717 | } | 670 | } |
718 | 671 | ||
719 | 672 | ||
@@ -729,93 +682,85 @@ incoming_suggest (struct Operation *incoming, | |||
729 | * our virtual table and subsequent msgs would be routed differently (as | 682 | * our virtual table and subsequent msgs would be routed differently (as |
730 | * we then know what type of operation this is). | 683 | * we then know what type of operation this is). |
731 | * | 684 | * |
732 | * @param op the operation state | 685 | * @param cls the operation state |
733 | * @param mh the received message | 686 | * @param msg the received message |
734 | * @return #GNUNET_OK if the channel should be kept alive, | 687 | * @return #GNUNET_OK if the channel should be kept alive, |
735 | * #GNUNET_SYSERR to destroy the channel | 688 | * #GNUNET_SYSERR to destroy the channel |
736 | */ | 689 | */ |
737 | static int | 690 | static void |
738 | handle_incoming_msg (struct Operation *op, | 691 | handle_incoming_msg (void *cls, |
739 | const struct GNUNET_MessageHeader *mh) | 692 | const struct OperationRequestMessage *msg) |
740 | { | 693 | { |
741 | const struct OperationRequestMessage *msg; | 694 | struct Operation *op = cls; |
742 | struct Listener *listener = op->listener; | 695 | struct Listener *listener = op->listener; |
743 | struct OperationSpecification *spec; | ||
744 | const struct GNUNET_MessageHeader *nested_context; | 696 | const struct GNUNET_MessageHeader *nested_context; |
697 | struct GNUNET_MQ_Envelope *env; | ||
698 | struct GNUNET_SET_RequestMessage *cmsg; | ||
745 | 699 | ||
746 | msg = (const struct OperationRequestMessage *) mh; | ||
747 | GNUNET_assert (GNUNET_YES == op->is_incoming); | ||
748 | if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type)) | ||
749 | { | ||
750 | GNUNET_break_op (0); | ||
751 | return GNUNET_SYSERR; | ||
752 | } | ||
753 | /* double operation request */ | ||
754 | if (NULL != op->spec) | ||
755 | { | ||
756 | GNUNET_break_op (0); | ||
757 | return GNUNET_SYSERR; | ||
758 | } | ||
759 | spec = GNUNET_new (struct OperationSpecification); | ||
760 | nested_context = GNUNET_MQ_extract_nested_mh (msg); | 700 | nested_context = GNUNET_MQ_extract_nested_mh (msg); |
761 | if ( (NULL != nested_context) && | ||
762 | (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) | ||
763 | { | ||
764 | GNUNET_break_op (0); | ||
765 | GNUNET_free (spec); | ||
766 | return GNUNET_SYSERR; | ||
767 | } | ||
768 | /* Make a copy of the nested_context (application-specific context | 701 | /* Make a copy of the nested_context (application-specific context |
769 | information that is opaque to set) so we can pass it to the | 702 | information that is opaque to set) so we can pass it to the |
770 | listener later on */ | 703 | listener later on */ |
771 | if (NULL != nested_context) | 704 | if (NULL != nested_context) |
772 | spec->context_msg = GNUNET_copy_message (nested_context); | 705 | op->context_msg = GNUNET_copy_message (nested_context); |
773 | spec->operation = ntohl (msg->operation); | 706 | op->remote_element_count = ntohl (msg->element_count); |
774 | spec->app_id = listener->app_id; | ||
775 | spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, | ||
776 | UINT32_MAX); | ||
777 | spec->peer = op->peer; | ||
778 | spec->remote_element_count = ntohl (msg->element_count); | ||
779 | op->spec = spec; | ||
780 | |||
781 | listener = op->listener; | ||
782 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 707 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
783 | "Received P2P operation request (op %u, port %s) for active listener\n", | 708 | "Received P2P operation request (op %u, port %s) for active listener\n", |
784 | (uint32_t) ntohl (msg->operation), | 709 | (uint32_t) ntohl (msg->operation), |
785 | GNUNET_h2s (&listener->app_id)); | 710 | GNUNET_h2s (&op->listener->app_id)); |
786 | incoming_suggest (op, | 711 | GNUNET_assert (0 == op->suggest_id); |
787 | listener); | 712 | if (0 == suggest_id) |
788 | return GNUNET_OK; | 713 | suggest_id++; |
714 | op->suggest_id = suggest_id++; | ||
715 | GNUNET_assert (NULL != op->timeout_task); | ||
716 | GNUNET_SCHEDULER_cancel (op->timeout_task); | ||
717 | op->timeout_task = NULL; | ||
718 | env = GNUNET_MQ_msg_nested_mh (cmsg, | ||
719 | GNUNET_MESSAGE_TYPE_SET_REQUEST, | ||
720 | op->context_msg); | ||
721 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
722 | "Suggesting incoming request with accept id %u to listener %p of client %p\n", | ||
723 | op->suggest_id, | ||
724 | listener, | ||
725 | listener->cs); | ||
726 | cmsg->accept_id = htonl (op->suggest_id); | ||
727 | cmsg->peer_id = op->peer; | ||
728 | GNUNET_MQ_send (listener->cs->mq, | ||
729 | env); | ||
730 | /* NOTE: GNUNET_CADET_receive_done() will be called in | ||
731 | #handle_client_accept() */ | ||
789 | } | 732 | } |
790 | 733 | ||
791 | 734 | ||
735 | /** | ||
736 | * Add an element to @a set as specified by @a msg | ||
737 | * | ||
738 | * @param set set to manipulate | ||
739 | * @param msg message specifying the change | ||
740 | */ | ||
792 | static void | 741 | static void |
793 | execute_add (struct Set *set, | 742 | execute_add (struct Set *set, |
794 | const struct GNUNET_MessageHeader *m) | 743 | const struct GNUNET_SET_ElementMessage *msg) |
795 | { | 744 | { |
796 | const struct GNUNET_SET_ElementMessage *msg; | ||
797 | struct GNUNET_SET_Element el; | 745 | struct GNUNET_SET_Element el; |
798 | struct ElementEntry *ee; | 746 | struct ElementEntry *ee; |
799 | struct GNUNET_HashCode hash; | 747 | struct GNUNET_HashCode hash; |
800 | 748 | ||
801 | GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type)); | 749 | GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type)); |
802 | 750 | el.size = ntohs (msg->header.size) - sizeof (*msg); | |
803 | msg = (const struct GNUNET_SET_ElementMessage *) m; | ||
804 | el.size = ntohs (m->size) - sizeof *msg; | ||
805 | el.data = &msg[1]; | 751 | el.data = &msg[1]; |
806 | el.element_type = ntohs (msg->element_type); | 752 | el.element_type = ntohs (msg->element_type); |
807 | GNUNET_SET_element_hash (&el, &hash); | 753 | GNUNET_SET_element_hash (&el, |
808 | 754 | &hash); | |
809 | ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, | 755 | ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, |
810 | &hash); | 756 | &hash); |
811 | |||
812 | if (NULL == ee) | 757 | if (NULL == ee) |
813 | { | 758 | { |
814 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 759 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
815 | "Client inserts element %s of size %u\n", | 760 | "Client inserts element %s of size %u\n", |
816 | GNUNET_h2s (&hash), | 761 | GNUNET_h2s (&hash), |
817 | el.size); | 762 | el.size); |
818 | ee = GNUNET_malloc (el.size + sizeof *ee); | 763 | ee = GNUNET_malloc (el.size + sizeof (*ee)); |
819 | ee->element.size = el.size; | 764 | ee->element.size = el.size; |
820 | GNUNET_memcpy (&ee[1], | 765 | GNUNET_memcpy (&ee[1], |
821 | el.data, | 766 | el.data, |
@@ -832,7 +777,11 @@ execute_add (struct Set *set, | |||
832 | ee, | 777 | ee, |
833 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 778 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
834 | } | 779 | } |
835 | else if (GNUNET_YES == _GSS_is_element_of_set (ee, set)) | 780 | else if (GNUNET_YES == |
781 | is_element_of_generation (ee, | ||
782 | set->current_generation, | ||
783 | set->excluded_generations, | ||
784 | set->excluded_generations_size)) | ||
836 | { | 785 | { |
837 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 786 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
838 | "Client inserted element %s of size %u twice (ignored)\n", | 787 | "Client inserted element %s of size %u twice (ignored)\n", |
@@ -852,24 +801,27 @@ execute_add (struct Set *set, | |||
852 | ee->mutations_size, | 801 | ee->mutations_size, |
853 | mut); | 802 | mut); |
854 | } | 803 | } |
855 | 804 | set->vt->add (set->state, | |
856 | set->vt->add (set->state, ee); | 805 | ee); |
857 | } | 806 | } |
858 | 807 | ||
859 | 808 | ||
809 | /** | ||
810 | * Remove an element from @a set as specified by @a msg | ||
811 | * | ||
812 | * @param set set to manipulate | ||
813 | * @param msg message specifying the change | ||
814 | */ | ||
860 | static void | 815 | static void |
861 | execute_remove (struct Set *set, | 816 | execute_remove (struct Set *set, |
862 | const struct GNUNET_MessageHeader *m) | 817 | const struct GNUNET_SET_ElementMessage *msg) |
863 | { | 818 | { |
864 | const struct GNUNET_SET_ElementMessage *msg; | ||
865 | struct GNUNET_SET_Element el; | 819 | struct GNUNET_SET_Element el; |
866 | struct ElementEntry *ee; | 820 | struct ElementEntry *ee; |
867 | struct GNUNET_HashCode hash; | 821 | struct GNUNET_HashCode hash; |
868 | 822 | ||
869 | GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type)); | 823 | GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type)); |
870 | 824 | el.size = ntohs (msg->header.size) - sizeof (*msg); | |
871 | msg = (const struct GNUNET_SET_ElementMessage *) m; | ||
872 | el.size = ntohs (m->size) - sizeof *msg; | ||
873 | el.data = &msg[1]; | 825 | el.data = &msg[1]; |
874 | el.element_type = ntohs (msg->element_type); | 826 | el.element_type = ntohs (msg->element_type); |
875 | GNUNET_SET_element_hash (&el, &hash); | 827 | GNUNET_SET_element_hash (&el, &hash); |
@@ -883,7 +835,11 @@ execute_remove (struct Set *set, | |||
883 | el.size); | 835 | el.size); |
884 | return; | 836 | return; |
885 | } | 837 | } |
886 | if (GNUNET_NO == _GSS_is_element_of_set (ee, set)) | 838 | if (GNUNET_NO == |
839 | is_element_of_generation (ee, | ||
840 | set->current_generation, | ||
841 | set->excluded_generations, | ||
842 | set->excluded_generations_size)) | ||
887 | { | 843 | { |
888 | /* Client tried to remove element twice */ | 844 | /* Client tried to remove element twice */ |
889 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 845 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -906,22 +862,28 @@ execute_remove (struct Set *set, | |||
906 | ee->mutations_size, | 862 | ee->mutations_size, |
907 | mut); | 863 | mut); |
908 | } | 864 | } |
909 | set->vt->remove (set->state, ee); | 865 | set->vt->remove (set->state, |
866 | ee); | ||
910 | } | 867 | } |
911 | 868 | ||
912 | 869 | ||
913 | 870 | /** | |
871 | * Perform a mutation on a set as specified by the @a msg | ||
872 | * | ||
873 | * @param set the set to mutate | ||
874 | * @param msg specification of what to change | ||
875 | */ | ||
914 | static void | 876 | static void |
915 | execute_mutation (struct Set *set, | 877 | execute_mutation (struct Set *set, |
916 | const struct GNUNET_MessageHeader *m) | 878 | const struct GNUNET_SET_ElementMessage *msg) |
917 | { | 879 | { |
918 | switch (ntohs (m->type)) | 880 | switch (ntohs (msg->header.type)) |
919 | { | 881 | { |
920 | case GNUNET_MESSAGE_TYPE_SET_ADD: | 882 | case GNUNET_MESSAGE_TYPE_SET_ADD: |
921 | execute_add (set, m); | 883 | execute_add (set, msg); |
922 | break; | 884 | break; |
923 | case GNUNET_MESSAGE_TYPE_SET_REMOVE: | 885 | case GNUNET_MESSAGE_TYPE_SET_REMOVE: |
924 | execute_remove (set, m); | 886 | execute_remove (set, msg); |
925 | break; | 887 | break; |
926 | default: | 888 | default: |
927 | GNUNET_break (0); | 889 | GNUNET_break (0); |
@@ -929,6 +891,34 @@ execute_mutation (struct Set *set, | |||
929 | } | 891 | } |
930 | 892 | ||
931 | 893 | ||
894 | /** | ||
895 | * Execute mutations that were delayed on a set because of | ||
896 | * pending operations. | ||
897 | * | ||
898 | * @param set the set to execute mutations on | ||
899 | */ | ||
900 | static void | ||
901 | execute_delayed_mutations (struct Set *set) | ||
902 | { | ||
903 | struct PendingMutation *pm; | ||
904 | |||
905 | if (0 != set->content->iterator_count) | ||
906 | return; /* still cannot do this */ | ||
907 | while (NULL != (pm = set->content->pending_mutations_head)) | ||
908 | { | ||
909 | GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, | ||
910 | set->content->pending_mutations_tail, | ||
911 | pm); | ||
912 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
913 | "Executing pending mutation on %p.\n", | ||
914 | pm->set); | ||
915 | execute_mutation (pm->set, | ||
916 | pm->msg); | ||
917 | GNUNET_free (pm->msg); | ||
918 | GNUNET_free (pm); | ||
919 | } | ||
920 | } | ||
921 | |||
932 | 922 | ||
933 | /** | 923 | /** |
934 | * Send the next element of a set to the set's client. The next element is given by | 924 | * Send the next element of a set to the set's client. The next element is given by |
@@ -952,65 +942,45 @@ send_client_element (struct Set *set) | |||
952 | struct GNUNET_SET_IterResponseMessage *msg; | 942 | struct GNUNET_SET_IterResponseMessage *msg; |
953 | 943 | ||
954 | GNUNET_assert (NULL != set->iter); | 944 | GNUNET_assert (NULL != set->iter); |
955 | 945 | do { | |
956 | again: | 946 | ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, |
957 | 947 | NULL, | |
958 | ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, | 948 | (const void **) &ee); |
959 | NULL, | 949 | if (GNUNET_NO == ret) |
960 | (const void **) &ee); | ||
961 | if (GNUNET_NO == ret) | ||
962 | { | ||
963 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
964 | "Iteration on %p done.\n", | ||
965 | (void *) set); | ||
966 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); | ||
967 | GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); | ||
968 | set->iter = NULL; | ||
969 | set->iteration_id++; | ||
970 | |||
971 | GNUNET_assert (set->content->iterator_count > 0); | ||
972 | set->content->iterator_count -= 1; | ||
973 | |||
974 | if (0 == set->content->iterator_count) | ||
975 | { | 950 | { |
976 | while (NULL != set->content->pending_mutations_head) | 951 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
977 | { | 952 | "Iteration on %p done.\n", |
978 | struct PendingMutation *pm; | 953 | set); |
979 | 954 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); | |
980 | pm = set->content->pending_mutations_head; | 955 | GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); |
981 | GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, | 956 | set->iter = NULL; |
982 | set->content->pending_mutations_tail, | 957 | set->iteration_id++; |
983 | pm); | 958 | GNUNET_assert (set->content->iterator_count > 0); |
984 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 959 | set->content->iterator_count--; |
985 | "Executing pending mutation on %p.\n", | 960 | execute_delayed_mutations (set); |
986 | (void *) pm->set); | 961 | GNUNET_MQ_send (set->cs->mq, |
987 | execute_mutation (pm->set, pm->mutation_message); | 962 | ev); |
988 | GNUNET_free (pm->mutation_message); | 963 | return; |
989 | GNUNET_free (pm); | ||
990 | } | ||
991 | } | 964 | } |
992 | |||
993 | } | ||
994 | else | ||
995 | { | ||
996 | GNUNET_assert (NULL != ee); | 965 | GNUNET_assert (NULL != ee); |
997 | 966 | } while (GNUNET_NO == | |
998 | if (GNUNET_NO == is_element_of_iteration (ee, set)) | 967 | is_element_of_generation (ee, |
999 | goto again; | 968 | set->iter_generation, |
1000 | 969 | set->excluded_generations, | |
1001 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 970 | set->excluded_generations_size)); |
1002 | "Sending iteration element on %p.\n", | 971 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1003 | (void *) set); | 972 | "Sending iteration element on %p.\n", |
1004 | ev = GNUNET_MQ_msg_extra (msg, | 973 | set); |
1005 | ee->element.size, | 974 | ev = GNUNET_MQ_msg_extra (msg, |
1006 | GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); | 975 | ee->element.size, |
1007 | GNUNET_memcpy (&msg[1], | 976 | GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); |
1008 | ee->element.data, | 977 | GNUNET_memcpy (&msg[1], |
1009 | ee->element.size); | 978 | ee->element.data, |
1010 | msg->element_type = htons (ee->element.element_type); | 979 | ee->element.size); |
1011 | msg->iteration_id = htons (set->iteration_id); | 980 | msg->element_type = htons (ee->element.element_type); |
1012 | } | 981 | msg->iteration_id = htons (set->iteration_id); |
1013 | GNUNET_MQ_send (set->client_mq, ev); | 982 | GNUNET_MQ_send (set->cs->mq, |
983 | ev); | ||
1014 | } | 984 | } |
1015 | 985 | ||
1016 | 986 | ||
@@ -1027,22 +997,21 @@ static void | |||
1027 | handle_client_iterate (void *cls, | 997 | handle_client_iterate (void *cls, |
1028 | const struct GNUNET_MessageHeader *m) | 998 | const struct GNUNET_MessageHeader *m) |
1029 | { | 999 | { |
1030 | struct GNUNET_SERVICE_Client *client = cls; | 1000 | struct ClientState *cs = cls; |
1031 | struct Set *set; | 1001 | struct Set *set; |
1032 | 1002 | ||
1033 | set = set_get (client); | 1003 | if (NULL == (set = cs->set)) |
1034 | if (NULL == set) | ||
1035 | { | 1004 | { |
1036 | /* attempt to iterate over a non existing set */ | 1005 | /* attempt to iterate over a non existing set */ |
1037 | GNUNET_break (0); | 1006 | GNUNET_break (0); |
1038 | GNUNET_SERVICE_client_drop (client); | 1007 | GNUNET_SERVICE_client_drop (cs->client); |
1039 | return; | 1008 | return; |
1040 | } | 1009 | } |
1041 | if (NULL != set->iter) | 1010 | if (NULL != set->iter) |
1042 | { | 1011 | { |
1043 | /* Only one concurrent iterate-action allowed per set */ | 1012 | /* Only one concurrent iterate-action allowed per set */ |
1044 | GNUNET_break (0); | 1013 | GNUNET_break (0); |
1045 | GNUNET_SERVICE_client_drop (client); | 1014 | GNUNET_SERVICE_client_drop (cs->client); |
1046 | return; | 1015 | return; |
1047 | } | 1016 | } |
1048 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1017 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1050,8 +1019,8 @@ handle_client_iterate (void *cls, | |||
1050 | (void *) set, | 1019 | (void *) set, |
1051 | set->current_generation, | 1020 | set->current_generation, |
1052 | GNUNET_CONTAINER_multihashmap_size (set->content->elements)); | 1021 | GNUNET_CONTAINER_multihashmap_size (set->content->elements)); |
1053 | GNUNET_SERVICE_client_continue (client); | 1022 | GNUNET_SERVICE_client_continue (cs->client); |
1054 | set->content->iterator_count += 1; | 1023 | set->content->iterator_count++; |
1055 | set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); | 1024 | set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); |
1056 | set->iter_generation = set->current_generation; | 1025 | set->iter_generation = set->current_generation; |
1057 | send_client_element (set); | 1026 | send_client_element (set); |
@@ -1070,17 +1039,17 @@ static void | |||
1070 | handle_client_create_set (void *cls, | 1039 | handle_client_create_set (void *cls, |
1071 | const struct GNUNET_SET_CreateMessage *msg) | 1040 | const struct GNUNET_SET_CreateMessage *msg) |
1072 | { | 1041 | { |
1073 | struct GNUNET_SERVICE_Client *client = cls; | 1042 | struct ClientState *cs = cls; |
1074 | struct Set *set; | 1043 | struct Set *set; |
1075 | 1044 | ||
1076 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1045 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1077 | "Client created new set (operation %u)\n", | 1046 | "Client created new set (operation %u)\n", |
1078 | (uint32_t) ntohl (msg->operation)); | 1047 | (uint32_t) ntohl (msg->operation)); |
1079 | if (NULL != set_get (client)) | 1048 | if (NULL != cs->set) |
1080 | { | 1049 | { |
1081 | /* There can only be one set per client */ | 1050 | /* There can only be one set per client */ |
1082 | GNUNET_break (0); | 1051 | GNUNET_break (0); |
1083 | GNUNET_SERVICE_client_drop (client); | 1052 | GNUNET_SERVICE_client_drop (cs->client); |
1084 | return; | 1053 | return; |
1085 | } | 1054 | } |
1086 | set = GNUNET_new (struct Set); | 1055 | set = GNUNET_new (struct Set); |
@@ -1095,27 +1064,25 @@ handle_client_create_set (void *cls, | |||
1095 | default: | 1064 | default: |
1096 | GNUNET_free (set); | 1065 | GNUNET_free (set); |
1097 | GNUNET_break (0); | 1066 | GNUNET_break (0); |
1098 | GNUNET_SERVICE_client_drop (client); | 1067 | GNUNET_SERVICE_client_drop (cs->client); |
1099 | return; | 1068 | return; |
1100 | } | 1069 | } |
1101 | set->operation = ntohl (msg->operation); | 1070 | set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); |
1102 | set->state = set->vt->create (); | 1071 | set->state = set->vt->create (); |
1103 | if (NULL == set->state) | 1072 | if (NULL == set->state) |
1104 | { | 1073 | { |
1105 | /* initialization failed (i.e. out of memory) */ | 1074 | /* initialization failed (i.e. out of memory) */ |
1106 | GNUNET_free (set); | 1075 | GNUNET_free (set); |
1107 | GNUNET_SERVICE_client_drop (client); | 1076 | GNUNET_SERVICE_client_drop (cs->client); |
1108 | return; | 1077 | return; |
1109 | } | 1078 | } |
1110 | set->content = GNUNET_new (struct SetContent); | 1079 | set->content = GNUNET_new (struct SetContent); |
1111 | set->content->refcount = 1; | 1080 | set->content->refcount = 1; |
1112 | set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | 1081 | set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, |
1113 | set->client = client; | 1082 | GNUNET_YES); |
1114 | set->client_mq = GNUNET_SERVICE_client_get_mq (client); | 1083 | set->cs = cs; |
1115 | GNUNET_CONTAINER_DLL_insert (sets_head, | 1084 | cs->set = set; |
1116 | sets_tail, | 1085 | GNUNET_SERVICE_client_continue (cs->client); |
1117 | set); | ||
1118 | GNUNET_SERVICE_client_continue (client); | ||
1119 | } | 1086 | } |
1120 | 1087 | ||
1121 | 1088 | ||
@@ -1131,31 +1098,12 @@ handle_client_create_set (void *cls, | |||
1131 | static void | 1098 | static void |
1132 | incoming_timeout_cb (void *cls) | 1099 | incoming_timeout_cb (void *cls) |
1133 | { | 1100 | { |
1134 | struct Operation *incoming = cls; | 1101 | struct Operation *op = cls; |
1135 | 1102 | ||
1136 | incoming->timeout_task = NULL; | 1103 | op->timeout_task = NULL; |
1137 | GNUNET_assert (GNUNET_YES == incoming->is_incoming); | ||
1138 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1104 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1139 | "Remote peer's incoming request timed out\n"); | 1105 | "Remote peer's incoming request timed out\n"); |
1140 | incoming_destroy (incoming); | ||
1141 | } | ||
1142 | |||
1143 | |||
1144 | /** | ||
1145 | * Terminates an incoming operation in case we have not yet received an | ||
1146 | * operation request. Called by the channel destruction handler. | ||
1147 | * | ||
1148 | * @param op the channel context | ||
1149 | */ | ||
1150 | static void | ||
1151 | handle_incoming_disconnect (struct Operation *op) | ||
1152 | { | ||
1153 | GNUNET_assert (GNUNET_YES == op->is_incoming); | ||
1154 | /* channel is already dead, incoming_destroy must not | ||
1155 | * destroy it ... */ | ||
1156 | op->channel = NULL; | ||
1157 | incoming_destroy (op); | 1106 | incoming_destroy (op); |
1158 | op->vt = NULL; | ||
1159 | } | 1107 | } |
1160 | 1108 | ||
1161 | 1109 | ||
@@ -1180,32 +1128,26 @@ channel_new_cb (void *cls, | |||
1180 | struct GNUNET_CADET_Channel *channel, | 1128 | struct GNUNET_CADET_Channel *channel, |
1181 | const struct GNUNET_PeerIdentity *source) | 1129 | const struct GNUNET_PeerIdentity *source) |
1182 | { | 1130 | { |
1183 | static const struct SetVT incoming_vt = { | ||
1184 | .msg_handler = &handle_incoming_msg, | ||
1185 | .peer_disconnect = &handle_incoming_disconnect | ||
1186 | }; | ||
1187 | struct Listener *listener = cls; | 1131 | struct Listener *listener = cls; |
1188 | struct Operation *incoming; | 1132 | struct Operation *op; |
1189 | 1133 | ||
1190 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1134 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1191 | "New incoming channel\n"); | 1135 | "New incoming channel\n"); |
1192 | incoming = GNUNET_new (struct Operation); | 1136 | op = GNUNET_new (struct Operation); |
1193 | incoming->listener = listener; | 1137 | op->listener = listener; |
1194 | incoming->is_incoming = GNUNET_YES; | 1138 | op->peer = *source; |
1195 | incoming->peer = *source; | 1139 | op->channel = channel; |
1196 | incoming->channel = channel; | 1140 | op->mq = GNUNET_CADET_get_mq (op->channel); |
1197 | incoming->mq = GNUNET_CADET_get_mq (incoming->channel); | 1141 | op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, |
1198 | incoming->vt = &incoming_vt; | 1142 | UINT32_MAX); |
1199 | incoming->timeout_task | 1143 | op->timeout_task |
1200 | = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, | 1144 | = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, |
1201 | &incoming_timeout_cb, | 1145 | &incoming_timeout_cb, |
1202 | incoming); | 1146 | op); |
1203 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, | 1147 | GNUNET_CONTAINER_DLL_insert (listener->op_head, |
1204 | incoming_tail, | 1148 | listener->op_tail, |
1205 | incoming); | 1149 | op); |
1206 | // incoming_suggest (incoming, | 1150 | return op; |
1207 | // listener); | ||
1208 | return incoming; | ||
1209 | } | 1151 | } |
1210 | 1152 | ||
1211 | 1153 | ||
@@ -1234,22 +1176,14 @@ channel_end_cb (void *channel_ctx, | |||
1234 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1176 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1235 | "channel_end_cb called\n"); | 1177 | "channel_end_cb called\n"); |
1236 | op->channel = NULL; | 1178 | op->channel = NULL; |
1237 | op->keep++; | 1179 | if (NULL != op->listener) |
1238 | /* the vt can be null if a client already requested canceling op. */ | 1180 | incoming_destroy (op); |
1239 | if (NULL != op->vt) | 1181 | else if (NULL != op->set) |
1240 | { | 1182 | op->set->vt->channel_death (op); |
1241 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1183 | else |
1242 | "calling peer disconnect due to channel end\n"); | 1184 | _GSS_operation_destroy (op, |
1243 | op->vt->peer_disconnect (op); | 1185 | GNUNET_YES); |
1244 | } | 1186 | GNUNET_free (op); |
1245 | op->keep--; | ||
1246 | if (0 == op->keep) | ||
1247 | { | ||
1248 | /* cadet will never call us with the context again! */ | ||
1249 | GNUNET_free (op); | ||
1250 | } | ||
1251 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1252 | "channel_end_cb finished\n"); | ||
1253 | } | 1187 | } |
1254 | 1188 | ||
1255 | 1189 | ||
@@ -1275,60 +1209,6 @@ channel_window_cb (void *cls, | |||
1275 | /* FIXME: not implemented, we could do flow control here... */ | 1209 | /* FIXME: not implemented, we could do flow control here... */ |
1276 | } | 1210 | } |
1277 | 1211 | ||
1278 | /** | ||
1279 | * FIXME: hack-job. Migrate to proper handler array use! | ||
1280 | * | ||
1281 | * @param cls local state associated with the channel. | ||
1282 | * @param message The actual message. | ||
1283 | */ | ||
1284 | static int | ||
1285 | check_p2p_message (void *cls, | ||
1286 | const struct GNUNET_MessageHeader *message) | ||
1287 | { | ||
1288 | return GNUNET_OK; | ||
1289 | } | ||
1290 | |||
1291 | |||
1292 | /** | ||
1293 | * FIXME: hack-job. Migrate to proper handler array use! | ||
1294 | * | ||
1295 | * Functions with this signature are called whenever a message is | ||
1296 | * received via a cadet channel. | ||
1297 | * | ||
1298 | * The msg_handler is a virtual table set in initially either when a peer | ||
1299 | * creates a new channel with us, or once we create a new channel | ||
1300 | * ourselves (evaluate). | ||
1301 | * | ||
1302 | * Once we know the exact type of operation (union/intersection), the vt is | ||
1303 | * replaced with an operation specific instance (_GSS_[op]_vt). | ||
1304 | * | ||
1305 | * @param cls local state associated with the channel. | ||
1306 | * @param message The actual message. | ||
1307 | */ | ||
1308 | static void | ||
1309 | handle_p2p_message (void *cls, | ||
1310 | const struct GNUNET_MessageHeader *message) | ||
1311 | { | ||
1312 | struct Operation *op = cls; | ||
1313 | int ret; | ||
1314 | |||
1315 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1316 | "Dispatching cadet message (type: %u)\n", | ||
1317 | ntohs (message->type)); | ||
1318 | /* do this before the handler, as the handler might kill the channel */ | ||
1319 | GNUNET_CADET_receive_done (op->channel); | ||
1320 | if (NULL != op->vt) | ||
1321 | ret = op->vt->msg_handler (op, | ||
1322 | message); | ||
1323 | else | ||
1324 | ret = GNUNET_SYSERR; | ||
1325 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1326 | "Handled cadet message (type: %u)\n", | ||
1327 | ntohs (message->type)); | ||
1328 | if (GNUNET_OK != ret) | ||
1329 | GNUNET_CADET_channel_destroy (op->channel); | ||
1330 | } | ||
1331 | |||
1332 | 1212 | ||
1333 | /** | 1213 | /** |
1334 | * Called when a client wants to create a new listener. | 1214 | * Called when a client wants to create a new listener. |
@@ -1340,116 +1220,99 @@ static void | |||
1340 | handle_client_listen (void *cls, | 1220 | handle_client_listen (void *cls, |
1341 | const struct GNUNET_SET_ListenMessage *msg) | 1221 | const struct GNUNET_SET_ListenMessage *msg) |
1342 | { | 1222 | { |
1343 | struct GNUNET_SERVICE_Client *client = cls; | 1223 | struct ClientState *cs = cls; |
1344 | struct GNUNET_MQ_MessageHandler cadet_handlers[] = { | 1224 | struct GNUNET_MQ_MessageHandler cadet_handlers[] = { |
1345 | GNUNET_MQ_hd_var_size (p2p_message, | 1225 | GNUNET_MQ_hd_var_size (incoming_msg, |
1346 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 1226 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
1347 | struct GNUNET_MessageHeader, | 1227 | struct OperationRequestMessage, |
1348 | NULL), | 1228 | NULL), |
1349 | GNUNET_MQ_hd_var_size (p2p_message, | 1229 | GNUNET_MQ_hd_var_size (union_p2p_ibf, |
1350 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, | 1230 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, |
1351 | struct GNUNET_MessageHeader, | 1231 | struct IBFMessage, |
1352 | NULL), | 1232 | NULL), |
1353 | GNUNET_MQ_hd_var_size (p2p_message, | 1233 | GNUNET_MQ_hd_var_size (union_p2p_elements, |
1354 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, | 1234 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, |
1355 | struct GNUNET_MessageHeader, | 1235 | struct GNUNET_SET_ElementMessage, |
1356 | NULL), | 1236 | NULL), |
1357 | GNUNET_MQ_hd_var_size (p2p_message, | 1237 | GNUNET_MQ_hd_var_size (union_p2p_offer, |
1358 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, | 1238 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, |
1359 | struct GNUNET_MessageHeader, | 1239 | struct GNUNET_MessageHeader, |
1360 | NULL), | 1240 | NULL), |
1361 | GNUNET_MQ_hd_var_size (p2p_message, | 1241 | GNUNET_MQ_hd_var_size (union_p2p_inquiry, |
1362 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, | 1242 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, |
1363 | struct GNUNET_MessageHeader, | 1243 | struct InquiryMessage, |
1364 | NULL), | 1244 | NULL), |
1365 | GNUNET_MQ_hd_var_size (p2p_message, | 1245 | GNUNET_MQ_hd_var_size (union_p2p_demand, |
1366 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, | 1246 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, |
1367 | struct GNUNET_MessageHeader, | 1247 | struct GNUNET_MessageHeader, |
1368 | NULL), | 1248 | NULL), |
1369 | GNUNET_MQ_hd_var_size (p2p_message, | 1249 | GNUNET_MQ_hd_fixed_size (union_p2p_done, |
1370 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, | 1250 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, |
1371 | struct GNUNET_MessageHeader, | 1251 | struct GNUNET_MessageHeader, |
1372 | NULL), | 1252 | NULL), |
1373 | GNUNET_MQ_hd_var_size (p2p_message, | 1253 | GNUNET_MQ_hd_fixed_size (union_p2p_full_done, |
1374 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, | 1254 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, |
1375 | struct GNUNET_MessageHeader, | 1255 | struct GNUNET_MessageHeader, |
1376 | NULL), | 1256 | NULL), |
1377 | GNUNET_MQ_hd_var_size (p2p_message, | 1257 | GNUNET_MQ_hd_fixed_size (union_p2p_request_full, |
1378 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, | 1258 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, |
1379 | struct GNUNET_MessageHeader, | 1259 | struct GNUNET_MessageHeader, |
1380 | NULL), | 1260 | NULL), |
1381 | GNUNET_MQ_hd_var_size (p2p_message, | 1261 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, |
1382 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, | 1262 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, |
1383 | struct GNUNET_MessageHeader, | 1263 | struct StrataEstimatorMessage, |
1384 | NULL), | 1264 | NULL), |
1385 | GNUNET_MQ_hd_var_size (p2p_message, | 1265 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, |
1386 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, | 1266 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, |
1387 | struct GNUNET_MessageHeader, | 1267 | struct StrataEstimatorMessage, |
1388 | NULL), | 1268 | NULL), |
1389 | GNUNET_MQ_hd_var_size (p2p_message, | 1269 | GNUNET_MQ_hd_var_size (union_p2p_full_element, |
1390 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, | 1270 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, |
1391 | struct GNUNET_MessageHeader, | 1271 | struct GNUNET_SET_ElementMessage, |
1392 | NULL), | ||
1393 | GNUNET_MQ_hd_var_size (p2p_message, | ||
1394 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, | ||
1395 | struct GNUNET_MessageHeader, | ||
1396 | NULL), | 1272 | NULL), |
1397 | GNUNET_MQ_hd_var_size (p2p_message, | 1273 | GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, |
1274 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, | ||
1275 | struct IntersectionElementInfoMessage, | ||
1276 | NULL), | ||
1277 | GNUNET_MQ_hd_var_size (intersection_p2p_bf, | ||
1398 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, | 1278 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, |
1399 | struct GNUNET_MessageHeader, | 1279 | struct BFMessage, |
1400 | NULL), | ||
1401 | GNUNET_MQ_hd_var_size (p2p_message, | ||
1402 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, | ||
1403 | struct GNUNET_MessageHeader, | ||
1404 | NULL), | 1280 | NULL), |
1281 | GNUNET_MQ_hd_fixed_size (intersection_p2p_done, | ||
1282 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, | ||
1283 | struct IntersectionDoneMessage, | ||
1284 | NULL), | ||
1405 | GNUNET_MQ_handler_end () | 1285 | GNUNET_MQ_handler_end () |
1406 | }; | 1286 | }; |
1407 | struct Listener *listener; | 1287 | struct Listener *listener; |
1408 | 1288 | ||
1409 | if (NULL != listener_get (client)) | 1289 | if (NULL != cs->listener) |
1410 | { | 1290 | { |
1411 | /* max. one active listener per client! */ | 1291 | /* max. one active listener per client! */ |
1412 | GNUNET_break (0); | 1292 | GNUNET_break (0); |
1413 | GNUNET_SERVICE_client_drop (client); | 1293 | GNUNET_SERVICE_client_drop (cs->client); |
1414 | return; | 1294 | return; |
1415 | } | 1295 | } |
1416 | listener = GNUNET_new (struct Listener); | 1296 | listener = GNUNET_new (struct Listener); |
1417 | listener->client = client; | 1297 | listener->cs = cs; |
1418 | listener->client_mq = GNUNET_SERVICE_client_get_mq (client); | ||
1419 | listener->app_id = msg->app_id; | 1298 | listener->app_id = msg->app_id; |
1420 | listener->operation = ntohl (msg->operation); | 1299 | listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); |
1421 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, | 1300 | GNUNET_CONTAINER_DLL_insert (listener_head, |
1422 | listeners_tail, | 1301 | listener_tail, |
1423 | listener); | 1302 | listener); |
1424 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1303 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1425 | "New listener created (op %u, port %s)\n", | 1304 | "New listener created (op %u, port %s)\n", |
1426 | listener->operation, | 1305 | listener->operation, |
1427 | GNUNET_h2s (&listener->app_id)); | 1306 | GNUNET_h2s (&listener->app_id)); |
1428 | listener->open_port = GNUNET_CADET_open_porT (cadet, | 1307 | listener->open_port |
1429 | &msg->app_id, | 1308 | = GNUNET_CADET_open_port (cadet, |
1430 | &channel_new_cb, | 1309 | &msg->app_id, |
1431 | listener, | 1310 | &channel_new_cb, |
1432 | &channel_window_cb, | 1311 | listener, |
1433 | &channel_end_cb, | 1312 | &channel_window_cb, |
1434 | cadet_handlers); | 1313 | &channel_end_cb, |
1435 | /* check for existing incoming requests the listener might be interested in */ | 1314 | cadet_handlers); |
1436 | for (struct Operation *op = incoming_head; NULL != op; op = op->next) | 1315 | GNUNET_SERVICE_client_continue (cs->client); |
1437 | { | ||
1438 | if (NULL == op->spec) | ||
1439 | continue; /* no details available yet */ | ||
1440 | if (0 != op->suggest_id) | ||
1441 | continue; /* this one has been already suggested to a listener */ | ||
1442 | if (listener->operation != op->spec->operation) | ||
1443 | continue; /* incompatible operation */ | ||
1444 | if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, | ||
1445 | &op->spec->app_id)) | ||
1446 | continue; /* incompatible appliation */ | ||
1447 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1448 | "Found matching existing request\n"); | ||
1449 | incoming_suggest (op, | ||
1450 | listener); | ||
1451 | } | ||
1452 | GNUNET_SERVICE_client_continue (client); | ||
1453 | } | 1316 | } |
1454 | 1317 | ||
1455 | 1318 | ||
@@ -1464,23 +1327,26 @@ static void | |||
1464 | handle_client_reject (void *cls, | 1327 | handle_client_reject (void *cls, |
1465 | const struct GNUNET_SET_RejectMessage *msg) | 1328 | const struct GNUNET_SET_RejectMessage *msg) |
1466 | { | 1329 | { |
1467 | struct GNUNET_SERVICE_Client *client = cls; | 1330 | struct ClientState *cs = cls; |
1468 | struct Operation *incoming; | 1331 | struct Operation *op; |
1469 | 1332 | ||
1470 | incoming = get_incoming (ntohl (msg->accept_reject_id)); | 1333 | op = get_incoming (ntohl (msg->accept_reject_id)); |
1471 | if (NULL == incoming) | 1334 | if (NULL == op) |
1472 | { | 1335 | { |
1473 | /* no matching incoming operation for this reject */ | 1336 | /* no matching incoming operation for this reject; |
1474 | GNUNET_break (0); | 1337 | could be that the other peer already disconnected... */ |
1475 | GNUNET_SERVICE_client_drop (client); | 1338 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
1339 | "Client rejected unknown operation %u\n", | ||
1340 | (unsigned int) ntohl (msg->accept_reject_id)); | ||
1341 | GNUNET_SERVICE_client_continue (cs->client); | ||
1476 | return; | 1342 | return; |
1477 | } | 1343 | } |
1478 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1344 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1479 | "Peer request (op %u, app %s) rejected by client\n", | 1345 | "Peer request (op %u, app %s) rejected by client\n", |
1480 | incoming->spec->operation, | 1346 | op->listener->operation, |
1481 | GNUNET_h2s (&incoming->spec->app_id)); | 1347 | GNUNET_h2s (&cs->listener->app_id)); |
1482 | GNUNET_CADET_channel_destroy (incoming->channel); | 1348 | GNUNET_CADET_channel_destroy (op->channel); |
1483 | GNUNET_SERVICE_client_continue (client); | 1349 | GNUNET_SERVICE_client_continue (cs->client); |
1484 | } | 1350 | } |
1485 | 1351 | ||
1486 | 1352 | ||
@@ -1488,13 +1354,14 @@ handle_client_reject (void *cls, | |||
1488 | * Called when a client wants to add or remove an element to a set it inhabits. | 1354 | * Called when a client wants to add or remove an element to a set it inhabits. |
1489 | * | 1355 | * |
1490 | * @param cls client that sent the message | 1356 | * @param cls client that sent the message |
1491 | * @param m message sent by the client | 1357 | * @param msg message sent by the client |
1492 | */ | 1358 | */ |
1493 | static int | 1359 | static int |
1494 | check_client_mutation (void *cls, | 1360 | check_client_mutation (void *cls, |
1495 | const struct GNUNET_MessageHeader *m) | 1361 | const struct GNUNET_SET_ElementMessage *msg) |
1496 | { | 1362 | { |
1497 | /* FIXME: any check we might want to do here? */ | 1363 | /* NOTE: Technically, we should probably check with the |
1364 | block library whether the element we are given is well-formed */ | ||
1498 | return GNUNET_OK; | 1365 | return GNUNET_OK; |
1499 | } | 1366 | } |
1500 | 1367 | ||
@@ -1503,25 +1370,23 @@ check_client_mutation (void *cls, | |||
1503 | * Called when a client wants to add or remove an element to a set it inhabits. | 1370 | * Called when a client wants to add or remove an element to a set it inhabits. |
1504 | * | 1371 | * |
1505 | * @param cls client that sent the message | 1372 | * @param cls client that sent the message |
1506 | * @param m message sent by the client | 1373 | * @param msg message sent by the client |
1507 | */ | 1374 | */ |
1508 | static void | 1375 | static void |
1509 | handle_client_mutation (void *cls, | 1376 | handle_client_mutation (void *cls, |
1510 | const struct GNUNET_MessageHeader *m) | 1377 | const struct GNUNET_SET_ElementMessage *msg) |
1511 | { | 1378 | { |
1512 | struct GNUNET_SERVICE_Client *client = cls; | 1379 | struct ClientState *cs = cls; |
1513 | struct Set *set; | 1380 | struct Set *set; |
1514 | 1381 | ||
1515 | set = set_get (client); | 1382 | if (NULL == (set = cs->set)) |
1516 | if (NULL == set) | ||
1517 | { | 1383 | { |
1518 | /* client without a set requested an operation */ | 1384 | /* client without a set requested an operation */ |
1519 | GNUNET_break (0); | 1385 | GNUNET_break (0); |
1520 | GNUNET_SERVICE_client_drop (client); | 1386 | GNUNET_SERVICE_client_drop (cs->client); |
1521 | return; | 1387 | return; |
1522 | } | 1388 | } |
1523 | 1389 | GNUNET_SERVICE_client_continue (cs->client); | |
1524 | GNUNET_SERVICE_client_continue (client); | ||
1525 | 1390 | ||
1526 | if (0 != set->content->iterator_count) | 1391 | if (0 != set->content->iterator_count) |
1527 | { | 1392 | { |
@@ -1529,16 +1394,18 @@ handle_client_mutation (void *cls, | |||
1529 | 1394 | ||
1530 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1395 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1531 | "Scheduling mutation on set\n"); | 1396 | "Scheduling mutation on set\n"); |
1532 | |||
1533 | pm = GNUNET_new (struct PendingMutation); | 1397 | pm = GNUNET_new (struct PendingMutation); |
1534 | pm->mutation_message = GNUNET_copy_message (m); | 1398 | pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header); |
1535 | pm->set = set; | 1399 | pm->set = set; |
1536 | GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, | 1400 | GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, |
1537 | set->content->pending_mutations_tail, | 1401 | set->content->pending_mutations_tail, |
1538 | pm); | 1402 | pm); |
1539 | return; | 1403 | return; |
1540 | } | 1404 | } |
1541 | execute_mutation (set, m); | 1405 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1406 | "Executing mutation on set\n"); | ||
1407 | execute_mutation (set, | ||
1408 | msg); | ||
1542 | } | 1409 | } |
1543 | 1410 | ||
1544 | 1411 | ||
@@ -1555,8 +1422,8 @@ advance_generation (struct Set *set) | |||
1555 | 1422 | ||
1556 | if (set->current_generation == set->content->latest_generation) | 1423 | if (set->current_generation == set->content->latest_generation) |
1557 | { | 1424 | { |
1558 | set->content->latest_generation += 1; | 1425 | set->content->latest_generation++; |
1559 | set->current_generation += 1; | 1426 | set->current_generation++; |
1560 | return; | 1427 | return; |
1561 | } | 1428 | } |
1562 | 1429 | ||
@@ -1564,10 +1431,8 @@ advance_generation (struct Set *set) | |||
1564 | 1431 | ||
1565 | r.start = set->current_generation + 1; | 1432 | r.start = set->current_generation + 1; |
1566 | r.end = set->content->latest_generation + 1; | 1433 | r.end = set->content->latest_generation + 1; |
1567 | |||
1568 | set->content->latest_generation = r.end; | 1434 | set->content->latest_generation = r.end; |
1569 | set->current_generation = r.end; | 1435 | set->current_generation = r.end; |
1570 | |||
1571 | GNUNET_array_append (set->excluded_generations, | 1436 | GNUNET_array_append (set->excluded_generations, |
1572 | set->excluded_generations_size, | 1437 | set->excluded_generations_size, |
1573 | r); | 1438 | r); |
@@ -1605,112 +1470,105 @@ static void | |||
1605 | handle_client_evaluate (void *cls, | 1470 | handle_client_evaluate (void *cls, |
1606 | const struct GNUNET_SET_EvaluateMessage *msg) | 1471 | const struct GNUNET_SET_EvaluateMessage *msg) |
1607 | { | 1472 | { |
1608 | struct GNUNET_SERVICE_Client *client = cls; | 1473 | struct ClientState *cs = cls; |
1609 | struct Operation *op = GNUNET_new (struct Operation); | 1474 | struct Operation *op = GNUNET_new (struct Operation); |
1610 | const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { | 1475 | const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { |
1611 | GNUNET_MQ_hd_var_size (p2p_message, | 1476 | GNUNET_MQ_hd_var_size (incoming_msg, |
1612 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 1477 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
1613 | struct GNUNET_MessageHeader, | 1478 | struct OperationRequestMessage, |
1614 | op), | 1479 | op), |
1615 | GNUNET_MQ_hd_var_size (p2p_message, | 1480 | GNUNET_MQ_hd_var_size (union_p2p_ibf, |
1616 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, | 1481 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, |
1617 | struct GNUNET_MessageHeader, | 1482 | struct IBFMessage, |
1618 | op), | 1483 | op), |
1619 | GNUNET_MQ_hd_var_size (p2p_message, | 1484 | GNUNET_MQ_hd_var_size (union_p2p_elements, |
1620 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, | 1485 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, |
1621 | struct GNUNET_MessageHeader, | 1486 | struct GNUNET_SET_ElementMessage, |
1622 | op), | 1487 | op), |
1623 | GNUNET_MQ_hd_var_size (p2p_message, | 1488 | GNUNET_MQ_hd_var_size (union_p2p_offer, |
1624 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, | 1489 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, |
1625 | struct GNUNET_MessageHeader, | 1490 | struct GNUNET_MessageHeader, |
1626 | op), | 1491 | op), |
1627 | GNUNET_MQ_hd_var_size (p2p_message, | 1492 | GNUNET_MQ_hd_var_size (union_p2p_inquiry, |
1628 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, | 1493 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, |
1629 | struct GNUNET_MessageHeader, | 1494 | struct InquiryMessage, |
1630 | op), | 1495 | op), |
1631 | GNUNET_MQ_hd_var_size (p2p_message, | 1496 | GNUNET_MQ_hd_var_size (union_p2p_demand, |
1632 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, | 1497 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, |
1633 | struct GNUNET_MessageHeader, | 1498 | struct GNUNET_MessageHeader, |
1634 | op), | 1499 | op), |
1635 | GNUNET_MQ_hd_var_size (p2p_message, | 1500 | GNUNET_MQ_hd_fixed_size (union_p2p_done, |
1636 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, | 1501 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, |
1637 | struct GNUNET_MessageHeader, | 1502 | struct GNUNET_MessageHeader, |
1638 | op), | 1503 | op), |
1639 | GNUNET_MQ_hd_var_size (p2p_message, | 1504 | GNUNET_MQ_hd_fixed_size (union_p2p_full_done, |
1505 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, | ||
1506 | struct GNUNET_MessageHeader, | ||
1507 | op), | ||
1508 | GNUNET_MQ_hd_fixed_size (union_p2p_request_full, | ||
1509 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, | ||
1510 | struct GNUNET_MessageHeader, | ||
1511 | op), | ||
1512 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, | ||
1640 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, | 1513 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, |
1641 | struct GNUNET_MessageHeader, | 1514 | struct StrataEstimatorMessage, |
1642 | op), | 1515 | op), |
1643 | GNUNET_MQ_hd_var_size (p2p_message, | 1516 | GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, |
1644 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, | 1517 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, |
1645 | struct GNUNET_MessageHeader, | 1518 | struct StrataEstimatorMessage, |
1646 | op), | ||
1647 | GNUNET_MQ_hd_var_size (p2p_message, | ||
1648 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, | ||
1649 | struct GNUNET_MessageHeader, | ||
1650 | op), | 1519 | op), |
1651 | GNUNET_MQ_hd_var_size (p2p_message, | 1520 | GNUNET_MQ_hd_var_size (union_p2p_full_element, |
1652 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, | ||
1653 | struct GNUNET_MessageHeader, | ||
1654 | op), | ||
1655 | GNUNET_MQ_hd_var_size (p2p_message, | ||
1656 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, | 1521 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, |
1657 | struct GNUNET_MessageHeader, | 1522 | struct GNUNET_SET_ElementMessage, |
1658 | op), | 1523 | op), |
1659 | GNUNET_MQ_hd_var_size (p2p_message, | 1524 | GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, |
1660 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, | 1525 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, |
1661 | struct GNUNET_MessageHeader, | 1526 | struct IntersectionElementInfoMessage, |
1662 | op), | 1527 | op), |
1663 | GNUNET_MQ_hd_var_size (p2p_message, | 1528 | GNUNET_MQ_hd_var_size (intersection_p2p_bf, |
1664 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, | 1529 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, |
1665 | struct GNUNET_MessageHeader, | 1530 | struct BFMessage, |
1666 | op), | ||
1667 | GNUNET_MQ_hd_var_size (p2p_message, | ||
1668 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, | ||
1669 | struct GNUNET_MessageHeader, | ||
1670 | op), | 1531 | op), |
1532 | GNUNET_MQ_hd_fixed_size (intersection_p2p_done, | ||
1533 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, | ||
1534 | struct IntersectionDoneMessage, | ||
1535 | op), | ||
1671 | GNUNET_MQ_handler_end () | 1536 | GNUNET_MQ_handler_end () |
1672 | }; | 1537 | }; |
1673 | struct Set *set; | 1538 | struct Set *set; |
1674 | struct OperationSpecification *spec; | ||
1675 | const struct GNUNET_MessageHeader *context; | 1539 | const struct GNUNET_MessageHeader *context; |
1676 | 1540 | ||
1677 | set = set_get (client); | 1541 | if (NULL == (set = cs->set)) |
1678 | if (NULL == set) | ||
1679 | { | 1542 | { |
1680 | GNUNET_break (0); | 1543 | GNUNET_break (0); |
1681 | GNUNET_free (op); | 1544 | GNUNET_free (op); |
1682 | GNUNET_SERVICE_client_drop (client); | 1545 | GNUNET_SERVICE_client_drop (cs->client); |
1683 | return; | 1546 | return; |
1684 | } | 1547 | } |
1685 | spec = GNUNET_new (struct OperationSpecification); | 1548 | op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, |
1686 | spec->operation = set->operation; | 1549 | UINT32_MAX); |
1687 | spec->app_id = msg->app_id; | 1550 | op->peer = msg->target_peer; |
1688 | spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, | 1551 | op->result_mode = ntohl (msg->result_mode); |
1689 | UINT32_MAX); | 1552 | op->client_request_id = ntohl (msg->request_id); |
1690 | spec->peer = msg->target_peer; | 1553 | op->byzantine = msg->byzantine; |
1691 | spec->set = set; | 1554 | op->byzantine_lower_bound = msg->byzantine_lower_bound; |
1692 | spec->result_mode = ntohl (msg->result_mode); | 1555 | op->force_full = msg->force_full; |
1693 | spec->client_request_id = ntohl (msg->request_id); | 1556 | op->force_delta = msg->force_delta; |
1694 | spec->byzantine = msg->byzantine; | ||
1695 | spec->byzantine_lower_bound = msg->byzantine_lower_bound; | ||
1696 | spec->force_full = msg->force_full; | ||
1697 | spec->force_delta = msg->force_delta; | ||
1698 | context = GNUNET_MQ_extract_nested_mh (msg); | 1557 | context = GNUNET_MQ_extract_nested_mh (msg); |
1699 | op->spec = spec; | ||
1700 | 1558 | ||
1701 | // Advance generation values, so that | 1559 | /* Advance generation values, so that |
1702 | // mutations won't interfer with the running operation. | 1560 | mutations won't interfer with the running operation. */ |
1561 | op->set = set; | ||
1703 | op->generation_created = set->current_generation; | 1562 | op->generation_created = set->current_generation; |
1704 | advance_generation (set); | 1563 | advance_generation (set); |
1705 | |||
1706 | op->vt = set->vt; | ||
1707 | GNUNET_CONTAINER_DLL_insert (set->ops_head, | 1564 | GNUNET_CONTAINER_DLL_insert (set->ops_head, |
1708 | set->ops_tail, | 1565 | set->ops_tail, |
1709 | op); | 1566 | op); |
1710 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1567 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1711 | "Creating new CADET channel to port %s\n", | 1568 | "Creating new CADET channel to port %s for set operation type %u\n", |
1712 | GNUNET_h2s (&msg->app_id)); | 1569 | GNUNET_h2s (&msg->app_id), |
1713 | op->channel = GNUNET_CADET_channel_creatE (cadet, | 1570 | set->operation); |
1571 | op->channel = GNUNET_CADET_channel_create (cadet, | ||
1714 | op, | 1572 | op, |
1715 | &msg->target_peer, | 1573 | &msg->target_peer, |
1716 | &msg->app_id, | 1574 | &msg->app_id, |
@@ -1719,9 +1577,15 @@ handle_client_evaluate (void *cls, | |||
1719 | &channel_end_cb, | 1577 | &channel_end_cb, |
1720 | cadet_handlers); | 1578 | cadet_handlers); |
1721 | op->mq = GNUNET_CADET_get_mq (op->channel); | 1579 | op->mq = GNUNET_CADET_get_mq (op->channel); |
1722 | set->vt->evaluate (op, | 1580 | op->state = set->vt->evaluate (op, |
1723 | context); | 1581 | context); |
1724 | GNUNET_SERVICE_client_continue (client); | 1582 | if (NULL == op->state) |
1583 | { | ||
1584 | GNUNET_break (0); | ||
1585 | GNUNET_SERVICE_client_drop (cs->client); | ||
1586 | return; | ||
1587 | } | ||
1588 | GNUNET_SERVICE_client_continue (cs->client); | ||
1725 | } | 1589 | } |
1726 | 1590 | ||
1727 | 1591 | ||
@@ -1737,15 +1601,14 @@ static void | |||
1737 | handle_client_iter_ack (void *cls, | 1601 | handle_client_iter_ack (void *cls, |
1738 | const struct GNUNET_SET_IterAckMessage *ack) | 1602 | const struct GNUNET_SET_IterAckMessage *ack) |
1739 | { | 1603 | { |
1740 | struct GNUNET_SERVICE_Client *client = cls; | 1604 | struct ClientState *cs = cls; |
1741 | struct Set *set; | 1605 | struct Set *set; |
1742 | 1606 | ||
1743 | set = set_get (client); | 1607 | if (NULL == (set = cs->set)) |
1744 | if (NULL == set) | ||
1745 | { | 1608 | { |
1746 | /* client without a set acknowledged receiving a value */ | 1609 | /* client without a set acknowledged receiving a value */ |
1747 | GNUNET_break (0); | 1610 | GNUNET_break (0); |
1748 | GNUNET_SERVICE_client_drop (client); | 1611 | GNUNET_SERVICE_client_drop (cs->client); |
1749 | return; | 1612 | return; |
1750 | } | 1613 | } |
1751 | if (NULL == set->iter) | 1614 | if (NULL == set->iter) |
@@ -1753,10 +1616,10 @@ handle_client_iter_ack (void *cls, | |||
1753 | /* client sent an ack, but we were not expecting one (as | 1616 | /* client sent an ack, but we were not expecting one (as |
1754 | set iteration has finished) */ | 1617 | set iteration has finished) */ |
1755 | GNUNET_break (0); | 1618 | GNUNET_break (0); |
1756 | GNUNET_SERVICE_client_drop (client); | 1619 | GNUNET_SERVICE_client_drop (cs->client); |
1757 | return; | 1620 | return; |
1758 | } | 1621 | } |
1759 | GNUNET_SERVICE_client_continue (client); | 1622 | GNUNET_SERVICE_client_continue (cs->client); |
1760 | if (ntohl (ack->send_more)) | 1623 | if (ntohl (ack->send_more)) |
1761 | { | 1624 | { |
1762 | send_client_element (set); | 1625 | send_client_element (set); |
@@ -1780,42 +1643,33 @@ static void | |||
1780 | handle_client_copy_lazy_prepare (void *cls, | 1643 | handle_client_copy_lazy_prepare (void *cls, |
1781 | const struct GNUNET_MessageHeader *mh) | 1644 | const struct GNUNET_MessageHeader *mh) |
1782 | { | 1645 | { |
1783 | struct GNUNET_SERVICE_Client *client = cls; | 1646 | struct ClientState *cs = cls; |
1784 | struct Set *set; | 1647 | struct Set *set; |
1785 | struct LazyCopyRequest *cr; | 1648 | struct LazyCopyRequest *cr; |
1786 | struct GNUNET_MQ_Envelope *ev; | 1649 | struct GNUNET_MQ_Envelope *ev; |
1787 | struct GNUNET_SET_CopyLazyResponseMessage *resp_msg; | 1650 | struct GNUNET_SET_CopyLazyResponseMessage *resp_msg; |
1788 | 1651 | ||
1789 | set = set_get (client); | 1652 | if (NULL == (set = cs->set)) |
1790 | if (NULL == set) | ||
1791 | { | 1653 | { |
1792 | /* client without a set requested an operation */ | 1654 | /* client without a set requested an operation */ |
1793 | GNUNET_break (0); | 1655 | GNUNET_break (0); |
1794 | GNUNET_SERVICE_client_drop (client); | 1656 | GNUNET_SERVICE_client_drop (cs->client); |
1795 | return; | 1657 | return; |
1796 | } | 1658 | } |
1797 | 1659 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |
1660 | "Client requested creation of lazy copy\n"); | ||
1798 | cr = GNUNET_new (struct LazyCopyRequest); | 1661 | cr = GNUNET_new (struct LazyCopyRequest); |
1799 | 1662 | cr->cookie = ++lazy_copy_cookie; | |
1800 | cr->cookie = lazy_copy_cookie; | ||
1801 | lazy_copy_cookie += 1; | ||
1802 | cr->source_set = set; | 1663 | cr->source_set = set; |
1803 | |||
1804 | GNUNET_CONTAINER_DLL_insert (lazy_copy_head, | 1664 | GNUNET_CONTAINER_DLL_insert (lazy_copy_head, |
1805 | lazy_copy_tail, | 1665 | lazy_copy_tail, |
1806 | cr); | 1666 | cr); |
1807 | |||
1808 | |||
1809 | ev = GNUNET_MQ_msg (resp_msg, | 1667 | ev = GNUNET_MQ_msg (resp_msg, |
1810 | GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE); | 1668 | GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE); |
1811 | resp_msg->cookie = cr->cookie; | 1669 | resp_msg->cookie = cr->cookie; |
1812 | GNUNET_MQ_send (set->client_mq, ev); | 1670 | GNUNET_MQ_send (set->cs->mq, |
1813 | 1671 | ev); | |
1814 | 1672 | GNUNET_SERVICE_client_continue (cs->client); | |
1815 | GNUNET_SERVICE_client_continue (client); | ||
1816 | |||
1817 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1818 | "Client requested lazy copy\n"); | ||
1819 | } | 1673 | } |
1820 | 1674 | ||
1821 | 1675 | ||
@@ -1829,21 +1683,19 @@ static void | |||
1829 | handle_client_copy_lazy_connect (void *cls, | 1683 | handle_client_copy_lazy_connect (void *cls, |
1830 | const struct GNUNET_SET_CopyLazyConnectMessage *msg) | 1684 | const struct GNUNET_SET_CopyLazyConnectMessage *msg) |
1831 | { | 1685 | { |
1832 | struct GNUNET_SERVICE_Client *client = cls; | 1686 | struct ClientState *cs = cls; |
1833 | struct LazyCopyRequest *cr; | 1687 | struct LazyCopyRequest *cr; |
1834 | struct Set *set; | 1688 | struct Set *set; |
1835 | int found; | 1689 | int found; |
1836 | 1690 | ||
1837 | if (NULL != set_get (client)) | 1691 | if (NULL != cs->set) |
1838 | { | 1692 | { |
1839 | /* There can only be one set per client */ | 1693 | /* There can only be one set per client */ |
1840 | GNUNET_break (0); | 1694 | GNUNET_break (0); |
1841 | GNUNET_SERVICE_client_drop (client); | 1695 | GNUNET_SERVICE_client_drop (cs->client); |
1842 | return; | 1696 | return; |
1843 | } | 1697 | } |
1844 | |||
1845 | found = GNUNET_NO; | 1698 | found = GNUNET_NO; |
1846 | |||
1847 | for (cr = lazy_copy_head; NULL != cr; cr = cr->next) | 1699 | for (cr = lazy_copy_head; NULL != cr; cr = cr->next) |
1848 | { | 1700 | { |
1849 | if (cr->cookie == msg->cookie) | 1701 | if (cr->cookie == msg->cookie) |
@@ -1852,21 +1704,20 @@ handle_client_copy_lazy_connect (void *cls, | |||
1852 | break; | 1704 | break; |
1853 | } | 1705 | } |
1854 | } | 1706 | } |
1855 | |||
1856 | if (GNUNET_NO == found) | 1707 | if (GNUNET_NO == found) |
1857 | { | 1708 | { |
1858 | /* client asked for copy with cookie we don't know */ | 1709 | /* client asked for copy with cookie we don't know */ |
1859 | GNUNET_break (0); | 1710 | GNUNET_break (0); |
1860 | GNUNET_SERVICE_client_drop (client); | 1711 | GNUNET_SERVICE_client_drop (cs->client); |
1861 | return; | 1712 | return; |
1862 | } | 1713 | } |
1863 | |||
1864 | GNUNET_CONTAINER_DLL_remove (lazy_copy_head, | 1714 | GNUNET_CONTAINER_DLL_remove (lazy_copy_head, |
1865 | lazy_copy_tail, | 1715 | lazy_copy_tail, |
1866 | cr); | 1716 | cr); |
1867 | 1717 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |
1718 | "Client %p requested use of lazy copy\n", | ||
1719 | cs); | ||
1868 | set = GNUNET_new (struct Set); | 1720 | set = GNUNET_new (struct Set); |
1869 | |||
1870 | switch (cr->source_set->operation) | 1721 | switch (cr->source_set->operation) |
1871 | { | 1722 | { |
1872 | case GNUNET_SET_OPERATION_INTERSECTION: | 1723 | case GNUNET_SET_OPERATION_INTERSECTION: |
@@ -1886,37 +1737,28 @@ handle_client_copy_lazy_connect (void *cls, | |||
1886 | GNUNET_break (0); | 1737 | GNUNET_break (0); |
1887 | GNUNET_free (set); | 1738 | GNUNET_free (set); |
1888 | GNUNET_free (cr); | 1739 | GNUNET_free (cr); |
1889 | GNUNET_SERVICE_client_drop (client); | 1740 | GNUNET_SERVICE_client_drop (cs->client); |
1890 | return; | 1741 | return; |
1891 | } | 1742 | } |
1892 | 1743 | ||
1893 | set->operation = cr->source_set->operation; | 1744 | set->operation = cr->source_set->operation; |
1894 | set->state = set->vt->copy_state (cr->source_set); | 1745 | set->state = set->vt->copy_state (cr->source_set->state); |
1895 | set->content = cr->source_set->content; | 1746 | set->content = cr->source_set->content; |
1896 | set->content->refcount += 1; | 1747 | set->content->refcount++; |
1897 | 1748 | ||
1898 | set->current_generation = cr->source_set->current_generation; | 1749 | set->current_generation = cr->source_set->current_generation; |
1899 | set->excluded_generations_size = cr->source_set->excluded_generations_size; | 1750 | set->excluded_generations_size = cr->source_set->excluded_generations_size; |
1900 | set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations, | 1751 | set->excluded_generations |
1901 | set->excluded_generations_size * sizeof (struct GenerationRange)); | 1752 | = GNUNET_memdup (cr->source_set->excluded_generations, |
1753 | set->excluded_generations_size * sizeof (struct GenerationRange)); | ||
1902 | 1754 | ||
1903 | /* Advance the generation of the new set, so that mutations to the | 1755 | /* Advance the generation of the new set, so that mutations to the |
1904 | of the cloned set and the source set are independent. */ | 1756 | of the cloned set and the source set are independent. */ |
1905 | advance_generation (set); | 1757 | advance_generation (set); |
1906 | 1758 | set->cs = cs; | |
1907 | 1759 | cs->set = set; | |
1908 | set->client = client; | ||
1909 | set->client_mq = GNUNET_SERVICE_client_get_mq (client); | ||
1910 | GNUNET_CONTAINER_DLL_insert (sets_head, | ||
1911 | sets_tail, | ||
1912 | set); | ||
1913 | |||
1914 | GNUNET_free (cr); | 1760 | GNUNET_free (cr); |
1915 | 1761 | GNUNET_SERVICE_client_continue (cs->client); | |
1916 | GNUNET_SERVICE_client_continue (client); | ||
1917 | |||
1918 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1919 | "Client connected to lazy set\n"); | ||
1920 | } | 1762 | } |
1921 | 1763 | ||
1922 | 1764 | ||
@@ -1930,26 +1772,22 @@ static void | |||
1930 | handle_client_cancel (void *cls, | 1772 | handle_client_cancel (void *cls, |
1931 | const struct GNUNET_SET_CancelMessage *msg) | 1773 | const struct GNUNET_SET_CancelMessage *msg) |
1932 | { | 1774 | { |
1933 | struct GNUNET_SERVICE_Client *client = cls; | 1775 | struct ClientState *cs = cls; |
1934 | struct Set *set; | 1776 | struct Set *set; |
1935 | struct Operation *op; | 1777 | struct Operation *op; |
1936 | int found; | 1778 | int found; |
1937 | 1779 | ||
1938 | set = set_get (client); | 1780 | if (NULL == (set = cs->set)) |
1939 | if (NULL == set) | ||
1940 | { | 1781 | { |
1941 | /* client without a set requested an operation */ | 1782 | /* client without a set requested an operation */ |
1942 | GNUNET_break (0); | 1783 | GNUNET_break (0); |
1943 | GNUNET_SERVICE_client_drop (client); | 1784 | GNUNET_SERVICE_client_drop (cs->client); |
1944 | return; | 1785 | return; |
1945 | } | 1786 | } |
1946 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1947 | "Client requested cancel for op %u\n", | ||
1948 | (uint32_t) ntohl (msg->request_id)); | ||
1949 | found = GNUNET_NO; | 1787 | found = GNUNET_NO; |
1950 | for (op = set->ops_head; NULL != op; op = op->next) | 1788 | for (op = set->ops_head; NULL != op; op = op->next) |
1951 | { | 1789 | { |
1952 | if (op->spec->client_request_id == ntohl (msg->request_id)) | 1790 | if (op->client_request_id == ntohl (msg->request_id)) |
1953 | { | 1791 | { |
1954 | found = GNUNET_YES; | 1792 | found = GNUNET_YES; |
1955 | break; | 1793 | break; |
@@ -1962,15 +1800,19 @@ handle_client_cancel (void *cls, | |||
1962 | * yet and try to cancel the (just barely non-existent) operation. | 1800 | * yet and try to cancel the (just barely non-existent) operation. |
1963 | * So this is not a hard error. | 1801 | * So this is not a hard error. |
1964 | */ | 1802 | */ |
1965 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1803 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
1966 | "Client canceled non-existent op\n"); | 1804 | "Client canceled non-existent op %u\n", |
1805 | (uint32_t) ntohl (msg->request_id)); | ||
1967 | } | 1806 | } |
1968 | else | 1807 | else |
1969 | { | 1808 | { |
1809 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1810 | "Client requested cancel for op %u\n", | ||
1811 | (uint32_t) ntohl (msg->request_id)); | ||
1970 | _GSS_operation_destroy (op, | 1812 | _GSS_operation_destroy (op, |
1971 | GNUNET_YES); | 1813 | GNUNET_YES); |
1972 | } | 1814 | } |
1973 | GNUNET_SERVICE_client_continue (client); | 1815 | GNUNET_SERVICE_client_continue (cs->client); |
1974 | } | 1816 | } |
1975 | 1817 | ||
1976 | 1818 | ||
@@ -1986,18 +1828,18 @@ static void | |||
1986 | handle_client_accept (void *cls, | 1828 | handle_client_accept (void *cls, |
1987 | const struct GNUNET_SET_AcceptMessage *msg) | 1829 | const struct GNUNET_SET_AcceptMessage *msg) |
1988 | { | 1830 | { |
1989 | struct GNUNET_SERVICE_Client *client = cls; | 1831 | struct ClientState *cs = cls; |
1990 | struct Set *set; | 1832 | struct Set *set; |
1991 | struct Operation *op; | 1833 | struct Operation *op; |
1992 | struct GNUNET_SET_ResultMessage *result_message; | 1834 | struct GNUNET_SET_ResultMessage *result_message; |
1993 | struct GNUNET_MQ_Envelope *ev; | 1835 | struct GNUNET_MQ_Envelope *ev; |
1836 | struct Listener *listener; | ||
1994 | 1837 | ||
1995 | set = set_get (client); | 1838 | if (NULL == (set = cs->set)) |
1996 | if (NULL == set) | ||
1997 | { | 1839 | { |
1998 | /* client without a set requested to accept */ | 1840 | /* client without a set requested to accept */ |
1999 | GNUNET_break (0); | 1841 | GNUNET_break (0); |
2000 | GNUNET_SERVICE_client_drop (client); | 1842 | GNUNET_SERVICE_client_drop (cs->client); |
2001 | return; | 1843 | return; |
2002 | } | 1844 | } |
2003 | op = get_incoming (ntohl (msg->accept_reject_id)); | 1845 | op = get_incoming (ntohl (msg->accept_reject_id)); |
@@ -2005,71 +1847,75 @@ handle_client_accept (void *cls, | |||
2005 | { | 1847 | { |
2006 | /* It is not an error if the set op does not exist -- it may | 1848 | /* It is not an error if the set op does not exist -- it may |
2007 | * have been destroyed when the partner peer disconnected. */ | 1849 | * have been destroyed when the partner peer disconnected. */ |
2008 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1850 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
2009 | "Client accepted request that is no longer active\n"); | 1851 | "Client %p accepted request %u of listener %p that is no longer active\n", |
1852 | cs, | ||
1853 | ntohl (msg->accept_reject_id), | ||
1854 | cs->listener); | ||
2010 | ev = GNUNET_MQ_msg (result_message, | 1855 | ev = GNUNET_MQ_msg (result_message, |
2011 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 1856 | GNUNET_MESSAGE_TYPE_SET_RESULT); |
2012 | result_message->request_id = msg->request_id; | 1857 | result_message->request_id = msg->request_id; |
2013 | result_message->element_type = 0; | ||
2014 | result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 1858 | result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
2015 | GNUNET_MQ_send (set->client_mq, ev); | 1859 | GNUNET_MQ_send (set->cs->mq, |
2016 | GNUNET_SERVICE_client_continue (client); | 1860 | ev); |
1861 | GNUNET_SERVICE_client_continue (cs->client); | ||
2017 | return; | 1862 | return; |
2018 | } | 1863 | } |
2019 | |||
2020 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1864 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2021 | "Client accepting request %u\n", | 1865 | "Client accepting request %u\n", |
2022 | (uint32_t) ntohl (msg->accept_reject_id)); | 1866 | (uint32_t) ntohl (msg->accept_reject_id)); |
2023 | GNUNET_assert (GNUNET_YES == op->is_incoming); | 1867 | listener = op->listener; |
2024 | op->is_incoming = GNUNET_NO; | 1868 | op->listener = NULL; |
2025 | GNUNET_CONTAINER_DLL_remove (incoming_head, | 1869 | GNUNET_CONTAINER_DLL_remove (listener->op_head, |
2026 | incoming_tail, | 1870 | listener->op_tail, |
2027 | op); | 1871 | op); |
2028 | op->spec->set = set; | 1872 | op->set = set; |
2029 | GNUNET_CONTAINER_DLL_insert (set->ops_head, | 1873 | GNUNET_CONTAINER_DLL_insert (set->ops_head, |
2030 | set->ops_tail, | 1874 | set->ops_tail, |
2031 | op); | 1875 | op); |
2032 | op->spec->client_request_id = ntohl (msg->request_id); | 1876 | op->client_request_id = ntohl (msg->request_id); |
2033 | op->spec->result_mode = ntohl (msg->result_mode); | 1877 | op->result_mode = ntohl (msg->result_mode); |
2034 | op->spec->byzantine = msg->byzantine; | 1878 | op->byzantine = msg->byzantine; |
2035 | op->spec->byzantine_lower_bound = msg->byzantine_lower_bound; | 1879 | op->byzantine_lower_bound = msg->byzantine_lower_bound; |
2036 | op->spec->force_full = msg->force_full; | 1880 | op->force_full = msg->force_full; |
2037 | op->spec->force_delta = msg->force_delta; | 1881 | op->force_delta = msg->force_delta; |
2038 | 1882 | ||
2039 | // Advance generation values, so that | 1883 | /* Advance generation values, so that future mutations do not |
2040 | // mutations won't interfer with the running operation. | 1884 | interfer with the running operation. */ |
2041 | op->generation_created = set->current_generation; | 1885 | op->generation_created = set->current_generation; |
2042 | advance_generation (set); | 1886 | advance_generation (set); |
2043 | 1887 | GNUNET_assert (NULL == op->state); | |
2044 | op->vt = set->vt; | 1888 | op->state = set->vt->accept (op); |
2045 | op->vt->accept (op); | 1889 | if (NULL == op->state) |
2046 | GNUNET_SERVICE_client_continue (client); | 1890 | { |
1891 | GNUNET_break (0); | ||
1892 | GNUNET_SERVICE_client_drop (cs->client); | ||
1893 | return; | ||
1894 | } | ||
1895 | /* Now allow CADET to continue, as we did not do this in | ||
1896 | #handle_incoming_msg (as we wanted to first see if the | ||
1897 | local client would accept the request). */ | ||
1898 | GNUNET_CADET_receive_done (op->channel); | ||
1899 | GNUNET_SERVICE_client_continue (cs->client); | ||
2047 | } | 1900 | } |
2048 | 1901 | ||
2049 | 1902 | ||
2050 | /** | 1903 | /** |
2051 | * Called to clean up, after a shutdown has been requested. | 1904 | * Called to clean up, after a shutdown has been requested. |
2052 | * | 1905 | * |
2053 | * @param cls closure | 1906 | * @param cls closure, NULL |
2054 | */ | 1907 | */ |
2055 | static void | 1908 | static void |
2056 | shutdown_task (void *cls) | 1909 | shutdown_task (void *cls) |
2057 | { | 1910 | { |
2058 | while (NULL != incoming_head) | 1911 | /* Delay actual shutdown to allow service to disconnect clients */ |
2059 | incoming_destroy (incoming_head); | ||
2060 | while (NULL != listeners_head) | ||
2061 | listener_destroy (listeners_head); | ||
2062 | while (NULL != sets_head) | ||
2063 | set_destroy (sets_head); | ||
2064 | |||
2065 | /* it's important to destroy cadet at the end, as all channels | ||
2066 | * must be destroyed before the cadet handle! */ | ||
2067 | if (NULL != cadet) | 1912 | if (NULL != cadet) |
2068 | { | 1913 | { |
2069 | GNUNET_CADET_disconnect (cadet); | 1914 | GNUNET_CADET_disconnect (cadet); |
2070 | cadet = NULL; | 1915 | cadet = NULL; |
2071 | } | 1916 | } |
2072 | GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); | 1917 | GNUNET_STATISTICS_destroy (_GSS_statistics, |
1918 | GNUNET_YES); | ||
2073 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1919 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2074 | "handled shutdown request\n"); | 1920 | "handled shutdown request\n"); |
2075 | } | 1921 | } |
@@ -2088,15 +1934,19 @@ run (void *cls, | |||
2088 | const struct GNUNET_CONFIGURATION_Handle *cfg, | 1934 | const struct GNUNET_CONFIGURATION_Handle *cfg, |
2089 | struct GNUNET_SERVICE_Handle *service) | 1935 | struct GNUNET_SERVICE_Handle *service) |
2090 | { | 1936 | { |
2091 | configuration = cfg; | 1937 | /* FIXME: need to modify SERVICE (!) API to allow |
1938 | us to run a shutdown task *after* clients were | ||
1939 | forcefully disconnected! */ | ||
2092 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, | 1940 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, |
2093 | NULL); | 1941 | NULL); |
2094 | _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); | 1942 | _GSS_statistics = GNUNET_STATISTICS_create ("set", |
2095 | cadet = GNUNET_CADET_connecT (cfg); | 1943 | cfg); |
1944 | cadet = GNUNET_CADET_connect (cfg); | ||
2096 | if (NULL == cadet) | 1945 | if (NULL == cadet) |
2097 | { | 1946 | { |
2098 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 1947 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
2099 | _("Could not connect to CADET service\n")); | 1948 | _("Could not connect to CADET service\n")); |
1949 | GNUNET_SCHEDULER_shutdown (); | ||
2100 | return; | 1950 | return; |
2101 | } | 1951 | } |
2102 | } | 1952 | } |
@@ -2122,7 +1972,7 @@ GNUNET_SERVICE_MAIN | |||
2122 | NULL), | 1972 | NULL), |
2123 | GNUNET_MQ_hd_var_size (client_mutation, | 1973 | GNUNET_MQ_hd_var_size (client_mutation, |
2124 | GNUNET_MESSAGE_TYPE_SET_ADD, | 1974 | GNUNET_MESSAGE_TYPE_SET_ADD, |
2125 | struct GNUNET_MessageHeader, | 1975 | struct GNUNET_SET_ElementMessage, |
2126 | NULL), | 1976 | NULL), |
2127 | GNUNET_MQ_hd_fixed_size (client_create_set, | 1977 | GNUNET_MQ_hd_fixed_size (client_create_set, |
2128 | GNUNET_MESSAGE_TYPE_SET_CREATE, | 1978 | GNUNET_MESSAGE_TYPE_SET_CREATE, |
@@ -2146,7 +1996,7 @@ GNUNET_SERVICE_MAIN | |||
2146 | NULL), | 1996 | NULL), |
2147 | GNUNET_MQ_hd_var_size (client_mutation, | 1997 | GNUNET_MQ_hd_var_size (client_mutation, |
2148 | GNUNET_MESSAGE_TYPE_SET_REMOVE, | 1998 | GNUNET_MESSAGE_TYPE_SET_REMOVE, |
2149 | struct GNUNET_MessageHeader, | 1999 | struct GNUNET_SET_ElementMessage, |
2150 | NULL), | 2000 | NULL), |
2151 | GNUNET_MQ_hd_fixed_size (client_cancel, | 2001 | GNUNET_MQ_hd_fixed_size (client_cancel, |
2152 | GNUNET_MESSAGE_TYPE_SET_CANCEL, | 2002 | GNUNET_MESSAGE_TYPE_SET_CANCEL, |