From 95cdeb5c0bb1f14f3959863e6bf4675db48ea177 Mon Sep 17 00:00:00 2001 From: Omar Tarabai Date: Fri, 30 May 2014 16:06:00 +0000 Subject: peerstore: towards watch functionality --- src/peerstore/gnunet-service-peerstore.c | 79 ++++++++++++++++- src/peerstore/peerstore.h | 18 ++++ src/peerstore/peerstore_api.c | 142 ++++++++++++++++++++++--------- src/peerstore/peerstore_common.c | 32 +++++++ src/peerstore/peerstore_common.h | 10 +++ src/peerstore/test_peerstore_api.c | 38 +++++++++ 6 files changed, 275 insertions(+), 44 deletions(-) (limited to 'src/peerstore') diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index c410630c9..70d79ea5e 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c @@ -29,7 +29,23 @@ #include "gnunet_peerstore_plugin.h" #include "peerstore_common.h" -//TODO: GNUNET_SERVER_receive_done() ? +/** + * Context of a PEERSTORE watch + */ +struct WatchContext +{ + + /** + * Hash of key of watched record + */ + struct GNUNET_HashCode keyhash; + + /** + * Client requested the watch + */ + struct GNUNET_SERVER_Client *client; + +}; /** * Interval for expired records cleanup (in seconds) @@ -51,6 +67,11 @@ char *db_lib_name; */ static struct GNUNET_PEERSTORE_PluginFunctions *db; +/** + * Hashmap with all watch requests + */ +static struct GNUNET_CONTAINER_MultiHashMap *watchers; + /** * Task run during shutdown. * @@ -67,7 +88,8 @@ shutdown_task (void *cls, GNUNET_free (db_lib_name); db_lib_name = NULL; } - + if(NULL != watchers) + GNUNET_CONTAINER_multihashmap_destroy(watchers); GNUNET_SCHEDULER_shutdown(); } @@ -131,6 +153,55 @@ int record_iterator(void *cls, return GNUNET_YES; } +/** + * Handle a watch cancel request from client + * + * @param cls unused + * @param client identification of the client + * @param message the actual message + */ +void handle_watch_cancel (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + struct StoreKeyHashMessage *hm; + + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n"); + if(NULL == watchers) + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, + "Received a watch cancel request when we don't have any watchers.\n"); + GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); + return; + } + hm = (struct StoreKeyHashMessage *) message; + GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client); + GNUNET_SERVER_receive_done(client, GNUNET_OK); +} + +/** + * Handle a watch request from client + * + * @param cls unused + * @param client identification of the client + * @param message the actual message + */ +void handle_watch (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + struct StoreKeyHashMessage *hm; + + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n"); + hm = (struct StoreKeyHashMessage *) message; + GNUNET_SERVER_client_mark_monitor(client); + if(NULL == watchers) + watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash, + client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_SERVER_receive_done(client, GNUNET_OK); +} + /** * Handle an iterate request from client * @@ -232,7 +303,7 @@ void handle_store (void *cls, tc = GNUNET_SERVER_transmit_context_create (client); GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type); GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); - + //TODO: notify watchers, if a client is disconnected, remove its watch entry } /** @@ -250,6 +321,8 @@ run (void *cls, static const struct GNUNET_SERVER_MessageHandler handlers[] = { {&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0}, {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0}, + {&handle_watch, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH, sizeof(struct StoreKeyHashMessage)}, + {&handle_watch_cancel, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL, sizeof(struct StoreKeyHashMessage)}, {NULL, NULL, 0, 0} }; char *database; diff --git a/src/peerstore/peerstore.h b/src/peerstore/peerstore.h index 7c6e6bdbc..5adf9f363 100644 --- a/src/peerstore/peerstore.h +++ b/src/peerstore/peerstore.h @@ -77,6 +77,24 @@ struct StoreRecordMessage }; +/** + * Message carrying record key hash + */ +struct StoreKeyHashMessage +{ + + /** + * GNUnet message header + */ + struct GNUNET_MessageHeader header; + + /** + * Hash of a record key + */ + struct GNUNET_HashCode keyhash; + +}; + GNUNET_NETWORK_STRUCT_END #endif diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index 14b1c3e88..c9a68f4bf 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c @@ -76,14 +76,9 @@ struct GNUNET_PEERSTORE_Handle struct GNUNET_PEERSTORE_IterateContext *iterate_tail; /** - * Head of WATCH requests (active and inactive). + * Hashmap of watch requests */ - struct GNUNET_PEERSTORE_WatchContext *watch_head; - - /** - * Tail of WATCH requests (active and inactive). - */ - struct GNUNET_PEERSTORE_WatchContext *watch_tail; + struct GNUNET_CONTAINER_MultiHashMap *watches; }; @@ -214,6 +209,11 @@ struct GNUNET_PEERSTORE_WatchContext */ void *callback_cls; + /** + * Hash of the combined key + */ + struct GNUNET_HashCode keyhash; + /** * #GNUNET_YES / #GNUNET_NO * if sent, cannot be canceled @@ -283,27 +283,6 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error) reconnect(h); } -/** - * Should be called only after destroying MQ and connection - */ -static void -cleanup_handle(struct GNUNET_PEERSTORE_Handle *h) -{ - struct GNUNET_PEERSTORE_StoreContext *sc; - struct GNUNET_PEERSTORE_IterateContext *ic; - - while (NULL != (sc = h->store_head)) - { - GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc); - GNUNET_free(sc); - } - while (NULL != (ic = h->iterate_head)) - { - GNUNET_CONTAINER_DLL_remove(h->iterate_head, h->iterate_tail, ic); - GNUNET_free(ic); - } -} - /** * Close the existing connection to PEERSTORE and reconnect. * @@ -312,7 +291,6 @@ cleanup_handle(struct GNUNET_PEERSTORE_Handle *h) static void reconnect (struct GNUNET_PEERSTORE_Handle *h) { - LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); if (NULL != h->mq) { @@ -324,12 +302,13 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) GNUNET_CLIENT_disconnect (h->client); h->client = NULL; } - cleanup_handle(h); h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); + //FIXME: retry connecting if fails again (client == NULL) h->mq = GNUNET_MQ_queue_for_connection_client(h->client, mq_handlers, &handle_client_error, h); + //FIXME: resend pending requests after reconnecting } @@ -373,6 +352,11 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) void GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) { + if(NULL != h->watches) + { + GNUNET_CONTAINER_multihashmap_destroy(h->watches); + h->watches = NULL; + } if(NULL != h->mq) { GNUNET_MQ_destroy(h->mq); @@ -383,7 +367,6 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) GNUNET_CLIENT_disconnect (h->client); h->client = NULL; } - cleanup_handle(h); GNUNET_free(h); LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n"); } @@ -655,7 +638,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic) */ struct GNUNET_PEERSTORE_IterateContext * GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, - char *sub_system, + const char *sub_system, const struct GNUNET_PeerIdentity *peer, const char *key, struct GNUNET_TIME_Relative timeout, @@ -698,7 +681,54 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, */ void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) { + /*struct GNUNET_PEERSTORE_Handle *h = cls; + struct GNUNET_PEERSTORE_WatchContext *wc; + GNUNET_PEERSTORE_Processor callback; + void *callback_cls; + + + + struct GNUNET_PEERSTORE_IterateContext *ic; + uint16_t msg_type; + struct GNUNET_PEERSTORE_Record *record; + int continue_iter; + + ic = h->iterate_head; + if(NULL == ic) + { + LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n"); + reconnect(h); + return; + } + callback = ic->callback; + callback_cls = ic->callback_cls; + if(NULL == msg) * Connection error * + { + if(NULL != callback) + callback(callback_cls, NULL, + _("Error communicating with `PEERSTORE' service.")); + reconnect(h); + return; + } + msg_type = ntohs(msg->type); + if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type) + { + GNUNET_PEERSTORE_iterate_cancel(ic); + if(NULL != callback) + callback(callback_cls, NULL, NULL); + return; + } + if(NULL != callback) + { + record = PEERSTORE_parse_record_message(msg); + if(NULL == record) + continue_iter = callback(callback_cls, record, _("Received a malformed response from service.")); + else + continue_iter = callback(callback_cls, record, NULL); + if(GNUNET_NO == continue_iter) + ic->callback = NULL; + }*/ } /** @@ -714,6 +744,36 @@ void watch_request_sent (void *cls) wc->ev = NULL; } +/** + * Cancel a watch request + * + * @wc handle to the watch request + */ +void +GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc) +{ + struct GNUNET_PEERSTORE_Handle *h = wc->h; + struct GNUNET_MQ_Envelope *ev; + struct StoreKeyHashMessage *hm; + + LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n"); + if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */ + { + ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL); + GNUNET_MQ_send(h->mq, ev); + wc->callback = NULL; + wc->callback_cls = NULL; + } + if(NULL != wc->ev) + { + GNUNET_MQ_send_cancel(wc->ev); + wc->ev = NULL; + } + GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc); + GNUNET_free(wc); + +} + /** * Request watching a given key * User will be notified with any new values added to key @@ -728,28 +788,28 @@ void watch_request_sent (void *cls) */ struct GNUNET_PEERSTORE_WatchContext * GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, - char *sub_system, + const char *sub_system, const struct GNUNET_PeerIdentity *peer, const char *key, GNUNET_PEERSTORE_Processor callback, void *callback_cls) { struct GNUNET_MQ_Envelope *ev; + struct StoreKeyHashMessage *hm; struct GNUNET_PEERSTORE_WatchContext *wc; - ev = PEERSTORE_create_record_mq_envelope(sub_system, - peer, - key, - NULL, - 0, - NULL, - GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); + ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); + PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash); wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext); wc->callback = callback; wc->callback_cls = callback_cls; wc->ev = ev; wc->h = h; wc->request_sent = GNUNET_NO; - GNUNET_CONTAINER_DLL_insert(h->watch_head, h->watch_tail, wc); + wc->keyhash = hm->keyhash; + if(NULL == h->watches) + h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash, + wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending a watch request for sub system `%s'.\n", sub_system); GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc); diff --git a/src/peerstore/peerstore_common.c b/src/peerstore/peerstore_common.c index 5783973b6..2b62abf19 100644 --- a/src/peerstore/peerstore_common.c +++ b/src/peerstore/peerstore_common.c @@ -25,6 +25,38 @@ #include "peerstore_common.h" +/** + * Creates a hash of the given key combination + * + */ +void +PEERSTORE_hash_key(const char *sub_system, + const struct GNUNET_PeerIdentity *peer, + const char *key, + struct GNUNET_HashCode *ret) +{ + size_t sssize; + size_t psize; + size_t ksize; + size_t totalsize; + void *block; + void *blockptr; + + sssize = strlen(sub_system) + 1; + psize = sizeof(struct GNUNET_PeerIdentity); + ksize = strlen(sub_system) + 1; + totalsize = sssize + psize + ksize; + block = GNUNET_malloc(totalsize); + blockptr = block; + memcpy(blockptr, sub_system, sssize); + blockptr += sssize; + memcpy(blockptr, peer, psize); + blockptr += psize; + memcpy(blockptr, key, ksize); + GNUNET_CRYPTO_hash(block, totalsize, ret); + GNUNET_free(block); +} + /** * Creates a record message ready to be sent * diff --git a/src/peerstore/peerstore_common.h b/src/peerstore/peerstore_common.h index cd918497b..20cb9c0e7 100644 --- a/src/peerstore/peerstore_common.h +++ b/src/peerstore/peerstore_common.h @@ -26,6 +26,16 @@ #include "peerstore.h" +/** + * Creates a hash of the given key combination + * + */ +void +PEERSTORE_hash_key(const char *sub_system, + const struct GNUNET_PeerIdentity *peer, + const char *key, + struct GNUNET_HashCode *ret); + /** * Creates a record message ready to be sent * diff --git a/src/peerstore/test_peerstore_api.c b/src/peerstore/test_peerstore_api.c index fe62933e7..7a512f664 100644 --- a/src/peerstore/test_peerstore_api.c +++ b/src/peerstore/test_peerstore_api.c @@ -27,6 +27,8 @@ #include "gnunet_peerstore_service.h" #include +//TODO: test single cycle of watch, store, iterate + static int ok = 1; static int counter = 0; @@ -76,6 +78,25 @@ void store_cont(void *cls, int success) NULL); } +int watch_cb (void *cls, + struct GNUNET_PEERSTORE_Record *record, + char *emsg) +{ + if(NULL != emsg) + { + printf("Error received: %s.\n", emsg); + return GNUNET_YES; + } + + printf("Watch Record:\n"); + printf("Sub system: %s\n", record->sub_system); + printf("Peer: %s\n", GNUNET_i2s (record->peer)); + printf("Key: %s\n", record->key); + printf("Value: %.*s\n", (int)record->value_size, (char *)record->value); + printf("Expiry: %" PRIu64 "\n", record->expiry->abs_value_us); + return GNUNET_YES; +} + static void run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, @@ -91,6 +112,12 @@ run (void *cls, expiry = GNUNET_TIME_absolute_get(); h = GNUNET_PEERSTORE_connect(cfg); GNUNET_assert(NULL != h); + GNUNET_PEERSTORE_watch(h, + "peerstore-test", + &pid, + "peerstore-test-key", + &watch_cb, + NULL); GNUNET_PEERSTORE_store(h, "peerstore-test", &pid, @@ -103,6 +130,17 @@ run (void *cls, } +int iterator (void *cls, const struct GNUNET_HashCode *key, void *value) +{ + struct GNUNET_CONTAINER_MultiHashMap *map = cls; + uint32_t *x = value; + + printf("Received value: %d\n", *x); + if(*x == 2) + GNUNET_CONTAINER_multihashmap_remove(map, key, value); + return GNUNET_YES; +} + int main (int argc, char *argv[]) { -- cgit v1.2.3