aboutsummaryrefslogtreecommitdiff
path: root/src/sensor
diff options
context:
space:
mode:
authorOmar Tarabai <tarabai@devegypt.com>2014-09-20 17:42:17 +0000
committerOmar Tarabai <tarabai@devegypt.com>2014-09-20 17:42:17 +0000
commitf306e8dd6ea01dea1dbf2cbb6a49b899021281ad (patch)
tree235626e45f4d9d4a259859680347b40dcd47e15d /src/sensor
parent0c50476ae0a91d23c71ab8f3dddd7feb9ccc2397 (diff)
downloadgnunet-f306e8dd6ea01dea1dbf2cbb6a49b899021281ad.tar.gz
gnunet-f306e8dd6ea01dea1dbf2cbb6a49b899021281ad.zip
sensor: retry reporting to collection point if fails
Diffstat (limited to 'src/sensor')
-rw-r--r--src/sensor/gnunet-service-sensor_reporting.c179
1 files changed, 176 insertions, 3 deletions
diff --git a/src/sensor/gnunet-service-sensor_reporting.c b/src/sensor/gnunet-service-sensor_reporting.c
index 104c66945..3b2c08e42 100644
--- a/src/sensor/gnunet-service-sensor_reporting.c
+++ b/src/sensor/gnunet-service-sensor_reporting.c
@@ -33,6 +33,11 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35 35
36/**
37 * Retry time when failing to connect to collection point
38 */
39#define CP_RETRY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
40
36 41
37/** 42/**
38 * When we are still generating a proof-of-work and we need to send an anomaly 43 * When we are still generating a proof-of-work and we need to send an anomaly
@@ -225,6 +230,16 @@ struct CadetPeer
225 struct GNUNET_MQ_Handle *mq; 230 struct GNUNET_MQ_Handle *mq;
226 231
227 /** 232 /**
233 * CADET transmit handle
234 */
235 struct GNUNET_CADET_TransmitHandle *th;
236
237 /**
238 * Task used to try reconnection to collection point after failure
239 */
240 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
241
242 /**
228 * Are we currently destroying the channel and its context? 243 * Are we currently destroying the channel and its context?
229 */ 244 */
230 int destroying; 245 int destroying;
@@ -324,6 +339,13 @@ static long long unsigned int pow_matching_bits;
324 339
325 340
326 341
342/**
343 * Try reconnecting to collection point and send last queued message
344 */
345static void
346cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
347
348
327/******************************************************************************/ 349/******************************************************************************/
328/****************************** CLEANUP ******************************/ 350/****************************** CLEANUP ******************************/
329/******************************************************************************/ 351/******************************************************************************/
@@ -428,6 +450,11 @@ static void
428destroy_cadet_peer (struct CadetPeer *cadetp) 450destroy_cadet_peer (struct CadetPeer *cadetp)
429{ 451{
430 cadetp->destroying = GNUNET_YES; 452 cadetp->destroying = GNUNET_YES;
453 if (GNUNET_SCHEDULER_NO_TASK != cadetp->reconnect_task)
454 {
455 GNUNET_SCHEDULER_cancel (cadetp->reconnect_task);
456 cadetp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
457 }
431 if (NULL != cadetp->mq) 458 if (NULL != cadetp->mq)
432 { 459 {
433 GNUNET_MQ_destroy (cadetp->mq); 460 GNUNET_MQ_destroy (cadetp->mq);
@@ -536,6 +563,142 @@ get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
536 563
537 564
538/** 565/**
566 * Function called to notify a client about the connection
567 * begin ready to queue more data. "buf" will be
568 * NULL and "size" zero if the connection was closed for
569 * writing in the meantime.
570 *
571 * @param cls closure
572 * @param size number of bytes available in buf
573 * @param buf where the callee should write the message
574 * @return number of bytes written to buf
575 */
576static size_t
577cp_mq_ntr (void *cls, size_t size, void *buf)
578{
579 struct CadetPeer *cadetp = cls;
580 const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (cadetp->mq);
581 uint16_t msize;
582
583 LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_ntr()\n");
584 cadetp->th = NULL;
585 if (NULL == buf)
586 {
587 LOG (GNUNET_ERROR_TYPE_INFO,
588 "Sending anomaly report to collection point failed."
589 " Retrying connection in %s.\n",
590 GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
591 cadetp->reconnect_task =
592 GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
593 return 0;
594 }
595 msize = ntohs (msg->size);
596 GNUNET_assert (msize <= size);
597 memcpy (buf, msg, msize);
598 GNUNET_MQ_impl_send_continue (cadetp->mq);
599 return msize;
600}
601
602
603/**
604 * Try reconnecting to collection point and send last queued message
605 */
606static void
607cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
608{
609 struct CadetPeer *cadetp = cls;
610 const struct GNUNET_MessageHeader *msg;
611
612 LOG (GNUNET_ERROR_TYPE_INFO,
613 "Retrying connection to collection point `%s'.\n",
614 GNUNET_i2s (&cadetp->peer_id));
615 cadetp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
616 GNUNET_assert (NULL == cadetp->channel);
617 cadetp->channel =
618 GNUNET_CADET_channel_create (cadet, cadetp, &cadetp->peer_id,
619 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
620 GNUNET_CADET_OPTION_RELIABLE);
621 msg = GNUNET_MQ_impl_current (cadetp->mq);
622 cadetp->th =
623 GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
624 GNUNET_TIME_UNIT_FOREVER_REL,
625 ntohs (msg->size), cp_mq_ntr, cadetp);
626}
627
628
629/**
630 * Signature of functions implementing the
631 * sending functionality of a message queue.
632 *
633 * @param mq the message queue
634 * @param msg the message to send
635 * @param impl_state state of the implementation
636 */
637static void
638cp_mq_send_impl (struct GNUNET_MQ_Handle *mq,
639 const struct GNUNET_MessageHeader *msg, void *impl_state)
640{
641 struct CadetPeer *cadetp = impl_state;
642
643 LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_send_impl()\n");
644 GNUNET_assert (NULL == cadetp->th);
645 if (NULL == cadetp->channel)
646 {
647 LOG (GNUNET_ERROR_TYPE_INFO,
648 "Sending anomaly report to collection point failed."
649 " Retrying connection in %s.\n",
650 GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
651 cadetp->reconnect_task =
652 GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
653 return;
654 }
655 cadetp->th =
656 GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
657 GNUNET_TIME_UNIT_FOREVER_REL,
658 ntohs (msg->size), cp_mq_ntr, cadetp);
659}
660
661
662/**
663 * Signature of functions implementing the
664 * destruction of a message queue.
665 * Implementations must not free 'mq', but should
666 * take care of 'impl_state'.
667 *
668 * @param mq the message queue to destroy
669 * @param impl_state state of the implementation
670 */
671static void
672cp_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
673{
674 struct CadetPeer *cp = impl_state;
675
676 LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_destroy_impl()\n");
677 if (NULL != cp->th)
678 {
679 GNUNET_CADET_notify_transmit_ready_cancel (cp->th);
680 cp->th = NULL;
681 }
682}
683
684
685/**
686 * Create the message queue used to send messages to a collection point.
687 * This will be used to make sure that the message are queued even if the
688 * connection to the collection point can not be established at the moment.
689 *
690 * @param cp CadetPeer information struct
691 * @return Message queue handle
692 */
693static struct GNUNET_MQ_Handle *
694cp_mq_create (struct CadetPeer *cp)
695{
696 return GNUNET_MQ_queue_for_callbacks (cp_mq_send_impl, cp_mq_destroy_impl,
697 NULL, cp, NULL, NULL, NULL);
698}
699
700
701/**
539 * Returns context of a connected CADET peer. 702 * Returns context of a connected CADET peer.
540 * Creates it first if didn't exist before. 703 * Creates it first if didn't exist before.
541 * 704 *
@@ -563,7 +726,8 @@ get_cadet_peer (struct GNUNET_PeerIdentity pid)
563 GNUNET_CADET_channel_create (cadet, cadetp, &pid, 726 GNUNET_CADET_channel_create (cadet, cadetp, &pid,
564 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD, 727 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
565 GNUNET_CADET_OPTION_RELIABLE); 728 GNUNET_CADET_OPTION_RELIABLE);
566 cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel); 729 cadetp->mq = cp_mq_create (cadetp);
730 cadetp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
567 GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp); 731 GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
568 return cadetp; 732 return cadetp;
569} 733}
@@ -1009,9 +1173,12 @@ cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
1009 LOG (GNUNET_ERROR_TYPE_DEBUG, 1173 LOG (GNUNET_ERROR_TYPE_DEBUG,
1010 "CADET channel was destroyed by remote peer `%s' or failed to start.\n", 1174 "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
1011 GNUNET_i2s (&cadetp->peer_id)); 1175 GNUNET_i2s (&cadetp->peer_id));
1012 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp); 1176 if (NULL != cadetp->th)
1177 {
1178 GNUNET_CADET_notify_transmit_ready_cancel (cadetp->th);
1179 cadetp->th = NULL;
1180 }
1013 cadetp->channel = NULL; 1181 cadetp->channel = NULL;
1014 destroy_cadet_peer (cadetp);
1015} 1182}
1016 1183
1017 1184
@@ -1098,6 +1265,12 @@ report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1098 "Now trying to report last seen value of `%s' to collection point.\n", 1265 "Now trying to report last seen value of `%s' to collection point.\n",
1099 sensor->name); 1266 sensor->name);
1100 cadetp = get_cadet_peer (*sensor->collection_point); 1267 cadetp = get_cadet_peer (*sensor->collection_point);
1268 if (NULL == cadetp->channel)
1269 {
1270 LOG (GNUNET_ERROR_TYPE_WARNING,
1271 "Trying to send value to collection point but connection failed, discarding.\n");
1272 return;
1273 }
1101 ev = create_value_message (vi); 1274 ev = create_value_message (vi);
1102 GNUNET_MQ_send (cadetp->mq, ev); 1275 GNUNET_MQ_send (cadetp->mq, ev);
1103 vi->last_value_reported = GNUNET_YES; 1276 vi->last_value_reported = GNUNET_YES;