diff options
author | Bart Polot <bart@net.in.tum.de> | 2016-09-20 01:21:59 +0000 |
---|---|---|
committer | Bart Polot <bart@net.in.tum.de> | 2016-09-20 01:21:59 +0000 |
commit | b4d5f474eef10017a470dccb01dae86c32bd5ddb (patch) | |
tree | 4b97bb46f4ab15c732e284ef0b275cc0dbc3173a /src/cadet/gnunet-service-cadet_peer.c | |
parent | 506899aa2be2b4d5dc09c1740969c28ddf43c82d (diff) | |
download | gnunet-b4d5f474eef10017a470dccb01dae86c32bd5ddb.tar.gz gnunet-b4d5f474eef10017a470dccb01dae86c32bd5ddb.zip |
Port CADET to CORE MQ API
Diffstat (limited to 'src/cadet/gnunet-service-cadet_peer.c')
-rw-r--r-- | src/cadet/gnunet-service-cadet_peer.c | 1404 |
1 files changed, 415 insertions, 989 deletions
diff --git a/src/cadet/gnunet-service-cadet_peer.c b/src/cadet/gnunet-service-cadet_peer.c index 64d9168fd..5ccd8f014 100644 --- a/src/cadet/gnunet-service-cadet_peer.c +++ b/src/cadet/gnunet-service-cadet_peer.c | |||
@@ -44,164 +44,145 @@ | |||
44 | /******************************** STRUCTS **********************************/ | 44 | /******************************** STRUCTS **********************************/ |
45 | /******************************************************************************/ | 45 | /******************************************************************************/ |
46 | 46 | ||
47 | |||
47 | /** | 48 | /** |
48 | * Struct containing info about a queued transmission to this peer | 49 | * Struct containing all information regarding a given peer |
49 | */ | 50 | */ |
50 | struct CadetPeerQueue | 51 | struct CadetPeer |
51 | { | 52 | { |
52 | /** | 53 | /** |
53 | * DLL next | 54 | * ID of the peer |
54 | */ | 55 | */ |
55 | struct CadetPeerQueue *next; | 56 | GNUNET_PEER_Id id; |
56 | 57 | ||
57 | /** | 58 | /** |
58 | * DLL previous | 59 | * Last time we heard from this peer |
59 | */ | 60 | */ |
60 | struct CadetPeerQueue *prev; | 61 | struct GNUNET_TIME_Absolute last_contact; |
61 | 62 | ||
62 | /** | 63 | /** |
63 | * Peer this transmission is directed to. | 64 | * Paths to reach the peer, ordered by ascending hop count |
64 | */ | 65 | */ |
65 | struct CadetPeer *peer; | 66 | struct CadetPeerPath *path_head; |
66 | 67 | ||
67 | /** | 68 | /** |
68 | * Connection this message belongs to. | 69 | * Paths to reach the peer, ordered by ascending hop count |
69 | */ | 70 | */ |
70 | struct CadetConnection *c; | 71 | struct CadetPeerPath *path_tail; |
71 | 72 | ||
72 | /** | 73 | /** |
73 | * Is FWD in c? | 74 | * Handle to stop the DHT search for paths to this peer |
74 | */ | 75 | */ |
75 | int fwd; | 76 | struct GCD_search_handle *search_h; |
76 | 77 | ||
77 | /** | 78 | /** |
78 | * Pointer to info stucture used as cls. | 79 | * Handle to stop the DHT search for paths to this peer |
79 | */ | 80 | */ |
80 | void *cls; | 81 | struct GNUNET_SCHEDULER_Task *search_delayed; |
81 | 82 | ||
82 | /** | 83 | /** |
83 | * Type of message | 84 | * Tunnel to this peer, if any. |
84 | */ | 85 | */ |
85 | uint16_t type; | 86 | struct CadetTunnel *tunnel; |
86 | 87 | ||
87 | /** | 88 | /** |
88 | * Type of message | 89 | * Connections that go through this peer; indexed by tid. |
89 | */ | 90 | */ |
90 | uint16_t payload_type; | 91 | struct GNUNET_CONTAINER_MultiHashMap *connections; |
91 | 92 | ||
92 | /** | 93 | /** |
93 | * Type of message | 94 | * Handle for core transmissions. |
94 | */ | 95 | */ |
95 | uint32_t payload_id; | 96 | struct GNUNET_MQ_Handle *core_mq; |
96 | 97 | ||
97 | /** | 98 | /** |
98 | * Size of the message | 99 | * How many messages are in the queue to this peer. |
99 | */ | 100 | */ |
100 | size_t size; | 101 | unsigned int queue_n; |
101 | 102 | ||
102 | /** | 103 | /** |
103 | * Set when this message starts waiting for CORE. | 104 | * Hello message. |
104 | */ | 105 | */ |
105 | struct GNUNET_TIME_Absolute start_waiting; | 106 | struct GNUNET_HELLO_Message* hello; |
106 | 107 | ||
107 | /** | 108 | /** |
108 | * Function to call on sending. | 109 | * Handle to us offering the HELLO to the transport. |
109 | */ | 110 | */ |
110 | GCP_sent cont; | 111 | struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer; |
111 | 112 | ||
112 | /** | 113 | /** |
113 | * Closure for callback. | 114 | * Handle to our ATS request asking ATS to suggest an address |
115 | * to TRANSPORT for this peer (to establish a direct link). | ||
114 | */ | 116 | */ |
115 | void *cont_cls; | 117 | struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion; |
118 | |||
116 | }; | 119 | }; |
117 | 120 | ||
118 | 121 | ||
119 | /** | 122 | /** |
120 | * Struct containing all information regarding a given peer | 123 | * Information about a queued message on the peer level. |
121 | */ | 124 | */ |
122 | struct CadetPeer | 125 | struct CadetPeerQueue { |
123 | { | ||
124 | /** | ||
125 | * ID of the peer | ||
126 | */ | ||
127 | GNUNET_PEER_Id id; | ||
128 | 126 | ||
129 | /** | 127 | /** |
130 | * Last time we heard from this peer | 128 | * Envelope to cancel message before MQ sends it. |
131 | */ | ||
132 | struct GNUNET_TIME_Absolute last_contact; | ||
133 | |||
134 | /** | ||
135 | * Paths to reach the peer, ordered by ascending hop count | ||
136 | */ | 129 | */ |
137 | struct CadetPeerPath *path_head; | 130 | struct GNUNET_MQ_Envelope *env; |
138 | 131 | ||
139 | /** | 132 | /** |
140 | * Paths to reach the peer, ordered by ascending hop count | 133 | * Peer (neighbor) this message is being sent to. |
141 | */ | 134 | */ |
142 | struct CadetPeerPath *path_tail; | 135 | struct CadetPeer *peer; |
143 | |||
144 | /** | ||
145 | * Handle to stop the DHT search for paths to this peer | ||
146 | */ | ||
147 | struct GCD_search_handle *search_h; | ||
148 | |||
149 | /** | ||
150 | * Handle to stop the DHT search for paths to this peer | ||
151 | */ | ||
152 | struct GNUNET_SCHEDULER_Task *search_delayed; | ||
153 | 136 | ||
154 | /** | 137 | /** |
155 | * Tunnel to this peer, if any. | 138 | * Continuation to call to notify higher layers about message sent. |
156 | */ | 139 | */ |
157 | struct CadetTunnel *tunnel; | 140 | GCP_sent cont; |
158 | 141 | ||
159 | /** | 142 | /** |
160 | * Connections that go through this peer; indexed by tid. | 143 | * Closure for @a cont. |
161 | */ | 144 | */ |
162 | struct GNUNET_CONTAINER_MultiHashMap *connections; | 145 | void *cont_cls; |
163 | 146 | ||
164 | /** | 147 | /** |
165 | * Handle for queued transmissions | 148 | * Time when message was queued for sending. |
166 | */ | 149 | */ |
167 | struct GNUNET_CORE_TransmitHandle *core_transmit; | 150 | struct GNUNET_TIME_Absolute queue_timestamp; |
168 | 151 | ||
169 | /** | 152 | /** |
170 | * Timestamp | 153 | * #GNUNET_YES if message was management traffic (POLL, ACK, ...). |
171 | */ | 154 | */ |
172 | struct GNUNET_TIME_Absolute tmt_time; | 155 | int management_traffic; |
173 | 156 | ||
174 | /** | 157 | /** |
175 | * Transmission queue to core DLL head | 158 | * Message type. |
176 | */ | 159 | */ |
177 | struct CadetPeerQueue *queue_head; | 160 | uint16_t type; |
178 | 161 | ||
179 | /** | 162 | /** |
180 | * Transmission queue to core DLL tail | 163 | * Message size. |
181 | */ | 164 | */ |
182 | struct CadetPeerQueue *queue_tail; | 165 | uint16_t size; |
183 | 166 | ||
184 | /** | 167 | /** |
185 | * How many messages are in the queue to this peer. | 168 | * Type of the message's payload, if it was encrypted data. |
186 | */ | 169 | */ |
187 | unsigned int queue_n; | 170 | uint16_t payload_type; |
188 | 171 | ||
189 | /** | 172 | /** |
190 | * Hello message. | 173 | *ID of the payload (PID, ACK #, ...). |
191 | */ | 174 | */ |
192 | struct GNUNET_HELLO_Message* hello; | 175 | uint16_t payload_id; |
193 | 176 | ||
194 | /** | 177 | /** |
195 | * Handle to us offering the HELLO to the transport. | 178 | * Connection this message was sent on. |
196 | */ | 179 | */ |
197 | struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer; | 180 | struct CadetConnection *c; |
198 | 181 | ||
199 | /** | 182 | /** |
200 | * Handle to our ATS request asking ATS to suggest an address | 183 | * Direction in @a c this message was send on (#GNUNET_YES = FWD). |
201 | * to TRANSPORT for this peer (to establish a direct link). | ||
202 | */ | 184 | */ |
203 | struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion; | 185 | int c_fwd; |
204 | |||
205 | }; | 186 | }; |
206 | 187 | ||
207 | 188 | ||
@@ -261,98 +242,6 @@ static int in_shutdown; | |||
261 | 242 | ||
262 | 243 | ||
263 | /******************************************************************************/ | 244 | /******************************************************************************/ |
264 | /***************************** DEBUG *********************************/ | ||
265 | /******************************************************************************/ | ||
266 | |||
267 | /** | ||
268 | * Log all kinds of info about the queueing status of a peer. | ||
269 | * | ||
270 | * @param p Peer whose queue to show. | ||
271 | * @param level Error level to use for logging. | ||
272 | */ | ||
273 | static void | ||
274 | queue_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level) | ||
275 | { | ||
276 | struct GNUNET_TIME_Relative core_wait_time; | ||
277 | struct CadetPeerQueue *q; | ||
278 | int do_log; | ||
279 | |||
280 | do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK), | ||
281 | "cadet-p2p", | ||
282 | __FILE__, __FUNCTION__, __LINE__); | ||
283 | if (0 == do_log) | ||
284 | return; | ||
285 | |||
286 | LOG2 (level, "QQQ Message queue towards %s\n", GCP_2s (p)); | ||
287 | LOG2 (level, "QQQ queue length: %u\n", p->queue_n); | ||
288 | LOG2 (level, "QQQ core tmt rdy: %p\n", p->core_transmit); | ||
289 | if (NULL != p->core_transmit) | ||
290 | { | ||
291 | core_wait_time = GNUNET_TIME_absolute_get_duration (p->tmt_time); | ||
292 | LOG2 (level, "QQQ core called %s ago\n", | ||
293 | GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_NO)); | ||
294 | } | ||
295 | for (q = p->queue_head; NULL != q; q = q->next) | ||
296 | { | ||
297 | LOG2 (level, "QQQ - %s %s on %s\n", | ||
298 | GC_m2s (q->type), GC_f2s (q->fwd), GCC_2s (q->c)); | ||
299 | LOG2 (level, "QQQ payload %s, %u\n", | ||
300 | GC_m2s (q->payload_type), q->payload_id); | ||
301 | LOG2 (level, "QQQ size: %u bytes\n", q->size); | ||
302 | } | ||
303 | |||
304 | LOG2 (level, "QQQ End queue towards %s\n", GCP_2s (p)); | ||
305 | } | ||
306 | |||
307 | |||
308 | /** | ||
309 | * Log all kinds of info about a peer. | ||
310 | * | ||
311 | * @param peer Peer. | ||
312 | */ | ||
313 | void | ||
314 | GCP_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level) | ||
315 | { | ||
316 | struct CadetPeerPath *path; | ||
317 | unsigned int conns; | ||
318 | int do_log; | ||
319 | |||
320 | do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK), | ||
321 | "cadet-p2p", | ||
322 | __FILE__, __FUNCTION__, __LINE__); | ||
323 | if (0 == do_log) | ||
324 | return; | ||
325 | |||
326 | if (NULL == p) | ||
327 | { | ||
328 | LOG2 (level, "PPP DEBUG PEER NULL\n"); | ||
329 | return; | ||
330 | } | ||
331 | |||
332 | LOG2 (level, "PPP DEBUG PEER %s\n", GCP_2s (p)); | ||
333 | LOG2 (level, "PPP last contact %s\n", | ||
334 | GNUNET_STRINGS_absolute_time_to_string (p->last_contact)); | ||
335 | for (path = p->path_head; NULL != path; path = path->next) | ||
336 | { | ||
337 | char *s; | ||
338 | |||
339 | s = path_2s (path); | ||
340 | LOG2 (level, "PPP path: %s\n", s); | ||
341 | GNUNET_free (s); | ||
342 | } | ||
343 | |||
344 | LOG2 (level, "PPP core transmit handle %p\n", p->core_transmit); | ||
345 | LOG2 (level, "PPP DHT GET handle %p\n", p->search_h); | ||
346 | conns = 0; | ||
347 | if (NULL != p->connections) | ||
348 | conns += GNUNET_CONTAINER_multihashmap_size (p->connections); | ||
349 | LOG2 (level, "PPP # connections over link to peer: %u\n", conns); | ||
350 | queue_debug (p, level); | ||
351 | LOG2 (level, "PPP DEBUG END\n"); | ||
352 | } | ||
353 | |||
354 | |||
355 | /******************************************************************************/ | ||
356 | /***************************** CORE HELPERS *********************************/ | 245 | /***************************** CORE HELPERS *********************************/ |
357 | /******************************************************************************/ | 246 | /******************************************************************************/ |
358 | 247 | ||
@@ -415,12 +304,16 @@ pop_direct_path (struct CadetPeer *peer) | |||
415 | /** | 304 | /** |
416 | * Method called whenever a given peer connects. | 305 | * Method called whenever a given peer connects. |
417 | * | 306 | * |
418 | * @param cls closure | 307 | * @param cls Core closure (unused). |
419 | * @param peer peer identity this notification is about | 308 | * @param peer Peer identity this notification is about |
309 | * @param mq Message Queue to this peer. | ||
310 | * | ||
311 | * @return Internal closure for handlers (CadetPeer struct). | ||
420 | */ | 312 | */ |
421 | static void | 313 | static void * |
422 | core_connect (void *cls, | 314 | core_connect_handler (void *cls, |
423 | const struct GNUNET_PeerIdentity *peer) | 315 | const struct GNUNET_PeerIdentity *peer, |
316 | struct GNUNET_MQ_Handle *mq) | ||
424 | { | 317 | { |
425 | struct CadetPeer *neighbor; | 318 | struct CadetPeer *neighbor; |
426 | struct CadetPeerPath *path; | 319 | struct CadetPeerPath *path; |
@@ -431,6 +324,8 @@ core_connect (void *cls, | |||
431 | sizeof (own_id), | 324 | sizeof (own_id), |
432 | "%s", | 325 | "%s", |
433 | GNUNET_i2s (&my_full_id)); | 326 | GNUNET_i2s (&my_full_id)); |
327 | |||
328 | /* Save a path to the neighbor */ | ||
434 | neighbor = GCP_get (peer, GNUNET_YES); | 329 | neighbor = GCP_get (peer, GNUNET_YES); |
435 | if (myid == neighbor->id) | 330 | if (myid == neighbor->id) |
436 | { | 331 | { |
@@ -448,11 +343,14 @@ core_connect (void *cls, | |||
448 | path = path_new (2); | 343 | path = path_new (2); |
449 | path->peers[1] = neighbor->id; | 344 | path->peers[1] = neighbor->id; |
450 | GNUNET_PEER_change_rc (neighbor->id, 1); | 345 | GNUNET_PEER_change_rc (neighbor->id, 1); |
346 | GNUNET_assert (NULL == neighbor->core_mq); | ||
347 | neighbor->core_mq = mq; | ||
451 | } | 348 | } |
452 | path->peers[0] = myid; | 349 | path->peers[0] = myid; |
453 | GNUNET_PEER_change_rc (myid, 1); | 350 | GNUNET_PEER_change_rc (myid, 1); |
454 | GCP_add_path (neighbor, path, GNUNET_YES); | 351 | GCP_add_path (neighbor, path, GNUNET_YES); |
455 | 352 | ||
353 | /* Create the connections hashmap */ | ||
456 | GNUNET_assert (NULL == neighbor->connections); | 354 | GNUNET_assert (NULL == neighbor->connections); |
457 | neighbor->connections = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO); | 355 | neighbor->connections = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO); |
458 | GNUNET_STATISTICS_update (stats, | 356 | GNUNET_STATISTICS_update (stats, |
@@ -462,42 +360,47 @@ core_connect (void *cls, | |||
462 | 360 | ||
463 | if ( (NULL != GCP_get_tunnel (neighbor)) && | 361 | if ( (NULL != GCP_get_tunnel (neighbor)) && |
464 | (0 > GNUNET_CRYPTO_cmp_peer_identity (&my_full_id, peer)) ) | 362 | (0 > GNUNET_CRYPTO_cmp_peer_identity (&my_full_id, peer)) ) |
363 | { | ||
465 | GCP_connect (neighbor); | 364 | GCP_connect (neighbor); |
365 | } | ||
466 | GCC_check_connections (); | 366 | GCC_check_connections (); |
367 | |||
368 | return neighbor; | ||
467 | } | 369 | } |
468 | 370 | ||
469 | 371 | ||
470 | /** | 372 | /** |
471 | * Method called whenever a peer disconnects. | 373 | * Method called whenever a peer disconnects. |
472 | * | 374 | * |
473 | * @param cls closure | 375 | * @param cls Core closure (unused). |
474 | * @param peer peer identity this notification is about | 376 | * @param peer Peer identity this notification is about. |
377 | * @param internal_cls Internal closure (CadetPeer struct). | ||
475 | */ | 378 | */ |
476 | static void | 379 | static void |
477 | core_disconnect (void *cls, | 380 | core_disconnect_handler (void *cls, |
478 | const struct GNUNET_PeerIdentity *peer) | 381 | const struct GNUNET_PeerIdentity *peer, |
382 | void *internal_cls) | ||
479 | { | 383 | { |
480 | struct CadetPeer *p; | 384 | struct CadetPeer *p = internal_cls; |
481 | struct CadetPeerPath *direct_path; | 385 | struct CadetPeerPath *direct_path; |
482 | char own_id[16]; | 386 | char own_id[16]; |
483 | 387 | ||
484 | GCC_check_connections (); | 388 | GCC_check_connections (); |
485 | strncpy (own_id, GNUNET_i2s (&my_full_id), 16); | 389 | strncpy (own_id, GNUNET_i2s (&my_full_id), 16); |
486 | own_id[15] = '\0'; | 390 | own_id[15] = '\0'; |
487 | p = GNUNET_CONTAINER_multipeermap_get (peers, peer); | ||
488 | if (NULL == p) | ||
489 | { | ||
490 | GNUNET_break (GNUNET_YES == in_shutdown); | ||
491 | return; | ||
492 | } | ||
493 | if (myid == p->id) | 391 | if (myid == p->id) |
392 | { | ||
494 | LOG (GNUNET_ERROR_TYPE_INFO, | 393 | LOG (GNUNET_ERROR_TYPE_INFO, |
495 | "DISCONNECTED %s (self)\n", | 394 | "DISCONNECTED %s (self)\n", |
496 | own_id); | 395 | own_id); |
396 | } | ||
497 | else | 397 | else |
398 | { | ||
498 | LOG (GNUNET_ERROR_TYPE_INFO, | 399 | LOG (GNUNET_ERROR_TYPE_INFO, |
499 | "DISCONNECTED %s <= %s\n", | 400 | "DISCONNECTED %s <= %s\n", |
500 | own_id, GNUNET_i2s (peer)); | 401 | own_id, GNUNET_i2s (peer)); |
402 | p->core_mq = NULL; | ||
403 | } | ||
501 | direct_path = pop_direct_path (p); | 404 | direct_path = pop_direct_path (p); |
502 | if (NULL != p->connections) | 405 | if (NULL != p->connections) |
503 | { | 406 | { |
@@ -507,12 +410,6 @@ core_disconnect (void *cls, | |||
507 | GNUNET_CONTAINER_multihashmap_destroy (p->connections); | 410 | GNUNET_CONTAINER_multihashmap_destroy (p->connections); |
508 | p->connections = NULL; | 411 | p->connections = NULL; |
509 | } | 412 | } |
510 | if (NULL != p->core_transmit) | ||
511 | { | ||
512 | GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit); | ||
513 | p->core_transmit = NULL; | ||
514 | p->tmt_time.abs_value_us = 0; | ||
515 | } | ||
516 | GNUNET_STATISTICS_update (stats, | 413 | GNUNET_STATISTICS_update (stats, |
517 | "# peers", | 414 | "# peers", |
518 | -1, | 415 | -1, |
@@ -522,167 +419,283 @@ core_disconnect (void *cls, | |||
522 | } | 419 | } |
523 | 420 | ||
524 | 421 | ||
422 | /******************************************************************************/ | ||
423 | /******************************************************************************/ | ||
424 | /******************************************************************************/ | ||
425 | /******************************************************************************/ | ||
426 | /******************************************************************************/ | ||
427 | |||
525 | /** | 428 | /** |
526 | * Functions to handle messages from core | 429 | * Check if the create_connection message has the appropriate size. |
527 | */ | 430 | * |
528 | static struct GNUNET_CORE_MessageHandler core_handlers[] = { | 431 | * @param cls Closure (unused). |
529 | {&GCC_handle_create, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0}, | 432 | * @param msg Message to check. |
530 | {&GCC_handle_confirm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, | 433 | * |
531 | sizeof (struct GNUNET_CADET_ConnectionACK)}, | 434 | * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise. |
532 | {&GCC_handle_broken, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, | 435 | */ |
533 | sizeof (struct GNUNET_CADET_ConnectionBroken)}, | 436 | static int |
534 | {&GCC_handle_destroy, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY, | 437 | check_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg) |
535 | sizeof (struct GNUNET_CADET_ConnectionDestroy)}, | 438 | { |
536 | {&GCC_handle_ack, GNUNET_MESSAGE_TYPE_CADET_ACK, | 439 | uint16_t size; |
537 | sizeof (struct GNUNET_CADET_ACK)}, | ||
538 | {&GCC_handle_poll, GNUNET_MESSAGE_TYPE_CADET_POLL, | ||
539 | sizeof (struct GNUNET_CADET_Poll)}, | ||
540 | {&GCC_handle_kx, GNUNET_MESSAGE_TYPE_CADET_KX, 0}, | ||
541 | {&GCC_handle_encrypted, GNUNET_MESSAGE_TYPE_CADET_AX, 0}, | ||
542 | {NULL, 0, 0} | ||
543 | }; | ||
544 | 440 | ||
441 | size = ntohs (msg->header.size); | ||
442 | if (size < sizeof (*msg)) | ||
443 | { | ||
444 | GNUNET_break_op (0); | ||
445 | return GNUNET_NO; | ||
446 | } | ||
447 | return GNUNET_YES; | ||
448 | } | ||
545 | 449 | ||
546 | /** | 450 | /** |
547 | * To be called on core init/fail. | 451 | * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE |
548 | * | 452 | * |
549 | * @param cls Closure (config) | 453 | * @param cls Closure (CadetPeer for neighbor that sent the message). |
550 | * @param identity the public identity of this peer | 454 | * @param msg Message itself. |
551 | */ | 455 | */ |
552 | static void | 456 | static void |
553 | core_init (void *cls, | 457 | handle_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg) |
554 | const struct GNUNET_PeerIdentity *identity) | ||
555 | { | 458 | { |
556 | const struct GNUNET_CONFIGURATION_Handle *c = cls; | 459 | struct CadetPeer *peer = cls; |
557 | static int i = 0; | 460 | GCC_handle_create (peer, msg); |
461 | } | ||
558 | 462 | ||
559 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n"); | 463 | |
560 | if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id))) | 464 | /** |
561 | { | 465 | * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK |
562 | LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n")); | 466 | * |
563 | LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (identity)); | 467 | * @param cls Closure (CadetPeer for neighbor that sent the message). |
564 | LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id)); | 468 | * @param msg Message itself. |
565 | GNUNET_CORE_disconnect (core_handle); | 469 | */ |
566 | core_handle = GNUNET_CORE_connect (c, /* Main configuration */ | 470 | static void |
567 | NULL, /* Closure passed to CADET functions */ | 471 | handle_confirm (void *cls, const struct GNUNET_CADET_ConnectionACK *msg) |
568 | &core_init, /* Call core_init once connected */ | 472 | { |
569 | &core_connect, /* Handle connects */ | 473 | struct CadetPeer *peer = cls; |
570 | &core_disconnect, /* remove peers on disconnects */ | 474 | GCC_handle_confirm (peer, msg); |
571 | NULL, /* Don't notify about all incoming messages */ | ||
572 | GNUNET_NO, /* For header only in notification */ | ||
573 | NULL, /* Don't notify about all outbound messages */ | ||
574 | GNUNET_NO, /* For header-only out notification */ | ||
575 | core_handlers); /* Register these handlers */ | ||
576 | if (10 < i++) | ||
577 | GNUNET_assert (0); | ||
578 | } | ||
579 | GML_start (); | ||
580 | } | 475 | } |
581 | 476 | ||
582 | 477 | ||
583 | /** | 478 | /** |
584 | * Core callback to write a pre-constructed data packet to core buffer | 479 | * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN |
585 | * | 480 | * |
586 | * @param cls Closure (CadetTransmissionDescriptor with data in "data" member). | 481 | * @param cls Closure (CadetPeer for neighbor that sent the message). |
587 | * @param size Number of bytes available in buf. | 482 | * @param msg Message itself. |
588 | * @param buf Where the to write the message. | 483 | */ |
589 | * | 484 | static void |
590 | * @return number of bytes written to buf | 485 | handle_broken (void *cls, const struct GNUNET_CADET_ConnectionBroken *msg) |
591 | */ | ||
592 | static size_t | ||
593 | send_core_data_raw (void *cls, size_t size, void *buf) | ||
594 | { | 486 | { |
595 | struct GNUNET_MessageHeader *msg = cls; | 487 | struct CadetPeer *peer = cls; |
596 | size_t total_size; | 488 | GCC_handle_broken (peer, msg); |
489 | } | ||
597 | 490 | ||
598 | GNUNET_assert (NULL != msg); | ||
599 | total_size = ntohs (msg->size); | ||
600 | 491 | ||
601 | if (total_size > size) | 492 | /** |
602 | { | 493 | * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY |
603 | GNUNET_break (0); | 494 | * |
604 | return 0; | 495 | * @param cls Closure (CadetPeer for neighbor that sent the message). |
605 | } | 496 | * @param msg Message itself. |
606 | GNUNET_memcpy (buf, msg, total_size); | 497 | */ |
607 | GNUNET_free (cls); | 498 | static void |
608 | return total_size; | 499 | handle_destroy (void *cls, const struct GNUNET_CADET_ConnectionDestroy *msg) |
500 | { | ||
501 | struct CadetPeer *peer = cls; | ||
502 | GCC_handle_destroy (peer, msg); | ||
609 | } | 503 | } |
610 | 504 | ||
611 | 505 | ||
612 | /** | 506 | /** |
613 | * Function to send a create connection message to a peer. | 507 | * Handle for #GNUNET_MESSAGE_TYPE_CADET_ACK |
614 | * | 508 | * |
615 | * @param c Connection to create. | 509 | * @param cls Closure (CadetPeer for neighbor that sent the message). |
616 | * @param size number of bytes available in buf | 510 | * @param msg Message itself. |
617 | * @param buf where the callee should write the message | ||
618 | * @return number of bytes written to buf | ||
619 | */ | 511 | */ |
620 | static size_t | 512 | static void |
621 | send_core_connection_create (struct CadetConnection *c, size_t size, void *buf) | 513 | handle_ack (void *cls, const struct GNUNET_CADET_ACK *msg) |
622 | { | 514 | { |
623 | struct GNUNET_CADET_ConnectionCreate *msg; | 515 | struct CadetPeer *peer = cls; |
624 | struct GNUNET_PeerIdentity *peer_ptr; | 516 | GCC_handle_ack (peer, msg); |
625 | const struct CadetPeerPath *p = GCC_get_path (c); | 517 | } |
626 | size_t size_needed; | ||
627 | int i; | ||
628 | 518 | ||
629 | if (NULL == p) | ||
630 | return 0; | ||
631 | 519 | ||
632 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION CREATE...\n"); | 520 | /** |
633 | size_needed = | 521 | * Handle for #GNUNET_MESSAGE_TYPE_CADET_POLL |
634 | sizeof (struct GNUNET_CADET_ConnectionCreate) + | 522 | * |
635 | p->length * sizeof (struct GNUNET_PeerIdentity); | 523 | * @param cls Closure (CadetPeer for neighbor that sent the message). |
524 | * @param msg Message itself. | ||
525 | */ | ||
526 | static void | ||
527 | handle_poll (void *cls, const struct GNUNET_CADET_Poll *msg) | ||
528 | { | ||
529 | struct CadetPeer *peer = cls; | ||
530 | GCC_handle_poll (peer, msg); | ||
531 | } | ||
636 | 532 | ||
637 | if (size < size_needed || NULL == buf) | 533 | |
534 | /** | ||
535 | * Check if the Key eXchange message has the appropriate size. | ||
536 | * | ||
537 | * @param cls Closure (unused). | ||
538 | * @param msg Message to check. | ||
539 | * | ||
540 | * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise. | ||
541 | */ | ||
542 | static int | ||
543 | check_kx (void *cls, const struct GNUNET_CADET_KX *msg) | ||
544 | { | ||
545 | uint16_t size; | ||
546 | uint16_t expected_size; | ||
547 | |||
548 | size = ntohs (msg->header.size); | ||
549 | expected_size = sizeof (struct GNUNET_CADET_KX) | ||
550 | + sizeof (struct GNUNET_MessageHeader); | ||
551 | |||
552 | if (size < expected_size) | ||
638 | { | 553 | { |
639 | GNUNET_break (0); | 554 | GNUNET_break_op (0); |
640 | return 0; | 555 | return GNUNET_NO; |
641 | } | 556 | } |
642 | msg = (struct GNUNET_CADET_ConnectionCreate *) buf; | 557 | return GNUNET_YES; |
643 | msg->header.size = htons (size_needed); | 558 | } |
644 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE); | ||
645 | msg->cid = *GCC_get_id (c); | ||
646 | 559 | ||
647 | peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1]; | 560 | /** |
648 | for (i = 0; i < p->length; i++) | 561 | * Handle for #GNUNET_MESSAGE_TYPE_CADET_KX |
562 | * | ||
563 | * @param cls Closure (CadetPeer for neighbor that sent the message). | ||
564 | * @param msg Message itself. | ||
565 | */ | ||
566 | static void | ||
567 | handle_kx (void *cls, const struct GNUNET_CADET_KX *msg) | ||
568 | { | ||
569 | struct CadetPeer *peer = cls; | ||
570 | GCC_handle_kx (peer, msg); | ||
571 | } | ||
572 | |||
573 | |||
574 | /** | ||
575 | * Check if the encrypted message has the appropriate size. | ||
576 | * | ||
577 | * @param cls Closure (unused). | ||
578 | * @param msg Message to check. | ||
579 | * | ||
580 | * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise. | ||
581 | */ | ||
582 | static int | ||
583 | check_encrypted (void *cls, const struct GNUNET_CADET_AX *msg) | ||
584 | { | ||
585 | uint16_t size; | ||
586 | uint16_t minimum_size; | ||
587 | |||
588 | size = ntohs (msg->header.size); | ||
589 | minimum_size = sizeof (struct GNUNET_CADET_AX) | ||
590 | + sizeof (struct GNUNET_MessageHeader); | ||
591 | |||
592 | if (size < minimum_size) | ||
649 | { | 593 | { |
650 | GNUNET_PEER_resolve (p->peers[i], peer_ptr++); | 594 | GNUNET_break_op (0); |
595 | return GNUNET_NO; | ||
651 | } | 596 | } |
597 | return GNUNET_YES; | ||
598 | } | ||
652 | 599 | ||
653 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 600 | /** |
654 | "CONNECTION CREATE (%u bytes long) sent!\n", | 601 | * Handle for #GNUNET_MESSAGE_TYPE_CADET_AX (AXolotl encrypted traffic). |
655 | size_needed); | 602 | * |
656 | return size_needed; | 603 | * @param cls Closure (CadetPeer for neighbor that sent the message). |
604 | * @param msg Message itself. | ||
605 | */ | ||
606 | static void | ||
607 | handle_encrypted (void *cls, const struct GNUNET_CADET_AX *msg) | ||
608 | { | ||
609 | struct CadetPeer *peer = cls; | ||
610 | GCC_handle_encrypted (peer, msg); | ||
657 | } | 611 | } |
658 | 612 | ||
659 | 613 | ||
660 | /** | 614 | /** |
661 | * Creates a path ack message in buf and frees all unused resources. | 615 | * To be called on core init/fail. |
662 | * | 616 | * |
663 | * @param c Connection to send an ACK on. | 617 | * @param cls Closure (config) |
664 | * @param size number of bytes available in buf | 618 | * @param identity The public identity of this peer. |
665 | * @param buf where the callee should write the message | 619 | */ |
620 | static void | ||
621 | core_init_notify (void *cls, | ||
622 | const struct GNUNET_PeerIdentity *identity); | ||
623 | |||
624 | |||
625 | static void | ||
626 | connect_to_core (const struct GNUNET_CONFIGURATION_Handle *c) | ||
627 | { | ||
628 | struct GNUNET_MQ_MessageHandler core_handlers[] = { | ||
629 | GNUNET_MQ_hd_var_size (create, | ||
630 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, | ||
631 | struct GNUNET_CADET_ConnectionCreate, | ||
632 | NULL), | ||
633 | GNUNET_MQ_hd_fixed_size (confirm, | ||
634 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, | ||
635 | struct GNUNET_CADET_ConnectionACK, | ||
636 | NULL), | ||
637 | GNUNET_MQ_hd_fixed_size (broken, | ||
638 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, | ||
639 | struct GNUNET_CADET_ConnectionBroken, | ||
640 | NULL), | ||
641 | GNUNET_MQ_hd_fixed_size (destroy, | ||
642 | GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY, | ||
643 | struct GNUNET_CADET_ConnectionDestroy, | ||
644 | NULL), | ||
645 | GNUNET_MQ_hd_fixed_size (ack, | ||
646 | GNUNET_MESSAGE_TYPE_CADET_ACK, | ||
647 | struct GNUNET_CADET_ACK, | ||
648 | NULL), | ||
649 | GNUNET_MQ_hd_fixed_size (poll, | ||
650 | GNUNET_MESSAGE_TYPE_CADET_POLL, | ||
651 | struct GNUNET_CADET_Poll, | ||
652 | NULL), | ||
653 | GNUNET_MQ_hd_var_size (kx, | ||
654 | GNUNET_MESSAGE_TYPE_CADET_KX, | ||
655 | struct GNUNET_CADET_KX, | ||
656 | NULL), | ||
657 | GNUNET_MQ_hd_var_size (encrypted, | ||
658 | GNUNET_MESSAGE_TYPE_CADET_AX, | ||
659 | struct GNUNET_CADET_AX, | ||
660 | NULL), | ||
661 | GNUNET_MQ_handler_end () | ||
662 | }; | ||
663 | core_handle = GNUNET_CORE_connecT (c, NULL, | ||
664 | &core_init_notify, | ||
665 | &core_connect_handler, | ||
666 | &core_disconnect_handler, | ||
667 | core_handlers); | ||
668 | } | ||
669 | |||
670 | /******************************************************************************/ | ||
671 | /******************************************************************************/ | ||
672 | /******************************************************************************/ | ||
673 | /******************************************************************************/ | ||
674 | /******************************************************************************/ | ||
675 | |||
676 | /** | ||
677 | * To be called on core init/fail. | ||
666 | * | 678 | * |
667 | * @return number of bytes written to buf | 679 | * @param cls Closure (config) |
680 | * @param identity The public identity of this peer. | ||
668 | */ | 681 | */ |
669 | static size_t | 682 | static void |
670 | send_core_connection_ack (struct CadetConnection *c, size_t size, void *buf) | 683 | core_init_notify (void *cls, |
684 | const struct GNUNET_PeerIdentity *core_identity) | ||
671 | { | 685 | { |
672 | struct GNUNET_CADET_ConnectionACK *msg = buf; | 686 | const struct GNUNET_CONFIGURATION_Handle *c = cls; |
673 | 687 | ||
674 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION ACK...\n"); | 688 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n"); |
675 | if (sizeof (struct GNUNET_CADET_ConnectionACK) > size) | 689 | if (0 != memcmp (core_identity, &my_full_id, sizeof (my_full_id))) |
676 | { | 690 | { |
677 | GNUNET_break (0); | 691 | LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n")); |
678 | return 0; | 692 | LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (core_identity)); |
693 | LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id)); | ||
694 | GNUNET_CORE_disconnecT (core_handle); | ||
695 | connect_to_core (c); | ||
696 | return; | ||
679 | } | 697 | } |
680 | msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK)); | 698 | GML_start (); |
681 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK); | ||
682 | msg->cid = *GCC_get_id (c); | ||
683 | |||
684 | LOG (GNUNET_ERROR_TYPE_DEBUG, "CONNECTION ACK sent!\n"); | ||
685 | return sizeof (struct GNUNET_CADET_ConnectionACK); | ||
686 | } | 699 | } |
687 | 700 | ||
688 | 701 | ||
@@ -697,8 +710,11 @@ send_core_connection_ack (struct CadetConnection *c, size_t size, void *buf) | |||
697 | * @param q Queued message | 710 | * @param q Queued message |
698 | * | 711 | * |
699 | * @return CORE priority to use. | 712 | * @return CORE priority to use. |
713 | * | ||
714 | * FIXME make static | ||
715 | * FIXME use when sending | ||
700 | */ | 716 | */ |
701 | static enum GNUNET_CORE_Priority | 717 | enum GNUNET_CORE_Priority |
702 | get_priority (struct CadetPeerQueue *q) | 718 | get_priority (struct CadetPeerQueue *q) |
703 | { | 719 | { |
704 | enum GNUNET_CORE_Priority low; | 720 | enum GNUNET_CORE_Priority low; |
@@ -711,7 +727,7 @@ get_priority (struct CadetPeerQueue *q) | |||
711 | } | 727 | } |
712 | 728 | ||
713 | /* Relayed traffic has lower priority, our own traffic has higher */ | 729 | /* Relayed traffic has lower priority, our own traffic has higher */ |
714 | if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->fwd)) | 730 | if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->c_fwd)) |
715 | { | 731 | { |
716 | low = GNUNET_CORE_PRIO_BEST_EFFORT; | 732 | low = GNUNET_CORE_PRIO_BEST_EFFORT; |
717 | high = GNUNET_CORE_PRIO_URGENT; | 733 | high = GNUNET_CORE_PRIO_URGENT; |
@@ -784,20 +800,6 @@ peer_destroy (struct CadetPeer *peer) | |||
784 | GNUNET_ATS_connectivity_suggest_cancel (peer->connectivity_suggestion); | 800 | GNUNET_ATS_connectivity_suggest_cancel (peer->connectivity_suggestion); |
785 | peer->connectivity_suggestion = NULL; | 801 | peer->connectivity_suggestion = NULL; |
786 | } | 802 | } |
787 | while (NULL != peer->queue_head) | ||
788 | { | ||
789 | /* This function destroys the current peer->queue_head but | ||
790 | * replaces it with the next in the queue, so it is correct | ||
791 | * to while() here. | ||
792 | */ | ||
793 | GCP_queue_destroy (peer->queue_head, GNUNET_YES, GNUNET_NO, 0); | ||
794 | } | ||
795 | if (NULL != peer->core_transmit) | ||
796 | { | ||
797 | GNUNET_break (0); /* GCP_queue_destroy should've cancelled it! */ | ||
798 | GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); | ||
799 | peer->core_transmit = NULL; | ||
800 | } | ||
801 | 803 | ||
802 | GNUNET_free_non_null (peer->hello); | 804 | GNUNET_free_non_null (peer->hello); |
803 | GNUNET_free (peer); | 805 | GNUNET_free (peer); |
@@ -831,7 +833,6 @@ shutdown_peer (void *cls, | |||
831 | } | 833 | } |
832 | 834 | ||
833 | 835 | ||
834 | |||
835 | /** | 836 | /** |
836 | * Check if peer is searching for a path (either active or delayed search). | 837 | * Check if peer is searching for a path (either active or delayed search). |
837 | * | 838 | * |
@@ -996,64 +997,6 @@ peer_get_best_path (const struct CadetPeer *peer) | |||
996 | 997 | ||
997 | 998 | ||
998 | /** | 999 | /** |
999 | * Is this queue element sendable? | ||
1000 | * | ||
1001 | * - All management traffic is always sendable. | ||
1002 | * - For payload traffic, check the connection flow control. | ||
1003 | * | ||
1004 | * @param q Queue element to inspect. | ||
1005 | * @return #GNUNET_YES if it is sendable, #GNUNET_NO otherwise. | ||
1006 | */ | ||
1007 | static int | ||
1008 | queue_is_sendable (struct CadetPeerQueue *q) | ||
1009 | { | ||
1010 | /* Is PID-independent? */ | ||
1011 | switch (q->type) | ||
1012 | { | ||
1013 | case GNUNET_MESSAGE_TYPE_CADET_ACK: | ||
1014 | case GNUNET_MESSAGE_TYPE_CADET_POLL: | ||
1015 | case GNUNET_MESSAGE_TYPE_CADET_KX: | ||
1016 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: | ||
1017 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: | ||
1018 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: | ||
1019 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: | ||
1020 | return GNUNET_YES; | ||
1021 | |||
1022 | case GNUNET_MESSAGE_TYPE_CADET_AX: | ||
1023 | break; | ||
1024 | |||
1025 | default: | ||
1026 | GNUNET_break (0); | ||
1027 | } | ||
1028 | |||
1029 | return GCC_is_sendable (q->c, q->fwd); | ||
1030 | } | ||
1031 | |||
1032 | |||
1033 | /** | ||
1034 | * Get first sendable message. | ||
1035 | * | ||
1036 | * @param peer The destination peer. | ||
1037 | * | ||
1038 | * @return First transmittable message, if any. Otherwise, NULL. | ||
1039 | */ | ||
1040 | static struct CadetPeerQueue * | ||
1041 | peer_get_first_message (const struct CadetPeer *peer) | ||
1042 | { | ||
1043 | struct CadetPeerQueue *q; | ||
1044 | |||
1045 | for (q = peer->queue_head; NULL != q; q = q->next) | ||
1046 | { | ||
1047 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking q:%p on c:%s\n", q, GCC_2s (q->c)); | ||
1048 | if (queue_is_sendable (q)) | ||
1049 | return q; | ||
1050 | } | ||
1051 | |||
1052 | return NULL; | ||
1053 | } | ||
1054 | |||
1055 | |||
1056 | /** | ||
1057 | * Function to process paths received for a new peer addition. The recorded | 1000 | * Function to process paths received for a new peer addition. The recorded |
1058 | * paths form the initial tunnel, which can be optimized later. | 1001 | * paths form the initial tunnel, which can be optimized later. |
1059 | * Called on each result obtained for the DHT search. | 1002 | * Called on each result obtained for the DHT search. |
@@ -1090,19 +1033,6 @@ search_handler (void *cls, const struct CadetPeerPath *path) | |||
1090 | 1033 | ||
1091 | 1034 | ||
1092 | /** | 1035 | /** |
1093 | * Adjust core requested size to accomodate an ACK. | ||
1094 | * | ||
1095 | * @param message_size Requested size. | ||
1096 | * | ||
1097 | * @return Size enough to fit @c message_size and an ACK. | ||
1098 | */ | ||
1099 | static size_t | ||
1100 | get_core_size (size_t message_size) | ||
1101 | { | ||
1102 | return message_size + sizeof (struct GNUNET_CADET_ACK); | ||
1103 | } | ||
1104 | |||
1105 | /** | ||
1106 | * Test if a message type is connection management traffic | 1036 | * Test if a message type is connection management traffic |
1107 | * or regular payload traffic. | 1037 | * or regular payload traffic. |
1108 | * | 1038 | * |
@@ -1119,85 +1049,13 @@ is_connection_management (uint16_t type) | |||
1119 | 1049 | ||
1120 | 1050 | ||
1121 | /** | 1051 | /** |
1122 | * Fill a core buffer with the appropriate data for the queued message. | ||
1123 | * | ||
1124 | * @param queue Queue element for the message. | ||
1125 | * @param buf Core buffer to fill. | ||
1126 | * @param size Size remaining in @c buf. | ||
1127 | * @param[out] pid In case its an encrypted payload, set payload. | ||
1128 | * | ||
1129 | * @return Bytes written to @c buf. | ||
1130 | */ | ||
1131 | static size_t | ||
1132 | fill_buf (struct CadetPeerQueue *queue, void *buf, size_t size, uint32_t *pid) | ||
1133 | { | ||
1134 | struct CadetConnection *c = queue->c; | ||
1135 | size_t msg_size; | ||
1136 | |||
1137 | switch (queue->type) | ||
1138 | { | ||
1139 | case GNUNET_MESSAGE_TYPE_CADET_AX: | ||
1140 | *pid = GCC_get_pid (queue->c, queue->fwd); | ||
1141 | LOG (GNUNET_ERROR_TYPE_DEBUG, " ax payload ID %u\n", *pid); | ||
1142 | msg_size = send_core_data_raw (queue->cls, size, buf); | ||
1143 | ((struct GNUNET_CADET_AX *) buf)->pid = htonl (*pid); | ||
1144 | break; | ||
1145 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: | ||
1146 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: | ||
1147 | case GNUNET_MESSAGE_TYPE_CADET_KX: | ||
1148 | case GNUNET_MESSAGE_TYPE_CADET_ACK: | ||
1149 | case GNUNET_MESSAGE_TYPE_CADET_POLL: | ||
1150 | LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type)); | ||
1151 | msg_size = send_core_data_raw (queue->cls, size, buf); | ||
1152 | break; | ||
1153 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: | ||
1154 | LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n"); | ||
1155 | if (GCC_is_origin (c, GNUNET_YES)) | ||
1156 | msg_size = send_core_connection_create (c, size, buf); | ||
1157 | else | ||
1158 | msg_size = send_core_data_raw (queue->cls, size, buf); | ||
1159 | break; | ||
1160 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: | ||
1161 | LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n"); | ||
1162 | if (GCC_is_origin (c, GNUNET_NO) || | ||
1163 | GCC_is_origin (c, GNUNET_YES)) | ||
1164 | { | ||
1165 | msg_size = send_core_connection_ack (c, size, buf); | ||
1166 | } | ||
1167 | else | ||
1168 | { | ||
1169 | msg_size = send_core_data_raw (queue->cls, size, buf); | ||
1170 | } | ||
1171 | break; | ||
1172 | case GNUNET_MESSAGE_TYPE_CADET_DATA: | ||
1173 | case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE: | ||
1174 | case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: | ||
1175 | /* This should be encapsulted */ | ||
1176 | msg_size = 0; | ||
1177 | GNUNET_assert (0); | ||
1178 | break; | ||
1179 | default: | ||
1180 | GNUNET_break (0); | ||
1181 | LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type); | ||
1182 | msg_size = 0; | ||
1183 | } | ||
1184 | |||
1185 | GNUNET_assert (size >= msg_size); | ||
1186 | |||
1187 | return msg_size; | ||
1188 | } | ||
1189 | |||
1190 | |||
1191 | /** | ||
1192 | * Debug function should NEVER return true in production code, useful to | 1052 | * Debug function should NEVER return true in production code, useful to |
1193 | * simulate losses for testcases. | 1053 | * simulate losses for testcases. |
1194 | * | 1054 | * |
1195 | * @param q Queue handle with info about the message. | ||
1196 | * | ||
1197 | * @return #GNUNET_YES or #GNUNET_NO with the decision to drop. | 1055 | * @return #GNUNET_YES or #GNUNET_NO with the decision to drop. |
1198 | */ | 1056 | */ |
1199 | static int | 1057 | static int |
1200 | should_I_drop (struct CadetPeerQueue *q) | 1058 | should_I_drop (void) |
1201 | { | 1059 | { |
1202 | if (0 == drop_percent) | 1060 | if (0 == drop_percent) |
1203 | return GNUNET_NO; | 1061 | return GNUNET_NO; |
@@ -1209,297 +1067,87 @@ should_I_drop (struct CadetPeerQueue *q) | |||
1209 | } | 1067 | } |
1210 | 1068 | ||
1211 | 1069 | ||
1212 | /** | ||
1213 | * Core callback to write a queued packet to core buffer | ||
1214 | * | ||
1215 | * @param cls Closure (peer info). | ||
1216 | * @param size Number of bytes available in buf. | ||
1217 | * @param buf Where the to write the message. | ||
1218 | * | ||
1219 | * @return number of bytes written to buf | ||
1220 | */ | ||
1221 | static size_t | ||
1222 | queue_send (void *cls, size_t size, void *buf) | ||
1223 | { | ||
1224 | struct CadetPeer *peer = cls; | ||
1225 | struct CadetConnection *c; | ||
1226 | struct CadetPeerQueue *queue; | ||
1227 | struct GNUNET_TIME_Relative core_wait_time; | ||
1228 | const char *wait_s; | ||
1229 | const struct GNUNET_PeerIdentity *dst_id; | ||
1230 | size_t msg_size; | ||
1231 | size_t total_size; | ||
1232 | size_t rest; | ||
1233 | char *dst; | ||
1234 | uint32_t pid; | ||
1235 | |||
1236 | GCC_check_connections (); | ||
1237 | LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); | ||
1238 | LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); | ||
1239 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n", | ||
1240 | GCP_2s (peer), size); | ||
1241 | |||
1242 | /* Sanity checking */ | ||
1243 | if (NULL == buf || 0 == size) | ||
1244 | { | ||
1245 | LOG (GNUNET_ERROR_TYPE_DEBUG, " not allowed/\n"); | ||
1246 | if (GNUNET_NO == in_shutdown) | ||
1247 | { | ||
1248 | queue = peer_get_first_message (peer); | ||
1249 | if (NULL == queue) | ||
1250 | { | ||
1251 | peer->core_transmit = NULL; | ||
1252 | peer->tmt_time.abs_value_us = 0; | ||
1253 | GCC_check_connections (); | ||
1254 | return 0; | ||
1255 | } | ||
1256 | dst_id = GNUNET_PEER_resolve2 (peer->id); | ||
1257 | peer->core_transmit = | ||
1258 | GNUNET_CORE_notify_transmit_ready (core_handle, | ||
1259 | GNUNET_NO, get_priority (queue), | ||
1260 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1261 | dst_id, | ||
1262 | get_core_size (queue->size), | ||
1263 | &queue_send, | ||
1264 | peer); | ||
1265 | peer->tmt_time = GNUNET_TIME_absolute_get (); | ||
1266 | } | ||
1267 | else | ||
1268 | { | ||
1269 | peer->core_transmit = NULL; | ||
1270 | peer->tmt_time.abs_value_us = 0; | ||
1271 | } | ||
1272 | GCC_check_connections (); | ||
1273 | return 0; | ||
1274 | } | ||
1275 | |||
1276 | /* Init */ | ||
1277 | rest = size; | ||
1278 | total_size = 0; | ||
1279 | dst = (char *) buf; | ||
1280 | pid = 0; | ||
1281 | peer->core_transmit = NULL; | ||
1282 | queue = peer_get_first_message (peer); | ||
1283 | if (NULL == queue) | ||
1284 | { | ||
1285 | GNUNET_break (0); /* Core tmt_rdy should've been canceled */ | ||
1286 | peer->tmt_time.abs_value_us = 0; | ||
1287 | return 0; | ||
1288 | } | ||
1289 | core_wait_time = GNUNET_TIME_absolute_get_duration (peer->tmt_time); | ||
1290 | wait_s = GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_YES); | ||
1291 | if (core_wait_time.rel_value_us >= 1000000) | ||
1292 | { | ||
1293 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
1294 | " %s: core wait time %s (> 1 second) for %u bytes\n", | ||
1295 | GCP_2s (peer), wait_s, queue->size); | ||
1296 | } | ||
1297 | peer->tmt_time.abs_value_us = 0; | ||
1298 | |||
1299 | /* Copy all possible messages to the core buffer */ | ||
1300 | while (NULL != queue && rest >= queue->size) | ||
1301 | { | ||
1302 | c = queue->c; | ||
1303 | |||
1304 | LOG (GNUNET_ERROR_TYPE_DEBUG, " on conn %s %s\n", | ||
1305 | GCC_2s (c), GC_f2s(queue->fwd)); | ||
1306 | LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok (%u/%u)\n", | ||
1307 | queue->size, total_size, size); | ||
1308 | |||
1309 | msg_size = fill_buf (queue, (void *) dst, size, &pid); | ||
1310 | |||
1311 | if (should_I_drop (queue)) | ||
1312 | { | ||
1313 | LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n", | ||
1314 | GC_m2s (queue->type), GC_m2s (queue->payload_type), | ||
1315 | queue->payload_id, GCC_2s (c), GC_f2s (queue->fwd)); | ||
1316 | msg_size = 0; | ||
1317 | } | ||
1318 | else | ||
1319 | { | ||
1320 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
1321 | ">>> %s (%s %4u) on conn %s (%p) %s [%5u], after %s\n", | ||
1322 | GC_m2s (queue->type), GC_m2s (queue->payload_type), | ||
1323 | queue->payload_id, GCC_2s (c), c, | ||
1324 | GC_f2s (queue->fwd), msg_size, wait_s); | ||
1325 | } | ||
1326 | total_size += msg_size; | ||
1327 | rest -= msg_size; | ||
1328 | dst = &dst[msg_size]; | ||
1329 | msg_size = 0; | ||
1330 | |||
1331 | /* Free queue, but cls was freed by send_core_* in fill_buf. */ | ||
1332 | (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid); | ||
1333 | |||
1334 | /* Next! */ | ||
1335 | queue = peer_get_first_message (peer); | ||
1336 | } | ||
1337 | |||
1338 | /* If more data in queue, send next */ | ||
1339 | if (NULL != queue) | ||
1340 | { | ||
1341 | LOG (GNUNET_ERROR_TYPE_DEBUG, " more data! (%u)\n", queue->size); | ||
1342 | if (NULL == peer->core_transmit) | ||
1343 | { | ||
1344 | dst_id = GNUNET_PEER_resolve2 (peer->id); | ||
1345 | peer->core_transmit = | ||
1346 | GNUNET_CORE_notify_transmit_ready (core_handle, | ||
1347 | GNUNET_NO, get_priority (queue), | ||
1348 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1349 | dst_id, | ||
1350 | get_core_size (queue->size), | ||
1351 | &queue_send, | ||
1352 | peer); | ||
1353 | peer->tmt_time = GNUNET_TIME_absolute_get (); | ||
1354 | queue->start_waiting = GNUNET_TIME_absolute_get (); | ||
1355 | } | ||
1356 | else | ||
1357 | { | ||
1358 | LOG (GNUNET_ERROR_TYPE_DEBUG, "* tmt rdy called somewhere else\n"); | ||
1359 | } | ||
1360 | // GCC_start_poll (); FIXME needed? | ||
1361 | } | ||
1362 | else | ||
1363 | { | ||
1364 | // GCC_stop_poll(); FIXME needed? | ||
1365 | } | ||
1366 | |||
1367 | LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", total_size); | ||
1368 | queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); | ||
1369 | GCC_check_connections (); | ||
1370 | return total_size; | ||
1371 | } | ||
1372 | |||
1373 | |||
1374 | /******************************************************************************/ | 1070 | /******************************************************************************/ |
1375 | /******************************** API ***********************************/ | 1071 | /******************************** API ***********************************/ |
1376 | /******************************************************************************/ | 1072 | /******************************************************************************/ |
1377 | 1073 | ||
1378 | |||
1379 | /** | 1074 | /** |
1380 | * Free a transmission that was already queued with all resources | 1075 | * Call the continuation after a message has been sent or dropped. |
1381 | * associated to the request. | ||
1382 | * | ||
1383 | * If connection was marked to be destroyed, and this was the last queued | ||
1384 | * message on it, the connection will be free'd as a result. | ||
1385 | * | 1076 | * |
1386 | * @param queue Queue handler to cancel. | 1077 | * @param q Queue handle. |
1387 | * @param clear_cls Is it necessary to free associated cls? | 1078 | * @param sent #GNUNET_YES if was sent to CORE, #GNUNET_NO if dropped. |
1388 | * @param sent Was it really sent? (Could have been canceled) | ||
1389 | * @param pid PID, if relevant (was sent and was a payload message). | ||
1390 | * | ||
1391 | * @return #GNUNET_YES if connection was destroyed as a result, | ||
1392 | * #GNUNET_NO otherwise. | ||
1393 | */ | 1079 | */ |
1394 | int | 1080 | static void |
1395 | GCP_queue_destroy (struct CadetPeerQueue *queue, | 1081 | call_peer_cont (const struct CadetPeerQueue *q, int sent) |
1396 | int clear_cls, | ||
1397 | int sent, | ||
1398 | uint32_t pid) | ||
1399 | { | 1082 | { |
1400 | struct CadetPeer *peer; | 1083 | LOG (GNUNET_ERROR_TYPE_DEBUG, " core mq just sent %s\n", GC_m2s (q->type)); |
1401 | int connection_destroyed; | 1084 | if (NULL != q->cont) |
1402 | |||
1403 | GCC_check_connections (); | ||
1404 | peer = queue->peer; | ||
1405 | LOG (GNUNET_ERROR_TYPE_DEBUG, "queue destroy %s\n", GC_m2s (queue->type)); | ||
1406 | if (GNUNET_YES == clear_cls) | ||
1407 | { | ||
1408 | LOG (GNUNET_ERROR_TYPE_DEBUG, " free cls\n"); | ||
1409 | switch (queue->type) | ||
1410 | { | ||
1411 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: | ||
1412 | LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n"); | ||
1413 | /* fall through */ | ||
1414 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: | ||
1415 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: | ||
1416 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: | ||
1417 | case GNUNET_MESSAGE_TYPE_CADET_KX: | ||
1418 | case GNUNET_MESSAGE_TYPE_CADET_AX: | ||
1419 | case GNUNET_MESSAGE_TYPE_CADET_ACK: | ||
1420 | case GNUNET_MESSAGE_TYPE_CADET_POLL: | ||
1421 | GNUNET_free_non_null (queue->cls); | ||
1422 | break; | ||
1423 | |||
1424 | default: | ||
1425 | GNUNET_break (0); | ||
1426 | LOG (GNUNET_ERROR_TYPE_ERROR, " type %s unknown!\n", | ||
1427 | GC_m2s (queue->type)); | ||
1428 | } | ||
1429 | } | ||
1430 | GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue); | ||
1431 | |||
1432 | if (!is_connection_management (queue->type)) | ||
1433 | { | ||
1434 | peer->queue_n--; | ||
1435 | } | ||
1436 | |||
1437 | if (NULL != queue->cont) | ||
1438 | { | 1085 | { |
1439 | struct GNUNET_TIME_Relative wait_time; | 1086 | struct GNUNET_TIME_Relative wait_time; |
1440 | 1087 | ||
1441 | wait_time = GNUNET_TIME_absolute_get_duration (queue->start_waiting); | 1088 | wait_time = GNUNET_TIME_absolute_get_duration (q->queue_timestamp); |
1442 | LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n", | 1089 | LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n", |
1443 | GNUNET_STRINGS_relative_time_to_string (wait_time, GNUNET_NO)); | 1090 | GNUNET_STRINGS_relative_time_to_string (wait_time, GNUNET_NO)); |
1444 | connection_destroyed = queue->cont (queue->cont_cls, | 1091 | q->cont (q->cont_cls, |
1445 | queue->c, sent, queue->type, pid, | 1092 | q->c, q->c_fwd, sent, |
1446 | queue->fwd, queue->size, wait_time); | 1093 | q->type, q->payload_type, q->payload_id, |
1447 | } | 1094 | q->size, wait_time); |
1448 | else | ||
1449 | { | ||
1450 | connection_destroyed = GNUNET_NO; | ||
1451 | } | 1095 | } |
1096 | } | ||
1452 | 1097 | ||
1453 | if (NULL == peer_get_first_message (peer) && NULL != peer->core_transmit) | 1098 | |
1099 | /** | ||
1100 | * Function called by MQ when a message is sent to CORE. | ||
1101 | * | ||
1102 | * @param cls Closure (queue handle). | ||
1103 | */ | ||
1104 | static void | ||
1105 | mq_sent (void *cls) | ||
1106 | { | ||
1107 | struct CadetPeerQueue *q = cls; | ||
1108 | |||
1109 | if (GNUNET_NO == q->management_traffic) | ||
1454 | { | 1110 | { |
1455 | GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); | 1111 | q->peer->queue_n--; |
1456 | peer->core_transmit = NULL; | ||
1457 | peer->tmt_time.abs_value_us = 0; | ||
1458 | } | 1112 | } |
1459 | 1113 | call_peer_cont (q, GNUNET_YES); | |
1460 | GNUNET_free (queue); | 1114 | GNUNET_free (q); |
1461 | GCC_check_connections (); | ||
1462 | return connection_destroyed; | ||
1463 | } | 1115 | } |
1464 | 1116 | ||
1465 | 1117 | ||
1466 | /** | 1118 | /** |
1467 | * @brief Queue and pass message to core when possible. | 1119 | * @brief Send a message to another peer (using CORE). |
1468 | * | 1120 | * |
1469 | * @param peer Peer towards which to queue the message. | 1121 | * @param peer Peer towards which to queue the message. |
1470 | * @param cls Closure (@c type dependant). It will be used by queue_send to | 1122 | * @param message Message to send. |
1471 | * build the message to be sent if not already prebuilt. | 1123 | * @param payload_type Type of the message's payload, for debug messages. |
1472 | * @param type Type of the message. | ||
1473 | * @param payload_type Type of the message's payload | ||
1474 | * 0 if the message is a retransmission (unknown payload). | 1124 | * 0 if the message is a retransmission (unknown payload). |
1475 | * UINT16_MAX if the message does not have payload. | 1125 | * UINT16_MAX if the message does not have payload. |
1476 | * @param payload_id ID of the payload (MID, ACK #, etc) | 1126 | * @param payload_id ID of the payload (MID, ACK #, etc) |
1477 | * @param size Size of the message. | ||
1478 | * @param c Connection this message belongs to (can be NULL). | 1127 | * @param c Connection this message belongs to (can be NULL). |
1479 | * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!) | 1128 | * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!) |
1480 | * @param cont Continuation to be called once CORE has taken the message. | 1129 | * @param cont Continuation to be called once CORE has sent the message. |
1481 | * @param cont_cls Closure for @c cont. | 1130 | * @param cont_cls Closure for @c cont. |
1482 | * | 1131 | * |
1483 | * @return Handle to cancel the message before it is sent. Once cont is called | 1132 | * @return A handle to the message in the queue or NULL (if dropped). |
1484 | * message has been sent and therefore the handle is no longer valid. | ||
1485 | */ | 1133 | */ |
1486 | struct CadetPeerQueue * | 1134 | struct CadetPeerQueue * |
1487 | GCP_queue_add (struct CadetPeer *peer, | 1135 | GCP_send (struct CadetPeer *peer, |
1488 | void *cls, | 1136 | const struct GNUNET_MessageHeader *message, |
1489 | uint16_t type, | 1137 | uint16_t payload_type, |
1490 | uint16_t payload_type, | 1138 | uint32_t payload_id, |
1491 | uint32_t payload_id, | 1139 | struct CadetConnection *c, |
1492 | size_t size, | 1140 | int fwd, |
1493 | struct CadetConnection *c, | 1141 | GCP_sent cont, |
1494 | int fwd, | 1142 | void *cont_cls) |
1495 | GCP_sent cont, | ||
1496 | void *cont_cls) | ||
1497 | { | 1143 | { |
1498 | struct CadetPeerQueue *q; | 1144 | struct CadetPeerQueue *q; |
1499 | int priority; | 1145 | uint16_t type; |
1500 | int call_core; | 1146 | uint16_t size; |
1501 | 1147 | ||
1502 | GCC_check_connections (); | 1148 | GCC_check_connections (); |
1149 | type = ntohs (message->type); | ||
1150 | size = ntohs (message->size); | ||
1503 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1151 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1504 | "que %s (%s %4u) on conn %s (%p) %s towards %s (size %u)\n", | 1152 | "que %s (%s %4u) on conn %s (%p) %s towards %s (size %u)\n", |
1505 | GC_m2s (type), GC_m2s (payload_type), payload_id, | 1153 | GC_m2s (type), GC_m2s (payload_type), payload_id, |
@@ -1508,282 +1156,68 @@ GCP_queue_add (struct CadetPeer *peer, | |||
1508 | if (NULL == peer->connections) | 1156 | if (NULL == peer->connections) |
1509 | { | 1157 | { |
1510 | /* We are not connected to this peer, ignore request. */ | 1158 | /* We are not connected to this peer, ignore request. */ |
1159 | GNUNET_break (0); | ||
1511 | LOG (GNUNET_ERROR_TYPE_INFO, "%s not a neighbor\n", GCP_2s (peer)); | 1160 | LOG (GNUNET_ERROR_TYPE_INFO, "%s not a neighbor\n", GCP_2s (peer)); |
1512 | GNUNET_STATISTICS_update (stats, "# messages dropped due to wrong hop", 1, | 1161 | GNUNET_STATISTICS_update (stats, "# messages dropped due to wrong hop", 1, |
1513 | GNUNET_NO); | 1162 | GNUNET_NO); |
1514 | return NULL; | 1163 | return NULL; |
1515 | } | 1164 | } |
1516 | 1165 | ||
1517 | priority = 0; | ||
1518 | if (is_connection_management (type)) | ||
1519 | { | ||
1520 | priority = 100; | ||
1521 | } | ||
1522 | LOG (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority); | ||
1523 | |||
1524 | call_core = (NULL == c || GNUNET_MESSAGE_TYPE_CADET_KX == type) ? | ||
1525 | GNUNET_YES : GCC_is_sendable (c, fwd); | ||
1526 | q = GNUNET_new (struct CadetPeerQueue); | 1166 | q = GNUNET_new (struct CadetPeerQueue); |
1527 | q->cls = cls; | 1167 | q->env = GNUNET_MQ_msg_copy (message); |
1168 | q->peer = peer; | ||
1169 | q->cont = cont; | ||
1170 | q->cont_cls = cont_cls; | ||
1171 | q->queue_timestamp = GNUNET_TIME_absolute_get (); | ||
1172 | q->management_traffic = is_connection_management (type); | ||
1528 | q->type = type; | 1173 | q->type = type; |
1174 | q->size = size; | ||
1529 | q->payload_type = payload_type; | 1175 | q->payload_type = payload_type; |
1530 | q->payload_id = payload_id; | 1176 | q->payload_id = payload_id; |
1531 | q->size = size; | ||
1532 | q->peer = peer; | ||
1533 | q->c = c; | 1177 | q->c = c; |
1534 | q->fwd = fwd; | 1178 | q->c_fwd = fwd; |
1535 | q->cont = cont; | 1179 | GNUNET_MQ_notify_sent (q->env, mq_sent, q); |
1536 | q->cont_cls = cont_cls; | ||
1537 | if (100 > priority) | ||
1538 | { | ||
1539 | GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, q); | ||
1540 | peer->queue_n++; | ||
1541 | } | ||
1542 | else | ||
1543 | { | ||
1544 | GNUNET_CONTAINER_DLL_insert (peer->queue_head, peer->queue_tail, q); | ||
1545 | call_core = GNUNET_YES; | ||
1546 | } | ||
1547 | 1180 | ||
1548 | q->start_waiting = GNUNET_TIME_absolute_get (); | 1181 | if (GNUNET_YES == q->management_traffic) |
1549 | if (NULL == peer->core_transmit && GNUNET_YES == call_core) | ||
1550 | { | 1182 | { |
1551 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1183 | GNUNET_MQ_send (peer->core_mq, q->env); // FIXME implement "_urgent", use |
1552 | "calling core tmt rdy towards %s for %u bytes\n", | ||
1553 | GCP_2s (peer), size); | ||
1554 | peer->core_transmit = | ||
1555 | GNUNET_CORE_notify_transmit_ready (core_handle, | ||
1556 | GNUNET_NO, get_priority (q), | ||
1557 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1558 | GNUNET_PEER_resolve2 (peer->id), | ||
1559 | get_core_size (size), | ||
1560 | &queue_send, peer); | ||
1561 | peer->tmt_time = GNUNET_TIME_absolute_get (); | ||
1562 | } | ||
1563 | else if (GNUNET_NO == call_core) | ||
1564 | { | ||
1565 | LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s not needed\n", | ||
1566 | GCP_2s (peer)); | ||
1567 | |||
1568 | } | 1184 | } |
1569 | else | 1185 | else |
1570 | { | 1186 | { |
1571 | struct GNUNET_TIME_Relative elapsed; | 1187 | if (GNUNET_YES == should_I_drop()) |
1572 | elapsed = GNUNET_TIME_absolute_get_duration (peer->tmt_time); | ||
1573 | LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called %s\n", | ||
1574 | GCP_2s (peer), | ||
1575 | GNUNET_STRINGS_relative_time_to_string (elapsed, GNUNET_NO)); | ||
1576 | |||
1577 | } | ||
1578 | queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); | ||
1579 | GCC_check_connections (); | ||
1580 | return q; | ||
1581 | } | ||
1582 | |||
1583 | |||
1584 | /** | ||
1585 | * Cancel all queued messages to a peer that belong to a certain connection. | ||
1586 | * | ||
1587 | * @param peer Peer towards whom to cancel. | ||
1588 | * @param c Connection whose queued messages to cancel. Might be destroyed by | ||
1589 | * the sent continuation call. | ||
1590 | */ | ||
1591 | void | ||
1592 | GCP_queue_cancel (struct CadetPeer *peer, | ||
1593 | struct CadetConnection *c) | ||
1594 | { | ||
1595 | struct CadetPeerQueue *q; | ||
1596 | struct CadetPeerQueue *next; | ||
1597 | struct CadetPeerQueue *prev; | ||
1598 | int connection_destroyed; | ||
1599 | |||
1600 | GCC_check_connections (); | ||
1601 | connection_destroyed = GNUNET_NO; | ||
1602 | for (q = peer->queue_head; NULL != q; q = next) | ||
1603 | { | ||
1604 | prev = q->prev; | ||
1605 | if (q->c == c) | ||
1606 | { | ||
1607 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1608 | "GMP queue cancel %s\n", | ||
1609 | GC_m2s (q->type)); | ||
1610 | GNUNET_assert (GNUNET_NO == connection_destroyed); | ||
1611 | if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY == q->type) | ||
1612 | { | ||
1613 | q->c = NULL; | ||
1614 | } | ||
1615 | else | ||
1616 | { | ||
1617 | connection_destroyed = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0); | ||
1618 | } | ||
1619 | |||
1620 | /* Get next from prev, q->next might be already freed: | ||
1621 | * queue destroy -> callback -> GCC_destroy -> cancel_queues -> here | ||
1622 | */ | ||
1623 | if (NULL == prev) | ||
1624 | next = peer->queue_head; | ||
1625 | else | ||
1626 | next = prev->next; | ||
1627 | } | ||
1628 | else | ||
1629 | { | ||
1630 | next = q->next; | ||
1631 | } | ||
1632 | } | ||
1633 | |||
1634 | if ( (NULL == peer->queue_head) && | ||
1635 | (NULL != peer->core_transmit) ) | ||
1636 | { | ||
1637 | GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); | ||
1638 | peer->core_transmit = NULL; | ||
1639 | peer->tmt_time.abs_value_us = 0; | ||
1640 | } | ||
1641 | GCC_check_connections (); | ||
1642 | } | ||
1643 | |||
1644 | |||
1645 | /** | ||
1646 | * Get the first transmittable message for a connection. | ||
1647 | * | ||
1648 | * @param peer Neighboring peer. | ||
1649 | * @param c Connection. | ||
1650 | * | ||
1651 | * @return First transmittable message. | ||
1652 | */ | ||
1653 | static struct CadetPeerQueue * | ||
1654 | connection_get_first_message (struct CadetPeer *peer, struct CadetConnection *c) | ||
1655 | { | ||
1656 | struct CadetPeerQueue *q; | ||
1657 | |||
1658 | for (q = peer->queue_head; NULL != q; q = q->next) | ||
1659 | { | ||
1660 | if (q->c != c) | ||
1661 | continue; | ||
1662 | if (queue_is_sendable (q)) | ||
1663 | { | 1188 | { |
1664 | LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable!!\n"); | 1189 | LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n", |
1665 | return q; | 1190 | GC_m2s (q->type), GC_m2s (q->payload_type), |
1191 | q->payload_id, GCC_2s (c), GC_f2s (q->c_fwd)); | ||
1192 | GNUNET_MQ_discard (q->env); | ||
1193 | call_peer_cont (q, GNUNET_NO); | ||
1194 | GNUNET_free (q); | ||
1195 | return NULL; | ||
1666 | } | 1196 | } |
1667 | LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n"); | 1197 | GNUNET_MQ_send (peer->core_mq, q->env); |
1198 | peer->queue_n++; | ||
1668 | } | 1199 | } |
1669 | 1200 | ||
1670 | return NULL; | ||
1671 | } | ||
1672 | |||
1673 | |||
1674 | /** | ||
1675 | * Get the first message for a connection and unqueue it. | ||
1676 | * | ||
1677 | * Only tunnel (or higher) level messages are unqueued. Connection specific | ||
1678 | * messages are silently destroyed upon encounter. | ||
1679 | * | ||
1680 | * @param peer Neighboring peer. | ||
1681 | * @param c Connection. | ||
1682 | * @param destroyed[in/out] Was the connection destroyed (prev/as a result)?. | ||
1683 | * Can NOT be NULL. | ||
1684 | * | ||
1685 | * @return First message for this connection. | ||
1686 | */ | ||
1687 | struct GNUNET_MessageHeader * | ||
1688 | GCP_connection_pop (struct CadetPeer *peer, | ||
1689 | struct CadetConnection *c, | ||
1690 | int *destroyed) | ||
1691 | { | ||
1692 | struct CadetPeerQueue *q; | ||
1693 | struct CadetPeerQueue *next; | ||
1694 | struct GNUNET_MessageHeader *msg; | ||
1695 | int dest; | ||
1696 | |||
1697 | GCC_check_connections (); | ||
1698 | GNUNET_assert (NULL != destroyed); | ||
1699 | LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_pop on conn %p\n", c); | ||
1700 | for (q = peer->queue_head; NULL != q; q = next) | ||
1701 | { | ||
1702 | next = q->next; | ||
1703 | if (q->c != c) | ||
1704 | continue; | ||
1705 | LOG (GNUNET_ERROR_TYPE_DEBUG, " - queued: %s (%s %u), cont: %p\n", | ||
1706 | GC_m2s (q->type), GC_m2s (q->payload_type), q->payload_id, | ||
1707 | q->cont); | ||
1708 | switch (q->type) | ||
1709 | { | ||
1710 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: | ||
1711 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: | ||
1712 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: | ||
1713 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: | ||
1714 | case GNUNET_MESSAGE_TYPE_CADET_ACK: | ||
1715 | case GNUNET_MESSAGE_TYPE_CADET_POLL: | ||
1716 | dest = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0); | ||
1717 | if (GNUNET_YES == dest) | ||
1718 | { | ||
1719 | GNUNET_break (GNUNET_NO == *destroyed); | ||
1720 | *destroyed = GNUNET_YES; | ||
1721 | } | ||
1722 | continue; | ||
1723 | |||
1724 | case GNUNET_MESSAGE_TYPE_CADET_KX: | ||
1725 | case GNUNET_MESSAGE_TYPE_CADET_AX: | ||
1726 | case GNUNET_MESSAGE_TYPE_CADET_AX_KX: | ||
1727 | msg = (struct GNUNET_MessageHeader *) q->cls; | ||
1728 | dest = GCP_queue_destroy (q, GNUNET_NO, GNUNET_NO, 0); | ||
1729 | if (GNUNET_YES == dest) | ||
1730 | { | ||
1731 | GNUNET_break (GNUNET_NO == *destroyed); | ||
1732 | *destroyed = GNUNET_YES; | ||
1733 | } | ||
1734 | return msg; | ||
1735 | |||
1736 | default: | ||
1737 | GNUNET_break (0); | ||
1738 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Unknown message %s\n", GC_m2s (q->type)); | ||
1739 | } | ||
1740 | } | ||
1741 | GCC_check_connections (); | 1201 | GCC_check_connections (); |
1742 | return NULL; | 1202 | return q; |
1743 | } | 1203 | } |
1744 | 1204 | ||
1745 | 1205 | ||
1746 | /** | 1206 | /** |
1747 | * Unlock a possibly locked queue for a connection. | 1207 | * Cancel sending a message. Message must have been sent with |
1208 | * #GCP_send before. May not be called after the notify sent | ||
1209 | * callback has been called. | ||
1748 | * | 1210 | * |
1749 | * If there is a message that can be sent on this connection, call core for it. | 1211 | * It DOES call the continuation given to #GCP_send. |
1750 | * Otherwise (if core transmit is already called or there is no sendable | ||
1751 | * message) do nothing. | ||
1752 | * | 1212 | * |
1753 | * @param peer Peer who keeps the queue. | 1213 | * @param q Queue handle to cancel |
1754 | * @param c Connection whose messages to unlock. | ||
1755 | */ | 1214 | */ |
1756 | void | 1215 | void |
1757 | GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c) | 1216 | GCP_send_cancel (struct CadetPeerQueue *q) |
1758 | { | 1217 | { |
1759 | struct CadetPeerQueue *q; | 1218 | call_peer_cont (q, GNUNET_NO); |
1760 | size_t size; | 1219 | GNUNET_MQ_send_cancel (q->env); |
1761 | 1220 | GNUNET_free (q); | |
1762 | GCC_check_connections (); | ||
1763 | if (NULL != peer->core_transmit) | ||
1764 | { | ||
1765 | LOG (GNUNET_ERROR_TYPE_DEBUG, " already unlocked!\n"); | ||
1766 | return; /* Already unlocked */ | ||
1767 | } | ||
1768 | |||
1769 | q = connection_get_first_message (peer, c); | ||
1770 | if (NULL == q) | ||
1771 | { | ||
1772 | LOG (GNUNET_ERROR_TYPE_DEBUG, " queue empty!\n"); | ||
1773 | return; /* Nothing to transmit */ | ||
1774 | } | ||
1775 | |||
1776 | size = q->size; | ||
1777 | peer->core_transmit = | ||
1778 | GNUNET_CORE_notify_transmit_ready (core_handle, | ||
1779 | GNUNET_NO, get_priority (q), | ||
1780 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1781 | GNUNET_PEER_resolve2 (peer->id), | ||
1782 | get_core_size (size), | ||
1783 | &queue_send, | ||
1784 | peer); | ||
1785 | peer->tmt_time = GNUNET_TIME_absolute_get (); | ||
1786 | GCC_check_connections (); | ||
1787 | } | 1221 | } |
1788 | 1222 | ||
1789 | 1223 | ||
@@ -1824,23 +1258,12 @@ GCP_init (const struct GNUNET_CONFIGURATION_Handle *c) | |||
1824 | LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); | 1258 | LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); |
1825 | } | 1259 | } |
1826 | ats_ch = GNUNET_ATS_connectivity_init (c); | 1260 | ats_ch = GNUNET_ATS_connectivity_init (c); |
1827 | core_handle = GNUNET_CORE_connect (c, /* Main configuration */ | 1261 | connect_to_core (c); |
1828 | NULL, /* Closure passed to CADET functions */ | ||
1829 | &core_init, /* Call core_init once connected */ | ||
1830 | &core_connect, /* Handle connects */ | ||
1831 | &core_disconnect, /* remove peers on disconnects */ | ||
1832 | NULL, /* Don't notify about all incoming messages */ | ||
1833 | GNUNET_NO, /* For header only in notification */ | ||
1834 | NULL, /* Don't notify about all outbound messages */ | ||
1835 | GNUNET_NO, /* For header-only out notification */ | ||
1836 | core_handlers); /* Register these handlers */ | ||
1837 | if (NULL == core_handle) | 1262 | if (NULL == core_handle) |
1838 | { | 1263 | { |
1839 | GNUNET_break (0); | 1264 | GNUNET_break (0); |
1840 | GNUNET_SCHEDULER_shutdown (); | 1265 | GNUNET_SCHEDULER_shutdown (); |
1841 | return; | ||
1842 | } | 1266 | } |
1843 | |||
1844 | } | 1267 | } |
1845 | 1268 | ||
1846 | 1269 | ||
@@ -1853,13 +1276,10 @@ GCP_shutdown (void) | |||
1853 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1276 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1854 | "Shutting down peer subsystem\n"); | 1277 | "Shutting down peer subsystem\n"); |
1855 | in_shutdown = GNUNET_YES; | 1278 | in_shutdown = GNUNET_YES; |
1856 | GNUNET_CONTAINER_multipeermap_iterate (peers, | ||
1857 | &shutdown_peer, | ||
1858 | NULL); | ||
1859 | if (NULL != core_handle) | 1279 | if (NULL != core_handle) |
1860 | { | 1280 | { |
1861 | GNUNET_CORE_disconnect (core_handle); | 1281 | GNUNET_CORE_disconnecT (core_handle); |
1862 | core_handle = NULL; | 1282 | core_handle = NULL; |
1863 | } | 1283 | } |
1864 | if (NULL != ats_ch) | 1284 | if (NULL != ats_ch) |
1865 | { | 1285 | { |
@@ -1867,6 +1287,12 @@ GCP_shutdown (void) | |||
1867 | ats_ch = NULL; | 1287 | ats_ch = NULL; |
1868 | } | 1288 | } |
1869 | GNUNET_PEER_change_rc (myid, -1); | 1289 | GNUNET_PEER_change_rc (myid, -1); |
1290 | /* With MQ API, CORE calls the disconnect handler for every peer | ||
1291 | * after calling GNUNET_CORE_disconnecT, shutdown must occur *after* that. | ||
1292 | */ | ||
1293 | GNUNET_CONTAINER_multipeermap_iterate (peers, | ||
1294 | &shutdown_peer, | ||
1295 | NULL); | ||
1870 | GNUNET_CONTAINER_multipeermap_destroy (peers); | 1296 | GNUNET_CONTAINER_multipeermap_destroy (peers); |
1871 | peers = NULL; | 1297 | peers = NULL; |
1872 | } | 1298 | } |
@@ -2054,7 +1480,6 @@ GCP_is_neighbor (const struct CadetPeer *peer) | |||
2054 | } | 1480 | } |
2055 | 1481 | ||
2056 | /* Is not a neighbor but connections is not NULL, probably disconnecting */ | 1482 | /* Is not a neighbor but connections is not NULL, probably disconnecting */ |
2057 | GNUNET_break (0); | ||
2058 | return GNUNET_NO; | 1483 | return GNUNET_NO; |
2059 | } | 1484 | } |
2060 | 1485 | ||
@@ -2254,7 +1679,8 @@ GCP_add_path_to_all (const struct CadetPeerPath *p, int confirmed) | |||
2254 | { | 1679 | { |
2255 | unsigned int i; | 1680 | unsigned int i; |
2256 | 1681 | ||
2257 | /* TODO: invert and add */ | 1682 | /* TODO: invert and add to origin */ |
1683 | /* TODO: replace all "GCP_add_path" with this, make the other one static */ | ||
2258 | GCC_check_connections (); | 1684 | GCC_check_connections (); |
2259 | for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ; | 1685 | for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ; |
2260 | for (i++; i < p->length; i++) | 1686 | for (i++; i < p->length; i++) |