aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet-new_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_channel.c')
-rw-r--r--src/cadet/gnunet-service-cadet-new_channel.c1054
1 files changed, 795 insertions, 259 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c
index 5acd098b6..0c6241817 100644
--- a/src/cadet/gnunet-service-cadet-new_channel.c
+++ b/src/cadet/gnunet-service-cadet-new_channel.c
@@ -18,7 +18,6 @@
18 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 18 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19 Boston, MA 02110-1301, USA. 19 Boston, MA 02110-1301, USA.
20*/ 20*/
21
22/** 21/**
23 * @file cadet/gnunet-service-cadet-new_channel.c 22 * @file cadet/gnunet-service-cadet-new_channel.c
24 * @brief logical links between CADET clients 23 * @brief logical links between CADET clients
@@ -26,10 +25,16 @@
26 * @author Christian Grothoff 25 * @author Christian Grothoff
27 * 26 *
28 * TODO: 27 * TODO:
29 * - estimate max bandwidth using bursts and use to optimize 28 * - FIXME: send ACKs back to loopback clients!
30 * transmission rate(s) 29 *
30 * - introduce shutdown so we can have half-closed channels, modify
31 * destroy to include MID to have FIN-ACK equivalents, etc.
32 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
33 * - check that '0xFFULL' really is sufficient for flow control!
34 * - revisit handling of 'unreliable' traffic!
35 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
36 * - figure out flow control without ACKs (unreliable traffic!)
31 */ 37 */
32
33#include "platform.h" 38#include "platform.h"
34#include "gnunet_util_lib.h" 39#include "gnunet_util_lib.h"
35#include "cadet.h" 40#include "cadet.h"
@@ -41,7 +46,7 @@
41#include "gnunet-service-cadet-new_peer.h" 46#include "gnunet-service-cadet-new_peer.h"
42#include "gnunet-service-cadet-new_paths.h" 47#include "gnunet-service-cadet-new_paths.h"
43 48
44#define LOG(level, ...) GNUNET_log (level,__VA_ARGS__) 49#define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
45 50
46/** 51/**
47 * How long do we initially wait before retransmitting? 52 * How long do we initially wait before retransmitting?
@@ -68,7 +73,7 @@ enum CadetChannelState
68 /** 73 /**
69 * Connection create message sent, waiting for ACK. 74 * Connection create message sent, waiting for ACK.
70 */ 75 */
71 CADET_CHANNEL_CREATE_SENT, 76 CADET_CHANNEL_OPEN_SENT,
72 77
73 /** 78 /**
74 * Connection confirmed, ready to carry traffic. 79 * Connection confirmed, ready to carry traffic.
@@ -121,9 +126,8 @@ struct CadetReliableMessage
121 /** 126 /**
122 * Data message we are trying to send. 127 * Data message we are trying to send.
123 */ 128 */
124 struct GNUNET_CADET_ChannelAppDataMessage data_message; 129 struct GNUNET_CADET_ChannelAppDataMessage *data_message;
125 130
126 /* followed by variable-size payload */
127}; 131};
128 132
129 133
@@ -143,7 +147,8 @@ struct CadetOutOfOrderMessage
143 struct CadetOutOfOrderMessage *prev; 147 struct CadetOutOfOrderMessage *prev;
144 148
145 /** 149 /**
146 * ID of the message (ACK needed to free) 150 * ID of the message (messages up to this point needed
151 * before we give this one to the client).
147 */ 152 */
148 struct ChannelMessageIdentifier mid; 153 struct ChannelMessageIdentifier mid;
149 154
@@ -156,6 +161,46 @@ struct CadetOutOfOrderMessage
156 161
157 162
158/** 163/**
164 * Client endpoint of a `struct CadetChannel`. A channel may be a
165 * loopback channel, in which case it has two of these endpoints.
166 * Note that flow control also is required in both directions.
167 */
168struct CadetChannelClient
169{
170 /**
171 * Client handle. Not by itself sufficient to designate
172 * the client endpoint, as the same client handle may
173 * be used for both the owner and the destination, and
174 * we thus also need the channel ID to identify the client.
175 */
176 struct CadetClient *c;
177
178 /**
179 * Head of DLL of messages received out of order or while client was unready.
180 */
181 struct CadetOutOfOrderMessage *head_recv;
182
183 /**
184 * Tail DLL of messages received out of order or while client was unready.
185 */
186 struct CadetOutOfOrderMessage *tail_recv;
187
188 /**
189 * Local tunnel number for this client.
190 * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
191 * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
192 */
193 struct GNUNET_CADET_ClientChannelNumber ccn;
194
195 /**
196 * Can we send data to the client?
197 */
198 int client_ready;
199
200};
201
202
203/**
159 * Struct containing all information regarding a channel to a remote client. 204 * Struct containing all information regarding a channel to a remote client.
160 */ 205 */
161struct CadetChannel 206struct CadetChannel
@@ -166,24 +211,24 @@ struct CadetChannel
166 struct CadetTunnel *t; 211 struct CadetTunnel *t;
167 212
168 /** 213 /**
169 * Last entry in the tunnel's queue relating to control messages
170 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
171 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
172 * transmission in case we receive updated information.
173 */
174 struct CadetTunnelQueueEntry *last_control_qe;
175
176 /**
177 * Client owner of the tunnel, if any. 214 * Client owner of the tunnel, if any.
178 * (Used if this channel represends the initiating end of the tunnel.) 215 * (Used if this channel represends the initiating end of the tunnel.)
179 */ 216 */
180 struct CadetClient *owner; 217 struct CadetChannelClient *owner;
181 218
182 /** 219 /**
183 * Client destination of the tunnel, if any. 220 * Client destination of the tunnel, if any.
184 * (Used if this channel represents the listening end of the tunnel.) 221 * (Used if this channel represents the listening end of the tunnel.)
185 */ 222 */
186 struct CadetClient *dest; 223 struct CadetChannelClient *dest;
224
225 /**
226 * Last entry in the tunnel's queue relating to control messages
227 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
228 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
229 * transmission in case we receive updated information.
230 */
231 struct CadetTunnelQueueEntry *last_control_qe;
187 232
188 /** 233 /**
189 * Head of DLL of messages sent and not yet ACK'd. 234 * Head of DLL of messages sent and not yet ACK'd.
@@ -196,19 +241,14 @@ struct CadetChannel
196 struct CadetReliableMessage *tail_sent; 241 struct CadetReliableMessage *tail_sent;
197 242
198 /** 243 /**
199 * Head of DLL of messages received out of order or while client was unready. 244 * Task to resend/poll in case no ACK is received.
200 */ 245 */
201 struct CadetOutOfOrderMessage *head_recv; 246 struct GNUNET_SCHEDULER_Task *retry_control_task;
202
203 /**
204 * Tail DLL of messages received out of order or while client was unready.
205 */
206 struct CadetOutOfOrderMessage *tail_recv;
207 247
208 /** 248 /**
209 * Task to resend/poll in case no ACK is received. 249 * Task to resend/poll in case no ACK is received.
210 */ 250 */
211 struct GNUNET_SCHEDULER_Task *retry_task; 251 struct GNUNET_SCHEDULER_Task *retry_data_task;
212 252
213 /** 253 /**
214 * Last time the channel was used 254 * Last time the channel was used
@@ -259,13 +299,7 @@ struct CadetChannel
259 /** 299 /**
260 * Number identifying this channel in its tunnel. 300 * Number identifying this channel in its tunnel.
261 */ 301 */
262 struct GNUNET_CADET_ChannelTunnelNumber gid; 302 struct GNUNET_CADET_ChannelTunnelNumber ctn;
263
264 /**
265 * Local tunnel number for local client owning the channel.
266 * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
267 */
268 struct GNUNET_CADET_ClientChannelNumber lid;
269 303
270 /** 304 /**
271 * Channel state. 305 * Channel state.
@@ -273,16 +307,6 @@ struct CadetChannel
273 enum CadetChannelState state; 307 enum CadetChannelState state;
274 308
275 /** 309 /**
276 * Can we send data to the client?
277 */
278 int client_ready;
279
280 /**
281 * Can the client send data to us?
282 */
283 int client_allowed;
284
285 /**
286 * Is the tunnel bufferless (minimum latency)? 310 * Is the tunnel bufferless (minimum latency)?
287 */ 311 */
288 int nobuffer; 312 int nobuffer;
@@ -298,6 +322,11 @@ struct CadetChannel
298 int out_of_order; 322 int out_of_order;
299 323
300 /** 324 /**
325 * Is this channel a loopback channel, where the destination is us again?
326 */
327 int is_loopback;
328
329 /**
301 * Flag to signal the destruction of the channel. If this is set to 330 * Flag to signal the destruction of the channel. If this is set to
302 * #GNUNET_YES the channel will be destroyed once the queue is 331 * #GNUNET_YES the channel will be destroyed once the queue is
303 * empty. 332 * empty.
@@ -307,7 +336,6 @@ struct CadetChannel
307}; 336};
308 337
309 338
310
311/** 339/**
312 * Get the static string for identification of the channel. 340 * Get the static string for identification of the channel.
313 * 341 *
@@ -320,15 +348,16 @@ GCCH_2s (const struct CadetChannel *ch)
320{ 348{
321 static char buf[128]; 349 static char buf[128];
322 350
323 if (NULL == ch)
324 return "(NULL Channel)";
325 GNUNET_snprintf (buf, 351 GNUNET_snprintf (buf,
326 sizeof (buf), 352 sizeof (buf),
327 "%s:%s gid:%X (%X)", 353 "Channel %s:%s ctn:%X(%X/%X)",
328 GCT_2s (ch->t), 354 (GNUNET_YES == ch->is_loopback)
355 ? "loopback"
356 : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
329 GNUNET_h2s (&ch->port), 357 GNUNET_h2s (&ch->port),
330 ch->gid, 358 ch->ctn,
331 ntohl (ch->lid.channel_of_client)); 359 (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
360 (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
332 return buf; 361 return buf;
333} 362}
334 363
@@ -343,7 +372,29 @@ GCCH_2s (const struct CadetChannel *ch)
343struct GNUNET_CADET_ChannelTunnelNumber 372struct GNUNET_CADET_ChannelTunnelNumber
344GCCH_get_id (const struct CadetChannel *ch) 373GCCH_get_id (const struct CadetChannel *ch)
345{ 374{
346 return ch->gid; 375 return ch->ctn;
376}
377
378
379/**
380 * Release memory associated with @a ccc
381 *
382 * @param ccc data structure to clean up
383 */
384static void
385free_channel_client (struct CadetChannelClient *ccc)
386{
387 struct CadetOutOfOrderMessage *com;
388
389 while (NULL != (com = ccc->head_recv))
390 {
391 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
392 ccc->tail_recv,
393 com);
394 GNUNET_MQ_discard (com->env);
395 GNUNET_free (com);
396 }
397 GNUNET_free (ccc);
347} 398}
348 399
349 400
@@ -356,7 +407,6 @@ static void
356channel_destroy (struct CadetChannel *ch) 407channel_destroy (struct CadetChannel *ch)
357{ 408{
358 struct CadetReliableMessage *crm; 409 struct CadetReliableMessage *crm;
359 struct CadetOutOfOrderMessage *com;
360 410
361 while (NULL != (crm = ch->head_sent)) 411 while (NULL != (crm = ch->head_sent))
362 { 412 {
@@ -369,29 +419,41 @@ channel_destroy (struct CadetChannel *ch)
369 GNUNET_CONTAINER_DLL_remove (ch->head_sent, 419 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
370 ch->tail_sent, 420 ch->tail_sent,
371 crm); 421 crm);
422 GNUNET_free (crm->data_message);
372 GNUNET_free (crm); 423 GNUNET_free (crm);
373 } 424 }
374 while (NULL != (com = ch->head_recv)) 425 if (NULL != ch->owner)
375 { 426 {
376 GNUNET_CONTAINER_DLL_remove (ch->head_recv, 427 free_channel_client (ch->owner);
377 ch->tail_recv, 428 ch->owner = NULL;
378 com); 429 }
379 GNUNET_MQ_discard (com->env); 430 if (NULL != ch->dest)
380 GNUNET_free (com); 431 {
432 free_channel_client (ch->dest);
433 ch->dest = NULL;
381 } 434 }
382 if (NULL != ch->last_control_qe) 435 if (NULL != ch->last_control_qe)
383 { 436 {
384 GCT_send_cancel (ch->last_control_qe); 437 GCT_send_cancel (ch->last_control_qe);
385 ch->last_control_qe = NULL; 438 ch->last_control_qe = NULL;
386 } 439 }
387 if (NULL != ch->retry_task) 440 if (NULL != ch->retry_data_task)
441 {
442 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
443 ch->retry_data_task = NULL;
444 }
445 if (NULL != ch->retry_control_task)
388 { 446 {
389 GNUNET_SCHEDULER_cancel (ch->retry_task); 447 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
390 ch->retry_task = NULL; 448 ch->retry_control_task = NULL;
449 }
450 if (GNUNET_NO == ch->is_loopback)
451 {
452 GCT_remove_channel (ch->t,
453 ch,
454 ch->ctn);
455 ch->t = NULL;
391 } 456 }
392 GCT_remove_channel (ch->t,
393 ch,
394 ch->gid);
395 GNUNET_free (ch); 457 GNUNET_free (ch);
396} 458}
397 459
@@ -402,7 +464,7 @@ channel_destroy (struct CadetChannel *ch)
402 * @param cls Channel for which to send. 464 * @param cls Channel for which to send.
403 */ 465 */
404static void 466static void
405send_create (void *cls); 467send_channel_open (void *cls);
406 468
407 469
408/** 470/**
@@ -412,30 +474,41 @@ send_create (void *cls);
412 * @param cls our `struct CadetChannel`. 474 * @param cls our `struct CadetChannel`.
413 */ 475 */
414static void 476static void
415create_sent_cb (void *cls) 477channel_open_sent_cb (void *cls)
416{ 478{
417 struct CadetChannel *ch = cls; 479 struct CadetChannel *ch = cls;
418 480
481 GNUNET_assert (NULL != ch->last_control_qe);
419 ch->last_control_qe = NULL; 482 ch->last_control_qe = NULL;
420 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); 483 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
421 ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time, 484 LOG (GNUNET_ERROR_TYPE_DEBUG,
422 &send_create, 485 "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
423 ch); 486 GCCH_2s (ch),
487 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
488 GNUNET_YES));
489 ch->retry_control_task
490 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
491 &send_channel_open,
492 ch);
424} 493}
425 494
426 495
427/** 496/**
428 * Send a channel create message. 497 * Send a channel open message.
429 * 498 *
430 * @param cls Channel for which to send. 499 * @param cls Channel for which to send.
431 */ 500 */
432static void 501static void
433send_create (void *cls) 502send_channel_open (void *cls)
434{ 503{
435 struct CadetChannel *ch = cls; 504 struct CadetChannel *ch = cls;
436 struct GNUNET_CADET_ChannelOpenMessage msgcc; 505 struct GNUNET_CADET_ChannelOpenMessage msgcc;
437 uint32_t options; 506 uint32_t options;
438 507
508 ch->retry_control_task = NULL;
509 LOG (GNUNET_ERROR_TYPE_DEBUG,
510 "Sending CHANNEL_OPEN message for %s\n",
511 GCCH_2s (ch));
439 options = 0; 512 options = 0;
440 if (ch->nobuffer) 513 if (ch->nobuffer)
441 options |= GNUNET_CADET_OPTION_NOBUFFER; 514 options |= GNUNET_CADET_OPTION_NOBUFFER;
@@ -447,20 +520,43 @@ send_create (void *cls)
447 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN); 520 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
448 msgcc.opt = htonl (options); 521 msgcc.opt = htonl (options);
449 msgcc.port = ch->port; 522 msgcc.port = ch->port;
450 msgcc.chid = ch->gid; 523 msgcc.ctn = ch->ctn;
451 ch->state = CADET_CHANNEL_CREATE_SENT; 524 ch->state = CADET_CHANNEL_OPEN_SENT;
452 ch->last_control_qe = GCT_send (ch->t, 525 ch->last_control_qe = GCT_send (ch->t,
453 &msgcc.header, 526 &msgcc.header,
454 &create_sent_cb, 527 &channel_open_sent_cb,
455 ch); 528 ch);
456} 529}
457 530
458 531
459/** 532/**
533 * Function called once and only once after a channel was bound
534 * to its tunnel via #GCT_add_channel() is ready for transmission.
535 * Note that this is only the case for channels that this peer
536 * initiates, as for incoming channels we assume that they are
537 * ready for transmission immediately upon receiving the open
538 * message. Used to bootstrap the #GCT_send() process.
539 *
540 * @param ch the channel for which the tunnel is now ready
541 */
542void
543GCCH_tunnel_up (struct CadetChannel *ch)
544{
545 GNUNET_assert (NULL == ch->retry_control_task);
546 LOG (GNUNET_ERROR_TYPE_DEBUG,
547 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
548 GCCH_2s (ch));
549 ch->retry_control_task
550 = GNUNET_SCHEDULER_add_now (&send_channel_open,
551 ch);
552}
553
554
555/**
460 * Create a new channel. 556 * Create a new channel.
461 * 557 *
462 * @param owner local client owning the channel 558 * @param owner local client owning the channel
463 * @param owner_id local chid of this channel at the @a owner 559 * @param ccn local number of this channel at the @a owner
464 * @param destination peer to which we should build the channel 560 * @param destination peer to which we should build the channel
465 * @param port desired port at @a destination 561 * @param port desired port at @a destination
466 * @param options options for the channel 562 * @param options options for the channel
@@ -468,33 +564,74 @@ send_create (void *cls)
468 */ 564 */
469struct CadetChannel * 565struct CadetChannel *
470GCCH_channel_local_new (struct CadetClient *owner, 566GCCH_channel_local_new (struct CadetClient *owner,
471 struct GNUNET_CADET_ClientChannelNumber owner_id, 567 struct GNUNET_CADET_ClientChannelNumber ccn,
472 struct CadetPeer *destination, 568 struct CadetPeer *destination,
473 const struct GNUNET_HashCode *port, 569 const struct GNUNET_HashCode *port,
474 uint32_t options) 570 uint32_t options)
475{ 571{
476 struct CadetChannel *ch; 572 struct CadetChannel *ch;
573 struct CadetChannelClient *ccco;
574
575 ccco = GNUNET_new (struct CadetChannelClient);
576 ccco->c = owner;
577 ccco->ccn = ccn;
578 ccco->client_ready = GNUNET_YES;
477 579
478 ch = GNUNET_new (struct CadetChannel); 580 ch = GNUNET_new (struct CadetChannel);
479 ch->max_pending_messages = 32; /* FIXME: allow control via options 581 ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
480 or adjust dynamically... */
481 ch->owner = owner;
482 ch->lid = owner_id;
483 ch->port = *port;
484 ch->t = GCP_get_tunnel (destination,
485 GNUNET_YES);
486 ch->gid = GCT_add_channel (ch->t,
487 ch);
488 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
489 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); 582 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
490 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); 583 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
491 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); 584 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
492 ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create, 585 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
493 ch); 586 ch->owner = ccco;
587 ch->port = *port;
588 if (0 == memcmp (&my_full_id,
589 GCP_get_id (destination),
590 sizeof (struct GNUNET_PeerIdentity)))
591 {
592 struct CadetClient *c;
593
594 ch->is_loopback = GNUNET_YES;
595 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
596 port);
597 if (NULL == c)
598 {
599 /* port closed, wait for it to possibly open */
600 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
601 port,
602 ch,
603 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
604 LOG (GNUNET_ERROR_TYPE_DEBUG,
605 "Created loose incoming loopback channel to port %s\n",
606 GNUNET_h2s (&ch->port));
607 }
608 else
609 {
610 ch->dest = GNUNET_new (struct CadetChannelClient);
611 ch->dest->c = c;
612 ch->dest->client_ready = GNUNET_YES;
613 GCCH_bind (ch,
614 ch->dest->c);
615 }
616 }
617 else
618 {
619 ch->t = GCP_get_tunnel (destination,
620 GNUNET_YES);
621 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
622 ch->ctn = GCT_add_channel (ch->t,
623 ch);
624 }
494 GNUNET_STATISTICS_update (stats, 625 GNUNET_STATISTICS_update (stats,
495 "# channels", 626 "# channels",
496 1, 627 1,
497 GNUNET_NO); 628 GNUNET_NO);
629 LOG (GNUNET_ERROR_TYPE_DEBUG,
630 "Created channel to port %s at peer %s for %s using %s\n",
631 GNUNET_h2s (port),
632 GCP_2s (destination),
633 GSC_2s (owner),
634 (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
498 return ch; 635 return ch;
499} 636}
500 637
@@ -510,23 +647,27 @@ timeout_closed_cb (void *cls)
510{ 647{
511 struct CadetChannel *ch = cls; 648 struct CadetChannel *ch = cls;
512 649
513 ch->retry_task = NULL; 650 ch->retry_control_task = NULL;
651 LOG (GNUNET_ERROR_TYPE_DEBUG,
652 "Closing incoming channel to port %s from peer %s due to timeout\n",
653 GNUNET_h2s (&ch->port),
654 GCP_2s (GCT_get_destination (ch->t)));
514 channel_destroy (ch); 655 channel_destroy (ch);
515} 656}
516 657
517 658
518/** 659/**
519 * Create a new channel. 660 * Create a new channel based on a request coming in over the network.
520 * 661 *
521 * @param t tunnel to the remote peer 662 * @param t tunnel to the remote peer
522 * @param gid identifier of this channel in the tunnel 663 * @param ctn identifier of this channel in the tunnel
523 * @param port desired local port 664 * @param port desired local port
524 * @param options options for the channel 665 * @param options options for the channel
525 * @return handle to the new channel 666 * @return handle to the new channel
526 */ 667 */
527struct CadetChannel * 668struct CadetChannel *
528GCCH_channel_incoming_new (struct CadetTunnel *t, 669GCCH_channel_incoming_new (struct CadetTunnel *t,
529 struct GNUNET_CADET_ChannelTunnelNumber gid, 670 struct GNUNET_CADET_ChannelTunnelNumber ctn,
530 const struct GNUNET_HashCode *port, 671 const struct GNUNET_HashCode *port,
531 uint32_t options) 672 uint32_t options)
532{ 673{
@@ -534,15 +675,14 @@ GCCH_channel_incoming_new (struct CadetTunnel *t,
534 struct CadetClient *c; 675 struct CadetClient *c;
535 676
536 ch = GNUNET_new (struct CadetChannel); 677 ch = GNUNET_new (struct CadetChannel);
537 ch->max_pending_messages = 32; /* FIXME: allow control via options
538 or adjust dynamically... */
539 ch->port = *port; 678 ch->port = *port;
540 ch->t = t; 679 ch->t = t;
541 ch->gid = gid; 680 ch->ctn = ctn;
542 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME; 681 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
543 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); 682 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
544 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); 683 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
545 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); 684 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
685 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
546 GNUNET_STATISTICS_update (stats, 686 GNUNET_STATISTICS_update (stats,
547 "# channels", 687 "# channels",
548 1, 688 1,
@@ -557,9 +697,14 @@ GCCH_channel_incoming_new (struct CadetTunnel *t,
557 port, 697 port,
558 ch, 698 ch,
559 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 699 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
560 ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT, 700 ch->retry_control_task
561 &timeout_closed_cb, 701 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
562 ch); 702 &timeout_closed_cb,
703 ch);
704 LOG (GNUNET_ERROR_TYPE_DEBUG,
705 "Created loose incoming channel to port %s from peer %s\n",
706 GNUNET_h2s (&ch->port),
707 GCP_2s (GCT_get_destination (ch->t)));
563 } 708 }
564 else 709 else
565 { 710 {
@@ -586,23 +731,24 @@ send_ack_cb (void *cls)
586{ 731{
587 struct CadetChannel *ch = cls; 732 struct CadetChannel *ch = cls;
588 733
734 GNUNET_assert (NULL != ch->last_control_qe);
589 ch->last_control_qe = NULL; 735 ch->last_control_qe = NULL;
590} 736}
591 737
592 738
593/** 739/**
594 * Compute and send the current ACK to the other peer. 740 * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
595 * 741 *
596 * @param ch channel to send the ACK for 742 * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
597 */ 743 */
598static void 744static void
599send_channel_ack (struct CadetChannel *ch) 745send_channel_data_ack (struct CadetChannel *ch)
600{ 746{
601 struct GNUNET_CADET_ChannelDataAckMessage msg; 747 struct GNUNET_CADET_ChannelDataAckMessage msg;
602 748
603 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK); 749 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
604 msg.header.size = htons (sizeof (msg)); 750 msg.header.size = htons (sizeof (msg));
605 msg.gid = ch->gid; 751 msg.ctn = ch->ctn;
606 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1); 752 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
607 msg.futures = GNUNET_htonll (ch->mid_futures); 753 msg.futures = GNUNET_htonll (ch->mid_futures);
608 if (NULL != ch->last_control_qe) 754 if (NULL != ch->last_control_qe)
@@ -615,18 +761,93 @@ send_channel_ack (struct CadetChannel *ch)
615 761
616 762
617/** 763/**
618 * Send our initial ACK to the client confirming that the 764 * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
619 * connection is up. 765 * connection is up.
620 * 766 *
621 * @param cls the `struct CadetChannel` 767 * @param cls the `struct CadetChannel`
622 */ 768 */
623static void 769static void
624send_connect_ack (void *cls) 770send_open_ack (void *cls)
625{ 771{
626 struct CadetChannel *ch = cls; 772 struct CadetChannel *ch = cls;
773 struct GNUNET_CADET_ChannelManageMessage msg;
627 774
628 ch->retry_task = NULL; 775 LOG (GNUNET_ERROR_TYPE_DEBUG,
629 send_channel_ack (ch); 776 "Sending CHANNEL_OPEN_ACK on %s\n",
777 GCCH_2s (ch));
778 ch->retry_control_task = NULL;
779 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
780 msg.header.size = htons (sizeof (msg));
781 msg.reserved = htonl (0);
782 msg.ctn = ch->ctn;
783 if (NULL != ch->last_control_qe)
784 GCT_send_cancel (ch->last_control_qe);
785 ch->last_control_qe = GCT_send (ch->t,
786 &msg.header,
787 &send_ack_cb,
788 ch);
789}
790
791
792/**
793 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
794 * this channel. If the binding was successful, (re)transmit the
795 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
796 *
797 * @param ch channel that got the duplicate open
798 */
799void
800GCCH_handle_duplicate_open (struct CadetChannel *ch)
801{
802 if (NULL == ch->dest)
803 {
804 LOG (GNUNET_ERROR_TYPE_DEBUG,
805 "Ignoring duplicate channel OPEN on %s: port is closed\n",
806 GCCH_2s (ch));
807 return;
808 }
809 if (NULL != ch->retry_control_task)
810 {
811 LOG (GNUNET_ERROR_TYPE_DEBUG,
812 "Ignoring duplicate channel OPEN on %s: control message is pending\n",
813 GCCH_2s (ch));
814 return;
815 }
816 LOG (GNUNET_ERROR_TYPE_DEBUG,
817 "Retransmitting OPEN_ACK on %s\n",
818 GCCH_2s (ch));
819 ch->retry_control_task
820 = GNUNET_SCHEDULER_add_now (&send_open_ack,
821 ch);
822}
823
824
825/**
826 * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
827 *
828 * @param ch channel the ack is for
829 * @param to_owner #GNUNET_YES to send to owner,
830 * #GNUNET_NO to send to dest
831 */
832static void
833send_ack_to_client (struct CadetChannel *ch,
834 int to_owner)
835{
836 struct GNUNET_MQ_Envelope *env;
837 struct GNUNET_CADET_LocalAck *ack;
838 struct CadetChannelClient *ccc;
839
840 env = GNUNET_MQ_msg (ack,
841 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
842 ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
843 ack->ccn = ccc->ccn;
844 LOG (GNUNET_ERROR_TYPE_DEBUG,
845 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n",
846 GSC_2s (ccc->c),
847 (GNUNET_YES == to_owner) ? "owner" : "dest",
848 ntohl (ack->ccn.channel_of_client));
849 GSC_send_to_client (ccc->c,
850 env);
630} 851}
631 852
632 853
@@ -643,12 +864,19 @@ GCCH_bind (struct CadetChannel *ch,
643 struct CadetClient *c) 864 struct CadetClient *c)
644{ 865{
645 uint32_t options; 866 uint32_t options;
867 struct CadetChannelClient *cccd;
646 868
647 if (NULL != ch->retry_task) 869 LOG (GNUNET_ERROR_TYPE_DEBUG,
870 "Binding %s from %s to port %s of %s\n",
871 GCCH_2s (ch),
872 GCT_2s (ch->t),
873 GNUNET_h2s (&ch->port),
874 GSC_2s (c));
875 if (NULL != ch->retry_control_task)
648 { 876 {
649 /* there might be a timeout task here */ 877 /* there might be a timeout task here */
650 GNUNET_SCHEDULER_cancel (ch->retry_task); 878 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
651 ch->retry_task = NULL; 879 ch->retry_control_task = NULL;
652 } 880 }
653 options = 0; 881 options = 0;
654 if (ch->nobuffer) 882 if (ch->nobuffer)
@@ -657,29 +885,79 @@ GCCH_bind (struct CadetChannel *ch,
657 options |= GNUNET_CADET_OPTION_RELIABLE; 885 options |= GNUNET_CADET_OPTION_RELIABLE;
658 if (ch->out_of_order) 886 if (ch->out_of_order)
659 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; 887 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
660 ch->dest = c; 888 cccd = GNUNET_new (struct CadetChannelClient);
661 ch->lid = GSC_bind (c, 889 ch->dest = cccd;
662 ch, 890 cccd->c = c;
663 GCT_get_destination (ch->t), 891 cccd->client_ready = GNUNET_YES;
664 &ch->port, 892 cccd->ccn = GSC_bind (c,
665 options); 893 ch,
666 ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */ 894 (GNUNET_YES == ch->is_loopback)
667 895 ? GCP_get (&my_full_id,
668 /* notify other peer that we accepted the connection */ 896 GNUNET_YES)
669 ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack, 897 : GCT_get_destination (ch->t),
670 ch); 898 &ch->port,
899 options);
900 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
901 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
902 ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
903 if (GNUNET_YES == ch->is_loopback)
904 {
905 ch->state = CADET_CHANNEL_OPEN_SENT;
906 GCCH_handle_channel_open_ack (ch);
907 }
908 else
909 {
910 /* notify other peer that we accepted the connection */
911 ch->retry_control_task
912 = GNUNET_SCHEDULER_add_now (&send_open_ack,
913 ch);
914 }
915 /* give client it's initial supply of ACKs */
916 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
917 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
918 for (unsigned int i=0;i<ch->max_pending_messages;i++)
919 send_ack_to_client (ch,
920 GNUNET_NO);
671} 921}
672 922
673 923
674/** 924/**
675 * Destroy locally created channel. Called by the 925 * Destroy locally created channel. Called by the local client, so no
676 * local client, so no need to tell the client. 926 * need to tell the client.
677 * 927 *
678 * @param ch channel to destroy 928 * @param ch channel to destroy
929 * @param c client that caused the destruction
930 * @param ccn client number of the client @a c
679 */ 931 */
680void 932void
681GCCH_channel_local_destroy (struct CadetChannel *ch) 933GCCH_channel_local_destroy (struct CadetChannel *ch,
934 struct CadetClient *c,
935 struct GNUNET_CADET_ClientChannelNumber ccn)
682{ 936{
937 LOG (GNUNET_ERROR_TYPE_DEBUG,
938 "%s asks for destruction of %s\n",
939 GSC_2s (c),
940 GCCH_2s (ch));
941 GNUNET_assert (NULL != c);
942 if ( (NULL != ch->owner) &&
943 (c == ch->owner->c) &&
944 (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
945 {
946 free_channel_client (ch->owner);
947 ch->owner = NULL;
948 }
949 else if ( (NULL != ch->dest) &&
950 (c == ch->dest->c) &&
951 (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
952 {
953 free_channel_client (ch->dest);
954 ch->dest = NULL;
955 }
956 else
957 {
958 GNUNET_assert (0);
959 }
960
683 if (GNUNET_YES == ch->destroy) 961 if (GNUNET_YES == ch->destroy)
684 { 962 {
685 /* other end already destroyed, with the local client gone, no need 963 /* other end already destroyed, with the local client gone, no need
@@ -687,41 +965,279 @@ GCCH_channel_local_destroy (struct CadetChannel *ch)
687 channel_destroy (ch); 965 channel_destroy (ch);
688 return; 966 return;
689 } 967 }
690 if (NULL != ch->head_sent) 968 if ( (NULL != ch->head_sent) ||
969 (NULL != ch->owner) ||
970 (NULL != ch->dest) )
691 { 971 {
692 /* allow send queue to train first */ 972 /* Wait for other end to destroy us as well,
973 and otherwise allow send queue to be transmitted first */
693 ch->destroy = GNUNET_YES; 974 ch->destroy = GNUNET_YES;
694 return; 975 return;
695 } 976 }
977 /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
978 if (CADET_CHANNEL_NEW != ch->state)
979 GCT_send_channel_destroy (ch->t,
980 ch->ctn);
696 /* Nothing left to do, just finish destruction */ 981 /* Nothing left to do, just finish destruction */
697 channel_destroy (ch); 982 channel_destroy (ch);
698} 983}
699 984
700 985
701/** 986/**
702 * Destroy channel that was incoming. Called by the 987 * We got an acknowledgement for the creation of the channel
703 * local client, so no need to tell the client. 988 * (the port is open on the other side). Begin transmissions.
704 * 989 *
705 * @param ch channel to destroy 990 * @param ch channel to destroy
706 */ 991 */
707void 992void
708GCCH_channel_incoming_destroy (struct CadetChannel *ch) 993GCCH_handle_channel_open_ack (struct CadetChannel *ch)
709{ 994{
710 if (GNUNET_YES == ch->destroy) 995 switch (ch->state)
711 { 996 {
712 /* other end already destroyed, with the remote client gone, no need 997 case CADET_CHANNEL_NEW:
713 to finish transmissions, just destroy immediately. */ 998 /* this should be impossible */
714 channel_destroy (ch); 999 GNUNET_break (0);
1000 break;
1001 case CADET_CHANNEL_OPEN_SENT:
1002 if (NULL == ch->owner)
1003 {
1004 /* We're not the owner, wrong direction! */
1005 GNUNET_break_op (0);
1006 return;
1007 }
1008 LOG (GNUNET_ERROR_TYPE_DEBUG,
1009 "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1010 GCCH_2s (ch));
1011 if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1012 {
1013 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1014 ch->retry_control_task = NULL;
1015 }
1016 ch->state = CADET_CHANNEL_READY;
1017 /* On first connect, send client as many ACKs as we allow messages
1018 to be buffered! */
1019 for (unsigned int i=0;i<ch->max_pending_messages;i++)
1020 send_ack_to_client (ch,
1021 GNUNET_YES);
1022 break;
1023 case CADET_CHANNEL_READY:
1024 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1025 LOG (GNUNET_ERROR_TYPE_DEBUG,
1026 "Received duplicate channel OPEN_ACK for %s\n",
1027 GCCH_2s (ch));
1028 GNUNET_STATISTICS_update (stats,
1029 "# duplicate CREATE_ACKs",
1030 1,
1031 GNUNET_NO);
1032 break;
1033 }
1034}
1035
1036
1037/**
1038 * Test if element @a e1 comes before element @a e2.
1039 *
1040 * TODO: use opportunity to create generic list insertion sort
1041 * logic in container!
1042 *
1043 * @param cls closure, our `struct CadetChannel`
1044 * @param e1 an element of to sort
1045 * @param e2 another element to sort
1046 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1047 */
1048static int
1049is_before (void *cls,
1050 void *e1,
1051 void *e2)
1052{
1053 struct CadetOutOfOrderMessage *m1 = e1;
1054 struct CadetOutOfOrderMessage *m2 = e2;
1055 uint32_t v1 = ntohl (m1->mid.mid);
1056 uint32_t v2 = ntohl (m2->mid.mid);
1057 uint32_t delta;
1058
1059 delta = v1 - v2;
1060 if (delta > (uint32_t) INT_MAX)
1061 {
1062 /* in overflow range, we can safely assume we wrapped around */
1063 return GNUNET_NO;
1064 }
1065 else
1066 {
1067 return GNUNET_YES;
1068 }
1069}
1070
1071
1072/**
1073 * We got payload data for a channel. Pass it on to the client
1074 * and send an ACK to the other end (once flow control allows it!)
1075 *
1076 * @param ch channel that got data
1077 * @param msg message that was received
1078 */
1079void
1080GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1081 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1082{
1083 struct GNUNET_MQ_Envelope *env;
1084 struct GNUNET_CADET_LocalData *ld;
1085 struct CadetChannelClient *ccc;
1086 struct CadetOutOfOrderMessage *com;
1087 size_t payload_size;
1088
1089 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1090 if ( (GNUNET_YES == ch->destroy) &&
1091 (NULL == ch->owner) &&
1092 (NULL == ch->dest) )
1093 {
1094 /* This client is gone, but we still have messages to send to
1095 the other end (which is why @a ch is not yet dead). However,
1096 we cannot pass messages to our client anymore. */
1097 LOG (GNUNET_ERROR_TYPE_DEBUG,
1098 "Dropping incoming payload on %s as this end is already closed\n",
1099 GCCH_2s (ch));
1100 /* FIXME: send back ACK/NACK/Closed notification
1101 to stop retransmissions! */
715 return; 1102 return;
716 } 1103 }
717 if (NULL != ch->head_recv) 1104 payload_size = ntohs (msg->header.size) - sizeof (*msg);
1105 env = GNUNET_MQ_msg_extra (ld,
1106 payload_size,
1107 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1108 ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1109 GNUNET_memcpy (&ld[1],
1110 &msg[1],
1111 payload_size);
1112 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1113 if ( (GNUNET_YES == ccc->client_ready) &&
1114 ( (GNUNET_YES == ch->out_of_order) ||
1115 (msg->mid.mid == ch->mid_recv.mid) ) )
718 { 1116 {
719 /* allow local client to see all data first */ 1117 LOG (GNUNET_ERROR_TYPE_DEBUG,
720 ch->destroy = GNUNET_YES; 1118 "Giving %u bytes of payload from %s to client %s\n",
1119 (unsigned int) payload_size,
1120 GCCH_2s (ch),
1121 GSC_2s (ccc->c));
1122 ccc->client_ready = GNUNET_NO;
1123 GSC_send_to_client (ccc->c,
1124 env);
1125 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1126 ch->mid_futures >>= 1;
1127 }
1128 else
1129 {
1130 /* FIXME-SECURITY: if the element is WAY too far ahead,
1131 drop it (can't buffer too much!) */
1132 LOG (GNUNET_ERROR_TYPE_DEBUG,
1133 "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n",
1134 (GNUNET_YES == ccc->client_ready)
1135 ? "out-of-order"
1136 : "client-not-ready",
1137 (unsigned int) payload_size,
1138 GCCH_2s (ch),
1139 ntohl (msg->mid.mid),
1140 ntohl (ch->mid_recv.mid));
1141
1142 com = GNUNET_new (struct CadetOutOfOrderMessage);
1143 com->mid = msg->mid;
1144 com->env = env;
1145 /* sort into list ordered by "is_before" */
1146 if ( (NULL == ccc->head_recv) ||
1147 (GNUNET_YES == is_before (ch,
1148 com,
1149 ccc->head_recv)) )
1150 {
1151 GNUNET_CONTAINER_DLL_insert (ccc->head_recv,
1152 ccc->tail_recv,
1153 com);
1154 }
1155 else
1156 {
1157 struct CadetOutOfOrderMessage *pos;
1158
1159 for (pos = ccc->head_recv;
1160 NULL != pos;
1161 pos = pos->next)
1162 {
1163 if (GNUNET_YES !=
1164 is_before (NULL,
1165 pos,
1166 com))
1167 break;
1168 }
1169 if (NULL == pos)
1170 GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv,
1171 ccc->tail_recv,
1172 com);
1173 else
1174 GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv,
1175 ccc->tail_recv,
1176 com,
1177 pos->prev);
1178 }
1179 }
1180}
1181
1182
1183/**
1184 * We got an acknowledgement for payload data for a channel.
1185 * Possibly resume transmissions.
1186 *
1187 * @param ch channel that got the ack
1188 * @param ack details about what was received
1189 */
1190void
1191GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1192 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1193{
1194 struct CadetReliableMessage *crm;
1195
1196 GNUNET_break (GNUNET_NO == ch->is_loopback);
1197 if (GNUNET_NO == ch->reliable)
1198 {
1199 /* not expecting ACKs on unreliable channel, odd */
1200 GNUNET_break_op (0);
721 return; 1201 return;
722 } 1202 }
723 /* Nothing left to do, just finish destruction */ 1203 for (crm = ch->head_sent;
724 channel_destroy (ch); 1204 NULL != crm;
1205 crm = crm->next)
1206 if (ack->mid.mid == crm->data_message->mid.mid)
1207 break;
1208 if (NULL == crm)
1209 {
1210 /* ACK for message we already dropped, might have been a
1211 duplicate ACK? Ignore. */
1212 LOG (GNUNET_ERROR_TYPE_DEBUG,
1213 "Duplicate DATA_ACK on %s, ignoring\n",
1214 GCCH_2s (ch));
1215 GNUNET_STATISTICS_update (stats,
1216 "# duplicate DATA_ACKs",
1217 1,
1218 GNUNET_NO);
1219 return;
1220 }
1221 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1222 ch->tail_sent,
1223 crm);
1224 GNUNET_free (crm->data_message);
1225 GNUNET_free (crm);
1226 ch->pending_messages--;
1227 send_ack_to_client (ch,
1228 (NULL == ch->owner)
1229 ? GNUNET_NO
1230 : GNUNET_YES);
1231 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1232 LOG (GNUNET_ERROR_TYPE_DEBUG,
1233 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1234 GCCH_2s (ch),
1235 (unsigned int) ntohl (ack->mid.mid),
1236 ch->pending_messages);
1237 send_ack_to_client (ch,
1238 (NULL == ch->owner)
1239 ? GNUNET_NO
1240 : GNUNET_YES);
725} 1241}
726 1242
727 1243
@@ -730,19 +1246,36 @@ GCCH_channel_incoming_destroy (struct CadetChannel *ch)
730 * connection. Also needs to remove this channel from 1246 * connection. Also needs to remove this channel from
731 * the tunnel. 1247 * the tunnel.
732 * 1248 *
733 * FIXME: need to make it possible to defer destruction until we have
734 * received all messages up to the destroy, and right now the destroy
735 * message (and this API) fails to give is the information we need!
736 *
737 * FIXME: also need to know if the other peer got a destroy from
738 * us before!
739 *
740 * @param ch channel to destroy 1249 * @param ch channel to destroy
741 */ 1250 */
742void 1251void
743GCCH_channel_remote_destroy (struct CadetChannel *ch) 1252GCCH_handle_remote_destroy (struct CadetChannel *ch)
744{ 1253{
745 GNUNET_break (0); // FIXME! 1254 struct CadetChannelClient *ccc;
1255
1256 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1257 LOG (GNUNET_ERROR_TYPE_DEBUG,
1258 "Received remote channel DESTROY for %s\n",
1259 GCCH_2s (ch));
1260 if (GNUNET_YES == ch->destroy)
1261 {
1262 /* Local client already gone, this is instant-death. */
1263 channel_destroy (ch);
1264 return;
1265 }
1266 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1267 if (NULL != ccc->head_recv)
1268 {
1269 LOG (GNUNET_ERROR_TYPE_WARNING,
1270 "Lost end of transmission due to remote shutdown on %s\n",
1271 GCCH_2s (ch));
1272 /* FIXME: change API to notify client about truncated transmission! */
1273 }
1274 ch->destroy = GNUNET_YES;
1275 GSC_handle_remote_channel_destroy (ccc->c,
1276 ccc->ccn,
1277 ch);
1278 channel_destroy (ch);
746} 1279}
747 1280
748 1281
@@ -770,67 +1303,16 @@ retry_transmission (void *cls)
770 struct CadetChannel *ch = cls; 1303 struct CadetChannel *ch = cls;
771 struct CadetReliableMessage *crm = ch->head_sent; 1304 struct CadetReliableMessage *crm = ch->head_sent;
772 1305
1306 ch->retry_data_task = NULL;
773 GNUNET_assert (NULL == crm->qe); 1307 GNUNET_assert (NULL == crm->qe);
774 crm->qe = GCT_send (ch->t, 1308 crm->qe = GCT_send (ch->t,
775 &crm->data_message.header, 1309 &crm->data_message->header,
776 &data_sent_cb, 1310 &data_sent_cb,
777 crm); 1311 crm);
778} 1312}
779 1313
780 1314
781/** 1315/**
782 * Check if we can now allow the client to transmit, and if so,
783 * let the client know about it.
784 *
785 * @param ch channel to check
786 */
787static void
788GCCH_check_allow_client (struct CadetChannel *ch)
789{
790 struct GNUNET_MQ_Envelope *env;
791 struct GNUNET_CADET_LocalAck *msg;
792
793 if (GNUNET_YES == ch->client_allowed)
794 return; /* client already allowed! */
795 if (CADET_CHANNEL_READY != ch->state)
796 {
797 /* destination did not yet ACK our CREATE! */
798 LOG (GNUNET_ERROR_TYPE_DEBUG,
799 "Channel %s not yet ready, throttling client until ACK.\n",
800 GCCH_2s (ch));
801 return;
802 }
803 if (ch->pending_messages > ch->max_pending_messages)
804 {
805 /* Too many messages in queue. */
806 LOG (GNUNET_ERROR_TYPE_DEBUG,
807 "Message queue still too long on channel %s, throttling client until ACK.\n",
808 GCCH_2s (ch));
809 return;
810 }
811 if ( (NULL != ch->head_sent) &&
812 (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
813 {
814 LOG (GNUNET_ERROR_TYPE_DEBUG,
815 "Gap in ACKs too big on channel %s, throttling client until ACK.\n",
816 GCCH_2s (ch));
817 return;
818 }
819 ch->client_allowed = GNUNET_YES;
820
821
822 LOG (GNUNET_ERROR_TYPE_DEBUG,
823 "Sending local ack to channel %s client\n",
824 GCCH_2s (ch));
825 env = GNUNET_MQ_msg (msg,
826 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
827 msg->channel_id = ch->lid;
828 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
829 env);
830}
831
832
833/**
834 * Function called once the tunnel has sent one of our messages. 1316 * Function called once the tunnel has sent one of our messages.
835 * If the message is unreliable, simply frees the `crm`. If the 1317 * If the message is unreliable, simply frees the `crm`. If the
836 * message was reliable, calculate retransmission time and 1318 * message was reliable, calculate retransmission time and
@@ -845,6 +1327,7 @@ data_sent_cb (void *cls)
845 struct CadetChannel *ch = crm->ch; 1327 struct CadetChannel *ch = crm->ch;
846 struct CadetReliableMessage *off; 1328 struct CadetReliableMessage *off;
847 1329
1330 GNUNET_assert (GNUNET_NO == ch->is_loopback);
848 crm->qe = NULL; 1331 crm->qe = NULL;
849 GNUNET_CONTAINER_DLL_remove (ch->head_sent, 1332 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
850 ch->tail_sent, 1333 ch->tail_sent,
@@ -853,7 +1336,10 @@ data_sent_cb (void *cls)
853 { 1336 {
854 GNUNET_free (crm); 1337 GNUNET_free (crm);
855 ch->pending_messages--; 1338 ch->pending_messages--;
856 GCCH_check_allow_client (ch); 1339 send_ack_to_client (ch,
1340 (NULL == ch->owner)
1341 ? GNUNET_NO
1342 : GNUNET_YES);
857 return; 1343 return;
858 } 1344 }
859 if (0 == crm->retry_delay.rel_value_us) 1345 if (0 == crm->retry_delay.rel_value_us)
@@ -868,11 +1354,12 @@ data_sent_cb (void *cls)
868 GNUNET_CONTAINER_DLL_insert (ch->head_sent, 1354 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
869 ch->tail_sent, 1355 ch->tail_sent,
870 crm); 1356 crm);
871 if (NULL != ch->retry_task) 1357 if (NULL != ch->retry_data_task)
872 GNUNET_SCHEDULER_cancel (ch->retry_task); 1358 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
873 ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay, 1359 ch->retry_data_task
874 &retry_transmission, 1360 = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
875 ch); 1361 &retry_transmission,
1362 ch);
876 return; 1363 return;
877 } 1364 }
878 for (off = ch->head_sent; NULL != off; off = off->next) 1365 for (off = ch->head_sent; NULL != off; off = off->next)
@@ -904,82 +1391,142 @@ data_sent_cb (void *cls)
904 * buffer space in the tunnel. 1391 * buffer space in the tunnel.
905 * 1392 *
906 * @param ch Channel. 1393 * @param ch Channel.
907 * @param message payload to transmit. 1394 * @param sender_ccn ccn of the sender
1395 * @param buf payload to transmit.
1396 * @param buf_len number of bytes in @a buf
908 * @return #GNUNET_OK if everything goes well, 1397 * @return #GNUNET_OK if everything goes well,
909 * #GNUNET_SYSERR in case of an error. 1398 * #GNUNET_SYSERR in case of an error.
910 */ 1399 */
911int 1400int
912GCCH_handle_local_data (struct CadetChannel *ch, 1401GCCH_handle_local_data (struct CadetChannel *ch,
913 const struct GNUNET_MessageHeader *message) 1402 struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1403 const char *buf,
1404 size_t buf_len)
914{ 1405{
915 uint16_t payload_size = ntohs (message->size);
916 struct CadetReliableMessage *crm; 1406 struct CadetReliableMessage *crm;
917 1407
918 if (GNUNET_NO == ch->client_allowed) 1408 if (ch->pending_messages > ch->max_pending_messages)
919 { 1409 {
920 GNUNET_break_op (0); 1410 GNUNET_break (0);
921 return GNUNET_SYSERR; 1411 return GNUNET_SYSERR;
922 } 1412 }
923 ch->client_allowed = GNUNET_NO;
924 ch->pending_messages++; 1413 ch->pending_messages++;
925 1414
1415 if (GNUNET_YES == ch->is_loopback)
1416 {
1417 struct CadetChannelClient *receiver;
1418 struct GNUNET_MQ_Envelope *env;
1419 struct GNUNET_CADET_LocalData *ld;
1420 int to_owner;
1421
1422 env = GNUNET_MQ_msg_extra (ld,
1423 buf_len,
1424 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1425 if (sender_ccn.channel_of_client ==
1426 ch->owner->ccn.channel_of_client)
1427 {
1428 receiver = ch->dest;
1429 to_owner = GNUNET_NO;
1430 }
1431 else
1432 {
1433 GNUNET_assert (sender_ccn.channel_of_client ==
1434 ch->dest->ccn.channel_of_client);
1435 receiver = ch->owner;
1436 to_owner = GNUNET_YES;
1437 }
1438 ld->ccn = receiver->ccn;
1439 GNUNET_memcpy (&ld[1],
1440 buf,
1441 buf_len);
1442 /* FIXME: this does not provide for flow control! */
1443 GSC_send_to_client (receiver->c,
1444 env);
1445 send_ack_to_client (ch,
1446 to_owner);
1447 return GNUNET_OK;
1448 }
1449
926 /* Everything is correct, send the message. */ 1450 /* Everything is correct, send the message. */
927 crm = GNUNET_malloc (sizeof (*crm) + payload_size); 1451 crm = GNUNET_malloc (sizeof (*crm));
928 crm->ch = ch; 1452 crm->ch = ch;
929 crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size); 1453 crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
930 crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA); 1454 + buf_len);
1455 crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1456 crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
931 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1); 1457 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
932 crm->data_message.mid = ch->mid_send; 1458 crm->data_message->mid = ch->mid_send;
933 crm->data_message.gid = ch->gid; 1459 crm->data_message->ctn = ch->ctn;
934 GNUNET_memcpy (&crm[1], 1460 GNUNET_memcpy (&crm->data_message[1],
935 message, 1461 buf,
936 payload_size); 1462 buf_len);
937 GNUNET_CONTAINER_DLL_insert (ch->head_sent, 1463 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
938 ch->tail_sent, 1464 ch->tail_sent,
939 crm); 1465 crm);
940 LOG (GNUNET_ERROR_TYPE_DEBUG, 1466 LOG (GNUNET_ERROR_TYPE_DEBUG,
941 "Sending %u bytes from local client to channel %s\n", 1467 "Sending %u bytes from local client to %s\n",
942 payload_size, 1468 buf_len,
943 GCCH_2s (ch)); 1469 GCCH_2s (ch));
944 crm->qe = GCT_send (ch->t, 1470 crm->qe = GCT_send (ch->t,
945 &crm->data_message.header, 1471 &crm->data_message->header,
946 &data_sent_cb, 1472 &data_sent_cb,
947 crm); 1473 crm);
948 GCCH_check_allow_client (ch);
949 return GNUNET_OK; 1474 return GNUNET_OK;
950} 1475}
951 1476
952 1477
953/** 1478/**
954 * Try to deliver messages to the local client, if it is ready for more. 1479 * Handle ACK from client on local channel. Means the client is ready
1480 * for more data, see if we have any for it.
955 * 1481 *
956 * @param ch channel to process 1482 * @param ch channel to destroy
1483 * @param client_ccn ccn of the client sending the ack
957 */ 1484 */
958static void 1485void
959send_client_buffered_data (struct CadetChannel *ch) 1486GCCH_handle_local_ack (struct CadetChannel *ch,
1487 struct GNUNET_CADET_ClientChannelNumber client_ccn)
960{ 1488{
1489 struct CadetChannelClient *ccc;
961 struct CadetOutOfOrderMessage *com; 1490 struct CadetOutOfOrderMessage *com;
962 1491
963 if (GNUNET_NO == ch->client_ready) 1492 if ( (NULL != ch->owner) &&
964 return; /* client not ready */ 1493 (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
965 com = ch->head_recv; 1494 ccc = ch->owner;
1495 else if ( (NULL != ch->dest) &&
1496 (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1497 ccc = ch->dest;
1498 else
1499 GNUNET_assert (0);
1500 ccc->client_ready = GNUNET_YES;
1501 com = ccc->head_recv;
966 if (NULL == com) 1502 if (NULL == com)
1503 {
1504 LOG (GNUNET_ERROR_TYPE_DEBUG,
1505 "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending)!\n",
1506 GSC_2s (ccc->c),
1507 ntohl (ccc->ccn.channel_of_client));
967 return; /* none pending */ 1508 return; /* none pending */
1509 }
968 if ( (com->mid.mid != ch->mid_recv.mid) && 1510 if ( (com->mid.mid != ch->mid_recv.mid) &&
969 (GNUNET_NO == ch->out_of_order) ) 1511 (GNUNET_NO == ch->out_of_order) )
970 return; /* missing next one in-order */ 1512 return; /* missing next one in-order */
971 1513
972 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1514 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973 "Passing payload message to client on channel %s\n", 1515 "Got LOCAL ACK, passing payload message to %s-%X on %s\n",
1516 GSC_2s (ccc->c),
1517 ntohl (ccc->ccn.channel_of_client),
974 GCCH_2s (ch)); 1518 GCCH_2s (ch));
975 1519
976 /* all good, pass next message to client */ 1520 /* all good, pass next message to client */
977 GNUNET_CONTAINER_DLL_remove (ch->head_recv, 1521 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
978 ch->tail_recv, 1522 ccc->tail_recv,
979 com); 1523 com);
1524 /* FIXME: if unreliable, this is not aggressive
1525 enough, as it would be OK to have lost some! */
980 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); 1526 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
981 ch->mid_futures >>= 1; /* equivalent to division by 2 */ 1527 ch->mid_futures >>= 1; /* equivalent to division by 2 */
982 GSC_send_to_client (ch->owner ? ch->owner : ch->dest, 1528 ccc->client_ready = GNUNET_NO;
1529 GSC_send_to_client (ccc->c,
983 com->env); 1530 com->env);
984 GNUNET_free (com); 1531 GNUNET_free (com);
985 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && 1532 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
@@ -991,33 +1538,22 @@ send_client_buffered_data (struct CadetChannel *ch)
991 maximum of 64 bits, and 15 is getting too close for comfort.) 1538 maximum of 64 bits, and 15 is getting too close for comfort.)
992 So we should send one now. */ 1539 So we should send one now. */
993 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994 "Sender on channel %s likely blocked on flow-control, sending ACK now.\n", 1541 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
995 GCCH_2s (ch)); 1542 GCCH_2s (ch));
996 if (GNUNET_YES == ch->reliable) 1543 if (GNUNET_YES == ch->reliable)
997 send_channel_ack (ch); 1544 send_channel_data_ack (ch);
998 } 1545 }
999 1546
1000 if (NULL != ch->head_recv) 1547 if (NULL != ccc->head_recv)
1001 return; 1548 return;
1002 if (GNUNET_NO == ch->destroy) 1549 if (GNUNET_NO == ch->destroy)
1003 return; 1550 return;
1551 GCT_send_channel_destroy (ch->t,
1552 ch->ctn);
1004 channel_destroy (ch); 1553 channel_destroy (ch);
1005} 1554}
1006 1555
1007 1556
1008/**
1009 * Handle ACK from client on local channel.
1010 *
1011 * @param ch channel to destroy
1012 */
1013void
1014GCCH_handle_local_ack (struct CadetChannel *ch)
1015{
1016 ch->client_ready = GNUNET_YES;
1017 send_client_buffered_data (ch);
1018}
1019
1020
1021#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) 1557#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1022 1558
1023 1559
@@ -1045,25 +1581,25 @@ GCCH_debug (struct CadetChannel *ch,
1045 return; 1581 return;
1046 } 1582 }
1047 LOG2 (level, 1583 LOG2 (level,
1048 "CHN Channel %s:%X (%p)\n", 1584 "CHN %s:%X (%p)\n",
1049 GCT_2s (ch->t), 1585 GCT_2s (ch->t),
1050 ch->gid, 1586 ch->ctn,
1051 ch); 1587 ch);
1052 if (NULL != ch->owner) 1588 if (NULL != ch->owner)
1053 { 1589 {
1054 LOG2 (level, 1590 LOG2 (level,
1055 "CHN origin %s ready %s local-id: %u\n", 1591 "CHN origin %s ready %s local-id: %u\n",
1056 GSC_2s (ch->owner), 1592 GSC_2s (ch->owner->c),
1057 ch->client_ready ? "YES" : "NO", 1593 ch->owner->client_ready ? "YES" : "NO",
1058 ntohl (ch->lid.channel_of_client)); 1594 ntohl (ch->owner->ccn.channel_of_client));
1059 } 1595 }
1060 if (NULL != ch->dest) 1596 if (NULL != ch->dest)
1061 { 1597 {
1062 LOG2 (level, 1598 LOG2 (level,
1063 "CHN destination %s ready %s local-id: %u\n", 1599 "CHN destination %s ready %s local-id: %u\n",
1064 GSC_2s (ch->dest), 1600 GSC_2s (ch->dest->c),
1065 ch->client_ready ? "YES" : "NO", 1601 ch->dest->client_ready ? "YES" : "NO",
1066 ntohl (ch->lid.channel_of_client)); 1602 ntohl (ch->dest->ccn.channel_of_client));
1067 } 1603 }
1068 LOG2 (level, 1604 LOG2 (level,
1069 "CHN Message IDs recv: %d (%LLX), send: %d\n", 1605 "CHN Message IDs recv: %d (%LLX), send: %d\n",