diff options
author | Omar Tarabai <tarabai@devegypt.com> | 2014-08-07 18:24:04 +0000 |
---|---|---|
committer | Omar Tarabai <tarabai@devegypt.com> | 2014-08-07 18:24:04 +0000 |
commit | 605b0e88f6e64ac71a5afb08ce90c909a2807438 (patch) | |
tree | 9b713ebd15b1d3fa68f05ae6d62bebc899448f9a /src/sensor | |
parent | d6e728b2b869ee521216deae75a767165d3e1001 (diff) | |
download | gnunet-605b0e88f6e64ac71a5afb08ce90c909a2807438.tar.gz gnunet-605b0e88f6e64ac71a5afb08ce90c909a2807438.zip |
sensor: merged reporting module
Diffstat (limited to 'src/sensor')
-rw-r--r-- | src/sensor/Makefile.am | 7 | ||||
-rw-r--r-- | src/sensor/gnunet-service-sensor.c | 6 | ||||
-rw-r--r-- | src/sensor/gnunet-service-sensor_reporting.c | 976 | ||||
-rw-r--r-- | src/sensor/gnunet-service-sensor_reporting_anomaly.c | 526 | ||||
-rw-r--r-- | src/sensor/gnunet-service-sensor_reporting_value.c | 571 | ||||
-rw-r--r-- | src/sensor/sensor.h | 23 | ||||
-rw-r--r-- | src/sensor/test_sensors/test-sensor-statistics | 31 |
7 files changed, 1014 insertions, 1126 deletions
diff --git a/src/sensor/Makefile.am b/src/sensor/Makefile.am index fb90513dd..6cc66c72e 100644 --- a/src/sensor/Makefile.am +++ b/src/sensor/Makefile.am | |||
@@ -37,8 +37,7 @@ gnunet_sensor_LDADD = \ | |||
37 | gnunet_service_sensor_SOURCES = \ | 37 | gnunet_service_sensor_SOURCES = \ |
38 | gnunet-service-sensor.c \ | 38 | gnunet-service-sensor.c \ |
39 | gnunet-service-sensor_analysis.c \ | 39 | gnunet-service-sensor_analysis.c \ |
40 | gnunet-service-sensor_reporting_value.c \ | 40 | gnunet-service-sensor_reporting.c \ |
41 | gnunet-service-sensor_reporting_anomaly.c \ | ||
42 | gnunet-service-sensor_update.c | 41 | gnunet-service-sensor_update.c |
43 | gnunet_service_sensor_LDADD = \ | 42 | gnunet_service_sensor_LDADD = \ |
44 | libgnunetsensorutil.la \ | 43 | libgnunetsensorutil.la \ |
@@ -91,9 +90,9 @@ TESTS = $(check_PROGRAMS) | |||
91 | endif | 90 | endif |
92 | 91 | ||
93 | test_sensor_api_SOURCES = \ | 92 | test_sensor_api_SOURCES = \ |
94 | test_sensor_api.c | 93 | test_sensor_api.c |
95 | test_sensor_api_LDADD = \ | 94 | test_sensor_api_LDADD = \ |
96 | $(top_builddir)/src/util/libgnunetutil.la | 95 | $(top_builddir)/src/util/libgnunetutil.la |
97 | 96 | ||
98 | pkgsensordir = sensors | 97 | pkgsensordir = sensors |
99 | 98 | ||
diff --git a/src/sensor/gnunet-service-sensor.c b/src/sensor/gnunet-service-sensor.c index 4dc366cc1..bf88094ea 100644 --- a/src/sensor/gnunet-service-sensor.c +++ b/src/sensor/gnunet-service-sensor.c | |||
@@ -102,8 +102,7 @@ stop () | |||
102 | { | 102 | { |
103 | SENSOR_update_stop (); | 103 | SENSOR_update_stop (); |
104 | SENSOR_analysis_stop (); | 104 | SENSOR_analysis_stop (); |
105 | SENSOR_reporting_value_stop (); | 105 | SENSOR_reporting_stop (); |
106 | SENSOR_reporting_anomaly_stop (); | ||
107 | GNUNET_SENSOR_destroy_sensors (sensors); | 106 | GNUNET_SENSOR_destroy_sensors (sensors); |
108 | } | 107 | } |
109 | 108 | ||
@@ -621,8 +620,7 @@ start () | |||
621 | { | 620 | { |
622 | sensors = GNUNET_SENSOR_load_all_sensors (sensor_dir); | 621 | sensors = GNUNET_SENSOR_load_all_sensors (sensor_dir); |
623 | schedule_all_sensors (); | 622 | schedule_all_sensors (); |
624 | SENSOR_reporting_value_start (cfg, sensors); | 623 | SENSOR_reporting_start (cfg, sensors); |
625 | SENSOR_reporting_anomaly_start (cfg, sensors); | ||
626 | SENSOR_analysis_start (cfg, sensors); | 624 | SENSOR_analysis_start (cfg, sensors); |
627 | SENSOR_update_start (cfg, sensors, &reset); | 625 | SENSOR_update_start (cfg, sensors, &reset); |
628 | } | 626 | } |
diff --git a/src/sensor/gnunet-service-sensor_reporting.c b/src/sensor/gnunet-service-sensor_reporting.c new file mode 100644 index 000000000..15edd149a --- /dev/null +++ b/src/sensor/gnunet-service-sensor_reporting.c | |||
@@ -0,0 +1,976 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file sensor/gnunet-service-sensor_reporting.c | ||
23 | * @brief sensor service reporting functionality | ||
24 | * @author Omar Tarabai | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | #include "sensor.h" | ||
29 | #include "gnunet_peerstore_service.h" | ||
30 | #include "gnunet_core_service.h" | ||
31 | #include "gnunet_cadet_service.h" | ||
32 | #include "gnunet_applications.h" | ||
33 | |||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__) | ||
35 | |||
36 | |||
37 | struct AnomalyInfo | ||
38 | { | ||
39 | |||
40 | /** | ||
41 | * DLL | ||
42 | */ | ||
43 | struct AnomalyInfo *prev; | ||
44 | |||
45 | /** | ||
46 | * DLL | ||
47 | */ | ||
48 | struct AnomalyInfo *next; | ||
49 | |||
50 | /** | ||
51 | * Sensor information | ||
52 | */ | ||
53 | struct GNUNET_SENSOR_SensorInfo *sensor; | ||
54 | |||
55 | /** | ||
56 | * Current anomalous status of sensor | ||
57 | */ | ||
58 | int anomalous; | ||
59 | |||
60 | /** | ||
61 | * List of peers that reported an anomaly for this sensor | ||
62 | */ | ||
63 | struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors; | ||
64 | |||
65 | }; | ||
66 | |||
67 | struct ValueInfo | ||
68 | { | ||
69 | |||
70 | /** | ||
71 | * DLL | ||
72 | */ | ||
73 | struct ValueInfo *prev; | ||
74 | |||
75 | /** | ||
76 | * DLL | ||
77 | */ | ||
78 | struct ValueInfo *next; | ||
79 | |||
80 | /** | ||
81 | * Sensor information | ||
82 | */ | ||
83 | struct GNUNET_SENSOR_SensorInfo *sensor; | ||
84 | |||
85 | /** | ||
86 | * Last value read from sensor | ||
87 | */ | ||
88 | void *last_value; | ||
89 | |||
90 | /** | ||
91 | * Size of @e last_value | ||
92 | */ | ||
93 | size_t last_value_size; | ||
94 | |||
95 | /** | ||
96 | * Timestamp of last value reading | ||
97 | */ | ||
98 | struct GNUNET_TIME_Absolute last_value_timestamp; | ||
99 | |||
100 | /** | ||
101 | * Has the last value seen already been reported to collection point? | ||
102 | */ | ||
103 | int last_value_reported; | ||
104 | |||
105 | /** | ||
106 | * Watcher of sensor values | ||
107 | */ | ||
108 | struct GNUNET_PEERSTORE_WatchContext *wc; | ||
109 | |||
110 | /** | ||
111 | * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK) | ||
112 | */ | ||
113 | GNUNET_SCHEDULER_TaskIdentifier reporting_task; | ||
114 | |||
115 | }; | ||
116 | |||
117 | /** | ||
118 | * Information about a connected CORE peer. | ||
119 | * Note that we only know about a connected peer if it is running the same | ||
120 | * application (sensor anomaly reporting) as us. | ||
121 | */ | ||
122 | struct CorePeer | ||
123 | { | ||
124 | |||
125 | /** | ||
126 | * DLL | ||
127 | */ | ||
128 | struct CorePeer *prev; | ||
129 | |||
130 | /** | ||
131 | * DLL | ||
132 | */ | ||
133 | struct CorePeer *next; | ||
134 | |||
135 | /** | ||
136 | * Peer identity of connected peer | ||
137 | */ | ||
138 | struct GNUNET_PeerIdentity *peer_id; | ||
139 | |||
140 | /** | ||
141 | * Message queue for messages to be sent to this peer | ||
142 | */ | ||
143 | struct GNUNET_MQ_Handle *mq; | ||
144 | |||
145 | }; | ||
146 | |||
147 | /** | ||
148 | * Information about a connected CADET peer (collection point). | ||
149 | */ | ||
150 | struct CadetPeer | ||
151 | { | ||
152 | |||
153 | /** | ||
154 | * DLL | ||
155 | */ | ||
156 | struct CadetPeer *prev; | ||
157 | |||
158 | /** | ||
159 | * DLL | ||
160 | */ | ||
161 | struct CadetPeer *next; | ||
162 | |||
163 | /** | ||
164 | * Peer Identity | ||
165 | */ | ||
166 | struct GNUNET_PeerIdentity peer_id; | ||
167 | |||
168 | /** | ||
169 | * CADET channel handle | ||
170 | */ | ||
171 | struct GNUNET_CADET_Channel *channel; | ||
172 | |||
173 | /** | ||
174 | * Message queue for messages to be sent to this peer | ||
175 | */ | ||
176 | struct GNUNET_MQ_Handle *mq; | ||
177 | |||
178 | /** | ||
179 | * Are we currently destroying the channel and its context? | ||
180 | */ | ||
181 | int destroying; | ||
182 | |||
183 | }; | ||
184 | |||
185 | |||
186 | /** | ||
187 | * Our configuration. | ||
188 | */ | ||
189 | static const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
190 | |||
191 | /** | ||
192 | * Multihashmap of loaded sensors | ||
193 | */ | ||
194 | static struct GNUNET_CONTAINER_MultiHashMap *sensors; | ||
195 | |||
196 | /** | ||
197 | * Handle to peerstore service | ||
198 | */ | ||
199 | static struct GNUNET_PEERSTORE_Handle *peerstore; | ||
200 | |||
201 | /** | ||
202 | * Handle to core service | ||
203 | */ | ||
204 | static struct GNUNET_CORE_Handle *core; | ||
205 | |||
206 | /** | ||
207 | * Handle to CADET service | ||
208 | */ | ||
209 | static struct GNUNET_CADET_Handle *cadet; | ||
210 | |||
211 | /** | ||
212 | * My peer id | ||
213 | */ | ||
214 | static struct GNUNET_PeerIdentity mypeerid; | ||
215 | |||
216 | /** | ||
217 | * Head of DLL of anomaly info structs | ||
218 | */ | ||
219 | static struct AnomalyInfo *ai_head; | ||
220 | |||
221 | /** | ||
222 | * Tail of DLL of anomaly info structs | ||
223 | */ | ||
224 | static struct AnomalyInfo *ai_tail; | ||
225 | |||
226 | /** | ||
227 | * Head of DLL of value info structs | ||
228 | */ | ||
229 | static struct ValueInfo *vi_head; | ||
230 | |||
231 | /** | ||
232 | * Tail of DLL of value info structs | ||
233 | */ | ||
234 | static struct ValueInfo *vi_tail; | ||
235 | |||
236 | /** | ||
237 | * Head of DLL of CORE peers | ||
238 | */ | ||
239 | static struct CorePeer *corep_head; | ||
240 | |||
241 | /** | ||
242 | * Tail of DLL of CORE peers | ||
243 | */ | ||
244 | static struct CorePeer *corep_tail; | ||
245 | |||
246 | /** | ||
247 | * Head of DLL of CADET peers | ||
248 | */ | ||
249 | static struct CadetPeer *cadetp_head; | ||
250 | |||
251 | /** | ||
252 | * Tail of DLL of CADET peers | ||
253 | */ | ||
254 | static struct CadetPeer *cadetp_tail; | ||
255 | |||
256 | /** | ||
257 | * Is the module started? | ||
258 | */ | ||
259 | static int module_running = GNUNET_NO; | ||
260 | |||
261 | /** | ||
262 | * Number of known neighborhood peers | ||
263 | */ | ||
264 | static int neighborhood; | ||
265 | |||
266 | |||
267 | |||
268 | /******************************************************************************/ | ||
269 | /****************************** CLEANUP ******************************/ | ||
270 | /******************************************************************************/ | ||
271 | |||
272 | /** | ||
273 | * Destroy anomaly info struct | ||
274 | * | ||
275 | * @param ai struct to destroy | ||
276 | */ | ||
277 | static void | ||
278 | destroy_anomaly_info (struct AnomalyInfo *ai) | ||
279 | { | ||
280 | if (NULL != ai->anomalous_neighbors) | ||
281 | GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors); | ||
282 | GNUNET_free (ai); | ||
283 | } | ||
284 | |||
285 | |||
286 | /** | ||
287 | * Destroy value info struct | ||
288 | * | ||
289 | * @param vi struct to destroy | ||
290 | */ | ||
291 | static void | ||
292 | destroy_value_info (struct ValueInfo *vi) | ||
293 | { | ||
294 | if (NULL != vi->wc) | ||
295 | { | ||
296 | GNUNET_PEERSTORE_watch_cancel (vi->wc); | ||
297 | vi->wc = NULL; | ||
298 | } | ||
299 | if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task) | ||
300 | { | ||
301 | GNUNET_SCHEDULER_cancel (vi->reporting_task); | ||
302 | vi->reporting_task = GNUNET_SCHEDULER_NO_TASK; | ||
303 | } | ||
304 | if (NULL != vi->last_value) | ||
305 | { | ||
306 | GNUNET_free (vi->last_value); | ||
307 | vi->last_value = NULL; | ||
308 | } | ||
309 | GNUNET_free (vi); | ||
310 | } | ||
311 | |||
312 | |||
313 | /** | ||
314 | * Destroy core peer struct | ||
315 | * | ||
316 | * @param corep struct to destroy | ||
317 | */ | ||
318 | static void | ||
319 | destroy_core_peer (struct CorePeer *corep) | ||
320 | { | ||
321 | struct AnomalyInfo *ai; | ||
322 | |||
323 | if (NULL != corep->mq) | ||
324 | { | ||
325 | GNUNET_MQ_destroy (corep->mq); | ||
326 | corep->mq = NULL; | ||
327 | } | ||
328 | ai = ai_head; | ||
329 | while (NULL != ai) | ||
330 | { | ||
331 | GNUNET_assert (NULL != ai->anomalous_neighbors); | ||
332 | GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, | ||
333 | corep->peer_id); | ||
334 | ai = ai->next; | ||
335 | } | ||
336 | GNUNET_free (corep); | ||
337 | } | ||
338 | |||
339 | |||
340 | /** | ||
341 | * Destroy cadet peer struct | ||
342 | * | ||
343 | * @param cadetp struct to destroy | ||
344 | */ | ||
345 | static void | ||
346 | destroy_cadet_peer (struct CadetPeer *cadetp) | ||
347 | { | ||
348 | cadetp->destroying = GNUNET_YES; | ||
349 | if (NULL != cadetp->mq) | ||
350 | { | ||
351 | GNUNET_MQ_destroy (cadetp->mq); | ||
352 | cadetp->mq = NULL; | ||
353 | } | ||
354 | if (NULL != cadetp->channel) | ||
355 | { | ||
356 | GNUNET_CADET_channel_destroy (cadetp->channel); | ||
357 | cadetp->channel = NULL; | ||
358 | } | ||
359 | GNUNET_free (cadetp); | ||
360 | } | ||
361 | |||
362 | |||
363 | /** | ||
364 | * Stop sensor reporting module | ||
365 | */ | ||
366 | void | ||
367 | SENSOR_reporting_stop () | ||
368 | { | ||
369 | struct ValueInfo *vi; | ||
370 | struct CorePeer *corep; | ||
371 | struct AnomalyInfo *ai; | ||
372 | struct CadetPeer *cadetp; | ||
373 | |||
374 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n"); | ||
375 | module_running = GNUNET_NO; | ||
376 | neighborhood = 0; | ||
377 | /* Destroy value info's */ | ||
378 | vi = vi_head; | ||
379 | while (NULL != vi) | ||
380 | { | ||
381 | GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi); | ||
382 | destroy_value_info (vi); | ||
383 | vi = vi_head; | ||
384 | } | ||
385 | /* Destroy core peers */ | ||
386 | corep = corep_head; | ||
387 | while (NULL != corep) | ||
388 | { | ||
389 | GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep); | ||
390 | destroy_core_peer (corep); | ||
391 | corep = corep_head; | ||
392 | } | ||
393 | /* Destroy anomaly info's */ | ||
394 | ai = ai_head; | ||
395 | while (NULL != ai) | ||
396 | { | ||
397 | GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai); | ||
398 | destroy_anomaly_info (ai); | ||
399 | ai = ai_head; | ||
400 | } | ||
401 | /* Destroy cadet peers */ | ||
402 | cadetp = cadetp_head; | ||
403 | while (NULL != cadetp) | ||
404 | { | ||
405 | GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp); | ||
406 | destroy_cadet_peer (cadetp); | ||
407 | cadetp = cadetp_head; | ||
408 | } | ||
409 | /* Disconnect from other services */ | ||
410 | if (NULL != core) | ||
411 | { | ||
412 | GNUNET_CORE_disconnect (core); | ||
413 | core = NULL; | ||
414 | } | ||
415 | if (NULL != peerstore) | ||
416 | { | ||
417 | GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO); | ||
418 | peerstore = NULL; | ||
419 | } | ||
420 | if (NULL != cadet) | ||
421 | { | ||
422 | GNUNET_CADET_disconnect (cadet); | ||
423 | cadet = NULL; | ||
424 | } | ||
425 | } | ||
426 | |||
427 | |||
428 | /******************************************************************************/ | ||
429 | /****************************** HELPERS ******************************/ | ||
430 | /******************************************************************************/ | ||
431 | |||
432 | |||
433 | /** | ||
434 | * Gets the anomaly info struct related to the given sensor | ||
435 | * | ||
436 | * @param sensor Sensor to search by | ||
437 | */ | ||
438 | static struct AnomalyInfo * | ||
439 | get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor) | ||
440 | { | ||
441 | struct AnomalyInfo *ai; | ||
442 | |||
443 | ai = ai_head; | ||
444 | while (NULL != ai) | ||
445 | { | ||
446 | if (ai->sensor == sensor) | ||
447 | { | ||
448 | return ai; | ||
449 | } | ||
450 | ai = ai->next; | ||
451 | } | ||
452 | return NULL; | ||
453 | } | ||
454 | |||
455 | |||
456 | /** | ||
457 | * Returns context of a connected CADET peer. | ||
458 | * Creates it first if didn't exist before. | ||
459 | * | ||
460 | * @param pid Peer Identity | ||
461 | * @return Context of connected CADET peer | ||
462 | */ | ||
463 | static struct CadetPeer * | ||
464 | get_cadet_peer (struct GNUNET_PeerIdentity pid) | ||
465 | { | ||
466 | struct CadetPeer *cadetp; | ||
467 | |||
468 | cadetp = cadetp_head; | ||
469 | while (NULL != cadetp) | ||
470 | { | ||
471 | if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id)) | ||
472 | return cadetp; | ||
473 | cadetp = cadetp->next; | ||
474 | } | ||
475 | /* Not found, create struct and channel */ | ||
476 | cadetp = GNUNET_new (struct CadetPeer); | ||
477 | cadetp->peer_id = pid; | ||
478 | cadetp->channel = | ||
479 | GNUNET_CADET_channel_create (cadet, cadetp, &pid, | ||
480 | GNUNET_APPLICATION_TYPE_SENSORDASHBOARD, | ||
481 | GNUNET_CADET_OPTION_DEFAULT); | ||
482 | cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel); | ||
483 | GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp); | ||
484 | return cadetp; | ||
485 | } | ||
486 | |||
487 | |||
488 | /** | ||
489 | * Create an anomaly report message from a given anomaly info struct inside a | ||
490 | * MQ envelope. | ||
491 | * | ||
492 | * @param ai Anomaly info struct to use | ||
493 | * @return Envelope with message | ||
494 | */ | ||
495 | static struct GNUNET_MQ_Envelope * | ||
496 | create_anomaly_report_message (struct AnomalyInfo *ai) | ||
497 | { | ||
498 | struct GNUNET_SENSOR_AnomalyReportMessage *arm; | ||
499 | struct GNUNET_MQ_Envelope *ev; | ||
500 | |||
501 | ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT); | ||
502 | GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1, | ||
503 | &arm->sensorname_hash); | ||
504 | arm->sensorversion_major = htons (ai->sensor->version_major); | ||
505 | arm->sensorversion_minor = htons (ai->sensor->version_minor); | ||
506 | arm->anomalous = htons (ai->anomalous); | ||
507 | arm->anomalous_neighbors = | ||
508 | ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) / | ||
509 | neighborhood; | ||
510 | return ev; | ||
511 | } | ||
512 | |||
513 | |||
514 | /** | ||
515 | * Create a sensor value message from a given value info struct inside a MQ | ||
516 | * envelope. | ||
517 | * | ||
518 | * @param vi Value info struct to use | ||
519 | * @return Envelope with message | ||
520 | */ | ||
521 | static struct GNUNET_MQ_Envelope * | ||
522 | create_value_message (struct ValueInfo *vi) | ||
523 | { | ||
524 | struct GNUNET_SENSOR_ValueMessage *vm; | ||
525 | struct GNUNET_MQ_Envelope *ev; | ||
526 | |||
527 | ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size, GNUNET_MESSAGE_TYPE_SENSOR_READING); | ||
528 | GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1, | ||
529 | &vm->sensorname_hash); | ||
530 | vm->sensorversion_major = htons (vi->sensor->version_major); | ||
531 | vm->sensorversion_minor = htons (vi->sensor->version_minor); | ||
532 | vm->timestamp = vi->last_value_timestamp; | ||
533 | vm->value_size = htons (vi->last_value_size); | ||
534 | memcpy (&vm[1], vi->last_value, vi->last_value_size); | ||
535 | return ev; | ||
536 | } | ||
537 | |||
538 | |||
539 | /** | ||
540 | * Send given anomaly info report by putting it in the given message queue. | ||
541 | * | ||
542 | * @param mq Message queue to put the message in | ||
543 | * @param ai Anomaly info to report | ||
544 | */ | ||
545 | static void | ||
546 | send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai) | ||
547 | { | ||
548 | struct GNUNET_MQ_Envelope *ev; | ||
549 | |||
550 | ev = create_anomaly_report_message (ai); | ||
551 | GNUNET_MQ_send (mq, ev); | ||
552 | } | ||
553 | |||
554 | |||
555 | /******************************************************************************/ | ||
556 | /*************************** CORE Handlers ***************************/ | ||
557 | /******************************************************************************/ | ||
558 | |||
559 | |||
560 | /** | ||
561 | * An inbound anomaly report is received from a peer through CORE. | ||
562 | * | ||
563 | * @param cls closure (unused) | ||
564 | * @param peer the other peer involved | ||
565 | * @param message the actual message | ||
566 | * @return #GNUNET_OK to keep the connection open, | ||
567 | * #GNUNET_SYSERR to close connection to the peer (signal serious error) | ||
568 | */ | ||
569 | static int | ||
570 | handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other, | ||
571 | const struct GNUNET_MessageHeader *message) | ||
572 | { | ||
573 | struct GNUNET_SENSOR_AnomalyReportMessage *arm; | ||
574 | struct GNUNET_SENSOR_SensorInfo *sensor; | ||
575 | struct AnomalyInfo *ai; | ||
576 | struct CadetPeer *cadetp; | ||
577 | int peer_in_list; | ||
578 | |||
579 | arm = (struct GNUNET_SENSOR_AnomalyReportMessage *) message; | ||
580 | sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash); | ||
581 | if (NULL == sensor || sensor->version_major != arm->sensorversion_major || | ||
582 | sensor->version_minor != arm->sensorversion_minor) | ||
583 | { | ||
584 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
585 | "I don't have the sensor reported by the peer `%s'.\n", | ||
586 | GNUNET_i2s (other)); | ||
587 | return GNUNET_OK; | ||
588 | } | ||
589 | ai = get_anomaly_info_by_sensor (sensor); | ||
590 | GNUNET_assert (NULL != ai); | ||
591 | peer_in_list = | ||
592 | GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other); | ||
593 | if (GNUNET_YES == ai->anomalous) | ||
594 | { | ||
595 | if (GNUNET_YES == peer_in_list) | ||
596 | GNUNET_break_op (0); | ||
597 | else | ||
598 | GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL, | ||
599 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
600 | } | ||
601 | else | ||
602 | { | ||
603 | if (GNUNET_NO == peer_in_list) | ||
604 | GNUNET_break_op (0); | ||
605 | else | ||
606 | GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other); | ||
607 | } | ||
608 | /* Send anomaly update to collection point */ | ||
609 | if (NULL != ai->sensor->collection_point && | ||
610 | GNUNET_YES == ai->sensor->report_anomalies) | ||
611 | { | ||
612 | cadetp = get_cadet_peer (*ai->sensor->collection_point); | ||
613 | send_anomaly_report (cadetp->mq, ai); | ||
614 | } | ||
615 | return GNUNET_OK; | ||
616 | } | ||
617 | |||
618 | |||
619 | /******************************************************************************/ | ||
620 | /************************ PEERSTORE callbacks ************************/ | ||
621 | /******************************************************************************/ | ||
622 | |||
623 | |||
624 | /** | ||
625 | * Sensor value watch callback | ||
626 | * | ||
627 | * @param cls Closure, ValueInfo struct related to the sensor we are watching | ||
628 | * @param record PEERSTORE new record, NULL if error | ||
629 | * @param emsg Error message, NULL if no error | ||
630 | * @return GNUNET_YES to continue watching | ||
631 | */ | ||
632 | static int | ||
633 | value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg) | ||
634 | { | ||
635 | struct ValueInfo *vi = cls; | ||
636 | |||
637 | if (NULL != emsg) | ||
638 | { | ||
639 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
640 | _("PEERSTORE error: %s.\n"), emsg); | ||
641 | return GNUNET_YES; | ||
642 | } | ||
643 | if (NULL != vi->last_value) | ||
644 | { | ||
645 | GNUNET_free (vi->last_value); | ||
646 | vi->last_value_size = 0; | ||
647 | } | ||
648 | vi->last_value = GNUNET_memdup (record->value, record->value_size); | ||
649 | vi->last_value_size = record->value_size; | ||
650 | vi->last_value_timestamp = GNUNET_TIME_absolute_get(); | ||
651 | vi->last_value_reported = GNUNET_NO; | ||
652 | return GNUNET_YES; | ||
653 | } | ||
654 | |||
655 | |||
656 | /******************************************************************************/ | ||
657 | /************************** CORE callbacks ***************************/ | ||
658 | /******************************************************************************/ | ||
659 | |||
660 | |||
661 | /** | ||
662 | * Method called whenever a CORE peer disconnects. | ||
663 | * | ||
664 | * @param cls closure (unused) | ||
665 | * @param peer peer identity this notification is about | ||
666 | */ | ||
667 | static void | ||
668 | core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) | ||
669 | { | ||
670 | struct CorePeer *corep; | ||
671 | |||
672 | if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer)) | ||
673 | return; | ||
674 | neighborhood--; | ||
675 | corep = corep_head; | ||
676 | while (NULL != corep) | ||
677 | { | ||
678 | if (peer == corep->peer_id) | ||
679 | { | ||
680 | GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep); | ||
681 | destroy_core_peer (corep); | ||
682 | return; | ||
683 | } | ||
684 | corep = corep->next; | ||
685 | } | ||
686 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
687 | _("Received disconnect notification from CORE" | ||
688 | " for a peer we didn't know about.\n")); | ||
689 | } | ||
690 | |||
691 | |||
692 | /** | ||
693 | * Method called whenever a given peer connects through CORE. | ||
694 | * | ||
695 | * @param cls closure (unused) | ||
696 | * @param peer peer identity this notification is about | ||
697 | */ | ||
698 | static void | ||
699 | core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) | ||
700 | { | ||
701 | struct CorePeer *corep; | ||
702 | struct AnomalyInfo *ai; | ||
703 | |||
704 | if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer)) | ||
705 | return; | ||
706 | neighborhood++; | ||
707 | corep = GNUNET_new (struct CorePeer); | ||
708 | corep->peer_id = (struct GNUNET_PeerIdentity *) peer; | ||
709 | corep->mq = GNUNET_CORE_mq_create (core, peer); | ||
710 | GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep); | ||
711 | /* Send any locally anomalous sensors to the new peer */ | ||
712 | ai = ai_head; | ||
713 | while (NULL != ai) | ||
714 | { | ||
715 | if (GNUNET_YES == ai->anomalous) | ||
716 | send_anomaly_report (corep->mq, ai); | ||
717 | ai = ai->next; | ||
718 | } | ||
719 | } | ||
720 | |||
721 | |||
722 | /** | ||
723 | * Function called after #GNUNET_CORE_connect has succeeded (or failed | ||
724 | * for good). Note that the private key of the peer is intentionally | ||
725 | * not exposed here; if you need it, your process should try to read | ||
726 | * the private key file directly (which should work if you are | ||
727 | * authorized...). Implementations of this function must not call | ||
728 | * #GNUNET_CORE_disconnect (other than by scheduling a new task to | ||
729 | * do this later). | ||
730 | * | ||
731 | * @param cls closure (unused) | ||
732 | * @param my_identity ID of this peer, NULL if we failed | ||
733 | */ | ||
734 | static void | ||
735 | core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) | ||
736 | { | ||
737 | if (NULL == my_identity) | ||
738 | { | ||
739 | LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n")); | ||
740 | SENSOR_reporting_stop (); | ||
741 | return; | ||
742 | } | ||
743 | if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity)) | ||
744 | { | ||
745 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
746 | _("Peer identity received from CORE init doesn't match ours.\n")); | ||
747 | SENSOR_reporting_stop (); | ||
748 | return; | ||
749 | } | ||
750 | } | ||
751 | |||
752 | |||
753 | /******************************************************************************/ | ||
754 | /************************* CADET callbacks ***************************/ | ||
755 | /******************************************************************************/ | ||
756 | |||
757 | /** | ||
758 | * Function called whenever a channel is destroyed. Should clean up | ||
759 | * any associated state. | ||
760 | * | ||
761 | * It must NOT call #GNUNET_CADET_channel_destroy on the channel. | ||
762 | * | ||
763 | * @param cls closure (set from #GNUNET_CADET_connect) | ||
764 | * @param channel connection to the other end (henceforth invalid) | ||
765 | * @param channel_ctx place where local state associated | ||
766 | * with the channel is stored | ||
767 | */ | ||
768 | static void | ||
769 | cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel, | ||
770 | void *channel_ctx) | ||
771 | { | ||
772 | struct CadetPeer *cadetp = channel_ctx; | ||
773 | |||
774 | if (GNUNET_YES == cadetp->destroying) | ||
775 | return; | ||
776 | GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp); | ||
777 | cadetp->channel = NULL; | ||
778 | destroy_cadet_peer (cadetp); | ||
779 | } | ||
780 | |||
781 | |||
782 | /******************************************************************************/ | ||
783 | /********************** Local anomaly receiver ***********************/ | ||
784 | /******************************************************************************/ | ||
785 | |||
786 | |||
787 | /** | ||
788 | * Used by the analysis module to tell the reporting module about a change in | ||
789 | * the anomaly status of a sensor. | ||
790 | * | ||
791 | * @param sensor Related sensor | ||
792 | * @param anomalous The new sensor anomalous status | ||
793 | */ | ||
794 | void | ||
795 | SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor, | ||
796 | int anomalous) | ||
797 | { | ||
798 | struct AnomalyInfo *ai; | ||
799 | struct CorePeer *corep; | ||
800 | struct CadetPeer *cadetp; | ||
801 | |||
802 | if (GNUNET_NO == module_running) | ||
803 | return; | ||
804 | ai = get_anomaly_info_by_sensor (sensor); | ||
805 | GNUNET_assert (NULL != ai); | ||
806 | ai->anomalous = anomalous; | ||
807 | /* Report change to all neighbors */ | ||
808 | corep = corep_head; | ||
809 | while (NULL != corep) | ||
810 | { | ||
811 | send_anomaly_report (corep->mq, ai); | ||
812 | corep = corep->next; | ||
813 | } | ||
814 | if (NULL != ai->sensor->collection_point && | ||
815 | GNUNET_YES == ai->sensor->report_anomalies) | ||
816 | { | ||
817 | cadetp = get_cadet_peer (*ai->sensor->collection_point); | ||
818 | send_anomaly_report (cadetp->mq, ai); | ||
819 | } | ||
820 | } | ||
821 | |||
822 | |||
823 | /******************************************************************************/ | ||
824 | /******************* Reporting values (periodic) *********************/ | ||
825 | /******************************************************************************/ | ||
826 | |||
827 | |||
828 | /** | ||
829 | * Task scheduled to send values to collection point | ||
830 | * | ||
831 | * @param cls closure, a `struct ValueReportingContext *` | ||
832 | * @param tc unused | ||
833 | */ | ||
834 | static void | ||
835 | report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
836 | { | ||
837 | struct ValueInfo *vi = cls; | ||
838 | struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor; | ||
839 | struct CadetPeer *cadetp; | ||
840 | struct GNUNET_MQ_Envelope *ev; | ||
841 | |||
842 | vi->reporting_task = | ||
843 | GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval, | ||
844 | &report_value, vi); | ||
845 | if (0 == vi->last_value_size || | ||
846 | GNUNET_YES == vi->last_value_reported) | ||
847 | { | ||
848 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
849 | "Did not receive a fresh value from `%s' to report.\n", | ||
850 | sensor->name); | ||
851 | return; | ||
852 | } | ||
853 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
854 | "Now trying to report last seen value of `%s' to collection point.\n", | ||
855 | sensor->name); | ||
856 | cadetp = get_cadet_peer (*sensor->collection_point); | ||
857 | ev = create_value_message (vi); | ||
858 | GNUNET_MQ_send (cadetp->mq, ev); | ||
859 | vi->last_value_reported = GNUNET_YES; | ||
860 | } | ||
861 | |||
862 | |||
863 | /******************************************************************************/ | ||
864 | /******************************** INIT *******************************/ | ||
865 | /******************************************************************************/ | ||
866 | |||
867 | |||
868 | /** | ||
869 | * Iterator for defined sensors and creates anomaly info context | ||
870 | * | ||
871 | * @param cls unused | ||
872 | * @param key unused | ||
873 | * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information | ||
874 | * @return #GNUNET_YES to continue iterations | ||
875 | */ | ||
876 | static int | ||
877 | init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key, | ||
878 | void *value) | ||
879 | { | ||
880 | struct GNUNET_SENSOR_SensorInfo *sensor = value; | ||
881 | struct AnomalyInfo *ai; | ||
882 | struct ValueInfo *vi; | ||
883 | |||
884 | /* Create sensor anomaly info context */ | ||
885 | ai = GNUNET_new (struct AnomalyInfo); | ||
886 | ai->sensor = sensor; | ||
887 | ai->anomalous = GNUNET_NO; | ||
888 | ai->anomalous_neighbors = | ||
889 | GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); | ||
890 | GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai); | ||
891 | /* Create sensor value info context (if needed to be reported) */ | ||
892 | if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values) | ||
893 | return GNUNET_YES; | ||
894 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
895 | "Reporting sensor `%s' values to collection point `%s' every %s.\n", | ||
896 | sensor->name, GNUNET_i2s_full (sensor->collection_point), | ||
897 | GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval, | ||
898 | GNUNET_YES)); | ||
899 | vi = GNUNET_new (struct ValueInfo); | ||
900 | vi->sensor = sensor; | ||
901 | vi->last_value = NULL; | ||
902 | vi->last_value_size = 0; | ||
903 | vi->last_value_reported = GNUNET_NO; | ||
904 | vi->wc = | ||
905 | GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name, | ||
906 | &value_watch_cb, vi); | ||
907 | vi->reporting_task = | ||
908 | GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval, | ||
909 | &report_value, vi); | ||
910 | GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi); | ||
911 | return GNUNET_YES; | ||
912 | } | ||
913 | |||
914 | |||
915 | /** | ||
916 | * Start the sensor anomaly reporting module | ||
917 | * | ||
918 | * @param c our service configuration | ||
919 | * @param s multihashmap of loaded sensors | ||
920 | * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise | ||
921 | */ | ||
922 | int | ||
923 | SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c, | ||
924 | struct GNUNET_CONTAINER_MultiHashMap *s) | ||
925 | { | ||
926 | static struct GNUNET_CORE_MessageHandler core_handlers[] = { | ||
927 | {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT, | ||
928 | sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)}, | ||
929 | {NULL, 0, 0} | ||
930 | }; | ||
931 | static struct GNUNET_CADET_MessageHandler cadet_handlers[] = { | ||
932 | {NULL, 0, 0} | ||
933 | }; | ||
934 | |||
935 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n"); | ||
936 | GNUNET_assert (NULL != s); | ||
937 | sensors = s; | ||
938 | cfg = c; | ||
939 | /* Connect to PEERSTORE */ | ||
940 | peerstore = GNUNET_PEERSTORE_connect (cfg); | ||
941 | if (NULL == peerstore) | ||
942 | { | ||
943 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
944 | _("Failed to connect to peerstore service.\n")); | ||
945 | SENSOR_reporting_stop (); | ||
946 | return GNUNET_SYSERR; | ||
947 | } | ||
948 | /* Connect to CORE */ | ||
949 | core = | ||
950 | GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb, | ||
951 | &core_disconnect_cb, NULL, GNUNET_YES, NULL, | ||
952 | GNUNET_YES, core_handlers); | ||
953 | if (NULL == core) | ||
954 | { | ||
955 | LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n")); | ||
956 | SENSOR_reporting_stop (); | ||
957 | return GNUNET_SYSERR; | ||
958 | } | ||
959 | /* Connect to CADET */ | ||
960 | cadet = | ||
961 | GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed, | ||
962 | cadet_handlers, NULL); | ||
963 | if (NULL == cadet) | ||
964 | { | ||
965 | LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n")); | ||
966 | SENSOR_reporting_stop (); | ||
967 | return GNUNET_SYSERR; | ||
968 | } | ||
969 | GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid); | ||
970 | GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL); | ||
971 | neighborhood = 0; | ||
972 | module_running = GNUNET_YES; | ||
973 | return GNUNET_OK; | ||
974 | } | ||
975 | |||
976 | /* end of gnunet-service-sensor_reporting.c */ | ||
diff --git a/src/sensor/gnunet-service-sensor_reporting_anomaly.c b/src/sensor/gnunet-service-sensor_reporting_anomaly.c deleted file mode 100644 index ff07c2e52..000000000 --- a/src/sensor/gnunet-service-sensor_reporting_anomaly.c +++ /dev/null | |||
@@ -1,526 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file sensor/gnunet-service-sensor_reporting_anomaly.c | ||
23 | * @brief sensor service anomaly reporting functionality | ||
24 | * @author Omar Tarabai | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | #include "sensor.h" | ||
29 | #include "gnunet_peerstore_service.h" | ||
30 | #include "gnunet_core_service.h" | ||
31 | |||
32 | #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting-anomaly",__VA_ARGS__) | ||
33 | |||
34 | struct AnomalyInfo | ||
35 | { | ||
36 | |||
37 | /** | ||
38 | * DLL | ||
39 | */ | ||
40 | struct AnomalyInfo *prev; | ||
41 | |||
42 | /** | ||
43 | * DLL | ||
44 | */ | ||
45 | struct AnomalyInfo *next; | ||
46 | |||
47 | /** | ||
48 | * Sensor information | ||
49 | */ | ||
50 | struct GNUNET_SENSOR_SensorInfo *sensor; | ||
51 | |||
52 | /** | ||
53 | * Current anomalous status of sensor | ||
54 | */ | ||
55 | int anomalous; | ||
56 | |||
57 | /** | ||
58 | * List of peers that reported an anomaly for this sensor | ||
59 | */ | ||
60 | struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors; | ||
61 | |||
62 | }; | ||
63 | |||
64 | /** | ||
65 | * Information about a connected CORE peer. | ||
66 | * Note that we only know about a connected peer if it is running the same | ||
67 | * application (sensor anomaly reporting) as us. | ||
68 | */ | ||
69 | struct CorePeer | ||
70 | { | ||
71 | |||
72 | /** | ||
73 | * DLL | ||
74 | */ | ||
75 | struct CorePeer *prev; | ||
76 | |||
77 | /** | ||
78 | * DLL | ||
79 | */ | ||
80 | struct CorePeer *next; | ||
81 | |||
82 | /** | ||
83 | * Peer identity of connected peer | ||
84 | */ | ||
85 | struct GNUNET_PeerIdentity *peerid; | ||
86 | |||
87 | /** | ||
88 | * Message queue for messages to be sent to this peer | ||
89 | */ | ||
90 | struct GNUNET_MQ_Handle *mq; | ||
91 | |||
92 | }; | ||
93 | |||
94 | |||
95 | /** | ||
96 | * Our configuration. | ||
97 | */ | ||
98 | static const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
99 | |||
100 | /** | ||
101 | * Multihashmap of loaded sensors | ||
102 | */ | ||
103 | static struct GNUNET_CONTAINER_MultiHashMap *sensors; | ||
104 | |||
105 | /** | ||
106 | * Handle to core service | ||
107 | */ | ||
108 | static struct GNUNET_CORE_Handle *core; | ||
109 | |||
110 | /** | ||
111 | * My peer id | ||
112 | */ | ||
113 | static struct GNUNET_PeerIdentity mypeerid; | ||
114 | |||
115 | /** | ||
116 | * Head of DLL of anomaly info structs | ||
117 | */ | ||
118 | static struct AnomalyInfo *ai_head; | ||
119 | |||
120 | /** | ||
121 | * Tail of DLL of anomaly info structs | ||
122 | */ | ||
123 | static struct AnomalyInfo *ai_tail; | ||
124 | |||
125 | /** | ||
126 | * Head of DLL of CORE peers | ||
127 | */ | ||
128 | static struct CorePeer *cp_head; | ||
129 | |||
130 | /** | ||
131 | * Tail of DLL of CORE peers | ||
132 | */ | ||
133 | static struct CorePeer *cp_tail; | ||
134 | |||
135 | /** | ||
136 | * Is the module started? | ||
137 | */ | ||
138 | static int module_running = GNUNET_NO; | ||
139 | |||
140 | /** | ||
141 | * Number of known neighborhood peers | ||
142 | */ | ||
143 | static int neighborhood; | ||
144 | |||
145 | |||
146 | /** | ||
147 | * Destroy anomaly info struct | ||
148 | * | ||
149 | * @param ai struct to destroy | ||
150 | */ | ||
151 | static void | ||
152 | destroy_anomaly_info (struct AnomalyInfo *ai) | ||
153 | { | ||
154 | if (NULL != ai->anomalous_neighbors) | ||
155 | GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors); | ||
156 | GNUNET_free (ai); | ||
157 | } | ||
158 | |||
159 | |||
160 | /** | ||
161 | * Destroy core peer struct | ||
162 | * | ||
163 | * @param cp struct to destroy | ||
164 | */ | ||
165 | static void | ||
166 | destroy_core_peer (struct CorePeer *cp) | ||
167 | { | ||
168 | struct AnomalyInfo *ai; | ||
169 | |||
170 | if (NULL != cp->mq) | ||
171 | { | ||
172 | GNUNET_MQ_destroy (cp->mq); | ||
173 | cp->mq = NULL; | ||
174 | } | ||
175 | ai = ai_head; | ||
176 | while (NULL != ai) | ||
177 | { | ||
178 | GNUNET_assert (NULL != ai->anomalous_neighbors); | ||
179 | GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, | ||
180 | cp->peerid); | ||
181 | ai = ai->next; | ||
182 | } | ||
183 | GNUNET_free (cp); | ||
184 | } | ||
185 | |||
186 | |||
187 | /** | ||
188 | * Stop sensor anomaly reporting module | ||
189 | */ | ||
190 | void | ||
191 | SENSOR_reporting_anomaly_stop () | ||
192 | { | ||
193 | struct AnomalyInfo *ai; | ||
194 | struct CorePeer *cp; | ||
195 | |||
196 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n"); | ||
197 | module_running = GNUNET_NO; | ||
198 | ai = ai_head; | ||
199 | while (NULL != ai) | ||
200 | { | ||
201 | GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai); | ||
202 | destroy_anomaly_info (ai); | ||
203 | ai = ai_head; | ||
204 | } | ||
205 | cp = cp_head; | ||
206 | while (NULL != cp) | ||
207 | { | ||
208 | GNUNET_CONTAINER_DLL_remove (cp_head, cp_tail, cp); | ||
209 | destroy_core_peer (cp); | ||
210 | cp = cp_head; | ||
211 | } | ||
212 | neighborhood = 0; | ||
213 | if (NULL != core) | ||
214 | { | ||
215 | GNUNET_CORE_disconnect (core); | ||
216 | core = NULL; | ||
217 | } | ||
218 | } | ||
219 | |||
220 | |||
221 | /** | ||
222 | * Gets the anomaly info struct related to the given sensor | ||
223 | * | ||
224 | * @param sensor Sensor to search by | ||
225 | */ | ||
226 | static struct AnomalyInfo * | ||
227 | get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor) | ||
228 | { | ||
229 | struct AnomalyInfo *ai; | ||
230 | |||
231 | ai = ai_head; | ||
232 | while (NULL != ai) | ||
233 | { | ||
234 | if (ai->sensor == sensor) | ||
235 | { | ||
236 | return ai; | ||
237 | } | ||
238 | ai = ai->next; | ||
239 | } | ||
240 | return NULL; | ||
241 | } | ||
242 | |||
243 | |||
244 | /** | ||
245 | * Create an anomaly report message from a given anomaly info structb inside an | ||
246 | * MQ envelope. | ||
247 | * | ||
248 | * @param ai Anomaly info struct to use | ||
249 | * @return | ||
250 | */ | ||
251 | static struct GNUNET_MQ_Envelope * | ||
252 | create_anomaly_report_message (struct AnomalyInfo *ai) | ||
253 | { | ||
254 | struct AnomalyReportMessage *arm; | ||
255 | struct GNUNET_MQ_Envelope *ev; | ||
256 | |||
257 | ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT); | ||
258 | GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1, | ||
259 | &arm->sensorname_hash); | ||
260 | arm->sensorversion_major = ai->sensor->version_major; | ||
261 | arm->sensorversion_minor = ai->sensor->version_minor; | ||
262 | arm->anomalous = ai->anomalous; | ||
263 | arm->anomalous_neighbors = | ||
264 | ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) / | ||
265 | neighborhood; | ||
266 | return ev; | ||
267 | } | ||
268 | |||
269 | |||
270 | /** | ||
271 | * Send given anomaly info report to given core peer. | ||
272 | * | ||
273 | * @param cp Core peer to send the report to | ||
274 | * @param ai Anomaly info to report | ||
275 | */ | ||
276 | static void | ||
277 | send_anomaly_report (struct CorePeer *cp, struct AnomalyInfo *ai) | ||
278 | { | ||
279 | struct GNUNET_MQ_Envelope *ev; | ||
280 | |||
281 | GNUNET_assert (NULL != cp->mq); | ||
282 | ev = create_anomaly_report_message (ai); | ||
283 | GNUNET_MQ_send (cp->mq, ev); | ||
284 | } | ||
285 | |||
286 | |||
287 | /** | ||
288 | * An inbound anomaly report is received from a peer through CORE. | ||
289 | * | ||
290 | * @param cls closure (unused) | ||
291 | * @param peer the other peer involved | ||
292 | * @param message the actual message | ||
293 | * @return #GNUNET_OK to keep the connection open, | ||
294 | * #GNUNET_SYSERR to close connection to the peer (signal serious error) | ||
295 | */ | ||
296 | static int | ||
297 | handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other, | ||
298 | const struct GNUNET_MessageHeader *message) | ||
299 | { | ||
300 | struct AnomalyReportMessage *arm; | ||
301 | struct GNUNET_SENSOR_SensorInfo *sensor; | ||
302 | struct AnomalyInfo *ai; | ||
303 | int peer_in_list; | ||
304 | |||
305 | arm = (struct AnomalyReportMessage *) message; | ||
306 | sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash); | ||
307 | if (NULL == sensor || sensor->version_major != arm->sensorversion_major || | ||
308 | sensor->version_minor != arm->sensorversion_minor) | ||
309 | { | ||
310 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
311 | "I don't have the sensor reported by the peer `%s'.\n", | ||
312 | GNUNET_i2s (other)); | ||
313 | return GNUNET_OK; | ||
314 | } | ||
315 | ai = get_anomaly_info_by_sensor (sensor); | ||
316 | GNUNET_assert (NULL != ai); | ||
317 | peer_in_list = | ||
318 | GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other); | ||
319 | if (GNUNET_YES == ai->anomalous) | ||
320 | { | ||
321 | if (GNUNET_YES == peer_in_list) | ||
322 | GNUNET_break_op (0); | ||
323 | else | ||
324 | GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL, | ||
325 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | ||
326 | } | ||
327 | else | ||
328 | { | ||
329 | if (GNUNET_NO == peer_in_list) | ||
330 | GNUNET_break_op (0); | ||
331 | else | ||
332 | GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other); | ||
333 | } | ||
334 | //TODO: report to collection point if anomalous neigbors jump up or down | ||
335 | // by a configurable percentage or is now 0% or 100% | ||
336 | return GNUNET_OK; | ||
337 | } | ||
338 | |||
339 | |||
340 | /** | ||
341 | * Method called whenever a CORE peer disconnects. | ||
342 | * | ||
343 | * @param cls closure (unused) | ||
344 | * @param peer peer identity this notification is about | ||
345 | */ | ||
346 | static void | ||
347 | core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) | ||
348 | { | ||
349 | struct CorePeer *cp; | ||
350 | |||
351 | if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer)) | ||
352 | return; | ||
353 | neighborhood--; | ||
354 | cp = cp_head; | ||
355 | while (NULL != cp) | ||
356 | { | ||
357 | if (peer == cp->peerid) | ||
358 | { | ||
359 | GNUNET_CONTAINER_DLL_remove (cp_head, cp_tail, cp); | ||
360 | destroy_core_peer (cp); | ||
361 | return; | ||
362 | } | ||
363 | cp = cp->next; | ||
364 | } | ||
365 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
366 | _("Received disconnect notification from CORE" | ||
367 | " for a peer we didn't know about.\n")); | ||
368 | } | ||
369 | |||
370 | |||
371 | /** | ||
372 | * Method called whenever a given peer connects through CORE. | ||
373 | * | ||
374 | * @param cls closure (unused) | ||
375 | * @param peer peer identity this notification is about | ||
376 | */ | ||
377 | static void | ||
378 | core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) | ||
379 | { | ||
380 | struct CorePeer *cp; | ||
381 | struct AnomalyInfo *ai; | ||
382 | |||
383 | if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer)) | ||
384 | return; | ||
385 | neighborhood++; | ||
386 | cp = GNUNET_new (struct CorePeer); | ||
387 | cp->peerid = (struct GNUNET_PeerIdentity *) peer; | ||
388 | cp->mq = GNUNET_CORE_mq_create (core, peer); | ||
389 | GNUNET_CONTAINER_DLL_insert (cp_head, cp_tail, cp); | ||
390 | /* Send any locally anomalous sensors to the new peer */ | ||
391 | ai = ai_head; | ||
392 | while (NULL != ai) | ||
393 | { | ||
394 | if (GNUNET_YES == ai->anomalous) | ||
395 | send_anomaly_report (cp, ai); | ||
396 | ai = ai->next; | ||
397 | } | ||
398 | } | ||
399 | |||
400 | |||
401 | /** | ||
402 | * Function called after #GNUNET_CORE_connect has succeeded (or failed | ||
403 | * for good). Note that the private key of the peer is intentionally | ||
404 | * not exposed here; if you need it, your process should try to read | ||
405 | * the private key file directly (which should work if you are | ||
406 | * authorized...). Implementations of this function must not call | ||
407 | * #GNUNET_CORE_disconnect (other than by scheduling a new task to | ||
408 | * do this later). | ||
409 | * | ||
410 | * @param cls closure (unused) | ||
411 | * @param my_identity ID of this peer, NULL if we failed | ||
412 | */ | ||
413 | static void | ||
414 | core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) | ||
415 | { | ||
416 | if (NULL == my_identity) | ||
417 | { | ||
418 | LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n")); | ||
419 | SENSOR_reporting_anomaly_stop (); | ||
420 | return; | ||
421 | } | ||
422 | if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity)) | ||
423 | { | ||
424 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
425 | _("Peer identity received from CORE init doesn't match ours.\n")); | ||
426 | SENSOR_reporting_anomaly_stop (); | ||
427 | return; | ||
428 | } | ||
429 | } | ||
430 | |||
431 | |||
432 | /** | ||
433 | * Used by the analysis module to tell the reporting module about a change in | ||
434 | * the anomaly status of a sensor. | ||
435 | * | ||
436 | * @param sensor Related sensor | ||
437 | * @param anomalous The new sensor anomalous status | ||
438 | */ | ||
439 | void | ||
440 | SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor, | ||
441 | int anomalous) | ||
442 | { | ||
443 | struct AnomalyInfo *ai; | ||
444 | struct CorePeer *cp; | ||
445 | |||
446 | if (GNUNET_NO == module_running) | ||
447 | return; | ||
448 | ai = get_anomaly_info_by_sensor (sensor); | ||
449 | GNUNET_assert (NULL != ai); | ||
450 | ai->anomalous = anomalous; | ||
451 | /* Report change to all neighbors */ | ||
452 | cp = cp_head; | ||
453 | while (NULL != cp) | ||
454 | { | ||
455 | send_anomaly_report (cp, ai); | ||
456 | cp = cp->next; | ||
457 | } | ||
458 | //TODO: report change to collection point if report_anomalies | ||
459 | } | ||
460 | |||
461 | |||
462 | /** | ||
463 | * Iterator for defined sensors and creates anomaly info context | ||
464 | * | ||
465 | * @param cls unused | ||
466 | * @param key unused | ||
467 | * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information | ||
468 | * @return #GNUNET_YES to continue iterations | ||
469 | */ | ||
470 | static int | ||
471 | init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key, | ||
472 | void *value) | ||
473 | { | ||
474 | struct GNUNET_SENSOR_SensorInfo *sensor = value; | ||
475 | struct AnomalyInfo *ai; | ||
476 | |||
477 | ai = GNUNET_new (struct AnomalyInfo); | ||
478 | |||
479 | ai->sensor = sensor; | ||
480 | ai->anomalous = GNUNET_NO; | ||
481 | ai->anomalous_neighbors = | ||
482 | GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); | ||
483 | GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai); | ||
484 | return GNUNET_YES; | ||
485 | } | ||
486 | |||
487 | |||
488 | /** | ||
489 | * Start the sensor anomaly reporting module | ||
490 | * | ||
491 | * @param c our service configuration | ||
492 | * @param s multihashmap of loaded sensors | ||
493 | * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise | ||
494 | */ | ||
495 | int | ||
496 | SENSOR_reporting_anomaly_start (const struct GNUNET_CONFIGURATION_Handle *c, | ||
497 | struct GNUNET_CONTAINER_MultiHashMap *s) | ||
498 | { | ||
499 | static struct GNUNET_CORE_MessageHandler core_handlers[] = { | ||
500 | {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT, | ||
501 | sizeof (struct AnomalyReportMessage)}, | ||
502 | {NULL, 0, 0} | ||
503 | }; | ||
504 | |||
505 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor anomaly reporting module.\n"); | ||
506 | GNUNET_assert (NULL != s); | ||
507 | sensors = s; | ||
508 | cfg = c; | ||
509 | core = | ||
510 | GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb, | ||
511 | &core_disconnect_cb, NULL, GNUNET_YES, NULL, | ||
512 | GNUNET_YES, core_handlers); | ||
513 | if (NULL == core) | ||
514 | { | ||
515 | LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n")); | ||
516 | SENSOR_reporting_anomaly_stop (); | ||
517 | return GNUNET_SYSERR; | ||
518 | } | ||
519 | GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid); | ||
520 | GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL); | ||
521 | neighborhood = 0; | ||
522 | module_running = GNUNET_YES; | ||
523 | return GNUNET_OK; | ||
524 | } | ||
525 | |||
526 | /* end of gnunet-service-sensor_reporting_anomaly.c */ | ||
diff --git a/src/sensor/gnunet-service-sensor_reporting_value.c b/src/sensor/gnunet-service-sensor_reporting_value.c deleted file mode 100644 index 84fbd11ae..000000000 --- a/src/sensor/gnunet-service-sensor_reporting_value.c +++ /dev/null | |||
@@ -1,571 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file sensor/gnunet-service-sensor_reporting_value.c | ||
23 | * @brief sensor service value reporting functionality | ||
24 | * @author Omar Tarabai | ||
25 | */ | ||
26 | #include <inttypes.h> | ||
27 | #include "platform.h" | ||
28 | #include "gnunet_util_lib.h" | ||
29 | #include "sensor.h" | ||
30 | #include "gnunet_peerstore_service.h" | ||
31 | #include "gnunet_cadet_service.h" | ||
32 | #include "gnunet_applications.h" | ||
33 | |||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting-value",__VA_ARGS__) | ||
35 | |||
36 | /** | ||
37 | * Retry interval (seconds) in case channel to collection point is busy | ||
38 | */ | ||
39 | #define COLLECTION_RETRY 1 | ||
40 | |||
41 | /** | ||
42 | * Context of reporting sensor values | ||
43 | */ | ||
44 | struct ValueReportingContext | ||
45 | { | ||
46 | |||
47 | /** | ||
48 | * DLL | ||
49 | */ | ||
50 | struct ValueReportingContext *prev; | ||
51 | |||
52 | /** | ||
53 | * DLL | ||
54 | */ | ||
55 | struct ValueReportingContext *next; | ||
56 | |||
57 | /** | ||
58 | * Sensor information | ||
59 | */ | ||
60 | struct GNUNET_SENSOR_SensorInfo *sensor; | ||
61 | |||
62 | /** | ||
63 | * Collection point reporting task | ||
64 | * (or #GNUNET_SCHEDULER_NO_TASK) | ||
65 | */ | ||
66 | GNUNET_SCHEDULER_TaskIdentifier cp_task; | ||
67 | |||
68 | /** | ||
69 | * Watcher of sensor values | ||
70 | */ | ||
71 | struct GNUNET_PEERSTORE_WatchContext *wc; | ||
72 | |||
73 | /** | ||
74 | * Last value read from sensor | ||
75 | */ | ||
76 | void *last_value; | ||
77 | |||
78 | /** | ||
79 | * Size of @e last_value | ||
80 | */ | ||
81 | size_t last_value_size; | ||
82 | |||
83 | /** | ||
84 | * Timestamp of last value reading | ||
85 | */ | ||
86 | uint64_t timestamp; | ||
87 | |||
88 | }; | ||
89 | |||
90 | /** | ||
91 | * Context of a created CADET channel | ||
92 | */ | ||
93 | struct CadetChannelContext | ||
94 | { | ||
95 | |||
96 | /** | ||
97 | * DLL | ||
98 | */ | ||
99 | struct CadetChannelContext *prev; | ||
100 | |||
101 | /** | ||
102 | * DLL | ||
103 | */ | ||
104 | struct CadetChannelContext *next; | ||
105 | |||
106 | /** | ||
107 | * Peer Id of | ||
108 | */ | ||
109 | struct GNUNET_PeerIdentity pid; | ||
110 | |||
111 | /** | ||
112 | * CADET channel handle | ||
113 | */ | ||
114 | struct GNUNET_CADET_Channel *c; | ||
115 | |||
116 | /** | ||
117 | * Are we sending data on this channel? | ||
118 | * #GNUNET_YES / #GNUNET_NO | ||
119 | */ | ||
120 | int sending; | ||
121 | |||
122 | /** | ||
123 | * Pointer to a pending message to be sent over the channel | ||
124 | */ | ||
125 | void *pending_msg; | ||
126 | |||
127 | /** | ||
128 | * Size of @e pending_msg | ||
129 | */ | ||
130 | size_t pending_msg_size; | ||
131 | |||
132 | /** | ||
133 | * Handle to CADET tranmission request in case we are sending | ||
134 | * (sending == #GNUNET_YES) | ||
135 | */ | ||
136 | struct GNUNET_CADET_TransmitHandle *th; | ||
137 | |||
138 | /** | ||
139 | * Are we currently destroying the channel and its context? | ||
140 | */ | ||
141 | int destroying; | ||
142 | |||
143 | }; | ||
144 | |||
145 | /** | ||
146 | * Our configuration. | ||
147 | */ | ||
148 | static const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
149 | |||
150 | /** | ||
151 | * Handle to peerstore service | ||
152 | */ | ||
153 | static struct GNUNET_PEERSTORE_Handle *peerstore; | ||
154 | |||
155 | /** | ||
156 | * My peer id | ||
157 | */ | ||
158 | static struct GNUNET_PeerIdentity mypeerid; | ||
159 | |||
160 | /** | ||
161 | * Handle to CADET service | ||
162 | */ | ||
163 | static struct GNUNET_CADET_Handle *cadet; | ||
164 | |||
165 | /** | ||
166 | * Head of DLL of all reporting contexts | ||
167 | */ | ||
168 | struct ValueReportingContext *vrc_head; | ||
169 | |||
170 | /** | ||
171 | * Tail of DLL of all reporting contexts | ||
172 | */ | ||
173 | struct ValueReportingContext *vrc_tail; | ||
174 | |||
175 | /** | ||
176 | * Head of DLL of all cadet channels | ||
177 | */ | ||
178 | struct CadetChannelContext *cc_head; | ||
179 | |||
180 | /** | ||
181 | * Tail of DLL of all cadet channels | ||
182 | */ | ||
183 | struct CadetChannelContext *cc_tail; | ||
184 | |||
185 | /** | ||
186 | * Destroy a reporting context structure | ||
187 | */ | ||
188 | static void | ||
189 | destroy_value_reporting_context (struct ValueReportingContext *vrc) | ||
190 | { | ||
191 | if (NULL != vrc->wc) | ||
192 | { | ||
193 | GNUNET_PEERSTORE_watch_cancel (vrc->wc); | ||
194 | vrc->wc = NULL; | ||
195 | } | ||
196 | if (GNUNET_SCHEDULER_NO_TASK != vrc->cp_task) | ||
197 | { | ||
198 | GNUNET_SCHEDULER_cancel (vrc->cp_task); | ||
199 | vrc->cp_task = GNUNET_SCHEDULER_NO_TASK; | ||
200 | } | ||
201 | if (NULL != vrc->last_value) | ||
202 | { | ||
203 | GNUNET_free (vrc->last_value); | ||
204 | vrc->last_value_size = 0; | ||
205 | } | ||
206 | GNUNET_free (vrc); | ||
207 | } | ||
208 | |||
209 | |||
210 | /** | ||
211 | * Destroy a CADET channel context struct | ||
212 | */ | ||
213 | static void | ||
214 | destroy_cadet_channel_context (struct CadetChannelContext *cc) | ||
215 | { | ||
216 | cc->destroying = GNUNET_YES; | ||
217 | if (NULL != cc->th) | ||
218 | { | ||
219 | GNUNET_CADET_notify_transmit_ready_cancel (cc->th); | ||
220 | cc->th = NULL; | ||
221 | } | ||
222 | if (NULL != cc->pending_msg) | ||
223 | { | ||
224 | GNUNET_free (cc->pending_msg); | ||
225 | cc->pending_msg = NULL; | ||
226 | } | ||
227 | if (NULL != cc->c) | ||
228 | { | ||
229 | GNUNET_CADET_channel_destroy (cc->c); | ||
230 | cc->c = NULL; | ||
231 | } | ||
232 | GNUNET_free (cc); | ||
233 | } | ||
234 | |||
235 | |||
236 | /** | ||
237 | * Stop sensor value reporting module | ||
238 | */ | ||
239 | void | ||
240 | SENSOR_reporting_value_stop () | ||
241 | { | ||
242 | struct ValueReportingContext *vrc; | ||
243 | struct CadetChannelContext *cc; | ||
244 | |||
245 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor value reporting module.\n"); | ||
246 | while (NULL != cc_head) | ||
247 | { | ||
248 | cc = cc_head; | ||
249 | GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc); | ||
250 | destroy_cadet_channel_context (cc); | ||
251 | } | ||
252 | while (NULL != vrc_head) | ||
253 | { | ||
254 | vrc = vrc_head; | ||
255 | GNUNET_CONTAINER_DLL_remove (vrc_head, vrc_tail, vrc); | ||
256 | destroy_value_reporting_context (vrc); | ||
257 | } | ||
258 | if (NULL != peerstore) | ||
259 | { | ||
260 | GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES); | ||
261 | peerstore = NULL; | ||
262 | } | ||
263 | if (NULL != cadet) | ||
264 | { | ||
265 | GNUNET_CADET_disconnect (cadet); | ||
266 | cadet = NULL; | ||
267 | } | ||
268 | } | ||
269 | |||
270 | |||
271 | /** | ||
272 | * Returns CADET channel established to given peer or creates a new one. | ||
273 | * | ||
274 | * @param pid Peer Identity | ||
275 | * @return Context of established cadet channel | ||
276 | */ | ||
277 | static struct CadetChannelContext * | ||
278 | get_cadet_channel (struct GNUNET_PeerIdentity pid) | ||
279 | { | ||
280 | struct CadetChannelContext *cc; | ||
281 | |||
282 | cc = cc_head; | ||
283 | while (NULL != cc) | ||
284 | { | ||
285 | if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid)) | ||
286 | return cc; | ||
287 | cc = cc->next; | ||
288 | } | ||
289 | cc = GNUNET_new (struct CadetChannelContext); | ||
290 | cc->c = | ||
291 | GNUNET_CADET_channel_create (cadet, cc, &pid, | ||
292 | GNUNET_APPLICATION_TYPE_SENSORDASHBOARD, | ||
293 | GNUNET_CADET_OPTION_DEFAULT); | ||
294 | cc->pid = pid; | ||
295 | cc->sending = GNUNET_NO; | ||
296 | cc->destroying = GNUNET_NO; | ||
297 | GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc); | ||
298 | return cc; | ||
299 | } | ||
300 | |||
301 | |||
302 | /** | ||
303 | * Construct a reading message ready to be sent over CADET channel | ||
304 | * | ||
305 | * @param rc reporting context to read data from | ||
306 | * @param msg used to return the created message structure | ||
307 | * @return size of created message | ||
308 | */ | ||
309 | static size_t | ||
310 | construct_reading_message (struct ValueReportingContext *vrc, | ||
311 | struct GNUNET_SENSOR_ReadingMessage **msg) | ||
312 | { | ||
313 | struct GNUNET_SENSOR_ReadingMessage *ret; | ||
314 | uint16_t sensorname_size; | ||
315 | uint16_t total_size; | ||
316 | void *dummy; | ||
317 | |||
318 | sensorname_size = strlen (vrc->sensor->name) + 1; | ||
319 | total_size = | ||
320 | sizeof (struct GNUNET_SENSOR_ReadingMessage) + sensorname_size + | ||
321 | vrc->last_value_size; | ||
322 | ret = GNUNET_malloc (total_size); | ||
323 | ret->header.size = htons (total_size); | ||
324 | ret->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING); | ||
325 | ret->sensorname_size = htons (sensorname_size); | ||
326 | ret->sensorversion_major = htons (vrc->sensor->version_major); | ||
327 | ret->sensorversion_minor = htons (vrc->sensor->version_minor); | ||
328 | ret->timestamp = GNUNET_htobe64 (vrc->timestamp); | ||
329 | ret->value_size = htons (vrc->last_value_size); | ||
330 | dummy = &ret[1]; | ||
331 | memcpy (dummy, vrc->sensor->name, sensorname_size); | ||
332 | dummy += sensorname_size; | ||
333 | memcpy (dummy, vrc->last_value, vrc->last_value_size); | ||
334 | *msg = ret; | ||
335 | return total_size; | ||
336 | } | ||
337 | |||
338 | |||
339 | /** | ||
340 | * Function called to notify a client about the connection begin ready | ||
341 | * to queue more data. @a buf will be NULL and @a size zero if the | ||
342 | * connection was closed for writing in the meantime. | ||
343 | * | ||
344 | * @param cls closure | ||
345 | * @param size number of bytes available in @a buf | ||
346 | * @param buf where the callee should write the message | ||
347 | * @return number of bytes written to @a buf | ||
348 | */ | ||
349 | static size_t | ||
350 | do_report_value (void *cls, size_t size, void *buf) | ||
351 | { | ||
352 | struct CadetChannelContext *cc = cls; | ||
353 | size_t written = 0; | ||
354 | |||
355 | cc->th = NULL; | ||
356 | cc->sending = GNUNET_NO; | ||
357 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n"); | ||
358 | if (NULL == buf) | ||
359 | { | ||
360 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
361 | "CADET failed to transmit message (NULL buf), discarding.\n"); | ||
362 | } | ||
363 | else if (size < cc->pending_msg_size) | ||
364 | { | ||
365 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
366 | "CADET failed to transmit message (small size, expected: %u, got: %u)" | ||
367 | ", discarding.\n", cc->pending_msg_size, size); | ||
368 | } | ||
369 | else | ||
370 | { | ||
371 | memcpy (buf, cc->pending_msg, cc->pending_msg_size); | ||
372 | written = cc->pending_msg_size; | ||
373 | } | ||
374 | GNUNET_free (cc->pending_msg); | ||
375 | cc->pending_msg = NULL; | ||
376 | cc->pending_msg_size = 0; | ||
377 | return written; | ||
378 | } | ||
379 | |||
380 | |||
381 | /** | ||
382 | * Task scheduled to send values to collection point | ||
383 | * | ||
384 | * @param cls closure, a `struct ValueReportingContext *` | ||
385 | * @param tc unused | ||
386 | */ | ||
387 | static void | ||
388 | report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
389 | { | ||
390 | struct ValueReportingContext *vrc = cls; | ||
391 | struct GNUNET_SENSOR_SensorInfo *sensor = vrc->sensor; | ||
392 | struct CadetChannelContext *cc; | ||
393 | struct GNUNET_SENSOR_ReadingMessage *msg; | ||
394 | size_t msg_size; | ||
395 | |||
396 | vrc->cp_task = GNUNET_SCHEDULER_NO_TASK; | ||
397 | if (0 == vrc->last_value_size) /* Did not receive a sensor value yet */ | ||
398 | { | ||
399 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
400 | "Did not receive a value from `%s' to report yet.\n", | ||
401 | vrc->sensor->name); | ||
402 | vrc->cp_task = | ||
403 | GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval, | ||
404 | &report_value, vrc); | ||
405 | return; | ||
406 | } | ||
407 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
408 | "Now trying to report last seen value of `%s' " "to collection point.\n", | ||
409 | vrc->sensor->name); | ||
410 | GNUNET_assert (NULL != sensor->collection_point); | ||
411 | cc = get_cadet_channel (*sensor->collection_point); | ||
412 | if (GNUNET_YES == cc->sending) | ||
413 | { | ||
414 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
415 | "Cadet channel to collection point busy, " | ||
416 | "trying again for sensor `%s' after %d seconds.\n", vrc->sensor->name, | ||
417 | COLLECTION_RETRY); | ||
418 | vrc->cp_task = | ||
419 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply | ||
420 | (GNUNET_TIME_UNIT_SECONDS, | ||
421 | COLLECTION_RETRY), &report_value, vrc); | ||
422 | return; | ||
423 | } | ||
424 | msg_size = construct_reading_message (vrc, &msg); | ||
425 | cc->sending = GNUNET_YES; | ||
426 | cc->pending_msg = msg; | ||
427 | cc->pending_msg_size = msg_size; | ||
428 | cc->th = | ||
429 | GNUNET_CADET_notify_transmit_ready (cc->c, GNUNET_YES, | ||
430 | sensor->value_reporting_interval, | ||
431 | msg_size, &do_report_value, cc); | ||
432 | vrc->cp_task = | ||
433 | GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval, | ||
434 | &report_value, vrc); | ||
435 | } | ||
436 | |||
437 | |||
438 | /** | ||
439 | * Sensor value watch callback | ||
440 | */ | ||
441 | static int | ||
442 | value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg) | ||
443 | { | ||
444 | struct ValueReportingContext *vrc = cls; | ||
445 | |||
446 | if (NULL != emsg) | ||
447 | return GNUNET_YES; | ||
448 | if (NULL != vrc->last_value) | ||
449 | { | ||
450 | GNUNET_free (vrc->last_value); | ||
451 | vrc->last_value_size = 0; | ||
452 | } | ||
453 | vrc->last_value = GNUNET_malloc (record->value_size); | ||
454 | memcpy (vrc->last_value, record->value, record->value_size); | ||
455 | vrc->last_value_size = record->value_size; | ||
456 | vrc->timestamp = GNUNET_TIME_absolute_get ().abs_value_us; | ||
457 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
458 | "Received a sensor `%s' watch value at " "timestamp %" PRIu64 | ||
459 | ", updating notification last_value.\n", vrc->sensor->name, | ||
460 | vrc->timestamp); | ||
461 | return GNUNET_YES; | ||
462 | } | ||
463 | |||
464 | |||
465 | /** | ||
466 | * Function called whenever a channel is destroyed. Should clean up | ||
467 | * any associated state. | ||
468 | * | ||
469 | * It must NOT call #GNUNET_CADET_channel_destroy on the channel. | ||
470 | * | ||
471 | * @param cls closure (set from #GNUNET_CADET_connect) | ||
472 | * @param channel connection to the other end (henceforth invalid) | ||
473 | * @param channel_ctx place where local state associated | ||
474 | * with the channel is stored | ||
475 | */ | ||
476 | static void | ||
477 | cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel, | ||
478 | void *channel_ctx) | ||
479 | { | ||
480 | struct CadetChannelContext *cc = channel_ctx; | ||
481 | |||
482 | if (GNUNET_YES == cc->destroying) | ||
483 | return; | ||
484 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
485 | "Received a `channel destroyed' notification from CADET, " | ||
486 | "cleaning up.\n"); | ||
487 | GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc); | ||
488 | cc->c = NULL; | ||
489 | destroy_cadet_channel_context (cc); | ||
490 | } | ||
491 | |||
492 | |||
493 | /** | ||
494 | * Iterator for defined sensors | ||
495 | * Watches sensors for readings to report | ||
496 | * | ||
497 | * @param cls unused | ||
498 | * @param key unused | ||
499 | * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information | ||
500 | * @return #GNUNET_YES to continue iterations | ||
501 | */ | ||
502 | static int | ||
503 | init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key, | ||
504 | void *value) | ||
505 | { | ||
506 | struct GNUNET_SENSOR_SensorInfo *sensor = value; | ||
507 | struct ValueReportingContext *vrc; | ||
508 | |||
509 | if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values) | ||
510 | return GNUNET_YES; | ||
511 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
512 | "Reporting sensor `%s' values to collection point `%s' every %s.\n", | ||
513 | sensor->name, GNUNET_i2s_full (sensor->collection_point), | ||
514 | GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval, | ||
515 | GNUNET_YES)); | ||
516 | vrc = GNUNET_new (struct ValueReportingContext); | ||
517 | vrc->sensor = sensor; | ||
518 | vrc->last_value = NULL; | ||
519 | vrc->last_value_size = 0; | ||
520 | vrc->wc = | ||
521 | GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name, | ||
522 | &value_watch_cb, vrc); | ||
523 | vrc->cp_task = | ||
524 | GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval, | ||
525 | &report_value, vrc); | ||
526 | GNUNET_CONTAINER_DLL_insert (vrc_head, vrc_tail, vrc); | ||
527 | return GNUNET_YES; | ||
528 | } | ||
529 | |||
530 | |||
531 | /** | ||
532 | * Start the sensor value reporting module | ||
533 | * | ||
534 | * @param c our service configuration | ||
535 | * @param sensors multihashmap of loaded sensors | ||
536 | * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise | ||
537 | */ | ||
538 | int | ||
539 | SENSOR_reporting_value_start (const struct GNUNET_CONFIGURATION_Handle *c, | ||
540 | struct GNUNET_CONTAINER_MultiHashMap *sensors) | ||
541 | { | ||
542 | static struct GNUNET_CADET_MessageHandler cadet_handlers[] = { | ||
543 | {NULL, 0, 0} | ||
544 | }; | ||
545 | |||
546 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor value reporting module.\n"); | ||
547 | GNUNET_assert (NULL != sensors); | ||
548 | cfg = c; | ||
549 | peerstore = GNUNET_PEERSTORE_connect (cfg); | ||
550 | if (NULL == peerstore) | ||
551 | { | ||
552 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
553 | _("Failed to connect to peerstore service.\n")); | ||
554 | SENSOR_reporting_value_stop (); | ||
555 | return GNUNET_SYSERR; | ||
556 | } | ||
557 | cadet = | ||
558 | GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed, | ||
559 | cadet_handlers, NULL); | ||
560 | if (NULL == cadet) | ||
561 | { | ||
562 | LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n")); | ||
563 | SENSOR_reporting_value_stop (); | ||
564 | return GNUNET_SYSERR; | ||
565 | } | ||
566 | GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid); | ||
567 | GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL); | ||
568 | return GNUNET_OK; | ||
569 | } | ||
570 | |||
571 | /* end of gnunet-service-sensor_reporting_value.c */ | ||
diff --git a/src/sensor/sensor.h b/src/sensor/sensor.h index 3cca3b0c0..5d75ee427 100644 --- a/src/sensor/sensor.h +++ b/src/sensor/sensor.h | |||
@@ -81,29 +81,10 @@ SENSOR_analysis_start (const struct GNUNET_CONFIGURATION_Handle *c, | |||
81 | 81 | ||
82 | 82 | ||
83 | /** | 83 | /** |
84 | * Stop sensor value reporting module | ||
85 | */ | ||
86 | void | ||
87 | SENSOR_reporting_value_stop (); | ||
88 | |||
89 | |||
90 | /** | ||
91 | * Start the sensor value reporting module | ||
92 | * | ||
93 | * @param c our service configuration | ||
94 | * @param sensors multihashmap of loaded sensors | ||
95 | * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise | ||
96 | */ | ||
97 | int | ||
98 | SENSOR_reporting_value_start (const struct GNUNET_CONFIGURATION_Handle *c, | ||
99 | struct GNUNET_CONTAINER_MultiHashMap *sensors); | ||
100 | |||
101 | |||
102 | /** | ||
103 | * Stop sensor anomaly reporting module | 84 | * Stop sensor anomaly reporting module |
104 | */ | 85 | */ |
105 | void | 86 | void |
106 | SENSOR_reporting_anomaly_stop (); | 87 | SENSOR_reporting_stop (); |
107 | 88 | ||
108 | /** | 89 | /** |
109 | * Used by the analysis module to tell the reporting module about a change in | 90 | * Used by the analysis module to tell the reporting module about a change in |
@@ -125,7 +106,7 @@ SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor, | |||
125 | * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise | 106 | * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise |
126 | */ | 107 | */ |
127 | int | 108 | int |
128 | SENSOR_reporting_anomaly_start (const struct GNUNET_CONFIGURATION_Handle *c, | 109 | SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c, |
129 | struct GNUNET_CONTAINER_MultiHashMap *s); | 110 | struct GNUNET_CONTAINER_MultiHashMap *s); |
130 | 111 | ||
131 | 112 | ||
diff --git a/src/sensor/test_sensors/test-sensor-statistics b/src/sensor/test_sensors/test-sensor-statistics new file mode 100644 index 000000000..578907e67 --- /dev/null +++ b/src/sensor/test_sensors/test-sensor-statistics | |||
@@ -0,0 +1,31 @@ | |||
1 | [test-sensor-statistics] | ||
2 | |||
3 | VERSION = 1.0 | ||
4 | DESCRIPTION = Test sensor for collecting data from gnunet-statistics | ||
5 | CATEGORY = GNUnet | ||
6 | ENABLED = YES | ||
7 | |||
8 | # Start and end time format: %Y-%m-%d %H:%M:%S | ||
9 | #START_TIME = | ||
10 | #END_TIME = | ||
11 | #Interval in seconds | ||
12 | INTERVAL = 60 | ||
13 | #LIFETIME = | ||
14 | |||
15 | #CAPABILITIES = | ||
16 | |||
17 | SOURCE = gnunet-statistics | ||
18 | |||
19 | GNUNET_STAT_SERVICE = test-sensor | ||
20 | GNUNET_STAT_NAME = test-statistic | ||
21 | |||
22 | #EXT_PROCESS = | ||
23 | #EXT_ARGS = | ||
24 | |||
25 | EXPECTED_DATATYPE = numeric | ||
26 | |||
27 | # Reporting: | ||
28 | #COLLECTION_POINT = NCEKA096482PC84GFTG61EHAVXY3BQDTPB5FANATQD5CDADJ2HP0 | ||
29 | # Comment or remove next line to disable reporting sensor values to collection point | ||
30 | #VALUE_COLLECTION_INTERVAL = 120 | ||
31 | REPORT_ANOMALIES = YES | ||