aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore/peerstore_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2014-12-19 10:15:01 +0000
committerChristian Grothoff <christian@grothoff.org>2014-12-19 10:15:01 +0000
commit1446d6a11597c0ef6b614b2759c704d37a0f759c (patch)
tree8b8d73b02723514c7fc210e7d246388f99f988ac /src/peerstore/peerstore_api.c
parent401cd28052d544bf3260b02f03391397b23ef770 (diff)
downloadgnunet-1446d6a11597c0ef6b614b2759c704d37a0f759c.tar.gz
gnunet-1446d6a11597c0ef6b614b2759c704d37a0f759c.zip
finishing fixing #3581, with also simplified logic
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r--src/peerstore/peerstore_api.c206
1 files changed, 57 insertions, 149 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 8ef04604f..39f37b022 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -133,11 +133,6 @@ struct GNUNET_PEERSTORE_StoreContext
133 void *value; 133 void *value;
134 134
135 /** 135 /**
136 * MQ Envelope with store request message
137 */
138 struct GNUNET_MQ_Envelope *ev;
139
140 /**
141 * Peer the store is for. 136 * Peer the store is for.
142 */ 137 */
143 struct GNUNET_PeerIdentity peer; 138 struct GNUNET_PeerIdentity peer;
@@ -200,11 +195,6 @@ struct GNUNET_PEERSTORE_IterateContext
200 struct GNUNET_TIME_Relative timeout; 195 struct GNUNET_TIME_Relative timeout;
201 196
202 /** 197 /**
203 * MQ Envelope with iterate request message
204 */
205 struct GNUNET_MQ_Envelope *ev;
206
207 /**
208 * Callback with each matching record 198 * Callback with each matching record
209 */ 199 */
210 GNUNET_PEERSTORE_Processor callback; 200 GNUNET_PEERSTORE_Processor callback;
@@ -215,8 +205,7 @@ struct GNUNET_PEERSTORE_IterateContext
215 void *callback_cls; 205 void *callback_cls;
216 206
217 /** 207 /**
218 * #GNUNET_YES / #GNUNET_NO 208 * #GNUNET_YES if we are currently processing records.
219 * Iterate request has been sent and we are still expecting records
220 */ 209 */
221 int iterating; 210 int iterating;
222 211
@@ -249,11 +238,6 @@ struct GNUNET_PEERSTORE_WatchContext
249 struct GNUNET_PEERSTORE_Handle *h; 238 struct GNUNET_PEERSTORE_Handle *h;
250 239
251 /** 240 /**
252 * MQ Envelope with watch request message
253 */
254 struct GNUNET_MQ_Envelope *ev;
255
256 /**
257 * Callback with each record received 241 * Callback with each record received
258 */ 242 */
259 GNUNET_PEERSTORE_Processor callback; 243 GNUNET_PEERSTORE_Processor callback;
@@ -268,12 +252,6 @@ struct GNUNET_PEERSTORE_WatchContext
268 */ 252 */
269 struct GNUNET_HashCode keyhash; 253 struct GNUNET_HashCode keyhash;
270 254
271 /**
272 * #GNUNET_YES / #GNUNET_NO
273 * if sent, cannot be canceled
274 */
275 int request_sent;
276
277}; 255};
278 256
279/******************************************************************************/ 257/******************************************************************************/
@@ -306,36 +284,6 @@ handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg);
306static void 284static void
307reconnect (struct GNUNET_PEERSTORE_Handle *h); 285reconnect (struct GNUNET_PEERSTORE_Handle *h);
308 286
309/**
310 * Callback after MQ envelope is sent
311 *
312 * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
313 */
314static void
315watch_request_sent (void *cls)
316{
317 struct GNUNET_PEERSTORE_WatchContext *wc = cls;
318
319 wc->request_sent = GNUNET_YES;
320 wc->ev = NULL;
321}
322
323
324/**
325 * Callback after MQ envelope is sent
326 *
327 * @param cls a `struct GNUNET_PEERSTORE_IterateContext *`
328 */
329static void
330iterate_request_sent (void *cls)
331{
332 struct GNUNET_PEERSTORE_IterateContext *ic = cls;
333
334 LOG (GNUNET_ERROR_TYPE_DEBUG, "Iterate request sent to service.\n");
335 ic->iterating = GNUNET_YES;
336 ic->ev = NULL;
337}
338
339 287
340/** 288/**
341 * Callback after MQ envelope is sent 289 * Callback after MQ envelope is sent
@@ -349,7 +297,6 @@ store_request_sent (void *cls)
349 GNUNET_PEERSTORE_Continuation cont; 297 GNUNET_PEERSTORE_Continuation cont;
350 void *cont_cls; 298 void *cont_cls;
351 299
352 sc->ev = NULL;
353 cont = sc->cont; 300 cont = sc->cont;
354 cont_cls = sc->cont_cls; 301 cont_cls = sc->cont_cls;
355 GNUNET_PEERSTORE_store_cancel (sc); 302 GNUNET_PEERSTORE_store_cancel (sc);
@@ -386,6 +333,11 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
386 333
387/** 334/**
388 * Iterator over previous watches to resend them 335 * Iterator over previous watches to resend them
336 *
337 * @param cls the `struct GNUNET_PEERSTORE_Handle`
338 * @param key key for the watch
339 * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
340 * @return #GNUNET_YES (continue to iterate)
389 */ 341 */
390static int 342static int
391rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value) 343rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
@@ -393,15 +345,11 @@ rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
393 struct GNUNET_PEERSTORE_Handle *h = cls; 345 struct GNUNET_PEERSTORE_Handle *h = cls;
394 struct GNUNET_PEERSTORE_WatchContext *wc = value; 346 struct GNUNET_PEERSTORE_WatchContext *wc = value;
395 struct StoreKeyHashMessage *hm; 347 struct StoreKeyHashMessage *hm;
348 struct GNUNET_MQ_Envelope *ev;
396 349
397 if (GNUNET_YES == wc->request_sent) 350 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
398 { /* Envelope gone, create new one. */ 351 hm->keyhash = wc->keyhash;
399 wc->ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); 352 GNUNET_MQ_send (h->mq, ev);
400 hm->keyhash = wc->keyhash;
401 wc->request_sent = GNUNET_NO;
402 }
403 GNUNET_MQ_notify_sent (wc->ev, &watch_request_sent, wc);
404 GNUNET_MQ_send (h->mq, wc->ev);
405 return GNUNET_YES; 353 return GNUNET_YES;
406} 354}
407 355
@@ -437,48 +385,21 @@ static void
437reconnect (struct GNUNET_PEERSTORE_Handle *h) 385reconnect (struct GNUNET_PEERSTORE_Handle *h)
438{ 386{
439 struct GNUNET_PEERSTORE_IterateContext *ic; 387 struct GNUNET_PEERSTORE_IterateContext *ic;
440 struct GNUNET_PEERSTORE_IterateContext *ic_tmp; 388 struct GNUNET_PEERSTORE_IterateContext *next;
441 GNUNET_PEERSTORE_Processor icb; 389 GNUNET_PEERSTORE_Processor icb;
442 void *icb_cls; 390 void *icb_cls;
443 struct GNUNET_PEERSTORE_StoreContext *sc; 391 struct GNUNET_PEERSTORE_StoreContext *sc;
392 struct GNUNET_MQ_Envelope *ev;
444 393
445 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); 394 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
446 for (sc = h->store_head; NULL != sc; sc = sc->next) 395 for (ic = h->iterate_head; NULL != ic; ic = next)
447 {
448 if (NULL != sc->ev)
449 {
450 GNUNET_MQ_send_cancel (sc->ev);
451 sc->ev = NULL;
452 }
453 }
454 ic = h->iterate_head;
455 while (NULL != ic)
456 { 396 {
457 if (GNUNET_YES == ic->iterating) 397 next = ic->next;
458 { 398 icb = ic->callback;
459 icb = ic->callback; 399 icb_cls = ic->callback_cls;
460 icb_cls = ic->callback_cls; 400 GNUNET_PEERSTORE_iterate_cancel (ic);
461 ic->iterating = GNUNET_NO; 401 if (NULL != icb)
462 ic_tmp = ic; 402 icb (icb_cls, NULL, _("Iteration canceled due to reconnection."));
463 ic = ic->next;
464 GNUNET_PEERSTORE_iterate_cancel (ic_tmp);
465 if (NULL != icb)
466 icb (icb_cls, NULL, _("Iteration canceled due to reconnection."));
467 }
468 else
469 {
470 if (GNUNET_SCHEDULER_NO_TASK != ic->timeout_task)
471 {
472 GNUNET_SCHEDULER_cancel (ic->timeout_task);
473 ic->timeout_task = GNUNET_SCHEDULER_NO_TASK;
474 }
475 if (NULL != ic->ev)
476 {
477 GNUNET_MQ_send_cancel (ic->ev);
478 ic->ev = NULL;
479 }
480 ic = ic->next;
481 }
482 } 403 }
483 if (NULL != h->mq) 404 if (NULL != h->mq)
484 { 405 {
@@ -490,6 +411,7 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
490 GNUNET_CLIENT_disconnect (h->client); 411 GNUNET_CLIENT_disconnect (h->client);
491 h->client = NULL; 412 h->client = NULL;
492 } 413 }
414
493 h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); 415 h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
494 GNUNET_assert (NULL != h->client); 416 GNUNET_assert (NULL != h->client);
495 h->mq = 417 h->mq =
@@ -501,24 +423,23 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
501 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h); 423 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
502 for (ic = h->iterate_head; NULL != ic; ic = ic->next) 424 for (ic = h->iterate_head; NULL != ic; ic = ic->next)
503 { 425 {
504 ic->ev = 426 ev =
505 PEERSTORE_create_record_mq_envelope (ic->sub_system, &ic->peer, ic->key, 427 PEERSTORE_create_record_mq_envelope (ic->sub_system, &ic->peer, ic->key,
506 NULL, 0, NULL, 0, 428 NULL, 0, NULL, 0,
507 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); 429 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
508 GNUNET_MQ_notify_sent (ic->ev, &iterate_request_sent, ic); 430 GNUNET_MQ_send (h->mq, ev);
509 GNUNET_MQ_send (h->mq, ic->ev);
510 ic->timeout_task = 431 ic->timeout_task =
511 GNUNET_SCHEDULER_add_delayed (ic->timeout, &iterate_timeout, ic); 432 GNUNET_SCHEDULER_add_delayed (ic->timeout, &iterate_timeout, ic);
512 } 433 }
513 for (sc = h->store_head; NULL != sc; sc = sc->next) 434 for (sc = h->store_head; NULL != sc; sc = sc->next)
514 { 435 {
515 sc->ev = 436 ev =
516 PEERSTORE_create_record_mq_envelope (sc->sub_system, &sc->peer, sc->key, 437 PEERSTORE_create_record_mq_envelope (sc->sub_system, &sc->peer, sc->key,
517 sc->value, sc->size, &sc->expiry, 438 sc->value, sc->size, &sc->expiry,
518 sc->options, 439 sc->options,
519 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); 440 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
520 GNUNET_MQ_notify_sent (sc->ev, &store_request_sent, sc); 441 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
521 GNUNET_MQ_send (h->mq, sc->ev); 442 GNUNET_MQ_send (h->mq, ev);
522 } 443 }
523} 444}
524 445
@@ -667,11 +588,6 @@ GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
667{ 588{
668 struct GNUNET_PEERSTORE_Handle *h = sc->h; 589 struct GNUNET_PEERSTORE_Handle *h = sc->h;
669 590
670 if (NULL != sc->ev)
671 {
672 GNUNET_MQ_send_cancel (sc->ev);
673 sc->ev = NULL;
674 }
675 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc); 591 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
676 GNUNET_free (sc->sub_system); 592 GNUNET_free (sc->sub_system);
677 GNUNET_free (sc->value); 593 GNUNET_free (sc->value);
@@ -766,6 +682,7 @@ handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
766 reconnect (h); 682 reconnect (h);
767 return; 683 return;
768 } 684 }
685 ic->iterating = GNUNET_YES;
769 callback = ic->callback; 686 callback = ic->callback;
770 callback_cls = ic->callback_cls; 687 callback_cls = ic->callback_cls;
771 if (NULL == msg) /* Connection error */ 688 if (NULL == msg) /* Connection error */
@@ -819,11 +736,6 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
819 } 736 }
820 if (GNUNET_NO == ic->iterating) 737 if (GNUNET_NO == ic->iterating)
821 { 738 {
822 if (NULL != ic->ev)
823 {
824 GNUNET_MQ_send_cancel (ic->ev);
825 ic->ev = NULL;
826 }
827 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic); 739 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
828 GNUNET_free (ic->sub_system); 740 GNUNET_free (ic->sub_system);
829 if (NULL != ic->key) 741 if (NULL != ic->key)
@@ -865,7 +777,6 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
865 777
866 ic->callback = callback; 778 ic->callback = callback;
867 ic->callback_cls = callback_cls; 779 ic->callback_cls = callback_cls;
868 ic->ev = ev;
869 ic->h = h; 780 ic->h = h;
870 ic->sub_system = GNUNET_strdup (sub_system); 781 ic->sub_system = GNUNET_strdup (sub_system);
871 if (NULL != peer) 782 if (NULL != peer)
@@ -873,11 +784,11 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
873 if (NULL != key) 784 if (NULL != key)
874 ic->key = GNUNET_strdup (key); 785 ic->key = GNUNET_strdup (key);
875 ic->timeout = timeout; 786 ic->timeout = timeout;
876 ic->iterating = GNUNET_NO; 787 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
877 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic); 788 h->iterate_tail,
789 ic);
878 LOG (GNUNET_ERROR_TYPE_DEBUG, 790 LOG (GNUNET_ERROR_TYPE_DEBUG,
879 "Sending an iterate request for sub system `%s'\n", sub_system); 791 "Sending an iterate request for sub system `%s'\n", sub_system);
880 GNUNET_MQ_notify_sent (ev, &iterate_request_sent, ic);
881 GNUNET_MQ_send (h->mq, ev); 792 GNUNET_MQ_send (h->mq, ev);
882 ic->timeout_task = 793 ic->timeout_task =
883 GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic); 794 GNUNET_SCHEDULER_add_delayed (timeout, &iterate_timeout, ic);
@@ -914,6 +825,7 @@ handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
914 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n"); 825 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
915 record = PEERSTORE_parse_record_message (msg); 826 record = PEERSTORE_parse_record_message (msg);
916 PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash); 827 PEERSTORE_hash_key (record->sub_system, record->peer, record->key, &keyhash);
828 // FIXME: what if there are multiple watches for the same key?
917 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash); 829 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
918 if (NULL == wc) 830 if (NULL == wc)
919 { 831 {
@@ -941,23 +853,15 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
941 struct GNUNET_MQ_Envelope *ev; 853 struct GNUNET_MQ_Envelope *ev;
942 struct StoreKeyHashMessage *hm; 854 struct StoreKeyHashMessage *hm;
943 855
944 LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n"); 856 LOG (GNUNET_ERROR_TYPE_DEBUG,
945 if (GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */ 857 "Canceling watch.\n");
946 { 858 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
947 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); 859 hm->keyhash = wc->keyhash;
948 hm->keyhash = wc->keyhash; 860 GNUNET_MQ_send (h->mq, ev);
949 GNUNET_MQ_send (h->mq, ev); 861 GNUNET_CONTAINER_multihashmap_remove (h->watches,
950 wc->callback = NULL; 862 &wc->keyhash,
951 wc->callback_cls = NULL; 863 wc);
952 }
953 if (NULL != wc->ev)
954 {
955 GNUNET_MQ_send_cancel (wc->ev);
956 wc->ev = NULL;
957 }
958 GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc);
959 GNUNET_free (wc); 864 GNUNET_free (wc);
960
961} 865}
962 866
963 867
@@ -987,23 +891,27 @@ GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
987 GNUNET_assert (NULL != peer); 891 GNUNET_assert (NULL != peer);
988 GNUNET_assert (NULL != key); 892 GNUNET_assert (NULL != key);
989 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); 893 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
990 PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash); 894 PEERSTORE_hash_key (sub_system,
895 peer,
896 key,
897 &hm->keyhash);
991 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext); 898 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
992
993 wc->callback = callback; 899 wc->callback = callback;
994 wc->callback_cls = callback_cls; 900 wc->callback_cls = callback_cls;
995 wc->ev = ev;
996 wc->h = h; 901 wc->h = h;
997 wc->request_sent = GNUNET_NO;
998 wc->keyhash = hm->keyhash; 902 wc->keyhash = hm->keyhash;
999 if (NULL == h->watches) 903 if (NULL == h->watches)
1000 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO); 904 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
1001 GNUNET_CONTAINER_multihashmap_put (h->watches, &wc->keyhash, wc, 905 GNUNET_assert (GNUNET_OK ==
1002 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); 906 GNUNET_CONTAINER_multihashmap_put (h->watches,
907 &wc->keyhash,
908 wc,
909 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1003 LOG (GNUNET_ERROR_TYPE_DEBUG, 910 LOG (GNUNET_ERROR_TYPE_DEBUG,
1004 "Sending a watch request for ss `%s', peer `%s', key `%s'.\n", 911 "Sending a watch request for ss `%s', peer `%s', key `%s'.\n",
1005 sub_system, GNUNET_i2s (peer), key); 912 sub_system,
1006 GNUNET_MQ_notify_sent (ev, &watch_request_sent, wc); 913 GNUNET_i2s (peer),
914 key);
1007 GNUNET_MQ_send (h->mq, ev); 915 GNUNET_MQ_send (h->mq, ev);
1008 return wc; 916 return wc;
1009} 917}