From 53d3397bf638156cbbedd9d8da379eb502dc9d83 Mon Sep 17 00:00:00 2001 From: Omar Tarabai Date: Tue, 12 Aug 2014 11:38:22 +0000 Subject: sensordashboard: storing received anomaly reports --- src/include/gnunet_sensor_util_lib.h | 2 +- .../gnunet-service-sensordashboard.c | 63 ++++++++++++++++++++-- 2 files changed, 60 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/include/gnunet_sensor_util_lib.h b/src/include/gnunet_sensor_util_lib.h index c5d4e6697..87ae564fa 100644 --- a/src/include/gnunet_sensor_util_lib.h +++ b/src/include/gnunet_sensor_util_lib.h @@ -325,7 +325,7 @@ struct GNUNET_SENSOR_AnomalyReportMessage /** * New anomaly status */ - uint8_t anomalous; + uint16_t anomalous; /** * Percentage of neighbors reported the same anomaly diff --git a/src/sensordashboard/gnunet-service-sensordashboard.c b/src/sensordashboard/gnunet-service-sensordashboard.c index 732b5a535..7dc1b33d6 100644 --- a/src/sensordashboard/gnunet-service-sensordashboard.c +++ b/src/sensordashboard/gnunet-service-sensordashboard.c @@ -154,9 +154,16 @@ static struct GNUNET_CADET_Handle *cadet; static struct GNUNET_PEERSTORE_Handle *peerstore; /** - * Name of this subsystem to be used for peerstore operations + * Name of the subsystem used to store sensor values received from remote peers + * in PEERSTORE */ -static char *subsystem = "sensordashboard"; +static char *values_subsystem = "sensordashboard-values"; + +/** + * Name of the subsystem used to store anomaly reports received from remote + * peers in PEERSTORE + */ +static char *anomalies_subsystem = "sensordashboard-anomalies"; /** * Head of a DLL of all connected client peers @@ -392,6 +399,51 @@ queue_msg (struct GNUNET_MessageHeader *msg, struct ClientPeerContext *cp) } +/** + * Called with any anomaly report received from a peer. + * + * Each time the function must call #GNUNET_CADET_receive_done on the channel + * in order to receive the next message. This doesn't need to be immediate: + * can be delayed if some processing is done on the message. + * + * @param cls Closure (set from #GNUNET_CADET_connect). + * @param channel Connection to the other end. + * @param channel_ctx Place to store local state associated with the channel. + * @param message The actual message. + * @return #GNUNET_OK to keep the channel open, + * #GNUNET_SYSERR to close it (signal serious error). + */ +static int +handle_anomaly_report (void *cls, struct GNUNET_CADET_Channel *channel, + void **channel_ctx, + const struct GNUNET_MessageHeader *message) +{ + struct ClientPeerContext *cp = *channel_ctx; + struct GNUNET_SENSOR_AnomalyReportMessage *anomaly_msg; + struct GNUNET_SENSOR_SensorInfo *sensor; + uint16_t anomalous; + struct GNUNET_TIME_Absolute expiry; + + anomaly_msg = (struct GNUNET_SENSOR_AnomalyReportMessage *) message; + sensor = + GNUNET_CONTAINER_multihashmap_get (sensors, + &anomaly_msg->sensorname_hash); + if (NULL == sensor) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + anomalous = ntohs (anomaly_msg->anomalous); + expiry = + (GNUNET_YES == + anomalous) ? GNUNET_TIME_UNIT_FOREVER_ABS : GNUNET_TIME_absolute_get (); + GNUNET_PEERSTORE_store (peerstore, anomalies_subsystem, &cp->peerid, + sensor->name, &anomalous, sizeof (anomalous), expiry, + GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL); + return GNUNET_OK; +} + + /** * Iterate over defined sensors, creates and sends brief sensor information to * given client peer over CADET. @@ -501,7 +553,7 @@ parse_reading_message (const struct GNUNET_MessageHeader *msg, return NULL; } if ((sensor->version_minor != ntohs (vm->sensorversion_minor)) || - (sensor->version_major != ntohs (vm->sensorversion_major))) + (sensor->version_major != ntohs (vm->sensorversion_major))) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Sensor version mismatch in reading message.\n"); @@ -558,7 +610,7 @@ handle_sensor_reading (void *cls, struct GNUNET_CADET_Channel *channel, "# Sensor name: `%s'\n" "# Timestamp: %" PRIu64 "\n" "# Value size: %" PRIu64 ".\n", GNUNET_i2s (&cp->peerid), reading->sensor->name, reading->timestamp, reading->value_size); - GNUNET_PEERSTORE_store (peerstore, subsystem, &cp->peerid, + GNUNET_PEERSTORE_store (peerstore, values_subsystem, &cp->peerid, reading->sensor->name, reading->value, reading->value_size, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, NULL, NULL); @@ -726,6 +778,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, {&handle_sensor_full_req, GNUNET_MESSAGE_TYPE_SENSOR_FULL_REQ, sizeof (struct GNUNET_MessageHeader)}, + {&handle_anomaly_report, + GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT, + sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)}, {NULL, 0, 0} }; static uint32_t cadet_ports[] = { -- cgit v1.2.3