aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet_core.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cadet/gnunet-service-cadet_core.c')
-rw-r--r--src/cadet/gnunet-service-cadet_core.c513
1 files changed, 201 insertions, 312 deletions
diff --git a/src/cadet/gnunet-service-cadet_core.c b/src/cadet/gnunet-service-cadet_core.c
index 879230d29..ec70a968b 100644
--- a/src/cadet/gnunet-service-cadet_core.c
+++ b/src/cadet/gnunet-service-cadet_core.c
@@ -11,7 +11,7 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
@@ -39,15 +39,13 @@
39#include "gnunet_statistics_service.h" 39#include "gnunet_statistics_service.h"
40#include "cadet_protocol.h" 40#include "cadet_protocol.h"
41 41
42 42#define LOG(level, ...) GNUNET_log_from (level, "cadet-cor", __VA_ARGS__)
43#define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
44 43
45/** 44/**
46 * Information we keep per direction for a route. 45 * Information we keep per direction for a route.
47 */ 46 */
48struct RouteDirection; 47struct RouteDirection;
49 48
50
51/** 49/**
52 * Set of CadetRoutes that have exactly the same number of messages 50 * Set of CadetRoutes that have exactly the same number of messages
53 * in their buffer. Used so we can efficiently find all of those 51 * in their buffer. Used so we can efficiently find all of those
@@ -140,7 +138,6 @@ struct RouteDirection
140 * Is @e mqm currently ready for transmission? 138 * Is @e mqm currently ready for transmission?
141 */ 139 */
142 int is_ready; 140 int is_ready;
143
144}; 141};
145 142
146 143
@@ -179,11 +176,6 @@ struct CadetRoute
179 * Position of this route in the #route_heap. 176 * Position of this route in the #route_heap.
180 */ 177 */
181 struct GNUNET_CONTAINER_HeapNode *hn; 178 struct GNUNET_CONTAINER_HeapNode *hn;
182
183 /**
184 * Options for the route, control buffering.
185 */
186 enum GNUNET_CADET_ChannelOption options;
187}; 179};
188 180
189 181
@@ -263,24 +255,17 @@ lower_rung (struct RouteDirection *dir)
263 struct Rung *rung = dir->rung; 255 struct Rung *rung = dir->rung;
264 struct Rung *prev; 256 struct Rung *prev;
265 257
266 GNUNET_CONTAINER_DLL_remove (rung->rd_head, 258 GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir);
267 rung->rd_tail,
268 dir);
269 prev = rung->prev; 259 prev = rung->prev;
270 GNUNET_assert (NULL != prev); 260 GNUNET_assert (NULL != prev);
271 if (prev->rung_off != rung->rung_off - 1) 261 if (prev->rung_off != rung->rung_off - 1)
272 { 262 {
273 prev = GNUNET_new (struct Rung); 263 prev = GNUNET_new (struct Rung);
274 prev->rung_off = rung->rung_off - 1; 264 prev->rung_off = rung->rung_off - 1;
275 GNUNET_CONTAINER_DLL_insert_after (rung_head, 265 GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung->prev, prev);
276 rung_tail,
277 rung->prev,
278 prev);
279 } 266 }
280 GNUNET_assert (NULL != prev); 267 GNUNET_assert (NULL != prev);
281 GNUNET_CONTAINER_DLL_insert (prev->rd_head, 268 GNUNET_CONTAINER_DLL_insert (prev->rd_head, prev->rd_tail, dir);
282 prev->rd_tail,
283 dir);
284 dir->rung = prev; 269 dir->rung = prev;
285} 270}
286 271
@@ -293,19 +278,13 @@ lower_rung (struct RouteDirection *dir)
293 * @param env envelope to discard 278 * @param env envelope to discard
294 */ 279 */
295static void 280static void
296discard_buffer (struct RouteDirection *dir, 281discard_buffer (struct RouteDirection *dir, struct GNUNET_MQ_Envelope *env)
297 struct GNUNET_MQ_Envelope *env)
298{ 282{
299 GNUNET_MQ_dll_remove (&dir->env_head, 283 GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env);
300 &dir->env_tail,
301 env);
302 cur_buffers--; 284 cur_buffers--;
303 GNUNET_MQ_discard (env); 285 GNUNET_MQ_discard (env);
304 lower_rung (dir); 286 lower_rung (dir);
305 GNUNET_STATISTICS_set (stats, 287 GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
306 "# buffer use",
307 cur_buffers,
308 GNUNET_NO);
309} 288}
310 289
311 290
@@ -327,12 +306,9 @@ discard_all_from_rung_tail ()
327 "# messages dropped due to full buffer", 306 "# messages dropped due to full buffer",
328 1, 307 1,
329 GNUNET_NO); 308 GNUNET_NO);
330 discard_buffer (dir, 309 discard_buffer (dir, dir->env_head);
331 dir->env_head);
332 } 310 }
333 GNUNET_CONTAINER_DLL_remove (rung_head, 311 GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, tail);
334 rung_tail,
335 tail);
336 GNUNET_free (tail); 312 GNUNET_free (tail);
337} 313}
338 314
@@ -349,7 +325,8 @@ discard_all_from_rung_tail ()
349static void 325static void
350route_message (struct CadetPeer *prev, 326route_message (struct CadetPeer *prev,
351 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, 327 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
352 const struct GNUNET_MessageHeader *msg) 328 const struct GNUNET_MessageHeader *msg,
329 const enum GNUNET_MQ_PriorityPreferences priority)
353{ 330{
354 struct CadetRoute *route; 331 struct CadetRoute *route;
355 struct RouteDirection *dir; 332 struct RouteDirection *dir;
@@ -375,17 +352,14 @@ route_message (struct CadetPeer *prev,
375 /* No need to respond to these! */ 352 /* No need to respond to these! */
376 return; 353 return;
377 } 354 }
378 env = GNUNET_MQ_msg (bm, 355 env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
379 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
380 bm->cid = *cid; 356 bm->cid = *cid;
381 bm->peer1 = my_full_id; 357 bm->peer1 = my_full_id;
382 GCP_send_ooo (prev, 358 GCP_send_ooo (prev, env);
383 env);
384 return; 359 return;
385 } 360 }
386 route->last_use = GNUNET_TIME_absolute_get (); 361 route->last_use = GNUNET_TIME_absolute_get ();
387 GNUNET_CONTAINER_heap_update_cost (route->hn, 362 GNUNET_CONTAINER_heap_update_cost (route->hn, route->last_use.abs_value_us);
388 route->last_use.abs_value_us);
389 dir = (prev == route->prev.hop) ? &route->next : &route->prev; 363 dir = (prev == route->prev.hop) ? &route->next : &route->prev;
390 if (GNUNET_YES == dir->is_ready) 364 if (GNUNET_YES == dir->is_ready)
391 { 365 {
@@ -396,27 +370,24 @@ route_message (struct CadetPeer *prev,
396 GNUNET_i2s (GCP_get_id (dir->hop)), 370 GNUNET_i2s (GCP_get_id (dir->hop)),
397 GNUNET_sh2s (&cid->connection_of_tunnel)); 371 GNUNET_sh2s (&cid->connection_of_tunnel));
398 dir->is_ready = GNUNET_NO; 372 dir->is_ready = GNUNET_NO;
399 GCP_send (dir->mqm, 373 GCP_send (dir->mqm, GNUNET_MQ_msg_copy (msg));
400 GNUNET_MQ_msg_copy (msg));
401 return; 374 return;
402 } 375 }
403 /* Check if buffering is disallowed, and if so, make sure we only queue 376 /* Check if low latency is required and if the previous message was
404 one message per direction. */ 377 unreliable; if so, make sure we only queue one message per
405 if ( (0 != (route->options & GNUNET_CADET_OPTION_NOBUFFER)) && 378 direction (no buffering). */
406 (NULL != dir->env_head) ) 379 if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) &&
407 discard_buffer (dir, 380 (NULL != dir->env_head) &&
408 dir->env_head); 381 (0 ==
382 (GNUNET_MQ_env_get_options (dir->env_head) & GNUNET_MQ_PREF_UNRELIABLE)))
383 discard_buffer (dir, dir->env_head);
409 /* Check for duplicates */ 384 /* Check for duplicates */
410 for (const struct GNUNET_MQ_Envelope *env = dir->env_head; 385 for (const struct GNUNET_MQ_Envelope *env = dir->env_head; NULL != env;
411 NULL != env;
412 env = GNUNET_MQ_env_next (env)) 386 env = GNUNET_MQ_env_next (env))
413 { 387 {
414 const struct GNUNET_MessageHeader *hdr = GNUNET_MQ_env_get_msg (env); 388 const struct GNUNET_MessageHeader *hdr = GNUNET_MQ_env_get_msg (env);
415 389
416 if ( (hdr->size == msg->size) && 390 if ((hdr->size == msg->size) && (0 == memcmp (hdr, msg, ntohs (msg->size))))
417 (0 == memcmp (hdr,
418 msg,
419 ntohs (msg->size))) )
420 { 391 {
421 LOG (GNUNET_ERROR_TYPE_DEBUG, 392 LOG (GNUNET_ERROR_TYPE_DEBUG,
422 "Received duplicate of message already in buffer, dropping\n"); 393 "Received duplicate of message already in buffer, dropping\n");
@@ -447,31 +418,22 @@ route_message (struct CadetPeer *prev,
447 "# messages dropped due to full buffer", 418 "# messages dropped due to full buffer",
448 1, 419 1,
449 GNUNET_NO); 420 GNUNET_NO);
450 discard_buffer (dir, 421 discard_buffer (dir, dir->env_head);
451 dir->env_head);
452 rung = dir->rung; 422 rung = dir->rung;
453 } 423 }
454 } 424 }
455 /* remove 'dir' from current rung */ 425 /* remove 'dir' from current rung */
456 GNUNET_CONTAINER_DLL_remove (rung->rd_head, 426 GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir);
457 rung->rd_tail,
458 dir);
459 /* make 'nxt' point to the next higher rung, create if necessary */ 427 /* make 'nxt' point to the next higher rung, create if necessary */
460 nxt = rung->next; 428 nxt = rung->next;
461 if ( (NULL == nxt) || 429 if ((NULL == nxt) || (rung->rung_off + 1 != nxt->rung_off))
462 (rung->rung_off + 1 != nxt->rung_off) )
463 { 430 {
464 nxt = GNUNET_new (struct Rung); 431 nxt = GNUNET_new (struct Rung);
465 nxt->rung_off = rung->rung_off + 1; 432 nxt->rung_off = rung->rung_off + 1;
466 GNUNET_CONTAINER_DLL_insert_after (rung_head, 433 GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung, nxt);
467 rung_tail,
468 rung,
469 nxt);
470 } 434 }
471 /* insert 'dir' into next higher rung */ 435 /* insert 'dir' into next higher rung */
472 GNUNET_CONTAINER_DLL_insert (nxt->rd_head, 436 GNUNET_CONTAINER_DLL_insert (nxt->rd_head, nxt->rd_tail, dir);
473 nxt->rd_tail,
474 dir);
475 dir->rung = nxt; 437 dir->rung = nxt;
476 438
477 /* add message into 'dir' buffer */ 439 /* add message into 'dir' buffer */
@@ -482,21 +444,21 @@ route_message (struct CadetPeer *prev,
482 GNUNET_i2s (GCP_get_id (dir->hop)), 444 GNUNET_i2s (GCP_get_id (dir->hop)),
483 GNUNET_sh2s (&cid->connection_of_tunnel)); 445 GNUNET_sh2s (&cid->connection_of_tunnel));
484 env = GNUNET_MQ_msg_copy (msg); 446 env = GNUNET_MQ_msg_copy (msg);
485 GNUNET_MQ_dll_insert_tail (&dir->env_head, 447 GNUNET_MQ_env_set_options (env, priority);
486 &dir->env_tail, 448 if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) &&
487 env); 449 (0 != (priority & GNUNET_MQ_PREF_OUT_OF_ORDER)) &&
450 (NULL != dir->env_head) &&
451 (0 == (GNUNET_MQ_env_get_options (dir->env_head) &
452 GNUNET_MQ_PREF_LOW_LATENCY)))
453 GNUNET_MQ_dll_insert_head (&dir->env_head, &dir->env_tail, env);
454 else
455 GNUNET_MQ_dll_insert_tail (&dir->env_head, &dir->env_tail, env);
488 cur_buffers++; 456 cur_buffers++;
489 GNUNET_STATISTICS_set (stats, 457 GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
490 "# buffer use",
491 cur_buffers,
492 GNUNET_NO);
493 /* Clean up 'rung' if now empty (and not head) */ 458 /* Clean up 'rung' if now empty (and not head) */
494 if ( (NULL == rung->rd_head) && 459 if ((NULL == rung->rd_head) && (rung != rung_head))
495 (rung != rung_head) )
496 { 460 {
497 GNUNET_CONTAINER_DLL_remove (rung_head, 461 GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, rung);
498 rung_tail,
499 rung);
500 GNUNET_free (rung); 462 GNUNET_free (rung);
501 } 463 }
502} 464}
@@ -541,18 +503,14 @@ destroy_direction (struct RouteDirection *dir)
541 "# messages dropped due to route destruction", 503 "# messages dropped due to route destruction",
542 1, 504 1,
543 GNUNET_NO); 505 GNUNET_NO);
544 discard_buffer (dir, 506 discard_buffer (dir, env);
545 env);
546 } 507 }
547 if (NULL != dir->mqm) 508 if (NULL != dir->mqm)
548 { 509 {
549 GCP_request_mq_cancel (dir->mqm, 510 GCP_request_mq_cancel (dir->mqm, NULL);
550 NULL);
551 dir->mqm = NULL; 511 dir->mqm = NULL;
552 } 512 }
553 GNUNET_CONTAINER_DLL_remove (rung_head->rd_head, 513 GNUNET_CONTAINER_DLL_remove (rung_head->rd_head, rung_head->rd_tail, dir);
554 rung_head->rd_tail,
555 dir);
556} 514}
557 515
558 516
@@ -566,15 +524,15 @@ destroy_route (struct CadetRoute *route)
566{ 524{
567 LOG (GNUNET_ERROR_TYPE_DEBUG, 525 LOG (GNUNET_ERROR_TYPE_DEBUG,
568 "Destroying route from %s to %s of connection %s\n", 526 "Destroying route from %s to %s of connection %s\n",
569 GNUNET_i2s (GCP_get_id (route->prev.hop)), 527 GNUNET_i2s (GCP_get_id (route->prev.hop)),
570 GNUNET_i2s2 (GCP_get_id (route->next.hop)), 528 GNUNET_i2s2 (GCP_get_id (route->next.hop)),
571 GNUNET_sh2s (&route->cid.connection_of_tunnel)); 529 GNUNET_sh2s (&route->cid.connection_of_tunnel));
572 GNUNET_assert (route == 530 GNUNET_assert (route == GNUNET_CONTAINER_heap_remove_node (route->hn));
573 GNUNET_CONTAINER_heap_remove_node (route->hn)); 531 GNUNET_assert (
574 GNUNET_assert (GNUNET_YES == 532 GNUNET_YES ==
575 GNUNET_CONTAINER_multishortmap_remove (routes, 533 GNUNET_CONTAINER_multishortmap_remove (routes,
576 &route->cid.connection_of_tunnel, 534 &route->cid.connection_of_tunnel,
577 route)); 535 route));
578 GNUNET_STATISTICS_set (stats, 536 GNUNET_STATISTICS_set (stats,
579 "# routes", 537 "# routes",
580 GNUNET_CONTAINER_multishortmap_size (routes), 538 GNUNET_CONTAINER_multishortmap_size (routes),
@@ -611,15 +569,13 @@ send_broken (struct RouteDirection *target,
611 GNUNET_i2s2 (peer2), 569 GNUNET_i2s2 (peer2),
612 GNUNET_sh2s (&cid->connection_of_tunnel)); 570 GNUNET_sh2s (&cid->connection_of_tunnel));
613 571
614 env = GNUNET_MQ_msg (bm, 572 env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
615 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
616 bm->cid = *cid; 573 bm->cid = *cid;
617 if (NULL != peer1) 574 if (NULL != peer1)
618 bm->peer1 = *peer1; 575 bm->peer1 = *peer1;
619 if (NULL != peer2) 576 if (NULL != peer2)
620 bm->peer2 = *peer2; 577 bm->peer2 = *peer2;
621 GCP_request_mq_cancel (target->mqm, 578 GCP_request_mq_cancel (target->mqm, env);
622 env);
623 target->mqm = NULL; 579 target->mqm = NULL;
624} 580}
625 581
@@ -639,33 +595,22 @@ timeout_cb (void *cls)
639 struct GNUNET_TIME_Absolute exp; 595 struct GNUNET_TIME_Absolute exp;
640 596
641 timeout_task = NULL; 597 timeout_task = NULL;
642 linger = GNUNET_TIME_relative_multiply (keepalive_period, 598 linger = GNUNET_TIME_relative_multiply (keepalive_period, 3);
643 3);
644 while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap))) 599 while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
645 { 600 {
646 exp = GNUNET_TIME_absolute_add (r->last_use, 601 exp = GNUNET_TIME_absolute_add (r->last_use, linger);
647 linger);
648 if (0 != GNUNET_TIME_absolute_get_remaining (exp).rel_value_us) 602 if (0 != GNUNET_TIME_absolute_get_remaining (exp).rel_value_us)
649 { 603 {
650 /* Route not yet timed out, wait until it does. */ 604 /* Route not yet timed out, wait until it does. */
651 timeout_task = GNUNET_SCHEDULER_add_at (exp, 605 timeout_task = GNUNET_SCHEDULER_add_at (exp, &timeout_cb, NULL);
652 &timeout_cb,
653 NULL);
654 return; 606 return;
655 } 607 }
656 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 608 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
657 "Sending BROKEN due to timeout (%s was last use, %s linger)\n", 609 "Sending BROKEN due to timeout (%s was last use, %s linger)\n",
658 GNUNET_STRINGS_absolute_time_to_string (r->last_use), 610 GNUNET_STRINGS_absolute_time_to_string (r->last_use),
659 GNUNET_STRINGS_relative_time_to_string (linger, 611 GNUNET_STRINGS_relative_time_to_string (linger, GNUNET_YES));
660 GNUNET_YES)); 612 send_broken (&r->prev, &r->cid, NULL, NULL);
661 send_broken (&r->prev, 613 send_broken (&r->next, &r->cid, NULL, NULL);
662 &r->cid,
663 NULL,
664 NULL);
665 send_broken (&r->next,
666 &r->cid,
667 NULL,
668 NULL);
669 destroy_route (r); 614 destroy_route (r);
670 } 615 }
671 /* No more routes left, so no need for a #timeout_task */ 616 /* No more routes left, so no need for a #timeout_task */
@@ -685,8 +630,7 @@ timeout_cb (void *cls)
685 * and the last envelope was discarded 630 * and the last envelope was discarded
686 */ 631 */
687static void 632static void
688dir_ready_cb (void *cls, 633dir_ready_cb (void *cls, int ready)
689 int ready)
690{ 634{
691 struct RouteDirection *dir = cls; 635 struct RouteDirection *dir = cls;
692 struct CadetRoute *route = dir->my_route; 636 struct CadetRoute *route = dir->my_route;
@@ -699,28 +643,18 @@ dir_ready_cb (void *cls,
699 dir->is_ready = GNUNET_YES; 643 dir->is_ready = GNUNET_YES;
700 if (NULL != (env = dir->env_head)) 644 if (NULL != (env = dir->env_head))
701 { 645 {
702 GNUNET_MQ_dll_remove (&dir->env_head, 646 GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env);
703 &dir->env_tail,
704 env);
705 cur_buffers--; 647 cur_buffers--;
706 GNUNET_STATISTICS_set (stats, 648 GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
707 "# buffer use",
708 cur_buffers,
709 GNUNET_NO);
710 lower_rung (dir); 649 lower_rung (dir);
711 dir->is_ready = GNUNET_NO; 650 dir->is_ready = GNUNET_NO;
712 GCP_send (dir->mqm, 651 GCP_send (dir->mqm, env);
713 env);
714 } 652 }
715 return; 653 return;
716 } 654 }
717 odir = (dir == &route->next) ? &route->prev : &route->next; 655 odir = (dir == &route->next) ? &route->prev : &route->next;
718 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 656 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending BROKEN due to MQ going down\n");
719 "Sending BROKEN due to MQ going down\n"); 657 send_broken (&route->next, &route->cid, GCP_get_id (odir->hop), &my_full_id);
720 send_broken (&route->next,
721 &route->cid,
722 GCP_get_id (odir->hop),
723 &my_full_id);
724 destroy_route (route); 658 destroy_route (route);
725} 659}
726 660
@@ -739,12 +673,8 @@ dir_init (struct RouteDirection *dir,
739{ 673{
740 dir->hop = hop; 674 dir->hop = hop;
741 dir->my_route = route; 675 dir->my_route = route;
742 dir->mqm = GCP_request_mq (hop, 676 dir->mqm = GCP_request_mq (hop, &dir_ready_cb, dir);
743 &dir_ready_cb, 677 GNUNET_CONTAINER_DLL_insert (rung_head->rd_head, rung_head->rd_tail, dir);
744 dir);
745 GNUNET_CONTAINER_DLL_insert (rung_head->rd_head,
746 rung_head->rd_tail,
747 dir);
748 dir->rung = rung_head; 678 dir->rung = rung_head;
749 GNUNET_assert (GNUNET_YES == dir->is_ready); 679 GNUNET_assert (GNUNET_YES == dir->is_ready);
750} 680}
@@ -761,21 +691,20 @@ dir_init (struct RouteDirection *dir,
761 * or NULL. 691 * or NULL.
762 */ 692 */
763static void 693static void
764send_broken_without_mqm (struct CadetPeer *target, 694send_broken_without_mqm (
765 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, 695 struct CadetPeer *target,
766 const struct GNUNET_PeerIdentity *failure_at) 696 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
697 const struct GNUNET_PeerIdentity *failure_at)
767{ 698{
768 struct GNUNET_MQ_Envelope *env; 699 struct GNUNET_MQ_Envelope *env;
769 struct GNUNET_CADET_ConnectionBrokenMessage *bm; 700 struct GNUNET_CADET_ConnectionBrokenMessage *bm;
770 701
771 env = GNUNET_MQ_msg (bm, 702 env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
772 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
773 bm->cid = *cid; 703 bm->cid = *cid;
774 bm->peer1 = my_full_id; 704 bm->peer1 = my_full_id;
775 if (NULL != failure_at) 705 if (NULL != failure_at)
776 bm->peer2 = *failure_at; 706 bm->peer2 = *failure_at;
777 GCP_send_ooo (target, 707 GCP_send_ooo (target, env);
778 env);
779} 708}
780 709
781 710
@@ -786,19 +715,19 @@ send_broken_without_mqm (struct CadetPeer *target,
786 * @param msg Message itself. 715 * @param msg Message itself.
787 */ 716 */
788static void 717static void
789handle_connection_create (void *cls, 718handle_connection_create (
790 const struct GNUNET_CADET_ConnectionCreateMessage *msg) 719 void *cls,
720 const struct GNUNET_CADET_ConnectionCreateMessage *msg)
791{ 721{
792 struct CadetPeer *sender = cls; 722 struct CadetPeer *sender = cls;
793 struct CadetPeer *next; 723 struct CadetPeer *next;
794 const struct GNUNET_PeerIdentity *pids = (const struct GNUNET_PeerIdentity *) &msg[1]; 724 const struct GNUNET_PeerIdentity *pids =
725 (const struct GNUNET_PeerIdentity *) &msg[1];
795 struct CadetRoute *route; 726 struct CadetRoute *route;
796 uint16_t size = ntohs (msg->header.size) - sizeof (*msg); 727 uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
797 unsigned int path_length; 728 unsigned int path_length;
798 unsigned int off; 729 unsigned int off;
799 enum GNUNET_CADET_ChannelOption options;
800 730
801 options = (enum GNUNET_CADET_ChannelOption) ntohl (msg->options);
802 path_length = size / sizeof (struct GNUNET_PeerIdentity); 731 path_length = size / sizeof (struct GNUNET_PeerIdentity);
803 if (0 == path_length) 732 if (0 == path_length)
804 { 733 {
@@ -816,20 +745,19 @@ handle_connection_create (void *cls,
816 { 745 {
817 struct GNUNET_CONTAINER_MultiPeerMap *map; 746 struct GNUNET_CONTAINER_MultiPeerMap *map;
818 747
819 map = GNUNET_CONTAINER_multipeermap_create (path_length * 2, 748 map = GNUNET_CONTAINER_multipeermap_create (path_length * 2, GNUNET_YES);
820 GNUNET_YES);
821 GNUNET_assert (NULL != map); 749 GNUNET_assert (NULL != map);
822 for (unsigned int i=0;i<path_length;i++) 750 for (unsigned int i = 0; i < path_length; i++)
823 { 751 {
824 LOG (GNUNET_ERROR_TYPE_DEBUG, 752 LOG (GNUNET_ERROR_TYPE_DEBUG,
825 "CADET_CONNECTION_CREATE has peer %s at offset %u\n", 753 "CADET_CONNECTION_CREATE has peer %s at offset %u\n",
826 GNUNET_i2s (&pids[i]), 754 GNUNET_i2s (&pids[i]),
827 i); 755 i);
828 if (GNUNET_SYSERR == 756 if (GNUNET_SYSERR == GNUNET_CONTAINER_multipeermap_put (
829 GNUNET_CONTAINER_multipeermap_put (map, 757 map,
830 &pids[i], 758 &pids[i],
831 NULL, 759 NULL,
832 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) 760 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
833 { 761 {
834 /* bogus request */ 762 /* bogus request */
835 GNUNET_CONTAINER_multipeermap_destroy (map); 763 GNUNET_CONTAINER_multipeermap_destroy (map);
@@ -842,9 +770,8 @@ handle_connection_create (void *cls,
842 GNUNET_CONTAINER_multipeermap_destroy (map); 770 GNUNET_CONTAINER_multipeermap_destroy (map);
843 } 771 }
844 /* Initiator is at offset 0, find us */ 772 /* Initiator is at offset 0, find us */
845 for (off=1;off<path_length;off++) 773 for (off = 1; off < path_length; off++)
846 if (0 == GNUNET_memcmp (&my_full_id, 774 if (0 == GNUNET_memcmp (&my_full_id, &pids[off]))
847 &pids[off]))
848 break; 775 break;
849 if (off == path_length) 776 if (off == path_length)
850 { 777 {
@@ -854,16 +781,14 @@ handle_connection_create (void *cls,
854 return; 781 return;
855 } 782 }
856 /* Check previous hop */ 783 /* Check previous hop */
857 if (sender != GCP_get (&pids[off - 1], 784 if (sender != GCP_get (&pids[off - 1], GNUNET_NO))
858 GNUNET_NO))
859 { 785 {
860 LOG (GNUNET_ERROR_TYPE_DEBUG, 786 LOG (GNUNET_ERROR_TYPE_DEBUG,
861 "Dropping CADET_CONNECTION_CREATE without sender at previous hop in the path\n"); 787 "Dropping CADET_CONNECTION_CREATE without sender at previous hop in the path\n");
862 GNUNET_break_op (0); 788 GNUNET_break_op (0);
863 return; 789 return;
864 } 790 }
865 if (NULL != 791 if (NULL != (route = get_route (&msg->cid)))
866 (route = get_route (&msg->cid)))
867 { 792 {
868 /* Duplicate CREATE, pass it on, previous one might have been lost! */ 793 /* Duplicate CREATE, pass it on, previous one might have been lost! */
869 794
@@ -872,7 +797,9 @@ handle_connection_create (void *cls,
872 GNUNET_sh2s (&msg->cid.connection_of_tunnel)); 797 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
873 route_message (sender, 798 route_message (sender,
874 &msg->cid, 799 &msg->cid,
875 &msg->header); 800 &msg->header,
801 GNUNET_MQ_PRIO_CRITICAL_CONTROL |
802 GNUNET_MQ_PREF_LOW_LATENCY);
876 return; 803 return;
877 } 804 }
878 if (off == path_length - 1) 805 if (off == path_length - 1)
@@ -892,19 +819,15 @@ handle_connection_create (void *cls,
892 return; 819 return;
893 } 820 }
894 821
895 origin = GCP_get (&pids[0], 822 origin = GCP_get (&pids[0], GNUNET_YES);
896 GNUNET_YES);
897 LOG (GNUNET_ERROR_TYPE_DEBUG, 823 LOG (GNUNET_ERROR_TYPE_DEBUG,
898 "I am destination for CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n", 824 "I am destination for CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
899 GCP_2s (origin), 825 GCP_2s (origin),
900 GNUNET_sh2s (&msg->cid.connection_of_tunnel)); 826 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
901 path = GCPP_get_path_from_route (path_length - 1, 827 path = GCPP_get_path_from_route (path_length - 1, pids);
902 pids);
903 if (GNUNET_OK != 828 if (GNUNET_OK !=
904 GCT_add_inbound_connection (GCP_get_tunnel (origin, 829 GCT_add_inbound_connection (GCP_get_tunnel (origin, GNUNET_YES),
905 GNUNET_YES),
906 &msg->cid, 830 &msg->cid,
907 (enum GNUNET_CADET_ChannelOption) ntohl (msg->options),
908 path)) 831 path))
909 { 832 {
910 /* Send back BROKEN: duplicate connection on the same path, 833 /* Send back BROKEN: duplicate connection on the same path,
@@ -914,18 +837,14 @@ handle_connection_create (void *cls,
914 GCP_2s (sender), 837 GCP_2s (sender),
915 GNUNET_sh2s (&msg->cid.connection_of_tunnel), 838 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
916 GCPP_2s (path)); 839 GCPP_2s (path));
917 send_broken_without_mqm (sender, 840 send_broken_without_mqm (sender, &msg->cid, NULL);
918 &msg->cid,
919 NULL);
920 return; 841 return;
921 } 842 }
922 return; 843 return;
923 } 844 }
924 /* We are merely a hop on the way, check if we can support the route */ 845 /* We are merely a hop on the way, check if we can support the route */
925 next = GCP_get (&pids[off + 1], 846 next = GCP_get (&pids[off + 1], GNUNET_NO);
926 GNUNET_NO); 847 if ((NULL == next) || (GNUNET_NO == GCP_has_core_connection (next)))
927 if ( (NULL == next) ||
928 (GNUNET_NO == GCP_has_core_connection (next)) )
929 { 848 {
930 /* unworkable, send back BROKEN notification */ 849 /* unworkable, send back BROKEN notification */
931 LOG (GNUNET_ERROR_TYPE_DEBUG, 850 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -934,9 +853,7 @@ handle_connection_create (void *cls,
934 GNUNET_sh2s (&msg->cid.connection_of_tunnel), 853 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
935 GNUNET_i2s (&pids[off + 1]), 854 GNUNET_i2s (&pids[off + 1]),
936 off + 1); 855 off + 1);
937 send_broken_without_mqm (sender, 856 send_broken_without_mqm (sender, &msg->cid, &pids[off + 1]);
938 &msg->cid,
939 &pids[off + 1]);
940 return; 857 return;
941 } 858 }
942 if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes)) 859 if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
@@ -945,9 +862,7 @@ handle_connection_create (void *cls,
945 "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n", 862 "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
946 GCP_2s (sender), 863 GCP_2s (sender),
947 GNUNET_sh2s (&msg->cid.connection_of_tunnel)); 864 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
948 send_broken_without_mqm (sender, 865 send_broken_without_mqm (sender, &msg->cid, &pids[off - 1]);
949 &msg->cid,
950 &pids[off - 1]);
951 return; 866 return;
952 } 867 }
953 868
@@ -959,20 +874,16 @@ handle_connection_create (void *cls,
959 GNUNET_i2s (&pids[off + 1]), 874 GNUNET_i2s (&pids[off + 1]),
960 off + 1); 875 off + 1);
961 route = GNUNET_new (struct CadetRoute); 876 route = GNUNET_new (struct CadetRoute);
962 route->options = options;
963 route->cid = msg->cid; 877 route->cid = msg->cid;
964 route->last_use = GNUNET_TIME_absolute_get (); 878 route->last_use = GNUNET_TIME_absolute_get ();
965 dir_init (&route->prev, 879 dir_init (&route->prev, route, sender);
966 route, 880 dir_init (&route->next, route, next);
967 sender);
968 dir_init (&route->next,
969 route,
970 next);
971 GNUNET_assert (GNUNET_OK == 881 GNUNET_assert (GNUNET_OK ==
972 GNUNET_CONTAINER_multishortmap_put (routes, 882 GNUNET_CONTAINER_multishortmap_put (
973 &route->cid.connection_of_tunnel, 883 routes,
974 route, 884 &route->cid.connection_of_tunnel,
975 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 885 route,
886 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
976 GNUNET_STATISTICS_set (stats, 887 GNUNET_STATISTICS_set (stats,
977 "# routes", 888 "# routes",
978 GNUNET_CONTAINER_multishortmap_size (routes), 889 GNUNET_CONTAINER_multishortmap_size (routes),
@@ -981,14 +892,16 @@ handle_connection_create (void *cls,
981 route, 892 route,
982 route->last_use.abs_value_us); 893 route->last_use.abs_value_us);
983 if (NULL == timeout_task) 894 if (NULL == timeout_task)
984 timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period, 895 timeout_task =
985 3), 896 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
986 &timeout_cb, 897 3),
987 NULL); 898 &timeout_cb,
899 NULL);
988 /* also pass CREATE message along to next hop */ 900 /* also pass CREATE message along to next hop */
989 route_message (sender, 901 route_message (sender,
990 &msg->cid, 902 &msg->cid,
991 &msg->header); 903 &msg->header,
904 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
992} 905}
993 906
994 907
@@ -999,8 +912,9 @@ handle_connection_create (void *cls,
999 * @param msg Message itself. 912 * @param msg Message itself.
1000 */ 913 */
1001static void 914static void
1002handle_connection_create_ack (void *cls, 915handle_connection_create_ack (
1003 const struct GNUNET_CADET_ConnectionCreateAckMessage *msg) 916 void *cls,
917 const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
1004{ 918{
1005 struct CadetPeer *peer = cls; 919 struct CadetPeer *peer = cls;
1006 struct CadetConnection *cc; 920 struct CadetConnection *cc;
@@ -1011,12 +925,9 @@ handle_connection_create_ack (void *cls,
1011 { 925 {
1012 /* verify ACK came from the right direction */ 926 /* verify ACK came from the right direction */
1013 unsigned int len; 927 unsigned int len;
1014 struct CadetPeerPath *path = GCC_get_path (cc, 928 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1015 &len);
1016 929
1017 if (peer != 930 if (peer != GCPP_get_peer_at_offset (path, 0))
1018 GCPP_get_peer_at_offset (path,
1019 0))
1020 { 931 {
1021 /* received ACK from unexpected direction, ignore! */ 932 /* received ACK from unexpected direction, ignore! */
1022 GNUNET_break_op (0); 933 GNUNET_break_op (0);
@@ -1032,7 +943,8 @@ handle_connection_create_ack (void *cls,
1032 /* We're just an intermediary peer, route the message along its path */ 943 /* We're just an intermediary peer, route the message along its path */
1033 route_message (peer, 944 route_message (peer,
1034 &msg->cid, 945 &msg->cid,
1035 &msg->header); 946 &msg->header,
947 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
1036} 948}
1037 949
1038 950
@@ -1044,8 +956,9 @@ handle_connection_create_ack (void *cls,
1044 * @deprecated duplicate logic with #handle_destroy(); dedup! 956 * @deprecated duplicate logic with #handle_destroy(); dedup!
1045 */ 957 */
1046static void 958static void
1047handle_connection_broken (void *cls, 959handle_connection_broken (
1048 const struct GNUNET_CADET_ConnectionBrokenMessage *msg) 960 void *cls,
961 const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
1049{ 962{
1050 struct CadetPeer *peer = cls; 963 struct CadetPeer *peer = cls;
1051 struct CadetConnection *cc; 964 struct CadetConnection *cc;
@@ -1057,12 +970,9 @@ handle_connection_broken (void *cls,
1057 { 970 {
1058 /* verify message came from the right direction */ 971 /* verify message came from the right direction */
1059 unsigned int len; 972 unsigned int len;
1060 struct CadetPeerPath *path = GCC_get_path (cc, 973 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1061 &len);
1062 974
1063 if (peer != 975 if (peer != GCPP_get_peer_at_offset (path, 0))
1064 GCPP_get_peer_at_offset (path,
1065 0))
1066 { 976 {
1067 /* received message from unexpected direction, ignore! */ 977 /* received message from unexpected direction, ignore! */
1068 GNUNET_break_op (0); 978 GNUNET_break_op (0);
@@ -1080,7 +990,8 @@ handle_connection_broken (void *cls,
1080 /* We're just an intermediary peer, route the message along its path */ 990 /* We're just an intermediary peer, route the message along its path */
1081 route_message (peer, 991 route_message (peer,
1082 &msg->cid, 992 &msg->cid,
1083 &msg->header); 993 &msg->header,
994 GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL);
1084 route = get_route (&msg->cid); 995 route = get_route (&msg->cid);
1085 if (NULL != route) 996 if (NULL != route)
1086 destroy_route (route); 997 destroy_route (route);
@@ -1095,8 +1006,9 @@ handle_connection_broken (void *cls,
1095 * @param msg Message itself. 1006 * @param msg Message itself.
1096 */ 1007 */
1097static void 1008static void
1098handle_connection_destroy (void *cls, 1009handle_connection_destroy (
1099 const struct GNUNET_CADET_ConnectionDestroyMessage *msg) 1010 void *cls,
1011 const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
1100{ 1012{
1101 struct CadetPeer *peer = cls; 1013 struct CadetPeer *peer = cls;
1102 struct CadetConnection *cc; 1014 struct CadetConnection *cc;
@@ -1108,12 +1020,9 @@ handle_connection_destroy (void *cls,
1108 { 1020 {
1109 /* verify message came from the right direction */ 1021 /* verify message came from the right direction */
1110 unsigned int len; 1022 unsigned int len;
1111 struct CadetPeerPath *path = GCC_get_path (cc, 1023 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1112 &len);
1113 1024
1114 if (peer != 1025 if (peer != GCPP_get_peer_at_offset (path, 0))
1115 GCPP_get_peer_at_offset (path,
1116 0))
1117 { 1026 {
1118 /* received message from unexpected direction, ignore! */ 1027 /* received message from unexpected direction, ignore! */
1119 GNUNET_break_op (0); 1028 GNUNET_break_op (0);
@@ -1133,7 +1042,8 @@ handle_connection_destroy (void *cls,
1133 GNUNET_sh2s (&msg->cid.connection_of_tunnel)); 1042 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1134 route_message (peer, 1043 route_message (peer,
1135 &msg->cid, 1044 &msg->cid,
1136 &msg->header); 1045 &msg->header,
1046 GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL);
1137 route = get_route (&msg->cid); 1047 route = get_route (&msg->cid);
1138 if (NULL != route) 1048 if (NULL != route)
1139 destroy_route (route); 1049 destroy_route (route);
@@ -1165,26 +1075,23 @@ handle_tunnel_kx (void *cls,
1165 { 1075 {
1166 /* verify message came from the right direction */ 1076 /* verify message came from the right direction */
1167 unsigned int len; 1077 unsigned int len;
1168 struct CadetPeerPath *path = GCC_get_path (cc, 1078 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1169 &len);
1170 1079
1171 if (peer != 1080 if (peer != GCPP_get_peer_at_offset (path, 0))
1172 GCPP_get_peer_at_offset (path,
1173 0))
1174 { 1081 {
1175 /* received message from unexpected direction, ignore! */ 1082 /* received message from unexpected direction, ignore! */
1176 GNUNET_break_op (0); 1083 GNUNET_break_op (0);
1177 return; 1084 return;
1178 } 1085 }
1179 GCC_handle_kx (cc, 1086 GCC_handle_kx (cc, msg);
1180 msg);
1181 return; 1087 return;
1182 } 1088 }
1183 1089
1184 /* We're just an intermediary peer, route the message along its path */ 1090 /* We're just an intermediary peer, route the message along its path */
1185 route_message (peer, 1091 route_message (peer,
1186 &msg->cid, 1092 &msg->cid,
1187 &msg->header); 1093 &msg->header,
1094 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
1188} 1095}
1189 1096
1190 1097
@@ -1195,8 +1102,9 @@ handle_tunnel_kx (void *cls,
1195 * @param msg Message itself. 1102 * @param msg Message itself.
1196 */ 1103 */
1197static void 1104static void
1198handle_tunnel_kx_auth (void *cls, 1105handle_tunnel_kx_auth (
1199 const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg) 1106 void *cls,
1107 const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
1200{ 1108{
1201 struct CadetPeer *peer = cls; 1109 struct CadetPeer *peer = cls;
1202 struct CadetConnection *cc; 1110 struct CadetConnection *cc;
@@ -1207,26 +1115,23 @@ handle_tunnel_kx_auth (void *cls,
1207 { 1115 {
1208 /* verify message came from the right direction */ 1116 /* verify message came from the right direction */
1209 unsigned int len; 1117 unsigned int len;
1210 struct CadetPeerPath *path = GCC_get_path (cc, 1118 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1211 &len);
1212 1119
1213 if (peer != 1120 if (peer != GCPP_get_peer_at_offset (path, 0))
1214 GCPP_get_peer_at_offset (path,
1215 0))
1216 { 1121 {
1217 /* received message from unexpected direction, ignore! */ 1122 /* received message from unexpected direction, ignore! */
1218 GNUNET_break_op (0); 1123 GNUNET_break_op (0);
1219 return; 1124 return;
1220 } 1125 }
1221 GCC_handle_kx_auth (cc, 1126 GCC_handle_kx_auth (cc, msg);
1222 msg);
1223 return; 1127 return;
1224 } 1128 }
1225 1129
1226 /* We're just an intermediary peer, route the message along its path */ 1130 /* We're just an intermediary peer, route the message along its path */
1227 route_message (peer, 1131 route_message (peer,
1228 &msg->kx.cid, 1132 &msg->kx.cid,
1229 &msg->kx.header); 1133 &msg->kx.header,
1134 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
1230} 1135}
1231 1136
1232 1137
@@ -1265,25 +1170,19 @@ handle_tunnel_encrypted (void *cls,
1265 { 1170 {
1266 /* verify message came from the right direction */ 1171 /* verify message came from the right direction */
1267 unsigned int len; 1172 unsigned int len;
1268 struct CadetPeerPath *path = GCC_get_path (cc, 1173 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1269 &len);
1270 1174
1271 if (peer != 1175 if (peer != GCPP_get_peer_at_offset (path, 0))
1272 GCPP_get_peer_at_offset (path,
1273 0))
1274 { 1176 {
1275 /* received message from unexpected direction, ignore! */ 1177 /* received message from unexpected direction, ignore! */
1276 GNUNET_break_op (0); 1178 GNUNET_break_op (0);
1277 return; 1179 return;
1278 } 1180 }
1279 GCC_handle_encrypted (cc, 1181 GCC_handle_encrypted (cc, msg);
1280 msg);
1281 return; 1182 return;
1282 } 1183 }
1283 /* We're just an intermediary peer, route the message along its path */ 1184 /* We're just an intermediary peer, route the message along its path */
1284 route_message (peer, 1185 route_message (peer, &msg->cid, &msg->header, GNUNET_MQ_PRIO_BEST_EFFORT);
1285 &msg->cid,
1286 &msg->header);
1287} 1186}
1288 1187
1289 1188
@@ -1300,17 +1199,14 @@ handle_tunnel_encrypted (void *cls,
1300 * @param my_identity ID of this peer, NULL if we failed 1199 * @param my_identity ID of this peer, NULL if we failed
1301 */ 1200 */
1302static void 1201static void
1303core_init_cb (void *cls, 1202core_init_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
1304 const struct GNUNET_PeerIdentity *my_identity)
1305{ 1203{
1306 if (NULL == my_identity) 1204 if (NULL == my_identity)
1307 { 1205 {
1308 GNUNET_break (0); 1206 GNUNET_break (0);
1309 return; 1207 return;
1310 } 1208 }
1311 GNUNET_break (0 == 1209 GNUNET_break (0 == GNUNET_memcmp (my_identity, &my_full_id));
1312 GNUNET_memcmp (my_identity,
1313 &my_full_id));
1314} 1210}
1315 1211
1316 1212
@@ -1330,10 +1226,8 @@ core_connect_cb (void *cls,
1330 LOG (GNUNET_ERROR_TYPE_DEBUG, 1226 LOG (GNUNET_ERROR_TYPE_DEBUG,
1331 "CORE connection to peer %s was established.\n", 1227 "CORE connection to peer %s was established.\n",
1332 GNUNET_i2s (peer)); 1228 GNUNET_i2s (peer));
1333 cp = GCP_get (peer, 1229 cp = GCP_get (peer, GNUNET_YES);
1334 GNUNET_YES); 1230 GCP_set_mq (cp, mq);
1335 GCP_set_mq (cp,
1336 mq);
1337 return cp; 1231 return cp;
1338} 1232}
1339 1233
@@ -1354,8 +1248,7 @@ core_disconnect_cb (void *cls,
1354 LOG (GNUNET_ERROR_TYPE_DEBUG, 1248 LOG (GNUNET_ERROR_TYPE_DEBUG,
1355 "CORE connection to peer %s went down.\n", 1249 "CORE connection to peer %s went down.\n",
1356 GNUNET_i2s (peer)); 1250 GNUNET_i2s (peer));
1357 GCP_set_mq (cp, 1251 GCP_set_mq (cp, NULL);
1358 NULL);
1359} 1252}
1360 1253
1361 1254
@@ -1367,52 +1260,48 @@ core_disconnect_cb (void *cls,
1367void 1260void
1368GCO_init (const struct GNUNET_CONFIGURATION_Handle *c) 1261GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
1369{ 1262{
1370 struct GNUNET_MQ_MessageHandler handlers[] = { 1263 struct GNUNET_MQ_MessageHandler handlers[] =
1371 GNUNET_MQ_hd_var_size (connection_create, 1264 {GNUNET_MQ_hd_var_size (connection_create,
1372 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 1265 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
1373 struct GNUNET_CADET_ConnectionCreateMessage, 1266 struct GNUNET_CADET_ConnectionCreateMessage,
1374 NULL), 1267 NULL),
1375 GNUNET_MQ_hd_fixed_size (connection_create_ack, 1268 GNUNET_MQ_hd_fixed_size (connection_create_ack,
1376 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, 1269 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
1377 struct GNUNET_CADET_ConnectionCreateAckMessage, 1270 struct GNUNET_CADET_ConnectionCreateAckMessage,
1378 NULL), 1271 NULL),
1379 GNUNET_MQ_hd_fixed_size (connection_broken, 1272 GNUNET_MQ_hd_fixed_size (connection_broken,
1380 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, 1273 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1381 struct GNUNET_CADET_ConnectionBrokenMessage, 1274 struct GNUNET_CADET_ConnectionBrokenMessage,
1382 NULL), 1275 NULL),
1383 GNUNET_MQ_hd_fixed_size (connection_destroy, 1276 GNUNET_MQ_hd_fixed_size (connection_destroy,
1384 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY, 1277 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
1385 struct GNUNET_CADET_ConnectionDestroyMessage, 1278 struct GNUNET_CADET_ConnectionDestroyMessage,
1386 NULL), 1279 NULL),
1387 GNUNET_MQ_hd_fixed_size (tunnel_kx, 1280 GNUNET_MQ_hd_fixed_size (tunnel_kx,
1388 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX, 1281 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
1389 struct GNUNET_CADET_TunnelKeyExchangeMessage, 1282 struct GNUNET_CADET_TunnelKeyExchangeMessage,
1390 NULL), 1283 NULL),
1391 GNUNET_MQ_hd_fixed_size (tunnel_kx_auth, 1284 GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
1392 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH, 1285 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
1393 struct GNUNET_CADET_TunnelKeyExchangeAuthMessage, 1286 struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
1394 NULL), 1287 NULL),
1395 GNUNET_MQ_hd_var_size (tunnel_encrypted, 1288 GNUNET_MQ_hd_var_size (tunnel_encrypted,
1396 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED, 1289 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
1397 struct GNUNET_CADET_TunnelEncryptedMessage, 1290 struct GNUNET_CADET_TunnelEncryptedMessage,
1398 NULL), 1291 NULL),
1399 GNUNET_MQ_handler_end () 1292 GNUNET_MQ_handler_end ()};
1400 }; 1293
1401 1294 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c,
1402 if (GNUNET_OK != 1295 "CADET",
1403 GNUNET_CONFIGURATION_get_value_number (c, 1296 "MAX_ROUTES",
1404 "CADET", 1297 &max_routes))
1405 "MAX_ROUTES",
1406 &max_routes))
1407 max_routes = 5000; 1298 max_routes = 5000;
1408 if (GNUNET_OK != 1299 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c,
1409 GNUNET_CONFIGURATION_get_value_number (c, 1300 "CADET",
1410 "CADET", 1301 "MAX_MSGS_QUEUE",
1411 "MAX_MSGS_QUEUE", 1302 &max_buffers))
1412 &max_buffers))
1413 max_buffers = 10000; 1303 max_buffers = 10000;
1414 routes = GNUNET_CONTAINER_multishortmap_create (1024, 1304 routes = GNUNET_CONTAINER_multishortmap_create (1024, GNUNET_NO);
1415 GNUNET_NO);
1416 route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 1305 route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1417 core = GNUNET_CORE_connect (c, 1306 core = GNUNET_CORE_connect (c,
1418 NULL, 1307 NULL,