From 6e3599bab213760c66f13f6103ebf650bbe5b7e9 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 8 Jul 2016 16:34:31 +0000 Subject: migrate transport_core API to MQ --- src/transport/Makefile.am | 6 +- src/transport/transport-testing.c | 6 +- src/transport/transport_api.c | 1245 +++++++++-------------------- src/transport/transport_api_get_hello.c | 199 ++++- src/transport/transport_api_offer_hello.c | 98 ++- 5 files changed, 592 insertions(+), 962 deletions(-) (limited to 'src/transport') diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am index 3a1170c10..48793bd87 100644 --- a/src/transport/Makefile.am +++ b/src/transport/Makefile.am @@ -163,10 +163,12 @@ libgnunettransporttesting_la_LDFLAGS = \ libgnunettransport_la_SOURCES = \ transport_api.c transport.h \ - transport_api_blacklist.c \ transport_api_address_to_string.c \ + transport_api_blacklist.c \ + transport_api_get_hello.c \ transport_api_monitor_peers.c \ - transport_api_monitor_plugins.c + transport_api_monitor_plugins.c \ + transport_api_offer_hello.c libgnunettransport_la_LIBADD = \ $(top_builddir)/src/hello/libgnunethello.la \ diff --git a/src/transport/transport-testing.c b/src/transport/transport-testing.c index 4a514ea72..4a3bf3c3e 100644 --- a/src/transport/transport-testing.c +++ b/src/transport/transport-testing.c @@ -246,7 +246,7 @@ offer_hello (void *cls) if (NULL != cc->oh) GNUNET_TRANSPORT_offer_hello_cancel (cc->oh); cc->oh = - GNUNET_TRANSPORT_offer_hello (cc->p1->th, + GNUNET_TRANSPORT_offer_hello (cc->p1->cfg, (const struct GNUNET_MessageHeader *) cc->p2->hello, &hello_offered, cc); @@ -380,7 +380,7 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_handle *tth GNUNET_TRANSPORT_TESTING_stop_peer (tth, p); return NULL; } - p->ghh = GNUNET_TRANSPORT_get_hello (p->th, + p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &get_hello, p); GNUNET_assert (p->ghh != NULL); @@ -465,7 +465,7 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct PeerContext *p, ¬ify_disconnect); GNUNET_assert (NULL != p->th); p->ats = GNUNET_ATS_connectivity_init (p->cfg); - p->ghh = GNUNET_TRANSPORT_get_hello (p->th, + p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &get_hello, p); GNUNET_assert (NULL != p->ghh); diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c index 59f249686..e7db5493e 100644 --- a/src/transport/transport_api.c +++ b/src/transport/transport_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2013 GNUnet e.V. + Copyright (C) 2009-2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -163,86 +163,6 @@ struct Neighbour }; -/** - * Linked list of functions to call whenever our HELLO is updated. - */ -struct GNUNET_TRANSPORT_GetHelloHandle -{ - - /** - * This is a doubly linked list. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *next; - - /** - * This is a doubly linked list. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *prev; - - /** - * Transport handle. - */ - struct GNUNET_TRANSPORT_Handle *handle; - - /** - * Callback to call once we got our HELLO. - */ - GNUNET_TRANSPORT_HelloUpdateCallback rec; - - /** - * Task for calling the HelloUpdateCallback when we already have a HELLO - */ - struct GNUNET_SCHEDULER_Task *notify_task; - - /** - * Closure for @e rec. - */ - void *rec_cls; - -}; - - -/** - * Entry in linked list for all offer-HELLO requests. - */ -struct GNUNET_TRANSPORT_OfferHelloHandle -{ - /** - * For the DLL. - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *prev; - - /** - * For the DLL. - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *next; - - /** - * Transport service handle we use for transmission. - */ - struct GNUNET_TRANSPORT_Handle *th; - - /** - * Transmission handle for this request. - */ - struct GNUNET_TRANSPORT_TransmitHandle *tth; - - /** - * Function to call once we are done. - */ - GNUNET_SCHEDULER_TaskCallback cont; - - /** - * Closure for @e cont - */ - void *cls; - - /** - * The HELLO message to be transmitted. - */ - struct GNUNET_MessageHeader *msg; -}; - /** * Handle for the transport service (includes all of the @@ -276,16 +196,6 @@ struct GNUNET_TRANSPORT_Handle */ GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb; - /** - * Head of DLL of control messages. - */ - struct GNUNET_TRANSPORT_TransmitHandle *control_head; - - /** - * Tail of DLL of control messages. - */ - struct GNUNET_TRANSPORT_TransmitHandle *control_tail; - /** * The current HELLO message for this peer. Updated * whenever transports change their addresses. @@ -295,32 +205,7 @@ struct GNUNET_TRANSPORT_Handle /** * My client connection to the transport service. */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Handle to our registration with the client for notification. - */ - struct GNUNET_CLIENT_TransmitHandle *cth; - - /** - * Linked list of pending requests for our HELLO. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head; - - /** - * Linked list of pending requests for our HELLO. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail; - - /** - * Linked list of pending offer HELLO requests head - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head; - - /** - * Linked list of pending offer HELLO requests tail - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail; + struct GNUNET_MQ_Handle *mq; /** * My configuration. @@ -458,7 +343,8 @@ outbound_bw_tracker_update (void *cls) GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_NO)); GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap, - n->hn, delay.rel_value_us); + n->hn, + delay.rel_value_us); schedule_transmission (n->h); } @@ -558,268 +444,296 @@ neighbour_delete (void *cls, /** - * Function we use for handling incoming messages. + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Error receiving from transport service, disconnecting temporarily.\n"); + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); +} + + +/** + * Function we use for checking incoming HELLO messages. * * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` - * @param msg message received, NULL on timeout or fatal error + * @param msg message received + * @return #GNUNET_OK if message is well-formed + */ +static int +check_hello (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_PeerIdentity me; + + if (GNUNET_OK != + GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, + &me)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n", + (unsigned int) ntohs (msg->size), + GNUNET_i2s (&me)); + return GNUNET_OK; +} + + +/** + * Function we use for handling incoming HELLO messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param msg message received */ static void -demultiplexer (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_hello (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + + GNUNET_free_non_null (h->my_hello); + h->my_hello = GNUNET_copy_message (msg); +} + + +/** + * Function we use for handling incoming connect messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param cim message received + */ +static void +handle_connect (void *cls, + const struct ConnectInfoMessage *cim) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + struct Neighbour *n; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving CONNECT message for `%s'.\n", + GNUNET_i2s (&cim->id)); + n = neighbour_find (h, &cim->id); + if (NULL != n) + { + GNUNET_break (0); + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); + return; + } + n = neighbour_add (h, + &cim->id); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving CONNECT message for `%s' with quota %u\n", + GNUNET_i2s (&cim->id), + ntohl (cim->quota_out.value__)); + GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, + cim->quota_out); + if (NULL != h->nc_cb) + h->nc_cb (h->cls, + &n->id); +} + + +/** + * Function we use for handling incoming disconnect messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param dim message received + */ +static void +handle_disconnect (void *cls, + const struct DisconnectInfoMessage *dim) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + struct Neighbour *n; + + GNUNET_break (ntohl (dim->reserved) == 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving DISCONNECT message for `%s'.\n", + GNUNET_i2s (&dim->peer)); + n = neighbour_find (h, &dim->peer); + if (NULL == n) + { + GNUNET_break (0); + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); + return; + } + neighbour_delete (h, + &dim->peer, + n); +} + + +/** + * Function we use for handling incoming send-ok messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param okm message received + */ +static void +handle_send_ok (void *cls, + const struct SendOkMessage *okm) { struct GNUNET_TRANSPORT_Handle *h = cls; - const struct DisconnectInfoMessage *dim; - const struct ConnectInfoMessage *cim; - const struct InboundMessage *im; - const struct GNUNET_MessageHeader *imm; - const struct SendOkMessage *okm; - const struct QuotaSetMessage *qm; - struct GNUNET_TRANSPORT_GetHelloHandle *hwl; - struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl; struct Neighbour *n; - struct GNUNET_PeerIdentity me; - uint16_t size; uint32_t bytes_msg; uint32_t bytes_physical; - GNUNET_assert (NULL != h->client); - if (GNUNET_YES == h->reconnecting) + bytes_msg = ntohl (okm->bytes_msg); + bytes_physical = ntohl (okm->bytes_physical); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving SEND_OK message, transmission to %s %s.\n", + GNUNET_i2s (&okm->peer), + ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); + + n = neighbour_find (h, + &okm->peer); + if (NULL == n) { + /* We should never get a 'SEND_OK' for a peer that we are not + connected to */ + GNUNET_break (0); + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); return; } - if (NULL == msg) + if (bytes_physical > bytes_msg) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Error receiving from transport service, disconnecting temporarily.\n"); + "Overhead for %u byte message was %u\n", + bytes_msg, + bytes_physical - bytes_msg); + n->traffic_overhead += bytes_physical - bytes_msg; + } + GNUNET_break (GNUNET_NO == n->is_ready); + n->is_ready = GNUNET_YES; + if (NULL != n->unready_warn_task) + { + GNUNET_SCHEDULER_cancel (n->unready_warn_task); + n->unready_warn_task = NULL; + } + if ((NULL != n->th) && (NULL == n->hn)) + { + GNUNET_assert (NULL != n->th->timeout_task); + GNUNET_SCHEDULER_cancel (n->th->timeout_task); + n->th->timeout_task = NULL; + /* we've been waiting for this (congestion, not quota, + * caused delayed transmission) */ + n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, + n, + 0); + } + schedule_transmission (h); +} + + +/** + * Function we use for checking incoming "inbound" messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param im message received + */ +static int +check_recv (void *cls, + const struct InboundMessage *im) +{ + const struct GNUNET_MessageHeader *imm; + uint16_t size; + + size = ntohs (im->header.size); + if (size < + sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + imm = (const struct GNUNET_MessageHeader *) &im[1]; + if (ntohs (imm->size) + sizeof (struct InboundMessage) != size) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Function we use for handling incoming messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param im message received + */ +static void +handle_recv (void *cls, + const struct InboundMessage *im) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + const struct GNUNET_MessageHeader *imm + = (const struct GNUNET_MessageHeader *) &im[1]; + struct Neighbour *n; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u with %u bytes from `%s'.\n", + (unsigned int) ntohs (imm->type), + (unsigned int) ntohs (imm->size), + GNUNET_i2s (&im->peer)); + n = neighbour_find (h, &im->peer); + if (NULL == n) + { + GNUNET_break (0); h->reconnecting = GNUNET_YES; disconnect_and_schedule_reconnect (h); return; } - GNUNET_CLIENT_receive (h->client, - &demultiplexer, - h, - GNUNET_TIME_UNIT_FOREVER_REL); - size = ntohs (msg->size); - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_HELLO: - if (GNUNET_OK != - GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, - &me)) - { - GNUNET_break (0); - break; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n", - (unsigned int) size, - GNUNET_i2s (&me)); - GNUNET_free_non_null (h->my_hello); - h->my_hello = NULL; - if (size < sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break (0); - break; - } - h->my_hello = GNUNET_copy_message (msg); - hwl = h->hwl_head; - while (NULL != hwl) - { - next_hwl = hwl->next; - hwl->rec (hwl->rec_cls, - h->my_hello); - hwl = next_hwl; - } - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT: - if (size < sizeof (struct ConnectInfoMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - cim = (const struct ConnectInfoMessage *) msg; - if (size != - sizeof (struct ConnectInfoMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving CONNECT message for `%s'.\n", - GNUNET_i2s (&cim->id)); - n = neighbour_find (h, &cim->id); - if (NULL != n) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - n = neighbour_add (h, - &cim->id); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving CONNECT message for `%s' with quota %u\n", - GNUNET_i2s (&cim->id), - ntohl (cim->quota_out.value__)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - cim->quota_out); - if (NULL != h->nc_cb) - h->nc_cb (h->cls, - &n->id); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT: - if (size != sizeof (struct DisconnectInfoMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - dim = (const struct DisconnectInfoMessage *) msg; - GNUNET_break (ntohl (dim->reserved) == 0); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving DISCONNECT message for `%s'.\n", - GNUNET_i2s (&dim->peer)); - n = neighbour_find (h, &dim->peer); - if (NULL == n) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - neighbour_delete (h, - &dim->peer, - n); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK: - if (size != sizeof (struct SendOkMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - okm = (const struct SendOkMessage *) msg; - bytes_msg = ntohl (okm->bytes_msg); - bytes_physical = ntohl (okm->bytes_physical); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving SEND_OK message, transmission to %s %s.\n", - GNUNET_i2s (&okm->peer), - ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); + if (NULL != h->rec) + h->rec (h->cls, + &im->peer, + imm); +} - n = neighbour_find (h, - &okm->peer); - if (NULL == n) - { - /* We should never get a 'SEND_OK' for a peer that we are not - connected to */ - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - if (bytes_physical > bytes_msg) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Overhead for %u byte message was %u\n", - bytes_msg, - bytes_physical - bytes_msg); - n->traffic_overhead += bytes_physical - bytes_msg; - } - GNUNET_break (GNUNET_NO == n->is_ready); - n->is_ready = GNUNET_YES; - if (NULL != n->unready_warn_task) - { - GNUNET_SCHEDULER_cancel (n->unready_warn_task); - n->unready_warn_task = NULL; - } - if ((NULL != n->th) && (NULL == n->hn)) - { - GNUNET_assert (NULL != n->th->timeout_task); - GNUNET_SCHEDULER_cancel (n->th->timeout_task); - n->th->timeout_task = NULL; - /* we've been waiting for this (congestion, not quota, - * caused delayed transmission) */ - n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, - n, - 0); - } - schedule_transmission (h); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV: - if (size < - sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - im = (const struct InboundMessage *) msg; - imm = (const struct GNUNET_MessageHeader *) &im[1]; - if (ntohs (imm->size) + sizeof (struct InboundMessage) != size) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u with %u bytes from `%s'.\n", - (unsigned int) ntohs (imm->type), - (unsigned int) ntohs (imm->size), - GNUNET_i2s (&im->peer)); - n = neighbour_find (h, &im->peer); - if (NULL == n) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - if (NULL != h->rec) - h->rec (h->cls, - &im->peer, - imm); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA: - if (size != sizeof (struct QuotaSetMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - qm = (const struct QuotaSetMessage *) msg; - n = neighbour_find (h, &qm->peer); - if (NULL == n) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving SET_QUOTA message for `%s' with quota %u\n", - GNUNET_i2s (&qm->peer), - ntohl (qm->quota.value__)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - qm->quota); - break; - default: - LOG (GNUNET_ERROR_TYPE_ERROR, - _("Received unexpected message of type %u in %s:%u\n"), - ntohs (msg->type), - __FILE__, - __LINE__); + +/** + * Function we use for handling incoming set quota messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param msg message received + */ +static void +handle_set_quota (void *cls, + const struct QuotaSetMessage *qm) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + struct Neighbour *n; + + n = neighbour_find (h, &qm->peer); + if (NULL == n) + { GNUNET_break (0); - break; + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); + return; } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving SET_QUOTA message for `%s' with quota %u\n", + GNUNET_i2s (&qm->peer), + ntohl (qm->quota.value__)); + GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, + qm->quota); } @@ -854,104 +768,53 @@ timeout_request_due_to_congestion (void *cls) /** - * Transmit message(s) to service. + * Transmit ready message(s) to service. * - * @param cls handle to transport - * @param size number of bytes available in @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf + * @param h handle to transport */ -static size_t -transport_notify_ready (void *cls, - size_t size, - void *buf) +static void +transmit_ready (struct GNUNET_TRANSPORT_Handle *h) { - struct GNUNET_TRANSPORT_Handle *h = cls; struct GNUNET_TRANSPORT_TransmitHandle *th; struct GNUNET_TIME_Relative delay; struct Neighbour *n; - char *cbuf; - struct OutboundMessage obm; - size_t ret; - size_t nret; + struct OutboundMessage *obm; + struct GNUNET_MQ_Envelope *env; size_t mret; - GNUNET_assert (NULL != h->client); - h->cth = NULL; - if (NULL == buf) - { - /* transmission failed */ - disconnect_and_schedule_reconnect (h); - return 0; - } - - cbuf = buf; - ret = 0; - /* first send control messages */ - while ( (NULL != (th = h->control_head)) && - (th->notify_size <= size) ) - { - GNUNET_CONTAINER_DLL_remove (h->control_head, - h->control_tail, - th); - nret = th->notify (th->notify_cls, - size, - &cbuf[ret]); - delay = GNUNET_TIME_absolute_get_duration (th->request_start); - if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) - LOG (GNUNET_ERROR_TYPE_WARNING, - "Added %u bytes of control message at %u after %s delay\n", - nret, - ret, - GNUNET_STRINGS_relative_time_to_string (delay, - GNUNET_YES)); - else - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Added %u bytes of control message at %u after %s delay\n", - nret, - ret, - GNUNET_STRINGS_relative_time_to_string (delay, - GNUNET_YES)); - GNUNET_free (th); - ret += nret; - size -= nret; - } - - /* then, if possible and no control messages pending, send data messages */ - while ( (NULL == h->control_head) && - (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) ) + GNUNET_assert (NULL != h->mq); + while (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) { + th = n->th; if (GNUNET_YES != n->is_ready) { /* peer not ready, wait for notification! */ GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap)); n->hn = NULL; GNUNET_assert (NULL == n->th->timeout_task); - n->th->timeout_task + th->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining - (n->th->timeout), + (th->timeout), &timeout_request_due_to_congestion, - n->th); + th); continue; } - th = n->th; - if (th->notify_size + sizeof (struct OutboundMessage) > size) - break; /* does not fit */ - if (GNUNET_BANDWIDTH_tracker_get_delay - (&n->out_tracker, - th->notify_size).rel_value_us > 0) + if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, + th->notify_size).rel_value_us > 0) break; /* too early */ GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap)); n->hn = NULL; n->th = NULL; - GNUNET_assert (size >= sizeof (struct OutboundMessage)); + env = GNUNET_MQ_msg_extra (obm, + th->notify_size, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); mret = th->notify (th->notify_cls, - size - sizeof (struct OutboundMessage), - &cbuf[ret + sizeof (struct OutboundMessage)]); - GNUNET_assert (mret <= size - sizeof (struct OutboundMessage)); + th->notify_size, + &obm[1]); if (0 == mret) { GNUNET_free (th); + GNUNET_MQ_discard (env); continue; } if (NULL != n->unready_warn_task) @@ -961,20 +824,13 @@ transport_notify_ready (void *cls, n); n->last_payload = GNUNET_TIME_absolute_get (); n->is_ready = GNUNET_NO; - GNUNET_assert (mret + sizeof (struct OutboundMessage) < - GNUNET_SERVER_MAX_MESSAGE_SIZE); - obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); - obm.header.size = htons (mret + sizeof (struct OutboundMessage)); - obm.reserved = htonl (0); - obm.timeout = + obm->reserved = htonl (0); + obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (th->timeout)); - obm.peer = n->id; - memcpy (&cbuf[ret], - &obm, - sizeof (struct OutboundMessage)); - ret += (mret + sizeof (struct OutboundMessage)); - size -= (mret + sizeof (struct OutboundMessage)); + obm->peer = n->id; + GNUNET_MQ_send (h->mq, + env); GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret); delay = GNUNET_TIME_absolute_get_duration (th->request_start); @@ -995,14 +851,9 @@ transport_notify_ready (void *cls, GNUNET_YES), (unsigned int) n->out_tracker.available_bytes_per_s__); GNUNET_free (th); - break; } /* if there are more pending messages, try to schedule those */ schedule_transmission (h); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u bytes to transport service\n", - ret); - return ret; } @@ -1016,12 +867,11 @@ static void schedule_transmission_task (void *cls) { struct GNUNET_TRANSPORT_Handle *h = cls; - size_t size; struct GNUNET_TRANSPORT_TransmitHandle *th; struct Neighbour *n; h->quota_task = NULL; - GNUNET_assert (NULL != h->client); + GNUNET_assert (NULL != h->mq); /* destroy all requests that have timed out */ while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) && (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) ) @@ -1040,29 +890,12 @@ schedule_transmission_task (void *cls) NULL)); GNUNET_free (th); } - if (NULL != h->cth) - return; - if (NULL != h->control_head) - { - size = h->control_head->notify_size; - } - else - { - n = GNUNET_CONTAINER_heap_peek (h->ready_heap); - if (NULL == n) - return; /* no pending messages */ - size = n->th->notify_size + sizeof (struct OutboundMessage); - } + n = GNUNET_CONTAINER_heap_peek (h->ready_heap); + if (NULL == n) + return; /* no pending messages */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling notify_transmit_ready\n"); - h->cth - = GNUNET_CLIENT_notify_transmit_ready (h->client, - size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &transport_notify_ready, - h); - GNUNET_assert (NULL != h->cth); + transmit_ready (h); } @@ -1078,15 +911,13 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h) struct GNUNET_TIME_Relative delay; struct Neighbour *n; - GNUNET_assert (NULL != h->client); + GNUNET_assert (NULL != h->mq); if (NULL != h->quota_task) { GNUNET_SCHEDULER_cancel (h->quota_task); h->quota_task = NULL; } - if (NULL != h->control_head) - delay = GNUNET_TIME_UNIT_ZERO; - else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) + if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) { delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, @@ -1110,83 +941,6 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h) } -/** - * Queue control request for transmission to the transport - * service. - * - * @param h handle to the transport service - * @param size number of bytes to be transmitted - * @param notify function to call to get the content - * @param notify_cls closure for @a notify - * @return a `struct GNUNET_TRANSPORT_TransmitHandle` - */ -static struct GNUNET_TRANSPORT_TransmitHandle * -schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h, - size_t size, - GNUNET_TRANSPORT_TransmitReadyNotify notify, - void *notify_cls) -{ - struct GNUNET_TRANSPORT_TransmitHandle *th; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Control transmit of %u bytes requested\n", - size); - th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle); - th->notify = notify; - th->notify_cls = notify_cls; - th->notify_size = size; - th->request_start = GNUNET_TIME_absolute_get (); - GNUNET_CONTAINER_DLL_insert_tail (h->control_head, - h->control_tail, - th); - schedule_transmission (h); - return th; -} - - -/** - * Transmit START message to service. - * - * @param cls unused - * @param size number of bytes available in @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf - */ -static size_t -send_start (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_TRANSPORT_Handle *h = cls; - struct StartMessage s; - uint32_t options; - - if (NULL == buf) - { - /* Can only be shutdown, just give up */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Shutdown while trying to transmit START request.\n"); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting START request.\n"); - GNUNET_assert (size >= sizeof (struct StartMessage)); - s.header.size = htons (sizeof (struct StartMessage)); - s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START); - options = 0; - if (h->check_self) - options |= 1; - if (NULL != h->rec) - options |= 2; - s.options = htonl (options); - s.self = h->self; - memcpy (buf, &s, sizeof (struct StartMessage)); - GNUNET_CLIENT_receive (h->client, &demultiplexer, h, - GNUNET_TIME_UNIT_FOREVER_REL); - return sizeof (struct StartMessage); -} - - /** * Try again to connect to transport service. * @@ -1195,20 +949,61 @@ send_start (void *cls, static void reconnect (void *cls) { + GNUNET_MQ_hd_var_size (hello, + GNUNET_MESSAGE_TYPE_HELLO, + struct GNUNET_MessageHeader); + GNUNET_MQ_hd_fixed_size (connect, + GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, + struct ConnectInfoMessage); + GNUNET_MQ_hd_fixed_size (disconnect, + GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, + struct DisconnectInfoMessage); + GNUNET_MQ_hd_fixed_size (send_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, + struct SendOkMessage); + GNUNET_MQ_hd_var_size (recv, + GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, + struct InboundMessage); + GNUNET_MQ_hd_fixed_size (set_quota, + GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, + struct QuotaSetMessage); struct GNUNET_TRANSPORT_Handle *h = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_hello_handler (h), + make_connect_handler (h), + make_disconnect_handler (h), + make_send_ok_handler (h), + make_recv_handler (h), + make_set_quota_handler (h), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; + struct StartMessage *s; + uint32_t options; h->reconnect_task = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); - GNUNET_assert (NULL == h->client); - GNUNET_assert (NULL == h->control_head); - GNUNET_assert (NULL == h->control_tail); + GNUNET_assert (NULL == h->mq); h->reconnecting = GNUNET_NO; - h->client = GNUNET_CLIENT_connect ("transport", h->cfg); - - GNUNET_assert (NULL != h->client); - schedule_control_transmit (h, sizeof (struct StartMessage), - &send_start, h); + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "transport", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) + return; + env = GNUNET_MQ_msg (s, + GNUNET_MESSAGE_TYPE_TRANSPORT_START); + options = 0; + if (h->check_self) + options |= 1; + if (NULL != h->rec) + options |= 2; + s->options = htonl (options); + s->self = h->self; + GNUNET_MQ_send (h->mq, + env); } @@ -1221,20 +1016,11 @@ reconnect (void *cls) static void disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) { - struct GNUNET_TRANSPORT_TransmitHandle *th; - GNUNET_assert (NULL == h->reconnect_task); - if (NULL != h->cth) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); - h->cth = NULL; - } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; -/* LOG (GNUNET_ERROR_TYPE_ERROR, - "Client disconnect done \n");*/ + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } /* Forget about all neighbours that we used to be connected to */ GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, @@ -1245,16 +1031,6 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) GNUNET_SCHEDULER_cancel (h->quota_task); h->quota_task = NULL; } - while ((NULL != (th = h->control_head))) - { - GNUNET_CONTAINER_DLL_remove (h->control_head, - h->control_tail, - th); - th->notify (th->notify_cls, - 0, - NULL); - GNUNET_free (th); - } LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduling task to reconnect to transport service in %s.\n", GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, @@ -1268,109 +1044,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) /** - * Cancel control request for transmission to the transport service. - * - * @param th handle to the transport service - * @param tth transmit handle to cancel - */ -static void -cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th, - struct GNUNET_TRANSPORT_TransmitHandle *tth) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Canceling transmit of contral transmission requested\n"); - GNUNET_CONTAINER_DLL_remove (th->control_head, - th->control_tail, - tth); - GNUNET_free (tth); -} - - -/** - * Send HELLO message to the service. - * - * @param cls the HELLO message to send - * @param size number of bytes available in @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf - */ -static size_t -send_hello (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls; - struct GNUNET_MessageHeader *msg = ohh->msg; - uint16_t ssize; - - if (NULL == buf) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout while trying to transmit `%s' request.\n", - "HELLO"); - if (NULL != ohh->cont) - ohh->cont (ohh->cls); - GNUNET_free (msg); - GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head, - ohh->th->oh_tail, - ohh); - GNUNET_free (ohh); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting `%s' request.\n", - "HELLO"); - ssize = ntohs (msg->size); - GNUNET_assert (size >= ssize); - memcpy (buf, - msg, - ssize); - GNUNET_free (msg); - if (NULL != ohh->cont) - ohh->cont (ohh->cls); - GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head, - ohh->th->oh_tail, - ohh); - GNUNET_free (ohh); - return ssize; -} - - -/** - * Send traffic metric message to the service. - * - * @param cls the message to send - * @param size number of bytes available in @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf - */ -static size_t -send_metric (void *cls, - size_t size, - void *buf) -{ - struct TrafficMetricMessage *msg = cls; - uint16_t ssize; - - if (NULL == buf) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout while trying to transmit TRAFFIC_METRIC request.\n"); - GNUNET_free (msg); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting TRAFFIC_METRIC request.\n"); - ssize = ntohs (msg->header.size); - GNUNET_assert (size >= ssize); - memcpy (buf, msg, ssize); - GNUNET_free (msg); - return ssize; -} - - -/** - * Set transport metrics for a peer and a direction + * Set transport metrics for a peer and a direction. * * @param handle transport handle * @param peer the peer to set the metric for @@ -1388,101 +1062,21 @@ GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle, struct GNUNET_TIME_Relative delay_in, struct GNUNET_TIME_Relative delay_out) { + struct GNUNET_MQ_Envelope *env; struct TrafficMetricMessage *msg; - msg = GNUNET_new (struct TrafficMetricMessage); - msg->header.size = htons (sizeof (struct TrafficMetricMessage)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC); + if (NULL == handle->mq) + return; + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC); msg->reserved = htonl (0); msg->peer = *peer; GNUNET_ATS_properties_hton (&msg->properties, prop); msg->delay_in = GNUNET_TIME_relative_hton (delay_in); msg->delay_out = GNUNET_TIME_relative_hton (delay_out); - schedule_control_transmit (handle, - sizeof (struct TrafficMetricMessage), - &send_metric, - msg); -} - - -/** - * Offer the transport service the HELLO of another peer. Note that - * the transport service may just ignore this message if the HELLO is - * malformed or useless due to our local configuration. - * - * @param handle connection to transport service - * @param hello the hello message - * @param cont continuation to call when HELLO has been sent, - * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail - * tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success - * @param cont_cls closure for @a cont - * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure, - * in case of failure @a cont will not be called - * - */ -struct GNUNET_TRANSPORT_OfferHelloHandle * -GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, - const struct GNUNET_MessageHeader *hello, - GNUNET_SCHEDULER_TaskCallback cont, - void *cont_cls) -{ - struct GNUNET_TRANSPORT_OfferHelloHandle *ohh; - struct GNUNET_MessageHeader *msg; - struct GNUNET_PeerIdentity peer; - uint16_t size; - - if (NULL == handle->client) - return NULL; - GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); - size = ntohs (hello->size); - GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader)); - if (GNUNET_OK != - GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello, - &peer)) - { - GNUNET_break (0); - return NULL; - } - - msg = GNUNET_malloc (size); - memcpy (msg, hello, size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Offering HELLO message of `%s' to transport for validation.\n", - GNUNET_i2s (&peer)); - - ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle); - ohh->th = handle; - ohh->cont = cont; - ohh->cls = cont_cls; - ohh->msg = msg; - ohh->tth = schedule_control_transmit (handle, - size, - &send_hello, - ohh); - GNUNET_CONTAINER_DLL_insert (handle->oh_head, - handle->oh_tail, - ohh); - return ohh; -} - - -/** - * Cancel the request to transport to offer the HELLO message - * - * @param ohh the handle for the operation to cancel - */ -void -GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh) -{ - struct GNUNET_TRANSPORT_Handle *th = ohh->th; - - cancel_control_transmit (ohh->th, ohh->tth); - GNUNET_CONTAINER_DLL_remove (th->oh_head, - th->oh_tail, - ohh); - GNUNET_free (ohh->msg); - GNUNET_free (ohh); + GNUNET_MQ_send (handle->mq, + env); } @@ -1505,76 +1099,6 @@ GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle, } -/** - * Task to call the HelloUpdateCallback of the GetHelloHandle - * - * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle` - */ -static void -call_hello_update_cb_async (void *cls) -{ - struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls; - - GNUNET_assert (NULL != ghh->handle->my_hello); - GNUNET_assert (NULL != ghh->notify_task); - ghh->notify_task = NULL; - ghh->rec (ghh->rec_cls, - ghh->handle->my_hello); -} - - -/** - * Obtain the HELLO message for this peer. The callback given in this function - * is never called synchronously. - * - * @param handle connection to transport service - * @param rec function to call with the HELLO, sender will be our peer - * identity; message and sender will be NULL on timeout - * (handshake with transport service pending/failed). - * cost estimate will be 0. - * @param rec_cls closure for @a rec - * @return handle to cancel the operation - */ -struct GNUNET_TRANSPORT_GetHelloHandle * -GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, - GNUNET_TRANSPORT_HelloUpdateCallback rec, - void *rec_cls) -{ - struct GNUNET_TRANSPORT_GetHelloHandle *hwl; - - hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle); - hwl->rec = rec; - hwl->rec_cls = rec_cls; - hwl->handle = handle; - GNUNET_CONTAINER_DLL_insert (handle->hwl_head, - handle->hwl_tail, - hwl); - if (NULL != handle->my_hello) - hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async, - hwl); - return hwl; -} - - -/** - * Stop receiving updates about changes to our HELLO message. - * - * @param ghh handle to cancel - */ -void -GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh) -{ - struct GNUNET_TRANSPORT_Handle *handle = ghh->handle; - - if (NULL != ghh->notify_task) - GNUNET_SCHEDULER_cancel (ghh->notify_task); - GNUNET_CONTAINER_DLL_remove (handle->hwl_head, - handle->hwl_tail, - ghh); - GNUNET_free (ghh); -} - - /** * Connect to the transport service. Note that the connection may * complete (or fail) asynchronously. @@ -1629,40 +1153,35 @@ GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_TRANSPORT_NotifyDisconnect nd, GNUNET_TRANSPORT_NotifyExcessBandwidth neb) { - struct GNUNET_TRANSPORT_Handle *ret; + struct GNUNET_TRANSPORT_Handle *h; - ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle); + h = GNUNET_new (struct GNUNET_TRANSPORT_Handle); if (NULL != self) { - ret->self = *self; - ret->check_self = GNUNET_YES; + h->self = *self; + h->check_self = GNUNET_YES; } - ret->cfg = cfg; - ret->cls = cls; - ret->rec = rec; - ret->nc_cb = nc; - ret->nd_cb = nd; - ret->neb_cb = neb; - ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO; + h->cfg = cfg; + h->cls = cls; + h->rec = rec; + h->nc_cb = nc; + h->nd_cb = nd; + h->neb_cb = neb; + h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); - ret->client = GNUNET_CLIENT_connect ("transport", - cfg); - if (NULL == ret->client) + reconnect (h); + if (NULL == h->mq) { - GNUNET_free (ret); + GNUNET_free (h); return NULL; } - ret->neighbours = + h->neighbours = GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES); - ret->ready_heap = + h->ready_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - schedule_control_transmit (ret, - sizeof (struct StartMessage), - &send_start, - ret); - return ret; + return h; } @@ -1694,8 +1213,6 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) } GNUNET_free_non_null (handle->my_hello); handle->my_hello = NULL; - GNUNET_assert (NULL == handle->hwl_head); - GNUNET_assert (NULL == handle->hwl_tail); GNUNET_CONTAINER_heap_destroy (handle->ready_heap); handle->ready_heap = NULL; GNUNET_free (handle); diff --git a/src/transport/transport_api_get_hello.c b/src/transport/transport_api_get_hello.c index 8087159c6..9a65616a9 100644 --- a/src/transport/transport_api_get_hello.c +++ b/src/transport/transport_api_get_hello.c @@ -34,60 +34,179 @@ /** - * Linked list of functions to call whenever our HELLO is updated. + * Functions to call with this peer's HELLO. */ struct GNUNET_TRANSPORT_GetHelloHandle { /** - * This is a doubly linked list. + * Our configuration. */ - struct GNUNET_TRANSPORT_GetHelloHandle *next; - - /** - * This is a doubly linked list. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *prev; + const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Transport handle. */ - struct GNUNET_TRANSPORT_Handle *handle; + struct GNUNET_MQ_Handle *mq; /** * Callback to call once we got our HELLO. */ GNUNET_TRANSPORT_HelloUpdateCallback rec; + /** + * Closure for @e rec. + */ + void *rec_cls; + /** * Task for calling the HelloUpdateCallback when we already have a HELLO */ struct GNUNET_SCHEDULER_Task *notify_task; /** - * Closure for @e rec. + * ID of the task trying to reconnect to the service. */ - void *rec_cls; + struct GNUNET_SCHEDULER_Task *reconnect_task; + + /** + * Delay until we try to reconnect. + */ + struct GNUNET_TIME_Relative reconnect_delay; }; +/** + * Function we use for checking incoming HELLO messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param msg message received + * @return #GNUNET_OK if message is well-formed + */ +static int +check_hello (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_PeerIdentity me; + + if (GNUNET_OK != + GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, + &me)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n", + (unsigned int) ntohs (msg->size), + GNUNET_i2s (&me)); + return GNUNET_OK; +} + /** - * Task to call the HelloUpdateCallback of the GetHelloHandle + * Function we use for handling incoming HELLO messages. * - * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle` + * @param cls closure, a `struct GNUNET_TRANSPORT_GetHelloHandle *` + * @param msg message received */ static void -call_hello_update_cb_async (void *cls) +handle_hello (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls; - GNUNET_assert (NULL != ghh->handle->my_hello); - GNUNET_assert (NULL != ghh->notify_task); - ghh->notify_task = NULL; ghh->rec (ghh->rec_cls, - ghh->handle->my_hello); + msg); +} + + +/** + * Function that will schedule the job that will try + * to connect us again to the client. + * + * @param ghh transport service to reconnect + */ +static void +schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh); + + +/** + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Error receiving from transport service, disconnecting temporarily.\n"); + GNUNET_MQ_destroy (ghh->mq); + ghh->mq = NULL; + schedule_reconnect (ghh); +} + + +/** + * Try again to connect to transport service. + * + * @param cls the handle to the transport service + */ +static void +reconnect (void *cls) +{ + GNUNET_MQ_hd_var_size (hello, + GNUNET_MESSAGE_TYPE_HELLO, + struct GNUNET_MessageHeader); + struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_hello_handler (ghh), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; + struct StartMessage *s; + + ghh->reconnect_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connecting to transport service.\n"); + GNUNET_assert (NULL == ghh->mq); + ghh->mq = GNUNET_CLIENT_connecT (ghh->cfg, + "transport", + handlers, + &mq_error_handler, + ghh); + if (NULL == ghh->mq) + return; + env = GNUNET_MQ_msg (s, + GNUNET_MESSAGE_TYPE_TRANSPORT_START); + s->options = htonl (0); + GNUNET_MQ_send (ghh->mq, + env); +} + + +/** + * Function that will schedule the job that will try + * to connect us again to the client. + * + * @param ghh transport service to reconnect + */ +static void +schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh) +{ + ghh->reconnect_task = + GNUNET_SCHEDULER_add_delayed (ghh->reconnect_delay, + &reconnect, + ghh); + ghh->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ghh->reconnect_delay); } @@ -95,7 +214,7 @@ call_hello_update_cb_async (void *cls) * Obtain the HELLO message for this peer. The callback given in this function * is never called synchronously. * - * @param handle connection to transport service + * @param cfg configuration * @param rec function to call with the HELLO, sender will be our peer * identity; message and sender will be NULL on timeout * (handshake with transport service pending/failed). @@ -104,23 +223,23 @@ call_hello_update_cb_async (void *cls) * @return handle to cancel the operation */ struct GNUNET_TRANSPORT_GetHelloHandle * -GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, +GNUNET_TRANSPORT_get_hello (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_TRANSPORT_HelloUpdateCallback rec, void *rec_cls) { - struct GNUNET_TRANSPORT_GetHelloHandle *hwl; - - hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle); - hwl->rec = rec; - hwl->rec_cls = rec_cls; - hwl->handle = handle; - GNUNET_CONTAINER_DLL_insert (handle->hwl_head, - handle->hwl_tail, - hwl); - if (NULL != handle->my_hello) - hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async, - hwl); - return hwl; + struct GNUNET_TRANSPORT_GetHelloHandle *ghh; + + ghh = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle); + ghh->rec = rec; + ghh->rec_cls = rec_cls; + ghh->cfg = cfg; + reconnect (ghh); + if (NULL == ghh->mq) + { + GNUNET_free (ghh); + return NULL; + } + return ghh; } @@ -132,15 +251,13 @@ GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, void GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh) { - struct GNUNET_TRANSPORT_Handle *handle = ghh->handle; - - if (NULL != ghh->notify_task) - GNUNET_SCHEDULER_cancel (ghh->notify_task); - GNUNET_CONTAINER_DLL_remove (handle->hwl_head, - handle->hwl_tail, - ghh); + if (NULL != ghh->mq) + { + GNUNET_MQ_destroy (ghh->mq); + ghh->mq = NULL; + } GNUNET_free (ghh); } -/* end of transport_api_hello.c */ +/* end of transport_api_get_hello.c */ diff --git a/src/transport/transport_api_offer_hello.c b/src/transport/transport_api_offer_hello.c index 0abce2d62..951ab9ba4 100644 --- a/src/transport/transport_api_offer_hello.c +++ b/src/transport/transport_api_offer_hello.c @@ -23,31 +23,23 @@ * @brief library to offer HELLOs to transport service * @author Christian Grothoff */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_hello_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_transport_service.h" + /** * Entry in linked list for all offer-HELLO requests. */ struct GNUNET_TRANSPORT_OfferHelloHandle { - /** - * For the DLL. - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *prev; - - /** - * For the DLL. - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *next; /** * Transport service handle we use for transmission. */ - struct GNUNET_TRANSPORT_Handle *th; - - /** - * Transmission handle for this request. - */ - struct GNUNET_TRANSPORT_TransmitHandle *tth; + struct GNUNET_MQ_Handle *mq; /** * Function to call once we are done. @@ -59,20 +51,31 @@ struct GNUNET_TRANSPORT_OfferHelloHandle */ void *cls; - /** - * The HELLO message to be transmitted. - */ - struct GNUNET_MessageHeader *msg; }; +/** + * Done sending HELLO message to the service, notify application. + * + * @param cls the handle for the operation + */ +static void +finished_hello (void *cls) +{ + struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls; + + if (NULL != ohh->cont) + ohh->cont (ohh->cls); + GNUNET_TRANSPORT_offer_hello_cancel (ohh); +} + /** * Offer the transport service the HELLO of another peer. Note that * the transport service may just ignore this message if the HELLO is * malformed or useless due to our local configuration. * - * @param handle connection to transport service + * @param cfg configuration * @param hello the hello message * @param cont continuation to call when HELLO has been sent, * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail @@ -83,46 +86,43 @@ struct GNUNET_TRANSPORT_OfferHelloHandle * */ struct GNUNET_TRANSPORT_OfferHelloHandle * -GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, +GNUNET_TRANSPORT_offer_hello (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_MessageHeader *hello, GNUNET_SCHEDULER_TaskCallback cont, void *cont_cls) { - struct GNUNET_TRANSPORT_OfferHelloHandle *ohh; - struct GNUNET_MessageHeader *msg; + struct GNUNET_TRANSPORT_OfferHelloHandle *ohh + = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle); + struct GNUNET_MQ_Envelope *env; struct GNUNET_PeerIdentity peer; - uint16_t size; - if (NULL == handle->mq) - return NULL; - GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); - size = ntohs (hello->size); - GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader)); if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello, &peer)) { GNUNET_break (0); + GNUNET_free (ohh); + return NULL; + } + ohh->mq = GNUNET_CLIENT_connecT (cfg, + "transport", + NULL, + NULL, + ohh); + if (NULL == ohh->mq) + { + GNUNET_free (ohh); return NULL; } - - msg = GNUNET_malloc (size); - memcpy (msg, hello, size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Offering HELLO message of `%s' to transport for validation.\n", - GNUNET_i2s (&peer)); - ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle); - ohh->th = handle; ohh->cont = cont; ohh->cls = cont_cls; - ohh->msg = msg; - ohh->tth = schedule_control_transmit (handle, - size, - &send_hello, - ohh); - GNUNET_CONTAINER_DLL_insert (handle->oh_head, - handle->oh_tail, - ohh); + GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); + env = GNUNET_MQ_msg_copy (hello); + GNUNET_MQ_notify_sent (env, + &finished_hello, + ohh); + GNUNET_MQ_send (ohh->mq, + env); return ohh; } @@ -135,13 +135,7 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, void GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh) { - struct GNUNET_TRANSPORT_Handle *th = ohh->th; - - cancel_control_transmit (ohh->th, ohh->tth); - GNUNET_CONTAINER_DLL_remove (th->oh_head, - th->oh_tail, - ohh); - GNUNET_free (ohh->msg); + GNUNET_MQ_destroy (ohh->mq); GNUNET_free (ohh); } -- cgit v1.2.3