aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet_peer.c
diff options
context:
space:
mode:
authorBart Polot <bart@net.in.tum.de>2016-09-20 01:21:59 +0000
committerBart Polot <bart@net.in.tum.de>2016-09-20 01:21:59 +0000
commitb4d5f474eef10017a470dccb01dae86c32bd5ddb (patch)
tree4b97bb46f4ab15c732e284ef0b275cc0dbc3173a /src/cadet/gnunet-service-cadet_peer.c
parent506899aa2be2b4d5dc09c1740969c28ddf43c82d (diff)
downloadgnunet-b4d5f474eef10017a470dccb01dae86c32bd5ddb.tar.gz
gnunet-b4d5f474eef10017a470dccb01dae86c32bd5ddb.zip
Port CADET to CORE MQ API
Diffstat (limited to 'src/cadet/gnunet-service-cadet_peer.c')
-rw-r--r--src/cadet/gnunet-service-cadet_peer.c1404
1 files changed, 415 insertions, 989 deletions
diff --git a/src/cadet/gnunet-service-cadet_peer.c b/src/cadet/gnunet-service-cadet_peer.c
index 64d9168fd..5ccd8f014 100644
--- a/src/cadet/gnunet-service-cadet_peer.c
+++ b/src/cadet/gnunet-service-cadet_peer.c
@@ -44,164 +44,145 @@
44/******************************** STRUCTS **********************************/ 44/******************************** STRUCTS **********************************/
45/******************************************************************************/ 45/******************************************************************************/
46 46
47
47/** 48/**
48 * Struct containing info about a queued transmission to this peer 49 * Struct containing all information regarding a given peer
49 */ 50 */
50struct CadetPeerQueue 51struct CadetPeer
51{ 52{
52 /** 53 /**
53 * DLL next 54 * ID of the peer
54 */ 55 */
55 struct CadetPeerQueue *next; 56 GNUNET_PEER_Id id;
56 57
57 /** 58 /**
58 * DLL previous 59 * Last time we heard from this peer
59 */ 60 */
60 struct CadetPeerQueue *prev; 61 struct GNUNET_TIME_Absolute last_contact;
61 62
62 /** 63 /**
63 * Peer this transmission is directed to. 64 * Paths to reach the peer, ordered by ascending hop count
64 */ 65 */
65 struct CadetPeer *peer; 66 struct CadetPeerPath *path_head;
66 67
67 /** 68 /**
68 * Connection this message belongs to. 69 * Paths to reach the peer, ordered by ascending hop count
69 */ 70 */
70 struct CadetConnection *c; 71 struct CadetPeerPath *path_tail;
71 72
72 /** 73 /**
73 * Is FWD in c? 74 * Handle to stop the DHT search for paths to this peer
74 */ 75 */
75 int fwd; 76 struct GCD_search_handle *search_h;
76 77
77 /** 78 /**
78 * Pointer to info stucture used as cls. 79 * Handle to stop the DHT search for paths to this peer
79 */ 80 */
80 void *cls; 81 struct GNUNET_SCHEDULER_Task *search_delayed;
81 82
82 /** 83 /**
83 * Type of message 84 * Tunnel to this peer, if any.
84 */ 85 */
85 uint16_t type; 86 struct CadetTunnel *tunnel;
86 87
87 /** 88 /**
88 * Type of message 89 * Connections that go through this peer; indexed by tid.
89 */ 90 */
90 uint16_t payload_type; 91 struct GNUNET_CONTAINER_MultiHashMap *connections;
91 92
92 /** 93 /**
93 * Type of message 94 * Handle for core transmissions.
94 */ 95 */
95 uint32_t payload_id; 96 struct GNUNET_MQ_Handle *core_mq;
96 97
97 /** 98 /**
98 * Size of the message 99 * How many messages are in the queue to this peer.
99 */ 100 */
100 size_t size; 101 unsigned int queue_n;
101 102
102 /** 103 /**
103 * Set when this message starts waiting for CORE. 104 * Hello message.
104 */ 105 */
105 struct GNUNET_TIME_Absolute start_waiting; 106 struct GNUNET_HELLO_Message* hello;
106 107
107 /** 108 /**
108 * Function to call on sending. 109 * Handle to us offering the HELLO to the transport.
109 */ 110 */
110 GCP_sent cont; 111 struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer;
111 112
112 /** 113 /**
113 * Closure for callback. 114 * Handle to our ATS request asking ATS to suggest an address
115 * to TRANSPORT for this peer (to establish a direct link).
114 */ 116 */
115 void *cont_cls; 117 struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion;
118
116}; 119};
117 120
118 121
119/** 122/**
120 * Struct containing all information regarding a given peer 123 * Information about a queued message on the peer level.
121 */ 124 */
122struct CadetPeer 125struct CadetPeerQueue {
123{
124 /**
125 * ID of the peer
126 */
127 GNUNET_PEER_Id id;
128 126
129 /** 127 /**
130 * Last time we heard from this peer 128 * Envelope to cancel message before MQ sends it.
131 */
132 struct GNUNET_TIME_Absolute last_contact;
133
134 /**
135 * Paths to reach the peer, ordered by ascending hop count
136 */ 129 */
137 struct CadetPeerPath *path_head; 130 struct GNUNET_MQ_Envelope *env;
138 131
139 /** 132 /**
140 * Paths to reach the peer, ordered by ascending hop count 133 * Peer (neighbor) this message is being sent to.
141 */ 134 */
142 struct CadetPeerPath *path_tail; 135 struct CadetPeer *peer;
143
144 /**
145 * Handle to stop the DHT search for paths to this peer
146 */
147 struct GCD_search_handle *search_h;
148
149 /**
150 * Handle to stop the DHT search for paths to this peer
151 */
152 struct GNUNET_SCHEDULER_Task *search_delayed;
153 136
154 /** 137 /**
155 * Tunnel to this peer, if any. 138 * Continuation to call to notify higher layers about message sent.
156 */ 139 */
157 struct CadetTunnel *tunnel; 140 GCP_sent cont;
158 141
159 /** 142 /**
160 * Connections that go through this peer; indexed by tid. 143 * Closure for @a cont.
161 */ 144 */
162 struct GNUNET_CONTAINER_MultiHashMap *connections; 145 void *cont_cls;
163 146
164 /** 147 /**
165 * Handle for queued transmissions 148 * Time when message was queued for sending.
166 */ 149 */
167 struct GNUNET_CORE_TransmitHandle *core_transmit; 150 struct GNUNET_TIME_Absolute queue_timestamp;
168 151
169 /** 152 /**
170 * Timestamp 153 * #GNUNET_YES if message was management traffic (POLL, ACK, ...).
171 */ 154 */
172 struct GNUNET_TIME_Absolute tmt_time; 155 int management_traffic;
173 156
174 /** 157 /**
175 * Transmission queue to core DLL head 158 * Message type.
176 */ 159 */
177 struct CadetPeerQueue *queue_head; 160 uint16_t type;
178 161
179 /** 162 /**
180 * Transmission queue to core DLL tail 163 * Message size.
181 */ 164 */
182 struct CadetPeerQueue *queue_tail; 165 uint16_t size;
183 166
184 /** 167 /**
185 * How many messages are in the queue to this peer. 168 * Type of the message's payload, if it was encrypted data.
186 */ 169 */
187 unsigned int queue_n; 170 uint16_t payload_type;
188 171
189 /** 172 /**
190 * Hello message. 173 *ID of the payload (PID, ACK #, ...).
191 */ 174 */
192 struct GNUNET_HELLO_Message* hello; 175 uint16_t payload_id;
193 176
194 /** 177 /**
195 * Handle to us offering the HELLO to the transport. 178 * Connection this message was sent on.
196 */ 179 */
197 struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer; 180 struct CadetConnection *c;
198 181
199 /** 182 /**
200 * Handle to our ATS request asking ATS to suggest an address 183 * Direction in @a c this message was send on (#GNUNET_YES = FWD).
201 * to TRANSPORT for this peer (to establish a direct link).
202 */ 184 */
203 struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion; 185 int c_fwd;
204
205}; 186};
206 187
207 188
@@ -261,98 +242,6 @@ static int in_shutdown;
261 242
262 243
263/******************************************************************************/ 244/******************************************************************************/
264/***************************** DEBUG *********************************/
265/******************************************************************************/
266
267/**
268 * Log all kinds of info about the queueing status of a peer.
269 *
270 * @param p Peer whose queue to show.
271 * @param level Error level to use for logging.
272 */
273static void
274queue_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level)
275{
276 struct GNUNET_TIME_Relative core_wait_time;
277 struct CadetPeerQueue *q;
278 int do_log;
279
280 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
281 "cadet-p2p",
282 __FILE__, __FUNCTION__, __LINE__);
283 if (0 == do_log)
284 return;
285
286 LOG2 (level, "QQQ Message queue towards %s\n", GCP_2s (p));
287 LOG2 (level, "QQQ queue length: %u\n", p->queue_n);
288 LOG2 (level, "QQQ core tmt rdy: %p\n", p->core_transmit);
289 if (NULL != p->core_transmit)
290 {
291 core_wait_time = GNUNET_TIME_absolute_get_duration (p->tmt_time);
292 LOG2 (level, "QQQ core called %s ago\n",
293 GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_NO));
294 }
295 for (q = p->queue_head; NULL != q; q = q->next)
296 {
297 LOG2 (level, "QQQ - %s %s on %s\n",
298 GC_m2s (q->type), GC_f2s (q->fwd), GCC_2s (q->c));
299 LOG2 (level, "QQQ payload %s, %u\n",
300 GC_m2s (q->payload_type), q->payload_id);
301 LOG2 (level, "QQQ size: %u bytes\n", q->size);
302 }
303
304 LOG2 (level, "QQQ End queue towards %s\n", GCP_2s (p));
305}
306
307
308/**
309 * Log all kinds of info about a peer.
310 *
311 * @param peer Peer.
312 */
313void
314GCP_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level)
315{
316 struct CadetPeerPath *path;
317 unsigned int conns;
318 int do_log;
319
320 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
321 "cadet-p2p",
322 __FILE__, __FUNCTION__, __LINE__);
323 if (0 == do_log)
324 return;
325
326 if (NULL == p)
327 {
328 LOG2 (level, "PPP DEBUG PEER NULL\n");
329 return;
330 }
331
332 LOG2 (level, "PPP DEBUG PEER %s\n", GCP_2s (p));
333 LOG2 (level, "PPP last contact %s\n",
334 GNUNET_STRINGS_absolute_time_to_string (p->last_contact));
335 for (path = p->path_head; NULL != path; path = path->next)
336 {
337 char *s;
338
339 s = path_2s (path);
340 LOG2 (level, "PPP path: %s\n", s);
341 GNUNET_free (s);
342 }
343
344 LOG2 (level, "PPP core transmit handle %p\n", p->core_transmit);
345 LOG2 (level, "PPP DHT GET handle %p\n", p->search_h);
346 conns = 0;
347 if (NULL != p->connections)
348 conns += GNUNET_CONTAINER_multihashmap_size (p->connections);
349 LOG2 (level, "PPP # connections over link to peer: %u\n", conns);
350 queue_debug (p, level);
351 LOG2 (level, "PPP DEBUG END\n");
352}
353
354
355/******************************************************************************/
356/***************************** CORE HELPERS *********************************/ 245/***************************** CORE HELPERS *********************************/
357/******************************************************************************/ 246/******************************************************************************/
358 247
@@ -415,12 +304,16 @@ pop_direct_path (struct CadetPeer *peer)
415/** 304/**
416 * Method called whenever a given peer connects. 305 * Method called whenever a given peer connects.
417 * 306 *
418 * @param cls closure 307 * @param cls Core closure (unused).
419 * @param peer peer identity this notification is about 308 * @param peer Peer identity this notification is about
309 * @param mq Message Queue to this peer.
310 *
311 * @return Internal closure for handlers (CadetPeer struct).
420 */ 312 */
421static void 313static void *
422core_connect (void *cls, 314core_connect_handler (void *cls,
423 const struct GNUNET_PeerIdentity *peer) 315 const struct GNUNET_PeerIdentity *peer,
316 struct GNUNET_MQ_Handle *mq)
424{ 317{
425 struct CadetPeer *neighbor; 318 struct CadetPeer *neighbor;
426 struct CadetPeerPath *path; 319 struct CadetPeerPath *path;
@@ -431,6 +324,8 @@ core_connect (void *cls,
431 sizeof (own_id), 324 sizeof (own_id),
432 "%s", 325 "%s",
433 GNUNET_i2s (&my_full_id)); 326 GNUNET_i2s (&my_full_id));
327
328 /* Save a path to the neighbor */
434 neighbor = GCP_get (peer, GNUNET_YES); 329 neighbor = GCP_get (peer, GNUNET_YES);
435 if (myid == neighbor->id) 330 if (myid == neighbor->id)
436 { 331 {
@@ -448,11 +343,14 @@ core_connect (void *cls,
448 path = path_new (2); 343 path = path_new (2);
449 path->peers[1] = neighbor->id; 344 path->peers[1] = neighbor->id;
450 GNUNET_PEER_change_rc (neighbor->id, 1); 345 GNUNET_PEER_change_rc (neighbor->id, 1);
346 GNUNET_assert (NULL == neighbor->core_mq);
347 neighbor->core_mq = mq;
451 } 348 }
452 path->peers[0] = myid; 349 path->peers[0] = myid;
453 GNUNET_PEER_change_rc (myid, 1); 350 GNUNET_PEER_change_rc (myid, 1);
454 GCP_add_path (neighbor, path, GNUNET_YES); 351 GCP_add_path (neighbor, path, GNUNET_YES);
455 352
353 /* Create the connections hashmap */
456 GNUNET_assert (NULL == neighbor->connections); 354 GNUNET_assert (NULL == neighbor->connections);
457 neighbor->connections = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO); 355 neighbor->connections = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO);
458 GNUNET_STATISTICS_update (stats, 356 GNUNET_STATISTICS_update (stats,
@@ -462,42 +360,47 @@ core_connect (void *cls,
462 360
463 if ( (NULL != GCP_get_tunnel (neighbor)) && 361 if ( (NULL != GCP_get_tunnel (neighbor)) &&
464 (0 > GNUNET_CRYPTO_cmp_peer_identity (&my_full_id, peer)) ) 362 (0 > GNUNET_CRYPTO_cmp_peer_identity (&my_full_id, peer)) )
363 {
465 GCP_connect (neighbor); 364 GCP_connect (neighbor);
365 }
466 GCC_check_connections (); 366 GCC_check_connections ();
367
368 return neighbor;
467} 369}
468 370
469 371
470/** 372/**
471 * Method called whenever a peer disconnects. 373 * Method called whenever a peer disconnects.
472 * 374 *
473 * @param cls closure 375 * @param cls Core closure (unused).
474 * @param peer peer identity this notification is about 376 * @param peer Peer identity this notification is about.
377 * @param internal_cls Internal closure (CadetPeer struct).
475 */ 378 */
476static void 379static void
477core_disconnect (void *cls, 380core_disconnect_handler (void *cls,
478 const struct GNUNET_PeerIdentity *peer) 381 const struct GNUNET_PeerIdentity *peer,
382 void *internal_cls)
479{ 383{
480 struct CadetPeer *p; 384 struct CadetPeer *p = internal_cls;
481 struct CadetPeerPath *direct_path; 385 struct CadetPeerPath *direct_path;
482 char own_id[16]; 386 char own_id[16];
483 387
484 GCC_check_connections (); 388 GCC_check_connections ();
485 strncpy (own_id, GNUNET_i2s (&my_full_id), 16); 389 strncpy (own_id, GNUNET_i2s (&my_full_id), 16);
486 own_id[15] = '\0'; 390 own_id[15] = '\0';
487 p = GNUNET_CONTAINER_multipeermap_get (peers, peer);
488 if (NULL == p)
489 {
490 GNUNET_break (GNUNET_YES == in_shutdown);
491 return;
492 }
493 if (myid == p->id) 391 if (myid == p->id)
392 {
494 LOG (GNUNET_ERROR_TYPE_INFO, 393 LOG (GNUNET_ERROR_TYPE_INFO,
495 "DISCONNECTED %s (self)\n", 394 "DISCONNECTED %s (self)\n",
496 own_id); 395 own_id);
396 }
497 else 397 else
398 {
498 LOG (GNUNET_ERROR_TYPE_INFO, 399 LOG (GNUNET_ERROR_TYPE_INFO,
499 "DISCONNECTED %s <= %s\n", 400 "DISCONNECTED %s <= %s\n",
500 own_id, GNUNET_i2s (peer)); 401 own_id, GNUNET_i2s (peer));
402 p->core_mq = NULL;
403 }
501 direct_path = pop_direct_path (p); 404 direct_path = pop_direct_path (p);
502 if (NULL != p->connections) 405 if (NULL != p->connections)
503 { 406 {
@@ -507,12 +410,6 @@ core_disconnect (void *cls,
507 GNUNET_CONTAINER_multihashmap_destroy (p->connections); 410 GNUNET_CONTAINER_multihashmap_destroy (p->connections);
508 p->connections = NULL; 411 p->connections = NULL;
509 } 412 }
510 if (NULL != p->core_transmit)
511 {
512 GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit);
513 p->core_transmit = NULL;
514 p->tmt_time.abs_value_us = 0;
515 }
516 GNUNET_STATISTICS_update (stats, 413 GNUNET_STATISTICS_update (stats,
517 "# peers", 414 "# peers",
518 -1, 415 -1,
@@ -522,167 +419,283 @@ core_disconnect (void *cls,
522} 419}
523 420
524 421
422/******************************************************************************/
423/******************************************************************************/
424/******************************************************************************/
425/******************************************************************************/
426/******************************************************************************/
427
525/** 428/**
526 * Functions to handle messages from core 429 * Check if the create_connection message has the appropriate size.
527 */ 430 *
528static struct GNUNET_CORE_MessageHandler core_handlers[] = { 431 * @param cls Closure (unused).
529 {&GCC_handle_create, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0}, 432 * @param msg Message to check.
530 {&GCC_handle_confirm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, 433 *
531 sizeof (struct GNUNET_CADET_ConnectionACK)}, 434 * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
532 {&GCC_handle_broken, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, 435 */
533 sizeof (struct GNUNET_CADET_ConnectionBroken)}, 436static int
534 {&GCC_handle_destroy, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY, 437check_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg)
535 sizeof (struct GNUNET_CADET_ConnectionDestroy)}, 438{
536 {&GCC_handle_ack, GNUNET_MESSAGE_TYPE_CADET_ACK, 439 uint16_t size;
537 sizeof (struct GNUNET_CADET_ACK)},
538 {&GCC_handle_poll, GNUNET_MESSAGE_TYPE_CADET_POLL,
539 sizeof (struct GNUNET_CADET_Poll)},
540 {&GCC_handle_kx, GNUNET_MESSAGE_TYPE_CADET_KX, 0},
541 {&GCC_handle_encrypted, GNUNET_MESSAGE_TYPE_CADET_AX, 0},
542 {NULL, 0, 0}
543};
544 440
441 size = ntohs (msg->header.size);
442 if (size < sizeof (*msg))
443 {
444 GNUNET_break_op (0);
445 return GNUNET_NO;
446 }
447 return GNUNET_YES;
448}
545 449
546/** 450/**
547 * To be called on core init/fail. 451 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
548 * 452 *
549 * @param cls Closure (config) 453 * @param cls Closure (CadetPeer for neighbor that sent the message).
550 * @param identity the public identity of this peer 454 * @param msg Message itself.
551 */ 455 */
552static void 456static void
553core_init (void *cls, 457handle_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg)
554 const struct GNUNET_PeerIdentity *identity)
555{ 458{
556 const struct GNUNET_CONFIGURATION_Handle *c = cls; 459 struct CadetPeer *peer = cls;
557 static int i = 0; 460 GCC_handle_create (peer, msg);
461}
558 462
559 LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n"); 463
560 if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id))) 464/**
561 { 465 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK
562 LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n")); 466 *
563 LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (identity)); 467 * @param cls Closure (CadetPeer for neighbor that sent the message).
564 LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id)); 468 * @param msg Message itself.
565 GNUNET_CORE_disconnect (core_handle); 469 */
566 core_handle = GNUNET_CORE_connect (c, /* Main configuration */ 470static void
567 NULL, /* Closure passed to CADET functions */ 471handle_confirm (void *cls, const struct GNUNET_CADET_ConnectionACK *msg)
568 &core_init, /* Call core_init once connected */ 472{
569 &core_connect, /* Handle connects */ 473 struct CadetPeer *peer = cls;
570 &core_disconnect, /* remove peers on disconnects */ 474 GCC_handle_confirm (peer, msg);
571 NULL, /* Don't notify about all incoming messages */
572 GNUNET_NO, /* For header only in notification */
573 NULL, /* Don't notify about all outbound messages */
574 GNUNET_NO, /* For header-only out notification */
575 core_handlers); /* Register these handlers */
576 if (10 < i++)
577 GNUNET_assert (0);
578 }
579 GML_start ();
580} 475}
581 476
582 477
583/** 478/**
584 * Core callback to write a pre-constructed data packet to core buffer 479 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
585 * 480 *
586 * @param cls Closure (CadetTransmissionDescriptor with data in "data" member). 481 * @param cls Closure (CadetPeer for neighbor that sent the message).
587 * @param size Number of bytes available in buf. 482 * @param msg Message itself.
588 * @param buf Where the to write the message. 483 */
589 * 484static void
590 * @return number of bytes written to buf 485handle_broken (void *cls, const struct GNUNET_CADET_ConnectionBroken *msg)
591 */
592static size_t
593send_core_data_raw (void *cls, size_t size, void *buf)
594{ 486{
595 struct GNUNET_MessageHeader *msg = cls; 487 struct CadetPeer *peer = cls;
596 size_t total_size; 488 GCC_handle_broken (peer, msg);
489}
597 490
598 GNUNET_assert (NULL != msg);
599 total_size = ntohs (msg->size);
600 491
601 if (total_size > size) 492/**
602 { 493 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
603 GNUNET_break (0); 494 *
604 return 0; 495 * @param cls Closure (CadetPeer for neighbor that sent the message).
605 } 496 * @param msg Message itself.
606 GNUNET_memcpy (buf, msg, total_size); 497 */
607 GNUNET_free (cls); 498static void
608 return total_size; 499handle_destroy (void *cls, const struct GNUNET_CADET_ConnectionDestroy *msg)
500{
501 struct CadetPeer *peer = cls;
502 GCC_handle_destroy (peer, msg);
609} 503}
610 504
611 505
612/** 506/**
613 * Function to send a create connection message to a peer. 507 * Handle for #GNUNET_MESSAGE_TYPE_CADET_ACK
614 * 508 *
615 * @param c Connection to create. 509 * @param cls Closure (CadetPeer for neighbor that sent the message).
616 * @param size number of bytes available in buf 510 * @param msg Message itself.
617 * @param buf where the callee should write the message
618 * @return number of bytes written to buf
619 */ 511 */
620static size_t 512static void
621send_core_connection_create (struct CadetConnection *c, size_t size, void *buf) 513handle_ack (void *cls, const struct GNUNET_CADET_ACK *msg)
622{ 514{
623 struct GNUNET_CADET_ConnectionCreate *msg; 515 struct CadetPeer *peer = cls;
624 struct GNUNET_PeerIdentity *peer_ptr; 516 GCC_handle_ack (peer, msg);
625 const struct CadetPeerPath *p = GCC_get_path (c); 517}
626 size_t size_needed;
627 int i;
628 518
629 if (NULL == p)
630 return 0;
631 519
632 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION CREATE...\n"); 520/**
633 size_needed = 521 * Handle for #GNUNET_MESSAGE_TYPE_CADET_POLL
634 sizeof (struct GNUNET_CADET_ConnectionCreate) + 522 *
635 p->length * sizeof (struct GNUNET_PeerIdentity); 523 * @param cls Closure (CadetPeer for neighbor that sent the message).
524 * @param msg Message itself.
525 */
526static void
527handle_poll (void *cls, const struct GNUNET_CADET_Poll *msg)
528{
529 struct CadetPeer *peer = cls;
530 GCC_handle_poll (peer, msg);
531}
636 532
637 if (size < size_needed || NULL == buf) 533
534/**
535 * Check if the Key eXchange message has the appropriate size.
536 *
537 * @param cls Closure (unused).
538 * @param msg Message to check.
539 *
540 * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
541 */
542static int
543check_kx (void *cls, const struct GNUNET_CADET_KX *msg)
544{
545 uint16_t size;
546 uint16_t expected_size;
547
548 size = ntohs (msg->header.size);
549 expected_size = sizeof (struct GNUNET_CADET_KX)
550 + sizeof (struct GNUNET_MessageHeader);
551
552 if (size < expected_size)
638 { 553 {
639 GNUNET_break (0); 554 GNUNET_break_op (0);
640 return 0; 555 return GNUNET_NO;
641 } 556 }
642 msg = (struct GNUNET_CADET_ConnectionCreate *) buf; 557 return GNUNET_YES;
643 msg->header.size = htons (size_needed); 558}
644 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
645 msg->cid = *GCC_get_id (c);
646 559
647 peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1]; 560/**
648 for (i = 0; i < p->length; i++) 561 * Handle for #GNUNET_MESSAGE_TYPE_CADET_KX
562 *
563 * @param cls Closure (CadetPeer for neighbor that sent the message).
564 * @param msg Message itself.
565 */
566static void
567handle_kx (void *cls, const struct GNUNET_CADET_KX *msg)
568{
569 struct CadetPeer *peer = cls;
570 GCC_handle_kx (peer, msg);
571}
572
573
574/**
575 * Check if the encrypted message has the appropriate size.
576 *
577 * @param cls Closure (unused).
578 * @param msg Message to check.
579 *
580 * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
581 */
582static int
583check_encrypted (void *cls, const struct GNUNET_CADET_AX *msg)
584{
585 uint16_t size;
586 uint16_t minimum_size;
587
588 size = ntohs (msg->header.size);
589 minimum_size = sizeof (struct GNUNET_CADET_AX)
590 + sizeof (struct GNUNET_MessageHeader);
591
592 if (size < minimum_size)
649 { 593 {
650 GNUNET_PEER_resolve (p->peers[i], peer_ptr++); 594 GNUNET_break_op (0);
595 return GNUNET_NO;
651 } 596 }
597 return GNUNET_YES;
598}
652 599
653 LOG (GNUNET_ERROR_TYPE_DEBUG, 600/**
654 "CONNECTION CREATE (%u bytes long) sent!\n", 601 * Handle for #GNUNET_MESSAGE_TYPE_CADET_AX (AXolotl encrypted traffic).
655 size_needed); 602 *
656 return size_needed; 603 * @param cls Closure (CadetPeer for neighbor that sent the message).
604 * @param msg Message itself.
605 */
606static void
607handle_encrypted (void *cls, const struct GNUNET_CADET_AX *msg)
608{
609 struct CadetPeer *peer = cls;
610 GCC_handle_encrypted (peer, msg);
657} 611}
658 612
659 613
660/** 614/**
661 * Creates a path ack message in buf and frees all unused resources. 615 * To be called on core init/fail.
662 * 616 *
663 * @param c Connection to send an ACK on. 617 * @param cls Closure (config)
664 * @param size number of bytes available in buf 618 * @param identity The public identity of this peer.
665 * @param buf where the callee should write the message 619 */
620static void
621core_init_notify (void *cls,
622 const struct GNUNET_PeerIdentity *identity);
623
624
625static void
626connect_to_core (const struct GNUNET_CONFIGURATION_Handle *c)
627{
628 struct GNUNET_MQ_MessageHandler core_handlers[] = {
629 GNUNET_MQ_hd_var_size (create,
630 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
631 struct GNUNET_CADET_ConnectionCreate,
632 NULL),
633 GNUNET_MQ_hd_fixed_size (confirm,
634 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK,
635 struct GNUNET_CADET_ConnectionACK,
636 NULL),
637 GNUNET_MQ_hd_fixed_size (broken,
638 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
639 struct GNUNET_CADET_ConnectionBroken,
640 NULL),
641 GNUNET_MQ_hd_fixed_size (destroy,
642 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
643 struct GNUNET_CADET_ConnectionDestroy,
644 NULL),
645 GNUNET_MQ_hd_fixed_size (ack,
646 GNUNET_MESSAGE_TYPE_CADET_ACK,
647 struct GNUNET_CADET_ACK,
648 NULL),
649 GNUNET_MQ_hd_fixed_size (poll,
650 GNUNET_MESSAGE_TYPE_CADET_POLL,
651 struct GNUNET_CADET_Poll,
652 NULL),
653 GNUNET_MQ_hd_var_size (kx,
654 GNUNET_MESSAGE_TYPE_CADET_KX,
655 struct GNUNET_CADET_KX,
656 NULL),
657 GNUNET_MQ_hd_var_size (encrypted,
658 GNUNET_MESSAGE_TYPE_CADET_AX,
659 struct GNUNET_CADET_AX,
660 NULL),
661 GNUNET_MQ_handler_end ()
662 };
663 core_handle = GNUNET_CORE_connecT (c, NULL,
664 &core_init_notify,
665 &core_connect_handler,
666 &core_disconnect_handler,
667 core_handlers);
668}
669
670/******************************************************************************/
671/******************************************************************************/
672/******************************************************************************/
673/******************************************************************************/
674/******************************************************************************/
675
676/**
677 * To be called on core init/fail.
666 * 678 *
667 * @return number of bytes written to buf 679 * @param cls Closure (config)
680 * @param identity The public identity of this peer.
668 */ 681 */
669static size_t 682static void
670send_core_connection_ack (struct CadetConnection *c, size_t size, void *buf) 683core_init_notify (void *cls,
684 const struct GNUNET_PeerIdentity *core_identity)
671{ 685{
672 struct GNUNET_CADET_ConnectionACK *msg = buf; 686 const struct GNUNET_CONFIGURATION_Handle *c = cls;
673 687
674 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION ACK...\n"); 688 LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n");
675 if (sizeof (struct GNUNET_CADET_ConnectionACK) > size) 689 if (0 != memcmp (core_identity, &my_full_id, sizeof (my_full_id)))
676 { 690 {
677 GNUNET_break (0); 691 LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n"));
678 return 0; 692 LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (core_identity));
693 LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id));
694 GNUNET_CORE_disconnecT (core_handle);
695 connect_to_core (c);
696 return;
679 } 697 }
680 msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK)); 698 GML_start ();
681 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK);
682 msg->cid = *GCC_get_id (c);
683
684 LOG (GNUNET_ERROR_TYPE_DEBUG, "CONNECTION ACK sent!\n");
685 return sizeof (struct GNUNET_CADET_ConnectionACK);
686} 699}
687 700
688 701
@@ -697,8 +710,11 @@ send_core_connection_ack (struct CadetConnection *c, size_t size, void *buf)
697 * @param q Queued message 710 * @param q Queued message
698 * 711 *
699 * @return CORE priority to use. 712 * @return CORE priority to use.
713 *
714 * FIXME make static
715 * FIXME use when sending
700 */ 716 */
701static enum GNUNET_CORE_Priority 717enum GNUNET_CORE_Priority
702get_priority (struct CadetPeerQueue *q) 718get_priority (struct CadetPeerQueue *q)
703{ 719{
704 enum GNUNET_CORE_Priority low; 720 enum GNUNET_CORE_Priority low;
@@ -711,7 +727,7 @@ get_priority (struct CadetPeerQueue *q)
711 } 727 }
712 728
713 /* Relayed traffic has lower priority, our own traffic has higher */ 729 /* Relayed traffic has lower priority, our own traffic has higher */
714 if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->fwd)) 730 if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->c_fwd))
715 { 731 {
716 low = GNUNET_CORE_PRIO_BEST_EFFORT; 732 low = GNUNET_CORE_PRIO_BEST_EFFORT;
717 high = GNUNET_CORE_PRIO_URGENT; 733 high = GNUNET_CORE_PRIO_URGENT;
@@ -784,20 +800,6 @@ peer_destroy (struct CadetPeer *peer)
784 GNUNET_ATS_connectivity_suggest_cancel (peer->connectivity_suggestion); 800 GNUNET_ATS_connectivity_suggest_cancel (peer->connectivity_suggestion);
785 peer->connectivity_suggestion = NULL; 801 peer->connectivity_suggestion = NULL;
786 } 802 }
787 while (NULL != peer->queue_head)
788 {
789 /* This function destroys the current peer->queue_head but
790 * replaces it with the next in the queue, so it is correct
791 * to while() here.
792 */
793 GCP_queue_destroy (peer->queue_head, GNUNET_YES, GNUNET_NO, 0);
794 }
795 if (NULL != peer->core_transmit)
796 {
797 GNUNET_break (0); /* GCP_queue_destroy should've cancelled it! */
798 GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
799 peer->core_transmit = NULL;
800 }
801 803
802 GNUNET_free_non_null (peer->hello); 804 GNUNET_free_non_null (peer->hello);
803 GNUNET_free (peer); 805 GNUNET_free (peer);
@@ -831,7 +833,6 @@ shutdown_peer (void *cls,
831} 833}
832 834
833 835
834
835/** 836/**
836 * Check if peer is searching for a path (either active or delayed search). 837 * Check if peer is searching for a path (either active or delayed search).
837 * 838 *
@@ -996,64 +997,6 @@ peer_get_best_path (const struct CadetPeer *peer)
996 997
997 998
998/** 999/**
999 * Is this queue element sendable?
1000 *
1001 * - All management traffic is always sendable.
1002 * - For payload traffic, check the connection flow control.
1003 *
1004 * @param q Queue element to inspect.
1005 * @return #GNUNET_YES if it is sendable, #GNUNET_NO otherwise.
1006 */
1007static int
1008queue_is_sendable (struct CadetPeerQueue *q)
1009{
1010 /* Is PID-independent? */
1011 switch (q->type)
1012 {
1013 case GNUNET_MESSAGE_TYPE_CADET_ACK:
1014 case GNUNET_MESSAGE_TYPE_CADET_POLL:
1015 case GNUNET_MESSAGE_TYPE_CADET_KX:
1016 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
1017 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
1018 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
1019 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
1020 return GNUNET_YES;
1021
1022 case GNUNET_MESSAGE_TYPE_CADET_AX:
1023 break;
1024
1025 default:
1026 GNUNET_break (0);
1027 }
1028
1029 return GCC_is_sendable (q->c, q->fwd);
1030}
1031
1032
1033/**
1034 * Get first sendable message.
1035 *
1036 * @param peer The destination peer.
1037 *
1038 * @return First transmittable message, if any. Otherwise, NULL.
1039 */
1040static struct CadetPeerQueue *
1041peer_get_first_message (const struct CadetPeer *peer)
1042{
1043 struct CadetPeerQueue *q;
1044
1045 for (q = peer->queue_head; NULL != q; q = q->next)
1046 {
1047 LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking q:%p on c:%s\n", q, GCC_2s (q->c));
1048 if (queue_is_sendable (q))
1049 return q;
1050 }
1051
1052 return NULL;
1053}
1054
1055
1056/**
1057 * Function to process paths received for a new peer addition. The recorded 1000 * Function to process paths received for a new peer addition. The recorded
1058 * paths form the initial tunnel, which can be optimized later. 1001 * paths form the initial tunnel, which can be optimized later.
1059 * Called on each result obtained for the DHT search. 1002 * Called on each result obtained for the DHT search.
@@ -1090,19 +1033,6 @@ search_handler (void *cls, const struct CadetPeerPath *path)
1090 1033
1091 1034
1092/** 1035/**
1093 * Adjust core requested size to accomodate an ACK.
1094 *
1095 * @param message_size Requested size.
1096 *
1097 * @return Size enough to fit @c message_size and an ACK.
1098 */
1099static size_t
1100get_core_size (size_t message_size)
1101{
1102 return message_size + sizeof (struct GNUNET_CADET_ACK);
1103}
1104
1105/**
1106 * Test if a message type is connection management traffic 1036 * Test if a message type is connection management traffic
1107 * or regular payload traffic. 1037 * or regular payload traffic.
1108 * 1038 *
@@ -1119,85 +1049,13 @@ is_connection_management (uint16_t type)
1119 1049
1120 1050
1121/** 1051/**
1122 * Fill a core buffer with the appropriate data for the queued message.
1123 *
1124 * @param queue Queue element for the message.
1125 * @param buf Core buffer to fill.
1126 * @param size Size remaining in @c buf.
1127 * @param[out] pid In case its an encrypted payload, set payload.
1128 *
1129 * @return Bytes written to @c buf.
1130 */
1131static size_t
1132fill_buf (struct CadetPeerQueue *queue, void *buf, size_t size, uint32_t *pid)
1133{
1134 struct CadetConnection *c = queue->c;
1135 size_t msg_size;
1136
1137 switch (queue->type)
1138 {
1139 case GNUNET_MESSAGE_TYPE_CADET_AX:
1140 *pid = GCC_get_pid (queue->c, queue->fwd);
1141 LOG (GNUNET_ERROR_TYPE_DEBUG, " ax payload ID %u\n", *pid);
1142 msg_size = send_core_data_raw (queue->cls, size, buf);
1143 ((struct GNUNET_CADET_AX *) buf)->pid = htonl (*pid);
1144 break;
1145 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
1146 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
1147 case GNUNET_MESSAGE_TYPE_CADET_KX:
1148 case GNUNET_MESSAGE_TYPE_CADET_ACK:
1149 case GNUNET_MESSAGE_TYPE_CADET_POLL:
1150 LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type));
1151 msg_size = send_core_data_raw (queue->cls, size, buf);
1152 break;
1153 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
1154 LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n");
1155 if (GCC_is_origin (c, GNUNET_YES))
1156 msg_size = send_core_connection_create (c, size, buf);
1157 else
1158 msg_size = send_core_data_raw (queue->cls, size, buf);
1159 break;
1160 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
1161 LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n");
1162 if (GCC_is_origin (c, GNUNET_NO) ||
1163 GCC_is_origin (c, GNUNET_YES))
1164 {
1165 msg_size = send_core_connection_ack (c, size, buf);
1166 }
1167 else
1168 {
1169 msg_size = send_core_data_raw (queue->cls, size, buf);
1170 }
1171 break;
1172 case GNUNET_MESSAGE_TYPE_CADET_DATA:
1173 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
1174 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1175 /* This should be encapsulted */
1176 msg_size = 0;
1177 GNUNET_assert (0);
1178 break;
1179 default:
1180 GNUNET_break (0);
1181 LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type);
1182 msg_size = 0;
1183 }
1184
1185 GNUNET_assert (size >= msg_size);
1186
1187 return msg_size;
1188}
1189
1190
1191/**
1192 * Debug function should NEVER return true in production code, useful to 1052 * Debug function should NEVER return true in production code, useful to
1193 * simulate losses for testcases. 1053 * simulate losses for testcases.
1194 * 1054 *
1195 * @param q Queue handle with info about the message.
1196 *
1197 * @return #GNUNET_YES or #GNUNET_NO with the decision to drop. 1055 * @return #GNUNET_YES or #GNUNET_NO with the decision to drop.
1198 */ 1056 */
1199static int 1057static int
1200should_I_drop (struct CadetPeerQueue *q) 1058should_I_drop (void)
1201{ 1059{
1202 if (0 == drop_percent) 1060 if (0 == drop_percent)
1203 return GNUNET_NO; 1061 return GNUNET_NO;
@@ -1209,297 +1067,87 @@ should_I_drop (struct CadetPeerQueue *q)
1209} 1067}
1210 1068
1211 1069
1212/**
1213 * Core callback to write a queued packet to core buffer
1214 *
1215 * @param cls Closure (peer info).
1216 * @param size Number of bytes available in buf.
1217 * @param buf Where the to write the message.
1218 *
1219 * @return number of bytes written to buf
1220 */
1221static size_t
1222queue_send (void *cls, size_t size, void *buf)
1223{
1224 struct CadetPeer *peer = cls;
1225 struct CadetConnection *c;
1226 struct CadetPeerQueue *queue;
1227 struct GNUNET_TIME_Relative core_wait_time;
1228 const char *wait_s;
1229 const struct GNUNET_PeerIdentity *dst_id;
1230 size_t msg_size;
1231 size_t total_size;
1232 size_t rest;
1233 char *dst;
1234 uint32_t pid;
1235
1236 GCC_check_connections ();
1237 LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1238 LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1239 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n",
1240 GCP_2s (peer), size);
1241
1242 /* Sanity checking */
1243 if (NULL == buf || 0 == size)
1244 {
1245 LOG (GNUNET_ERROR_TYPE_DEBUG, " not allowed/\n");
1246 if (GNUNET_NO == in_shutdown)
1247 {
1248 queue = peer_get_first_message (peer);
1249 if (NULL == queue)
1250 {
1251 peer->core_transmit = NULL;
1252 peer->tmt_time.abs_value_us = 0;
1253 GCC_check_connections ();
1254 return 0;
1255 }
1256 dst_id = GNUNET_PEER_resolve2 (peer->id);
1257 peer->core_transmit =
1258 GNUNET_CORE_notify_transmit_ready (core_handle,
1259 GNUNET_NO, get_priority (queue),
1260 GNUNET_TIME_UNIT_FOREVER_REL,
1261 dst_id,
1262 get_core_size (queue->size),
1263 &queue_send,
1264 peer);
1265 peer->tmt_time = GNUNET_TIME_absolute_get ();
1266 }
1267 else
1268 {
1269 peer->core_transmit = NULL;
1270 peer->tmt_time.abs_value_us = 0;
1271 }
1272 GCC_check_connections ();
1273 return 0;
1274 }
1275
1276 /* Init */
1277 rest = size;
1278 total_size = 0;
1279 dst = (char *) buf;
1280 pid = 0;
1281 peer->core_transmit = NULL;
1282 queue = peer_get_first_message (peer);
1283 if (NULL == queue)
1284 {
1285 GNUNET_break (0); /* Core tmt_rdy should've been canceled */
1286 peer->tmt_time.abs_value_us = 0;
1287 return 0;
1288 }
1289 core_wait_time = GNUNET_TIME_absolute_get_duration (peer->tmt_time);
1290 wait_s = GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_YES);
1291 if (core_wait_time.rel_value_us >= 1000000)
1292 {
1293 LOG (GNUNET_ERROR_TYPE_WARNING,
1294 " %s: core wait time %s (> 1 second) for %u bytes\n",
1295 GCP_2s (peer), wait_s, queue->size);
1296 }
1297 peer->tmt_time.abs_value_us = 0;
1298
1299 /* Copy all possible messages to the core buffer */
1300 while (NULL != queue && rest >= queue->size)
1301 {
1302 c = queue->c;
1303
1304 LOG (GNUNET_ERROR_TYPE_DEBUG, " on conn %s %s\n",
1305 GCC_2s (c), GC_f2s(queue->fwd));
1306 LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok (%u/%u)\n",
1307 queue->size, total_size, size);
1308
1309 msg_size = fill_buf (queue, (void *) dst, size, &pid);
1310
1311 if (should_I_drop (queue))
1312 {
1313 LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n",
1314 GC_m2s (queue->type), GC_m2s (queue->payload_type),
1315 queue->payload_id, GCC_2s (c), GC_f2s (queue->fwd));
1316 msg_size = 0;
1317 }
1318 else
1319 {
1320 LOG (GNUNET_ERROR_TYPE_INFO,
1321 ">>> %s (%s %4u) on conn %s (%p) %s [%5u], after %s\n",
1322 GC_m2s (queue->type), GC_m2s (queue->payload_type),
1323 queue->payload_id, GCC_2s (c), c,
1324 GC_f2s (queue->fwd), msg_size, wait_s);
1325 }
1326 total_size += msg_size;
1327 rest -= msg_size;
1328 dst = &dst[msg_size];
1329 msg_size = 0;
1330
1331 /* Free queue, but cls was freed by send_core_* in fill_buf. */
1332 (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid);
1333
1334 /* Next! */
1335 queue = peer_get_first_message (peer);
1336 }
1337
1338 /* If more data in queue, send next */
1339 if (NULL != queue)
1340 {
1341 LOG (GNUNET_ERROR_TYPE_DEBUG, " more data! (%u)\n", queue->size);
1342 if (NULL == peer->core_transmit)
1343 {
1344 dst_id = GNUNET_PEER_resolve2 (peer->id);
1345 peer->core_transmit =
1346 GNUNET_CORE_notify_transmit_ready (core_handle,
1347 GNUNET_NO, get_priority (queue),
1348 GNUNET_TIME_UNIT_FOREVER_REL,
1349 dst_id,
1350 get_core_size (queue->size),
1351 &queue_send,
1352 peer);
1353 peer->tmt_time = GNUNET_TIME_absolute_get ();
1354 queue->start_waiting = GNUNET_TIME_absolute_get ();
1355 }
1356 else
1357 {
1358 LOG (GNUNET_ERROR_TYPE_DEBUG, "* tmt rdy called somewhere else\n");
1359 }
1360// GCC_start_poll (); FIXME needed?
1361 }
1362 else
1363 {
1364// GCC_stop_poll(); FIXME needed?
1365 }
1366
1367 LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", total_size);
1368 queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
1369 GCC_check_connections ();
1370 return total_size;
1371}
1372
1373
1374/******************************************************************************/ 1070/******************************************************************************/
1375/******************************** API ***********************************/ 1071/******************************** API ***********************************/
1376/******************************************************************************/ 1072/******************************************************************************/
1377 1073
1378
1379/** 1074/**
1380 * Free a transmission that was already queued with all resources 1075 * Call the continuation after a message has been sent or dropped.
1381 * associated to the request.
1382 *
1383 * If connection was marked to be destroyed, and this was the last queued
1384 * message on it, the connection will be free'd as a result.
1385 * 1076 *
1386 * @param queue Queue handler to cancel. 1077 * @param q Queue handle.
1387 * @param clear_cls Is it necessary to free associated cls? 1078 * @param sent #GNUNET_YES if was sent to CORE, #GNUNET_NO if dropped.
1388 * @param sent Was it really sent? (Could have been canceled)
1389 * @param pid PID, if relevant (was sent and was a payload message).
1390 *
1391 * @return #GNUNET_YES if connection was destroyed as a result,
1392 * #GNUNET_NO otherwise.
1393 */ 1079 */
1394int 1080static void
1395GCP_queue_destroy (struct CadetPeerQueue *queue, 1081call_peer_cont (const struct CadetPeerQueue *q, int sent)
1396 int clear_cls,
1397 int sent,
1398 uint32_t pid)
1399{ 1082{
1400 struct CadetPeer *peer; 1083 LOG (GNUNET_ERROR_TYPE_DEBUG, " core mq just sent %s\n", GC_m2s (q->type));
1401 int connection_destroyed; 1084 if (NULL != q->cont)
1402
1403 GCC_check_connections ();
1404 peer = queue->peer;
1405 LOG (GNUNET_ERROR_TYPE_DEBUG, "queue destroy %s\n", GC_m2s (queue->type));
1406 if (GNUNET_YES == clear_cls)
1407 {
1408 LOG (GNUNET_ERROR_TYPE_DEBUG, " free cls\n");
1409 switch (queue->type)
1410 {
1411 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
1412 LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n");
1413 /* fall through */
1414 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
1415 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
1416 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
1417 case GNUNET_MESSAGE_TYPE_CADET_KX:
1418 case GNUNET_MESSAGE_TYPE_CADET_AX:
1419 case GNUNET_MESSAGE_TYPE_CADET_ACK:
1420 case GNUNET_MESSAGE_TYPE_CADET_POLL:
1421 GNUNET_free_non_null (queue->cls);
1422 break;
1423
1424 default:
1425 GNUNET_break (0);
1426 LOG (GNUNET_ERROR_TYPE_ERROR, " type %s unknown!\n",
1427 GC_m2s (queue->type));
1428 }
1429 }
1430 GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue);
1431
1432 if (!is_connection_management (queue->type))
1433 {
1434 peer->queue_n--;
1435 }
1436
1437 if (NULL != queue->cont)
1438 { 1085 {
1439 struct GNUNET_TIME_Relative wait_time; 1086 struct GNUNET_TIME_Relative wait_time;
1440 1087
1441 wait_time = GNUNET_TIME_absolute_get_duration (queue->start_waiting); 1088 wait_time = GNUNET_TIME_absolute_get_duration (q->queue_timestamp);
1442 LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n", 1089 LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n",
1443 GNUNET_STRINGS_relative_time_to_string (wait_time, GNUNET_NO)); 1090 GNUNET_STRINGS_relative_time_to_string (wait_time, GNUNET_NO));
1444 connection_destroyed = queue->cont (queue->cont_cls, 1091 q->cont (q->cont_cls,
1445 queue->c, sent, queue->type, pid, 1092 q->c, q->c_fwd, sent,
1446 queue->fwd, queue->size, wait_time); 1093 q->type, q->payload_type, q->payload_id,
1447 } 1094 q->size, wait_time);
1448 else
1449 {
1450 connection_destroyed = GNUNET_NO;
1451 } 1095 }
1096}
1452 1097
1453 if (NULL == peer_get_first_message (peer) && NULL != peer->core_transmit) 1098
1099/**
1100 * Function called by MQ when a message is sent to CORE.
1101 *
1102 * @param cls Closure (queue handle).
1103 */
1104static void
1105mq_sent (void *cls)
1106{
1107 struct CadetPeerQueue *q = cls;
1108
1109 if (GNUNET_NO == q->management_traffic)
1454 { 1110 {
1455 GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); 1111 q->peer->queue_n--;
1456 peer->core_transmit = NULL;
1457 peer->tmt_time.abs_value_us = 0;
1458 } 1112 }
1459 1113 call_peer_cont (q, GNUNET_YES);
1460 GNUNET_free (queue); 1114 GNUNET_free (q);
1461 GCC_check_connections ();
1462 return connection_destroyed;
1463} 1115}
1464 1116
1465 1117
1466/** 1118/**
1467 * @brief Queue and pass message to core when possible. 1119 * @brief Send a message to another peer (using CORE).
1468 * 1120 *
1469 * @param peer Peer towards which to queue the message. 1121 * @param peer Peer towards which to queue the message.
1470 * @param cls Closure (@c type dependant). It will be used by queue_send to 1122 * @param message Message to send.
1471 * build the message to be sent if not already prebuilt. 1123 * @param payload_type Type of the message's payload, for debug messages.
1472 * @param type Type of the message.
1473 * @param payload_type Type of the message's payload
1474 * 0 if the message is a retransmission (unknown payload). 1124 * 0 if the message is a retransmission (unknown payload).
1475 * UINT16_MAX if the message does not have payload. 1125 * UINT16_MAX if the message does not have payload.
1476 * @param payload_id ID of the payload (MID, ACK #, etc) 1126 * @param payload_id ID of the payload (MID, ACK #, etc)
1477 * @param size Size of the message.
1478 * @param c Connection this message belongs to (can be NULL). 1127 * @param c Connection this message belongs to (can be NULL).
1479 * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!) 1128 * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!)
1480 * @param cont Continuation to be called once CORE has taken the message. 1129 * @param cont Continuation to be called once CORE has sent the message.
1481 * @param cont_cls Closure for @c cont. 1130 * @param cont_cls Closure for @c cont.
1482 * 1131 *
1483 * @return Handle to cancel the message before it is sent. Once cont is called 1132 * @return A handle to the message in the queue or NULL (if dropped).
1484 * message has been sent and therefore the handle is no longer valid.
1485 */ 1133 */
1486struct CadetPeerQueue * 1134struct CadetPeerQueue *
1487GCP_queue_add (struct CadetPeer *peer, 1135GCP_send (struct CadetPeer *peer,
1488 void *cls, 1136 const struct GNUNET_MessageHeader *message,
1489 uint16_t type, 1137 uint16_t payload_type,
1490 uint16_t payload_type, 1138 uint32_t payload_id,
1491 uint32_t payload_id, 1139 struct CadetConnection *c,
1492 size_t size, 1140 int fwd,
1493 struct CadetConnection *c, 1141 GCP_sent cont,
1494 int fwd, 1142 void *cont_cls)
1495 GCP_sent cont,
1496 void *cont_cls)
1497{ 1143{
1498 struct CadetPeerQueue *q; 1144 struct CadetPeerQueue *q;
1499 int priority; 1145 uint16_t type;
1500 int call_core; 1146 uint16_t size;
1501 1147
1502 GCC_check_connections (); 1148 GCC_check_connections ();
1149 type = ntohs (message->type);
1150 size = ntohs (message->size);
1503 LOG (GNUNET_ERROR_TYPE_DEBUG, 1151 LOG (GNUNET_ERROR_TYPE_DEBUG,
1504 "que %s (%s %4u) on conn %s (%p) %s towards %s (size %u)\n", 1152 "que %s (%s %4u) on conn %s (%p) %s towards %s (size %u)\n",
1505 GC_m2s (type), GC_m2s (payload_type), payload_id, 1153 GC_m2s (type), GC_m2s (payload_type), payload_id,
@@ -1508,282 +1156,68 @@ GCP_queue_add (struct CadetPeer *peer,
1508 if (NULL == peer->connections) 1156 if (NULL == peer->connections)
1509 { 1157 {
1510 /* We are not connected to this peer, ignore request. */ 1158 /* We are not connected to this peer, ignore request. */
1159 GNUNET_break (0);
1511 LOG (GNUNET_ERROR_TYPE_INFO, "%s not a neighbor\n", GCP_2s (peer)); 1160 LOG (GNUNET_ERROR_TYPE_INFO, "%s not a neighbor\n", GCP_2s (peer));
1512 GNUNET_STATISTICS_update (stats, "# messages dropped due to wrong hop", 1, 1161 GNUNET_STATISTICS_update (stats, "# messages dropped due to wrong hop", 1,
1513 GNUNET_NO); 1162 GNUNET_NO);
1514 return NULL; 1163 return NULL;
1515 } 1164 }
1516 1165
1517 priority = 0;
1518 if (is_connection_management (type))
1519 {
1520 priority = 100;
1521 }
1522 LOG (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority);
1523
1524 call_core = (NULL == c || GNUNET_MESSAGE_TYPE_CADET_KX == type) ?
1525 GNUNET_YES : GCC_is_sendable (c, fwd);
1526 q = GNUNET_new (struct CadetPeerQueue); 1166 q = GNUNET_new (struct CadetPeerQueue);
1527 q->cls = cls; 1167 q->env = GNUNET_MQ_msg_copy (message);
1168 q->peer = peer;
1169 q->cont = cont;
1170 q->cont_cls = cont_cls;
1171 q->queue_timestamp = GNUNET_TIME_absolute_get ();
1172 q->management_traffic = is_connection_management (type);
1528 q->type = type; 1173 q->type = type;
1174 q->size = size;
1529 q->payload_type = payload_type; 1175 q->payload_type = payload_type;
1530 q->payload_id = payload_id; 1176 q->payload_id = payload_id;
1531 q->size = size;
1532 q->peer = peer;
1533 q->c = c; 1177 q->c = c;
1534 q->fwd = fwd; 1178 q->c_fwd = fwd;
1535 q->cont = cont; 1179 GNUNET_MQ_notify_sent (q->env, mq_sent, q);
1536 q->cont_cls = cont_cls;
1537 if (100 > priority)
1538 {
1539 GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, q);
1540 peer->queue_n++;
1541 }
1542 else
1543 {
1544 GNUNET_CONTAINER_DLL_insert (peer->queue_head, peer->queue_tail, q);
1545 call_core = GNUNET_YES;
1546 }
1547 1180
1548 q->start_waiting = GNUNET_TIME_absolute_get (); 1181 if (GNUNET_YES == q->management_traffic)
1549 if (NULL == peer->core_transmit && GNUNET_YES == call_core)
1550 { 1182 {
1551 LOG (GNUNET_ERROR_TYPE_DEBUG, 1183 GNUNET_MQ_send (peer->core_mq, q->env); // FIXME implement "_urgent", use
1552 "calling core tmt rdy towards %s for %u bytes\n",
1553 GCP_2s (peer), size);
1554 peer->core_transmit =
1555 GNUNET_CORE_notify_transmit_ready (core_handle,
1556 GNUNET_NO, get_priority (q),
1557 GNUNET_TIME_UNIT_FOREVER_REL,
1558 GNUNET_PEER_resolve2 (peer->id),
1559 get_core_size (size),
1560 &queue_send, peer);
1561 peer->tmt_time = GNUNET_TIME_absolute_get ();
1562 }
1563 else if (GNUNET_NO == call_core)
1564 {
1565 LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s not needed\n",
1566 GCP_2s (peer));
1567
1568 } 1184 }
1569 else 1185 else
1570 { 1186 {
1571 struct GNUNET_TIME_Relative elapsed; 1187 if (GNUNET_YES == should_I_drop())
1572 elapsed = GNUNET_TIME_absolute_get_duration (peer->tmt_time);
1573 LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called %s\n",
1574 GCP_2s (peer),
1575 GNUNET_STRINGS_relative_time_to_string (elapsed, GNUNET_NO));
1576
1577 }
1578 queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
1579 GCC_check_connections ();
1580 return q;
1581}
1582
1583
1584/**
1585 * Cancel all queued messages to a peer that belong to a certain connection.
1586 *
1587 * @param peer Peer towards whom to cancel.
1588 * @param c Connection whose queued messages to cancel. Might be destroyed by
1589 * the sent continuation call.
1590 */
1591void
1592GCP_queue_cancel (struct CadetPeer *peer,
1593 struct CadetConnection *c)
1594{
1595 struct CadetPeerQueue *q;
1596 struct CadetPeerQueue *next;
1597 struct CadetPeerQueue *prev;
1598 int connection_destroyed;
1599
1600 GCC_check_connections ();
1601 connection_destroyed = GNUNET_NO;
1602 for (q = peer->queue_head; NULL != q; q = next)
1603 {
1604 prev = q->prev;
1605 if (q->c == c)
1606 {
1607 LOG (GNUNET_ERROR_TYPE_DEBUG,
1608 "GMP queue cancel %s\n",
1609 GC_m2s (q->type));
1610 GNUNET_assert (GNUNET_NO == connection_destroyed);
1611 if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY == q->type)
1612 {
1613 q->c = NULL;
1614 }
1615 else
1616 {
1617 connection_destroyed = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0);
1618 }
1619
1620 /* Get next from prev, q->next might be already freed:
1621 * queue destroy -> callback -> GCC_destroy -> cancel_queues -> here
1622 */
1623 if (NULL == prev)
1624 next = peer->queue_head;
1625 else
1626 next = prev->next;
1627 }
1628 else
1629 {
1630 next = q->next;
1631 }
1632 }
1633
1634 if ( (NULL == peer->queue_head) &&
1635 (NULL != peer->core_transmit) )
1636 {
1637 GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
1638 peer->core_transmit = NULL;
1639 peer->tmt_time.abs_value_us = 0;
1640 }
1641 GCC_check_connections ();
1642}
1643
1644
1645/**
1646 * Get the first transmittable message for a connection.
1647 *
1648 * @param peer Neighboring peer.
1649 * @param c Connection.
1650 *
1651 * @return First transmittable message.
1652 */
1653static struct CadetPeerQueue *
1654connection_get_first_message (struct CadetPeer *peer, struct CadetConnection *c)
1655{
1656 struct CadetPeerQueue *q;
1657
1658 for (q = peer->queue_head; NULL != q; q = q->next)
1659 {
1660 if (q->c != c)
1661 continue;
1662 if (queue_is_sendable (q))
1663 { 1188 {
1664 LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable!!\n"); 1189 LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n",
1665 return q; 1190 GC_m2s (q->type), GC_m2s (q->payload_type),
1191 q->payload_id, GCC_2s (c), GC_f2s (q->c_fwd));
1192 GNUNET_MQ_discard (q->env);
1193 call_peer_cont (q, GNUNET_NO);
1194 GNUNET_free (q);
1195 return NULL;
1666 } 1196 }
1667 LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n"); 1197 GNUNET_MQ_send (peer->core_mq, q->env);
1198 peer->queue_n++;
1668 } 1199 }
1669 1200
1670 return NULL;
1671}
1672
1673
1674/**
1675 * Get the first message for a connection and unqueue it.
1676 *
1677 * Only tunnel (or higher) level messages are unqueued. Connection specific
1678 * messages are silently destroyed upon encounter.
1679 *
1680 * @param peer Neighboring peer.
1681 * @param c Connection.
1682 * @param destroyed[in/out] Was the connection destroyed (prev/as a result)?.
1683 * Can NOT be NULL.
1684 *
1685 * @return First message for this connection.
1686 */
1687struct GNUNET_MessageHeader *
1688GCP_connection_pop (struct CadetPeer *peer,
1689 struct CadetConnection *c,
1690 int *destroyed)
1691{
1692 struct CadetPeerQueue *q;
1693 struct CadetPeerQueue *next;
1694 struct GNUNET_MessageHeader *msg;
1695 int dest;
1696
1697 GCC_check_connections ();
1698 GNUNET_assert (NULL != destroyed);
1699 LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_pop on conn %p\n", c);
1700 for (q = peer->queue_head; NULL != q; q = next)
1701 {
1702 next = q->next;
1703 if (q->c != c)
1704 continue;
1705 LOG (GNUNET_ERROR_TYPE_DEBUG, " - queued: %s (%s %u), cont: %p\n",
1706 GC_m2s (q->type), GC_m2s (q->payload_type), q->payload_id,
1707 q->cont);
1708 switch (q->type)
1709 {
1710 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
1711 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
1712 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
1713 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
1714 case GNUNET_MESSAGE_TYPE_CADET_ACK:
1715 case GNUNET_MESSAGE_TYPE_CADET_POLL:
1716 dest = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0);
1717 if (GNUNET_YES == dest)
1718 {
1719 GNUNET_break (GNUNET_NO == *destroyed);
1720 *destroyed = GNUNET_YES;
1721 }
1722 continue;
1723
1724 case GNUNET_MESSAGE_TYPE_CADET_KX:
1725 case GNUNET_MESSAGE_TYPE_CADET_AX:
1726 case GNUNET_MESSAGE_TYPE_CADET_AX_KX:
1727 msg = (struct GNUNET_MessageHeader *) q->cls;
1728 dest = GCP_queue_destroy (q, GNUNET_NO, GNUNET_NO, 0);
1729 if (GNUNET_YES == dest)
1730 {
1731 GNUNET_break (GNUNET_NO == *destroyed);
1732 *destroyed = GNUNET_YES;
1733 }
1734 return msg;
1735
1736 default:
1737 GNUNET_break (0);
1738 LOG (GNUNET_ERROR_TYPE_DEBUG, "Unknown message %s\n", GC_m2s (q->type));
1739 }
1740 }
1741 GCC_check_connections (); 1201 GCC_check_connections ();
1742 return NULL; 1202 return q;
1743} 1203}
1744 1204
1745 1205
1746/** 1206/**
1747 * Unlock a possibly locked queue for a connection. 1207 * Cancel sending a message. Message must have been sent with
1208 * #GCP_send before. May not be called after the notify sent
1209 * callback has been called.
1748 * 1210 *
1749 * If there is a message that can be sent on this connection, call core for it. 1211 * It DOES call the continuation given to #GCP_send.
1750 * Otherwise (if core transmit is already called or there is no sendable
1751 * message) do nothing.
1752 * 1212 *
1753 * @param peer Peer who keeps the queue. 1213 * @param q Queue handle to cancel
1754 * @param c Connection whose messages to unlock.
1755 */ 1214 */
1756void 1215void
1757GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c) 1216GCP_send_cancel (struct CadetPeerQueue *q)
1758{ 1217{
1759 struct CadetPeerQueue *q; 1218 call_peer_cont (q, GNUNET_NO);
1760 size_t size; 1219 GNUNET_MQ_send_cancel (q->env);
1761 1220 GNUNET_free (q);
1762 GCC_check_connections ();
1763 if (NULL != peer->core_transmit)
1764 {
1765 LOG (GNUNET_ERROR_TYPE_DEBUG, " already unlocked!\n");
1766 return; /* Already unlocked */
1767 }
1768
1769 q = connection_get_first_message (peer, c);
1770 if (NULL == q)
1771 {
1772 LOG (GNUNET_ERROR_TYPE_DEBUG, " queue empty!\n");
1773 return; /* Nothing to transmit */
1774 }
1775
1776 size = q->size;
1777 peer->core_transmit =
1778 GNUNET_CORE_notify_transmit_ready (core_handle,
1779 GNUNET_NO, get_priority (q),
1780 GNUNET_TIME_UNIT_FOREVER_REL,
1781 GNUNET_PEER_resolve2 (peer->id),
1782 get_core_size (size),
1783 &queue_send,
1784 peer);
1785 peer->tmt_time = GNUNET_TIME_absolute_get ();
1786 GCC_check_connections ();
1787} 1221}
1788 1222
1789 1223
@@ -1824,23 +1258,12 @@ GCP_init (const struct GNUNET_CONFIGURATION_Handle *c)
1824 LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); 1258 LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
1825 } 1259 }
1826 ats_ch = GNUNET_ATS_connectivity_init (c); 1260 ats_ch = GNUNET_ATS_connectivity_init (c);
1827 core_handle = GNUNET_CORE_connect (c, /* Main configuration */ 1261 connect_to_core (c);
1828 NULL, /* Closure passed to CADET functions */
1829 &core_init, /* Call core_init once connected */
1830 &core_connect, /* Handle connects */
1831 &core_disconnect, /* remove peers on disconnects */
1832 NULL, /* Don't notify about all incoming messages */
1833 GNUNET_NO, /* For header only in notification */
1834 NULL, /* Don't notify about all outbound messages */
1835 GNUNET_NO, /* For header-only out notification */
1836 core_handlers); /* Register these handlers */
1837 if (NULL == core_handle) 1262 if (NULL == core_handle)
1838 { 1263 {
1839 GNUNET_break (0); 1264 GNUNET_break (0);
1840 GNUNET_SCHEDULER_shutdown (); 1265 GNUNET_SCHEDULER_shutdown ();
1841 return;
1842 } 1266 }
1843
1844} 1267}
1845 1268
1846 1269
@@ -1853,13 +1276,10 @@ GCP_shutdown (void)
1853 LOG (GNUNET_ERROR_TYPE_DEBUG, 1276 LOG (GNUNET_ERROR_TYPE_DEBUG,
1854 "Shutting down peer subsystem\n"); 1277 "Shutting down peer subsystem\n");
1855 in_shutdown = GNUNET_YES; 1278 in_shutdown = GNUNET_YES;
1856 GNUNET_CONTAINER_multipeermap_iterate (peers,
1857 &shutdown_peer,
1858 NULL);
1859 if (NULL != core_handle) 1279 if (NULL != core_handle)
1860 { 1280 {
1861 GNUNET_CORE_disconnect (core_handle); 1281 GNUNET_CORE_disconnecT (core_handle);
1862 core_handle = NULL; 1282 core_handle = NULL;
1863 } 1283 }
1864 if (NULL != ats_ch) 1284 if (NULL != ats_ch)
1865 { 1285 {
@@ -1867,6 +1287,12 @@ GCP_shutdown (void)
1867 ats_ch = NULL; 1287 ats_ch = NULL;
1868 } 1288 }
1869 GNUNET_PEER_change_rc (myid, -1); 1289 GNUNET_PEER_change_rc (myid, -1);
1290 /* With MQ API, CORE calls the disconnect handler for every peer
1291 * after calling GNUNET_CORE_disconnecT, shutdown must occur *after* that.
1292 */
1293 GNUNET_CONTAINER_multipeermap_iterate (peers,
1294 &shutdown_peer,
1295 NULL);
1870 GNUNET_CONTAINER_multipeermap_destroy (peers); 1296 GNUNET_CONTAINER_multipeermap_destroy (peers);
1871 peers = NULL; 1297 peers = NULL;
1872} 1298}
@@ -2054,7 +1480,6 @@ GCP_is_neighbor (const struct CadetPeer *peer)
2054 } 1480 }
2055 1481
2056 /* Is not a neighbor but connections is not NULL, probably disconnecting */ 1482 /* Is not a neighbor but connections is not NULL, probably disconnecting */
2057 GNUNET_break (0);
2058 return GNUNET_NO; 1483 return GNUNET_NO;
2059} 1484}
2060 1485
@@ -2254,7 +1679,8 @@ GCP_add_path_to_all (const struct CadetPeerPath *p, int confirmed)
2254{ 1679{
2255 unsigned int i; 1680 unsigned int i;
2256 1681
2257 /* TODO: invert and add */ 1682 /* TODO: invert and add to origin */
1683 /* TODO: replace all "GCP_add_path" with this, make the other one static */
2258 GCC_check_connections (); 1684 GCC_check_connections ();
2259 for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ; 1685 for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ;
2260 for (i++; i < p->length; i++) 1686 for (i++; i < p->length; i++)