diff options
author | Omar Tarabai <tarabai@devegypt.com> | 2014-09-20 17:42:17 +0000 |
---|---|---|
committer | Omar Tarabai <tarabai@devegypt.com> | 2014-09-20 17:42:17 +0000 |
commit | f306e8dd6ea01dea1dbf2cbb6a49b899021281ad (patch) | |
tree | 235626e45f4d9d4a259859680347b40dcd47e15d /src/sensor | |
parent | 0c50476ae0a91d23c71ab8f3dddd7feb9ccc2397 (diff) | |
download | gnunet-f306e8dd6ea01dea1dbf2cbb6a49b899021281ad.tar.gz gnunet-f306e8dd6ea01dea1dbf2cbb6a49b899021281ad.zip |
sensor: retry reporting to collection point if fails
Diffstat (limited to 'src/sensor')
-rw-r--r-- | src/sensor/gnunet-service-sensor_reporting.c | 179 |
1 files changed, 176 insertions, 3 deletions
diff --git a/src/sensor/gnunet-service-sensor_reporting.c b/src/sensor/gnunet-service-sensor_reporting.c index 104c66945..3b2c08e42 100644 --- a/src/sensor/gnunet-service-sensor_reporting.c +++ b/src/sensor/gnunet-service-sensor_reporting.c | |||
@@ -33,6 +33,11 @@ | |||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__) |
35 | 35 | ||
36 | /** | ||
37 | * Retry time when failing to connect to collection point | ||
38 | */ | ||
39 | #define CP_RETRY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1) | ||
40 | |||
36 | 41 | ||
37 | /** | 42 | /** |
38 | * When we are still generating a proof-of-work and we need to send an anomaly | 43 | * When we are still generating a proof-of-work and we need to send an anomaly |
@@ -225,6 +230,16 @@ struct CadetPeer | |||
225 | struct GNUNET_MQ_Handle *mq; | 230 | struct GNUNET_MQ_Handle *mq; |
226 | 231 | ||
227 | /** | 232 | /** |
233 | * CADET transmit handle | ||
234 | */ | ||
235 | struct GNUNET_CADET_TransmitHandle *th; | ||
236 | |||
237 | /** | ||
238 | * Task used to try reconnection to collection point after failure | ||
239 | */ | ||
240 | GNUNET_SCHEDULER_TaskIdentifier reconnect_task; | ||
241 | |||
242 | /** | ||
228 | * Are we currently destroying the channel and its context? | 243 | * Are we currently destroying the channel and its context? |
229 | */ | 244 | */ |
230 | int destroying; | 245 | int destroying; |
@@ -324,6 +339,13 @@ static long long unsigned int pow_matching_bits; | |||
324 | 339 | ||
325 | 340 | ||
326 | 341 | ||
342 | /** | ||
343 | * Try reconnecting to collection point and send last queued message | ||
344 | */ | ||
345 | static void | ||
346 | cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
347 | |||
348 | |||
327 | /******************************************************************************/ | 349 | /******************************************************************************/ |
328 | /****************************** CLEANUP ******************************/ | 350 | /****************************** CLEANUP ******************************/ |
329 | /******************************************************************************/ | 351 | /******************************************************************************/ |
@@ -428,6 +450,11 @@ static void | |||
428 | destroy_cadet_peer (struct CadetPeer *cadetp) | 450 | destroy_cadet_peer (struct CadetPeer *cadetp) |
429 | { | 451 | { |
430 | cadetp->destroying = GNUNET_YES; | 452 | cadetp->destroying = GNUNET_YES; |
453 | if (GNUNET_SCHEDULER_NO_TASK != cadetp->reconnect_task) | ||
454 | { | ||
455 | GNUNET_SCHEDULER_cancel (cadetp->reconnect_task); | ||
456 | cadetp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
457 | } | ||
431 | if (NULL != cadetp->mq) | 458 | if (NULL != cadetp->mq) |
432 | { | 459 | { |
433 | GNUNET_MQ_destroy (cadetp->mq); | 460 | GNUNET_MQ_destroy (cadetp->mq); |
@@ -536,6 +563,142 @@ get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor) | |||
536 | 563 | ||
537 | 564 | ||
538 | /** | 565 | /** |
566 | * Function called to notify a client about the connection | ||
567 | * begin ready to queue more data. "buf" will be | ||
568 | * NULL and "size" zero if the connection was closed for | ||
569 | * writing in the meantime. | ||
570 | * | ||
571 | * @param cls closure | ||
572 | * @param size number of bytes available in buf | ||
573 | * @param buf where the callee should write the message | ||
574 | * @return number of bytes written to buf | ||
575 | */ | ||
576 | static size_t | ||
577 | cp_mq_ntr (void *cls, size_t size, void *buf) | ||
578 | { | ||
579 | struct CadetPeer *cadetp = cls; | ||
580 | const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (cadetp->mq); | ||
581 | uint16_t msize; | ||
582 | |||
583 | LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_ntr()\n"); | ||
584 | cadetp->th = NULL; | ||
585 | if (NULL == buf) | ||
586 | { | ||
587 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
588 | "Sending anomaly report to collection point failed." | ||
589 | " Retrying connection in %s.\n", | ||
590 | GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO)); | ||
591 | cadetp->reconnect_task = | ||
592 | GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp); | ||
593 | return 0; | ||
594 | } | ||
595 | msize = ntohs (msg->size); | ||
596 | GNUNET_assert (msize <= size); | ||
597 | memcpy (buf, msg, msize); | ||
598 | GNUNET_MQ_impl_send_continue (cadetp->mq); | ||
599 | return msize; | ||
600 | } | ||
601 | |||
602 | |||
603 | /** | ||
604 | * Try reconnecting to collection point and send last queued message | ||
605 | */ | ||
606 | static void | ||
607 | cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
608 | { | ||
609 | struct CadetPeer *cadetp = cls; | ||
610 | const struct GNUNET_MessageHeader *msg; | ||
611 | |||
612 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
613 | "Retrying connection to collection point `%s'.\n", | ||
614 | GNUNET_i2s (&cadetp->peer_id)); | ||
615 | cadetp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
616 | GNUNET_assert (NULL == cadetp->channel); | ||
617 | cadetp->channel = | ||
618 | GNUNET_CADET_channel_create (cadet, cadetp, &cadetp->peer_id, | ||
619 | GNUNET_APPLICATION_TYPE_SENSORDASHBOARD, | ||
620 | GNUNET_CADET_OPTION_RELIABLE); | ||
621 | msg = GNUNET_MQ_impl_current (cadetp->mq); | ||
622 | cadetp->th = | ||
623 | GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO, | ||
624 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
625 | ntohs (msg->size), cp_mq_ntr, cadetp); | ||
626 | } | ||
627 | |||
628 | |||
629 | /** | ||
630 | * Signature of functions implementing the | ||
631 | * sending functionality of a message queue. | ||
632 | * | ||
633 | * @param mq the message queue | ||
634 | * @param msg the message to send | ||
635 | * @param impl_state state of the implementation | ||
636 | */ | ||
637 | static void | ||
638 | cp_mq_send_impl (struct GNUNET_MQ_Handle *mq, | ||
639 | const struct GNUNET_MessageHeader *msg, void *impl_state) | ||
640 | { | ||
641 | struct CadetPeer *cadetp = impl_state; | ||
642 | |||
643 | LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_send_impl()\n"); | ||
644 | GNUNET_assert (NULL == cadetp->th); | ||
645 | if (NULL == cadetp->channel) | ||
646 | { | ||
647 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
648 | "Sending anomaly report to collection point failed." | ||
649 | " Retrying connection in %s.\n", | ||
650 | GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO)); | ||
651 | cadetp->reconnect_task = | ||
652 | GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp); | ||
653 | return; | ||
654 | } | ||
655 | cadetp->th = | ||
656 | GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO, | ||
657 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
658 | ntohs (msg->size), cp_mq_ntr, cadetp); | ||
659 | } | ||
660 | |||
661 | |||
662 | /** | ||
663 | * Signature of functions implementing the | ||
664 | * destruction of a message queue. | ||
665 | * Implementations must not free 'mq', but should | ||
666 | * take care of 'impl_state'. | ||
667 | * | ||
668 | * @param mq the message queue to destroy | ||
669 | * @param impl_state state of the implementation | ||
670 | */ | ||
671 | static void | ||
672 | cp_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) | ||
673 | { | ||
674 | struct CadetPeer *cp = impl_state; | ||
675 | |||
676 | LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_destroy_impl()\n"); | ||
677 | if (NULL != cp->th) | ||
678 | { | ||
679 | GNUNET_CADET_notify_transmit_ready_cancel (cp->th); | ||
680 | cp->th = NULL; | ||
681 | } | ||
682 | } | ||
683 | |||
684 | |||
685 | /** | ||
686 | * Create the message queue used to send messages to a collection point. | ||
687 | * This will be used to make sure that the message are queued even if the | ||
688 | * connection to the collection point can not be established at the moment. | ||
689 | * | ||
690 | * @param cp CadetPeer information struct | ||
691 | * @return Message queue handle | ||
692 | */ | ||
693 | static struct GNUNET_MQ_Handle * | ||
694 | cp_mq_create (struct CadetPeer *cp) | ||
695 | { | ||
696 | return GNUNET_MQ_queue_for_callbacks (cp_mq_send_impl, cp_mq_destroy_impl, | ||
697 | NULL, cp, NULL, NULL, NULL); | ||
698 | } | ||
699 | |||
700 | |||
701 | /** | ||
539 | * Returns context of a connected CADET peer. | 702 | * Returns context of a connected CADET peer. |
540 | * Creates it first if didn't exist before. | 703 | * Creates it first if didn't exist before. |
541 | * | 704 | * |
@@ -563,7 +726,8 @@ get_cadet_peer (struct GNUNET_PeerIdentity pid) | |||
563 | GNUNET_CADET_channel_create (cadet, cadetp, &pid, | 726 | GNUNET_CADET_channel_create (cadet, cadetp, &pid, |
564 | GNUNET_APPLICATION_TYPE_SENSORDASHBOARD, | 727 | GNUNET_APPLICATION_TYPE_SENSORDASHBOARD, |
565 | GNUNET_CADET_OPTION_RELIABLE); | 728 | GNUNET_CADET_OPTION_RELIABLE); |
566 | cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel); | 729 | cadetp->mq = cp_mq_create (cadetp); |
730 | cadetp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
567 | GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp); | 731 | GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp); |
568 | return cadetp; | 732 | return cadetp; |
569 | } | 733 | } |
@@ -1009,9 +1173,12 @@ cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel, | |||
1009 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1173 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1010 | "CADET channel was destroyed by remote peer `%s' or failed to start.\n", | 1174 | "CADET channel was destroyed by remote peer `%s' or failed to start.\n", |
1011 | GNUNET_i2s (&cadetp->peer_id)); | 1175 | GNUNET_i2s (&cadetp->peer_id)); |
1012 | GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp); | 1176 | if (NULL != cadetp->th) |
1177 | { | ||
1178 | GNUNET_CADET_notify_transmit_ready_cancel (cadetp->th); | ||
1179 | cadetp->th = NULL; | ||
1180 | } | ||
1013 | cadetp->channel = NULL; | 1181 | cadetp->channel = NULL; |
1014 | destroy_cadet_peer (cadetp); | ||
1015 | } | 1182 | } |
1016 | 1183 | ||
1017 | 1184 | ||
@@ -1098,6 +1265,12 @@ report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1098 | "Now trying to report last seen value of `%s' to collection point.\n", | 1265 | "Now trying to report last seen value of `%s' to collection point.\n", |
1099 | sensor->name); | 1266 | sensor->name); |
1100 | cadetp = get_cadet_peer (*sensor->collection_point); | 1267 | cadetp = get_cadet_peer (*sensor->collection_point); |
1268 | if (NULL == cadetp->channel) | ||
1269 | { | ||
1270 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1271 | "Trying to send value to collection point but connection failed, discarding.\n"); | ||
1272 | return; | ||
1273 | } | ||
1101 | ev = create_value_message (vi); | 1274 | ev = create_value_message (vi); |
1102 | GNUNET_MQ_send (cadetp->mq, ev); | 1275 | GNUNET_MQ_send (cadetp->mq, ev); |
1103 | vi->last_value_reported = GNUNET_YES; | 1276 | vi->last_value_reported = GNUNET_YES; |