aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cadet/gnunet-service-cadet_channel.c')
-rw-r--r--src/cadet/gnunet-service-cadet_channel.c3461
1 files changed, 1468 insertions, 1993 deletions
diff --git a/src/cadet/gnunet-service-cadet_channel.c b/src/cadet/gnunet-service-cadet_channel.c
index 7b7c6e57c..68e29b66b 100644
--- a/src/cadet/gnunet-service-cadet_channel.c
+++ b/src/cadet/gnunet-service-cadet_channel.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2013 GNUnet e.V. 3 Copyright (C) 2001-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -17,31 +17,63 @@
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA. 18 Boston, MA 02110-1301, USA.
19*/ 19*/
20 20/**
21 21 * @file cadet/gnunet-service-cadet_channel.c
22 * @brief logical links between CADET clients
23 * @author Bartlomiej Polot
24 * @author Christian Grothoff
25 *
26 * TODO:
27 * - Congestion/flow control:
28 * + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
29 * (and figure out how/where to use this!)
30 * + figure out flow control without ACKs (unreliable traffic!)
31 * - revisit handling of 'unbuffered' traffic!
32 * (need to push down through tunnel into connection selection)
33 * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
34 * reserve more bits in 'options' to allow for buffer size control?
35 */
22#include "platform.h" 36#include "platform.h"
23#include "gnunet_util_lib.h" 37#include "cadet.h"
24
25#include "gnunet_statistics_service.h" 38#include "gnunet_statistics_service.h"
39#include "gnunet-service-cadet_channel.h"
40#include "gnunet-service-cadet_connection.h"
41#include "gnunet-service-cadet_tunnels.h"
42#include "gnunet-service-cadet_paths.h"
26 43
27#include "cadet.h" 44#define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
28#include "cadet_protocol.h"
29 45
30#include "gnunet-service-cadet_channel.h" 46/**
31#include "gnunet-service-cadet_local.h" 47 * How long do we initially wait before retransmitting?
32#include "gnunet-service-cadet_tunnel.h" 48 */
33#include "gnunet-service-cadet_peer.h" 49#define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
34 50
35#define LOG(level, ...) GNUNET_log_from(level,"cadet-chn",__VA_ARGS__) 51/**
36#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) 52 * How long do we wait before dropping state about incoming
53 * connection to closed port?
54 */
55#define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
37 56
38#define CADET_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(\ 57/**
39 GNUNET_TIME_UNIT_MILLISECONDS, 250) 58 * How long do we wait at least before retransmitting ever?
40#define CADET_RETRANSMIT_MARGIN 4 59 */
60#define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
61
62/**
63 * Maximum message ID into the future we accept for out-of-order messages.
64 * If the message is more than this into the future, we drop it. This is
65 * important both to detect values that are actually in the past, as well
66 * as to limit adversarially triggerable memory consumption.
67 *
68 * Note that right now we have "max_pending_messages = 4" hard-coded in
69 * the logic below, so a value of 4 would suffice here. But we plan to
70 * allow larger windows in the future...
71 */
72#define MAX_OUT_OF_ORDER_DISTANCE 1024
41 73
42 74
43/** 75/**
44 * All the states a connection can be in. 76 * All the states a channel can be in.
45 */ 77 */
46enum CadetChannelState 78enum CadetChannelState
47{ 79{
@@ -51,9 +83,15 @@ enum CadetChannelState
51 CADET_CHANNEL_NEW, 83 CADET_CHANNEL_NEW,
52 84
53 /** 85 /**
54 * Connection create message sent, waiting for ACK. 86 * Channel is to a port that is not open, we're waiting for the
87 * port to be opened.
88 */
89 CADET_CHANNEL_LOOSE,
90
91 /**
92 * CHANNEL_OPEN message sent, waiting for CHANNEL_OPEN_ACK.
55 */ 93 */
56 CADET_CHANNEL_SENT, 94 CADET_CHANNEL_OPEN_SENT,
57 95
58 /** 96 /**
59 * Connection confirmed, ready to carry traffic. 97 * Connection confirmed, ready to carry traffic.
@@ -63,138 +101,144 @@ enum CadetChannelState
63 101
64 102
65/** 103/**
66 * Info holder for channel messages in queues. 104 * Info needed to retry a message in case it gets lost.
105 * Note that we DO use this structure also for unreliable
106 * messages.
67 */ 107 */
68struct CadetChannelQueue 108struct CadetReliableMessage
69{ 109{
70 /** 110 /**
71 * Tunnel Queue. 111 * Double linked list, FIFO style
72 */ 112 */
73 struct CadetTunnelQueue *tq; 113 struct CadetReliableMessage *next;
74 114
75 /** 115 /**
76 * Message type (DATA/DATA_ACK) 116 * Double linked list, FIFO style
77 */ 117 */
78 uint16_t type; 118 struct CadetReliableMessage *prev;
79 119
80 /** 120 /**
81 * Message copy (for DATAs, to start retransmission timer) 121 * Which channel is this message in?
82 */ 122 */
83 struct CadetReliableMessage *copy; 123 struct CadetChannel *ch;
84 124
85 /** 125 /**
86 * Reliability (for DATA_ACKs, to access rel->ack_q) 126 * Entry in the tunnels queue for this message, NULL if it has left
127 * the tunnel. Used to cancel transmission in case we receive an
128 * ACK in time.
87 */ 129 */
88 struct CadetChannelReliability *rel; 130 struct CadetTunnelQueueEntry *qe;
89};
90
91 131
92/**
93 * Info needed to retry a message in case it gets lost.
94 */
95struct CadetReliableMessage
96{
97 /** 132 /**
98 * Double linked list, FIFO style 133 * Data message we are trying to send.
99 */ 134 */
100 struct CadetReliableMessage *next; 135 struct GNUNET_CADET_ChannelAppDataMessage *data_message;
101 struct CadetReliableMessage *prev;
102 136
103 /** 137 /**
104 * Type of message (payload, channel management). 138 * How soon should we retry if we fail to get an ACK?
139 * Messages in the queue are sorted by this value.
105 */ 140 */
106 int16_t type; 141 struct GNUNET_TIME_Absolute next_retry;
107 142
108 /** 143 /**
109 * Tunnel Reliability queue this message is in. 144 * How long do we wait for an ACK after transmission?
145 * Use for the back-off calculation.
110 */ 146 */
111 struct CadetChannelReliability *rel; 147 struct GNUNET_TIME_Relative retry_delay;
112 148
113 /** 149 /**
114 * ID of the message (ACK needed to free) 150 * Time when we first successfully transmitted the message
151 * (that is, set @e num_transmissions to 1).
115 */ 152 */
116 uint32_t mid; 153 struct GNUNET_TIME_Absolute first_transmission_time;
117 154
118 /** 155 /**
119 * Tunnel Queue. 156 * Identifier of the connection that this message took when it
157 * was first transmitted. Only useful if @e num_transmissions is 1.
120 */ 158 */
121 struct CadetChannelQueue *chq; 159 struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken;
122 160
123 /** 161 /**
124 * When was this message issued (to calculate ACK delay) 162 * How often was this message transmitted? #GNUNET_SYSERR if there
163 * was an error transmitting the message, #GNUNET_NO if it was not
164 * yet transmitted ever, otherwise the number of (re) transmissions.
125 */ 165 */
126 struct GNUNET_TIME_Absolute timestamp; 166 int num_transmissions;
127 167
128 /* struct GNUNET_CADET_ChannelAppDataMessage with payload */
129}; 168};
130 169
131 170
132/** 171/**
133 * Info about the traffic state for a client in a channel. 172 * List of received out-of-order data messages.
134 */ 173 */
135struct CadetChannelReliability 174struct CadetOutOfOrderMessage
136{ 175{
137 /** 176 /**
138 * Channel this is about. 177 * Double linked list, FIFO style
139 */ 178 */
140 struct CadetChannel *ch; 179 struct CadetOutOfOrderMessage *next;
141 180
142 /** 181 /**
143 * DLL of messages sent and not yet ACK'd. 182 * Double linked list, FIFO style
144 */ 183 */
145 struct CadetReliableMessage *head_sent; 184 struct CadetOutOfOrderMessage *prev;
146 struct CadetReliableMessage *tail_sent;
147 185
148 /** 186 /**
149 * DLL of messages received out of order. 187 * ID of the message (messages up to this point needed
188 * before we give this one to the client).
150 */ 189 */
151 struct CadetReliableMessage *head_recv; 190 struct ChannelMessageIdentifier mid;
152 struct CadetReliableMessage *tail_recv;
153 191
154 /** 192 /**
155 * Messages received. 193 * The envelope with the payload of the out-of-order message
156 */ 194 */
157 unsigned int n_recv; 195 struct GNUNET_MQ_Envelope *env;
158 196
159 /** 197};
160 * Next MID to use for outgoing traffic.
161 */
162 uint32_t mid_send;
163 198
164 /**
165 * Next MID expected for incoming traffic.
166 */
167 uint32_t mid_recv;
168 199
200/**
201 * Client endpoint of a `struct CadetChannel`. A channel may be a
202 * loopback channel, in which case it has two of these endpoints.
203 * Note that flow control also is required in both directions.
204 */
205struct CadetChannelClient
206{
169 /** 207 /**
170 * Handle for queued unique data CREATE, DATA_ACK. 208 * Client handle. Not by itself sufficient to designate
209 * the client endpoint, as the same client handle may
210 * be used for both the owner and the destination, and
211 * we thus also need the channel ID to identify the client.
171 */ 212 */
172 struct CadetChannelQueue *uniq; 213 struct CadetClient *c;
173 214
174 /** 215 /**
175 * Can we send data to the client? 216 * Head of DLL of messages received out of order or while client was unready.
176 */ 217 */
177 int client_ready; 218 struct CadetOutOfOrderMessage *head_recv;
178 219
179 /** 220 /**
180 * Can the client send data to us? 221 * Tail DLL of messages received out of order or while client was unready.
181 */ 222 */
182 int client_allowed; 223 struct CadetOutOfOrderMessage *tail_recv;
183 224
184 /** 225 /**
185 * Task to resend/poll in case no ACK is received. 226 * Local tunnel number for this client.
227 * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
228 * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
186 */ 229 */
187 struct GNUNET_SCHEDULER_Task * retry_task; 230 struct GNUNET_CADET_ClientChannelNumber ccn;
188 231
189 /** 232 /**
190 * Counter for exponential backoff. 233 * Number of entries currently in @a head_recv DLL.
191 */ 234 */
192 struct GNUNET_TIME_Relative retry_timer; 235 unsigned int num_recv;
193 236
194 /** 237 /**
195 * How long does it usually take to get an ACK. 238 * Can we send data to the client?
196 */ 239 */
197 struct GNUNET_TIME_Relative expected_delay; 240 int client_ready;
241
198}; 242};
199 243
200 244
@@ -209,41 +253,44 @@ struct CadetChannel
209 struct CadetTunnel *t; 253 struct CadetTunnel *t;
210 254
211 /** 255 /**
212 * Destination port of the channel. 256 * Client owner of the tunnel, if any.
257 * (Used if this channel represends the initiating end of the tunnel.)
213 */ 258 */
214 struct GNUNET_HashCode port; 259 struct CadetChannelClient *owner;
215 260
216 /** 261 /**
217 * Global channel number ( < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) 262 * Client destination of the tunnel, if any.
263 * (Used if this channel represents the listening end of the tunnel.)
218 */ 264 */
219 struct GNUNET_CADET_ChannelTunnelNumber gid; 265 struct CadetChannelClient *dest;
220 266
221 /** 267 /**
222 * Local tunnel number for root (owner) client. 268 * Last entry in the tunnel's queue relating to control messages
223 * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 ) 269 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
270 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
271 * transmission in case we receive updated information.
224 */ 272 */
225 struct GNUNET_CADET_ClientChannelNumber lid_root; 273 struct CadetTunnelQueueEntry *last_control_qe;
226 274
227 /** 275 /**
228 * Local tunnel number for local destination clients (incoming number) 276 * Head of DLL of messages sent and not yet ACK'd.
229 * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV or 0).
230 */ 277 */
231 struct GNUNET_CADET_ClientChannelNumber lid_dest; 278 struct CadetReliableMessage *head_sent;
232 279
233 /** 280 /**
234 * Channel state. 281 * Tail of DLL of messages sent and not yet ACK'd.
235 */ 282 */
236 enum CadetChannelState state; 283 struct CadetReliableMessage *tail_sent;
237 284
238 /** 285 /**
239 * Is the tunnel bufferless (minimum latency)? 286 * Task to resend/poll in case no ACK is received.
240 */ 287 */
241 int nobuffer; 288 struct GNUNET_SCHEDULER_Task *retry_control_task;
242 289
243 /** 290 /**
244 * Is the tunnel reliable? 291 * Task to resend/poll in case no ACK is received.
245 */ 292 */
246 int reliable; 293 struct GNUNET_SCHEDULER_Task *retry_data_task;
247 294
248 /** 295 /**
249 * Last time the channel was used 296 * Last time the channel was used
@@ -251,21 +298,29 @@ struct CadetChannel
251 struct GNUNET_TIME_Absolute timestamp; 298 struct GNUNET_TIME_Absolute timestamp;
252 299
253 /** 300 /**
254 * Client owner of the tunnel, if any 301 * Destination port of the channel.
255 */ 302 */
256 struct CadetClient *root; 303 struct GNUNET_HashCode port;
257 304
258 /** 305 /**
259 * Client destination of the tunnel, if any. 306 * Counter for exponential backoff.
260 */ 307 */
261 struct CadetClient *dest; 308 struct GNUNET_TIME_Relative retry_time;
262 309
263 /** 310 /**
264 * Flag to signal the destruction of the channel. 311 * Bitfield of already-received messages past @e mid_recv.
265 * If this is set to #GNUNET_YES the channel will be destroyed
266 * when the queue is empty.
267 */ 312 */
268 int destroy; 313 uint64_t mid_futures;
314
315 /**
316 * Next MID expected for incoming traffic.
317 */
318 struct ChannelMessageIdentifier mid_recv;
319
320 /**
321 * Next MID to use for outgoing traffic.
322 */
323 struct ChannelMessageIdentifier mid_send;
269 324
270 /** 325 /**
271 * Total (reliable) messages pending ACK for this channel. 326 * Total (reliable) messages pending ACK for this channel.
@@ -273,2290 +328,1710 @@ struct CadetChannel
273 unsigned int pending_messages; 328 unsigned int pending_messages;
274 329
275 /** 330 /**
276 * Reliability data. 331 * Maximum (reliable) messages pending ACK for this channel
277 * Only present (non-NULL) at the owner of a tunnel. 332 * before we throttle the client.
278 */ 333 */
279 struct CadetChannelReliability *root_rel; 334 unsigned int max_pending_messages;
280 335
281 /** 336 /**
282 * Reliability data. 337 * Number identifying this channel in its tunnel.
283 * Only present (non-NULL) at the destination of a tunnel.
284 */ 338 */
285 struct CadetChannelReliability *dest_rel; 339 struct GNUNET_CADET_ChannelTunnelNumber ctn;
286 340
287}; 341 /**
342 * Channel state.
343 */
344 enum CadetChannelState state;
288 345
346 /**
347 * Count how many ACKs we skipped, used to prevent long
348 * sequences of ACK skipping.
349 */
350 unsigned int skip_ack_series;
289 351
290/******************************************************************************/ 352 /**
291/******************************* GLOBALS ***********************************/ 353 * Is the tunnel bufferless (minimum latency)?
292/******************************************************************************/ 354 */
355 int nobuffer;
293 356
294/** 357 /**
295 * Global handle to the statistics service. 358 * Is the tunnel reliable?
296 */ 359 */
297extern struct GNUNET_STATISTICS_Handle *stats; 360 int reliable;
298 361
299/** 362 /**
300 * Local peer own ID (memory efficient handle). 363 * Is the tunnel out-of-order?
301 */ 364 */
302extern GNUNET_PEER_Id myid; 365 int out_of_order;
303 366
367 /**
368 * Is this channel a loopback channel, where the destination is us again?
369 */
370 int is_loopback;
304 371
305/******************************************************************************/ 372 /**
306/******************************** STATIC ***********************************/ 373 * Flag to signal the destruction of the channel. If this is set to
307/******************************************************************************/ 374 * #GNUNET_YES the channel will be destroyed once the queue is
375 * empty.
376 */
377 int destroy;
308 378
379};
309 380
310/**
311 * Destroy a reliable message after it has been acknowledged, either by
312 * direct mid ACK or bitfield. Updates the appropriate data structures and
313 * timers and frees all memory.
314 *
315 * @param copy Message that is no longer needed: remote peer got it.
316 * @param update_time Is the timing information relevant?
317 * If this message is ACK in a batch the timing information
318 * is skewed by the retransmission, count only for the
319 * retransmitted message.
320 *
321 * @return #GNUNET_YES if channel was destroyed as a result of the call,
322 * #GNUNET_NO otherwise.
323 */
324static int
325rel_message_free (struct CadetReliableMessage *copy, int update_time);
326 381
327/** 382/**
328 * send a channel create message. 383 * Get the static string for identification of the channel.
329 * 384 *
330 * @param ch Channel for which to send. 385 * @param ch Channel.
331 */
332static void
333send_create (struct CadetChannel *ch);
334
335/**
336 * Confirm we got a channel create, FWD ack.
337 * 386 *
338 * @param ch The channel to confirm. 387 * @return Static string with the channel IDs.
339 * @param fwd Should we send a FWD ACK? (going dest->root)
340 */ 388 */
341static void 389const char *
342send_ack (struct CadetChannel *ch, int fwd); 390GCCH_2s (const struct CadetChannel *ch)
343 391{
392 static char buf[128];
393
394 GNUNET_snprintf (buf,
395 sizeof (buf),
396 "Channel %s:%s ctn:%X(%X/%X)",
397 (GNUNET_YES == ch->is_loopback)
398 ? "loopback"
399 : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
400 GNUNET_h2s (&ch->port),
401 ch->ctn,
402 (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
403 (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
404 return buf;
405}
344 406
345 407
346/** 408/**
347 * Test if the channel is loopback: both root and dest are on the local peer. 409 * Get the channel's public ID.
348 * 410 *
349 * @param ch Channel to test. 411 * @param ch Channel.
350 * 412 *
351 * @return #GNUNET_YES if channel is loopback, #GNUNET_NO otherwise. 413 * @return ID used to identify the channel with the remote peer.
352 */ 414 */
353static int 415struct GNUNET_CADET_ChannelTunnelNumber
354is_loopback (const struct CadetChannel *ch) 416GCCH_get_id (const struct CadetChannel *ch)
355{ 417{
356 if (NULL != ch->t) 418 return ch->ctn;
357 return GCT_is_loopback (ch->t);
358
359 return (NULL != ch->root && NULL != ch->dest);
360} 419}
361 420
362 421
363/** 422/**
364 * Save a copy of the data message for later retransmission. 423 * Release memory associated with @a ccc
365 * 424 *
366 * @param msg Message to copy. 425 * @param ccc data structure to clean up
367 * @param mid Message ID.
368 * @param rel Reliability data for retransmission.
369 */ 426 */
370static struct CadetReliableMessage * 427static void
371copy_message (const struct GNUNET_CADET_ChannelAppDataMessage *msg, uint32_t mid, 428free_channel_client (struct CadetChannelClient *ccc)
372 struct CadetChannelReliability *rel)
373{ 429{
374 struct CadetReliableMessage *copy; 430 struct CadetOutOfOrderMessage *com;
375 uint16_t size;
376 431
377 size = ntohs (msg->header.size); 432 while (NULL != (com = ccc->head_recv))
378 copy = GNUNET_malloc (sizeof (*copy) + size); 433 {
379 copy->mid = mid; 434 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
380 copy->rel = rel; 435 ccc->tail_recv,
381 copy->type = GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA; 436 com);
382 GNUNET_memcpy (&copy[1], msg, size); 437 ccc->num_recv--;
383 438 GNUNET_MQ_discard (com->env);
384 return copy; 439 GNUNET_free (com);
440 }
441 GNUNET_free (ccc);
385} 442}
386 443
444
387/** 445/**
388 * We have received a message out of order, or the client is not ready. 446 * Destroy the given channel.
389 * Buffer it until we receive an ACK from the client or the missing
390 * message from the channel.
391 * 447 *
392 * @param msg Message to buffer (MUST be of type CADET_DATA). 448 * @param ch channel to destroy
393 * @param rel Reliability data to the corresponding direction.
394 */ 449 */
395static void 450static void
396add_buffered_data (const struct GNUNET_CADET_ChannelAppDataMessage *msg, 451channel_destroy (struct CadetChannel *ch)
397 struct CadetChannelReliability *rel)
398{ 452{
399 struct CadetReliableMessage *copy; 453 struct CadetReliableMessage *crm;
400 struct CadetReliableMessage *prev;
401 uint32_t mid;
402
403 mid = ntohl (msg->mid);
404 454
405 LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data MID %u (%u)\n", 455 while (NULL != (crm = ch->head_sent))
406 mid, rel->n_recv);
407
408 rel->n_recv++;
409
410 // FIXME do something better than O(n), although n < 64...
411 // FIXME start from the end (most messages are the latest ones)
412 for (prev = rel->head_recv; NULL != prev; prev = prev->next)
413 { 456 {
414 LOG (GNUNET_ERROR_TYPE_DEBUG, " prev %u\n", prev->mid); 457 GNUNET_assert (ch == crm->ch);
415 if (prev->mid == mid) 458 if (NULL != crm->qe)
416 { 459 {
417 LOG (GNUNET_ERROR_TYPE_DEBUG, " already there!\n"); 460 GCT_send_cancel (crm->qe);
418 rel->n_recv--; 461 crm->qe = NULL;
419 return;
420 }
421 else if (GC_is_pid_bigger (prev->mid, mid))
422 {
423 LOG (GNUNET_ERROR_TYPE_DEBUG, " bingo!\n");
424 copy = copy_message (msg, mid, rel);
425 GNUNET_CONTAINER_DLL_insert_before (rel->head_recv, rel->tail_recv,
426 prev, copy);
427 return;
428 } 462 }
463 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
464 ch->tail_sent,
465 crm);
466 GNUNET_free (crm->data_message);
467 GNUNET_free (crm);
468 }
469 if (NULL != ch->owner)
470 {
471 free_channel_client (ch->owner);
472 ch->owner = NULL;
429 } 473 }
430 copy = copy_message (msg, mid, rel); 474 if (NULL != ch->dest)
431 LOG (GNUNET_ERROR_TYPE_DEBUG, " insert at tail! (now: %u)\n", rel->n_recv); 475 {
432 GNUNET_CONTAINER_DLL_insert_tail (rel->head_recv, rel->tail_recv, copy); 476 free_channel_client (ch->dest);
433 LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data END\n"); 477 ch->dest = NULL;
478 }
479 if (NULL != ch->last_control_qe)
480 {
481 GCT_send_cancel (ch->last_control_qe);
482 ch->last_control_qe = NULL;
483 }
484 if (NULL != ch->retry_data_task)
485 {
486 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
487 ch->retry_data_task = NULL;
488 }
489 if (NULL != ch->retry_control_task)
490 {
491 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
492 ch->retry_control_task = NULL;
493 }
494 if (GNUNET_NO == ch->is_loopback)
495 {
496 GCT_remove_channel (ch->t,
497 ch,
498 ch->ctn);
499 ch->t = NULL;
500 }
501 GNUNET_free (ch);
434} 502}
435 503
436 504
437/** 505/**
438 * Add a destination client to a channel, initializing all data structures 506 * Send a channel create message.
439 * in the channel and the client.
440 * 507 *
441 * @param ch Channel to which add the destination. 508 * @param cls Channel for which to send.
442 * @param c Client which to add to the channel.
443 */ 509 */
444static void 510static void
445add_destination (struct CadetChannel *ch, struct CadetClient *c) 511send_channel_open (void *cls);
446{
447 if (NULL != ch->dest)
448 {
449 GNUNET_break (0);
450 return;
451 }
452
453 /* Assign local id as destination */
454 ch->lid_dest = GML_get_next_ccn (c);
455
456 /* Store in client's hashmap */
457 GML_channel_add (c, ch->lid_dest, ch);
458
459 GNUNET_break (NULL == ch->dest_rel);
460 ch->dest_rel = GNUNET_new (struct CadetChannelReliability);
461 ch->dest_rel->ch = ch;
462 ch->dest_rel->expected_delay.rel_value_us = 0;
463 ch->dest_rel->retry_timer = CADET_RETRANSMIT_TIME;
464
465 ch->dest = c;
466}
467 512
468 513
469/** 514/**
470 * Set options in a channel, extracted from a bit flag field. 515 * Function called once the tunnel confirms that we sent the
516 * create message. Delays for a bit until we retry.
471 * 517 *
472 * @param ch Channel to set options to. 518 * @param cls our `struct CadetChannel`.
473 * @param options Bit array in host byte order. 519 * @param cid identifier of the connection within the tunnel, NULL
520 * if transmission failed
474 */ 521 */
475static void 522static void
476channel_set_options (struct CadetChannel *ch, uint32_t options) 523channel_open_sent_cb (void *cls,
524 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
477{ 525{
478 ch->nobuffer = (options & GNUNET_CADET_OPTION_NOBUFFER) != 0 ? 526 struct CadetChannel *ch = cls;
479 GNUNET_YES : GNUNET_NO; 527
480 ch->reliable = (options & GNUNET_CADET_OPTION_RELIABLE) != 0 ? 528 GNUNET_assert (NULL != ch->last_control_qe);
481 GNUNET_YES : GNUNET_NO; 529 ch->last_control_qe = NULL;
530 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
531 LOG (GNUNET_ERROR_TYPE_DEBUG,
532 "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
533 GCCH_2s (ch),
534 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
535 GNUNET_YES));
536 ch->retry_control_task
537 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
538 &send_channel_open,
539 ch);
482} 540}
483 541
484 542
485/** 543/**
486 * Get a bit flag field with the options of a channel. 544 * Send a channel open message.
487 * 545 *
488 * @param ch Channel to get options from. 546 * @param cls Channel for which to send.
489 *
490 * @return Bit array in host byte order.
491 */ 547 */
492static uint32_t 548static void
493channel_get_options (struct CadetChannel *ch) 549send_channel_open (void *cls)
494{ 550{
551 struct CadetChannel *ch = cls;
552 struct GNUNET_CADET_ChannelOpenMessage msgcc;
495 uint32_t options; 553 uint32_t options;
496 554
555 ch->retry_control_task = NULL;
556 LOG (GNUNET_ERROR_TYPE_DEBUG,
557 "Sending CHANNEL_OPEN message for %s\n",
558 GCCH_2s (ch));
497 options = 0; 559 options = 0;
498 if (ch->nobuffer) 560 if (ch->nobuffer)
499 options |= GNUNET_CADET_OPTION_NOBUFFER; 561 options |= GNUNET_CADET_OPTION_NOBUFFER;
500 if (ch->reliable) 562 if (ch->reliable)
501 options |= GNUNET_CADET_OPTION_RELIABLE; 563 options |= GNUNET_CADET_OPTION_RELIABLE;
502 564 if (ch->out_of_order)
503 return options; 565 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
566 msgcc.header.size = htons (sizeof (msgcc));
567 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
568 msgcc.opt = htonl (options);
569 msgcc.port = ch->port;
570 msgcc.ctn = ch->ctn;
571 ch->state = CADET_CHANNEL_OPEN_SENT;
572 if (NULL != ch->last_control_qe)
573 GCT_send_cancel (ch->last_control_qe);
574 ch->last_control_qe = GCT_send (ch->t,
575 &msgcc.header,
576 &channel_open_sent_cb,
577 ch);
578 GNUNET_assert (NULL == ch->retry_control_task);
504} 579}
505 580
506 581
507/** 582/**
508 * Notify a client that the channel is no longer valid. 583 * Function called once and only once after a channel was bound
509 * 584 * to its tunnel via #GCT_add_channel() is ready for transmission.
510 * @param ch Channel that is destroyed. 585 * Note that this is only the case for channels that this peer
511 * @param local_only Should we avoid sending it to other peers? 586 * initiates, as for incoming channels we assume that they are
587 * ready for transmission immediately upon receiving the open
588 * message. Used to bootstrap the #GCT_send() process.
589 *
590 * @param ch the channel for which the tunnel is now ready
512 */ 591 */
513static void 592void
514send_destroy (struct CadetChannel *ch, int local_only) 593GCCH_tunnel_up (struct CadetChannel *ch)
515{ 594{
516 struct GNUNET_CADET_ChannelManageMessage msg; 595 GNUNET_assert (NULL == ch->retry_control_task);
517 596 LOG (GNUNET_ERROR_TYPE_DEBUG,
518 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY); 597 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
519 msg.header.size = htons (sizeof (msg)); 598 GCCH_2s (ch));
520 msg.ctn = ch->gid; 599 ch->retry_control_task
521 600 = GNUNET_SCHEDULER_add_now (&send_channel_open,
522 /* If root is not NULL, notify. 601 ch);
523 * If it's NULL, check lid_root. When a local destroy comes in, root
524 * is set to NULL but lid_root is left untouched. In this case, do nothing,
525 * the client is the one who requested the channel to be destroyed.
526 */
527 if (NULL != ch->root)
528 GML_send_channel_destroy (ch->root, ch->lid_root);
529 else if (0 == ch->lid_root.channel_of_client && GNUNET_NO == local_only)
530 GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
531
532 if (NULL != ch->dest)
533 GML_send_channel_destroy (ch->dest, ch->lid_dest);
534 else if (0 == ch->lid_dest.channel_of_client && GNUNET_NO == local_only)
535 GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES, NULL);
536} 602}
537 603
538 604
539/** 605/**
540 * Notify the destination client that a new incoming channel was created. 606 * Create a new channel.
541 * 607 *
542 * @param ch Channel that was created. 608 * @param owner local client owning the channel
609 * @param ccn local number of this channel at the @a owner
610 * @param destination peer to which we should build the channel
611 * @param port desired port at @a destination
612 * @param options options for the channel
613 * @return handle to the new channel
543 */ 614 */
544static void 615struct CadetChannel *
545send_client_create (struct CadetChannel *ch) 616GCCH_channel_local_new (struct CadetClient *owner,
617 struct GNUNET_CADET_ClientChannelNumber ccn,
618 struct CadetPeer *destination,
619 const struct GNUNET_HashCode *port,
620 uint32_t options)
546{ 621{
547 uint32_t opt; 622 struct CadetChannel *ch;
548 623 struct CadetChannelClient *ccco;
549 if (NULL == ch->dest)
550 return;
551
552 opt = 0;
553 opt |= GNUNET_YES == ch->reliable ? GNUNET_CADET_OPTION_RELIABLE : 0;
554 opt |= GNUNET_YES == ch->nobuffer ? GNUNET_CADET_OPTION_NOBUFFER : 0;
555 GML_send_channel_create (ch->dest,
556 ch->lid_dest,
557 &ch->port,
558 opt,
559 GCT_get_destination (ch->t));
560
561}
562 624
625 ccco = GNUNET_new (struct CadetChannelClient);
626 ccco->c = owner;
627 ccco->ccn = ccn;
628 ccco->client_ready = GNUNET_YES;
563 629
564/** 630 ch = GNUNET_new (struct CadetChannel);
565 * Send data to a client. 631 ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
566 * 632 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
567 * If the client is ready, send directly, otherwise buffer while listening 633 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
568 * for a local ACK. 634 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
569 * 635 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
570 * @param ch Channel 636 ch->owner = ccco;
571 * @param msg Message. 637 ch->port = *port;
572 * @param fwd Is this a fwd (root->dest) message? 638 if (0 == memcmp (&my_full_id,
573 */ 639 GCP_get_id (destination),
574static void 640 sizeof (struct GNUNET_PeerIdentity)))
575send_client_data (struct CadetChannel *ch, 641 {
576 const struct GNUNET_CADET_ChannelAppDataMessage *msg, 642 struct CadetClient *c;
577 int fwd) 643
578{ 644 ch->is_loopback = GNUNET_YES;
579 if (fwd) 645 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
580 { 646 port);
581 if (ch->dest_rel->client_ready) 647 if (NULL == c)
582 { 648 {
583 GML_send_data (ch->dest, msg, ch->lid_dest); 649 /* port closed, wait for it to possibly open */
584 ch->dest_rel->client_ready = GNUNET_NO; 650 ch->state = CADET_CHANNEL_LOOSE;
585 ch->dest_rel->mid_recv++; 651 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
652 port,
653 ch,
654 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
655 LOG (GNUNET_ERROR_TYPE_DEBUG,
656 "Created loose incoming loopback channel to port %s\n",
657 GNUNET_h2s (&ch->port));
586 } 658 }
587 else 659 else
588 add_buffered_data (msg, ch->dest_rel); 660 {
661 GCCH_bind (ch,
662 c);
663 }
589 } 664 }
590 else 665 else
591 { 666 {
592 if (ch->root_rel->client_ready) 667 ch->t = GCP_get_tunnel (destination,
593 { 668 GNUNET_YES);
594 GML_send_data (ch->root, msg, ch->lid_root); 669 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
595 ch->root_rel->client_ready = GNUNET_NO; 670 ch->ctn = GCT_add_channel (ch->t,
596 ch->root_rel->mid_recv++; 671 ch);
597 }
598 else
599 add_buffered_data (msg, ch->root_rel);
600 } 672 }
673 GNUNET_STATISTICS_update (stats,
674 "# channels",
675 1,
676 GNUNET_NO);
677 LOG (GNUNET_ERROR_TYPE_DEBUG,
678 "Created channel to port %s at peer %s for %s using %s\n",
679 GNUNET_h2s (port),
680 GCP_2s (destination),
681 GSC_2s (owner),
682 (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
683 return ch;
601} 684}
602 685
603 686
604/** 687/**
605 * Send a buffered message to the client, for in order delivery or 688 * We had an incoming channel to a port that is closed.
606 * as result of client ACK. 689 * It has not been opened for a while, drop it.
607 * 690 *
608 * @param ch Channel on which to empty the message buffer. 691 * @param cls the channel to drop
609 * @param c Client to send to.
610 * @param fwd Is this to send FWD data?.
611 */ 692 */
612static void 693static void
613send_client_buffered_data (struct CadetChannel *ch, 694timeout_closed_cb (void *cls)
614 struct CadetClient *c,
615 int fwd)
616{ 695{
617 struct CadetReliableMessage *copy; 696 struct CadetChannel *ch = cls;
618 struct CadetChannelReliability *rel;
619
620 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n");
621 rel = fwd ? ch->dest_rel : ch->root_rel;
622 if (GNUNET_NO == rel->client_ready)
623 {
624 LOG (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n");
625 return;
626 }
627 697
628 copy = rel->head_recv; 698 ch->retry_control_task = NULL;
629 /* We never buffer channel management messages */ 699 LOG (GNUNET_ERROR_TYPE_DEBUG,
630 if (NULL != copy) 700 "Closing incoming channel to port %s from peer %s due to timeout\n",
631 { 701 GNUNET_h2s (&ch->port),
632 if (copy->mid == rel->mid_recv || GNUNET_NO == ch->reliable) 702 GCP_2s (GCT_get_destination (ch->t)));
633 { 703 channel_destroy (ch);
634 struct GNUNET_CADET_ChannelAppDataMessage *msg = (struct GNUNET_CADET_ChannelAppDataMessage *) &copy[1];
635
636 LOG (GNUNET_ERROR_TYPE_DEBUG, " have %u! now expecting %u\n",
637 copy->mid, rel->mid_recv + 1);
638 send_client_data (ch, msg, fwd);
639 rel->n_recv--;
640 GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
641 LOG (GNUNET_ERROR_TYPE_DEBUG, " free copy recv MID %u (%p), %u left\n",
642 copy->mid, copy, rel->n_recv);
643 GNUNET_free (copy);
644 GCCH_send_data_ack (ch, fwd);
645 }
646 else
647 {
648 LOG (GNUNET_ERROR_TYPE_DEBUG, " reliable && don't have %u, next is %u\n",
649 rel->mid_recv, copy->mid);
650 if (GNUNET_YES == ch->destroy)
651 {
652 /* We don't have the next data piece and the remote peer has closed the
653 * channel. We won't receive it anymore, so just destroy the channel.
654 * FIXME: wait some time to allow other connections to
655 * deliver missing messages
656 */
657 send_destroy (ch, GNUNET_YES);
658 GCCH_destroy (ch);
659 }
660 }
661 }
662 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data END\n");
663} 704}
664 705
665 706
666/** 707/**
667 * Allow a client to send more data. 708 * Create a new channel based on a request coming in over the network.
668 *
669 * In case the client was already allowed to send data, do nothing.
670 * 709 *
671 * @param ch Channel. 710 * @param t tunnel to the remote peer
672 * @param fwd Is this a FWD ACK? (FWD ACKs are sent to root) 711 * @param ctn identifier of this channel in the tunnel
712 * @param port desired local port
713 * @param options options for the channel
714 * @return handle to the new channel
673 */ 715 */
674static void 716struct CadetChannel *
675send_client_ack (struct CadetChannel *ch, int fwd) 717GCCH_channel_incoming_new (struct CadetTunnel *t,
718 struct GNUNET_CADET_ChannelTunnelNumber ctn,
719 const struct GNUNET_HashCode *port,
720 uint32_t options)
676{ 721{
677 struct CadetChannelReliability *rel = fwd ? ch->root_rel : ch->dest_rel; 722 struct CadetChannel *ch;
678 struct CadetClient *c = fwd ? ch->root : ch->dest; 723 struct CadetClient *c;
724
725 ch = GNUNET_new (struct CadetChannel);
726 ch->port = *port;
727 ch->t = t;
728 ch->ctn = ctn;
729 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
730 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
731 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
732 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
733 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
734 GNUNET_STATISTICS_update (stats,
735 "# channels",
736 1,
737 GNUNET_NO);
679 738
739 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
740 port);
680 if (NULL == c) 741 if (NULL == c)
681 { 742 {
682 GNUNET_break (GNUNET_NO != ch->destroy); 743 /* port closed, wait for it to possibly open */
683 return; 744 ch->state = CADET_CHANNEL_LOOSE;
745 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
746 port,
747 ch,
748 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
749 GNUNET_assert (NULL == ch->retry_control_task);
750 ch->retry_control_task
751 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
752 &timeout_closed_cb,
753 ch);
754 LOG (GNUNET_ERROR_TYPE_DEBUG,
755 "Created loose incoming channel to port %s from peer %s\n",
756 GNUNET_h2s (&ch->port),
757 GCP_2s (GCT_get_destination (ch->t)));
684 } 758 }
685 LOG (GNUNET_ERROR_TYPE_DEBUG, 759 else
686 " sending %s ack to client on channel %s\n",
687 GC_f2s (fwd), GCCH_2s (ch));
688
689 if (NULL == rel)
690 { 760 {
691 GNUNET_break (0); 761 GCCH_bind (ch,
692 return; 762 c);
693 } 763 }
694 764 GNUNET_STATISTICS_update (stats,
695 if (GNUNET_YES == rel->client_allowed) 765 "# channels",
696 { 766 1,
697 LOG (GNUNET_ERROR_TYPE_DEBUG, " already allowed\n"); 767 GNUNET_NO);
698 return; 768 return ch;
699 }
700 rel->client_allowed = GNUNET_YES;
701
702 GML_send_ack (c, fwd ? ch->lid_root : ch->lid_dest);
703} 769}
704 770
705 771
706/** 772/**
707 * Notify the root that the destination rejected the channel. 773 * Function called once the tunnel confirms that we sent the
774 * ACK message. Just remembers it was sent, we do not expect
775 * ACKs for ACKs ;-).
708 * 776 *
709 * @param ch Rejected channel. 777 * @param cls our `struct CadetChannel`.
778 * @param cid identifier of the connection within the tunnel, NULL
779 * if transmission failed
710 */ 780 */
711static void 781static void
712send_client_nack (struct CadetChannel *ch) 782send_ack_cb (void *cls,
783 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
713{ 784{
714 if (NULL == ch->root) 785 struct CadetChannel *ch = cls;
715 { 786
716 GNUNET_break (0); 787 GNUNET_assert (NULL != ch->last_control_qe);
717 return; 788 ch->last_control_qe = NULL;
718 }
719 GML_send_channel_nack (ch->root, ch->lid_root);
720} 789}
721 790
722 791
723/** 792/**
724 * We haven't received an ACK after a certain time: restransmit the message. 793 * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
725 * 794 *
726 * @param cls Closure (CadetChannelReliability with the message to restransmit) 795 * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
727 */ 796 */
728static void 797static void
729channel_retransmit_message (void *cls) 798send_channel_data_ack (struct CadetChannel *ch)
730{ 799{
731 struct CadetChannelReliability *rel = cls; 800 struct GNUNET_CADET_ChannelDataAckMessage msg;
732 struct CadetReliableMessage *copy;
733 struct CadetChannel *ch;
734 struct GNUNET_CADET_ChannelAppDataMessage *payload;
735 int fwd;
736
737 rel->retry_task = NULL;
738 ch = rel->ch;
739 copy = rel->head_sent;
740 if (NULL == copy)
741 {
742 GNUNET_break (0); // FIXME tripped in rps testcase
743 return;
744 }
745
746 payload = (struct GNUNET_CADET_ChannelAppDataMessage *) &copy[1];
747 fwd = (rel == ch->root_rel);
748
749 /* Message not found in the queue that we are going to use. */
750 LOG (GNUNET_ERROR_TYPE_DEBUG, "RETRANSMIT MID %u\n", copy->mid);
751 801
752 GCCH_send_prebuilt_message (&payload->header, ch, fwd, copy); 802 if (GNUNET_NO == ch->reliable)
753 GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO); 803 return; /* no ACKs */
804 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
805 msg.header.size = htons (sizeof (msg));
806 msg.ctn = ch->ctn;
807 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
808 msg.futures = GNUNET_htonll (ch->mid_futures);
809 LOG (GNUNET_ERROR_TYPE_DEBUG,
810 "Sending DATA_ACK %u:%llX via %s\n",
811 (unsigned int) ntohl (msg.mid.mid),
812 (unsigned long long) ch->mid_futures,
813 GCCH_2s (ch));
814 if (NULL != ch->last_control_qe)
815 GCT_send_cancel (ch->last_control_qe);
816 ch->last_control_qe = GCT_send (ch->t,
817 &msg.header,
818 &send_ack_cb,
819 ch);
754} 820}
755 821
756 822
757/** 823/**
758 * We haven't received an Channel ACK after a certain time: resend the CREATE. 824 * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
825 * connection is up.
759 * 826 *
760 * @param cls Closure (CadetChannelReliability of the channel to recreate) 827 * @param cls the `struct CadetChannel`
761 */ 828 */
762static void 829static void
763channel_recreate (void *cls) 830send_open_ack (void *cls)
764{ 831{
765 struct CadetChannelReliability *rel = cls; 832 struct CadetChannel *ch = cls;
766 833 struct GNUNET_CADET_ChannelManageMessage msg;
767 rel->retry_task = NULL;
768 LOG (GNUNET_ERROR_TYPE_DEBUG, "RE-CREATE\n");
769 GNUNET_STATISTICS_update (stats,
770 "# data retransmitted", 1, GNUNET_NO);
771 834
772 if (rel == rel->ch->root_rel) 835 ch->retry_control_task = NULL;
773 { 836 LOG (GNUNET_ERROR_TYPE_DEBUG,
774 send_create (rel->ch); 837 "Sending CHANNEL_OPEN_ACK on %s\n",
775 } 838 GCCH_2s (ch));
776 else if (rel == rel->ch->dest_rel) 839 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
777 { 840 msg.header.size = htons (sizeof (msg));
778 send_ack (rel->ch, GNUNET_YES); 841 msg.reserved = htonl (0);
779 } 842 msg.ctn = ch->ctn;
780 else 843 if (NULL != ch->last_control_qe)
781 { 844 GCT_send_cancel (ch->last_control_qe);
782 GNUNET_break (0); 845 ch->last_control_qe = GCT_send (ch->t,
783 } 846 &msg.header,
847 &send_ack_cb,
848 ch);
784} 849}
785 850
786 851
787/** 852/**
788 * Message has been sent: start retransmission timer. 853 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
854 * this channel. If the binding was successful, (re)transmit the
855 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
789 * 856 *
790 * @param cls Closure (queue structure). 857 * @param ch channel that got the duplicate open
791 * @param t Tunnel. 858 * @param cti identifier of the connection that delivered the message
792 * @param q Queue handler (no longer valid).
793 * @param type Type of message.
794 * @param size Size of the message.
795 */ 859 */
796static void 860void
797ch_message_sent (void *cls, 861GCCH_handle_duplicate_open (struct CadetChannel *ch,
798 struct CadetTunnel *t, 862 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
799 struct CadetTunnelQueue *q,
800 uint16_t type, size_t size)
801{ 863{
802 struct CadetChannelQueue *chq = cls; 864 if (NULL == ch->dest)
803 struct CadetReliableMessage *copy = chq->copy;
804 struct CadetChannelReliability *rel;
805
806 LOG (GNUNET_ERROR_TYPE_DEBUG, "channel_message_sent callback %s\n",
807 GC_m2s (chq->type));
808
809 switch (chq->type)
810 { 865 {
811 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA: 866 LOG (GNUNET_ERROR_TYPE_DEBUG,
812 LOG (GNUNET_ERROR_TYPE_DEBUG, "data MID %u sent\n", copy->mid); 867 "Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n",
813 GNUNET_assert (chq == copy->chq); 868 GCCH_2s (ch));
814 copy->timestamp = GNUNET_TIME_absolute_get (); 869 return;
815 rel = copy->rel;
816 if (NULL == rel->retry_task)
817 {
818 LOG (GNUNET_ERROR_TYPE_DEBUG, " scheduling retry in %d * %s\n",
819 CADET_RETRANSMIT_MARGIN,
820 GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
821 GNUNET_YES));
822 if (0 != rel->expected_delay.rel_value_us)
823 {
824 rel->retry_timer =
825 GNUNET_TIME_relative_saturating_multiply (rel->expected_delay,
826 CADET_RETRANSMIT_MARGIN);
827 }
828 else
829 {
830 rel->retry_timer = CADET_RETRANSMIT_TIME;
831 }
832 LOG (GNUNET_ERROR_TYPE_DEBUG, " using delay %s\n",
833 GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
834 GNUNET_NO));
835 rel->retry_task =
836 GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
837 &channel_retransmit_message, rel);
838 }
839 else
840 {
841 LOG (GNUNET_ERROR_TYPE_DEBUG, "retry running %p\n", rel->retry_task);
842 }
843 copy->chq = NULL;
844 break;
845
846
847 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK:
848 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
849 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK:
850 LOG (GNUNET_ERROR_TYPE_DEBUG, "sent %s\n", GC_m2s (chq->type));
851 rel = chq->rel;
852 GNUNET_assert (rel->uniq == chq);
853 rel->uniq = NULL;
854
855 if (CADET_CHANNEL_READY != rel->ch->state
856 && GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK != type
857 && GNUNET_NO == rel->ch->destroy)
858 {
859 GNUNET_assert (NULL == rel->retry_task);
860 LOG (GNUNET_ERROR_TYPE_DEBUG, "STD BACKOFF %s\n",
861 GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
862 GNUNET_NO));
863 rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer);
864 rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
865 &channel_recreate, rel);
866 }
867 break;
868
869 default:
870 GNUNET_break (0);
871 } 870 }
872 871 if (NULL != ch->retry_control_task)
873 GNUNET_free (chq); 872 {
873 LOG (GNUNET_ERROR_TYPE_DEBUG,
874 "Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n",
875 GCCH_2s (ch));
876 return;
877 }
878 LOG (GNUNET_ERROR_TYPE_DEBUG,
879 "Retransmitting CHANNEL_OPEN_ACK on %s\n",
880 GCCH_2s (ch));
881 ch->retry_control_task
882 = GNUNET_SCHEDULER_add_now (&send_open_ack,
883 ch);
874} 884}
875 885
876 886
877/** 887/**
878 * send a channel create message. 888 * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
879 * 889 *
880 * @param ch Channel for which to send. 890 * @param ch channel the ack is for
891 * @param to_owner #GNUNET_YES to send to owner,
892 * #GNUNET_NO to send to dest
881 */ 893 */
882static void 894static void
883send_create (struct CadetChannel *ch) 895send_ack_to_client (struct CadetChannel *ch,
896 int to_owner)
884{ 897{
885 struct GNUNET_CADET_ChannelOpenMessage msgcc; 898 struct GNUNET_MQ_Envelope *env;
899 struct GNUNET_CADET_LocalAck *ack;
900 struct CadetChannelClient *ccc;
886 901
887 msgcc.header.size = htons (sizeof (msgcc)); 902 ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
888 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN); 903 if (NULL == ccc)
889 msgcc.ctn = ch->gid; 904 {
890 msgcc.port = ch->port; 905 /* This can happen if we are just getting ACKs after
891 msgcc.opt = htonl (channel_get_options (ch)); 906 our local client already disconnected. */
892 907 GNUNET_assert (GNUNET_YES == ch->destroy);
893 GCCH_send_prebuilt_message (&msgcc.header, ch, GNUNET_YES, NULL); 908 return;
909 }
910 env = GNUNET_MQ_msg (ack,
911 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
912 ack->ccn = ccc->ccn;
913 LOG (GNUNET_ERROR_TYPE_DEBUG,
914 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
915 GSC_2s (ccc->c),
916 (GNUNET_YES == to_owner) ? "owner" : "dest",
917 ntohl (ack->ccn.channel_of_client),
918 ch->pending_messages,
919 ch->max_pending_messages);
920 GSC_send_to_client (ccc->c,
921 env);
894} 922}
895 923
896 924
897/** 925/**
898 * Confirm we got a channel create or FWD ack. 926 * A client is bound to the port that we have a channel
927 * open to. Send the acknowledgement for the connection
928 * request and establish the link with the client.
899 * 929 *
900 * @param ch The channel to confirm. 930 * @param ch open incoming channel
901 * @param fwd Should we send a FWD ACK? (going dest->root) 931 * @param c client listening on the respective port
902 */ 932 */
903static void 933void
904send_ack (struct CadetChannel *ch, int fwd) 934GCCH_bind (struct CadetChannel *ch,
935 struct CadetClient *c)
905{ 936{
906 struct GNUNET_CADET_ChannelManageMessage msg; 937 uint32_t options;
938 struct CadetChannelClient *cccd;
907 939
908 msg.header.size = htons (sizeof (msg));
909 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
910 LOG (GNUNET_ERROR_TYPE_DEBUG, 940 LOG (GNUNET_ERROR_TYPE_DEBUG,
911 " sending channel %s ack for channel %s\n", 941 "Binding %s from %s to port %s of %s\n",
912 GC_f2s (fwd), GCCH_2s (ch)); 942 GCCH_2s (ch),
913 943 GCT_2s (ch->t),
914 msg.ctn =ch->gid; 944 GNUNET_h2s (&ch->port),
915 GCCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL); 945 GSC_2s (c));
946 if (NULL != ch->retry_control_task)
947 {
948 /* there might be a timeout task here */
949 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
950 ch->retry_control_task = NULL;
951 }
952 options = 0;
953 if (ch->nobuffer)
954 options |= GNUNET_CADET_OPTION_NOBUFFER;
955 if (ch->reliable)
956 options |= GNUNET_CADET_OPTION_RELIABLE;
957 if (ch->out_of_order)
958 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
959 cccd = GNUNET_new (struct CadetChannelClient);
960 GNUNET_assert (NULL == ch->dest);
961 ch->dest = cccd;
962 cccd->c = c;
963 cccd->client_ready = GNUNET_YES;
964 cccd->ccn = GSC_bind (c,
965 ch,
966 (GNUNET_YES == ch->is_loopback)
967 ? GCP_get (&my_full_id,
968 GNUNET_YES)
969 : GCT_get_destination (ch->t),
970 &ch->port,
971 options);
972 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
973 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
974 ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
975 if (GNUNET_YES == ch->is_loopback)
976 {
977 ch->state = CADET_CHANNEL_OPEN_SENT;
978 GCCH_handle_channel_open_ack (ch,
979 NULL);
980 }
981 else
982 {
983 /* notify other peer that we accepted the connection */
984 ch->state = CADET_CHANNEL_READY;
985 ch->retry_control_task
986 = GNUNET_SCHEDULER_add_now (&send_open_ack,
987 ch);
988 }
989 /* give client it's initial supply of ACKs */
990 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
991 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
992 for (unsigned int i=0;i<ch->max_pending_messages;i++)
993 send_ack_to_client (ch,
994 GNUNET_NO);
916} 995}
917 996
918 997
919/** 998/**
920 * Send a message and don't keep any info about it: we won't need to cancel it 999 * One of our clients has disconnected, tell the other one that we
921 * or resend it. 1000 * are finished. Done asynchronously to avoid concurrent modification
1001 * issues if this is the same client.
922 * 1002 *
923 * @param msg Header of the message to fire away. 1003 * @param cls the `struct CadetChannel` where one of the ends is now dead
924 * @param ch Channel on which the message should go.
925 * @param force Is this a forced (undroppable) message?
926 */ 1004 */
927static void 1005static void
928fire_and_forget (const struct GNUNET_MessageHeader *msg, 1006signal_remote_destroy_cb (void *cls)
929 struct CadetChannel *ch,
930 int force)
931{ 1007{
932 GNUNET_break (NULL == 1008 struct CadetChannel *ch = cls;
933 GCT_send_prebuilt_message (msg, ch->t, NULL, 1009 struct CadetChannelClient *ccc;
934 force, NULL, NULL)); 1010
1011 /* Find which end is left... */
1012 ch->retry_control_task = NULL;
1013 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1014 GSC_handle_remote_channel_destroy (ccc->c,
1015 ccc->ccn,
1016 ch);
1017 channel_destroy (ch);
935} 1018}
936 1019
937 1020
938/** 1021/**
939 * Notify that a channel create didn't succeed. 1022 * Destroy locally created channel. Called by the local client, so no
1023 * need to tell the client.
940 * 1024 *
941 * @param ch The channel to reject. 1025 * @param ch channel to destroy
1026 * @param c client that caused the destruction
1027 * @param ccn client number of the client @a c
942 */ 1028 */
943static void 1029void
944send_nack (struct CadetChannel *ch) 1030GCCH_channel_local_destroy (struct CadetChannel *ch,
1031 struct CadetClient *c,
1032 struct GNUNET_CADET_ClientChannelNumber ccn)
945{ 1033{
946 struct GNUNET_CADET_ChannelManageMessage msg;
947
948 msg.header.size = htons (sizeof (msg));
949 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED);
950 LOG (GNUNET_ERROR_TYPE_DEBUG, 1034 LOG (GNUNET_ERROR_TYPE_DEBUG,
951 " sending channel NACK for channel %s\n", 1035 "%s asks for destruction of %s\n",
1036 GSC_2s (c),
952 GCCH_2s (ch)); 1037 GCCH_2s (ch));
1038 GNUNET_assert (NULL != c);
1039 if ( (NULL != ch->owner) &&
1040 (c == ch->owner->c) &&
1041 (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
1042 {
1043 free_channel_client (ch->owner);
1044 ch->owner = NULL;
1045 }
1046 else if ( (NULL != ch->dest) &&
1047 (c == ch->dest->c) &&
1048 (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
1049 {
1050 free_channel_client (ch->dest);
1051 ch->dest = NULL;
1052 }
1053 else
1054 {
1055 GNUNET_assert (0);
1056 }
953 1057
954 msg.ctn = ch->gid; 1058 if (GNUNET_YES == ch->destroy)
955 GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
956}
957
958
959/**
960 * Destroy all reliable messages queued for a channel,
961 * during a channel destruction.
962 * Frees the reliability structure itself.
963 *
964 * @param rel Reliability data for a channel.
965 */
966static void
967channel_rel_free_all (struct CadetChannelReliability *rel)
968{
969 struct CadetReliableMessage *copy;
970 struct CadetReliableMessage *next;
971
972 if (NULL == rel)
973 return;
974
975 for (copy = rel->head_recv; NULL != copy; copy = next)
976 { 1059 {
977 next = copy->next; 1060 /* other end already destroyed, with the local client gone, no need
978 GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy); 1061 to finish transmissions, just destroy immediately. */
979 LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE ALL RECV %p\n", copy); 1062 channel_destroy (ch);
980 GNUNET_break (NULL == copy->chq); 1063 return;
981 GNUNET_free (copy);
982 } 1064 }
983 for (copy = rel->head_sent; NULL != copy; copy = next) 1065 if ( (NULL != ch->head_sent) &&
1066 ( (NULL != ch->owner) ||
1067 (NULL != ch->dest) ) )
984 { 1068 {
985 next = copy->next; 1069 /* Wait for other end to destroy us as well,
986 GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy); 1070 and otherwise allow send queue to be transmitted first */
987 LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE ALL SEND %p\n", copy); 1071 ch->destroy = GNUNET_YES;
988 if (NULL != copy->chq) 1072 return;
989 {
990 if (NULL != copy->chq->tq)
991 {
992 GCT_cancel (copy->chq->tq);
993 /* ch_message_sent will free copy->q */
994 }
995 else
996 {
997 GNUNET_free (copy->chq);
998 GNUNET_break (0);
999 }
1000 }
1001 GNUNET_free (copy);
1002 } 1073 }
1003 if (NULL != rel->uniq && NULL != rel->uniq->tq) 1074 if ( (GNUNET_YES == ch->is_loopback) &&
1075 ( (NULL != ch->owner) ||
1076 (NULL != ch->dest) ) )
1004 { 1077 {
1005 GCT_cancel (rel->uniq->tq); 1078 if (NULL != ch->retry_control_task)
1006 /* ch_message_sent is called freeing uniq */ 1079 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1080 ch->retry_control_task
1081 = GNUNET_SCHEDULER_add_now (&signal_remote_destroy_cb,
1082 ch);
1083 return;
1007 } 1084 }
1008 if (NULL != rel->retry_task) 1085 if (GNUNET_NO == ch->is_loopback)
1009 { 1086 {
1010 GNUNET_SCHEDULER_cancel (rel->retry_task); 1087 /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1011 rel->retry_task = NULL; 1088 switch (ch->state)
1089 {
1090 case CADET_CHANNEL_NEW:
1091 /* We gave up on a channel that we created as a client to a remote
1092 target, but that never went anywhere. Nothing to do here. */
1093 break;
1094 case CADET_CHANNEL_LOOSE:
1095 GSC_drop_loose_channel (&ch->port,
1096 ch);
1097 break;
1098 default:
1099 GCT_send_channel_destroy (ch->t,
1100 ch->ctn);
1101 }
1012 } 1102 }
1013 GNUNET_free (rel); 1103 /* Nothing left to do, just finish destruction */
1104 channel_destroy (ch);
1014} 1105}
1015 1106
1016 1107
1017/** 1108/**
1018 * Mark future messages as ACK'd. 1109 * We got an acknowledgement for the creation of the channel
1019 * 1110 * (the port is open on the other side). Begin transmissions.
1020 * @param rel Reliability data.
1021 * @param msg DataACK message with a bitfield of future ACK'd messages.
1022 * 1111 *
1023 * @return How many messages have been freed. 1112 * @param ch channel to destroy
1113 * @param cti identifier of the connection that delivered the message
1024 */ 1114 */
1025static unsigned int 1115void
1026channel_rel_free_sent (struct CadetChannelReliability *rel, 1116GCCH_handle_channel_open_ack (struct CadetChannel *ch,
1027 const struct GNUNET_CADET_ChannelDataAckMessage *msg) 1117 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1028{ 1118{
1029 struct CadetReliableMessage *copy; 1119 switch (ch->state)
1030 struct CadetReliableMessage *next;
1031 uint64_t bitfield;
1032 uint64_t mask;
1033 uint32_t mid;
1034 uint32_t target;
1035 unsigned int i;
1036 unsigned int r;
1037
1038 bitfield = msg->futures;
1039 mid = ntohl (msg->mid);
1040 LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable %u %lX\n", mid, bitfield);
1041 LOG (GNUNET_ERROR_TYPE_DEBUG, " rel %p, head %p\n", rel, rel->head_sent);
1042 for (i = 0, r = 0, copy = rel->head_sent;
1043 i < 64 && NULL != copy && 0 != bitfield;
1044 i++)
1045 { 1120 {
1046 LOG (GNUNET_ERROR_TYPE_DEBUG, " trying bit %u (mid %u)\n", i, mid + i + 1); 1121 case CADET_CHANNEL_NEW:
1047 mask = 0x1LL << i; 1122 /* this should be impossible */
1048 if (0 == (bitfield & mask)) 1123 GNUNET_break (0);
1049 continue; 1124 break;
1050 1125 case CADET_CHANNEL_LOOSE:
1051 LOG (GNUNET_ERROR_TYPE_DEBUG, " set!\n"); 1126 /* This makes no sense. */
1052 /* Bit was set, clear the bit from the bitfield */ 1127 GNUNET_break_op (0);
1053 bitfield &= ~mask; 1128 break;
1054 1129 case CADET_CHANNEL_OPEN_SENT:
1055 /* The i-th bit was set. Do we have that copy? */ 1130 if (NULL == ch->owner)
1056 /* Skip copies with mid < target */
1057 target = mid + i + 1;
1058 LOG (GNUNET_ERROR_TYPE_DEBUG, " target %u\n", target);
1059 while (NULL != copy && GC_is_pid_bigger (target, copy->mid))
1060 copy = copy->next;
1061
1062 /* Did we run out of copies? (previously freed, it's ok) */
1063 if (NULL == copy)
1064 { 1131 {
1065 LOG (GNUNET_ERROR_TYPE_DEBUG, "run out of copies...\n"); 1132 /* We're not the owner, wrong direction! */
1066 return r; 1133 GNUNET_break_op (0);
1134 return;
1067 } 1135 }
1068 1136 LOG (GNUNET_ERROR_TYPE_DEBUG,
1069 /* Did we overshoot the target? (previously freed, it's ok) */ 1137 "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1070 if (GC_is_pid_bigger (copy->mid, target)) 1138 GCCH_2s (ch));
1139 if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1071 { 1140 {
1072 LOG (GNUNET_ERROR_TYPE_DEBUG, " next copy %u\n", copy->mid); 1141 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1073 i += copy->mid - target - 1; /* MID: 90, t = 85, i += 4 (i++ later) */ 1142 ch->retry_control_task = NULL;
1074 mask = (0x1LL << (i + 1)) - 1; /* Mask = i-th bit and all before */
1075 bitfield &= ~mask; /* Clear all bits up to MID - 1 */
1076 continue;
1077 } 1143 }
1078 1144 ch->state = CADET_CHANNEL_READY;
1079 /* Now copy->mid == target, free it */ 1145 /* On first connect, send client as many ACKs as we allow messages
1080 next = copy->next; 1146 to be buffered! */
1081 GNUNET_break (GNUNET_YES != rel_message_free (copy, GNUNET_YES)); 1147 for (unsigned int i=0;i<ch->max_pending_messages;i++)
1082 r++; 1148 send_ack_to_client (ch,
1083 copy = next; 1149 GNUNET_YES);
1150 break;
1151 case CADET_CHANNEL_READY:
1152 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1153 LOG (GNUNET_ERROR_TYPE_DEBUG,
1154 "Received duplicate channel OPEN_ACK for %s\n",
1155 GCCH_2s (ch));
1156 GNUNET_STATISTICS_update (stats,
1157 "# duplicate CREATE_ACKs",
1158 1,
1159 GNUNET_NO);
1160 break;
1084 } 1161 }
1085 LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable END\n");
1086 return r;
1087} 1162}
1088 1163
1089 1164
1090/** 1165/**
1091 * Destroy a reliable message after it has been acknowledged, either by 1166 * Test if element @a e1 comes before element @a e2.
1092 * direct mid ACK or bitfield. Updates the appropriate data structures and
1093 * timers and frees all memory.
1094 * 1167 *
1095 * @param copy Message that is no longer needed: remote peer got it. 1168 * @param cls closure, to a flag where we indicate duplicate packets
1096 * @param update_time Is the timing information relevant? 1169 * @param m1 a message of to sort
1097 * If this message is ACK in a batch the timing information 1170 * @param m2 another message to sort
1098 * is skewed by the retransmission, count only for the 1171 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1099 * retransmitted message.
1100 *
1101 * @return #GNUNET_YES if channel was destroyed as a result of the call,
1102 * #GNUNET_NO otherwise.
1103 */ 1172 */
1104static int 1173static int
1105rel_message_free (struct CadetReliableMessage *copy, int update_time) 1174is_before (void *cls,
1175 struct CadetOutOfOrderMessage *m1,
1176 struct CadetOutOfOrderMessage *m2)
1106{ 1177{
1107 struct CadetChannelReliability *rel; 1178 int *duplicate = cls;
1108 struct GNUNET_TIME_Relative time; 1179 uint32_t v1 = ntohl (m1->mid.mid);
1180 uint32_t v2 = ntohl (m2->mid.mid);
1181 uint32_t delta;
1109 1182
1110 rel = copy->rel; 1183 delta = v2 - v1;
1111 LOG (GNUNET_ERROR_TYPE_DEBUG, "Freeing %u\n", copy->mid); 1184 if (0 == delta)
1112 if (GNUNET_YES == update_time) 1185 *duplicate = GNUNET_YES;
1186 if (delta > (uint32_t) INT_MAX)
1113 { 1187 {
1114 time = GNUNET_TIME_absolute_get_duration (copy->timestamp); 1188 /* in overflow range, we can safely assume we wrapped around */
1115 if (0 == rel->expected_delay.rel_value_us) 1189 return GNUNET_NO;
1116 rel->expected_delay = time;
1117 else
1118 {
1119 rel->expected_delay.rel_value_us *= 7;
1120 rel->expected_delay.rel_value_us += time.rel_value_us;
1121 rel->expected_delay.rel_value_us /= 8;
1122 }
1123 LOG (GNUNET_ERROR_TYPE_DEBUG, " message time %12s\n",
1124 GNUNET_STRINGS_relative_time_to_string (time, GNUNET_NO));
1125 LOG (GNUNET_ERROR_TYPE_DEBUG, " new delay %12s\n",
1126 GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
1127 GNUNET_NO));
1128 rel->retry_timer = rel->expected_delay;
1129 } 1190 }
1130 else 1191 else
1131 { 1192 {
1132 LOG (GNUNET_ERROR_TYPE_DEBUG, "batch free, ignoring timing\n"); 1193 /* result is small, thus v2 > v1, thus m1 < m2 */
1133 }
1134 rel->ch->pending_messages--;
1135 if (NULL != copy->chq)
1136 {
1137 GCT_cancel (copy->chq->tq);
1138 /* copy->q is set to NULL by ch_message_sent */
1139 }
1140 GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
1141 LOG (GNUNET_ERROR_TYPE_DEBUG, " free send copy MID %u at %p\n",
1142 copy->mid, copy);
1143 GNUNET_free (copy);
1144
1145 if (GNUNET_NO != rel->ch->destroy && 0 == rel->ch->pending_messages)
1146 {
1147 GCCH_destroy (rel->ch);
1148 return GNUNET_YES; 1194 return GNUNET_YES;
1149 } 1195 }
1150 return GNUNET_NO;
1151} 1196}
1152 1197
1153 1198
1154/** 1199/**
1155 * Channel was ACK'd by remote peer, mark as ready and cancel retransmission. 1200 * We got payload data for a channel. Pass it on to the client
1201 * and send an ACK to the other end (once flow control allows it!)
1156 * 1202 *
1157 * @param ch Channel to mark as ready. 1203 * @param ch channel that got data
1158 * @param fwd Was the ACK message a FWD ACK? (dest->root, SYNACK) 1204 * @param cti identifier of the connection that delivered the message
1205 * @param msg message that was received
1159 */ 1206 */
1160static void 1207void
1161channel_confirm (struct CadetChannel *ch, int fwd) 1208GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1209 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1210 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1162{ 1211{
1163 struct CadetChannelReliability *rel; 1212 struct GNUNET_MQ_Envelope *env;
1164 enum CadetChannelState oldstate; 1213 struct GNUNET_CADET_LocalData *ld;
1165 1214 struct CadetChannelClient *ccc;
1166 rel = fwd ? ch->root_rel : ch->dest_rel; 1215 size_t payload_size;
1167 if (NULL == rel) 1216 struct CadetOutOfOrderMessage *com;
1217 int duplicate;
1218 uint32_t mid_min;
1219 uint32_t mid_max;
1220 uint32_t mid_msg;
1221 uint32_t delta;
1222
1223 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1224 if ( (GNUNET_YES == ch->destroy) &&
1225 (NULL == ch->owner) &&
1226 (NULL == ch->dest) )
1227 {
1228 /* This client is gone, but we still have messages to send to
1229 the other end (which is why @a ch is not yet dead). However,
1230 we cannot pass messages to our client anymore. */
1231 LOG (GNUNET_ERROR_TYPE_DEBUG,
1232 "Dropping incoming payload on %s as this end is already closed\n",
1233 GCCH_2s (ch));
1234 /* send back DESTROY notification to stop further retransmissions! */
1235 GCT_send_channel_destroy (ch->t,
1236 ch->ctn);
1237 return;
1238 }
1239 payload_size = ntohs (msg->header.size) - sizeof (*msg);
1240 env = GNUNET_MQ_msg_extra (ld,
1241 payload_size,
1242 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1243 ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1244 GNUNET_memcpy (&ld[1],
1245 &msg[1],
1246 payload_size);
1247 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1248 if ( (GNUNET_YES == ccc->client_ready) &&
1249 ( (GNUNET_YES == ch->out_of_order) ||
1250 (msg->mid.mid == ch->mid_recv.mid) ) )
1168 { 1251 {
1169 GNUNET_break (GNUNET_NO != ch->destroy); 1252 LOG (GNUNET_ERROR_TYPE_DEBUG,
1253 "Giving %u bytes of payload with MID %u from %s to client %s\n",
1254 (unsigned int) payload_size,
1255 ntohl (msg->mid.mid),
1256 GCCH_2s (ch),
1257 GSC_2s (ccc->c));
1258 ccc->client_ready = GNUNET_NO;
1259 GSC_send_to_client (ccc->c,
1260 env);
1261 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1262 ch->mid_futures >>= 1;
1263 send_channel_data_ack (ch);
1170 return; 1264 return;
1171 } 1265 }
1172 LOG (GNUNET_ERROR_TYPE_DEBUG, " channel confirm %s %s\n",
1173 GC_f2s (fwd), GCCH_2s (ch));
1174 oldstate = ch->state;
1175 ch->state = CADET_CHANNEL_READY;
1176 1266
1177 if (CADET_CHANNEL_READY != oldstate || GNUNET_YES == is_loopback (ch)) 1267 if (GNUNET_YES == ch->reliable)
1178 { 1268 {
1179 rel->client_ready = GNUNET_YES; 1269 /* check if message ought to be dropped because it is ancient/too distant/duplicate */
1180 rel->expected_delay = rel->retry_timer; 1270 mid_min = ntohl (ch->mid_recv.mid);
1181 LOG (GNUNET_ERROR_TYPE_DEBUG, " confirm retry timer %s\n", 1271 mid_max = mid_min + ch->max_pending_messages;
1182 GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, GNUNET_NO)); 1272 mid_msg = ntohl (msg->mid.mid);
1183 if (GCT_get_connections_buffer (ch->t) > 0 || GCT_is_loopback (ch->t)) 1273 if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
1184 send_client_ack (ch, fwd); 1274 ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
1185
1186 if (NULL != rel->retry_task)
1187 { 1275 {
1188 GNUNET_SCHEDULER_cancel (rel->retry_task); 1276 LOG (GNUNET_ERROR_TYPE_DEBUG,
1189 rel->retry_task = NULL; 1277 "%s at %u drops ancient or far-future message %u\n",
1190 } 1278 GCCH_2s (ch),
1191 else if (NULL != rel->uniq) 1279 (unsigned int) mid_min,
1192 { 1280 ntohl (msg->mid.mid));
1193 GCT_cancel (rel->uniq->tq); 1281
1194 /* ch_message_sent will free and NULL uniq */ 1282 GNUNET_STATISTICS_update (stats,
1283 "# duplicate DATA (ancient or future)",
1284 1,
1285 GNUNET_NO);
1286 GNUNET_MQ_discard (env);
1287 send_channel_data_ack (ch);
1288 return;
1195 } 1289 }
1196 else if (GNUNET_NO == is_loopback (ch)) 1290 /* mark bit for future ACKs */
1291 delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
1292 if (delta < 64)
1197 { 1293 {
1198 /* We SHOULD have been trying to retransmit this! */ 1294 if (0 != (ch->mid_futures & (1LLU << delta)))
1199 GNUNET_break (0); 1295 {
1296 /* Duplicate within the queue, drop also */
1297 LOG (GNUNET_ERROR_TYPE_DEBUG,
1298 "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1299 (unsigned int) payload_size,
1300 GCCH_2s (ch),
1301 ntohl (msg->mid.mid));
1302 GNUNET_STATISTICS_update (stats,
1303 "# duplicate DATA",
1304 1,
1305 GNUNET_NO);
1306 GNUNET_MQ_discard (env);
1307 send_channel_data_ack (ch);
1308 return;
1309 }
1310 ch->mid_futures |= (1LLU << delta);
1311 LOG (GNUNET_ERROR_TYPE_DEBUG,
1312 "Marked bit %llX for mid %u (base: %u); now: %llX\n",
1313 (1LLU << delta),
1314 mid_msg,
1315 mid_min,
1316 ch->mid_futures);
1200 } 1317 }
1201 } 1318 }
1202 1319 else /* ! ch->reliable */
1203 /* In case of a FWD ACK (SYNACK) send a BCK ACK (ACK). */
1204 if (GNUNET_YES == fwd)
1205 send_ack (ch, GNUNET_NO);
1206}
1207
1208
1209/**
1210 * Save a copy to retransmit in case it gets lost.
1211 *
1212 * Initializes all needed callbacks and timers.
1213 *
1214 * @param ch Channel this message goes on.
1215 * @param msg Message to copy.
1216 * @param fwd Is this fwd traffic?
1217 */
1218static struct CadetReliableMessage *
1219channel_save_copy (struct CadetChannel *ch,
1220 const struct GNUNET_MessageHeader *msg,
1221 int fwd)
1222{
1223 struct CadetChannelReliability *rel;
1224 struct CadetReliableMessage *copy;
1225 uint32_t mid;
1226 uint16_t type;
1227 uint16_t size;
1228
1229 rel = fwd ? ch->root_rel : ch->dest_rel;
1230 mid = rel->mid_send - 1;
1231 type = ntohs (msg->type);
1232 size = ntohs (msg->size);
1233
1234 LOG (GNUNET_ERROR_TYPE_DEBUG, "save MID %u %s\n", mid, GC_m2s (type));
1235 copy = GNUNET_malloc (sizeof (struct CadetReliableMessage) + size);
1236 LOG (GNUNET_ERROR_TYPE_DEBUG, " at %p\n", copy);
1237 copy->mid = mid;
1238 copy->rel = rel;
1239 copy->type = type;
1240 GNUNET_memcpy (&copy[1], msg, size);
1241 GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy);
1242 ch->pending_messages++;
1243
1244 return copy;
1245}
1246
1247
1248/**
1249 * Create a new channel.
1250 *
1251 * @param t Tunnel this channel is in.
1252 * @param owner Client that owns the channel, NULL for foreign channels.
1253 * @param lid_root Local ID for root client.
1254 *
1255 * @return A new initialized channel. NULL on error.
1256 */
1257static struct CadetChannel *
1258channel_new (struct CadetTunnel *t,
1259 struct CadetClient *owner,
1260 struct GNUNET_CADET_ClientChannelNumber lid_root)
1261{
1262 struct CadetChannel *ch;
1263
1264 ch = GNUNET_new (struct CadetChannel);
1265 ch->root = owner;
1266 ch->lid_root = lid_root;
1267 ch->t = t;
1268
1269 GNUNET_STATISTICS_update (stats, "# channels", 1, GNUNET_NO);
1270
1271 if (NULL != owner)
1272 {
1273 ch->gid = GCT_get_next_ctn (t);
1274 GML_channel_add (owner, lid_root, ch);
1275 }
1276 GCT_add_channel (t, ch);
1277
1278 return ch;
1279}
1280
1281
1282/**
1283 * Handle a loopback message: call the appropriate handler for the message type.
1284 *
1285 * @param ch Channel this message is on.
1286 * @param msgh Message header.
1287 * @param fwd Is this FWD traffic?
1288 */
1289void
1290handle_loopback (struct CadetChannel *ch,
1291 const struct GNUNET_MessageHeader *msgh,
1292 int fwd)
1293{
1294 uint16_t type;
1295
1296 type = ntohs (msgh->type);
1297 LOG (GNUNET_ERROR_TYPE_DEBUG,
1298 "Loopback %s %s message!\n",
1299 GC_f2s (fwd), GC_m2s (type));
1300
1301 switch (type)
1302 { 1320 {
1303 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA: 1321 /* Channel is unreliable, so we do not ACK. But we also cannot
1304 /* Don't send hop ACK, wait for client to ACK */ 1322 allow buffering everything, so check if we have space... */
1305 LOG (GNUNET_ERROR_TYPE_DEBUG, "SEND loopback %u (%u)\n", 1323 if (ccc->num_recv >= ch->max_pending_messages)
1306 ntohl (((struct GNUNET_CADET_ChannelAppDataMessage *) msgh)->mid), ntohs (msgh->size)); 1324 {
1307 GCCH_handle_data (ch, (struct GNUNET_CADET_ChannelAppDataMessage *) msgh, fwd); 1325 struct CadetOutOfOrderMessage *drop;
1308 break;
1309
1310 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK:
1311 GCCH_handle_data_ack (ch,
1312 (const struct GNUNET_CADET_ChannelDataAckMessage *) msgh, fwd);
1313 break;
1314
1315 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
1316 GCCH_handle_create (ch->t,
1317 (const struct GNUNET_CADET_ChannelOpenMessage *) msgh);
1318 break;
1319
1320 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK:
1321 GCCH_handle_ack (ch,
1322 (const struct GNUNET_CADET_ChannelManageMessage *) msgh,
1323 fwd);
1324 break;
1325
1326 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED:
1327 GCCH_handle_nack (ch);
1328 break;
1329
1330 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1331 GCCH_handle_destroy (ch,
1332 (const struct GNUNET_CADET_ChannelManageMessage *) msgh,
1333 fwd);
1334 break;
1335 1326
1336 default: 1327 /* Yep, need to drop. Drop the oldest message in
1337 GNUNET_break_op (0); 1328 the buffer. */
1338 LOG (GNUNET_ERROR_TYPE_DEBUG, 1329 LOG (GNUNET_ERROR_TYPE_DEBUG,
1339 "end-to-end message not known (%u)\n", 1330 "Queue full due slow client on %s, dropping oldest message\n",
1340 ntohs (msgh->type)); 1331 GCCH_2s (ch));
1332 GNUNET_STATISTICS_update (stats,
1333 "# messages dropped due to slow client",
1334 1,
1335 GNUNET_NO);
1336 drop = ccc->head_recv;
1337 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1338 ccc->tail_recv,
1339 drop);
1340 ccc->num_recv--;
1341 GNUNET_MQ_discard (drop->env);
1342 GNUNET_free (drop);
1343 }
1341 } 1344 }
1342}
1343
1344
1345
1346/******************************************************************************/
1347/******************************** API ***********************************/
1348/******************************************************************************/
1349
1350/**
1351 * Destroy a channel and free all resources.
1352 *
1353 * @param ch Channel to destroy.
1354 */
1355void
1356GCCH_destroy (struct CadetChannel *ch)
1357{
1358 struct CadetClient *c;
1359 struct CadetTunnel *t;
1360 1345
1361 if (NULL == ch) 1346 /* Insert message into sorted out-of-order queue */
1347 com = GNUNET_new (struct CadetOutOfOrderMessage);
1348 com->mid = msg->mid;
1349 com->env = env;
1350 duplicate = GNUNET_NO;
1351 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1352 is_before,
1353 &duplicate,
1354 ccc->head_recv,
1355 ccc->tail_recv,
1356 com);
1357 ccc->num_recv++;
1358 if (GNUNET_YES == duplicate)
1359 {
1360 /* Duplicate within the queue, drop also (this is not covered by
1361 the case above if "delta" >= 64, which could be the case if
1362 max_pending_messages is also >= 64 or if our client is unready
1363 and we are seeing retransmissions of the message our client is
1364 blocked on. */
1365 LOG (GNUNET_ERROR_TYPE_DEBUG,
1366 "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1367 (unsigned int) payload_size,
1368 GCCH_2s (ch),
1369 ntohl (msg->mid.mid));
1370 GNUNET_STATISTICS_update (stats,
1371 "# duplicate DATA",
1372 1,
1373 GNUNET_NO);
1374 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1375 ccc->tail_recv,
1376 com);
1377 ccc->num_recv--;
1378 GNUNET_MQ_discard (com->env);
1379 GNUNET_free (com);
1380 send_channel_data_ack (ch);
1362 return; 1381 return;
1363 if (2 == ch->destroy)
1364 return; /* recursive call */
1365 ch->destroy = 2;
1366
1367 LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying channel %s:%u\n",
1368 GCT_2s (ch->t), ch->gid);
1369 GCCH_debug (ch, GNUNET_ERROR_TYPE_DEBUG);
1370
1371 c = ch->root;
1372 if (NULL != c)
1373 {
1374 GML_channel_remove (c, ch->lid_root, ch);
1375 }
1376
1377 c = ch->dest;
1378 if (NULL != c)
1379 {
1380 GML_channel_remove (c, ch->lid_dest, ch);
1381 } 1382 }
1382 1383 LOG (GNUNET_ERROR_TYPE_DEBUG,
1383 channel_rel_free_all (ch->root_rel); 1384 "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1384 channel_rel_free_all (ch->dest_rel); 1385 (GNUNET_YES == ccc->client_ready)
1385 1386 ? "out-of-order"
1386 t = ch->t; 1387 : "client-not-ready",
1387 GCT_remove_channel (t, ch); 1388 (unsigned int) payload_size,
1388 GNUNET_STATISTICS_update (stats, "# channels", -1, GNUNET_NO); 1389 GCCH_2s (ch),
1389 1390 ntohl (ccc->ccn.channel_of_client),
1390 GNUNET_free (ch); 1391 ccc,
1391 GCT_destroy_if_empty (t); 1392 ntohl (msg->mid.mid),
1392} 1393 ntohl (ch->mid_recv.mid));
1393 1394 /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
1394 1395 the sender may already be transmitting the previous one. Needs
1395/** 1396 experimental evaluation to see if/when this ACK helps or
1396 * Get the channel's public ID. 1397 hurts. (We might even want another option.) */
1397 * 1398 send_channel_data_ack (ch);
1398 * @param ch Channel.
1399 *
1400 * @return ID used to identify the channel with the remote peer.
1401 */
1402struct GNUNET_CADET_ChannelTunnelNumber
1403GCCH_get_id (const struct CadetChannel *ch)
1404{
1405 return ch->gid;
1406}
1407
1408
1409/**
1410 * Get the channel tunnel.
1411 *
1412 * @param ch Channel to get the tunnel from.
1413 *
1414 * @return tunnel of the channel.
1415 */
1416struct CadetTunnel *
1417GCCH_get_tunnel (const struct CadetChannel *ch)
1418{
1419 return ch->t;
1420}
1421
1422
1423/**
1424 * Get free buffer space towards the client on a specific channel.
1425 *
1426 * @param ch Channel.
1427 * @param fwd Is query about FWD traffic?
1428 *
1429 * @return Free buffer space [0 - 64]
1430 */
1431unsigned int
1432GCCH_get_buffer (struct CadetChannel *ch, int fwd)
1433{
1434 struct CadetChannelReliability *rel;
1435
1436 rel = fwd ? ch->dest_rel : ch->root_rel;
1437 LOG (GNUNET_ERROR_TYPE_DEBUG, " get buffer, channel %s\n", GCCH_2s (ch));
1438 GCCH_debug (ch, GNUNET_ERROR_TYPE_DEBUG);
1439 /* If rel is NULL it means that the end is not yet created,
1440 * most probably is a loopback channel at the point of sending
1441 * the ChannelCreate to itself.
1442 */
1443 if (NULL == rel)
1444 {
1445 LOG (GNUNET_ERROR_TYPE_DEBUG, " rel is NULL: max\n");
1446 return 64;
1447 }
1448
1449 LOG (GNUNET_ERROR_TYPE_DEBUG, " n_recv %d\n", rel->n_recv);
1450 return (64 - rel->n_recv);
1451}
1452
1453
1454/**
1455 * Get flow control status of end point: is client allow to send?
1456 *
1457 * @param ch Channel.
1458 * @param fwd Is query about FWD traffic? (Request root status).
1459 *
1460 * @return #GNUNET_YES if client is allowed to send us data.
1461 */
1462int
1463GCCH_get_allowed (struct CadetChannel *ch, int fwd)
1464{
1465 struct CadetChannelReliability *rel;
1466
1467 rel = fwd ? ch->root_rel : ch->dest_rel;
1468
1469 if (NULL == rel)
1470 {
1471 /* Probably shutting down: root/dest NULL'ed to mark disconnection */
1472 GNUNET_break (GNUNET_NO != ch->destroy);
1473 return 0;
1474 }
1475
1476 return rel->client_allowed;
1477} 1399}
1478 1400
1479 1401
1480/** 1402/**
1481 * Is the root client for this channel on this peer? 1403 * Function called once the tunnel has sent one of our messages.
1482 * 1404 * If the message is unreliable, simply frees the `crm`. If the
1483 * @param ch Channel. 1405 * message was reliable, calculate retransmission time and
1484 * @param fwd Is this for fwd traffic? 1406 * wait for ACK (or retransmit).
1485 * 1407 *
1486 * @return #GNUNET_YES in case it is. 1408 * @param cls the `struct CadetReliableMessage` that was sent
1409 * @param cid identifier of the connection within the tunnel, NULL
1410 * if transmission failed
1487 */ 1411 */
1488int 1412static void
1489GCCH_is_origin (struct CadetChannel *ch, int fwd) 1413data_sent_cb (void *cls,
1490{ 1414 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid);
1491 struct CadetClient *c;
1492
1493 c = fwd ? ch->root : ch->dest;
1494 return NULL != c;
1495}
1496 1415
1497 1416
1498/** 1417/**
1499 * Is the destination client for this channel on this peer? 1418 * We need to retry a transmission, the last one took too long to
1500 * 1419 * be acknowledged.
1501 * @param ch Channel.
1502 * @param fwd Is this for fwd traffic?
1503 * 1420 *
1504 * @return #GNUNET_YES in case it is. 1421 * @param cls the `struct CadetChannel` where we need to retransmit
1505 */ 1422 */
1506int 1423static void
1507GCCH_is_terminal (struct CadetChannel *ch, int fwd) 1424retry_transmission (void *cls)
1508{ 1425{
1509 struct CadetClient *c; 1426 struct CadetChannel *ch = cls;
1427 struct CadetReliableMessage *crm = ch->head_sent;
1510 1428
1511 c = fwd ? ch->dest : ch->root; 1429 ch->retry_data_task = NULL;
1512 return NULL != c; 1430 GNUNET_assert (NULL == crm->qe);
1431 LOG (GNUNET_ERROR_TYPE_DEBUG,
1432 "Retrying transmission on %s of message %u\n",
1433 GCCH_2s (ch),
1434 (unsigned int) ntohl (crm->data_message->mid.mid));
1435 crm->qe = GCT_send (ch->t,
1436 &crm->data_message->header,
1437 &data_sent_cb,
1438 crm);
1439 GNUNET_assert (NULL == ch->retry_data_task);
1513} 1440}
1514 1441
1515 1442
1516/** 1443/**
1517 * Send an end-to-end ACK message for the most recent in-sequence payload. 1444 * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1445 * the queue and tell our client that it can send more.
1518 * 1446 *
1519 * If channel is not reliable, do nothing. 1447 * @param ch the channel that got the PLAINTEXT_DATA_ACK
1520 * 1448 * @param cti identifier of the connection that delivered the message
1521 * @param ch Channel this is about. 1449 * @param crm the message that got acknowledged
1522 * @param fwd Is for FWD traffic? (ACK dest->owner)
1523 */ 1450 */
1524void 1451static void
1525GCCH_send_data_ack (struct CadetChannel *ch, int fwd) 1452handle_matching_ack (struct CadetChannel *ch,
1453 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1454 struct CadetReliableMessage *crm)
1526{ 1455{
1527 struct GNUNET_CADET_ChannelDataAckMessage msg; 1456 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1528 struct CadetChannelReliability *rel; 1457 ch->tail_sent,
1529 struct CadetReliableMessage *copy; 1458 crm);
1530 unsigned int delta; 1459 ch->pending_messages--;
1531 uint64_t mask; 1460 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1532 uint32_t ack; 1461 LOG (GNUNET_ERROR_TYPE_DEBUG,
1533 1462 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1534 if (GNUNET_NO == ch->reliable) 1463 GCCH_2s (ch),
1535 return; 1464 (unsigned int) ntohl (crm->data_message->mid.mid),
1536 1465 ch->pending_messages);
1537 rel = fwd ? ch->dest_rel : ch->root_rel; 1466 if (NULL != crm->qe)
1538 ack = rel->mid_recv - 1; 1467 {
1539 1468 GCT_send_cancel (crm->qe);
1540 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK); 1469 crm->qe = NULL;
1541 msg.header.size = htons (sizeof (msg)); 1470 }
1542 msg.ctn = ch->gid; 1471 if ( (1 == crm->num_transmissions) &&
1543 msg.mid = htonl (ack); 1472 (NULL != cti) )
1544 1473 {
1545 msg.futures = 0LL; 1474 GCC_ack_observed (cti);
1546 for (copy = rel->head_recv; NULL != copy; copy = copy->next) 1475 if (0 == memcmp (cti,
1547 { 1476 &crm->connection_taken,
1548 if (copy->type != GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA) 1477 sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier)))
1549 { 1478 {
1550 LOG (GNUNET_ERROR_TYPE_DEBUG, " Type %s, expected DATA\n", 1479 GCC_latency_observed (cti,
1551 GC_m2s (copy->type)); 1480 GNUNET_TIME_absolute_get_duration (crm->first_transmission_time));
1552 continue;
1553 } 1481 }
1554 GNUNET_assert (GC_is_pid_bigger(copy->mid, ack));
1555 delta = copy->mid - (ack + 1);
1556 if (63 < delta)
1557 break;
1558 mask = 0x1LL << delta;
1559 msg.futures |= mask;
1560 LOG (GNUNET_ERROR_TYPE_DEBUG,
1561 " setting bit for %u (delta %u) (%lX) -> %lX\n",
1562 copy->mid, delta, mask, msg.futures);
1563 } 1482 }
1564 1483 GNUNET_free (crm->data_message);
1565 GCCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL); 1484 GNUNET_free (crm);
1566 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n"); 1485 send_ack_to_client (ch,
1486 (NULL == ch->owner)
1487 ? GNUNET_NO
1488 : GNUNET_YES);
1567} 1489}
1568 1490
1569 1491
1570/** 1492/**
1571 * Allow a client to send us more data, in case it was choked. 1493 * We got an acknowledgement for payload data for a channel.
1494 * Possibly resume transmissions.
1572 * 1495 *
1573 * @param ch Channel. 1496 * @param ch channel that got the ack
1574 * @param fwd Is this about FWD traffic? (Root client). 1497 * @param cti identifier of the connection that delivered the message
1498 * @param ack details about what was received
1575 */ 1499 */
1576void 1500void
1577GCCH_allow_client (struct CadetChannel *ch, int fwd) 1501GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1502 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1503 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1578{ 1504{
1579 struct CadetChannelReliability *rel; 1505 struct CadetReliableMessage *crm;
1580 unsigned int buffer; 1506 struct CadetReliableMessage *crmn;
1581 1507 int found;
1582 LOG (GNUNET_ERROR_TYPE_DEBUG, "GMCH allow\n"); 1508 uint32_t mid_base;
1509 uint64_t mid_mask;
1510 unsigned int delta;
1583 1511
1584 if (CADET_CHANNEL_READY != ch->state) 1512 GNUNET_break (GNUNET_NO == ch->is_loopback);
1513 if (GNUNET_NO == ch->reliable)
1585 { 1514 {
1586 LOG (GNUNET_ERROR_TYPE_DEBUG, " channel not ready yet!\n"); 1515 /* not expecting ACKs on unreliable channel, odd */
1516 GNUNET_break_op (0);
1587 return; 1517 return;
1588 } 1518 }
1589 1519 /* mid_base is the MID of the next message that the
1590 if (GNUNET_YES == ch->reliable) 1520 other peer expects (i.e. that is missing!), everything
1591 { 1521 LOWER (but excluding mid_base itself) was received. */
1592 rel = fwd ? ch->root_rel : ch->dest_rel; 1522 mid_base = ntohl (ack->mid.mid);
1593 if (NULL == rel) 1523 mid_mask = GNUNET_htonll (ack->futures);
1594 { 1524 found = GNUNET_NO;
1595 GNUNET_break (GNUNET_NO != ch->destroy); 1525 for (crm = ch->head_sent;
1596 return; 1526 NULL != crm;
1597 } 1527 crm = crmn)
1598 if (NULL != rel->head_sent) 1528 {
1529 crmn = crm->next;
1530 delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
1531 if (delta >= UINT_MAX - ch->max_pending_messages)
1599 { 1532 {
1600 if (64 <= rel->mid_send - rel->head_sent->mid) 1533 /* overflow, means crm was a bit in the past, so this ACK counts for it. */
1601 { 1534 LOG (GNUNET_ERROR_TYPE_DEBUG,
1602 LOG (GNUNET_ERROR_TYPE_DEBUG, " too big MID gap! Wait for ACK.\n"); 1535 "Got DATA_ACK with base %u satisfying past message %u on %s\n",
1603 return; 1536 (unsigned int) mid_base,
1604 } 1537 ntohl (crm->data_message->mid.mid),
1605 else 1538 GCCH_2s (ch));
1606 { 1539 handle_matching_ack (ch,
1607 LOG (GNUNET_ERROR_TYPE_DEBUG, " gap ok: %u - %u\n", 1540 cti,
1608 rel->head_sent->mid, rel->mid_send); 1541 crm);
1609 struct CadetReliableMessage *aux; 1542 found = GNUNET_YES;
1610 for (aux = rel->head_sent; NULL != aux; aux = aux->next) 1543 continue;
1611 {
1612 LOG (GNUNET_ERROR_TYPE_DEBUG, " - sent mid %u\n", aux->mid);
1613 }
1614 }
1615 } 1544 }
1616 else 1545 delta--;
1546 if (delta >= 64)
1547 continue;
1548 LOG (GNUNET_ERROR_TYPE_DEBUG,
1549 "Testing bit %llX for mid %u (base: %u)\n",
1550 (1LLU << delta),
1551 ntohl (crm->data_message->mid.mid),
1552 mid_base);
1553 if (0 != (mid_mask & (1LLU << delta)))
1617 { 1554 {
1618 LOG (GNUNET_ERROR_TYPE_DEBUG, " head sent is NULL\n"); 1555 LOG (GNUNET_ERROR_TYPE_DEBUG,
1556 "Got DATA_ACK with mask for %u on %s\n",
1557 ntohl (crm->data_message->mid.mid),
1558 GCCH_2s (ch));
1559 handle_matching_ack (ch,
1560 cti,
1561 crm);
1562 found = GNUNET_YES;
1619 } 1563 }
1620 } 1564 }
1621 1565 if (GNUNET_NO == found)
1622 if (is_loopback (ch))
1623 buffer = GCCH_get_buffer (ch, fwd);
1624 else
1625 buffer = GCT_get_connections_buffer (ch->t);
1626
1627 if (0 == buffer)
1628 { 1566 {
1629 LOG (GNUNET_ERROR_TYPE_DEBUG, " no buffer space.\n"); 1567 /* ACK for message we already dropped, might have been a
1568 duplicate ACK? Ignore. */
1569 LOG (GNUNET_ERROR_TYPE_DEBUG,
1570 "Duplicate DATA_ACK on %s, ignoring\n",
1571 GCCH_2s (ch));
1572 GNUNET_STATISTICS_update (stats,
1573 "# duplicate DATA_ACKs",
1574 1,
1575 GNUNET_NO);
1630 return; 1576 return;
1631 } 1577 }
1632 1578 if (NULL != ch->retry_data_task)
1633 LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer space %u, allowing\n", buffer); 1579 {
1634 send_client_ack (ch, fwd); 1580 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1581 ch->retry_data_task = NULL;
1582 }
1583 if ( (NULL != ch->head_sent) &&
1584 (NULL == ch->head_sent->qe) )
1585 ch->retry_data_task
1586 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1587 &retry_transmission,
1588 ch);
1635} 1589}
1636 1590
1637 1591
1638/** 1592/**
1639 * Log channel info. 1593 * Destroy channel, based on the other peer closing the
1594 * connection. Also needs to remove this channel from
1595 * the tunnel.
1640 * 1596 *
1641 * @param ch Channel. 1597 * @param ch channel to destroy
1642 * @param level Debug level to use. 1598 * @param cti identifier of the connection that delivered the message,
1599 * NULL if we are simulating receiving a destroy due to shutdown
1643 */ 1600 */
1644void 1601void
1645GCCH_debug (struct CadetChannel *ch, enum GNUNET_ErrorType level) 1602GCCH_handle_remote_destroy (struct CadetChannel *ch,
1603 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1646{ 1604{
1647 int do_log; 1605 struct CadetChannelClient *ccc;
1648 1606
1649 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK), 1607 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1650 "cadet-chn", 1608 LOG (GNUNET_ERROR_TYPE_DEBUG,
1651 __FILE__, __FUNCTION__, __LINE__); 1609 "Received remote channel DESTROY for %s\n",
1652 if (0 == do_log) 1610 GCCH_2s (ch));
1653 return; 1611 if (GNUNET_YES == ch->destroy)
1654
1655 if (NULL == ch)
1656 { 1612 {
1657 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n"); 1613 /* Local client already gone, this is instant-death. */
1614 channel_destroy (ch);
1658 return; 1615 return;
1659 } 1616 }
1660 LOG2 (level, "CHN Channel %s:%X (%p)\n", GCT_2s (ch->t), ch->gid, ch); 1617 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1661 LOG2 (level, "CHN root %p/%p\n", ch->root, ch->root_rel); 1618 if ( (NULL != ccc) &&
1662 if (NULL != ch->root) 1619 (NULL != ccc->head_recv) )
1663 {
1664 LOG2 (level, "CHN cli %s\n", GML_2s (ch->root));
1665 LOG2 (level, "CHN ready %s\n", ch->root_rel->client_ready ? "YES" : "NO");
1666 LOG2 (level, "CHN id %X\n", ch->lid_root.channel_of_client);
1667 LOG2 (level, "CHN recv %d\n", ch->root_rel->n_recv);
1668 LOG2 (level, "CHN MID r: %d, s: %d\n",
1669 ch->root_rel->mid_recv, ch->root_rel->mid_send);
1670 }
1671 LOG2 (level, "CHN dest %p/%p\n",
1672 ch->dest, ch->dest_rel);
1673 if (NULL != ch->dest)
1674 { 1620 {
1675 LOG2 (level, "CHN cli %s\n", GML_2s (ch->dest)); 1621 LOG (GNUNET_ERROR_TYPE_WARNING,
1676 LOG2 (level, "CHN ready %s\n", ch->dest_rel->client_ready ? "YES" : "NO"); 1622 "Lost end of transmission due to remote shutdown on %s\n",
1677 LOG2 (level, "CHN id %X\n", ch->lid_dest); 1623 GCCH_2s (ch));
1678 LOG2 (level, "CHN recv %d\n", ch->dest_rel->n_recv); 1624 /* FIXME: change API to notify client about truncated transmission! */
1679 LOG2 (level, "CHN MID r: %d, s: %d\n",
1680 ch->dest_rel->mid_recv, ch->dest_rel->mid_send);
1681
1682 } 1625 }
1626 ch->destroy = GNUNET_YES;
1627 if (NULL != ccc)
1628 GSC_handle_remote_channel_destroy (ccc->c,
1629 ccc->ccn,
1630 ch);
1631 channel_destroy (ch);
1683} 1632}
1684 1633
1685 1634
1686/** 1635/**
1687 * Handle an ACK given by a client. 1636 * Test if element @a e1 comes before element @a e2.
1688 *
1689 * Mark client as ready and send him any buffered data we could have for him.
1690 * 1637 *
1691 * @param ch Channel. 1638 * @param cls closure, to a flag where we indicate duplicate packets
1692 * @param fwd Is this a "FWD ACK"? (FWD ACKs are sent by dest and go BCK) 1639 * @param crm1 an element of to sort
1640 * @param crm2 another element to sort
1641 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1693 */ 1642 */
1694void 1643static int
1695GCCH_handle_local_ack (struct CadetChannel *ch, int fwd) 1644cmp_crm_by_next_retry (void *cls,
1645 struct CadetReliableMessage *crm1,
1646 struct CadetReliableMessage *crm2)
1696{ 1647{
1697 struct CadetChannelReliability *rel; 1648 if (crm1->next_retry.abs_value_us <
1698 struct CadetClient *c; 1649 crm2->next_retry.abs_value_us)
1699 1650 return GNUNET_YES;
1700 rel = fwd ? ch->dest_rel : ch->root_rel; 1651 return GNUNET_NO;
1701 c = fwd ? ch->dest : ch->root;
1702
1703 rel->client_ready = GNUNET_YES;
1704 send_client_buffered_data (ch, c, fwd);
1705
1706 if (GNUNET_YES == ch->destroy && 0 == rel->n_recv)
1707 {
1708 send_destroy (ch, GNUNET_YES);
1709 GCCH_destroy (ch);
1710 return;
1711 }
1712 /* if loopback is marked for destruction, no need to ACK to the other peer,
1713 * it requested the destruction and is already gone, therefore, else if.
1714 */
1715 else if (is_loopback (ch))
1716 {
1717 unsigned int buffer;
1718
1719 buffer = GCCH_get_buffer (ch, fwd);
1720 if (0 < buffer)
1721 GCCH_allow_client (ch, fwd);
1722
1723 return;
1724 }
1725 GCT_send_connection_acks (ch->t);
1726} 1652}
1727 1653
1728 1654
1729/** 1655/**
1730 * Handle data given by a client. 1656 * Function called once the tunnel has sent one of our messages.
1731 * 1657 * If the message is unreliable, simply frees the `crm`. If the
1732 * Check whether the client is allowed to send in this tunnel, save if channel 1658 * message was reliable, calculate retransmission time and
1733 * is reliable and send an ACK to the client if there is still buffer space 1659 * wait for ACK (or retransmit).
1734 * in the tunnel. 1660 *
1735 * 1661 * @param cls the `struct CadetReliableMessage` that was sent
1736 * @param ch Channel. 1662 * @param cid identifier of the connection within the tunnel, NULL
1737 * @param c Client which sent the data. 1663 * if transmission failed
1738 * @param fwd Is this a FWD data?
1739 * @param message Data message.
1740 * @param size Size of data.
1741 *
1742 * @return #GNUNET_OK if everything goes well, #GNUNET_SYSERR in case of en error.
1743 */ 1664 */
1744int 1665static void
1745GCCH_handle_local_data (struct CadetChannel *ch, 1666data_sent_cb (void *cls,
1746 struct CadetClient *c, 1667 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
1747 int fwd,
1748 const struct GNUNET_MessageHeader *message,
1749 size_t size)
1750{ 1668{
1751 struct CadetChannelReliability *rel; 1669 struct CadetReliableMessage *crm = cls;
1752 struct GNUNET_CADET_ChannelAppDataMessage *payload; 1670 struct CadetChannel *ch = crm->ch;
1753 uint16_t p2p_size = sizeof(struct GNUNET_CADET_ChannelAppDataMessage) + size; 1671
1754 unsigned char cbuf[p2p_size]; 1672 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1755 unsigned char buffer; 1673 GNUNET_assert (NULL != crm->qe);
1756 1674 crm->qe = NULL;
1757 /* Is the client in the channel? */ 1675 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1758 if ( !( (fwd && 1676 ch->tail_sent,
1759 ch->root == c) 1677 crm);
1760 || 1678 if (GNUNET_NO == ch->reliable)
1761 (!fwd &&
1762 ch->dest == c) ) )
1763 { 1679 {
1764 GNUNET_break_op (0); 1680 GNUNET_free (crm->data_message);
1765 return GNUNET_SYSERR; 1681 GNUNET_free (crm);
1682 ch->pending_messages--;
1683 send_ack_to_client (ch,
1684 (NULL == ch->owner)
1685 ? GNUNET_NO
1686 : GNUNET_YES);
1687 return;
1766 } 1688 }
1767 1689 if (NULL == cid)
1768 rel = fwd ? ch->root_rel : ch->dest_rel;
1769
1770 if (GNUNET_NO == rel->client_allowed)
1771 { 1690 {
1772 GNUNET_break_op (0); 1691 /* There was an error sending. */
1773 return GNUNET_SYSERR; 1692 crm->num_transmissions = GNUNET_SYSERR;
1774 } 1693 }
1775 1694 else if (GNUNET_SYSERR != crm->num_transmissions)
1776 rel->client_allowed = GNUNET_NO;
1777
1778 /* Ok, everything is correct, send the message. */
1779 payload = (struct GNUNET_CADET_ChannelAppDataMessage *) cbuf;
1780 payload->mid = htonl (rel->mid_send);
1781 rel->mid_send++;
1782 GNUNET_memcpy (&payload[1], message, size);
1783 payload->header.size = htons (p2p_size);
1784 payload->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1785 payload->ctn = ch->gid;
1786 LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channel...\n");
1787 GCCH_send_prebuilt_message (&payload->header, ch, fwd, NULL);
1788
1789 if (is_loopback (ch))
1790 buffer = GCCH_get_buffer (ch, fwd);
1791 else
1792 buffer = GCT_get_connections_buffer (ch->t);
1793
1794 if (0 < buffer)
1795 GCCH_allow_client (ch, fwd);
1796
1797 return GNUNET_OK;
1798}
1799
1800
1801/**
1802 * Handle a channel destroy requested by a client.
1803 *
1804 * TODO: add "reason" field
1805 *
1806 * Destroy the channel and the tunnel in case this was the last channel.
1807 *
1808 * @param ch Channel.
1809 * @param c Client that requested the destruction (to avoid notifying him).
1810 * @param is_root Is the request coming from root?
1811 */
1812void
1813GCCH_handle_local_destroy (struct CadetChannel *ch,
1814 struct CadetClient *c,
1815 int is_root)
1816{
1817 ch->destroy = GNUNET_YES;
1818 /* Cleanup after the tunnel */
1819 if (GNUNET_NO == is_root && c == ch->dest)
1820 { 1695 {
1821 LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is destination.\n", GML_2s (c)); 1696 /* Increment transmission counter, and possibly store @a cid
1822 GML_client_delete_channel (c, ch, ch->lid_dest); 1697 if this was the first transmission. */
1823 ch->dest = NULL; 1698 crm->num_transmissions++;
1699 if (1 == crm->num_transmissions)
1700 {
1701 crm->first_transmission_time = GNUNET_TIME_absolute_get ();
1702 crm->connection_taken = *cid;
1703 GCC_ack_expected (cid);
1704 }
1824 } 1705 }
1825 if (GNUNET_YES == is_root && c == ch->root) 1706 if ( (0 == crm->retry_delay.rel_value_us) &&
1707 (NULL != cid) )
1826 { 1708 {
1827 LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is owner.\n", GML_2s (c)); 1709 struct CadetConnection *cc = GCC_lookup (cid);
1828 GML_client_delete_channel (c, ch, ch->lid_root);
1829 ch->root = NULL;
1830 }
1831 1710
1832 send_destroy (ch, GNUNET_NO); 1711 if (NULL != cc)
1833 if (0 == ch->pending_messages) 1712 crm->retry_delay = GCC_get_metrics (cc)->aged_latency;
1834 GCCH_destroy (ch); 1713 else
1714 crm->retry_delay = ch->retry_time;
1715 }
1716 crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1717 crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1718 MIN_RTT_DELAY);
1719 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1720
1721 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1722 cmp_crm_by_next_retry,
1723 NULL,
1724 ch->head_sent,
1725 ch->tail_sent,
1726 crm);
1727 LOG (GNUNET_ERROR_TYPE_DEBUG,
1728 "Message %u sent, next transmission on %s in %s\n",
1729 (unsigned int) ntohl (crm->data_message->mid.mid),
1730 GCCH_2s (ch),
1731 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1732 GNUNET_YES));
1733 if (NULL == ch->head_sent->qe)
1734 {
1735 if (NULL != ch->retry_data_task)
1736 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1737 ch->retry_data_task
1738 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1739 &retry_transmission,
1740 ch);
1741 }
1835} 1742}
1836 1743
1837 1744
1838/** 1745/**
1839 * Handle a channel create requested by a client. 1746 * Handle data given by a client.
1840 *
1841 * Create the channel and the tunnel in case this was the first0 channel.
1842 * 1747 *
1843 * @param c Client that requested the creation (will be the root). 1748 * Check whether the client is allowed to send in this tunnel, save if
1844 * @param msg Create Channel message. 1749 * channel is reliable and send an ACK to the client if there is still
1750 * buffer space in the tunnel.
1845 * 1751 *
1846 * @return #GNUNET_OK if everything went fine, #GNUNET_SYSERR otherwise. 1752 * @param ch Channel.
1753 * @param sender_ccn ccn of the sender
1754 * @param buf payload to transmit.
1755 * @param buf_len number of bytes in @a buf
1756 * @return #GNUNET_OK if everything goes well,
1757 * #GNUNET_SYSERR in case of an error.
1847 */ 1758 */
1848int 1759int
1849GCCH_handle_local_create (struct CadetClient *c, 1760GCCH_handle_local_data (struct CadetChannel *ch,
1850 struct GNUNET_CADET_LocalChannelCreateMessage *msg) 1761 struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1762 const char *buf,
1763 size_t buf_len)
1851{ 1764{
1852 struct CadetChannel *ch; 1765 struct CadetReliableMessage *crm;
1853 struct CadetTunnel *t;
1854 struct CadetPeer *peer;
1855 struct GNUNET_CADET_ClientChannelNumber ccn;
1856
1857 LOG (GNUNET_ERROR_TYPE_DEBUG, " towards %s:%u\n",
1858 GNUNET_i2s (&msg->peer), GNUNET_h2s (&msg->port));
1859 ccn = msg->ccn;
1860
1861 /* Sanity check for duplicate channel IDs */
1862 if (NULL != GML_channel_get (c, ccn))
1863 {
1864 GNUNET_break (0);
1865 return GNUNET_SYSERR;
1866 }
1867 1766
1868 peer = GCP_get (&msg->peer, GNUNET_YES); 1767 if (ch->pending_messages > ch->max_pending_messages)
1869 GCP_add_tunnel (peer);
1870 t = GCP_get_tunnel (peer);
1871
1872 if (GCP_get_short_id (peer) == myid)
1873 {
1874 GCT_change_cstate (t, CADET_TUNNEL_READY);
1875 }
1876 else
1877 {
1878 /* FIXME change to a tunnel API, eliminate ch <-> peer connection */
1879 GCP_connect (peer);
1880 }
1881
1882 /* Create channel */
1883 ch = channel_new (t, c, ccn);
1884 if (NULL == ch)
1885 { 1768 {
1886 GNUNET_break (0); 1769 GNUNET_break (0);
1887 return GNUNET_SYSERR; 1770 return GNUNET_SYSERR;
1888 } 1771 }
1889 ch->port = msg->port; 1772 if (GNUNET_YES == ch->destroy)
1890 channel_set_options (ch, ntohl (msg->opt));
1891
1892 /* In unreliable channels, we'll use the DLL to buffer BCK data */
1893 ch->root_rel = GNUNET_new (struct CadetChannelReliability);
1894 ch->root_rel->ch = ch;
1895 ch->root_rel->retry_timer = CADET_RETRANSMIT_TIME;
1896 ch->root_rel->expected_delay.rel_value_us = 0;
1897
1898 LOG (GNUNET_ERROR_TYPE_DEBUG, "CREATED CHANNEL %s\n", GCCH_2s (ch));
1899
1900 send_create (ch);
1901
1902 return GNUNET_OK;
1903}
1904
1905
1906/**
1907 * Handler for cadet network payload traffic.
1908 *
1909 * @param ch Channel for the message.
1910 * @param msg Unencryted data message.
1911 * @param fwd Is this message fwd? This only is meaningful in loopback channels.
1912 * #GNUNET_YES if message is FWD on the respective channel (loopback)
1913 * #GNUNET_NO if message is BCK on the respective channel (loopback)
1914 * #GNUNET_SYSERR if message on a one-ended channel (remote)
1915 */
1916void
1917GCCH_handle_data (struct CadetChannel *ch,
1918 const struct GNUNET_CADET_ChannelAppDataMessage *msg,
1919 int fwd)
1920{
1921 struct CadetChannelReliability *rel;
1922 struct CadetClient *c;
1923 struct GNUNET_MessageHeader *payload_msg;
1924 uint32_t mid;
1925 uint16_t payload_type;
1926 uint16_t payload_size;
1927
1928 /* If this is a remote (non-loopback) channel, find 'fwd'. */
1929 if (GNUNET_SYSERR == fwd)
1930 { 1773 {
1931 if (is_loopback (ch)) 1774 /* we are going down, drop messages */
1932 { 1775 return GNUNET_OK;
1933 /* It is a loopback channel after all... */
1934 GNUNET_break (0);
1935 return;
1936 }
1937 fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
1938 } 1776 }
1777 ch->pending_messages++;
1939 1778
1940 /* Initialize FWD/BCK data */ 1779 if (GNUNET_YES == ch->is_loopback)
1941 c = fwd ? ch->dest : ch->root;
1942 rel = fwd ? ch->dest_rel : ch->root_rel;
1943
1944 if (NULL == c)
1945 { 1780 {
1946 GNUNET_break (GNUNET_NO != ch->destroy); 1781 struct CadetChannelClient *receiver;
1947 return; 1782 struct GNUNET_MQ_Envelope *env;
1948 } 1783 struct GNUNET_CADET_LocalData *ld;
1784 int ack_to_owner;
1949 1785
1950 if (CADET_CHANNEL_READY != ch->state) 1786 env = GNUNET_MQ_msg_extra (ld,
1951 { 1787 buf_len,
1952 if (GNUNET_NO == fwd) 1788 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1789 if ( (NULL != ch->owner) &&
1790 (sender_ccn.channel_of_client ==
1791 ch->owner->ccn.channel_of_client) )
1953 { 1792 {
1954 /* If we are the root, this means the other peer has sent traffic before 1793 receiver = ch->dest;
1955 * receiving our ACK. Even if the SYNACK goes missing, no traffic should 1794 ack_to_owner = GNUNET_YES;
1956 * be sent before the ACK.
1957 */
1958 GNUNET_break_op (0);
1959 return;
1960 } 1795 }
1961 /* If we are the dest, this means that the SYNACK got to the root but 1796 else if ( (NULL != ch->dest) &&
1962 * the ACK went missing. Treat this as an ACK. 1797 (sender_ccn.channel_of_client ==
1963 */ 1798 ch->dest->ccn.channel_of_client) )
1964 channel_confirm (ch, GNUNET_NO);
1965 }
1966
1967 payload_msg = (struct GNUNET_MessageHeader *) &msg[1];
1968 payload_type = ntohs (payload_msg->type);
1969 payload_size = ntohs (payload_msg->size);
1970
1971 GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
1972 GNUNET_STATISTICS_update (stats, "# bytes received", payload_size, GNUNET_NO);
1973
1974 mid = ntohl (msg->mid);
1975 LOG (GNUNET_ERROR_TYPE_INFO, "<== %s (%s %4u) on chan %s (%p) %s [%5u]\n",
1976 GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA), GC_m2s (payload_type), mid,
1977 GCCH_2s (ch), ch, GC_f2s (fwd), ntohs (msg->header.size));
1978
1979 if ( (GNUNET_NO == ch->reliable) ||
1980 ( (! GC_is_pid_bigger (rel->mid_recv, mid)) &&
1981 GC_is_pid_bigger (rel->mid_recv + 64, mid) ) )
1982 {
1983 if (GNUNET_YES == ch->reliable)
1984 { 1799 {
1985 /* Is this the exact next expected messasge? */ 1800 receiver = ch->owner;
1986 if (mid == rel->mid_recv) 1801 ack_to_owner = GNUNET_NO;
1987 {
1988 LOG (GNUNET_ERROR_TYPE_DEBUG,
1989 "as expected, sending to client\n");
1990 send_client_data (ch, msg, fwd);
1991 }
1992 else
1993 {
1994 LOG (GNUNET_ERROR_TYPE_DEBUG,
1995 "save for later\n");
1996 add_buffered_data (msg, rel);
1997 }
1998 } 1802 }
1999 else 1803 else
2000 { 1804 {
2001 /* Tunnel is unreliable: send to clients directly */ 1805 GNUNET_break (0);
2002 /* FIXME: accept Out Of Order traffic */ 1806 return GNUNET_SYSERR;
2003 rel->mid_recv = mid + 1;
2004 send_client_data (ch, msg, fwd);
2005 } 1807 }
2006 } 1808 GNUNET_assert (NULL != receiver);
2007 else 1809 ld->ccn = receiver->ccn;
2008 { 1810 GNUNET_memcpy (&ld[1],
2009 GNUNET_STATISTICS_update (stats, "# duplicate MID", 1, GNUNET_NO); 1811 buf,
2010 if (GC_is_pid_bigger (rel->mid_recv, mid)) 1812 buf_len);
1813 if (GNUNET_YES == receiver->client_ready)
2011 { 1814 {
2012 GNUNET_break_op (0); 1815 ch->pending_messages--;
2013 LOG (GNUNET_ERROR_TYPE_WARNING, 1816 GSC_send_to_client (receiver->c,
2014 "MID %u on channel %s not expected (window: %u - %u). Dropping!\n", 1817 env);
2015 mid, GCCH_2s (ch), rel->mid_recv, rel->mid_recv + 63); 1818 send_ack_to_client (ch,
1819 ack_to_owner);
2016 } 1820 }
2017 else 1821 else
2018 { 1822 {
2019 LOG (GNUNET_ERROR_TYPE_INFO, 1823 struct CadetOutOfOrderMessage *oom;
2020 "Duplicate MID %u, channel %s (expecting MID %u). Re-sending ACK!\n", 1824
2021 mid, GCCH_2s (ch), rel->mid_recv); 1825 oom = GNUNET_new (struct CadetOutOfOrderMessage);
2022 if (NULL != rel->uniq) 1826 oom->env = env;
2023 { 1827 GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
2024 LOG (GNUNET_ERROR_TYPE_WARNING, 1828 receiver->tail_recv,
2025 "We are trying to send an ACK, but don't seem have the " 1829 oom);
2026 "bandwidth. Have you set enough [ats] QUOTA in your config?\n"); 1830 receiver->num_recv++;
2027 }
2028
2029 } 1831 }
2030 } 1832 return GNUNET_OK;
2031 1833 }
2032 GCCH_send_data_ack (ch, fwd); 1834
1835 /* Everything is correct, send the message. */
1836 crm = GNUNET_malloc (sizeof (*crm));
1837 crm->ch = ch;
1838 crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1839 + buf_len);
1840 crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1841 crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1842 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1843 crm->data_message->mid = ch->mid_send;
1844 crm->data_message->ctn = ch->ctn;
1845 GNUNET_memcpy (&crm->data_message[1],
1846 buf,
1847 buf_len);
1848 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1849 ch->tail_sent,
1850 crm);
1851 LOG (GNUNET_ERROR_TYPE_DEBUG,
1852 "Sending message %u from local client to %s with %u bytes\n",
1853 ntohl (crm->data_message->mid.mid),
1854 GCCH_2s (ch),
1855 buf_len);
1856 if (NULL != ch->retry_data_task)
1857 {
1858 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1859 ch->retry_data_task = NULL;
1860 }
1861 crm->qe = GCT_send (ch->t,
1862 &crm->data_message->header,
1863 &data_sent_cb,
1864 crm);
1865 GNUNET_assert (NULL == ch->retry_data_task);
1866 return GNUNET_OK;
2033} 1867}
2034 1868
2035 1869
2036/** 1870/**
2037 * Handler for cadet network traffic end-to-end ACKs. 1871 * Handle ACK from client on local channel. Means the client is ready
1872 * for more data, see if we have any for it.
2038 * 1873 *
2039 * @param ch Channel on which we got this message. 1874 * @param ch channel to destroy
2040 * @param msg Data message. 1875 * @param client_ccn ccn of the client sending the ack
2041 * @param fwd Is this message fwd? This only is meaningful in loopback channels.
2042 * #GNUNET_YES if message is FWD on the respective channel (loopback)
2043 * #GNUNET_NO if message is BCK on the respective channel (loopback)
2044 * #GNUNET_SYSERR if message on a one-ended channel (remote)
2045 */ 1876 */
2046void 1877void
2047GCCH_handle_data_ack (struct CadetChannel *ch, 1878GCCH_handle_local_ack (struct CadetChannel *ch,
2048 const struct GNUNET_CADET_ChannelDataAckMessage *msg, 1879 struct GNUNET_CADET_ClientChannelNumber client_ccn)
2049 int fwd)
2050{ 1880{
2051 struct CadetChannelReliability *rel; 1881 struct CadetChannelClient *ccc;
2052 struct CadetReliableMessage *copy; 1882 struct CadetOutOfOrderMessage *com;
2053 struct CadetReliableMessage *next; 1883
2054 uint32_t ack; 1884 if ( (NULL != ch->owner) &&
2055 int work; 1885 (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
2056 1886 ccc = ch->owner;
2057 /* If this is a remote (non-loopback) channel, find 'fwd'. */ 1887 else if ( (NULL != ch->dest) &&
2058 if (GNUNET_SYSERR == fwd) 1888 (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
2059 { 1889 ccc = ch->dest;
2060 if (is_loopback (ch))
2061 {
2062 /* It is a loopback channel after all... */
2063 GNUNET_break (0);
2064 return;
2065 }
2066 /* Inverted: if message came 'FWD' is a 'BCK ACK'. */
2067 fwd = (NULL != ch->dest) ? GNUNET_NO : GNUNET_YES;
2068 }
2069
2070 ack = ntohl (msg->mid);
2071 LOG (GNUNET_ERROR_TYPE_INFO,
2072 "<== %s (0x%010lX %4u) on chan %s (%p) %s [%5u]\n",
2073 GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK), msg->futures, ack,
2074 GCCH_2s (ch), ch, GC_f2s (fwd), ntohs (msg->header.size));
2075
2076 if (GNUNET_YES == fwd)
2077 rel = ch->root_rel;
2078 else 1890 else
2079 rel = ch->dest_rel; 1891 GNUNET_assert (0);
2080 1892 ccc->client_ready = GNUNET_YES;
2081 if (NULL == rel) 1893 com = ccc->head_recv;
1894 if (NULL == com)
2082 { 1895 {
2083 GNUNET_break (GNUNET_NO != ch->destroy); 1896 LOG (GNUNET_ERROR_TYPE_DEBUG,
2084 return; 1897 "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
2085 } 1898 GSC_2s (ccc->c),
2086 1899 ntohl (client_ccn.channel_of_client),
2087 /* Free ACK'd copies: no need to retransmit those anymore FIXME refactor */ 1900 GCCH_2s (ch),
2088 for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next) 1901 ntohl (ccc->ccn.channel_of_client),
2089 { 1902 ccc);
2090 if (GC_is_pid_bigger (copy->mid, ack)) 1903 return; /* none pending */
2091 { 1904 }
2092 LOG (GNUNET_ERROR_TYPE_DEBUG, " head %u, out!\n", copy->mid); 1905 if (GNUNET_YES == ch->is_loopback)
2093 if (0 < channel_rel_free_sent (rel, msg)) 1906 {
2094 work = GNUNET_YES; 1907 int to_owner;
2095 break; 1908
2096 } 1909 /* Messages are always in-order, just send */
2097 work = GNUNET_YES; 1910 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
2098 LOG (GNUNET_ERROR_TYPE_DEBUG, " id %u\n", copy->mid); 1911 ccc->tail_recv,
2099 next = copy->next; 1912 com);
2100 if (GNUNET_YES == rel_message_free (copy, GNUNET_YES)) 1913 ccc->num_recv--;
2101 { 1914 GSC_send_to_client (ccc->c,
2102 LOG (GNUNET_ERROR_TYPE_DEBUG, " channel destoyed\n"); 1915 com->env);
2103 return; 1916 /* Notify sender that we can receive more */
2104 } 1917 if ( (NULL != ch->owner) &&
2105 } 1918 (ccc->ccn.channel_of_client ==
2106 1919 ch->owner->ccn.channel_of_client) )
2107 /* ACK client if needed and possible */
2108 GCCH_allow_client (ch, fwd);
2109
2110 /* If some message was free'd, update the retransmission delay */
2111 if (GNUNET_YES == work)
2112 {
2113 if (NULL != rel->retry_task)
2114 { 1920 {
2115 GNUNET_SCHEDULER_cancel (rel->retry_task); 1921 to_owner = GNUNET_NO;
2116 rel->retry_task = NULL;
2117 if (NULL != rel->head_sent && NULL == rel->head_sent->chq)
2118 {
2119 struct GNUNET_TIME_Absolute new_target;
2120 struct GNUNET_TIME_Relative delay;
2121
2122 delay = GNUNET_TIME_relative_saturating_multiply (rel->retry_timer,
2123 CADET_RETRANSMIT_MARGIN);
2124 new_target = GNUNET_TIME_absolute_add (rel->head_sent->timestamp,
2125 delay);
2126 delay = GNUNET_TIME_absolute_get_remaining (new_target);
2127 rel->retry_task =
2128 GNUNET_SCHEDULER_add_delayed (delay,
2129 &channel_retransmit_message,
2130 rel);
2131 }
2132 } 1922 }
2133 else 1923 else
2134 { 1924 {
2135 /* Work was done but no task was pending. 1925 GNUNET_assert ( (NULL != ch->dest) &&
2136 * Task was cancelled by a retransmission that is sitting in the queue. 1926 (ccc->ccn.channel_of_client ==
2137 */ 1927 ch->dest->ccn.channel_of_client) );
2138 // FIXME add test to make sure this is the case, probably add return 1928 to_owner = GNUNET_YES;
2139 // value to GCCH_send_prebuilt_message
2140 } 1929 }
1930 send_ack_to_client (ch,
1931 to_owner);
1932 GNUNET_free (com);
1933 return;
2141 } 1934 }
2142}
2143 1935
2144 1936 if ( (com->mid.mid != ch->mid_recv.mid) &&
2145/** 1937 (GNUNET_NO == ch->out_of_order) &&
2146 * Handler for channel create messages. 1938 (GNUNET_YES == ch->reliable) )
2147 *
2148 * Does not have fwd parameter because it's always 'FWD': channel is incoming.
2149 *
2150 * @param t Tunnel this channel will be in.
2151 * @param msg Channel crate message.
2152 */
2153struct CadetChannel *
2154GCCH_handle_create (struct CadetTunnel *t,
2155 const struct GNUNET_CADET_ChannelOpenMessage *msg)
2156{
2157 struct GNUNET_CADET_ClientChannelNumber ccn;
2158 struct GNUNET_CADET_ChannelTunnelNumber gid;
2159 struct CadetChannel *ch;
2160 struct CadetClient *c;
2161 int new_channel;
2162 const struct GNUNET_HashCode *port;
2163
2164 gid = msg->ctn;
2165 ch = GCT_get_channel (t, gid);
2166 if (NULL == ch)
2167 {
2168 /* Create channel */
2169 ccn.channel_of_client = htonl (0);
2170 ch = channel_new (t, NULL, ccn);
2171 ch->gid = gid;
2172 channel_set_options (ch, ntohl (msg->opt));
2173 new_channel = GNUNET_YES;
2174 }
2175 else
2176 { 1939 {
2177 new_channel = GNUNET_NO; 1940 LOG (GNUNET_ERROR_TYPE_DEBUG,
1941 "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1942 GSC_2s (ccc->c),
1943 ntohl (ccc->ccn.channel_of_client),
1944 ntohl (com->mid.mid),
1945 ntohl (ch->mid_recv.mid));
1946 return; /* missing next one in-order */
2178 } 1947 }
2179 port = &msg->port;
2180
2181 LOG (GNUNET_ERROR_TYPE_INFO,
2182 "<== %s ( 0x%08X %4u) on chan %s (%p) %s [%5u]\n",
2183 GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN), ccn, port,
2184 GCCH_2s (ch), ch, GC_f2s (GNUNET_YES), ntohs (msg->header.size));
2185
2186 if (GNUNET_YES == new_channel || GCT_is_loopback (t))
2187 {
2188 /* Find a destination client */
2189 ch->port = *port;
2190 LOG (GNUNET_ERROR_TYPE_DEBUG, " port %s\n", GNUNET_h2s (port));
2191 c = GML_client_get_by_port (port);
2192 if (NULL == c)
2193 {
2194 LOG (GNUNET_ERROR_TYPE_DEBUG, " no client has port registered\n");
2195 if (is_loopback (ch))
2196 {
2197 LOG (GNUNET_ERROR_TYPE_DEBUG, " loopback: destroy on handler\n");
2198 send_nack (ch);
2199 }
2200 else
2201 {
2202 LOG (GNUNET_ERROR_TYPE_DEBUG, " not loopback: destroy now\n");
2203 send_nack (ch);
2204 GCCH_destroy (ch);
2205 }
2206 return NULL;
2207 }
2208 else
2209 {
2210 LOG (GNUNET_ERROR_TYPE_DEBUG, " client %p has port registered\n", c);
2211 }
2212
2213 add_destination (ch, c);
2214 if (GNUNET_YES == ch->reliable)
2215 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reliable\n");
2216 else
2217 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not Reliable\n");
2218 1948
2219 send_client_create (ch); 1949 LOG (GNUNET_ERROR_TYPE_DEBUG,
2220 ch->state = CADET_CHANNEL_SENT; 1950 "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
2221 } 1951 ntohl (com->mid.mid),
2222 else 1952 GSC_2s (ccc->c),
2223 { 1953 ntohl (ccc->ccn.channel_of_client),
2224 LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate create channel\n"); 1954 GCCH_2s (ch));
2225 if (NULL != ch->dest_rel->retry_task)
2226 {
2227 LOG (GNUNET_ERROR_TYPE_DEBUG, " clearing retry task\n");
2228 /* we were waiting to re-send our 'SYNACK', wait no more! */
2229 GNUNET_SCHEDULER_cancel (ch->dest_rel->retry_task);
2230 ch->dest_rel->retry_task = NULL;
2231 }
2232 else if (NULL != ch->dest_rel->uniq)
2233 {
2234 /* we are waiting to for our 'SYNACK' to leave the queue, all done! */
2235 return ch;
2236 }
2237 }
2238 send_ack (ch, GNUNET_YES);
2239 1955
2240 return ch; 1956 /* all good, pass next message to client */
1957 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1958 ccc->tail_recv,
1959 com);
1960 ccc->num_recv--;
1961 /* FIXME: if unreliable, this is not aggressive
1962 enough, as it would be OK to have lost some! */
1963
1964 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1965 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1966 ccc->client_ready = GNUNET_NO;
1967 GSC_send_to_client (ccc->c,
1968 com->env);
1969 GNUNET_free (com);
1970 send_channel_data_ack (ch);
1971 if (NULL != ccc->head_recv)
1972 return;
1973 if (GNUNET_NO == ch->destroy)
1974 return;
1975 GCT_send_channel_destroy (ch->t,
1976 ch->ctn);
1977 channel_destroy (ch);
2241} 1978}
2242 1979
2243 1980
2244/** 1981#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
2245 * Handler for channel NACK messages.
2246 *
2247 * NACK messages always go dest -> root, no need for 'fwd' or 'msg' parameter.
2248 *
2249 * @param ch Channel.
2250 */
2251void
2252GCCH_handle_nack (struct CadetChannel *ch)
2253{
2254 LOG (GNUNET_ERROR_TYPE_INFO,
2255 "<== %s ( 0x%08X %4u) on chan %s (%p) %s [%5u]\n",
2256 GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED), ch->gid, 0,
2257 GCCH_2s (ch), ch, "---", 0);
2258
2259 send_client_nack (ch);
2260 GCCH_destroy (ch);
2261}
2262 1982
2263 1983
2264/** 1984/**
2265 * Handler for channel ack messages. 1985 * Log channel info.
2266 * 1986 *
2267 * @param ch Channel. 1987 * @param ch Channel.
2268 * @param msg Message. 1988 * @param level Debug level to use.
2269 * @param fwd Is this message fwd? This only is meaningful in loopback channels.
2270 * #GNUNET_YES if message is FWD on the respective channel (loopback)
2271 * #GNUNET_NO if message is BCK on the respective channel (loopback)
2272 * #GNUNET_SYSERR if message on a one-ended channel (remote)
2273 */
2274void
2275GCCH_handle_ack (struct CadetChannel *ch,
2276 const struct GNUNET_CADET_ChannelManageMessage *msg,
2277 int fwd)
2278{
2279 LOG (GNUNET_ERROR_TYPE_INFO,
2280 "<== %s ( 0x%08X %4u) on chan %s (%p) %s [%5u]\n",
2281 GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK), ch->gid, 0,
2282 GCCH_2s (ch), ch, GC_f2s (fwd), ntohs (msg->header.size));
2283
2284 /* If this is a remote (non-loopback) channel, find 'fwd'. */
2285 if (GNUNET_SYSERR == fwd)
2286 {
2287 if (is_loopback (ch))
2288 {
2289 /* It is a loopback channel after all... */
2290 GNUNET_break (0);
2291 return;
2292 }
2293 fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
2294 }
2295
2296 channel_confirm (ch, !fwd);
2297}
2298
2299
2300/**
2301 * Handler for channel destroy messages.
2302 *
2303 * @param ch Channel to be destroyed of.
2304 * @param msg Message.
2305 * @param fwd Is this message fwd? This only is meaningful in loopback channels.
2306 * #GNUNET_YES if message is FWD on the respective channel (loopback)
2307 * #GNUNET_NO if message is BCK on the respective channel (loopback)
2308 * #GNUNET_SYSERR if message on a one-ended channel (remote)
2309 */ 1989 */
2310void 1990void
2311GCCH_handle_destroy (struct CadetChannel *ch, 1991GCCH_debug (struct CadetChannel *ch,
2312 const struct GNUNET_CADET_ChannelManageMessage *msg, 1992 enum GNUNET_ErrorType level)
2313 int fwd)
2314{ 1993{
2315 struct CadetChannelReliability *rel; 1994 int do_log;
2316
2317 LOG (GNUNET_ERROR_TYPE_INFO,
2318 "<== %s ( 0x%08X %4u) on chan %s (%p) %s [%5u]\n",
2319 GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY), ch->gid, 0,
2320 GCCH_2s (ch), ch, GC_f2s (fwd), ntohs (msg->header.size));
2321
2322 /* If this is a remote (non-loopback) channel, find 'fwd'. */
2323 if (GNUNET_SYSERR == fwd)
2324 {
2325 if (is_loopback (ch))
2326 {
2327 /* It is a loopback channel after all... */
2328 GNUNET_break (0);
2329 return;
2330 }
2331 fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
2332 }
2333 1995
2334 GCCH_debug (ch, GNUNET_ERROR_TYPE_DEBUG); 1996 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
2335 if ( (fwd && NULL == ch->dest) || (!fwd && NULL == ch->root) ) 1997 "cadet-chn",
2336 { 1998 __FILE__, __FUNCTION__, __LINE__);
2337 /* Not for us (don't destroy twice a half-open loopback channel) */ 1999 if (0 == do_log)
2338 return; 2000 return;
2339 }
2340
2341 rel = fwd ? ch->dest_rel : ch->root_rel;
2342 if (0 == rel->n_recv)
2343 {
2344 send_destroy (ch, GNUNET_YES);
2345 GCCH_destroy (ch);
2346 }
2347 else
2348 {
2349 ch->destroy = GNUNET_YES;
2350 }
2351}
2352
2353
2354/**
2355 * Sends an already built message on a channel.
2356 *
2357 * If the channel is on a loopback tunnel, notifies the appropriate destination
2358 * client locally.
2359 *
2360 * On a normal channel passes the message to the tunnel for encryption and
2361 * sending on a connection.
2362 *
2363 * This function DOES NOT save the message for retransmission.
2364 *
2365 * @param message Message to send. Function makes a copy of it.
2366 * @param ch Channel on which this message is transmitted.
2367 * @param fwd Is this a fwd message?
2368 * @param existing_copy This is a retransmission, don't save a copy.
2369 */
2370void
2371GCCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2372 struct CadetChannel *ch, int fwd,
2373 void *existing_copy)
2374{
2375 struct CadetChannelQueue *chq;
2376 uint32_t data_id;
2377 uint16_t type;
2378 uint16_t size;
2379 char info[32];
2380
2381 type = ntohs (message->type);
2382 size = ntohs (message->size);
2383
2384 data_id = 0;
2385 switch (type)
2386 {
2387 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA:
2388 {
2389 struct GNUNET_CADET_ChannelAppDataMessage *data_msg;
2390 struct GNUNET_MessageHeader *payload_msg;
2391 uint16_t payload_type;
2392
2393 data_msg = (struct GNUNET_CADET_ChannelAppDataMessage *) message;
2394 data_id = ntohl (data_msg->mid);
2395 payload_msg = (struct GNUNET_MessageHeader *) &data_msg[1];
2396 payload_type = ntohs (payload_msg->type);
2397 strncpy (info, GC_m2s (payload_type), 31);
2398 info[31] = '\0';
2399 break;
2400 }
2401 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK:
2402 {
2403 struct GNUNET_CADET_ChannelDataAckMessage *ack_msg;
2404 ack_msg = (struct GNUNET_CADET_ChannelDataAckMessage *) message;
2405 data_id = ntohl (ack_msg->mid);
2406 SPRINTF (info, "0x%010lX",
2407 (unsigned long int) ack_msg->futures);
2408 break;
2409 }
2410 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
2411 {
2412 struct GNUNET_CADET_ChannelOpenMessage *cc_msg;
2413 cc_msg = (struct GNUNET_CADET_ChannelOpenMessage *) message;
2414 SPRINTF (info, " 0x%08X", ntohl (cc_msg->ctn.cn));
2415 break;
2416 }
2417 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK:
2418 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED:
2419 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
2420 {
2421 struct GNUNET_CADET_ChannelManageMessage *m_msg;
2422 m_msg = (struct GNUNET_CADET_ChannelManageMessage *) message;
2423 SPRINTF (info, " 0x%08X", ntohl (m_msg->ctn.cn));
2424 break;
2425 }
2426 default:
2427 info[0] = '\0';
2428 }
2429 LOG (GNUNET_ERROR_TYPE_INFO,
2430 "==> %s (%12s %4u) on chan %s (%p) %s [%5u]\n",
2431 GC_m2s (type), info, data_id,
2432 GCCH_2s (ch), ch, GC_f2s (fwd), size);
2433 2001
2434 if (GCT_is_loopback (ch->t)) 2002 if (NULL == ch)
2435 { 2003 {
2436 handle_loopback (ch, message, fwd); 2004 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
2437 return; 2005 return;
2438 } 2006 }
2439 2007 LOG2 (level,
2440 switch (type) 2008 "CHN %s:%X (%p)\n",
2009 GCT_2s (ch->t),
2010 ch->ctn,
2011 ch);
2012 if (NULL != ch->owner)
2441 { 2013 {
2442 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA: 2014 LOG2 (level,
2443 if (GNUNET_YES == ch->reliable) 2015 "CHN origin %s ready %s local-id: %u\n",
2444 { 2016 GSC_2s (ch->owner->c),
2445 chq = GNUNET_new (struct CadetChannelQueue); 2017 ch->owner->client_ready ? "YES" : "NO",
2446 chq->type = type; 2018 ntohl (ch->owner->ccn.channel_of_client));
2447 if (NULL == existing_copy)
2448 chq->copy = channel_save_copy (ch, message, fwd);
2449 else
2450 {
2451 chq->copy = (struct CadetReliableMessage *) existing_copy;
2452 if (NULL != chq->copy->chq)
2453 {
2454 /* Last retransmission was queued but not yet sent!
2455 * This retransmission was scheduled by a ch_message_sent which
2456 * followed a very fast RTT, so the tiny delay made the
2457 * retransmission function to execute before the previous
2458 * retransmitted message even had a chance to leave the peer.
2459 * Cancel this message and wait until the pending
2460 * retransmission leaves the peer and ch_message_sent starts
2461 * the timer for the next one.
2462 */
2463 GNUNET_free (chq);
2464 LOG (GNUNET_ERROR_TYPE_DEBUG,
2465 " exisitng copy not yet transmitted!\n");
2466 return;
2467 }
2468 LOG (GNUNET_ERROR_TYPE_DEBUG,
2469 " using existing copy: %p {r:%p q:%p t:%u}\n",
2470 existing_copy,
2471 chq->copy->rel, chq->copy->chq, chq->copy->type);
2472 }
2473 LOG (GNUNET_ERROR_TYPE_DEBUG, " new chq: %p\n", chq);
2474 chq->copy->chq = chq;
2475 chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL,
2476 GNUNET_YES,
2477 &ch_message_sent, chq);
2478 /* q itself is stored in copy */
2479 GNUNET_assert (NULL != chq->tq || GNUNET_NO != ch->destroy);
2480 }
2481 else
2482 {
2483 fire_and_forget (message, ch, GNUNET_NO);
2484 }
2485 break;
2486
2487
2488 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK:
2489 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN:
2490 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK:
2491 chq = GNUNET_new (struct CadetChannelQueue);
2492 chq->type = type;
2493 chq->rel = fwd ? ch->root_rel : ch->dest_rel;
2494 if (NULL != chq->rel->uniq)
2495 {
2496 if (NULL != chq->rel->uniq->tq)
2497 {
2498 GCT_cancel (chq->rel->uniq->tq);
2499 /* ch_message_sent is called, freeing and NULLing uniq */
2500 GNUNET_break (NULL == chq->rel->uniq);
2501 }
2502 else
2503 {
2504 GNUNET_break (0);
2505 GNUNET_free (chq->rel->uniq);
2506 }
2507 }
2508
2509 chq->rel->uniq = chq;
2510 chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL, GNUNET_YES,
2511 &ch_message_sent, chq);
2512 if (NULL == chq->tq)
2513 {
2514 GNUNET_break (0);
2515 chq->rel->uniq = NULL;
2516 GCT_debug (ch->t, GNUNET_ERROR_TYPE_ERROR);
2517 GNUNET_free (chq);
2518 chq = NULL;
2519 return;
2520 }
2521 break;
2522
2523
2524 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
2525 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED:
2526 fire_and_forget (message, ch, GNUNET_YES);
2527 break;
2528
2529
2530 default:
2531 GNUNET_break (0);
2532 LOG (GNUNET_ERROR_TYPE_WARNING, "type %s unknown!\n", GC_m2s (type));
2533 fire_and_forget (message, ch, GNUNET_YES);
2534 } 2019 }
2020 if (NULL != ch->dest)
2021 {
2022 LOG2 (level,
2023 "CHN destination %s ready %s local-id: %u\n",
2024 GSC_2s (ch->dest->c),
2025 ch->dest->client_ready ? "YES" : "NO",
2026 ntohl (ch->dest->ccn.channel_of_client));
2027 }
2028 LOG2 (level,
2029 "CHN Message IDs recv: %d (%LLX), send: %d\n",
2030 ntohl (ch->mid_recv.mid),
2031 (unsigned long long) ch->mid_futures,
2032 ntohl (ch->mid_send.mid));
2535} 2033}
2536 2034
2537 2035
2538/**
2539 * Get the static string for identification of the channel.
2540 *
2541 * @param ch Channel.
2542 *
2543 * @return Static string with the channel IDs.
2544 */
2545const char *
2546GCCH_2s (const struct CadetChannel *ch)
2547{
2548 static char buf[64];
2549
2550 if (NULL == ch)
2551 return "(NULL Channel)";
2552 2036
2553 SPRINTF (buf, 2037/* end of gnunet-service-cadet-new_channel.c */
2554 "%s:%s gid:%X (%X / %X)",
2555 GCT_2s (ch->t),
2556 GNUNET_h2s (&ch->port),
2557 ntohl (ch->gid.cn),
2558 ntohl (ch->lid_root.channel_of_client),
2559 ntohl (ch->lid_dest.channel_of_client));
2560
2561 return buf;
2562}