From 805d4a3556dee6d13ad5a9439b7a4a9ef3f9ab46 Mon Sep 17 00:00:00 2001 From: Omar Tarabai Date: Sat, 31 May 2014 21:48:01 +0000 Subject: peerstore: watch functionality --- src/peerstore/gnunet-service-peerstore.c | 94 ++++++++++++++++++++++++-------- src/peerstore/peerstore_api.c | 68 ++++++----------------- src/peerstore/test_peerstore_api.c | 1 - 3 files changed, 88 insertions(+), 75 deletions(-) (limited to 'src/peerstore') diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index 70d79ea5e..706fcaaae 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c @@ -72,6 +72,11 @@ static struct GNUNET_PEERSTORE_PluginFunctions *db; */ static struct GNUNET_CONTAINER_MultiHashMap *watchers; +/** + * Our notification context. + */ +static struct GNUNET_SERVER_NotificationContext *nc; + /** * Task run during shutdown. * @@ -88,8 +93,8 @@ shutdown_task (void *cls, GNUNET_free (db_lib_name); db_lib_name = NULL; } - if(NULL != watchers) - GNUNET_CONTAINER_multihashmap_destroy(watchers); + GNUNET_SERVER_notification_context_destroy(nc); + GNUNET_CONTAINER_multihashmap_destroy(watchers); GNUNET_SCHEDULER_shutdown(); } @@ -153,6 +158,60 @@ int record_iterator(void *cls, return GNUNET_YES; } +/** + * Iterator over all watcher clients + * to notify them of a new record + * + * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *' + * @param key hash of record key + * @param value the watcher client, a 'struct GNUNET_SERVER_Client *' + * @return #GNUNET_YES to continue iterating + */ +int watch_notifier_it(void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_PEERSTORE_Record *record = cls; + struct GNUNET_SERVER_Client *client = value; + struct StoreRecordMessage *srm; + + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n"); + if(NULL == value) + { + GNUNET_CONTAINER_multihashmap_remove(watchers, key, value); + return GNUNET_YES; + } + srm = PEERSTORE_create_record_message(record->sub_system, + record->peer, + record->key, + record->value, + record->value_size, + record->expiry, + GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD); + GNUNET_SERVER_notification_context_unicast(nc, client, + (const struct GNUNET_MessageHeader *)srm, GNUNET_YES); + return GNUNET_YES; +} + +/** + * Given a new record, notifies watchers + * + * @cls closure, a 'struct GNUNET_PEERSTORE_Record *' + * @tc unused + */ +void watch_notifier (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_PEERSTORE_Record *record = cls; + struct GNUNET_HashCode keyhash; + + GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n"); + PEERSTORE_hash_key(record->sub_system, + record->peer, + record->key, + &keyhash); + GNUNET_CONTAINER_multihashmap_get_multiple(watchers, &keyhash, &watch_notifier_it, record); +} + /** * Handle a watch cancel request from client * @@ -167,13 +226,6 @@ void handle_watch_cancel (void *cls, struct StoreKeyHashMessage *hm; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n"); - if(NULL == watchers) - { - GNUNET_log(GNUNET_ERROR_TYPE_WARNING, - "Received a watch cancel request when we don't have any watchers.\n"); - GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); - return; - } hm = (struct StoreKeyHashMessage *) message; GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client); GNUNET_SERVER_receive_done(client, GNUNET_OK); @@ -195,8 +247,7 @@ void handle_watch (void *cls, GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n"); hm = (struct StoreKeyHashMessage *) message; GNUNET_SERVER_client_mark_monitor(client); - if(NULL == watchers) - watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); + GNUNET_SERVER_notification_context_add(nc, client); GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash, client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GNUNET_SERVER_receive_done(client, GNUNET_OK); @@ -246,7 +297,7 @@ void handle_iterate (void *cls, GNUNET_free(tc); GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); } - GNUNET_free(record); + GNUNET_free(record); /* FIXME: destroy record */ } /** @@ -261,7 +312,6 @@ void handle_store (void *cls, const struct GNUNET_MessageHeader *message) { struct GNUNET_PEERSTORE_Record *record; - uint16_t response_type; struct GNUNET_SERVER_TransmitContext *tc; record = PEERSTORE_parse_record_message(message); @@ -275,6 +325,7 @@ void handle_store (void *cls, || NULL == record->peer || NULL == record->key) { + /* FIXME: Destroy record */ GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n"); GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); return; @@ -284,7 +335,7 @@ void handle_store (void *cls, record->sub_system, GNUNET_i2s (record->peer), record->key); - if(GNUNET_OK == db->store_record(db->cls, + if(GNUNET_OK != db->store_record(db->cls, record->sub_system, record->peer, record->key, @@ -292,18 +343,15 @@ void handle_store (void *cls, record->value_size, *record->expiry)) { - response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK; - } - else - { + /* FIXME: Destroy record */ GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error."); - response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL; + GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); + return; } - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type); + GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK); GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); - //TODO: notify watchers, if a client is disconnected, remove its watch entry + GNUNET_SCHEDULER_add_continuation(&watch_notifier, record, -1); } /** @@ -343,6 +391,8 @@ run (void *cls, GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name); else { + nc = GNUNET_SERVER_notification_context_create (server, 16); + watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL); GNUNET_SERVER_add_handlers (server, handlers); GNUNET_SERVER_disconnect_notify (server, diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index c9a68f4bf..8748625b7 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c @@ -263,7 +263,6 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h); */ static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)}, - {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)}, {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0}, {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)}, {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0}, @@ -386,7 +385,6 @@ void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_PEERSTORE_Handle *h = cls; struct GNUNET_PEERSTORE_StoreContext *sc; - uint16_t msg_type; GNUNET_PEERSTORE_Continuation cont; void *cont_cls; @@ -409,13 +407,7 @@ void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) return; } if(NULL != cont) /* Run continuation */ - { - msg_type = ntohs(msg->type); - if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type) - cont(cont_cls, GNUNET_OK); - else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type) - cont(cont_cls, GNUNET_SYSERR); - } + cont(cont_cls, GNUNET_OK); } @@ -681,54 +673,26 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, */ void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) { - /*struct GNUNET_PEERSTORE_Handle *h = cls; - struct GNUNET_PEERSTORE_WatchContext *wc; - GNUNET_PEERSTORE_Processor callback; - void *callback_cls; - - - - struct GNUNET_PEERSTORE_IterateContext *ic; - uint16_t msg_type; + struct GNUNET_PEERSTORE_Handle *h = cls; struct GNUNET_PEERSTORE_Record *record; - int continue_iter; + struct GNUNET_HashCode keyhash; + struct GNUNET_PEERSTORE_WatchContext *wc; - ic = h->iterate_head; - if(NULL == ic) - { - LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n"); - reconnect(h); - return; - } - callback = ic->callback; - callback_cls = ic->callback_cls; - if(NULL == msg) * Connection error * + if(NULL == msg) { - - if(NULL != callback) - callback(callback_cls, NULL, - _("Error communicating with `PEERSTORE' service.")); + LOG(GNUNET_ERROR_TYPE_ERROR, + "Problem receiving a watch response, no way to determine which request.\n"); reconnect(h); return; } - msg_type = ntohs(msg->type); - if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type) - { - GNUNET_PEERSTORE_iterate_cancel(ic); - if(NULL != callback) - callback(callback_cls, NULL, NULL); - return; - } - if(NULL != callback) - { - record = PEERSTORE_parse_record_message(msg); - if(NULL == record) - continue_iter = callback(callback_cls, record, _("Received a malformed response from service.")); - else - continue_iter = callback(callback_cls, record, NULL); - if(GNUNET_NO == continue_iter) - ic->callback = NULL; - }*/ + LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n"); + record = PEERSTORE_parse_record_message(msg); + PEERSTORE_hash_key(record->sub_system, + record->peer, record->key, &keyhash); + wc = GNUNET_CONTAINER_multihashmap_get(h->watches, &keyhash); + if(NULL != wc->callback) + wc->callback(wc->callback_cls, record, NULL); + /* TODO: destroy record */ } /** @@ -809,7 +773,7 @@ GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, if(NULL == h->watches) h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO); GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash, - wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending a watch request for sub system `%s'.\n", sub_system); GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc); diff --git a/src/peerstore/test_peerstore_api.c b/src/peerstore/test_peerstore_api.c index 7a512f664..02c8815c5 100644 --- a/src/peerstore/test_peerstore_api.c +++ b/src/peerstore/test_peerstore_api.c @@ -127,7 +127,6 @@ run (void *cls, expiry, &store_cont, NULL); - } int iterator (void *cls, const struct GNUNET_HashCode *key, void *value) -- cgit v1.2.3