aboutsummaryrefslogtreecommitdiff
path: root/src/set/set_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2014-11-27 13:31:52 +0000
committerChristian Grothoff <christian@grothoff.org>2014-11-27 13:31:52 +0000
commit1d96a7f8dc2aa6311eae76e60a92eb2a2b397fe2 (patch)
tree309d6241bf67a6a549071aa0ec3d9af82d030d7b /src/set/set_api.c
parentce9b32618b6ee488352ef0eb506c744868145f82 (diff)
downloadgnunet-1d96a7f8dc2aa6311eae76e60a92eb2a2b397fe2.tar.gz
gnunet-1d96a7f8dc2aa6311eae76e60a92eb2a2b397fe2.zip
use and respect send_more field in IterAckMessage
Diffstat (limited to 'src/set/set_api.c')
-rw-r--r--src/set/set_api.c463
1 files changed, 256 insertions, 207 deletions
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 90cba446c..d62475013 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors) 3 (C) 2012-2014 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -17,7 +17,6 @@
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19*/ 19*/
20
21/** 20/**
22 * @file set/set_api.c 21 * @file set/set_api.c
23 * @brief api for the set service 22 * @brief api for the set service
@@ -44,7 +43,7 @@ struct GNUNET_SET_Handle
44 struct GNUNET_CLIENT_Connection *client; 43 struct GNUNET_CLIENT_Connection *client;
45 44
46 /** 45 /**
47 * Message queue for 'client'. 46 * Message queue for @e client.
48 */ 47 */
49 struct GNUNET_MQ_Handle *mq; 48 struct GNUNET_MQ_Handle *mq;
50 49
@@ -59,30 +58,30 @@ struct GNUNET_SET_Handle
59 struct GNUNET_SET_OperationHandle *ops_tail; 58 struct GNUNET_SET_OperationHandle *ops_tail;
60 59
61 /** 60 /**
62 * Should the set be destroyed once all operations are gone? 61 * Callback for the current iteration over the set,
62 * NULL if no iterator is active.
63 */ 63 */
64 int destroy_requested; 64 GNUNET_SET_ElementIterator iterator;
65 65
66 /** 66 /**
67 * Has the set become invalid (e.g. service died)? 67 * Closure for @e iterator
68 */ 68 */
69 int invalid; 69 void *iterator_cls;
70 70
71 /** 71 /**
72 * Callback for the current iteration over the set, 72 * Should the set be destroyed once all operations are gone?
73 * NULL if no iterator is active.
74 */ 73 */
75 GNUNET_SET_ElementIterator iterator; 74 int destroy_requested;
76 75
77 /** 76 /**
78 * Closure for 'iterator' 77 * Has the set become invalid (e.g. service died)?
79 */ 78 */
80 void *iterator_cls; 79 int invalid;
81}; 80};
82 81
83 82
84/** 83/**
85 * Opaque handle to a set operation request from another peer. 84 * Handle for a set operation request from another peer.
86 */ 85 */
87struct GNUNET_SET_Request 86struct GNUNET_SET_Request
88{ 87{
@@ -94,15 +93,14 @@ struct GNUNET_SET_Request
94 93
95 /** 94 /**
96 * Has the request been accepted already? 95 * Has the request been accepted already?
97 * GNUNET_YES/GNUNET_NO 96 * #GNUNET_YES/#GNUNET_NO
98 */ 97 */
99 int accepted; 98 int accepted;
100}; 99};
101 100
102 101
103/** 102/**
104 * Handle to an operation. 103 * Handle to an operation. Only known to the service after committing
105 * Only known to the service after commiting
106 * the handle with a set. 104 * the handle with a set.
107 */ 105 */
108struct GNUNET_SET_OperationHandle 106struct GNUNET_SET_OperationHandle
@@ -114,7 +112,7 @@ struct GNUNET_SET_OperationHandle
114 GNUNET_SET_ResultIterator result_cb; 112 GNUNET_SET_ResultIterator result_cb;
115 113
116 /** 114 /**
117 * Closure for result_cb. 115 * Closure for @e result_cb.
118 */ 116 */
119 void *result_cls; 117 void *result_cls;
120 118
@@ -125,11 +123,6 @@ struct GNUNET_SET_OperationHandle
125 struct GNUNET_SET_Handle *set; 123 struct GNUNET_SET_Handle *set;
126 124
127 /** 125 /**
128 * Request ID to identify the operation within the set.
129 */
130 uint32_t request_id;
131
132 /**
133 * Message sent to the server on calling conclude, 126 * Message sent to the server on calling conclude,
134 * NULL if conclude has been called. 127 * NULL if conclude has been called.
135 */ 128 */
@@ -150,6 +143,11 @@ struct GNUNET_SET_OperationHandle
150 * Handles are kept in a linked list. 143 * Handles are kept in a linked list.
151 */ 144 */
152 struct GNUNET_SET_OperationHandle *next; 145 struct GNUNET_SET_OperationHandle *next;
146
147 /**
148 * Request ID to identify the operation within the set.
149 */
150 uint32_t request_id;
153}; 151};
154 152
155 153
@@ -182,16 +180,11 @@ struct GNUNET_SET_ListenHandle
182 GNUNET_SET_ListenCallback listen_cb; 180 GNUNET_SET_ListenCallback listen_cb;
183 181
184 /** 182 /**
185 * Closure for listen_cb. 183 * Closure for @e listen_cb.
186 */ 184 */
187 void *listen_cls; 185 void *listen_cls;
188 186
189 /** 187 /**
190 * Operation we listen for.
191 */
192 enum GNUNET_SET_OperationType operation;
193
194 /**
195 * Application ID we listen for. 188 * Application ID we listen for.
196 */ 189 */
197 struct GNUNET_HashCode app_id; 190 struct GNUNET_HashCode app_id;
@@ -205,59 +198,78 @@ struct GNUNET_SET_ListenHandle
205 * Task for reconnecting when the listener fails. 198 * Task for reconnecting when the listener fails.
206 */ 199 */
207 GNUNET_SCHEDULER_TaskIdentifier reconnect_task; 200 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
208};
209 201
210 202 /**
211/* forward declaration */ 203 * Operation we listen for.
212static void 204 */
213listen_connect (void *cls, 205 enum GNUNET_SET_OperationType operation;
214 const struct GNUNET_SCHEDULER_TaskContext *tc); 206};
215 207
216 208
217/** 209/**
218 * Handle element for iteration over the set. 210 * Handle element for iteration over the set. Notifies the
211 * iterator and sends an acknowledgement to the service.
219 * 212 *
220 * @param cls the set 213 * @param cls the set
221 * @param mh the message 214 * @param mh the message
222 */ 215 */
223static void 216static void
224handle_iter_element (void *cls, const struct GNUNET_MessageHeader *mh) 217handle_iter_element (void *cls,
218 const struct GNUNET_MessageHeader *mh)
225{ 219{
226 struct GNUNET_SET_Handle *set = cls; 220 struct GNUNET_SET_Handle *set = cls;
221 GNUNET_SET_ElementIterator iter = set->iterator;
227 struct GNUNET_SET_Element element; 222 struct GNUNET_SET_Element element;
228 const struct GNUNET_SET_IterResponseMessage *msg = 223 const struct GNUNET_SET_IterResponseMessage *msg;
229 (const struct GNUNET_SET_IterResponseMessage *) mh;
230 struct GNUNET_SET_IterAckMessage *ack_msg; 224 struct GNUNET_SET_IterAckMessage *ack_msg;
231 struct GNUNET_MQ_Envelope *ev; 225 struct GNUNET_MQ_Envelope *ev;
226 uint16_t msize;
232 227
233 if (NULL == set->iterator) 228 msize = ntohs (mh->size);
234 return; 229 if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
235 230 {
236 element.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_IterResponseMessage); 231 /* message malformed */
237 element.element_type = htons (msg->element_type); 232 GNUNET_break (0);
238 element.data = &msg[1]; 233 set->iterator = NULL;
239 set->iterator (set->iterator_cls, &element); 234 iter (set->iterator_cls,
240 ev = GNUNET_MQ_msg (ack_msg, GNUNET_MESSAGE_TYPE_SET_ITER_ACK); 235 NULL);
241 ack_msg->send_more = htonl (1); 236 iter = NULL;
237 }
238 if (NULL != iter)
239 {
240 msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
241 element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
242 element.element_type = htons (msg->element_type);
243 element.data = &msg[1];
244 iter (set->iterator_cls,
245 &element);
246 }
247 ev = GNUNET_MQ_msg (ack_msg,
248 GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
249 ack_msg->send_more = htonl ((NULL != iter));
242 GNUNET_MQ_send (set->mq, ev); 250 GNUNET_MQ_send (set->mq, ev);
243} 251}
244 252
245 253
246/** 254/**
247 * Handle element for iteration over the set. 255 * Handle message signalling conclusion of iteration over the set.
256 * Notifies the iterator that we are done.
248 * 257 *
249 * @param cls the set 258 * @param cls the set
250 * @param mh the message 259 * @param mh the message
251 */ 260 */
252static void 261static void
253handle_iter_done (void *cls, const struct GNUNET_MessageHeader *mh) 262handle_iter_done (void *cls,
263 const struct GNUNET_MessageHeader *mh)
254{ 264{
255 struct GNUNET_SET_Handle *set = cls; 265 struct GNUNET_SET_Handle *set = cls;
266 GNUNET_SET_ElementIterator iter = set->iterator;
256 267
257 if (NULL == set->iterator) 268 if (NULL == iter)
258 return; 269 return;
259 270 set->iterator = NULL;
260 set->iterator (set->iterator_cls, NULL); 271 iter (set->iterator_cls,
272 NULL);
261} 273}
262 274
263 275
@@ -268,47 +280,53 @@ handle_iter_done (void *cls, const struct GNUNET_MessageHeader *mh)
268 * @param mh the message 280 * @param mh the message
269 */ 281 */
270static void 282static void
271handle_result (void *cls, const struct GNUNET_MessageHeader *mh) 283handle_result (void *cls,
284 const struct GNUNET_MessageHeader *mh)
272{ 285{
273 const struct GNUNET_SET_ResultMessage *msg;
274 struct GNUNET_SET_Handle *set = cls; 286 struct GNUNET_SET_Handle *set = cls;
287 const struct GNUNET_SET_ResultMessage *msg;
275 struct GNUNET_SET_OperationHandle *oh; 288 struct GNUNET_SET_OperationHandle *oh;
276 struct GNUNET_SET_Element e; 289 struct GNUNET_SET_Element e;
277 enum GNUNET_SET_Status result_status; 290 enum GNUNET_SET_Status result_status;
278 291
279 msg = (const struct GNUNET_SET_ResultMessage *) mh; 292 msg = (const struct GNUNET_SET_ResultMessage *) mh;
280 GNUNET_assert (NULL != set);
281 GNUNET_assert (NULL != set->mq); 293 GNUNET_assert (NULL != set->mq);
282
283 result_status = ntohs (msg->result_status); 294 result_status = ntohs (msg->result_status);
284 295 oh = GNUNET_MQ_assoc_get (set->mq,
285 oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); 296 ntohl (msg->request_id));
286 // 'oh' can be NULL if we canceled the operation, but the service
287 // did not get the cancel message yet.
288 if (NULL == oh) 297 if (NULL == oh)
289 { 298 {
290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ignoring result from canceled operation\n"); 299 /* 'oh' can be NULL if we canceled the operation, but the service
300 did not get the cancel message yet. */
301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302 "Ignoring result from canceled operation\n");
291 return; 303 return;
292 } 304 }
293 /* status is not STATUS_OK => there's no attached element,
294 * and this is the last result message we get */
295 if (GNUNET_SET_STATUS_OK != result_status) 305 if (GNUNET_SET_STATUS_OK != result_status)
296 { 306 {
307 /* status is not STATUS_OK => there's no attached element,
308 * and this is the last result message we get */
297 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); 309 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
298 GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh); 310 GNUNET_CONTAINER_DLL_remove (set->ops_head,
299 if (GNUNET_YES == oh->set->destroy_requested) 311 set->ops_tail,
300 GNUNET_SET_destroy (oh->set); 312 oh);
313 if ( (GNUNET_YES == set->destroy_requested) &&
314 (NULL == set->ops_head) )
315 GNUNET_SET_destroy (set);
301 if (NULL != oh->result_cb) 316 if (NULL != oh->result_cb)
302 oh->result_cb (oh->result_cls, NULL, result_status); 317 oh->result_cb (oh->result_cls,
318 NULL,
319 result_status);
303 GNUNET_free (oh); 320 GNUNET_free (oh);
304 return; 321 return;
305 } 322 }
306
307 e.data = &msg[1]; 323 e.data = &msg[1];
308 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); 324 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
309 e.element_type = msg->element_type; 325 e.element_type = msg->element_type;
310 if (NULL != oh->result_cb) 326 if (NULL != oh->result_cb)
311 oh->result_cb (oh->result_cls, &e, result_status); 327 oh->result_cb (oh->result_cls,
328 &e,
329 result_status);
312} 330}
313 331
314 332
@@ -360,93 +378,30 @@ handle_request (void *cls,
360} 378}
361 379
362 380
363static void
364handle_client_listener_error (void *cls,
365 enum GNUNET_MQ_Error error)
366{
367 struct GNUNET_SET_ListenHandle *lh = cls;
368
369 LOG (GNUNET_ERROR_TYPE_DEBUG,
370 "listener broke down, re-connecting\n");
371 GNUNET_CLIENT_disconnect (lh->client);
372 lh->client = NULL;
373 GNUNET_MQ_destroy (lh->mq);
374 lh->mq = NULL;
375 lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
376 &listen_connect, lh);
377 lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
378}
379
380
381/** 381/**
382 * Destroy the set handle if no operations are left, mark the set 382 * Destroy the given set operation.
383 * for destruction otherwise.
384 * 383 *
385 * @param set set handle to destroy 384 * @param oh set operation to destroy
386 */ 385 */
387static int 386static void
388set_destroy (struct GNUNET_SET_Handle *set) 387set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
389{
390 if (NULL != set->ops_head)
391 {
392 set->destroy_requested = GNUNET_YES;
393 return GNUNET_NO;
394 }
395 LOG (GNUNET_ERROR_TYPE_DEBUG, "Really destroying set\n");
396 GNUNET_CLIENT_disconnect (set->client);
397 set->client = NULL;
398 GNUNET_MQ_destroy (set->mq);
399 set->mq = NULL;
400 GNUNET_free (set);
401 return GNUNET_YES;
402}
403
404
405/**
406 * Cancel the given set operation. We need to send an explicit cancel message,
407 * as all operations one one set communicate using one handle.
408 *
409 * In contrast to #GNUNET_SET_operation_cancel(), this function indicates whether
410 * the set of the operation has been destroyed because all operations are done and
411 * the set's destruction was requested before.
412 *
413 * @param oh set operation to cancel
414 * @return #GNUNET_YES if the set of the operation was destroyed
415 */
416static int
417set_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
418{ 388{
419 int ret = GNUNET_NO; 389 struct GNUNET_SET_Handle *set = oh->set;
390 struct GNUNET_SET_OperationHandle *h_assoc;
420 391
421 if (NULL != oh->conclude_mqm) 392 if (NULL != oh->conclude_mqm)
422 GNUNET_MQ_discard (oh->conclude_mqm); 393 GNUNET_MQ_discard (oh->conclude_mqm);
423
424 /* is the operation already commited? */ 394 /* is the operation already commited? */
425 if (NULL != oh->set) 395 if (NULL != set)
426 { 396 {
427 struct GNUNET_SET_OperationHandle *h_assoc; 397 GNUNET_CONTAINER_DLL_remove (set->ops_head,
428 struct GNUNET_SET_CancelMessage *m; 398 set->ops_tail,
429 struct GNUNET_MQ_Envelope *mqm;
430
431 GNUNET_CONTAINER_DLL_remove (oh->set->ops_head,
432 oh->set->ops_tail,
433 oh); 399 oh);
434 h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, 400 h_assoc = GNUNET_MQ_assoc_remove (set->mq,
435 oh->request_id); 401 oh->request_id);
436 GNUNET_assert ((h_assoc == NULL) || (h_assoc == oh)); 402 GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
437 mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
438 m->request_id = htonl (oh->request_id);
439 GNUNET_MQ_send (oh->set->mq, mqm);
440
441 if (GNUNET_YES == oh->set->destroy_requested)
442 {
443 LOG (GNUNET_ERROR_TYPE_DEBUG,
444 "Destroying set after operation cancel\n");
445 ret = set_destroy (oh->set);
446 }
447 } 403 }
448 GNUNET_free (oh); 404 GNUNET_free (oh);
449 return ret;
450} 405}
451 406
452 407
@@ -460,27 +415,58 @@ set_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
460void 415void
461GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) 416GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
462{ 417{
463 (void) set_operation_cancel (oh); 418 struct GNUNET_SET_Handle *set = oh->set;
419 struct GNUNET_SET_CancelMessage *m;
420 struct GNUNET_MQ_Envelope *mqm;
421
422 if (NULL != set)
423 {
424 mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
425 m->request_id = htonl (oh->request_id);
426 GNUNET_MQ_send (set->mq, mqm);
427 }
428 set_operation_destroy (oh);
429 if ( (NULL != set) &&
430 (GNUNET_YES == set->destroy_requested) &&
431 (NULL == set->ops_head) )
432 {
433 LOG (GNUNET_ERROR_TYPE_DEBUG,
434 "Destroying set after operation cancel\n");
435 GNUNET_SET_destroy (set);
436 }
464} 437}
465 438
466 439
440/**
441 * We encountered an error communicating with the set service while
442 * performing a set operation. Report to the application.
443 *
444 * @param cls the `struct GNUNET_SET_Handle`
445 * @param error error code
446 */
467static void 447static void
468handle_client_set_error (void *cls, enum GNUNET_MQ_Error error) 448handle_client_set_error (void *cls,
449 enum GNUNET_MQ_Error error)
469{ 450{
470 struct GNUNET_SET_Handle *set = cls; 451 struct GNUNET_SET_Handle *set = cls;
471 452
472 LOG (GNUNET_ERROR_TYPE_DEBUG, 453 LOG (GNUNET_ERROR_TYPE_DEBUG,
473 "handling client set error\n"); 454 "Handling client set error\n");
474
475 while (NULL != set->ops_head) 455 while (NULL != set->ops_head)
476 { 456 {
477 if (NULL != set->ops_head->result_cb) 457 if (NULL != set->ops_head->result_cb)
478 set->ops_head->result_cb (set->ops_head->result_cls, NULL, 458 set->ops_head->result_cb (set->ops_head->result_cls,
459 NULL,
479 GNUNET_SET_STATUS_FAILURE); 460 GNUNET_SET_STATUS_FAILURE);
480 if (GNUNET_YES == set_operation_cancel (set->ops_head)) 461 set_operation_destroy (set->ops_head);
481 return; /* stop if the set is destroyed */
482 } 462 }
483 set->invalid = GNUNET_YES; 463 set->invalid = GNUNET_YES;
464 if (GNUNET_YES == set->destroy_requested)
465 {
466 LOG (GNUNET_ERROR_TYPE_DEBUG,
467 "Destroying set after operation failure\n");
468 GNUNET_SET_destroy (set);
469 }
484} 470}
485 471
486 472
@@ -500,9 +486,11 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
500 enum GNUNET_SET_OperationType op) 486 enum GNUNET_SET_OperationType op)
501{ 487{
502 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { 488 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
503 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0}, 489 { &handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0},
504 {handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0}, 490 { &handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0},
505 {handle_iter_done, GNUNET_MESSAGE_TYPE_SET_ITER_DONE, 0}, 491 { &handle_iter_done,
492 GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
493 sizeof (struct GNUNET_MessageHeader) },
506 GNUNET_MQ_HANDLERS_END 494 GNUNET_MQ_HANDLERS_END
507 }; 495 };
508 struct GNUNET_SET_Handle *set; 496 struct GNUNET_SET_Handle *set;
@@ -511,12 +499,17 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
511 499
512 set = GNUNET_new (struct GNUNET_SET_Handle); 500 set = GNUNET_new (struct GNUNET_SET_Handle);
513 set->client = GNUNET_CLIENT_connect ("set", cfg); 501 set->client = GNUNET_CLIENT_connect ("set", cfg);
514 LOG (GNUNET_ERROR_TYPE_DEBUG, "set client created\n"); 502 if (NULL == set->client)
515 GNUNET_assert (NULL != set->client); 503 {
516 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, 504 GNUNET_free (set);
517 handle_client_set_error, set); 505 return NULL;
506 }
507 set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
508 mq_handlers,
509 &handle_client_set_error, set);
518 GNUNET_assert (NULL != set->mq); 510 GNUNET_assert (NULL != set->mq);
519 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); 511 mqm = GNUNET_MQ_msg (msg,
512 GNUNET_MESSAGE_TYPE_SET_CREATE);
520 msg->operation = htonl (op); 513 msg->operation = htonl (op);
521 GNUNET_MQ_send (set->mq, mqm); 514 GNUNET_MQ_send (set->mq, mqm);
522 return set; 515 return set;
@@ -524,10 +517,10 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
524 517
525 518
526/** 519/**
527 * Add an element to the given set. 520 * Add an element to the given set. After the element has been added
528 * After the element has been added (in the sense of being 521 * (in the sense of being transmitted to the set service), @a cont
529 * transmitted to the set service), cont will be called. 522 * will be called. Multiple calls to GNUNET_SET_add_element() can be
530 * Calls to add_element can be queued 523 * queued.
531 * 524 *
532 * @param set set to add element to 525 * @param set set to add element to
533 * @param element element to add to the set 526 * @param element element to add to the set
@@ -551,21 +544,24 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
551 cont (cont_cls); 544 cont (cont_cls);
552 return GNUNET_SYSERR; 545 return GNUNET_SYSERR;
553 } 546 }
554 547 mqm = GNUNET_MQ_msg_extra (msg, element->size,
555 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); 548 GNUNET_MESSAGE_TYPE_SET_ADD);
556 msg->element_type = element->element_type; 549 msg->element_type = element->element_type;
557 memcpy (&msg[1], element->data, element->size); 550 memcpy (&msg[1],
558 GNUNET_MQ_notify_sent (mqm, cont, cont_cls); 551 element->data,
552 element->size);
553 GNUNET_MQ_notify_sent (mqm,
554 cont, cont_cls);
559 GNUNET_MQ_send (set->mq, mqm); 555 GNUNET_MQ_send (set->mq, mqm);
560 return GNUNET_OK; 556 return GNUNET_OK;
561} 557}
562 558
563 559
564/** 560/**
565 * Remove an element to the given set. 561 * Remove an element to the given set. After the element has been
566 * After the element has been removed (in the sense of the 562 * removed (in the sense of the request being transmitted to the set
567 * request being transmitted to the set service), cont will be called. 563 * service), @a cont will be called. Multiple calls to
568 * Calls to remove_element can be queued 564 * GNUNET_SET_remove_element() can be queued
569 * 565 *
570 * @param set set to remove element from 566 * @param set set to remove element from
571 * @param element element to remove from the set 567 * @param element element to remove from the set
@@ -589,27 +585,49 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
589 cont (cont_cls); 585 cont (cont_cls);
590 return GNUNET_SYSERR; 586 return GNUNET_SYSERR;
591 } 587 }
592
593 mqm = GNUNET_MQ_msg_extra (msg, 588 mqm = GNUNET_MQ_msg_extra (msg,
594 element->size, 589 element->size,
595 GNUNET_MESSAGE_TYPE_SET_REMOVE); 590 GNUNET_MESSAGE_TYPE_SET_REMOVE);
596 msg->element_type = element->element_type; 591 msg->element_type = element->element_type;
597 memcpy (&msg[1], element->data, element->size); 592 memcpy (&msg[1],
598 GNUNET_MQ_notify_sent (mqm, cont, cont_cls); 593 element->data,
594 element->size);
595 GNUNET_MQ_notify_sent (mqm,
596 cont, cont_cls);
599 GNUNET_MQ_send (set->mq, mqm); 597 GNUNET_MQ_send (set->mq, mqm);
600 return GNUNET_OK; 598 return GNUNET_OK;
601} 599}
602 600
603 601
604/** 602/**
605 * Destroy the set handle, and free all associated resources. 603 * Destroy the set handle if no operations are left, mark the set
604 * for destruction otherwise.
606 * 605 *
607 * @param set set handle to destroy 606 * @param set set handle to destroy
608 */ 607 */
609void 608void
610GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) 609GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
611{ 610{
612 (void) set_destroy (set); 611 if (NULL != set->ops_head)
612 {
613 LOG (GNUNET_ERROR_TYPE_DEBUG,
614 "Set operations are pending, delaying set destruction\n");
615 set->destroy_requested = GNUNET_YES;
616 return;
617 }
618 LOG (GNUNET_ERROR_TYPE_DEBUG,
619 "Really destroying set\n");
620 if (NULL != set->client)
621 {
622 GNUNET_CLIENT_disconnect (set->client);
623 set->client = NULL;
624 }
625 if (NULL != set->mq)
626 {
627 GNUNET_MQ_destroy (set->mq);
628 set->mq = NULL;
629 }
630 GNUNET_free (set);
613} 631}
614 632
615 633
@@ -656,43 +674,76 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
656 674
657 675
658/** 676/**
659 * Connect to the set service in order to listen 677 * Connect to the set service in order to listen for requests.
660 * for request.
661 * 678 *
662 * @param cls the listen handle to connect 679 * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
663 * @param tc task context if invoked as a task, NULL otherwise 680 * @param tc task context if invoked as a task, NULL otherwise
664 */ 681 */
665static void 682static void
666listen_connect (void *cls, 683listen_connect (void *cls,
667 const struct GNUNET_SCHEDULER_TaskContext *tc) 684 const struct GNUNET_SCHEDULER_TaskContext *tc);
685
686
687/**
688 * Our connection with the set service encountered an error,
689 * re-initialize with exponential back-off.
690 *
691 * @param cls the `struct GNUNET_SET_ListenHandle *`
692 * @param error reason for the disconnect
693 */
694static void
695handle_client_listener_error (void *cls,
696 enum GNUNET_MQ_Error error)
668{ 697{
669 struct GNUNET_MQ_Envelope *mqm;
670 struct GNUNET_SET_ListenMessage *msg;
671 struct GNUNET_SET_ListenHandle *lh = cls; 698 struct GNUNET_SET_ListenHandle *lh = cls;
699
700 LOG (GNUNET_ERROR_TYPE_DEBUG,
701 "Listener broke down (%d), re-connecting\n",
702 (int) error);
703 GNUNET_CLIENT_disconnect (lh->client);
704 lh->client = NULL;
705 GNUNET_MQ_destroy (lh->mq);
706 lh->mq = NULL;
707 lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
708 &listen_connect, lh);
709 lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
710}
711
712
713/**
714 * Connect to the set service in order to listen for requests.
715 *
716 * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
717 * @param tc task context if invoked as a task, NULL otherwise
718 */
719static void
720listen_connect (void *cls,
721 const struct GNUNET_SCHEDULER_TaskContext *tc)
722{
672 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { 723 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
673 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, 724 { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
674 GNUNET_MQ_HANDLERS_END 725 GNUNET_MQ_HANDLERS_END
675 }; 726 };
727 struct GNUNET_SET_ListenHandle *lh = cls;
728 struct GNUNET_MQ_Envelope *mqm;
729 struct GNUNET_SET_ListenMessage *msg;
676 730
677 if ((tc != NULL) &&(tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) 731 if ( (NULL != tc) &&
732 (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
678 { 733 {
679 LOG (GNUNET_ERROR_TYPE_DEBUG, "listener not reconnecting due to shutdown\n"); 734 LOG (GNUNET_ERROR_TYPE_DEBUG,
735 "Listener not reconnecting due to shutdown\n");
680 return; 736 return;
681 } 737 }
682
683 lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 738 lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
684 739
685 GNUNET_assert (NULL == lh->client); 740 GNUNET_assert (NULL == lh->client);
686 lh->client = GNUNET_CLIENT_connect ("set", lh->cfg); 741 lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
687 if (NULL == lh->client) 742 if (NULL == lh->client)
688 {
689 LOG (GNUNET_ERROR_TYPE_ERROR,
690 "could not connect to set (wrong configuration?), giving up listening\n");
691 return; 743 return;
692 }
693 GNUNET_assert (NULL == lh->mq); 744 GNUNET_assert (NULL == lh->mq);
694 lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, 745 lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
695 handle_client_listener_error, lh); 746 &handle_client_listener_error, lh);
696 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); 747 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
697 msg->operation = htonl (lh->operation); 748 msg->operation = htonl (lh->operation);
698 msg->app_id = lh->app_id; 749 msg->app_id = lh->app_id;
@@ -709,7 +760,7 @@ listen_connect (void *cls,
709 * @param app_id id of the application that handles set operation requests 760 * @param app_id id of the application that handles set operation requests
710 * @param listen_cb called for each incoming request matching the operation 761 * @param listen_cb called for each incoming request matching the operation
711 * and application id 762 * and application id
712 * @param listen_cls handle for listen_cb 763 * @param listen_cls handle for @a listen_cb
713 * @return a handle that can be used to cancel the listen operation 764 * @return a handle that can be used to cancel the listen operation
714 */ 765 */
715struct GNUNET_SET_ListenHandle * 766struct GNUNET_SET_ListenHandle *
@@ -729,6 +780,11 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
729 lh->app_id = *app_id; 780 lh->app_id = *app_id;
730 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 781 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
731 listen_connect (lh, NULL); 782 listen_connect (lh, NULL);
783 if (NULL == lh->client)
784 {
785 GNUNET_free (lh);
786 return NULL;
787 }
732 return lh; 788 return lh;
733} 789}
734 790
@@ -741,8 +797,8 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
741void 797void
742GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) 798GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
743{ 799{
744 LOG (GNUNET_ERROR_TYPE_DEBUG, "canceling listener\n"); 800 LOG (GNUNET_ERROR_TYPE_DEBUG,
745 /* listener's connection may have failed, thus mq/client could be NULL */ 801 "Canceling listener\n");
746 if (NULL != lh->mq) 802 if (NULL != lh->mq)
747 { 803 {
748 GNUNET_MQ_destroy (lh->mq); 804 GNUNET_MQ_destroy (lh->mq);
@@ -786,21 +842,16 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
786 struct GNUNET_SET_OperationHandle *oh; 842 struct GNUNET_SET_OperationHandle *oh;
787 struct GNUNET_SET_AcceptMessage *msg; 843 struct GNUNET_SET_AcceptMessage *msg;
788 844
789 GNUNET_assert (NULL != request);
790 GNUNET_assert (GNUNET_NO == request->accepted); 845 GNUNET_assert (GNUNET_NO == request->accepted);
791 request->accepted = GNUNET_YES; 846 request->accepted = GNUNET_YES;
792
793 oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
794 oh->result_cb = result_cb;
795 oh->result_cls = result_cls;
796
797 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); 847 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
798 msg->accept_reject_id = htonl (request->accept_id); 848 msg->accept_reject_id = htonl (request->accept_id);
799 msg->result_mode = htonl (result_mode); 849 msg->result_mode = htonl (result_mode);
800 850 oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
851 oh->result_cb = result_cb;
852 oh->result_cls = result_cls;
801 oh->conclude_mqm = mqm; 853 oh->conclude_mqm = mqm;
802 oh->request_id_addr = &msg->request_id; 854 oh->request_id_addr = &msg->request_id;
803
804 return oh; 855 return oh;
805} 856}
806 857
@@ -840,9 +891,9 @@ GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
840 891
841 892
842/** 893/**
843 * Iterate over all elements in the given set. 894 * Iterate over all elements in the given set. Note that this
844 * Note that this operation involves transferring every element of the set 895 * operation involves transferring every element of the set from the
845 * from the service to the client, and is thus costly. 896 * service to the client, and is thus costly.
846 * 897 *
847 * @param set the set to iterate over 898 * @param set the set to iterate over
848 * @param iter the iterator to call for each element 899 * @param iter the iterator to call for each element
@@ -858,15 +909,13 @@ GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
858{ 909{
859 struct GNUNET_MQ_Envelope *ev; 910 struct GNUNET_MQ_Envelope *ev;
860 911
861
862 GNUNET_assert (NULL != iter); 912 GNUNET_assert (NULL != iter);
863
864 if (GNUNET_YES == set->invalid) 913 if (GNUNET_YES == set->invalid)
865 return GNUNET_SYSERR; 914 return GNUNET_SYSERR;
866 if (NULL != set->iterator) 915 if (NULL != set->iterator)
867 return GNUNET_NO; 916 return GNUNET_NO;
868 LOG (GNUNET_ERROR_TYPE_DEBUG, 917 LOG (GNUNET_ERROR_TYPE_DEBUG,
869 "iterating set\n"); 918 "Iterating over set\n");
870 set->iterator = iter; 919 set->iterator = iter;
871 set->iterator_cls = iter_cls; 920 set->iterator_cls = iter_cls;
872 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST); 921 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);