diff options
author | Omar Tarabai <tarabai@devegypt.com> | 2014-05-31 21:48:01 +0000 |
---|---|---|
committer | Omar Tarabai <tarabai@devegypt.com> | 2014-05-31 21:48:01 +0000 |
commit | 805d4a3556dee6d13ad5a9439b7a4a9ef3f9ab46 (patch) | |
tree | 63473a907807a2aadb7780353b4ad34ee6a041fa /src/peerstore | |
parent | 95cdeb5c0bb1f14f3959863e6bf4675db48ea177 (diff) | |
download | gnunet-805d4a3556dee6d13ad5a9439b7a4a9ef3f9ab46.tar.gz gnunet-805d4a3556dee6d13ad5a9439b7a4a9ef3f9ab46.zip |
peerstore: watch functionality
Diffstat (limited to 'src/peerstore')
-rw-r--r-- | src/peerstore/gnunet-service-peerstore.c | 94 | ||||
-rw-r--r-- | src/peerstore/peerstore_api.c | 68 | ||||
-rw-r--r-- | src/peerstore/test_peerstore_api.c | 1 |
3 files changed, 88 insertions, 75 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index 70d79ea5e..706fcaaae 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c | |||
@@ -73,6 +73,11 @@ static struct GNUNET_PEERSTORE_PluginFunctions *db; | |||
73 | static struct GNUNET_CONTAINER_MultiHashMap *watchers; | 73 | static struct GNUNET_CONTAINER_MultiHashMap *watchers; |
74 | 74 | ||
75 | /** | 75 | /** |
76 | * Our notification context. | ||
77 | */ | ||
78 | static struct GNUNET_SERVER_NotificationContext *nc; | ||
79 | |||
80 | /** | ||
76 | * Task run during shutdown. | 81 | * Task run during shutdown. |
77 | * | 82 | * |
78 | * @param cls unused | 83 | * @param cls unused |
@@ -88,8 +93,8 @@ shutdown_task (void *cls, | |||
88 | GNUNET_free (db_lib_name); | 93 | GNUNET_free (db_lib_name); |
89 | db_lib_name = NULL; | 94 | db_lib_name = NULL; |
90 | } | 95 | } |
91 | if(NULL != watchers) | 96 | GNUNET_SERVER_notification_context_destroy(nc); |
92 | GNUNET_CONTAINER_multihashmap_destroy(watchers); | 97 | GNUNET_CONTAINER_multihashmap_destroy(watchers); |
93 | GNUNET_SCHEDULER_shutdown(); | 98 | GNUNET_SCHEDULER_shutdown(); |
94 | } | 99 | } |
95 | 100 | ||
@@ -154,6 +159,60 @@ int record_iterator(void *cls, | |||
154 | } | 159 | } |
155 | 160 | ||
156 | /** | 161 | /** |
162 | * Iterator over all watcher clients | ||
163 | * to notify them of a new record | ||
164 | * | ||
165 | * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *' | ||
166 | * @param key hash of record key | ||
167 | * @param value the watcher client, a 'struct GNUNET_SERVER_Client *' | ||
168 | * @return #GNUNET_YES to continue iterating | ||
169 | */ | ||
170 | int watch_notifier_it(void *cls, | ||
171 | const struct GNUNET_HashCode *key, | ||
172 | void *value) | ||
173 | { | ||
174 | struct GNUNET_PEERSTORE_Record *record = cls; | ||
175 | struct GNUNET_SERVER_Client *client = value; | ||
176 | struct StoreRecordMessage *srm; | ||
177 | |||
178 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n"); | ||
179 | if(NULL == value) | ||
180 | { | ||
181 | GNUNET_CONTAINER_multihashmap_remove(watchers, key, value); | ||
182 | return GNUNET_YES; | ||
183 | } | ||
184 | srm = PEERSTORE_create_record_message(record->sub_system, | ||
185 | record->peer, | ||
186 | record->key, | ||
187 | record->value, | ||
188 | record->value_size, | ||
189 | record->expiry, | ||
190 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD); | ||
191 | GNUNET_SERVER_notification_context_unicast(nc, client, | ||
192 | (const struct GNUNET_MessageHeader *)srm, GNUNET_YES); | ||
193 | return GNUNET_YES; | ||
194 | } | ||
195 | |||
196 | /** | ||
197 | * Given a new record, notifies watchers | ||
198 | * | ||
199 | * @cls closure, a 'struct GNUNET_PEERSTORE_Record *' | ||
200 | * @tc unused | ||
201 | */ | ||
202 | void watch_notifier (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
203 | { | ||
204 | struct GNUNET_PEERSTORE_Record *record = cls; | ||
205 | struct GNUNET_HashCode keyhash; | ||
206 | |||
207 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n"); | ||
208 | PEERSTORE_hash_key(record->sub_system, | ||
209 | record->peer, | ||
210 | record->key, | ||
211 | &keyhash); | ||
212 | GNUNET_CONTAINER_multihashmap_get_multiple(watchers, &keyhash, &watch_notifier_it, record); | ||
213 | } | ||
214 | |||
215 | /** | ||
157 | * Handle a watch cancel request from client | 216 | * Handle a watch cancel request from client |
158 | * | 217 | * |
159 | * @param cls unused | 218 | * @param cls unused |
@@ -167,13 +226,6 @@ void handle_watch_cancel (void *cls, | |||
167 | struct StoreKeyHashMessage *hm; | 226 | struct StoreKeyHashMessage *hm; |
168 | 227 | ||
169 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n"); | 228 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n"); |
170 | if(NULL == watchers) | ||
171 | { | ||
172 | GNUNET_log(GNUNET_ERROR_TYPE_WARNING, | ||
173 | "Received a watch cancel request when we don't have any watchers.\n"); | ||
174 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); | ||
175 | return; | ||
176 | } | ||
177 | hm = (struct StoreKeyHashMessage *) message; | 229 | hm = (struct StoreKeyHashMessage *) message; |
178 | GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client); | 230 | GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client); |
179 | GNUNET_SERVER_receive_done(client, GNUNET_OK); | 231 | GNUNET_SERVER_receive_done(client, GNUNET_OK); |
@@ -195,8 +247,7 @@ void handle_watch (void *cls, | |||
195 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n"); | 247 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n"); |
196 | hm = (struct StoreKeyHashMessage *) message; | 248 | hm = (struct StoreKeyHashMessage *) message; |
197 | GNUNET_SERVER_client_mark_monitor(client); | 249 | GNUNET_SERVER_client_mark_monitor(client); |
198 | if(NULL == watchers) | 250 | GNUNET_SERVER_notification_context_add(nc, client); |
199 | watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); | ||
200 | GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash, | 251 | GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash, |
201 | client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 252 | client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
202 | GNUNET_SERVER_receive_done(client, GNUNET_OK); | 253 | GNUNET_SERVER_receive_done(client, GNUNET_OK); |
@@ -246,7 +297,7 @@ void handle_iterate (void *cls, | |||
246 | GNUNET_free(tc); | 297 | GNUNET_free(tc); |
247 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); | 298 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); |
248 | } | 299 | } |
249 | GNUNET_free(record); | 300 | GNUNET_free(record); /* FIXME: destroy record */ |
250 | } | 301 | } |
251 | 302 | ||
252 | /** | 303 | /** |
@@ -261,7 +312,6 @@ void handle_store (void *cls, | |||
261 | const struct GNUNET_MessageHeader *message) | 312 | const struct GNUNET_MessageHeader *message) |
262 | { | 313 | { |
263 | struct GNUNET_PEERSTORE_Record *record; | 314 | struct GNUNET_PEERSTORE_Record *record; |
264 | uint16_t response_type; | ||
265 | struct GNUNET_SERVER_TransmitContext *tc; | 315 | struct GNUNET_SERVER_TransmitContext *tc; |
266 | 316 | ||
267 | record = PEERSTORE_parse_record_message(message); | 317 | record = PEERSTORE_parse_record_message(message); |
@@ -275,6 +325,7 @@ void handle_store (void *cls, | |||
275 | || NULL == record->peer | 325 | || NULL == record->peer |
276 | || NULL == record->key) | 326 | || NULL == record->key) |
277 | { | 327 | { |
328 | /* FIXME: Destroy record */ | ||
278 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n"); | 329 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n"); |
279 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); | 330 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); |
280 | return; | 331 | return; |
@@ -284,7 +335,7 @@ void handle_store (void *cls, | |||
284 | record->sub_system, | 335 | record->sub_system, |
285 | GNUNET_i2s (record->peer), | 336 | GNUNET_i2s (record->peer), |
286 | record->key); | 337 | record->key); |
287 | if(GNUNET_OK == db->store_record(db->cls, | 338 | if(GNUNET_OK != db->store_record(db->cls, |
288 | record->sub_system, | 339 | record->sub_system, |
289 | record->peer, | 340 | record->peer, |
290 | record->key, | 341 | record->key, |
@@ -292,18 +343,15 @@ void handle_store (void *cls, | |||
292 | record->value_size, | 343 | record->value_size, |
293 | *record->expiry)) | 344 | *record->expiry)) |
294 | { | 345 | { |
295 | response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK; | 346 | /* FIXME: Destroy record */ |
296 | } | ||
297 | else | ||
298 | { | ||
299 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error."); | 347 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error."); |
300 | response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL; | 348 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); |
349 | return; | ||
301 | } | 350 | } |
302 | |||
303 | tc = GNUNET_SERVER_transmit_context_create (client); | 351 | tc = GNUNET_SERVER_transmit_context_create (client); |
304 | GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type); | 352 | GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK); |
305 | GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); | 353 | GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); |
306 | //TODO: notify watchers, if a client is disconnected, remove its watch entry | 354 | GNUNET_SCHEDULER_add_continuation(&watch_notifier, record, -1); |
307 | } | 355 | } |
308 | 356 | ||
309 | /** | 357 | /** |
@@ -343,6 +391,8 @@ run (void *cls, | |||
343 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name); | 391 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name); |
344 | else | 392 | else |
345 | { | 393 | { |
394 | nc = GNUNET_SERVER_notification_context_create (server, 16); | ||
395 | watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); | ||
346 | GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL); | 396 | GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL); |
347 | GNUNET_SERVER_add_handlers (server, handlers); | 397 | GNUNET_SERVER_add_handlers (server, handlers); |
348 | GNUNET_SERVER_disconnect_notify (server, | 398 | GNUNET_SERVER_disconnect_notify (server, |
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index c9a68f4bf..8748625b7 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c | |||
@@ -263,7 +263,6 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h); | |||
263 | */ | 263 | */ |
264 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | 264 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
265 | {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)}, | 265 | {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)}, |
266 | {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)}, | ||
267 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0}, | 266 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0}, |
268 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)}, | 267 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)}, |
269 | {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0}, | 268 | {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0}, |
@@ -386,7 +385,6 @@ void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) | |||
386 | { | 385 | { |
387 | struct GNUNET_PEERSTORE_Handle *h = cls; | 386 | struct GNUNET_PEERSTORE_Handle *h = cls; |
388 | struct GNUNET_PEERSTORE_StoreContext *sc; | 387 | struct GNUNET_PEERSTORE_StoreContext *sc; |
389 | uint16_t msg_type; | ||
390 | GNUNET_PEERSTORE_Continuation cont; | 388 | GNUNET_PEERSTORE_Continuation cont; |
391 | void *cont_cls; | 389 | void *cont_cls; |
392 | 390 | ||
@@ -409,13 +407,7 @@ void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) | |||
409 | return; | 407 | return; |
410 | } | 408 | } |
411 | if(NULL != cont) /* Run continuation */ | 409 | if(NULL != cont) /* Run continuation */ |
412 | { | 410 | cont(cont_cls, GNUNET_OK); |
413 | msg_type = ntohs(msg->type); | ||
414 | if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type) | ||
415 | cont(cont_cls, GNUNET_OK); | ||
416 | else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type) | ||
417 | cont(cont_cls, GNUNET_SYSERR); | ||
418 | } | ||
419 | 411 | ||
420 | } | 412 | } |
421 | 413 | ||
@@ -681,54 +673,26 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | |||
681 | */ | 673 | */ |
682 | void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) | 674 | void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg) |
683 | { | 675 | { |
684 | /*struct GNUNET_PEERSTORE_Handle *h = cls; | 676 | struct GNUNET_PEERSTORE_Handle *h = cls; |
685 | struct GNUNET_PEERSTORE_WatchContext *wc; | ||
686 | GNUNET_PEERSTORE_Processor callback; | ||
687 | void *callback_cls; | ||
688 | |||
689 | |||
690 | |||
691 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
692 | uint16_t msg_type; | ||
693 | struct GNUNET_PEERSTORE_Record *record; | 677 | struct GNUNET_PEERSTORE_Record *record; |
694 | int continue_iter; | 678 | struct GNUNET_HashCode keyhash; |
679 | struct GNUNET_PEERSTORE_WatchContext *wc; | ||
695 | 680 | ||
696 | ic = h->iterate_head; | 681 | if(NULL == msg) |
697 | if(NULL == ic) | ||
698 | { | ||
699 | LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n"); | ||
700 | reconnect(h); | ||
701 | return; | ||
702 | } | ||
703 | callback = ic->callback; | ||
704 | callback_cls = ic->callback_cls; | ||
705 | if(NULL == msg) * Connection error * | ||
706 | { | 682 | { |
707 | 683 | LOG(GNUNET_ERROR_TYPE_ERROR, | |
708 | if(NULL != callback) | 684 | "Problem receiving a watch response, no way to determine which request.\n"); |
709 | callback(callback_cls, NULL, | ||
710 | _("Error communicating with `PEERSTORE' service.")); | ||
711 | reconnect(h); | 685 | reconnect(h); |
712 | return; | 686 | return; |
713 | } | 687 | } |
714 | msg_type = ntohs(msg->type); | 688 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n"); |
715 | if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type) | 689 | record = PEERSTORE_parse_record_message(msg); |
716 | { | 690 | PEERSTORE_hash_key(record->sub_system, |
717 | GNUNET_PEERSTORE_iterate_cancel(ic); | 691 | record->peer, record->key, &keyhash); |
718 | if(NULL != callback) | 692 | wc = GNUNET_CONTAINER_multihashmap_get(h->watches, &keyhash); |
719 | callback(callback_cls, NULL, NULL); | 693 | if(NULL != wc->callback) |
720 | return; | 694 | wc->callback(wc->callback_cls, record, NULL); |
721 | } | 695 | /* TODO: destroy record */ |
722 | if(NULL != callback) | ||
723 | { | ||
724 | record = PEERSTORE_parse_record_message(msg); | ||
725 | if(NULL == record) | ||
726 | continue_iter = callback(callback_cls, record, _("Received a malformed response from service.")); | ||
727 | else | ||
728 | continue_iter = callback(callback_cls, record, NULL); | ||
729 | if(GNUNET_NO == continue_iter) | ||
730 | ic->callback = NULL; | ||
731 | }*/ | ||
732 | } | 696 | } |
733 | 697 | ||
734 | /** | 698 | /** |
@@ -809,7 +773,7 @@ GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h, | |||
809 | if(NULL == h->watches) | 773 | if(NULL == h->watches) |
810 | h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO); | 774 | h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO); |
811 | GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash, | 775 | GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash, |
812 | wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 776 | wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); |
813 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 777 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
814 | "Sending a watch request for sub system `%s'.\n", sub_system); | 778 | "Sending a watch request for sub system `%s'.\n", sub_system); |
815 | GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc); | 779 | GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc); |
diff --git a/src/peerstore/test_peerstore_api.c b/src/peerstore/test_peerstore_api.c index 7a512f664..02c8815c5 100644 --- a/src/peerstore/test_peerstore_api.c +++ b/src/peerstore/test_peerstore_api.c | |||
@@ -127,7 +127,6 @@ run (void *cls, | |||
127 | expiry, | 127 | expiry, |
128 | &store_cont, | 128 | &store_cont, |
129 | NULL); | 129 | NULL); |
130 | |||
131 | } | 130 | } |
132 | 131 | ||
133 | int iterator (void *cls, const struct GNUNET_HashCode *key, void *value) | 132 | int iterator (void *cls, const struct GNUNET_HashCode *key, void *value) |