diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-01-19 15:52:22 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-01-19 15:52:22 +0100 |
commit | bc43d8978d8695ff97cc67b65c29769e9c7f8f0e (patch) | |
tree | 94d1428dc2b4e70d87d6521b487a92c6fc139827 /src/cadet/gnunet-service-cadet-new_core.c | |
parent | bd3147503e27ddefcb6ba0dcb99c2b32947622a4 (diff) | |
download | gnunet-bc43d8978d8695ff97cc67b65c29769e9c7f8f0e.tar.gz gnunet-bc43d8978d8695ff97cc67b65c29769e9c7f8f0e.zip |
much work on connection/route/peer-level queue management
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_core.c')
-rw-r--r-- | src/cadet/gnunet-service-cadet-new_core.c | 267 |
1 files changed, 188 insertions, 79 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_core.c b/src/cadet/gnunet-service-cadet-new_core.c index 3d8406dc9..9ce4418de 100644 --- a/src/cadet/gnunet-service-cadet-new_core.c +++ b/src/cadet/gnunet-service-cadet-new_core.c | |||
@@ -25,6 +25,12 @@ | |||
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | * | 26 | * |
27 | * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom)) | 27 | * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom)) |
28 | * | ||
29 | * TODO: | ||
30 | * - pass encrypted ACK to connection (!) | ||
31 | * - given BROKEN messages, destroy paths (?) | ||
32 | * - | ||
33 | * - handle POLL (if needed) | ||
28 | */ | 34 | */ |
29 | #include "platform.h" | 35 | #include "platform.h" |
30 | #include "gnunet-service-cadet-new_core.h" | 36 | #include "gnunet-service-cadet-new_core.h" |
@@ -35,6 +41,54 @@ | |||
35 | #include "gnunet_core_service.h" | 41 | #include "gnunet_core_service.h" |
36 | #include "cadet_protocol.h" | 42 | #include "cadet_protocol.h" |
37 | 43 | ||
44 | /** | ||
45 | * Number of messages we are willing to buffer per route. | ||
46 | */ | ||
47 | #define ROUTE_BUFFER_SIZE 8 | ||
48 | |||
49 | |||
50 | /** | ||
51 | * Information we keep per direction for a route. | ||
52 | */ | ||
53 | struct RouteDirection | ||
54 | { | ||
55 | /** | ||
56 | * Target peer. | ||
57 | */ | ||
58 | struct CadetPeer *hop; | ||
59 | |||
60 | /** | ||
61 | * Route this direction is part of. | ||
62 | */ | ||
63 | struct CadetRoute *my_route; | ||
64 | |||
65 | /** | ||
66 | * Message queue manager for @e hop. | ||
67 | */ | ||
68 | struct GCP_MessageQueueManager *mqm; | ||
69 | |||
70 | /** | ||
71 | * Cyclic message buffer to @e hop. | ||
72 | */ | ||
73 | struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE]; | ||
74 | |||
75 | /** | ||
76 | * Next write offset to use to append messages to @e out_buffer. | ||
77 | */ | ||
78 | unsigned int out_wpos; | ||
79 | |||
80 | /** | ||
81 | * Next read offset to use to retrieve messages from @e out_buffer. | ||
82 | */ | ||
83 | unsigned int out_rpos; | ||
84 | |||
85 | /** | ||
86 | * Is @e mqm currently ready for transmission? | ||
87 | */ | ||
88 | int is_ready; | ||
89 | |||
90 | }; | ||
91 | |||
38 | 92 | ||
39 | /** | 93 | /** |
40 | * Description of a segment of a `struct CadetConnection` at the | 94 | * Description of a segment of a `struct CadetConnection` at the |
@@ -48,24 +102,14 @@ struct CadetRoute | |||
48 | { | 102 | { |
49 | 103 | ||
50 | /** | 104 | /** |
51 | * Previous hop on this route. | 105 | * Information about the next hop on this route. |
52 | */ | 106 | */ |
53 | struct CadetPeer *prev_hop; | 107 | struct RouteDirection next; |
54 | 108 | ||
55 | /** | 109 | /** |
56 | * Next hop on this route. | 110 | * Information about the previous hop on this route. |
57 | */ | 111 | */ |
58 | struct CadetPeer *next_hop; | 112 | struct RouteDirection prev; |
59 | |||
60 | /** | ||
61 | * Message queue notifications for @e prev_hop. | ||
62 | */ | ||
63 | struct GCP_MessageQueueManager *prev_mqm; | ||
64 | |||
65 | /** | ||
66 | * Message queue notifications for @e next_hop. | ||
67 | */ | ||
68 | struct GCP_MessageQueueManager *next_mqm; | ||
69 | 113 | ||
70 | /** | 114 | /** |
71 | * Unique identifier for the connection that uses this route. | 115 | * Unique identifier for the connection that uses this route. |
@@ -77,12 +121,6 @@ struct CadetRoute | |||
77 | */ | 121 | */ |
78 | struct GNUNET_TIME_Absolute last_use; | 122 | struct GNUNET_TIME_Absolute last_use; |
79 | 123 | ||
80 | /** | ||
81 | * Counter, used to verify that both MQs are up when the route is | ||
82 | * initialized. | ||
83 | */ | ||
84 | unsigned int up; | ||
85 | |||
86 | }; | 124 | }; |
87 | 125 | ||
88 | 126 | ||
@@ -125,6 +163,8 @@ route_message (struct CadetPeer *prev, | |||
125 | const struct GNUNET_MessageHeader *msg) | 163 | const struct GNUNET_MessageHeader *msg) |
126 | { | 164 | { |
127 | struct CadetRoute *route; | 165 | struct CadetRoute *route; |
166 | struct RouteDirection *dir; | ||
167 | struct GNUNET_MQ_Envelope *env; | ||
128 | 168 | ||
129 | route = get_route (cid); | 169 | route = get_route (cid); |
130 | if (NULL == route) | 170 | if (NULL == route) |
@@ -136,13 +176,33 @@ route_message (struct CadetPeer *prev, | |||
136 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); | 176 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); |
137 | bm->cid = *cid; | 177 | bm->cid = *cid; |
138 | bm->peer1 = my_full_id; | 178 | bm->peer1 = my_full_id; |
139 | GCP_send (prev, | 179 | GCP_send_ooo (prev, |
140 | env); | 180 | env); |
141 | return; | 181 | return; |
142 | } | 182 | } |
143 | /* FIXME: support round-robin queue management here somewhere! */ | 183 | dir = (prev == route->prev.hop) ? &route->next : &route->prev; |
144 | GCP_send ((prev == route->prev_hop) ? route->next_hop : route->prev_hop, | 184 | if (GNUNET_YES == dir->is_ready) |
145 | GNUNET_MQ_msg_copy (msg)); | 185 | { |
186 | dir->is_ready = GNUNET_NO; | ||
187 | GCP_send (dir->mqm, | ||
188 | GNUNET_MQ_msg_copy (msg)); | ||
189 | return; | ||
190 | } | ||
191 | env = dir->out_buffer[dir->out_wpos]; | ||
192 | if (NULL != env) | ||
193 | { | ||
194 | /* Queue full, drop earliest message in queue */ | ||
195 | GNUNET_assert (dir->out_rpos == dir->out_wpos); | ||
196 | GNUNET_MQ_discard (env); | ||
197 | dir->out_rpos++; | ||
198 | if (ROUTE_BUFFER_SIZE == dir->out_rpos) | ||
199 | dir->out_rpos = 0; | ||
200 | } | ||
201 | env = GNUNET_MQ_msg_copy (msg); | ||
202 | dir->out_buffer[dir->out_wpos] = env; | ||
203 | dir->out_wpos++; | ||
204 | if (ROUTE_BUFFER_SIZE == dir->out_wpos) | ||
205 | dir->out_wpos = 0; | ||
146 | } | 206 | } |
147 | 207 | ||
148 | 208 | ||
@@ -170,6 +230,29 @@ check_connection_create (void *cls, | |||
170 | 230 | ||
171 | 231 | ||
172 | /** | 232 | /** |
233 | * Free internal data of a route direction. | ||
234 | * | ||
235 | * @param dir direction to destroy (do NOT free memory of 'dir' itself) | ||
236 | */ | ||
237 | static void | ||
238 | destroy_direction (struct RouteDirection *dir) | ||
239 | { | ||
240 | for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++) | ||
241 | if (NULL != dir->out_buffer[i]) | ||
242 | { | ||
243 | GNUNET_MQ_discard (dir->out_buffer[i]); | ||
244 | dir->out_buffer[i] = NULL; | ||
245 | } | ||
246 | if (NULL != dir->mqm) | ||
247 | { | ||
248 | GCP_request_mq_cancel (dir->mqm, | ||
249 | NULL); | ||
250 | dir->mqm = NULL; | ||
251 | } | ||
252 | } | ||
253 | |||
254 | |||
255 | /** | ||
173 | * Destroy our state for @a route. | 256 | * Destroy our state for @a route. |
174 | * | 257 | * |
175 | * @param route route to destroy | 258 | * @param route route to destroy |
@@ -177,8 +260,8 @@ check_connection_create (void *cls, | |||
177 | static void | 260 | static void |
178 | destroy_route (struct CadetRoute *route) | 261 | destroy_route (struct CadetRoute *route) |
179 | { | 262 | { |
180 | GCP_request_mq_cancel (route->next_mqm); | 263 | destroy_direction (&route->prev); |
181 | GCP_request_mq_cancel (route->prev_mqm); | 264 | destroy_direction (&route->next); |
182 | GNUNET_free (route); | 265 | GNUNET_free (route); |
183 | } | 266 | } |
184 | 267 | ||
@@ -192,7 +275,7 @@ destroy_route (struct CadetRoute *route) | |||
192 | * @param peer2 another one of the peers where a link is broken | 275 | * @param peer2 another one of the peers where a link is broken |
193 | */ | 276 | */ |
194 | static void | 277 | static void |
195 | send_broken (struct CadetPeer *target, | 278 | send_broken (struct RouteDirection *target, |
196 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, | 279 | const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, |
197 | const struct GNUNET_PeerIdentity *peer1, | 280 | const struct GNUNET_PeerIdentity *peer1, |
198 | const struct GNUNET_PeerIdentity *peer2) | 281 | const struct GNUNET_PeerIdentity *peer2) |
@@ -207,8 +290,9 @@ send_broken (struct CadetPeer *target, | |||
207 | bm->peer1 = *peer1; | 290 | bm->peer1 = *peer1; |
208 | if (NULL != peer2) | 291 | if (NULL != peer2) |
209 | bm->peer2 = *peer2; | 292 | bm->peer2 = *peer2; |
210 | GCP_send (target, | 293 | GCP_request_mq_cancel (target->mqm, |
211 | env); | 294 | env); |
295 | target->mqm = NULL; | ||
212 | } | 296 | } |
213 | 297 | ||
214 | 298 | ||
@@ -218,53 +302,64 @@ send_broken (struct CadetPeer *target, | |||
218 | * be called immediately when we register, and then again | 302 | * be called immediately when we register, and then again |
219 | * later if the connection ever goes down. | 303 | * later if the connection ever goes down. |
220 | * | 304 | * |
221 | * @param cls the `struct CadetRoute` | 305 | * @param cls the `struct RouteDirection` |
222 | * @param mq the message queue, NULL if connection went down | 306 | * @param available #GNUNET_YES if sending is now possible, |
307 | * #GNUNET_NO if sending is no longer possible | ||
308 | * #GNUNET_SYSERR if sending is no longer possible | ||
309 | * and the last envelope was discarded | ||
223 | */ | 310 | */ |
224 | static void | 311 | static void |
225 | mqm_cr_destroy_prev (void *cls, | 312 | dir_ready_cb (void *cls, |
226 | struct GNUNET_MQ_Handle *mq) | 313 | int ready) |
227 | { | 314 | { |
228 | struct CadetRoute *route = cls; | 315 | struct RouteDirection *dir = cls; |
316 | struct CadetRoute *route = dir->my_route; | ||
317 | struct RouteDirection *odir; | ||
229 | 318 | ||
230 | if (NULL != mq) | 319 | if (GNUNET_YES == ready) |
231 | { | 320 | { |
232 | route->up |= 1; | 321 | struct GNUNET_MQ_Envelope *env; |
322 | |||
323 | dir->is_ready = GNUNET_YES; | ||
324 | if (NULL != (env = dir->out_buffer[dir->out_rpos])) | ||
325 | { | ||
326 | dir->out_buffer[dir->out_rpos] = NULL; | ||
327 | dir->out_rpos++; | ||
328 | if (ROUTE_BUFFER_SIZE == dir->out_rpos) | ||
329 | dir->out_rpos = 0; | ||
330 | dir->is_ready = GNUNET_NO; | ||
331 | GCP_send (dir->mqm, | ||
332 | env); | ||
333 | } | ||
233 | return; | 334 | return; |
234 | } | 335 | } |
235 | send_broken (route->next_hop, | 336 | odir = (dir == &route->next) ? &route->prev : &route->next; |
337 | send_broken (&route->next, | ||
236 | &route->cid, | 338 | &route->cid, |
237 | GCP_get_id (route->prev_hop), | 339 | GCP_get_id (odir->hop), |
238 | &my_full_id); | 340 | &my_full_id); |
239 | destroy_route (route); | 341 | destroy_route (route); |
240 | } | 342 | } |
241 | 343 | ||
242 | 344 | ||
243 | /** | 345 | /** |
244 | * Function called when the message queue to the previous hop | 346 | * Initialize one of the directions of a route. |
245 | * becomes available/unavailable. We expect this function to | ||
246 | * be called immediately when we register, and then again | ||
247 | * later if the connection ever goes down. | ||
248 | * | 347 | * |
249 | * @param cls the `struct CadetRoute` | 348 | * @param route route the direction belongs to |
250 | * @param mq the message queue, NULL if connection went down | 349 | * @param dir direction to initialize |
350 | * @param hop next hop on in the @a dir | ||
251 | */ | 351 | */ |
252 | static void | 352 | static void |
253 | mqm_cr_destroy_next (void *cls, | 353 | dir_init (struct RouteDirection *dir, |
254 | struct GNUNET_MQ_Handle *mq) | 354 | struct CadetRoute *route, |
355 | struct CadetPeer *hop) | ||
255 | { | 356 | { |
256 | struct CadetRoute *route = cls; | 357 | dir->hop = hop; |
257 | 358 | dir->my_route = route; | |
258 | if (NULL != mq) | 359 | dir->mqm = GCP_request_mq (hop, |
259 | { | 360 | &dir_ready_cb, |
260 | route->up |= 2; | 361 | dir); |
261 | return; | 362 | GNUNET_assert (GNUNET_YES == dir->is_ready); |
262 | } | ||
263 | send_broken (route->prev_hop, | ||
264 | &route->cid, | ||
265 | GCP_get_id (route->next_hop), | ||
266 | &my_full_id); | ||
267 | destroy_route (route); | ||
268 | } | 363 | } |
269 | 364 | ||
270 | 365 | ||
@@ -310,16 +405,28 @@ handle_connection_create (void *cls, | |||
310 | if (NULL != | 405 | if (NULL != |
311 | get_route (&msg->cid)) | 406 | get_route (&msg->cid)) |
312 | { | 407 | { |
313 | /* CID not chosen at random, collides */ | 408 | /* Duplicate CREATE, pass it on, previous one might have been lost! */ |
314 | GNUNET_break_op (0); | 409 | route_message (sender, |
410 | &msg->cid, | ||
411 | &msg->header); | ||
315 | return; | 412 | return; |
316 | } | 413 | } |
317 | if (off == path_length - 1) | 414 | if (off == path_length - 1) |
318 | { | 415 | { |
319 | /* We are the destination, create connection */ | 416 | /* We are the destination, create connection */ |
417 | struct CadetConnection *cc; | ||
320 | struct CadetPeerPath *path; | 418 | struct CadetPeerPath *path; |
321 | struct CadetPeer *origin; | 419 | struct CadetPeer *origin; |
322 | 420 | ||
421 | cc = GNUNET_CONTAINER_multishortmap_get (connections, | ||
422 | &msg->cid.connection_of_tunnel); | ||
423 | if (NULL != cc) | ||
424 | { | ||
425 | /* Duplicate CREATE, likely our ACK got lost, retransmit the ACK! */ | ||
426 | GNUNET_break (0); // FIXME: not implemented! | ||
427 | return; | ||
428 | } | ||
429 | |||
323 | path = GCPP_get_path_from_route (path_length, | 430 | path = GCPP_get_path_from_route (path_length, |
324 | pids); | 431 | pids); |
325 | origin = GCP_get (&pids[0], | 432 | origin = GCP_get (&pids[0], |
@@ -327,35 +434,37 @@ handle_connection_create (void *cls, | |||
327 | GCT_add_inbound_connection (GCT_create_tunnel (origin), | 434 | GCT_add_inbound_connection (GCT_create_tunnel (origin), |
328 | &msg->cid, | 435 | &msg->cid, |
329 | path); | 436 | path); |
330 | |||
331 | return; | 437 | return; |
332 | } | 438 | } |
333 | /* We are merely a hop on the way, check if we can support the route */ | 439 | /* We are merely a hop on the way, check if we can support the route */ |
334 | next = GCP_get (&pids[off + 1], | 440 | next = GCP_get (&pids[off + 1], |
335 | GNUNET_NO); | 441 | GNUNET_NO); |
336 | if ( (NULL == next) || | 442 | if ( (NULL == next) || |
337 | (NULL == GCP_get_mq (next)) ) | 443 | (GNUNET_NO == GCP_has_core_connection (next)) ) |
338 | { | 444 | { |
339 | /* unworkable, send back BROKEN notification */ | 445 | /* unworkable, send back BROKEN notification */ |
340 | send_broken (sender, | 446 | struct GNUNET_MQ_Envelope *env; |
341 | &msg->cid, | 447 | struct GNUNET_CADET_ConnectionBrokenMessage *bm; |
342 | &pids[off + 1], | 448 | |
343 | &my_full_id); | 449 | env = GNUNET_MQ_msg (bm, |
450 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); | ||
451 | bm->cid = msg->cid; | ||
452 | bm->peer1 = pids[off + 1]; | ||
453 | bm->peer2 = my_full_id; | ||
454 | GCP_send_ooo (sender, | ||
455 | env); | ||
344 | return; | 456 | return; |
345 | } | 457 | } |
346 | 458 | ||
347 | /* Workable route, create routing entry */ | 459 | /* Workable route, create routing entry */ |
348 | route = GNUNET_new (struct CadetRoute); | 460 | route = GNUNET_new (struct CadetRoute); |
349 | route->cid = msg->cid; | 461 | route->cid = msg->cid; |
350 | route->prev_mqm = GCP_request_mq (sender, | 462 | dir_init (&route->prev, |
351 | &mqm_cr_destroy_prev, | 463 | route, |
352 | route); | 464 | sender); |
353 | route->next_mqm = GCP_request_mq (next, | 465 | dir_init (&route->next, |
354 | &mqm_cr_destroy_next, | 466 | route, |
355 | route); | 467 | next); |
356 | route->prev_hop = sender; | ||
357 | route->next_hop = next; | ||
358 | GNUNET_assert ((1|2) == route->up); | ||
359 | GNUNET_assert (GNUNET_OK == | 468 | GNUNET_assert (GNUNET_OK == |
360 | GNUNET_CONTAINER_multishortmap_put (routes, | 469 | GNUNET_CONTAINER_multishortmap_put (routes, |
361 | &route->cid.connection_of_tunnel, | 470 | &route->cid.connection_of_tunnel, |
@@ -371,8 +480,8 @@ handle_connection_create (void *cls, | |||
371 | * @param msg Message itself. | 480 | * @param msg Message itself. |
372 | */ | 481 | */ |
373 | static void | 482 | static void |
374 | handle_connection_ack (void *cls, | 483 | handle_connection_create_ack (void *cls, |
375 | const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg) | 484 | const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg) |
376 | { | 485 | { |
377 | struct CadetPeer *peer = cls; | 486 | struct CadetPeer *peer = cls; |
378 | struct CadetConnection *cc; | 487 | struct CadetConnection *cc; |
@@ -734,7 +843,7 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c) | |||
734 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, | 843 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, |
735 | struct GNUNET_CADET_ConnectionCreateMessage, | 844 | struct GNUNET_CADET_ConnectionCreateMessage, |
736 | NULL), | 845 | NULL), |
737 | GNUNET_MQ_hd_fixed_size (connection_ack, | 846 | GNUNET_MQ_hd_fixed_size (connection_create_ack, |
738 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, | 847 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, |
739 | struct GNUNET_CADET_ConnectionCreateMessageAckMessage, | 848 | struct GNUNET_CADET_ConnectionCreateMessageAckMessage, |
740 | NULL), | 849 | NULL), |