aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-08 16:34:31 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-08 16:34:31 +0000
commit6e3599bab213760c66f13f6103ebf650bbe5b7e9 (patch)
treef56a0bbe3ce64c818c87bae6171ba800ab05b701 /src/transport
parent2c0a286c8c29e135c68556658b6ac6cef48a874a (diff)
downloadgnunet-6e3599bab213760c66f13f6103ebf650bbe5b7e9.tar.gz
gnunet-6e3599bab213760c66f13f6103ebf650bbe5b7e9.zip
migrate transport_core API to MQ
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/Makefile.am6
-rw-r--r--src/transport/transport-testing.c6
-rw-r--r--src/transport/transport_api.c1245
-rw-r--r--src/transport/transport_api_get_hello.c199
-rw-r--r--src/transport/transport_api_offer_hello.c98
5 files changed, 592 insertions, 962 deletions
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index 3a1170c10..48793bd87 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -163,10 +163,12 @@ libgnunettransporttesting_la_LDFLAGS = \
163 163
164libgnunettransport_la_SOURCES = \ 164libgnunettransport_la_SOURCES = \
165 transport_api.c transport.h \ 165 transport_api.c transport.h \
166 transport_api_blacklist.c \
167 transport_api_address_to_string.c \ 166 transport_api_address_to_string.c \
167 transport_api_blacklist.c \
168 transport_api_get_hello.c \
168 transport_api_monitor_peers.c \ 169 transport_api_monitor_peers.c \
169 transport_api_monitor_plugins.c 170 transport_api_monitor_plugins.c \
171 transport_api_offer_hello.c
170 172
171libgnunettransport_la_LIBADD = \ 173libgnunettransport_la_LIBADD = \
172 $(top_builddir)/src/hello/libgnunethello.la \ 174 $(top_builddir)/src/hello/libgnunethello.la \
diff --git a/src/transport/transport-testing.c b/src/transport/transport-testing.c
index 4a514ea72..4a3bf3c3e 100644
--- a/src/transport/transport-testing.c
+++ b/src/transport/transport-testing.c
@@ -246,7 +246,7 @@ offer_hello (void *cls)
246 if (NULL != cc->oh) 246 if (NULL != cc->oh)
247 GNUNET_TRANSPORT_offer_hello_cancel (cc->oh); 247 GNUNET_TRANSPORT_offer_hello_cancel (cc->oh);
248 cc->oh = 248 cc->oh =
249 GNUNET_TRANSPORT_offer_hello (cc->p1->th, 249 GNUNET_TRANSPORT_offer_hello (cc->p1->cfg,
250 (const struct GNUNET_MessageHeader *) cc->p2->hello, 250 (const struct GNUNET_MessageHeader *) cc->p2->hello,
251 &hello_offered, 251 &hello_offered,
252 cc); 252 cc);
@@ -380,7 +380,7 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_handle *tth
380 GNUNET_TRANSPORT_TESTING_stop_peer (tth, p); 380 GNUNET_TRANSPORT_TESTING_stop_peer (tth, p);
381 return NULL; 381 return NULL;
382 } 382 }
383 p->ghh = GNUNET_TRANSPORT_get_hello (p->th, 383 p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg,
384 &get_hello, 384 &get_hello,
385 p); 385 p);
386 GNUNET_assert (p->ghh != NULL); 386 GNUNET_assert (p->ghh != NULL);
@@ -465,7 +465,7 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct PeerContext *p,
465 &notify_disconnect); 465 &notify_disconnect);
466 GNUNET_assert (NULL != p->th); 466 GNUNET_assert (NULL != p->th);
467 p->ats = GNUNET_ATS_connectivity_init (p->cfg); 467 p->ats = GNUNET_ATS_connectivity_init (p->cfg);
468 p->ghh = GNUNET_TRANSPORT_get_hello (p->th, 468 p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg,
469 &get_hello, 469 &get_hello,
470 p); 470 p);
471 GNUNET_assert (NULL != p->ghh); 471 GNUNET_assert (NULL != p->ghh);
diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c
index 59f249686..e7db5493e 100644
--- a/src/transport/transport_api.c
+++ b/src/transport/transport_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2009-2013 GNUnet e.V. 3 Copyright (C) 2009-2013, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -163,86 +163,6 @@ struct Neighbour
163}; 163};
164 164
165 165
166/**
167 * Linked list of functions to call whenever our HELLO is updated.
168 */
169struct GNUNET_TRANSPORT_GetHelloHandle
170{
171
172 /**
173 * This is a doubly linked list.
174 */
175 struct GNUNET_TRANSPORT_GetHelloHandle *next;
176
177 /**
178 * This is a doubly linked list.
179 */
180 struct GNUNET_TRANSPORT_GetHelloHandle *prev;
181
182 /**
183 * Transport handle.
184 */
185 struct GNUNET_TRANSPORT_Handle *handle;
186
187 /**
188 * Callback to call once we got our HELLO.
189 */
190 GNUNET_TRANSPORT_HelloUpdateCallback rec;
191
192 /**
193 * Task for calling the HelloUpdateCallback when we already have a HELLO
194 */
195 struct GNUNET_SCHEDULER_Task *notify_task;
196
197 /**
198 * Closure for @e rec.
199 */
200 void *rec_cls;
201
202};
203
204
205/**
206 * Entry in linked list for all offer-HELLO requests.
207 */
208struct GNUNET_TRANSPORT_OfferHelloHandle
209{
210 /**
211 * For the DLL.
212 */
213 struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
214
215 /**
216 * For the DLL.
217 */
218 struct GNUNET_TRANSPORT_OfferHelloHandle *next;
219
220 /**
221 * Transport service handle we use for transmission.
222 */
223 struct GNUNET_TRANSPORT_Handle *th;
224
225 /**
226 * Transmission handle for this request.
227 */
228 struct GNUNET_TRANSPORT_TransmitHandle *tth;
229
230 /**
231 * Function to call once we are done.
232 */
233 GNUNET_SCHEDULER_TaskCallback cont;
234
235 /**
236 * Closure for @e cont
237 */
238 void *cls;
239
240 /**
241 * The HELLO message to be transmitted.
242 */
243 struct GNUNET_MessageHeader *msg;
244};
245
246 166
247/** 167/**
248 * Handle for the transport service (includes all of the 168 * Handle for the transport service (includes all of the
@@ -277,16 +197,6 @@ struct GNUNET_TRANSPORT_Handle
277 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb; 197 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
278 198
279 /** 199 /**
280 * Head of DLL of control messages.
281 */
282 struct GNUNET_TRANSPORT_TransmitHandle *control_head;
283
284 /**
285 * Tail of DLL of control messages.
286 */
287 struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
288
289 /**
290 * The current HELLO message for this peer. Updated 200 * The current HELLO message for this peer. Updated
291 * whenever transports change their addresses. 201 * whenever transports change their addresses.
292 */ 202 */
@@ -295,32 +205,7 @@ struct GNUNET_TRANSPORT_Handle
295 /** 205 /**
296 * My client connection to the transport service. 206 * My client connection to the transport service.
297 */ 207 */
298 struct GNUNET_CLIENT_Connection *client; 208 struct GNUNET_MQ_Handle *mq;
299
300 /**
301 * Handle to our registration with the client for notification.
302 */
303 struct GNUNET_CLIENT_TransmitHandle *cth;
304
305 /**
306 * Linked list of pending requests for our HELLO.
307 */
308 struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;
309
310 /**
311 * Linked list of pending requests for our HELLO.
312 */
313 struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;
314
315 /**
316 * Linked list of pending offer HELLO requests head
317 */
318 struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;
319
320 /**
321 * Linked list of pending offer HELLO requests tail
322 */
323 struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;
324 209
325 /** 210 /**
326 * My configuration. 211 * My configuration.
@@ -458,7 +343,8 @@ outbound_bw_tracker_update (void *cls)
458 GNUNET_STRINGS_relative_time_to_string (delay, 343 GNUNET_STRINGS_relative_time_to_string (delay,
459 GNUNET_NO)); 344 GNUNET_NO));
460 GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap, 345 GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
461 n->hn, delay.rel_value_us); 346 n->hn,
347 delay.rel_value_us);
462 schedule_transmission (n->h); 348 schedule_transmission (n->h);
463} 349}
464 350
@@ -558,268 +444,296 @@ neighbour_delete (void *cls,
558 444
559 445
560/** 446/**
561 * Function we use for handling incoming messages. 447 * Generic error handler, called with the appropriate
448 * error code and the same closure specified at the creation of
449 * the message queue.
450 * Not every message queue implementation supports an error handler.
451 *
452 * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *`
453 * @param error error code
454 */
455static void
456mq_error_handler (void *cls,
457 enum GNUNET_MQ_Error error)
458{
459 struct GNUNET_TRANSPORT_Handle *h = cls;
460
461 LOG (GNUNET_ERROR_TYPE_DEBUG,
462 "Error receiving from transport service, disconnecting temporarily.\n");
463 h->reconnecting = GNUNET_YES;
464 disconnect_and_schedule_reconnect (h);
465}
466
467
468/**
469 * Function we use for checking incoming HELLO messages.
562 * 470 *
563 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` 471 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
564 * @param msg message received, NULL on timeout or fatal error 472 * @param msg message received
473 * @return #GNUNET_OK if message is well-formed
474 */
475static int
476check_hello (void *cls,
477 const struct GNUNET_MessageHeader *msg)
478{
479 struct GNUNET_PeerIdentity me;
480
481 if (GNUNET_OK !=
482 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
483 &me))
484 {
485 GNUNET_break (0);
486 return GNUNET_SYSERR;
487 }
488 LOG (GNUNET_ERROR_TYPE_DEBUG,
489 "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
490 (unsigned int) ntohs (msg->size),
491 GNUNET_i2s (&me));
492 return GNUNET_OK;
493}
494
495
496/**
497 * Function we use for handling incoming HELLO messages.
498 *
499 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
500 * @param msg message received
565 */ 501 */
566static void 502static void
567demultiplexer (void *cls, 503handle_hello (void *cls,
568 const struct GNUNET_MessageHeader *msg) 504 const struct GNUNET_MessageHeader *msg)
505{
506 struct GNUNET_TRANSPORT_Handle *h = cls;
507
508 GNUNET_free_non_null (h->my_hello);
509 h->my_hello = GNUNET_copy_message (msg);
510}
511
512
513/**
514 * Function we use for handling incoming connect messages.
515 *
516 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
517 * @param cim message received
518 */
519static void
520handle_connect (void *cls,
521 const struct ConnectInfoMessage *cim)
522{
523 struct GNUNET_TRANSPORT_Handle *h = cls;
524 struct Neighbour *n;
525
526 LOG (GNUNET_ERROR_TYPE_DEBUG,
527 "Receiving CONNECT message for `%s'.\n",
528 GNUNET_i2s (&cim->id));
529 n = neighbour_find (h, &cim->id);
530 if (NULL != n)
531 {
532 GNUNET_break (0);
533 h->reconnecting = GNUNET_YES;
534 disconnect_and_schedule_reconnect (h);
535 return;
536 }
537 n = neighbour_add (h,
538 &cim->id);
539 LOG (GNUNET_ERROR_TYPE_DEBUG,
540 "Receiving CONNECT message for `%s' with quota %u\n",
541 GNUNET_i2s (&cim->id),
542 ntohl (cim->quota_out.value__));
543 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
544 cim->quota_out);
545 if (NULL != h->nc_cb)
546 h->nc_cb (h->cls,
547 &n->id);
548}
549
550
551/**
552 * Function we use for handling incoming disconnect messages.
553 *
554 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
555 * @param dim message received
556 */
557static void
558handle_disconnect (void *cls,
559 const struct DisconnectInfoMessage *dim)
560{
561 struct GNUNET_TRANSPORT_Handle *h = cls;
562 struct Neighbour *n;
563
564 GNUNET_break (ntohl (dim->reserved) == 0);
565 LOG (GNUNET_ERROR_TYPE_DEBUG,
566 "Receiving DISCONNECT message for `%s'.\n",
567 GNUNET_i2s (&dim->peer));
568 n = neighbour_find (h, &dim->peer);
569 if (NULL == n)
570 {
571 GNUNET_break (0);
572 h->reconnecting = GNUNET_YES;
573 disconnect_and_schedule_reconnect (h);
574 return;
575 }
576 neighbour_delete (h,
577 &dim->peer,
578 n);
579}
580
581
582/**
583 * Function we use for handling incoming send-ok messages.
584 *
585 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
586 * @param okm message received
587 */
588static void
589handle_send_ok (void *cls,
590 const struct SendOkMessage *okm)
569{ 591{
570 struct GNUNET_TRANSPORT_Handle *h = cls; 592 struct GNUNET_TRANSPORT_Handle *h = cls;
571 const struct DisconnectInfoMessage *dim;
572 const struct ConnectInfoMessage *cim;
573 const struct InboundMessage *im;
574 const struct GNUNET_MessageHeader *imm;
575 const struct SendOkMessage *okm;
576 const struct QuotaSetMessage *qm;
577 struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
578 struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
579 struct Neighbour *n; 593 struct Neighbour *n;
580 struct GNUNET_PeerIdentity me;
581 uint16_t size;
582 uint32_t bytes_msg; 594 uint32_t bytes_msg;
583 uint32_t bytes_physical; 595 uint32_t bytes_physical;
584 596
585 GNUNET_assert (NULL != h->client); 597 bytes_msg = ntohl (okm->bytes_msg);
586 if (GNUNET_YES == h->reconnecting) 598 bytes_physical = ntohl (okm->bytes_physical);
599 LOG (GNUNET_ERROR_TYPE_DEBUG,
600 "Receiving SEND_OK message, transmission to %s %s.\n",
601 GNUNET_i2s (&okm->peer),
602 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
603
604 n = neighbour_find (h,
605 &okm->peer);
606 if (NULL == n)
587 { 607 {
608 /* We should never get a 'SEND_OK' for a peer that we are not
609 connected to */
610 GNUNET_break (0);
611 h->reconnecting = GNUNET_YES;
612 disconnect_and_schedule_reconnect (h);
588 return; 613 return;
589 } 614 }
590 if (NULL == msg) 615 if (bytes_physical > bytes_msg)
591 { 616 {
592 LOG (GNUNET_ERROR_TYPE_DEBUG, 617 LOG (GNUNET_ERROR_TYPE_DEBUG,
593 "Error receiving from transport service, disconnecting temporarily.\n"); 618 "Overhead for %u byte message was %u\n",
619 bytes_msg,
620 bytes_physical - bytes_msg);
621 n->traffic_overhead += bytes_physical - bytes_msg;
622 }
623 GNUNET_break (GNUNET_NO == n->is_ready);
624 n->is_ready = GNUNET_YES;
625 if (NULL != n->unready_warn_task)
626 {
627 GNUNET_SCHEDULER_cancel (n->unready_warn_task);
628 n->unready_warn_task = NULL;
629 }
630 if ((NULL != n->th) && (NULL == n->hn))
631 {
632 GNUNET_assert (NULL != n->th->timeout_task);
633 GNUNET_SCHEDULER_cancel (n->th->timeout_task);
634 n->th->timeout_task = NULL;
635 /* we've been waiting for this (congestion, not quota,
636 * caused delayed transmission) */
637 n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
638 n,
639 0);
640 }
641 schedule_transmission (h);
642}
643
644
645/**
646 * Function we use for checking incoming "inbound" messages.
647 *
648 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
649 * @param im message received
650 */
651static int
652check_recv (void *cls,
653 const struct InboundMessage *im)
654{
655 const struct GNUNET_MessageHeader *imm;
656 uint16_t size;
657
658 size = ntohs (im->header.size);
659 if (size <
660 sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
661 {
662 GNUNET_break (0);
663 return GNUNET_SYSERR;
664 }
665 imm = (const struct GNUNET_MessageHeader *) &im[1];
666 if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
667 {
668 GNUNET_break (0);
669 return GNUNET_SYSERR;
670 }
671 return GNUNET_OK;
672}
673
674
675/**
676 * Function we use for handling incoming messages.
677 *
678 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
679 * @param im message received
680 */
681static void
682handle_recv (void *cls,
683 const struct InboundMessage *im)
684{
685 struct GNUNET_TRANSPORT_Handle *h = cls;
686 const struct GNUNET_MessageHeader *imm
687 = (const struct GNUNET_MessageHeader *) &im[1];
688 struct Neighbour *n;
689
690 LOG (GNUNET_ERROR_TYPE_DEBUG,
691 "Received message of type %u with %u bytes from `%s'.\n",
692 (unsigned int) ntohs (imm->type),
693 (unsigned int) ntohs (imm->size),
694 GNUNET_i2s (&im->peer));
695 n = neighbour_find (h, &im->peer);
696 if (NULL == n)
697 {
698 GNUNET_break (0);
594 h->reconnecting = GNUNET_YES; 699 h->reconnecting = GNUNET_YES;
595 disconnect_and_schedule_reconnect (h); 700 disconnect_and_schedule_reconnect (h);
596 return; 701 return;
597 } 702 }
598 GNUNET_CLIENT_receive (h->client, 703 if (NULL != h->rec)
599 &demultiplexer, 704 h->rec (h->cls,
600 h, 705 &im->peer,
601 GNUNET_TIME_UNIT_FOREVER_REL); 706 imm);
602 size = ntohs (msg->size); 707}
603 switch (ntohs (msg->type))
604 {
605 case GNUNET_MESSAGE_TYPE_HELLO:
606 if (GNUNET_OK !=
607 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
608 &me))
609 {
610 GNUNET_break (0);
611 break;
612 }
613 LOG (GNUNET_ERROR_TYPE_DEBUG,
614 "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
615 (unsigned int) size,
616 GNUNET_i2s (&me));
617 GNUNET_free_non_null (h->my_hello);
618 h->my_hello = NULL;
619 if (size < sizeof (struct GNUNET_MessageHeader))
620 {
621 GNUNET_break (0);
622 break;
623 }
624 h->my_hello = GNUNET_copy_message (msg);
625 hwl = h->hwl_head;
626 while (NULL != hwl)
627 {
628 next_hwl = hwl->next;
629 hwl->rec (hwl->rec_cls,
630 h->my_hello);
631 hwl = next_hwl;
632 }
633 break;
634 case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
635 if (size < sizeof (struct ConnectInfoMessage))
636 {
637 GNUNET_break (0);
638 h->reconnecting = GNUNET_YES;
639 disconnect_and_schedule_reconnect (h);
640 break;
641 }
642 cim = (const struct ConnectInfoMessage *) msg;
643 if (size !=
644 sizeof (struct ConnectInfoMessage))
645 {
646 GNUNET_break (0);
647 h->reconnecting = GNUNET_YES;
648 disconnect_and_schedule_reconnect (h);
649 break;
650 }
651 LOG (GNUNET_ERROR_TYPE_DEBUG,
652 "Receiving CONNECT message for `%s'.\n",
653 GNUNET_i2s (&cim->id));
654 n = neighbour_find (h, &cim->id);
655 if (NULL != n)
656 {
657 GNUNET_break (0);
658 h->reconnecting = GNUNET_YES;
659 disconnect_and_schedule_reconnect (h);
660 break;
661 }
662 n = neighbour_add (h,
663 &cim->id);
664 LOG (GNUNET_ERROR_TYPE_DEBUG,
665 "Receiving CONNECT message for `%s' with quota %u\n",
666 GNUNET_i2s (&cim->id),
667 ntohl (cim->quota_out.value__));
668 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
669 cim->quota_out);
670 if (NULL != h->nc_cb)
671 h->nc_cb (h->cls,
672 &n->id);
673 break;
674 case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
675 if (size != sizeof (struct DisconnectInfoMessage))
676 {
677 GNUNET_break (0);
678 h->reconnecting = GNUNET_YES;
679 disconnect_and_schedule_reconnect (h);
680 break;
681 }
682 dim = (const struct DisconnectInfoMessage *) msg;
683 GNUNET_break (ntohl (dim->reserved) == 0);
684 LOG (GNUNET_ERROR_TYPE_DEBUG,
685 "Receiving DISCONNECT message for `%s'.\n",
686 GNUNET_i2s (&dim->peer));
687 n = neighbour_find (h, &dim->peer);
688 if (NULL == n)
689 {
690 GNUNET_break (0);
691 h->reconnecting = GNUNET_YES;
692 disconnect_and_schedule_reconnect (h);
693 break;
694 }
695 neighbour_delete (h,
696 &dim->peer,
697 n);
698 break;
699 case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
700 if (size != sizeof (struct SendOkMessage))
701 {
702 GNUNET_break (0);
703 h->reconnecting = GNUNET_YES;
704 disconnect_and_schedule_reconnect (h);
705 break;
706 }
707 okm = (const struct SendOkMessage *) msg;
708 bytes_msg = ntohl (okm->bytes_msg);
709 bytes_physical = ntohl (okm->bytes_physical);
710 LOG (GNUNET_ERROR_TYPE_DEBUG,
711 "Receiving SEND_OK message, transmission to %s %s.\n",
712 GNUNET_i2s (&okm->peer),
713 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
714 708
715 n = neighbour_find (h, 709
716 &okm->peer); 710/**
717 if (NULL == n) 711 * Function we use for handling incoming set quota messages.
718 { 712 *
719 /* We should never get a 'SEND_OK' for a peer that we are not 713 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
720 connected to */ 714 * @param msg message received
721 GNUNET_break (0); 715 */
722 h->reconnecting = GNUNET_YES; 716static void
723 disconnect_and_schedule_reconnect (h); 717handle_set_quota (void *cls,
724 break; 718 const struct QuotaSetMessage *qm)
725 } 719{
726 if (bytes_physical > bytes_msg) 720 struct GNUNET_TRANSPORT_Handle *h = cls;
727 { 721 struct Neighbour *n;
728 LOG (GNUNET_ERROR_TYPE_DEBUG, 722
729 "Overhead for %u byte message was %u\n", 723 n = neighbour_find (h, &qm->peer);
730 bytes_msg, 724 if (NULL == n)
731 bytes_physical - bytes_msg); 725 {
732 n->traffic_overhead += bytes_physical - bytes_msg;
733 }
734 GNUNET_break (GNUNET_NO == n->is_ready);
735 n->is_ready = GNUNET_YES;
736 if (NULL != n->unready_warn_task)
737 {
738 GNUNET_SCHEDULER_cancel (n->unready_warn_task);
739 n->unready_warn_task = NULL;
740 }
741 if ((NULL != n->th) && (NULL == n->hn))
742 {
743 GNUNET_assert (NULL != n->th->timeout_task);
744 GNUNET_SCHEDULER_cancel (n->th->timeout_task);
745 n->th->timeout_task = NULL;
746 /* we've been waiting for this (congestion, not quota,
747 * caused delayed transmission) */
748 n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
749 n,
750 0);
751 }
752 schedule_transmission (h);
753 break;
754 case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
755 if (size <
756 sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
757 {
758 GNUNET_break (0);
759 h->reconnecting = GNUNET_YES;
760 disconnect_and_schedule_reconnect (h);
761 break;
762 }
763 im = (const struct InboundMessage *) msg;
764 imm = (const struct GNUNET_MessageHeader *) &im[1];
765 if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
766 {
767 GNUNET_break (0);
768 h->reconnecting = GNUNET_YES;
769 disconnect_and_schedule_reconnect (h);
770 break;
771 }
772 LOG (GNUNET_ERROR_TYPE_DEBUG,
773 "Received message of type %u with %u bytes from `%s'.\n",
774 (unsigned int) ntohs (imm->type),
775 (unsigned int) ntohs (imm->size),
776 GNUNET_i2s (&im->peer));
777 n = neighbour_find (h, &im->peer);
778 if (NULL == n)
779 {
780 GNUNET_break (0);
781 h->reconnecting = GNUNET_YES;
782 disconnect_and_schedule_reconnect (h);
783 break;
784 }
785 if (NULL != h->rec)
786 h->rec (h->cls,
787 &im->peer,
788 imm);
789 break;
790 case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
791 if (size != sizeof (struct QuotaSetMessage))
792 {
793 GNUNET_break (0);
794 h->reconnecting = GNUNET_YES;
795 disconnect_and_schedule_reconnect (h);
796 break;
797 }
798 qm = (const struct QuotaSetMessage *) msg;
799 n = neighbour_find (h, &qm->peer);
800 if (NULL == n)
801 {
802 GNUNET_break (0);
803 h->reconnecting = GNUNET_YES;
804 disconnect_and_schedule_reconnect (h);
805 break;
806 }
807 LOG (GNUNET_ERROR_TYPE_DEBUG,
808 "Receiving SET_QUOTA message for `%s' with quota %u\n",
809 GNUNET_i2s (&qm->peer),
810 ntohl (qm->quota.value__));
811 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
812 qm->quota);
813 break;
814 default:
815 LOG (GNUNET_ERROR_TYPE_ERROR,
816 _("Received unexpected message of type %u in %s:%u\n"),
817 ntohs (msg->type),
818 __FILE__,
819 __LINE__);
820 GNUNET_break (0); 726 GNUNET_break (0);
821 break; 727 h->reconnecting = GNUNET_YES;
728 disconnect_and_schedule_reconnect (h);
729 return;
822 } 730 }
731 LOG (GNUNET_ERROR_TYPE_DEBUG,
732 "Receiving SET_QUOTA message for `%s' with quota %u\n",
733 GNUNET_i2s (&qm->peer),
734 ntohl (qm->quota.value__));
735 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
736 qm->quota);
823} 737}
824 738
825 739
@@ -854,104 +768,53 @@ timeout_request_due_to_congestion (void *cls)
854 768
855 769
856/** 770/**
857 * Transmit message(s) to service. 771 * Transmit ready message(s) to service.
858 * 772 *
859 * @param cls handle to transport 773 * @param h handle to transport
860 * @param size number of bytes available in @a buf
861 * @param buf where to copy the message
862 * @return number of bytes copied to @a buf
863 */ 774 */
864static size_t 775static void
865transport_notify_ready (void *cls, 776transmit_ready (struct GNUNET_TRANSPORT_Handle *h)
866 size_t size,
867 void *buf)
868{ 777{
869 struct GNUNET_TRANSPORT_Handle *h = cls;
870 struct GNUNET_TRANSPORT_TransmitHandle *th; 778 struct GNUNET_TRANSPORT_TransmitHandle *th;
871 struct GNUNET_TIME_Relative delay; 779 struct GNUNET_TIME_Relative delay;
872 struct Neighbour *n; 780 struct Neighbour *n;
873 char *cbuf; 781 struct OutboundMessage *obm;
874 struct OutboundMessage obm; 782 struct GNUNET_MQ_Envelope *env;
875 size_t ret;
876 size_t nret;
877 size_t mret; 783 size_t mret;
878 784
879 GNUNET_assert (NULL != h->client); 785 GNUNET_assert (NULL != h->mq);
880 h->cth = NULL; 786 while (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
881 if (NULL == buf)
882 {
883 /* transmission failed */
884 disconnect_and_schedule_reconnect (h);
885 return 0;
886 }
887
888 cbuf = buf;
889 ret = 0;
890 /* first send control messages */
891 while ( (NULL != (th = h->control_head)) &&
892 (th->notify_size <= size) )
893 {
894 GNUNET_CONTAINER_DLL_remove (h->control_head,
895 h->control_tail,
896 th);
897 nret = th->notify (th->notify_cls,
898 size,
899 &cbuf[ret]);
900 delay = GNUNET_TIME_absolute_get_duration (th->request_start);
901 if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
902 LOG (GNUNET_ERROR_TYPE_WARNING,
903 "Added %u bytes of control message at %u after %s delay\n",
904 nret,
905 ret,
906 GNUNET_STRINGS_relative_time_to_string (delay,
907 GNUNET_YES));
908 else
909 LOG (GNUNET_ERROR_TYPE_DEBUG,
910 "Added %u bytes of control message at %u after %s delay\n",
911 nret,
912 ret,
913 GNUNET_STRINGS_relative_time_to_string (delay,
914 GNUNET_YES));
915 GNUNET_free (th);
916 ret += nret;
917 size -= nret;
918 }
919
920 /* then, if possible and no control messages pending, send data messages */
921 while ( (NULL == h->control_head) &&
922 (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
923 { 787 {
788 th = n->th;
924 if (GNUNET_YES != n->is_ready) 789 if (GNUNET_YES != n->is_ready)
925 { 790 {
926 /* peer not ready, wait for notification! */ 791 /* peer not ready, wait for notification! */
927 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap)); 792 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
928 n->hn = NULL; 793 n->hn = NULL;
929 GNUNET_assert (NULL == n->th->timeout_task); 794 GNUNET_assert (NULL == n->th->timeout_task);
930 n->th->timeout_task 795 th->timeout_task
931 = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining 796 = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
932 (n->th->timeout), 797 (th->timeout),
933 &timeout_request_due_to_congestion, 798 &timeout_request_due_to_congestion,
934 n->th); 799 th);
935 continue; 800 continue;
936 } 801 }
937 th = n->th; 802 if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
938 if (th->notify_size + sizeof (struct OutboundMessage) > size) 803 th->notify_size).rel_value_us > 0)
939 break; /* does not fit */
940 if (GNUNET_BANDWIDTH_tracker_get_delay
941 (&n->out_tracker,
942 th->notify_size).rel_value_us > 0)
943 break; /* too early */ 804 break; /* too early */
944 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap)); 805 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
945 n->hn = NULL; 806 n->hn = NULL;
946 n->th = NULL; 807 n->th = NULL;
947 GNUNET_assert (size >= sizeof (struct OutboundMessage)); 808 env = GNUNET_MQ_msg_extra (obm,
809 th->notify_size,
810 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
948 mret = th->notify (th->notify_cls, 811 mret = th->notify (th->notify_cls,
949 size - sizeof (struct OutboundMessage), 812 th->notify_size,
950 &cbuf[ret + sizeof (struct OutboundMessage)]); 813 &obm[1]);
951 GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
952 if (0 == mret) 814 if (0 == mret)
953 { 815 {
954 GNUNET_free (th); 816 GNUNET_free (th);
817 GNUNET_MQ_discard (env);
955 continue; 818 continue;
956 } 819 }
957 if (NULL != n->unready_warn_task) 820 if (NULL != n->unready_warn_task)
@@ -961,20 +824,13 @@ transport_notify_ready (void *cls,
961 n); 824 n);
962 n->last_payload = GNUNET_TIME_absolute_get (); 825 n->last_payload = GNUNET_TIME_absolute_get ();
963 n->is_ready = GNUNET_NO; 826 n->is_ready = GNUNET_NO;
964 GNUNET_assert (mret + sizeof (struct OutboundMessage) < 827 obm->reserved = htonl (0);
965 GNUNET_SERVER_MAX_MESSAGE_SIZE); 828 obm->timeout =
966 obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
967 obm.header.size = htons (mret + sizeof (struct OutboundMessage));
968 obm.reserved = htonl (0);
969 obm.timeout =
970 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining 829 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
971 (th->timeout)); 830 (th->timeout));
972 obm.peer = n->id; 831 obm->peer = n->id;
973 memcpy (&cbuf[ret], 832 GNUNET_MQ_send (h->mq,
974 &obm, 833 env);
975 sizeof (struct OutboundMessage));
976 ret += (mret + sizeof (struct OutboundMessage));
977 size -= (mret + sizeof (struct OutboundMessage));
978 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, 834 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
979 mret); 835 mret);
980 delay = GNUNET_TIME_absolute_get_duration (th->request_start); 836 delay = GNUNET_TIME_absolute_get_duration (th->request_start);
@@ -995,14 +851,9 @@ transport_notify_ready (void *cls,
995 GNUNET_YES), 851 GNUNET_YES),
996 (unsigned int) n->out_tracker.available_bytes_per_s__); 852 (unsigned int) n->out_tracker.available_bytes_per_s__);
997 GNUNET_free (th); 853 GNUNET_free (th);
998 break;
999 } 854 }
1000 /* if there are more pending messages, try to schedule those */ 855 /* if there are more pending messages, try to schedule those */
1001 schedule_transmission (h); 856 schedule_transmission (h);
1002 LOG (GNUNET_ERROR_TYPE_DEBUG,
1003 "Transmitting %u bytes to transport service\n",
1004 ret);
1005 return ret;
1006} 857}
1007 858
1008 859
@@ -1016,12 +867,11 @@ static void
1016schedule_transmission_task (void *cls) 867schedule_transmission_task (void *cls)
1017{ 868{
1018 struct GNUNET_TRANSPORT_Handle *h = cls; 869 struct GNUNET_TRANSPORT_Handle *h = cls;
1019 size_t size;
1020 struct GNUNET_TRANSPORT_TransmitHandle *th; 870 struct GNUNET_TRANSPORT_TransmitHandle *th;
1021 struct Neighbour *n; 871 struct Neighbour *n;
1022 872
1023 h->quota_task = NULL; 873 h->quota_task = NULL;
1024 GNUNET_assert (NULL != h->client); 874 GNUNET_assert (NULL != h->mq);
1025 /* destroy all requests that have timed out */ 875 /* destroy all requests that have timed out */
1026 while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) && 876 while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
1027 (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) ) 877 (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
@@ -1040,29 +890,12 @@ schedule_transmission_task (void *cls)
1040 NULL)); 890 NULL));
1041 GNUNET_free (th); 891 GNUNET_free (th);
1042 } 892 }
1043 if (NULL != h->cth) 893 n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
1044 return; 894 if (NULL == n)
1045 if (NULL != h->control_head) 895 return; /* no pending messages */
1046 {
1047 size = h->control_head->notify_size;
1048 }
1049 else
1050 {
1051 n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
1052 if (NULL == n)
1053 return; /* no pending messages */
1054 size = n->th->notify_size + sizeof (struct OutboundMessage);
1055 }
1056 LOG (GNUNET_ERROR_TYPE_DEBUG, 896 LOG (GNUNET_ERROR_TYPE_DEBUG,
1057 "Calling notify_transmit_ready\n"); 897 "Calling notify_transmit_ready\n");
1058 h->cth 898 transmit_ready (h);
1059 = GNUNET_CLIENT_notify_transmit_ready (h->client,
1060 size,
1061 GNUNET_TIME_UNIT_FOREVER_REL,
1062 GNUNET_NO,
1063 &transport_notify_ready,
1064 h);
1065 GNUNET_assert (NULL != h->cth);
1066} 899}
1067 900
1068 901
@@ -1078,15 +911,13 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
1078 struct GNUNET_TIME_Relative delay; 911 struct GNUNET_TIME_Relative delay;
1079 struct Neighbour *n; 912 struct Neighbour *n;
1080 913
1081 GNUNET_assert (NULL != h->client); 914 GNUNET_assert (NULL != h->mq);
1082 if (NULL != h->quota_task) 915 if (NULL != h->quota_task)
1083 { 916 {
1084 GNUNET_SCHEDULER_cancel (h->quota_task); 917 GNUNET_SCHEDULER_cancel (h->quota_task);
1085 h->quota_task = NULL; 918 h->quota_task = NULL;
1086 } 919 }
1087 if (NULL != h->control_head) 920 if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
1088 delay = GNUNET_TIME_UNIT_ZERO;
1089 else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
1090 { 921 {
1091 delay = 922 delay =
1092 GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 923 GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
@@ -1111,83 +942,6 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
1111 942
1112 943
1113/** 944/**
1114 * Queue control request for transmission to the transport
1115 * service.
1116 *
1117 * @param h handle to the transport service
1118 * @param size number of bytes to be transmitted
1119 * @param notify function to call to get the content
1120 * @param notify_cls closure for @a notify
1121 * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
1122 */
1123static struct GNUNET_TRANSPORT_TransmitHandle *
1124schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
1125 size_t size,
1126 GNUNET_TRANSPORT_TransmitReadyNotify notify,
1127 void *notify_cls)
1128{
1129 struct GNUNET_TRANSPORT_TransmitHandle *th;
1130
1131 LOG (GNUNET_ERROR_TYPE_DEBUG,
1132 "Control transmit of %u bytes requested\n",
1133 size);
1134 th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1135 th->notify = notify;
1136 th->notify_cls = notify_cls;
1137 th->notify_size = size;
1138 th->request_start = GNUNET_TIME_absolute_get ();
1139 GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
1140 h->control_tail,
1141 th);
1142 schedule_transmission (h);
1143 return th;
1144}
1145
1146
1147/**
1148 * Transmit START message to service.
1149 *
1150 * @param cls unused
1151 * @param size number of bytes available in @a buf
1152 * @param buf where to copy the message
1153 * @return number of bytes copied to @a buf
1154 */
1155static size_t
1156send_start (void *cls,
1157 size_t size,
1158 void *buf)
1159{
1160 struct GNUNET_TRANSPORT_Handle *h = cls;
1161 struct StartMessage s;
1162 uint32_t options;
1163
1164 if (NULL == buf)
1165 {
1166 /* Can only be shutdown, just give up */
1167 LOG (GNUNET_ERROR_TYPE_DEBUG,
1168 "Shutdown while trying to transmit START request.\n");
1169 return 0;
1170 }
1171 LOG (GNUNET_ERROR_TYPE_DEBUG,
1172 "Transmitting START request.\n");
1173 GNUNET_assert (size >= sizeof (struct StartMessage));
1174 s.header.size = htons (sizeof (struct StartMessage));
1175 s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
1176 options = 0;
1177 if (h->check_self)
1178 options |= 1;
1179 if (NULL != h->rec)
1180 options |= 2;
1181 s.options = htonl (options);
1182 s.self = h->self;
1183 memcpy (buf, &s, sizeof (struct StartMessage));
1184 GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
1185 GNUNET_TIME_UNIT_FOREVER_REL);
1186 return sizeof (struct StartMessage);
1187}
1188
1189
1190/**
1191 * Try again to connect to transport service. 945 * Try again to connect to transport service.
1192 * 946 *
1193 * @param cls the handle to the transport service 947 * @param cls the handle to the transport service
@@ -1195,20 +949,61 @@ send_start (void *cls,
1195static void 949static void
1196reconnect (void *cls) 950reconnect (void *cls)
1197{ 951{
952 GNUNET_MQ_hd_var_size (hello,
953 GNUNET_MESSAGE_TYPE_HELLO,
954 struct GNUNET_MessageHeader);
955 GNUNET_MQ_hd_fixed_size (connect,
956 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
957 struct ConnectInfoMessage);
958 GNUNET_MQ_hd_fixed_size (disconnect,
959 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
960 struct DisconnectInfoMessage);
961 GNUNET_MQ_hd_fixed_size (send_ok,
962 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
963 struct SendOkMessage);
964 GNUNET_MQ_hd_var_size (recv,
965 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
966 struct InboundMessage);
967 GNUNET_MQ_hd_fixed_size (set_quota,
968 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
969 struct QuotaSetMessage);
1198 struct GNUNET_TRANSPORT_Handle *h = cls; 970 struct GNUNET_TRANSPORT_Handle *h = cls;
971 struct GNUNET_MQ_MessageHandler handlers[] = {
972 make_hello_handler (h),
973 make_connect_handler (h),
974 make_disconnect_handler (h),
975 make_send_ok_handler (h),
976 make_recv_handler (h),
977 make_set_quota_handler (h),
978 GNUNET_MQ_handler_end ()
979 };
980 struct GNUNET_MQ_Envelope *env;
981 struct StartMessage *s;
982 uint32_t options;
1199 983
1200 h->reconnect_task = NULL; 984 h->reconnect_task = NULL;
1201 LOG (GNUNET_ERROR_TYPE_DEBUG, 985 LOG (GNUNET_ERROR_TYPE_DEBUG,
1202 "Connecting to transport service.\n"); 986 "Connecting to transport service.\n");
1203 GNUNET_assert (NULL == h->client); 987 GNUNET_assert (NULL == h->mq);
1204 GNUNET_assert (NULL == h->control_head);
1205 GNUNET_assert (NULL == h->control_tail);
1206 h->reconnecting = GNUNET_NO; 988 h->reconnecting = GNUNET_NO;
1207 h->client = GNUNET_CLIENT_connect ("transport", h->cfg); 989 h->mq = GNUNET_CLIENT_connecT (h->cfg,
1208 990 "transport",
1209 GNUNET_assert (NULL != h->client); 991 handlers,
1210 schedule_control_transmit (h, sizeof (struct StartMessage), 992 &mq_error_handler,
1211 &send_start, h); 993 h);
994 if (NULL == h->mq)
995 return;
996 env = GNUNET_MQ_msg (s,
997 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
998 options = 0;
999 if (h->check_self)
1000 options |= 1;
1001 if (NULL != h->rec)
1002 options |= 2;
1003 s->options = htonl (options);
1004 s->self = h->self;
1005 GNUNET_MQ_send (h->mq,
1006 env);
1212} 1007}
1213 1008
1214 1009
@@ -1221,20 +1016,11 @@ reconnect (void *cls)
1221static void 1016static void
1222disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) 1017disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1223{ 1018{
1224 struct GNUNET_TRANSPORT_TransmitHandle *th;
1225
1226 GNUNET_assert (NULL == h->reconnect_task); 1019 GNUNET_assert (NULL == h->reconnect_task);
1227 if (NULL != h->cth) 1020 if (NULL != h->mq)
1228 {
1229 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
1230 h->cth = NULL;
1231 }
1232 if (NULL != h->client)
1233 { 1021 {
1234 GNUNET_CLIENT_disconnect (h->client); 1022 GNUNET_MQ_destroy (h->mq);
1235 h->client = NULL; 1023 h->mq = NULL;
1236/* LOG (GNUNET_ERROR_TYPE_ERROR,
1237 "Client disconnect done \n");*/
1238 } 1024 }
1239 /* Forget about all neighbours that we used to be connected to */ 1025 /* Forget about all neighbours that we used to be connected to */
1240 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, 1026 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
@@ -1245,16 +1031,6 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1245 GNUNET_SCHEDULER_cancel (h->quota_task); 1031 GNUNET_SCHEDULER_cancel (h->quota_task);
1246 h->quota_task = NULL; 1032 h->quota_task = NULL;
1247 } 1033 }
1248 while ((NULL != (th = h->control_head)))
1249 {
1250 GNUNET_CONTAINER_DLL_remove (h->control_head,
1251 h->control_tail,
1252 th);
1253 th->notify (th->notify_cls,
1254 0,
1255 NULL);
1256 GNUNET_free (th);
1257 }
1258 LOG (GNUNET_ERROR_TYPE_DEBUG, 1034 LOG (GNUNET_ERROR_TYPE_DEBUG,
1259 "Scheduling task to reconnect to transport service in %s.\n", 1035 "Scheduling task to reconnect to transport service in %s.\n",
1260 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, 1036 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
@@ -1268,109 +1044,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1268 1044
1269 1045
1270/** 1046/**
1271 * Cancel control request for transmission to the transport service. 1047 * Set transport metrics for a peer and a direction.
1272 *
1273 * @param th handle to the transport service
1274 * @param tth transmit handle to cancel
1275 */
1276static void
1277cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
1278 struct GNUNET_TRANSPORT_TransmitHandle *tth)
1279{
1280 LOG (GNUNET_ERROR_TYPE_DEBUG,
1281 "Canceling transmit of contral transmission requested\n");
1282 GNUNET_CONTAINER_DLL_remove (th->control_head,
1283 th->control_tail,
1284 tth);
1285 GNUNET_free (tth);
1286}
1287
1288
1289/**
1290 * Send HELLO message to the service.
1291 *
1292 * @param cls the HELLO message to send
1293 * @param size number of bytes available in @a buf
1294 * @param buf where to copy the message
1295 * @return number of bytes copied to @a buf
1296 */
1297static size_t
1298send_hello (void *cls,
1299 size_t size,
1300 void *buf)
1301{
1302 struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
1303 struct GNUNET_MessageHeader *msg = ohh->msg;
1304 uint16_t ssize;
1305
1306 if (NULL == buf)
1307 {
1308 LOG (GNUNET_ERROR_TYPE_DEBUG,
1309 "Timeout while trying to transmit `%s' request.\n",
1310 "HELLO");
1311 if (NULL != ohh->cont)
1312 ohh->cont (ohh->cls);
1313 GNUNET_free (msg);
1314 GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1315 ohh->th->oh_tail,
1316 ohh);
1317 GNUNET_free (ohh);
1318 return 0;
1319 }
1320 LOG (GNUNET_ERROR_TYPE_DEBUG,
1321 "Transmitting `%s' request.\n",
1322 "HELLO");
1323 ssize = ntohs (msg->size);
1324 GNUNET_assert (size >= ssize);
1325 memcpy (buf,
1326 msg,
1327 ssize);
1328 GNUNET_free (msg);
1329 if (NULL != ohh->cont)
1330 ohh->cont (ohh->cls);
1331 GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1332 ohh->th->oh_tail,
1333 ohh);
1334 GNUNET_free (ohh);
1335 return ssize;
1336}
1337
1338
1339/**
1340 * Send traffic metric message to the service.
1341 *
1342 * @param cls the message to send
1343 * @param size number of bytes available in @a buf
1344 * @param buf where to copy the message
1345 * @return number of bytes copied to @a buf
1346 */
1347static size_t
1348send_metric (void *cls,
1349 size_t size,
1350 void *buf)
1351{
1352 struct TrafficMetricMessage *msg = cls;
1353 uint16_t ssize;
1354
1355 if (NULL == buf)
1356 {
1357 LOG (GNUNET_ERROR_TYPE_DEBUG,
1358 "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
1359 GNUNET_free (msg);
1360 return 0;
1361 }
1362 LOG (GNUNET_ERROR_TYPE_DEBUG,
1363 "Transmitting TRAFFIC_METRIC request.\n");
1364 ssize = ntohs (msg->header.size);
1365 GNUNET_assert (size >= ssize);
1366 memcpy (buf, msg, ssize);
1367 GNUNET_free (msg);
1368 return ssize;
1369}
1370
1371
1372/**
1373 * Set transport metrics for a peer and a direction
1374 * 1048 *
1375 * @param handle transport handle 1049 * @param handle transport handle
1376 * @param peer the peer to set the metric for 1050 * @param peer the peer to set the metric for
@@ -1388,101 +1062,21 @@ GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
1388 struct GNUNET_TIME_Relative delay_in, 1062 struct GNUNET_TIME_Relative delay_in,
1389 struct GNUNET_TIME_Relative delay_out) 1063 struct GNUNET_TIME_Relative delay_out)
1390{ 1064{
1065 struct GNUNET_MQ_Envelope *env;
1391 struct TrafficMetricMessage *msg; 1066 struct TrafficMetricMessage *msg;
1392 1067
1393 msg = GNUNET_new (struct TrafficMetricMessage); 1068 if (NULL == handle->mq)
1394 msg->header.size = htons (sizeof (struct TrafficMetricMessage)); 1069 return;
1395 msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC); 1070 env = GNUNET_MQ_msg (msg,
1071 GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
1396 msg->reserved = htonl (0); 1072 msg->reserved = htonl (0);
1397 msg->peer = *peer; 1073 msg->peer = *peer;
1398 GNUNET_ATS_properties_hton (&msg->properties, 1074 GNUNET_ATS_properties_hton (&msg->properties,
1399 prop); 1075 prop);
1400 msg->delay_in = GNUNET_TIME_relative_hton (delay_in); 1076 msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
1401 msg->delay_out = GNUNET_TIME_relative_hton (delay_out); 1077 msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
1402 schedule_control_transmit (handle, 1078 GNUNET_MQ_send (handle->mq,
1403 sizeof (struct TrafficMetricMessage), 1079 env);
1404 &send_metric,
1405 msg);
1406}
1407
1408
1409/**
1410 * Offer the transport service the HELLO of another peer. Note that
1411 * the transport service may just ignore this message if the HELLO is
1412 * malformed or useless due to our local configuration.
1413 *
1414 * @param handle connection to transport service
1415 * @param hello the hello message
1416 * @param cont continuation to call when HELLO has been sent,
1417 * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
1418 * tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
1419 * @param cont_cls closure for @a cont
1420 * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
1421 * in case of failure @a cont will not be called
1422 *
1423 */
1424struct GNUNET_TRANSPORT_OfferHelloHandle *
1425GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1426 const struct GNUNET_MessageHeader *hello,
1427 GNUNET_SCHEDULER_TaskCallback cont,
1428 void *cont_cls)
1429{
1430 struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
1431 struct GNUNET_MessageHeader *msg;
1432 struct GNUNET_PeerIdentity peer;
1433 uint16_t size;
1434
1435 if (NULL == handle->client)
1436 return NULL;
1437 GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
1438 size = ntohs (hello->size);
1439 GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
1440 if (GNUNET_OK !=
1441 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
1442 &peer))
1443 {
1444 GNUNET_break (0);
1445 return NULL;
1446 }
1447
1448 msg = GNUNET_malloc (size);
1449 memcpy (msg, hello, size);
1450 LOG (GNUNET_ERROR_TYPE_DEBUG,
1451 "Offering HELLO message of `%s' to transport for validation.\n",
1452 GNUNET_i2s (&peer));
1453
1454 ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
1455 ohh->th = handle;
1456 ohh->cont = cont;
1457 ohh->cls = cont_cls;
1458 ohh->msg = msg;
1459 ohh->tth = schedule_control_transmit (handle,
1460 size,
1461 &send_hello,
1462 ohh);
1463 GNUNET_CONTAINER_DLL_insert (handle->oh_head,
1464 handle->oh_tail,
1465 ohh);
1466 return ohh;
1467}
1468
1469
1470/**
1471 * Cancel the request to transport to offer the HELLO message
1472 *
1473 * @param ohh the handle for the operation to cancel
1474 */
1475void
1476GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
1477{
1478 struct GNUNET_TRANSPORT_Handle *th = ohh->th;
1479
1480 cancel_control_transmit (ohh->th, ohh->tth);
1481 GNUNET_CONTAINER_DLL_remove (th->oh_head,
1482 th->oh_tail,
1483 ohh);
1484 GNUNET_free (ohh->msg);
1485 GNUNET_free (ohh);
1486} 1080}
1487 1081
1488 1082
@@ -1506,76 +1100,6 @@ GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
1506 1100
1507 1101
1508/** 1102/**
1509 * Task to call the HelloUpdateCallback of the GetHelloHandle
1510 *
1511 * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
1512 */
1513static void
1514call_hello_update_cb_async (void *cls)
1515{
1516 struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
1517
1518 GNUNET_assert (NULL != ghh->handle->my_hello);
1519 GNUNET_assert (NULL != ghh->notify_task);
1520 ghh->notify_task = NULL;
1521 ghh->rec (ghh->rec_cls,
1522 ghh->handle->my_hello);
1523}
1524
1525
1526/**
1527 * Obtain the HELLO message for this peer. The callback given in this function
1528 * is never called synchronously.
1529 *
1530 * @param handle connection to transport service
1531 * @param rec function to call with the HELLO, sender will be our peer
1532 * identity; message and sender will be NULL on timeout
1533 * (handshake with transport service pending/failed).
1534 * cost estimate will be 0.
1535 * @param rec_cls closure for @a rec
1536 * @return handle to cancel the operation
1537 */
1538struct GNUNET_TRANSPORT_GetHelloHandle *
1539GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
1540 GNUNET_TRANSPORT_HelloUpdateCallback rec,
1541 void *rec_cls)
1542{
1543 struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
1544
1545 hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
1546 hwl->rec = rec;
1547 hwl->rec_cls = rec_cls;
1548 hwl->handle = handle;
1549 GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
1550 handle->hwl_tail,
1551 hwl);
1552 if (NULL != handle->my_hello)
1553 hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
1554 hwl);
1555 return hwl;
1556}
1557
1558
1559/**
1560 * Stop receiving updates about changes to our HELLO message.
1561 *
1562 * @param ghh handle to cancel
1563 */
1564void
1565GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
1566{
1567 struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
1568
1569 if (NULL != ghh->notify_task)
1570 GNUNET_SCHEDULER_cancel (ghh->notify_task);
1571 GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
1572 handle->hwl_tail,
1573 ghh);
1574 GNUNET_free (ghh);
1575}
1576
1577
1578/**
1579 * Connect to the transport service. Note that the connection may 1103 * Connect to the transport service. Note that the connection may
1580 * complete (or fail) asynchronously. 1104 * complete (or fail) asynchronously.
1581 * 1105 *
@@ -1629,40 +1153,35 @@ GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
1629 GNUNET_TRANSPORT_NotifyDisconnect nd, 1153 GNUNET_TRANSPORT_NotifyDisconnect nd,
1630 GNUNET_TRANSPORT_NotifyExcessBandwidth neb) 1154 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
1631{ 1155{
1632 struct GNUNET_TRANSPORT_Handle *ret; 1156 struct GNUNET_TRANSPORT_Handle *h;
1633 1157
1634 ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle); 1158 h = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
1635 if (NULL != self) 1159 if (NULL != self)
1636 { 1160 {
1637 ret->self = *self; 1161 h->self = *self;
1638 ret->check_self = GNUNET_YES; 1162 h->check_self = GNUNET_YES;
1639 } 1163 }
1640 ret->cfg = cfg; 1164 h->cfg = cfg;
1641 ret->cls = cls; 1165 h->cls = cls;
1642 ret->rec = rec; 1166 h->rec = rec;
1643 ret->nc_cb = nc; 1167 h->nc_cb = nc;
1644 ret->nd_cb = nd; 1168 h->nd_cb = nd;
1645 ret->neb_cb = neb; 1169 h->neb_cb = neb;
1646 ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1170 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1647 LOG (GNUNET_ERROR_TYPE_DEBUG, 1171 LOG (GNUNET_ERROR_TYPE_DEBUG,
1648 "Connecting to transport service.\n"); 1172 "Connecting to transport service.\n");
1649 ret->client = GNUNET_CLIENT_connect ("transport", 1173 reconnect (h);
1650 cfg); 1174 if (NULL == h->mq)
1651 if (NULL == ret->client)
1652 { 1175 {
1653 GNUNET_free (ret); 1176 GNUNET_free (h);
1654 return NULL; 1177 return NULL;
1655 } 1178 }
1656 ret->neighbours = 1179 h->neighbours =
1657 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, 1180 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
1658 GNUNET_YES); 1181 GNUNET_YES);
1659 ret->ready_heap = 1182 h->ready_heap =
1660 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 1183 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1661 schedule_control_transmit (ret, 1184 return h;
1662 sizeof (struct StartMessage),
1663 &send_start,
1664 ret);
1665 return ret;
1666} 1185}
1667 1186
1668 1187
@@ -1694,8 +1213,6 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1694 } 1213 }
1695 GNUNET_free_non_null (handle->my_hello); 1214 GNUNET_free_non_null (handle->my_hello);
1696 handle->my_hello = NULL; 1215 handle->my_hello = NULL;
1697 GNUNET_assert (NULL == handle->hwl_head);
1698 GNUNET_assert (NULL == handle->hwl_tail);
1699 GNUNET_CONTAINER_heap_destroy (handle->ready_heap); 1216 GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
1700 handle->ready_heap = NULL; 1217 handle->ready_heap = NULL;
1701 GNUNET_free (handle); 1218 GNUNET_free (handle);
diff --git a/src/transport/transport_api_get_hello.c b/src/transport/transport_api_get_hello.c
index 8087159c6..9a65616a9 100644
--- a/src/transport/transport_api_get_hello.c
+++ b/src/transport/transport_api_get_hello.c
@@ -34,25 +34,20 @@
34 34
35 35
36/** 36/**
37 * Linked list of functions to call whenever our HELLO is updated. 37 * Functions to call with this peer's HELLO.
38 */ 38 */
39struct GNUNET_TRANSPORT_GetHelloHandle 39struct GNUNET_TRANSPORT_GetHelloHandle
40{ 40{
41 41
42 /** 42 /**
43 * This is a doubly linked list. 43 * Our configuration.
44 */ 44 */
45 struct GNUNET_TRANSPORT_GetHelloHandle *next; 45 const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47 /**
48 * This is a doubly linked list.
49 */
50 struct GNUNET_TRANSPORT_GetHelloHandle *prev;
51 46
52 /** 47 /**
53 * Transport handle. 48 * Transport handle.
54 */ 49 */
55 struct GNUNET_TRANSPORT_Handle *handle; 50 struct GNUNET_MQ_Handle *mq;
56 51
57 /** 52 /**
58 * Callback to call once we got our HELLO. 53 * Callback to call once we got our HELLO.
@@ -60,34 +55,158 @@ struct GNUNET_TRANSPORT_GetHelloHandle
60 GNUNET_TRANSPORT_HelloUpdateCallback rec; 55 GNUNET_TRANSPORT_HelloUpdateCallback rec;
61 56
62 /** 57 /**
58 * Closure for @e rec.
59 */
60 void *rec_cls;
61
62 /**
63 * Task for calling the HelloUpdateCallback when we already have a HELLO 63 * Task for calling the HelloUpdateCallback when we already have a HELLO
64 */ 64 */
65 struct GNUNET_SCHEDULER_Task *notify_task; 65 struct GNUNET_SCHEDULER_Task *notify_task;
66 66
67 /** 67 /**
68 * Closure for @e rec. 68 * ID of the task trying to reconnect to the service.
69 */ 69 */
70 void *rec_cls; 70 struct GNUNET_SCHEDULER_Task *reconnect_task;
71
72 /**
73 * Delay until we try to reconnect.
74 */
75 struct GNUNET_TIME_Relative reconnect_delay;
71 76
72}; 77};
73 78
74 79
80/**
81 * Function we use for checking incoming HELLO messages.
82 *
83 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
84 * @param msg message received
85 * @return #GNUNET_OK if message is well-formed
86 */
87static int
88check_hello (void *cls,
89 const struct GNUNET_MessageHeader *msg)
90{
91 struct GNUNET_PeerIdentity me;
92
93 if (GNUNET_OK !=
94 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
95 &me))
96 {
97 GNUNET_break (0);
98 return GNUNET_SYSERR;
99 }
100 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
101 "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
102 (unsigned int) ntohs (msg->size),
103 GNUNET_i2s (&me));
104 return GNUNET_OK;
105}
106
75 107
76/** 108/**
77 * Task to call the HelloUpdateCallback of the GetHelloHandle 109 * Function we use for handling incoming HELLO messages.
78 * 110 *
79 * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle` 111 * @param cls closure, a `struct GNUNET_TRANSPORT_GetHelloHandle *`
112 * @param msg message received
80 */ 113 */
81static void 114static void
82call_hello_update_cb_async (void *cls) 115handle_hello (void *cls,
116 const struct GNUNET_MessageHeader *msg)
83{ 117{
84 struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls; 118 struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
85 119
86 GNUNET_assert (NULL != ghh->handle->my_hello);
87 GNUNET_assert (NULL != ghh->notify_task);
88 ghh->notify_task = NULL;
89 ghh->rec (ghh->rec_cls, 120 ghh->rec (ghh->rec_cls,
90 ghh->handle->my_hello); 121 msg);
122}
123
124
125/**
126 * Function that will schedule the job that will try
127 * to connect us again to the client.
128 *
129 * @param ghh transport service to reconnect
130 */
131static void
132schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh);
133
134
135/**
136 * Generic error handler, called with the appropriate
137 * error code and the same closure specified at the creation of
138 * the message queue.
139 * Not every message queue implementation supports an error handler.
140 *
141 * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *`
142 * @param error error code
143 */
144static void
145mq_error_handler (void *cls,
146 enum GNUNET_MQ_Error error)
147{
148 struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
149
150 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
151 "Error receiving from transport service, disconnecting temporarily.\n");
152 GNUNET_MQ_destroy (ghh->mq);
153 ghh->mq = NULL;
154 schedule_reconnect (ghh);
155}
156
157
158/**
159 * Try again to connect to transport service.
160 *
161 * @param cls the handle to the transport service
162 */
163static void
164reconnect (void *cls)
165{
166 GNUNET_MQ_hd_var_size (hello,
167 GNUNET_MESSAGE_TYPE_HELLO,
168 struct GNUNET_MessageHeader);
169 struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
170 struct GNUNET_MQ_MessageHandler handlers[] = {
171 make_hello_handler (ghh),
172 GNUNET_MQ_handler_end ()
173 };
174 struct GNUNET_MQ_Envelope *env;
175 struct StartMessage *s;
176
177 ghh->reconnect_task = NULL;
178 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
179 "Connecting to transport service.\n");
180 GNUNET_assert (NULL == ghh->mq);
181 ghh->mq = GNUNET_CLIENT_connecT (ghh->cfg,
182 "transport",
183 handlers,
184 &mq_error_handler,
185 ghh);
186 if (NULL == ghh->mq)
187 return;
188 env = GNUNET_MQ_msg (s,
189 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
190 s->options = htonl (0);
191 GNUNET_MQ_send (ghh->mq,
192 env);
193}
194
195
196/**
197 * Function that will schedule the job that will try
198 * to connect us again to the client.
199 *
200 * @param ghh transport service to reconnect
201 */
202static void
203schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
204{
205 ghh->reconnect_task =
206 GNUNET_SCHEDULER_add_delayed (ghh->reconnect_delay,
207 &reconnect,
208 ghh);
209 ghh->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ghh->reconnect_delay);
91} 210}
92 211
93 212
@@ -95,7 +214,7 @@ call_hello_update_cb_async (void *cls)
95 * Obtain the HELLO message for this peer. The callback given in this function 214 * Obtain the HELLO message for this peer. The callback given in this function
96 * is never called synchronously. 215 * is never called synchronously.
97 * 216 *
98 * @param handle connection to transport service 217 * @param cfg configuration
99 * @param rec function to call with the HELLO, sender will be our peer 218 * @param rec function to call with the HELLO, sender will be our peer
100 * identity; message and sender will be NULL on timeout 219 * identity; message and sender will be NULL on timeout
101 * (handshake with transport service pending/failed). 220 * (handshake with transport service pending/failed).
@@ -104,23 +223,23 @@ call_hello_update_cb_async (void *cls)
104 * @return handle to cancel the operation 223 * @return handle to cancel the operation
105 */ 224 */
106struct GNUNET_TRANSPORT_GetHelloHandle * 225struct GNUNET_TRANSPORT_GetHelloHandle *
107GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, 226GNUNET_TRANSPORT_get_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
108 GNUNET_TRANSPORT_HelloUpdateCallback rec, 227 GNUNET_TRANSPORT_HelloUpdateCallback rec,
109 void *rec_cls) 228 void *rec_cls)
110{ 229{
111 struct GNUNET_TRANSPORT_GetHelloHandle *hwl; 230 struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
112 231
113 hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle); 232 ghh = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
114 hwl->rec = rec; 233 ghh->rec = rec;
115 hwl->rec_cls = rec_cls; 234 ghh->rec_cls = rec_cls;
116 hwl->handle = handle; 235 ghh->cfg = cfg;
117 GNUNET_CONTAINER_DLL_insert (handle->hwl_head, 236 reconnect (ghh);
118 handle->hwl_tail, 237 if (NULL == ghh->mq)
119 hwl); 238 {
120 if (NULL != handle->my_hello) 239 GNUNET_free (ghh);
121 hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async, 240 return NULL;
122 hwl); 241 }
123 return hwl; 242 return ghh;
124} 243}
125 244
126 245
@@ -132,15 +251,13 @@ GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
132void 251void
133GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh) 252GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
134{ 253{
135 struct GNUNET_TRANSPORT_Handle *handle = ghh->handle; 254 if (NULL != ghh->mq)
136 255 {
137 if (NULL != ghh->notify_task) 256 GNUNET_MQ_destroy (ghh->mq);
138 GNUNET_SCHEDULER_cancel (ghh->notify_task); 257 ghh->mq = NULL;
139 GNUNET_CONTAINER_DLL_remove (handle->hwl_head, 258 }
140 handle->hwl_tail,
141 ghh);
142 GNUNET_free (ghh); 259 GNUNET_free (ghh);
143} 260}
144 261
145 262
146/* end of transport_api_hello.c */ 263/* end of transport_api_get_hello.c */
diff --git a/src/transport/transport_api_offer_hello.c b/src/transport/transport_api_offer_hello.c
index 0abce2d62..951ab9ba4 100644
--- a/src/transport/transport_api_offer_hello.c
+++ b/src/transport/transport_api_offer_hello.c
@@ -23,31 +23,23 @@
23 * @brief library to offer HELLOs to transport service 23 * @brief library to offer HELLOs to transport service
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 */ 25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_hello_lib.h"
29#include "gnunet_protocols.h"
30#include "gnunet_transport_service.h"
31
26 32
27/** 33/**
28 * Entry in linked list for all offer-HELLO requests. 34 * Entry in linked list for all offer-HELLO requests.
29 */ 35 */
30struct GNUNET_TRANSPORT_OfferHelloHandle 36struct GNUNET_TRANSPORT_OfferHelloHandle
31{ 37{
32 /**
33 * For the DLL.
34 */
35 struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
36
37 /**
38 * For the DLL.
39 */
40 struct GNUNET_TRANSPORT_OfferHelloHandle *next;
41 38
42 /** 39 /**
43 * Transport service handle we use for transmission. 40 * Transport service handle we use for transmission.
44 */ 41 */
45 struct GNUNET_TRANSPORT_Handle *th; 42 struct GNUNET_MQ_Handle *mq;
46
47 /**
48 * Transmission handle for this request.
49 */
50 struct GNUNET_TRANSPORT_TransmitHandle *tth;
51 43
52 /** 44 /**
53 * Function to call once we are done. 45 * Function to call once we are done.
@@ -59,20 +51,31 @@ struct GNUNET_TRANSPORT_OfferHelloHandle
59 */ 51 */
60 void *cls; 52 void *cls;
61 53
62 /**
63 * The HELLO message to be transmitted.
64 */
65 struct GNUNET_MessageHeader *msg;
66}; 54};
67 55
68 56
57/**
58 * Done sending HELLO message to the service, notify application.
59 *
60 * @param cls the handle for the operation
61 */
62static void
63finished_hello (void *cls)
64{
65 struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
66
67 if (NULL != ohh->cont)
68 ohh->cont (ohh->cls);
69 GNUNET_TRANSPORT_offer_hello_cancel (ohh);
70}
71
69 72
70/** 73/**
71 * Offer the transport service the HELLO of another peer. Note that 74 * Offer the transport service the HELLO of another peer. Note that
72 * the transport service may just ignore this message if the HELLO is 75 * the transport service may just ignore this message if the HELLO is
73 * malformed or useless due to our local configuration. 76 * malformed or useless due to our local configuration.
74 * 77 *
75 * @param handle connection to transport service 78 * @param cfg configuration
76 * @param hello the hello message 79 * @param hello the hello message
77 * @param cont continuation to call when HELLO has been sent, 80 * @param cont continuation to call when HELLO has been sent,
78 * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail 81 * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
@@ -83,46 +86,43 @@ struct GNUNET_TRANSPORT_OfferHelloHandle
83 * 86 *
84 */ 87 */
85struct GNUNET_TRANSPORT_OfferHelloHandle * 88struct GNUNET_TRANSPORT_OfferHelloHandle *
86GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, 89GNUNET_TRANSPORT_offer_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
87 const struct GNUNET_MessageHeader *hello, 90 const struct GNUNET_MessageHeader *hello,
88 GNUNET_SCHEDULER_TaskCallback cont, 91 GNUNET_SCHEDULER_TaskCallback cont,
89 void *cont_cls) 92 void *cont_cls)
90{ 93{
91 struct GNUNET_TRANSPORT_OfferHelloHandle *ohh; 94 struct GNUNET_TRANSPORT_OfferHelloHandle *ohh
92 struct GNUNET_MessageHeader *msg; 95 = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
96 struct GNUNET_MQ_Envelope *env;
93 struct GNUNET_PeerIdentity peer; 97 struct GNUNET_PeerIdentity peer;
94 uint16_t size;
95 98
96 if (NULL == handle->mq)
97 return NULL;
98 GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
99 size = ntohs (hello->size);
100 GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
101 if (GNUNET_OK != 99 if (GNUNET_OK !=
102 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello, 100 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
103 &peer)) 101 &peer))
104 { 102 {
105 GNUNET_break (0); 103 GNUNET_break (0);
104 GNUNET_free (ohh);
105 return NULL;
106 }
107 ohh->mq = GNUNET_CLIENT_connecT (cfg,
108 "transport",
109 NULL,
110 NULL,
111 ohh);
112 if (NULL == ohh->mq)
113 {
114 GNUNET_free (ohh);
106 return NULL; 115 return NULL;
107 } 116 }
108
109 msg = GNUNET_malloc (size);
110 memcpy (msg, hello, size);
111 LOG (GNUNET_ERROR_TYPE_DEBUG,
112 "Offering HELLO message of `%s' to transport for validation.\n",
113 GNUNET_i2s (&peer));
114 ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
115 ohh->th = handle;
116 ohh->cont = cont; 117 ohh->cont = cont;
117 ohh->cls = cont_cls; 118 ohh->cls = cont_cls;
118 ohh->msg = msg; 119 GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
119 ohh->tth = schedule_control_transmit (handle, 120 env = GNUNET_MQ_msg_copy (hello);
120 size, 121 GNUNET_MQ_notify_sent (env,
121 &send_hello, 122 &finished_hello,
122 ohh); 123 ohh);
123 GNUNET_CONTAINER_DLL_insert (handle->oh_head, 124 GNUNET_MQ_send (ohh->mq,
124 handle->oh_tail, 125 env);
125 ohh);
126 return ohh; 126 return ohh;
127} 127}
128 128
@@ -135,13 +135,7 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
135void 135void
136GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh) 136GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
137{ 137{
138 struct GNUNET_TRANSPORT_Handle *th = ohh->th; 138 GNUNET_MQ_destroy (ohh->mq);
139
140 cancel_control_transmit (ohh->th, ohh->tth);
141 GNUNET_CONTAINER_DLL_remove (th->oh_head,
142 th->oh_tail,
143 ohh);
144 GNUNET_free (ohh->msg);
145 GNUNET_free (ohh); 139 GNUNET_free (ohh);
146} 140}
147 141