aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/cadet_api.c
diff options
context:
space:
mode:
authorBart Polot <bart.polot+voyager@gmail.com>2017-02-02 17:16:20 +0100
committerBart Polot <bart.polot+voyager@gmail.com>2017-02-02 17:16:20 +0100
commit17047b7bcbe3f1756028058a9887416c6afab5d8 (patch)
tree58a529f577b4d0f13d74a47a460a2f7673b95d1d /src/cadet/cadet_api.c
parent761447f2122038ce821780f6de37e5355f0ee301 (diff)
downloadgnunet-17047b7bcbe3f1756028058a9887416c6afab5d8.tar.gz
gnunet-17047b7bcbe3f1756028058a9887416c6afab5d8.zip
Implement data ack in CADET MQ API
Diffstat (limited to 'src/cadet/cadet_api.c')
-rw-r--r--src/cadet/cadet_api.c54
1 files changed, 51 insertions, 3 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index 7640a924a..d3bb1abd6 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -291,6 +291,11 @@ struct GNUNET_CADET_Channel
291 struct GNUNET_MQ_Handle *mq; 291 struct GNUNET_MQ_Handle *mq;
292 292
293 /** 293 /**
294 * Task to allow mq to send more traffic.
295 */
296 struct GNUNET_SCHEDULER_Task *mq_cont;
297
298 /**
294 * Window change handler. 299 * Window change handler.
295 */ 300 */
296 GNUNET_CADET_WindowSizeEventHandler window_changes; 301 GNUNET_CADET_WindowSizeEventHandler window_changes;
@@ -630,10 +635,36 @@ remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
630} 635}
631 636
632 637
638/**
639 * Notify the application about a change in the window size (if needed).
640 *
641 * @param ch Channel to notify about.
642 */
643static void
644notify_window_size (struct GNUNET_CADET_Channel *ch)
645{
646 if (NULL != ch->window_changes)
647 {
648 ch->window_changes (ch->ctx, ch, ch->allow_send);
649 }
650}
651
633/******************************************************************************/ 652/******************************************************************************/
634/*********************** MQ API CALLBACKS ****************************/ 653/*********************** MQ API CALLBACKS ****************************/
635/******************************************************************************/ 654/******************************************************************************/
636 655
656/**
657 * Allow the MQ implementation to send the next message.
658 *
659 * @param cls Closure (channel whose mq to activate).
660 */
661static void
662cadet_mq_send_continue (void *cls)
663{
664 struct GNUNET_CADET_Channel *ch = cls;
665
666 GNUNET_MQ_impl_send_continue (ch->mq);
667}
637 668
638/** 669/**
639 * Implement sending functionality of a message queue for 670 * Implement sending functionality of a message queue for
@@ -680,7 +711,14 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
680 msg); 711 msg);
681 cadet_msg->ccn = ch->ccn; 712 cadet_msg->ccn = ch->ccn;
682 GNUNET_MQ_send (h->mq, env); 713 GNUNET_MQ_send (h->mq, env);
683 GNUNET_MQ_impl_send_continue (mq); 714
715 GNUNET_assert (0 < ch->allow_send);
716 ch->allow_send--;
717 notify_window_size (ch);
718 if (0 < ch->allow_send)
719 {
720 ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
721 } /* Otherwise it will be called upon ACK receipt. */
684} 722}
685 723
686 724
@@ -1012,8 +1050,6 @@ handle_local_data (void *cls,
1012 * 1050 *
1013 * @param h Cadet handle. 1051 * @param h Cadet handle.
1014 * @param message Message itself. 1052 * @param message Message itself.
1015 *
1016 * FIXME either delete or port to MQ
1017 */ 1053 */
1018static void 1054static void
1019handle_local_ack (void *cls, 1055handle_local_ack (void *cls,
@@ -1038,6 +1074,18 @@ handle_local_ack (void *cls,
1038 "Got an ACK on channel %X, allow send now %u!\n", 1074 "Got an ACK on channel %X, allow send now %u!\n",
1039 ntohl (ch->ccn.channel_of_client), 1075 ntohl (ch->ccn.channel_of_client),
1040 ch->allow_send); 1076 ch->allow_send);
1077 if (NULL != ch->mq)
1078 {
1079 notify_window_size (ch);
1080 if (1 == ch->allow_send)
1081 {
1082 ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
1083 }
1084 return;
1085 }
1086
1087 /** @deprecated */
1088 /* Old style API */
1041 for (th = h->th_head; NULL != th; th = th->next) 1089 for (th = h->th_head; NULL != th; th = th->next)
1042 { 1090 {
1043 if ( (th->channel == ch) && 1091 if ( (th->channel == ch) &&