aboutsummaryrefslogtreecommitdiff
path: root/src/sensor
diff options
context:
space:
mode:
authorOmar Tarabai <tarabai@devegypt.com>2014-08-07 18:24:04 +0000
committerOmar Tarabai <tarabai@devegypt.com>2014-08-07 18:24:04 +0000
commit605b0e88f6e64ac71a5afb08ce90c909a2807438 (patch)
tree9b713ebd15b1d3fa68f05ae6d62bebc899448f9a /src/sensor
parentd6e728b2b869ee521216deae75a767165d3e1001 (diff)
downloadgnunet-605b0e88f6e64ac71a5afb08ce90c909a2807438.tar.gz
gnunet-605b0e88f6e64ac71a5afb08ce90c909a2807438.zip
sensor: merged reporting module
Diffstat (limited to 'src/sensor')
-rw-r--r--src/sensor/Makefile.am7
-rw-r--r--src/sensor/gnunet-service-sensor.c6
-rw-r--r--src/sensor/gnunet-service-sensor_reporting.c976
-rw-r--r--src/sensor/gnunet-service-sensor_reporting_anomaly.c526
-rw-r--r--src/sensor/gnunet-service-sensor_reporting_value.c571
-rw-r--r--src/sensor/sensor.h23
-rw-r--r--src/sensor/test_sensors/test-sensor-statistics31
7 files changed, 1014 insertions, 1126 deletions
diff --git a/src/sensor/Makefile.am b/src/sensor/Makefile.am
index fb90513dd..6cc66c72e 100644
--- a/src/sensor/Makefile.am
+++ b/src/sensor/Makefile.am
@@ -37,8 +37,7 @@ gnunet_sensor_LDADD = \
37gnunet_service_sensor_SOURCES = \ 37gnunet_service_sensor_SOURCES = \
38 gnunet-service-sensor.c \ 38 gnunet-service-sensor.c \
39 gnunet-service-sensor_analysis.c \ 39 gnunet-service-sensor_analysis.c \
40 gnunet-service-sensor_reporting_value.c \ 40 gnunet-service-sensor_reporting.c \
41 gnunet-service-sensor_reporting_anomaly.c \
42 gnunet-service-sensor_update.c 41 gnunet-service-sensor_update.c
43gnunet_service_sensor_LDADD = \ 42gnunet_service_sensor_LDADD = \
44 libgnunetsensorutil.la \ 43 libgnunetsensorutil.la \
@@ -91,9 +90,9 @@ TESTS = $(check_PROGRAMS)
91endif 90endif
92 91
93test_sensor_api_SOURCES = \ 92test_sensor_api_SOURCES = \
94 test_sensor_api.c 93 test_sensor_api.c
95test_sensor_api_LDADD = \ 94test_sensor_api_LDADD = \
96 $(top_builddir)/src/util/libgnunetutil.la 95 $(top_builddir)/src/util/libgnunetutil.la
97 96
98pkgsensordir = sensors 97pkgsensordir = sensors
99 98
diff --git a/src/sensor/gnunet-service-sensor.c b/src/sensor/gnunet-service-sensor.c
index 4dc366cc1..bf88094ea 100644
--- a/src/sensor/gnunet-service-sensor.c
+++ b/src/sensor/gnunet-service-sensor.c
@@ -102,8 +102,7 @@ stop ()
102{ 102{
103 SENSOR_update_stop (); 103 SENSOR_update_stop ();
104 SENSOR_analysis_stop (); 104 SENSOR_analysis_stop ();
105 SENSOR_reporting_value_stop (); 105 SENSOR_reporting_stop ();
106 SENSOR_reporting_anomaly_stop ();
107 GNUNET_SENSOR_destroy_sensors (sensors); 106 GNUNET_SENSOR_destroy_sensors (sensors);
108} 107}
109 108
@@ -621,8 +620,7 @@ start ()
621{ 620{
622 sensors = GNUNET_SENSOR_load_all_sensors (sensor_dir); 621 sensors = GNUNET_SENSOR_load_all_sensors (sensor_dir);
623 schedule_all_sensors (); 622 schedule_all_sensors ();
624 SENSOR_reporting_value_start (cfg, sensors); 623 SENSOR_reporting_start (cfg, sensors);
625 SENSOR_reporting_anomaly_start (cfg, sensors);
626 SENSOR_analysis_start (cfg, sensors); 624 SENSOR_analysis_start (cfg, sensors);
627 SENSOR_update_start (cfg, sensors, &reset); 625 SENSOR_update_start (cfg, sensors, &reset);
628} 626}
diff --git a/src/sensor/gnunet-service-sensor_reporting.c b/src/sensor/gnunet-service-sensor_reporting.c
new file mode 100644
index 000000000..15edd149a
--- /dev/null
+++ b/src/sensor/gnunet-service-sensor_reporting.c
@@ -0,0 +1,976 @@
1/*
2 This file is part of GNUnet.
3 (C)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file sensor/gnunet-service-sensor_reporting.c
23 * @brief sensor service reporting functionality
24 * @author Omar Tarabai
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "sensor.h"
29#include "gnunet_peerstore_service.h"
30#include "gnunet_core_service.h"
31#include "gnunet_cadet_service.h"
32#include "gnunet_applications.h"
33
34#define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35
36
37struct AnomalyInfo
38{
39
40 /**
41 * DLL
42 */
43 struct AnomalyInfo *prev;
44
45 /**
46 * DLL
47 */
48 struct AnomalyInfo *next;
49
50 /**
51 * Sensor information
52 */
53 struct GNUNET_SENSOR_SensorInfo *sensor;
54
55 /**
56 * Current anomalous status of sensor
57 */
58 int anomalous;
59
60 /**
61 * List of peers that reported an anomaly for this sensor
62 */
63 struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
64
65};
66
67struct ValueInfo
68{
69
70 /**
71 * DLL
72 */
73 struct ValueInfo *prev;
74
75 /**
76 * DLL
77 */
78 struct ValueInfo *next;
79
80 /**
81 * Sensor information
82 */
83 struct GNUNET_SENSOR_SensorInfo *sensor;
84
85 /**
86 * Last value read from sensor
87 */
88 void *last_value;
89
90 /**
91 * Size of @e last_value
92 */
93 size_t last_value_size;
94
95 /**
96 * Timestamp of last value reading
97 */
98 struct GNUNET_TIME_Absolute last_value_timestamp;
99
100 /**
101 * Has the last value seen already been reported to collection point?
102 */
103 int last_value_reported;
104
105 /**
106 * Watcher of sensor values
107 */
108 struct GNUNET_PEERSTORE_WatchContext *wc;
109
110 /**
111 * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
112 */
113 GNUNET_SCHEDULER_TaskIdentifier reporting_task;
114
115};
116
117/**
118 * Information about a connected CORE peer.
119 * Note that we only know about a connected peer if it is running the same
120 * application (sensor anomaly reporting) as us.
121 */
122struct CorePeer
123{
124
125 /**
126 * DLL
127 */
128 struct CorePeer *prev;
129
130 /**
131 * DLL
132 */
133 struct CorePeer *next;
134
135 /**
136 * Peer identity of connected peer
137 */
138 struct GNUNET_PeerIdentity *peer_id;
139
140 /**
141 * Message queue for messages to be sent to this peer
142 */
143 struct GNUNET_MQ_Handle *mq;
144
145};
146
147/**
148 * Information about a connected CADET peer (collection point).
149 */
150struct CadetPeer
151{
152
153 /**
154 * DLL
155 */
156 struct CadetPeer *prev;
157
158 /**
159 * DLL
160 */
161 struct CadetPeer *next;
162
163 /**
164 * Peer Identity
165 */
166 struct GNUNET_PeerIdentity peer_id;
167
168 /**
169 * CADET channel handle
170 */
171 struct GNUNET_CADET_Channel *channel;
172
173 /**
174 * Message queue for messages to be sent to this peer
175 */
176 struct GNUNET_MQ_Handle *mq;
177
178 /**
179 * Are we currently destroying the channel and its context?
180 */
181 int destroying;
182
183};
184
185
186/**
187 * Our configuration.
188 */
189static const struct GNUNET_CONFIGURATION_Handle *cfg;
190
191/**
192 * Multihashmap of loaded sensors
193 */
194static struct GNUNET_CONTAINER_MultiHashMap *sensors;
195
196/**
197 * Handle to peerstore service
198 */
199static struct GNUNET_PEERSTORE_Handle *peerstore;
200
201/**
202 * Handle to core service
203 */
204static struct GNUNET_CORE_Handle *core;
205
206/**
207 * Handle to CADET service
208 */
209static struct GNUNET_CADET_Handle *cadet;
210
211/**
212 * My peer id
213 */
214static struct GNUNET_PeerIdentity mypeerid;
215
216/**
217 * Head of DLL of anomaly info structs
218 */
219static struct AnomalyInfo *ai_head;
220
221/**
222 * Tail of DLL of anomaly info structs
223 */
224static struct AnomalyInfo *ai_tail;
225
226/**
227 * Head of DLL of value info structs
228 */
229static struct ValueInfo *vi_head;
230
231/**
232 * Tail of DLL of value info structs
233 */
234static struct ValueInfo *vi_tail;
235
236/**
237 * Head of DLL of CORE peers
238 */
239static struct CorePeer *corep_head;
240
241/**
242 * Tail of DLL of CORE peers
243 */
244static struct CorePeer *corep_tail;
245
246/**
247 * Head of DLL of CADET peers
248 */
249static struct CadetPeer *cadetp_head;
250
251/**
252 * Tail of DLL of CADET peers
253 */
254static struct CadetPeer *cadetp_tail;
255
256/**
257 * Is the module started?
258 */
259static int module_running = GNUNET_NO;
260
261/**
262 * Number of known neighborhood peers
263 */
264static int neighborhood;
265
266
267
268/******************************************************************************/
269/****************************** CLEANUP ******************************/
270/******************************************************************************/
271
272/**
273 * Destroy anomaly info struct
274 *
275 * @param ai struct to destroy
276 */
277static void
278destroy_anomaly_info (struct AnomalyInfo *ai)
279{
280 if (NULL != ai->anomalous_neighbors)
281 GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
282 GNUNET_free (ai);
283}
284
285
286/**
287 * Destroy value info struct
288 *
289 * @param vi struct to destroy
290 */
291static void
292destroy_value_info (struct ValueInfo *vi)
293{
294 if (NULL != vi->wc)
295 {
296 GNUNET_PEERSTORE_watch_cancel (vi->wc);
297 vi->wc = NULL;
298 }
299 if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
300 {
301 GNUNET_SCHEDULER_cancel (vi->reporting_task);
302 vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
303 }
304 if (NULL != vi->last_value)
305 {
306 GNUNET_free (vi->last_value);
307 vi->last_value = NULL;
308 }
309 GNUNET_free (vi);
310}
311
312
313/**
314 * Destroy core peer struct
315 *
316 * @param corep struct to destroy
317 */
318static void
319destroy_core_peer (struct CorePeer *corep)
320{
321 struct AnomalyInfo *ai;
322
323 if (NULL != corep->mq)
324 {
325 GNUNET_MQ_destroy (corep->mq);
326 corep->mq = NULL;
327 }
328 ai = ai_head;
329 while (NULL != ai)
330 {
331 GNUNET_assert (NULL != ai->anomalous_neighbors);
332 GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
333 corep->peer_id);
334 ai = ai->next;
335 }
336 GNUNET_free (corep);
337}
338
339
340/**
341 * Destroy cadet peer struct
342 *
343 * @param cadetp struct to destroy
344 */
345static void
346destroy_cadet_peer (struct CadetPeer *cadetp)
347{
348 cadetp->destroying = GNUNET_YES;
349 if (NULL != cadetp->mq)
350 {
351 GNUNET_MQ_destroy (cadetp->mq);
352 cadetp->mq = NULL;
353 }
354 if (NULL != cadetp->channel)
355 {
356 GNUNET_CADET_channel_destroy (cadetp->channel);
357 cadetp->channel = NULL;
358 }
359 GNUNET_free (cadetp);
360}
361
362
363/**
364 * Stop sensor reporting module
365 */
366void
367SENSOR_reporting_stop ()
368{
369 struct ValueInfo *vi;
370 struct CorePeer *corep;
371 struct AnomalyInfo *ai;
372 struct CadetPeer *cadetp;
373
374 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
375 module_running = GNUNET_NO;
376 neighborhood = 0;
377 /* Destroy value info's */
378 vi = vi_head;
379 while (NULL != vi)
380 {
381 GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
382 destroy_value_info (vi);
383 vi = vi_head;
384 }
385 /* Destroy core peers */
386 corep = corep_head;
387 while (NULL != corep)
388 {
389 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
390 destroy_core_peer (corep);
391 corep = corep_head;
392 }
393 /* Destroy anomaly info's */
394 ai = ai_head;
395 while (NULL != ai)
396 {
397 GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
398 destroy_anomaly_info (ai);
399 ai = ai_head;
400 }
401 /* Destroy cadet peers */
402 cadetp = cadetp_head;
403 while (NULL != cadetp)
404 {
405 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
406 destroy_cadet_peer (cadetp);
407 cadetp = cadetp_head;
408 }
409 /* Disconnect from other services */
410 if (NULL != core)
411 {
412 GNUNET_CORE_disconnect (core);
413 core = NULL;
414 }
415 if (NULL != peerstore)
416 {
417 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
418 peerstore = NULL;
419 }
420 if (NULL != cadet)
421 {
422 GNUNET_CADET_disconnect (cadet);
423 cadet = NULL;
424 }
425}
426
427
428/******************************************************************************/
429/****************************** HELPERS ******************************/
430/******************************************************************************/
431
432
433/**
434 * Gets the anomaly info struct related to the given sensor
435 *
436 * @param sensor Sensor to search by
437 */
438static struct AnomalyInfo *
439get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
440{
441 struct AnomalyInfo *ai;
442
443 ai = ai_head;
444 while (NULL != ai)
445 {
446 if (ai->sensor == sensor)
447 {
448 return ai;
449 }
450 ai = ai->next;
451 }
452 return NULL;
453}
454
455
456/**
457 * Returns context of a connected CADET peer.
458 * Creates it first if didn't exist before.
459 *
460 * @param pid Peer Identity
461 * @return Context of connected CADET peer
462 */
463static struct CadetPeer *
464get_cadet_peer (struct GNUNET_PeerIdentity pid)
465{
466 struct CadetPeer *cadetp;
467
468 cadetp = cadetp_head;
469 while (NULL != cadetp)
470 {
471 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
472 return cadetp;
473 cadetp = cadetp->next;
474 }
475 /* Not found, create struct and channel */
476 cadetp = GNUNET_new (struct CadetPeer);
477 cadetp->peer_id = pid;
478 cadetp->channel =
479 GNUNET_CADET_channel_create (cadet, cadetp, &pid,
480 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
481 GNUNET_CADET_OPTION_DEFAULT);
482 cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
483 GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
484 return cadetp;
485}
486
487
488/**
489 * Create an anomaly report message from a given anomaly info struct inside a
490 * MQ envelope.
491 *
492 * @param ai Anomaly info struct to use
493 * @return Envelope with message
494 */
495static struct GNUNET_MQ_Envelope *
496create_anomaly_report_message (struct AnomalyInfo *ai)
497{
498 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
499 struct GNUNET_MQ_Envelope *ev;
500
501 ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT);
502 GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
503 &arm->sensorname_hash);
504 arm->sensorversion_major = htons (ai->sensor->version_major);
505 arm->sensorversion_minor = htons (ai->sensor->version_minor);
506 arm->anomalous = htons (ai->anomalous);
507 arm->anomalous_neighbors =
508 ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) /
509 neighborhood;
510 return ev;
511}
512
513
514/**
515 * Create a sensor value message from a given value info struct inside a MQ
516 * envelope.
517 *
518 * @param vi Value info struct to use
519 * @return Envelope with message
520 */
521static struct GNUNET_MQ_Envelope *
522create_value_message (struct ValueInfo *vi)
523{
524 struct GNUNET_SENSOR_ValueMessage *vm;
525 struct GNUNET_MQ_Envelope *ev;
526
527 ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size, GNUNET_MESSAGE_TYPE_SENSOR_READING);
528 GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
529 &vm->sensorname_hash);
530 vm->sensorversion_major = htons (vi->sensor->version_major);
531 vm->sensorversion_minor = htons (vi->sensor->version_minor);
532 vm->timestamp = vi->last_value_timestamp;
533 vm->value_size = htons (vi->last_value_size);
534 memcpy (&vm[1], vi->last_value, vi->last_value_size);
535 return ev;
536}
537
538
539/**
540 * Send given anomaly info report by putting it in the given message queue.
541 *
542 * @param mq Message queue to put the message in
543 * @param ai Anomaly info to report
544 */
545static void
546send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai)
547{
548 struct GNUNET_MQ_Envelope *ev;
549
550 ev = create_anomaly_report_message (ai);
551 GNUNET_MQ_send (mq, ev);
552}
553
554
555/******************************************************************************/
556/*************************** CORE Handlers ***************************/
557/******************************************************************************/
558
559
560/**
561 * An inbound anomaly report is received from a peer through CORE.
562 *
563 * @param cls closure (unused)
564 * @param peer the other peer involved
565 * @param message the actual message
566 * @return #GNUNET_OK to keep the connection open,
567 * #GNUNET_SYSERR to close connection to the peer (signal serious error)
568 */
569static int
570handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
571 const struct GNUNET_MessageHeader *message)
572{
573 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
574 struct GNUNET_SENSOR_SensorInfo *sensor;
575 struct AnomalyInfo *ai;
576 struct CadetPeer *cadetp;
577 int peer_in_list;
578
579 arm = (struct GNUNET_SENSOR_AnomalyReportMessage *) message;
580 sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
581 if (NULL == sensor || sensor->version_major != arm->sensorversion_major ||
582 sensor->version_minor != arm->sensorversion_minor)
583 {
584 LOG (GNUNET_ERROR_TYPE_WARNING,
585 "I don't have the sensor reported by the peer `%s'.\n",
586 GNUNET_i2s (other));
587 return GNUNET_OK;
588 }
589 ai = get_anomaly_info_by_sensor (sensor);
590 GNUNET_assert (NULL != ai);
591 peer_in_list =
592 GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other);
593 if (GNUNET_YES == ai->anomalous)
594 {
595 if (GNUNET_YES == peer_in_list)
596 GNUNET_break_op (0);
597 else
598 GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL,
599 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
600 }
601 else
602 {
603 if (GNUNET_NO == peer_in_list)
604 GNUNET_break_op (0);
605 else
606 GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other);
607 }
608 /* Send anomaly update to collection point */
609 if (NULL != ai->sensor->collection_point &&
610 GNUNET_YES == ai->sensor->report_anomalies)
611 {
612 cadetp = get_cadet_peer (*ai->sensor->collection_point);
613 send_anomaly_report (cadetp->mq, ai);
614 }
615 return GNUNET_OK;
616}
617
618
619/******************************************************************************/
620/************************ PEERSTORE callbacks ************************/
621/******************************************************************************/
622
623
624/**
625 * Sensor value watch callback
626 *
627 * @param cls Closure, ValueInfo struct related to the sensor we are watching
628 * @param record PEERSTORE new record, NULL if error
629 * @param emsg Error message, NULL if no error
630 * @return GNUNET_YES to continue watching
631 */
632static int
633value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
634{
635 struct ValueInfo *vi = cls;
636
637 if (NULL != emsg)
638 {
639 LOG (GNUNET_ERROR_TYPE_ERROR,
640 _("PEERSTORE error: %s.\n"), emsg);
641 return GNUNET_YES;
642 }
643 if (NULL != vi->last_value)
644 {
645 GNUNET_free (vi->last_value);
646 vi->last_value_size = 0;
647 }
648 vi->last_value = GNUNET_memdup (record->value, record->value_size);
649 vi->last_value_size = record->value_size;
650 vi->last_value_timestamp = GNUNET_TIME_absolute_get();
651 vi->last_value_reported = GNUNET_NO;
652 return GNUNET_YES;
653}
654
655
656/******************************************************************************/
657/************************** CORE callbacks ***************************/
658/******************************************************************************/
659
660
661/**
662 * Method called whenever a CORE peer disconnects.
663 *
664 * @param cls closure (unused)
665 * @param peer peer identity this notification is about
666 */
667static void
668core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
669{
670 struct CorePeer *corep;
671
672 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
673 return;
674 neighborhood--;
675 corep = corep_head;
676 while (NULL != corep)
677 {
678 if (peer == corep->peer_id)
679 {
680 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
681 destroy_core_peer (corep);
682 return;
683 }
684 corep = corep->next;
685 }
686 LOG (GNUNET_ERROR_TYPE_ERROR,
687 _("Received disconnect notification from CORE"
688 " for a peer we didn't know about.\n"));
689}
690
691
692/**
693 * Method called whenever a given peer connects through CORE.
694 *
695 * @param cls closure (unused)
696 * @param peer peer identity this notification is about
697 */
698static void
699core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
700{
701 struct CorePeer *corep;
702 struct AnomalyInfo *ai;
703
704 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
705 return;
706 neighborhood++;
707 corep = GNUNET_new (struct CorePeer);
708 corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
709 corep->mq = GNUNET_CORE_mq_create (core, peer);
710 GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
711 /* Send any locally anomalous sensors to the new peer */
712 ai = ai_head;
713 while (NULL != ai)
714 {
715 if (GNUNET_YES == ai->anomalous)
716 send_anomaly_report (corep->mq, ai);
717 ai = ai->next;
718 }
719}
720
721
722/**
723 * Function called after #GNUNET_CORE_connect has succeeded (or failed
724 * for good). Note that the private key of the peer is intentionally
725 * not exposed here; if you need it, your process should try to read
726 * the private key file directly (which should work if you are
727 * authorized...). Implementations of this function must not call
728 * #GNUNET_CORE_disconnect (other than by scheduling a new task to
729 * do this later).
730 *
731 * @param cls closure (unused)
732 * @param my_identity ID of this peer, NULL if we failed
733 */
734static void
735core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
736{
737 if (NULL == my_identity)
738 {
739 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
740 SENSOR_reporting_stop ();
741 return;
742 }
743 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
744 {
745 LOG (GNUNET_ERROR_TYPE_ERROR,
746 _("Peer identity received from CORE init doesn't match ours.\n"));
747 SENSOR_reporting_stop ();
748 return;
749 }
750}
751
752
753/******************************************************************************/
754/************************* CADET callbacks ***************************/
755/******************************************************************************/
756
757/**
758 * Function called whenever a channel is destroyed. Should clean up
759 * any associated state.
760 *
761 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
762 *
763 * @param cls closure (set from #GNUNET_CADET_connect)
764 * @param channel connection to the other end (henceforth invalid)
765 * @param channel_ctx place where local state associated
766 * with the channel is stored
767 */
768static void
769cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
770 void *channel_ctx)
771{
772 struct CadetPeer *cadetp = channel_ctx;
773
774 if (GNUNET_YES == cadetp->destroying)
775 return;
776 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
777 cadetp->channel = NULL;
778 destroy_cadet_peer (cadetp);
779}
780
781
782/******************************************************************************/
783/********************** Local anomaly receiver ***********************/
784/******************************************************************************/
785
786
787/**
788 * Used by the analysis module to tell the reporting module about a change in
789 * the anomaly status of a sensor.
790 *
791 * @param sensor Related sensor
792 * @param anomalous The new sensor anomalous status
793 */
794void
795SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
796 int anomalous)
797{
798 struct AnomalyInfo *ai;
799 struct CorePeer *corep;
800 struct CadetPeer *cadetp;
801
802 if (GNUNET_NO == module_running)
803 return;
804 ai = get_anomaly_info_by_sensor (sensor);
805 GNUNET_assert (NULL != ai);
806 ai->anomalous = anomalous;
807 /* Report change to all neighbors */
808 corep = corep_head;
809 while (NULL != corep)
810 {
811 send_anomaly_report (corep->mq, ai);
812 corep = corep->next;
813 }
814 if (NULL != ai->sensor->collection_point &&
815 GNUNET_YES == ai->sensor->report_anomalies)
816 {
817 cadetp = get_cadet_peer (*ai->sensor->collection_point);
818 send_anomaly_report (cadetp->mq, ai);
819 }
820}
821
822
823/******************************************************************************/
824/******************* Reporting values (periodic) *********************/
825/******************************************************************************/
826
827
828/**
829 * Task scheduled to send values to collection point
830 *
831 * @param cls closure, a `struct ValueReportingContext *`
832 * @param tc unused
833 */
834static void
835report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
836{
837 struct ValueInfo *vi = cls;
838 struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
839 struct CadetPeer *cadetp;
840 struct GNUNET_MQ_Envelope *ev;
841
842 vi->reporting_task =
843 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
844 &report_value, vi);
845 if (0 == vi->last_value_size ||
846 GNUNET_YES == vi->last_value_reported)
847 {
848 LOG (GNUNET_ERROR_TYPE_WARNING,
849 "Did not receive a fresh value from `%s' to report.\n",
850 sensor->name);
851 return;
852 }
853 LOG (GNUNET_ERROR_TYPE_DEBUG,
854 "Now trying to report last seen value of `%s' to collection point.\n",
855 sensor->name);
856 cadetp = get_cadet_peer (*sensor->collection_point);
857 ev = create_value_message (vi);
858 GNUNET_MQ_send (cadetp->mq, ev);
859 vi->last_value_reported = GNUNET_YES;
860}
861
862
863/******************************************************************************/
864/******************************** INIT *******************************/
865/******************************************************************************/
866
867
868/**
869 * Iterator for defined sensors and creates anomaly info context
870 *
871 * @param cls unused
872 * @param key unused
873 * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
874 * @return #GNUNET_YES to continue iterations
875 */
876static int
877init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
878 void *value)
879{
880 struct GNUNET_SENSOR_SensorInfo *sensor = value;
881 struct AnomalyInfo *ai;
882 struct ValueInfo *vi;
883
884 /* Create sensor anomaly info context */
885 ai = GNUNET_new (struct AnomalyInfo);
886 ai->sensor = sensor;
887 ai->anomalous = GNUNET_NO;
888 ai->anomalous_neighbors =
889 GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
890 GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
891 /* Create sensor value info context (if needed to be reported) */
892 if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
893 return GNUNET_YES;
894 LOG (GNUNET_ERROR_TYPE_INFO,
895 "Reporting sensor `%s' values to collection point `%s' every %s.\n",
896 sensor->name, GNUNET_i2s_full (sensor->collection_point),
897 GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
898 GNUNET_YES));
899 vi = GNUNET_new (struct ValueInfo);
900 vi->sensor = sensor;
901 vi->last_value = NULL;
902 vi->last_value_size = 0;
903 vi->last_value_reported = GNUNET_NO;
904 vi->wc =
905 GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
906 &value_watch_cb, vi);
907 vi->reporting_task =
908 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
909 &report_value, vi);
910 GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
911 return GNUNET_YES;
912}
913
914
915/**
916 * Start the sensor anomaly reporting module
917 *
918 * @param c our service configuration
919 * @param s multihashmap of loaded sensors
920 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
921 */
922int
923SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
924 struct GNUNET_CONTAINER_MultiHashMap *s)
925{
926 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
927 {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT,
928 sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
929 {NULL, 0, 0}
930 };
931 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
932 {NULL, 0, 0}
933 };
934
935 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
936 GNUNET_assert (NULL != s);
937 sensors = s;
938 cfg = c;
939 /* Connect to PEERSTORE */
940 peerstore = GNUNET_PEERSTORE_connect (cfg);
941 if (NULL == peerstore)
942 {
943 LOG (GNUNET_ERROR_TYPE_ERROR,
944 _("Failed to connect to peerstore service.\n"));
945 SENSOR_reporting_stop ();
946 return GNUNET_SYSERR;
947 }
948 /* Connect to CORE */
949 core =
950 GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
951 &core_disconnect_cb, NULL, GNUNET_YES, NULL,
952 GNUNET_YES, core_handlers);
953 if (NULL == core)
954 {
955 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
956 SENSOR_reporting_stop ();
957 return GNUNET_SYSERR;
958 }
959 /* Connect to CADET */
960 cadet =
961 GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
962 cadet_handlers, NULL);
963 if (NULL == cadet)
964 {
965 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
966 SENSOR_reporting_stop ();
967 return GNUNET_SYSERR;
968 }
969 GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
970 GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
971 neighborhood = 0;
972 module_running = GNUNET_YES;
973 return GNUNET_OK;
974}
975
976/* end of gnunet-service-sensor_reporting.c */
diff --git a/src/sensor/gnunet-service-sensor_reporting_anomaly.c b/src/sensor/gnunet-service-sensor_reporting_anomaly.c
deleted file mode 100644
index ff07c2e52..000000000
--- a/src/sensor/gnunet-service-sensor_reporting_anomaly.c
+++ /dev/null
@@ -1,526 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file sensor/gnunet-service-sensor_reporting_anomaly.c
23 * @brief sensor service anomaly reporting functionality
24 * @author Omar Tarabai
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "sensor.h"
29#include "gnunet_peerstore_service.h"
30#include "gnunet_core_service.h"
31
32#define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting-anomaly",__VA_ARGS__)
33
34struct AnomalyInfo
35{
36
37 /**
38 * DLL
39 */
40 struct AnomalyInfo *prev;
41
42 /**
43 * DLL
44 */
45 struct AnomalyInfo *next;
46
47 /**
48 * Sensor information
49 */
50 struct GNUNET_SENSOR_SensorInfo *sensor;
51
52 /**
53 * Current anomalous status of sensor
54 */
55 int anomalous;
56
57 /**
58 * List of peers that reported an anomaly for this sensor
59 */
60 struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
61
62};
63
64/**
65 * Information about a connected CORE peer.
66 * Note that we only know about a connected peer if it is running the same
67 * application (sensor anomaly reporting) as us.
68 */
69struct CorePeer
70{
71
72 /**
73 * DLL
74 */
75 struct CorePeer *prev;
76
77 /**
78 * DLL
79 */
80 struct CorePeer *next;
81
82 /**
83 * Peer identity of connected peer
84 */
85 struct GNUNET_PeerIdentity *peerid;
86
87 /**
88 * Message queue for messages to be sent to this peer
89 */
90 struct GNUNET_MQ_Handle *mq;
91
92};
93
94
95/**
96 * Our configuration.
97 */
98static const struct GNUNET_CONFIGURATION_Handle *cfg;
99
100/**
101 * Multihashmap of loaded sensors
102 */
103static struct GNUNET_CONTAINER_MultiHashMap *sensors;
104
105/**
106 * Handle to core service
107 */
108static struct GNUNET_CORE_Handle *core;
109
110/**
111 * My peer id
112 */
113static struct GNUNET_PeerIdentity mypeerid;
114
115/**
116 * Head of DLL of anomaly info structs
117 */
118static struct AnomalyInfo *ai_head;
119
120/**
121 * Tail of DLL of anomaly info structs
122 */
123static struct AnomalyInfo *ai_tail;
124
125/**
126 * Head of DLL of CORE peers
127 */
128static struct CorePeer *cp_head;
129
130/**
131 * Tail of DLL of CORE peers
132 */
133static struct CorePeer *cp_tail;
134
135/**
136 * Is the module started?
137 */
138static int module_running = GNUNET_NO;
139
140/**
141 * Number of known neighborhood peers
142 */
143static int neighborhood;
144
145
146/**
147 * Destroy anomaly info struct
148 *
149 * @param ai struct to destroy
150 */
151static void
152destroy_anomaly_info (struct AnomalyInfo *ai)
153{
154 if (NULL != ai->anomalous_neighbors)
155 GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
156 GNUNET_free (ai);
157}
158
159
160/**
161 * Destroy core peer struct
162 *
163 * @param cp struct to destroy
164 */
165static void
166destroy_core_peer (struct CorePeer *cp)
167{
168 struct AnomalyInfo *ai;
169
170 if (NULL != cp->mq)
171 {
172 GNUNET_MQ_destroy (cp->mq);
173 cp->mq = NULL;
174 }
175 ai = ai_head;
176 while (NULL != ai)
177 {
178 GNUNET_assert (NULL != ai->anomalous_neighbors);
179 GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
180 cp->peerid);
181 ai = ai->next;
182 }
183 GNUNET_free (cp);
184}
185
186
187/**
188 * Stop sensor anomaly reporting module
189 */
190void
191SENSOR_reporting_anomaly_stop ()
192{
193 struct AnomalyInfo *ai;
194 struct CorePeer *cp;
195
196 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
197 module_running = GNUNET_NO;
198 ai = ai_head;
199 while (NULL != ai)
200 {
201 GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
202 destroy_anomaly_info (ai);
203 ai = ai_head;
204 }
205 cp = cp_head;
206 while (NULL != cp)
207 {
208 GNUNET_CONTAINER_DLL_remove (cp_head, cp_tail, cp);
209 destroy_core_peer (cp);
210 cp = cp_head;
211 }
212 neighborhood = 0;
213 if (NULL != core)
214 {
215 GNUNET_CORE_disconnect (core);
216 core = NULL;
217 }
218}
219
220
221/**
222 * Gets the anomaly info struct related to the given sensor
223 *
224 * @param sensor Sensor to search by
225 */
226static struct AnomalyInfo *
227get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
228{
229 struct AnomalyInfo *ai;
230
231 ai = ai_head;
232 while (NULL != ai)
233 {
234 if (ai->sensor == sensor)
235 {
236 return ai;
237 }
238 ai = ai->next;
239 }
240 return NULL;
241}
242
243
244/**
245 * Create an anomaly report message from a given anomaly info structb inside an
246 * MQ envelope.
247 *
248 * @param ai Anomaly info struct to use
249 * @return
250 */
251static struct GNUNET_MQ_Envelope *
252create_anomaly_report_message (struct AnomalyInfo *ai)
253{
254 struct AnomalyReportMessage *arm;
255 struct GNUNET_MQ_Envelope *ev;
256
257 ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT);
258 GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
259 &arm->sensorname_hash);
260 arm->sensorversion_major = ai->sensor->version_major;
261 arm->sensorversion_minor = ai->sensor->version_minor;
262 arm->anomalous = ai->anomalous;
263 arm->anomalous_neighbors =
264 ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) /
265 neighborhood;
266 return ev;
267}
268
269
270/**
271 * Send given anomaly info report to given core peer.
272 *
273 * @param cp Core peer to send the report to
274 * @param ai Anomaly info to report
275 */
276static void
277send_anomaly_report (struct CorePeer *cp, struct AnomalyInfo *ai)
278{
279 struct GNUNET_MQ_Envelope *ev;
280
281 GNUNET_assert (NULL != cp->mq);
282 ev = create_anomaly_report_message (ai);
283 GNUNET_MQ_send (cp->mq, ev);
284}
285
286
287/**
288 * An inbound anomaly report is received from a peer through CORE.
289 *
290 * @param cls closure (unused)
291 * @param peer the other peer involved
292 * @param message the actual message
293 * @return #GNUNET_OK to keep the connection open,
294 * #GNUNET_SYSERR to close connection to the peer (signal serious error)
295 */
296static int
297handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
298 const struct GNUNET_MessageHeader *message)
299{
300 struct AnomalyReportMessage *arm;
301 struct GNUNET_SENSOR_SensorInfo *sensor;
302 struct AnomalyInfo *ai;
303 int peer_in_list;
304
305 arm = (struct AnomalyReportMessage *) message;
306 sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
307 if (NULL == sensor || sensor->version_major != arm->sensorversion_major ||
308 sensor->version_minor != arm->sensorversion_minor)
309 {
310 LOG (GNUNET_ERROR_TYPE_WARNING,
311 "I don't have the sensor reported by the peer `%s'.\n",
312 GNUNET_i2s (other));
313 return GNUNET_OK;
314 }
315 ai = get_anomaly_info_by_sensor (sensor);
316 GNUNET_assert (NULL != ai);
317 peer_in_list =
318 GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other);
319 if (GNUNET_YES == ai->anomalous)
320 {
321 if (GNUNET_YES == peer_in_list)
322 GNUNET_break_op (0);
323 else
324 GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL,
325 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
326 }
327 else
328 {
329 if (GNUNET_NO == peer_in_list)
330 GNUNET_break_op (0);
331 else
332 GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other);
333 }
334 //TODO: report to collection point if anomalous neigbors jump up or down
335 // by a configurable percentage or is now 0% or 100%
336 return GNUNET_OK;
337}
338
339
340/**
341 * Method called whenever a CORE peer disconnects.
342 *
343 * @param cls closure (unused)
344 * @param peer peer identity this notification is about
345 */
346static void
347core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
348{
349 struct CorePeer *cp;
350
351 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
352 return;
353 neighborhood--;
354 cp = cp_head;
355 while (NULL != cp)
356 {
357 if (peer == cp->peerid)
358 {
359 GNUNET_CONTAINER_DLL_remove (cp_head, cp_tail, cp);
360 destroy_core_peer (cp);
361 return;
362 }
363 cp = cp->next;
364 }
365 LOG (GNUNET_ERROR_TYPE_ERROR,
366 _("Received disconnect notification from CORE"
367 " for a peer we didn't know about.\n"));
368}
369
370
371/**
372 * Method called whenever a given peer connects through CORE.
373 *
374 * @param cls closure (unused)
375 * @param peer peer identity this notification is about
376 */
377static void
378core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
379{
380 struct CorePeer *cp;
381 struct AnomalyInfo *ai;
382
383 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
384 return;
385 neighborhood++;
386 cp = GNUNET_new (struct CorePeer);
387 cp->peerid = (struct GNUNET_PeerIdentity *) peer;
388 cp->mq = GNUNET_CORE_mq_create (core, peer);
389 GNUNET_CONTAINER_DLL_insert (cp_head, cp_tail, cp);
390 /* Send any locally anomalous sensors to the new peer */
391 ai = ai_head;
392 while (NULL != ai)
393 {
394 if (GNUNET_YES == ai->anomalous)
395 send_anomaly_report (cp, ai);
396 ai = ai->next;
397 }
398}
399
400
401/**
402 * Function called after #GNUNET_CORE_connect has succeeded (or failed
403 * for good). Note that the private key of the peer is intentionally
404 * not exposed here; if you need it, your process should try to read
405 * the private key file directly (which should work if you are
406 * authorized...). Implementations of this function must not call
407 * #GNUNET_CORE_disconnect (other than by scheduling a new task to
408 * do this later).
409 *
410 * @param cls closure (unused)
411 * @param my_identity ID of this peer, NULL if we failed
412 */
413static void
414core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
415{
416 if (NULL == my_identity)
417 {
418 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
419 SENSOR_reporting_anomaly_stop ();
420 return;
421 }
422 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
423 {
424 LOG (GNUNET_ERROR_TYPE_ERROR,
425 _("Peer identity received from CORE init doesn't match ours.\n"));
426 SENSOR_reporting_anomaly_stop ();
427 return;
428 }
429}
430
431
432/**
433 * Used by the analysis module to tell the reporting module about a change in
434 * the anomaly status of a sensor.
435 *
436 * @param sensor Related sensor
437 * @param anomalous The new sensor anomalous status
438 */
439void
440SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
441 int anomalous)
442{
443 struct AnomalyInfo *ai;
444 struct CorePeer *cp;
445
446 if (GNUNET_NO == module_running)
447 return;
448 ai = get_anomaly_info_by_sensor (sensor);
449 GNUNET_assert (NULL != ai);
450 ai->anomalous = anomalous;
451 /* Report change to all neighbors */
452 cp = cp_head;
453 while (NULL != cp)
454 {
455 send_anomaly_report (cp, ai);
456 cp = cp->next;
457 }
458 //TODO: report change to collection point if report_anomalies
459}
460
461
462/**
463 * Iterator for defined sensors and creates anomaly info context
464 *
465 * @param cls unused
466 * @param key unused
467 * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
468 * @return #GNUNET_YES to continue iterations
469 */
470static int
471init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
472 void *value)
473{
474 struct GNUNET_SENSOR_SensorInfo *sensor = value;
475 struct AnomalyInfo *ai;
476
477 ai = GNUNET_new (struct AnomalyInfo);
478
479 ai->sensor = sensor;
480 ai->anomalous = GNUNET_NO;
481 ai->anomalous_neighbors =
482 GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
483 GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
484 return GNUNET_YES;
485}
486
487
488/**
489 * Start the sensor anomaly reporting module
490 *
491 * @param c our service configuration
492 * @param s multihashmap of loaded sensors
493 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
494 */
495int
496SENSOR_reporting_anomaly_start (const struct GNUNET_CONFIGURATION_Handle *c,
497 struct GNUNET_CONTAINER_MultiHashMap *s)
498{
499 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
500 {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT,
501 sizeof (struct AnomalyReportMessage)},
502 {NULL, 0, 0}
503 };
504
505 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor anomaly reporting module.\n");
506 GNUNET_assert (NULL != s);
507 sensors = s;
508 cfg = c;
509 core =
510 GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
511 &core_disconnect_cb, NULL, GNUNET_YES, NULL,
512 GNUNET_YES, core_handlers);
513 if (NULL == core)
514 {
515 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
516 SENSOR_reporting_anomaly_stop ();
517 return GNUNET_SYSERR;
518 }
519 GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
520 GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
521 neighborhood = 0;
522 module_running = GNUNET_YES;
523 return GNUNET_OK;
524}
525
526/* end of gnunet-service-sensor_reporting_anomaly.c */
diff --git a/src/sensor/gnunet-service-sensor_reporting_value.c b/src/sensor/gnunet-service-sensor_reporting_value.c
deleted file mode 100644
index 84fbd11ae..000000000
--- a/src/sensor/gnunet-service-sensor_reporting_value.c
+++ /dev/null
@@ -1,571 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file sensor/gnunet-service-sensor_reporting_value.c
23 * @brief sensor service value reporting functionality
24 * @author Omar Tarabai
25 */
26#include <inttypes.h>
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "sensor.h"
30#include "gnunet_peerstore_service.h"
31#include "gnunet_cadet_service.h"
32#include "gnunet_applications.h"
33
34#define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting-value",__VA_ARGS__)
35
36/**
37 * Retry interval (seconds) in case channel to collection point is busy
38 */
39#define COLLECTION_RETRY 1
40
41/**
42 * Context of reporting sensor values
43 */
44struct ValueReportingContext
45{
46
47 /**
48 * DLL
49 */
50 struct ValueReportingContext *prev;
51
52 /**
53 * DLL
54 */
55 struct ValueReportingContext *next;
56
57 /**
58 * Sensor information
59 */
60 struct GNUNET_SENSOR_SensorInfo *sensor;
61
62 /**
63 * Collection point reporting task
64 * (or #GNUNET_SCHEDULER_NO_TASK)
65 */
66 GNUNET_SCHEDULER_TaskIdentifier cp_task;
67
68 /**
69 * Watcher of sensor values
70 */
71 struct GNUNET_PEERSTORE_WatchContext *wc;
72
73 /**
74 * Last value read from sensor
75 */
76 void *last_value;
77
78 /**
79 * Size of @e last_value
80 */
81 size_t last_value_size;
82
83 /**
84 * Timestamp of last value reading
85 */
86 uint64_t timestamp;
87
88};
89
90/**
91 * Context of a created CADET channel
92 */
93struct CadetChannelContext
94{
95
96 /**
97 * DLL
98 */
99 struct CadetChannelContext *prev;
100
101 /**
102 * DLL
103 */
104 struct CadetChannelContext *next;
105
106 /**
107 * Peer Id of
108 */
109 struct GNUNET_PeerIdentity pid;
110
111 /**
112 * CADET channel handle
113 */
114 struct GNUNET_CADET_Channel *c;
115
116 /**
117 * Are we sending data on this channel?
118 * #GNUNET_YES / #GNUNET_NO
119 */
120 int sending;
121
122 /**
123 * Pointer to a pending message to be sent over the channel
124 */
125 void *pending_msg;
126
127 /**
128 * Size of @e pending_msg
129 */
130 size_t pending_msg_size;
131
132 /**
133 * Handle to CADET tranmission request in case we are sending
134 * (sending == #GNUNET_YES)
135 */
136 struct GNUNET_CADET_TransmitHandle *th;
137
138 /**
139 * Are we currently destroying the channel and its context?
140 */
141 int destroying;
142
143};
144
145/**
146 * Our configuration.
147 */
148static const struct GNUNET_CONFIGURATION_Handle *cfg;
149
150/**
151 * Handle to peerstore service
152 */
153static struct GNUNET_PEERSTORE_Handle *peerstore;
154
155/**
156 * My peer id
157 */
158static struct GNUNET_PeerIdentity mypeerid;
159
160/**
161 * Handle to CADET service
162 */
163static struct GNUNET_CADET_Handle *cadet;
164
165/**
166 * Head of DLL of all reporting contexts
167 */
168struct ValueReportingContext *vrc_head;
169
170/**
171 * Tail of DLL of all reporting contexts
172 */
173struct ValueReportingContext *vrc_tail;
174
175/**
176 * Head of DLL of all cadet channels
177 */
178struct CadetChannelContext *cc_head;
179
180/**
181 * Tail of DLL of all cadet channels
182 */
183struct CadetChannelContext *cc_tail;
184
185/**
186 * Destroy a reporting context structure
187 */
188static void
189destroy_value_reporting_context (struct ValueReportingContext *vrc)
190{
191 if (NULL != vrc->wc)
192 {
193 GNUNET_PEERSTORE_watch_cancel (vrc->wc);
194 vrc->wc = NULL;
195 }
196 if (GNUNET_SCHEDULER_NO_TASK != vrc->cp_task)
197 {
198 GNUNET_SCHEDULER_cancel (vrc->cp_task);
199 vrc->cp_task = GNUNET_SCHEDULER_NO_TASK;
200 }
201 if (NULL != vrc->last_value)
202 {
203 GNUNET_free (vrc->last_value);
204 vrc->last_value_size = 0;
205 }
206 GNUNET_free (vrc);
207}
208
209
210/**
211 * Destroy a CADET channel context struct
212 */
213static void
214destroy_cadet_channel_context (struct CadetChannelContext *cc)
215{
216 cc->destroying = GNUNET_YES;
217 if (NULL != cc->th)
218 {
219 GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
220 cc->th = NULL;
221 }
222 if (NULL != cc->pending_msg)
223 {
224 GNUNET_free (cc->pending_msg);
225 cc->pending_msg = NULL;
226 }
227 if (NULL != cc->c)
228 {
229 GNUNET_CADET_channel_destroy (cc->c);
230 cc->c = NULL;
231 }
232 GNUNET_free (cc);
233}
234
235
236/**
237 * Stop sensor value reporting module
238 */
239void
240SENSOR_reporting_value_stop ()
241{
242 struct ValueReportingContext *vrc;
243 struct CadetChannelContext *cc;
244
245 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor value reporting module.\n");
246 while (NULL != cc_head)
247 {
248 cc = cc_head;
249 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
250 destroy_cadet_channel_context (cc);
251 }
252 while (NULL != vrc_head)
253 {
254 vrc = vrc_head;
255 GNUNET_CONTAINER_DLL_remove (vrc_head, vrc_tail, vrc);
256 destroy_value_reporting_context (vrc);
257 }
258 if (NULL != peerstore)
259 {
260 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
261 peerstore = NULL;
262 }
263 if (NULL != cadet)
264 {
265 GNUNET_CADET_disconnect (cadet);
266 cadet = NULL;
267 }
268}
269
270
271/**
272 * Returns CADET channel established to given peer or creates a new one.
273 *
274 * @param pid Peer Identity
275 * @return Context of established cadet channel
276 */
277static struct CadetChannelContext *
278get_cadet_channel (struct GNUNET_PeerIdentity pid)
279{
280 struct CadetChannelContext *cc;
281
282 cc = cc_head;
283 while (NULL != cc)
284 {
285 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
286 return cc;
287 cc = cc->next;
288 }
289 cc = GNUNET_new (struct CadetChannelContext);
290 cc->c =
291 GNUNET_CADET_channel_create (cadet, cc, &pid,
292 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
293 GNUNET_CADET_OPTION_DEFAULT);
294 cc->pid = pid;
295 cc->sending = GNUNET_NO;
296 cc->destroying = GNUNET_NO;
297 GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
298 return cc;
299}
300
301
302/**
303 * Construct a reading message ready to be sent over CADET channel
304 *
305 * @param rc reporting context to read data from
306 * @param msg used to return the created message structure
307 * @return size of created message
308 */
309static size_t
310construct_reading_message (struct ValueReportingContext *vrc,
311 struct GNUNET_SENSOR_ReadingMessage **msg)
312{
313 struct GNUNET_SENSOR_ReadingMessage *ret;
314 uint16_t sensorname_size;
315 uint16_t total_size;
316 void *dummy;
317
318 sensorname_size = strlen (vrc->sensor->name) + 1;
319 total_size =
320 sizeof (struct GNUNET_SENSOR_ReadingMessage) + sensorname_size +
321 vrc->last_value_size;
322 ret = GNUNET_malloc (total_size);
323 ret->header.size = htons (total_size);
324 ret->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
325 ret->sensorname_size = htons (sensorname_size);
326 ret->sensorversion_major = htons (vrc->sensor->version_major);
327 ret->sensorversion_minor = htons (vrc->sensor->version_minor);
328 ret->timestamp = GNUNET_htobe64 (vrc->timestamp);
329 ret->value_size = htons (vrc->last_value_size);
330 dummy = &ret[1];
331 memcpy (dummy, vrc->sensor->name, sensorname_size);
332 dummy += sensorname_size;
333 memcpy (dummy, vrc->last_value, vrc->last_value_size);
334 *msg = ret;
335 return total_size;
336}
337
338
339/**
340 * Function called to notify a client about the connection begin ready
341 * to queue more data. @a buf will be NULL and @a size zero if the
342 * connection was closed for writing in the meantime.
343 *
344 * @param cls closure
345 * @param size number of bytes available in @a buf
346 * @param buf where the callee should write the message
347 * @return number of bytes written to @a buf
348 */
349static size_t
350do_report_value (void *cls, size_t size, void *buf)
351{
352 struct CadetChannelContext *cc = cls;
353 size_t written = 0;
354
355 cc->th = NULL;
356 cc->sending = GNUNET_NO;
357 LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n");
358 if (NULL == buf)
359 {
360 LOG (GNUNET_ERROR_TYPE_WARNING,
361 "CADET failed to transmit message (NULL buf), discarding.\n");
362 }
363 else if (size < cc->pending_msg_size)
364 {
365 LOG (GNUNET_ERROR_TYPE_WARNING,
366 "CADET failed to transmit message (small size, expected: %u, got: %u)"
367 ", discarding.\n", cc->pending_msg_size, size);
368 }
369 else
370 {
371 memcpy (buf, cc->pending_msg, cc->pending_msg_size);
372 written = cc->pending_msg_size;
373 }
374 GNUNET_free (cc->pending_msg);
375 cc->pending_msg = NULL;
376 cc->pending_msg_size = 0;
377 return written;
378}
379
380
381/**
382 * Task scheduled to send values to collection point
383 *
384 * @param cls closure, a `struct ValueReportingContext *`
385 * @param tc unused
386 */
387static void
388report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
389{
390 struct ValueReportingContext *vrc = cls;
391 struct GNUNET_SENSOR_SensorInfo *sensor = vrc->sensor;
392 struct CadetChannelContext *cc;
393 struct GNUNET_SENSOR_ReadingMessage *msg;
394 size_t msg_size;
395
396 vrc->cp_task = GNUNET_SCHEDULER_NO_TASK;
397 if (0 == vrc->last_value_size) /* Did not receive a sensor value yet */
398 {
399 LOG (GNUNET_ERROR_TYPE_WARNING,
400 "Did not receive a value from `%s' to report yet.\n",
401 vrc->sensor->name);
402 vrc->cp_task =
403 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
404 &report_value, vrc);
405 return;
406 }
407 LOG (GNUNET_ERROR_TYPE_DEBUG,
408 "Now trying to report last seen value of `%s' " "to collection point.\n",
409 vrc->sensor->name);
410 GNUNET_assert (NULL != sensor->collection_point);
411 cc = get_cadet_channel (*sensor->collection_point);
412 if (GNUNET_YES == cc->sending)
413 {
414 LOG (GNUNET_ERROR_TYPE_DEBUG,
415 "Cadet channel to collection point busy, "
416 "trying again for sensor `%s' after %d seconds.\n", vrc->sensor->name,
417 COLLECTION_RETRY);
418 vrc->cp_task =
419 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
420 (GNUNET_TIME_UNIT_SECONDS,
421 COLLECTION_RETRY), &report_value, vrc);
422 return;
423 }
424 msg_size = construct_reading_message (vrc, &msg);
425 cc->sending = GNUNET_YES;
426 cc->pending_msg = msg;
427 cc->pending_msg_size = msg_size;
428 cc->th =
429 GNUNET_CADET_notify_transmit_ready (cc->c, GNUNET_YES,
430 sensor->value_reporting_interval,
431 msg_size, &do_report_value, cc);
432 vrc->cp_task =
433 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
434 &report_value, vrc);
435}
436
437
438/**
439 * Sensor value watch callback
440 */
441static int
442value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
443{
444 struct ValueReportingContext *vrc = cls;
445
446 if (NULL != emsg)
447 return GNUNET_YES;
448 if (NULL != vrc->last_value)
449 {
450 GNUNET_free (vrc->last_value);
451 vrc->last_value_size = 0;
452 }
453 vrc->last_value = GNUNET_malloc (record->value_size);
454 memcpy (vrc->last_value, record->value, record->value_size);
455 vrc->last_value_size = record->value_size;
456 vrc->timestamp = GNUNET_TIME_absolute_get ().abs_value_us;
457 LOG (GNUNET_ERROR_TYPE_DEBUG,
458 "Received a sensor `%s' watch value at " "timestamp %" PRIu64
459 ", updating notification last_value.\n", vrc->sensor->name,
460 vrc->timestamp);
461 return GNUNET_YES;
462}
463
464
465/**
466 * Function called whenever a channel is destroyed. Should clean up
467 * any associated state.
468 *
469 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
470 *
471 * @param cls closure (set from #GNUNET_CADET_connect)
472 * @param channel connection to the other end (henceforth invalid)
473 * @param channel_ctx place where local state associated
474 * with the channel is stored
475 */
476static void
477cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
478 void *channel_ctx)
479{
480 struct CadetChannelContext *cc = channel_ctx;
481
482 if (GNUNET_YES == cc->destroying)
483 return;
484 LOG (GNUNET_ERROR_TYPE_DEBUG,
485 "Received a `channel destroyed' notification from CADET, "
486 "cleaning up.\n");
487 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
488 cc->c = NULL;
489 destroy_cadet_channel_context (cc);
490}
491
492
493/**
494 * Iterator for defined sensors
495 * Watches sensors for readings to report
496 *
497 * @param cls unused
498 * @param key unused
499 * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
500 * @return #GNUNET_YES to continue iterations
501 */
502static int
503init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
504 void *value)
505{
506 struct GNUNET_SENSOR_SensorInfo *sensor = value;
507 struct ValueReportingContext *vrc;
508
509 if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
510 return GNUNET_YES;
511 LOG (GNUNET_ERROR_TYPE_INFO,
512 "Reporting sensor `%s' values to collection point `%s' every %s.\n",
513 sensor->name, GNUNET_i2s_full (sensor->collection_point),
514 GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
515 GNUNET_YES));
516 vrc = GNUNET_new (struct ValueReportingContext);
517 vrc->sensor = sensor;
518 vrc->last_value = NULL;
519 vrc->last_value_size = 0;
520 vrc->wc =
521 GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
522 &value_watch_cb, vrc);
523 vrc->cp_task =
524 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
525 &report_value, vrc);
526 GNUNET_CONTAINER_DLL_insert (vrc_head, vrc_tail, vrc);
527 return GNUNET_YES;
528}
529
530
531/**
532 * Start the sensor value reporting module
533 *
534 * @param c our service configuration
535 * @param sensors multihashmap of loaded sensors
536 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
537 */
538int
539SENSOR_reporting_value_start (const struct GNUNET_CONFIGURATION_Handle *c,
540 struct GNUNET_CONTAINER_MultiHashMap *sensors)
541{
542 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
543 {NULL, 0, 0}
544 };
545
546 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor value reporting module.\n");
547 GNUNET_assert (NULL != sensors);
548 cfg = c;
549 peerstore = GNUNET_PEERSTORE_connect (cfg);
550 if (NULL == peerstore)
551 {
552 LOG (GNUNET_ERROR_TYPE_ERROR,
553 _("Failed to connect to peerstore service.\n"));
554 SENSOR_reporting_value_stop ();
555 return GNUNET_SYSERR;
556 }
557 cadet =
558 GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
559 cadet_handlers, NULL);
560 if (NULL == cadet)
561 {
562 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
563 SENSOR_reporting_value_stop ();
564 return GNUNET_SYSERR;
565 }
566 GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
567 GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
568 return GNUNET_OK;
569}
570
571/* end of gnunet-service-sensor_reporting_value.c */
diff --git a/src/sensor/sensor.h b/src/sensor/sensor.h
index 3cca3b0c0..5d75ee427 100644
--- a/src/sensor/sensor.h
+++ b/src/sensor/sensor.h
@@ -81,29 +81,10 @@ SENSOR_analysis_start (const struct GNUNET_CONFIGURATION_Handle *c,
81 81
82 82
83/** 83/**
84 * Stop sensor value reporting module
85 */
86void
87SENSOR_reporting_value_stop ();
88
89
90/**
91 * Start the sensor value reporting module
92 *
93 * @param c our service configuration
94 * @param sensors multihashmap of loaded sensors
95 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
96 */
97int
98SENSOR_reporting_value_start (const struct GNUNET_CONFIGURATION_Handle *c,
99 struct GNUNET_CONTAINER_MultiHashMap *sensors);
100
101
102/**
103 * Stop sensor anomaly reporting module 84 * Stop sensor anomaly reporting module
104 */ 85 */
105void 86void
106SENSOR_reporting_anomaly_stop (); 87SENSOR_reporting_stop ();
107 88
108/** 89/**
109 * Used by the analysis module to tell the reporting module about a change in 90 * Used by the analysis module to tell the reporting module about a change in
@@ -125,7 +106,7 @@ SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
125 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise 106 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
126 */ 107 */
127int 108int
128SENSOR_reporting_anomaly_start (const struct GNUNET_CONFIGURATION_Handle *c, 109SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
129 struct GNUNET_CONTAINER_MultiHashMap *s); 110 struct GNUNET_CONTAINER_MultiHashMap *s);
130 111
131 112
diff --git a/src/sensor/test_sensors/test-sensor-statistics b/src/sensor/test_sensors/test-sensor-statistics
new file mode 100644
index 000000000..578907e67
--- /dev/null
+++ b/src/sensor/test_sensors/test-sensor-statistics
@@ -0,0 +1,31 @@
1[test-sensor-statistics]
2
3VERSION = 1.0
4DESCRIPTION = Test sensor for collecting data from gnunet-statistics
5CATEGORY = GNUnet
6ENABLED = YES
7
8# Start and end time format: %Y-%m-%d %H:%M:%S
9#START_TIME =
10#END_TIME =
11#Interval in seconds
12INTERVAL = 60
13#LIFETIME =
14
15#CAPABILITIES =
16
17SOURCE = gnunet-statistics
18
19GNUNET_STAT_SERVICE = test-sensor
20GNUNET_STAT_NAME = test-statistic
21
22#EXT_PROCESS =
23#EXT_ARGS =
24
25EXPECTED_DATATYPE = numeric
26
27# Reporting:
28#COLLECTION_POINT = NCEKA096482PC84GFTG61EHAVXY3BQDTPB5FANATQD5CDADJ2HP0
29# Comment or remove next line to disable reporting sensor values to collection point
30#VALUE_COLLECTION_INTERVAL = 120
31REPORT_ANOMALIES = YES