aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet-new_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_channel.c')
-rw-r--r--src/cadet/gnunet-service-cadet-new_channel.c2037
1 files changed, 0 insertions, 2037 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c
deleted file mode 100644
index 8769601c2..000000000
--- a/src/cadet/gnunet-service-cadet-new_channel.c
+++ /dev/null
@@ -1,2037 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2001-2017 GNUnet e.V.
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
19*/
20/**
21 * @file cadet/gnunet-service-cadet-new_channel.c
22 * @brief logical links between CADET clients
23 * @author Bartlomiej Polot
24 * @author Christian Grothoff
25 *
26 * TODO:
27 * - Congestion/flow control:
28 * + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
29 * (and figure out how/where to use this!)
30 * + figure out flow control without ACKs (unreliable traffic!)
31 * - revisit handling of 'unbuffered' traffic!
32 * (need to push down through tunnel into connection selection)
33 * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
34 * reserve more bits in 'options' to allow for buffer size control?
35 */
36#include "platform.h"
37#include "gnunet_util_lib.h"
38#include "cadet.h"
39#include "gnunet_statistics_service.h"
40#include "gnunet-service-cadet-new.h"
41#include "gnunet-service-cadet-new_channel.h"
42#include "gnunet-service-cadet-new_connection.h"
43#include "gnunet-service-cadet-new_tunnels.h"
44#include "gnunet-service-cadet-new_peer.h"
45#include "gnunet-service-cadet-new_paths.h"
46
47#define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
48
49/**
50 * How long do we initially wait before retransmitting?
51 */
52#define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
53
54/**
55 * How long do we wait before dropping state about incoming
56 * connection to closed port?
57 */
58#define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
59
60/**
61 * How long do we wait at least before retransmitting ever?
62 */
63#define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
64
65/**
66 * Maximum message ID into the future we accept for out-of-order messages.
67 * If the message is more than this into the future, we drop it. This is
68 * important both to detect values that are actually in the past, as well
69 * as to limit adversarially triggerable memory consumption.
70 *
71 * Note that right now we have "max_pending_messages = 4" hard-coded in
72 * the logic below, so a value of 4 would suffice here. But we plan to
73 * allow larger windows in the future...
74 */
75#define MAX_OUT_OF_ORDER_DISTANCE 1024
76
77
78/**
79 * All the states a channel can be in.
80 */
81enum CadetChannelState
82{
83 /**
84 * Uninitialized status, should never appear in operation.
85 */
86 CADET_CHANNEL_NEW,
87
88 /**
89 * Channel is to a port that is not open, we're waiting for the
90 * port to be opened.
91 */
92 CADET_CHANNEL_LOOSE,
93
94 /**
95 * CHANNEL_OPEN message sent, waiting for CHANNEL_OPEN_ACK.
96 */
97 CADET_CHANNEL_OPEN_SENT,
98
99 /**
100 * Connection confirmed, ready to carry traffic.
101 */
102 CADET_CHANNEL_READY
103};
104
105
106/**
107 * Info needed to retry a message in case it gets lost.
108 * Note that we DO use this structure also for unreliable
109 * messages.
110 */
111struct CadetReliableMessage
112{
113 /**
114 * Double linked list, FIFO style
115 */
116 struct CadetReliableMessage *next;
117
118 /**
119 * Double linked list, FIFO style
120 */
121 struct CadetReliableMessage *prev;
122
123 /**
124 * Which channel is this message in?
125 */
126 struct CadetChannel *ch;
127
128 /**
129 * Entry in the tunnels queue for this message, NULL if it has left
130 * the tunnel. Used to cancel transmission in case we receive an
131 * ACK in time.
132 */
133 struct CadetTunnelQueueEntry *qe;
134
135 /**
136 * Data message we are trying to send.
137 */
138 struct GNUNET_CADET_ChannelAppDataMessage *data_message;
139
140 /**
141 * How soon should we retry if we fail to get an ACK?
142 * Messages in the queue are sorted by this value.
143 */
144 struct GNUNET_TIME_Absolute next_retry;
145
146 /**
147 * How long do we wait for an ACK after transmission?
148 * Use for the back-off calculation.
149 */
150 struct GNUNET_TIME_Relative retry_delay;
151
152 /**
153 * Time when we first successfully transmitted the message
154 * (that is, set @e num_transmissions to 1).
155 */
156 struct GNUNET_TIME_Absolute first_transmission_time;
157
158 /**
159 * Identifier of the connection that this message took when it
160 * was first transmitted. Only useful if @e num_transmissions is 1.
161 */
162 struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken;
163
164 /**
165 * How often was this message transmitted? #GNUNET_SYSERR if there
166 * was an error transmitting the message, #GNUNET_NO if it was not
167 * yet transmitted ever, otherwise the number of (re) transmissions.
168 */
169 int num_transmissions;
170
171};
172
173
174/**
175 * List of received out-of-order data messages.
176 */
177struct CadetOutOfOrderMessage
178{
179 /**
180 * Double linked list, FIFO style
181 */
182 struct CadetOutOfOrderMessage *next;
183
184 /**
185 * Double linked list, FIFO style
186 */
187 struct CadetOutOfOrderMessage *prev;
188
189 /**
190 * ID of the message (messages up to this point needed
191 * before we give this one to the client).
192 */
193 struct ChannelMessageIdentifier mid;
194
195 /**
196 * The envelope with the payload of the out-of-order message
197 */
198 struct GNUNET_MQ_Envelope *env;
199
200};
201
202
203/**
204 * Client endpoint of a `struct CadetChannel`. A channel may be a
205 * loopback channel, in which case it has two of these endpoints.
206 * Note that flow control also is required in both directions.
207 */
208struct CadetChannelClient
209{
210 /**
211 * Client handle. Not by itself sufficient to designate
212 * the client endpoint, as the same client handle may
213 * be used for both the owner and the destination, and
214 * we thus also need the channel ID to identify the client.
215 */
216 struct CadetClient *c;
217
218 /**
219 * Head of DLL of messages received out of order or while client was unready.
220 */
221 struct CadetOutOfOrderMessage *head_recv;
222
223 /**
224 * Tail DLL of messages received out of order or while client was unready.
225 */
226 struct CadetOutOfOrderMessage *tail_recv;
227
228 /**
229 * Local tunnel number for this client.
230 * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
231 * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
232 */
233 struct GNUNET_CADET_ClientChannelNumber ccn;
234
235 /**
236 * Number of entries currently in @a head_recv DLL.
237 */
238 unsigned int num_recv;
239
240 /**
241 * Can we send data to the client?
242 */
243 int client_ready;
244
245};
246
247
248/**
249 * Struct containing all information regarding a channel to a remote client.
250 */
251struct CadetChannel
252{
253 /**
254 * Tunnel this channel is in.
255 */
256 struct CadetTunnel *t;
257
258 /**
259 * Client owner of the tunnel, if any.
260 * (Used if this channel represends the initiating end of the tunnel.)
261 */
262 struct CadetChannelClient *owner;
263
264 /**
265 * Client destination of the tunnel, if any.
266 * (Used if this channel represents the listening end of the tunnel.)
267 */
268 struct CadetChannelClient *dest;
269
270 /**
271 * Last entry in the tunnel's queue relating to control messages
272 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
273 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
274 * transmission in case we receive updated information.
275 */
276 struct CadetTunnelQueueEntry *last_control_qe;
277
278 /**
279 * Head of DLL of messages sent and not yet ACK'd.
280 */
281 struct CadetReliableMessage *head_sent;
282
283 /**
284 * Tail of DLL of messages sent and not yet ACK'd.
285 */
286 struct CadetReliableMessage *tail_sent;
287
288 /**
289 * Task to resend/poll in case no ACK is received.
290 */
291 struct GNUNET_SCHEDULER_Task *retry_control_task;
292
293 /**
294 * Task to resend/poll in case no ACK is received.
295 */
296 struct GNUNET_SCHEDULER_Task *retry_data_task;
297
298 /**
299 * Last time the channel was used
300 */
301 struct GNUNET_TIME_Absolute timestamp;
302
303 /**
304 * Destination port of the channel.
305 */
306 struct GNUNET_HashCode port;
307
308 /**
309 * Counter for exponential backoff.
310 */
311 struct GNUNET_TIME_Relative retry_time;
312
313 /**
314 * Bitfield of already-received messages past @e mid_recv.
315 */
316 uint64_t mid_futures;
317
318 /**
319 * Next MID expected for incoming traffic.
320 */
321 struct ChannelMessageIdentifier mid_recv;
322
323 /**
324 * Next MID to use for outgoing traffic.
325 */
326 struct ChannelMessageIdentifier mid_send;
327
328 /**
329 * Total (reliable) messages pending ACK for this channel.
330 */
331 unsigned int pending_messages;
332
333 /**
334 * Maximum (reliable) messages pending ACK for this channel
335 * before we throttle the client.
336 */
337 unsigned int max_pending_messages;
338
339 /**
340 * Number identifying this channel in its tunnel.
341 */
342 struct GNUNET_CADET_ChannelTunnelNumber ctn;
343
344 /**
345 * Channel state.
346 */
347 enum CadetChannelState state;
348
349 /**
350 * Count how many ACKs we skipped, used to prevent long
351 * sequences of ACK skipping.
352 */
353 unsigned int skip_ack_series;
354
355 /**
356 * Is the tunnel bufferless (minimum latency)?
357 */
358 int nobuffer;
359
360 /**
361 * Is the tunnel reliable?
362 */
363 int reliable;
364
365 /**
366 * Is the tunnel out-of-order?
367 */
368 int out_of_order;
369
370 /**
371 * Is this channel a loopback channel, where the destination is us again?
372 */
373 int is_loopback;
374
375 /**
376 * Flag to signal the destruction of the channel. If this is set to
377 * #GNUNET_YES the channel will be destroyed once the queue is
378 * empty.
379 */
380 int destroy;
381
382};
383
384
385/**
386 * Get the static string for identification of the channel.
387 *
388 * @param ch Channel.
389 *
390 * @return Static string with the channel IDs.
391 */
392const char *
393GCCH_2s (const struct CadetChannel *ch)
394{
395 static char buf[128];
396
397 GNUNET_snprintf (buf,
398 sizeof (buf),
399 "Channel %s:%s ctn:%X(%X/%X)",
400 (GNUNET_YES == ch->is_loopback)
401 ? "loopback"
402 : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
403 GNUNET_h2s (&ch->port),
404 ch->ctn,
405 (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
406 (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
407 return buf;
408}
409
410
411/**
412 * Get the channel's public ID.
413 *
414 * @param ch Channel.
415 *
416 * @return ID used to identify the channel with the remote peer.
417 */
418struct GNUNET_CADET_ChannelTunnelNumber
419GCCH_get_id (const struct CadetChannel *ch)
420{
421 return ch->ctn;
422}
423
424
425/**
426 * Release memory associated with @a ccc
427 *
428 * @param ccc data structure to clean up
429 */
430static void
431free_channel_client (struct CadetChannelClient *ccc)
432{
433 struct CadetOutOfOrderMessage *com;
434
435 while (NULL != (com = ccc->head_recv))
436 {
437 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
438 ccc->tail_recv,
439 com);
440 ccc->num_recv--;
441 GNUNET_MQ_discard (com->env);
442 GNUNET_free (com);
443 }
444 GNUNET_free (ccc);
445}
446
447
448/**
449 * Destroy the given channel.
450 *
451 * @param ch channel to destroy
452 */
453static void
454channel_destroy (struct CadetChannel *ch)
455{
456 struct CadetReliableMessage *crm;
457
458 while (NULL != (crm = ch->head_sent))
459 {
460 GNUNET_assert (ch == crm->ch);
461 if (NULL != crm->qe)
462 {
463 GCT_send_cancel (crm->qe);
464 crm->qe = NULL;
465 }
466 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
467 ch->tail_sent,
468 crm);
469 GNUNET_free (crm->data_message);
470 GNUNET_free (crm);
471 }
472 if (NULL != ch->owner)
473 {
474 free_channel_client (ch->owner);
475 ch->owner = NULL;
476 }
477 if (NULL != ch->dest)
478 {
479 free_channel_client (ch->dest);
480 ch->dest = NULL;
481 }
482 if (NULL != ch->last_control_qe)
483 {
484 GCT_send_cancel (ch->last_control_qe);
485 ch->last_control_qe = NULL;
486 }
487 if (NULL != ch->retry_data_task)
488 {
489 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
490 ch->retry_data_task = NULL;
491 }
492 if (NULL != ch->retry_control_task)
493 {
494 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
495 ch->retry_control_task = NULL;
496 }
497 if (GNUNET_NO == ch->is_loopback)
498 {
499 GCT_remove_channel (ch->t,
500 ch,
501 ch->ctn);
502 ch->t = NULL;
503 }
504 GNUNET_free (ch);
505}
506
507
508/**
509 * Send a channel create message.
510 *
511 * @param cls Channel for which to send.
512 */
513static void
514send_channel_open (void *cls);
515
516
517/**
518 * Function called once the tunnel confirms that we sent the
519 * create message. Delays for a bit until we retry.
520 *
521 * @param cls our `struct CadetChannel`.
522 * @param cid identifier of the connection within the tunnel, NULL
523 * if transmission failed
524 */
525static void
526channel_open_sent_cb (void *cls,
527 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
528{
529 struct CadetChannel *ch = cls;
530
531 GNUNET_assert (NULL != ch->last_control_qe);
532 ch->last_control_qe = NULL;
533 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
534 LOG (GNUNET_ERROR_TYPE_DEBUG,
535 "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
536 GCCH_2s (ch),
537 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
538 GNUNET_YES));
539 ch->retry_control_task
540 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
541 &send_channel_open,
542 ch);
543}
544
545
546/**
547 * Send a channel open message.
548 *
549 * @param cls Channel for which to send.
550 */
551static void
552send_channel_open (void *cls)
553{
554 struct CadetChannel *ch = cls;
555 struct GNUNET_CADET_ChannelOpenMessage msgcc;
556 uint32_t options;
557
558 ch->retry_control_task = NULL;
559 LOG (GNUNET_ERROR_TYPE_DEBUG,
560 "Sending CHANNEL_OPEN message for %s\n",
561 GCCH_2s (ch));
562 options = 0;
563 if (ch->nobuffer)
564 options |= GNUNET_CADET_OPTION_NOBUFFER;
565 if (ch->reliable)
566 options |= GNUNET_CADET_OPTION_RELIABLE;
567 if (ch->out_of_order)
568 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
569 msgcc.header.size = htons (sizeof (msgcc));
570 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
571 msgcc.opt = htonl (options);
572 msgcc.port = ch->port;
573 msgcc.ctn = ch->ctn;
574 ch->state = CADET_CHANNEL_OPEN_SENT;
575 if (NULL != ch->last_control_qe)
576 GCT_send_cancel (ch->last_control_qe);
577 ch->last_control_qe = GCT_send (ch->t,
578 &msgcc.header,
579 &channel_open_sent_cb,
580 ch);
581 GNUNET_assert (NULL == ch->retry_control_task);
582}
583
584
585/**
586 * Function called once and only once after a channel was bound
587 * to its tunnel via #GCT_add_channel() is ready for transmission.
588 * Note that this is only the case for channels that this peer
589 * initiates, as for incoming channels we assume that they are
590 * ready for transmission immediately upon receiving the open
591 * message. Used to bootstrap the #GCT_send() process.
592 *
593 * @param ch the channel for which the tunnel is now ready
594 */
595void
596GCCH_tunnel_up (struct CadetChannel *ch)
597{
598 GNUNET_assert (NULL == ch->retry_control_task);
599 LOG (GNUNET_ERROR_TYPE_DEBUG,
600 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
601 GCCH_2s (ch));
602 ch->retry_control_task
603 = GNUNET_SCHEDULER_add_now (&send_channel_open,
604 ch);
605}
606
607
608/**
609 * Create a new channel.
610 *
611 * @param owner local client owning the channel
612 * @param ccn local number of this channel at the @a owner
613 * @param destination peer to which we should build the channel
614 * @param port desired port at @a destination
615 * @param options options for the channel
616 * @return handle to the new channel
617 */
618struct CadetChannel *
619GCCH_channel_local_new (struct CadetClient *owner,
620 struct GNUNET_CADET_ClientChannelNumber ccn,
621 struct CadetPeer *destination,
622 const struct GNUNET_HashCode *port,
623 uint32_t options)
624{
625 struct CadetChannel *ch;
626 struct CadetChannelClient *ccco;
627
628 ccco = GNUNET_new (struct CadetChannelClient);
629 ccco->c = owner;
630 ccco->ccn = ccn;
631 ccco->client_ready = GNUNET_YES;
632
633 ch = GNUNET_new (struct CadetChannel);
634 ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
635 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
636 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
637 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
638 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
639 ch->owner = ccco;
640 ch->port = *port;
641 if (0 == memcmp (&my_full_id,
642 GCP_get_id (destination),
643 sizeof (struct GNUNET_PeerIdentity)))
644 {
645 struct CadetClient *c;
646
647 ch->is_loopback = GNUNET_YES;
648 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
649 port);
650 if (NULL == c)
651 {
652 /* port closed, wait for it to possibly open */
653 ch->state = CADET_CHANNEL_LOOSE;
654 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
655 port,
656 ch,
657 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
658 LOG (GNUNET_ERROR_TYPE_DEBUG,
659 "Created loose incoming loopback channel to port %s\n",
660 GNUNET_h2s (&ch->port));
661 }
662 else
663 {
664 GCCH_bind (ch,
665 c);
666 }
667 }
668 else
669 {
670 ch->t = GCP_get_tunnel (destination,
671 GNUNET_YES);
672 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
673 ch->ctn = GCT_add_channel (ch->t,
674 ch);
675 }
676 GNUNET_STATISTICS_update (stats,
677 "# channels",
678 1,
679 GNUNET_NO);
680 LOG (GNUNET_ERROR_TYPE_DEBUG,
681 "Created channel to port %s at peer %s for %s using %s\n",
682 GNUNET_h2s (port),
683 GCP_2s (destination),
684 GSC_2s (owner),
685 (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
686 return ch;
687}
688
689
690/**
691 * We had an incoming channel to a port that is closed.
692 * It has not been opened for a while, drop it.
693 *
694 * @param cls the channel to drop
695 */
696static void
697timeout_closed_cb (void *cls)
698{
699 struct CadetChannel *ch = cls;
700
701 ch->retry_control_task = NULL;
702 LOG (GNUNET_ERROR_TYPE_DEBUG,
703 "Closing incoming channel to port %s from peer %s due to timeout\n",
704 GNUNET_h2s (&ch->port),
705 GCP_2s (GCT_get_destination (ch->t)));
706 channel_destroy (ch);
707}
708
709
710/**
711 * Create a new channel based on a request coming in over the network.
712 *
713 * @param t tunnel to the remote peer
714 * @param ctn identifier of this channel in the tunnel
715 * @param port desired local port
716 * @param options options for the channel
717 * @return handle to the new channel
718 */
719struct CadetChannel *
720GCCH_channel_incoming_new (struct CadetTunnel *t,
721 struct GNUNET_CADET_ChannelTunnelNumber ctn,
722 const struct GNUNET_HashCode *port,
723 uint32_t options)
724{
725 struct CadetChannel *ch;
726 struct CadetClient *c;
727
728 ch = GNUNET_new (struct CadetChannel);
729 ch->port = *port;
730 ch->t = t;
731 ch->ctn = ctn;
732 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
733 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
734 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
735 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
736 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
737 GNUNET_STATISTICS_update (stats,
738 "# channels",
739 1,
740 GNUNET_NO);
741
742 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
743 port);
744 if (NULL == c)
745 {
746 /* port closed, wait for it to possibly open */
747 ch->state = CADET_CHANNEL_LOOSE;
748 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
749 port,
750 ch,
751 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
752 GNUNET_assert (NULL == ch->retry_control_task);
753 ch->retry_control_task
754 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
755 &timeout_closed_cb,
756 ch);
757 LOG (GNUNET_ERROR_TYPE_DEBUG,
758 "Created loose incoming channel to port %s from peer %s\n",
759 GNUNET_h2s (&ch->port),
760 GCP_2s (GCT_get_destination (ch->t)));
761 }
762 else
763 {
764 GCCH_bind (ch,
765 c);
766 }
767 GNUNET_STATISTICS_update (stats,
768 "# channels",
769 1,
770 GNUNET_NO);
771 return ch;
772}
773
774
775/**
776 * Function called once the tunnel confirms that we sent the
777 * ACK message. Just remembers it was sent, we do not expect
778 * ACKs for ACKs ;-).
779 *
780 * @param cls our `struct CadetChannel`.
781 * @param cid identifier of the connection within the tunnel, NULL
782 * if transmission failed
783 */
784static void
785send_ack_cb (void *cls,
786 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
787{
788 struct CadetChannel *ch = cls;
789
790 GNUNET_assert (NULL != ch->last_control_qe);
791 ch->last_control_qe = NULL;
792}
793
794
795/**
796 * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
797 *
798 * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
799 */
800static void
801send_channel_data_ack (struct CadetChannel *ch)
802{
803 struct GNUNET_CADET_ChannelDataAckMessage msg;
804
805 if (GNUNET_NO == ch->reliable)
806 return; /* no ACKs */
807 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
808 msg.header.size = htons (sizeof (msg));
809 msg.ctn = ch->ctn;
810 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
811 msg.futures = GNUNET_htonll (ch->mid_futures);
812 LOG (GNUNET_ERROR_TYPE_DEBUG,
813 "Sending DATA_ACK %u:%llX via %s\n",
814 (unsigned int) ntohl (msg.mid.mid),
815 (unsigned long long) ch->mid_futures,
816 GCCH_2s (ch));
817 if (NULL != ch->last_control_qe)
818 GCT_send_cancel (ch->last_control_qe);
819 ch->last_control_qe = GCT_send (ch->t,
820 &msg.header,
821 &send_ack_cb,
822 ch);
823}
824
825
826/**
827 * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
828 * connection is up.
829 *
830 * @param cls the `struct CadetChannel`
831 */
832static void
833send_open_ack (void *cls)
834{
835 struct CadetChannel *ch = cls;
836 struct GNUNET_CADET_ChannelManageMessage msg;
837
838 ch->retry_control_task = NULL;
839 LOG (GNUNET_ERROR_TYPE_DEBUG,
840 "Sending CHANNEL_OPEN_ACK on %s\n",
841 GCCH_2s (ch));
842 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
843 msg.header.size = htons (sizeof (msg));
844 msg.reserved = htonl (0);
845 msg.ctn = ch->ctn;
846 if (NULL != ch->last_control_qe)
847 GCT_send_cancel (ch->last_control_qe);
848 ch->last_control_qe = GCT_send (ch->t,
849 &msg.header,
850 &send_ack_cb,
851 ch);
852}
853
854
855/**
856 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
857 * this channel. If the binding was successful, (re)transmit the
858 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
859 *
860 * @param ch channel that got the duplicate open
861 * @param cti identifier of the connection that delivered the message
862 */
863void
864GCCH_handle_duplicate_open (struct CadetChannel *ch,
865 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
866{
867 if (NULL == ch->dest)
868 {
869 LOG (GNUNET_ERROR_TYPE_DEBUG,
870 "Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n",
871 GCCH_2s (ch));
872 return;
873 }
874 if (NULL != ch->retry_control_task)
875 {
876 LOG (GNUNET_ERROR_TYPE_DEBUG,
877 "Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n",
878 GCCH_2s (ch));
879 return;
880 }
881 LOG (GNUNET_ERROR_TYPE_DEBUG,
882 "Retransmitting CHANNEL_OPEN_ACK on %s\n",
883 GCCH_2s (ch));
884 ch->retry_control_task
885 = GNUNET_SCHEDULER_add_now (&send_open_ack,
886 ch);
887}
888
889
890/**
891 * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
892 *
893 * @param ch channel the ack is for
894 * @param to_owner #GNUNET_YES to send to owner,
895 * #GNUNET_NO to send to dest
896 */
897static void
898send_ack_to_client (struct CadetChannel *ch,
899 int to_owner)
900{
901 struct GNUNET_MQ_Envelope *env;
902 struct GNUNET_CADET_LocalAck *ack;
903 struct CadetChannelClient *ccc;
904
905 ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
906 if (NULL == ccc)
907 {
908 /* This can happen if we are just getting ACKs after
909 our local client already disconnected. */
910 GNUNET_assert (GNUNET_YES == ch->destroy);
911 return;
912 }
913 env = GNUNET_MQ_msg (ack,
914 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
915 ack->ccn = ccc->ccn;
916 LOG (GNUNET_ERROR_TYPE_DEBUG,
917 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
918 GSC_2s (ccc->c),
919 (GNUNET_YES == to_owner) ? "owner" : "dest",
920 ntohl (ack->ccn.channel_of_client),
921 ch->pending_messages,
922 ch->max_pending_messages);
923 GSC_send_to_client (ccc->c,
924 env);
925}
926
927
928/**
929 * A client is bound to the port that we have a channel
930 * open to. Send the acknowledgement for the connection
931 * request and establish the link with the client.
932 *
933 * @param ch open incoming channel
934 * @param c client listening on the respective port
935 */
936void
937GCCH_bind (struct CadetChannel *ch,
938 struct CadetClient *c)
939{
940 uint32_t options;
941 struct CadetChannelClient *cccd;
942
943 LOG (GNUNET_ERROR_TYPE_DEBUG,
944 "Binding %s from %s to port %s of %s\n",
945 GCCH_2s (ch),
946 GCT_2s (ch->t),
947 GNUNET_h2s (&ch->port),
948 GSC_2s (c));
949 if (NULL != ch->retry_control_task)
950 {
951 /* there might be a timeout task here */
952 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
953 ch->retry_control_task = NULL;
954 }
955 options = 0;
956 if (ch->nobuffer)
957 options |= GNUNET_CADET_OPTION_NOBUFFER;
958 if (ch->reliable)
959 options |= GNUNET_CADET_OPTION_RELIABLE;
960 if (ch->out_of_order)
961 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
962 cccd = GNUNET_new (struct CadetChannelClient);
963 GNUNET_assert (NULL == ch->dest);
964 ch->dest = cccd;
965 cccd->c = c;
966 cccd->client_ready = GNUNET_YES;
967 cccd->ccn = GSC_bind (c,
968 ch,
969 (GNUNET_YES == ch->is_loopback)
970 ? GCP_get (&my_full_id,
971 GNUNET_YES)
972 : GCT_get_destination (ch->t),
973 &ch->port,
974 options);
975 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
976 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
977 ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
978 if (GNUNET_YES == ch->is_loopback)
979 {
980 ch->state = CADET_CHANNEL_OPEN_SENT;
981 GCCH_handle_channel_open_ack (ch,
982 NULL);
983 }
984 else
985 {
986 /* notify other peer that we accepted the connection */
987 ch->state = CADET_CHANNEL_READY;
988 ch->retry_control_task
989 = GNUNET_SCHEDULER_add_now (&send_open_ack,
990 ch);
991 }
992 /* give client it's initial supply of ACKs */
993 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
994 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
995 for (unsigned int i=0;i<ch->max_pending_messages;i++)
996 send_ack_to_client (ch,
997 GNUNET_NO);
998}
999
1000
1001/**
1002 * One of our clients has disconnected, tell the other one that we
1003 * are finished. Done asynchronously to avoid concurrent modification
1004 * issues if this is the same client.
1005 *
1006 * @param cls the `struct CadetChannel` where one of the ends is now dead
1007 */
1008static void
1009signal_remote_destroy_cb (void *cls)
1010{
1011 struct CadetChannel *ch = cls;
1012 struct CadetChannelClient *ccc;
1013
1014 /* Find which end is left... */
1015 ch->retry_control_task = NULL;
1016 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1017 GSC_handle_remote_channel_destroy (ccc->c,
1018 ccc->ccn,
1019 ch);
1020 channel_destroy (ch);
1021}
1022
1023
1024/**
1025 * Destroy locally created channel. Called by the local client, so no
1026 * need to tell the client.
1027 *
1028 * @param ch channel to destroy
1029 * @param c client that caused the destruction
1030 * @param ccn client number of the client @a c
1031 */
1032void
1033GCCH_channel_local_destroy (struct CadetChannel *ch,
1034 struct CadetClient *c,
1035 struct GNUNET_CADET_ClientChannelNumber ccn)
1036{
1037 LOG (GNUNET_ERROR_TYPE_DEBUG,
1038 "%s asks for destruction of %s\n",
1039 GSC_2s (c),
1040 GCCH_2s (ch));
1041 GNUNET_assert (NULL != c);
1042 if ( (NULL != ch->owner) &&
1043 (c == ch->owner->c) &&
1044 (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
1045 {
1046 free_channel_client (ch->owner);
1047 ch->owner = NULL;
1048 }
1049 else if ( (NULL != ch->dest) &&
1050 (c == ch->dest->c) &&
1051 (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
1052 {
1053 free_channel_client (ch->dest);
1054 ch->dest = NULL;
1055 }
1056 else
1057 {
1058 GNUNET_assert (0);
1059 }
1060
1061 if (GNUNET_YES == ch->destroy)
1062 {
1063 /* other end already destroyed, with the local client gone, no need
1064 to finish transmissions, just destroy immediately. */
1065 channel_destroy (ch);
1066 return;
1067 }
1068 if ( (NULL != ch->head_sent) &&
1069 ( (NULL != ch->owner) ||
1070 (NULL != ch->dest) ) )
1071 {
1072 /* Wait for other end to destroy us as well,
1073 and otherwise allow send queue to be transmitted first */
1074 ch->destroy = GNUNET_YES;
1075 return;
1076 }
1077 if ( (GNUNET_YES == ch->is_loopback) &&
1078 ( (NULL != ch->owner) ||
1079 (NULL != ch->dest) ) )
1080 {
1081 if (NULL != ch->retry_control_task)
1082 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1083 ch->retry_control_task
1084 = GNUNET_SCHEDULER_add_now (&signal_remote_destroy_cb,
1085 ch);
1086 return;
1087 }
1088 if (GNUNET_NO == ch->is_loopback)
1089 {
1090 /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1091 switch (ch->state)
1092 {
1093 case CADET_CHANNEL_NEW:
1094 /* We gave up on a channel that we created as a client to a remote
1095 target, but that never went anywhere. Nothing to do here. */
1096 break;
1097 case CADET_CHANNEL_LOOSE:
1098 GSC_drop_loose_channel (&ch->port,
1099 ch);
1100 break;
1101 default:
1102 GCT_send_channel_destroy (ch->t,
1103 ch->ctn);
1104 }
1105 }
1106 /* Nothing left to do, just finish destruction */
1107 channel_destroy (ch);
1108}
1109
1110
1111/**
1112 * We got an acknowledgement for the creation of the channel
1113 * (the port is open on the other side). Begin transmissions.
1114 *
1115 * @param ch channel to destroy
1116 * @param cti identifier of the connection that delivered the message
1117 */
1118void
1119GCCH_handle_channel_open_ack (struct CadetChannel *ch,
1120 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1121{
1122 switch (ch->state)
1123 {
1124 case CADET_CHANNEL_NEW:
1125 /* this should be impossible */
1126 GNUNET_break (0);
1127 break;
1128 case CADET_CHANNEL_LOOSE:
1129 /* This makes no sense. */
1130 GNUNET_break_op (0);
1131 break;
1132 case CADET_CHANNEL_OPEN_SENT:
1133 if (NULL == ch->owner)
1134 {
1135 /* We're not the owner, wrong direction! */
1136 GNUNET_break_op (0);
1137 return;
1138 }
1139 LOG (GNUNET_ERROR_TYPE_DEBUG,
1140 "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1141 GCCH_2s (ch));
1142 if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1143 {
1144 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1145 ch->retry_control_task = NULL;
1146 }
1147 ch->state = CADET_CHANNEL_READY;
1148 /* On first connect, send client as many ACKs as we allow messages
1149 to be buffered! */
1150 for (unsigned int i=0;i<ch->max_pending_messages;i++)
1151 send_ack_to_client (ch,
1152 GNUNET_YES);
1153 break;
1154 case CADET_CHANNEL_READY:
1155 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1156 LOG (GNUNET_ERROR_TYPE_DEBUG,
1157 "Received duplicate channel OPEN_ACK for %s\n",
1158 GCCH_2s (ch));
1159 GNUNET_STATISTICS_update (stats,
1160 "# duplicate CREATE_ACKs",
1161 1,
1162 GNUNET_NO);
1163 break;
1164 }
1165}
1166
1167
1168/**
1169 * Test if element @a e1 comes before element @a e2.
1170 *
1171 * @param cls closure, to a flag where we indicate duplicate packets
1172 * @param m1 a message of to sort
1173 * @param m2 another message to sort
1174 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1175 */
1176static int
1177is_before (void *cls,
1178 struct CadetOutOfOrderMessage *m1,
1179 struct CadetOutOfOrderMessage *m2)
1180{
1181 int *duplicate = cls;
1182 uint32_t v1 = ntohl (m1->mid.mid);
1183 uint32_t v2 = ntohl (m2->mid.mid);
1184 uint32_t delta;
1185
1186 delta = v2 - v1;
1187 if (0 == delta)
1188 *duplicate = GNUNET_YES;
1189 if (delta > (uint32_t) INT_MAX)
1190 {
1191 /* in overflow range, we can safely assume we wrapped around */
1192 return GNUNET_NO;
1193 }
1194 else
1195 {
1196 /* result is small, thus v2 > v1, thus m1 < m2 */
1197 return GNUNET_YES;
1198 }
1199}
1200
1201
1202/**
1203 * We got payload data for a channel. Pass it on to the client
1204 * and send an ACK to the other end (once flow control allows it!)
1205 *
1206 * @param ch channel that got data
1207 * @param cti identifier of the connection that delivered the message
1208 * @param msg message that was received
1209 */
1210void
1211GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1212 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1213 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1214{
1215 struct GNUNET_MQ_Envelope *env;
1216 struct GNUNET_CADET_LocalData *ld;
1217 struct CadetChannelClient *ccc;
1218 size_t payload_size;
1219 struct CadetOutOfOrderMessage *com;
1220 int duplicate;
1221 uint32_t mid_min;
1222 uint32_t mid_max;
1223 uint32_t mid_msg;
1224 uint32_t delta;
1225
1226 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1227 if ( (GNUNET_YES == ch->destroy) &&
1228 (NULL == ch->owner) &&
1229 (NULL == ch->dest) )
1230 {
1231 /* This client is gone, but we still have messages to send to
1232 the other end (which is why @a ch is not yet dead). However,
1233 we cannot pass messages to our client anymore. */
1234 LOG (GNUNET_ERROR_TYPE_DEBUG,
1235 "Dropping incoming payload on %s as this end is already closed\n",
1236 GCCH_2s (ch));
1237 /* send back DESTROY notification to stop further retransmissions! */
1238 GCT_send_channel_destroy (ch->t,
1239 ch->ctn);
1240 return;
1241 }
1242 payload_size = ntohs (msg->header.size) - sizeof (*msg);
1243 env = GNUNET_MQ_msg_extra (ld,
1244 payload_size,
1245 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1246 ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1247 GNUNET_memcpy (&ld[1],
1248 &msg[1],
1249 payload_size);
1250 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1251 if ( (GNUNET_YES == ccc->client_ready) &&
1252 ( (GNUNET_YES == ch->out_of_order) ||
1253 (msg->mid.mid == ch->mid_recv.mid) ) )
1254 {
1255 LOG (GNUNET_ERROR_TYPE_DEBUG,
1256 "Giving %u bytes of payload with MID %u from %s to client %s\n",
1257 (unsigned int) payload_size,
1258 ntohl (msg->mid.mid),
1259 GCCH_2s (ch),
1260 GSC_2s (ccc->c));
1261 ccc->client_ready = GNUNET_NO;
1262 GSC_send_to_client (ccc->c,
1263 env);
1264 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1265 ch->mid_futures >>= 1;
1266 send_channel_data_ack (ch);
1267 return;
1268 }
1269
1270 if (GNUNET_YES == ch->reliable)
1271 {
1272 /* check if message ought to be dropped because it is ancient/too distant/duplicate */
1273 mid_min = ntohl (ch->mid_recv.mid);
1274 mid_max = mid_min + ch->max_pending_messages;
1275 mid_msg = ntohl (msg->mid.mid);
1276 if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
1277 ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
1278 {
1279 LOG (GNUNET_ERROR_TYPE_DEBUG,
1280 "%s at %u drops ancient or far-future message %u\n",
1281 GCCH_2s (ch),
1282 (unsigned int) mid_min,
1283 ntohl (msg->mid.mid));
1284
1285 GNUNET_STATISTICS_update (stats,
1286 "# duplicate DATA (ancient or future)",
1287 1,
1288 GNUNET_NO);
1289 GNUNET_MQ_discard (env);
1290 send_channel_data_ack (ch);
1291 return;
1292 }
1293 /* mark bit for future ACKs */
1294 delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
1295 if (delta < 64)
1296 {
1297 if (0 != (ch->mid_futures & (1LLU << delta)))
1298 {
1299 /* Duplicate within the queue, drop also */
1300 LOG (GNUNET_ERROR_TYPE_DEBUG,
1301 "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1302 (unsigned int) payload_size,
1303 GCCH_2s (ch),
1304 ntohl (msg->mid.mid));
1305 GNUNET_STATISTICS_update (stats,
1306 "# duplicate DATA",
1307 1,
1308 GNUNET_NO);
1309 GNUNET_MQ_discard (env);
1310 send_channel_data_ack (ch);
1311 return;
1312 }
1313 ch->mid_futures |= (1LLU << delta);
1314 LOG (GNUNET_ERROR_TYPE_DEBUG,
1315 "Marked bit %llX for mid %u (base: %u); now: %llX\n",
1316 (1LLU << delta),
1317 mid_msg,
1318 mid_min,
1319 ch->mid_futures);
1320 }
1321 }
1322 else /* ! ch->reliable */
1323 {
1324 /* Channel is unreliable, so we do not ACK. But we also cannot
1325 allow buffering everything, so check if we have space... */
1326 if (ccc->num_recv >= ch->max_pending_messages)
1327 {
1328 struct CadetOutOfOrderMessage *drop;
1329
1330 /* Yep, need to drop. Drop the oldest message in
1331 the buffer. */
1332 LOG (GNUNET_ERROR_TYPE_DEBUG,
1333 "Queue full due slow client on %s, dropping oldest message\n",
1334 GCCH_2s (ch));
1335 GNUNET_STATISTICS_update (stats,
1336 "# messages dropped due to slow client",
1337 1,
1338 GNUNET_NO);
1339 drop = ccc->head_recv;
1340 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1341 ccc->tail_recv,
1342 drop);
1343 ccc->num_recv--;
1344 GNUNET_MQ_discard (drop->env);
1345 GNUNET_free (drop);
1346 }
1347 }
1348
1349 /* Insert message into sorted out-of-order queue */
1350 com = GNUNET_new (struct CadetOutOfOrderMessage);
1351 com->mid = msg->mid;
1352 com->env = env;
1353 duplicate = GNUNET_NO;
1354 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1355 is_before,
1356 &duplicate,
1357 ccc->head_recv,
1358 ccc->tail_recv,
1359 com);
1360 ccc->num_recv++;
1361 if (GNUNET_YES == duplicate)
1362 {
1363 /* Duplicate within the queue, drop also (this is not covered by
1364 the case above if "delta" >= 64, which could be the case if
1365 max_pending_messages is also >= 64 or if our client is unready
1366 and we are seeing retransmissions of the message our client is
1367 blocked on. */
1368 LOG (GNUNET_ERROR_TYPE_DEBUG,
1369 "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1370 (unsigned int) payload_size,
1371 GCCH_2s (ch),
1372 ntohl (msg->mid.mid));
1373 GNUNET_STATISTICS_update (stats,
1374 "# duplicate DATA",
1375 1,
1376 GNUNET_NO);
1377 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1378 ccc->tail_recv,
1379 com);
1380 ccc->num_recv--;
1381 GNUNET_MQ_discard (com->env);
1382 GNUNET_free (com);
1383 send_channel_data_ack (ch);
1384 return;
1385 }
1386 LOG (GNUNET_ERROR_TYPE_DEBUG,
1387 "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1388 (GNUNET_YES == ccc->client_ready)
1389 ? "out-of-order"
1390 : "client-not-ready",
1391 (unsigned int) payload_size,
1392 GCCH_2s (ch),
1393 ntohl (ccc->ccn.channel_of_client),
1394 ccc,
1395 ntohl (msg->mid.mid),
1396 ntohl (ch->mid_recv.mid));
1397 /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
1398 the sender may already be transmitting the previous one. Needs
1399 experimental evaluation to see if/when this ACK helps or
1400 hurts. (We might even want another option.) */
1401 send_channel_data_ack (ch);
1402}
1403
1404
1405/**
1406 * Function called once the tunnel has sent one of our messages.
1407 * If the message is unreliable, simply frees the `crm`. If the
1408 * message was reliable, calculate retransmission time and
1409 * wait for ACK (or retransmit).
1410 *
1411 * @param cls the `struct CadetReliableMessage` that was sent
1412 * @param cid identifier of the connection within the tunnel, NULL
1413 * if transmission failed
1414 */
1415static void
1416data_sent_cb (void *cls,
1417 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid);
1418
1419
1420/**
1421 * We need to retry a transmission, the last one took too long to
1422 * be acknowledged.
1423 *
1424 * @param cls the `struct CadetChannel` where we need to retransmit
1425 */
1426static void
1427retry_transmission (void *cls)
1428{
1429 struct CadetChannel *ch = cls;
1430 struct CadetReliableMessage *crm = ch->head_sent;
1431
1432 ch->retry_data_task = NULL;
1433 GNUNET_assert (NULL == crm->qe);
1434 LOG (GNUNET_ERROR_TYPE_DEBUG,
1435 "Retrying transmission on %s of message %u\n",
1436 GCCH_2s (ch),
1437 (unsigned int) ntohl (crm->data_message->mid.mid));
1438 crm->qe = GCT_send (ch->t,
1439 &crm->data_message->header,
1440 &data_sent_cb,
1441 crm);
1442 GNUNET_assert (NULL == ch->retry_data_task);
1443}
1444
1445
1446/**
1447 * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1448 * the queue and tell our client that it can send more.
1449 *
1450 * @param ch the channel that got the PLAINTEXT_DATA_ACK
1451 * @param cti identifier of the connection that delivered the message
1452 * @param crm the message that got acknowledged
1453 */
1454static void
1455handle_matching_ack (struct CadetChannel *ch,
1456 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1457 struct CadetReliableMessage *crm)
1458{
1459 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1460 ch->tail_sent,
1461 crm);
1462 ch->pending_messages--;
1463 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1464 LOG (GNUNET_ERROR_TYPE_DEBUG,
1465 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1466 GCCH_2s (ch),
1467 (unsigned int) ntohl (crm->data_message->mid.mid),
1468 ch->pending_messages);
1469 if (NULL != crm->qe)
1470 {
1471 GCT_send_cancel (crm->qe);
1472 crm->qe = NULL;
1473 }
1474 if ( (1 == crm->num_transmissions) &&
1475 (NULL != cti) )
1476 {
1477 GCC_ack_observed (cti);
1478 if (0 == memcmp (cti,
1479 &crm->connection_taken,
1480 sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier)))
1481 {
1482 GCC_latency_observed (cti,
1483 GNUNET_TIME_absolute_get_duration (crm->first_transmission_time));
1484 }
1485 }
1486 GNUNET_free (crm->data_message);
1487 GNUNET_free (crm);
1488 send_ack_to_client (ch,
1489 (NULL == ch->owner)
1490 ? GNUNET_NO
1491 : GNUNET_YES);
1492}
1493
1494
1495/**
1496 * We got an acknowledgement for payload data for a channel.
1497 * Possibly resume transmissions.
1498 *
1499 * @param ch channel that got the ack
1500 * @param cti identifier of the connection that delivered the message
1501 * @param ack details about what was received
1502 */
1503void
1504GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1505 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1506 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1507{
1508 struct CadetReliableMessage *crm;
1509 struct CadetReliableMessage *crmn;
1510 int found;
1511 uint32_t mid_base;
1512 uint64_t mid_mask;
1513 unsigned int delta;
1514
1515 GNUNET_break (GNUNET_NO == ch->is_loopback);
1516 if (GNUNET_NO == ch->reliable)
1517 {
1518 /* not expecting ACKs on unreliable channel, odd */
1519 GNUNET_break_op (0);
1520 return;
1521 }
1522 /* mid_base is the MID of the next message that the
1523 other peer expects (i.e. that is missing!), everything
1524 LOWER (but excluding mid_base itself) was received. */
1525 mid_base = ntohl (ack->mid.mid);
1526 mid_mask = GNUNET_htonll (ack->futures);
1527 found = GNUNET_NO;
1528 for (crm = ch->head_sent;
1529 NULL != crm;
1530 crm = crmn)
1531 {
1532 crmn = crm->next;
1533 delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
1534 if (delta >= UINT_MAX - ch->max_pending_messages)
1535 {
1536 /* overflow, means crm was a bit in the past, so this ACK counts for it. */
1537 LOG (GNUNET_ERROR_TYPE_DEBUG,
1538 "Got DATA_ACK with base %u satisfying past message %u on %s\n",
1539 (unsigned int) mid_base,
1540 ntohl (crm->data_message->mid.mid),
1541 GCCH_2s (ch));
1542 handle_matching_ack (ch,
1543 cti,
1544 crm);
1545 found = GNUNET_YES;
1546 continue;
1547 }
1548 delta--;
1549 if (delta >= 64)
1550 continue;
1551 LOG (GNUNET_ERROR_TYPE_DEBUG,
1552 "Testing bit %llX for mid %u (base: %u)\n",
1553 (1LLU << delta),
1554 ntohl (crm->data_message->mid.mid),
1555 mid_base);
1556 if (0 != (mid_mask & (1LLU << delta)))
1557 {
1558 LOG (GNUNET_ERROR_TYPE_DEBUG,
1559 "Got DATA_ACK with mask for %u on %s\n",
1560 ntohl (crm->data_message->mid.mid),
1561 GCCH_2s (ch));
1562 handle_matching_ack (ch,
1563 cti,
1564 crm);
1565 found = GNUNET_YES;
1566 }
1567 }
1568 if (GNUNET_NO == found)
1569 {
1570 /* ACK for message we already dropped, might have been a
1571 duplicate ACK? Ignore. */
1572 LOG (GNUNET_ERROR_TYPE_DEBUG,
1573 "Duplicate DATA_ACK on %s, ignoring\n",
1574 GCCH_2s (ch));
1575 GNUNET_STATISTICS_update (stats,
1576 "# duplicate DATA_ACKs",
1577 1,
1578 GNUNET_NO);
1579 return;
1580 }
1581 if (NULL != ch->retry_data_task)
1582 {
1583 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1584 ch->retry_data_task = NULL;
1585 }
1586 if ( (NULL != ch->head_sent) &&
1587 (NULL == ch->head_sent->qe) )
1588 ch->retry_data_task
1589 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1590 &retry_transmission,
1591 ch);
1592}
1593
1594
1595/**
1596 * Destroy channel, based on the other peer closing the
1597 * connection. Also needs to remove this channel from
1598 * the tunnel.
1599 *
1600 * @param ch channel to destroy
1601 * @param cti identifier of the connection that delivered the message,
1602 * NULL if we are simulating receiving a destroy due to shutdown
1603 */
1604void
1605GCCH_handle_remote_destroy (struct CadetChannel *ch,
1606 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1607{
1608 struct CadetChannelClient *ccc;
1609
1610 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1611 LOG (GNUNET_ERROR_TYPE_DEBUG,
1612 "Received remote channel DESTROY for %s\n",
1613 GCCH_2s (ch));
1614 if (GNUNET_YES == ch->destroy)
1615 {
1616 /* Local client already gone, this is instant-death. */
1617 channel_destroy (ch);
1618 return;
1619 }
1620 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1621 if ( (NULL != ccc) &&
1622 (NULL != ccc->head_recv) )
1623 {
1624 LOG (GNUNET_ERROR_TYPE_WARNING,
1625 "Lost end of transmission due to remote shutdown on %s\n",
1626 GCCH_2s (ch));
1627 /* FIXME: change API to notify client about truncated transmission! */
1628 }
1629 ch->destroy = GNUNET_YES;
1630 if (NULL != ccc)
1631 GSC_handle_remote_channel_destroy (ccc->c,
1632 ccc->ccn,
1633 ch);
1634 channel_destroy (ch);
1635}
1636
1637
1638/**
1639 * Test if element @a e1 comes before element @a e2.
1640 *
1641 * @param cls closure, to a flag where we indicate duplicate packets
1642 * @param crm1 an element of to sort
1643 * @param crm2 another element to sort
1644 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1645 */
1646static int
1647cmp_crm_by_next_retry (void *cls,
1648 struct CadetReliableMessage *crm1,
1649 struct CadetReliableMessage *crm2)
1650{
1651 if (crm1->next_retry.abs_value_us <
1652 crm2->next_retry.abs_value_us)
1653 return GNUNET_YES;
1654 return GNUNET_NO;
1655}
1656
1657
1658/**
1659 * Function called once the tunnel has sent one of our messages.
1660 * If the message is unreliable, simply frees the `crm`. If the
1661 * message was reliable, calculate retransmission time and
1662 * wait for ACK (or retransmit).
1663 *
1664 * @param cls the `struct CadetReliableMessage` that was sent
1665 * @param cid identifier of the connection within the tunnel, NULL
1666 * if transmission failed
1667 */
1668static void
1669data_sent_cb (void *cls,
1670 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
1671{
1672 struct CadetReliableMessage *crm = cls;
1673 struct CadetChannel *ch = crm->ch;
1674
1675 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1676 GNUNET_assert (NULL != crm->qe);
1677 crm->qe = NULL;
1678 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1679 ch->tail_sent,
1680 crm);
1681 if (GNUNET_NO == ch->reliable)
1682 {
1683 GNUNET_free (crm->data_message);
1684 GNUNET_free (crm);
1685 ch->pending_messages--;
1686 send_ack_to_client (ch,
1687 (NULL == ch->owner)
1688 ? GNUNET_NO
1689 : GNUNET_YES);
1690 return;
1691 }
1692 if (NULL == cid)
1693 {
1694 /* There was an error sending. */
1695 crm->num_transmissions = GNUNET_SYSERR;
1696 }
1697 else if (GNUNET_SYSERR != crm->num_transmissions)
1698 {
1699 /* Increment transmission counter, and possibly store @a cid
1700 if this was the first transmission. */
1701 crm->num_transmissions++;
1702 if (1 == crm->num_transmissions)
1703 {
1704 crm->first_transmission_time = GNUNET_TIME_absolute_get ();
1705 crm->connection_taken = *cid;
1706 GCC_ack_expected (cid);
1707 }
1708 }
1709 if ( (0 == crm->retry_delay.rel_value_us) &&
1710 (NULL != cid) )
1711 {
1712 struct CadetConnection *cc = GCC_lookup (cid);
1713
1714 if (NULL != cc)
1715 crm->retry_delay = GCC_get_metrics (cc)->aged_latency;
1716 else
1717 crm->retry_delay = ch->retry_time;
1718 }
1719 crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1720 crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1721 MIN_RTT_DELAY);
1722 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1723
1724 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1725 cmp_crm_by_next_retry,
1726 NULL,
1727 ch->head_sent,
1728 ch->tail_sent,
1729 crm);
1730 LOG (GNUNET_ERROR_TYPE_DEBUG,
1731 "Message %u sent, next transmission on %s in %s\n",
1732 (unsigned int) ntohl (crm->data_message->mid.mid),
1733 GCCH_2s (ch),
1734 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1735 GNUNET_YES));
1736 if (NULL == ch->head_sent->qe)
1737 {
1738 if (NULL != ch->retry_data_task)
1739 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1740 ch->retry_data_task
1741 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1742 &retry_transmission,
1743 ch);
1744 }
1745}
1746
1747
1748/**
1749 * Handle data given by a client.
1750 *
1751 * Check whether the client is allowed to send in this tunnel, save if
1752 * channel is reliable and send an ACK to the client if there is still
1753 * buffer space in the tunnel.
1754 *
1755 * @param ch Channel.
1756 * @param sender_ccn ccn of the sender
1757 * @param buf payload to transmit.
1758 * @param buf_len number of bytes in @a buf
1759 * @return #GNUNET_OK if everything goes well,
1760 * #GNUNET_SYSERR in case of an error.
1761 */
1762int
1763GCCH_handle_local_data (struct CadetChannel *ch,
1764 struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1765 const char *buf,
1766 size_t buf_len)
1767{
1768 struct CadetReliableMessage *crm;
1769
1770 if (ch->pending_messages > ch->max_pending_messages)
1771 {
1772 GNUNET_break (0);
1773 return GNUNET_SYSERR;
1774 }
1775 if (GNUNET_YES == ch->destroy)
1776 {
1777 /* we are going down, drop messages */
1778 return GNUNET_OK;
1779 }
1780 ch->pending_messages++;
1781
1782 if (GNUNET_YES == ch->is_loopback)
1783 {
1784 struct CadetChannelClient *receiver;
1785 struct GNUNET_MQ_Envelope *env;
1786 struct GNUNET_CADET_LocalData *ld;
1787 int ack_to_owner;
1788
1789 env = GNUNET_MQ_msg_extra (ld,
1790 buf_len,
1791 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1792 if ( (NULL != ch->owner) &&
1793 (sender_ccn.channel_of_client ==
1794 ch->owner->ccn.channel_of_client) )
1795 {
1796 receiver = ch->dest;
1797 ack_to_owner = GNUNET_YES;
1798 }
1799 else if ( (NULL != ch->dest) &&
1800 (sender_ccn.channel_of_client ==
1801 ch->dest->ccn.channel_of_client) )
1802 {
1803 receiver = ch->owner;
1804 ack_to_owner = GNUNET_NO;
1805 }
1806 else
1807 {
1808 GNUNET_break (0);
1809 return GNUNET_SYSERR;
1810 }
1811 ld->ccn = receiver->ccn;
1812 GNUNET_memcpy (&ld[1],
1813 buf,
1814 buf_len);
1815 if (GNUNET_YES == receiver->client_ready)
1816 {
1817 ch->pending_messages--;
1818 GSC_send_to_client (receiver->c,
1819 env);
1820 send_ack_to_client (ch,
1821 ack_to_owner);
1822 }
1823 else
1824 {
1825 struct CadetOutOfOrderMessage *oom;
1826
1827 oom = GNUNET_new (struct CadetOutOfOrderMessage);
1828 oom->env = env;
1829 GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1830 receiver->tail_recv,
1831 oom);
1832 receiver->num_recv++;
1833 }
1834 return GNUNET_OK;
1835 }
1836
1837 /* Everything is correct, send the message. */
1838 crm = GNUNET_malloc (sizeof (*crm));
1839 crm->ch = ch;
1840 crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1841 + buf_len);
1842 crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1843 crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1844 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1845 crm->data_message->mid = ch->mid_send;
1846 crm->data_message->ctn = ch->ctn;
1847 GNUNET_memcpy (&crm->data_message[1],
1848 buf,
1849 buf_len);
1850 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1851 ch->tail_sent,
1852 crm);
1853 LOG (GNUNET_ERROR_TYPE_DEBUG,
1854 "Sending message %u from local client to %s with %u bytes\n",
1855 ntohl (crm->data_message->mid.mid),
1856 GCCH_2s (ch),
1857 buf_len);
1858 if (NULL != ch->retry_data_task)
1859 {
1860 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1861 ch->retry_data_task = NULL;
1862 }
1863 crm->qe = GCT_send (ch->t,
1864 &crm->data_message->header,
1865 &data_sent_cb,
1866 crm);
1867 GNUNET_assert (NULL == ch->retry_data_task);
1868 return GNUNET_OK;
1869}
1870
1871
1872/**
1873 * Handle ACK from client on local channel. Means the client is ready
1874 * for more data, see if we have any for it.
1875 *
1876 * @param ch channel to destroy
1877 * @param client_ccn ccn of the client sending the ack
1878 */
1879void
1880GCCH_handle_local_ack (struct CadetChannel *ch,
1881 struct GNUNET_CADET_ClientChannelNumber client_ccn)
1882{
1883 struct CadetChannelClient *ccc;
1884 struct CadetOutOfOrderMessage *com;
1885
1886 if ( (NULL != ch->owner) &&
1887 (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1888 ccc = ch->owner;
1889 else if ( (NULL != ch->dest) &&
1890 (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1891 ccc = ch->dest;
1892 else
1893 GNUNET_assert (0);
1894 ccc->client_ready = GNUNET_YES;
1895 com = ccc->head_recv;
1896 if (NULL == com)
1897 {
1898 LOG (GNUNET_ERROR_TYPE_DEBUG,
1899 "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1900 GSC_2s (ccc->c),
1901 ntohl (client_ccn.channel_of_client),
1902 GCCH_2s (ch),
1903 ntohl (ccc->ccn.channel_of_client),
1904 ccc);
1905 return; /* none pending */
1906 }
1907 if (GNUNET_YES == ch->is_loopback)
1908 {
1909 int to_owner;
1910
1911 /* Messages are always in-order, just send */
1912 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1913 ccc->tail_recv,
1914 com);
1915 ccc->num_recv--;
1916 GSC_send_to_client (ccc->c,
1917 com->env);
1918 /* Notify sender that we can receive more */
1919 if (ccc->ccn.channel_of_client ==
1920 ch->owner->ccn.channel_of_client)
1921 {
1922 to_owner = GNUNET_NO;
1923 }
1924 else
1925 {
1926 GNUNET_assert (ccc->ccn.channel_of_client ==
1927 ch->dest->ccn.channel_of_client);
1928 to_owner = GNUNET_YES;
1929 }
1930 send_ack_to_client (ch,
1931 to_owner);
1932 GNUNET_free (com);
1933 return;
1934 }
1935
1936 if ( (com->mid.mid != ch->mid_recv.mid) &&
1937 (GNUNET_NO == ch->out_of_order) &&
1938 (GNUNET_YES == ch->reliable) )
1939 {
1940 LOG (GNUNET_ERROR_TYPE_DEBUG,
1941 "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1942 GSC_2s (ccc->c),
1943 ntohl (ccc->ccn.channel_of_client),
1944 ntohl (com->mid.mid),
1945 ntohl (ch->mid_recv.mid));
1946 return; /* missing next one in-order */
1947 }
1948
1949 LOG (GNUNET_ERROR_TYPE_DEBUG,
1950 "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
1951 ntohl (com->mid.mid),
1952 GSC_2s (ccc->c),
1953 ntohl (ccc->ccn.channel_of_client),
1954 GCCH_2s (ch));
1955
1956 /* all good, pass next message to client */
1957 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1958 ccc->tail_recv,
1959 com);
1960 ccc->num_recv--;
1961 /* FIXME: if unreliable, this is not aggressive
1962 enough, as it would be OK to have lost some! */
1963
1964 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1965 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1966 ccc->client_ready = GNUNET_NO;
1967 GSC_send_to_client (ccc->c,
1968 com->env);
1969 GNUNET_free (com);
1970 send_channel_data_ack (ch);
1971 if (NULL != ccc->head_recv)
1972 return;
1973 if (GNUNET_NO == ch->destroy)
1974 return;
1975 GCT_send_channel_destroy (ch->t,
1976 ch->ctn);
1977 channel_destroy (ch);
1978}
1979
1980
1981#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1982
1983
1984/**
1985 * Log channel info.
1986 *
1987 * @param ch Channel.
1988 * @param level Debug level to use.
1989 */
1990void
1991GCCH_debug (struct CadetChannel *ch,
1992 enum GNUNET_ErrorType level)
1993{
1994 int do_log;
1995
1996 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1997 "cadet-chn",
1998 __FILE__, __FUNCTION__, __LINE__);
1999 if (0 == do_log)
2000 return;
2001
2002 if (NULL == ch)
2003 {
2004 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
2005 return;
2006 }
2007 LOG2 (level,
2008 "CHN %s:%X (%p)\n",
2009 GCT_2s (ch->t),
2010 ch->ctn,
2011 ch);
2012 if (NULL != ch->owner)
2013 {
2014 LOG2 (level,
2015 "CHN origin %s ready %s local-id: %u\n",
2016 GSC_2s (ch->owner->c),
2017 ch->owner->client_ready ? "YES" : "NO",
2018 ntohl (ch->owner->ccn.channel_of_client));
2019 }
2020 if (NULL != ch->dest)
2021 {
2022 LOG2 (level,
2023 "CHN destination %s ready %s local-id: %u\n",
2024 GSC_2s (ch->dest->c),
2025 ch->dest->client_ready ? "YES" : "NO",
2026 ntohl (ch->dest->ccn.channel_of_client));
2027 }
2028 LOG2 (level,
2029 "CHN Message IDs recv: %d (%LLX), send: %d\n",
2030 ntohl (ch->mid_recv.mid),
2031 (unsigned long long) ch->mid_futures,
2032 ntohl (ch->mid_send.mid));
2033}
2034
2035
2036
2037/* end of gnunet-service-cadet-new_channel.c */