aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api_core.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-04-28 19:32:10 +0200
committerChristian Grothoff <christian@grothoff.org>2019-04-28 19:32:20 +0200
commit3f945e6798d8d736ceb104b59ea1269a7abdfe8a (patch)
treeb93e3dc99deda0987e85cb256b3903de8bd74853 /src/transport/transport_api_core.c
parent1227fc30369a55b82e77d35d8d128090e37dd437 (diff)
downloadgnunet-3f945e6798d8d736ceb104b59ea1269a7abdfe8a.tar.gz
gnunet-3f945e6798d8d736ceb104b59ea1269a7abdfe8a.zip
towards flow control in TNG
Diffstat (limited to 'src/transport/transport_api_core.c')
-rw-r--r--src/transport/transport_api_core.c260
1 files changed, 101 insertions, 159 deletions
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c
index e86499173..a163d7ccf 100644
--- a/src/transport/transport_api_core.c
+++ b/src/transport/transport_api_core.c
@@ -29,11 +29,10 @@
29#include "gnunet_arm_service.h" 29#include "gnunet_arm_service.h"
30#include "gnunet_hello_lib.h" 30#include "gnunet_hello_lib.h"
31#include "gnunet_protocols.h" 31#include "gnunet_protocols.h"
32#include "gnunet_transport_core_service.h"
33#include "gnunet_transport_service.h" 32#include "gnunet_transport_service.h"
34#include "transport.h" 33#include "transport.h"
35 34
36#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) 35#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__)
37 36
38/** 37/**
39 * If we could not send any payload to a peer for this amount of 38 * If we could not send any payload to a peer for this amount of
@@ -113,11 +112,9 @@ struct Neighbour
113 * Size of the message in @e env. 112 * Size of the message in @e env.
114 */ 113 */
115 uint16_t env_size; 114 uint16_t env_size;
116
117}; 115};
118 116
119 117
120
121/** 118/**
122 * Handle for the transport service (includes all of the 119 * Handle for the transport service (includes all of the
123 * state for the transport service). 120 * state for the transport service).
@@ -187,7 +184,6 @@ struct GNUNET_TRANSPORT_CoreHandle
187 * (if #GNUNET_NO, then @e self is all zeros!). 184 * (if #GNUNET_NO, then @e self is all zeros!).
188 */ 185 */
189 int check_self; 186 int check_self;
190
191}; 187};
192 188
193 189
@@ -212,8 +208,7 @@ static struct Neighbour *
212neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, 208neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
213 const struct GNUNET_PeerIdentity *peer) 209 const struct GNUNET_PeerIdentity *peer)
214{ 210{
215 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, 211 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
216 peer);
217} 212}
218 213
219 214
@@ -234,9 +229,7 @@ notify_excess_cb (void *cls)
234 GNUNET_i2s (&n->id)); 229 GNUNET_i2s (&n->id));
235 230
236 if (NULL != h->neb_cb) 231 if (NULL != h->neb_cb)
237 h->neb_cb (h->cls, 232 h->neb_cb (h->cls, &n->id, n->handlers_cls);
238 &n->id,
239 n->handlers_cls);
240} 233}
241 234
242 235
@@ -251,9 +244,7 @@ notify_excess_cb (void *cls)
251 * #GNUNET_NO if not. 244 * #GNUNET_NO if not.
252 */ 245 */
253static int 246static int
254neighbour_delete (void *cls, 247neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
255 const struct GNUNET_PeerIdentity *key,
256 void *value)
257{ 248{
258 struct GNUNET_TRANSPORT_CoreHandle *handle = cls; 249 struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
259 struct Neighbour *n = value; 250 struct Neighbour *n = value;
@@ -263,9 +254,7 @@ neighbour_delete (void *cls,
263 GNUNET_i2s (key)); 254 GNUNET_i2s (key));
264 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker); 255 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
265 if (NULL != handle->nd_cb) 256 if (NULL != handle->nd_cb)
266 handle->nd_cb (handle->cls, 257 handle->nd_cb (handle->cls, &n->id, n->handlers_cls);
267 &n->id,
268 n->handlers_cls);
269 if (NULL != n->timeout_task) 258 if (NULL != n->timeout_task)
270 { 259 {
271 GNUNET_SCHEDULER_cancel (n->timeout_task); 260 GNUNET_SCHEDULER_cancel (n->timeout_task);
@@ -278,10 +267,9 @@ neighbour_delete (void *cls,
278 } 267 }
279 GNUNET_MQ_destroy (n->mq); 268 GNUNET_MQ_destroy (n->mq);
280 GNUNET_assert (NULL == n->mq); 269 GNUNET_assert (NULL == n->mq);
281 GNUNET_assert (GNUNET_YES == 270 GNUNET_assert (
282 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, 271 GNUNET_YES ==
283 key, 272 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n));
284 n));
285 GNUNET_free (n); 273 GNUNET_free (n);
286 return GNUNET_YES; 274 return GNUNET_YES;
287} 275}
@@ -297,8 +285,7 @@ neighbour_delete (void *cls,
297 * @param error error code 285 * @param error error code
298 */ 286 */
299static void 287static void
300mq_error_handler (void *cls, 288mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
301 enum GNUNET_MQ_Error error)
302{ 289{
303 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 290 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
304 291
@@ -317,14 +304,12 @@ mq_error_handler (void *cls,
317 * @return #GNUNET_OK if message is well-formed 304 * @return #GNUNET_OK if message is well-formed
318 */ 305 */
319static int 306static int
320check_hello (void *cls, 307check_hello (void *cls, const struct GNUNET_MessageHeader *msg)
321 const struct GNUNET_MessageHeader *msg)
322{ 308{
323 struct GNUNET_PeerIdentity me; 309 struct GNUNET_PeerIdentity me;
324 310
325 if (GNUNET_OK != 311 if (GNUNET_OK !=
326 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, 312 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, &me))
327 &me))
328 { 313 {
329 GNUNET_break (0); 314 GNUNET_break (0);
330 return GNUNET_SYSERR; 315 return GNUNET_SYSERR;
@@ -340,8 +325,7 @@ check_hello (void *cls,
340 * @param msg message received 325 * @param msg message received
341 */ 326 */
342static void 327static void
343handle_hello (void *cls, 328handle_hello (void *cls, const struct GNUNET_MessageHeader *msg)
344 const struct GNUNET_MessageHeader *msg)
345{ 329{
346 /* we do not care => FIXME: signal in options to NEVER send HELLOs! */ 330 /* we do not care => FIXME: signal in options to NEVER send HELLOs! */
347} 331}
@@ -388,8 +372,7 @@ notify_send_done (void *cls)
388 n->env = NULL; 372 n->env = NULL;
389 n->traffic_overhead = 0; 373 n->traffic_overhead = 0;
390 } 374 }
391 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 375 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
392 128);
393 if (0 == delay.rel_value_us) 376 if (0 == delay.rel_value_us)
394 { 377 {
395 n->is_ready = GNUNET_YES; 378 n->is_ready = GNUNET_YES;
@@ -399,9 +382,8 @@ notify_send_done (void *cls)
399 GNUNET_MQ_impl_send_in_flight (n->mq); 382 GNUNET_MQ_impl_send_in_flight (n->mq);
400 /* cannot send even a small message without violating 383 /* cannot send even a small message without violating
401 quota, wait a before allowing MQ to send next message */ 384 quota, wait a before allowing MQ to send next message */
402 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, 385 n->timeout_task =
403 &notify_send_done_fin, 386 GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done_fin, n);
404 n);
405} 387}
406 388
407 389
@@ -434,20 +416,17 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
434 return; 416 return;
435 } 417 }
436 GNUNET_assert (NULL == n->env); 418 GNUNET_assert (NULL == n->env);
437 n->env = GNUNET_MQ_msg_nested_mh (obm, 419 n->env =
438 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 420 GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
439 msg);
440 obm->reserved = htonl (0); 421 obm->reserved = htonl (0);
441 obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ 422 obm->timeout = GNUNET_TIME_relative_hton (
423 GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
442 obm->peer = n->id; 424 obm->peer = n->id;
443 GNUNET_assert (NULL == n->timeout_task); 425 GNUNET_assert (NULL == n->timeout_task);
444 n->is_ready = GNUNET_NO; 426 n->is_ready = GNUNET_NO;
445 n->env_size = ntohs (msg->size); 427 n->env_size = ntohs (msg->size);
446 GNUNET_MQ_notify_sent (n->env, 428 GNUNET_MQ_notify_sent (n->env, &notify_send_done, n);
447 &notify_send_done, 429 GNUNET_MQ_send (h->mq, n->env);
448 n);
449 GNUNET_MQ_send (h->mq,
450 n->env);
451 LOG (GNUNET_ERROR_TYPE_DEBUG, 430 LOG (GNUNET_ERROR_TYPE_DEBUG,
452 "Queued message of type %u for neighbour `%s'.\n", 431 "Queued message of type %u for neighbour `%s'.\n",
453 ntohs (msg->type), 432 ntohs (msg->type),
@@ -463,8 +442,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
463 * @param impl_state state of the implementation 442 * @param impl_state state of the implementation
464 */ 443 */
465static void 444static void
466mq_destroy_impl (struct GNUNET_MQ_Handle *mq, 445mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
467 void *impl_state)
468{ 446{
469 struct Neighbour *n = impl_state; 447 struct Neighbour *n = impl_state;
470 448
@@ -481,8 +459,7 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
481 * @param impl_state state specific to the implementation 459 * @param impl_state state specific to the implementation
482 */ 460 */
483static void 461static void
484mq_cancel_impl (struct GNUNET_MQ_Handle *mq, 462mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
485 void *impl_state)
486{ 463{
487 struct Neighbour *n = impl_state; 464 struct Neighbour *n = impl_state;
488 465
@@ -506,8 +483,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
506 * @param error error code 483 * @param error error code
507 */ 484 */
508static void 485static void
509peer_mq_error_handler (void *cls, 486peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
510 enum GNUNET_MQ_Error error)
511{ 487{
512 /* struct Neighbour *n = cls; */ 488 /* struct Neighbour *n = cls; */
513 489
@@ -529,12 +505,9 @@ outbound_bw_tracker_update (void *cls)
529 505
530 if (NULL == n->timeout_task) 506 if (NULL == n->timeout_task)
531 return; 507 return;
532 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 508 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
533 128);
534 GNUNET_SCHEDULER_cancel (n->timeout_task); 509 GNUNET_SCHEDULER_cancel (n->timeout_task);
535 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, 510 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done, n);
536 &notify_send_done,
537 n);
538} 511}
539 512
540 513
@@ -545,8 +518,7 @@ outbound_bw_tracker_update (void *cls)
545 * @param cim message received 518 * @param cim message received
546 */ 519 */
547static void 520static void
548handle_connect (void *cls, 521handle_connect (void *cls, const struct ConnectInfoMessage *cim)
549 const struct ConnectInfoMessage *cim)
550{ 522{
551 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 523 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
552 struct Neighbour *n; 524 struct Neighbour *n;
@@ -555,8 +527,7 @@ handle_connect (void *cls,
555 "Receiving CONNECT message for `%s' with quota %u\n", 527 "Receiving CONNECT message for `%s' with quota %u\n",
556 GNUNET_i2s (&cim->id), 528 GNUNET_i2s (&cim->id),
557 ntohl (cim->quota_out.value__)); 529 ntohl (cim->quota_out.value__));
558 n = neighbour_find (h, 530 n = neighbour_find (h, &cim->id);
559 &cim->id);
560 if (NULL != n) 531 if (NULL != n)
561 { 532 {
562 GNUNET_break (0); /* FIXME: this assertion seems to fail sometimes!? */ 533 GNUNET_break (0); /* FIXME: this assertion seems to fail sometimes!? */
@@ -576,13 +547,13 @@ handle_connect (void *cls,
576 &notify_excess_cb, 547 &notify_excess_cb,
577 n); 548 n);
578 GNUNET_assert (GNUNET_OK == 549 GNUNET_assert (GNUNET_OK ==
579 GNUNET_CONTAINER_multipeermap_put (h->neighbours, 550 GNUNET_CONTAINER_multipeermap_put (
580 &n->id, 551 h->neighbours,
581 n, 552 &n->id,
582 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 553 n,
554 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
583 555
584 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, 556 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, cim->quota_out);
585 cim->quota_out);
586 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, 557 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
587 &mq_destroy_impl, 558 &mq_destroy_impl,
588 &mq_cancel_impl, 559 &mq_cancel_impl,
@@ -592,11 +563,8 @@ handle_connect (void *cls,
592 n); 563 n);
593 if (NULL != h->nc_cb) 564 if (NULL != h->nc_cb)
594 { 565 {
595 n->handlers_cls = h->nc_cb (h->cls, 566 n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq);
596 &n->id, 567 GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls);
597 n->mq);
598 GNUNET_MQ_set_handlers_closure (n->mq,
599 n->handlers_cls);
600 } 568 }
601} 569}
602 570
@@ -608,8 +576,7 @@ handle_connect (void *cls,
608 * @param dim message received 576 * @param dim message received
609 */ 577 */
610static void 578static void
611handle_disconnect (void *cls, 579handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim)
612 const struct DisconnectInfoMessage *dim)
613{ 580{
614 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 581 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
615 struct Neighbour *n; 582 struct Neighbour *n;
@@ -625,10 +592,7 @@ handle_disconnect (void *cls,
625 disconnect_and_schedule_reconnect (h); 592 disconnect_and_schedule_reconnect (h);
626 return; 593 return;
627 } 594 }
628 GNUNET_assert (GNUNET_YES == 595 GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n));
629 neighbour_delete (h,
630 &dim->peer,
631 n));
632} 596}
633 597
634 598
@@ -639,8 +603,7 @@ handle_disconnect (void *cls,
639 * @param okm message received 603 * @param okm message received
640 */ 604 */
641static void 605static void
642handle_send_ok (void *cls, 606handle_send_ok (void *cls, const struct SendOkMessage *okm)
643 const struct SendOkMessage *okm)
644{ 607{
645 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 608 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
646 struct Neighbour *n; 609 struct Neighbour *n;
@@ -653,8 +616,7 @@ handle_send_ok (void *cls,
653 "Receiving SEND_OK message, transmission to %s %s.\n", 616 "Receiving SEND_OK message, transmission to %s %s.\n",
654 GNUNET_i2s (&okm->peer), 617 GNUNET_i2s (&okm->peer),
655 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); 618 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
656 n = neighbour_find (h, 619 n = neighbour_find (h, &okm->peer);
657 &okm->peer);
658 if (NULL == n) 620 if (NULL == n)
659 { 621 {
660 /* We should never get a 'SEND_OK' for a peer that we are not 622 /* We should never get a 'SEND_OK' for a peer that we are not
@@ -681,8 +643,7 @@ handle_send_ok (void *cls,
681 * @param im message received 643 * @param im message received
682 */ 644 */
683static int 645static int
684check_recv (void *cls, 646check_recv (void *cls, const struct InboundMessage *im)
685 const struct InboundMessage *im)
686{ 647{
687 const struct GNUNET_MessageHeader *imm; 648 const struct GNUNET_MessageHeader *imm;
688 uint16_t size; 649 uint16_t size;
@@ -710,12 +671,11 @@ check_recv (void *cls,
710 * @param im message received 671 * @param im message received
711 */ 672 */
712static void 673static void
713handle_recv (void *cls, 674handle_recv (void *cls, const struct InboundMessage *im)
714 const struct InboundMessage *im)
715{ 675{
716 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 676 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
717 const struct GNUNET_MessageHeader *imm 677 const struct GNUNET_MessageHeader *imm =
718 = (const struct GNUNET_MessageHeader *) &im[1]; 678 (const struct GNUNET_MessageHeader *) &im[1];
719 struct Neighbour *n; 679 struct Neighbour *n;
720 680
721 LOG (GNUNET_ERROR_TYPE_DEBUG, 681 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -730,8 +690,7 @@ handle_recv (void *cls,
730 disconnect_and_schedule_reconnect (h); 690 disconnect_and_schedule_reconnect (h);
731 return; 691 return;
732 } 692 }
733 GNUNET_MQ_inject_message (n->mq, 693 GNUNET_MQ_inject_message (n->mq, imm);
734 imm);
735} 694}
736 695
737 696
@@ -742,8 +701,7 @@ handle_recv (void *cls,
742 * @param msg message received 701 * @param msg message received
743 */ 702 */
744static void 703static void
745handle_set_quota (void *cls, 704handle_set_quota (void *cls, const struct QuotaSetMessage *qm)
746 const struct QuotaSetMessage *qm)
747{ 705{
748 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 706 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
749 struct Neighbour *n; 707 struct Neighbour *n;
@@ -752,16 +710,15 @@ handle_set_quota (void *cls,
752 "Receiving SET_QUOTA message for `%s' with quota %u\n", 710 "Receiving SET_QUOTA message for `%s' with quota %u\n",
753 GNUNET_i2s (&qm->peer), 711 GNUNET_i2s (&qm->peer),
754 ntohl (qm->quota.value__)); 712 ntohl (qm->quota.value__));
755 n = neighbour_find (h, 713 n = neighbour_find (h, &qm->peer);
756 &qm->peer);
757 if (NULL == n) 714 if (NULL == n)
758 { 715 {
759 GNUNET_break (0); /* FIXME: julius reports this assertion fails sometimes? */ 716 GNUNET_break (
717 0); /* FIXME: julius reports this assertion fails sometimes? */
760 disconnect_and_schedule_reconnect (h); 718 disconnect_and_schedule_reconnect (h);
761 return; 719 return;
762 } 720 }
763 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, 721 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, qm->quota);
764 qm->quota);
765} 722}
766 723
767 724
@@ -774,50 +731,44 @@ static void
774reconnect (void *cls) 731reconnect (void *cls)
775{ 732{
776 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 733 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
777 struct GNUNET_MQ_MessageHandler handlers[] = { 734 struct GNUNET_MQ_MessageHandler handlers[] =
778 GNUNET_MQ_hd_var_size (hello, 735 {GNUNET_MQ_hd_var_size (hello,
779 GNUNET_MESSAGE_TYPE_HELLO, 736 GNUNET_MESSAGE_TYPE_HELLO,
780 struct GNUNET_MessageHeader, 737 struct GNUNET_MessageHeader,
781 h), 738 h),
782 GNUNET_MQ_hd_fixed_size (connect, 739 GNUNET_MQ_hd_fixed_size (connect,
783 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, 740 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
784 struct ConnectInfoMessage, 741 struct ConnectInfoMessage,
785 h), 742 h),
786 GNUNET_MQ_hd_fixed_size (disconnect, 743 GNUNET_MQ_hd_fixed_size (disconnect,
787 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, 744 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
788 struct DisconnectInfoMessage, 745 struct DisconnectInfoMessage,
789 h), 746 h),
790 GNUNET_MQ_hd_fixed_size (send_ok, 747 GNUNET_MQ_hd_fixed_size (send_ok,
791 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, 748 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
792 struct SendOkMessage, 749 struct SendOkMessage,
793 h), 750 h),
794 GNUNET_MQ_hd_var_size (recv, 751 GNUNET_MQ_hd_var_size (recv,
795 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, 752 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
796 struct InboundMessage, 753 struct InboundMessage,
797 h), 754 h),
798 GNUNET_MQ_hd_fixed_size (set_quota, 755 GNUNET_MQ_hd_fixed_size (set_quota,
799 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, 756 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
800 struct QuotaSetMessage, 757 struct QuotaSetMessage,
801 h), 758 h),
802 GNUNET_MQ_handler_end () 759 GNUNET_MQ_handler_end ()};
803 };
804 struct GNUNET_MQ_Envelope *env; 760 struct GNUNET_MQ_Envelope *env;
805 struct StartMessage *s; 761 struct StartMessage *s;
806 uint32_t options; 762 uint32_t options;
807 763
808 h->reconnect_task = NULL; 764 h->reconnect_task = NULL;
809 LOG (GNUNET_ERROR_TYPE_DEBUG, 765 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
810 "Connecting to transport service.\n");
811 GNUNET_assert (NULL == h->mq); 766 GNUNET_assert (NULL == h->mq);
812 h->mq = GNUNET_CLIENT_connect (h->cfg, 767 h->mq =
813 "transport", 768 GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h);
814 handlers,
815 &mq_error_handler,
816 h);
817 if (NULL == h->mq) 769 if (NULL == h->mq)
818 return; 770 return;
819 env = GNUNET_MQ_msg (s, 771 env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
820 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
821 options = 0; 772 options = 0;
822 if (h->check_self) 773 if (h->check_self)
823 options |= 1; 774 options |= 1;
@@ -825,8 +776,7 @@ reconnect (void *cls)
825 options |= 2; 776 options |= 2;
826 s->options = htonl (options); 777 s->options = htonl (options);
827 s->self = h->self; 778 s->self = h->self;
828 GNUNET_MQ_send (h->mq, 779 GNUNET_MQ_send (h->mq, env);
829 env);
830} 780}
831 781
832 782
@@ -841,9 +791,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
841{ 791{
842 GNUNET_assert (NULL == h->reconnect_task); 792 GNUNET_assert (NULL == h->reconnect_task);
843 /* Forget about all neighbours that we used to be connected to */ 793 /* Forget about all neighbours that we used to be connected to */
844 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, 794 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h);
845 &neighbour_delete,
846 h);
847 if (NULL != h->mq) 795 if (NULL != h->mq)
848 { 796 {
849 GNUNET_MQ_destroy (h->mq); 797 GNUNET_MQ_destroy (h->mq);
@@ -851,12 +799,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
851 } 799 }
852 LOG (GNUNET_ERROR_TYPE_DEBUG, 800 LOG (GNUNET_ERROR_TYPE_DEBUG,
853 "Scheduling task to reconnect to transport service in %s.\n", 801 "Scheduling task to reconnect to transport service in %s.\n",
854 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, 802 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
855 GNUNET_YES));
856 h->reconnect_task = 803 h->reconnect_task =
857 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, 804 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
858 &reconnect,
859 h);
860 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); 805 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
861} 806}
862 807
@@ -874,8 +819,7 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
874{ 819{
875 struct Neighbour *n; 820 struct Neighbour *n;
876 821
877 n = neighbour_find (handle, 822 n = neighbour_find (handle, peer);
878 peer);
879 if (NULL == n) 823 if (NULL == n)
880 return NULL; 824 return NULL;
881 return n->mq; 825 return n->mq;
@@ -898,12 +842,12 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
898 */ 842 */
899struct GNUNET_TRANSPORT_CoreHandle * 843struct GNUNET_TRANSPORT_CoreHandle *
900GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, 844GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
901 const struct GNUNET_PeerIdentity *self, 845 const struct GNUNET_PeerIdentity *self,
902 const struct GNUNET_MQ_MessageHandler *handlers, 846 const struct GNUNET_MQ_MessageHandler *handlers,
903 void *cls, 847 void *cls,
904 GNUNET_TRANSPORT_NotifyConnect nc, 848 GNUNET_TRANSPORT_NotifyConnect nc,
905 GNUNET_TRANSPORT_NotifyDisconnect nd, 849 GNUNET_TRANSPORT_NotifyDisconnect nd,
906 GNUNET_TRANSPORT_NotifyExcessBandwidth neb) 850 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
907{ 851{
908 struct GNUNET_TRANSPORT_CoreHandle *h; 852 struct GNUNET_TRANSPORT_CoreHandle *h;
909 unsigned int i; 853 unsigned int i;
@@ -922,15 +866,14 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
922 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 866 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
923 if (NULL != handlers) 867 if (NULL != handlers)
924 { 868 {
925 for (i=0;NULL != handlers[i].cb; i++) ; 869 for (i = 0; NULL != handlers[i].cb; i++)
926 h->handlers = GNUNET_new_array (i + 1, 870 ;
927 struct GNUNET_MQ_MessageHandler); 871 h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler);
928 GNUNET_memcpy (h->handlers, 872 GNUNET_memcpy (h->handlers,
929 handlers, 873 handlers,
930 i * sizeof (struct GNUNET_MQ_MessageHandler)); 874 i * sizeof (struct GNUNET_MQ_MessageHandler));
931 } 875 }
932 LOG (GNUNET_ERROR_TYPE_DEBUG, 876 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
933 "Connecting to transport service\n");
934 reconnect (h); 877 reconnect (h);
935 if (NULL == h->mq) 878 if (NULL == h->mq)
936 { 879 {
@@ -939,8 +882,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
939 return NULL; 882 return NULL;
940 } 883 }
941 h->neighbours = 884 h->neighbours =
942 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, 885 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
943 GNUNET_YES);
944 return h; 886 return h;
945} 887}
946 888
@@ -948,13 +890,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
948/** 890/**
949 * Disconnect from the transport service. 891 * Disconnect from the transport service.
950 * 892 *
951 * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() 893 * @param handle handle to the service as returned from
894 * #GNUNET_TRANSPORT_core_connect()
952 */ 895 */
953void 896void
954GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) 897GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
955{ 898{
956 LOG (GNUNET_ERROR_TYPE_DEBUG, 899 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
957 "Transport disconnect called!\n");
958 /* this disconnects all neighbours... */ 900 /* this disconnects all neighbours... */
959 if (NULL == handle->reconnect_task) 901 if (NULL == handle->reconnect_task)
960 disconnect_and_schedule_reconnect (handle); 902 disconnect_and_schedule_reconnect (handle);