aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore/peerstore_api.c
diff options
context:
space:
mode:
authorOmar Tarabai <tarabai@devegypt.com>2014-05-30 16:06:00 +0000
committerOmar Tarabai <tarabai@devegypt.com>2014-05-30 16:06:00 +0000
commit95cdeb5c0bb1f14f3959863e6bf4675db48ea177 (patch)
tree545e2a910c0efdae9dc2e2af8da45efa007cdf32 /src/peerstore/peerstore_api.c
parent02f9d1e7389d0da0a475ae0035b67801c7ca2d06 (diff)
downloadgnunet-95cdeb5c0bb1f14f3959863e6bf4675db48ea177.tar.gz
gnunet-95cdeb5c0bb1f14f3959863e6bf4675db48ea177.zip
peerstore: towards watch functionality
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r--src/peerstore/peerstore_api.c142
1 files changed, 101 insertions, 41 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 14b1c3e88..c9a68f4bf 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -76,14 +76,9 @@ struct GNUNET_PEERSTORE_Handle
76 struct GNUNET_PEERSTORE_IterateContext *iterate_tail; 76 struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
77 77
78 /** 78 /**
79 * Head of WATCH requests (active and inactive). 79 * Hashmap of watch requests
80 */ 80 */
81 struct GNUNET_PEERSTORE_WatchContext *watch_head; 81 struct GNUNET_CONTAINER_MultiHashMap *watches;
82
83 /**
84 * Tail of WATCH requests (active and inactive).
85 */
86 struct GNUNET_PEERSTORE_WatchContext *watch_tail;
87 82
88}; 83};
89 84
@@ -215,6 +210,11 @@ struct GNUNET_PEERSTORE_WatchContext
215 void *callback_cls; 210 void *callback_cls;
216 211
217 /** 212 /**
213 * Hash of the combined key
214 */
215 struct GNUNET_HashCode keyhash;
216
217 /**
218 * #GNUNET_YES / #GNUNET_NO 218 * #GNUNET_YES / #GNUNET_NO
219 * if sent, cannot be canceled 219 * if sent, cannot be canceled
220 */ 220 */
@@ -284,27 +284,6 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
284} 284}
285 285
286/** 286/**
287 * Should be called only after destroying MQ and connection
288 */
289static void
290cleanup_handle(struct GNUNET_PEERSTORE_Handle *h)
291{
292 struct GNUNET_PEERSTORE_StoreContext *sc;
293 struct GNUNET_PEERSTORE_IterateContext *ic;
294
295 while (NULL != (sc = h->store_head))
296 {
297 GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
298 GNUNET_free(sc);
299 }
300 while (NULL != (ic = h->iterate_head))
301 {
302 GNUNET_CONTAINER_DLL_remove(h->iterate_head, h->iterate_tail, ic);
303 GNUNET_free(ic);
304 }
305}
306
307/**
308 * Close the existing connection to PEERSTORE and reconnect. 287 * Close the existing connection to PEERSTORE and reconnect.
309 * 288 *
310 * @param h handle to the service 289 * @param h handle to the service
@@ -312,7 +291,6 @@ cleanup_handle(struct GNUNET_PEERSTORE_Handle *h)
312static void 291static void
313reconnect (struct GNUNET_PEERSTORE_Handle *h) 292reconnect (struct GNUNET_PEERSTORE_Handle *h)
314{ 293{
315
316 LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); 294 LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
317 if (NULL != h->mq) 295 if (NULL != h->mq)
318 { 296 {
@@ -324,12 +302,13 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
324 GNUNET_CLIENT_disconnect (h->client); 302 GNUNET_CLIENT_disconnect (h->client);
325 h->client = NULL; 303 h->client = NULL;
326 } 304 }
327 cleanup_handle(h);
328 h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); 305 h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
306 //FIXME: retry connecting if fails again (client == NULL)
329 h->mq = GNUNET_MQ_queue_for_connection_client(h->client, 307 h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
330 mq_handlers, 308 mq_handlers,
331 &handle_client_error, 309 &handle_client_error,
332 h); 310 h);
311 //FIXME: resend pending requests after reconnecting
333 312
334} 313}
335 314
@@ -373,6 +352,11 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
373void 352void
374GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) 353GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
375{ 354{
355 if(NULL != h->watches)
356 {
357 GNUNET_CONTAINER_multihashmap_destroy(h->watches);
358 h->watches = NULL;
359 }
376 if(NULL != h->mq) 360 if(NULL != h->mq)
377 { 361 {
378 GNUNET_MQ_destroy(h->mq); 362 GNUNET_MQ_destroy(h->mq);
@@ -383,7 +367,6 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
383 GNUNET_CLIENT_disconnect (h->client); 367 GNUNET_CLIENT_disconnect (h->client);
384 h->client = NULL; 368 h->client = NULL;
385 } 369 }
386 cleanup_handle(h);
387 GNUNET_free(h); 370 GNUNET_free(h);
388 LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n"); 371 LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n");
389} 372}
@@ -655,7 +638,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
655 */ 638 */
656struct GNUNET_PEERSTORE_IterateContext * 639struct GNUNET_PEERSTORE_IterateContext *
657GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, 640GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
658 char *sub_system, 641 const char *sub_system,
659 const struct GNUNET_PeerIdentity *peer, 642 const struct GNUNET_PeerIdentity *peer,
660 const char *key, 643 const char *key,
661 struct GNUNET_TIME_Relative timeout, 644 struct GNUNET_TIME_Relative timeout,
@@ -698,7 +681,54 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
698 */ 681 */
699void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) 682void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
700{ 683{
684 /*struct GNUNET_PEERSTORE_Handle *h = cls;
685 struct GNUNET_PEERSTORE_WatchContext *wc;
686 GNUNET_PEERSTORE_Processor callback;
687 void *callback_cls;
688
689
690
691 struct GNUNET_PEERSTORE_IterateContext *ic;
692 uint16_t msg_type;
693 struct GNUNET_PEERSTORE_Record *record;
694 int continue_iter;
695
696 ic = h->iterate_head;
697 if(NULL == ic)
698 {
699 LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
700 reconnect(h);
701 return;
702 }
703 callback = ic->callback;
704 callback_cls = ic->callback_cls;
705 if(NULL == msg) * Connection error *
706 {
701 707
708 if(NULL != callback)
709 callback(callback_cls, NULL,
710 _("Error communicating with `PEERSTORE' service."));
711 reconnect(h);
712 return;
713 }
714 msg_type = ntohs(msg->type);
715 if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
716 {
717 GNUNET_PEERSTORE_iterate_cancel(ic);
718 if(NULL != callback)
719 callback(callback_cls, NULL, NULL);
720 return;
721 }
722 if(NULL != callback)
723 {
724 record = PEERSTORE_parse_record_message(msg);
725 if(NULL == record)
726 continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
727 else
728 continue_iter = callback(callback_cls, record, NULL);
729 if(GNUNET_NO == continue_iter)
730 ic->callback = NULL;
731 }*/
702} 732}
703 733
704/** 734/**
@@ -715,6 +745,36 @@ void watch_request_sent (void *cls)
715} 745}
716 746
717/** 747/**
748 * Cancel a watch request
749 *
750 * @wc handle to the watch request
751 */
752void
753GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc)
754{
755 struct GNUNET_PEERSTORE_Handle *h = wc->h;
756 struct GNUNET_MQ_Envelope *ev;
757 struct StoreKeyHashMessage *hm;
758
759 LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
760 if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */
761 {
762 ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
763 GNUNET_MQ_send(h->mq, ev);
764 wc->callback = NULL;
765 wc->callback_cls = NULL;
766 }
767 if(NULL != wc->ev)
768 {
769 GNUNET_MQ_send_cancel(wc->ev);
770 wc->ev = NULL;
771 }
772 GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc);
773 GNUNET_free(wc);
774
775}
776
777/**
718 * Request watching a given key 778 * Request watching a given key
719 * User will be notified with any new values added to key 779 * User will be notified with any new values added to key
720 * 780 *
@@ -728,28 +788,28 @@ void watch_request_sent (void *cls)
728 */ 788 */
729struct GNUNET_PEERSTORE_WatchContext * 789struct GNUNET_PEERSTORE_WatchContext *
730GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, 790GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
731 char *sub_system, 791 const char *sub_system,
732 const struct GNUNET_PeerIdentity *peer, 792 const struct GNUNET_PeerIdentity *peer,
733 const char *key, 793 const char *key,
734 GNUNET_PEERSTORE_Processor callback, void *callback_cls) 794 GNUNET_PEERSTORE_Processor callback, void *callback_cls)
735{ 795{
736 struct GNUNET_MQ_Envelope *ev; 796 struct GNUNET_MQ_Envelope *ev;
797 struct StoreKeyHashMessage *hm;
737 struct GNUNET_PEERSTORE_WatchContext *wc; 798 struct GNUNET_PEERSTORE_WatchContext *wc;
738 799
739 ev = PEERSTORE_create_record_mq_envelope(sub_system, 800 ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
740 peer, 801 PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash);
741 key,
742 NULL,
743 0,
744 NULL,
745 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
746 wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext); 802 wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext);
747 wc->callback = callback; 803 wc->callback = callback;
748 wc->callback_cls = callback_cls; 804 wc->callback_cls = callback_cls;
749 wc->ev = ev; 805 wc->ev = ev;
750 wc->h = h; 806 wc->h = h;
751 wc->request_sent = GNUNET_NO; 807 wc->request_sent = GNUNET_NO;
752 GNUNET_CONTAINER_DLL_insert(h->watch_head, h->watch_tail, wc); 808 wc->keyhash = hm->keyhash;
809 if(NULL == h->watches)
810 h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
811 GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
812 wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
753 LOG(GNUNET_ERROR_TYPE_DEBUG, 813 LOG(GNUNET_ERROR_TYPE_DEBUG,
754 "Sending a watch request for sub system `%s'.\n", sub_system); 814 "Sending a watch request for sub system `%s'.\n", sub_system);
755 GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc); 815 GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);