aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore
diff options
context:
space:
mode:
authorOmar Tarabai <tarabai@devegypt.com>2014-05-31 21:48:01 +0000
committerOmar Tarabai <tarabai@devegypt.com>2014-05-31 21:48:01 +0000
commit805d4a3556dee6d13ad5a9439b7a4a9ef3f9ab46 (patch)
tree63473a907807a2aadb7780353b4ad34ee6a041fa /src/peerstore
parent95cdeb5c0bb1f14f3959863e6bf4675db48ea177 (diff)
downloadgnunet-805d4a3556dee6d13ad5a9439b7a4a9ef3f9ab46.tar.gz
gnunet-805d4a3556dee6d13ad5a9439b7a4a9ef3f9ab46.zip
peerstore: watch functionality
Diffstat (limited to 'src/peerstore')
-rw-r--r--src/peerstore/gnunet-service-peerstore.c94
-rw-r--r--src/peerstore/peerstore_api.c68
-rw-r--r--src/peerstore/test_peerstore_api.c1
3 files changed, 88 insertions, 75 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c
index 70d79ea5e..706fcaaae 100644
--- a/src/peerstore/gnunet-service-peerstore.c
+++ b/src/peerstore/gnunet-service-peerstore.c
@@ -73,6 +73,11 @@ static struct GNUNET_PEERSTORE_PluginFunctions *db;
73static struct GNUNET_CONTAINER_MultiHashMap *watchers; 73static struct GNUNET_CONTAINER_MultiHashMap *watchers;
74 74
75/** 75/**
76 * Our notification context.
77 */
78static struct GNUNET_SERVER_NotificationContext *nc;
79
80/**
76 * Task run during shutdown. 81 * Task run during shutdown.
77 * 82 *
78 * @param cls unused 83 * @param cls unused
@@ -88,8 +93,8 @@ shutdown_task (void *cls,
88 GNUNET_free (db_lib_name); 93 GNUNET_free (db_lib_name);
89 db_lib_name = NULL; 94 db_lib_name = NULL;
90 } 95 }
91 if(NULL != watchers) 96 GNUNET_SERVER_notification_context_destroy(nc);
92 GNUNET_CONTAINER_multihashmap_destroy(watchers); 97 GNUNET_CONTAINER_multihashmap_destroy(watchers);
93 GNUNET_SCHEDULER_shutdown(); 98 GNUNET_SCHEDULER_shutdown();
94} 99}
95 100
@@ -154,6 +159,60 @@ int record_iterator(void *cls,
154} 159}
155 160
156/** 161/**
162 * Iterator over all watcher clients
163 * to notify them of a new record
164 *
165 * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
166 * @param key hash of record key
167 * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
168 * @return #GNUNET_YES to continue iterating
169 */
170int watch_notifier_it(void *cls,
171 const struct GNUNET_HashCode *key,
172 void *value)
173{
174 struct GNUNET_PEERSTORE_Record *record = cls;
175 struct GNUNET_SERVER_Client *client = value;
176 struct StoreRecordMessage *srm;
177
178 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
179 if(NULL == value)
180 {
181 GNUNET_CONTAINER_multihashmap_remove(watchers, key, value);
182 return GNUNET_YES;
183 }
184 srm = PEERSTORE_create_record_message(record->sub_system,
185 record->peer,
186 record->key,
187 record->value,
188 record->value_size,
189 record->expiry,
190 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
191 GNUNET_SERVER_notification_context_unicast(nc, client,
192 (const struct GNUNET_MessageHeader *)srm, GNUNET_YES);
193 return GNUNET_YES;
194}
195
196/**
197 * Given a new record, notifies watchers
198 *
199 * @cls closure, a 'struct GNUNET_PEERSTORE_Record *'
200 * @tc unused
201 */
202void watch_notifier (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
203{
204 struct GNUNET_PEERSTORE_Record *record = cls;
205 struct GNUNET_HashCode keyhash;
206
207 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n");
208 PEERSTORE_hash_key(record->sub_system,
209 record->peer,
210 record->key,
211 &keyhash);
212 GNUNET_CONTAINER_multihashmap_get_multiple(watchers, &keyhash, &watch_notifier_it, record);
213}
214
215/**
157 * Handle a watch cancel request from client 216 * Handle a watch cancel request from client
158 * 217 *
159 * @param cls unused 218 * @param cls unused
@@ -167,13 +226,6 @@ void handle_watch_cancel (void *cls,
167 struct StoreKeyHashMessage *hm; 226 struct StoreKeyHashMessage *hm;
168 227
169 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n"); 228 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n");
170 if(NULL == watchers)
171 {
172 GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
173 "Received a watch cancel request when we don't have any watchers.\n");
174 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
175 return;
176 }
177 hm = (struct StoreKeyHashMessage *) message; 229 hm = (struct StoreKeyHashMessage *) message;
178 GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client); 230 GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client);
179 GNUNET_SERVER_receive_done(client, GNUNET_OK); 231 GNUNET_SERVER_receive_done(client, GNUNET_OK);
@@ -195,8 +247,7 @@ void handle_watch (void *cls,
195 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n"); 247 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n");
196 hm = (struct StoreKeyHashMessage *) message; 248 hm = (struct StoreKeyHashMessage *) message;
197 GNUNET_SERVER_client_mark_monitor(client); 249 GNUNET_SERVER_client_mark_monitor(client);
198 if(NULL == watchers) 250 GNUNET_SERVER_notification_context_add(nc, client);
199 watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
200 GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash, 251 GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash,
201 client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 252 client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
202 GNUNET_SERVER_receive_done(client, GNUNET_OK); 253 GNUNET_SERVER_receive_done(client, GNUNET_OK);
@@ -246,7 +297,7 @@ void handle_iterate (void *cls,
246 GNUNET_free(tc); 297 GNUNET_free(tc);
247 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); 298 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
248 } 299 }
249 GNUNET_free(record); 300 GNUNET_free(record); /* FIXME: destroy record */
250} 301}
251 302
252/** 303/**
@@ -261,7 +312,6 @@ void handle_store (void *cls,
261 const struct GNUNET_MessageHeader *message) 312 const struct GNUNET_MessageHeader *message)
262{ 313{
263 struct GNUNET_PEERSTORE_Record *record; 314 struct GNUNET_PEERSTORE_Record *record;
264 uint16_t response_type;
265 struct GNUNET_SERVER_TransmitContext *tc; 315 struct GNUNET_SERVER_TransmitContext *tc;
266 316
267 record = PEERSTORE_parse_record_message(message); 317 record = PEERSTORE_parse_record_message(message);
@@ -275,6 +325,7 @@ void handle_store (void *cls,
275 || NULL == record->peer 325 || NULL == record->peer
276 || NULL == record->key) 326 || NULL == record->key)
277 { 327 {
328 /* FIXME: Destroy record */
278 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n"); 329 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n");
279 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); 330 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
280 return; 331 return;
@@ -284,7 +335,7 @@ void handle_store (void *cls,
284 record->sub_system, 335 record->sub_system,
285 GNUNET_i2s (record->peer), 336 GNUNET_i2s (record->peer),
286 record->key); 337 record->key);
287 if(GNUNET_OK == db->store_record(db->cls, 338 if(GNUNET_OK != db->store_record(db->cls,
288 record->sub_system, 339 record->sub_system,
289 record->peer, 340 record->peer,
290 record->key, 341 record->key,
@@ -292,18 +343,15 @@ void handle_store (void *cls,
292 record->value_size, 343 record->value_size,
293 *record->expiry)) 344 *record->expiry))
294 { 345 {
295 response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK; 346 /* FIXME: Destroy record */
296 }
297 else
298 {
299 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error."); 347 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error.");
300 response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL; 348 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
349 return;
301 } 350 }
302
303 tc = GNUNET_SERVER_transmit_context_create (client); 351 tc = GNUNET_SERVER_transmit_context_create (client);
304 GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type); 352 GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK);
305 GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); 353 GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
306 //TODO: notify watchers, if a client is disconnected, remove its watch entry 354 GNUNET_SCHEDULER_add_continuation(&watch_notifier, record, -1);
307} 355}
308 356
309/** 357/**
@@ -343,6 +391,8 @@ run (void *cls,
343 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name); 391 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name);
344 else 392 else
345 { 393 {
394 nc = GNUNET_SERVER_notification_context_create (server, 16);
395 watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
346 GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL); 396 GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL);
347 GNUNET_SERVER_add_handlers (server, handlers); 397 GNUNET_SERVER_add_handlers (server, handlers);
348 GNUNET_SERVER_disconnect_notify (server, 398 GNUNET_SERVER_disconnect_notify (server,
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index c9a68f4bf..8748625b7 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -263,7 +263,6 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h);
263 */ 263 */
264static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { 264static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
265 {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)}, 265 {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)},
266 {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)},
267 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0}, 266 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
268 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)}, 267 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)},
269 {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0}, 268 {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
@@ -386,7 +385,6 @@ void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
386{ 385{
387 struct GNUNET_PEERSTORE_Handle *h = cls; 386 struct GNUNET_PEERSTORE_Handle *h = cls;
388 struct GNUNET_PEERSTORE_StoreContext *sc; 387 struct GNUNET_PEERSTORE_StoreContext *sc;
389 uint16_t msg_type;
390 GNUNET_PEERSTORE_Continuation cont; 388 GNUNET_PEERSTORE_Continuation cont;
391 void *cont_cls; 389 void *cont_cls;
392 390
@@ -409,13 +407,7 @@ void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
409 return; 407 return;
410 } 408 }
411 if(NULL != cont) /* Run continuation */ 409 if(NULL != cont) /* Run continuation */
412 { 410 cont(cont_cls, GNUNET_OK);
413 msg_type = ntohs(msg->type);
414 if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
415 cont(cont_cls, GNUNET_OK);
416 else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
417 cont(cont_cls, GNUNET_SYSERR);
418 }
419 411
420} 412}
421 413
@@ -681,54 +673,26 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
681 */ 673 */
682void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) 674void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
683{ 675{
684 /*struct GNUNET_PEERSTORE_Handle *h = cls; 676 struct GNUNET_PEERSTORE_Handle *h = cls;
685 struct GNUNET_PEERSTORE_WatchContext *wc;
686 GNUNET_PEERSTORE_Processor callback;
687 void *callback_cls;
688
689
690
691 struct GNUNET_PEERSTORE_IterateContext *ic;
692 uint16_t msg_type;
693 struct GNUNET_PEERSTORE_Record *record; 677 struct GNUNET_PEERSTORE_Record *record;
694 int continue_iter; 678 struct GNUNET_HashCode keyhash;
679 struct GNUNET_PEERSTORE_WatchContext *wc;
695 680
696 ic = h->iterate_head; 681 if(NULL == msg)
697 if(NULL == ic)
698 {
699 LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
700 reconnect(h);
701 return;
702 }
703 callback = ic->callback;
704 callback_cls = ic->callback_cls;
705 if(NULL == msg) * Connection error *
706 { 682 {
707 683 LOG(GNUNET_ERROR_TYPE_ERROR,
708 if(NULL != callback) 684 "Problem receiving a watch response, no way to determine which request.\n");
709 callback(callback_cls, NULL,
710 _("Error communicating with `PEERSTORE' service."));
711 reconnect(h); 685 reconnect(h);
712 return; 686 return;
713 } 687 }
714 msg_type = ntohs(msg->type); 688 LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
715 if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type) 689 record = PEERSTORE_parse_record_message(msg);
716 { 690 PEERSTORE_hash_key(record->sub_system,
717 GNUNET_PEERSTORE_iterate_cancel(ic); 691 record->peer, record->key, &keyhash);
718 if(NULL != callback) 692 wc = GNUNET_CONTAINER_multihashmap_get(h->watches, &keyhash);
719 callback(callback_cls, NULL, NULL); 693 if(NULL != wc->callback)
720 return; 694 wc->callback(wc->callback_cls, record, NULL);
721 } 695 /* TODO: destroy record */
722 if(NULL != callback)
723 {
724 record = PEERSTORE_parse_record_message(msg);
725 if(NULL == record)
726 continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
727 else
728 continue_iter = callback(callback_cls, record, NULL);
729 if(GNUNET_NO == continue_iter)
730 ic->callback = NULL;
731 }*/
732} 696}
733 697
734/** 698/**
@@ -809,7 +773,7 @@ GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
809 if(NULL == h->watches) 773 if(NULL == h->watches)
810 h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO); 774 h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
811 GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash, 775 GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
812 wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 776 wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
813 LOG(GNUNET_ERROR_TYPE_DEBUG, 777 LOG(GNUNET_ERROR_TYPE_DEBUG,
814 "Sending a watch request for sub system `%s'.\n", sub_system); 778 "Sending a watch request for sub system `%s'.\n", sub_system);
815 GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc); 779 GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
diff --git a/src/peerstore/test_peerstore_api.c b/src/peerstore/test_peerstore_api.c
index 7a512f664..02c8815c5 100644
--- a/src/peerstore/test_peerstore_api.c
+++ b/src/peerstore/test_peerstore_api.c
@@ -127,7 +127,6 @@ run (void *cls,
127 expiry, 127 expiry,
128 &store_cont, 128 &store_cont,
129 NULL); 129 NULL);
130
131} 130}
132 131
133int iterator (void *cls, const struct GNUNET_HashCode *key, void *value) 132int iterator (void *cls, const struct GNUNET_HashCode *key, void *value)