diff options
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_channel.c')
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_channel.c | 1054 |
1 files changed, 795 insertions, 259 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index 5acd098b6..0c6241817 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c | |||
@@ -18,7 +18,6 @@ | |||
18 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, | 18 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, |
19 | Boston, MA 02110-1301, USA. | 19 | Boston, MA 02110-1301, USA. |
20 | */ | 20 | */ |
21 | |||
22 | /** | 21 | /** |
23 | * @file cadet/gnunet-service-cadet-new_channel.c | 22 | * @file cadet/gnunet-service-cadet-new_channel.c |
24 | * @brief logical links between CADET clients | 23 | * @brief logical links between CADET clients |
@@ -26,10 +25,16 @@ | |||
26 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
27 | * | 26 | * |
28 | * TODO: | 27 | * TODO: |
29 | * - estimate max bandwidth using bursts and use to optimize | 28 | * - FIXME: send ACKs back to loopback clients! |
30 | * transmission rate(s) | 29 | * |
30 | * - introduce shutdown so we can have half-closed channels, modify | ||
31 | * destroy to include MID to have FIN-ACK equivalents, etc. | ||
32 | * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! | ||
33 | * - check that '0xFFULL' really is sufficient for flow control! | ||
34 | * - revisit handling of 'unreliable' traffic! | ||
35 | * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'. | ||
36 | * - figure out flow control without ACKs (unreliable traffic!) | ||
31 | */ | 37 | */ |
32 | |||
33 | #include "platform.h" | 38 | #include "platform.h" |
34 | #include "gnunet_util_lib.h" | 39 | #include "gnunet_util_lib.h" |
35 | #include "cadet.h" | 40 | #include "cadet.h" |
@@ -41,7 +46,7 @@ | |||
41 | #include "gnunet-service-cadet-new_peer.h" | 46 | #include "gnunet-service-cadet-new_peer.h" |
42 | #include "gnunet-service-cadet-new_paths.h" | 47 | #include "gnunet-service-cadet-new_paths.h" |
43 | 48 | ||
44 | #define LOG(level, ...) GNUNET_log (level,__VA_ARGS__) | 49 | #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__) |
45 | 50 | ||
46 | /** | 51 | /** |
47 | * How long do we initially wait before retransmitting? | 52 | * How long do we initially wait before retransmitting? |
@@ -68,7 +73,7 @@ enum CadetChannelState | |||
68 | /** | 73 | /** |
69 | * Connection create message sent, waiting for ACK. | 74 | * Connection create message sent, waiting for ACK. |
70 | */ | 75 | */ |
71 | CADET_CHANNEL_CREATE_SENT, | 76 | CADET_CHANNEL_OPEN_SENT, |
72 | 77 | ||
73 | /** | 78 | /** |
74 | * Connection confirmed, ready to carry traffic. | 79 | * Connection confirmed, ready to carry traffic. |
@@ -121,9 +126,8 @@ struct CadetReliableMessage | |||
121 | /** | 126 | /** |
122 | * Data message we are trying to send. | 127 | * Data message we are trying to send. |
123 | */ | 128 | */ |
124 | struct GNUNET_CADET_ChannelAppDataMessage data_message; | 129 | struct GNUNET_CADET_ChannelAppDataMessage *data_message; |
125 | 130 | ||
126 | /* followed by variable-size payload */ | ||
127 | }; | 131 | }; |
128 | 132 | ||
129 | 133 | ||
@@ -143,7 +147,8 @@ struct CadetOutOfOrderMessage | |||
143 | struct CadetOutOfOrderMessage *prev; | 147 | struct CadetOutOfOrderMessage *prev; |
144 | 148 | ||
145 | /** | 149 | /** |
146 | * ID of the message (ACK needed to free) | 150 | * ID of the message (messages up to this point needed |
151 | * before we give this one to the client). | ||
147 | */ | 152 | */ |
148 | struct ChannelMessageIdentifier mid; | 153 | struct ChannelMessageIdentifier mid; |
149 | 154 | ||
@@ -156,6 +161,46 @@ struct CadetOutOfOrderMessage | |||
156 | 161 | ||
157 | 162 | ||
158 | /** | 163 | /** |
164 | * Client endpoint of a `struct CadetChannel`. A channel may be a | ||
165 | * loopback channel, in which case it has two of these endpoints. | ||
166 | * Note that flow control also is required in both directions. | ||
167 | */ | ||
168 | struct CadetChannelClient | ||
169 | { | ||
170 | /** | ||
171 | * Client handle. Not by itself sufficient to designate | ||
172 | * the client endpoint, as the same client handle may | ||
173 | * be used for both the owner and the destination, and | ||
174 | * we thus also need the channel ID to identify the client. | ||
175 | */ | ||
176 | struct CadetClient *c; | ||
177 | |||
178 | /** | ||
179 | * Head of DLL of messages received out of order or while client was unready. | ||
180 | */ | ||
181 | struct CadetOutOfOrderMessage *head_recv; | ||
182 | |||
183 | /** | ||
184 | * Tail DLL of messages received out of order or while client was unready. | ||
185 | */ | ||
186 | struct CadetOutOfOrderMessage *tail_recv; | ||
187 | |||
188 | /** | ||
189 | * Local tunnel number for this client. | ||
190 | * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI, | ||
191 | * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) | ||
192 | */ | ||
193 | struct GNUNET_CADET_ClientChannelNumber ccn; | ||
194 | |||
195 | /** | ||
196 | * Can we send data to the client? | ||
197 | */ | ||
198 | int client_ready; | ||
199 | |||
200 | }; | ||
201 | |||
202 | |||
203 | /** | ||
159 | * Struct containing all information regarding a channel to a remote client. | 204 | * Struct containing all information regarding a channel to a remote client. |
160 | */ | 205 | */ |
161 | struct CadetChannel | 206 | struct CadetChannel |
@@ -166,24 +211,24 @@ struct CadetChannel | |||
166 | struct CadetTunnel *t; | 211 | struct CadetTunnel *t; |
167 | 212 | ||
168 | /** | 213 | /** |
169 | * Last entry in the tunnel's queue relating to control messages | ||
170 | * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or | ||
171 | * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel | ||
172 | * transmission in case we receive updated information. | ||
173 | */ | ||
174 | struct CadetTunnelQueueEntry *last_control_qe; | ||
175 | |||
176 | /** | ||
177 | * Client owner of the tunnel, if any. | 214 | * Client owner of the tunnel, if any. |
178 | * (Used if this channel represends the initiating end of the tunnel.) | 215 | * (Used if this channel represends the initiating end of the tunnel.) |
179 | */ | 216 | */ |
180 | struct CadetClient *owner; | 217 | struct CadetChannelClient *owner; |
181 | 218 | ||
182 | /** | 219 | /** |
183 | * Client destination of the tunnel, if any. | 220 | * Client destination of the tunnel, if any. |
184 | * (Used if this channel represents the listening end of the tunnel.) | 221 | * (Used if this channel represents the listening end of the tunnel.) |
185 | */ | 222 | */ |
186 | struct CadetClient *dest; | 223 | struct CadetChannelClient *dest; |
224 | |||
225 | /** | ||
226 | * Last entry in the tunnel's queue relating to control messages | ||
227 | * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or | ||
228 | * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel | ||
229 | * transmission in case we receive updated information. | ||
230 | */ | ||
231 | struct CadetTunnelQueueEntry *last_control_qe; | ||
187 | 232 | ||
188 | /** | 233 | /** |
189 | * Head of DLL of messages sent and not yet ACK'd. | 234 | * Head of DLL of messages sent and not yet ACK'd. |
@@ -196,19 +241,14 @@ struct CadetChannel | |||
196 | struct CadetReliableMessage *tail_sent; | 241 | struct CadetReliableMessage *tail_sent; |
197 | 242 | ||
198 | /** | 243 | /** |
199 | * Head of DLL of messages received out of order or while client was unready. | 244 | * Task to resend/poll in case no ACK is received. |
200 | */ | 245 | */ |
201 | struct CadetOutOfOrderMessage *head_recv; | 246 | struct GNUNET_SCHEDULER_Task *retry_control_task; |
202 | |||
203 | /** | ||
204 | * Tail DLL of messages received out of order or while client was unready. | ||
205 | */ | ||
206 | struct CadetOutOfOrderMessage *tail_recv; | ||
207 | 247 | ||
208 | /** | 248 | /** |
209 | * Task to resend/poll in case no ACK is received. | 249 | * Task to resend/poll in case no ACK is received. |
210 | */ | 250 | */ |
211 | struct GNUNET_SCHEDULER_Task *retry_task; | 251 | struct GNUNET_SCHEDULER_Task *retry_data_task; |
212 | 252 | ||
213 | /** | 253 | /** |
214 | * Last time the channel was used | 254 | * Last time the channel was used |
@@ -259,13 +299,7 @@ struct CadetChannel | |||
259 | /** | 299 | /** |
260 | * Number identifying this channel in its tunnel. | 300 | * Number identifying this channel in its tunnel. |
261 | */ | 301 | */ |
262 | struct GNUNET_CADET_ChannelTunnelNumber gid; | 302 | struct GNUNET_CADET_ChannelTunnelNumber ctn; |
263 | |||
264 | /** | ||
265 | * Local tunnel number for local client owning the channel. | ||
266 | * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 ) | ||
267 | */ | ||
268 | struct GNUNET_CADET_ClientChannelNumber lid; | ||
269 | 303 | ||
270 | /** | 304 | /** |
271 | * Channel state. | 305 | * Channel state. |
@@ -273,16 +307,6 @@ struct CadetChannel | |||
273 | enum CadetChannelState state; | 307 | enum CadetChannelState state; |
274 | 308 | ||
275 | /** | 309 | /** |
276 | * Can we send data to the client? | ||
277 | */ | ||
278 | int client_ready; | ||
279 | |||
280 | /** | ||
281 | * Can the client send data to us? | ||
282 | */ | ||
283 | int client_allowed; | ||
284 | |||
285 | /** | ||
286 | * Is the tunnel bufferless (minimum latency)? | 310 | * Is the tunnel bufferless (minimum latency)? |
287 | */ | 311 | */ |
288 | int nobuffer; | 312 | int nobuffer; |
@@ -298,6 +322,11 @@ struct CadetChannel | |||
298 | int out_of_order; | 322 | int out_of_order; |
299 | 323 | ||
300 | /** | 324 | /** |
325 | * Is this channel a loopback channel, where the destination is us again? | ||
326 | */ | ||
327 | int is_loopback; | ||
328 | |||
329 | /** | ||
301 | * Flag to signal the destruction of the channel. If this is set to | 330 | * Flag to signal the destruction of the channel. If this is set to |
302 | * #GNUNET_YES the channel will be destroyed once the queue is | 331 | * #GNUNET_YES the channel will be destroyed once the queue is |
303 | * empty. | 332 | * empty. |
@@ -307,7 +336,6 @@ struct CadetChannel | |||
307 | }; | 336 | }; |
308 | 337 | ||
309 | 338 | ||
310 | |||
311 | /** | 339 | /** |
312 | * Get the static string for identification of the channel. | 340 | * Get the static string for identification of the channel. |
313 | * | 341 | * |
@@ -320,15 +348,16 @@ GCCH_2s (const struct CadetChannel *ch) | |||
320 | { | 348 | { |
321 | static char buf[128]; | 349 | static char buf[128]; |
322 | 350 | ||
323 | if (NULL == ch) | ||
324 | return "(NULL Channel)"; | ||
325 | GNUNET_snprintf (buf, | 351 | GNUNET_snprintf (buf, |
326 | sizeof (buf), | 352 | sizeof (buf), |
327 | "%s:%s gid:%X (%X)", | 353 | "Channel %s:%s ctn:%X(%X/%X)", |
328 | GCT_2s (ch->t), | 354 | (GNUNET_YES == ch->is_loopback) |
355 | ? "loopback" | ||
356 | : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))), | ||
329 | GNUNET_h2s (&ch->port), | 357 | GNUNET_h2s (&ch->port), |
330 | ch->gid, | 358 | ch->ctn, |
331 | ntohl (ch->lid.channel_of_client)); | 359 | (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client), |
360 | (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client)); | ||
332 | return buf; | 361 | return buf; |
333 | } | 362 | } |
334 | 363 | ||
@@ -343,7 +372,29 @@ GCCH_2s (const struct CadetChannel *ch) | |||
343 | struct GNUNET_CADET_ChannelTunnelNumber | 372 | struct GNUNET_CADET_ChannelTunnelNumber |
344 | GCCH_get_id (const struct CadetChannel *ch) | 373 | GCCH_get_id (const struct CadetChannel *ch) |
345 | { | 374 | { |
346 | return ch->gid; | 375 | return ch->ctn; |
376 | } | ||
377 | |||
378 | |||
379 | /** | ||
380 | * Release memory associated with @a ccc | ||
381 | * | ||
382 | * @param ccc data structure to clean up | ||
383 | */ | ||
384 | static void | ||
385 | free_channel_client (struct CadetChannelClient *ccc) | ||
386 | { | ||
387 | struct CadetOutOfOrderMessage *com; | ||
388 | |||
389 | while (NULL != (com = ccc->head_recv)) | ||
390 | { | ||
391 | GNUNET_CONTAINER_DLL_remove (ccc->head_recv, | ||
392 | ccc->tail_recv, | ||
393 | com); | ||
394 | GNUNET_MQ_discard (com->env); | ||
395 | GNUNET_free (com); | ||
396 | } | ||
397 | GNUNET_free (ccc); | ||
347 | } | 398 | } |
348 | 399 | ||
349 | 400 | ||
@@ -356,7 +407,6 @@ static void | |||
356 | channel_destroy (struct CadetChannel *ch) | 407 | channel_destroy (struct CadetChannel *ch) |
357 | { | 408 | { |
358 | struct CadetReliableMessage *crm; | 409 | struct CadetReliableMessage *crm; |
359 | struct CadetOutOfOrderMessage *com; | ||
360 | 410 | ||
361 | while (NULL != (crm = ch->head_sent)) | 411 | while (NULL != (crm = ch->head_sent)) |
362 | { | 412 | { |
@@ -369,29 +419,41 @@ channel_destroy (struct CadetChannel *ch) | |||
369 | GNUNET_CONTAINER_DLL_remove (ch->head_sent, | 419 | GNUNET_CONTAINER_DLL_remove (ch->head_sent, |
370 | ch->tail_sent, | 420 | ch->tail_sent, |
371 | crm); | 421 | crm); |
422 | GNUNET_free (crm->data_message); | ||
372 | GNUNET_free (crm); | 423 | GNUNET_free (crm); |
373 | } | 424 | } |
374 | while (NULL != (com = ch->head_recv)) | 425 | if (NULL != ch->owner) |
375 | { | 426 | { |
376 | GNUNET_CONTAINER_DLL_remove (ch->head_recv, | 427 | free_channel_client (ch->owner); |
377 | ch->tail_recv, | 428 | ch->owner = NULL; |
378 | com); | 429 | } |
379 | GNUNET_MQ_discard (com->env); | 430 | if (NULL != ch->dest) |
380 | GNUNET_free (com); | 431 | { |
432 | free_channel_client (ch->dest); | ||
433 | ch->dest = NULL; | ||
381 | } | 434 | } |
382 | if (NULL != ch->last_control_qe) | 435 | if (NULL != ch->last_control_qe) |
383 | { | 436 | { |
384 | GCT_send_cancel (ch->last_control_qe); | 437 | GCT_send_cancel (ch->last_control_qe); |
385 | ch->last_control_qe = NULL; | 438 | ch->last_control_qe = NULL; |
386 | } | 439 | } |
387 | if (NULL != ch->retry_task) | 440 | if (NULL != ch->retry_data_task) |
441 | { | ||
442 | GNUNET_SCHEDULER_cancel (ch->retry_data_task); | ||
443 | ch->retry_data_task = NULL; | ||
444 | } | ||
445 | if (NULL != ch->retry_control_task) | ||
388 | { | 446 | { |
389 | GNUNET_SCHEDULER_cancel (ch->retry_task); | 447 | GNUNET_SCHEDULER_cancel (ch->retry_control_task); |
390 | ch->retry_task = NULL; | 448 | ch->retry_control_task = NULL; |
449 | } | ||
450 | if (GNUNET_NO == ch->is_loopback) | ||
451 | { | ||
452 | GCT_remove_channel (ch->t, | ||
453 | ch, | ||
454 | ch->ctn); | ||
455 | ch->t = NULL; | ||
391 | } | 456 | } |
392 | GCT_remove_channel (ch->t, | ||
393 | ch, | ||
394 | ch->gid); | ||
395 | GNUNET_free (ch); | 457 | GNUNET_free (ch); |
396 | } | 458 | } |
397 | 459 | ||
@@ -402,7 +464,7 @@ channel_destroy (struct CadetChannel *ch) | |||
402 | * @param cls Channel for which to send. | 464 | * @param cls Channel for which to send. |
403 | */ | 465 | */ |
404 | static void | 466 | static void |
405 | send_create (void *cls); | 467 | send_channel_open (void *cls); |
406 | 468 | ||
407 | 469 | ||
408 | /** | 470 | /** |
@@ -412,30 +474,41 @@ send_create (void *cls); | |||
412 | * @param cls our `struct CadetChannel`. | 474 | * @param cls our `struct CadetChannel`. |
413 | */ | 475 | */ |
414 | static void | 476 | static void |
415 | create_sent_cb (void *cls) | 477 | channel_open_sent_cb (void *cls) |
416 | { | 478 | { |
417 | struct CadetChannel *ch = cls; | 479 | struct CadetChannel *ch = cls; |
418 | 480 | ||
481 | GNUNET_assert (NULL != ch->last_control_qe); | ||
419 | ch->last_control_qe = NULL; | 482 | ch->last_control_qe = NULL; |
420 | ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); | 483 | ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); |
421 | ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time, | 484 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
422 | &send_create, | 485 | "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n", |
423 | ch); | 486 | GCCH_2s (ch), |
487 | GNUNET_STRINGS_relative_time_to_string (ch->retry_time, | ||
488 | GNUNET_YES)); | ||
489 | ch->retry_control_task | ||
490 | = GNUNET_SCHEDULER_add_delayed (ch->retry_time, | ||
491 | &send_channel_open, | ||
492 | ch); | ||
424 | } | 493 | } |
425 | 494 | ||
426 | 495 | ||
427 | /** | 496 | /** |
428 | * Send a channel create message. | 497 | * Send a channel open message. |
429 | * | 498 | * |
430 | * @param cls Channel for which to send. | 499 | * @param cls Channel for which to send. |
431 | */ | 500 | */ |
432 | static void | 501 | static void |
433 | send_create (void *cls) | 502 | send_channel_open (void *cls) |
434 | { | 503 | { |
435 | struct CadetChannel *ch = cls; | 504 | struct CadetChannel *ch = cls; |
436 | struct GNUNET_CADET_ChannelOpenMessage msgcc; | 505 | struct GNUNET_CADET_ChannelOpenMessage msgcc; |
437 | uint32_t options; | 506 | uint32_t options; |
438 | 507 | ||
508 | ch->retry_control_task = NULL; | ||
509 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
510 | "Sending CHANNEL_OPEN message for %s\n", | ||
511 | GCCH_2s (ch)); | ||
439 | options = 0; | 512 | options = 0; |
440 | if (ch->nobuffer) | 513 | if (ch->nobuffer) |
441 | options |= GNUNET_CADET_OPTION_NOBUFFER; | 514 | options |= GNUNET_CADET_OPTION_NOBUFFER; |
@@ -447,20 +520,43 @@ send_create (void *cls) | |||
447 | msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN); | 520 | msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN); |
448 | msgcc.opt = htonl (options); | 521 | msgcc.opt = htonl (options); |
449 | msgcc.port = ch->port; | 522 | msgcc.port = ch->port; |
450 | msgcc.chid = ch->gid; | 523 | msgcc.ctn = ch->ctn; |
451 | ch->state = CADET_CHANNEL_CREATE_SENT; | 524 | ch->state = CADET_CHANNEL_OPEN_SENT; |
452 | ch->last_control_qe = GCT_send (ch->t, | 525 | ch->last_control_qe = GCT_send (ch->t, |
453 | &msgcc.header, | 526 | &msgcc.header, |
454 | &create_sent_cb, | 527 | &channel_open_sent_cb, |
455 | ch); | 528 | ch); |
456 | } | 529 | } |
457 | 530 | ||
458 | 531 | ||
459 | /** | 532 | /** |
533 | * Function called once and only once after a channel was bound | ||
534 | * to its tunnel via #GCT_add_channel() is ready for transmission. | ||
535 | * Note that this is only the case for channels that this peer | ||
536 | * initiates, as for incoming channels we assume that they are | ||
537 | * ready for transmission immediately upon receiving the open | ||
538 | * message. Used to bootstrap the #GCT_send() process. | ||
539 | * | ||
540 | * @param ch the channel for which the tunnel is now ready | ||
541 | */ | ||
542 | void | ||
543 | GCCH_tunnel_up (struct CadetChannel *ch) | ||
544 | { | ||
545 | GNUNET_assert (NULL == ch->retry_control_task); | ||
546 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
547 | "Tunnel up, sending CHANNEL_OPEN on %s now\n", | ||
548 | GCCH_2s (ch)); | ||
549 | ch->retry_control_task | ||
550 | = GNUNET_SCHEDULER_add_now (&send_channel_open, | ||
551 | ch); | ||
552 | } | ||
553 | |||
554 | |||
555 | /** | ||
460 | * Create a new channel. | 556 | * Create a new channel. |
461 | * | 557 | * |
462 | * @param owner local client owning the channel | 558 | * @param owner local client owning the channel |
463 | * @param owner_id local chid of this channel at the @a owner | 559 | * @param ccn local number of this channel at the @a owner |
464 | * @param destination peer to which we should build the channel | 560 | * @param destination peer to which we should build the channel |
465 | * @param port desired port at @a destination | 561 | * @param port desired port at @a destination |
466 | * @param options options for the channel | 562 | * @param options options for the channel |
@@ -468,33 +564,74 @@ send_create (void *cls) | |||
468 | */ | 564 | */ |
469 | struct CadetChannel * | 565 | struct CadetChannel * |
470 | GCCH_channel_local_new (struct CadetClient *owner, | 566 | GCCH_channel_local_new (struct CadetClient *owner, |
471 | struct GNUNET_CADET_ClientChannelNumber owner_id, | 567 | struct GNUNET_CADET_ClientChannelNumber ccn, |
472 | struct CadetPeer *destination, | 568 | struct CadetPeer *destination, |
473 | const struct GNUNET_HashCode *port, | 569 | const struct GNUNET_HashCode *port, |
474 | uint32_t options) | 570 | uint32_t options) |
475 | { | 571 | { |
476 | struct CadetChannel *ch; | 572 | struct CadetChannel *ch; |
573 | struct CadetChannelClient *ccco; | ||
574 | |||
575 | ccco = GNUNET_new (struct CadetChannelClient); | ||
576 | ccco->c = owner; | ||
577 | ccco->ccn = ccn; | ||
578 | ccco->client_ready = GNUNET_YES; | ||
477 | 579 | ||
478 | ch = GNUNET_new (struct CadetChannel); | 580 | ch = GNUNET_new (struct CadetChannel); |
479 | ch->max_pending_messages = 32; /* FIXME: allow control via options | 581 | ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */ |
480 | or adjust dynamically... */ | ||
481 | ch->owner = owner; | ||
482 | ch->lid = owner_id; | ||
483 | ch->port = *port; | ||
484 | ch->t = GCP_get_tunnel (destination, | ||
485 | GNUNET_YES); | ||
486 | ch->gid = GCT_add_channel (ch->t, | ||
487 | ch); | ||
488 | ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME; | ||
489 | ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); | 582 | ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); |
490 | ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); | 583 | ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); |
491 | ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); | 584 | ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); |
492 | ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create, | 585 | ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ |
493 | ch); | 586 | ch->owner = ccco; |
587 | ch->port = *port; | ||
588 | if (0 == memcmp (&my_full_id, | ||
589 | GCP_get_id (destination), | ||
590 | sizeof (struct GNUNET_PeerIdentity))) | ||
591 | { | ||
592 | struct CadetClient *c; | ||
593 | |||
594 | ch->is_loopback = GNUNET_YES; | ||
595 | c = GNUNET_CONTAINER_multihashmap_get (open_ports, | ||
596 | port); | ||
597 | if (NULL == c) | ||
598 | { | ||
599 | /* port closed, wait for it to possibly open */ | ||
600 | (void) GNUNET_CONTAINER_multihashmap_put (loose_channels, | ||
601 | port, | ||
602 | ch, | ||
603 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
604 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
605 | "Created loose incoming loopback channel to port %s\n", | ||
606 | GNUNET_h2s (&ch->port)); | ||
607 | } | ||
608 | else | ||
609 | { | ||
610 | ch->dest = GNUNET_new (struct CadetChannelClient); | ||
611 | ch->dest->c = c; | ||
612 | ch->dest->client_ready = GNUNET_YES; | ||
613 | GCCH_bind (ch, | ||
614 | ch->dest->c); | ||
615 | } | ||
616 | } | ||
617 | else | ||
618 | { | ||
619 | ch->t = GCP_get_tunnel (destination, | ||
620 | GNUNET_YES); | ||
621 | ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME; | ||
622 | ch->ctn = GCT_add_channel (ch->t, | ||
623 | ch); | ||
624 | } | ||
494 | GNUNET_STATISTICS_update (stats, | 625 | GNUNET_STATISTICS_update (stats, |
495 | "# channels", | 626 | "# channels", |
496 | 1, | 627 | 1, |
497 | GNUNET_NO); | 628 | GNUNET_NO); |
629 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
630 | "Created channel to port %s at peer %s for %s using %s\n", | ||
631 | GNUNET_h2s (port), | ||
632 | GCP_2s (destination), | ||
633 | GSC_2s (owner), | ||
634 | (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t)); | ||
498 | return ch; | 635 | return ch; |
499 | } | 636 | } |
500 | 637 | ||
@@ -510,23 +647,27 @@ timeout_closed_cb (void *cls) | |||
510 | { | 647 | { |
511 | struct CadetChannel *ch = cls; | 648 | struct CadetChannel *ch = cls; |
512 | 649 | ||
513 | ch->retry_task = NULL; | 650 | ch->retry_control_task = NULL; |
651 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
652 | "Closing incoming channel to port %s from peer %s due to timeout\n", | ||
653 | GNUNET_h2s (&ch->port), | ||
654 | GCP_2s (GCT_get_destination (ch->t))); | ||
514 | channel_destroy (ch); | 655 | channel_destroy (ch); |
515 | } | 656 | } |
516 | 657 | ||
517 | 658 | ||
518 | /** | 659 | /** |
519 | * Create a new channel. | 660 | * Create a new channel based on a request coming in over the network. |
520 | * | 661 | * |
521 | * @param t tunnel to the remote peer | 662 | * @param t tunnel to the remote peer |
522 | * @param gid identifier of this channel in the tunnel | 663 | * @param ctn identifier of this channel in the tunnel |
523 | * @param port desired local port | 664 | * @param port desired local port |
524 | * @param options options for the channel | 665 | * @param options options for the channel |
525 | * @return handle to the new channel | 666 | * @return handle to the new channel |
526 | */ | 667 | */ |
527 | struct CadetChannel * | 668 | struct CadetChannel * |
528 | GCCH_channel_incoming_new (struct CadetTunnel *t, | 669 | GCCH_channel_incoming_new (struct CadetTunnel *t, |
529 | struct GNUNET_CADET_ChannelTunnelNumber gid, | 670 | struct GNUNET_CADET_ChannelTunnelNumber ctn, |
530 | const struct GNUNET_HashCode *port, | 671 | const struct GNUNET_HashCode *port, |
531 | uint32_t options) | 672 | uint32_t options) |
532 | { | 673 | { |
@@ -534,15 +675,14 @@ GCCH_channel_incoming_new (struct CadetTunnel *t, | |||
534 | struct CadetClient *c; | 675 | struct CadetClient *c; |
535 | 676 | ||
536 | ch = GNUNET_new (struct CadetChannel); | 677 | ch = GNUNET_new (struct CadetChannel); |
537 | ch->max_pending_messages = 32; /* FIXME: allow control via options | ||
538 | or adjust dynamically... */ | ||
539 | ch->port = *port; | 678 | ch->port = *port; |
540 | ch->t = t; | 679 | ch->t = t; |
541 | ch->gid = gid; | 680 | ch->ctn = ctn; |
542 | ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME; | 681 | ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME; |
543 | ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); | 682 | ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); |
544 | ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); | 683 | ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); |
545 | ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); | 684 | ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); |
685 | ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ | ||
546 | GNUNET_STATISTICS_update (stats, | 686 | GNUNET_STATISTICS_update (stats, |
547 | "# channels", | 687 | "# channels", |
548 | 1, | 688 | 1, |
@@ -557,9 +697,14 @@ GCCH_channel_incoming_new (struct CadetTunnel *t, | |||
557 | port, | 697 | port, |
558 | ch, | 698 | ch, |
559 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 699 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
560 | ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT, | 700 | ch->retry_control_task |
561 | &timeout_closed_cb, | 701 | = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT, |
562 | ch); | 702 | &timeout_closed_cb, |
703 | ch); | ||
704 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
705 | "Created loose incoming channel to port %s from peer %s\n", | ||
706 | GNUNET_h2s (&ch->port), | ||
707 | GCP_2s (GCT_get_destination (ch->t))); | ||
563 | } | 708 | } |
564 | else | 709 | else |
565 | { | 710 | { |
@@ -586,23 +731,24 @@ send_ack_cb (void *cls) | |||
586 | { | 731 | { |
587 | struct CadetChannel *ch = cls; | 732 | struct CadetChannel *ch = cls; |
588 | 733 | ||
734 | GNUNET_assert (NULL != ch->last_control_qe); | ||
589 | ch->last_control_qe = NULL; | 735 | ch->last_control_qe = NULL; |
590 | } | 736 | } |
591 | 737 | ||
592 | 738 | ||
593 | /** | 739 | /** |
594 | * Compute and send the current ACK to the other peer. | 740 | * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer. |
595 | * | 741 | * |
596 | * @param ch channel to send the ACK for | 742 | * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for |
597 | */ | 743 | */ |
598 | static void | 744 | static void |
599 | send_channel_ack (struct CadetChannel *ch) | 745 | send_channel_data_ack (struct CadetChannel *ch) |
600 | { | 746 | { |
601 | struct GNUNET_CADET_ChannelDataAckMessage msg; | 747 | struct GNUNET_CADET_ChannelDataAckMessage msg; |
602 | 748 | ||
603 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK); | 749 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK); |
604 | msg.header.size = htons (sizeof (msg)); | 750 | msg.header.size = htons (sizeof (msg)); |
605 | msg.gid = ch->gid; | 751 | msg.ctn = ch->ctn; |
606 | msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1); | 752 | msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1); |
607 | msg.futures = GNUNET_htonll (ch->mid_futures); | 753 | msg.futures = GNUNET_htonll (ch->mid_futures); |
608 | if (NULL != ch->last_control_qe) | 754 | if (NULL != ch->last_control_qe) |
@@ -615,18 +761,93 @@ send_channel_ack (struct CadetChannel *ch) | |||
615 | 761 | ||
616 | 762 | ||
617 | /** | 763 | /** |
618 | * Send our initial ACK to the client confirming that the | 764 | * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the |
619 | * connection is up. | 765 | * connection is up. |
620 | * | 766 | * |
621 | * @param cls the `struct CadetChannel` | 767 | * @param cls the `struct CadetChannel` |
622 | */ | 768 | */ |
623 | static void | 769 | static void |
624 | send_connect_ack (void *cls) | 770 | send_open_ack (void *cls) |
625 | { | 771 | { |
626 | struct CadetChannel *ch = cls; | 772 | struct CadetChannel *ch = cls; |
773 | struct GNUNET_CADET_ChannelManageMessage msg; | ||
627 | 774 | ||
628 | ch->retry_task = NULL; | 775 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
629 | send_channel_ack (ch); | 776 | "Sending CHANNEL_OPEN_ACK on %s\n", |
777 | GCCH_2s (ch)); | ||
778 | ch->retry_control_task = NULL; | ||
779 | msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK); | ||
780 | msg.header.size = htons (sizeof (msg)); | ||
781 | msg.reserved = htonl (0); | ||
782 | msg.ctn = ch->ctn; | ||
783 | if (NULL != ch->last_control_qe) | ||
784 | GCT_send_cancel (ch->last_control_qe); | ||
785 | ch->last_control_qe = GCT_send (ch->t, | ||
786 | &msg.header, | ||
787 | &send_ack_cb, | ||
788 | ch); | ||
789 | } | ||
790 | |||
791 | |||
792 | /** | ||
793 | * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for | ||
794 | * this channel. If the binding was successful, (re)transmit the | ||
795 | * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK. | ||
796 | * | ||
797 | * @param ch channel that got the duplicate open | ||
798 | */ | ||
799 | void | ||
800 | GCCH_handle_duplicate_open (struct CadetChannel *ch) | ||
801 | { | ||
802 | if (NULL == ch->dest) | ||
803 | { | ||
804 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
805 | "Ignoring duplicate channel OPEN on %s: port is closed\n", | ||
806 | GCCH_2s (ch)); | ||
807 | return; | ||
808 | } | ||
809 | if (NULL != ch->retry_control_task) | ||
810 | { | ||
811 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
812 | "Ignoring duplicate channel OPEN on %s: control message is pending\n", | ||
813 | GCCH_2s (ch)); | ||
814 | return; | ||
815 | } | ||
816 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
817 | "Retransmitting OPEN_ACK on %s\n", | ||
818 | GCCH_2s (ch)); | ||
819 | ch->retry_control_task | ||
820 | = GNUNET_SCHEDULER_add_now (&send_open_ack, | ||
821 | ch); | ||
822 | } | ||
823 | |||
824 | |||
825 | /** | ||
826 | * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages. | ||
827 | * | ||
828 | * @param ch channel the ack is for | ||
829 | * @param to_owner #GNUNET_YES to send to owner, | ||
830 | * #GNUNET_NO to send to dest | ||
831 | */ | ||
832 | static void | ||
833 | send_ack_to_client (struct CadetChannel *ch, | ||
834 | int to_owner) | ||
835 | { | ||
836 | struct GNUNET_MQ_Envelope *env; | ||
837 | struct GNUNET_CADET_LocalAck *ack; | ||
838 | struct CadetChannelClient *ccc; | ||
839 | |||
840 | env = GNUNET_MQ_msg (ack, | ||
841 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); | ||
842 | ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest; | ||
843 | ack->ccn = ccc->ccn; | ||
844 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
845 | "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n", | ||
846 | GSC_2s (ccc->c), | ||
847 | (GNUNET_YES == to_owner) ? "owner" : "dest", | ||
848 | ntohl (ack->ccn.channel_of_client)); | ||
849 | GSC_send_to_client (ccc->c, | ||
850 | env); | ||
630 | } | 851 | } |
631 | 852 | ||
632 | 853 | ||
@@ -643,12 +864,19 @@ GCCH_bind (struct CadetChannel *ch, | |||
643 | struct CadetClient *c) | 864 | struct CadetClient *c) |
644 | { | 865 | { |
645 | uint32_t options; | 866 | uint32_t options; |
867 | struct CadetChannelClient *cccd; | ||
646 | 868 | ||
647 | if (NULL != ch->retry_task) | 869 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
870 | "Binding %s from %s to port %s of %s\n", | ||
871 | GCCH_2s (ch), | ||
872 | GCT_2s (ch->t), | ||
873 | GNUNET_h2s (&ch->port), | ||
874 | GSC_2s (c)); | ||
875 | if (NULL != ch->retry_control_task) | ||
648 | { | 876 | { |
649 | /* there might be a timeout task here */ | 877 | /* there might be a timeout task here */ |
650 | GNUNET_SCHEDULER_cancel (ch->retry_task); | 878 | GNUNET_SCHEDULER_cancel (ch->retry_control_task); |
651 | ch->retry_task = NULL; | 879 | ch->retry_control_task = NULL; |
652 | } | 880 | } |
653 | options = 0; | 881 | options = 0; |
654 | if (ch->nobuffer) | 882 | if (ch->nobuffer) |
@@ -657,29 +885,79 @@ GCCH_bind (struct CadetChannel *ch, | |||
657 | options |= GNUNET_CADET_OPTION_RELIABLE; | 885 | options |= GNUNET_CADET_OPTION_RELIABLE; |
658 | if (ch->out_of_order) | 886 | if (ch->out_of_order) |
659 | options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; | 887 | options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; |
660 | ch->dest = c; | 888 | cccd = GNUNET_new (struct CadetChannelClient); |
661 | ch->lid = GSC_bind (c, | 889 | ch->dest = cccd; |
662 | ch, | 890 | cccd->c = c; |
663 | GCT_get_destination (ch->t), | 891 | cccd->client_ready = GNUNET_YES; |
664 | &ch->port, | 892 | cccd->ccn = GSC_bind (c, |
665 | options); | 893 | ch, |
666 | ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */ | 894 | (GNUNET_YES == ch->is_loopback) |
667 | 895 | ? GCP_get (&my_full_id, | |
668 | /* notify other peer that we accepted the connection */ | 896 | GNUNET_YES) |
669 | ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack, | 897 | : GCT_get_destination (ch->t), |
670 | ch); | 898 | &ch->port, |
899 | options); | ||
900 | GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < | ||
901 | GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); | ||
902 | ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */ | ||
903 | if (GNUNET_YES == ch->is_loopback) | ||
904 | { | ||
905 | ch->state = CADET_CHANNEL_OPEN_SENT; | ||
906 | GCCH_handle_channel_open_ack (ch); | ||
907 | } | ||
908 | else | ||
909 | { | ||
910 | /* notify other peer that we accepted the connection */ | ||
911 | ch->retry_control_task | ||
912 | = GNUNET_SCHEDULER_add_now (&send_open_ack, | ||
913 | ch); | ||
914 | } | ||
915 | /* give client it's initial supply of ACKs */ | ||
916 | GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < | ||
917 | GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); | ||
918 | for (unsigned int i=0;i<ch->max_pending_messages;i++) | ||
919 | send_ack_to_client (ch, | ||
920 | GNUNET_NO); | ||
671 | } | 921 | } |
672 | 922 | ||
673 | 923 | ||
674 | /** | 924 | /** |
675 | * Destroy locally created channel. Called by the | 925 | * Destroy locally created channel. Called by the local client, so no |
676 | * local client, so no need to tell the client. | 926 | * need to tell the client. |
677 | * | 927 | * |
678 | * @param ch channel to destroy | 928 | * @param ch channel to destroy |
929 | * @param c client that caused the destruction | ||
930 | * @param ccn client number of the client @a c | ||
679 | */ | 931 | */ |
680 | void | 932 | void |
681 | GCCH_channel_local_destroy (struct CadetChannel *ch) | 933 | GCCH_channel_local_destroy (struct CadetChannel *ch, |
934 | struct CadetClient *c, | ||
935 | struct GNUNET_CADET_ClientChannelNumber ccn) | ||
682 | { | 936 | { |
937 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
938 | "%s asks for destruction of %s\n", | ||
939 | GSC_2s (c), | ||
940 | GCCH_2s (ch)); | ||
941 | GNUNET_assert (NULL != c); | ||
942 | if ( (NULL != ch->owner) && | ||
943 | (c == ch->owner->c) && | ||
944 | (ccn.channel_of_client == ch->owner->ccn.channel_of_client) ) | ||
945 | { | ||
946 | free_channel_client (ch->owner); | ||
947 | ch->owner = NULL; | ||
948 | } | ||
949 | else if ( (NULL != ch->dest) && | ||
950 | (c == ch->dest->c) && | ||
951 | (ccn.channel_of_client == ch->dest->ccn.channel_of_client) ) | ||
952 | { | ||
953 | free_channel_client (ch->dest); | ||
954 | ch->dest = NULL; | ||
955 | } | ||
956 | else | ||
957 | { | ||
958 | GNUNET_assert (0); | ||
959 | } | ||
960 | |||
683 | if (GNUNET_YES == ch->destroy) | 961 | if (GNUNET_YES == ch->destroy) |
684 | { | 962 | { |
685 | /* other end already destroyed, with the local client gone, no need | 963 | /* other end already destroyed, with the local client gone, no need |
@@ -687,41 +965,279 @@ GCCH_channel_local_destroy (struct CadetChannel *ch) | |||
687 | channel_destroy (ch); | 965 | channel_destroy (ch); |
688 | return; | 966 | return; |
689 | } | 967 | } |
690 | if (NULL != ch->head_sent) | 968 | if ( (NULL != ch->head_sent) || |
969 | (NULL != ch->owner) || | ||
970 | (NULL != ch->dest) ) | ||
691 | { | 971 | { |
692 | /* allow send queue to train first */ | 972 | /* Wait for other end to destroy us as well, |
973 | and otherwise allow send queue to be transmitted first */ | ||
693 | ch->destroy = GNUNET_YES; | 974 | ch->destroy = GNUNET_YES; |
694 | return; | 975 | return; |
695 | } | 976 | } |
977 | /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */ | ||
978 | if (CADET_CHANNEL_NEW != ch->state) | ||
979 | GCT_send_channel_destroy (ch->t, | ||
980 | ch->ctn); | ||
696 | /* Nothing left to do, just finish destruction */ | 981 | /* Nothing left to do, just finish destruction */ |
697 | channel_destroy (ch); | 982 | channel_destroy (ch); |
698 | } | 983 | } |
699 | 984 | ||
700 | 985 | ||
701 | /** | 986 | /** |
702 | * Destroy channel that was incoming. Called by the | 987 | * We got an acknowledgement for the creation of the channel |
703 | * local client, so no need to tell the client. | 988 | * (the port is open on the other side). Begin transmissions. |
704 | * | 989 | * |
705 | * @param ch channel to destroy | 990 | * @param ch channel to destroy |
706 | */ | 991 | */ |
707 | void | 992 | void |
708 | GCCH_channel_incoming_destroy (struct CadetChannel *ch) | 993 | GCCH_handle_channel_open_ack (struct CadetChannel *ch) |
709 | { | 994 | { |
710 | if (GNUNET_YES == ch->destroy) | 995 | switch (ch->state) |
711 | { | 996 | { |
712 | /* other end already destroyed, with the remote client gone, no need | 997 | case CADET_CHANNEL_NEW: |
713 | to finish transmissions, just destroy immediately. */ | 998 | /* this should be impossible */ |
714 | channel_destroy (ch); | 999 | GNUNET_break (0); |
1000 | break; | ||
1001 | case CADET_CHANNEL_OPEN_SENT: | ||
1002 | if (NULL == ch->owner) | ||
1003 | { | ||
1004 | /* We're not the owner, wrong direction! */ | ||
1005 | GNUNET_break_op (0); | ||
1006 | return; | ||
1007 | } | ||
1008 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1009 | "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n", | ||
1010 | GCCH_2s (ch)); | ||
1011 | if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */ | ||
1012 | { | ||
1013 | GNUNET_SCHEDULER_cancel (ch->retry_control_task); | ||
1014 | ch->retry_control_task = NULL; | ||
1015 | } | ||
1016 | ch->state = CADET_CHANNEL_READY; | ||
1017 | /* On first connect, send client as many ACKs as we allow messages | ||
1018 | to be buffered! */ | ||
1019 | for (unsigned int i=0;i<ch->max_pending_messages;i++) | ||
1020 | send_ack_to_client (ch, | ||
1021 | GNUNET_YES); | ||
1022 | break; | ||
1023 | case CADET_CHANNEL_READY: | ||
1024 | /* duplicate ACK, maybe we retried the CREATE. Ignore. */ | ||
1025 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1026 | "Received duplicate channel OPEN_ACK for %s\n", | ||
1027 | GCCH_2s (ch)); | ||
1028 | GNUNET_STATISTICS_update (stats, | ||
1029 | "# duplicate CREATE_ACKs", | ||
1030 | 1, | ||
1031 | GNUNET_NO); | ||
1032 | break; | ||
1033 | } | ||
1034 | } | ||
1035 | |||
1036 | |||
1037 | /** | ||
1038 | * Test if element @a e1 comes before element @a e2. | ||
1039 | * | ||
1040 | * TODO: use opportunity to create generic list insertion sort | ||
1041 | * logic in container! | ||
1042 | * | ||
1043 | * @param cls closure, our `struct CadetChannel` | ||
1044 | * @param e1 an element of to sort | ||
1045 | * @param e2 another element to sort | ||
1046 | * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO | ||
1047 | */ | ||
1048 | static int | ||
1049 | is_before (void *cls, | ||
1050 | void *e1, | ||
1051 | void *e2) | ||
1052 | { | ||
1053 | struct CadetOutOfOrderMessage *m1 = e1; | ||
1054 | struct CadetOutOfOrderMessage *m2 = e2; | ||
1055 | uint32_t v1 = ntohl (m1->mid.mid); | ||
1056 | uint32_t v2 = ntohl (m2->mid.mid); | ||
1057 | uint32_t delta; | ||
1058 | |||
1059 | delta = v1 - v2; | ||
1060 | if (delta > (uint32_t) INT_MAX) | ||
1061 | { | ||
1062 | /* in overflow range, we can safely assume we wrapped around */ | ||
1063 | return GNUNET_NO; | ||
1064 | } | ||
1065 | else | ||
1066 | { | ||
1067 | return GNUNET_YES; | ||
1068 | } | ||
1069 | } | ||
1070 | |||
1071 | |||
1072 | /** | ||
1073 | * We got payload data for a channel. Pass it on to the client | ||
1074 | * and send an ACK to the other end (once flow control allows it!) | ||
1075 | * | ||
1076 | * @param ch channel that got data | ||
1077 | * @param msg message that was received | ||
1078 | */ | ||
1079 | void | ||
1080 | GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, | ||
1081 | const struct GNUNET_CADET_ChannelAppDataMessage *msg) | ||
1082 | { | ||
1083 | struct GNUNET_MQ_Envelope *env; | ||
1084 | struct GNUNET_CADET_LocalData *ld; | ||
1085 | struct CadetChannelClient *ccc; | ||
1086 | struct CadetOutOfOrderMessage *com; | ||
1087 | size_t payload_size; | ||
1088 | |||
1089 | GNUNET_assert (GNUNET_NO == ch->is_loopback); | ||
1090 | if ( (GNUNET_YES == ch->destroy) && | ||
1091 | (NULL == ch->owner) && | ||
1092 | (NULL == ch->dest) ) | ||
1093 | { | ||
1094 | /* This client is gone, but we still have messages to send to | ||
1095 | the other end (which is why @a ch is not yet dead). However, | ||
1096 | we cannot pass messages to our client anymore. */ | ||
1097 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1098 | "Dropping incoming payload on %s as this end is already closed\n", | ||
1099 | GCCH_2s (ch)); | ||
1100 | /* FIXME: send back ACK/NACK/Closed notification | ||
1101 | to stop retransmissions! */ | ||
715 | return; | 1102 | return; |
716 | } | 1103 | } |
717 | if (NULL != ch->head_recv) | 1104 | payload_size = ntohs (msg->header.size) - sizeof (*msg); |
1105 | env = GNUNET_MQ_msg_extra (ld, | ||
1106 | payload_size, | ||
1107 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); | ||
1108 | ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn; | ||
1109 | GNUNET_memcpy (&ld[1], | ||
1110 | &msg[1], | ||
1111 | payload_size); | ||
1112 | ccc = (NULL != ch->owner) ? ch->owner : ch->dest; | ||
1113 | if ( (GNUNET_YES == ccc->client_ready) && | ||
1114 | ( (GNUNET_YES == ch->out_of_order) || | ||
1115 | (msg->mid.mid == ch->mid_recv.mid) ) ) | ||
718 | { | 1116 | { |
719 | /* allow local client to see all data first */ | 1117 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
720 | ch->destroy = GNUNET_YES; | 1118 | "Giving %u bytes of payload from %s to client %s\n", |
1119 | (unsigned int) payload_size, | ||
1120 | GCCH_2s (ch), | ||
1121 | GSC_2s (ccc->c)); | ||
1122 | ccc->client_ready = GNUNET_NO; | ||
1123 | GSC_send_to_client (ccc->c, | ||
1124 | env); | ||
1125 | ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); | ||
1126 | ch->mid_futures >>= 1; | ||
1127 | } | ||
1128 | else | ||
1129 | { | ||
1130 | /* FIXME-SECURITY: if the element is WAY too far ahead, | ||
1131 | drop it (can't buffer too much!) */ | ||
1132 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1133 | "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n", | ||
1134 | (GNUNET_YES == ccc->client_ready) | ||
1135 | ? "out-of-order" | ||
1136 | : "client-not-ready", | ||
1137 | (unsigned int) payload_size, | ||
1138 | GCCH_2s (ch), | ||
1139 | ntohl (msg->mid.mid), | ||
1140 | ntohl (ch->mid_recv.mid)); | ||
1141 | |||
1142 | com = GNUNET_new (struct CadetOutOfOrderMessage); | ||
1143 | com->mid = msg->mid; | ||
1144 | com->env = env; | ||
1145 | /* sort into list ordered by "is_before" */ | ||
1146 | if ( (NULL == ccc->head_recv) || | ||
1147 | (GNUNET_YES == is_before (ch, | ||
1148 | com, | ||
1149 | ccc->head_recv)) ) | ||
1150 | { | ||
1151 | GNUNET_CONTAINER_DLL_insert (ccc->head_recv, | ||
1152 | ccc->tail_recv, | ||
1153 | com); | ||
1154 | } | ||
1155 | else | ||
1156 | { | ||
1157 | struct CadetOutOfOrderMessage *pos; | ||
1158 | |||
1159 | for (pos = ccc->head_recv; | ||
1160 | NULL != pos; | ||
1161 | pos = pos->next) | ||
1162 | { | ||
1163 | if (GNUNET_YES != | ||
1164 | is_before (NULL, | ||
1165 | pos, | ||
1166 | com)) | ||
1167 | break; | ||
1168 | } | ||
1169 | if (NULL == pos) | ||
1170 | GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv, | ||
1171 | ccc->tail_recv, | ||
1172 | com); | ||
1173 | else | ||
1174 | GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv, | ||
1175 | ccc->tail_recv, | ||
1176 | com, | ||
1177 | pos->prev); | ||
1178 | } | ||
1179 | } | ||
1180 | } | ||
1181 | |||
1182 | |||
1183 | /** | ||
1184 | * We got an acknowledgement for payload data for a channel. | ||
1185 | * Possibly resume transmissions. | ||
1186 | * | ||
1187 | * @param ch channel that got the ack | ||
1188 | * @param ack details about what was received | ||
1189 | */ | ||
1190 | void | ||
1191 | GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, | ||
1192 | const struct GNUNET_CADET_ChannelDataAckMessage *ack) | ||
1193 | { | ||
1194 | struct CadetReliableMessage *crm; | ||
1195 | |||
1196 | GNUNET_break (GNUNET_NO == ch->is_loopback); | ||
1197 | if (GNUNET_NO == ch->reliable) | ||
1198 | { | ||
1199 | /* not expecting ACKs on unreliable channel, odd */ | ||
1200 | GNUNET_break_op (0); | ||
721 | return; | 1201 | return; |
722 | } | 1202 | } |
723 | /* Nothing left to do, just finish destruction */ | 1203 | for (crm = ch->head_sent; |
724 | channel_destroy (ch); | 1204 | NULL != crm; |
1205 | crm = crm->next) | ||
1206 | if (ack->mid.mid == crm->data_message->mid.mid) | ||
1207 | break; | ||
1208 | if (NULL == crm) | ||
1209 | { | ||
1210 | /* ACK for message we already dropped, might have been a | ||
1211 | duplicate ACK? Ignore. */ | ||
1212 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1213 | "Duplicate DATA_ACK on %s, ignoring\n", | ||
1214 | GCCH_2s (ch)); | ||
1215 | GNUNET_STATISTICS_update (stats, | ||
1216 | "# duplicate DATA_ACKs", | ||
1217 | 1, | ||
1218 | GNUNET_NO); | ||
1219 | return; | ||
1220 | } | ||
1221 | GNUNET_CONTAINER_DLL_remove (ch->head_sent, | ||
1222 | ch->tail_sent, | ||
1223 | crm); | ||
1224 | GNUNET_free (crm->data_message); | ||
1225 | GNUNET_free (crm); | ||
1226 | ch->pending_messages--; | ||
1227 | send_ack_to_client (ch, | ||
1228 | (NULL == ch->owner) | ||
1229 | ? GNUNET_NO | ||
1230 | : GNUNET_YES); | ||
1231 | GNUNET_assert (ch->pending_messages < ch->max_pending_messages); | ||
1232 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1233 | "Received DATA_ACK on %s for message %u (%u ACKs pending)\n", | ||
1234 | GCCH_2s (ch), | ||
1235 | (unsigned int) ntohl (ack->mid.mid), | ||
1236 | ch->pending_messages); | ||
1237 | send_ack_to_client (ch, | ||
1238 | (NULL == ch->owner) | ||
1239 | ? GNUNET_NO | ||
1240 | : GNUNET_YES); | ||
725 | } | 1241 | } |
726 | 1242 | ||
727 | 1243 | ||
@@ -730,19 +1246,36 @@ GCCH_channel_incoming_destroy (struct CadetChannel *ch) | |||
730 | * connection. Also needs to remove this channel from | 1246 | * connection. Also needs to remove this channel from |
731 | * the tunnel. | 1247 | * the tunnel. |
732 | * | 1248 | * |
733 | * FIXME: need to make it possible to defer destruction until we have | ||
734 | * received all messages up to the destroy, and right now the destroy | ||
735 | * message (and this API) fails to give is the information we need! | ||
736 | * | ||
737 | * FIXME: also need to know if the other peer got a destroy from | ||
738 | * us before! | ||
739 | * | ||
740 | * @param ch channel to destroy | 1249 | * @param ch channel to destroy |
741 | */ | 1250 | */ |
742 | void | 1251 | void |
743 | GCCH_channel_remote_destroy (struct CadetChannel *ch) | 1252 | GCCH_handle_remote_destroy (struct CadetChannel *ch) |
744 | { | 1253 | { |
745 | GNUNET_break (0); // FIXME! | 1254 | struct CadetChannelClient *ccc; |
1255 | |||
1256 | GNUNET_assert (GNUNET_NO == ch->is_loopback); | ||
1257 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1258 | "Received remote channel DESTROY for %s\n", | ||
1259 | GCCH_2s (ch)); | ||
1260 | if (GNUNET_YES == ch->destroy) | ||
1261 | { | ||
1262 | /* Local client already gone, this is instant-death. */ | ||
1263 | channel_destroy (ch); | ||
1264 | return; | ||
1265 | } | ||
1266 | ccc = (NULL != ch->owner) ? ch->owner : ch->dest; | ||
1267 | if (NULL != ccc->head_recv) | ||
1268 | { | ||
1269 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1270 | "Lost end of transmission due to remote shutdown on %s\n", | ||
1271 | GCCH_2s (ch)); | ||
1272 | /* FIXME: change API to notify client about truncated transmission! */ | ||
1273 | } | ||
1274 | ch->destroy = GNUNET_YES; | ||
1275 | GSC_handle_remote_channel_destroy (ccc->c, | ||
1276 | ccc->ccn, | ||
1277 | ch); | ||
1278 | channel_destroy (ch); | ||
746 | } | 1279 | } |
747 | 1280 | ||
748 | 1281 | ||
@@ -770,67 +1303,16 @@ retry_transmission (void *cls) | |||
770 | struct CadetChannel *ch = cls; | 1303 | struct CadetChannel *ch = cls; |
771 | struct CadetReliableMessage *crm = ch->head_sent; | 1304 | struct CadetReliableMessage *crm = ch->head_sent; |
772 | 1305 | ||
1306 | ch->retry_data_task = NULL; | ||
773 | GNUNET_assert (NULL == crm->qe); | 1307 | GNUNET_assert (NULL == crm->qe); |
774 | crm->qe = GCT_send (ch->t, | 1308 | crm->qe = GCT_send (ch->t, |
775 | &crm->data_message.header, | 1309 | &crm->data_message->header, |
776 | &data_sent_cb, | 1310 | &data_sent_cb, |
777 | crm); | 1311 | crm); |
778 | } | 1312 | } |
779 | 1313 | ||
780 | 1314 | ||
781 | /** | 1315 | /** |
782 | * Check if we can now allow the client to transmit, and if so, | ||
783 | * let the client know about it. | ||
784 | * | ||
785 | * @param ch channel to check | ||
786 | */ | ||
787 | static void | ||
788 | GCCH_check_allow_client (struct CadetChannel *ch) | ||
789 | { | ||
790 | struct GNUNET_MQ_Envelope *env; | ||
791 | struct GNUNET_CADET_LocalAck *msg; | ||
792 | |||
793 | if (GNUNET_YES == ch->client_allowed) | ||
794 | return; /* client already allowed! */ | ||
795 | if (CADET_CHANNEL_READY != ch->state) | ||
796 | { | ||
797 | /* destination did not yet ACK our CREATE! */ | ||
798 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
799 | "Channel %s not yet ready, throttling client until ACK.\n", | ||
800 | GCCH_2s (ch)); | ||
801 | return; | ||
802 | } | ||
803 | if (ch->pending_messages > ch->max_pending_messages) | ||
804 | { | ||
805 | /* Too many messages in queue. */ | ||
806 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
807 | "Message queue still too long on channel %s, throttling client until ACK.\n", | ||
808 | GCCH_2s (ch)); | ||
809 | return; | ||
810 | } | ||
811 | if ( (NULL != ch->head_sent) && | ||
812 | (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) ) | ||
813 | { | ||
814 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
815 | "Gap in ACKs too big on channel %s, throttling client until ACK.\n", | ||
816 | GCCH_2s (ch)); | ||
817 | return; | ||
818 | } | ||
819 | ch->client_allowed = GNUNET_YES; | ||
820 | |||
821 | |||
822 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
823 | "Sending local ack to channel %s client\n", | ||
824 | GCCH_2s (ch)); | ||
825 | env = GNUNET_MQ_msg (msg, | ||
826 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); | ||
827 | msg->channel_id = ch->lid; | ||
828 | GSC_send_to_client (ch->owner ? ch->owner : ch->dest, | ||
829 | env); | ||
830 | } | ||
831 | |||
832 | |||
833 | /** | ||
834 | * Function called once the tunnel has sent one of our messages. | 1316 | * Function called once the tunnel has sent one of our messages. |
835 | * If the message is unreliable, simply frees the `crm`. If the | 1317 | * If the message is unreliable, simply frees the `crm`. If the |
836 | * message was reliable, calculate retransmission time and | 1318 | * message was reliable, calculate retransmission time and |
@@ -845,6 +1327,7 @@ data_sent_cb (void *cls) | |||
845 | struct CadetChannel *ch = crm->ch; | 1327 | struct CadetChannel *ch = crm->ch; |
846 | struct CadetReliableMessage *off; | 1328 | struct CadetReliableMessage *off; |
847 | 1329 | ||
1330 | GNUNET_assert (GNUNET_NO == ch->is_loopback); | ||
848 | crm->qe = NULL; | 1331 | crm->qe = NULL; |
849 | GNUNET_CONTAINER_DLL_remove (ch->head_sent, | 1332 | GNUNET_CONTAINER_DLL_remove (ch->head_sent, |
850 | ch->tail_sent, | 1333 | ch->tail_sent, |
@@ -853,7 +1336,10 @@ data_sent_cb (void *cls) | |||
853 | { | 1336 | { |
854 | GNUNET_free (crm); | 1337 | GNUNET_free (crm); |
855 | ch->pending_messages--; | 1338 | ch->pending_messages--; |
856 | GCCH_check_allow_client (ch); | 1339 | send_ack_to_client (ch, |
1340 | (NULL == ch->owner) | ||
1341 | ? GNUNET_NO | ||
1342 | : GNUNET_YES); | ||
857 | return; | 1343 | return; |
858 | } | 1344 | } |
859 | if (0 == crm->retry_delay.rel_value_us) | 1345 | if (0 == crm->retry_delay.rel_value_us) |
@@ -868,11 +1354,12 @@ data_sent_cb (void *cls) | |||
868 | GNUNET_CONTAINER_DLL_insert (ch->head_sent, | 1354 | GNUNET_CONTAINER_DLL_insert (ch->head_sent, |
869 | ch->tail_sent, | 1355 | ch->tail_sent, |
870 | crm); | 1356 | crm); |
871 | if (NULL != ch->retry_task) | 1357 | if (NULL != ch->retry_data_task) |
872 | GNUNET_SCHEDULER_cancel (ch->retry_task); | 1358 | GNUNET_SCHEDULER_cancel (ch->retry_data_task); |
873 | ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay, | 1359 | ch->retry_data_task |
874 | &retry_transmission, | 1360 | = GNUNET_SCHEDULER_add_delayed (crm->retry_delay, |
875 | ch); | 1361 | &retry_transmission, |
1362 | ch); | ||
876 | return; | 1363 | return; |
877 | } | 1364 | } |
878 | for (off = ch->head_sent; NULL != off; off = off->next) | 1365 | for (off = ch->head_sent; NULL != off; off = off->next) |
@@ -904,82 +1391,142 @@ data_sent_cb (void *cls) | |||
904 | * buffer space in the tunnel. | 1391 | * buffer space in the tunnel. |
905 | * | 1392 | * |
906 | * @param ch Channel. | 1393 | * @param ch Channel. |
907 | * @param message payload to transmit. | 1394 | * @param sender_ccn ccn of the sender |
1395 | * @param buf payload to transmit. | ||
1396 | * @param buf_len number of bytes in @a buf | ||
908 | * @return #GNUNET_OK if everything goes well, | 1397 | * @return #GNUNET_OK if everything goes well, |
909 | * #GNUNET_SYSERR in case of an error. | 1398 | * #GNUNET_SYSERR in case of an error. |
910 | */ | 1399 | */ |
911 | int | 1400 | int |
912 | GCCH_handle_local_data (struct CadetChannel *ch, | 1401 | GCCH_handle_local_data (struct CadetChannel *ch, |
913 | const struct GNUNET_MessageHeader *message) | 1402 | struct GNUNET_CADET_ClientChannelNumber sender_ccn, |
1403 | const char *buf, | ||
1404 | size_t buf_len) | ||
914 | { | 1405 | { |
915 | uint16_t payload_size = ntohs (message->size); | ||
916 | struct CadetReliableMessage *crm; | 1406 | struct CadetReliableMessage *crm; |
917 | 1407 | ||
918 | if (GNUNET_NO == ch->client_allowed) | 1408 | if (ch->pending_messages > ch->max_pending_messages) |
919 | { | 1409 | { |
920 | GNUNET_break_op (0); | 1410 | GNUNET_break (0); |
921 | return GNUNET_SYSERR; | 1411 | return GNUNET_SYSERR; |
922 | } | 1412 | } |
923 | ch->client_allowed = GNUNET_NO; | ||
924 | ch->pending_messages++; | 1413 | ch->pending_messages++; |
925 | 1414 | ||
1415 | if (GNUNET_YES == ch->is_loopback) | ||
1416 | { | ||
1417 | struct CadetChannelClient *receiver; | ||
1418 | struct GNUNET_MQ_Envelope *env; | ||
1419 | struct GNUNET_CADET_LocalData *ld; | ||
1420 | int to_owner; | ||
1421 | |||
1422 | env = GNUNET_MQ_msg_extra (ld, | ||
1423 | buf_len, | ||
1424 | GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); | ||
1425 | if (sender_ccn.channel_of_client == | ||
1426 | ch->owner->ccn.channel_of_client) | ||
1427 | { | ||
1428 | receiver = ch->dest; | ||
1429 | to_owner = GNUNET_NO; | ||
1430 | } | ||
1431 | else | ||
1432 | { | ||
1433 | GNUNET_assert (sender_ccn.channel_of_client == | ||
1434 | ch->dest->ccn.channel_of_client); | ||
1435 | receiver = ch->owner; | ||
1436 | to_owner = GNUNET_YES; | ||
1437 | } | ||
1438 | ld->ccn = receiver->ccn; | ||
1439 | GNUNET_memcpy (&ld[1], | ||
1440 | buf, | ||
1441 | buf_len); | ||
1442 | /* FIXME: this does not provide for flow control! */ | ||
1443 | GSC_send_to_client (receiver->c, | ||
1444 | env); | ||
1445 | send_ack_to_client (ch, | ||
1446 | to_owner); | ||
1447 | return GNUNET_OK; | ||
1448 | } | ||
1449 | |||
926 | /* Everything is correct, send the message. */ | 1450 | /* Everything is correct, send the message. */ |
927 | crm = GNUNET_malloc (sizeof (*crm) + payload_size); | 1451 | crm = GNUNET_malloc (sizeof (*crm)); |
928 | crm->ch = ch; | 1452 | crm->ch = ch; |
929 | crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size); | 1453 | crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) |
930 | crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA); | 1454 | + buf_len); |
1455 | crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len); | ||
1456 | crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA); | ||
931 | ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1); | 1457 | ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1); |
932 | crm->data_message.mid = ch->mid_send; | 1458 | crm->data_message->mid = ch->mid_send; |
933 | crm->data_message.gid = ch->gid; | 1459 | crm->data_message->ctn = ch->ctn; |
934 | GNUNET_memcpy (&crm[1], | 1460 | GNUNET_memcpy (&crm->data_message[1], |
935 | message, | 1461 | buf, |
936 | payload_size); | 1462 | buf_len); |
937 | GNUNET_CONTAINER_DLL_insert (ch->head_sent, | 1463 | GNUNET_CONTAINER_DLL_insert (ch->head_sent, |
938 | ch->tail_sent, | 1464 | ch->tail_sent, |
939 | crm); | 1465 | crm); |
940 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1466 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
941 | "Sending %u bytes from local client to channel %s\n", | 1467 | "Sending %u bytes from local client to %s\n", |
942 | payload_size, | 1468 | buf_len, |
943 | GCCH_2s (ch)); | 1469 | GCCH_2s (ch)); |
944 | crm->qe = GCT_send (ch->t, | 1470 | crm->qe = GCT_send (ch->t, |
945 | &crm->data_message.header, | 1471 | &crm->data_message->header, |
946 | &data_sent_cb, | 1472 | &data_sent_cb, |
947 | crm); | 1473 | crm); |
948 | GCCH_check_allow_client (ch); | ||
949 | return GNUNET_OK; | 1474 | return GNUNET_OK; |
950 | } | 1475 | } |
951 | 1476 | ||
952 | 1477 | ||
953 | /** | 1478 | /** |
954 | * Try to deliver messages to the local client, if it is ready for more. | 1479 | * Handle ACK from client on local channel. Means the client is ready |
1480 | * for more data, see if we have any for it. | ||
955 | * | 1481 | * |
956 | * @param ch channel to process | 1482 | * @param ch channel to destroy |
1483 | * @param client_ccn ccn of the client sending the ack | ||
957 | */ | 1484 | */ |
958 | static void | 1485 | void |
959 | send_client_buffered_data (struct CadetChannel *ch) | 1486 | GCCH_handle_local_ack (struct CadetChannel *ch, |
1487 | struct GNUNET_CADET_ClientChannelNumber client_ccn) | ||
960 | { | 1488 | { |
1489 | struct CadetChannelClient *ccc; | ||
961 | struct CadetOutOfOrderMessage *com; | 1490 | struct CadetOutOfOrderMessage *com; |
962 | 1491 | ||
963 | if (GNUNET_NO == ch->client_ready) | 1492 | if ( (NULL != ch->owner) && |
964 | return; /* client not ready */ | 1493 | (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) ) |
965 | com = ch->head_recv; | 1494 | ccc = ch->owner; |
1495 | else if ( (NULL != ch->dest) && | ||
1496 | (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) ) | ||
1497 | ccc = ch->dest; | ||
1498 | else | ||
1499 | GNUNET_assert (0); | ||
1500 | ccc->client_ready = GNUNET_YES; | ||
1501 | com = ccc->head_recv; | ||
966 | if (NULL == com) | 1502 | if (NULL == com) |
1503 | { | ||
1504 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1505 | "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending)!\n", | ||
1506 | GSC_2s (ccc->c), | ||
1507 | ntohl (ccc->ccn.channel_of_client)); | ||
967 | return; /* none pending */ | 1508 | return; /* none pending */ |
1509 | } | ||
968 | if ( (com->mid.mid != ch->mid_recv.mid) && | 1510 | if ( (com->mid.mid != ch->mid_recv.mid) && |
969 | (GNUNET_NO == ch->out_of_order) ) | 1511 | (GNUNET_NO == ch->out_of_order) ) |
970 | return; /* missing next one in-order */ | 1512 | return; /* missing next one in-order */ |
971 | 1513 | ||
972 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1514 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
973 | "Passing payload message to client on channel %s\n", | 1515 | "Got LOCAL ACK, passing payload message to %s-%X on %s\n", |
1516 | GSC_2s (ccc->c), | ||
1517 | ntohl (ccc->ccn.channel_of_client), | ||
974 | GCCH_2s (ch)); | 1518 | GCCH_2s (ch)); |
975 | 1519 | ||
976 | /* all good, pass next message to client */ | 1520 | /* all good, pass next message to client */ |
977 | GNUNET_CONTAINER_DLL_remove (ch->head_recv, | 1521 | GNUNET_CONTAINER_DLL_remove (ccc->head_recv, |
978 | ch->tail_recv, | 1522 | ccc->tail_recv, |
979 | com); | 1523 | com); |
1524 | /* FIXME: if unreliable, this is not aggressive | ||
1525 | enough, as it would be OK to have lost some! */ | ||
980 | ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); | 1526 | ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); |
981 | ch->mid_futures >>= 1; /* equivalent to division by 2 */ | 1527 | ch->mid_futures >>= 1; /* equivalent to division by 2 */ |
982 | GSC_send_to_client (ch->owner ? ch->owner : ch->dest, | 1528 | ccc->client_ready = GNUNET_NO; |
1529 | GSC_send_to_client (ccc->c, | ||
983 | com->env); | 1530 | com->env); |
984 | GNUNET_free (com); | 1531 | GNUNET_free (com); |
985 | if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && | 1532 | if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && |
@@ -991,33 +1538,22 @@ send_client_buffered_data (struct CadetChannel *ch) | |||
991 | maximum of 64 bits, and 15 is getting too close for comfort.) | 1538 | maximum of 64 bits, and 15 is getting too close for comfort.) |
992 | So we should send one now. */ | 1539 | So we should send one now. */ |
993 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1540 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
994 | "Sender on channel %s likely blocked on flow-control, sending ACK now.\n", | 1541 | "Sender on %s likely blocked on flow-control, sending ACK now.\n", |
995 | GCCH_2s (ch)); | 1542 | GCCH_2s (ch)); |
996 | if (GNUNET_YES == ch->reliable) | 1543 | if (GNUNET_YES == ch->reliable) |
997 | send_channel_ack (ch); | 1544 | send_channel_data_ack (ch); |
998 | } | 1545 | } |
999 | 1546 | ||
1000 | if (NULL != ch->head_recv) | 1547 | if (NULL != ccc->head_recv) |
1001 | return; | 1548 | return; |
1002 | if (GNUNET_NO == ch->destroy) | 1549 | if (GNUNET_NO == ch->destroy) |
1003 | return; | 1550 | return; |
1551 | GCT_send_channel_destroy (ch->t, | ||
1552 | ch->ctn); | ||
1004 | channel_destroy (ch); | 1553 | channel_destroy (ch); |
1005 | } | 1554 | } |
1006 | 1555 | ||
1007 | 1556 | ||
1008 | /** | ||
1009 | * Handle ACK from client on local channel. | ||
1010 | * | ||
1011 | * @param ch channel to destroy | ||
1012 | */ | ||
1013 | void | ||
1014 | GCCH_handle_local_ack (struct CadetChannel *ch) | ||
1015 | { | ||
1016 | ch->client_ready = GNUNET_YES; | ||
1017 | send_client_buffered_data (ch); | ||
1018 | } | ||
1019 | |||
1020 | |||
1021 | #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) | 1557 | #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) |
1022 | 1558 | ||
1023 | 1559 | ||
@@ -1045,25 +1581,25 @@ GCCH_debug (struct CadetChannel *ch, | |||
1045 | return; | 1581 | return; |
1046 | } | 1582 | } |
1047 | LOG2 (level, | 1583 | LOG2 (level, |
1048 | "CHN Channel %s:%X (%p)\n", | 1584 | "CHN %s:%X (%p)\n", |
1049 | GCT_2s (ch->t), | 1585 | GCT_2s (ch->t), |
1050 | ch->gid, | 1586 | ch->ctn, |
1051 | ch); | 1587 | ch); |
1052 | if (NULL != ch->owner) | 1588 | if (NULL != ch->owner) |
1053 | { | 1589 | { |
1054 | LOG2 (level, | 1590 | LOG2 (level, |
1055 | "CHN origin %s ready %s local-id: %u\n", | 1591 | "CHN origin %s ready %s local-id: %u\n", |
1056 | GSC_2s (ch->owner), | 1592 | GSC_2s (ch->owner->c), |
1057 | ch->client_ready ? "YES" : "NO", | 1593 | ch->owner->client_ready ? "YES" : "NO", |
1058 | ntohl (ch->lid.channel_of_client)); | 1594 | ntohl (ch->owner->ccn.channel_of_client)); |
1059 | } | 1595 | } |
1060 | if (NULL != ch->dest) | 1596 | if (NULL != ch->dest) |
1061 | { | 1597 | { |
1062 | LOG2 (level, | 1598 | LOG2 (level, |
1063 | "CHN destination %s ready %s local-id: %u\n", | 1599 | "CHN destination %s ready %s local-id: %u\n", |
1064 | GSC_2s (ch->dest), | 1600 | GSC_2s (ch->dest->c), |
1065 | ch->client_ready ? "YES" : "NO", | 1601 | ch->dest->client_ready ? "YES" : "NO", |
1066 | ntohl (ch->lid.channel_of_client)); | 1602 | ntohl (ch->dest->ccn.channel_of_client)); |
1067 | } | 1603 | } |
1068 | LOG2 (level, | 1604 | LOG2 (level, |
1069 | "CHN Message IDs recv: %d (%LLX), send: %d\n", | 1605 | "CHN Message IDs recv: %d (%LLX), send: %d\n", |