aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOmar Tarabai <tarabai@devegypt.com>2014-06-06 11:55:14 +0000
committerOmar Tarabai <tarabai@devegypt.com>2014-06-06 11:55:14 +0000
commitbdfda4dd17faf721d82bfee411dcaf2777012a9f (patch)
treeda228559f7992fe39e29e7cd4b908ec89a8d5e7e /src
parentf92f3f8a717f1a3b332bec4f162ad8da237f2e89 (diff)
downloadgnunet-bdfda4dd17faf721d82bfee411dcaf2777012a9f.tar.gz
gnunet-bdfda4dd17faf721d82bfee411dcaf2777012a9f.zip
using PEERSTORE in SENSOR + fixes in PEERSTORE
Diffstat (limited to 'src')
-rw-r--r--src/peerstore/gnunet-service-peerstore.c4
-rw-r--r--src/peerstore/peerstore_api.c89
-rw-r--r--src/sensor/Makefile.am1
-rw-r--r--src/sensor/gnunet-service-sensor.c57
4 files changed, 140 insertions, 11 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c
index c620bd57b..140db80d8 100644
--- a/src/peerstore/gnunet-service-peerstore.c
+++ b/src/peerstore/gnunet-service-peerstore.c
@@ -32,7 +32,7 @@
32/** 32/**
33 * Interval for expired records cleanup (in seconds) 33 * Interval for expired records cleanup (in seconds)
34 */ 34 */
35#define CLEANUP_INTERVAL 300 /* 5mins */ 35#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */
36 36
37/** 37/**
38 * Our configuration. 38 * Our configuration.
@@ -96,7 +96,7 @@ cleanup_expired_records(void *cls,
96 deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get()); 96 deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get());
97 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted); 97 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted);
98 GNUNET_SCHEDULER_add_delayed( 98 GNUNET_SCHEDULER_add_delayed(
99 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, CLEANUP_INTERVAL), 99 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, EXPIRED_RECORDS_CLEANUP_INTERVAL),
100 &cleanup_expired_records, NULL); 100 &cleanup_expired_records, NULL);
101} 101}
102 102
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 2b1cc6a1d..b53bc2f1a 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -245,6 +245,27 @@ static void
245reconnect (struct GNUNET_PEERSTORE_Handle *h); 245reconnect (struct GNUNET_PEERSTORE_Handle *h);
246 246
247/** 247/**
248 * Callback after MQ envelope is sent
249 *
250 * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
251 */
252void watch_request_sent (void *cls);
253
254/**
255 * Callback after MQ envelope is sent
256 *
257 * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
258 */
259void iterate_request_sent (void *cls);
260
261/**
262 * Callback after MQ envelope is sent
263 *
264 * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
265 */
266void store_request_sent (void *cls);
267
268/**
248 * MQ message handlers 269 * MQ message handlers
249 */ 270 */
250static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { 271static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
@@ -268,6 +289,28 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
268} 289}
269 290
270/** 291/**
292 * Iterator over previous watches to resend them
293 */
294int rewatch_it(void *cls,
295 const struct GNUNET_HashCode *key,
296 void *value)
297{
298 struct GNUNET_PEERSTORE_Handle *h = cls;
299 struct GNUNET_PEERSTORE_WatchContext *wc = value;
300 struct StoreKeyHashMessage *hm;
301
302 if(GNUNET_YES == wc->request_sent)
303 { /* Envelope gone, create new one. */
304 wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
305 hm->keyhash = wc->keyhash;
306 wc->request_sent = GNUNET_NO;
307 }
308 GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc);
309 GNUNET_MQ_send(h->mq, wc->ev);
310 return GNUNET_YES;
311}
312
313/**
271 * Close the existing connection to PEERSTORE and reconnect. 314 * Close the existing connection to PEERSTORE and reconnect.
272 * 315 *
273 * @param h handle to the service 316 * @param h handle to the service
@@ -275,6 +318,11 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
275static void 318static void
276reconnect (struct GNUNET_PEERSTORE_Handle *h) 319reconnect (struct GNUNET_PEERSTORE_Handle *h)
277{ 320{
321 struct GNUNET_PEERSTORE_IterateContext *ic;
322 GNUNET_PEERSTORE_Processor icb;
323 void *icb_cls;
324 struct GNUNET_PEERSTORE_StoreContext *sc;
325
278 LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); 326 LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
279 if (NULL != h->mq) 327 if (NULL != h->mq)
280 { 328 {
@@ -287,13 +335,43 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
287 h->client = NULL; 335 h->client = NULL;
288 } 336 }
289 h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); 337 h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
290 //FIXME: retry connecting if fails again (client == NULL) 338 GNUNET_assert(NULL != h->client);
291 h->mq = GNUNET_MQ_queue_for_connection_client(h->client, 339 h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
292 mq_handlers, 340 mq_handlers,
293 &handle_client_error, 341 &handle_client_error,
294 h); 342 h);
295 //FIXME: resend pending requests after reconnecting 343 LOG(GNUNET_ERROR_TYPE_DEBUG,
296 344 "Resending pending requests after reconnect.\n");
345 if (NULL != h->watches)
346 {
347 GNUNET_CONTAINER_multihashmap_iterate(h->watches,
348 &rewatch_it, h);
349 }
350 ic = h->iterate_head;
351 while (NULL != ic)
352 {
353 if (GNUNET_YES == ic->request_sent)
354 {
355 icb = ic->callback;
356 icb_cls = ic->callback_cls;
357 GNUNET_PEERSTORE_iterate_cancel(ic);
358 if(NULL != icb)
359 icb(icb_cls, NULL,_("Iteration canceled due to reconnection."));
360 }
361 else
362 {
363 GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic);
364 GNUNET_MQ_send(h->mq, ic->ev);
365 }
366 ic = ic->next;
367 }
368 sc = h->store_head;
369 while (NULL != sc)
370 {
371 GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc);
372 GNUNET_MQ_send(h->mq, sc->ev);
373 sc = sc->next;
374 }
297} 375}
298 376
299/** 377/**
@@ -336,6 +414,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
336void 414void
337GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) 415GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
338{ 416{
417 LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
339 if(NULL != h->watches) 418 if(NULL != h->watches)
340 { 419 {
341 GNUNET_CONTAINER_multihashmap_destroy(h->watches); 420 GNUNET_CONTAINER_multihashmap_destroy(h->watches);
@@ -442,7 +521,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
442 sc->cont = cont; 521 sc->cont = cont;
443 sc->cont_cls = cont_cls; 522 sc->cont_cls = cont_cls;
444 sc->h = h; 523 sc->h = h;
445 GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc); 524 GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc);
446 GNUNET_MQ_notify_sent(ev, &store_request_sent, sc); 525 GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
447 GNUNET_MQ_send(h->mq, ev); 526 GNUNET_MQ_send(h->mq, ev);
448 return sc; 527 return sc;
@@ -604,7 +683,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
604 ic->ev = ev; 683 ic->ev = ev;
605 ic->h = h; 684 ic->h = h;
606 ic->request_sent = GNUNET_NO; 685 ic->request_sent = GNUNET_NO;
607 GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic); 686 GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic);
608 LOG(GNUNET_ERROR_TYPE_DEBUG, 687 LOG(GNUNET_ERROR_TYPE_DEBUG,
609 "Sending an iterate request for sub system `%s'\n", sub_system); 688 "Sending an iterate request for sub system `%s'\n", sub_system);
610 GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic); 689 GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
diff --git a/src/sensor/Makefile.am b/src/sensor/Makefile.am
index e3d9bbe69..b6c55f5e6 100644
--- a/src/sensor/Makefile.am
+++ b/src/sensor/Makefile.am
@@ -36,6 +36,7 @@ gnunet_service_sensor_SOURCES = \
36gnunet_service_sensor_LDADD = \ 36gnunet_service_sensor_LDADD = \
37 $(top_builddir)/src/util/libgnunetutil.la \ 37 $(top_builddir)/src/util/libgnunetutil.la \
38 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 38 $(top_builddir)/src/statistics/libgnunetstatistics.la \
39 $(top_builddir)/src/peerstore/libgnunetpeerstore.la \
39 $(GN_LIBINTL) 40 $(GN_LIBINTL)
40 41
41libgnunetsensor_la_SOURCES = \ 42libgnunetsensor_la_SOURCES = \
diff --git a/src/sensor/gnunet-service-sensor.c b/src/sensor/gnunet-service-sensor.c
index 6d47cb1e2..b42562ace 100644
--- a/src/sensor/gnunet-service-sensor.c
+++ b/src/sensor/gnunet-service-sensor.c
@@ -28,6 +28,7 @@
28#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
29#include "sensor.h" 29#include "sensor.h"
30#include "gnunet_statistics_service.h" 30#include "gnunet_statistics_service.h"
31#include "gnunet_peerstore_service.h"
31 32
32/** 33/**
33 * Minimum sensor execution interval (in seconds) 34 * Minimum sensor execution interval (in seconds)
@@ -204,6 +205,21 @@ static const char *datatypes[] = { "uint64", "double", "string", NULL };
204struct GNUNET_STATISTICS_Handle *statistics; 205struct GNUNET_STATISTICS_Handle *statistics;
205 206
206/** 207/**
208 * Handle to peerstore service
209 */
210struct GNUNET_PEERSTORE_Handle *peerstore;
211
212/**
213 * Service name
214 */
215char *subsystem = "sensor";
216
217/**
218 * My peer id
219 */
220struct GNUNET_PeerIdentity peerid;
221
222/**
207 * Remove sensor execution from scheduler 223 * Remove sensor execution from scheduler
208 * 224 *
209 * @param cls unused 225 * @param cls unused
@@ -290,7 +306,15 @@ shutdown_task (void *cls,
290 GNUNET_CONTAINER_multihashmap_iterate(sensors, &destroy_sensor, NULL); 306 GNUNET_CONTAINER_multihashmap_iterate(sensors, &destroy_sensor, NULL);
291 GNUNET_CONTAINER_multihashmap_destroy(sensors); 307 GNUNET_CONTAINER_multihashmap_destroy(sensors);
292 if(NULL != statistics) 308 if(NULL != statistics)
309 {
293 GNUNET_STATISTICS_destroy(statistics, GNUNET_YES); 310 GNUNET_STATISTICS_destroy(statistics, GNUNET_YES);
311 statistics = NULL;
312 }
313 if(NULL != peerstore)
314 {
315 GNUNET_PEERSTORE_disconnect(peerstore);
316 peerstore = NULL;
317 }
294 GNUNET_SCHEDULER_shutdown(); 318 GNUNET_SCHEDULER_shutdown();
295} 319}
296 320
@@ -816,8 +840,21 @@ int sensor_statistics_iterator (void *cls,
816 int is_persistent) 840 int is_persistent)
817{ 841{
818 struct SensorInfo *sensorinfo = cls; 842 struct SensorInfo *sensorinfo = cls;
843 struct GNUNET_TIME_Absolute expiry;
819 844
820 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %" PRIu64 "\n", sensorinfo->name, value); 845 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %" PRIu64 "\n", sensorinfo->name, value);
846 //FIXME: store first line, last line or all ??
847 expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval);
848 GNUNET_PEERSTORE_store(peerstore,
849 subsystem,
850 &peerid,
851 sensorinfo->name,
852 &value,
853 sizeof(value),
854 expiry,
855 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
856 NULL,
857 NULL);
821 return GNUNET_OK; 858 return GNUNET_OK;
822} 859}
823 860
@@ -845,6 +882,7 @@ void end_sensor_run_stat (void *cls, int success)
845void sensor_process_callback (void *cls, const char *line) 882void sensor_process_callback (void *cls, const char *line)
846{ 883{
847 struct SensorInfo *sensorinfo = cls; 884 struct SensorInfo *sensorinfo = cls;
885 struct GNUNET_TIME_Absolute expiry;
848 886
849 if(NULL == line) //end of output 887 if(NULL == line) //end of output
850 { 888 {
@@ -854,6 +892,18 @@ void sensor_process_callback (void *cls, const char *line)
854 return; 892 return;
855 } 893 }
856 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %s\n", sensorinfo->name, line); 894 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %s\n", sensorinfo->name, line);
895 //FIXME: store first line, last line or all ??
896 expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval);
897 GNUNET_PEERSTORE_store(peerstore,
898 subsystem,
899 &peerid,
900 sensorinfo->name,
901 line,
902 strlen(line) + 1,
903 expiry,
904 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
905 NULL,
906 NULL);
857} 907}
858 908
859/** 909/**
@@ -903,10 +953,6 @@ sensor_run (void *cls,
903 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Starting the execution of sensor `%s'\n", sensorinfo->name); 953 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Starting the execution of sensor `%s'\n", sensorinfo->name);
904 if(sources[0] == sensorinfo->source) //gnunet-statistics 954 if(sources[0] == sensorinfo->source) //gnunet-statistics
905 { 955 {
906 if(NULL == statistics)
907 {
908 statistics = GNUNET_STATISTICS_create("sensor", cfg);
909 }
910 sensorinfo->gnunet_stat_get_handle = GNUNET_STATISTICS_get(statistics, 956 sensorinfo->gnunet_stat_get_handle = GNUNET_STATISTICS_get(statistics,
911 sensorinfo->gnunet_stat_service, 957 sensorinfo->gnunet_stat_service,
912 sensorinfo->gnunet_stat_name, 958 sensorinfo->gnunet_stat_name,
@@ -1032,6 +1078,9 @@ run (void *cls,
1032 sensors = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); 1078 sensors = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
1033 reload_sensors(); 1079 reload_sensors();
1034 schedule_all_sensors(); 1080 schedule_all_sensors();
1081 statistics = GNUNET_STATISTICS_create("sensor", cfg);
1082 GNUNET_CRYPTO_get_peer_identity(cfg, &peerid);
1083 peerstore = GNUNET_PEERSTORE_connect(cfg);
1035 GNUNET_SERVER_add_handlers (server, handlers); 1084 GNUNET_SERVER_add_handlers (server, handlers);
1036 GNUNET_SERVER_disconnect_notify (server, 1085 GNUNET_SERVER_disconnect_notify (server,
1037 &handle_client_disconnect, 1086 &handle_client_disconnect,