From 95cdeb5c0bb1f14f3959863e6bf4675db48ea177 Mon Sep 17 00:00:00 2001 From: Omar Tarabai Date: Fri, 30 May 2014 16:06:00 +0000 Subject: peerstore: towards watch functionality --- src/peerstore/peerstore_api.c | 142 ++++++++++++++++++++++++++++++------------ 1 file changed, 101 insertions(+), 41 deletions(-) (limited to 'src/peerstore/peerstore_api.c') diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index 14b1c3e88..c9a68f4bf 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c @@ -76,14 +76,9 @@ struct GNUNET_PEERSTORE_Handle struct GNUNET_PEERSTORE_IterateContext *iterate_tail; /** - * Head of WATCH requests (active and inactive). + * Hashmap of watch requests */ - struct GNUNET_PEERSTORE_WatchContext *watch_head; - - /** - * Tail of WATCH requests (active and inactive). - */ - struct GNUNET_PEERSTORE_WatchContext *watch_tail; + struct GNUNET_CONTAINER_MultiHashMap *watches; }; @@ -214,6 +209,11 @@ struct GNUNET_PEERSTORE_WatchContext */ void *callback_cls; + /** + * Hash of the combined key + */ + struct GNUNET_HashCode keyhash; + /** * #GNUNET_YES / #GNUNET_NO * if sent, cannot be canceled @@ -283,27 +283,6 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error) reconnect(h); } -/** - * Should be called only after destroying MQ and connection - */ -static void -cleanup_handle(struct GNUNET_PEERSTORE_Handle *h) -{ - struct GNUNET_PEERSTORE_StoreContext *sc; - struct GNUNET_PEERSTORE_IterateContext *ic; - - while (NULL != (sc = h->store_head)) - { - GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc); - GNUNET_free(sc); - } - while (NULL != (ic = h->iterate_head)) - { - GNUNET_CONTAINER_DLL_remove(h->iterate_head, h->iterate_tail, ic); - GNUNET_free(ic); - } -} - /** * Close the existing connection to PEERSTORE and reconnect. * @@ -312,7 +291,6 @@ cleanup_handle(struct GNUNET_PEERSTORE_Handle *h) static void reconnect (struct GNUNET_PEERSTORE_Handle *h) { - LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); if (NULL != h->mq) { @@ -324,12 +302,13 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) GNUNET_CLIENT_disconnect (h->client); h->client = NULL; } - cleanup_handle(h); h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); + //FIXME: retry connecting if fails again (client == NULL) h->mq = GNUNET_MQ_queue_for_connection_client(h->client, mq_handlers, &handle_client_error, h); + //FIXME: resend pending requests after reconnecting } @@ -373,6 +352,11 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) void GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) { + if(NULL != h->watches) + { + GNUNET_CONTAINER_multihashmap_destroy(h->watches); + h->watches = NULL; + } if(NULL != h->mq) { GNUNET_MQ_destroy(h->mq); @@ -383,7 +367,6 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) GNUNET_CLIENT_disconnect (h->client); h->client = NULL; } - cleanup_handle(h); GNUNET_free(h); LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n"); } @@ -655,7 +638,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic) */ struct GNUNET_PEERSTORE_IterateContext * GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, - char *sub_system, + const char *sub_system, const struct GNUNET_PeerIdentity *peer, const char *key, struct GNUNET_TIME_Relative timeout, @@ -698,7 +681,54 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, */ void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) { + /*struct GNUNET_PEERSTORE_Handle *h = cls; + struct GNUNET_PEERSTORE_WatchContext *wc; + GNUNET_PEERSTORE_Processor callback; + void *callback_cls; + + + + struct GNUNET_PEERSTORE_IterateContext *ic; + uint16_t msg_type; + struct GNUNET_PEERSTORE_Record *record; + int continue_iter; + + ic = h->iterate_head; + if(NULL == ic) + { + LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n"); + reconnect(h); + return; + } + callback = ic->callback; + callback_cls = ic->callback_cls; + if(NULL == msg) * Connection error * + { + if(NULL != callback) + callback(callback_cls, NULL, + _("Error communicating with `PEERSTORE' service.")); + reconnect(h); + return; + } + msg_type = ntohs(msg->type); + if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type) + { + GNUNET_PEERSTORE_iterate_cancel(ic); + if(NULL != callback) + callback(callback_cls, NULL, NULL); + return; + } + if(NULL != callback) + { + record = PEERSTORE_parse_record_message(msg); + if(NULL == record) + continue_iter = callback(callback_cls, record, _("Received a malformed response from service.")); + else + continue_iter = callback(callback_cls, record, NULL); + if(GNUNET_NO == continue_iter) + ic->callback = NULL; + }*/ } /** @@ -714,6 +744,36 @@ void watch_request_sent (void *cls) wc->ev = NULL; } +/** + * Cancel a watch request + * + * @wc handle to the watch request + */ +void +GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc) +{ + struct GNUNET_PEERSTORE_Handle *h = wc->h; + struct GNUNET_MQ_Envelope *ev; + struct StoreKeyHashMessage *hm; + + LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n"); + if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */ + { + ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); + GNUNET_MQ_send(h->mq, ev); + wc->callback = NULL; + wc->callback_cls = NULL; + } + if(NULL != wc->ev) + { + GNUNET_MQ_send_cancel(wc->ev); + wc->ev = NULL; + } + GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc); + GNUNET_free(wc); + +} + /** * Request watching a given key * User will be notified with any new values added to key @@ -728,28 +788,28 @@ void watch_request_sent (void *cls) */ struct GNUNET_PEERSTORE_WatchContext * GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, - char *sub_system, + const char *sub_system, const struct GNUNET_PeerIdentity *peer, const char *key, GNUNET_PEERSTORE_Processor callback, void *callback_cls) { struct GNUNET_MQ_Envelope *ev; + struct StoreKeyHashMessage *hm; struct GNUNET_PEERSTORE_WatchContext *wc; - ev = PEERSTORE_create_record_mq_envelope(sub_system, - peer, - key, - NULL, - 0, - NULL, - GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); + ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); + PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash); wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext); wc->callback = callback; wc->callback_cls = callback_cls; wc->ev = ev; wc->h = h; wc->request_sent = GNUNET_NO; - GNUNET_CONTAINER_DLL_insert(h->watch_head, h->watch_tail, wc); + wc->keyhash = hm->keyhash; + if(NULL == h->watches) + h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash, + wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending a watch request for sub system `%s'.\n", sub_system); GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc); -- cgit v1.2.3