aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore/peerstore_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-06-04 11:57:59 +0200
committerChristian Grothoff <christian@grothoff.org>2019-06-04 11:57:59 +0200
commit58002acac13b2eef407a20ee3ddc5f458cd5e483 (patch)
tree8285fc1b32a8c9ac55f970667e7d29b791b6758e /src/peerstore/peerstore_api.c
parent9ce956ea4c93f038995a21c6c1c0133eee6bff75 (diff)
downloadgnunet-58002acac13b2eef407a20ee3ddc5f458cd5e483.tar.gz
gnunet-58002acac13b2eef407a20ee3ddc5f458cd5e483.zip
nicer loop structure
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r--src/peerstore/peerstore_api.c247
1 files changed, 103 insertions, 144 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 02f3e287a..64bc3ae72 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -28,7 +28,7 @@
28#include "peerstore.h" 28#include "peerstore.h"
29#include "peerstore_common.h" 29#include "peerstore_common.h"
30 30
31#define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__) 31#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
32 32
33/******************************************************************************/ 33/******************************************************************************/
34/************************ DATA STRUCTURES ****************************/ 34/************************ DATA STRUCTURES ****************************/
@@ -89,7 +89,6 @@ struct GNUNET_PEERSTORE_Handle
89 * Are we in the process of disconnecting but need to sync first? 89 * Are we in the process of disconnecting but need to sync first?
90 */ 90 */
91 int disconnecting; 91 int disconnecting;
92
93}; 92};
94 93
95/** 94/**
@@ -156,7 +155,6 @@ struct GNUNET_PEERSTORE_StoreContext
156 * Options for the store operation. 155 * Options for the store operation.
157 */ 156 */
158 enum GNUNET_PEERSTORE_StoreOption options; 157 enum GNUNET_PEERSTORE_StoreOption options;
159
160}; 158};
161 159
162/** 160/**
@@ -208,7 +206,6 @@ struct GNUNET_PEERSTORE_IterateContext
208 * #GNUNET_YES if we are currently processing records. 206 * #GNUNET_YES if we are currently processing records.
209 */ 207 */
210 int iterating; 208 int iterating;
211
212}; 209};
213 210
214/** 211/**
@@ -245,7 +242,6 @@ struct GNUNET_PEERSTORE_WatchContext
245 * Hash of the combined key 242 * Hash of the combined key
246 */ 243 */
247 struct GNUNET_HashCode keyhash; 244 struct GNUNET_HashCode keyhash;
248
249}; 245};
250 246
251/******************************************************************************/ 247/******************************************************************************/
@@ -271,8 +267,7 @@ disconnect (struct GNUNET_PEERSTORE_Handle *h)
271{ 267{
272 struct GNUNET_PEERSTORE_IterateContext *next; 268 struct GNUNET_PEERSTORE_IterateContext *next;
273 269
274 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; 270 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
275 NULL != ic;
276 ic = next) 271 ic = next)
277 { 272 {
278 next = ic->next; 273 next = ic->next;
@@ -285,9 +280,7 @@ disconnect (struct GNUNET_PEERSTORE_Handle *h)
285 icb_cls = ic->callback_cls; 280 icb_cls = ic->callback_cls;
286 GNUNET_PEERSTORE_iterate_cancel (ic); 281 GNUNET_PEERSTORE_iterate_cancel (ic);
287 if (NULL != icb) 282 if (NULL != icb)
288 icb (icb_cls, 283 icb (icb_cls, NULL, "Iteration canceled due to reconnection");
289 NULL,
290 "Iteration canceled due to reconnection");
291 } 284 }
292 } 285 }
293 286
@@ -312,17 +305,13 @@ disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
312 disconnect (h); 305 disconnect (h);
313 LOG (GNUNET_ERROR_TYPE_DEBUG, 306 LOG (GNUNET_ERROR_TYPE_DEBUG,
314 "Scheduling task to reconnect to PEERSTORE service in %s.\n", 307 "Scheduling task to reconnect to PEERSTORE service in %s.\n",
315 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, 308 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
316 GNUNET_YES));
317 h->reconnect_task = 309 h->reconnect_task =
318 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, 310 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
319 &reconnect,
320 h);
321 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); 311 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
322} 312}
323 313
324 314
325
326/** 315/**
327 * Callback after MQ envelope is sent 316 * Callback after MQ envelope is sent
328 * 317 *
@@ -352,8 +341,7 @@ store_request_sent (void *cls)
352 * Function called when we had trouble talking to the service. 341 * Function called when we had trouble talking to the service.
353 */ 342 */
354static void 343static void
355handle_client_error (void *cls, 344handle_client_error (void *cls, enum GNUNET_MQ_Error error)
356 enum GNUNET_MQ_Error error)
357{ 345{
358 struct GNUNET_PEERSTORE_Handle *h = cls; 346 struct GNUNET_PEERSTORE_Handle *h = cls;
359 347
@@ -373,9 +361,7 @@ handle_client_error (void *cls,
373 * @return #GNUNET_YES (continue to iterate) 361 * @return #GNUNET_YES (continue to iterate)
374 */ 362 */
375static int 363static int
376rewatch_it (void *cls, 364rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
377 const struct GNUNET_HashCode *key,
378 void *value)
379{ 365{
380 struct GNUNET_PEERSTORE_Handle *h = cls; 366 struct GNUNET_PEERSTORE_Handle *h = cls;
381 struct GNUNET_PEERSTORE_WatchContext *wc = value; 367 struct GNUNET_PEERSTORE_WatchContext *wc = value;
@@ -398,9 +384,7 @@ rewatch_it (void *cls,
398 * @return #GNUNET_YES to continue iteration 384 * @return #GNUNET_YES to continue iteration
399 */ 385 */
400static int 386static int
401destroy_watch (void *cls, 387destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
402 const struct GNUNET_HashCode *key,
403 void *value)
404{ 388{
405 struct GNUNET_PEERSTORE_WatchContext *wc = value; 389 struct GNUNET_PEERSTORE_WatchContext *wc = value;
406 390
@@ -461,8 +445,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
461 * @param sync_first send any pending STORE requests before disconnecting 445 * @param sync_first send any pending STORE requests before disconnecting
462 */ 446 */
463void 447void
464GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, 448GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
465 int sync_first)
466{ 449{
467 struct GNUNET_PEERSTORE_IterateContext *ic; 450 struct GNUNET_PEERSTORE_IterateContext *ic;
468 struct GNUNET_PEERSTORE_StoreContext *sc; 451 struct GNUNET_PEERSTORE_StoreContext *sc;
@@ -515,8 +498,7 @@ GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
515 GNUNET_free (sc->value); 498 GNUNET_free (sc->value);
516 GNUNET_free (sc->key); 499 GNUNET_free (sc->key);
517 GNUNET_free (sc); 500 GNUNET_free (sc);
518 if ( (GNUNET_YES == h->disconnecting) && 501 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
519 (NULL == h->store_head) )
520 final_disconnect (h); 502 final_disconnect (h);
521} 503}
522 504
@@ -542,7 +524,8 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
542 const char *sub_system, 524 const char *sub_system,
543 const struct GNUNET_PeerIdentity *peer, 525 const struct GNUNET_PeerIdentity *peer,
544 const char *key, 526 const char *key,
545 const void *value, size_t size, 527 const void *value,
528 size_t size,
546 struct GNUNET_TIME_Absolute expiry, 529 struct GNUNET_TIME_Absolute expiry,
547 enum GNUNET_PEERSTORE_StoreOption options, 530 enum GNUNET_PEERSTORE_StoreOption options,
548 GNUNET_PEERSTORE_Continuation cont, 531 GNUNET_PEERSTORE_Continuation cont,
@@ -553,10 +536,19 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
553 536
554 LOG (GNUNET_ERROR_TYPE_DEBUG, 537 LOG (GNUNET_ERROR_TYPE_DEBUG,
555 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n", 538 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
556 size, sub_system, GNUNET_i2s (peer), key); 539 size,
557 ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size, 540 sub_system,
558 expiry, options, 541 GNUNET_i2s (peer),
559 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); 542 key);
543 ev =
544 PEERSTORE_create_record_mq_envelope (sub_system,
545 peer,
546 key,
547 value,
548 size,
549 expiry,
550 options,
551 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
560 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext); 552 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
561 553
562 sc->sub_system = GNUNET_strdup (sub_system); 554 sc->sub_system = GNUNET_strdup (sub_system);
@@ -574,7 +566,6 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
574 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); 566 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
575 GNUNET_MQ_send (h->mq, ev); 567 GNUNET_MQ_send (h->mq, ev);
576 return sc; 568 return sc;
577
578} 569}
579 570
580 571
@@ -590,8 +581,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
590 * @param msg message received 581 * @param msg message received
591 */ 582 */
592static void 583static void
593handle_iterate_end (void *cls, 584handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
594 const struct GNUNET_MessageHeader *msg)
595{ 585{
596 struct GNUNET_PEERSTORE_Handle *h = cls; 586 struct GNUNET_PEERSTORE_Handle *h = cls;
597 struct GNUNET_PEERSTORE_IterateContext *ic; 587 struct GNUNET_PEERSTORE_IterateContext *ic;
@@ -602,7 +592,7 @@ handle_iterate_end (void *cls,
602 if (NULL == ic) 592 if (NULL == ic)
603 { 593 {
604 LOG (GNUNET_ERROR_TYPE_ERROR, 594 LOG (GNUNET_ERROR_TYPE_ERROR,
605 _("Unexpected iteration response, this should not happen.\n")); 595 _ ("Unexpected iteration response, this should not happen.\n"));
606 disconnect_and_schedule_reconnect (h); 596 disconnect_and_schedule_reconnect (h);
607 return; 597 return;
608 } 598 }
@@ -611,9 +601,7 @@ handle_iterate_end (void *cls,
611 ic->iterating = GNUNET_NO; 601 ic->iterating = GNUNET_NO;
612 GNUNET_PEERSTORE_iterate_cancel (ic); 602 GNUNET_PEERSTORE_iterate_cancel (ic);
613 if (NULL != callback) 603 if (NULL != callback)
614 callback (callback_cls, 604 callback (callback_cls, NULL, NULL);
615 NULL,
616 NULL);
617 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 605 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
618} 606}
619 607
@@ -626,8 +614,7 @@ handle_iterate_end (void *cls,
626 * @param msg message received 614 * @param msg message received
627 */ 615 */
628static int 616static int
629check_iterate_result (void *cls, 617check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
630 const struct StoreRecordMessage *msg)
631{ 618{
632 /* we defer validation to #handle_iterate_result */ 619 /* we defer validation to #handle_iterate_result */
633 return GNUNET_OK; 620 return GNUNET_OK;
@@ -641,8 +628,7 @@ check_iterate_result (void *cls,
641 * @param msg message received 628 * @param msg message received
642 */ 629 */
643static void 630static void
644handle_iterate_result (void *cls, 631handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
645 const struct StoreRecordMessage *msg)
646{ 632{
647 struct GNUNET_PEERSTORE_Handle *h = cls; 633 struct GNUNET_PEERSTORE_Handle *h = cls;
648 struct GNUNET_PEERSTORE_IterateContext *ic; 634 struct GNUNET_PEERSTORE_IterateContext *ic;
@@ -654,7 +640,7 @@ handle_iterate_result (void *cls,
654 if (NULL == ic) 640 if (NULL == ic)
655 { 641 {
656 LOG (GNUNET_ERROR_TYPE_ERROR, 642 LOG (GNUNET_ERROR_TYPE_ERROR,
657 _("Unexpected iteration response, this should not happen.\n")); 643 _ ("Unexpected iteration response, this should not happen.\n"));
658 disconnect_and_schedule_reconnect (h); 644 disconnect_and_schedule_reconnect (h);
659 return; 645 return;
660 } 646 }
@@ -668,13 +654,11 @@ handle_iterate_result (void *cls,
668 { 654 {
669 callback (callback_cls, 655 callback (callback_cls,
670 NULL, 656 NULL,
671 _("Received a malformed response from service.")); 657 _ ("Received a malformed response from service."));
672 } 658 }
673 else 659 else
674 { 660 {
675 callback (callback_cls, 661 callback (callback_cls, record, NULL);
676 record,
677 NULL);
678 PEERSTORE_destroy_record (record); 662 PEERSTORE_destroy_record (record);
679 } 663 }
680} 664}
@@ -691,9 +675,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
691{ 675{
692 if (GNUNET_NO == ic->iterating) 676 if (GNUNET_NO == ic->iterating)
693 { 677 {
694 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, 678 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
695 ic->h->iterate_tail,
696 ic);
697 GNUNET_free (ic->sub_system); 679 GNUNET_free (ic->sub_system);
698 GNUNET_free_non_null (ic->key); 680 GNUNET_free_non_null (ic->key);
699 GNUNET_free (ic); 681 GNUNET_free (ic);
@@ -725,13 +707,15 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
725 struct GNUNET_MQ_Envelope *ev; 707 struct GNUNET_MQ_Envelope *ev;
726 struct GNUNET_PEERSTORE_IterateContext *ic; 708 struct GNUNET_PEERSTORE_IterateContext *ic;
727 709
728 ev = PEERSTORE_create_record_mq_envelope (sub_system, 710 ev =
729 peer, 711 PEERSTORE_create_record_mq_envelope (sub_system,
730 key, 712 peer,
731 NULL, 0, 713 key,
732 GNUNET_TIME_UNIT_FOREVER_ABS, 714 NULL,
733 0, 715 0,
734 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); 716 GNUNET_TIME_UNIT_FOREVER_ABS,
717 0,
718 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
735 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext); 719 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
736 ic->callback = callback; 720 ic->callback = callback;
737 ic->callback_cls = callback_cls; 721 ic->callback_cls = callback_cls;
@@ -741,9 +725,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
741 ic->peer = *peer; 725 ic->peer = *peer;
742 if (NULL != key) 726 if (NULL != key)
743 ic->key = GNUNET_strdup (key); 727 ic->key = GNUNET_strdup (key);
744 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, 728 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
745 h->iterate_tail,
746 ic);
747 LOG (GNUNET_ERROR_TYPE_DEBUG, 729 LOG (GNUNET_ERROR_TYPE_DEBUG,
748 "Sending an iterate request for sub system `%s'\n", 730 "Sending an iterate request for sub system `%s'\n",
749 sub_system); 731 sub_system);
@@ -763,8 +745,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
763 * @param msg message received 745 * @param msg message received
764 */ 746 */
765static int 747static int
766check_watch_record (void *cls, 748check_watch_record (void *cls, const struct StoreRecordMessage *msg)
767 const struct StoreRecordMessage *msg)
768{ 749{
769 /* we defer validation to #handle_watch_result */ 750 /* we defer validation to #handle_watch_result */
770 return GNUNET_OK; 751 return GNUNET_OK;
@@ -778,41 +759,33 @@ check_watch_record (void *cls,
778 * @param msg message received 759 * @param msg message received
779 */ 760 */
780static void 761static void
781handle_watch_record (void *cls, 762handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
782 const struct StoreRecordMessage *msg)
783{ 763{
784 struct GNUNET_PEERSTORE_Handle *h = cls; 764 struct GNUNET_PEERSTORE_Handle *h = cls;
785 struct GNUNET_PEERSTORE_Record *record; 765 struct GNUNET_PEERSTORE_Record *record;
786 struct GNUNET_HashCode keyhash; 766 struct GNUNET_HashCode keyhash;
787 struct GNUNET_PEERSTORE_WatchContext *wc; 767 struct GNUNET_PEERSTORE_WatchContext *wc;
788 768
789 LOG (GNUNET_ERROR_TYPE_DEBUG, 769 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
790 "Received a watch record from service.\n");
791 record = PEERSTORE_parse_record_message (msg); 770 record = PEERSTORE_parse_record_message (msg);
792 if (NULL == record) 771 if (NULL == record)
793 { 772 {
794 disconnect_and_schedule_reconnect (h); 773 disconnect_and_schedule_reconnect (h);
795 return; 774 return;
796 } 775 }
797 PEERSTORE_hash_key (record->sub_system, 776 PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
798 &record->peer,
799 record->key,
800 &keyhash);
801 // FIXME: what if there are multiple watches for the same key? 777 // FIXME: what if there are multiple watches for the same key?
802 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, 778 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
803 &keyhash);
804 if (NULL == wc) 779 if (NULL == wc)
805 { 780 {
806 LOG (GNUNET_ERROR_TYPE_ERROR, 781 LOG (GNUNET_ERROR_TYPE_ERROR,
807 _("Received a watch result for a non existing watch.\n")); 782 _ ("Received a watch result for a non existing watch.\n"));
808 PEERSTORE_destroy_record (record); 783 PEERSTORE_destroy_record (record);
809 disconnect_and_schedule_reconnect (h); 784 disconnect_and_schedule_reconnect (h);
810 return; 785 return;
811 } 786 }
812 if (NULL != wc->callback) 787 if (NULL != wc->callback)
813 wc->callback (wc->callback_cls, 788 wc->callback (wc->callback_cls, record, NULL);
814 record,
815 NULL);
816 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 789 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
817 PEERSTORE_destroy_record (record); 790 PEERSTORE_destroy_record (record);
818} 791}
@@ -827,26 +800,24 @@ static void
827reconnect (void *cls) 800reconnect (void *cls)
828{ 801{
829 struct GNUNET_PEERSTORE_Handle *h = cls; 802 struct GNUNET_PEERSTORE_Handle *h = cls;
830 struct GNUNET_MQ_MessageHandler mq_handlers[] = { 803 struct GNUNET_MQ_MessageHandler mq_handlers[] =
831 GNUNET_MQ_hd_fixed_size (iterate_end, 804 {GNUNET_MQ_hd_fixed_size (iterate_end,
832 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, 805 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
833 struct GNUNET_MessageHeader, 806 struct GNUNET_MessageHeader,
834 h), 807 h),
835 GNUNET_MQ_hd_var_size (iterate_result, 808 GNUNET_MQ_hd_var_size (iterate_result,
836 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 809 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
837 struct StoreRecordMessage, 810 struct StoreRecordMessage,
838 h), 811 h),
839 GNUNET_MQ_hd_var_size (watch_record, 812 GNUNET_MQ_hd_var_size (watch_record,
840 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 813 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
841 struct StoreRecordMessage, 814 struct StoreRecordMessage,
842 h), 815 h),
843 GNUNET_MQ_handler_end () 816 GNUNET_MQ_handler_end ()};
844 };
845 struct GNUNET_MQ_Envelope *ev; 817 struct GNUNET_MQ_Envelope *ev;
846 818
847 h->reconnect_task = NULL; 819 h->reconnect_task = NULL;
848 LOG (GNUNET_ERROR_TYPE_DEBUG, 820 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
849 "Reconnecting...\n");
850 h->mq = GNUNET_CLIENT_connect (h->cfg, 821 h->mq = GNUNET_CLIENT_connect (h->cfg,
851 "peerstore", 822 "peerstore",
852 mq_handlers, 823 mq_handlers,
@@ -857,39 +828,35 @@ reconnect (void *cls)
857 LOG (GNUNET_ERROR_TYPE_DEBUG, 828 LOG (GNUNET_ERROR_TYPE_DEBUG,
858 "Resending pending requests after reconnect.\n"); 829 "Resending pending requests after reconnect.\n");
859 if (NULL != h->watches) 830 if (NULL != h->watches)
860 GNUNET_CONTAINER_multihashmap_iterate (h->watches, 831 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
861 &rewatch_it, 832 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
862 h);
863 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
864 NULL != ic;
865 ic = ic->next) 833 ic = ic->next)
866 { 834 {
867 ev = PEERSTORE_create_record_mq_envelope (ic->sub_system, 835 ev =
868 &ic->peer, 836 PEERSTORE_create_record_mq_envelope (ic->sub_system,
869 ic->key, 837 &ic->peer,
870 NULL, 0, 838 ic->key,
871 GNUNET_TIME_UNIT_FOREVER_ABS, 839 NULL,
872 0, 840 0,
873 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); 841 GNUNET_TIME_UNIT_FOREVER_ABS,
842 0,
843 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
874 GNUNET_MQ_send (h->mq, ev); 844 GNUNET_MQ_send (h->mq, ev);
875 } 845 }
876 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; 846 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
877 NULL != sc;
878 sc = sc->next) 847 sc = sc->next)
879 { 848 {
880 ev = PEERSTORE_create_record_mq_envelope (sc->sub_system, 849 ev =
881 &sc->peer, 850 PEERSTORE_create_record_mq_envelope (sc->sub_system,
882 sc->key, 851 &sc->peer,
883 sc->value, 852 sc->key,
884 sc->size, 853 sc->value,
885 sc->expiry, 854 sc->size,
886 sc->options, 855 sc->expiry,
887 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); 856 sc->options,
888 GNUNET_MQ_notify_sent (ev, 857 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
889 &store_request_sent, 858 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
890 sc); 859 GNUNET_MQ_send (h->mq, ev);
891 GNUNET_MQ_send (h->mq,
892 ev);
893 } 860 }
894} 861}
895 862
@@ -906,15 +873,13 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
906 struct GNUNET_MQ_Envelope *ev; 873 struct GNUNET_MQ_Envelope *ev;
907 struct StoreKeyHashMessage *hm; 874 struct StoreKeyHashMessage *hm;
908 875
909 LOG (GNUNET_ERROR_TYPE_DEBUG, 876 LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
910 "Canceling watch.\n"); 877 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
911 ev = GNUNET_MQ_msg (hm,
912 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
913 hm->keyhash = wc->keyhash; 878 hm->keyhash = wc->keyhash;
914 GNUNET_MQ_send (h->mq, ev); 879 GNUNET_MQ_send (h->mq, ev);
915 GNUNET_CONTAINER_multihashmap_remove (h->watches, 880 GNUNET_assert (
916 &wc->keyhash, 881 GNUNET_YES ==
917 wc); 882 GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
918 GNUNET_free (wc); 883 GNUNET_free (wc);
919} 884}
920 885
@@ -943,32 +908,26 @@ GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
943 struct StoreKeyHashMessage *hm; 908 struct StoreKeyHashMessage *hm;
944 struct GNUNET_PEERSTORE_WatchContext *wc; 909 struct GNUNET_PEERSTORE_WatchContext *wc;
945 910
946 ev = GNUNET_MQ_msg (hm, 911 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
947 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); 912 PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
948 PEERSTORE_hash_key (sub_system,
949 peer,
950 key,
951 &hm->keyhash);
952 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext); 913 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
953 wc->callback = callback; 914 wc->callback = callback;
954 wc->callback_cls = callback_cls; 915 wc->callback_cls = callback_cls;
955 wc->h = h; 916 wc->h = h;
956 wc->keyhash = hm->keyhash; 917 wc->keyhash = hm->keyhash;
957 if (NULL == h->watches) 918 if (NULL == h->watches)
958 h->watches = GNUNET_CONTAINER_multihashmap_create (5, 919 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
959 GNUNET_NO); 920 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
960 GNUNET_assert (GNUNET_OK == 921 h->watches,
961 GNUNET_CONTAINER_multihashmap_put (h->watches, 922 &wc->keyhash,
962 &wc->keyhash, 923 wc,
963 wc, 924 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
964 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
965 LOG (GNUNET_ERROR_TYPE_DEBUG, 925 LOG (GNUNET_ERROR_TYPE_DEBUG,
966 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n", 926 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
967 sub_system, 927 sub_system,
968 GNUNET_i2s (peer), 928 GNUNET_i2s (peer),
969 key); 929 key);
970 GNUNET_MQ_send (h->mq, 930 GNUNET_MQ_send (h->mq, ev);
971 ev);
972 return wc; 931 return wc;
973} 932}
974 933