diff options
author | Omar Tarabai <tarabai@devegypt.com> | 2014-06-06 11:55:14 +0000 |
---|---|---|
committer | Omar Tarabai <tarabai@devegypt.com> | 2014-06-06 11:55:14 +0000 |
commit | bdfda4dd17faf721d82bfee411dcaf2777012a9f (patch) | |
tree | da228559f7992fe39e29e7cd4b908ec89a8d5e7e /src | |
parent | f92f3f8a717f1a3b332bec4f162ad8da237f2e89 (diff) | |
download | gnunet-bdfda4dd17faf721d82bfee411dcaf2777012a9f.tar.gz gnunet-bdfda4dd17faf721d82bfee411dcaf2777012a9f.zip |
using PEERSTORE in SENSOR + fixes in PEERSTORE
Diffstat (limited to 'src')
-rw-r--r-- | src/peerstore/gnunet-service-peerstore.c | 4 | ||||
-rw-r--r-- | src/peerstore/peerstore_api.c | 89 | ||||
-rw-r--r-- | src/sensor/Makefile.am | 1 | ||||
-rw-r--r-- | src/sensor/gnunet-service-sensor.c | 57 |
4 files changed, 140 insertions, 11 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index c620bd57b..140db80d8 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c | |||
@@ -32,7 +32,7 @@ | |||
32 | /** | 32 | /** |
33 | * Interval for expired records cleanup (in seconds) | 33 | * Interval for expired records cleanup (in seconds) |
34 | */ | 34 | */ |
35 | #define CLEANUP_INTERVAL 300 /* 5mins */ | 35 | #define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */ |
36 | 36 | ||
37 | /** | 37 | /** |
38 | * Our configuration. | 38 | * Our configuration. |
@@ -96,7 +96,7 @@ cleanup_expired_records(void *cls, | |||
96 | deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get()); | 96 | deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get()); |
97 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted); | 97 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted); |
98 | GNUNET_SCHEDULER_add_delayed( | 98 | GNUNET_SCHEDULER_add_delayed( |
99 | GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, CLEANUP_INTERVAL), | 99 | GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, EXPIRED_RECORDS_CLEANUP_INTERVAL), |
100 | &cleanup_expired_records, NULL); | 100 | &cleanup_expired_records, NULL); |
101 | } | 101 | } |
102 | 102 | ||
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index 2b1cc6a1d..b53bc2f1a 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c | |||
@@ -245,6 +245,27 @@ static void | |||
245 | reconnect (struct GNUNET_PEERSTORE_Handle *h); | 245 | reconnect (struct GNUNET_PEERSTORE_Handle *h); |
246 | 246 | ||
247 | /** | 247 | /** |
248 | * Callback after MQ envelope is sent | ||
249 | * | ||
250 | * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *' | ||
251 | */ | ||
252 | void watch_request_sent (void *cls); | ||
253 | |||
254 | /** | ||
255 | * Callback after MQ envelope is sent | ||
256 | * | ||
257 | * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *' | ||
258 | */ | ||
259 | void iterate_request_sent (void *cls); | ||
260 | |||
261 | /** | ||
262 | * Callback after MQ envelope is sent | ||
263 | * | ||
264 | * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' | ||
265 | */ | ||
266 | void store_request_sent (void *cls); | ||
267 | |||
268 | /** | ||
248 | * MQ message handlers | 269 | * MQ message handlers |
249 | */ | 270 | */ |
250 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | 271 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
@@ -268,6 +289,28 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error) | |||
268 | } | 289 | } |
269 | 290 | ||
270 | /** | 291 | /** |
292 | * Iterator over previous watches to resend them | ||
293 | */ | ||
294 | int rewatch_it(void *cls, | ||
295 | const struct GNUNET_HashCode *key, | ||
296 | void *value) | ||
297 | { | ||
298 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
299 | struct GNUNET_PEERSTORE_WatchContext *wc = value; | ||
300 | struct StoreKeyHashMessage *hm; | ||
301 | |||
302 | if(GNUNET_YES == wc->request_sent) | ||
303 | { /* Envelope gone, create new one. */ | ||
304 | wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); | ||
305 | hm->keyhash = wc->keyhash; | ||
306 | wc->request_sent = GNUNET_NO; | ||
307 | } | ||
308 | GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc); | ||
309 | GNUNET_MQ_send(h->mq, wc->ev); | ||
310 | return GNUNET_YES; | ||
311 | } | ||
312 | |||
313 | /** | ||
271 | * Close the existing connection to PEERSTORE and reconnect. | 314 | * Close the existing connection to PEERSTORE and reconnect. |
272 | * | 315 | * |
273 | * @param h handle to the service | 316 | * @param h handle to the service |
@@ -275,6 +318,11 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error) | |||
275 | static void | 318 | static void |
276 | reconnect (struct GNUNET_PEERSTORE_Handle *h) | 319 | reconnect (struct GNUNET_PEERSTORE_Handle *h) |
277 | { | 320 | { |
321 | struct GNUNET_PEERSTORE_IterateContext *ic; | ||
322 | GNUNET_PEERSTORE_Processor icb; | ||
323 | void *icb_cls; | ||
324 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
325 | |||
278 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); | 326 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); |
279 | if (NULL != h->mq) | 327 | if (NULL != h->mq) |
280 | { | 328 | { |
@@ -287,13 +335,43 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) | |||
287 | h->client = NULL; | 335 | h->client = NULL; |
288 | } | 336 | } |
289 | h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); | 337 | h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); |
290 | //FIXME: retry connecting if fails again (client == NULL) | 338 | GNUNET_assert(NULL != h->client); |
291 | h->mq = GNUNET_MQ_queue_for_connection_client(h->client, | 339 | h->mq = GNUNET_MQ_queue_for_connection_client(h->client, |
292 | mq_handlers, | 340 | mq_handlers, |
293 | &handle_client_error, | 341 | &handle_client_error, |
294 | h); | 342 | h); |
295 | //FIXME: resend pending requests after reconnecting | 343 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
296 | 344 | "Resending pending requests after reconnect.\n"); | |
345 | if (NULL != h->watches) | ||
346 | { | ||
347 | GNUNET_CONTAINER_multihashmap_iterate(h->watches, | ||
348 | &rewatch_it, h); | ||
349 | } | ||
350 | ic = h->iterate_head; | ||
351 | while (NULL != ic) | ||
352 | { | ||
353 | if (GNUNET_YES == ic->request_sent) | ||
354 | { | ||
355 | icb = ic->callback; | ||
356 | icb_cls = ic->callback_cls; | ||
357 | GNUNET_PEERSTORE_iterate_cancel(ic); | ||
358 | if(NULL != icb) | ||
359 | icb(icb_cls, NULL,_("Iteration canceled due to reconnection.")); | ||
360 | } | ||
361 | else | ||
362 | { | ||
363 | GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic); | ||
364 | GNUNET_MQ_send(h->mq, ic->ev); | ||
365 | } | ||
366 | ic = ic->next; | ||
367 | } | ||
368 | sc = h->store_head; | ||
369 | while (NULL != sc) | ||
370 | { | ||
371 | GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc); | ||
372 | GNUNET_MQ_send(h->mq, sc->ev); | ||
373 | sc = sc->next; | ||
374 | } | ||
297 | } | 375 | } |
298 | 376 | ||
299 | /** | 377 | /** |
@@ -336,6 +414,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
336 | void | 414 | void |
337 | GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) | 415 | GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) |
338 | { | 416 | { |
417 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n"); | ||
339 | if(NULL != h->watches) | 418 | if(NULL != h->watches) |
340 | { | 419 | { |
341 | GNUNET_CONTAINER_multihashmap_destroy(h->watches); | 420 | GNUNET_CONTAINER_multihashmap_destroy(h->watches); |
@@ -442,7 +521,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | |||
442 | sc->cont = cont; | 521 | sc->cont = cont; |
443 | sc->cont_cls = cont_cls; | 522 | sc->cont_cls = cont_cls; |
444 | sc->h = h; | 523 | sc->h = h; |
445 | GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc); | 524 | GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc); |
446 | GNUNET_MQ_notify_sent(ev, &store_request_sent, sc); | 525 | GNUNET_MQ_notify_sent(ev, &store_request_sent, sc); |
447 | GNUNET_MQ_send(h->mq, ev); | 526 | GNUNET_MQ_send(h->mq, ev); |
448 | return sc; | 527 | return sc; |
@@ -604,7 +683,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, | |||
604 | ic->ev = ev; | 683 | ic->ev = ev; |
605 | ic->h = h; | 684 | ic->h = h; |
606 | ic->request_sent = GNUNET_NO; | 685 | ic->request_sent = GNUNET_NO; |
607 | GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic); | 686 | GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic); |
608 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 687 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
609 | "Sending an iterate request for sub system `%s'\n", sub_system); | 688 | "Sending an iterate request for sub system `%s'\n", sub_system); |
610 | GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic); | 689 | GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic); |
diff --git a/src/sensor/Makefile.am b/src/sensor/Makefile.am index e3d9bbe69..b6c55f5e6 100644 --- a/src/sensor/Makefile.am +++ b/src/sensor/Makefile.am | |||
@@ -36,6 +36,7 @@ gnunet_service_sensor_SOURCES = \ | |||
36 | gnunet_service_sensor_LDADD = \ | 36 | gnunet_service_sensor_LDADD = \ |
37 | $(top_builddir)/src/util/libgnunetutil.la \ | 37 | $(top_builddir)/src/util/libgnunetutil.la \ |
38 | $(top_builddir)/src/statistics/libgnunetstatistics.la \ | 38 | $(top_builddir)/src/statistics/libgnunetstatistics.la \ |
39 | $(top_builddir)/src/peerstore/libgnunetpeerstore.la \ | ||
39 | $(GN_LIBINTL) | 40 | $(GN_LIBINTL) |
40 | 41 | ||
41 | libgnunetsensor_la_SOURCES = \ | 42 | libgnunetsensor_la_SOURCES = \ |
diff --git a/src/sensor/gnunet-service-sensor.c b/src/sensor/gnunet-service-sensor.c index 6d47cb1e2..b42562ace 100644 --- a/src/sensor/gnunet-service-sensor.c +++ b/src/sensor/gnunet-service-sensor.c | |||
@@ -28,6 +28,7 @@ | |||
28 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
29 | #include "sensor.h" | 29 | #include "sensor.h" |
30 | #include "gnunet_statistics_service.h" | 30 | #include "gnunet_statistics_service.h" |
31 | #include "gnunet_peerstore_service.h" | ||
31 | 32 | ||
32 | /** | 33 | /** |
33 | * Minimum sensor execution interval (in seconds) | 34 | * Minimum sensor execution interval (in seconds) |
@@ -204,6 +205,21 @@ static const char *datatypes[] = { "uint64", "double", "string", NULL }; | |||
204 | struct GNUNET_STATISTICS_Handle *statistics; | 205 | struct GNUNET_STATISTICS_Handle *statistics; |
205 | 206 | ||
206 | /** | 207 | /** |
208 | * Handle to peerstore service | ||
209 | */ | ||
210 | struct GNUNET_PEERSTORE_Handle *peerstore; | ||
211 | |||
212 | /** | ||
213 | * Service name | ||
214 | */ | ||
215 | char *subsystem = "sensor"; | ||
216 | |||
217 | /** | ||
218 | * My peer id | ||
219 | */ | ||
220 | struct GNUNET_PeerIdentity peerid; | ||
221 | |||
222 | /** | ||
207 | * Remove sensor execution from scheduler | 223 | * Remove sensor execution from scheduler |
208 | * | 224 | * |
209 | * @param cls unused | 225 | * @param cls unused |
@@ -290,7 +306,15 @@ shutdown_task (void *cls, | |||
290 | GNUNET_CONTAINER_multihashmap_iterate(sensors, &destroy_sensor, NULL); | 306 | GNUNET_CONTAINER_multihashmap_iterate(sensors, &destroy_sensor, NULL); |
291 | GNUNET_CONTAINER_multihashmap_destroy(sensors); | 307 | GNUNET_CONTAINER_multihashmap_destroy(sensors); |
292 | if(NULL != statistics) | 308 | if(NULL != statistics) |
309 | { | ||
293 | GNUNET_STATISTICS_destroy(statistics, GNUNET_YES); | 310 | GNUNET_STATISTICS_destroy(statistics, GNUNET_YES); |
311 | statistics = NULL; | ||
312 | } | ||
313 | if(NULL != peerstore) | ||
314 | { | ||
315 | GNUNET_PEERSTORE_disconnect(peerstore); | ||
316 | peerstore = NULL; | ||
317 | } | ||
294 | GNUNET_SCHEDULER_shutdown(); | 318 | GNUNET_SCHEDULER_shutdown(); |
295 | } | 319 | } |
296 | 320 | ||
@@ -816,8 +840,21 @@ int sensor_statistics_iterator (void *cls, | |||
816 | int is_persistent) | 840 | int is_persistent) |
817 | { | 841 | { |
818 | struct SensorInfo *sensorinfo = cls; | 842 | struct SensorInfo *sensorinfo = cls; |
843 | struct GNUNET_TIME_Absolute expiry; | ||
819 | 844 | ||
820 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %" PRIu64 "\n", sensorinfo->name, value); | 845 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %" PRIu64 "\n", sensorinfo->name, value); |
846 | //FIXME: store first line, last line or all ?? | ||
847 | expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval); | ||
848 | GNUNET_PEERSTORE_store(peerstore, | ||
849 | subsystem, | ||
850 | &peerid, | ||
851 | sensorinfo->name, | ||
852 | &value, | ||
853 | sizeof(value), | ||
854 | expiry, | ||
855 | GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, | ||
856 | NULL, | ||
857 | NULL); | ||
821 | return GNUNET_OK; | 858 | return GNUNET_OK; |
822 | } | 859 | } |
823 | 860 | ||
@@ -845,6 +882,7 @@ void end_sensor_run_stat (void *cls, int success) | |||
845 | void sensor_process_callback (void *cls, const char *line) | 882 | void sensor_process_callback (void *cls, const char *line) |
846 | { | 883 | { |
847 | struct SensorInfo *sensorinfo = cls; | 884 | struct SensorInfo *sensorinfo = cls; |
885 | struct GNUNET_TIME_Absolute expiry; | ||
848 | 886 | ||
849 | if(NULL == line) //end of output | 887 | if(NULL == line) //end of output |
850 | { | 888 | { |
@@ -854,6 +892,18 @@ void sensor_process_callback (void *cls, const char *line) | |||
854 | return; | 892 | return; |
855 | } | 893 | } |
856 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %s\n", sensorinfo->name, line); | 894 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %s\n", sensorinfo->name, line); |
895 | //FIXME: store first line, last line or all ?? | ||
896 | expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval); | ||
897 | GNUNET_PEERSTORE_store(peerstore, | ||
898 | subsystem, | ||
899 | &peerid, | ||
900 | sensorinfo->name, | ||
901 | line, | ||
902 | strlen(line) + 1, | ||
903 | expiry, | ||
904 | GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, | ||
905 | NULL, | ||
906 | NULL); | ||
857 | } | 907 | } |
858 | 908 | ||
859 | /** | 909 | /** |
@@ -903,10 +953,6 @@ sensor_run (void *cls, | |||
903 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Starting the execution of sensor `%s'\n", sensorinfo->name); | 953 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Starting the execution of sensor `%s'\n", sensorinfo->name); |
904 | if(sources[0] == sensorinfo->source) //gnunet-statistics | 954 | if(sources[0] == sensorinfo->source) //gnunet-statistics |
905 | { | 955 | { |
906 | if(NULL == statistics) | ||
907 | { | ||
908 | statistics = GNUNET_STATISTICS_create("sensor", cfg); | ||
909 | } | ||
910 | sensorinfo->gnunet_stat_get_handle = GNUNET_STATISTICS_get(statistics, | 956 | sensorinfo->gnunet_stat_get_handle = GNUNET_STATISTICS_get(statistics, |
911 | sensorinfo->gnunet_stat_service, | 957 | sensorinfo->gnunet_stat_service, |
912 | sensorinfo->gnunet_stat_name, | 958 | sensorinfo->gnunet_stat_name, |
@@ -1032,6 +1078,9 @@ run (void *cls, | |||
1032 | sensors = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); | 1078 | sensors = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); |
1033 | reload_sensors(); | 1079 | reload_sensors(); |
1034 | schedule_all_sensors(); | 1080 | schedule_all_sensors(); |
1081 | statistics = GNUNET_STATISTICS_create("sensor", cfg); | ||
1082 | GNUNET_CRYPTO_get_peer_identity(cfg, &peerid); | ||
1083 | peerstore = GNUNET_PEERSTORE_connect(cfg); | ||
1035 | GNUNET_SERVER_add_handlers (server, handlers); | 1084 | GNUNET_SERVER_add_handlers (server, handlers); |
1036 | GNUNET_SERVER_disconnect_notify (server, | 1085 | GNUNET_SERVER_disconnect_notify (server, |
1037 | &handle_client_disconnect, | 1086 | &handle_client_disconnect, |