diff options
author | Omar Tarabai <tarabai@devegypt.com> | 2014-05-30 16:06:00 +0000 |
---|---|---|
committer | Omar Tarabai <tarabai@devegypt.com> | 2014-05-30 16:06:00 +0000 |
commit | 95cdeb5c0bb1f14f3959863e6bf4675db48ea177 (patch) | |
tree | 545e2a910c0efdae9dc2e2af8da45efa007cdf32 /src/peerstore/peerstore_api.c | |
parent | 02f9d1e7389d0da0a475ae0035b67801c7ca2d06 (diff) | |
download | gnunet-95cdeb5c0bb1f14f3959863e6bf4675db48ea177.tar.gz gnunet-95cdeb5c0bb1f14f3959863e6bf4675db48ea177.zip |
peerstore: towards watch functionality
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r-- | src/peerstore/peerstore_api.c | 142 |
1 files changed, 101 insertions, 41 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index 14b1c3e88..c9a68f4bf 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c | |||
@@ -76,14 +76,9 @@ struct GNUNET_PEERSTORE_Handle | |||
76 | struct GNUNET_PEERSTORE_IterateContext *iterate_tail; | 76 | struct GNUNET_PEERSTORE_IterateContext *iterate_tail; |
77 | 77 | ||
78 | /** | 78 | /** |
79 | * Head of WATCH requests (active and inactive). | 79 | * Hashmap of watch requests |
80 | */ | 80 | */ |
81 | struct GNUNET_PEERSTORE_WatchContext *watch_head; | 81 | struct GNUNET_CONTAINER_MultiHashMap *watches; |
82 | |||
83 | /** | ||
84 | * Tail of WATCH requests (active and inactive). | ||
85 | */ | ||
86 | struct GNUNET_PEERSTORE_WatchContext *watch_tail; | ||
87 | 82 | ||
88 | }; | 83 | }; |
89 | 84 | ||
@@ -215,6 +210,11 @@ struct GNUNET_PEERSTORE_WatchContext | |||
215 | void *callback_cls; | 210 | void *callback_cls; |
216 | 211 | ||
217 | /** | 212 | /** |
213 | * Hash of the combined key | ||
214 | */ | ||
215 | struct GNUNET_HashCode keyhash; | ||
216 | |||
217 | /** | ||
218 | * #GNUNET_YES / #GNUNET_NO | 218 | * #GNUNET_YES / #GNUNET_NO |
219 | * if sent, cannot be canceled | 219 | * if sent, cannot be canceled |
220 | */ | 220 | */ |
@@ -284,27 +284,6 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error) | |||
284 | } | 284 | } |
285 | 285 | ||
286 | /** | 286 | /** |
287 | * Should be called only after destroying MQ and connection | ||
288 | */ | ||
289 | static void | ||
290 | cleanup_handle(struct GNUNET_PEERSTORE_Handle *h) | ||
291 | { | ||
292 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
293 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
294 | |||
295 | while (NULL != (sc = h->store_head)) | ||
296 | { | ||
297 | GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc); | ||
298 | GNUNET_free(sc); | ||
299 | } | ||
300 | while (NULL != (ic = h->iterate_head)) | ||
301 | { | ||
302 | GNUNET_CONTAINER_DLL_remove(h->iterate_head, h->iterate_tail, ic); | ||
303 | GNUNET_free(ic); | ||
304 | } | ||
305 | } | ||
306 | |||
307 | /** | ||
308 | * Close the existing connection to PEERSTORE and reconnect. | 287 | * Close the existing connection to PEERSTORE and reconnect. |
309 | * | 288 | * |
310 | * @param h handle to the service | 289 | * @param h handle to the service |
@@ -312,7 +291,6 @@ cleanup_handle(struct GNUNET_PEERSTORE_Handle *h) | |||
312 | static void | 291 | static void |
313 | reconnect (struct GNUNET_PEERSTORE_Handle *h) | 292 | reconnect (struct GNUNET_PEERSTORE_Handle *h) |
314 | { | 293 | { |
315 | |||
316 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); | 294 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); |
317 | if (NULL != h->mq) | 295 | if (NULL != h->mq) |
318 | { | 296 | { |
@@ -324,12 +302,13 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) | |||
324 | GNUNET_CLIENT_disconnect (h->client); | 302 | GNUNET_CLIENT_disconnect (h->client); |
325 | h->client = NULL; | 303 | h->client = NULL; |
326 | } | 304 | } |
327 | cleanup_handle(h); | ||
328 | h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); | 305 | h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); |
306 | //FIXME: retry connecting if fails again (client == NULL) | ||
329 | h->mq = GNUNET_MQ_queue_for_connection_client(h->client, | 307 | h->mq = GNUNET_MQ_queue_for_connection_client(h->client, |
330 | mq_handlers, | 308 | mq_handlers, |
331 | &handle_client_error, | 309 | &handle_client_error, |
332 | h); | 310 | h); |
311 | //FIXME: resend pending requests after reconnecting | ||
333 | 312 | ||
334 | } | 313 | } |
335 | 314 | ||
@@ -373,6 +352,11 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
373 | void | 352 | void |
374 | GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) | 353 | GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) |
375 | { | 354 | { |
355 | if(NULL != h->watches) | ||
356 | { | ||
357 | GNUNET_CONTAINER_multihashmap_destroy(h->watches); | ||
358 | h->watches = NULL; | ||
359 | } | ||
376 | if(NULL != h->mq) | 360 | if(NULL != h->mq) |
377 | { | 361 | { |
378 | GNUNET_MQ_destroy(h->mq); | 362 | GNUNET_MQ_destroy(h->mq); |
@@ -383,7 +367,6 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) | |||
383 | GNUNET_CLIENT_disconnect (h->client); | 367 | GNUNET_CLIENT_disconnect (h->client); |
384 | h->client = NULL; | 368 | h->client = NULL; |
385 | } | 369 | } |
386 | cleanup_handle(h); | ||
387 | GNUNET_free(h); | 370 | GNUNET_free(h); |
388 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n"); | 371 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n"); |
389 | } | 372 | } |
@@ -655,7 +638,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic) | |||
655 | */ | 638 | */ |
656 | struct GNUNET_PEERSTORE_IterateContext * | 639 | struct GNUNET_PEERSTORE_IterateContext * |
657 | GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | 640 | GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, |
658 | char *sub_system, | 641 | const char *sub_system, |
659 | const struct GNUNET_PeerIdentity *peer, | 642 | const struct GNUNET_PeerIdentity *peer, |
660 | const char *key, | 643 | const char *key, |
661 | struct GNUNET_TIME_Relative timeout, | 644 | struct GNUNET_TIME_Relative timeout, |
@@ -698,7 +681,54 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | |||
698 | */ | 681 | */ |
699 | void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) | 682 | void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) |
700 | { | 683 | { |
684 | /*struct GNUNET_PEERSTORE_Handle *h = cls; | ||
685 | struct GNUNET_PEERSTORE_WatchContext *wc; | ||
686 | GNUNET_PEERSTORE_Processor callback; | ||
687 | void *callback_cls; | ||
688 | |||
689 | |||
690 | |||
691 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
692 | uint16_t msg_type; | ||
693 | struct GNUNET_PEERSTORE_Record *record; | ||
694 | int continue_iter; | ||
695 | |||
696 | ic = h->iterate_head; | ||
697 | if(NULL == ic) | ||
698 | { | ||
699 | LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n"); | ||
700 | reconnect(h); | ||
701 | return; | ||
702 | } | ||
703 | callback = ic->callback; | ||
704 | callback_cls = ic->callback_cls; | ||
705 | if(NULL == msg) * Connection error * | ||
706 | { | ||
701 | 707 | ||
708 | if(NULL != callback) | ||
709 | callback(callback_cls, NULL, | ||
710 | _("Error communicating with `PEERSTORE' service.")); | ||
711 | reconnect(h); | ||
712 | return; | ||
713 | } | ||
714 | msg_type = ntohs(msg->type); | ||
715 | if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type) | ||
716 | { | ||
717 | GNUNET_PEERSTORE_iterate_cancel(ic); | ||
718 | if(NULL != callback) | ||
719 | callback(callback_cls, NULL, NULL); | ||
720 | return; | ||
721 | } | ||
722 | if(NULL != callback) | ||
723 | { | ||
724 | record = PEERSTORE_parse_record_message(msg); | ||
725 | if(NULL == record) | ||
726 | continue_iter = callback(callback_cls, record, _("Received a malformed response from service.")); | ||
727 | else | ||
728 | continue_iter = callback(callback_cls, record, NULL); | ||
729 | if(GNUNET_NO == continue_iter) | ||
730 | ic->callback = NULL; | ||
731 | }*/ | ||
702 | } | 732 | } |
703 | 733 | ||
704 | /** | 734 | /** |
@@ -715,6 +745,36 @@ void watch_request_sent (void *cls) | |||
715 | } | 745 | } |
716 | 746 | ||
717 | /** | 747 | /** |
748 | * Cancel a watch request | ||
749 | * | ||
750 | * @wc handle to the watch request | ||
751 | */ | ||
752 | void | ||
753 | GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc) | ||
754 | { | ||
755 | struct GNUNET_PEERSTORE_Handle *h = wc->h; | ||
756 | struct GNUNET_MQ_Envelope *ev; | ||
757 | struct StoreKeyHashMessage *hm; | ||
758 | |||
759 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n"); | ||
760 | if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */ | ||
761 | { | ||
762 | ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); | ||
763 | GNUNET_MQ_send(h->mq, ev); | ||
764 | wc->callback = NULL; | ||
765 | wc->callback_cls = NULL; | ||
766 | } | ||
767 | if(NULL != wc->ev) | ||
768 | { | ||
769 | GNUNET_MQ_send_cancel(wc->ev); | ||
770 | wc->ev = NULL; | ||
771 | } | ||
772 | GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc); | ||
773 | GNUNET_free(wc); | ||
774 | |||
775 | } | ||
776 | |||
777 | /** | ||
718 | * Request watching a given key | 778 | * Request watching a given key |
719 | * User will be notified with any new values added to key | 779 | * User will be notified with any new values added to key |
720 | * | 780 | * |
@@ -728,28 +788,28 @@ void watch_request_sent (void *cls) | |||
728 | */ | 788 | */ |
729 | struct GNUNET_PEERSTORE_WatchContext * | 789 | struct GNUNET_PEERSTORE_WatchContext * |
730 | GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, | 790 | GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, |
731 | char *sub_system, | 791 | const char *sub_system, |
732 | const struct GNUNET_PeerIdentity *peer, | 792 | const struct GNUNET_PeerIdentity *peer, |
733 | const char *key, | 793 | const char *key, |
734 | GNUNET_PEERSTORE_Processor callback, void *callback_cls) | 794 | GNUNET_PEERSTORE_Processor callback, void *callback_cls) |
735 | { | 795 | { |
736 | struct GNUNET_MQ_Envelope *ev; | 796 | struct GNUNET_MQ_Envelope *ev; |
797 | struct StoreKeyHashMessage *hm; | ||
737 | struct GNUNET_PEERSTORE_WatchContext *wc; | 798 | struct GNUNET_PEERSTORE_WatchContext *wc; |
738 | 799 | ||
739 | ev = PEERSTORE_create_record_mq_envelope(sub_system, | 800 | ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); |
740 | peer, | 801 | PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash); |
741 | key, | ||
742 | NULL, | ||
743 | 0, | ||
744 | NULL, | ||
745 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); | ||
746 | wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext); | 802 | wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext); |
747 | wc->callback = callback; | 803 | wc->callback = callback; |
748 | wc->callback_cls = callback_cls; | 804 | wc->callback_cls = callback_cls; |
749 | wc->ev = ev; | 805 | wc->ev = ev; |
750 | wc->h = h; | 806 | wc->h = h; |
751 | wc->request_sent = GNUNET_NO; | 807 | wc->request_sent = GNUNET_NO; |
752 | GNUNET_CONTAINER_DLL_insert(h->watch_head, h->watch_tail, wc); | 808 | wc->keyhash = hm->keyhash; |
809 | if(NULL == h->watches) | ||
810 | h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO); | ||
811 | GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash, | ||
812 | wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
753 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 813 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
754 | "Sending a watch request for sub system `%s'.\n", sub_system); | 814 | "Sending a watch request for sub system `%s'.\n", sub_system); |
755 | GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc); | 815 | GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc); |