diff options
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r-- | src/peerstore/peerstore_api.c | 247 |
1 files changed, 103 insertions, 144 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index 02f3e287a..64bc3ae72 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c | |||
@@ -28,7 +28,7 @@ | |||
28 | #include "peerstore.h" | 28 | #include "peerstore.h" |
29 | #include "peerstore_common.h" | 29 | #include "peerstore_common.h" |
30 | 30 | ||
31 | #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__) | 31 | #define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__) |
32 | 32 | ||
33 | /******************************************************************************/ | 33 | /******************************************************************************/ |
34 | /************************ DATA STRUCTURES ****************************/ | 34 | /************************ DATA STRUCTURES ****************************/ |
@@ -89,7 +89,6 @@ struct GNUNET_PEERSTORE_Handle | |||
89 | * Are we in the process of disconnecting but need to sync first? | 89 | * Are we in the process of disconnecting but need to sync first? |
90 | */ | 90 | */ |
91 | int disconnecting; | 91 | int disconnecting; |
92 | |||
93 | }; | 92 | }; |
94 | 93 | ||
95 | /** | 94 | /** |
@@ -156,7 +155,6 @@ struct GNUNET_PEERSTORE_StoreContext | |||
156 | * Options for the store operation. | 155 | * Options for the store operation. |
157 | */ | 156 | */ |
158 | enum GNUNET_PEERSTORE_StoreOption options; | 157 | enum GNUNET_PEERSTORE_StoreOption options; |
159 | |||
160 | }; | 158 | }; |
161 | 159 | ||
162 | /** | 160 | /** |
@@ -208,7 +206,6 @@ struct GNUNET_PEERSTORE_IterateContext | |||
208 | * #GNUNET_YES if we are currently processing records. | 206 | * #GNUNET_YES if we are currently processing records. |
209 | */ | 207 | */ |
210 | int iterating; | 208 | int iterating; |
211 | |||
212 | }; | 209 | }; |
213 | 210 | ||
214 | /** | 211 | /** |
@@ -245,7 +242,6 @@ struct GNUNET_PEERSTORE_WatchContext | |||
245 | * Hash of the combined key | 242 | * Hash of the combined key |
246 | */ | 243 | */ |
247 | struct GNUNET_HashCode keyhash; | 244 | struct GNUNET_HashCode keyhash; |
248 | |||
249 | }; | 245 | }; |
250 | 246 | ||
251 | /******************************************************************************/ | 247 | /******************************************************************************/ |
@@ -271,8 +267,7 @@ disconnect (struct GNUNET_PEERSTORE_Handle *h) | |||
271 | { | 267 | { |
272 | struct GNUNET_PEERSTORE_IterateContext *next; | 268 | struct GNUNET_PEERSTORE_IterateContext *next; |
273 | 269 | ||
274 | for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; | 270 | for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic; |
275 | NULL != ic; | ||
276 | ic = next) | 271 | ic = next) |
277 | { | 272 | { |
278 | next = ic->next; | 273 | next = ic->next; |
@@ -285,9 +280,7 @@ disconnect (struct GNUNET_PEERSTORE_Handle *h) | |||
285 | icb_cls = ic->callback_cls; | 280 | icb_cls = ic->callback_cls; |
286 | GNUNET_PEERSTORE_iterate_cancel (ic); | 281 | GNUNET_PEERSTORE_iterate_cancel (ic); |
287 | if (NULL != icb) | 282 | if (NULL != icb) |
288 | icb (icb_cls, | 283 | icb (icb_cls, NULL, "Iteration canceled due to reconnection"); |
289 | NULL, | ||
290 | "Iteration canceled due to reconnection"); | ||
291 | } | 284 | } |
292 | } | 285 | } |
293 | 286 | ||
@@ -312,17 +305,13 @@ disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h) | |||
312 | disconnect (h); | 305 | disconnect (h); |
313 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 306 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
314 | "Scheduling task to reconnect to PEERSTORE service in %s.\n", | 307 | "Scheduling task to reconnect to PEERSTORE service in %s.\n", |
315 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, | 308 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); |
316 | GNUNET_YES)); | ||
317 | h->reconnect_task = | 309 | h->reconnect_task = |
318 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, | 310 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); |
319 | &reconnect, | ||
320 | h); | ||
321 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | 311 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); |
322 | } | 312 | } |
323 | 313 | ||
324 | 314 | ||
325 | |||
326 | /** | 315 | /** |
327 | * Callback after MQ envelope is sent | 316 | * Callback after MQ envelope is sent |
328 | * | 317 | * |
@@ -352,8 +341,7 @@ store_request_sent (void *cls) | |||
352 | * Function called when we had trouble talking to the service. | 341 | * Function called when we had trouble talking to the service. |
353 | */ | 342 | */ |
354 | static void | 343 | static void |
355 | handle_client_error (void *cls, | 344 | handle_client_error (void *cls, enum GNUNET_MQ_Error error) |
356 | enum GNUNET_MQ_Error error) | ||
357 | { | 345 | { |
358 | struct GNUNET_PEERSTORE_Handle *h = cls; | 346 | struct GNUNET_PEERSTORE_Handle *h = cls; |
359 | 347 | ||
@@ -373,9 +361,7 @@ handle_client_error (void *cls, | |||
373 | * @return #GNUNET_YES (continue to iterate) | 361 | * @return #GNUNET_YES (continue to iterate) |
374 | */ | 362 | */ |
375 | static int | 363 | static int |
376 | rewatch_it (void *cls, | 364 | rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value) |
377 | const struct GNUNET_HashCode *key, | ||
378 | void *value) | ||
379 | { | 365 | { |
380 | struct GNUNET_PEERSTORE_Handle *h = cls; | 366 | struct GNUNET_PEERSTORE_Handle *h = cls; |
381 | struct GNUNET_PEERSTORE_WatchContext *wc = value; | 367 | struct GNUNET_PEERSTORE_WatchContext *wc = value; |
@@ -398,9 +384,7 @@ rewatch_it (void *cls, | |||
398 | * @return #GNUNET_YES to continue iteration | 384 | * @return #GNUNET_YES to continue iteration |
399 | */ | 385 | */ |
400 | static int | 386 | static int |
401 | destroy_watch (void *cls, | 387 | destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value) |
402 | const struct GNUNET_HashCode *key, | ||
403 | void *value) | ||
404 | { | 388 | { |
405 | struct GNUNET_PEERSTORE_WatchContext *wc = value; | 389 | struct GNUNET_PEERSTORE_WatchContext *wc = value; |
406 | 390 | ||
@@ -461,8 +445,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
461 | * @param sync_first send any pending STORE requests before disconnecting | 445 | * @param sync_first send any pending STORE requests before disconnecting |
462 | */ | 446 | */ |
463 | void | 447 | void |
464 | GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, | 448 | GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first) |
465 | int sync_first) | ||
466 | { | 449 | { |
467 | struct GNUNET_PEERSTORE_IterateContext *ic; | 450 | struct GNUNET_PEERSTORE_IterateContext *ic; |
468 | struct GNUNET_PEERSTORE_StoreContext *sc; | 451 | struct GNUNET_PEERSTORE_StoreContext *sc; |
@@ -515,8 +498,7 @@ GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) | |||
515 | GNUNET_free (sc->value); | 498 | GNUNET_free (sc->value); |
516 | GNUNET_free (sc->key); | 499 | GNUNET_free (sc->key); |
517 | GNUNET_free (sc); | 500 | GNUNET_free (sc); |
518 | if ( (GNUNET_YES == h->disconnecting) && | 501 | if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head)) |
519 | (NULL == h->store_head) ) | ||
520 | final_disconnect (h); | 502 | final_disconnect (h); |
521 | } | 503 | } |
522 | 504 | ||
@@ -542,7 +524,8 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | |||
542 | const char *sub_system, | 524 | const char *sub_system, |
543 | const struct GNUNET_PeerIdentity *peer, | 525 | const struct GNUNET_PeerIdentity *peer, |
544 | const char *key, | 526 | const char *key, |
545 | const void *value, size_t size, | 527 | const void *value, |
528 | size_t size, | ||
546 | struct GNUNET_TIME_Absolute expiry, | 529 | struct GNUNET_TIME_Absolute expiry, |
547 | enum GNUNET_PEERSTORE_StoreOption options, | 530 | enum GNUNET_PEERSTORE_StoreOption options, |
548 | GNUNET_PEERSTORE_Continuation cont, | 531 | GNUNET_PEERSTORE_Continuation cont, |
@@ -553,10 +536,19 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | |||
553 | 536 | ||
554 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 537 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
555 | "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n", | 538 | "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n", |
556 | size, sub_system, GNUNET_i2s (peer), key); | 539 | size, |
557 | ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size, | 540 | sub_system, |
558 | expiry, options, | 541 | GNUNET_i2s (peer), |
559 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | 542 | key); |
543 | ev = | ||
544 | PEERSTORE_create_record_mq_envelope (sub_system, | ||
545 | peer, | ||
546 | key, | ||
547 | value, | ||
548 | size, | ||
549 | expiry, | ||
550 | options, | ||
551 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | ||
560 | sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext); | 552 | sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext); |
561 | 553 | ||
562 | sc->sub_system = GNUNET_strdup (sub_system); | 554 | sc->sub_system = GNUNET_strdup (sub_system); |
@@ -574,7 +566,6 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | |||
574 | GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); | 566 | GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); |
575 | GNUNET_MQ_send (h->mq, ev); | 567 | GNUNET_MQ_send (h->mq, ev); |
576 | return sc; | 568 | return sc; |
577 | |||
578 | } | 569 | } |
579 | 570 | ||
580 | 571 | ||
@@ -590,8 +581,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | |||
590 | * @param msg message received | 581 | * @param msg message received |
591 | */ | 582 | */ |
592 | static void | 583 | static void |
593 | handle_iterate_end (void *cls, | 584 | handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg) |
594 | const struct GNUNET_MessageHeader *msg) | ||
595 | { | 585 | { |
596 | struct GNUNET_PEERSTORE_Handle *h = cls; | 586 | struct GNUNET_PEERSTORE_Handle *h = cls; |
597 | struct GNUNET_PEERSTORE_IterateContext *ic; | 587 | struct GNUNET_PEERSTORE_IterateContext *ic; |
@@ -602,7 +592,7 @@ handle_iterate_end (void *cls, | |||
602 | if (NULL == ic) | 592 | if (NULL == ic) |
603 | { | 593 | { |
604 | LOG (GNUNET_ERROR_TYPE_ERROR, | 594 | LOG (GNUNET_ERROR_TYPE_ERROR, |
605 | _("Unexpected iteration response, this should not happen.\n")); | 595 | _ ("Unexpected iteration response, this should not happen.\n")); |
606 | disconnect_and_schedule_reconnect (h); | 596 | disconnect_and_schedule_reconnect (h); |
607 | return; | 597 | return; |
608 | } | 598 | } |
@@ -611,9 +601,7 @@ handle_iterate_end (void *cls, | |||
611 | ic->iterating = GNUNET_NO; | 601 | ic->iterating = GNUNET_NO; |
612 | GNUNET_PEERSTORE_iterate_cancel (ic); | 602 | GNUNET_PEERSTORE_iterate_cancel (ic); |
613 | if (NULL != callback) | 603 | if (NULL != callback) |
614 | callback (callback_cls, | 604 | callback (callback_cls, NULL, NULL); |
615 | NULL, | ||
616 | NULL); | ||
617 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 605 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
618 | } | 606 | } |
619 | 607 | ||
@@ -626,8 +614,7 @@ handle_iterate_end (void *cls, | |||
626 | * @param msg message received | 614 | * @param msg message received |
627 | */ | 615 | */ |
628 | static int | 616 | static int |
629 | check_iterate_result (void *cls, | 617 | check_iterate_result (void *cls, const struct StoreRecordMessage *msg) |
630 | const struct StoreRecordMessage *msg) | ||
631 | { | 618 | { |
632 | /* we defer validation to #handle_iterate_result */ | 619 | /* we defer validation to #handle_iterate_result */ |
633 | return GNUNET_OK; | 620 | return GNUNET_OK; |
@@ -641,8 +628,7 @@ check_iterate_result (void *cls, | |||
641 | * @param msg message received | 628 | * @param msg message received |
642 | */ | 629 | */ |
643 | static void | 630 | static void |
644 | handle_iterate_result (void *cls, | 631 | handle_iterate_result (void *cls, const struct StoreRecordMessage *msg) |
645 | const struct StoreRecordMessage *msg) | ||
646 | { | 632 | { |
647 | struct GNUNET_PEERSTORE_Handle *h = cls; | 633 | struct GNUNET_PEERSTORE_Handle *h = cls; |
648 | struct GNUNET_PEERSTORE_IterateContext *ic; | 634 | struct GNUNET_PEERSTORE_IterateContext *ic; |
@@ -654,7 +640,7 @@ handle_iterate_result (void *cls, | |||
654 | if (NULL == ic) | 640 | if (NULL == ic) |
655 | { | 641 | { |
656 | LOG (GNUNET_ERROR_TYPE_ERROR, | 642 | LOG (GNUNET_ERROR_TYPE_ERROR, |
657 | _("Unexpected iteration response, this should not happen.\n")); | 643 | _ ("Unexpected iteration response, this should not happen.\n")); |
658 | disconnect_and_schedule_reconnect (h); | 644 | disconnect_and_schedule_reconnect (h); |
659 | return; | 645 | return; |
660 | } | 646 | } |
@@ -668,13 +654,11 @@ handle_iterate_result (void *cls, | |||
668 | { | 654 | { |
669 | callback (callback_cls, | 655 | callback (callback_cls, |
670 | NULL, | 656 | NULL, |
671 | _("Received a malformed response from service.")); | 657 | _ ("Received a malformed response from service.")); |
672 | } | 658 | } |
673 | else | 659 | else |
674 | { | 660 | { |
675 | callback (callback_cls, | 661 | callback (callback_cls, record, NULL); |
676 | record, | ||
677 | NULL); | ||
678 | PEERSTORE_destroy_record (record); | 662 | PEERSTORE_destroy_record (record); |
679 | } | 663 | } |
680 | } | 664 | } |
@@ -691,9 +675,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic) | |||
691 | { | 675 | { |
692 | if (GNUNET_NO == ic->iterating) | 676 | if (GNUNET_NO == ic->iterating) |
693 | { | 677 | { |
694 | GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, | 678 | GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic); |
695 | ic->h->iterate_tail, | ||
696 | ic); | ||
697 | GNUNET_free (ic->sub_system); | 679 | GNUNET_free (ic->sub_system); |
698 | GNUNET_free_non_null (ic->key); | 680 | GNUNET_free_non_null (ic->key); |
699 | GNUNET_free (ic); | 681 | GNUNET_free (ic); |
@@ -725,13 +707,15 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | |||
725 | struct GNUNET_MQ_Envelope *ev; | 707 | struct GNUNET_MQ_Envelope *ev; |
726 | struct GNUNET_PEERSTORE_IterateContext *ic; | 708 | struct GNUNET_PEERSTORE_IterateContext *ic; |
727 | 709 | ||
728 | ev = PEERSTORE_create_record_mq_envelope (sub_system, | 710 | ev = |
729 | peer, | 711 | PEERSTORE_create_record_mq_envelope (sub_system, |
730 | key, | 712 | peer, |
731 | NULL, 0, | 713 | key, |
732 | GNUNET_TIME_UNIT_FOREVER_ABS, | 714 | NULL, |
733 | 0, | 715 | 0, |
734 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | 716 | GNUNET_TIME_UNIT_FOREVER_ABS, |
717 | 0, | ||
718 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | ||
735 | ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext); | 719 | ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext); |
736 | ic->callback = callback; | 720 | ic->callback = callback; |
737 | ic->callback_cls = callback_cls; | 721 | ic->callback_cls = callback_cls; |
@@ -741,9 +725,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | |||
741 | ic->peer = *peer; | 725 | ic->peer = *peer; |
742 | if (NULL != key) | 726 | if (NULL != key) |
743 | ic->key = GNUNET_strdup (key); | 727 | ic->key = GNUNET_strdup (key); |
744 | GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, | 728 | GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic); |
745 | h->iterate_tail, | ||
746 | ic); | ||
747 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 729 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
748 | "Sending an iterate request for sub system `%s'\n", | 730 | "Sending an iterate request for sub system `%s'\n", |
749 | sub_system); | 731 | sub_system); |
@@ -763,8 +745,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | |||
763 | * @param msg message received | 745 | * @param msg message received |
764 | */ | 746 | */ |
765 | static int | 747 | static int |
766 | check_watch_record (void *cls, | 748 | check_watch_record (void *cls, const struct StoreRecordMessage *msg) |
767 | const struct StoreRecordMessage *msg) | ||
768 | { | 749 | { |
769 | /* we defer validation to #handle_watch_result */ | 750 | /* we defer validation to #handle_watch_result */ |
770 | return GNUNET_OK; | 751 | return GNUNET_OK; |
@@ -778,41 +759,33 @@ check_watch_record (void *cls, | |||
778 | * @param msg message received | 759 | * @param msg message received |
779 | */ | 760 | */ |
780 | static void | 761 | static void |
781 | handle_watch_record (void *cls, | 762 | handle_watch_record (void *cls, const struct StoreRecordMessage *msg) |
782 | const struct StoreRecordMessage *msg) | ||
783 | { | 763 | { |
784 | struct GNUNET_PEERSTORE_Handle *h = cls; | 764 | struct GNUNET_PEERSTORE_Handle *h = cls; |
785 | struct GNUNET_PEERSTORE_Record *record; | 765 | struct GNUNET_PEERSTORE_Record *record; |
786 | struct GNUNET_HashCode keyhash; | 766 | struct GNUNET_HashCode keyhash; |
787 | struct GNUNET_PEERSTORE_WatchContext *wc; | 767 | struct GNUNET_PEERSTORE_WatchContext *wc; |
788 | 768 | ||
789 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 769 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n"); |
790 | "Received a watch record from service.\n"); | ||
791 | record = PEERSTORE_parse_record_message (msg); | 770 | record = PEERSTORE_parse_record_message (msg); |
792 | if (NULL == record) | 771 | if (NULL == record) |
793 | { | 772 | { |
794 | disconnect_and_schedule_reconnect (h); | 773 | disconnect_and_schedule_reconnect (h); |
795 | return; | 774 | return; |
796 | } | 775 | } |
797 | PEERSTORE_hash_key (record->sub_system, | 776 | PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash); |
798 | &record->peer, | ||
799 | record->key, | ||
800 | &keyhash); | ||
801 | // FIXME: what if there are multiple watches for the same key? | 777 | // FIXME: what if there are multiple watches for the same key? |
802 | wc = GNUNET_CONTAINER_multihashmap_get (h->watches, | 778 | wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash); |
803 | &keyhash); | ||
804 | if (NULL == wc) | 779 | if (NULL == wc) |
805 | { | 780 | { |
806 | LOG (GNUNET_ERROR_TYPE_ERROR, | 781 | LOG (GNUNET_ERROR_TYPE_ERROR, |
807 | _("Received a watch result for a non existing watch.\n")); | 782 | _ ("Received a watch result for a non existing watch.\n")); |
808 | PEERSTORE_destroy_record (record); | 783 | PEERSTORE_destroy_record (record); |
809 | disconnect_and_schedule_reconnect (h); | 784 | disconnect_and_schedule_reconnect (h); |
810 | return; | 785 | return; |
811 | } | 786 | } |
812 | if (NULL != wc->callback) | 787 | if (NULL != wc->callback) |
813 | wc->callback (wc->callback_cls, | 788 | wc->callback (wc->callback_cls, record, NULL); |
814 | record, | ||
815 | NULL); | ||
816 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 789 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
817 | PEERSTORE_destroy_record (record); | 790 | PEERSTORE_destroy_record (record); |
818 | } | 791 | } |
@@ -827,26 +800,24 @@ static void | |||
827 | reconnect (void *cls) | 800 | reconnect (void *cls) |
828 | { | 801 | { |
829 | struct GNUNET_PEERSTORE_Handle *h = cls; | 802 | struct GNUNET_PEERSTORE_Handle *h = cls; |
830 | struct GNUNET_MQ_MessageHandler mq_handlers[] = { | 803 | struct GNUNET_MQ_MessageHandler mq_handlers[] = |
831 | GNUNET_MQ_hd_fixed_size (iterate_end, | 804 | {GNUNET_MQ_hd_fixed_size (iterate_end, |
832 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, | 805 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, |
833 | struct GNUNET_MessageHeader, | 806 | struct GNUNET_MessageHeader, |
834 | h), | 807 | h), |
835 | GNUNET_MQ_hd_var_size (iterate_result, | 808 | GNUNET_MQ_hd_var_size (iterate_result, |
836 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, | 809 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, |
837 | struct StoreRecordMessage, | 810 | struct StoreRecordMessage, |
838 | h), | 811 | h), |
839 | GNUNET_MQ_hd_var_size (watch_record, | 812 | GNUNET_MQ_hd_var_size (watch_record, |
840 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, | 813 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, |
841 | struct StoreRecordMessage, | 814 | struct StoreRecordMessage, |
842 | h), | 815 | h), |
843 | GNUNET_MQ_handler_end () | 816 | GNUNET_MQ_handler_end ()}; |
844 | }; | ||
845 | struct GNUNET_MQ_Envelope *ev; | 817 | struct GNUNET_MQ_Envelope *ev; |
846 | 818 | ||
847 | h->reconnect_task = NULL; | 819 | h->reconnect_task = NULL; |
848 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 820 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); |
849 | "Reconnecting...\n"); | ||
850 | h->mq = GNUNET_CLIENT_connect (h->cfg, | 821 | h->mq = GNUNET_CLIENT_connect (h->cfg, |
851 | "peerstore", | 822 | "peerstore", |
852 | mq_handlers, | 823 | mq_handlers, |
@@ -857,39 +828,35 @@ reconnect (void *cls) | |||
857 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 828 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
858 | "Resending pending requests after reconnect.\n"); | 829 | "Resending pending requests after reconnect.\n"); |
859 | if (NULL != h->watches) | 830 | if (NULL != h->watches) |
860 | GNUNET_CONTAINER_multihashmap_iterate (h->watches, | 831 | GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h); |
861 | &rewatch_it, | 832 | for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic; |
862 | h); | ||
863 | for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; | ||
864 | NULL != ic; | ||
865 | ic = ic->next) | 833 | ic = ic->next) |
866 | { | 834 | { |
867 | ev = PEERSTORE_create_record_mq_envelope (ic->sub_system, | 835 | ev = |
868 | &ic->peer, | 836 | PEERSTORE_create_record_mq_envelope (ic->sub_system, |
869 | ic->key, | 837 | &ic->peer, |
870 | NULL, 0, | 838 | ic->key, |
871 | GNUNET_TIME_UNIT_FOREVER_ABS, | 839 | NULL, |
872 | 0, | 840 | 0, |
873 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | 841 | GNUNET_TIME_UNIT_FOREVER_ABS, |
842 | 0, | ||
843 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); | ||
874 | GNUNET_MQ_send (h->mq, ev); | 844 | GNUNET_MQ_send (h->mq, ev); |
875 | } | 845 | } |
876 | for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; | 846 | for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc; |
877 | NULL != sc; | ||
878 | sc = sc->next) | 847 | sc = sc->next) |
879 | { | 848 | { |
880 | ev = PEERSTORE_create_record_mq_envelope (sc->sub_system, | 849 | ev = |
881 | &sc->peer, | 850 | PEERSTORE_create_record_mq_envelope (sc->sub_system, |
882 | sc->key, | 851 | &sc->peer, |
883 | sc->value, | 852 | sc->key, |
884 | sc->size, | 853 | sc->value, |
885 | sc->expiry, | 854 | sc->size, |
886 | sc->options, | 855 | sc->expiry, |
887 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | 856 | sc->options, |
888 | GNUNET_MQ_notify_sent (ev, | 857 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); |
889 | &store_request_sent, | 858 | GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); |
890 | sc); | 859 | GNUNET_MQ_send (h->mq, ev); |
891 | GNUNET_MQ_send (h->mq, | ||
892 | ev); | ||
893 | } | 860 | } |
894 | } | 861 | } |
895 | 862 | ||
@@ -906,15 +873,13 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc) | |||
906 | struct GNUNET_MQ_Envelope *ev; | 873 | struct GNUNET_MQ_Envelope *ev; |
907 | struct StoreKeyHashMessage *hm; | 874 | struct StoreKeyHashMessage *hm; |
908 | 875 | ||
909 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 876 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n"); |
910 | "Canceling watch.\n"); | 877 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); |
911 | ev = GNUNET_MQ_msg (hm, | ||
912 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); | ||
913 | hm->keyhash = wc->keyhash; | 878 | hm->keyhash = wc->keyhash; |
914 | GNUNET_MQ_send (h->mq, ev); | 879 | GNUNET_MQ_send (h->mq, ev); |
915 | GNUNET_CONTAINER_multihashmap_remove (h->watches, | 880 | GNUNET_assert ( |
916 | &wc->keyhash, | 881 | GNUNET_YES == |
917 | wc); | 882 | GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc)); |
918 | GNUNET_free (wc); | 883 | GNUNET_free (wc); |
919 | } | 884 | } |
920 | 885 | ||
@@ -943,32 +908,26 @@ GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, | |||
943 | struct StoreKeyHashMessage *hm; | 908 | struct StoreKeyHashMessage *hm; |
944 | struct GNUNET_PEERSTORE_WatchContext *wc; | 909 | struct GNUNET_PEERSTORE_WatchContext *wc; |
945 | 910 | ||
946 | ev = GNUNET_MQ_msg (hm, | 911 | ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); |
947 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); | 912 | PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash); |
948 | PEERSTORE_hash_key (sub_system, | ||
949 | peer, | ||
950 | key, | ||
951 | &hm->keyhash); | ||
952 | wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext); | 913 | wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext); |
953 | wc->callback = callback; | 914 | wc->callback = callback; |
954 | wc->callback_cls = callback_cls; | 915 | wc->callback_cls = callback_cls; |
955 | wc->h = h; | 916 | wc->h = h; |
956 | wc->keyhash = hm->keyhash; | 917 | wc->keyhash = hm->keyhash; |
957 | if (NULL == h->watches) | 918 | if (NULL == h->watches) |
958 | h->watches = GNUNET_CONTAINER_multihashmap_create (5, | 919 | h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO); |
959 | GNUNET_NO); | 920 | GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put ( |
960 | GNUNET_assert (GNUNET_OK == | 921 | h->watches, |
961 | GNUNET_CONTAINER_multihashmap_put (h->watches, | 922 | &wc->keyhash, |
962 | &wc->keyhash, | 923 | wc, |
963 | wc, | 924 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
964 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | ||
965 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 925 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
966 | "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n", | 926 | "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n", |
967 | sub_system, | 927 | sub_system, |
968 | GNUNET_i2s (peer), | 928 | GNUNET_i2s (peer), |
969 | key); | 929 | key); |
970 | GNUNET_MQ_send (h->mq, | 930 | GNUNET_MQ_send (h->mq, ev); |
971 | ev); | ||
972 | return wc; | 931 | return wc; |
973 | } | 932 | } |
974 | 933 | ||