diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-01-19 15:52:22 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-01-19 15:52:22 +0100 |
commit | bc43d8978d8695ff97cc67b65c29769e9c7f8f0e (patch) | |
tree | 94d1428dc2b4e70d87d6521b487a92c6fc139827 /src | |
parent | bd3147503e27ddefcb6ba0dcb99c2b32947622a4 (diff) | |
download | gnunet-bc43d8978d8695ff97cc67b65c29769e9c7f8f0e.tar.gz gnunet-bc43d8978d8695ff97cc67b65c29769e9c7f8f0e.zip |
much work on connection/route/peer-level queue management
Diffstat (limited to 'src')
-rw-r--r-- | src/cadet/gnunet-service-cadet-new.h | 39 | ||||
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_connection.c | 361 | ||||
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_connection.h | 26 | ||||
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_core.c | 267 | ||||
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_paths.c | 29 | ||||
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_paths.h | 10 | ||||
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_peer.c | 172 | ||||
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_peer.h | 76 | ||||
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_tunnels.c | 66 |
9 files changed, 732 insertions, 314 deletions
diff --git a/src/cadet/gnunet-service-cadet-new.h b/src/cadet/gnunet-service-cadet-new.h index 862a0f088..9f4667e23 100644 --- a/src/cadet/gnunet-service-cadet-new.h +++ b/src/cadet/gnunet-service-cadet-new.h | |||
@@ -105,7 +105,44 @@ struct CadetPeerPathEntry | |||
105 | /** | 105 | /** |
106 | * Entry in list of connections used by tunnel, with metadata. | 106 | * Entry in list of connections used by tunnel, with metadata. |
107 | */ | 107 | */ |
108 | struct CadetTConnection; | 108 | struct CadetTConnection |
109 | { | ||
110 | /** | ||
111 | * Next in DLL. | ||
112 | */ | ||
113 | struct CadetTConnection *next; | ||
114 | |||
115 | /** | ||
116 | * Prev in DLL. | ||
117 | */ | ||
118 | struct CadetTConnection *prev; | ||
119 | |||
120 | /** | ||
121 | * Connection handle. | ||
122 | */ | ||
123 | struct CadetConnection *cc; | ||
124 | |||
125 | /** | ||
126 | * Tunnel this connection belongs to. | ||
127 | */ | ||
128 | struct CadetTunnel *t; | ||
129 | |||
130 | /** | ||
131 | * Creation time, to keep oldest connection alive. | ||
132 | */ | ||
133 | struct GNUNET_TIME_Absolute created; | ||
134 | |||
135 | /** | ||
136 | * Connection throughput, to keep fastest connection alive. | ||
137 | */ | ||
138 | uint32_t throughput; | ||
139 | |||
140 | /** | ||
141 | * Is the connection currently ready for transmission? | ||
142 | */ | ||
143 | int is_ready; | ||
144 | }; | ||
145 | |||
109 | 146 | ||
110 | /** | 147 | /** |
111 | * Active path through the network (used by a tunnel). There may | 148 | * Active path through the network (used by a tunnel). There may |
diff --git a/src/cadet/gnunet-service-cadet-new_connection.c b/src/cadet/gnunet-service-cadet-new_connection.c index 5123f9d45..bf88d78e1 100644 --- a/src/cadet/gnunet-service-cadet-new_connection.c +++ b/src/cadet/gnunet-service-cadet-new_connection.c | |||
@@ -27,11 +27,8 @@ | |||
27 | * @author Christian Grothoff | 27 | * @author Christian Grothoff |
28 | * | 28 | * |
29 | * TODO: | 29 | * TODO: |
30 | * - congestion control | ||
31 | * - GCC_debug() | ||
32 | * - keepalive messages | 30 | * - keepalive messages |
33 | * - performance metrics | 31 | * - keep performance metrics (?) |
34 | * - back-off reset | ||
35 | */ | 32 | */ |
36 | #include "platform.h" | 33 | #include "platform.h" |
37 | #include "gnunet-service-cadet-new_channel.h" | 34 | #include "gnunet-service-cadet-new_channel.h" |
@@ -64,19 +61,16 @@ enum CadetConnectionState | |||
64 | CADET_CONNECTION_SENT, | 61 | CADET_CONNECTION_SENT, |
65 | 62 | ||
66 | /** | 63 | /** |
67 | * Connection confirmed, ready to carry traffic. | 64 | * We are an inbound connection, and received a CREATE. Need to |
65 | * send an CREATE_ACK back. | ||
68 | */ | 66 | */ |
69 | CADET_CONNECTION_READY, | 67 | CADET_CONNECTION_CREATE_RECEIVED, |
70 | 68 | ||
71 | /** | 69 | /** |
72 | * Connection to be destroyed, just waiting to empty queues. | 70 | * Connection confirmed, ready to carry traffic. |
73 | */ | 71 | */ |
74 | CADET_CONNECTION_DESTROYED, | 72 | CADET_CONNECTION_READY |
75 | 73 | ||
76 | /** | ||
77 | * Connection to be destroyed because of a distant peer, same as DESTROYED. | ||
78 | */ | ||
79 | CADET_CONNECTION_BROKEN | ||
80 | }; | 74 | }; |
81 | 75 | ||
82 | 76 | ||
@@ -112,11 +106,6 @@ struct CadetConnection | |||
112 | struct GNUNET_MQ_Envelope *env; | 106 | struct GNUNET_MQ_Envelope *env; |
113 | 107 | ||
114 | /** | 108 | /** |
115 | * Message queue to the first hop, or NULL if we have no connection yet. | ||
116 | */ | ||
117 | struct GNUNET_MQ_Handle *mq; | ||
118 | |||
119 | /** | ||
120 | * Handle for calling #GCP_request_mq_cancel() once we are finished. | 109 | * Handle for calling #GCP_request_mq_cancel() once we are finished. |
121 | */ | 110 | */ |
122 | struct GCP_MessageQueueManager *mq_man; | 111 | struct GCP_MessageQueueManager *mq_man; |
@@ -129,7 +118,7 @@ struct CadetConnection | |||
129 | /** | 118 | /** |
130 | * Function to call once we are ready to transmit. | 119 | * Function to call once we are ready to transmit. |
131 | */ | 120 | */ |
132 | GNUNET_SCHEDULER_TaskCallback ready_cb; | 121 | GCC_ReadyCallback ready_cb; |
133 | 122 | ||
134 | /** | 123 | /** |
135 | * Closure for @e ready_cb. | 124 | * Closure for @e ready_cb. |
@@ -151,22 +140,12 @@ struct CadetConnection | |||
151 | */ | 140 | */ |
152 | unsigned int off; | 141 | unsigned int off; |
153 | 142 | ||
154 | }; | 143 | /** |
155 | 144 | * Are we ready to transmit via @e mq_man right now? | |
145 | */ | ||
146 | int mqm_ready; | ||
156 | 147 | ||
157 | /** | 148 | }; |
158 | * Is the given connection currently ready for transmission? | ||
159 | * | ||
160 | * @param cc connection to transmit on | ||
161 | * @return #GNUNET_YES if we could transmit | ||
162 | */ | ||
163 | int | ||
164 | GCC_is_ready (struct CadetConnection *cc) | ||
165 | { | ||
166 | return ( (NULL != cc->mq) && | ||
167 | (CADET_CONNECTION_READY == cc->state) && | ||
168 | (NULL == cc->env) ) ? GNUNET_YES : GNUNET_NO; | ||
169 | } | ||
170 | 149 | ||
171 | 150 | ||
172 | /** | 151 | /** |
@@ -177,29 +156,19 @@ GCC_is_ready (struct CadetConnection *cc) | |||
177 | void | 156 | void |
178 | GCC_destroy (struct CadetConnection *cc) | 157 | GCC_destroy (struct CadetConnection *cc) |
179 | { | 158 | { |
180 | if (NULL != cc->env) | 159 | struct GNUNET_MQ_Envelope *env = NULL; |
181 | { | 160 | |
182 | if (NULL != cc->mq) | 161 | if (CADET_CONNECTION_SENDING_CREATE != cc->state) |
183 | GNUNET_MQ_send_cancel (cc->env); | ||
184 | else | ||
185 | GNUNET_MQ_discard (cc->env); | ||
186 | cc->env = NULL; | ||
187 | } | ||
188 | if ( (NULL != cc->mq) && | ||
189 | (CADET_CONNECTION_SENDING_CREATE != cc->state) ) | ||
190 | { | 162 | { |
191 | /* Need to notify next hop that we are down. */ | ||
192 | struct GNUNET_MQ_Envelope *env; | ||
193 | struct GNUNET_CADET_ConnectionDestroyMessage *destroy_msg; | 163 | struct GNUNET_CADET_ConnectionDestroyMessage *destroy_msg; |
194 | 164 | ||
165 | /* Need to notify next hop that we are down. */ | ||
195 | env = GNUNET_MQ_msg (destroy_msg, | 166 | env = GNUNET_MQ_msg (destroy_msg, |
196 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY); | 167 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY); |
197 | destroy_msg->cid = cc->cid; | 168 | destroy_msg->cid = cc->cid; |
198 | GNUNET_MQ_send (cc->mq, | ||
199 | env); | ||
200 | } | 169 | } |
201 | cc->mq = NULL; | 170 | GCP_request_mq_cancel (cc->mq_man, |
202 | GCP_request_mq_cancel (cc->mq_man); | 171 | env); |
203 | cc->mq_man = NULL; | 172 | cc->mq_man = NULL; |
204 | GCPP_del_connection (cc->path, | 173 | GCPP_del_connection (cc->path, |
205 | cc->off, | 174 | cc->off, |
@@ -234,14 +203,20 @@ GCC_get_ct (struct CadetConnection *cc) | |||
234 | void | 203 | void |
235 | GCC_handle_connection_ack (struct CadetConnection *cc) | 204 | GCC_handle_connection_ack (struct CadetConnection *cc) |
236 | { | 205 | { |
237 | GNUNET_SCHEDULER_cancel (cc->task); | 206 | if (NULL != cc->task) |
238 | #if FIXME | 207 | { |
208 | GNUNET_SCHEDULER_cancel (cc->task); | ||
209 | cc->task = NULL; | ||
210 | } | ||
211 | #if FIXME_KEEPALIVE | ||
239 | cc->task = GNUNET_SCHEDULER_add_delayed (cc->keepalive_period, | 212 | cc->task = GNUNET_SCHEDULER_add_delayed (cc->keepalive_period, |
240 | &send_keepalive, | 213 | &send_keepalive, |
241 | cc); | 214 | cc); |
242 | #endif | 215 | #endif |
243 | cc->state = CADET_CONNECTION_READY; | 216 | cc->state = CADET_CONNECTION_READY; |
244 | cc->ready_cb (cc->ready_cb_cls); | 217 | if (GNUNET_YES == cc->mqm_ready) |
218 | cc->ready_cb (cc->ready_cb_cls, | ||
219 | GNUNET_YES); | ||
245 | } | 220 | } |
246 | 221 | ||
247 | 222 | ||
@@ -255,6 +230,12 @@ void | |||
255 | GCC_handle_kx (struct CadetConnection *cc, | 230 | GCC_handle_kx (struct CadetConnection *cc, |
256 | const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg) | 231 | const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg) |
257 | { | 232 | { |
233 | if (CADET_CONNECTION_SENT == cc->state) | ||
234 | { | ||
235 | /* We didn't get the CREATE_ACK, but instead got payload. That's fine, | ||
236 | clearly something is working, so pretend we got an ACK. */ | ||
237 | GCC_handle_connection_ack (cc); | ||
238 | } | ||
258 | GCT_handle_kx (cc->ct, | 239 | GCT_handle_kx (cc->ct, |
259 | msg); | 240 | msg); |
260 | } | 241 | } |
@@ -270,6 +251,12 @@ void | |||
270 | GCC_handle_encrypted (struct CadetConnection *cc, | 251 | GCC_handle_encrypted (struct CadetConnection *cc, |
271 | const struct GNUNET_CADET_TunnelEncryptedMessage *msg) | 252 | const struct GNUNET_CADET_TunnelEncryptedMessage *msg) |
272 | { | 253 | { |
254 | if (CADET_CONNECTION_SENT == cc->state) | ||
255 | { | ||
256 | /* We didn't get the CREATE_ACK, but instead got payload. That's fine, | ||
257 | clearly something is working, so pretend we got an ACK. */ | ||
258 | GCC_handle_connection_ack (cc); | ||
259 | } | ||
273 | GCT_handle_encrypted (cc->ct, | 260 | GCT_handle_encrypted (cc->ct, |
274 | msg); | 261 | msg); |
275 | } | 262 | } |
@@ -281,37 +268,40 @@ GCC_handle_encrypted (struct CadetConnection *cc, | |||
281 | * @param cls the `struct CadetConnection` to initiate | 268 | * @param cls the `struct CadetConnection` to initiate |
282 | */ | 269 | */ |
283 | static void | 270 | static void |
284 | send_create (void *cls); | 271 | send_create (void *cls) |
285 | |||
286 | |||
287 | /** | ||
288 | * We finished transmission of the create message, now wait for | ||
289 | * ACK or retransmit. | ||
290 | * | ||
291 | * @param cls the `struct CadetConnection` that sent the create message | ||
292 | */ | ||
293 | static void | ||
294 | transmit_create_done_cb (void *cls) | ||
295 | { | 272 | { |
296 | struct CadetConnection *cc = cls; | 273 | struct CadetConnection *cc = cls; |
274 | struct GNUNET_CADET_ConnectionCreateMessage *create_msg; | ||
275 | struct GNUNET_PeerIdentity *pids; | ||
276 | struct GNUNET_MQ_Envelope *env; | ||
277 | unsigned int path_length; | ||
297 | 278 | ||
279 | cc->task = NULL; | ||
280 | GNUNET_assert (GNUNET_YES == cc->mqm_ready); | ||
281 | path_length = GCPP_get_length (cc->path); | ||
282 | env = GNUNET_MQ_msg_extra (create_msg, | ||
283 | path_length * sizeof (struct GNUNET_PeerIdentity), | ||
284 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE); | ||
285 | create_msg->cid = cc->cid; | ||
286 | pids = (struct GNUNET_PeerIdentity *) &create_msg[1]; | ||
287 | for (unsigned int i=0;i<path_length;i++) | ||
288 | pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path, | ||
289 | i)); | ||
290 | cc->env = env; | ||
291 | cc->mqm_ready = GNUNET_NO; | ||
298 | cc->state = CADET_CONNECTION_SENT; | 292 | cc->state = CADET_CONNECTION_SENT; |
299 | cc->env = NULL; | 293 | GCP_send (cc->mq_man, |
300 | /* FIXME: at some point, we need to reset the delay back to 0! */ | 294 | env); |
301 | cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay); | ||
302 | cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay, | ||
303 | &send_create, | ||
304 | cc); | ||
305 | } | 295 | } |
306 | 296 | ||
307 | 297 | ||
308 | /** | 298 | /** |
309 | * Send a CREATE message to the first hop. | 299 | * Send a CREATE_ACK message towards the origin. |
310 | * | 300 | * |
311 | * @param cls the `struct CadetConnection` to initiate | 301 | * @param cls the `struct CadetConnection` to initiate |
312 | */ | 302 | */ |
313 | static void | 303 | static void |
314 | send_create (void *cls) | 304 | send_create_ack (void *cls) |
315 | { | 305 | { |
316 | struct CadetConnection *cc = cls; | 306 | struct CadetConnection *cc = cls; |
317 | struct GNUNET_CADET_ConnectionCreateMessage *create_msg; | 307 | struct GNUNET_CADET_ConnectionCreateMessage *create_msg; |
@@ -320,7 +310,7 @@ send_create (void *cls) | |||
320 | unsigned int path_length; | 310 | unsigned int path_length; |
321 | 311 | ||
322 | cc->task = NULL; | 312 | cc->task = NULL; |
323 | GNUNET_assert (NULL != cc->mq); | 313 | GNUNET_assert (GNUNET_YES == cc->mqm_ready); |
324 | path_length = GCPP_get_length (cc->path); | 314 | path_length = GCPP_get_length (cc->path); |
325 | env = GNUNET_MQ_msg_extra (create_msg, | 315 | env = GNUNET_MQ_msg_extra (create_msg, |
326 | path_length * sizeof (struct GNUNET_PeerIdentity), | 316 | path_length * sizeof (struct GNUNET_PeerIdentity), |
@@ -331,11 +321,42 @@ send_create (void *cls) | |||
331 | pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path, | 321 | pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path, |
332 | i)); | 322 | i)); |
333 | cc->env = env; | 323 | cc->env = env; |
334 | GNUNET_MQ_notify_sent (env, | 324 | cc->mqm_ready = GNUNET_NO; |
335 | &transmit_create_done_cb, | 325 | cc->state = CADET_CONNECTION_READY; |
336 | cc); | 326 | GCP_send (cc->mq_man, |
337 | GNUNET_MQ_send (cc->mq, | 327 | env); |
338 | env); | 328 | } |
329 | |||
330 | |||
331 | /** | ||
332 | * We got a #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE for a | ||
333 | * connection that we already have. Either our ACK got lost | ||
334 | * or something is fishy. Consider retransmitting the ACK. | ||
335 | * | ||
336 | * @param cc connection that got the duplicate CREATE | ||
337 | */ | ||
338 | void | ||
339 | GCC_handle_duplicate_create (struct CadetConnection *cc) | ||
340 | { | ||
341 | if (GNUNET_YES == cc->mqm_ready) | ||
342 | { | ||
343 | /* Tell tunnel that we are not ready for transmission anymore | ||
344 | (until CREATE_ACK is done) */ | ||
345 | cc->ready_cb (cc->ready_cb_cls, | ||
346 | GNUNET_NO); | ||
347 | |||
348 | /* Revert back to the state of having only received the 'CREATE', | ||
349 | and immediately proceed to send the CREATE_ACK. */ | ||
350 | cc->state = CADET_CONNECTION_CREATE_RECEIVED; | ||
351 | cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack, | ||
352 | cc); | ||
353 | } | ||
354 | else | ||
355 | { | ||
356 | /* We are currently sending something else back, which | ||
357 | can only be an ACK or payload, either of which would | ||
358 | do. So actually no need to do anything. */ | ||
359 | } | ||
339 | } | 360 | } |
340 | 361 | ||
341 | 362 | ||
@@ -344,34 +365,62 @@ send_create (void *cls) | |||
344 | * peer at the first hop. Adjust accordingly. | 365 | * peer at the first hop. Adjust accordingly. |
345 | * | 366 | * |
346 | * @param cls the `struct CadetConnection` | 367 | * @param cls the `struct CadetConnection` |
347 | * @param mq NULL if the CORE connection was lost, non-NULL if | 368 | * @param available #GNUNET_YES if sending is now possible, |
348 | * it became available | 369 | * #GNUNET_NO if sending is no longer possible |
370 | * #GNUNET_SYSERR if sending is no longer possible | ||
371 | * and the last envelope was discarded | ||
349 | */ | 372 | */ |
350 | static void | 373 | static void |
351 | manage_first_hop_mq (void *cls, | 374 | manage_first_hop_mq (void *cls, |
352 | struct GNUNET_MQ_Handle *mq) | 375 | int available) |
353 | { | 376 | { |
354 | struct CadetConnection *cc = cls; | 377 | struct CadetConnection *cc = cls; |
355 | 378 | ||
356 | if (NULL == mq) | 379 | if (GNUNET_YES != available) |
357 | { | 380 | { |
358 | /* Connection is down, for now... */ | 381 | /* Connection is down, for now... */ |
359 | cc->mq = NULL; | 382 | cc->mqm_ready = GNUNET_NO; |
383 | cc->state = CADET_CONNECTION_NEW; | ||
384 | cc->retry_delay = GNUNET_TIME_UNIT_ZERO; | ||
360 | if (NULL != cc->task) | 385 | if (NULL != cc->task) |
361 | { | 386 | { |
362 | GNUNET_SCHEDULER_cancel (cc->task); | 387 | GNUNET_SCHEDULER_cancel (cc->task); |
363 | cc->task = NULL; | 388 | cc->task = NULL; |
364 | } | 389 | } |
390 | cc->ready_cb (cc->ready_cb_cls, | ||
391 | GNUNET_NO); | ||
365 | return; | 392 | return; |
366 | } | 393 | } |
367 | 394 | ||
368 | cc->mq = mq; | 395 | cc->mqm_ready = GNUNET_YES; |
369 | cc->state = CADET_CONNECTION_SENDING_CREATE; | 396 | switch (cc->state) |
370 | 397 | { | |
371 | /* Now repeat sending connection creation messages | 398 | case CADET_CONNECTION_NEW: |
372 | down the path, until we get an ACK! */ | 399 | /* Transmit immediately */ |
373 | cc->task = GNUNET_SCHEDULER_add_now (&send_create, | 400 | cc->task = GNUNET_SCHEDULER_add_now (&send_create, |
374 | cc); | 401 | cc); |
402 | break; | ||
403 | case CADET_CONNECTION_SENDING_CREATE: | ||
404 | /* Should not be possible to be called in this state. */ | ||
405 | GNUNET_assert (0); | ||
406 | break; | ||
407 | case CADET_CONNECTION_SENT: | ||
408 | /* Retry a bit later... */ | ||
409 | cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay); | ||
410 | cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay, | ||
411 | &send_create, | ||
412 | cc); | ||
413 | break; | ||
414 | case CADET_CONNECTION_CREATE_RECEIVED: | ||
415 | /* We got the 'CREATE' (incoming connection), should send the CREATE_ACK */ | ||
416 | cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack, | ||
417 | cc); | ||
418 | break; | ||
419 | case CADET_CONNECTION_READY: | ||
420 | cc->ready_cb (cc->ready_cb_cls, | ||
421 | GNUNET_YES); | ||
422 | break; | ||
423 | } | ||
375 | } | 424 | } |
376 | 425 | ||
377 | 426 | ||
@@ -383,6 +432,7 @@ manage_first_hop_mq (void *cls, | |||
383 | * @param destination where to go | 432 | * @param destination where to go |
384 | * @param path which path to take (may not be the full path) | 433 | * @param path which path to take (may not be the full path) |
385 | * @param ct which tunnel uses this connection | 434 | * @param ct which tunnel uses this connection |
435 | * @param init_state initial state for the connection | ||
386 | * @param ready_cb function to call when ready to transmit | 436 | * @param ready_cb function to call when ready to transmit |
387 | * @param ready_cb_cls closure for @a cb | 437 | * @param ready_cb_cls closure for @a cb |
388 | * @return handle to the connection | 438 | * @return handle to the connection |
@@ -392,7 +442,8 @@ connection_create (struct CadetPeer *destination, | |||
392 | struct CadetPeerPath *path, | 442 | struct CadetPeerPath *path, |
393 | struct CadetTConnection *ct, | 443 | struct CadetTConnection *ct, |
394 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, | 444 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, |
395 | GNUNET_SCHEDULER_TaskCallback ready_cb, | 445 | enum CadetConnectionState init_state, |
446 | GCC_ReadyCallback ready_cb, | ||
396 | void *ready_cb_cls) | 447 | void *ready_cb_cls) |
397 | { | 448 | { |
398 | struct CadetConnection *cc; | 449 | struct CadetConnection *cc; |
@@ -403,6 +454,7 @@ connection_create (struct CadetPeer *destination, | |||
403 | destination); | 454 | destination); |
404 | GNUNET_assert (UINT_MAX > off); | 455 | GNUNET_assert (UINT_MAX > off); |
405 | cc = GNUNET_new (struct CadetConnection); | 456 | cc = GNUNET_new (struct CadetConnection); |
457 | cc->state = init_state; | ||
406 | cc->ct = ct; | 458 | cc->ct = ct; |
407 | cc->cid = *cid; | 459 | cc->cid = *cid; |
408 | GNUNET_assert (GNUNET_OK == | 460 | GNUNET_assert (GNUNET_OK == |
@@ -448,19 +500,16 @@ GCC_create_inbound (struct CadetPeer *destination, | |||
448 | struct CadetPeerPath *path, | 500 | struct CadetPeerPath *path, |
449 | struct CadetTConnection *ct, | 501 | struct CadetTConnection *ct, |
450 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, | 502 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, |
451 | GNUNET_SCHEDULER_TaskCallback ready_cb, | 503 | GCC_ReadyCallback ready_cb, |
452 | void *ready_cb_cls) | 504 | void *ready_cb_cls) |
453 | { | 505 | { |
454 | struct CadetConnection *cc; | 506 | return connection_create (destination, |
455 | 507 | path, | |
456 | cc = connection_create (destination, | 508 | ct, |
457 | path, | 509 | cid, |
458 | ct, | 510 | CADET_CONNECTION_CREATE_RECEIVED, |
459 | cid, | 511 | ready_cb, |
460 | ready_cb, | 512 | ready_cb_cls); |
461 | ready_cb_cls); | ||
462 | /* FIXME: send CREATE_ACK? */ | ||
463 | return cc; | ||
464 | } | 513 | } |
465 | 514 | ||
466 | 515 | ||
@@ -479,41 +528,21 @@ struct CadetConnection * | |||
479 | GCC_create (struct CadetPeer *destination, | 528 | GCC_create (struct CadetPeer *destination, |
480 | struct CadetPeerPath *path, | 529 | struct CadetPeerPath *path, |
481 | struct CadetTConnection *ct, | 530 | struct CadetTConnection *ct, |
482 | GNUNET_SCHEDULER_TaskCallback ready_cb, | 531 | GCC_ReadyCallback ready_cb, |
483 | void *ready_cb_cls) | 532 | void *ready_cb_cls) |
484 | { | 533 | { |
485 | struct GNUNET_CADET_ConnectionTunnelIdentifier cid; | 534 | struct GNUNET_CADET_ConnectionTunnelIdentifier cid; |
486 | struct CadetConnection *cc; | ||
487 | 535 | ||
488 | GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, | 536 | GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, |
489 | &cid, | 537 | &cid, |
490 | sizeof (cid)); | 538 | sizeof (cid)); |
491 | cc = connection_create (destination, | 539 | return connection_create (destination, |
492 | path, | 540 | path, |
493 | ct, | 541 | ct, |
494 | &cid, | 542 | &cid, |
495 | ready_cb, | 543 | CADET_CONNECTION_NEW, |
496 | ready_cb_cls); | 544 | ready_cb, |
497 | /* FIXME: send CREATE? */ | 545 | ready_cb_cls); |
498 | return cc; | ||
499 | } | ||
500 | |||
501 | |||
502 | /** | ||
503 | * We finished transmission of a message, if we are still ready, tell | ||
504 | * the tunnel! | ||
505 | * | ||
506 | * @param cls our `struct CadetConnection` | ||
507 | */ | ||
508 | static void | ||
509 | transmit_done_cb (void *cls) | ||
510 | { | ||
511 | struct CadetConnection *cc = cls; | ||
512 | |||
513 | cc->env = NULL; | ||
514 | if ( (NULL != cc->mq) && | ||
515 | (CADET_CONNECTION_READY == cc->state) ) | ||
516 | cc->ready_cb (cc->ready_cb_cls); | ||
517 | } | 546 | } |
518 | 547 | ||
519 | 548 | ||
@@ -524,21 +553,18 @@ transmit_done_cb (void *cls) | |||
524 | * connection is right now ready for transmission. | 553 | * connection is right now ready for transmission. |
525 | * | 554 | * |
526 | * @param cc connection identification | 555 | * @param cc connection identification |
527 | * @param env envelope with message to transmit | 556 | * @param env envelope with message to transmit; must NOT |
557 | * yet have a #GNUNET_MQ_notify_sent() callback attached to it | ||
528 | */ | 558 | */ |
529 | void | 559 | void |
530 | GCC_transmit (struct CadetConnection *cc, | 560 | GCC_transmit (struct CadetConnection *cc, |
531 | struct GNUNET_MQ_Envelope *env) | 561 | struct GNUNET_MQ_Envelope *env) |
532 | { | 562 | { |
533 | GNUNET_assert (NULL == cc->env); | 563 | GNUNET_assert (GNUNET_YES == cc->mqm_ready); |
534 | cc->env = env; | 564 | GNUNET_assert (CADET_CONNECTION_READY == cc->state); |
535 | GNUNET_MQ_notify_sent (env, | 565 | cc->mqm_ready = GNUNET_NO; |
536 | &transmit_done_cb, | 566 | GCP_send (cc->mq_man, |
537 | cc); | 567 | env); |
538 | if ( (NULL != cc->mq) && | ||
539 | (CADET_CONNECTION_READY == cc->state) ) | ||
540 | GNUNET_MQ_send (cc->mq, | ||
541 | env); | ||
542 | } | 568 | } |
543 | 569 | ||
544 | 570 | ||
@@ -569,6 +595,39 @@ GCC_get_id (struct CadetConnection *cc) | |||
569 | 595 | ||
570 | 596 | ||
571 | /** | 597 | /** |
598 | * Get a (static) string for a connection. | ||
599 | * | ||
600 | * @param cc Connection. | ||
601 | */ | ||
602 | const char * | ||
603 | GCC_2s (const struct CadetConnection *cc) | ||
604 | { | ||
605 | static char buf[128]; | ||
606 | |||
607 | if (NULL == cc) | ||
608 | return "Connection(NULL)"; | ||
609 | |||
610 | if (NULL != cc->ct) | ||
611 | { | ||
612 | GNUNET_snprintf (buf, | ||
613 | sizeof (buf), | ||
614 | "Connection(%s(Tunnel(%s)))", | ||
615 | GNUNET_sh2s (&cc->cid.connection_of_tunnel), | ||
616 | GCT_2s (cc->ct->t)); | ||
617 | return buf; | ||
618 | } | ||
619 | GNUNET_snprintf (buf, | ||
620 | sizeof (buf), | ||
621 | "Connection(%s(Tunnel(NULL)))", | ||
622 | GNUNET_sh2s (&cc->cid.connection_of_tunnel)); | ||
623 | return buf; | ||
624 | } | ||
625 | |||
626 | |||
627 | #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__) | ||
628 | |||
629 | |||
630 | /** | ||
572 | * Log connection info. | 631 | * Log connection info. |
573 | * | 632 | * |
574 | * @param cc connection | 633 | * @param cc connection |
@@ -578,7 +637,29 @@ void | |||
578 | GCC_debug (struct CadetConnection *cc, | 637 | GCC_debug (struct CadetConnection *cc, |
579 | enum GNUNET_ErrorType level) | 638 | enum GNUNET_ErrorType level) |
580 | { | 639 | { |
581 | GNUNET_break (0); // FIXME: implement... | 640 | int do_log; |
641 | char *s; | ||
642 | |||
643 | do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK), | ||
644 | "cadet-con", | ||
645 | __FILE__, __FUNCTION__, __LINE__); | ||
646 | if (0 == do_log) | ||
647 | return; | ||
648 | if (NULL == cc) | ||
649 | { | ||
650 | LOG2 (level, | ||
651 | "Connection (NULL)\n"); | ||
652 | return; | ||
653 | } | ||
654 | s = GCPP_2s (cc->path); | ||
655 | LOG2 (level, | ||
656 | "Connection %s to %s via path %s in state %d is %s\n", | ||
657 | GCC_2s (cc), | ||
658 | GCP_2s (cc->destination), | ||
659 | s, | ||
660 | cc->state, | ||
661 | (GNUNET_YES == cc->mqm_ready) ? "ready" : "busy"); | ||
662 | GNUNET_free (s); | ||
582 | } | 663 | } |
583 | 664 | ||
584 | /* end of gnunet-service-cadet-new_connection.c */ | 665 | /* end of gnunet-service-cadet-new_connection.c */ |
diff --git a/src/cadet/gnunet-service-cadet-new_connection.h b/src/cadet/gnunet-service-cadet-new_connection.h index f2364dea4..99426776d 100644 --- a/src/cadet/gnunet-service-cadet-new_connection.h +++ b/src/cadet/gnunet-service-cadet-new_connection.h | |||
@@ -33,14 +33,17 @@ | |||
33 | #include "gnunet-service-cadet-new_peer.h" | 33 | #include "gnunet-service-cadet-new_peer.h" |
34 | #include "cadet_protocol.h" | 34 | #include "cadet_protocol.h" |
35 | 35 | ||
36 | |||
36 | /** | 37 | /** |
37 | * Is the given connection currently ready for transmission? | 38 | * Function called to notify tunnel about change in our readyness. |
38 | * | 39 | * |
39 | * @param cc connection to transmit on | 40 | * @param cls closure |
40 | * @return #GNUNET_YES if we could transmit | 41 | * @param is_ready #GNUNET_YES if the connection is now ready for transmission, |
42 | * #GNUNET_NO if the connection is no longer ready for transmission | ||
41 | */ | 43 | */ |
42 | int | 44 | typedef void |
43 | GCC_is_ready (struct CadetConnection *cc); | 45 | (*GCC_ReadyCallback)(void *cls, |
46 | int is_ready); | ||
44 | 47 | ||
45 | 48 | ||
46 | /** | 49 | /** |
@@ -67,7 +70,7 @@ struct CadetConnection * | |||
67 | GCC_create (struct CadetPeer *destination, | 70 | GCC_create (struct CadetPeer *destination, |
68 | struct CadetPeerPath *path, | 71 | struct CadetPeerPath *path, |
69 | struct CadetTConnection *ct, | 72 | struct CadetTConnection *ct, |
70 | GNUNET_SCHEDULER_TaskCallback ready_cb, | 73 | GCC_ReadyCallback ready_cb, |
71 | void *ready_cb_cls); | 74 | void *ready_cb_cls); |
72 | 75 | ||
73 | 76 | ||
@@ -88,7 +91,7 @@ GCC_create_inbound (struct CadetPeer *destination, | |||
88 | struct CadetPeerPath *path, | 91 | struct CadetPeerPath *path, |
89 | struct CadetTConnection *ct, | 92 | struct CadetTConnection *ct, |
90 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, | 93 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, |
91 | GNUNET_SCHEDULER_TaskCallback ready_cb, | 94 | GCC_ReadyCallback ready_cb, |
92 | void *ready_cb_cls); | 95 | void *ready_cb_cls); |
93 | 96 | ||
94 | 97 | ||
@@ -171,6 +174,15 @@ GCC_get_id (struct CadetConnection *cc); | |||
171 | 174 | ||
172 | 175 | ||
173 | /** | 176 | /** |
177 | * Get a (static) string for a connection. | ||
178 | * | ||
179 | * @param cc Connection. | ||
180 | */ | ||
181 | const char * | ||
182 | GCC_2s (const struct CadetConnection *cc); | ||
183 | |||
184 | |||
185 | /** | ||
174 | * Log connection info. | 186 | * Log connection info. |
175 | * | 187 | * |
176 | * @param cc connection | 188 | * @param cc connection |
diff --git a/src/cadet/gnunet-service-cadet-new_core.c b/src/cadet/gnunet-service-cadet-new_core.c index 3d8406dc9..9ce4418de 100644 --- a/src/cadet/gnunet-service-cadet-new_core.c +++ b/src/cadet/gnunet-service-cadet-new_core.c | |||
@@ -25,6 +25,12 @@ | |||
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | * | 26 | * |
27 | * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom)) | 27 | * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom)) |
28 | * | ||
29 | * TODO: | ||
30 | * - pass encrypted ACK to connection (!) | ||
31 | * - given BROKEN messages, destroy paths (?) | ||
32 | * - | ||
33 | * - handle POLL (if needed) | ||
28 | */ | 34 | */ |
29 | #include "platform.h" | 35 | #include "platform.h" |
30 | #include "gnunet-service-cadet-new_core.h" | 36 | #include "gnunet-service-cadet-new_core.h" |
@@ -35,6 +41,54 @@ | |||
35 | #include "gnunet_core_service.h" | 41 | #include "gnunet_core_service.h" |
36 | #include "cadet_protocol.h" | 42 | #include "cadet_protocol.h" |
37 | 43 | ||
44 | /** | ||
45 | * Number of messages we are willing to buffer per route. | ||
46 | */ | ||
47 | #define ROUTE_BUFFER_SIZE 8 | ||
48 | |||
49 | |||
50 | /** | ||
51 | * Information we keep per direction for a route. | ||
52 | */ | ||
53 | struct RouteDirection | ||
54 | { | ||
55 | /** | ||
56 | * Target peer. | ||
57 | */ | ||
58 | struct CadetPeer *hop; | ||
59 | |||
60 | /** | ||
61 | * Route this direction is part of. | ||
62 | */ | ||
63 | struct CadetRoute *my_route; | ||
64 | |||
65 | /** | ||
66 | * Message queue manager for @e hop. | ||
67 | */ | ||
68 | struct GCP_MessageQueueManager *mqm; | ||
69 | |||
70 | /** | ||
71 | * Cyclic message buffer to @e hop. | ||
72 | */ | ||
73 | struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE]; | ||
74 | |||
75 | /** | ||
76 | * Next write offset to use to append messages to @e out_buffer. | ||
77 | */ | ||
78 | unsigned int out_wpos; | ||
79 | |||
80 | /** | ||
81 | * Next read offset to use to retrieve messages from @e out_buffer. | ||
82 | */ | ||
83 | unsigned int out_rpos; | ||
84 | |||
85 | /** | ||
86 | * Is @e mqm currently ready for transmission? | ||
87 | */ | ||
88 | int is_ready; | ||
89 | |||
90 | }; | ||
91 | |||
38 | 92 | ||
39 | /** | 93 | /** |
40 | * Description of a segment of a `struct CadetConnection` at the | 94 | * Description of a segment of a `struct CadetConnection` at the |
@@ -48,24 +102,14 @@ struct CadetRoute | |||
48 | { | 102 | { |
49 | 103 | ||
50 | /** | 104 | /** |
51 | * Previous hop on this route. | 105 | * Information about the next hop on this route. |
52 | */ | 106 | */ |
53 | struct CadetPeer *prev_hop; | 107 | struct RouteDirection next; |
54 | 108 | ||
55 | /** | 109 | /** |
56 | * Next hop on this route. | 110 | * Information about the previous hop on this route. |
57 | */ | 111 | */ |
58 | struct CadetPeer *next_hop; | 112 | struct RouteDirection prev; |
59 | |||
60 | /** | ||
61 | * Message queue notifications for @e prev_hop. | ||
62 | */ | ||
63 | struct GCP_MessageQueueManager *prev_mqm; | ||
64 | |||
65 | /** | ||
66 | * Message queue notifications for @e next_hop. | ||
67 | */ | ||
68 | struct GCP_MessageQueueManager *next_mqm; | ||
69 | 113 | ||
70 | /** | 114 | /** |
71 | * Unique identifier for the connection that uses this route. | 115 | * Unique identifier for the connection that uses this route. |
@@ -77,12 +121,6 @@ struct CadetRoute | |||
77 | */ | 121 | */ |
78 | struct GNUNET_TIME_Absolute last_use; | 122 | struct GNUNET_TIME_Absolute last_use; |
79 | 123 | ||
80 | /** | ||
81 | * Counter, used to verify that both MQs are up when the route is | ||
82 | * initialized. | ||
83 | */ | ||
84 | unsigned int up; | ||
85 | |||
86 | }; | 124 | }; |
87 | 125 | ||
88 | 126 | ||
@@ -125,6 +163,8 @@ route_message (struct CadetPeer *prev, | |||
125 | const struct GNUNET_MessageHeader *msg) | 163 | const struct GNUNET_MessageHeader *msg) |
126 | { | 164 | { |
127 | struct CadetRoute *route; | 165 | struct CadetRoute *route; |
166 | struct RouteDirection *dir; | ||
167 | struct GNUNET_MQ_Envelope *env; | ||
128 | 168 | ||
129 | route = get_route (cid); | 169 | route = get_route (cid); |
130 | if (NULL == route) | 170 | if (NULL == route) |
@@ -136,13 +176,33 @@ route_message (struct CadetPeer *prev, | |||
136 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); | 176 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); |
137 | bm->cid = *cid; | 177 | bm->cid = *cid; |
138 | bm->peer1 = my_full_id; | 178 | bm->peer1 = my_full_id; |
139 | GCP_send (prev, | 179 | GCP_send_ooo (prev, |
140 | env); | 180 | env); |
141 | return; | 181 | return; |
142 | } | 182 | } |
143 | /* FIXME: support round-robin queue management here somewhere! */ | 183 | dir = (prev == route->prev.hop) ? &route->next : &route->prev; |
144 | GCP_send ((prev == route->prev_hop) ? route->next_hop : route->prev_hop, | 184 | if (GNUNET_YES == dir->is_ready) |
145 | GNUNET_MQ_msg_copy (msg)); | 185 | { |
186 | dir->is_ready = GNUNET_NO; | ||
187 | GCP_send (dir->mqm, | ||
188 | GNUNET_MQ_msg_copy (msg)); | ||
189 | return; | ||
190 | } | ||
191 | env = dir->out_buffer[dir->out_wpos]; | ||
192 | if (NULL != env) | ||
193 | { | ||
194 | /* Queue full, drop earliest message in queue */ | ||
195 | GNUNET_assert (dir->out_rpos == dir->out_wpos); | ||
196 | GNUNET_MQ_discard (env); | ||
197 | dir->out_rpos++; | ||
198 | if (ROUTE_BUFFER_SIZE == dir->out_rpos) | ||
199 | dir->out_rpos = 0; | ||
200 | } | ||
201 | env = GNUNET_MQ_msg_copy (msg); | ||
202 | dir->out_buffer[dir->out_wpos] = env; | ||
203 | dir->out_wpos++; | ||
204 | if (ROUTE_BUFFER_SIZE == dir->out_wpos) | ||
205 | dir->out_wpos = 0; | ||
146 | } | 206 | } |
147 | 207 | ||
148 | 208 | ||
@@ -170,6 +230,29 @@ check_connection_create (void *cls, | |||
170 | 230 | ||
171 | 231 | ||
172 | /** | 232 | /** |
233 | * Free internal data of a route direction. | ||
234 | * | ||
235 | * @param dir direction to destroy (do NOT free memory of 'dir' itself) | ||
236 | */ | ||
237 | static void | ||
238 | destroy_direction (struct RouteDirection *dir) | ||
239 | { | ||
240 | for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++) | ||
241 | if (NULL != dir->out_buffer[i]) | ||
242 | { | ||
243 | GNUNET_MQ_discard (dir->out_buffer[i]); | ||
244 | dir->out_buffer[i] = NULL; | ||
245 | } | ||
246 | if (NULL != dir->mqm) | ||
247 | { | ||
248 | GCP_request_mq_cancel (dir->mqm, | ||
249 | NULL); | ||
250 | dir->mqm = NULL; | ||
251 | } | ||
252 | } | ||
253 | |||
254 | |||
255 | /** | ||
173 | * Destroy our state for @a route. | 256 | * Destroy our state for @a route. |
174 | * | 257 | * |
175 | * @param route route to destroy | 258 | * @param route route to destroy |
@@ -177,8 +260,8 @@ check_connection_create (void *cls, | |||
177 | static void | 260 | static void |
178 | destroy_route (struct CadetRoute *route) | 261 | destroy_route (struct CadetRoute *route) |
179 | { | 262 | { |
180 | GCP_request_mq_cancel (route->next_mqm); | 263 | destroy_direction (&route->prev); |
181 | GCP_request_mq_cancel (route->prev_mqm); | 264 | destroy_direction (&route->next); |
182 | GNUNET_free (route); | 265 | GNUNET_free (route); |
183 | } | 266 | } |
184 | 267 | ||
@@ -192,7 +275,7 @@ destroy_route (struct CadetRoute *route) | |||
192 | * @param peer2 another one of the peers where a link is broken | 275 | * @param peer2 another one of the peers where a link is broken |
193 | */ | 276 | */ |
194 | static void | 277 | static void |
195 | send_broken (struct CadetPeer *target, | 278 | send_broken (struct RouteDirection *target, |
196 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, | 279 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, |
197 | const struct GNUNET_PeerIdentity *peer1, | 280 | const struct GNUNET_PeerIdentity *peer1, |
198 | const struct GNUNET_PeerIdentity *peer2) | 281 | const struct GNUNET_PeerIdentity *peer2) |
@@ -207,8 +290,9 @@ send_broken (struct CadetPeer *target, | |||
207 | bm->peer1 = *peer1; | 290 | bm->peer1 = *peer1; |
208 | if (NULL != peer2) | 291 | if (NULL != peer2) |
209 | bm->peer2 = *peer2; | 292 | bm->peer2 = *peer2; |
210 | GCP_send (target, | 293 | GCP_request_mq_cancel (target->mqm, |
211 | env); | 294 | env); |
295 | target->mqm = NULL; | ||
212 | } | 296 | } |
213 | 297 | ||
214 | 298 | ||
@@ -218,53 +302,64 @@ send_broken (struct CadetPeer *target, | |||
218 | * be called immediately when we register, and then again | 302 | * be called immediately when we register, and then again |
219 | * later if the connection ever goes down. | 303 | * later if the connection ever goes down. |
220 | * | 304 | * |
221 | * @param cls the `struct CadetRoute` | 305 | * @param cls the `struct RouteDirection` |
222 | * @param mq the message queue, NULL if connection went down | 306 | * @param available #GNUNET_YES if sending is now possible, |
307 | * #GNUNET_NO if sending is no longer possible | ||
308 | * #GNUNET_SYSERR if sending is no longer possible | ||
309 | * and the last envelope was discarded | ||
223 | */ | 310 | */ |
224 | static void | 311 | static void |
225 | mqm_cr_destroy_prev (void *cls, | 312 | dir_ready_cb (void *cls, |
226 | struct GNUNET_MQ_Handle *mq) | 313 | int ready) |
227 | { | 314 | { |
228 | struct CadetRoute *route = cls; | 315 | struct RouteDirection *dir = cls; |
316 | struct CadetRoute *route = dir->my_route; | ||
317 | struct RouteDirection *odir; | ||
229 | 318 | ||
230 | if (NULL != mq) | 319 | if (GNUNET_YES == ready) |
231 | { | 320 | { |
232 | route->up |= 1; | 321 | struct GNUNET_MQ_Envelope *env; |
322 | |||
323 | dir->is_ready = GNUNET_YES; | ||
324 | if (NULL != (env = dir->out_buffer[dir->out_rpos])) | ||
325 | { | ||
326 | dir->out_buffer[dir->out_rpos] = NULL; | ||
327 | dir->out_rpos++; | ||
328 | if (ROUTE_BUFFER_SIZE == dir->out_rpos) | ||
329 | dir->out_rpos = 0; | ||
330 | dir->is_ready = GNUNET_NO; | ||
331 | GCP_send (dir->mqm, | ||
332 | env); | ||
333 | } | ||
233 | return; | 334 | return; |
234 | } | 335 | } |
235 | send_broken (route->next_hop, | 336 | odir = (dir == &route->next) ? &route->prev : &route->next; |
337 | send_broken (&route->next, | ||
236 | &route->cid, | 338 | &route->cid, |
237 | GCP_get_id (route->prev_hop), | 339 | GCP_get_id (odir->hop), |
238 | &my_full_id); | 340 | &my_full_id); |
239 | destroy_route (route); | 341 | destroy_route (route); |
240 | } | 342 | } |
241 | 343 | ||
242 | 344 | ||
243 | /** | 345 | /** |
244 | * Function called when the message queue to the previous hop | 346 | * Initialize one of the directions of a route. |
245 | * becomes available/unavailable. We expect this function to | ||
246 | * be called immediately when we register, and then again | ||
247 | * later if the connection ever goes down. | ||
248 | * | 347 | * |
249 | * @param cls the `struct CadetRoute` | 348 | * @param route route the direction belongs to |
250 | * @param mq the message queue, NULL if connection went down | 349 | * @param dir direction to initialize |
350 | * @param hop next hop on in the @a dir | ||
251 | */ | 351 | */ |
252 | static void | 352 | static void |
253 | mqm_cr_destroy_next (void *cls, | 353 | dir_init (struct RouteDirection *dir, |
254 | struct GNUNET_MQ_Handle *mq) | 354 | struct CadetRoute *route, |
355 | struct CadetPeer *hop) | ||
255 | { | 356 | { |
256 | struct CadetRoute *route = cls; | 357 | dir->hop = hop; |
257 | 358 | dir->my_route = route; | |
258 | if (NULL != mq) | 359 | dir->mqm = GCP_request_mq (hop, |
259 | { | 360 | &dir_ready_cb, |
260 | route->up |= 2; | 361 | dir); |
261 | return; | 362 | GNUNET_assert (GNUNET_YES == dir->is_ready); |
262 | } | ||
263 | send_broken (route->prev_hop, | ||
264 | &route->cid, | ||
265 | GCP_get_id (route->next_hop), | ||
266 | &my_full_id); | ||
267 | destroy_route (route); | ||
268 | } | 363 | } |
269 | 364 | ||
270 | 365 | ||
@@ -310,16 +405,28 @@ handle_connection_create (void *cls, | |||
310 | if (NULL != | 405 | if (NULL != |
311 | get_route (&msg->cid)) | 406 | get_route (&msg->cid)) |
312 | { | 407 | { |
313 | /* CID not chosen at random, collides */ | 408 | /* Duplicate CREATE, pass it on, previous one might have been lost! */ |
314 | GNUNET_break_op (0); | 409 | route_message (sender, |
410 | &msg->cid, | ||
411 | &msg->header); | ||
315 | return; | 412 | return; |
316 | } | 413 | } |
317 | if (off == path_length - 1) | 414 | if (off == path_length - 1) |
318 | { | 415 | { |
319 | /* We are the destination, create connection */ | 416 | /* We are the destination, create connection */ |
417 | struct CadetConnection *cc; | ||
320 | struct CadetPeerPath *path; | 418 | struct CadetPeerPath *path; |
321 | struct CadetPeer *origin; | 419 | struct CadetPeer *origin; |
322 | 420 | ||
421 | cc = GNUNET_CONTAINER_multishortmap_get (connections, | ||
422 | &msg->cid.connection_of_tunnel); | ||
423 | if (NULL != cc) | ||
424 | { | ||
425 | /* Duplicate CREATE, likely our ACK got lost, retransmit the ACK! */ | ||
426 | GNUNET_break (0); // FIXME: not implemented! | ||
427 | return; | ||
428 | } | ||
429 | |||
323 | path = GCPP_get_path_from_route (path_length, | 430 | path = GCPP_get_path_from_route (path_length, |
324 | pids); | 431 | pids); |
325 | origin = GCP_get (&pids[0], | 432 | origin = GCP_get (&pids[0], |
@@ -327,35 +434,37 @@ handle_connection_create (void *cls, | |||
327 | GCT_add_inbound_connection (GCT_create_tunnel (origin), | 434 | GCT_add_inbound_connection (GCT_create_tunnel (origin), |
328 | &msg->cid, | 435 | &msg->cid, |
329 | path); | 436 | path); |
330 | |||
331 | return; | 437 | return; |
332 | } | 438 | } |
333 | /* We are merely a hop on the way, check if we can support the route */ | 439 | /* We are merely a hop on the way, check if we can support the route */ |
334 | next = GCP_get (&pids[off + 1], | 440 | next = GCP_get (&pids[off + 1], |
335 | GNUNET_NO); | 441 | GNUNET_NO); |
336 | if ( (NULL == next) || | 442 | if ( (NULL == next) || |
337 | (NULL == GCP_get_mq (next)) ) | 443 | (GNUNET_NO == GCP_has_core_connection (next)) ) |
338 | { | 444 | { |
339 | /* unworkable, send back BROKEN notification */ | 445 | /* unworkable, send back BROKEN notification */ |
340 | send_broken (sender, | 446 | struct GNUNET_MQ_Envelope *env; |
341 | &msg->cid, | 447 | struct GNUNET_CADET_ConnectionBrokenMessage *bm; |
342 | &pids[off + 1], | 448 | |
343 | &my_full_id); | 449 | env = GNUNET_MQ_msg (bm, |
450 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); | ||
451 | bm->cid = msg->cid; | ||
452 | bm->peer1 = pids[off + 1]; | ||
453 | bm->peer2 = my_full_id; | ||
454 | GCP_send_ooo (sender, | ||
455 | env); | ||
344 | return; | 456 | return; |
345 | } | 457 | } |
346 | 458 | ||
347 | /* Workable route, create routing entry */ | 459 | /* Workable route, create routing entry */ |
348 | route = GNUNET_new (struct CadetRoute); | 460 | route = GNUNET_new (struct CadetRoute); |
349 | route->cid = msg->cid; | 461 | route->cid = msg->cid; |
350 | route->prev_mqm = GCP_request_mq (sender, | 462 | dir_init (&route->prev, |
351 | &mqm_cr_destroy_prev, | 463 | route, |
352 | route); | 464 | sender); |
353 | route->next_mqm = GCP_request_mq (next, | 465 | dir_init (&route->next, |
354 | &mqm_cr_destroy_next, | 466 | route, |
355 | route); | 467 | next); |
356 | route->prev_hop = sender; | ||
357 | route->next_hop = next; | ||
358 | GNUNET_assert ((1|2) == route->up); | ||
359 | GNUNET_assert (GNUNET_OK == | 468 | GNUNET_assert (GNUNET_OK == |
360 | GNUNET_CONTAINER_multishortmap_put (routes, | 469 | GNUNET_CONTAINER_multishortmap_put (routes, |
361 | &route->cid.connection_of_tunnel, | 470 | &route->cid.connection_of_tunnel, |
@@ -371,8 +480,8 @@ handle_connection_create (void *cls, | |||
371 | * @param msg Message itself. | 480 | * @param msg Message itself. |
372 | */ | 481 | */ |
373 | static void | 482 | static void |
374 | handle_connection_ack (void *cls, | 483 | handle_connection_create_ack (void *cls, |
375 | const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg) | 484 | const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg) |
376 | { | 485 | { |
377 | struct CadetPeer *peer = cls; | 486 | struct CadetPeer *peer = cls; |
378 | struct CadetConnection *cc; | 487 | struct CadetConnection *cc; |
@@ -734,7 +843,7 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c) | |||
734 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, | 843 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, |
735 | struct GNUNET_CADET_ConnectionCreateMessage, | 844 | struct GNUNET_CADET_ConnectionCreateMessage, |
736 | NULL), | 845 | NULL), |
737 | GNUNET_MQ_hd_fixed_size (connection_ack, | 846 | GNUNET_MQ_hd_fixed_size (connection_create_ack, |
738 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, | 847 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, |
739 | struct GNUNET_CADET_ConnectionCreateMessageAckMessage, | 848 | struct GNUNET_CADET_ConnectionCreateMessageAckMessage, |
740 | NULL), | 849 | NULL), |
diff --git a/src/cadet/gnunet-service-cadet-new_paths.c b/src/cadet/gnunet-service-cadet-new_paths.c index 3f6edef39..3cfce337c 100644 --- a/src/cadet/gnunet-service-cadet-new_paths.c +++ b/src/cadet/gnunet-service-cadet-new_paths.c | |||
@@ -525,4 +525,33 @@ GCPP_get_peer_at_offset (struct CadetPeerPath *path, | |||
525 | } | 525 | } |
526 | 526 | ||
527 | 527 | ||
528 | /** | ||
529 | * Convert a path to a human-readable string. | ||
530 | * | ||
531 | * @param path path to convert | ||
532 | * @return string, to be freed by caller (unlike other *_2s APIs!) | ||
533 | */ | ||
534 | char * | ||
535 | GCPP_2s (struct CadetPeerPath *path) | ||
536 | { | ||
537 | char *s; | ||
538 | char *old; | ||
539 | |||
540 | old = GNUNET_strdup (""); | ||
541 | for (unsigned int i = 0; | ||
542 | i < path->entries_length; | ||
543 | i++) | ||
544 | { | ||
545 | GNUNET_asprintf (&s, | ||
546 | "%s %s", | ||
547 | old, | ||
548 | GCP_2s (GCPP_get_peer_at_offset (path, | ||
549 | i))); | ||
550 | GNUNET_free_non_null (old); | ||
551 | old = s; | ||
552 | } | ||
553 | return old; | ||
554 | } | ||
555 | |||
556 | |||
528 | /* end of gnunet-service-cadet-new_paths.c */ | 557 | /* end of gnunet-service-cadet-new_paths.c */ |
diff --git a/src/cadet/gnunet-service-cadet-new_paths.h b/src/cadet/gnunet-service-cadet-new_paths.h index 6a864e8ec..5714368c7 100644 --- a/src/cadet/gnunet-service-cadet-new_paths.h +++ b/src/cadet/gnunet-service-cadet-new_paths.h | |||
@@ -169,4 +169,14 @@ GCPP_get_peer_at_offset (struct CadetPeerPath *path, | |||
169 | unsigned int off); | 169 | unsigned int off); |
170 | 170 | ||
171 | 171 | ||
172 | /** | ||
173 | * Convert a path to a human-readable string. | ||
174 | * | ||
175 | * @param path path to convert | ||
176 | * @return string, to be freed by caller (unlike other *_2s APIs!) | ||
177 | */ | ||
178 | char * | ||
179 | GCPP_2s (struct CadetPeerPath *p); | ||
180 | |||
181 | |||
172 | #endif | 182 | #endif |
diff --git a/src/cadet/gnunet-service-cadet-new_peer.c b/src/cadet/gnunet-service-cadet-new_peer.c index c57622181..5b978ff77 100644 --- a/src/cadet/gnunet-service-cadet-new_peer.c +++ b/src/cadet/gnunet-service-cadet-new_peer.c | |||
@@ -86,6 +86,11 @@ struct GCP_MessageQueueManager | |||
86 | */ | 86 | */ |
87 | struct CadetPeer *cp; | 87 | struct CadetPeer *cp; |
88 | 88 | ||
89 | /** | ||
90 | * Envelope this manager would like to transmit once it is its turn. | ||
91 | */ | ||
92 | struct GNUNET_MQ_Envelope *env; | ||
93 | |||
89 | }; | 94 | }; |
90 | 95 | ||
91 | 96 | ||
@@ -189,6 +194,11 @@ struct CadetPeer | |||
189 | unsigned int num_paths; | 194 | unsigned int num_paths; |
190 | 195 | ||
191 | /** | 196 | /** |
197 | * Number of message queue managers of this peer that have a message in waiting. | ||
198 | */ | ||
199 | unsigned int mqm_ready_counter; | ||
200 | |||
201 | /** | ||
192 | * Current length of the @e path_heads and @path_tails arrays. | 202 | * Current length of the @e path_heads and @path_tails arrays. |
193 | * The arrays should be grown as needed. | 203 | * The arrays should be grown as needed. |
194 | */ | 204 | */ |
@@ -265,50 +275,121 @@ destroy_peer (void *cls) | |||
265 | 275 | ||
266 | 276 | ||
267 | /** | 277 | /** |
268 | * Get the message queue for peer @a cp. | 278 | * Set the message queue to @a mq for peer @a cp and notify watchers. |
269 | * | 279 | * |
270 | * @param cp peer to modify | 280 | * @param cp peer to modify |
271 | * @return message queue (can be NULL) | 281 | * @param mq message queue to set (can be NULL) |
272 | */ | 282 | */ |
273 | struct GNUNET_MQ_Handle * | 283 | void |
274 | GCP_get_mq (struct CadetPeer *cp) | 284 | GCP_set_mq (struct CadetPeer *cp, |
285 | struct GNUNET_MQ_Handle *mq) | ||
275 | { | 286 | { |
276 | return cp->core_mq; | 287 | cp->core_mq = mq; |
288 | |||
289 | for (struct GCP_MessageQueueManager *mqm = cp->mqm_head; | ||
290 | NULL != mqm; | ||
291 | mqm = mqm->next) | ||
292 | { | ||
293 | if (NULL == mq) | ||
294 | { | ||
295 | if (NULL != mqm->env) | ||
296 | { | ||
297 | GNUNET_MQ_discard (mqm->env); | ||
298 | mqm->env = NULL; | ||
299 | mqm->cb (mqm->cb_cls, | ||
300 | GNUNET_SYSERR); | ||
301 | } | ||
302 | else | ||
303 | { | ||
304 | mqm->cb (mqm->cb_cls, | ||
305 | GNUNET_NO); | ||
306 | } | ||
307 | } | ||
308 | else | ||
309 | { | ||
310 | GNUNET_assert (NULL == mqm->env); | ||
311 | mqm->cb (mqm->cb_cls, | ||
312 | GNUNET_YES); | ||
313 | } | ||
314 | } | ||
277 | } | 315 | } |
278 | 316 | ||
279 | 317 | ||
280 | /** | 318 | /** |
281 | * Set the message queue to @a mq for peer @a cp and notify watchers. | 319 | * Transmit current envelope from this @a mqm. |
282 | * | 320 | * |
283 | * @param cp peer to modify | 321 | * @param mqm mqm to transmit message for now |
284 | * @param mq message queue to set (can be NULL) | ||
285 | */ | 322 | */ |
286 | void | 323 | static void |
287 | GCP_set_mq (struct CadetPeer *cp, | 324 | mqm_execute (struct GCP_MessageQueueManager *mqm) |
288 | struct GNUNET_MQ_Handle *mq) | ||
289 | { | 325 | { |
290 | cp->core_mq = mq; | 326 | struct CadetPeer *cp = mqm->cp; |
327 | |||
328 | /* Move entry to the end of the DLL, to be fair. */ | ||
329 | if (mqm != cp->mqm_tail) | ||
330 | { | ||
331 | GNUNET_CONTAINER_DLL_remove (cp->mqm_head, | ||
332 | cp->mqm_tail, | ||
333 | mqm); | ||
334 | GNUNET_CONTAINER_DLL_insert_tail (cp->mqm_head, | ||
335 | cp->mqm_tail, | ||
336 | mqm); | ||
337 | } | ||
338 | GNUNET_MQ_send (cp->core_mq, | ||
339 | mqm->env); | ||
340 | mqm->env = NULL; | ||
341 | cp->mqm_ready_counter--; | ||
342 | } | ||
343 | |||
344 | |||
345 | /** | ||
346 | * Function called when CORE took one of the messages from | ||
347 | * a message queue manager and transmitted it. | ||
348 | * | ||
349 | * @param cls the `struct CadetPeeer` where we made progress | ||
350 | */ | ||
351 | static void | ||
352 | mqm_send_done (void *cls) | ||
353 | { | ||
354 | struct CadetPeer *cp = cls; | ||
355 | |||
356 | if (0 == cp->mqm_ready_counter) | ||
357 | return; /* nothing to do */ | ||
291 | for (struct GCP_MessageQueueManager *mqm = cp->mqm_head; | 358 | for (struct GCP_MessageQueueManager *mqm = cp->mqm_head; |
292 | NULL != mqm; | 359 | NULL != mqm; |
293 | mqm = mqm->next) | 360 | mqm = mqm->next) |
294 | mqm->cb (mqm->cb_cls, | 361 | { |
295 | mq); | 362 | if (NULL == mqm->env) |
363 | continue; | ||
364 | mqm_execute (mqm); | ||
365 | return; | ||
366 | } | ||
296 | } | 367 | } |
297 | 368 | ||
298 | 369 | ||
299 | /** | 370 | /** |
300 | * Send the message in @a env to @a cp. | 371 | * Send the message in @a env to @a cp. |
301 | * | 372 | * |
302 | * @param cp the peer | 373 | * @param mqm the message queue manager to use for transmission |
303 | * @param env envelope with the message to send | 374 | * @param env envelope with the message to send; must NOT |
375 | * yet have a #GNUNET_MQ_notify_sent() callback attached to it | ||
304 | */ | 376 | */ |
305 | void | 377 | void |
306 | GCP_send (struct CadetPeer *cp, | 378 | GCP_send (struct GCP_MessageQueueManager *mqm, |
307 | struct GNUNET_MQ_Envelope *env) | 379 | struct GNUNET_MQ_Envelope *env) |
308 | { | 380 | { |
381 | struct CadetPeer *cp = mqm->cp; | ||
382 | |||
309 | GNUNET_assert (NULL != cp->core_mq); | 383 | GNUNET_assert (NULL != cp->core_mq); |
310 | GNUNET_MQ_send (cp->core_mq, | 384 | GNUNET_assert (NULL == mqm->env); |
311 | env); | 385 | GNUNET_MQ_notify_sent (env, |
386 | &mqm_send_done, | ||
387 | cp); | ||
388 | mqm->env = env; | ||
389 | cp->mqm_ready_counter++; | ||
390 | if (0 != GNUNET_MQ_get_length (cp->core_mq)) | ||
391 | return; | ||
392 | mqm_execute (mqm); | ||
312 | } | 393 | } |
313 | 394 | ||
314 | 395 | ||
@@ -865,6 +946,19 @@ GCP_drop_tunnel (struct CadetPeer *peer, | |||
865 | 946 | ||
866 | 947 | ||
867 | /** | 948 | /** |
949 | * Test if @a cp has a core-level connection | ||
950 | * | ||
951 | * @param cp peer to test | ||
952 | * @return #GNUNET_YES if @a cp has a core-level connection | ||
953 | */ | ||
954 | int | ||
955 | GCP_has_core_connection (struct CadetPeer *cp) | ||
956 | { | ||
957 | return (NULL != cp->core_mq) ? GNUNET_YES : GNUNET_NO; | ||
958 | } | ||
959 | |||
960 | |||
961 | /** | ||
868 | * Start message queue change notifications. | 962 | * Start message queue change notifications. |
869 | * | 963 | * |
870 | * @param cp peer to notify for | 964 | * @param cp peer to notify for |
@@ -888,7 +982,7 @@ GCP_request_mq (struct CadetPeer *cp, | |||
888 | mqm); | 982 | mqm); |
889 | if (NULL != cp->core_mq) | 983 | if (NULL != cp->core_mq) |
890 | cb (cb_cls, | 984 | cb (cb_cls, |
891 | cp->core_mq); | 985 | GNUNET_YES); |
892 | return mqm; | 986 | return mqm; |
893 | } | 987 | } |
894 | 988 | ||
@@ -897,12 +991,24 @@ GCP_request_mq (struct CadetPeer *cp, | |||
897 | * Stops message queue change notifications. | 991 | * Stops message queue change notifications. |
898 | * | 992 | * |
899 | * @param mqm handle matching request to cancel | 993 | * @param mqm handle matching request to cancel |
994 | * @param last_env final message to transmit, or NULL | ||
900 | */ | 995 | */ |
901 | void | 996 | void |
902 | GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm) | 997 | GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm, |
998 | struct GNUNET_MQ_Envelope *last_env) | ||
903 | { | 999 | { |
904 | struct CadetPeer *cp = mqm->cp; | 1000 | struct CadetPeer *cp = mqm->cp; |
905 | 1001 | ||
1002 | if (NULL != mqm->env) | ||
1003 | GNUNET_MQ_discard (mqm->env); | ||
1004 | if (NULL != last_env) | ||
1005 | { | ||
1006 | if (NULL != cp->core_mq) | ||
1007 | GNUNET_MQ_send (cp->core_mq, | ||
1008 | last_env); | ||
1009 | else | ||
1010 | GNUNET_MQ_discard (last_env); | ||
1011 | } | ||
906 | GNUNET_CONTAINER_DLL_remove (cp->mqm_head, | 1012 | GNUNET_CONTAINER_DLL_remove (cp->mqm_head, |
907 | cp->mqm_tail, | 1013 | cp->mqm_tail, |
908 | mqm); | 1014 | mqm); |
@@ -910,5 +1016,29 @@ GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm) | |||
910 | } | 1016 | } |
911 | 1017 | ||
912 | 1018 | ||
1019 | /** | ||
1020 | * Send the message in @a env to @a cp, overriding queueing logic. | ||
1021 | * This function should only be used to send error messages outside | ||
1022 | * of flow and congestion control, similar to ICMP. Note that | ||
1023 | * the envelope may be silently discarded as well. | ||
1024 | * | ||
1025 | * @param cp peer to send the message to | ||
1026 | * @param env envelope with the message to send | ||
1027 | */ | ||
1028 | void | ||
1029 | GCP_send_ooo (struct CadetPeer *cp, | ||
1030 | struct GNUNET_MQ_Envelope *env) | ||
1031 | { | ||
1032 | if (NULL == cp->core_mq) | ||
1033 | { | ||
1034 | GNUNET_MQ_discard (env); | ||
1035 | return; | ||
1036 | } | ||
1037 | GNUNET_MQ_send (cp->core_mq, | ||
1038 | env); | ||
1039 | } | ||
1040 | |||
1041 | |||
1042 | |||
913 | 1043 | ||
914 | /* end of gnunet-service-cadet-new_peer.c */ | 1044 | /* end of gnunet-service-cadet-new_peer.c */ |
diff --git a/src/cadet/gnunet-service-cadet-new_peer.h b/src/cadet/gnunet-service-cadet-new_peer.h index 6b4ee1b56..c633f47e5 100644 --- a/src/cadet/gnunet-service-cadet-new_peer.h +++ b/src/cadet/gnunet-service-cadet-new_peer.h | |||
@@ -262,7 +262,12 @@ GCP_destroy_all_peers (void); | |||
262 | 262 | ||
263 | /** | 263 | /** |
264 | * Data structure used to track whom we have to notify about changes | 264 | * Data structure used to track whom we have to notify about changes |
265 | * to our message queue. | 265 | * in our ability to transmit to a given peer. |
266 | * | ||
267 | * All queue managers will be given equal chance for sending messages | ||
268 | * to @a cp. This construct this guarantees fairness for access to @a | ||
269 | * cp among the different message queues. Each connection or route | ||
270 | * will have its respective message queue managers for each direction. | ||
266 | */ | 271 | */ |
267 | struct GCP_MessageQueueManager; | 272 | struct GCP_MessageQueueManager; |
268 | 273 | ||
@@ -271,15 +276,19 @@ struct GCP_MessageQueueManager; | |||
271 | * Function to call with updated message queue object. | 276 | * Function to call with updated message queue object. |
272 | * | 277 | * |
273 | * @param cls closure | 278 | * @param cls closure |
274 | * @param mq NULL if MQ is gone, otherwise an active message queue | 279 | * @param available #GNUNET_YES if sending is now possible, |
280 | * #GNUNET_NO if sending is no longer possible | ||
281 | * #GNUNET_SYSERR if sending is no longer possible | ||
282 | * and the last envelope was discarded | ||
275 | */ | 283 | */ |
276 | typedef void | 284 | typedef void |
277 | (*GCP_MessageQueueNotificationCallback)(void *cls, | 285 | (*GCP_MessageQueueNotificationCallback)(void *cls, |
278 | struct GNUNET_MQ_Handle *mq); | 286 | int available); |
279 | 287 | ||
280 | 288 | ||
281 | /** | 289 | /** |
282 | * Start message queue change notifications. | 290 | * Start message queue change notifications. Will create a new slot |
291 | * to manage the message queue to the given @a cp. | ||
283 | * | 292 | * |
284 | * @param cp peer to notify for | 293 | * @param cp peer to notify for |
285 | * @param cb function to call if mq becomes available or unavailable | 294 | * @param cb function to call if mq becomes available or unavailable |
@@ -293,44 +302,67 @@ GCP_request_mq (struct CadetPeer *cp, | |||
293 | 302 | ||
294 | 303 | ||
295 | /** | 304 | /** |
296 | * Stops message queue change notifications. | 305 | * Test if @a cp has a core-level connection |
297 | * | 306 | * |
298 | * @param mqm handle matching request to cancel | 307 | * @param cp peer to test |
308 | * @return #GNUNET_YES if @a cp has a core-level connection | ||
309 | */ | ||
310 | int | ||
311 | GCP_has_core_connection (struct CadetPeer *cp); | ||
312 | |||
313 | |||
314 | /** | ||
315 | * Send the message in @a env via a @a mqm. Must only be called at | ||
316 | * most once after the respective | ||
317 | * #GCP_MessageQueueNotificationCallback was called with `available` | ||
318 | * set to #GNUNET_YES, and not after the callback was called with | ||
319 | * `available` set to #GNUNET_NO or #GNUNET_SYSERR. | ||
320 | * | ||
321 | * @param mqm message queue manager for the transmission | ||
322 | * @param env envelope with the message to send; must NOT | ||
323 | * yet have a #GNUNET_MQ_notify_sent() callback attached to it | ||
299 | */ | 324 | */ |
300 | void | 325 | void |
301 | GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm); | 326 | GCP_send (struct GCP_MessageQueueManager *mqm, |
327 | struct GNUNET_MQ_Envelope *env); | ||
302 | 328 | ||
303 | 329 | ||
304 | /** | 330 | /** |
305 | * Set the message queue to @a mq for peer @a cp and notify watchers. | 331 | * Send the message in @a env to @a cp, overriding queueing logic. |
332 | * This function should only be used to send error messages outside | ||
333 | * of flow and congestion control, similar to ICMP. Note that | ||
334 | * the envelope may be silently discarded as well. | ||
306 | * | 335 | * |
307 | * @param cp peer to modify | 336 | * @param cp peer to send the message to |
308 | * @param mq message queue to set (can be NULL) | 337 | * @param env envelope with the message to send |
309 | */ | 338 | */ |
310 | void | 339 | void |
311 | GCP_set_mq (struct CadetPeer *cp, | 340 | GCP_send_ooo (struct CadetPeer *cp, |
312 | struct GNUNET_MQ_Handle *mq); | 341 | struct GNUNET_MQ_Envelope *env); |
313 | 342 | ||
314 | 343 | ||
315 | /** | 344 | /** |
316 | * Get the message queue for peer @a cp. | 345 | * Stops message queue change notifications and sends a last message. |
346 | * In practice, this is implemented by sending that @a last_env | ||
347 | * message immediately (if any), ignoring queue order. | ||
317 | * | 348 | * |
318 | * @param cp peer to modify | 349 | * @param mqm handle matching request to cancel |
319 | * @return message queue (can be NULL) | 350 | * @param last_env final message to transmit, or NULL |
320 | */ | 351 | */ |
321 | struct GNUNET_MQ_Handle * | 352 | void |
322 | GCP_get_mq (struct CadetPeer *cp); | 353 | GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm, |
354 | struct GNUNET_MQ_Envelope *last_env); | ||
323 | 355 | ||
324 | 356 | ||
325 | /** | 357 | /** |
326 | * Send the message in @a env to @a cp. | 358 | * Set the message queue to @a mq for peer @a cp and notify watchers. |
327 | * | 359 | * |
328 | * @param cp the peer | 360 | * @param cp peer to modify |
329 | * @param env envelope with the message to send | 361 | * @param mq message queue to set (can be NULL) |
330 | */ | 362 | */ |
331 | void | 363 | void |
332 | GCP_send (struct CadetPeer *cp, | 364 | GCP_set_mq (struct CadetPeer *cp, |
333 | struct GNUNET_MQ_Envelope *env); | 365 | struct GNUNET_MQ_Handle *mq); |
334 | 366 | ||
335 | 367 | ||
336 | #endif | 368 | #endif |
diff --git a/src/cadet/gnunet-service-cadet-new_tunnels.c b/src/cadet/gnunet-service-cadet-new_tunnels.c index 9161c41f7..23b270b82 100644 --- a/src/cadet/gnunet-service-cadet-new_tunnels.c +++ b/src/cadet/gnunet-service-cadet-new_tunnels.c | |||
@@ -227,43 +227,6 @@ struct CadetTunnelAxolotl | |||
227 | 227 | ||
228 | 228 | ||
229 | /** | 229 | /** |
230 | * Entry in list of connections used by tunnel, with metadata. | ||
231 | */ | ||
232 | struct CadetTConnection | ||
233 | { | ||
234 | /** | ||
235 | * Next in DLL. | ||
236 | */ | ||
237 | struct CadetTConnection *next; | ||
238 | |||
239 | /** | ||
240 | * Prev in DLL. | ||
241 | */ | ||
242 | struct CadetTConnection *prev; | ||
243 | |||
244 | /** | ||
245 | * Connection handle. | ||
246 | */ | ||
247 | struct CadetConnection *cc; | ||
248 | |||
249 | /** | ||
250 | * Tunnel this connection belongs to. | ||
251 | */ | ||
252 | struct CadetTunnel *t; | ||
253 | |||
254 | /** | ||
255 | * Creation time, to keep oldest connection alive. | ||
256 | */ | ||
257 | struct GNUNET_TIME_Absolute created; | ||
258 | |||
259 | /** | ||
260 | * Connection throughput, to keep fastest connection alive. | ||
261 | */ | ||
262 | uint32_t throughput; | ||
263 | }; | ||
264 | |||
265 | |||
266 | /** | ||
267 | * Struct used to save messages in a non-ready tunnel to send once connected. | 230 | * Struct used to save messages in a non-ready tunnel to send once connected. |
268 | */ | 231 | */ |
269 | struct CadetTunnelQueueEntry | 232 | struct CadetTunnelQueueEntry |
@@ -1418,18 +1381,27 @@ destroy_tunnel (void *cls) | |||
1418 | 1381 | ||
1419 | 1382 | ||
1420 | /** | 1383 | /** |
1421 | * A connection is ready for transmission. Looks at our message queue | 1384 | * A connection is @a is_ready for transmission. Looks at our message |
1422 | * and if there is a message, sends it out via the connection. | 1385 | * queue and if there is a message, sends it out via the connection. |
1423 | * | 1386 | * |
1424 | * @param cls the `struct CadetTConnection` that is ready | 1387 | * @param cls the `struct CadetTConnection` that is @a is_ready |
1388 | * @param is_ready #GNUNET_YES if connection are now ready, | ||
1389 | * #GNUNET_NO if connection are no longer ready | ||
1425 | */ | 1390 | */ |
1426 | static void | 1391 | static void |
1427 | connection_ready_cb (void *cls) | 1392 | connection_ready_cb (void *cls, |
1393 | int is_ready) | ||
1428 | { | 1394 | { |
1429 | struct CadetTConnection *ct = cls; | 1395 | struct CadetTConnection *ct = cls; |
1430 | struct CadetTunnel *t = ct->t; | 1396 | struct CadetTunnel *t = ct->t; |
1431 | struct CadetTunnelQueueEntry *tq = t->tq_head; | 1397 | struct CadetTunnelQueueEntry *tq = t->tq_head; |
1432 | 1398 | ||
1399 | if (GNUNET_NO == ct->is_ready) | ||
1400 | { | ||
1401 | ct->is_ready = GNUNET_NO; | ||
1402 | return; | ||
1403 | } | ||
1404 | ct->is_ready = GNUNET_YES; | ||
1433 | if (NULL == tq) | 1405 | if (NULL == tq) |
1434 | return; /* no messages pending right now */ | 1406 | return; /* no messages pending right now */ |
1435 | 1407 | ||
@@ -1440,6 +1412,7 @@ connection_ready_cb (void *cls) | |||
1440 | tq); | 1412 | tq); |
1441 | if (NULL != tq->cid) | 1413 | if (NULL != tq->cid) |
1442 | *tq->cid = *GCC_get_id (ct->cc); | 1414 | *tq->cid = *GCC_get_id (ct->cc); |
1415 | ct->is_ready = GNUNET_NO; | ||
1443 | GCC_transmit (ct->cc, | 1416 | GCC_transmit (ct->cc, |
1444 | tq->env); | 1417 | tq->env); |
1445 | tq->cont (tq->cont_cls); | 1418 | tq->cont (tq->cont_cls); |
@@ -1453,6 +1426,8 @@ connection_ready_cb (void *cls) | |||
1453 | * at our message queue and if there is a message, picks a connection | 1426 | * at our message queue and if there is a message, picks a connection |
1454 | * to send it on. | 1427 | * to send it on. |
1455 | * | 1428 | * |
1429 | * FIXME: yuck... Need better selection logic! | ||
1430 | * | ||
1456 | * @param t tunnel to process messages on | 1431 | * @param t tunnel to process messages on |
1457 | */ | 1432 | */ |
1458 | static void | 1433 | static void |
@@ -1465,11 +1440,14 @@ trigger_transmissions (struct CadetTunnel *t) | |||
1465 | for (ct = t->connection_head; | 1440 | for (ct = t->connection_head; |
1466 | NULL != ct; | 1441 | NULL != ct; |
1467 | ct = ct->next) | 1442 | ct = ct->next) |
1468 | if (GNUNET_YES == GCC_is_ready (ct->cc)) | 1443 | if (GNUNET_YES == ct->is_ready) |
1469 | break; | 1444 | break; |
1470 | if (NULL == ct) | 1445 | if (NULL == ct) |
1471 | return; /* no connections ready */ | 1446 | return; /* no connections ready */ |
1472 | connection_ready_cb (ct); | 1447 | |
1448 | /* FIXME: a bit hackish to do it like this... */ | ||
1449 | connection_ready_cb (ct, | ||
1450 | GNUNET_YES); | ||
1473 | } | 1451 | } |
1474 | 1452 | ||
1475 | 1453 | ||
@@ -1567,7 +1545,7 @@ consider_path_cb (void *cls, | |||
1567 | path, | 1545 | path, |
1568 | ct, | 1546 | ct, |
1569 | &connection_ready_cb, | 1547 | &connection_ready_cb, |
1570 | t); | 1548 | ct); |
1571 | /* FIXME: schedule job to kill connection (and path?) if it takes | 1549 | /* FIXME: schedule job to kill connection (and path?) if it takes |
1572 | too long to get ready! (And track performance data on how long | 1550 | too long to get ready! (And track performance data on how long |
1573 | other connections took with the tunnel!) | 1551 | other connections took with the tunnel!) |