diff options
author | Christian Grothoff <christian@grothoff.org> | 2014-11-27 13:31:52 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2014-11-27 13:31:52 +0000 |
commit | 1d96a7f8dc2aa6311eae76e60a92eb2a2b397fe2 (patch) | |
tree | 309d6241bf67a6a549071aa0ec3d9af82d030d7b /src/set/set_api.c | |
parent | ce9b32618b6ee488352ef0eb506c744868145f82 (diff) | |
download | gnunet-1d96a7f8dc2aa6311eae76e60a92eb2a2b397fe2.tar.gz gnunet-1d96a7f8dc2aa6311eae76e60a92eb2a2b397fe2.zip |
use and respect send_more field in IterAckMessage
Diffstat (limited to 'src/set/set_api.c')
-rw-r--r-- | src/set/set_api.c | 463 |
1 files changed, 256 insertions, 207 deletions
diff --git a/src/set/set_api.c b/src/set/set_api.c index 90cba446c..d62475013 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | (C) 2012, 2013 Christian Grothoff (and other contributing authors) | 3 | (C) 2012-2014 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -17,7 +17,6 @@ | |||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | 17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | |||
21 | /** | 20 | /** |
22 | * @file set/set_api.c | 21 | * @file set/set_api.c |
23 | * @brief api for the set service | 22 | * @brief api for the set service |
@@ -44,7 +43,7 @@ struct GNUNET_SET_Handle | |||
44 | struct GNUNET_CLIENT_Connection *client; | 43 | struct GNUNET_CLIENT_Connection *client; |
45 | 44 | ||
46 | /** | 45 | /** |
47 | * Message queue for 'client'. | 46 | * Message queue for @e client. |
48 | */ | 47 | */ |
49 | struct GNUNET_MQ_Handle *mq; | 48 | struct GNUNET_MQ_Handle *mq; |
50 | 49 | ||
@@ -59,30 +58,30 @@ struct GNUNET_SET_Handle | |||
59 | struct GNUNET_SET_OperationHandle *ops_tail; | 58 | struct GNUNET_SET_OperationHandle *ops_tail; |
60 | 59 | ||
61 | /** | 60 | /** |
62 | * Should the set be destroyed once all operations are gone? | 61 | * Callback for the current iteration over the set, |
62 | * NULL if no iterator is active. | ||
63 | */ | 63 | */ |
64 | int destroy_requested; | 64 | GNUNET_SET_ElementIterator iterator; |
65 | 65 | ||
66 | /** | 66 | /** |
67 | * Has the set become invalid (e.g. service died)? | 67 | * Closure for @e iterator |
68 | */ | 68 | */ |
69 | int invalid; | 69 | void *iterator_cls; |
70 | 70 | ||
71 | /** | 71 | /** |
72 | * Callback for the current iteration over the set, | 72 | * Should the set be destroyed once all operations are gone? |
73 | * NULL if no iterator is active. | ||
74 | */ | 73 | */ |
75 | GNUNET_SET_ElementIterator iterator; | 74 | int destroy_requested; |
76 | 75 | ||
77 | /** | 76 | /** |
78 | * Closure for 'iterator' | 77 | * Has the set become invalid (e.g. service died)? |
79 | */ | 78 | */ |
80 | void *iterator_cls; | 79 | int invalid; |
81 | }; | 80 | }; |
82 | 81 | ||
83 | 82 | ||
84 | /** | 83 | /** |
85 | * Opaque handle to a set operation request from another peer. | 84 | * Handle for a set operation request from another peer. |
86 | */ | 85 | */ |
87 | struct GNUNET_SET_Request | 86 | struct GNUNET_SET_Request |
88 | { | 87 | { |
@@ -94,15 +93,14 @@ struct GNUNET_SET_Request | |||
94 | 93 | ||
95 | /** | 94 | /** |
96 | * Has the request been accepted already? | 95 | * Has the request been accepted already? |
97 | * GNUNET_YES/GNUNET_NO | 96 | * #GNUNET_YES/#GNUNET_NO |
98 | */ | 97 | */ |
99 | int accepted; | 98 | int accepted; |
100 | }; | 99 | }; |
101 | 100 | ||
102 | 101 | ||
103 | /** | 102 | /** |
104 | * Handle to an operation. | 103 | * Handle to an operation. Only known to the service after committing |
105 | * Only known to the service after commiting | ||
106 | * the handle with a set. | 104 | * the handle with a set. |
107 | */ | 105 | */ |
108 | struct GNUNET_SET_OperationHandle | 106 | struct GNUNET_SET_OperationHandle |
@@ -114,7 +112,7 @@ struct GNUNET_SET_OperationHandle | |||
114 | GNUNET_SET_ResultIterator result_cb; | 112 | GNUNET_SET_ResultIterator result_cb; |
115 | 113 | ||
116 | /** | 114 | /** |
117 | * Closure for result_cb. | 115 | * Closure for @e result_cb. |
118 | */ | 116 | */ |
119 | void *result_cls; | 117 | void *result_cls; |
120 | 118 | ||
@@ -125,11 +123,6 @@ struct GNUNET_SET_OperationHandle | |||
125 | struct GNUNET_SET_Handle *set; | 123 | struct GNUNET_SET_Handle *set; |
126 | 124 | ||
127 | /** | 125 | /** |
128 | * Request ID to identify the operation within the set. | ||
129 | */ | ||
130 | uint32_t request_id; | ||
131 | |||
132 | /** | ||
133 | * Message sent to the server on calling conclude, | 126 | * Message sent to the server on calling conclude, |
134 | * NULL if conclude has been called. | 127 | * NULL if conclude has been called. |
135 | */ | 128 | */ |
@@ -150,6 +143,11 @@ struct GNUNET_SET_OperationHandle | |||
150 | * Handles are kept in a linked list. | 143 | * Handles are kept in a linked list. |
151 | */ | 144 | */ |
152 | struct GNUNET_SET_OperationHandle *next; | 145 | struct GNUNET_SET_OperationHandle *next; |
146 | |||
147 | /** | ||
148 | * Request ID to identify the operation within the set. | ||
149 | */ | ||
150 | uint32_t request_id; | ||
153 | }; | 151 | }; |
154 | 152 | ||
155 | 153 | ||
@@ -182,16 +180,11 @@ struct GNUNET_SET_ListenHandle | |||
182 | GNUNET_SET_ListenCallback listen_cb; | 180 | GNUNET_SET_ListenCallback listen_cb; |
183 | 181 | ||
184 | /** | 182 | /** |
185 | * Closure for listen_cb. | 183 | * Closure for @e listen_cb. |
186 | */ | 184 | */ |
187 | void *listen_cls; | 185 | void *listen_cls; |
188 | 186 | ||
189 | /** | 187 | /** |
190 | * Operation we listen for. | ||
191 | */ | ||
192 | enum GNUNET_SET_OperationType operation; | ||
193 | |||
194 | /** | ||
195 | * Application ID we listen for. | 188 | * Application ID we listen for. |
196 | */ | 189 | */ |
197 | struct GNUNET_HashCode app_id; | 190 | struct GNUNET_HashCode app_id; |
@@ -205,59 +198,78 @@ struct GNUNET_SET_ListenHandle | |||
205 | * Task for reconnecting when the listener fails. | 198 | * Task for reconnecting when the listener fails. |
206 | */ | 199 | */ |
207 | GNUNET_SCHEDULER_TaskIdentifier reconnect_task; | 200 | GNUNET_SCHEDULER_TaskIdentifier reconnect_task; |
208 | }; | ||
209 | 201 | ||
210 | 202 | /** | |
211 | /* forward declaration */ | 203 | * Operation we listen for. |
212 | static void | 204 | */ |
213 | listen_connect (void *cls, | 205 | enum GNUNET_SET_OperationType operation; |
214 | const struct GNUNET_SCHEDULER_TaskContext *tc); | 206 | }; |
215 | 207 | ||
216 | 208 | ||
217 | /** | 209 | /** |
218 | * Handle element for iteration over the set. | 210 | * Handle element for iteration over the set. Notifies the |
211 | * iterator and sends an acknowledgement to the service. | ||
219 | * | 212 | * |
220 | * @param cls the set | 213 | * @param cls the set |
221 | * @param mh the message | 214 | * @param mh the message |
222 | */ | 215 | */ |
223 | static void | 216 | static void |
224 | handle_iter_element (void *cls, const struct GNUNET_MessageHeader *mh) | 217 | handle_iter_element (void *cls, |
218 | const struct GNUNET_MessageHeader *mh) | ||
225 | { | 219 | { |
226 | struct GNUNET_SET_Handle *set = cls; | 220 | struct GNUNET_SET_Handle *set = cls; |
221 | GNUNET_SET_ElementIterator iter = set->iterator; | ||
227 | struct GNUNET_SET_Element element; | 222 | struct GNUNET_SET_Element element; |
228 | const struct GNUNET_SET_IterResponseMessage *msg = | 223 | const struct GNUNET_SET_IterResponseMessage *msg; |
229 | (const struct GNUNET_SET_IterResponseMessage *) mh; | ||
230 | struct GNUNET_SET_IterAckMessage *ack_msg; | 224 | struct GNUNET_SET_IterAckMessage *ack_msg; |
231 | struct GNUNET_MQ_Envelope *ev; | 225 | struct GNUNET_MQ_Envelope *ev; |
226 | uint16_t msize; | ||
232 | 227 | ||
233 | if (NULL == set->iterator) | 228 | msize = ntohs (mh->size); |
234 | return; | 229 | if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage))) |
235 | 230 | { | |
236 | element.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_IterResponseMessage); | 231 | /* message malformed */ |
237 | element.element_type = htons (msg->element_type); | 232 | GNUNET_break (0); |
238 | element.data = &msg[1]; | 233 | set->iterator = NULL; |
239 | set->iterator (set->iterator_cls, &element); | 234 | iter (set->iterator_cls, |
240 | ev = GNUNET_MQ_msg (ack_msg, GNUNET_MESSAGE_TYPE_SET_ITER_ACK); | 235 | NULL); |
241 | ack_msg->send_more = htonl (1); | 236 | iter = NULL; |
237 | } | ||
238 | if (NULL != iter) | ||
239 | { | ||
240 | msg = (const struct GNUNET_SET_IterResponseMessage *) mh; | ||
241 | element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage); | ||
242 | element.element_type = htons (msg->element_type); | ||
243 | element.data = &msg[1]; | ||
244 | iter (set->iterator_cls, | ||
245 | &element); | ||
246 | } | ||
247 | ev = GNUNET_MQ_msg (ack_msg, | ||
248 | GNUNET_MESSAGE_TYPE_SET_ITER_ACK); | ||
249 | ack_msg->send_more = htonl ((NULL != iter)); | ||
242 | GNUNET_MQ_send (set->mq, ev); | 250 | GNUNET_MQ_send (set->mq, ev); |
243 | } | 251 | } |
244 | 252 | ||
245 | 253 | ||
246 | /** | 254 | /** |
247 | * Handle element for iteration over the set. | 255 | * Handle message signalling conclusion of iteration over the set. |
256 | * Notifies the iterator that we are done. | ||
248 | * | 257 | * |
249 | * @param cls the set | 258 | * @param cls the set |
250 | * @param mh the message | 259 | * @param mh the message |
251 | */ | 260 | */ |
252 | static void | 261 | static void |
253 | handle_iter_done (void *cls, const struct GNUNET_MessageHeader *mh) | 262 | handle_iter_done (void *cls, |
263 | const struct GNUNET_MessageHeader *mh) | ||
254 | { | 264 | { |
255 | struct GNUNET_SET_Handle *set = cls; | 265 | struct GNUNET_SET_Handle *set = cls; |
266 | GNUNET_SET_ElementIterator iter = set->iterator; | ||
256 | 267 | ||
257 | if (NULL == set->iterator) | 268 | if (NULL == iter) |
258 | return; | 269 | return; |
259 | 270 | set->iterator = NULL; | |
260 | set->iterator (set->iterator_cls, NULL); | 271 | iter (set->iterator_cls, |
272 | NULL); | ||
261 | } | 273 | } |
262 | 274 | ||
263 | 275 | ||
@@ -268,47 +280,53 @@ handle_iter_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
268 | * @param mh the message | 280 | * @param mh the message |
269 | */ | 281 | */ |
270 | static void | 282 | static void |
271 | handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | 283 | handle_result (void *cls, |
284 | const struct GNUNET_MessageHeader *mh) | ||
272 | { | 285 | { |
273 | const struct GNUNET_SET_ResultMessage *msg; | ||
274 | struct GNUNET_SET_Handle *set = cls; | 286 | struct GNUNET_SET_Handle *set = cls; |
287 | const struct GNUNET_SET_ResultMessage *msg; | ||
275 | struct GNUNET_SET_OperationHandle *oh; | 288 | struct GNUNET_SET_OperationHandle *oh; |
276 | struct GNUNET_SET_Element e; | 289 | struct GNUNET_SET_Element e; |
277 | enum GNUNET_SET_Status result_status; | 290 | enum GNUNET_SET_Status result_status; |
278 | 291 | ||
279 | msg = (const struct GNUNET_SET_ResultMessage *) mh; | 292 | msg = (const struct GNUNET_SET_ResultMessage *) mh; |
280 | GNUNET_assert (NULL != set); | ||
281 | GNUNET_assert (NULL != set->mq); | 293 | GNUNET_assert (NULL != set->mq); |
282 | |||
283 | result_status = ntohs (msg->result_status); | 294 | result_status = ntohs (msg->result_status); |
284 | 295 | oh = GNUNET_MQ_assoc_get (set->mq, | |
285 | oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); | 296 | ntohl (msg->request_id)); |
286 | // 'oh' can be NULL if we canceled the operation, but the service | ||
287 | // did not get the cancel message yet. | ||
288 | if (NULL == oh) | 297 | if (NULL == oh) |
289 | { | 298 | { |
290 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ignoring result from canceled operation\n"); | 299 | /* 'oh' can be NULL if we canceled the operation, but the service |
300 | did not get the cancel message yet. */ | ||
301 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
302 | "Ignoring result from canceled operation\n"); | ||
291 | return; | 303 | return; |
292 | } | 304 | } |
293 | /* status is not STATUS_OK => there's no attached element, | ||
294 | * and this is the last result message we get */ | ||
295 | if (GNUNET_SET_STATUS_OK != result_status) | 305 | if (GNUNET_SET_STATUS_OK != result_status) |
296 | { | 306 | { |
307 | /* status is not STATUS_OK => there's no attached element, | ||
308 | * and this is the last result message we get */ | ||
297 | GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); | 309 | GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); |
298 | GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh); | 310 | GNUNET_CONTAINER_DLL_remove (set->ops_head, |
299 | if (GNUNET_YES == oh->set->destroy_requested) | 311 | set->ops_tail, |
300 | GNUNET_SET_destroy (oh->set); | 312 | oh); |
313 | if ( (GNUNET_YES == set->destroy_requested) && | ||
314 | (NULL == set->ops_head) ) | ||
315 | GNUNET_SET_destroy (set); | ||
301 | if (NULL != oh->result_cb) | 316 | if (NULL != oh->result_cb) |
302 | oh->result_cb (oh->result_cls, NULL, result_status); | 317 | oh->result_cb (oh->result_cls, |
318 | NULL, | ||
319 | result_status); | ||
303 | GNUNET_free (oh); | 320 | GNUNET_free (oh); |
304 | return; | 321 | return; |
305 | } | 322 | } |
306 | |||
307 | e.data = &msg[1]; | 323 | e.data = &msg[1]; |
308 | e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); | 324 | e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); |
309 | e.element_type = msg->element_type; | 325 | e.element_type = msg->element_type; |
310 | if (NULL != oh->result_cb) | 326 | if (NULL != oh->result_cb) |
311 | oh->result_cb (oh->result_cls, &e, result_status); | 327 | oh->result_cb (oh->result_cls, |
328 | &e, | ||
329 | result_status); | ||
312 | } | 330 | } |
313 | 331 | ||
314 | 332 | ||
@@ -360,93 +378,30 @@ handle_request (void *cls, | |||
360 | } | 378 | } |
361 | 379 | ||
362 | 380 | ||
363 | static void | ||
364 | handle_client_listener_error (void *cls, | ||
365 | enum GNUNET_MQ_Error error) | ||
366 | { | ||
367 | struct GNUNET_SET_ListenHandle *lh = cls; | ||
368 | |||
369 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
370 | "listener broke down, re-connecting\n"); | ||
371 | GNUNET_CLIENT_disconnect (lh->client); | ||
372 | lh->client = NULL; | ||
373 | GNUNET_MQ_destroy (lh->mq); | ||
374 | lh->mq = NULL; | ||
375 | lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, | ||
376 | &listen_connect, lh); | ||
377 | lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff); | ||
378 | } | ||
379 | |||
380 | |||
381 | /** | 381 | /** |
382 | * Destroy the set handle if no operations are left, mark the set | 382 | * Destroy the given set operation. |
383 | * for destruction otherwise. | ||
384 | * | 383 | * |
385 | * @param set set handle to destroy | 384 | * @param oh set operation to destroy |
386 | */ | 385 | */ |
387 | static int | 386 | static void |
388 | set_destroy (struct GNUNET_SET_Handle *set) | 387 | set_operation_destroy (struct GNUNET_SET_OperationHandle *oh) |
389 | { | ||
390 | if (NULL != set->ops_head) | ||
391 | { | ||
392 | set->destroy_requested = GNUNET_YES; | ||
393 | return GNUNET_NO; | ||
394 | } | ||
395 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Really destroying set\n"); | ||
396 | GNUNET_CLIENT_disconnect (set->client); | ||
397 | set->client = NULL; | ||
398 | GNUNET_MQ_destroy (set->mq); | ||
399 | set->mq = NULL; | ||
400 | GNUNET_free (set); | ||
401 | return GNUNET_YES; | ||
402 | } | ||
403 | |||
404 | |||
405 | /** | ||
406 | * Cancel the given set operation. We need to send an explicit cancel message, | ||
407 | * as all operations one one set communicate using one handle. | ||
408 | * | ||
409 | * In contrast to #GNUNET_SET_operation_cancel(), this function indicates whether | ||
410 | * the set of the operation has been destroyed because all operations are done and | ||
411 | * the set's destruction was requested before. | ||
412 | * | ||
413 | * @param oh set operation to cancel | ||
414 | * @return #GNUNET_YES if the set of the operation was destroyed | ||
415 | */ | ||
416 | static int | ||
417 | set_operation_cancel (struct GNUNET_SET_OperationHandle *oh) | ||
418 | { | 388 | { |
419 | int ret = GNUNET_NO; | 389 | struct GNUNET_SET_Handle *set = oh->set; |
390 | struct GNUNET_SET_OperationHandle *h_assoc; | ||
420 | 391 | ||
421 | if (NULL != oh->conclude_mqm) | 392 | if (NULL != oh->conclude_mqm) |
422 | GNUNET_MQ_discard (oh->conclude_mqm); | 393 | GNUNET_MQ_discard (oh->conclude_mqm); |
423 | |||
424 | /* is the operation already commited? */ | 394 | /* is the operation already commited? */ |
425 | if (NULL != oh->set) | 395 | if (NULL != set) |
426 | { | 396 | { |
427 | struct GNUNET_SET_OperationHandle *h_assoc; | 397 | GNUNET_CONTAINER_DLL_remove (set->ops_head, |
428 | struct GNUNET_SET_CancelMessage *m; | 398 | set->ops_tail, |
429 | struct GNUNET_MQ_Envelope *mqm; | ||
430 | |||
431 | GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, | ||
432 | oh->set->ops_tail, | ||
433 | oh); | 399 | oh); |
434 | h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, | 400 | h_assoc = GNUNET_MQ_assoc_remove (set->mq, |
435 | oh->request_id); | 401 | oh->request_id); |
436 | GNUNET_assert ((h_assoc == NULL) || (h_assoc == oh)); | 402 | GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh)); |
437 | mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL); | ||
438 | m->request_id = htonl (oh->request_id); | ||
439 | GNUNET_MQ_send (oh->set->mq, mqm); | ||
440 | |||
441 | if (GNUNET_YES == oh->set->destroy_requested) | ||
442 | { | ||
443 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
444 | "Destroying set after operation cancel\n"); | ||
445 | ret = set_destroy (oh->set); | ||
446 | } | ||
447 | } | 403 | } |
448 | GNUNET_free (oh); | 404 | GNUNET_free (oh); |
449 | return ret; | ||
450 | } | 405 | } |
451 | 406 | ||
452 | 407 | ||
@@ -460,27 +415,58 @@ set_operation_cancel (struct GNUNET_SET_OperationHandle *oh) | |||
460 | void | 415 | void |
461 | GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) | 416 | GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) |
462 | { | 417 | { |
463 | (void) set_operation_cancel (oh); | 418 | struct GNUNET_SET_Handle *set = oh->set; |
419 | struct GNUNET_SET_CancelMessage *m; | ||
420 | struct GNUNET_MQ_Envelope *mqm; | ||
421 | |||
422 | if (NULL != set) | ||
423 | { | ||
424 | mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL); | ||
425 | m->request_id = htonl (oh->request_id); | ||
426 | GNUNET_MQ_send (set->mq, mqm); | ||
427 | } | ||
428 | set_operation_destroy (oh); | ||
429 | if ( (NULL != set) && | ||
430 | (GNUNET_YES == set->destroy_requested) && | ||
431 | (NULL == set->ops_head) ) | ||
432 | { | ||
433 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
434 | "Destroying set after operation cancel\n"); | ||
435 | GNUNET_SET_destroy (set); | ||
436 | } | ||
464 | } | 437 | } |
465 | 438 | ||
466 | 439 | ||
440 | /** | ||
441 | * We encountered an error communicating with the set service while | ||
442 | * performing a set operation. Report to the application. | ||
443 | * | ||
444 | * @param cls the `struct GNUNET_SET_Handle` | ||
445 | * @param error error code | ||
446 | */ | ||
467 | static void | 447 | static void |
468 | handle_client_set_error (void *cls, enum GNUNET_MQ_Error error) | 448 | handle_client_set_error (void *cls, |
449 | enum GNUNET_MQ_Error error) | ||
469 | { | 450 | { |
470 | struct GNUNET_SET_Handle *set = cls; | 451 | struct GNUNET_SET_Handle *set = cls; |
471 | 452 | ||
472 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 453 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
473 | "handling client set error\n"); | 454 | "Handling client set error\n"); |
474 | |||
475 | while (NULL != set->ops_head) | 455 | while (NULL != set->ops_head) |
476 | { | 456 | { |
477 | if (NULL != set->ops_head->result_cb) | 457 | if (NULL != set->ops_head->result_cb) |
478 | set->ops_head->result_cb (set->ops_head->result_cls, NULL, | 458 | set->ops_head->result_cb (set->ops_head->result_cls, |
459 | NULL, | ||
479 | GNUNET_SET_STATUS_FAILURE); | 460 | GNUNET_SET_STATUS_FAILURE); |
480 | if (GNUNET_YES == set_operation_cancel (set->ops_head)) | 461 | set_operation_destroy (set->ops_head); |
481 | return; /* stop if the set is destroyed */ | ||
482 | } | 462 | } |
483 | set->invalid = GNUNET_YES; | 463 | set->invalid = GNUNET_YES; |
464 | if (GNUNET_YES == set->destroy_requested) | ||
465 | { | ||
466 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
467 | "Destroying set after operation failure\n"); | ||
468 | GNUNET_SET_destroy (set); | ||
469 | } | ||
484 | } | 470 | } |
485 | 471 | ||
486 | 472 | ||
@@ -500,9 +486,11 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
500 | enum GNUNET_SET_OperationType op) | 486 | enum GNUNET_SET_OperationType op) |
501 | { | 487 | { |
502 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | 488 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
503 | {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0}, | 489 | { &handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0}, |
504 | {handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0}, | 490 | { &handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0}, |
505 | {handle_iter_done, GNUNET_MESSAGE_TYPE_SET_ITER_DONE, 0}, | 491 | { &handle_iter_done, |
492 | GNUNET_MESSAGE_TYPE_SET_ITER_DONE, | ||
493 | sizeof (struct GNUNET_MessageHeader) }, | ||
506 | GNUNET_MQ_HANDLERS_END | 494 | GNUNET_MQ_HANDLERS_END |
507 | }; | 495 | }; |
508 | struct GNUNET_SET_Handle *set; | 496 | struct GNUNET_SET_Handle *set; |
@@ -511,12 +499,17 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
511 | 499 | ||
512 | set = GNUNET_new (struct GNUNET_SET_Handle); | 500 | set = GNUNET_new (struct GNUNET_SET_Handle); |
513 | set->client = GNUNET_CLIENT_connect ("set", cfg); | 501 | set->client = GNUNET_CLIENT_connect ("set", cfg); |
514 | LOG (GNUNET_ERROR_TYPE_DEBUG, "set client created\n"); | 502 | if (NULL == set->client) |
515 | GNUNET_assert (NULL != set->client); | 503 | { |
516 | set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, | 504 | GNUNET_free (set); |
517 | handle_client_set_error, set); | 505 | return NULL; |
506 | } | ||
507 | set->mq = GNUNET_MQ_queue_for_connection_client (set->client, | ||
508 | mq_handlers, | ||
509 | &handle_client_set_error, set); | ||
518 | GNUNET_assert (NULL != set->mq); | 510 | GNUNET_assert (NULL != set->mq); |
519 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); | 511 | mqm = GNUNET_MQ_msg (msg, |
512 | GNUNET_MESSAGE_TYPE_SET_CREATE); | ||
520 | msg->operation = htonl (op); | 513 | msg->operation = htonl (op); |
521 | GNUNET_MQ_send (set->mq, mqm); | 514 | GNUNET_MQ_send (set->mq, mqm); |
522 | return set; | 515 | return set; |
@@ -524,10 +517,10 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
524 | 517 | ||
525 | 518 | ||
526 | /** | 519 | /** |
527 | * Add an element to the given set. | 520 | * Add an element to the given set. After the element has been added |
528 | * After the element has been added (in the sense of being | 521 | * (in the sense of being transmitted to the set service), @a cont |
529 | * transmitted to the set service), cont will be called. | 522 | * will be called. Multiple calls to GNUNET_SET_add_element() can be |
530 | * Calls to add_element can be queued | 523 | * queued. |
531 | * | 524 | * |
532 | * @param set set to add element to | 525 | * @param set set to add element to |
533 | * @param element element to add to the set | 526 | * @param element element to add to the set |
@@ -551,21 +544,24 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set, | |||
551 | cont (cont_cls); | 544 | cont (cont_cls); |
552 | return GNUNET_SYSERR; | 545 | return GNUNET_SYSERR; |
553 | } | 546 | } |
554 | 547 | mqm = GNUNET_MQ_msg_extra (msg, element->size, | |
555 | mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); | 548 | GNUNET_MESSAGE_TYPE_SET_ADD); |
556 | msg->element_type = element->element_type; | 549 | msg->element_type = element->element_type; |
557 | memcpy (&msg[1], element->data, element->size); | 550 | memcpy (&msg[1], |
558 | GNUNET_MQ_notify_sent (mqm, cont, cont_cls); | 551 | element->data, |
552 | element->size); | ||
553 | GNUNET_MQ_notify_sent (mqm, | ||
554 | cont, cont_cls); | ||
559 | GNUNET_MQ_send (set->mq, mqm); | 555 | GNUNET_MQ_send (set->mq, mqm); |
560 | return GNUNET_OK; | 556 | return GNUNET_OK; |
561 | } | 557 | } |
562 | 558 | ||
563 | 559 | ||
564 | /** | 560 | /** |
565 | * Remove an element to the given set. | 561 | * Remove an element to the given set. After the element has been |
566 | * After the element has been removed (in the sense of the | 562 | * removed (in the sense of the request being transmitted to the set |
567 | * request being transmitted to the set service), cont will be called. | 563 | * service), @a cont will be called. Multiple calls to |
568 | * Calls to remove_element can be queued | 564 | * GNUNET_SET_remove_element() can be queued |
569 | * | 565 | * |
570 | * @param set set to remove element from | 566 | * @param set set to remove element from |
571 | * @param element element to remove from the set | 567 | * @param element element to remove from the set |
@@ -589,27 +585,49 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set, | |||
589 | cont (cont_cls); | 585 | cont (cont_cls); |
590 | return GNUNET_SYSERR; | 586 | return GNUNET_SYSERR; |
591 | } | 587 | } |
592 | |||
593 | mqm = GNUNET_MQ_msg_extra (msg, | 588 | mqm = GNUNET_MQ_msg_extra (msg, |
594 | element->size, | 589 | element->size, |
595 | GNUNET_MESSAGE_TYPE_SET_REMOVE); | 590 | GNUNET_MESSAGE_TYPE_SET_REMOVE); |
596 | msg->element_type = element->element_type; | 591 | msg->element_type = element->element_type; |
597 | memcpy (&msg[1], element->data, element->size); | 592 | memcpy (&msg[1], |
598 | GNUNET_MQ_notify_sent (mqm, cont, cont_cls); | 593 | element->data, |
594 | element->size); | ||
595 | GNUNET_MQ_notify_sent (mqm, | ||
596 | cont, cont_cls); | ||
599 | GNUNET_MQ_send (set->mq, mqm); | 597 | GNUNET_MQ_send (set->mq, mqm); |
600 | return GNUNET_OK; | 598 | return GNUNET_OK; |
601 | } | 599 | } |
602 | 600 | ||
603 | 601 | ||
604 | /** | 602 | /** |
605 | * Destroy the set handle, and free all associated resources. | 603 | * Destroy the set handle if no operations are left, mark the set |
604 | * for destruction otherwise. | ||
606 | * | 605 | * |
607 | * @param set set handle to destroy | 606 | * @param set set handle to destroy |
608 | */ | 607 | */ |
609 | void | 608 | void |
610 | GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) | 609 | GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) |
611 | { | 610 | { |
612 | (void) set_destroy (set); | 611 | if (NULL != set->ops_head) |
612 | { | ||
613 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
614 | "Set operations are pending, delaying set destruction\n"); | ||
615 | set->destroy_requested = GNUNET_YES; | ||
616 | return; | ||
617 | } | ||
618 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
619 | "Really destroying set\n"); | ||
620 | if (NULL != set->client) | ||
621 | { | ||
622 | GNUNET_CLIENT_disconnect (set->client); | ||
623 | set->client = NULL; | ||
624 | } | ||
625 | if (NULL != set->mq) | ||
626 | { | ||
627 | GNUNET_MQ_destroy (set->mq); | ||
628 | set->mq = NULL; | ||
629 | } | ||
630 | GNUNET_free (set); | ||
613 | } | 631 | } |
614 | 632 | ||
615 | 633 | ||
@@ -656,43 +674,76 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, | |||
656 | 674 | ||
657 | 675 | ||
658 | /** | 676 | /** |
659 | * Connect to the set service in order to listen | 677 | * Connect to the set service in order to listen for requests. |
660 | * for request. | ||
661 | * | 678 | * |
662 | * @param cls the listen handle to connect | 679 | * @param cls the `struct GNUNET_SET_ListenHandle *` to connect |
663 | * @param tc task context if invoked as a task, NULL otherwise | 680 | * @param tc task context if invoked as a task, NULL otherwise |
664 | */ | 681 | */ |
665 | static void | 682 | static void |
666 | listen_connect (void *cls, | 683 | listen_connect (void *cls, |
667 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 684 | const struct GNUNET_SCHEDULER_TaskContext *tc); |
685 | |||
686 | |||
687 | /** | ||
688 | * Our connection with the set service encountered an error, | ||
689 | * re-initialize with exponential back-off. | ||
690 | * | ||
691 | * @param cls the `struct GNUNET_SET_ListenHandle *` | ||
692 | * @param error reason for the disconnect | ||
693 | */ | ||
694 | static void | ||
695 | handle_client_listener_error (void *cls, | ||
696 | enum GNUNET_MQ_Error error) | ||
668 | { | 697 | { |
669 | struct GNUNET_MQ_Envelope *mqm; | ||
670 | struct GNUNET_SET_ListenMessage *msg; | ||
671 | struct GNUNET_SET_ListenHandle *lh = cls; | 698 | struct GNUNET_SET_ListenHandle *lh = cls; |
699 | |||
700 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
701 | "Listener broke down (%d), re-connecting\n", | ||
702 | (int) error); | ||
703 | GNUNET_CLIENT_disconnect (lh->client); | ||
704 | lh->client = NULL; | ||
705 | GNUNET_MQ_destroy (lh->mq); | ||
706 | lh->mq = NULL; | ||
707 | lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, | ||
708 | &listen_connect, lh); | ||
709 | lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff); | ||
710 | } | ||
711 | |||
712 | |||
713 | /** | ||
714 | * Connect to the set service in order to listen for requests. | ||
715 | * | ||
716 | * @param cls the `struct GNUNET_SET_ListenHandle *` to connect | ||
717 | * @param tc task context if invoked as a task, NULL otherwise | ||
718 | */ | ||
719 | static void | ||
720 | listen_connect (void *cls, | ||
721 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
722 | { | ||
672 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | 723 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
673 | {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, | 724 | { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST }, |
674 | GNUNET_MQ_HANDLERS_END | 725 | GNUNET_MQ_HANDLERS_END |
675 | }; | 726 | }; |
727 | struct GNUNET_SET_ListenHandle *lh = cls; | ||
728 | struct GNUNET_MQ_Envelope *mqm; | ||
729 | struct GNUNET_SET_ListenMessage *msg; | ||
676 | 730 | ||
677 | if ((tc != NULL) &&(tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) | 731 | if ( (NULL != tc) && |
732 | (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) ) | ||
678 | { | 733 | { |
679 | LOG (GNUNET_ERROR_TYPE_DEBUG, "listener not reconnecting due to shutdown\n"); | 734 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
735 | "Listener not reconnecting due to shutdown\n"); | ||
680 | return; | 736 | return; |
681 | } | 737 | } |
682 | |||
683 | lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | 738 | lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK; |
684 | 739 | ||
685 | GNUNET_assert (NULL == lh->client); | 740 | GNUNET_assert (NULL == lh->client); |
686 | lh->client = GNUNET_CLIENT_connect ("set", lh->cfg); | 741 | lh->client = GNUNET_CLIENT_connect ("set", lh->cfg); |
687 | if (NULL == lh->client) | 742 | if (NULL == lh->client) |
688 | { | ||
689 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
690 | "could not connect to set (wrong configuration?), giving up listening\n"); | ||
691 | return; | 743 | return; |
692 | } | ||
693 | GNUNET_assert (NULL == lh->mq); | 744 | GNUNET_assert (NULL == lh->mq); |
694 | lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, | 745 | lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, |
695 | handle_client_listener_error, lh); | 746 | &handle_client_listener_error, lh); |
696 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); | 747 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); |
697 | msg->operation = htonl (lh->operation); | 748 | msg->operation = htonl (lh->operation); |
698 | msg->app_id = lh->app_id; | 749 | msg->app_id = lh->app_id; |
@@ -709,7 +760,7 @@ listen_connect (void *cls, | |||
709 | * @param app_id id of the application that handles set operation requests | 760 | * @param app_id id of the application that handles set operation requests |
710 | * @param listen_cb called for each incoming request matching the operation | 761 | * @param listen_cb called for each incoming request matching the operation |
711 | * and application id | 762 | * and application id |
712 | * @param listen_cls handle for listen_cb | 763 | * @param listen_cls handle for @a listen_cb |
713 | * @return a handle that can be used to cancel the listen operation | 764 | * @return a handle that can be used to cancel the listen operation |
714 | */ | 765 | */ |
715 | struct GNUNET_SET_ListenHandle * | 766 | struct GNUNET_SET_ListenHandle * |
@@ -729,6 +780,11 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
729 | lh->app_id = *app_id; | 780 | lh->app_id = *app_id; |
730 | lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 781 | lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; |
731 | listen_connect (lh, NULL); | 782 | listen_connect (lh, NULL); |
783 | if (NULL == lh->client) | ||
784 | { | ||
785 | GNUNET_free (lh); | ||
786 | return NULL; | ||
787 | } | ||
732 | return lh; | 788 | return lh; |
733 | } | 789 | } |
734 | 790 | ||
@@ -741,8 +797,8 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
741 | void | 797 | void |
742 | GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) | 798 | GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) |
743 | { | 799 | { |
744 | LOG (GNUNET_ERROR_TYPE_DEBUG, "canceling listener\n"); | 800 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
745 | /* listener's connection may have failed, thus mq/client could be NULL */ | 801 | "Canceling listener\n"); |
746 | if (NULL != lh->mq) | 802 | if (NULL != lh->mq) |
747 | { | 803 | { |
748 | GNUNET_MQ_destroy (lh->mq); | 804 | GNUNET_MQ_destroy (lh->mq); |
@@ -786,21 +842,16 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, | |||
786 | struct GNUNET_SET_OperationHandle *oh; | 842 | struct GNUNET_SET_OperationHandle *oh; |
787 | struct GNUNET_SET_AcceptMessage *msg; | 843 | struct GNUNET_SET_AcceptMessage *msg; |
788 | 844 | ||
789 | GNUNET_assert (NULL != request); | ||
790 | GNUNET_assert (GNUNET_NO == request->accepted); | 845 | GNUNET_assert (GNUNET_NO == request->accepted); |
791 | request->accepted = GNUNET_YES; | 846 | request->accepted = GNUNET_YES; |
792 | |||
793 | oh = GNUNET_new (struct GNUNET_SET_OperationHandle); | ||
794 | oh->result_cb = result_cb; | ||
795 | oh->result_cls = result_cls; | ||
796 | |||
797 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); | 847 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); |
798 | msg->accept_reject_id = htonl (request->accept_id); | 848 | msg->accept_reject_id = htonl (request->accept_id); |
799 | msg->result_mode = htonl (result_mode); | 849 | msg->result_mode = htonl (result_mode); |
800 | 850 | oh = GNUNET_new (struct GNUNET_SET_OperationHandle); | |
851 | oh->result_cb = result_cb; | ||
852 | oh->result_cls = result_cls; | ||
801 | oh->conclude_mqm = mqm; | 853 | oh->conclude_mqm = mqm; |
802 | oh->request_id_addr = &msg->request_id; | 854 | oh->request_id_addr = &msg->request_id; |
803 | |||
804 | return oh; | 855 | return oh; |
805 | } | 856 | } |
806 | 857 | ||
@@ -840,9 +891,9 @@ GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, | |||
840 | 891 | ||
841 | 892 | ||
842 | /** | 893 | /** |
843 | * Iterate over all elements in the given set. | 894 | * Iterate over all elements in the given set. Note that this |
844 | * Note that this operation involves transferring every element of the set | 895 | * operation involves transferring every element of the set from the |
845 | * from the service to the client, and is thus costly. | 896 | * service to the client, and is thus costly. |
846 | * | 897 | * |
847 | * @param set the set to iterate over | 898 | * @param set the set to iterate over |
848 | * @param iter the iterator to call for each element | 899 | * @param iter the iterator to call for each element |
@@ -858,15 +909,13 @@ GNUNET_SET_iterate (struct GNUNET_SET_Handle *set, | |||
858 | { | 909 | { |
859 | struct GNUNET_MQ_Envelope *ev; | 910 | struct GNUNET_MQ_Envelope *ev; |
860 | 911 | ||
861 | |||
862 | GNUNET_assert (NULL != iter); | 912 | GNUNET_assert (NULL != iter); |
863 | |||
864 | if (GNUNET_YES == set->invalid) | 913 | if (GNUNET_YES == set->invalid) |
865 | return GNUNET_SYSERR; | 914 | return GNUNET_SYSERR; |
866 | if (NULL != set->iterator) | 915 | if (NULL != set->iterator) |
867 | return GNUNET_NO; | 916 | return GNUNET_NO; |
868 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 917 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
869 | "iterating set\n"); | 918 | "Iterating over set\n"); |
870 | set->iterator = iter; | 919 | set->iterator = iter; |
871 | set->iterator_cls = iter_cls; | 920 | set->iterator_cls = iter_cls; |
872 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST); | 921 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST); |