aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet-new_core.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-01-19 15:52:22 +0100
committerChristian Grothoff <christian@grothoff.org>2017-01-19 15:52:22 +0100
commitbc43d8978d8695ff97cc67b65c29769e9c7f8f0e (patch)
tree94d1428dc2b4e70d87d6521b487a92c6fc139827 /src/cadet/gnunet-service-cadet-new_core.c
parentbd3147503e27ddefcb6ba0dcb99c2b32947622a4 (diff)
downloadgnunet-bc43d8978d8695ff97cc67b65c29769e9c7f8f0e.tar.gz
gnunet-bc43d8978d8695ff97cc67b65c29769e9c7f8f0e.zip
much work on connection/route/peer-level queue management
Diffstat (limited to 'src/cadet/gnunet-service-cadet-new_core.c')
-rw-r--r--src/cadet/gnunet-service-cadet-new_core.c267
1 files changed, 188 insertions, 79 deletions
diff --git a/src/cadet/gnunet-service-cadet-new_core.c b/src/cadet/gnunet-service-cadet-new_core.c
index 3d8406dc9..9ce4418de 100644
--- a/src/cadet/gnunet-service-cadet-new_core.c
+++ b/src/cadet/gnunet-service-cadet-new_core.c
@@ -25,6 +25,12 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 * 26 *
27 * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom)) 27 * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
28 *
29 * TODO:
30 * - pass encrypted ACK to connection (!)
31 * - given BROKEN messages, destroy paths (?)
32 * -
33 * - handle POLL (if needed)
28 */ 34 */
29#include "platform.h" 35#include "platform.h"
30#include "gnunet-service-cadet-new_core.h" 36#include "gnunet-service-cadet-new_core.h"
@@ -35,6 +41,54 @@
35#include "gnunet_core_service.h" 41#include "gnunet_core_service.h"
36#include "cadet_protocol.h" 42#include "cadet_protocol.h"
37 43
44/**
45 * Number of messages we are willing to buffer per route.
46 */
47#define ROUTE_BUFFER_SIZE 8
48
49
50/**
51 * Information we keep per direction for a route.
52 */
53struct RouteDirection
54{
55 /**
56 * Target peer.
57 */
58 struct CadetPeer *hop;
59
60 /**
61 * Route this direction is part of.
62 */
63 struct CadetRoute *my_route;
64
65 /**
66 * Message queue manager for @e hop.
67 */
68 struct GCP_MessageQueueManager *mqm;
69
70 /**
71 * Cyclic message buffer to @e hop.
72 */
73 struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
74
75 /**
76 * Next write offset to use to append messages to @e out_buffer.
77 */
78 unsigned int out_wpos;
79
80 /**
81 * Next read offset to use to retrieve messages from @e out_buffer.
82 */
83 unsigned int out_rpos;
84
85 /**
86 * Is @e mqm currently ready for transmission?
87 */
88 int is_ready;
89
90};
91
38 92
39/** 93/**
40 * Description of a segment of a `struct CadetConnection` at the 94 * Description of a segment of a `struct CadetConnection` at the
@@ -48,24 +102,14 @@ struct CadetRoute
48{ 102{
49 103
50 /** 104 /**
51 * Previous hop on this route. 105 * Information about the next hop on this route.
52 */ 106 */
53 struct CadetPeer *prev_hop; 107 struct RouteDirection next;
54 108
55 /** 109 /**
56 * Next hop on this route. 110 * Information about the previous hop on this route.
57 */ 111 */
58 struct CadetPeer *next_hop; 112 struct RouteDirection prev;
59
60 /**
61 * Message queue notifications for @e prev_hop.
62 */
63 struct GCP_MessageQueueManager *prev_mqm;
64
65 /**
66 * Message queue notifications for @e next_hop.
67 */
68 struct GCP_MessageQueueManager *next_mqm;
69 113
70 /** 114 /**
71 * Unique identifier for the connection that uses this route. 115 * Unique identifier for the connection that uses this route.
@@ -77,12 +121,6 @@ struct CadetRoute
77 */ 121 */
78 struct GNUNET_TIME_Absolute last_use; 122 struct GNUNET_TIME_Absolute last_use;
79 123
80 /**
81 * Counter, used to verify that both MQs are up when the route is
82 * initialized.
83 */
84 unsigned int up;
85
86}; 124};
87 125
88 126
@@ -125,6 +163,8 @@ route_message (struct CadetPeer *prev,
125 const struct GNUNET_MessageHeader *msg) 163 const struct GNUNET_MessageHeader *msg)
126{ 164{
127 struct CadetRoute *route; 165 struct CadetRoute *route;
166 struct RouteDirection *dir;
167 struct GNUNET_MQ_Envelope *env;
128 168
129 route = get_route (cid); 169 route = get_route (cid);
130 if (NULL == route) 170 if (NULL == route)
@@ -136,13 +176,33 @@ route_message (struct CadetPeer *prev,
136 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); 176 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
137 bm->cid = *cid; 177 bm->cid = *cid;
138 bm->peer1 = my_full_id; 178 bm->peer1 = my_full_id;
139 GCP_send (prev, 179 GCP_send_ooo (prev,
140 env); 180 env);
141 return; 181 return;
142 } 182 }
143 /* FIXME: support round-robin queue management here somewhere! */ 183 dir = (prev == route->prev.hop) ? &route->next : &route->prev;
144 GCP_send ((prev == route->prev_hop) ? route->next_hop : route->prev_hop, 184 if (GNUNET_YES == dir->is_ready)
145 GNUNET_MQ_msg_copy (msg)); 185 {
186 dir->is_ready = GNUNET_NO;
187 GCP_send (dir->mqm,
188 GNUNET_MQ_msg_copy (msg));
189 return;
190 }
191 env = dir->out_buffer[dir->out_wpos];
192 if (NULL != env)
193 {
194 /* Queue full, drop earliest message in queue */
195 GNUNET_assert (dir->out_rpos == dir->out_wpos);
196 GNUNET_MQ_discard (env);
197 dir->out_rpos++;
198 if (ROUTE_BUFFER_SIZE == dir->out_rpos)
199 dir->out_rpos = 0;
200 }
201 env = GNUNET_MQ_msg_copy (msg);
202 dir->out_buffer[dir->out_wpos] = env;
203 dir->out_wpos++;
204 if (ROUTE_BUFFER_SIZE == dir->out_wpos)
205 dir->out_wpos = 0;
146} 206}
147 207
148 208
@@ -170,6 +230,29 @@ check_connection_create (void *cls,
170 230
171 231
172/** 232/**
233 * Free internal data of a route direction.
234 *
235 * @param dir direction to destroy (do NOT free memory of 'dir' itself)
236 */
237static void
238destroy_direction (struct RouteDirection *dir)
239{
240 for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
241 if (NULL != dir->out_buffer[i])
242 {
243 GNUNET_MQ_discard (dir->out_buffer[i]);
244 dir->out_buffer[i] = NULL;
245 }
246 if (NULL != dir->mqm)
247 {
248 GCP_request_mq_cancel (dir->mqm,
249 NULL);
250 dir->mqm = NULL;
251 }
252}
253
254
255/**
173 * Destroy our state for @a route. 256 * Destroy our state for @a route.
174 * 257 *
175 * @param route route to destroy 258 * @param route route to destroy
@@ -177,8 +260,8 @@ check_connection_create (void *cls,
177static void 260static void
178destroy_route (struct CadetRoute *route) 261destroy_route (struct CadetRoute *route)
179{ 262{
180 GCP_request_mq_cancel (route->next_mqm); 263 destroy_direction (&route->prev);
181 GCP_request_mq_cancel (route->prev_mqm); 264 destroy_direction (&route->next);
182 GNUNET_free (route); 265 GNUNET_free (route);
183} 266}
184 267
@@ -192,7 +275,7 @@ destroy_route (struct CadetRoute *route)
192 * @param peer2 another one of the peers where a link is broken 275 * @param peer2 another one of the peers where a link is broken
193 */ 276 */
194static void 277static void
195send_broken (struct CadetPeer *target, 278send_broken (struct RouteDirection *target,
196 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, 279 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
197 const struct GNUNET_PeerIdentity *peer1, 280 const struct GNUNET_PeerIdentity *peer1,
198 const struct GNUNET_PeerIdentity *peer2) 281 const struct GNUNET_PeerIdentity *peer2)
@@ -207,8 +290,9 @@ send_broken (struct CadetPeer *target,
207 bm->peer1 = *peer1; 290 bm->peer1 = *peer1;
208 if (NULL != peer2) 291 if (NULL != peer2)
209 bm->peer2 = *peer2; 292 bm->peer2 = *peer2;
210 GCP_send (target, 293 GCP_request_mq_cancel (target->mqm,
211 env); 294 env);
295 target->mqm = NULL;
212} 296}
213 297
214 298
@@ -218,53 +302,64 @@ send_broken (struct CadetPeer *target,
218 * be called immediately when we register, and then again 302 * be called immediately when we register, and then again
219 * later if the connection ever goes down. 303 * later if the connection ever goes down.
220 * 304 *
221 * @param cls the `struct CadetRoute` 305 * @param cls the `struct RouteDirection`
222 * @param mq the message queue, NULL if connection went down 306 * @param available #GNUNET_YES if sending is now possible,
307 * #GNUNET_NO if sending is no longer possible
308 * #GNUNET_SYSERR if sending is no longer possible
309 * and the last envelope was discarded
223 */ 310 */
224static void 311static void
225mqm_cr_destroy_prev (void *cls, 312dir_ready_cb (void *cls,
226 struct GNUNET_MQ_Handle *mq) 313 int ready)
227{ 314{
228 struct CadetRoute *route = cls; 315 struct RouteDirection *dir = cls;
316 struct CadetRoute *route = dir->my_route;
317 struct RouteDirection *odir;
229 318
230 if (NULL != mq) 319 if (GNUNET_YES == ready)
231 { 320 {
232 route->up |= 1; 321 struct GNUNET_MQ_Envelope *env;
322
323 dir->is_ready = GNUNET_YES;
324 if (NULL != (env = dir->out_buffer[dir->out_rpos]))
325 {
326 dir->out_buffer[dir->out_rpos] = NULL;
327 dir->out_rpos++;
328 if (ROUTE_BUFFER_SIZE == dir->out_rpos)
329 dir->out_rpos = 0;
330 dir->is_ready = GNUNET_NO;
331 GCP_send (dir->mqm,
332 env);
333 }
233 return; 334 return;
234 } 335 }
235 send_broken (route->next_hop, 336 odir = (dir == &route->next) ? &route->prev : &route->next;
337 send_broken (&route->next,
236 &route->cid, 338 &route->cid,
237 GCP_get_id (route->prev_hop), 339 GCP_get_id (odir->hop),
238 &my_full_id); 340 &my_full_id);
239 destroy_route (route); 341 destroy_route (route);
240} 342}
241 343
242 344
243/** 345/**
244 * Function called when the message queue to the previous hop 346 * Initialize one of the directions of a route.
245 * becomes available/unavailable. We expect this function to
246 * be called immediately when we register, and then again
247 * later if the connection ever goes down.
248 * 347 *
249 * @param cls the `struct CadetRoute` 348 * @param route route the direction belongs to
250 * @param mq the message queue, NULL if connection went down 349 * @param dir direction to initialize
350 * @param hop next hop on in the @a dir
251 */ 351 */
252static void 352static void
253mqm_cr_destroy_next (void *cls, 353dir_init (struct RouteDirection *dir,
254 struct GNUNET_MQ_Handle *mq) 354 struct CadetRoute *route,
355 struct CadetPeer *hop)
255{ 356{
256 struct CadetRoute *route = cls; 357 dir->hop = hop;
257 358 dir->my_route = route;
258 if (NULL != mq) 359 dir->mqm = GCP_request_mq (hop,
259 { 360 &dir_ready_cb,
260 route->up |= 2; 361 dir);
261 return; 362 GNUNET_assert (GNUNET_YES == dir->is_ready);
262 }
263 send_broken (route->prev_hop,
264 &route->cid,
265 GCP_get_id (route->next_hop),
266 &my_full_id);
267 destroy_route (route);
268} 363}
269 364
270 365
@@ -310,16 +405,28 @@ handle_connection_create (void *cls,
310 if (NULL != 405 if (NULL !=
311 get_route (&msg->cid)) 406 get_route (&msg->cid))
312 { 407 {
313 /* CID not chosen at random, collides */ 408 /* Duplicate CREATE, pass it on, previous one might have been lost! */
314 GNUNET_break_op (0); 409 route_message (sender,
410 &msg->cid,
411 &msg->header);
315 return; 412 return;
316 } 413 }
317 if (off == path_length - 1) 414 if (off == path_length - 1)
318 { 415 {
319 /* We are the destination, create connection */ 416 /* We are the destination, create connection */
417 struct CadetConnection *cc;
320 struct CadetPeerPath *path; 418 struct CadetPeerPath *path;
321 struct CadetPeer *origin; 419 struct CadetPeer *origin;
322 420
421 cc = GNUNET_CONTAINER_multishortmap_get (connections,
422 &msg->cid.connection_of_tunnel);
423 if (NULL != cc)
424 {
425 /* Duplicate CREATE, likely our ACK got lost, retransmit the ACK! */
426 GNUNET_break (0); // FIXME: not implemented!
427 return;
428 }
429
323 path = GCPP_get_path_from_route (path_length, 430 path = GCPP_get_path_from_route (path_length,
324 pids); 431 pids);
325 origin = GCP_get (&pids[0], 432 origin = GCP_get (&pids[0],
@@ -327,35 +434,37 @@ handle_connection_create (void *cls,
327 GCT_add_inbound_connection (GCT_create_tunnel (origin), 434 GCT_add_inbound_connection (GCT_create_tunnel (origin),
328 &msg->cid, 435 &msg->cid,
329 path); 436 path);
330
331 return; 437 return;
332 } 438 }
333 /* We are merely a hop on the way, check if we can support the route */ 439 /* We are merely a hop on the way, check if we can support the route */
334 next = GCP_get (&pids[off + 1], 440 next = GCP_get (&pids[off + 1],
335 GNUNET_NO); 441 GNUNET_NO);
336 if ( (NULL == next) || 442 if ( (NULL == next) ||
337 (NULL == GCP_get_mq (next)) ) 443 (GNUNET_NO == GCP_has_core_connection (next)) )
338 { 444 {
339 /* unworkable, send back BROKEN notification */ 445 /* unworkable, send back BROKEN notification */
340 send_broken (sender, 446 struct GNUNET_MQ_Envelope *env;
341 &msg->cid, 447 struct GNUNET_CADET_ConnectionBrokenMessage *bm;
342 &pids[off + 1], 448
343 &my_full_id); 449 env = GNUNET_MQ_msg (bm,
450 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
451 bm->cid = msg->cid;
452 bm->peer1 = pids[off + 1];
453 bm->peer2 = my_full_id;
454 GCP_send_ooo (sender,
455 env);
344 return; 456 return;
345 } 457 }
346 458
347 /* Workable route, create routing entry */ 459 /* Workable route, create routing entry */
348 route = GNUNET_new (struct CadetRoute); 460 route = GNUNET_new (struct CadetRoute);
349 route->cid = msg->cid; 461 route->cid = msg->cid;
350 route->prev_mqm = GCP_request_mq (sender, 462 dir_init (&route->prev,
351 &mqm_cr_destroy_prev, 463 route,
352 route); 464 sender);
353 route->next_mqm = GCP_request_mq (next, 465 dir_init (&route->next,
354 &mqm_cr_destroy_next, 466 route,
355 route); 467 next);
356 route->prev_hop = sender;
357 route->next_hop = next;
358 GNUNET_assert ((1|2) == route->up);
359 GNUNET_assert (GNUNET_OK == 468 GNUNET_assert (GNUNET_OK ==
360 GNUNET_CONTAINER_multishortmap_put (routes, 469 GNUNET_CONTAINER_multishortmap_put (routes,
361 &route->cid.connection_of_tunnel, 470 &route->cid.connection_of_tunnel,
@@ -371,8 +480,8 @@ handle_connection_create (void *cls,
371 * @param msg Message itself. 480 * @param msg Message itself.
372 */ 481 */
373static void 482static void
374handle_connection_ack (void *cls, 483handle_connection_create_ack (void *cls,
375 const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg) 484 const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg)
376{ 485{
377 struct CadetPeer *peer = cls; 486 struct CadetPeer *peer = cls;
378 struct CadetConnection *cc; 487 struct CadetConnection *cc;
@@ -734,7 +843,7 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
734 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 843 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
735 struct GNUNET_CADET_ConnectionCreateMessage, 844 struct GNUNET_CADET_ConnectionCreateMessage,
736 NULL), 845 NULL),
737 GNUNET_MQ_hd_fixed_size (connection_ack, 846 GNUNET_MQ_hd_fixed_size (connection_create_ack,
738 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, 847 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
739 struct GNUNET_CADET_ConnectionCreateMessageAckMessage, 848 struct GNUNET_CADET_ConnectionCreateMessageAckMessage,
740 NULL), 849 NULL),