aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet-new_channel.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-01-24 21:00:23 +0100
committerChristian Grothoff <christian@grothoff.org>2017-01-24 21:00:23 +0100
commit8ed6d64262665ba9ce306823f569213feabba669 (patch)
treec40c8f1843c25a79e392a9766e79f1cb45eabb60 /src/cadet/gnunet-service-cadet-new_channel.c
parent3bdda27bbeb0b69bd2c5a73022b237ae0342e9c1 (diff)
downloadgnunet-8ed6d64262665ba9ce306823f569213feabba669.tar.gz
gnunet-8ed6d64262665ba9ce306823f569213feabba669.zip
fix client-client loopback flow control
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_channel.c')
-rw-r--r--src/cadet/gnunet-service-cadet-new_channel.c316
1 files changed, 192 insertions, 124 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c
index 74aafe5a1..98cfa8383 100644
--- a/src/cadet/gnunet-service-cadet-new_channel.c
+++ b/src/cadet/gnunet-service-cadet-new_channel.c
@@ -25,6 +25,8 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 * 26 *
27 * TODO: 27 * TODO:
28 * - FIXME: send ACKs back to loopback clients!
29 *
28 * - introduce shutdown so we can have half-closed channels, modify 30 * - introduce shutdown so we can have half-closed channels, modify
29 * destroy to include MID to have FIN-ACK equivalents, etc. 31 * destroy to include MID to have FIN-ACK equivalents, etc.
30 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! 32 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
@@ -160,6 +162,46 @@ struct CadetOutOfOrderMessage
160 162
161 163
162/** 164/**
165 * Client endpoint of a `struct CadetChannel`. A channel may be a
166 * loopback channel, in which case it has two of these endpoints.
167 * Note that flow control also is required in both directions.
168 */
169struct CadetChannelClient
170{
171 /**
172 * Client handle. Not by itself sufficient to designate
173 * the client endpoint, as the same client handle may
174 * be used for both the owner and the destination, and
175 * we thus also need the channel ID to identify the client.
176 */
177 struct CadetClient *c;
178
179 /**
180 * Head of DLL of messages received out of order or while client was unready.
181 */
182 struct CadetOutOfOrderMessage *head_recv;
183
184 /**
185 * Tail DLL of messages received out of order or while client was unready.
186 */
187 struct CadetOutOfOrderMessage *tail_recv;
188
189 /**
190 * Local tunnel number for this client.
191 * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
192 * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
193 */
194 struct GNUNET_CADET_ClientChannelNumber ccn;
195
196 /**
197 * Can we send data to the client?
198 */
199 int client_ready;
200
201};
202
203
204/**
163 * Struct containing all information regarding a channel to a remote client. 205 * Struct containing all information regarding a channel to a remote client.
164 */ 206 */
165struct CadetChannel 207struct CadetChannel
@@ -173,13 +215,13 @@ struct CadetChannel
173 * Client owner of the tunnel, if any. 215 * Client owner of the tunnel, if any.
174 * (Used if this channel represends the initiating end of the tunnel.) 216 * (Used if this channel represends the initiating end of the tunnel.)
175 */ 217 */
176 struct CadetClient *owner; 218 struct CadetChannelClient *owner;
177 219
178 /** 220 /**
179 * Client destination of the tunnel, if any. 221 * Client destination of the tunnel, if any.
180 * (Used if this channel represents the listening end of the tunnel.) 222 * (Used if this channel represents the listening end of the tunnel.)
181 */ 223 */
182 struct CadetClient *dest; 224 struct CadetChannelClient *dest;
183 225
184 /** 226 /**
185 * Last entry in the tunnel's queue relating to control messages 227 * Last entry in the tunnel's queue relating to control messages
@@ -200,16 +242,6 @@ struct CadetChannel
200 struct CadetReliableMessage *tail_sent; 242 struct CadetReliableMessage *tail_sent;
201 243
202 /** 244 /**
203 * Head of DLL of messages received out of order or while client was unready.
204 */
205 struct CadetOutOfOrderMessage *head_recv;
206
207 /**
208 * Tail DLL of messages received out of order or while client was unready.
209 */
210 struct CadetOutOfOrderMessage *tail_recv;
211
212 /**
213 * Task to resend/poll in case no ACK is received. 245 * Task to resend/poll in case no ACK is received.
214 */ 246 */
215 struct GNUNET_SCHEDULER_Task *retry_control_task; 247 struct GNUNET_SCHEDULER_Task *retry_control_task;
@@ -271,28 +303,11 @@ struct CadetChannel
271 struct GNUNET_CADET_ChannelTunnelNumber ctn; 303 struct GNUNET_CADET_ChannelTunnelNumber ctn;
272 304
273 /** 305 /**
274 * Local tunnel number for local client @e owner owning the channel.
275 * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
276 */
277 struct GNUNET_CADET_ClientChannelNumber ccn_owner;
278
279 /**
280 * Local tunnel number for local client @e dest owning the channel.
281 * (< #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
282 */
283 struct GNUNET_CADET_ClientChannelNumber ccn_dest;
284
285 /**
286 * Channel state. 306 * Channel state.
287 */ 307 */
288 enum CadetChannelState state; 308 enum CadetChannelState state;
289 309
290 /** 310 /**
291 * Can we send data to the client?
292 */
293 int client_ready;
294
295 /**
296 * Is the tunnel bufferless (minimum latency)? 311 * Is the tunnel bufferless (minimum latency)?
297 */ 312 */
298 int nobuffer; 313 int nobuffer;
@@ -342,8 +357,8 @@ GCCH_2s (const struct CadetChannel *ch)
342 : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))), 357 : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
343 GNUNET_h2s (&ch->port), 358 GNUNET_h2s (&ch->port),
344 ch->ctn, 359 ch->ctn,
345 ntohl (ch->ccn_owner.channel_of_client), 360 (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
346 ntohl (ch->ccn_dest.channel_of_client)); 361 (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
347 return buf; 362 return buf;
348} 363}
349 364
@@ -363,6 +378,28 @@ GCCH_get_id (const struct CadetChannel *ch)
363 378
364 379
365/** 380/**
381 * Release memory associated with @a ccc
382 *
383 * @param ccc data structure to clean up
384 */
385static void
386free_channel_client (struct CadetChannelClient *ccc)
387{
388 struct CadetOutOfOrderMessage *com;
389
390 while (NULL != (com = ccc->head_recv))
391 {
392 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
393 ccc->tail_recv,
394 com);
395 GNUNET_MQ_discard (com->env);
396 GNUNET_free (com);
397 }
398 GNUNET_free (ccc);
399}
400
401
402/**
366 * Destroy the given channel. 403 * Destroy the given channel.
367 * 404 *
368 * @param ch channel to destroy 405 * @param ch channel to destroy
@@ -371,7 +408,6 @@ static void
371channel_destroy (struct CadetChannel *ch) 408channel_destroy (struct CadetChannel *ch)
372{ 409{
373 struct CadetReliableMessage *crm; 410 struct CadetReliableMessage *crm;
374 struct CadetOutOfOrderMessage *com;
375 411
376 while (NULL != (crm = ch->head_sent)) 412 while (NULL != (crm = ch->head_sent))
377 { 413 {
@@ -386,13 +422,15 @@ channel_destroy (struct CadetChannel *ch)
386 crm); 422 crm);
387 GNUNET_free (crm); 423 GNUNET_free (crm);
388 } 424 }
389 while (NULL != (com = ch->head_recv)) 425 if (NULL != ch->owner)
390 { 426 {
391 GNUNET_CONTAINER_DLL_remove (ch->head_recv, 427 free_channel_client (ch->owner);
392 ch->tail_recv, 428 ch->owner = NULL;
393 com); 429 }
394 GNUNET_MQ_discard (com->env); 430 if (NULL != ch->dest)
395 GNUNET_free (com); 431 {
432 free_channel_client (ch->dest);
433 ch->dest = NULL;
396 } 434 }
397 if (NULL != ch->last_control_qe) 435 if (NULL != ch->last_control_qe)
398 { 436 {
@@ -444,7 +482,7 @@ channel_open_sent_cb (void *cls)
444 ch->last_control_qe = NULL; 482 ch->last_control_qe = NULL;
445 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); 483 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
446 LOG (GNUNET_ERROR_TYPE_DEBUG, 484 LOG (GNUNET_ERROR_TYPE_DEBUG,
447 "Sent CHANNEL_OPEN on %s, retrying in %s\n", 485 "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
448 GCCH_2s (ch), 486 GCCH_2s (ch),
449 GNUNET_STRINGS_relative_time_to_string (ch->retry_time, 487 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
450 GNUNET_YES)); 488 GNUNET_YES));
@@ -532,23 +570,30 @@ GCCH_channel_local_new (struct CadetClient *owner,
532 uint32_t options) 570 uint32_t options)
533{ 571{
534 struct CadetChannel *ch; 572 struct CadetChannel *ch;
573 struct CadetChannelClient *ccco;
574
575 ccco = GNUNET_new (struct CadetChannelClient);
576 ccco->c = owner;
577 ccco->ccn = ccn;
578 ccco->client_ready = GNUNET_YES;
535 579
536 ch = GNUNET_new (struct CadetChannel); 580 ch = GNUNET_new (struct CadetChannel);
537 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); 581 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
538 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); 582 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
539 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); 583 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
540 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ 584 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
541 ch->owner = owner; 585 ch->owner = ccco;
542 ch->ccn_owner = ccn;
543 ch->port = *port; 586 ch->port = *port;
544 if (0 == memcmp (&my_full_id, 587 if (0 == memcmp (&my_full_id,
545 GCP_get_id (destination), 588 GCP_get_id (destination),
546 sizeof (struct GNUNET_PeerIdentity))) 589 sizeof (struct GNUNET_PeerIdentity)))
547 { 590 {
591 struct CadetClient *c;
592
548 ch->is_loopback = GNUNET_YES; 593 ch->is_loopback = GNUNET_YES;
549 ch->dest = GNUNET_CONTAINER_multihashmap_get (open_ports, 594 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
550 port); 595 port);
551 if (NULL == ch->dest) 596 if (NULL == c)
552 { 597 {
553 /* port closed, wait for it to possibly open */ 598 /* port closed, wait for it to possibly open */
554 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels, 599 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
@@ -561,8 +606,11 @@ GCCH_channel_local_new (struct CadetClient *owner,
561 } 606 }
562 else 607 else
563 { 608 {
609 ch->dest = GNUNET_new (struct CadetChannelClient);
610 ch->dest->c = c;
611 ch->dest->client_ready = GNUNET_YES;
564 GCCH_bind (ch, 612 GCCH_bind (ch,
565 ch->dest); 613 ch->dest->c);
566 } 614 }
567 } 615 }
568 else 616 else
@@ -786,20 +834,18 @@ send_ack_to_client (struct CadetChannel *ch,
786{ 834{
787 struct GNUNET_MQ_Envelope *env; 835 struct GNUNET_MQ_Envelope *env;
788 struct GNUNET_CADET_LocalAck *ack; 836 struct GNUNET_CADET_LocalAck *ack;
789 struct CadetClient *c; 837 struct CadetChannelClient *ccc;
790 838
791 env = GNUNET_MQ_msg (ack, 839 env = GNUNET_MQ_msg (ack,
792 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); 840 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
793 ack->ccn = (GNUNET_YES == to_owner) ? ch->ccn_owner : ch->ccn_dest; 841 ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
794 c = (GNUNET_YES == to_owner) 842 ack->ccn = ccc->ccn;
795 ? ch->owner
796 : ch->dest;
797 LOG (GNUNET_ERROR_TYPE_DEBUG, 843 LOG (GNUNET_ERROR_TYPE_DEBUG,
798 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n", 844 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n",
799 GSC_2s (c), 845 GSC_2s (ccc->c),
800 (GNUNET_YES == to_owner) ? "owner" : "dest", 846 (GNUNET_YES == to_owner) ? "owner" : "dest",
801 ntohl (ack->ccn.channel_of_client)); 847 ntohl (ack->ccn.channel_of_client));
802 GSC_send_to_client (c, 848 GSC_send_to_client (ccc->c,
803 env); 849 env);
804} 850}
805 851
@@ -817,6 +863,7 @@ GCCH_bind (struct CadetChannel *ch,
817 struct CadetClient *c) 863 struct CadetClient *c)
818{ 864{
819 uint32_t options; 865 uint32_t options;
866 struct CadetChannelClient *cccd;
820 867
821 LOG (GNUNET_ERROR_TYPE_DEBUG, 868 LOG (GNUNET_ERROR_TYPE_DEBUG,
822 "Binding %s from %s to port %s of %s\n", 869 "Binding %s from %s to port %s of %s\n",
@@ -837,16 +884,19 @@ GCCH_bind (struct CadetChannel *ch,
837 options |= GNUNET_CADET_OPTION_RELIABLE; 884 options |= GNUNET_CADET_OPTION_RELIABLE;
838 if (ch->out_of_order) 885 if (ch->out_of_order)
839 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; 886 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
840 ch->dest = c; 887 cccd = GNUNET_new (struct CadetChannelClient);
841 ch->ccn_dest = GSC_bind (c, 888 ch->dest = cccd;
842 ch, 889 cccd->c = c;
843 (GNUNET_YES == ch->is_loopback) 890 cccd->client_ready = GNUNET_YES;
844 ? GCP_get (&my_full_id, 891 cccd->ccn = GSC_bind (c,
845 GNUNET_YES) 892 ch,
846 : GCT_get_destination (ch->t), 893 (GNUNET_YES == ch->is_loopback)
847 &ch->port, 894 ? GCP_get (&my_full_id,
848 options); 895 GNUNET_YES)
849 GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < 896 : GCT_get_destination (ch->t),
897 &ch->port,
898 options);
899 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
850 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); 900 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
851 ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */ 901 ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
852 if (GNUNET_YES == ch->is_loopback) 902 if (GNUNET_YES == ch->is_loopback)
@@ -862,7 +912,7 @@ GCCH_bind (struct CadetChannel *ch,
862 ch); 912 ch);
863 } 913 }
864 /* give client it's initial supply of ACKs */ 914 /* give client it's initial supply of ACKs */
865 GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < 915 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
866 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); 916 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
867 for (unsigned int i=0;i<ch->max_pending_messages;i++) 917 for (unsigned int i=0;i<ch->max_pending_messages;i++)
868 send_ack_to_client (ch, 918 send_ack_to_client (ch,
@@ -876,22 +926,37 @@ GCCH_bind (struct CadetChannel *ch,
876 * 926 *
877 * @param ch channel to destroy 927 * @param ch channel to destroy
878 * @param c client that caused the destruction 928 * @param c client that caused the destruction
929 * @param ccn client number of the client @a c
879 */ 930 */
880void 931void
881GCCH_channel_local_destroy (struct CadetChannel *ch, 932GCCH_channel_local_destroy (struct CadetChannel *ch,
882 struct CadetClient *c) 933 struct CadetClient *c,
934 struct GNUNET_CADET_ClientChannelNumber ccn)
883{ 935{
884 LOG (GNUNET_ERROR_TYPE_DEBUG, 936 LOG (GNUNET_ERROR_TYPE_DEBUG,
885 "%s asks for destruction of %s\n", 937 "%s asks for destruction of %s\n",
886 GSC_2s (c), 938 GSC_2s (c),
887 GCCH_2s (ch)); 939 GCCH_2s (ch));
888 GNUNET_assert (NULL != c); 940 GNUNET_assert (NULL != c);
889 if (c == ch->owner) 941 if ( (NULL != ch->owner) &&
942 (c == ch->owner->c) &&
943 (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
944 {
945 free_channel_client (ch->owner);
890 ch->owner = NULL; 946 ch->owner = NULL;
891 else if (c == ch->dest) 947 }
948 else if ( (NULL != ch->dest) &&
949 (c == ch->dest->c) &&
950 (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
951 {
952 free_channel_client (ch->dest);
892 ch->dest = NULL; 953 ch->dest = NULL;
954 }
893 else 955 else
956 {
894 GNUNET_assert (0); 957 GNUNET_assert (0);
958 }
959
895 if (GNUNET_YES == ch->destroy) 960 if (GNUNET_YES == ch->destroy)
896 { 961 {
897 /* other end already destroyed, with the local client gone, no need 962 /* other end already destroyed, with the local client gone, no need
@@ -1008,6 +1073,7 @@ is_before (void *cls,
1008 * and send an ACK to the other end (once flow control allows it!) 1073 * and send an ACK to the other end (once flow control allows it!)
1009 * 1074 *
1010 * @param ch channel that got data 1075 * @param ch channel that got data
1076 * @param msg message that was received
1011 */ 1077 */
1012void 1078void
1013GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, 1079GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
@@ -1015,6 +1081,7 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1015{ 1081{
1016 struct GNUNET_MQ_Envelope *env; 1082 struct GNUNET_MQ_Envelope *env;
1017 struct GNUNET_CADET_LocalData *ld; 1083 struct GNUNET_CADET_LocalData *ld;
1084 struct CadetChannelClient *ccc;
1018 struct CadetOutOfOrderMessage *com; 1085 struct CadetOutOfOrderMessage *com;
1019 size_t payload_size; 1086 size_t payload_size;
1020 1087
@@ -1023,11 +1090,12 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1023 env = GNUNET_MQ_msg_extra (ld, 1090 env = GNUNET_MQ_msg_extra (ld,
1024 payload_size, 1091 payload_size,
1025 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); 1092 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1026 ld->ccn = (NULL == ch->dest) ? ch->ccn_owner : ch->ccn_dest; 1093 ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1027 GNUNET_memcpy (&ld[1], 1094 GNUNET_memcpy (&ld[1],
1028 &msg[1], 1095 &msg[1],
1029 payload_size); 1096 payload_size);
1030 if ( (GNUNET_YES == ch->client_ready) && 1097 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1098 if ( (GNUNET_YES == ccc->client_ready) &&
1031 ( (GNUNET_YES == ch->out_of_order) || 1099 ( (GNUNET_YES == ch->out_of_order) ||
1032 (msg->mid.mid == ch->mid_recv.mid) ) ) 1100 (msg->mid.mid == ch->mid_recv.mid) ) )
1033 { 1101 {
@@ -1035,8 +1103,9 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1035 "Giving %u bytes of payload from %s to client %s\n", 1103 "Giving %u bytes of payload from %s to client %s\n",
1036 (unsigned int) payload_size, 1104 (unsigned int) payload_size,
1037 GCCH_2s (ch), 1105 GCCH_2s (ch),
1038 GSC_2s (ch->owner ? ch->owner : ch->dest)); 1106 GSC_2s (ccc->c));
1039 GSC_send_to_client (ch->owner ? ch->owner : ch->dest, 1107 ccc->client_ready = GNUNET_NO;
1108 GSC_send_to_client (ccc->c,
1040 env); 1109 env);
1041 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); 1110 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1042 ch->mid_futures >>= 1; 1111 ch->mid_futures >>= 1;
@@ -1047,7 +1116,7 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1047 drop it (can't buffer too much!) */ 1116 drop it (can't buffer too much!) */
1048 LOG (GNUNET_ERROR_TYPE_DEBUG, 1117 LOG (GNUNET_ERROR_TYPE_DEBUG,
1049 "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n", 1118 "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n",
1050 (GNUNET_YES == ch->client_ready) 1119 (GNUNET_YES == ccc->client_ready)
1051 ? "out-of-order" 1120 ? "out-of-order"
1052 : "client-not-ready", 1121 : "client-not-ready",
1053 (unsigned int) payload_size, 1122 (unsigned int) payload_size,
@@ -1059,36 +1128,36 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1059 com->mid = msg->mid; 1128 com->mid = msg->mid;
1060 com->env = env; 1129 com->env = env;
1061 /* sort into list ordered by "is_before" */ 1130 /* sort into list ordered by "is_before" */
1062 if ( (NULL == ch->head_recv) || 1131 if ( (NULL == ccc->head_recv) ||
1063 (GNUNET_YES == is_before (ch, 1132 (GNUNET_YES == is_before (ch,
1064 com, 1133 com,
1065 ch->head_recv)) ) 1134 ccc->head_recv)) )
1066 { 1135 {
1067 GNUNET_CONTAINER_DLL_insert (ch->head_recv, 1136 GNUNET_CONTAINER_DLL_insert (ccc->head_recv,
1068 ch->tail_recv, 1137 ccc->tail_recv,
1069 com); 1138 com);
1070 } 1139 }
1071 else 1140 else
1072 { 1141 {
1073 struct CadetOutOfOrderMessage *pos; 1142 struct CadetOutOfOrderMessage *pos;
1074 1143
1075 for (pos = ch->head_recv; 1144 for (pos = ccc->head_recv;
1076 NULL != pos; 1145 NULL != pos;
1077 pos = pos->next) 1146 pos = pos->next)
1078 { 1147 {
1079 if (GNUNET_YES != 1148 if (GNUNET_YES !=
1080 is_before (ch, 1149 is_before (NULL,
1081 pos, 1150 pos,
1082 com)) 1151 com))
1083 break; 1152 break;
1084 } 1153 }
1085 if (NULL == pos) 1154 if (NULL == pos)
1086 GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv, 1155 GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv,
1087 ch->tail_recv, 1156 ccc->tail_recv,
1088 com); 1157 com);
1089 else 1158 else
1090 GNUNET_CONTAINER_DLL_insert_after (ch->head_recv, 1159 GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv,
1091 ch->tail_recv, 1160 ccc->tail_recv,
1092 com, 1161 com,
1093 pos->prev); 1162 pos->prev);
1094 } 1163 }
@@ -1166,6 +1235,8 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1166void 1235void
1167GCCH_handle_remote_destroy (struct CadetChannel *ch) 1236GCCH_handle_remote_destroy (struct CadetChannel *ch)
1168{ 1237{
1238 struct CadetChannelClient *ccc;
1239
1169 GNUNET_assert (GNUNET_NO == ch->is_loopback); 1240 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1170 LOG (GNUNET_ERROR_TYPE_DEBUG, 1241 LOG (GNUNET_ERROR_TYPE_DEBUG,
1171 "Received remote channel DESTROY for %s\n", 1242 "Received remote channel DESTROY for %s\n",
@@ -1176,7 +1247,8 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch)
1176 channel_destroy (ch); 1247 channel_destroy (ch);
1177 return; 1248 return;
1178 } 1249 }
1179 if (NULL != ch->head_recv) 1250 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1251 if (NULL != ccc->head_recv)
1180 { 1252 {
1181 LOG (GNUNET_ERROR_TYPE_WARNING, 1253 LOG (GNUNET_ERROR_TYPE_WARNING,
1182 "Lost end of transmission due to remote shutdown on %s\n", 1254 "Lost end of transmission due to remote shutdown on %s\n",
@@ -1184,8 +1256,8 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch)
1184 /* FIXME: change API to notify client about truncated transmission! */ 1256 /* FIXME: change API to notify client about truncated transmission! */
1185 } 1257 }
1186 ch->destroy = GNUNET_YES; 1258 ch->destroy = GNUNET_YES;
1187 GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest, 1259 GSC_handle_remote_channel_destroy (ccc->c,
1188 (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest, 1260 ccc->ccn,
1189 ch); 1261 ch);
1190 channel_destroy (ch); 1262 channel_destroy (ch);
1191} 1263}
@@ -1326,7 +1398,7 @@ GCCH_handle_local_data (struct CadetChannel *ch,
1326 1398
1327 if (GNUNET_YES == ch->is_loopback) 1399 if (GNUNET_YES == ch->is_loopback)
1328 { 1400 {
1329 struct CadetClient *receiver; 1401 struct CadetChannelClient *receiver;
1330 struct GNUNET_MQ_Envelope *env; 1402 struct GNUNET_MQ_Envelope *env;
1331 struct GNUNET_CADET_LocalData *ld; 1403 struct GNUNET_CADET_LocalData *ld;
1332 int to_owner; 1404 int to_owner;
@@ -1335,25 +1407,24 @@ GCCH_handle_local_data (struct CadetChannel *ch,
1335 buf_len, 1407 buf_len,
1336 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); 1408 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1337 if (sender_ccn.channel_of_client == 1409 if (sender_ccn.channel_of_client ==
1338 ch->ccn_owner.channel_of_client) 1410 ch->owner->ccn.channel_of_client)
1339 { 1411 {
1340 receiver = ch->dest; 1412 receiver = ch->dest;
1341 ld->ccn = ch->ccn_dest;
1342 to_owner = GNUNET_NO; 1413 to_owner = GNUNET_NO;
1343 } 1414 }
1344 else 1415 else
1345 { 1416 {
1346 GNUNET_assert (sender_ccn.channel_of_client == 1417 GNUNET_assert (sender_ccn.channel_of_client ==
1347 ch->ccn_dest.channel_of_client); 1418 ch->dest->ccn.channel_of_client);
1348 receiver = ch->owner; 1419 receiver = ch->owner;
1349 ld->ccn = ch->ccn_owner;
1350 to_owner = GNUNET_YES; 1420 to_owner = GNUNET_YES;
1351 } 1421 }
1422 ld->ccn = receiver->ccn;
1352 GNUNET_memcpy (&ld[1], 1423 GNUNET_memcpy (&ld[1],
1353 buf, 1424 buf,
1354 buf_len); 1425 buf_len);
1355 /* FIXME: this does not provide for flow control! */ 1426 /* FIXME: this does not provide for flow control! */
1356 GSC_send_to_client (receiver, 1427 GSC_send_to_client (receiver->c,
1357 env); 1428 env);
1358 send_ack_to_client (ch, 1429 send_ack_to_client (ch,
1359 to_owner); 1430 to_owner);
@@ -1387,18 +1458,31 @@ GCCH_handle_local_data (struct CadetChannel *ch,
1387 1458
1388 1459
1389/** 1460/**
1390 * Try to deliver messages to the local client, if it is ready for more. 1461 * Handle ACK from client on local channel. Means the client is ready
1462 * for more data, see if we have any for it.
1391 * 1463 *
1392 * @param ch channel to process 1464 * @param ch channel to destroy
1465 * @param client_ccn ccn of the client sending the ack
1393 */ 1466 */
1394static void 1467void
1395send_client_buffered_data (struct CadetChannel *ch) 1468GCCH_handle_local_ack (struct CadetChannel *ch,
1469 struct GNUNET_CADET_ClientChannelNumber client_ccn)
1396{ 1470{
1471 struct CadetChannelClient *ccc;
1397 struct CadetOutOfOrderMessage *com; 1472 struct CadetOutOfOrderMessage *com;
1398 1473
1399 if (GNUNET_NO == ch->client_ready) 1474 if ( (NULL != ch->owner) &&
1400 return; /* client not ready */ 1475 (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1401 com = ch->head_recv; 1476 ccc = ch->owner;
1477 else if ( (NULL != ch->dest) &&
1478 (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1479 ccc = ch->dest;
1480 else
1481 GNUNET_assert (0);
1482 ccc->client_ready = GNUNET_YES;
1483 LOG (GNUNET_ERROR_TYPE_DEBUG,
1484 "Got LOCAL_ACK, client ready to receive more data!\n");
1485 com = ccc->head_recv;
1402 if (NULL == com) 1486 if (NULL == com)
1403 return; /* none pending */ 1487 return; /* none pending */
1404 if ( (com->mid.mid != ch->mid_recv.mid) && 1488 if ( (com->mid.mid != ch->mid_recv.mid) &&
@@ -1410,14 +1494,15 @@ send_client_buffered_data (struct CadetChannel *ch)
1410 GCCH_2s (ch)); 1494 GCCH_2s (ch));
1411 1495
1412 /* all good, pass next message to client */ 1496 /* all good, pass next message to client */
1413 GNUNET_CONTAINER_DLL_remove (ch->head_recv, 1497 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1414 ch->tail_recv, 1498 ccc->tail_recv,
1415 com); 1499 com);
1416 /* FIXME: if unreliable, this is not aggressive 1500 /* FIXME: if unreliable, this is not aggressive
1417 enough, as it would be OK to have lost some! */ 1501 enough, as it would be OK to have lost some! */
1418 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); 1502 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1419 ch->mid_futures >>= 1; /* equivalent to division by 2 */ 1503 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1420 GSC_send_to_client (ch->owner ? ch->owner : ch->dest, 1504 ccc->client_ready = GNUNET_NO;
1505 GSC_send_to_client (ccc->c,
1421 com->env); 1506 com->env);
1422 GNUNET_free (com); 1507 GNUNET_free (com);
1423 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && 1508 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
@@ -1435,7 +1520,7 @@ send_client_buffered_data (struct CadetChannel *ch)
1435 send_channel_data_ack (ch); 1520 send_channel_data_ack (ch);
1436 } 1521 }
1437 1522
1438 if (NULL != ch->head_recv) 1523 if (NULL != ccc->head_recv)
1439 return; 1524 return;
1440 if (GNUNET_NO == ch->destroy) 1525 if (GNUNET_NO == ch->destroy)
1441 return; 1526 return;
@@ -1445,23 +1530,6 @@ send_client_buffered_data (struct CadetChannel *ch)
1445} 1530}
1446 1531
1447 1532
1448/**
1449 * Handle ACK from client on local channel.
1450 *
1451 * @param ch channel to destroy
1452 * @param client_ccn ccn of the client sending the ack
1453 */
1454void
1455GCCH_handle_local_ack (struct CadetChannel *ch,
1456 struct GNUNET_CADET_ClientChannelNumber client_ccn)
1457{
1458 ch->client_ready = GNUNET_YES;
1459 LOG (GNUNET_ERROR_TYPE_DEBUG,
1460 "Got LOCAL_ACK, client ready to receive more data!\n");
1461 send_client_buffered_data (ch);
1462}
1463
1464
1465#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) 1533#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1466 1534
1467 1535
@@ -1497,17 +1565,17 @@ GCCH_debug (struct CadetChannel *ch,
1497 { 1565 {
1498 LOG2 (level, 1566 LOG2 (level,
1499 "CHN origin %s ready %s local-id: %u\n", 1567 "CHN origin %s ready %s local-id: %u\n",
1500 GSC_2s (ch->owner), 1568 GSC_2s (ch->owner->c),
1501 ch->client_ready ? "YES" : "NO", 1569 ch->owner->client_ready ? "YES" : "NO",
1502 ntohl (ch->ccn_owner.channel_of_client)); 1570 ntohl (ch->owner->ccn.channel_of_client));
1503 } 1571 }
1504 if (NULL != ch->dest) 1572 if (NULL != ch->dest)
1505 { 1573 {
1506 LOG2 (level, 1574 LOG2 (level,
1507 "CHN destination %s ready %s local-id: %u\n", 1575 "CHN destination %s ready %s local-id: %u\n",
1508 GSC_2s (ch->dest), 1576 GSC_2s (ch->dest->c),
1509 ch->client_ready ? "YES" : "NO", 1577 ch->dest->client_ready ? "YES" : "NO",
1510 ntohl (ch->ccn_dest.channel_of_client)); 1578 ntohl (ch->dest->ccn.channel_of_client));
1511 } 1579 }
1512 LOG2 (level, 1580 LOG2 (level,
1513 "CHN Message IDs recv: %d (%LLX), send: %d\n", 1581 "CHN Message IDs recv: %d (%LLX), send: %d\n",