aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-01-19 15:52:22 +0100
committerChristian Grothoff <christian@grothoff.org>2017-01-19 15:52:22 +0100
commitbc43d8978d8695ff97cc67b65c29769e9c7f8f0e (patch)
tree94d1428dc2b4e70d87d6521b487a92c6fc139827 /src
parentbd3147503e27ddefcb6ba0dcb99c2b32947622a4 (diff)
downloadgnunet-bc43d8978d8695ff97cc67b65c29769e9c7f8f0e.tar.gz
gnunet-bc43d8978d8695ff97cc67b65c29769e9c7f8f0e.zip
much work on connection/route/peer-level queue management
Diffstat (limited to 'src')
-rw-r--r--src/cadet/gnunet-service-cadet-new.h39
-rw-r--r--src/cadet/gnunet-service-cadet-new_connection.c361
-rw-r--r--src/cadet/gnunet-service-cadet-new_connection.h26
-rw-r--r--src/cadet/gnunet-service-cadet-new_core.c267
-rw-r--r--src/cadet/gnunet-service-cadet-new_paths.c29
-rw-r--r--src/cadet/gnunet-service-cadet-new_paths.h10
-rw-r--r--src/cadet/gnunet-service-cadet-new_peer.c172
-rw-r--r--src/cadet/gnunet-service-cadet-new_peer.h76
-rw-r--r--src/cadet/gnunet-service-cadet-new_tunnels.c66
9 files changed, 732 insertions, 314 deletions
diff --git a/src/cadet/gnunet-service-cadet-new.h b/src/cadet/gnunet-service-cadet-new.h
index 862a0f088..9f4667e23 100644
--- a/src/cadet/gnunet-service-cadet-new.h
+++ b/src/cadet/gnunet-service-cadet-new.h
@@ -105,7 +105,44 @@ struct CadetPeerPathEntry
105/** 105/**
106 * Entry in list of connections used by tunnel, with metadata. 106 * Entry in list of connections used by tunnel, with metadata.
107 */ 107 */
108struct CadetTConnection; 108struct CadetTConnection
109{
110 /**
111 * Next in DLL.
112 */
113 struct CadetTConnection *next;
114
115 /**
116 * Prev in DLL.
117 */
118 struct CadetTConnection *prev;
119
120 /**
121 * Connection handle.
122 */
123 struct CadetConnection *cc;
124
125 /**
126 * Tunnel this connection belongs to.
127 */
128 struct CadetTunnel *t;
129
130 /**
131 * Creation time, to keep oldest connection alive.
132 */
133 struct GNUNET_TIME_Absolute created;
134
135 /**
136 * Connection throughput, to keep fastest connection alive.
137 */
138 uint32_t throughput;
139
140 /**
141 * Is the connection currently ready for transmission?
142 */
143 int is_ready;
144};
145
109 146
110/** 147/**
111 * Active path through the network (used by a tunnel). There may 148 * Active path through the network (used by a tunnel). There may
diff --git a/src/cadet/gnunet-service-cadet-new_connection.c b/src/cadet/gnunet-service-cadet-new_connection.c
index 5123f9d45..bf88d78e1 100644
--- a/src/cadet/gnunet-service-cadet-new_connection.c
+++ b/src/cadet/gnunet-service-cadet-new_connection.c
@@ -27,11 +27,8 @@
27 * @author Christian Grothoff 27 * @author Christian Grothoff
28 * 28 *
29 * TODO: 29 * TODO:
30 * - congestion control
31 * - GCC_debug()
32 * - keepalive messages 30 * - keepalive messages
33 * - performance metrics 31 * - keep performance metrics (?)
34 * - back-off reset
35 */ 32 */
36#include "platform.h" 33#include "platform.h"
37#include "gnunet-service-cadet-new_channel.h" 34#include "gnunet-service-cadet-new_channel.h"
@@ -64,19 +61,16 @@ enum CadetConnectionState
64 CADET_CONNECTION_SENT, 61 CADET_CONNECTION_SENT,
65 62
66 /** 63 /**
67 * Connection confirmed, ready to carry traffic. 64 * We are an inbound connection, and received a CREATE. Need to
65 * send an CREATE_ACK back.
68 */ 66 */
69 CADET_CONNECTION_READY, 67 CADET_CONNECTION_CREATE_RECEIVED,
70 68
71 /** 69 /**
72 * Connection to be destroyed, just waiting to empty queues. 70 * Connection confirmed, ready to carry traffic.
73 */ 71 */
74 CADET_CONNECTION_DESTROYED, 72 CADET_CONNECTION_READY
75 73
76 /**
77 * Connection to be destroyed because of a distant peer, same as DESTROYED.
78 */
79 CADET_CONNECTION_BROKEN
80}; 74};
81 75
82 76
@@ -112,11 +106,6 @@ struct CadetConnection
112 struct GNUNET_MQ_Envelope *env; 106 struct GNUNET_MQ_Envelope *env;
113 107
114 /** 108 /**
115 * Message queue to the first hop, or NULL if we have no connection yet.
116 */
117 struct GNUNET_MQ_Handle *mq;
118
119 /**
120 * Handle for calling #GCP_request_mq_cancel() once we are finished. 109 * Handle for calling #GCP_request_mq_cancel() once we are finished.
121 */ 110 */
122 struct GCP_MessageQueueManager *mq_man; 111 struct GCP_MessageQueueManager *mq_man;
@@ -129,7 +118,7 @@ struct CadetConnection
129 /** 118 /**
130 * Function to call once we are ready to transmit. 119 * Function to call once we are ready to transmit.
131 */ 120 */
132 GNUNET_SCHEDULER_TaskCallback ready_cb; 121 GCC_ReadyCallback ready_cb;
133 122
134 /** 123 /**
135 * Closure for @e ready_cb. 124 * Closure for @e ready_cb.
@@ -151,22 +140,12 @@ struct CadetConnection
151 */ 140 */
152 unsigned int off; 141 unsigned int off;
153 142
154}; 143 /**
155 144 * Are we ready to transmit via @e mq_man right now?
145 */
146 int mqm_ready;
156 147
157/** 148};
158 * Is the given connection currently ready for transmission?
159 *
160 * @param cc connection to transmit on
161 * @return #GNUNET_YES if we could transmit
162 */
163int
164GCC_is_ready (struct CadetConnection *cc)
165{
166 return ( (NULL != cc->mq) &&
167 (CADET_CONNECTION_READY == cc->state) &&
168 (NULL == cc->env) ) ? GNUNET_YES : GNUNET_NO;
169}
170 149
171 150
172/** 151/**
@@ -177,29 +156,19 @@ GCC_is_ready (struct CadetConnection *cc)
177void 156void
178GCC_destroy (struct CadetConnection *cc) 157GCC_destroy (struct CadetConnection *cc)
179{ 158{
180 if (NULL != cc->env) 159 struct GNUNET_MQ_Envelope *env = NULL;
181 { 160
182 if (NULL != cc->mq) 161 if (CADET_CONNECTION_SENDING_CREATE != cc->state)
183 GNUNET_MQ_send_cancel (cc->env);
184 else
185 GNUNET_MQ_discard (cc->env);
186 cc->env = NULL;
187 }
188 if ( (NULL != cc->mq) &&
189 (CADET_CONNECTION_SENDING_CREATE != cc->state) )
190 { 162 {
191 /* Need to notify next hop that we are down. */
192 struct GNUNET_MQ_Envelope *env;
193 struct GNUNET_CADET_ConnectionDestroyMessage *destroy_msg; 163 struct GNUNET_CADET_ConnectionDestroyMessage *destroy_msg;
194 164
165 /* Need to notify next hop that we are down. */
195 env = GNUNET_MQ_msg (destroy_msg, 166 env = GNUNET_MQ_msg (destroy_msg,
196 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY); 167 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
197 destroy_msg->cid = cc->cid; 168 destroy_msg->cid = cc->cid;
198 GNUNET_MQ_send (cc->mq,
199 env);
200 } 169 }
201 cc->mq = NULL; 170 GCP_request_mq_cancel (cc->mq_man,
202 GCP_request_mq_cancel (cc->mq_man); 171 env);
203 cc->mq_man = NULL; 172 cc->mq_man = NULL;
204 GCPP_del_connection (cc->path, 173 GCPP_del_connection (cc->path,
205 cc->off, 174 cc->off,
@@ -234,14 +203,20 @@ GCC_get_ct (struct CadetConnection *cc)
234void 203void
235GCC_handle_connection_ack (struct CadetConnection *cc) 204GCC_handle_connection_ack (struct CadetConnection *cc)
236{ 205{
237 GNUNET_SCHEDULER_cancel (cc->task); 206 if (NULL != cc->task)
238#if FIXME 207 {
208 GNUNET_SCHEDULER_cancel (cc->task);
209 cc->task = NULL;
210 }
211#if FIXME_KEEPALIVE
239 cc->task = GNUNET_SCHEDULER_add_delayed (cc->keepalive_period, 212 cc->task = GNUNET_SCHEDULER_add_delayed (cc->keepalive_period,
240 &send_keepalive, 213 &send_keepalive,
241 cc); 214 cc);
242#endif 215#endif
243 cc->state = CADET_CONNECTION_READY; 216 cc->state = CADET_CONNECTION_READY;
244 cc->ready_cb (cc->ready_cb_cls); 217 if (GNUNET_YES == cc->mqm_ready)
218 cc->ready_cb (cc->ready_cb_cls,
219 GNUNET_YES);
245} 220}
246 221
247 222
@@ -255,6 +230,12 @@ void
255GCC_handle_kx (struct CadetConnection *cc, 230GCC_handle_kx (struct CadetConnection *cc,
256 const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg) 231 const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
257{ 232{
233 if (CADET_CONNECTION_SENT == cc->state)
234 {
235 /* We didn't get the CREATE_ACK, but instead got payload. That's fine,
236 clearly something is working, so pretend we got an ACK. */
237 GCC_handle_connection_ack (cc);
238 }
258 GCT_handle_kx (cc->ct, 239 GCT_handle_kx (cc->ct,
259 msg); 240 msg);
260} 241}
@@ -270,6 +251,12 @@ void
270GCC_handle_encrypted (struct CadetConnection *cc, 251GCC_handle_encrypted (struct CadetConnection *cc,
271 const struct GNUNET_CADET_TunnelEncryptedMessage *msg) 252 const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
272{ 253{
254 if (CADET_CONNECTION_SENT == cc->state)
255 {
256 /* We didn't get the CREATE_ACK, but instead got payload. That's fine,
257 clearly something is working, so pretend we got an ACK. */
258 GCC_handle_connection_ack (cc);
259 }
273 GCT_handle_encrypted (cc->ct, 260 GCT_handle_encrypted (cc->ct,
274 msg); 261 msg);
275} 262}
@@ -281,37 +268,40 @@ GCC_handle_encrypted (struct CadetConnection *cc,
281 * @param cls the `struct CadetConnection` to initiate 268 * @param cls the `struct CadetConnection` to initiate
282 */ 269 */
283static void 270static void
284send_create (void *cls); 271send_create (void *cls)
285
286
287/**
288 * We finished transmission of the create message, now wait for
289 * ACK or retransmit.
290 *
291 * @param cls the `struct CadetConnection` that sent the create message
292 */
293static void
294transmit_create_done_cb (void *cls)
295{ 272{
296 struct CadetConnection *cc = cls; 273 struct CadetConnection *cc = cls;
274 struct GNUNET_CADET_ConnectionCreateMessage *create_msg;
275 struct GNUNET_PeerIdentity *pids;
276 struct GNUNET_MQ_Envelope *env;
277 unsigned int path_length;
297 278
279 cc->task = NULL;
280 GNUNET_assert (GNUNET_YES == cc->mqm_ready);
281 path_length = GCPP_get_length (cc->path);
282 env = GNUNET_MQ_msg_extra (create_msg,
283 path_length * sizeof (struct GNUNET_PeerIdentity),
284 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
285 create_msg->cid = cc->cid;
286 pids = (struct GNUNET_PeerIdentity *) &create_msg[1];
287 for (unsigned int i=0;i<path_length;i++)
288 pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path,
289 i));
290 cc->env = env;
291 cc->mqm_ready = GNUNET_NO;
298 cc->state = CADET_CONNECTION_SENT; 292 cc->state = CADET_CONNECTION_SENT;
299 cc->env = NULL; 293 GCP_send (cc->mq_man,
300 /* FIXME: at some point, we need to reset the delay back to 0! */ 294 env);
301 cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay);
302 cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay,
303 &send_create,
304 cc);
305} 295}
306 296
307 297
308/** 298/**
309 * Send a CREATE message to the first hop. 299 * Send a CREATE_ACK message towards the origin.
310 * 300 *
311 * @param cls the `struct CadetConnection` to initiate 301 * @param cls the `struct CadetConnection` to initiate
312 */ 302 */
313static void 303static void
314send_create (void *cls) 304send_create_ack (void *cls)
315{ 305{
316 struct CadetConnection *cc = cls; 306 struct CadetConnection *cc = cls;
317 struct GNUNET_CADET_ConnectionCreateMessage *create_msg; 307 struct GNUNET_CADET_ConnectionCreateMessage *create_msg;
@@ -320,7 +310,7 @@ send_create (void *cls)
320 unsigned int path_length; 310 unsigned int path_length;
321 311
322 cc->task = NULL; 312 cc->task = NULL;
323 GNUNET_assert (NULL != cc->mq); 313 GNUNET_assert (GNUNET_YES == cc->mqm_ready);
324 path_length = GCPP_get_length (cc->path); 314 path_length = GCPP_get_length (cc->path);
325 env = GNUNET_MQ_msg_extra (create_msg, 315 env = GNUNET_MQ_msg_extra (create_msg,
326 path_length * sizeof (struct GNUNET_PeerIdentity), 316 path_length * sizeof (struct GNUNET_PeerIdentity),
@@ -331,11 +321,42 @@ send_create (void *cls)
331 pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path, 321 pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path,
332 i)); 322 i));
333 cc->env = env; 323 cc->env = env;
334 GNUNET_MQ_notify_sent (env, 324 cc->mqm_ready = GNUNET_NO;
335 &transmit_create_done_cb, 325 cc->state = CADET_CONNECTION_READY;
336 cc); 326 GCP_send (cc->mq_man,
337 GNUNET_MQ_send (cc->mq, 327 env);
338 env); 328}
329
330
331/**
332 * We got a #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE for a
333 * connection that we already have. Either our ACK got lost
334 * or something is fishy. Consider retransmitting the ACK.
335 *
336 * @param cc connection that got the duplicate CREATE
337 */
338void
339GCC_handle_duplicate_create (struct CadetConnection *cc)
340{
341 if (GNUNET_YES == cc->mqm_ready)
342 {
343 /* Tell tunnel that we are not ready for transmission anymore
344 (until CREATE_ACK is done) */
345 cc->ready_cb (cc->ready_cb_cls,
346 GNUNET_NO);
347
348 /* Revert back to the state of having only received the 'CREATE',
349 and immediately proceed to send the CREATE_ACK. */
350 cc->state = CADET_CONNECTION_CREATE_RECEIVED;
351 cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
352 cc);
353 }
354 else
355 {
356 /* We are currently sending something else back, which
357 can only be an ACK or payload, either of which would
358 do. So actually no need to do anything. */
359 }
339} 360}
340 361
341 362
@@ -344,34 +365,62 @@ send_create (void *cls)
344 * peer at the first hop. Adjust accordingly. 365 * peer at the first hop. Adjust accordingly.
345 * 366 *
346 * @param cls the `struct CadetConnection` 367 * @param cls the `struct CadetConnection`
347 * @param mq NULL if the CORE connection was lost, non-NULL if 368 * @param available #GNUNET_YES if sending is now possible,
348 * it became available 369 * #GNUNET_NO if sending is no longer possible
370 * #GNUNET_SYSERR if sending is no longer possible
371 * and the last envelope was discarded
349 */ 372 */
350static void 373static void
351manage_first_hop_mq (void *cls, 374manage_first_hop_mq (void *cls,
352 struct GNUNET_MQ_Handle *mq) 375 int available)
353{ 376{
354 struct CadetConnection *cc = cls; 377 struct CadetConnection *cc = cls;
355 378
356 if (NULL == mq) 379 if (GNUNET_YES != available)
357 { 380 {
358 /* Connection is down, for now... */ 381 /* Connection is down, for now... */
359 cc->mq = NULL; 382 cc->mqm_ready = GNUNET_NO;
383 cc->state = CADET_CONNECTION_NEW;
384 cc->retry_delay = GNUNET_TIME_UNIT_ZERO;
360 if (NULL != cc->task) 385 if (NULL != cc->task)
361 { 386 {
362 GNUNET_SCHEDULER_cancel (cc->task); 387 GNUNET_SCHEDULER_cancel (cc->task);
363 cc->task = NULL; 388 cc->task = NULL;
364 } 389 }
390 cc->ready_cb (cc->ready_cb_cls,
391 GNUNET_NO);
365 return; 392 return;
366 } 393 }
367 394
368 cc->mq = mq; 395 cc->mqm_ready = GNUNET_YES;
369 cc->state = CADET_CONNECTION_SENDING_CREATE; 396 switch (cc->state)
370 397 {
371 /* Now repeat sending connection creation messages 398 case CADET_CONNECTION_NEW:
372 down the path, until we get an ACK! */ 399 /* Transmit immediately */
373 cc->task = GNUNET_SCHEDULER_add_now (&send_create, 400 cc->task = GNUNET_SCHEDULER_add_now (&send_create,
374 cc); 401 cc);
402 break;
403 case CADET_CONNECTION_SENDING_CREATE:
404 /* Should not be possible to be called in this state. */
405 GNUNET_assert (0);
406 break;
407 case CADET_CONNECTION_SENT:
408 /* Retry a bit later... */
409 cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay);
410 cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay,
411 &send_create,
412 cc);
413 break;
414 case CADET_CONNECTION_CREATE_RECEIVED:
415 /* We got the 'CREATE' (incoming connection), should send the CREATE_ACK */
416 cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
417 cc);
418 break;
419 case CADET_CONNECTION_READY:
420 cc->ready_cb (cc->ready_cb_cls,
421 GNUNET_YES);
422 break;
423 }
375} 424}
376 425
377 426
@@ -383,6 +432,7 @@ manage_first_hop_mq (void *cls,
383 * @param destination where to go 432 * @param destination where to go
384 * @param path which path to take (may not be the full path) 433 * @param path which path to take (may not be the full path)
385 * @param ct which tunnel uses this connection 434 * @param ct which tunnel uses this connection
435 * @param init_state initial state for the connection
386 * @param ready_cb function to call when ready to transmit 436 * @param ready_cb function to call when ready to transmit
387 * @param ready_cb_cls closure for @a cb 437 * @param ready_cb_cls closure for @a cb
388 * @return handle to the connection 438 * @return handle to the connection
@@ -392,7 +442,8 @@ connection_create (struct CadetPeer *destination,
392 struct CadetPeerPath *path, 442 struct CadetPeerPath *path,
393 struct CadetTConnection *ct, 443 struct CadetTConnection *ct,
394 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, 444 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
395 GNUNET_SCHEDULER_TaskCallback ready_cb, 445 enum CadetConnectionState init_state,
446 GCC_ReadyCallback ready_cb,
396 void *ready_cb_cls) 447 void *ready_cb_cls)
397{ 448{
398 struct CadetConnection *cc; 449 struct CadetConnection *cc;
@@ -403,6 +454,7 @@ connection_create (struct CadetPeer *destination,
403 destination); 454 destination);
404 GNUNET_assert (UINT_MAX > off); 455 GNUNET_assert (UINT_MAX > off);
405 cc = GNUNET_new (struct CadetConnection); 456 cc = GNUNET_new (struct CadetConnection);
457 cc->state = init_state;
406 cc->ct = ct; 458 cc->ct = ct;
407 cc->cid = *cid; 459 cc->cid = *cid;
408 GNUNET_assert (GNUNET_OK == 460 GNUNET_assert (GNUNET_OK ==
@@ -448,19 +500,16 @@ GCC_create_inbound (struct CadetPeer *destination,
448 struct CadetPeerPath *path, 500 struct CadetPeerPath *path,
449 struct CadetTConnection *ct, 501 struct CadetTConnection *ct,
450 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, 502 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
451 GNUNET_SCHEDULER_TaskCallback ready_cb, 503 GCC_ReadyCallback ready_cb,
452 void *ready_cb_cls) 504 void *ready_cb_cls)
453{ 505{
454 struct CadetConnection *cc; 506 return connection_create (destination,
455 507 path,
456 cc = connection_create (destination, 508 ct,
457 path, 509 cid,
458 ct, 510 CADET_CONNECTION_CREATE_RECEIVED,
459 cid, 511 ready_cb,
460 ready_cb, 512 ready_cb_cls);
461 ready_cb_cls);
462 /* FIXME: send CREATE_ACK? */
463 return cc;
464} 513}
465 514
466 515
@@ -479,41 +528,21 @@ struct CadetConnection *
479GCC_create (struct CadetPeer *destination, 528GCC_create (struct CadetPeer *destination,
480 struct CadetPeerPath *path, 529 struct CadetPeerPath *path,
481 struct CadetTConnection *ct, 530 struct CadetTConnection *ct,
482 GNUNET_SCHEDULER_TaskCallback ready_cb, 531 GCC_ReadyCallback ready_cb,
483 void *ready_cb_cls) 532 void *ready_cb_cls)
484{ 533{
485 struct GNUNET_CADET_ConnectionTunnelIdentifier cid; 534 struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
486 struct CadetConnection *cc;
487 535
488 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, 536 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
489 &cid, 537 &cid,
490 sizeof (cid)); 538 sizeof (cid));
491 cc = connection_create (destination, 539 return connection_create (destination,
492 path, 540 path,
493 ct, 541 ct,
494 &cid, 542 &cid,
495 ready_cb, 543 CADET_CONNECTION_NEW,
496 ready_cb_cls); 544 ready_cb,
497 /* FIXME: send CREATE? */ 545 ready_cb_cls);
498 return cc;
499}
500
501
502/**
503 * We finished transmission of a message, if we are still ready, tell
504 * the tunnel!
505 *
506 * @param cls our `struct CadetConnection`
507 */
508static void
509transmit_done_cb (void *cls)
510{
511 struct CadetConnection *cc = cls;
512
513 cc->env = NULL;
514 if ( (NULL != cc->mq) &&
515 (CADET_CONNECTION_READY == cc->state) )
516 cc->ready_cb (cc->ready_cb_cls);
517} 546}
518 547
519 548
@@ -524,21 +553,18 @@ transmit_done_cb (void *cls)
524 * connection is right now ready for transmission. 553 * connection is right now ready for transmission.
525 * 554 *
526 * @param cc connection identification 555 * @param cc connection identification
527 * @param env envelope with message to transmit 556 * @param env envelope with message to transmit; must NOT
557 * yet have a #GNUNET_MQ_notify_sent() callback attached to it
528 */ 558 */
529void 559void
530GCC_transmit (struct CadetConnection *cc, 560GCC_transmit (struct CadetConnection *cc,
531 struct GNUNET_MQ_Envelope *env) 561 struct GNUNET_MQ_Envelope *env)
532{ 562{
533 GNUNET_assert (NULL == cc->env); 563 GNUNET_assert (GNUNET_YES == cc->mqm_ready);
534 cc->env = env; 564 GNUNET_assert (CADET_CONNECTION_READY == cc->state);
535 GNUNET_MQ_notify_sent (env, 565 cc->mqm_ready = GNUNET_NO;
536 &transmit_done_cb, 566 GCP_send (cc->mq_man,
537 cc); 567 env);
538 if ( (NULL != cc->mq) &&
539 (CADET_CONNECTION_READY == cc->state) )
540 GNUNET_MQ_send (cc->mq,
541 env);
542} 568}
543 569
544 570
@@ -569,6 +595,39 @@ GCC_get_id (struct CadetConnection *cc)
569 595
570 596
571/** 597/**
598 * Get a (static) string for a connection.
599 *
600 * @param cc Connection.
601 */
602const char *
603GCC_2s (const struct CadetConnection *cc)
604{
605 static char buf[128];
606
607 if (NULL == cc)
608 return "Connection(NULL)";
609
610 if (NULL != cc->ct)
611 {
612 GNUNET_snprintf (buf,
613 sizeof (buf),
614 "Connection(%s(Tunnel(%s)))",
615 GNUNET_sh2s (&cc->cid.connection_of_tunnel),
616 GCT_2s (cc->ct->t));
617 return buf;
618 }
619 GNUNET_snprintf (buf,
620 sizeof (buf),
621 "Connection(%s(Tunnel(NULL)))",
622 GNUNET_sh2s (&cc->cid.connection_of_tunnel));
623 return buf;
624}
625
626
627#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
628
629
630/**
572 * Log connection info. 631 * Log connection info.
573 * 632 *
574 * @param cc connection 633 * @param cc connection
@@ -578,7 +637,29 @@ void
578GCC_debug (struct CadetConnection *cc, 637GCC_debug (struct CadetConnection *cc,
579 enum GNUNET_ErrorType level) 638 enum GNUNET_ErrorType level)
580{ 639{
581 GNUNET_break (0); // FIXME: implement... 640 int do_log;
641 char *s;
642
643 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
644 "cadet-con",
645 __FILE__, __FUNCTION__, __LINE__);
646 if (0 == do_log)
647 return;
648 if (NULL == cc)
649 {
650 LOG2 (level,
651 "Connection (NULL)\n");
652 return;
653 }
654 s = GCPP_2s (cc->path);
655 LOG2 (level,
656 "Connection %s to %s via path %s in state %d is %s\n",
657 GCC_2s (cc),
658 GCP_2s (cc->destination),
659 s,
660 cc->state,
661 (GNUNET_YES == cc->mqm_ready) ? "ready" : "busy");
662 GNUNET_free (s);
582} 663}
583 664
584/* end of gnunet-service-cadet-new_connection.c */ 665/* end of gnunet-service-cadet-new_connection.c */
diff --git a/src/cadet/gnunet-service-cadet-new_connection.h b/src/cadet/gnunet-service-cadet-new_connection.h
index f2364dea4..99426776d 100644
--- a/src/cadet/gnunet-service-cadet-new_connection.h
+++ b/src/cadet/gnunet-service-cadet-new_connection.h
@@ -33,14 +33,17 @@
33#include "gnunet-service-cadet-new_peer.h" 33#include "gnunet-service-cadet-new_peer.h"
34#include "cadet_protocol.h" 34#include "cadet_protocol.h"
35 35
36
36/** 37/**
37 * Is the given connection currently ready for transmission? 38 * Function called to notify tunnel about change in our readyness.
38 * 39 *
39 * @param cc connection to transmit on 40 * @param cls closure
40 * @return #GNUNET_YES if we could transmit 41 * @param is_ready #GNUNET_YES if the connection is now ready for transmission,
42 * #GNUNET_NO if the connection is no longer ready for transmission
41 */ 43 */
42int 44typedef void
43GCC_is_ready (struct CadetConnection *cc); 45(*GCC_ReadyCallback)(void *cls,
46 int is_ready);
44 47
45 48
46/** 49/**
@@ -67,7 +70,7 @@ struct CadetConnection *
67GCC_create (struct CadetPeer *destination, 70GCC_create (struct CadetPeer *destination,
68 struct CadetPeerPath *path, 71 struct CadetPeerPath *path,
69 struct CadetTConnection *ct, 72 struct CadetTConnection *ct,
70 GNUNET_SCHEDULER_TaskCallback ready_cb, 73 GCC_ReadyCallback ready_cb,
71 void *ready_cb_cls); 74 void *ready_cb_cls);
72 75
73 76
@@ -88,7 +91,7 @@ GCC_create_inbound (struct CadetPeer *destination,
88 struct CadetPeerPath *path, 91 struct CadetPeerPath *path,
89 struct CadetTConnection *ct, 92 struct CadetTConnection *ct,
90 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, 93 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
91 GNUNET_SCHEDULER_TaskCallback ready_cb, 94 GCC_ReadyCallback ready_cb,
92 void *ready_cb_cls); 95 void *ready_cb_cls);
93 96
94 97
@@ -171,6 +174,15 @@ GCC_get_id (struct CadetConnection *cc);
171 174
172 175
173/** 176/**
177 * Get a (static) string for a connection.
178 *
179 * @param cc Connection.
180 */
181const char *
182GCC_2s (const struct CadetConnection *cc);
183
184
185/**
174 * Log connection info. 186 * Log connection info.
175 * 187 *
176 * @param cc connection 188 * @param cc connection
diff --git a/src/cadet/gnunet-service-cadet-new_core.c b/src/cadet/gnunet-service-cadet-new_core.c
index 3d8406dc9..9ce4418de 100644
--- a/src/cadet/gnunet-service-cadet-new_core.c
+++ b/src/cadet/gnunet-service-cadet-new_core.c
@@ -25,6 +25,12 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 * 26 *
27 * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom)) 27 * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
28 *
29 * TODO:
30 * - pass encrypted ACK to connection (!)
31 * - given BROKEN messages, destroy paths (?)
32 * -
33 * - handle POLL (if needed)
28 */ 34 */
29#include "platform.h" 35#include "platform.h"
30#include "gnunet-service-cadet-new_core.h" 36#include "gnunet-service-cadet-new_core.h"
@@ -35,6 +41,54 @@
35#include "gnunet_core_service.h" 41#include "gnunet_core_service.h"
36#include "cadet_protocol.h" 42#include "cadet_protocol.h"
37 43
44/**
45 * Number of messages we are willing to buffer per route.
46 */
47#define ROUTE_BUFFER_SIZE 8
48
49
50/**
51 * Information we keep per direction for a route.
52 */
53struct RouteDirection
54{
55 /**
56 * Target peer.
57 */
58 struct CadetPeer *hop;
59
60 /**
61 * Route this direction is part of.
62 */
63 struct CadetRoute *my_route;
64
65 /**
66 * Message queue manager for @e hop.
67 */
68 struct GCP_MessageQueueManager *mqm;
69
70 /**
71 * Cyclic message buffer to @e hop.
72 */
73 struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
74
75 /**
76 * Next write offset to use to append messages to @e out_buffer.
77 */
78 unsigned int out_wpos;
79
80 /**
81 * Next read offset to use to retrieve messages from @e out_buffer.
82 */
83 unsigned int out_rpos;
84
85 /**
86 * Is @e mqm currently ready for transmission?
87 */
88 int is_ready;
89
90};
91
38 92
39/** 93/**
40 * Description of a segment of a `struct CadetConnection` at the 94 * Description of a segment of a `struct CadetConnection` at the
@@ -48,24 +102,14 @@ struct CadetRoute
48{ 102{
49 103
50 /** 104 /**
51 * Previous hop on this route. 105 * Information about the next hop on this route.
52 */ 106 */
53 struct CadetPeer *prev_hop; 107 struct RouteDirection next;
54 108
55 /** 109 /**
56 * Next hop on this route. 110 * Information about the previous hop on this route.
57 */ 111 */
58 struct CadetPeer *next_hop; 112 struct RouteDirection prev;
59
60 /**
61 * Message queue notifications for @e prev_hop.
62 */
63 struct GCP_MessageQueueManager *prev_mqm;
64
65 /**
66 * Message queue notifications for @e next_hop.
67 */
68 struct GCP_MessageQueueManager *next_mqm;
69 113
70 /** 114 /**
71 * Unique identifier for the connection that uses this route. 115 * Unique identifier for the connection that uses this route.
@@ -77,12 +121,6 @@ struct CadetRoute
77 */ 121 */
78 struct GNUNET_TIME_Absolute last_use; 122 struct GNUNET_TIME_Absolute last_use;
79 123
80 /**
81 * Counter, used to verify that both MQs are up when the route is
82 * initialized.
83 */
84 unsigned int up;
85
86}; 124};
87 125
88 126
@@ -125,6 +163,8 @@ route_message (struct CadetPeer *prev,
125 const struct GNUNET_MessageHeader *msg) 163 const struct GNUNET_MessageHeader *msg)
126{ 164{
127 struct CadetRoute *route; 165 struct CadetRoute *route;
166 struct RouteDirection *dir;
167 struct GNUNET_MQ_Envelope *env;
128 168
129 route = get_route (cid); 169 route = get_route (cid);
130 if (NULL == route) 170 if (NULL == route)
@@ -136,13 +176,33 @@ route_message (struct CadetPeer *prev,
136 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); 176 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
137 bm->cid = *cid; 177 bm->cid = *cid;
138 bm->peer1 = my_full_id; 178 bm->peer1 = my_full_id;
139 GCP_send (prev, 179 GCP_send_ooo (prev,
140 env); 180 env);
141 return; 181 return;
142 } 182 }
143 /* FIXME: support round-robin queue management here somewhere! */ 183 dir = (prev == route->prev.hop) ? &route->next : &route->prev;
144 GCP_send ((prev == route->prev_hop) ? route->next_hop : route->prev_hop, 184 if (GNUNET_YES == dir->is_ready)
145 GNUNET_MQ_msg_copy (msg)); 185 {
186 dir->is_ready = GNUNET_NO;
187 GCP_send (dir->mqm,
188 GNUNET_MQ_msg_copy (msg));
189 return;
190 }
191 env = dir->out_buffer[dir->out_wpos];
192 if (NULL != env)
193 {
194 /* Queue full, drop earliest message in queue */
195 GNUNET_assert (dir->out_rpos == dir->out_wpos);
196 GNUNET_MQ_discard (env);
197 dir->out_rpos++;
198 if (ROUTE_BUFFER_SIZE == dir->out_rpos)
199 dir->out_rpos = 0;
200 }
201 env = GNUNET_MQ_msg_copy (msg);
202 dir->out_buffer[dir->out_wpos] = env;
203 dir->out_wpos++;
204 if (ROUTE_BUFFER_SIZE == dir->out_wpos)
205 dir->out_wpos = 0;
146} 206}
147 207
148 208
@@ -170,6 +230,29 @@ check_connection_create (void *cls,
170 230
171 231
172/** 232/**
233 * Free internal data of a route direction.
234 *
235 * @param dir direction to destroy (do NOT free memory of 'dir' itself)
236 */
237static void
238destroy_direction (struct RouteDirection *dir)
239{
240 for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
241 if (NULL != dir->out_buffer[i])
242 {
243 GNUNET_MQ_discard (dir->out_buffer[i]);
244 dir->out_buffer[i] = NULL;
245 }
246 if (NULL != dir->mqm)
247 {
248 GCP_request_mq_cancel (dir->mqm,
249 NULL);
250 dir->mqm = NULL;
251 }
252}
253
254
255/**
173 * Destroy our state for @a route. 256 * Destroy our state for @a route.
174 * 257 *
175 * @param route route to destroy 258 * @param route route to destroy
@@ -177,8 +260,8 @@ check_connection_create (void *cls,
177static void 260static void
178destroy_route (struct CadetRoute *route) 261destroy_route (struct CadetRoute *route)
179{ 262{
180 GCP_request_mq_cancel (route->next_mqm); 263 destroy_direction (&route->prev);
181 GCP_request_mq_cancel (route->prev_mqm); 264 destroy_direction (&route->next);
182 GNUNET_free (route); 265 GNUNET_free (route);
183} 266}
184 267
@@ -192,7 +275,7 @@ destroy_route (struct CadetRoute *route)
192 * @param peer2 another one of the peers where a link is broken 275 * @param peer2 another one of the peers where a link is broken
193 */ 276 */
194static void 277static void
195send_broken (struct CadetPeer *target, 278send_broken (struct RouteDirection *target,
196 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, 279 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
197 const struct GNUNET_PeerIdentity *peer1, 280 const struct GNUNET_PeerIdentity *peer1,
198 const struct GNUNET_PeerIdentity *peer2) 281 const struct GNUNET_PeerIdentity *peer2)
@@ -207,8 +290,9 @@ send_broken (struct CadetPeer *target,
207 bm->peer1 = *peer1; 290 bm->peer1 = *peer1;
208 if (NULL != peer2) 291 if (NULL != peer2)
209 bm->peer2 = *peer2; 292 bm->peer2 = *peer2;
210 GCP_send (target, 293 GCP_request_mq_cancel (target->mqm,
211 env); 294 env);
295 target->mqm = NULL;
212} 296}
213 297
214 298
@@ -218,53 +302,64 @@ send_broken (struct CadetPeer *target,
218 * be called immediately when we register, and then again 302 * be called immediately when we register, and then again
219 * later if the connection ever goes down. 303 * later if the connection ever goes down.
220 * 304 *
221 * @param cls the `struct CadetRoute` 305 * @param cls the `struct RouteDirection`
222 * @param mq the message queue, NULL if connection went down 306 * @param available #GNUNET_YES if sending is now possible,
307 * #GNUNET_NO if sending is no longer possible
308 * #GNUNET_SYSERR if sending is no longer possible
309 * and the last envelope was discarded
223 */ 310 */
224static void 311static void
225mqm_cr_destroy_prev (void *cls, 312dir_ready_cb (void *cls,
226 struct GNUNET_MQ_Handle *mq) 313 int ready)
227{ 314{
228 struct CadetRoute *route = cls; 315 struct RouteDirection *dir = cls;
316 struct CadetRoute *route = dir->my_route;
317 struct RouteDirection *odir;
229 318
230 if (NULL != mq) 319 if (GNUNET_YES == ready)
231 { 320 {
232 route->up |= 1; 321 struct GNUNET_MQ_Envelope *env;
322
323 dir->is_ready = GNUNET_YES;
324 if (NULL != (env = dir->out_buffer[dir->out_rpos]))
325 {
326 dir->out_buffer[dir->out_rpos] = NULL;
327 dir->out_rpos++;
328 if (ROUTE_BUFFER_SIZE == dir->out_rpos)
329 dir->out_rpos = 0;
330 dir->is_ready = GNUNET_NO;
331 GCP_send (dir->mqm,
332 env);
333 }
233 return; 334 return;
234 } 335 }
235 send_broken (route->next_hop, 336 odir = (dir == &route->next) ? &route->prev : &route->next;
337 send_broken (&route->next,
236 &route->cid, 338 &route->cid,
237 GCP_get_id (route->prev_hop), 339 GCP_get_id (odir->hop),
238 &my_full_id); 340 &my_full_id);
239 destroy_route (route); 341 destroy_route (route);
240} 342}
241 343
242 344
243/** 345/**
244 * Function called when the message queue to the previous hop 346 * Initialize one of the directions of a route.
245 * becomes available/unavailable. We expect this function to
246 * be called immediately when we register, and then again
247 * later if the connection ever goes down.
248 * 347 *
249 * @param cls the `struct CadetRoute` 348 * @param route route the direction belongs to
250 * @param mq the message queue, NULL if connection went down 349 * @param dir direction to initialize
350 * @param hop next hop on in the @a dir
251 */ 351 */
252static void 352static void
253mqm_cr_destroy_next (void *cls, 353dir_init (struct RouteDirection *dir,
254 struct GNUNET_MQ_Handle *mq) 354 struct CadetRoute *route,
355 struct CadetPeer *hop)
255{ 356{
256 struct CadetRoute *route = cls; 357 dir->hop = hop;
257 358 dir->my_route = route;
258 if (NULL != mq) 359 dir->mqm = GCP_request_mq (hop,
259 { 360 &dir_ready_cb,
260 route->up |= 2; 361 dir);
261 return; 362 GNUNET_assert (GNUNET_YES == dir->is_ready);
262 }
263 send_broken (route->prev_hop,
264 &route->cid,
265 GCP_get_id (route->next_hop),
266 &my_full_id);
267 destroy_route (route);
268} 363}
269 364
270 365
@@ -310,16 +405,28 @@ handle_connection_create (void *cls,
310 if (NULL != 405 if (NULL !=
311 get_route (&msg->cid)) 406 get_route (&msg->cid))
312 { 407 {
313 /* CID not chosen at random, collides */ 408 /* Duplicate CREATE, pass it on, previous one might have been lost! */
314 GNUNET_break_op (0); 409 route_message (sender,
410 &msg->cid,
411 &msg->header);
315 return; 412 return;
316 } 413 }
317 if (off == path_length - 1) 414 if (off == path_length - 1)
318 { 415 {
319 /* We are the destination, create connection */ 416 /* We are the destination, create connection */
417 struct CadetConnection *cc;
320 struct CadetPeerPath *path; 418 struct CadetPeerPath *path;
321 struct CadetPeer *origin; 419 struct CadetPeer *origin;
322 420
421 cc = GNUNET_CONTAINER_multishortmap_get (connections,
422 &msg->cid.connection_of_tunnel);
423 if (NULL != cc)
424 {
425 /* Duplicate CREATE, likely our ACK got lost, retransmit the ACK! */
426 GNUNET_break (0); // FIXME: not implemented!
427 return;
428 }
429
323 path = GCPP_get_path_from_route (path_length, 430 path = GCPP_get_path_from_route (path_length,
324 pids); 431 pids);
325 origin = GCP_get (&pids[0], 432 origin = GCP_get (&pids[0],
@@ -327,35 +434,37 @@ handle_connection_create (void *cls,
327 GCT_add_inbound_connection (GCT_create_tunnel (origin), 434 GCT_add_inbound_connection (GCT_create_tunnel (origin),
328 &msg->cid, 435 &msg->cid,
329 path); 436 path);
330
331 return; 437 return;
332 } 438 }
333 /* We are merely a hop on the way, check if we can support the route */ 439 /* We are merely a hop on the way, check if we can support the route */
334 next = GCP_get (&pids[off + 1], 440 next = GCP_get (&pids[off + 1],
335 GNUNET_NO); 441 GNUNET_NO);
336 if ( (NULL == next) || 442 if ( (NULL == next) ||
337 (NULL == GCP_get_mq (next)) ) 443 (GNUNET_NO == GCP_has_core_connection (next)) )
338 { 444 {
339 /* unworkable, send back BROKEN notification */ 445 /* unworkable, send back BROKEN notification */
340 send_broken (sender, 446 struct GNUNET_MQ_Envelope *env;
341 &msg->cid, 447 struct GNUNET_CADET_ConnectionBrokenMessage *bm;
342 &pids[off + 1], 448
343 &my_full_id); 449 env = GNUNET_MQ_msg (bm,
450 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
451 bm->cid = msg->cid;
452 bm->peer1 = pids[off + 1];
453 bm->peer2 = my_full_id;
454 GCP_send_ooo (sender,
455 env);
344 return; 456 return;
345 } 457 }
346 458
347 /* Workable route, create routing entry */ 459 /* Workable route, create routing entry */
348 route = GNUNET_new (struct CadetRoute); 460 route = GNUNET_new (struct CadetRoute);
349 route->cid = msg->cid; 461 route->cid = msg->cid;
350 route->prev_mqm = GCP_request_mq (sender, 462 dir_init (&route->prev,
351 &mqm_cr_destroy_prev, 463 route,
352 route); 464 sender);
353 route->next_mqm = GCP_request_mq (next, 465 dir_init (&route->next,
354 &mqm_cr_destroy_next, 466 route,
355 route); 467 next);
356 route->prev_hop = sender;
357 route->next_hop = next;
358 GNUNET_assert ((1|2) == route->up);
359 GNUNET_assert (GNUNET_OK == 468 GNUNET_assert (GNUNET_OK ==
360 GNUNET_CONTAINER_multishortmap_put (routes, 469 GNUNET_CONTAINER_multishortmap_put (routes,
361 &route->cid.connection_of_tunnel, 470 &route->cid.connection_of_tunnel,
@@ -371,8 +480,8 @@ handle_connection_create (void *cls,
371 * @param msg Message itself. 480 * @param msg Message itself.
372 */ 481 */
373static void 482static void
374handle_connection_ack (void *cls, 483handle_connection_create_ack (void *cls,
375 const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg) 484 const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg)
376{ 485{
377 struct CadetPeer *peer = cls; 486 struct CadetPeer *peer = cls;
378 struct CadetConnection *cc; 487 struct CadetConnection *cc;
@@ -734,7 +843,7 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
734 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 843 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
735 struct GNUNET_CADET_ConnectionCreateMessage, 844 struct GNUNET_CADET_ConnectionCreateMessage,
736 NULL), 845 NULL),
737 GNUNET_MQ_hd_fixed_size (connection_ack, 846 GNUNET_MQ_hd_fixed_size (connection_create_ack,
738 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, 847 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
739 struct GNUNET_CADET_ConnectionCreateMessageAckMessage, 848 struct GNUNET_CADET_ConnectionCreateMessageAckMessage,
740 NULL), 849 NULL),
diff --git a/src/cadet/gnunet-service-cadet-new_paths.c b/src/cadet/gnunet-service-cadet-new_paths.c
index 3f6edef39..3cfce337c 100644
--- a/src/cadet/gnunet-service-cadet-new_paths.c
+++ b/src/cadet/gnunet-service-cadet-new_paths.c
@@ -525,4 +525,33 @@ GCPP_get_peer_at_offset (struct CadetPeerPath *path,
525} 525}
526 526
527 527
528/**
529 * Convert a path to a human-readable string.
530 *
531 * @param path path to convert
532 * @return string, to be freed by caller (unlike other *_2s APIs!)
533 */
534char *
535GCPP_2s (struct CadetPeerPath *path)
536{
537 char *s;
538 char *old;
539
540 old = GNUNET_strdup ("");
541 for (unsigned int i = 0;
542 i < path->entries_length;
543 i++)
544 {
545 GNUNET_asprintf (&s,
546 "%s %s",
547 old,
548 GCP_2s (GCPP_get_peer_at_offset (path,
549 i)));
550 GNUNET_free_non_null (old);
551 old = s;
552 }
553 return old;
554}
555
556
528/* end of gnunet-service-cadet-new_paths.c */ 557/* end of gnunet-service-cadet-new_paths.c */
diff --git a/src/cadet/gnunet-service-cadet-new_paths.h b/src/cadet/gnunet-service-cadet-new_paths.h
index 6a864e8ec..5714368c7 100644
--- a/src/cadet/gnunet-service-cadet-new_paths.h
+++ b/src/cadet/gnunet-service-cadet-new_paths.h
@@ -169,4 +169,14 @@ GCPP_get_peer_at_offset (struct CadetPeerPath *path,
169 unsigned int off); 169 unsigned int off);
170 170
171 171
172/**
173 * Convert a path to a human-readable string.
174 *
175 * @param path path to convert
176 * @return string, to be freed by caller (unlike other *_2s APIs!)
177 */
178char *
179GCPP_2s (struct CadetPeerPath *p);
180
181
172#endif 182#endif
diff --git a/src/cadet/gnunet-service-cadet-new_peer.c b/src/cadet/gnunet-service-cadet-new_peer.c
index c57622181..5b978ff77 100644
--- a/src/cadet/gnunet-service-cadet-new_peer.c
+++ b/src/cadet/gnunet-service-cadet-new_peer.c
@@ -86,6 +86,11 @@ struct GCP_MessageQueueManager
86 */ 86 */
87 struct CadetPeer *cp; 87 struct CadetPeer *cp;
88 88
89 /**
90 * Envelope this manager would like to transmit once it is its turn.
91 */
92 struct GNUNET_MQ_Envelope *env;
93
89}; 94};
90 95
91 96
@@ -189,6 +194,11 @@ struct CadetPeer
189 unsigned int num_paths; 194 unsigned int num_paths;
190 195
191 /** 196 /**
197 * Number of message queue managers of this peer that have a message in waiting.
198 */
199 unsigned int mqm_ready_counter;
200
201 /**
192 * Current length of the @e path_heads and @path_tails arrays. 202 * Current length of the @e path_heads and @path_tails arrays.
193 * The arrays should be grown as needed. 203 * The arrays should be grown as needed.
194 */ 204 */
@@ -265,50 +275,121 @@ destroy_peer (void *cls)
265 275
266 276
267/** 277/**
268 * Get the message queue for peer @a cp. 278 * Set the message queue to @a mq for peer @a cp and notify watchers.
269 * 279 *
270 * @param cp peer to modify 280 * @param cp peer to modify
271 * @return message queue (can be NULL) 281 * @param mq message queue to set (can be NULL)
272 */ 282 */
273struct GNUNET_MQ_Handle * 283void
274GCP_get_mq (struct CadetPeer *cp) 284GCP_set_mq (struct CadetPeer *cp,
285 struct GNUNET_MQ_Handle *mq)
275{ 286{
276 return cp->core_mq; 287 cp->core_mq = mq;
288
289 for (struct GCP_MessageQueueManager *mqm = cp->mqm_head;
290 NULL != mqm;
291 mqm = mqm->next)
292 {
293 if (NULL == mq)
294 {
295 if (NULL != mqm->env)
296 {
297 GNUNET_MQ_discard (mqm->env);
298 mqm->env = NULL;
299 mqm->cb (mqm->cb_cls,
300 GNUNET_SYSERR);
301 }
302 else
303 {
304 mqm->cb (mqm->cb_cls,
305 GNUNET_NO);
306 }
307 }
308 else
309 {
310 GNUNET_assert (NULL == mqm->env);
311 mqm->cb (mqm->cb_cls,
312 GNUNET_YES);
313 }
314 }
277} 315}
278 316
279 317
280/** 318/**
281 * Set the message queue to @a mq for peer @a cp and notify watchers. 319 * Transmit current envelope from this @a mqm.
282 * 320 *
283 * @param cp peer to modify 321 * @param mqm mqm to transmit message for now
284 * @param mq message queue to set (can be NULL)
285 */ 322 */
286void 323static void
287GCP_set_mq (struct CadetPeer *cp, 324mqm_execute (struct GCP_MessageQueueManager *mqm)
288 struct GNUNET_MQ_Handle *mq)
289{ 325{
290 cp->core_mq = mq; 326 struct CadetPeer *cp = mqm->cp;
327
328 /* Move entry to the end of the DLL, to be fair. */
329 if (mqm != cp->mqm_tail)
330 {
331 GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
332 cp->mqm_tail,
333 mqm);
334 GNUNET_CONTAINER_DLL_insert_tail (cp->mqm_head,
335 cp->mqm_tail,
336 mqm);
337 }
338 GNUNET_MQ_send (cp->core_mq,
339 mqm->env);
340 mqm->env = NULL;
341 cp->mqm_ready_counter--;
342}
343
344
345/**
346 * Function called when CORE took one of the messages from
347 * a message queue manager and transmitted it.
348 *
349 * @param cls the `struct CadetPeeer` where we made progress
350 */
351static void
352mqm_send_done (void *cls)
353{
354 struct CadetPeer *cp = cls;
355
356 if (0 == cp->mqm_ready_counter)
357 return; /* nothing to do */
291 for (struct GCP_MessageQueueManager *mqm = cp->mqm_head; 358 for (struct GCP_MessageQueueManager *mqm = cp->mqm_head;
292 NULL != mqm; 359 NULL != mqm;
293 mqm = mqm->next) 360 mqm = mqm->next)
294 mqm->cb (mqm->cb_cls, 361 {
295 mq); 362 if (NULL == mqm->env)
363 continue;
364 mqm_execute (mqm);
365 return;
366 }
296} 367}
297 368
298 369
299/** 370/**
300 * Send the message in @a env to @a cp. 371 * Send the message in @a env to @a cp.
301 * 372 *
302 * @param cp the peer 373 * @param mqm the message queue manager to use for transmission
303 * @param env envelope with the message to send 374 * @param env envelope with the message to send; must NOT
375 * yet have a #GNUNET_MQ_notify_sent() callback attached to it
304 */ 376 */
305void 377void
306GCP_send (struct CadetPeer *cp, 378GCP_send (struct GCP_MessageQueueManager *mqm,
307 struct GNUNET_MQ_Envelope *env) 379 struct GNUNET_MQ_Envelope *env)
308{ 380{
381 struct CadetPeer *cp = mqm->cp;
382
309 GNUNET_assert (NULL != cp->core_mq); 383 GNUNET_assert (NULL != cp->core_mq);
310 GNUNET_MQ_send (cp->core_mq, 384 GNUNET_assert (NULL == mqm->env);
311 env); 385 GNUNET_MQ_notify_sent (env,
386 &mqm_send_done,
387 cp);
388 mqm->env = env;
389 cp->mqm_ready_counter++;
390 if (0 != GNUNET_MQ_get_length (cp->core_mq))
391 return;
392 mqm_execute (mqm);
312} 393}
313 394
314 395
@@ -865,6 +946,19 @@ GCP_drop_tunnel (struct CadetPeer *peer,
865 946
866 947
867/** 948/**
949 * Test if @a cp has a core-level connection
950 *
951 * @param cp peer to test
952 * @return #GNUNET_YES if @a cp has a core-level connection
953 */
954int
955GCP_has_core_connection (struct CadetPeer *cp)
956{
957 return (NULL != cp->core_mq) ? GNUNET_YES : GNUNET_NO;
958}
959
960
961/**
868 * Start message queue change notifications. 962 * Start message queue change notifications.
869 * 963 *
870 * @param cp peer to notify for 964 * @param cp peer to notify for
@@ -888,7 +982,7 @@ GCP_request_mq (struct CadetPeer *cp,
888 mqm); 982 mqm);
889 if (NULL != cp->core_mq) 983 if (NULL != cp->core_mq)
890 cb (cb_cls, 984 cb (cb_cls,
891 cp->core_mq); 985 GNUNET_YES);
892 return mqm; 986 return mqm;
893} 987}
894 988
@@ -897,12 +991,24 @@ GCP_request_mq (struct CadetPeer *cp,
897 * Stops message queue change notifications. 991 * Stops message queue change notifications.
898 * 992 *
899 * @param mqm handle matching request to cancel 993 * @param mqm handle matching request to cancel
994 * @param last_env final message to transmit, or NULL
900 */ 995 */
901void 996void
902GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm) 997GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm,
998 struct GNUNET_MQ_Envelope *last_env)
903{ 999{
904 struct CadetPeer *cp = mqm->cp; 1000 struct CadetPeer *cp = mqm->cp;
905 1001
1002 if (NULL != mqm->env)
1003 GNUNET_MQ_discard (mqm->env);
1004 if (NULL != last_env)
1005 {
1006 if (NULL != cp->core_mq)
1007 GNUNET_MQ_send (cp->core_mq,
1008 last_env);
1009 else
1010 GNUNET_MQ_discard (last_env);
1011 }
906 GNUNET_CONTAINER_DLL_remove (cp->mqm_head, 1012 GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
907 cp->mqm_tail, 1013 cp->mqm_tail,
908 mqm); 1014 mqm);
@@ -910,5 +1016,29 @@ GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm)
910} 1016}
911 1017
912 1018
1019/**
1020 * Send the message in @a env to @a cp, overriding queueing logic.
1021 * This function should only be used to send error messages outside
1022 * of flow and congestion control, similar to ICMP. Note that
1023 * the envelope may be silently discarded as well.
1024 *
1025 * @param cp peer to send the message to
1026 * @param env envelope with the message to send
1027 */
1028void
1029GCP_send_ooo (struct CadetPeer *cp,
1030 struct GNUNET_MQ_Envelope *env)
1031{
1032 if (NULL == cp->core_mq)
1033 {
1034 GNUNET_MQ_discard (env);
1035 return;
1036 }
1037 GNUNET_MQ_send (cp->core_mq,
1038 env);
1039}
1040
1041
1042
913 1043
914/* end of gnunet-service-cadet-new_peer.c */ 1044/* end of gnunet-service-cadet-new_peer.c */
diff --git a/src/cadet/gnunet-service-cadet-new_peer.h b/src/cadet/gnunet-service-cadet-new_peer.h
index 6b4ee1b56..c633f47e5 100644
--- a/src/cadet/gnunet-service-cadet-new_peer.h
+++ b/src/cadet/gnunet-service-cadet-new_peer.h
@@ -262,7 +262,12 @@ GCP_destroy_all_peers (void);
262 262
263/** 263/**
264 * Data structure used to track whom we have to notify about changes 264 * Data structure used to track whom we have to notify about changes
265 * to our message queue. 265 * in our ability to transmit to a given peer.
266 *
267 * All queue managers will be given equal chance for sending messages
268 * to @a cp. This construct this guarantees fairness for access to @a
269 * cp among the different message queues. Each connection or route
270 * will have its respective message queue managers for each direction.
266 */ 271 */
267struct GCP_MessageQueueManager; 272struct GCP_MessageQueueManager;
268 273
@@ -271,15 +276,19 @@ struct GCP_MessageQueueManager;
271 * Function to call with updated message queue object. 276 * Function to call with updated message queue object.
272 * 277 *
273 * @param cls closure 278 * @param cls closure
274 * @param mq NULL if MQ is gone, otherwise an active message queue 279 * @param available #GNUNET_YES if sending is now possible,
280 * #GNUNET_NO if sending is no longer possible
281 * #GNUNET_SYSERR if sending is no longer possible
282 * and the last envelope was discarded
275 */ 283 */
276typedef void 284typedef void
277(*GCP_MessageQueueNotificationCallback)(void *cls, 285(*GCP_MessageQueueNotificationCallback)(void *cls,
278 struct GNUNET_MQ_Handle *mq); 286 int available);
279 287
280 288
281/** 289/**
282 * Start message queue change notifications. 290 * Start message queue change notifications. Will create a new slot
291 * to manage the message queue to the given @a cp.
283 * 292 *
284 * @param cp peer to notify for 293 * @param cp peer to notify for
285 * @param cb function to call if mq becomes available or unavailable 294 * @param cb function to call if mq becomes available or unavailable
@@ -293,44 +302,67 @@ GCP_request_mq (struct CadetPeer *cp,
293 302
294 303
295/** 304/**
296 * Stops message queue change notifications. 305 * Test if @a cp has a core-level connection
297 * 306 *
298 * @param mqm handle matching request to cancel 307 * @param cp peer to test
308 * @return #GNUNET_YES if @a cp has a core-level connection
309 */
310int
311GCP_has_core_connection (struct CadetPeer *cp);
312
313
314/**
315 * Send the message in @a env via a @a mqm. Must only be called at
316 * most once after the respective
317 * #GCP_MessageQueueNotificationCallback was called with `available`
318 * set to #GNUNET_YES, and not after the callback was called with
319 * `available` set to #GNUNET_NO or #GNUNET_SYSERR.
320 *
321 * @param mqm message queue manager for the transmission
322 * @param env envelope with the message to send; must NOT
323 * yet have a #GNUNET_MQ_notify_sent() callback attached to it
299 */ 324 */
300void 325void
301GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm); 326GCP_send (struct GCP_MessageQueueManager *mqm,
327 struct GNUNET_MQ_Envelope *env);
302 328
303 329
304/** 330/**
305 * Set the message queue to @a mq for peer @a cp and notify watchers. 331 * Send the message in @a env to @a cp, overriding queueing logic.
332 * This function should only be used to send error messages outside
333 * of flow and congestion control, similar to ICMP. Note that
334 * the envelope may be silently discarded as well.
306 * 335 *
307 * @param cp peer to modify 336 * @param cp peer to send the message to
308 * @param mq message queue to set (can be NULL) 337 * @param env envelope with the message to send
309 */ 338 */
310void 339void
311GCP_set_mq (struct CadetPeer *cp, 340GCP_send_ooo (struct CadetPeer *cp,
312 struct GNUNET_MQ_Handle *mq); 341 struct GNUNET_MQ_Envelope *env);
313 342
314 343
315/** 344/**
316 * Get the message queue for peer @a cp. 345 * Stops message queue change notifications and sends a last message.
346 * In practice, this is implemented by sending that @a last_env
347 * message immediately (if any), ignoring queue order.
317 * 348 *
318 * @param cp peer to modify 349 * @param mqm handle matching request to cancel
319 * @return message queue (can be NULL) 350 * @param last_env final message to transmit, or NULL
320 */ 351 */
321struct GNUNET_MQ_Handle * 352void
322GCP_get_mq (struct CadetPeer *cp); 353GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm,
354 struct GNUNET_MQ_Envelope *last_env);
323 355
324 356
325/** 357/**
326 * Send the message in @a env to @a cp. 358 * Set the message queue to @a mq for peer @a cp and notify watchers.
327 * 359 *
328 * @param cp the peer 360 * @param cp peer to modify
329 * @param env envelope with the message to send 361 * @param mq message queue to set (can be NULL)
330 */ 362 */
331void 363void
332GCP_send (struct CadetPeer *cp, 364GCP_set_mq (struct CadetPeer *cp,
333 struct GNUNET_MQ_Envelope *env); 365 struct GNUNET_MQ_Handle *mq);
334 366
335 367
336#endif 368#endif
diff --git a/src/cadet/gnunet-service-cadet-new_tunnels.c b/src/cadet/gnunet-service-cadet-new_tunnels.c
index 9161c41f7..23b270b82 100644
--- a/src/cadet/gnunet-service-cadet-new_tunnels.c
+++ b/src/cadet/gnunet-service-cadet-new_tunnels.c
@@ -227,43 +227,6 @@ struct CadetTunnelAxolotl
227 227
228 228
229/** 229/**
230 * Entry in list of connections used by tunnel, with metadata.
231 */
232struct CadetTConnection
233{
234 /**
235 * Next in DLL.
236 */
237 struct CadetTConnection *next;
238
239 /**
240 * Prev in DLL.
241 */
242 struct CadetTConnection *prev;
243
244 /**
245 * Connection handle.
246 */
247 struct CadetConnection *cc;
248
249 /**
250 * Tunnel this connection belongs to.
251 */
252 struct CadetTunnel *t;
253
254 /**
255 * Creation time, to keep oldest connection alive.
256 */
257 struct GNUNET_TIME_Absolute created;
258
259 /**
260 * Connection throughput, to keep fastest connection alive.
261 */
262 uint32_t throughput;
263};
264
265
266/**
267 * Struct used to save messages in a non-ready tunnel to send once connected. 230 * Struct used to save messages in a non-ready tunnel to send once connected.
268 */ 231 */
269struct CadetTunnelQueueEntry 232struct CadetTunnelQueueEntry
@@ -1418,18 +1381,27 @@ destroy_tunnel (void *cls)
1418 1381
1419 1382
1420/** 1383/**
1421 * A connection is ready for transmission. Looks at our message queue 1384 * A connection is @a is_ready for transmission. Looks at our message
1422 * and if there is a message, sends it out via the connection. 1385 * queue and if there is a message, sends it out via the connection.
1423 * 1386 *
1424 * @param cls the `struct CadetTConnection` that is ready 1387 * @param cls the `struct CadetTConnection` that is @a is_ready
1388 * @param is_ready #GNUNET_YES if connection are now ready,
1389 * #GNUNET_NO if connection are no longer ready
1425 */ 1390 */
1426static void 1391static void
1427connection_ready_cb (void *cls) 1392connection_ready_cb (void *cls,
1393 int is_ready)
1428{ 1394{
1429 struct CadetTConnection *ct = cls; 1395 struct CadetTConnection *ct = cls;
1430 struct CadetTunnel *t = ct->t; 1396 struct CadetTunnel *t = ct->t;
1431 struct CadetTunnelQueueEntry *tq = t->tq_head; 1397 struct CadetTunnelQueueEntry *tq = t->tq_head;
1432 1398
1399 if (GNUNET_NO == ct->is_ready)
1400 {
1401 ct->is_ready = GNUNET_NO;
1402 return;
1403 }
1404 ct->is_ready = GNUNET_YES;
1433 if (NULL == tq) 1405 if (NULL == tq)
1434 return; /* no messages pending right now */ 1406 return; /* no messages pending right now */
1435 1407
@@ -1440,6 +1412,7 @@ connection_ready_cb (void *cls)
1440 tq); 1412 tq);
1441 if (NULL != tq->cid) 1413 if (NULL != tq->cid)
1442 *tq->cid = *GCC_get_id (ct->cc); 1414 *tq->cid = *GCC_get_id (ct->cc);
1415 ct->is_ready = GNUNET_NO;
1443 GCC_transmit (ct->cc, 1416 GCC_transmit (ct->cc,
1444 tq->env); 1417 tq->env);
1445 tq->cont (tq->cont_cls); 1418 tq->cont (tq->cont_cls);
@@ -1453,6 +1426,8 @@ connection_ready_cb (void *cls)
1453 * at our message queue and if there is a message, picks a connection 1426 * at our message queue and if there is a message, picks a connection
1454 * to send it on. 1427 * to send it on.
1455 * 1428 *
1429 * FIXME: yuck... Need better selection logic!
1430 *
1456 * @param t tunnel to process messages on 1431 * @param t tunnel to process messages on
1457 */ 1432 */
1458static void 1433static void
@@ -1465,11 +1440,14 @@ trigger_transmissions (struct CadetTunnel *t)
1465 for (ct = t->connection_head; 1440 for (ct = t->connection_head;
1466 NULL != ct; 1441 NULL != ct;
1467 ct = ct->next) 1442 ct = ct->next)
1468 if (GNUNET_YES == GCC_is_ready (ct->cc)) 1443 if (GNUNET_YES == ct->is_ready)
1469 break; 1444 break;
1470 if (NULL == ct) 1445 if (NULL == ct)
1471 return; /* no connections ready */ 1446 return; /* no connections ready */
1472 connection_ready_cb (ct); 1447
1448 /* FIXME: a bit hackish to do it like this... */
1449 connection_ready_cb (ct,
1450 GNUNET_YES);
1473} 1451}
1474 1452
1475 1453
@@ -1567,7 +1545,7 @@ consider_path_cb (void *cls,
1567 path, 1545 path,
1568 ct, 1546 ct,
1569 &connection_ready_cb, 1547 &connection_ready_cb,
1570 t); 1548 ct);
1571 /* FIXME: schedule job to kill connection (and path?) if it takes 1549 /* FIXME: schedule job to kill connection (and path?) if it takes
1572 too long to get ready! (And track performance data on how long 1550 too long to get ready! (And track performance data on how long
1573 other connections took with the tunnel!) 1551 other connections took with the tunnel!)