diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-01-24 21:00:23 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-01-24 21:00:23 +0100 |
commit | 8ed6d64262665ba9ce306823f569213feabba669 (patch) | |
tree | c40c8f1843c25a79e392a9766e79f1cb45eabb60 /src/cadet/gnunet-service-cadet-new_channel.c | |
parent | 3bdda27bbeb0b69bd2c5a73022b237ae0342e9c1 (diff) | |
download | gnunet-8ed6d64262665ba9ce306823f569213feabba669.tar.gz gnunet-8ed6d64262665ba9ce306823f569213feabba669.zip |
fix client-client loopback flow control
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_channel.c')
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_channel.c | 316 |
1 files changed, 192 insertions, 124 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index 74aafe5a1..98cfa8383 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c | |||
@@ -25,6 +25,8 @@ | |||
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | * | 26 | * |
27 | * TODO: | 27 | * TODO: |
28 | * - FIXME: send ACKs back to loopback clients! | ||
29 | * | ||
28 | * - introduce shutdown so we can have half-closed channels, modify | 30 | * - introduce shutdown so we can have half-closed channels, modify |
29 | * destroy to include MID to have FIN-ACK equivalents, etc. | 31 | * destroy to include MID to have FIN-ACK equivalents, etc. |
30 | * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! | 32 | * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! |
@@ -160,6 +162,46 @@ struct CadetOutOfOrderMessage | |||
160 | 162 | ||
161 | 163 | ||
162 | /** | 164 | /** |
165 | * Client endpoint of a `struct CadetChannel`. A channel may be a | ||
166 | * loopback channel, in which case it has two of these endpoints. | ||
167 | * Note that flow control also is required in both directions. | ||
168 | */ | ||
169 | struct CadetChannelClient | ||
170 | { | ||
171 | /** | ||
172 | * Client handle. Not by itself sufficient to designate | ||
173 | * the client endpoint, as the same client handle may | ||
174 | * be used for both the owner and the destination, and | ||
175 | * we thus also need the channel ID to identify the client. | ||
176 | */ | ||
177 | struct CadetClient *c; | ||
178 | |||
179 | /** | ||
180 | * Head of DLL of messages received out of order or while client was unready. | ||
181 | */ | ||
182 | struct CadetOutOfOrderMessage *head_recv; | ||
183 | |||
184 | /** | ||
185 | * Tail DLL of messages received out of order or while client was unready. | ||
186 | */ | ||
187 | struct CadetOutOfOrderMessage *tail_recv; | ||
188 | |||
189 | /** | ||
190 | * Local tunnel number for this client. | ||
191 | * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI, | ||
192 | * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) | ||
193 | */ | ||
194 | struct GNUNET_CADET_ClientChannelNumber ccn; | ||
195 | |||
196 | /** | ||
197 | * Can we send data to the client? | ||
198 | */ | ||
199 | int client_ready; | ||
200 | |||
201 | }; | ||
202 | |||
203 | |||
204 | /** | ||
163 | * Struct containing all information regarding a channel to a remote client. | 205 | * Struct containing all information regarding a channel to a remote client. |
164 | */ | 206 | */ |
165 | struct CadetChannel | 207 | struct CadetChannel |
@@ -173,13 +215,13 @@ struct CadetChannel | |||
173 | * Client owner of the tunnel, if any. | 215 | * Client owner of the tunnel, if any. |
174 | * (Used if this channel represends the initiating end of the tunnel.) | 216 | * (Used if this channel represends the initiating end of the tunnel.) |
175 | */ | 217 | */ |
176 | struct CadetClient *owner; | 218 | struct CadetChannelClient *owner; |
177 | 219 | ||
178 | /** | 220 | /** |
179 | * Client destination of the tunnel, if any. | 221 | * Client destination of the tunnel, if any. |
180 | * (Used if this channel represents the listening end of the tunnel.) | 222 | * (Used if this channel represents the listening end of the tunnel.) |
181 | */ | 223 | */ |
182 | struct CadetClient *dest; | 224 | struct CadetChannelClient *dest; |
183 | 225 | ||
184 | /** | 226 | /** |
185 | * Last entry in the tunnel's queue relating to control messages | 227 | * Last entry in the tunnel's queue relating to control messages |
@@ -200,16 +242,6 @@ struct CadetChannel | |||
200 | struct CadetReliableMessage *tail_sent; | 242 | struct CadetReliableMessage *tail_sent; |
201 | 243 | ||
202 | /** | 244 | /** |
203 | * Head of DLL of messages received out of order or while client was unready. | ||
204 | */ | ||
205 | struct CadetOutOfOrderMessage *head_recv; | ||
206 | |||
207 | /** | ||
208 | * Tail DLL of messages received out of order or while client was unready. | ||
209 | */ | ||
210 | struct CadetOutOfOrderMessage *tail_recv; | ||
211 | |||
212 | /** | ||
213 | * Task to resend/poll in case no ACK is received. | 245 | * Task to resend/poll in case no ACK is received. |
214 | */ | 246 | */ |
215 | struct GNUNET_SCHEDULER_Task *retry_control_task; | 247 | struct GNUNET_SCHEDULER_Task *retry_control_task; |
@@ -271,28 +303,11 @@ struct CadetChannel | |||
271 | struct GNUNET_CADET_ChannelTunnelNumber ctn; | 303 | struct GNUNET_CADET_ChannelTunnelNumber ctn; |
272 | 304 | ||
273 | /** | 305 | /** |
274 | * Local tunnel number for local client @e owner owning the channel. | ||
275 | * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) | ||
276 | */ | ||
277 | struct GNUNET_CADET_ClientChannelNumber ccn_owner; | ||
278 | |||
279 | /** | ||
280 | * Local tunnel number for local client @e dest owning the channel. | ||
281 | * (< #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) | ||
282 | */ | ||
283 | struct GNUNET_CADET_ClientChannelNumber ccn_dest; | ||
284 | |||
285 | /** | ||
286 | * Channel state. | 306 | * Channel state. |
287 | */ | 307 | */ |
288 | enum CadetChannelState state; | 308 | enum CadetChannelState state; |
289 | 309 | ||
290 | /** | 310 | /** |
291 | * Can we send data to the client? | ||
292 | */ | ||
293 | int client_ready; | ||
294 | |||
295 | /** | ||
296 | * Is the tunnel bufferless (minimum latency)? | 311 | * Is the tunnel bufferless (minimum latency)? |
297 | */ | 312 | */ |
298 | int nobuffer; | 313 | int nobuffer; |
@@ -342,8 +357,8 @@ GCCH_2s (const struct CadetChannel *ch) | |||
342 | : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))), | 357 | : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))), |
343 | GNUNET_h2s (&ch->port), | 358 | GNUNET_h2s (&ch->port), |
344 | ch->ctn, | 359 | ch->ctn, |
345 | ntohl (ch->ccn_owner.channel_of_client), | 360 | (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client), |
346 | ntohl (ch->ccn_dest.channel_of_client)); | 361 | (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client)); |
347 | return buf; | 362 | return buf; |
348 | } | 363 | } |
349 | 364 | ||
@@ -363,6 +378,28 @@ GCCH_get_id (const struct CadetChannel *ch) | |||
363 | 378 | ||
364 | 379 | ||
365 | /** | 380 | /** |
381 | * Release memory associated with @a ccc | ||
382 | * | ||
383 | * @param ccc data structure to clean up | ||
384 | */ | ||
385 | static void | ||
386 | free_channel_client (struct CadetChannelClient *ccc) | ||
387 | { | ||
388 | struct CadetOutOfOrderMessage *com; | ||
389 | |||
390 | while (NULL != (com = ccc->head_recv)) | ||
391 | { | ||
392 | GNUNET_CONTAINER_DLL_remove (ccc->head_recv, | ||
393 | ccc->tail_recv, | ||
394 | com); | ||
395 | GNUNET_MQ_discard (com->env); | ||
396 | GNUNET_free (com); | ||
397 | } | ||
398 | GNUNET_free (ccc); | ||
399 | } | ||
400 | |||
401 | |||
402 | /** | ||
366 | * Destroy the given channel. | 403 | * Destroy the given channel. |
367 | * | 404 | * |
368 | * @param ch channel to destroy | 405 | * @param ch channel to destroy |
@@ -371,7 +408,6 @@ static void | |||
371 | channel_destroy (struct CadetChannel *ch) | 408 | channel_destroy (struct CadetChannel *ch) |
372 | { | 409 | { |
373 | struct CadetReliableMessage *crm; | 410 | struct CadetReliableMessage *crm; |
374 | struct CadetOutOfOrderMessage *com; | ||
375 | 411 | ||
376 | while (NULL != (crm = ch->head_sent)) | 412 | while (NULL != (crm = ch->head_sent)) |
377 | { | 413 | { |
@@ -386,13 +422,15 @@ channel_destroy (struct CadetChannel *ch) | |||
386 | crm); | 422 | crm); |
387 | GNUNET_free (crm); | 423 | GNUNET_free (crm); |
388 | } | 424 | } |
389 | while (NULL != (com = ch->head_recv)) | 425 | if (NULL != ch->owner) |
390 | { | 426 | { |
391 | GNUNET_CONTAINER_DLL_remove (ch->head_recv, | 427 | free_channel_client (ch->owner); |
392 | ch->tail_recv, | 428 | ch->owner = NULL; |
393 | com); | 429 | } |
394 | GNUNET_MQ_discard (com->env); | 430 | if (NULL != ch->dest) |
395 | GNUNET_free (com); | 431 | { |
432 | free_channel_client (ch->dest); | ||
433 | ch->dest = NULL; | ||
396 | } | 434 | } |
397 | if (NULL != ch->last_control_qe) | 435 | if (NULL != ch->last_control_qe) |
398 | { | 436 | { |
@@ -444,7 +482,7 @@ channel_open_sent_cb (void *cls) | |||
444 | ch->last_control_qe = NULL; | 482 | ch->last_control_qe = NULL; |
445 | ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); | 483 | ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); |
446 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 484 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
447 | "Sent CHANNEL_OPEN on %s, retrying in %s\n", | 485 | "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n", |
448 | GCCH_2s (ch), | 486 | GCCH_2s (ch), |
449 | GNUNET_STRINGS_relative_time_to_string (ch->retry_time, | 487 | GNUNET_STRINGS_relative_time_to_string (ch->retry_time, |
450 | GNUNET_YES)); | 488 | GNUNET_YES)); |
@@ -532,23 +570,30 @@ GCCH_channel_local_new (struct CadetClient *owner, | |||
532 | uint32_t options) | 570 | uint32_t options) |
533 | { | 571 | { |
534 | struct CadetChannel *ch; | 572 | struct CadetChannel *ch; |
573 | struct CadetChannelClient *ccco; | ||
574 | |||
575 | ccco = GNUNET_new (struct CadetChannelClient); | ||
576 | ccco->c = owner; | ||
577 | ccco->ccn = ccn; | ||
578 | ccco->client_ready = GNUNET_YES; | ||
535 | 579 | ||
536 | ch = GNUNET_new (struct CadetChannel); | 580 | ch = GNUNET_new (struct CadetChannel); |
537 | ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); | 581 | ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); |
538 | ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); | 582 | ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); |
539 | ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); | 583 | ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); |
540 | ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ | 584 | ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ |
541 | ch->owner = owner; | 585 | ch->owner = ccco; |
542 | ch->ccn_owner = ccn; | ||
543 | ch->port = *port; | 586 | ch->port = *port; |
544 | if (0 == memcmp (&my_full_id, | 587 | if (0 == memcmp (&my_full_id, |
545 | GCP_get_id (destination), | 588 | GCP_get_id (destination), |
546 | sizeof (struct GNUNET_PeerIdentity))) | 589 | sizeof (struct GNUNET_PeerIdentity))) |
547 | { | 590 | { |
591 | struct CadetClient *c; | ||
592 | |||
548 | ch->is_loopback = GNUNET_YES; | 593 | ch->is_loopback = GNUNET_YES; |
549 | ch->dest = GNUNET_CONTAINER_multihashmap_get (open_ports, | 594 | c = GNUNET_CONTAINER_multihashmap_get (open_ports, |
550 | port); | 595 | port); |
551 | if (NULL == ch->dest) | 596 | if (NULL == c) |
552 | { | 597 | { |
553 | /* port closed, wait for it to possibly open */ | 598 | /* port closed, wait for it to possibly open */ |
554 | (void) GNUNET_CONTAINER_multihashmap_put (loose_channels, | 599 | (void) GNUNET_CONTAINER_multihashmap_put (loose_channels, |
@@ -561,8 +606,11 @@ GCCH_channel_local_new (struct CadetClient *owner, | |||
561 | } | 606 | } |
562 | else | 607 | else |
563 | { | 608 | { |
609 | ch->dest = GNUNET_new (struct CadetChannelClient); | ||
610 | ch->dest->c = c; | ||
611 | ch->dest->client_ready = GNUNET_YES; | ||
564 | GCCH_bind (ch, | 612 | GCCH_bind (ch, |
565 | ch->dest); | 613 | ch->dest->c); |
566 | } | 614 | } |
567 | } | 615 | } |
568 | else | 616 | else |
@@ -786,20 +834,18 @@ send_ack_to_client (struct CadetChannel *ch, | |||
786 | { | 834 | { |
787 | struct GNUNET_MQ_Envelope *env; | 835 | struct GNUNET_MQ_Envelope *env; |
788 | struct GNUNET_CADET_LocalAck *ack; | 836 | struct GNUNET_CADET_LocalAck *ack; |
789 | struct CadetClient *c; | 837 | struct CadetChannelClient *ccc; |
790 | 838 | ||
791 | env = GNUNET_MQ_msg (ack, | 839 | env = GNUNET_MQ_msg (ack, |
792 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); | 840 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); |
793 | ack->ccn = (GNUNET_YES == to_owner) ? ch->ccn_owner : ch->ccn_dest; | 841 | ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest; |
794 | c = (GNUNET_YES == to_owner) | 842 | ack->ccn = ccc->ccn; |
795 | ? ch->owner | ||
796 | : ch->dest; | ||
797 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 843 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
798 | "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n", | 844 | "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n", |
799 | GSC_2s (c), | 845 | GSC_2s (ccc->c), |
800 | (GNUNET_YES == to_owner) ? "owner" : "dest", | 846 | (GNUNET_YES == to_owner) ? "owner" : "dest", |
801 | ntohl (ack->ccn.channel_of_client)); | 847 | ntohl (ack->ccn.channel_of_client)); |
802 | GSC_send_to_client (c, | 848 | GSC_send_to_client (ccc->c, |
803 | env); | 849 | env); |
804 | } | 850 | } |
805 | 851 | ||
@@ -817,6 +863,7 @@ GCCH_bind (struct CadetChannel *ch, | |||
817 | struct CadetClient *c) | 863 | struct CadetClient *c) |
818 | { | 864 | { |
819 | uint32_t options; | 865 | uint32_t options; |
866 | struct CadetChannelClient *cccd; | ||
820 | 867 | ||
821 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 868 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
822 | "Binding %s from %s to port %s of %s\n", | 869 | "Binding %s from %s to port %s of %s\n", |
@@ -837,16 +884,19 @@ GCCH_bind (struct CadetChannel *ch, | |||
837 | options |= GNUNET_CADET_OPTION_RELIABLE; | 884 | options |= GNUNET_CADET_OPTION_RELIABLE; |
838 | if (ch->out_of_order) | 885 | if (ch->out_of_order) |
839 | options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; | 886 | options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; |
840 | ch->dest = c; | 887 | cccd = GNUNET_new (struct CadetChannelClient); |
841 | ch->ccn_dest = GSC_bind (c, | 888 | ch->dest = cccd; |
842 | ch, | 889 | cccd->c = c; |
843 | (GNUNET_YES == ch->is_loopback) | 890 | cccd->client_ready = GNUNET_YES; |
844 | ? GCP_get (&my_full_id, | 891 | cccd->ccn = GSC_bind (c, |
845 | GNUNET_YES) | 892 | ch, |
846 | : GCT_get_destination (ch->t), | 893 | (GNUNET_YES == ch->is_loopback) |
847 | &ch->port, | 894 | ? GCP_get (&my_full_id, |
848 | options); | 895 | GNUNET_YES) |
849 | GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < | 896 | : GCT_get_destination (ch->t), |
897 | &ch->port, | ||
898 | options); | ||
899 | GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < | ||
850 | GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); | 900 | GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); |
851 | ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */ | 901 | ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */ |
852 | if (GNUNET_YES == ch->is_loopback) | 902 | if (GNUNET_YES == ch->is_loopback) |
@@ -862,7 +912,7 @@ GCCH_bind (struct CadetChannel *ch, | |||
862 | ch); | 912 | ch); |
863 | } | 913 | } |
864 | /* give client it's initial supply of ACKs */ | 914 | /* give client it's initial supply of ACKs */ |
865 | GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < | 915 | GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < |
866 | GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); | 916 | GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); |
867 | for (unsigned int i=0;i<ch->max_pending_messages;i++) | 917 | for (unsigned int i=0;i<ch->max_pending_messages;i++) |
868 | send_ack_to_client (ch, | 918 | send_ack_to_client (ch, |
@@ -876,22 +926,37 @@ GCCH_bind (struct CadetChannel *ch, | |||
876 | * | 926 | * |
877 | * @param ch channel to destroy | 927 | * @param ch channel to destroy |
878 | * @param c client that caused the destruction | 928 | * @param c client that caused the destruction |
929 | * @param ccn client number of the client @a c | ||
879 | */ | 930 | */ |
880 | void | 931 | void |
881 | GCCH_channel_local_destroy (struct CadetChannel *ch, | 932 | GCCH_channel_local_destroy (struct CadetChannel *ch, |
882 | struct CadetClient *c) | 933 | struct CadetClient *c, |
934 | struct GNUNET_CADET_ClientChannelNumber ccn) | ||
883 | { | 935 | { |
884 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 936 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
885 | "%s asks for destruction of %s\n", | 937 | "%s asks for destruction of %s\n", |
886 | GSC_2s (c), | 938 | GSC_2s (c), |
887 | GCCH_2s (ch)); | 939 | GCCH_2s (ch)); |
888 | GNUNET_assert (NULL != c); | 940 | GNUNET_assert (NULL != c); |
889 | if (c == ch->owner) | 941 | if ( (NULL != ch->owner) && |
942 | (c == ch->owner->c) && | ||
943 | (ccn.channel_of_client == ch->owner->ccn.channel_of_client) ) | ||
944 | { | ||
945 | free_channel_client (ch->owner); | ||
890 | ch->owner = NULL; | 946 | ch->owner = NULL; |
891 | else if (c == ch->dest) | 947 | } |
948 | else if ( (NULL != ch->dest) && | ||
949 | (c == ch->dest->c) && | ||
950 | (ccn.channel_of_client == ch->dest->ccn.channel_of_client) ) | ||
951 | { | ||
952 | free_channel_client (ch->dest); | ||
892 | ch->dest = NULL; | 953 | ch->dest = NULL; |
954 | } | ||
893 | else | 955 | else |
956 | { | ||
894 | GNUNET_assert (0); | 957 | GNUNET_assert (0); |
958 | } | ||
959 | |||
895 | if (GNUNET_YES == ch->destroy) | 960 | if (GNUNET_YES == ch->destroy) |
896 | { | 961 | { |
897 | /* other end already destroyed, with the local client gone, no need | 962 | /* other end already destroyed, with the local client gone, no need |
@@ -1008,6 +1073,7 @@ is_before (void *cls, | |||
1008 | * and send an ACK to the other end (once flow control allows it!) | 1073 | * and send an ACK to the other end (once flow control allows it!) |
1009 | * | 1074 | * |
1010 | * @param ch channel that got data | 1075 | * @param ch channel that got data |
1076 | * @param msg message that was received | ||
1011 | */ | 1077 | */ |
1012 | void | 1078 | void |
1013 | GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, | 1079 | GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, |
@@ -1015,6 +1081,7 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, | |||
1015 | { | 1081 | { |
1016 | struct GNUNET_MQ_Envelope *env; | 1082 | struct GNUNET_MQ_Envelope *env; |
1017 | struct GNUNET_CADET_LocalData *ld; | 1083 | struct GNUNET_CADET_LocalData *ld; |
1084 | struct CadetChannelClient *ccc; | ||
1018 | struct CadetOutOfOrderMessage *com; | 1085 | struct CadetOutOfOrderMessage *com; |
1019 | size_t payload_size; | 1086 | size_t payload_size; |
1020 | 1087 | ||
@@ -1023,11 +1090,12 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, | |||
1023 | env = GNUNET_MQ_msg_extra (ld, | 1090 | env = GNUNET_MQ_msg_extra (ld, |
1024 | payload_size, | 1091 | payload_size, |
1025 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); | 1092 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); |
1026 | ld->ccn = (NULL == ch->dest) ? ch->ccn_owner : ch->ccn_dest; | 1093 | ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn; |
1027 | GNUNET_memcpy (&ld[1], | 1094 | GNUNET_memcpy (&ld[1], |
1028 | &msg[1], | 1095 | &msg[1], |
1029 | payload_size); | 1096 | payload_size); |
1030 | if ( (GNUNET_YES == ch->client_ready) && | 1097 | ccc = (NULL != ch->owner) ? ch->owner : ch->dest; |
1098 | if ( (GNUNET_YES == ccc->client_ready) && | ||
1031 | ( (GNUNET_YES == ch->out_of_order) || | 1099 | ( (GNUNET_YES == ch->out_of_order) || |
1032 | (msg->mid.mid == ch->mid_recv.mid) ) ) | 1100 | (msg->mid.mid == ch->mid_recv.mid) ) ) |
1033 | { | 1101 | { |
@@ -1035,8 +1103,9 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, | |||
1035 | "Giving %u bytes of payload from %s to client %s\n", | 1103 | "Giving %u bytes of payload from %s to client %s\n", |
1036 | (unsigned int) payload_size, | 1104 | (unsigned int) payload_size, |
1037 | GCCH_2s (ch), | 1105 | GCCH_2s (ch), |
1038 | GSC_2s (ch->owner ? ch->owner : ch->dest)); | 1106 | GSC_2s (ccc->c)); |
1039 | GSC_send_to_client (ch->owner ? ch->owner : ch->dest, | 1107 | ccc->client_ready = GNUNET_NO; |
1108 | GSC_send_to_client (ccc->c, | ||
1040 | env); | 1109 | env); |
1041 | ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); | 1110 | ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); |
1042 | ch->mid_futures >>= 1; | 1111 | ch->mid_futures >>= 1; |
@@ -1047,7 +1116,7 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, | |||
1047 | drop it (can't buffer too much!) */ | 1116 | drop it (can't buffer too much!) */ |
1048 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1117 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1049 | "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n", | 1118 | "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n", |
1050 | (GNUNET_YES == ch->client_ready) | 1119 | (GNUNET_YES == ccc->client_ready) |
1051 | ? "out-of-order" | 1120 | ? "out-of-order" |
1052 | : "client-not-ready", | 1121 | : "client-not-ready", |
1053 | (unsigned int) payload_size, | 1122 | (unsigned int) payload_size, |
@@ -1059,36 +1128,36 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, | |||
1059 | com->mid = msg->mid; | 1128 | com->mid = msg->mid; |
1060 | com->env = env; | 1129 | com->env = env; |
1061 | /* sort into list ordered by "is_before" */ | 1130 | /* sort into list ordered by "is_before" */ |
1062 | if ( (NULL == ch->head_recv) || | 1131 | if ( (NULL == ccc->head_recv) || |
1063 | (GNUNET_YES == is_before (ch, | 1132 | (GNUNET_YES == is_before (ch, |
1064 | com, | 1133 | com, |
1065 | ch->head_recv)) ) | 1134 | ccc->head_recv)) ) |
1066 | { | 1135 | { |
1067 | GNUNET_CONTAINER_DLL_insert (ch->head_recv, | 1136 | GNUNET_CONTAINER_DLL_insert (ccc->head_recv, |
1068 | ch->tail_recv, | 1137 | ccc->tail_recv, |
1069 | com); | 1138 | com); |
1070 | } | 1139 | } |
1071 | else | 1140 | else |
1072 | { | 1141 | { |
1073 | struct CadetOutOfOrderMessage *pos; | 1142 | struct CadetOutOfOrderMessage *pos; |
1074 | 1143 | ||
1075 | for (pos = ch->head_recv; | 1144 | for (pos = ccc->head_recv; |
1076 | NULL != pos; | 1145 | NULL != pos; |
1077 | pos = pos->next) | 1146 | pos = pos->next) |
1078 | { | 1147 | { |
1079 | if (GNUNET_YES != | 1148 | if (GNUNET_YES != |
1080 | is_before (ch, | 1149 | is_before (NULL, |
1081 | pos, | 1150 | pos, |
1082 | com)) | 1151 | com)) |
1083 | break; | 1152 | break; |
1084 | } | 1153 | } |
1085 | if (NULL == pos) | 1154 | if (NULL == pos) |
1086 | GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv, | 1155 | GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv, |
1087 | ch->tail_recv, | 1156 | ccc->tail_recv, |
1088 | com); | 1157 | com); |
1089 | else | 1158 | else |
1090 | GNUNET_CONTAINER_DLL_insert_after (ch->head_recv, | 1159 | GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv, |
1091 | ch->tail_recv, | 1160 | ccc->tail_recv, |
1092 | com, | 1161 | com, |
1093 | pos->prev); | 1162 | pos->prev); |
1094 | } | 1163 | } |
@@ -1166,6 +1235,8 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, | |||
1166 | void | 1235 | void |
1167 | GCCH_handle_remote_destroy (struct CadetChannel *ch) | 1236 | GCCH_handle_remote_destroy (struct CadetChannel *ch) |
1168 | { | 1237 | { |
1238 | struct CadetChannelClient *ccc; | ||
1239 | |||
1169 | GNUNET_assert (GNUNET_NO == ch->is_loopback); | 1240 | GNUNET_assert (GNUNET_NO == ch->is_loopback); |
1170 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1241 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1171 | "Received remote channel DESTROY for %s\n", | 1242 | "Received remote channel DESTROY for %s\n", |
@@ -1176,7 +1247,8 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch) | |||
1176 | channel_destroy (ch); | 1247 | channel_destroy (ch); |
1177 | return; | 1248 | return; |
1178 | } | 1249 | } |
1179 | if (NULL != ch->head_recv) | 1250 | ccc = (NULL != ch->owner) ? ch->owner : ch->dest; |
1251 | if (NULL != ccc->head_recv) | ||
1180 | { | 1252 | { |
1181 | LOG (GNUNET_ERROR_TYPE_WARNING, | 1253 | LOG (GNUNET_ERROR_TYPE_WARNING, |
1182 | "Lost end of transmission due to remote shutdown on %s\n", | 1254 | "Lost end of transmission due to remote shutdown on %s\n", |
@@ -1184,8 +1256,8 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch) | |||
1184 | /* FIXME: change API to notify client about truncated transmission! */ | 1256 | /* FIXME: change API to notify client about truncated transmission! */ |
1185 | } | 1257 | } |
1186 | ch->destroy = GNUNET_YES; | 1258 | ch->destroy = GNUNET_YES; |
1187 | GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest, | 1259 | GSC_handle_remote_channel_destroy (ccc->c, |
1188 | (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest, | 1260 | ccc->ccn, |
1189 | ch); | 1261 | ch); |
1190 | channel_destroy (ch); | 1262 | channel_destroy (ch); |
1191 | } | 1263 | } |
@@ -1326,7 +1398,7 @@ GCCH_handle_local_data (struct CadetChannel *ch, | |||
1326 | 1398 | ||
1327 | if (GNUNET_YES == ch->is_loopback) | 1399 | if (GNUNET_YES == ch->is_loopback) |
1328 | { | 1400 | { |
1329 | struct CadetClient *receiver; | 1401 | struct CadetChannelClient *receiver; |
1330 | struct GNUNET_MQ_Envelope *env; | 1402 | struct GNUNET_MQ_Envelope *env; |
1331 | struct GNUNET_CADET_LocalData *ld; | 1403 | struct GNUNET_CADET_LocalData *ld; |
1332 | int to_owner; | 1404 | int to_owner; |
@@ -1335,25 +1407,24 @@ GCCH_handle_local_data (struct CadetChannel *ch, | |||
1335 | buf_len, | 1407 | buf_len, |
1336 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); | 1408 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); |
1337 | if (sender_ccn.channel_of_client == | 1409 | if (sender_ccn.channel_of_client == |
1338 | ch->ccn_owner.channel_of_client) | 1410 | ch->owner->ccn.channel_of_client) |
1339 | { | 1411 | { |
1340 | receiver = ch->dest; | 1412 | receiver = ch->dest; |
1341 | ld->ccn = ch->ccn_dest; | ||
1342 | to_owner = GNUNET_NO; | 1413 | to_owner = GNUNET_NO; |
1343 | } | 1414 | } |
1344 | else | 1415 | else |
1345 | { | 1416 | { |
1346 | GNUNET_assert (sender_ccn.channel_of_client == | 1417 | GNUNET_assert (sender_ccn.channel_of_client == |
1347 | ch->ccn_dest.channel_of_client); | 1418 | ch->dest->ccn.channel_of_client); |
1348 | receiver = ch->owner; | 1419 | receiver = ch->owner; |
1349 | ld->ccn = ch->ccn_owner; | ||
1350 | to_owner = GNUNET_YES; | 1420 | to_owner = GNUNET_YES; |
1351 | } | 1421 | } |
1422 | ld->ccn = receiver->ccn; | ||
1352 | GNUNET_memcpy (&ld[1], | 1423 | GNUNET_memcpy (&ld[1], |
1353 | buf, | 1424 | buf, |
1354 | buf_len); | 1425 | buf_len); |
1355 | /* FIXME: this does not provide for flow control! */ | 1426 | /* FIXME: this does not provide for flow control! */ |
1356 | GSC_send_to_client (receiver, | 1427 | GSC_send_to_client (receiver->c, |
1357 | env); | 1428 | env); |
1358 | send_ack_to_client (ch, | 1429 | send_ack_to_client (ch, |
1359 | to_owner); | 1430 | to_owner); |
@@ -1387,18 +1458,31 @@ GCCH_handle_local_data (struct CadetChannel *ch, | |||
1387 | 1458 | ||
1388 | 1459 | ||
1389 | /** | 1460 | /** |
1390 | * Try to deliver messages to the local client, if it is ready for more. | 1461 | * Handle ACK from client on local channel. Means the client is ready |
1462 | * for more data, see if we have any for it. | ||
1391 | * | 1463 | * |
1392 | * @param ch channel to process | 1464 | * @param ch channel to destroy |
1465 | * @param client_ccn ccn of the client sending the ack | ||
1393 | */ | 1466 | */ |
1394 | static void | 1467 | void |
1395 | send_client_buffered_data (struct CadetChannel *ch) | 1468 | GCCH_handle_local_ack (struct CadetChannel *ch, |
1469 | struct GNUNET_CADET_ClientChannelNumber client_ccn) | ||
1396 | { | 1470 | { |
1471 | struct CadetChannelClient *ccc; | ||
1397 | struct CadetOutOfOrderMessage *com; | 1472 | struct CadetOutOfOrderMessage *com; |
1398 | 1473 | ||
1399 | if (GNUNET_NO == ch->client_ready) | 1474 | if ( (NULL != ch->owner) && |
1400 | return; /* client not ready */ | 1475 | (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) ) |
1401 | com = ch->head_recv; | 1476 | ccc = ch->owner; |
1477 | else if ( (NULL != ch->dest) && | ||
1478 | (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) ) | ||
1479 | ccc = ch->dest; | ||
1480 | else | ||
1481 | GNUNET_assert (0); | ||
1482 | ccc->client_ready = GNUNET_YES; | ||
1483 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1484 | "Got LOCAL_ACK, client ready to receive more data!\n"); | ||
1485 | com = ccc->head_recv; | ||
1402 | if (NULL == com) | 1486 | if (NULL == com) |
1403 | return; /* none pending */ | 1487 | return; /* none pending */ |
1404 | if ( (com->mid.mid != ch->mid_recv.mid) && | 1488 | if ( (com->mid.mid != ch->mid_recv.mid) && |
@@ -1410,14 +1494,15 @@ send_client_buffered_data (struct CadetChannel *ch) | |||
1410 | GCCH_2s (ch)); | 1494 | GCCH_2s (ch)); |
1411 | 1495 | ||
1412 | /* all good, pass next message to client */ | 1496 | /* all good, pass next message to client */ |
1413 | GNUNET_CONTAINER_DLL_remove (ch->head_recv, | 1497 | GNUNET_CONTAINER_DLL_remove (ccc->head_recv, |
1414 | ch->tail_recv, | 1498 | ccc->tail_recv, |
1415 | com); | 1499 | com); |
1416 | /* FIXME: if unreliable, this is not aggressive | 1500 | /* FIXME: if unreliable, this is not aggressive |
1417 | enough, as it would be OK to have lost some! */ | 1501 | enough, as it would be OK to have lost some! */ |
1418 | ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); | 1502 | ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); |
1419 | ch->mid_futures >>= 1; /* equivalent to division by 2 */ | 1503 | ch->mid_futures >>= 1; /* equivalent to division by 2 */ |
1420 | GSC_send_to_client (ch->owner ? ch->owner : ch->dest, | 1504 | ccc->client_ready = GNUNET_NO; |
1505 | GSC_send_to_client (ccc->c, | ||
1421 | com->env); | 1506 | com->env); |
1422 | GNUNET_free (com); | 1507 | GNUNET_free (com); |
1423 | if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && | 1508 | if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && |
@@ -1435,7 +1520,7 @@ send_client_buffered_data (struct CadetChannel *ch) | |||
1435 | send_channel_data_ack (ch); | 1520 | send_channel_data_ack (ch); |
1436 | } | 1521 | } |
1437 | 1522 | ||
1438 | if (NULL != ch->head_recv) | 1523 | if (NULL != ccc->head_recv) |
1439 | return; | 1524 | return; |
1440 | if (GNUNET_NO == ch->destroy) | 1525 | if (GNUNET_NO == ch->destroy) |
1441 | return; | 1526 | return; |
@@ -1445,23 +1530,6 @@ send_client_buffered_data (struct CadetChannel *ch) | |||
1445 | } | 1530 | } |
1446 | 1531 | ||
1447 | 1532 | ||
1448 | /** | ||
1449 | * Handle ACK from client on local channel. | ||
1450 | * | ||
1451 | * @param ch channel to destroy | ||
1452 | * @param client_ccn ccn of the client sending the ack | ||
1453 | */ | ||
1454 | void | ||
1455 | GCCH_handle_local_ack (struct CadetChannel *ch, | ||
1456 | struct GNUNET_CADET_ClientChannelNumber client_ccn) | ||
1457 | { | ||
1458 | ch->client_ready = GNUNET_YES; | ||
1459 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1460 | "Got LOCAL_ACK, client ready to receive more data!\n"); | ||
1461 | send_client_buffered_data (ch); | ||
1462 | } | ||
1463 | |||
1464 | |||
1465 | #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) | 1533 | #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) |
1466 | 1534 | ||
1467 | 1535 | ||
@@ -1497,17 +1565,17 @@ GCCH_debug (struct CadetChannel *ch, | |||
1497 | { | 1565 | { |
1498 | LOG2 (level, | 1566 | LOG2 (level, |
1499 | "CHN origin %s ready %s local-id: %u\n", | 1567 | "CHN origin %s ready %s local-id: %u\n", |
1500 | GSC_2s (ch->owner), | 1568 | GSC_2s (ch->owner->c), |
1501 | ch->client_ready ? "YES" : "NO", | 1569 | ch->owner->client_ready ? "YES" : "NO", |
1502 | ntohl (ch->ccn_owner.channel_of_client)); | 1570 | ntohl (ch->owner->ccn.channel_of_client)); |
1503 | } | 1571 | } |
1504 | if (NULL != ch->dest) | 1572 | if (NULL != ch->dest) |
1505 | { | 1573 | { |
1506 | LOG2 (level, | 1574 | LOG2 (level, |
1507 | "CHN destination %s ready %s local-id: %u\n", | 1575 | "CHN destination %s ready %s local-id: %u\n", |
1508 | GSC_2s (ch->dest), | 1576 | GSC_2s (ch->dest->c), |
1509 | ch->client_ready ? "YES" : "NO", | 1577 | ch->dest->client_ready ? "YES" : "NO", |
1510 | ntohl (ch->ccn_dest.channel_of_client)); | 1578 | ntohl (ch->dest->ccn.channel_of_client)); |
1511 | } | 1579 | } |
1512 | LOG2 (level, | 1580 | LOG2 (level, |
1513 | "CHN Message IDs recv: %d (%LLX), send: %d\n", | 1581 | "CHN Message IDs recv: %d (%LLX), send: %d\n", |