aboutsummaryrefslogtreecommitdiff
path: root/src/service/peerstore/peerstore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/peerstore/peerstore_api.c')
-rw-r--r--src/service/peerstore/peerstore_api.c419
1 files changed, 208 insertions, 211 deletions
diff --git a/src/service/peerstore/peerstore_api.c b/src/service/peerstore/peerstore_api.c
index 3dec7e01b..394f64378 100644
--- a/src/service/peerstore/peerstore_api.c
+++ b/src/service/peerstore/peerstore_api.c
@@ -23,6 +23,7 @@
23 * @author Omar Tarabai 23 * @author Omar Tarabai
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 */ 25 */
26#include "gnunet_common.h"
26#include "platform.h" 27#include "platform.h"
27#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
28#include "gnunet_hello_uri_lib.h" 29#include "gnunet_hello_uri_lib.h"
@@ -86,6 +87,11 @@ struct GNUNET_PEERSTORE_Handle
86 */ 87 */
87 struct GNUNET_TIME_Relative reconnect_delay; 88 struct GNUNET_TIME_Relative reconnect_delay;
88 89
90 /**
91 *
92 */
93 uint32_t last_op_id;
94
89}; 95};
90 96
91/** 97/**
@@ -114,6 +120,11 @@ struct GNUNET_PEERSTORE_StoreContext
114 GNUNET_PEERSTORE_Continuation cont; 120 GNUNET_PEERSTORE_Continuation cont;
115 121
116 /** 122 /**
123 * Request ID
124 */
125 uint32_t rid;
126
127 /**
117 * Closure for @e cont 128 * Closure for @e cont
118 */ 129 */
119 void *cont_cls; 130 void *cont_cls;
@@ -216,9 +227,10 @@ struct GNUNET_PEERSTORE_IterateContext
216 void *callback_cls; 227 void *callback_cls;
217 228
218 /** 229 /**
219 * #GNUNET_YES if we are currently processing records. 230 * Request ID
220 */ 231 */
221 int iterating; 232 uint32_t rid;
233
222}; 234};
223 235
224/** 236/**
@@ -275,6 +287,12 @@ struct GNUNET_PEERSTORE_WatchContext
275 * The sub system requested the watch. 287 * The sub system requested the watch.
276 */ 288 */
277 const char *sub_system; 289 const char *sub_system;
290
291 /**
292 * Request ID
293 */
294 uint32_t rid;
295
278}; 296};
279 297
280/** 298/**
@@ -306,6 +324,12 @@ struct GNUNET_PEERSTORE_NotifyContext
306 * Is this request canceled. 324 * Is this request canceled.
307 */ 325 */
308 unsigned int canceled; 326 unsigned int canceled;
327
328 /**
329 * Request ID
330 */
331 uint32_t rid;
332
309}; 333};
310 334
311/******************************************************************************/ 335/******************************************************************************/
@@ -320,6 +344,18 @@ struct GNUNET_PEERSTORE_NotifyContext
320static void 344static void
321reconnect (void *cls); 345reconnect (void *cls);
322 346
347/**
348 * Get a fresh operation id to distinguish between namestore requests
349 *
350 * @param h the namestore handle
351 * @return next operation id to use
352 */
353static uint32_t
354get_op_id (struct GNUNET_PEERSTORE_Handle *h)
355{
356 return h->last_op_id++;
357}
358
323 359
324/** 360/**
325 * Disconnect from the peerstore service. 361 * Disconnect from the peerstore service.
@@ -329,25 +365,13 @@ reconnect (void *cls);
329static void 365static void
330disconnect (struct GNUNET_PEERSTORE_Handle *h) 366disconnect (struct GNUNET_PEERSTORE_Handle *h)
331{ 367{
332 struct GNUNET_PEERSTORE_IterateContext *next; 368 if (NULL != h->watches)
333
334 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
335 ic = next)
336 { 369 {
337 next = ic->next; 370 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (h->watches));
338 if (GNUNET_YES == ic->iterating) 371 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
339 {
340 GNUNET_PEERSTORE_Processor icb;
341 void *icb_cls;
342
343 icb = ic->callback;
344 icb_cls = ic->callback_cls;
345 GNUNET_PEERSTORE_iterate_cancel (ic);
346 if (NULL != icb)
347 icb (icb_cls, NULL, "Iteration canceled due to reconnection");
348 }
349 } 372 }
350 373 GNUNET_assert (NULL == h->iterate_head);
374 GNUNET_assert (NULL == h->store_head);
351 if (NULL != h->mq) 375 if (NULL != h->mq)
352 { 376 {
353 GNUNET_MQ_destroy (h->mq); 377 GNUNET_MQ_destroy (h->mq);
@@ -376,29 +400,6 @@ disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
376} 400}
377 401
378 402
379/**
380 * Callback after MQ envelope is sent
381 *
382 * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
383 */
384static void
385store_request_sent (void *cls)
386{
387 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
388 GNUNET_PEERSTORE_Continuation cont;
389 void *cont_cls;
390
391 if (NULL != sc)
392 {
393 cont = sc->cont;
394 cont_cls = sc->cont_cls;
395 GNUNET_PEERSTORE_store_cancel (sc);
396 if (NULL != cont)
397 cont (cont_cls, GNUNET_OK);
398 }
399}
400
401
402/******************************************************************************/ 403/******************************************************************************/
403/******************* CONNECTION FUNCTIONS *********************/ 404/******************* CONNECTION FUNCTIONS *********************/
404/******************************************************************************/ 405/******************************************************************************/
@@ -437,30 +438,13 @@ rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
437 438
438 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); 439 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
439 hm->keyhash = wc->keyhash; 440 hm->keyhash = wc->keyhash;
441 hm->rid = get_op_id (h);
440 GNUNET_MQ_send (h->mq, ev); 442 GNUNET_MQ_send (h->mq, ev);
441 return GNUNET_YES; 443 return GNUNET_YES;
442} 444}
443 445
444 446
445/** 447/**
446 * Iterator over watch requests to cancel them.
447 *
448 * @param cls unused
449 * @param key key to the watch request
450 * @param value watch context
451 * @return #GNUNET_YES to continue iteration
452 */
453static int
454destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
455{
456 struct GNUNET_PEERSTORE_WatchContext *wc = value;
457
458 GNUNET_PEERSTORE_watch_cancel (wc);
459 return GNUNET_YES;
460}
461
462
463/**
464 * Connect to the PEERSTORE service. 448 * Connect to the PEERSTORE service.
465 * 449 *
466 * @param cfg configuration to use 450 * @param cfg configuration to use
@@ -493,26 +477,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
493void 477void
494GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h) 478GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h)
495{ 479{
496 struct GNUNET_PEERSTORE_IterateContext *ic; 480 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnect initiated from client.\n");
497 struct GNUNET_PEERSTORE_StoreContext *sc;
498
499 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
500 if (NULL != h->watches)
501 {
502 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
503 GNUNET_CONTAINER_multihashmap_destroy (h->watches);
504 h->watches = NULL;
505 }
506 while (NULL != (ic = h->iterate_head))
507 {
508 GNUNET_break (0);
509 GNUNET_PEERSTORE_iterate_cancel (ic);
510 }
511 while (NULL != (sc = h->store_head))
512 {
513 GNUNET_break (0);
514 GNUNET_PEERSTORE_store_cancel (sc);
515 }
516 disconnect (h); 481 disconnect (h);
517} 482}
518 483
@@ -530,17 +495,17 @@ GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h)
530void 495void
531GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) 496GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
532{ 497{
533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 498 LOG (GNUNET_ERROR_TYPE_DEBUG,
534 "store cancel with sc %p \n", 499 "store cancel with sc %p \n",
535 sc); 500 sc);
536 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc); 501 GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
537 GNUNET_free (sc->sub_system); 502 GNUNET_free (sc->sub_system);
538 GNUNET_free (sc->value); 503 GNUNET_free (sc->value);
539 GNUNET_free (sc->key); 504 GNUNET_free (sc->key);
540 GNUNET_free (sc); 505 GNUNET_free (sc);
541 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 506 LOG (GNUNET_ERROR_TYPE_DEBUG,
542 "store cancel with sc %p is null\n", 507 "store cancel with sc %p is null\n",
543 sc); 508 sc);
544} 509}
545 510
546 511
@@ -581,17 +546,8 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
581 sub_system, 546 sub_system,
582 GNUNET_i2s (peer), 547 GNUNET_i2s (peer),
583 key); 548 key);
584 ev =
585 PEERSTORE_create_record_mq_envelope (sub_system,
586 peer,
587 key,
588 value,
589 size,
590 expiry,
591 options,
592 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
593 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext); 549 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
594 550 sc->rid = get_op_id (h);
595 sc->sub_system = GNUNET_strdup (sub_system); 551 sc->sub_system = GNUNET_strdup (sub_system);
596 sc->peer = *peer; 552 sc->peer = *peer;
597 sc->key = GNUNET_strdup (key); 553 sc->key = GNUNET_strdup (key);
@@ -602,14 +558,54 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
602 sc->cont = cont; 558 sc->cont = cont;
603 sc->cont_cls = cont_cls; 559 sc->cont_cls = cont_cls;
604 sc->h = h; 560 sc->h = h;
561 ev =
562 PEERSTORE_create_record_mq_envelope (sc->rid,
563 sub_system,
564 peer,
565 key,
566 value,
567 size,
568 expiry,
569 options,
570 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
605 571
606 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc); 572 GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
607 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
608 GNUNET_MQ_send (h->mq, ev); 573 GNUNET_MQ_send (h->mq, ev);
609 return sc; 574 return sc;
610} 575}
611 576
612 577
578/**
579 * When a response for store request is received
580 *
581 * @param cls a `struct GNUNET_PEERSTORE_Handle *`
582 * @param msg message received
583 */
584static void
585handle_store_result (void *cls, const struct PeerstoreResultMessage *msg)
586{
587 struct GNUNET_PEERSTORE_Handle *h = cls;
588 struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head;
589
590 LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerstoreResultMessage\n");
591 for (sc = h->store_head; NULL != sc; sc = sc->next)
592 {
593 LOG (GNUNET_ERROR_TYPE_DEBUG, "Trying %u ?= %u\n", sc->rid, msg->rid);
594 if (sc->rid == msg->rid)
595 break;
596 }
597 if (NULL == sc)
598 {
599 LOG (GNUNET_ERROR_TYPE_WARNING,
600 _("Unexpected store response.\n"));
601 return;
602 }
603 if (NULL != sc->cont)
604 sc->cont (sc->cont_cls, ntohl (msg->result));
605 GNUNET_CONTAINER_DLL_remove (h->store_head, h->store_tail, sc);
606}
607
608
613/******************************************************************************/ 609/******************************************************************************/
614/******************* ITERATE FUNCTIONS *********************/ 610/******************* ITERATE FUNCTIONS *********************/
615/******************************************************************************/ 611/******************************************************************************/
@@ -622,29 +618,24 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
622 * @param msg message received 618 * @param msg message received
623 */ 619 */
624static void 620static void
625handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg) 621handle_iterate_end (void *cls, const struct PeerstoreResultMessage *msg)
626{ 622{
627 struct GNUNET_PEERSTORE_Handle *h = cls; 623 struct GNUNET_PEERSTORE_Handle *h = cls;
628 struct GNUNET_PEERSTORE_IterateContext *ic; 624 struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
629 GNUNET_PEERSTORE_Processor callback;
630 void *callback_cls;
631 625
632 ic = h->iterate_head; 626 for (ic = h->iterate_head; NULL != ic; ic = ic->next)
627 if (ic->rid == msg->rid)
628 break;
633 if (NULL == ic) 629 if (NULL == ic)
634 { 630 {
635 LOG (GNUNET_ERROR_TYPE_ERROR, 631 LOG (GNUNET_ERROR_TYPE_WARNING,
636 _ ("Unexpected iteration response, this should not happen.\n")); 632 _ ("Unexpected iteration response.\n"));
637 disconnect_and_schedule_reconnect (h);
638 return; 633 return;
639 } 634 }
640 callback = ic->callback; 635 if (NULL != ic->callback)
641 callback_cls = ic->callback_cls; 636 ic->callback (ic->callback_cls, NULL, NULL);
642 ic->iterating = GNUNET_NO; 637 LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up iteration with rid %u\n", ic->rid);
643 GNUNET_PEERSTORE_iterate_cancel (ic); 638 GNUNET_CONTAINER_DLL_remove (h->iterate_head, h->iterate_tail, ic);
644 /* NOTE: set this here and not after callback because callback may free h */
645 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
646 if (NULL != callback)
647 callback (callback_cls, NULL, NULL);
648} 639}
649 640
650 641
@@ -674,11 +665,12 @@ handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
674{ 665{
675 struct GNUNET_PEERSTORE_Handle *h = cls; 666 struct GNUNET_PEERSTORE_Handle *h = cls;
676 struct GNUNET_PEERSTORE_IterateContext *ic; 667 struct GNUNET_PEERSTORE_IterateContext *ic;
677 GNUNET_PEERSTORE_Processor callback;
678 void *callback_cls;
679 struct GNUNET_PEERSTORE_Record *record; 668 struct GNUNET_PEERSTORE_Record *record;
680 669
681 ic = h->iterate_head; 670 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received StoreRecordMessage\n");
671 for (ic = h->iterate_head; NULL != ic; ic = ic->next)
672 if (ic->rid == msg->rid)
673 break;
682 if (NULL == ic) 674 if (NULL == ic)
683 { 675 {
684 LOG (GNUNET_ERROR_TYPE_ERROR, 676 LOG (GNUNET_ERROR_TYPE_ERROR,
@@ -686,21 +678,18 @@ handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
686 disconnect_and_schedule_reconnect (h); 678 disconnect_and_schedule_reconnect (h);
687 return; 679 return;
688 } 680 }
689 ic->iterating = GNUNET_YES; 681 if (NULL == ic->callback)
690 callback = ic->callback;
691 callback_cls = ic->callback_cls;
692 if (NULL == callback)
693 return; 682 return;
694 record = PEERSTORE_parse_record_message (msg); 683 record = PEERSTORE_parse_record_message (msg);
695 if (NULL == record) 684 if (NULL == record)
696 { 685 {
697 callback (callback_cls, 686 ic->callback (ic->callback_cls,
698 NULL, 687 NULL,
699 _ ("Received a malformed response from service.")); 688 _ ("Received a malformed response from service."));
700 } 689 }
701 else 690 else
702 { 691 {
703 callback (callback_cls, record, NULL); 692 ic->callback (ic->callback_cls, record, NULL);
704 PEERSTORE_destroy_record (record); 693 PEERSTORE_destroy_record (record);
705 } 694 }
706} 695}
@@ -715,11 +704,6 @@ handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
715void 704void
716GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic) 705GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
717{ 706{
718 if (GNUNET_YES == ic->iterating)
719 {
720 if (NULL != ic->callback)
721 ic->callback (ic->callback_cls, NULL, "Iteration canceled due to reconnection");
722 }
723 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic); 707 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
724 GNUNET_free (ic->sub_system); 708 GNUNET_free (ic->sub_system);
725 GNUNET_free (ic->key); 709 GNUNET_free (ic->key);
@@ -738,8 +722,11 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
738 struct GNUNET_MQ_Envelope *ev; 722 struct GNUNET_MQ_Envelope *ev;
739 struct GNUNET_PEERSTORE_IterateContext *ic; 723 struct GNUNET_PEERSTORE_IterateContext *ic;
740 724
725 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
726 ic->rid = get_op_id (h);
741 ev = 727 ev =
742 PEERSTORE_create_record_mq_envelope (sub_system, 728 PEERSTORE_create_record_mq_envelope (ic->rid,
729 sub_system,
743 peer, 730 peer,
744 key, 731 key,
745 NULL, 732 NULL,
@@ -747,7 +734,6 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
747 GNUNET_TIME_UNIT_FOREVER_ABS, 734 GNUNET_TIME_UNIT_FOREVER_ABS,
748 0, 735 0,
749 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); 736 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
750 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
751 ic->callback = callback; 737 ic->callback = callback;
752 ic->callback_cls = callback_cls; 738 ic->callback_cls = callback_cls;
753 ic->h = h; 739 ic->h = h;
@@ -831,10 +817,14 @@ static void
831reconnect (void *cls) 817reconnect (void *cls)
832{ 818{
833 struct GNUNET_PEERSTORE_Handle *h = cls; 819 struct GNUNET_PEERSTORE_Handle *h = cls;
834 struct GNUNET_MQ_MessageHandler mq_handlers[] = 820 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
835 { GNUNET_MQ_hd_fixed_size (iterate_end, 821 GNUNET_MQ_hd_fixed_size (iterate_end,
836 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, 822 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
837 struct GNUNET_MessageHeader, 823 struct PeerstoreResultMessage,
824 h),
825 GNUNET_MQ_hd_fixed_size (store_result,
826 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT,
827 struct PeerstoreResultMessage,
838 h), 828 h),
839 GNUNET_MQ_hd_var_size (iterate_result, 829 GNUNET_MQ_hd_var_size (iterate_result,
840 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 830 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
@@ -844,7 +834,8 @@ reconnect (void *cls)
844 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 834 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
845 struct StoreRecordMessage, 835 struct StoreRecordMessage,
846 h), 836 h),
847 GNUNET_MQ_handler_end () }; 837 GNUNET_MQ_handler_end ()
838 };
848 struct GNUNET_MQ_Envelope *ev; 839 struct GNUNET_MQ_Envelope *ev;
849 840
850 h->reconnect_task = NULL; 841 h->reconnect_task = NULL;
@@ -868,8 +859,10 @@ reconnect (void *cls)
868 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic; 859 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
869 ic = ic->next) 860 ic = ic->next)
870 { 861 {
862 ic->rid = get_op_id(h);
871 ev = 863 ev =
872 PEERSTORE_create_record_mq_envelope (ic->sub_system, 864 PEERSTORE_create_record_mq_envelope (ic->rid,
865 ic->sub_system,
873 &ic->peer, 866 &ic->peer,
874 ic->key, 867 ic->key,
875 NULL, 868 NULL,
@@ -882,8 +875,10 @@ reconnect (void *cls)
882 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc; 875 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
883 sc = sc->next) 876 sc = sc->next)
884 { 877 {
878 sc->rid = get_op_id(h);
885 ev = 879 ev =
886 PEERSTORE_create_record_mq_envelope (sc->sub_system, 880 PEERSTORE_create_record_mq_envelope (sc->rid,
881 sc->sub_system,
887 &sc->peer, 882 &sc->peer,
888 sc->key, 883 sc->key,
889 sc->value, 884 sc->value,
@@ -891,7 +886,6 @@ reconnect (void *cls)
891 sc->expiry, 886 sc->expiry,
892 sc->options, 887 sc->options,
893 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); 888 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
894 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
895 GNUNET_MQ_send (h->mq, ev); 889 GNUNET_MQ_send (h->mq, ev);
896 } 890 }
897} 891}
@@ -909,7 +903,7 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
909 struct GNUNET_MQ_Envelope *ev; 903 struct GNUNET_MQ_Envelope *ev;
910 struct StoreKeyHashMessage *hm; 904 struct StoreKeyHashMessage *hm;
911 905
912 LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n"); 906 LOG (GNUNET_ERROR_TYPE_DEBUG, "Cancelling watch.\n");
913 if (NULL != wc->ic) 907 if (NULL != wc->ic)
914 { 908 {
915 GNUNET_PEERSTORE_iterate_cancel (wc->ic); 909 GNUNET_PEERSTORE_iterate_cancel (wc->ic);
@@ -919,6 +913,7 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
919 913
920 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); 914 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
921 hm->keyhash = wc->keyhash; 915 hm->keyhash = wc->keyhash;
916 hm->rid = get_op_id (h);
922 GNUNET_MQ_send (h->mq, ev); 917 GNUNET_MQ_send (h->mq, ev);
923 GNUNET_assert ( 918 GNUNET_assert (
924 GNUNET_YES == 919 GNUNET_YES ==
@@ -935,53 +930,55 @@ watch_iterate (void *cls,
935 struct GNUNET_PEERSTORE_WatchContext *wc = cls; 930 struct GNUNET_PEERSTORE_WatchContext *wc = cls;
936 struct GNUNET_PEERSTORE_Handle *h = wc->h; 931 struct GNUNET_PEERSTORE_Handle *h = wc->h;
937 struct StoreKeyHashMessage *hm; 932 struct StoreKeyHashMessage *hm;
933 struct GNUNET_MQ_Envelope *ev;
934 const struct GNUNET_PeerIdentity *peer;
938 935
939 if (NULL != emsg) 936 if (NULL != emsg)
940 { 937 {
941 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 938 LOG (GNUNET_ERROR_TYPE_WARNING,
942 "Got failure from PEERSTORE: %s\n", 939 "Got failure from PEERSTORE: %s\n",
943 emsg); 940 emsg);
944 wc->callback (wc->callback_cls, NULL, emsg); 941 wc->callback (wc->callback_cls, NULL, emsg);
945 return; 942 return;
946 } 943 }
947 if (NULL == record) 944 if ((NULL != record) &&
945 (NULL != wc->callback))
948 { 946 {
949 struct GNUNET_MQ_Envelope *ev; 947 wc->callback (wc->callback_cls, record, NULL);
950 const struct GNUNET_PeerIdentity *peer;
951
952 if (NULL == wc->peer)
953 peer = GNUNET_new (struct GNUNET_PeerIdentity);
954 else
955 peer = wc->peer;
956 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
957 PEERSTORE_hash_key (wc->sub_system, peer, wc->key, &hm->keyhash);
958 LOG (GNUNET_ERROR_TYPE_DEBUG,
959 "Hash key we watch for %s\n",
960 GNUNET_h2s_full (&hm->keyhash));
961 wc->keyhash = hm->keyhash;
962 if (NULL == h->watches)
963 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
964 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
965 h->watches,
966 &wc->keyhash,
967 wc,
968 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
969 LOG (GNUNET_ERROR_TYPE_DEBUG,
970 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
971 wc->sub_system,
972 GNUNET_i2s (peer),
973 wc->key);
974 GNUNET_MQ_send (h->mq, ev);
975 wc->ic = NULL;
976 if (NULL != wc->callback)
977 wc->callback (wc->callback_cls, record, NULL);
978 if (NULL == wc->peer)
979 GNUNET_free_nz ((void *) peer);
980 return; 948 return;
981 } 949 }
982 950
951 if (NULL == wc->peer)
952 peer = GNUNET_new (struct GNUNET_PeerIdentity);
953 else
954 peer = wc->peer;
955 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
956 PEERSTORE_hash_key (wc->sub_system, peer, wc->key, &hm->keyhash);
957 hm->rid = get_op_id (h);
958 LOG (GNUNET_ERROR_TYPE_DEBUG,
959 "Hash key we watch for %s\n",
960 GNUNET_h2s_full (&hm->keyhash));
961 wc->keyhash = hm->keyhash;
962 if (NULL == h->watches)
963 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
964 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
965 h->watches,
966 &wc->keyhash,
967 wc,
968 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
969 LOG (GNUNET_ERROR_TYPE_DEBUG,
970 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
971 wc->sub_system,
972 GNUNET_i2s (peer),
973 wc->key);
974 GNUNET_MQ_send (h->mq, ev);
975 wc->ic = NULL;
983 if (NULL != wc->callback) 976 if (NULL != wc->callback)
984 wc->callback (wc->callback_cls, record, NULL); 977 wc->callback (wc->callback_cls, record, NULL);
978 if (NULL == wc->peer)
979 GNUNET_free_nz ((void *) peer);
980 return;
981
985} 982}
986 983
987 984
@@ -1040,32 +1037,32 @@ hello_updated (void *cls,
1040 struct GNUNET_PEERSTORE_NotifyContext *nc = cls; 1037 struct GNUNET_PEERSTORE_NotifyContext *nc = cls;
1041 const struct GNUNET_MessageHeader *hello; 1038 const struct GNUNET_MessageHeader *hello;
1042 1039
1043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1040 LOG (GNUNET_ERROR_TYPE_DEBUG,
1044 "hello_updated\n"); 1041 "hello_updated\n");
1045 if (NULL != emsg) 1042 if (NULL != emsg)
1046 { 1043 {
1047 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1044 LOG (GNUNET_ERROR_TYPE_WARNING,
1048 "Got failure from PEERSTORE: %s\n", 1045 "Got failure from PEERSTORE: %s\n",
1049 emsg); 1046 emsg);
1050 nc->callback (nc->callback_cls, NULL, NULL, emsg); 1047 nc->callback (nc->callback_cls, NULL, NULL, emsg);
1051 return; 1048 return;
1052 } 1049 }
1053 if (NULL == record) 1050 if (NULL == record)
1054 return; 1051 return;
1055 hello = record->value; 1052 hello = record->value;
1056 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1053 LOG (GNUNET_ERROR_TYPE_DEBUG,
1057 "hello_updated with expired %s and size %u for peer %s\n", 1054 "hello_updated with expired %s and size %u for peer %s\n",
1058 GNUNET_STRINGS_absolute_time_to_string ( 1055 GNUNET_STRINGS_absolute_time_to_string (
1059 GNUNET_HELLO_builder_get_expiration_time (hello)), 1056 GNUNET_HELLO_builder_get_expiration_time (hello)),
1060 ntohs (hello->size), 1057 ntohs (hello->size),
1061 GNUNET_i2s (&record->peer)); 1058 GNUNET_i2s (&record->peer));
1062 if ((0 == record->value_size)) 1059 if ((0 == record->value_size))
1063 { 1060 {
1064 GNUNET_break (0); 1061 GNUNET_break (0);
1065 return; 1062 return;
1066 } 1063 }
1067 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1064 LOG (GNUNET_ERROR_TYPE_DEBUG,
1068 "hello_updated call callback\n"); 1065 "hello_updated call callback\n");
1069 nc->callback (nc->callback_cls, &record->peer, hello, NULL); 1066 nc->callback (nc->callback_cls, &record->peer, hello, NULL);
1070} 1067}
1071 1068
@@ -1119,13 +1116,13 @@ merge_success (void *cls, int success)
1119 1116
1120 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_remove (huc->store_context_map, 1117 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_remove (huc->store_context_map,
1121 huc->pid, shu_cls->sc)) 1118 huc->pid, shu_cls->sc))
1122 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1119 LOG (GNUNET_ERROR_TYPE_WARNING,
1123 "There was no store context to be removed after storing hello for peer %s\n", 1120 "There was no store context to be removed after storing hello for peer %s\n",
1124 GNUNET_i2s (huc->pid)); 1121 GNUNET_i2s (huc->pid));
1125 if (GNUNET_OK != success) 1122 if (GNUNET_OK != success)
1126 { 1123 {
1127 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1124 LOG (GNUNET_ERROR_TYPE_WARNING,
1128 "Storing hello uri failed\n"); 1125 "Storing hello uri failed\n");
1129 huc->cont (huc->cont_cls, success); 1126 huc->cont (huc->cont_cls, success);
1130 GNUNET_free (huc->hello); 1127 GNUNET_free (huc->hello);
1131 GNUNET_free (huc->pid); 1128 GNUNET_free (huc->pid);
@@ -1139,18 +1136,18 @@ merge_success (void *cls, int success)
1139 huc->wc = NULL; 1136 huc->wc = NULL;
1140 huc->cont (huc->cont_cls, GNUNET_OK); 1137 huc->cont (huc->cont_cls, GNUNET_OK);
1141 huc->success = GNUNET_OK; 1138 huc->success = GNUNET_OK;
1142 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1139 LOG (GNUNET_ERROR_TYPE_DEBUG,
1143 "Storing hello uri succeeded for peer %s!\n", 1140 "Storing hello uri succeeded for peer %s!\n",
1144 GNUNET_i2s (huc->pid)); 1141 GNUNET_i2s (huc->pid));
1145 GNUNET_free (huc->hello); 1142 GNUNET_free (huc->hello);
1146 GNUNET_free (huc->pid); 1143 GNUNET_free (huc->pid);
1147 GNUNET_free (huc); 1144 GNUNET_free (huc);
1148 GNUNET_free (shu_cls); 1145 GNUNET_free (shu_cls);
1149 return; 1146 return;
1150 } 1147 }
1151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1148 LOG (GNUNET_ERROR_TYPE_DEBUG,
1152 "Got notified during storing hello uri for peer %s!\n", 1149 "Got notified during storing hello uri for peer %s!\n",
1153 GNUNET_i2s (huc->pid)); 1150 GNUNET_i2s (huc->pid));
1154 GNUNET_free (shu_cls); 1151 GNUNET_free (shu_cls);
1155} 1152}
1156 1153
@@ -1176,9 +1173,9 @@ store_hello (struct GNUNET_PEERSTORE_StoreHelloContext *huc,
1176 GNUNET_PEERSTORE_STOREOPTION_REPLACE, 1173 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1177 merge_success, 1174 merge_success,
1178 shu_cls); 1175 shu_cls);
1179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1176 LOG (GNUNET_ERROR_TYPE_DEBUG,
1180 "store_hello with expiration %s\n", 1177 "store_hello with expiration %s\n",
1181 GNUNET_STRINGS_absolute_time_to_string (hello_exp)); 1178 GNUNET_STRINGS_absolute_time_to_string (hello_exp));
1182 GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_put ( 1179 GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_put (
1183 huc->store_context_map, 1180 huc->store_context_map,
1184 huc->pid, 1181 huc->pid,
@@ -1202,27 +1199,27 @@ merge_uri (void *cls,
1202 1199
1203 if (NULL != emsg) 1200 if (NULL != emsg)
1204 { 1201 {
1205 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1202 LOG (GNUNET_ERROR_TYPE_WARNING,
1206 "Got failure from PEERSTORE: %s\n", 1203 "Got failure from PEERSTORE: %s\n",
1207 emsg); 1204 emsg);
1208 return; 1205 return;
1209 } 1206 }
1210 1207
1211 if (NULL == record && GNUNET_NO == huc->success) 1208 if (NULL == record && GNUNET_NO == huc->success)
1212 { 1209 {
1213 huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello); 1210 huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello);
1214 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1211 LOG (GNUNET_ERROR_TYPE_DEBUG,
1215 "merge_uri just store for peer %s with expiration %s\n", 1212 "merge_uri just store for peer %s with expiration %s\n",
1216 GNUNET_i2s (huc->pid), 1213 GNUNET_i2s (huc->pid),
1217 GNUNET_STRINGS_absolute_time_to_string (huc_hello_exp_time)); 1214 GNUNET_STRINGS_absolute_time_to_string (huc_hello_exp_time));
1218 store_hello (huc, huc->hello); 1215 store_hello (huc, huc->hello);
1219 } 1216 }
1220 else if (GNUNET_NO == huc->success && 0 == GNUNET_memcmp (huc->pid, 1217 else if (GNUNET_NO == huc->success && 0 == GNUNET_memcmp (huc->pid,
1221 &record->peer)) 1218 &record->peer))
1222 { 1219 {
1223 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1220 LOG (GNUNET_ERROR_TYPE_DEBUG,
1224 "merge_uri record for peer %s\n", 1221 "merge_uri record for peer %s\n",
1225 GNUNET_i2s (&record->peer)); 1222 GNUNET_i2s (&record->peer));
1226 hello = record->value; 1223 hello = record->value;
1227 if ((0 == record->value_size)) 1224 if ((0 == record->value_size))
1228 { 1225 {