From 6e3599bab213760c66f13f6103ebf650bbe5b7e9 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 8 Jul 2016 16:34:31 +0000 Subject: migrate transport_core API to MQ --- src/cadet/gnunet-service-cadet_peer.c | 39 +- src/core/test_core_api.c | 6 +- src/core/test_core_api_reliability.c | 10 +- src/core/test_core_quota_compliance.c | 10 +- src/dht/gnunet-service-dht.c | 22 +- src/dht/gnunet-service-dht.h | 4 - src/dht/gnunet-service-dht_neighbours.c | 8 +- src/hostlist/gnunet-daemon-hostlist_client.c | 76 +- src/hostlist/test_gnunet_daemon_hostlist.c | 2 +- .../test_gnunet_daemon_hostlist_reconnect.c | 2 +- src/include/gnunet_transport_core_service.h | 94 +- src/include/gnunet_transport_service.h | 15 +- src/peerinfo-tool/gnunet-peerinfo.c | 18 +- .../gnunet-service-testbed_connectionpool.c | 3 +- .../gnunet-service-testbed_connectionpool.h | 4 +- src/testbed/gnunet-service-testbed_oc.c | 35 +- src/topology/gnunet-daemon-topology.c | 55 +- src/transport/Makefile.am | 6 +- src/transport/transport-testing.c | 6 +- src/transport/transport_api.c | 1245 ++++++-------------- src/transport/transport_api_get_hello.c | 199 +++- src/transport/transport_api_offer_hello.c | 98 +- 22 files changed, 787 insertions(+), 1170 deletions(-) diff --git a/src/cadet/gnunet-service-cadet_peer.c b/src/cadet/gnunet-service-cadet_peer.c index fa16db4bb..101b9e22a 100644 --- a/src/cadet/gnunet-service-cadet_peer.c +++ b/src/cadet/gnunet-service-cadet_peer.c @@ -245,14 +245,14 @@ static unsigned long long drop_percent; static struct GNUNET_CORE_Handle *core_handle; /** - * Handle to communicate with ATS. + * Our configuration; */ -static struct GNUNET_ATS_ConnectivityHandle *ats_ch; +static const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Handle to try to start new connections. + * Handle to communicate with ATS. */ -static struct GNUNET_TRANSPORT_Handle *transport_handle; +static struct GNUNET_ATS_ConnectivityHandle *ats_ch; /** * Shutdown falg. @@ -557,6 +557,7 @@ core_init (void *cls, const struct GNUNET_CONFIGURATION_Handle *c = cls; static int i = 0; + cfg = c; LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n"); if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id))) { @@ -1841,24 +1842,6 @@ GCP_init (const struct GNUNET_CONFIGURATION_Handle *c) NULL, /* Don't notify about all outbound messages */ GNUNET_NO, /* For header-only out notification */ core_handlers); /* Register these handlers */ - if (GNUNET_YES != - GNUNET_CONFIGURATION_get_value_yesno (c, "CADET", "DISABLE_TRY_CONNECT")) - { - transport_handle = GNUNET_TRANSPORT_connect (c, &my_full_id, NULL, /* cls */ - /* Notify callbacks */ - NULL, NULL, NULL); - } - else - { - LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "* DISABLE TRYING CONNECT in config *\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "* Use this only for test purposes. *\n"); - LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n"); - transport_handle = NULL; - } - - - if (NULL == core_handle) { GNUNET_break (0); @@ -1886,11 +1869,6 @@ GCP_shutdown (void) GNUNET_CORE_disconnect (core_handle); core_handle = NULL; } - if (NULL != transport_handle) - { - GNUNET_TRANSPORT_disconnect (transport_handle); - transport_handle = NULL; - } if (NULL != ats_ch) { GNUNET_ATS_connectivity_done (ats_ch); @@ -2591,7 +2569,10 @@ GCP_try_connect (struct CadetPeer *peer) struct GNUNET_HELLO_Message *hello; struct GNUNET_MessageHeader *mh; - if (NULL == transport_handle) + if (GNUNET_YES != + GNUNET_CONFIGURATION_get_value_yesno (cfg, + "CADET", + "DISABLE_TRY_CONNECT")) return; GCC_check_connections (); if (GNUNET_YES == GCP_is_neighbor (peer)) @@ -2606,7 +2587,7 @@ GCP_try_connect (struct CadetPeer *peer) GNUNET_TRANSPORT_offer_hello_cancel (peer->hello_offer); peer->hello_offer = NULL; } - peer->hello_offer = GNUNET_TRANSPORT_offer_hello (transport_handle, + peer->hello_offer = GNUNET_TRANSPORT_offer_hello (cfg, mh, &hello_offer_done, peer); diff --git a/src/core/test_core_api.c b/src/core/test_core_api.c index 92ee038da..43f4c421e 100644 --- a/src/core/test_core_api.c +++ b/src/core/test_core_api.c @@ -65,9 +65,9 @@ process_hello (void *cls, "Received (my) `%s' from transport service\n", "HELLO"); GNUNET_assert (message != NULL); if ((p == &p1) && (p2.th != NULL)) - GNUNET_TRANSPORT_offer_hello (p2.th, message, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p2.cfg, message, NULL, NULL); if ((p == &p2) && (p1.th != NULL)) - GNUNET_TRANSPORT_offer_hello (p1.th, message, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p1.cfg, message, NULL, NULL); } @@ -280,7 +280,7 @@ setup_peer (struct PeerContext *p, GNUNET_assert (NULL != p->th); p->ats = GNUNET_ATS_connectivity_init (p->cfg); GNUNET_assert (NULL != p->ats); - p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p); + p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p); GNUNET_free (binary); } diff --git a/src/core/test_core_api_reliability.c b/src/core/test_core_api_reliability.c index c7672afdb..94c223b74 100644 --- a/src/core/test_core_api_reliability.c +++ b/src/core/test_core_api_reliability.c @@ -412,14 +412,14 @@ process_hello (void *cls, GNUNET_assert (message != NULL); p->hello = GNUNET_copy_message (message); if ((p == &p1) && (p2.th != NULL)) - GNUNET_TRANSPORT_offer_hello (p2.th, message, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p2.cfg, message, NULL, NULL); if ((p == &p2) && (p1.th != NULL)) - GNUNET_TRANSPORT_offer_hello (p1.th, message, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p1.cfg, message, NULL, NULL); if ((p == &p1) && (p2.hello != NULL)) - GNUNET_TRANSPORT_offer_hello (p1.th, p2.hello, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p1.cfg, p2.hello, NULL, NULL); if ((p == &p2) && (p1.hello != NULL)) - GNUNET_TRANSPORT_offer_hello (p2.th, p1.hello, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p2.cfg, p1.hello, NULL, NULL); } @@ -442,7 +442,7 @@ setup_peer (struct PeerContext *p, GNUNET_assert (p->th != NULL); p->ats = GNUNET_ATS_connectivity_init (p->cfg); GNUNET_assert (NULL != p->ats); - p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p); + p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p); GNUNET_free (binary); } diff --git a/src/core/test_core_quota_compliance.c b/src/core/test_core_quota_compliance.c index 05b1ae3d9..28d836e2e 100644 --- a/src/core/test_core_quota_compliance.c +++ b/src/core/test_core_quota_compliance.c @@ -547,14 +547,14 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message) p->hello = GNUNET_malloc (ntohs (message->size)); memcpy (p->hello, message, ntohs (message->size)); if ((p == &p1) && (p2.th != NULL)) - GNUNET_TRANSPORT_offer_hello (p2.th, message, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p2.cfg, message, NULL, NULL); if ((p == &p2) && (p1.th != NULL)) - GNUNET_TRANSPORT_offer_hello (p1.th, message, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p1.cfg, message, NULL, NULL); if ((p == &p1) && (p2.hello != NULL)) - GNUNET_TRANSPORT_offer_hello (p1.th, p2.hello, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p1.cfg, p2.hello, NULL, NULL); if ((p == &p2) && (p1.hello != NULL)) - GNUNET_TRANSPORT_offer_hello (p2.th, p1.hello, NULL, NULL); + GNUNET_TRANSPORT_offer_hello (p2.cfg, p1.hello, NULL, NULL); } @@ -579,7 +579,7 @@ setup_peer (struct PeerContext *p, const char *cfgname) GNUNET_assert (p->th != NULL); p->ats = GNUNET_ATS_connectivity_init (p->cfg); GNUNET_assert (NULL != p->ats); - p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p); + p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p); GNUNET_free (binary); } diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index abdd77548..e3b9d59a4 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -66,11 +66,6 @@ struct GNUNET_SERVER_Handle *GDS_server; */ struct GNUNET_MessageHeader *GDS_my_hello; -/** - * Handle to the transport service, for getting our hello - */ -struct GNUNET_TRANSPORT_Handle *GDS_transport_handle; - /** * Handle to get our current HELLO. */ @@ -112,11 +107,6 @@ shutdown_task (void *cls) GNUNET_TRANSPORT_get_hello_cancel (ghh); ghh = NULL; } - if (GDS_transport_handle != NULL) - { - GNUNET_TRANSPORT_disconnect (GDS_transport_handle); - GDS_transport_handle = NULL; - } GDS_NEIGHBOURS_done (); GDS_DATACACHE_done (); GDS_ROUTING_done (); @@ -170,15 +160,9 @@ run (void *cls, } GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); - GDS_transport_handle = - GNUNET_TRANSPORT_connect (GDS_cfg, NULL, NULL, NULL, NULL, NULL); - if (GDS_transport_handle == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to connect to transport service!\n")); - return; - } - ghh = GNUNET_TRANSPORT_get_hello (GDS_transport_handle, &process_hello, NULL); + ghh = GNUNET_TRANSPORT_get_hello (GDS_cfg, + &process_hello, + NULL); } diff --git a/src/dht/gnunet-service-dht.h b/src/dht/gnunet-service-dht.h index 6f641cb96..4684c2324 100644 --- a/src/dht/gnunet-service-dht.h +++ b/src/dht/gnunet-service-dht.h @@ -57,9 +57,5 @@ extern struct GNUNET_SERVER_Handle *GDS_server; */ extern struct GNUNET_MessageHeader *GDS_my_hello; -/** - * Handle to the transport service, for getting our hello - */ -extern struct GNUNET_TRANSPORT_Handle *GDS_transport_handle; #endif diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c index 4add3c4ae..b24a95ab2 100644 --- a/src/dht/gnunet-service-dht_neighbours.c +++ b/src/dht/gnunet-service-dht_neighbours.c @@ -592,13 +592,11 @@ try_connect (const struct GNUNET_PeerIdentity *pid, ci, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); } - if ( (NULL != GDS_transport_handle) && - (NULL != ci->oh) && + if ( (NULL != ci->oh) && (NULL != h) ) GNUNET_TRANSPORT_offer_hello_cancel (ci->oh); - if ( (NULL != GDS_transport_handle) && - (NULL != h) ) - ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_transport_handle, + if (NULL != h) + ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg, h, &offer_hello_done, ci); diff --git a/src/hostlist/gnunet-daemon-hostlist_client.c b/src/hostlist/gnunet-daemon-hostlist_client.c index df0cabe1d..c8c74a9ba 100644 --- a/src/hostlist/gnunet-daemon-hostlist_client.c +++ b/src/hostlist/gnunet-daemon-hostlist_client.c @@ -142,6 +142,14 @@ struct Hostlist }; +struct HelloOffer +{ + struct HelloOffer *next; + struct HelloOffer *prev; + struct GNUNET_TRANSPORT_OfferHelloHandle *ohh; +}; + + /** * Our configuration. */ @@ -152,11 +160,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; */ static struct GNUNET_STATISTICS_Handle *stats; -/** - * Transport handle. - */ -static struct GNUNET_TRANSPORT_Handle *transport; - /** * Proxy hostname or ip we are using (can be NULL). */ @@ -312,6 +315,25 @@ static unsigned int stat_hellos_obtained; */ static unsigned int stat_connection_count; +static struct HelloOffer *ho_head; + +static struct HelloOffer *ho_tail; + + +/** + * Hello offer complete. Clean up. + */ +static void +done_offer_hello (void *cls) +{ + struct HelloOffer *ho = cls; + + GNUNET_CONTAINER_DLL_remove (ho_head, + ho_tail, + ho); + GNUNET_free (ho); +} + /** * Process downloaded bits by calling callback on each HELLO. @@ -331,6 +353,7 @@ callback_download (void *ptr, static char download_buffer[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; const char *cbuf = ptr; const struct GNUNET_MessageHeader *msg; + struct HelloOffer *ho; size_t total; size_t cpy; size_t left; @@ -390,7 +413,22 @@ callback_download (void *ptr, ("# valid HELLOs downloaded from hostlist servers"), 1, GNUNET_NO); stat_hellos_obtained++; - GNUNET_TRANSPORT_offer_hello (transport, msg, NULL, NULL); + + ho = GNUNET_new (struct HelloOffer); + ho->ohh = GNUNET_TRANSPORT_offer_hello (cfg, + msg, + &done_offer_hello, + ho); + if (NULL == ho->ohh) + { + GNUNET_free (ho); + } + else + { + GNUNET_CONTAINER_DLL_insert (ho_head, + ho_tail, + ho); + } } else { @@ -405,7 +443,9 @@ callback_download (void *ptr, stat_hellos_obtained++; return total; } - memmove (download_buffer, &download_buffer[msize], download_pos - msize); + memmove (download_buffer, + &download_buffer[msize], + download_pos - msize); download_pos -= msize; } return total; @@ -1532,13 +1572,6 @@ GNUNET_HOSTLIST_client_start (const struct GNUNET_CONFIGURATION_Handle *c, GNUNET_break (0); return GNUNET_SYSERR; } - transport = GNUNET_TRANSPORT_connect (c, NULL, NULL, NULL, NULL, NULL); - if (NULL == transport) - { - GNUNET_break (0); - curl_global_cleanup (); - return GNUNET_SYSERR; - } cfg = c; stats = st; @@ -1687,8 +1720,18 @@ GNUNET_HOSTLIST_client_start (const struct GNUNET_CONFIGURATION_Handle *c, void GNUNET_HOSTLIST_client_stop () { + struct HelloOffer *ho; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Hostlist client shutdown\n"); + while (NULL != (ho = ho_head)) + { + GNUNET_CONTAINER_DLL_remove (ho_head, + ho_tail, + ho); + GNUNET_TRANSPORT_offer_hello_cancel (ho->ohh); + GNUNET_free (ho); + } if (NULL != sget) { GNUNET_STATISTICS_get_cancel (sget); @@ -1725,11 +1768,6 @@ GNUNET_HOSTLIST_client_stop () ti_check_download = NULL; curl_global_cleanup (); } - if (NULL != transport) - { - GNUNET_TRANSPORT_disconnect (transport); - transport = NULL; - } GNUNET_free_non_null (proxy); proxy = NULL; GNUNET_free_non_null (proxy_username); diff --git a/src/hostlist/test_gnunet_daemon_hostlist.c b/src/hostlist/test_gnunet_daemon_hostlist.c index 5f8ece9b8..6a5850c4d 100644 --- a/src/hostlist/test_gnunet_daemon_hostlist.c +++ b/src/hostlist/test_gnunet_daemon_hostlist.c @@ -147,7 +147,7 @@ setup_peer (struct PeerContext *p, const char *cfgname) p->th = GNUNET_TRANSPORT_connect (p->cfg, NULL, p, NULL, ¬ify_connect, NULL); GNUNET_assert (p->th != NULL); - p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p); + p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p); GNUNET_free (binary); } diff --git a/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c b/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c index 3dad137a2..30f26717f 100644 --- a/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c +++ b/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c @@ -116,7 +116,7 @@ setup_peer (struct PeerContext *p, const char *cfgname) p->th = GNUNET_TRANSPORT_connect (p->cfg, NULL, p, NULL, ¬ify_connect, NULL); GNUNET_assert (p->th != NULL); - p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p); + p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p); GNUNET_free (binary); } diff --git a/src/include/gnunet_transport_core_service.h b/src/include/gnunet_transport_core_service.h index 816d5efaa..6dada4f54 100644 --- a/src/include/gnunet_transport_core_service.h +++ b/src/include/gnunet_transport_core_service.h @@ -49,50 +49,42 @@ extern "C" #define GNUNET_TRANSPORT_CORE_VERSION 0x00000000 -/** - * Function called by the transport for each received message. - * - * @param cls closure - * @param peer (claimed) identity of the other peer - * @param message the message - */ -typedef void -(*GNUNET_TRANSPORT_ReceiveCallback) (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message); - - /** * Opaque handle to the service. */ -struct GNUNET_TRANSPORT_Handle; +struct GNUNET_TRANSPORT_CoreHandle; /** - * Function called to notify CORE service that another - * @a peer connected to us. + * Function called to notify transport users that another + * peer connected to us. * * @param cls closure - * @param peer the peer that connected, never NULL - * @param mq message queue for sending messages to this peer + * @param peer the peer that connected + * @param mq message queue to use to transmit to @a peer + * @return closure to use in MQ handlers */ -typedef void -(*GNUNET_TRANSPORT_NotifyConnect) (void *cls, +typedef void * +(*GNUNET_TRANSPORT_NotifyConnecT) (void *cls, const struct GNUNET_PeerIdentity *peer, struct GNUNET_MQ_Handle *mq); /** - * Function called to notify CORE service that another - * @a peer disconnected from us. The associated message - * queue must not be used henceforth. + * Function called to notify transport users that another peer + * disconnected from us. The message queue that was given to the + * connect notification will be destroyed and must not be used + * henceforth. * - * @param cls closure - * @param peer the peer that disconnected, never NULL + * @param cls closure from #GNUNET_TRANSPORT_connecT + * @param peer the peer that disconnected + * @param handlers_cls closure of the handlers, was returned from the + * connect notification callback */ typedef void -(*GNUNET_TRANSPORT_NotifyDisconnect) (void *cls, - const struct GNUNET_PeerIdentity *peer); +(*GNUNET_TRANSPORT_NotifyDisconnecT) (void *cls, + const struct GNUNET_PeerIdentity *peer, + void *handler_cls); /** @@ -108,34 +100,41 @@ typedef void * * @param cls the closure * @param neighbour peer that we have excess bandwidth to + * @param handlers_cls closure of the handlers, was returned from the + * connect notification callback */ typedef void -(*GNUNET_TRANSPORT_NotifyExcessBandwidth)(void *cls, - const struct GNUNET_PeerIdentity *neighbour); +(*GNUNET_TRANSPORT_NotifyExcessBandwidtH)(void *cls, + const struct GNUNET_PeerIdentity *neighbour, + void *handlers_cls); + /** - * Connect to the transport service. + * Connect to the transport service. Note that the connection may + * complete (or fail) asynchronously. * * @param cfg configuration to use - * @param self our own identity (if API should check that it matches + * @param self our own identity (API should check that it matches * the identity found by transport), or NULL (no check) - * @param cls closure for the callbacks - * @param rec_handlers NULL-terminated array of handlers for incoming - * messages, or NULL + * @param handlers array of message handlers; note that the + * closures provided will be ignored and replaced + * with the respective return value from @a nc + * @param handlers array with handlers to call when we receive messages, or NULL + * @param cls closure for the @a nc, @a nd and @a neb callbacks * @param nc function to call on connect events, or NULL * @param nd function to call on disconnect events, or NULL - * @param neb function to call if we have excess bandwidth to a peer + * @param neb function to call if we have excess bandwidth to a peer, or NULL * @return NULL on error */ -struct GNUNET_TRANSPORT_Handle * +struct GNUNET_TRANSPORT_CoreHandle * GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_PeerIdentity *self, + const struct GNUNET_MQ_MessageHandler *handlers, void *cls, - GNUNET_MQ_MessageHandler *rec_handlers, - GNUNET_TRANSPORT_NotifyConnect nc, - GNUNET_TRANSPORT_NotifyDisconnect nd, - GNUNET_TRANSPORT_NotifyExcessBandwidth neb); + GNUNET_TRANSPORT_NotifyConnecT nc, + GNUNET_TRANSPORT_NotifyDisconnecT nd, + GNUNET_TRANSPORT_NotifyExcessBandwidtH neb); /** @@ -144,22 +143,19 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param handle handle returned from connect */ void -GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle); +GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle); /** - * Checks if a given peer is connected to us. Convenience - * API in case a client does not track connect/disconnect - * events internally. + * Checks if a given peer is connected to us and get the message queue. * * @param handle connection to transport service * @param peer the peer to check - * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected) + * @return NULL if disconnected, otherwise message queue for @a peer */ -int -GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle, - const struct GNUNET_PeerIdentity *peer); - +struct GNUNET_MQ_Handle * +GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, + const struct GNUNET_PeerIdentity *peer); #if 0 /* keep Emacsens' auto-indent happy */ diff --git a/src/include/gnunet_transport_service.h b/src/include/gnunet_transport_service.h index 96182424e..b40763b92 100644 --- a/src/include/gnunet_transport_service.h +++ b/src/include/gnunet_transport_service.h @@ -65,12 +65,6 @@ typedef void const struct GNUNET_MessageHeader *message); -/** - * Opaque handle to the service. - */ -struct GNUNET_TRANSPORT_Handle; - - /** * Function called to notify transport users that another * peer connected to us. @@ -82,6 +76,7 @@ typedef void (*GNUNET_TRANSPORT_NotifyConnect) (void *cls, const struct GNUNET_PeerIdentity *peer); + /** * Function called to notify transport users that another * peer disconnected from us. @@ -288,13 +283,13 @@ struct GNUNET_TRANSPORT_GetHelloHandle; * Obtain updates on changes to the HELLO message for this peer. The callback * given in this function is never called synchronously. * - * @param handle connection to transport service + * @param cfg configuration * @param rec function to call with the HELLO * @param rec_cls closure for @a rec * @return handle to cancel the operation */ struct GNUNET_TRANSPORT_GetHelloHandle * -GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, +GNUNET_TRANSPORT_get_hello (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_TRANSPORT_HelloUpdateCallback rec, void *rec_cls); @@ -319,7 +314,7 @@ struct GNUNET_TRANSPORT_OfferHelloHandle; * the transport service may just ignore this message if the HELLO is * malformed or useless due to our local configuration. * - * @param handle connection to transport service + * @param cfg configuration * @param hello the hello message * @param cont continuation to call when HELLO has been sent, * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail @@ -330,7 +325,7 @@ struct GNUNET_TRANSPORT_OfferHelloHandle; * */ struct GNUNET_TRANSPORT_OfferHelloHandle * -GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, +GNUNET_TRANSPORT_offer_hello (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_MessageHeader *hello, GNUNET_SCHEDULER_TaskCallback cont, void *cont_cls); diff --git a/src/peerinfo-tool/gnunet-peerinfo.c b/src/peerinfo-tool/gnunet-peerinfo.c index b6aa224fd..14f1e4604 100644 --- a/src/peerinfo-tool/gnunet-peerinfo.c +++ b/src/peerinfo-tool/gnunet-peerinfo.c @@ -182,11 +182,6 @@ static struct GNUNET_SCHEDULER_Task * tt; */ static struct GNUNET_TRANSPORT_GetHelloHandle *gh; -/** - * Connection to transport service. - */ -static struct GNUNET_TRANSPORT_Handle *transport; - /** * Current iterator context (if active, otherwise NULL). */ @@ -641,11 +636,6 @@ shutdown_task (void *cls) GNUNET_TRANSPORT_get_hello_cancel (gh); gh = NULL; } - if (NULL != transport) - { - GNUNET_TRANSPORT_disconnect (transport); - transport = NULL; - } while (NULL != (pc = pc_head)) { GNUNET_CONTAINER_DLL_remove (pc_head, @@ -702,8 +692,6 @@ hello_callback (void *cls, &my_peer_identity)); GNUNET_TRANSPORT_get_hello_cancel (gh); gh = NULL; - GNUNET_TRANSPORT_disconnect (transport); - transport = NULL; if (NULL != dump_hello) dump_my_hello (); tt = GNUNET_SCHEDULER_add_now (&state_machine, NULL); @@ -740,11 +728,7 @@ testservice_task (void *cls, (GNUNET_YES == get_uri) || (NULL != dump_hello) ) { - transport = GNUNET_TRANSPORT_connect (cfg, - NULL, - NULL, - NULL, NULL, NULL); - gh = GNUNET_TRANSPORT_get_hello (transport, + gh = GNUNET_TRANSPORT_get_hello (cfg, &hello_callback, NULL); } diff --git a/src/testbed/gnunet-service-testbed_connectionpool.c b/src/testbed/gnunet-service-testbed_connectionpool.c index 0fa2a6456..47b6fab08 100644 --- a/src/testbed/gnunet-service-testbed_connectionpool.c +++ b/src/testbed/gnunet-service-testbed_connectionpool.c @@ -463,7 +463,8 @@ connection_ready (void *cls) entry->handle_core, entry->handle_transport, entry->handle_ats_connectivity, - entry->peer_identity); + entry->peer_identity, + entry->cfg); } diff --git a/src/testbed/gnunet-service-testbed_connectionpool.h b/src/testbed/gnunet-service-testbed_connectionpool.h index 589421840..54b37f6d5 100644 --- a/src/testbed/gnunet-service-testbed_connectionpool.h +++ b/src/testbed/gnunet-service-testbed_connectionpool.h @@ -85,13 +85,15 @@ GST_connection_pool_destroy (void); * @param ac the handle to ATS, can be NULL if it is not requested * @param peer_id the identity of the peer. Will be NULL if ch is NULL. In other * cases, its value being NULL means that CORE connection has failed. + * @param cfg configuration of the peer */ typedef void (*GST_connection_pool_connection_ready_cb) (void *cls, struct GNUNET_CORE_Handle *ch, struct GNUNET_TRANSPORT_Handle *th, struct GNUNET_ATS_ConnectivityHandle *ac, - const struct GNUNET_PeerIdentity *peer_id); + const struct GNUNET_PeerIdentity *peer_id, + const struct GNUNET_CONFIGURATION_Handle *cfg); /** diff --git a/src/testbed/gnunet-service-testbed_oc.c b/src/testbed/gnunet-service-testbed_oc.c index de462da7a..8902a359c 100644 --- a/src/testbed/gnunet-service-testbed_oc.c +++ b/src/testbed/gnunet-service-testbed_oc.c @@ -48,6 +48,11 @@ struct ConnectivitySuggestContext */ struct GNUNET_TRANSPORT_Handle *th_; + /** + * Configuration of the peer from cache. Do not free! + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + /** * The GetCacheHandle for the peer2's transport handle * (used to offer the HELLO to the peer). @@ -699,13 +704,15 @@ overlay_connect_notify (void *cls, * @param th the handle to TRANSPORT. Can be NULL if it is not requested * @param ac the handle to ATS. Can be NULL if it is not requested * @param my_identity the identity of our peer + * @param cfg configuration of the peer */ static void occ_cache_get_handle_ats_occ_cb (void *cls, struct GNUNET_CORE_Handle *ch, struct GNUNET_TRANSPORT_Handle *th, struct GNUNET_ATS_ConnectivityHandle *ac, - const struct GNUNET_PeerIdentity *my_identity) + const struct GNUNET_PeerIdentity *my_identity, + const struct GNUNET_CONFIGURATION_Handle *cfg) { struct OverlayConnectContext *occ = cls; struct LocalPeer2Context *lp2c; @@ -754,7 +761,8 @@ occ_cache_get_handle_ats_rocc_cb (void *cls, struct GNUNET_CORE_Handle *ch, struct GNUNET_TRANSPORT_Handle *th, struct GNUNET_ATS_ConnectivityHandle *ac, - const struct GNUNET_PeerIdentity *my_identity) + const struct GNUNET_PeerIdentity *my_identity, + const struct GNUNET_CONFIGURATION_Handle *cfg) { struct RemoteOverlayConnectCtx *rocc = cls; @@ -896,7 +904,7 @@ send_hello (void *cls) other_peer_str); GNUNET_free (other_peer_str); lp2c->ohh = - GNUNET_TRANSPORT_offer_hello (lp2c->tcc.th_, + GNUNET_TRANSPORT_offer_hello (lp2c->tcc.cfg, occ->hello, occ_hello_sent_cb, occ); @@ -922,13 +930,15 @@ send_hello (void *cls) * @param th the handle to TRANSPORT. Can be NULL if it is not requested * @param ac the handle to ATS. Can be NULL if it is not requested * @param ignore_ peer identity which is ignored in this callback - */ + * @param cfg configuration of the peer +*/ static void p2_transport_connect_cache_callback (void *cls, struct GNUNET_CORE_Handle *ch, struct GNUNET_TRANSPORT_Handle *th, struct GNUNET_ATS_ConnectivityHandle *ac, - const struct GNUNET_PeerIdentity *ignore_) + const struct GNUNET_PeerIdentity *ignore_, + const struct GNUNET_CONFIGURATION_Handle *cfg) { struct OverlayConnectContext *occ = cls; @@ -945,6 +955,7 @@ p2_transport_connect_cache_callback (void *cls, return; } occ->p2ctx.local.tcc.th_ = th; + occ->p2ctx.local.tcc.cfg = cfg; GNUNET_asprintf (&occ->emsg, "0x%llx: Timeout while offering HELLO to %s", occ->op_id, @@ -1068,7 +1079,8 @@ p1_transport_connect_cache_callback (void *cls, struct GNUNET_CORE_Handle *ch, struct GNUNET_TRANSPORT_Handle *th, struct GNUNET_ATS_ConnectivityHandle *ac, - const struct GNUNET_PeerIdentity *ignore_) + const struct GNUNET_PeerIdentity *ignore_, + const struct GNUNET_CONFIGURATION_Handle *cfg) { struct OverlayConnectContext *occ = cls; @@ -1092,7 +1104,7 @@ p1_transport_connect_cache_callback (void *cls, "0x%llx: Timeout while acquiring HELLO of peer %s", occ->op_id, GNUNET_i2s (&occ->peer_identity)); - occ->ghh = GNUNET_TRANSPORT_get_hello (occ->p1th_, + occ->ghh = GNUNET_TRANSPORT_get_hello (cfg, &hello_update_cb, occ); } @@ -1112,7 +1124,8 @@ occ_cache_get_handle_core_cb (void *cls, struct GNUNET_CORE_Handle *ch, struct GNUNET_TRANSPORT_Handle *th, struct GNUNET_ATS_ConnectivityHandle *ac, - const struct GNUNET_PeerIdentity *my_identity) + const struct GNUNET_PeerIdentity *my_identity, + const struct GNUNET_CONFIGURATION_Handle *cfg) { struct OverlayConnectContext *occ = cls; const struct GNUNET_MessageHeader *hello; @@ -1743,7 +1756,7 @@ attempt_connect_task (void *cls) GNUNET_i2s (&rocc->a_id), rocc->peer->id); rocc->ohh = - GNUNET_TRANSPORT_offer_hello (rocc->tcc.th_, + GNUNET_TRANSPORT_offer_hello (rocc->tcc.cfg, rocc->hello, &rocc_hello_sent_cb, rocc); @@ -1772,7 +1785,8 @@ rocc_cache_get_handle_transport_cb (void *cls, struct GNUNET_CORE_Handle *ch, struct GNUNET_TRANSPORT_Handle *th, struct GNUNET_ATS_ConnectivityHandle *ac, - const struct GNUNET_PeerIdentity *ignore_) + const struct GNUNET_PeerIdentity *ignore_, + const struct GNUNET_CONFIGURATION_Handle *cfg) { struct RemoteOverlayConnectCtx *rocc = cls; @@ -1783,6 +1797,7 @@ rocc_cache_get_handle_transport_cb (void *cls, return; } rocc->tcc.th_ = th; + rocc->tcc.cfg = cfg; if (GNUNET_YES == GNUNET_TRANSPORT_check_peer_connected (rocc->tcc.th_, &rocc->a_id)) diff --git a/src/topology/gnunet-daemon-topology.c b/src/topology/gnunet-daemon-topology.c index eddac8c8a..9baaf513d 100644 --- a/src/topology/gnunet-daemon-topology.c +++ b/src/topology/gnunet-daemon-topology.c @@ -141,11 +141,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; */ static struct GNUNET_CORE_Handle *handle; -/** - * Handle to the TRANSPORT service. - */ -static struct GNUNET_TRANSPORT_Handle *transport; - /** * Handle to the ATS service. */ @@ -179,6 +174,11 @@ static struct GNUNET_TRANSPORT_Blacklist *blacklist; */ static struct GNUNET_SCHEDULER_Task *add_task; +/** + * Active HELLO offering to transport service. + */ +static struct GNUNET_TRANSPORT_OfferHelloHandle *oh; + /** * Flag to disallow non-friend connections (pure F2F mode). */ @@ -1007,6 +1007,16 @@ read_friends_file (const struct GNUNET_CONFIGURATION_Handle *cfg) } +/** + * Hello offer complete. Clean up. + */ +static void +done_offer_hello (void *cls) +{ + oh = NULL; +} + + /** * This function is called whenever an encrypted HELLO message is * received. @@ -1055,11 +1065,12 @@ handle_encrypted_hello (void *cls, (friend_count < minimum_friend_count)) return GNUNET_OK; } - if (NULL != transport) - GNUNET_TRANSPORT_offer_hello (transport, - message, - NULL, - NULL); + if (NULL != oh) + GNUNET_TRANSPORT_offer_hello_cancel (oh); + oh = GNUNET_TRANSPORT_offer_hello (cfg, + message, + &done_offer_hello, + NULL); return GNUNET_OK; } @@ -1136,11 +1147,6 @@ cleaning_task (void *cls) GNUNET_PEERINFO_notify_cancel (peerinfo_notify); peerinfo_notify = NULL; } - if (NULL != transport) - { - GNUNET_TRANSPORT_disconnect (transport); - transport = NULL; - } if (NULL != handle) { GNUNET_CORE_disconnect (handle); @@ -1152,6 +1158,11 @@ cleaning_task (void *cls) GNUNET_SCHEDULER_cancel (add_task); add_task = NULL; } + if (NULL != oh) + { + GNUNET_TRANSPORT_offer_hello_cancel (oh); + oh = NULL; + } GNUNET_CONTAINER_multipeermap_iterate (peers, &free_peer, NULL); @@ -1223,12 +1234,6 @@ run (void *cls, &blacklist_check, NULL); ats = GNUNET_ATS_connectivity_init (cfg); - transport = GNUNET_TRANSPORT_connect (cfg, - NULL, - NULL, - NULL, - NULL, - NULL); handle = GNUNET_CORE_connect (cfg, NULL, &core_init, @@ -1239,14 +1244,6 @@ run (void *cls, handlers); GNUNET_SCHEDULER_add_shutdown (&cleaning_task, NULL); - if (NULL == transport) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to connect to `%s' service.\n"), - "transport"); - GNUNET_SCHEDULER_shutdown (); - return; - } if (NULL == handle) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am index 3a1170c10..48793bd87 100644 --- a/src/transport/Makefile.am +++ b/src/transport/Makefile.am @@ -163,10 +163,12 @@ libgnunettransporttesting_la_LDFLAGS = \ libgnunettransport_la_SOURCES = \ transport_api.c transport.h \ - transport_api_blacklist.c \ transport_api_address_to_string.c \ + transport_api_blacklist.c \ + transport_api_get_hello.c \ transport_api_monitor_peers.c \ - transport_api_monitor_plugins.c + transport_api_monitor_plugins.c \ + transport_api_offer_hello.c libgnunettransport_la_LIBADD = \ $(top_builddir)/src/hello/libgnunethello.la \ diff --git a/src/transport/transport-testing.c b/src/transport/transport-testing.c index 4a514ea72..4a3bf3c3e 100644 --- a/src/transport/transport-testing.c +++ b/src/transport/transport-testing.c @@ -246,7 +246,7 @@ offer_hello (void *cls) if (NULL != cc->oh) GNUNET_TRANSPORT_offer_hello_cancel (cc->oh); cc->oh = - GNUNET_TRANSPORT_offer_hello (cc->p1->th, + GNUNET_TRANSPORT_offer_hello (cc->p1->cfg, (const struct GNUNET_MessageHeader *) cc->p2->hello, &hello_offered, cc); @@ -380,7 +380,7 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_handle *tth GNUNET_TRANSPORT_TESTING_stop_peer (tth, p); return NULL; } - p->ghh = GNUNET_TRANSPORT_get_hello (p->th, + p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &get_hello, p); GNUNET_assert (p->ghh != NULL); @@ -465,7 +465,7 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct PeerContext *p, ¬ify_disconnect); GNUNET_assert (NULL != p->th); p->ats = GNUNET_ATS_connectivity_init (p->cfg); - p->ghh = GNUNET_TRANSPORT_get_hello (p->th, + p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &get_hello, p); GNUNET_assert (NULL != p->ghh); diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c index 59f249686..e7db5493e 100644 --- a/src/transport/transport_api.c +++ b/src/transport/transport_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2013 GNUnet e.V. + Copyright (C) 2009-2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -163,86 +163,6 @@ struct Neighbour }; -/** - * Linked list of functions to call whenever our HELLO is updated. - */ -struct GNUNET_TRANSPORT_GetHelloHandle -{ - - /** - * This is a doubly linked list. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *next; - - /** - * This is a doubly linked list. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *prev; - - /** - * Transport handle. - */ - struct GNUNET_TRANSPORT_Handle *handle; - - /** - * Callback to call once we got our HELLO. - */ - GNUNET_TRANSPORT_HelloUpdateCallback rec; - - /** - * Task for calling the HelloUpdateCallback when we already have a HELLO - */ - struct GNUNET_SCHEDULER_Task *notify_task; - - /** - * Closure for @e rec. - */ - void *rec_cls; - -}; - - -/** - * Entry in linked list for all offer-HELLO requests. - */ -struct GNUNET_TRANSPORT_OfferHelloHandle -{ - /** - * For the DLL. - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *prev; - - /** - * For the DLL. - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *next; - - /** - * Transport service handle we use for transmission. - */ - struct GNUNET_TRANSPORT_Handle *th; - - /** - * Transmission handle for this request. - */ - struct GNUNET_TRANSPORT_TransmitHandle *tth; - - /** - * Function to call once we are done. - */ - GNUNET_SCHEDULER_TaskCallback cont; - - /** - * Closure for @e cont - */ - void *cls; - - /** - * The HELLO message to be transmitted. - */ - struct GNUNET_MessageHeader *msg; -}; - /** * Handle for the transport service (includes all of the @@ -276,16 +196,6 @@ struct GNUNET_TRANSPORT_Handle */ GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb; - /** - * Head of DLL of control messages. - */ - struct GNUNET_TRANSPORT_TransmitHandle *control_head; - - /** - * Tail of DLL of control messages. - */ - struct GNUNET_TRANSPORT_TransmitHandle *control_tail; - /** * The current HELLO message for this peer. Updated * whenever transports change their addresses. @@ -295,32 +205,7 @@ struct GNUNET_TRANSPORT_Handle /** * My client connection to the transport service. */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Handle to our registration with the client for notification. - */ - struct GNUNET_CLIENT_TransmitHandle *cth; - - /** - * Linked list of pending requests for our HELLO. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head; - - /** - * Linked list of pending requests for our HELLO. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail; - - /** - * Linked list of pending offer HELLO requests head - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head; - - /** - * Linked list of pending offer HELLO requests tail - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail; + struct GNUNET_MQ_Handle *mq; /** * My configuration. @@ -458,7 +343,8 @@ outbound_bw_tracker_update (void *cls) GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_NO)); GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap, - n->hn, delay.rel_value_us); + n->hn, + delay.rel_value_us); schedule_transmission (n->h); } @@ -558,268 +444,296 @@ neighbour_delete (void *cls, /** - * Function we use for handling incoming messages. + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Error receiving from transport service, disconnecting temporarily.\n"); + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); +} + + +/** + * Function we use for checking incoming HELLO messages. * * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` - * @param msg message received, NULL on timeout or fatal error + * @param msg message received + * @return #GNUNET_OK if message is well-formed + */ +static int +check_hello (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_PeerIdentity me; + + if (GNUNET_OK != + GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, + &me)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n", + (unsigned int) ntohs (msg->size), + GNUNET_i2s (&me)); + return GNUNET_OK; +} + + +/** + * Function we use for handling incoming HELLO messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param msg message received */ static void -demultiplexer (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_hello (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + + GNUNET_free_non_null (h->my_hello); + h->my_hello = GNUNET_copy_message (msg); +} + + +/** + * Function we use for handling incoming connect messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param cim message received + */ +static void +handle_connect (void *cls, + const struct ConnectInfoMessage *cim) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + struct Neighbour *n; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving CONNECT message for `%s'.\n", + GNUNET_i2s (&cim->id)); + n = neighbour_find (h, &cim->id); + if (NULL != n) + { + GNUNET_break (0); + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); + return; + } + n = neighbour_add (h, + &cim->id); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving CONNECT message for `%s' with quota %u\n", + GNUNET_i2s (&cim->id), + ntohl (cim->quota_out.value__)); + GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, + cim->quota_out); + if (NULL != h->nc_cb) + h->nc_cb (h->cls, + &n->id); +} + + +/** + * Function we use for handling incoming disconnect messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param dim message received + */ +static void +handle_disconnect (void *cls, + const struct DisconnectInfoMessage *dim) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + struct Neighbour *n; + + GNUNET_break (ntohl (dim->reserved) == 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving DISCONNECT message for `%s'.\n", + GNUNET_i2s (&dim->peer)); + n = neighbour_find (h, &dim->peer); + if (NULL == n) + { + GNUNET_break (0); + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); + return; + } + neighbour_delete (h, + &dim->peer, + n); +} + + +/** + * Function we use for handling incoming send-ok messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param okm message received + */ +static void +handle_send_ok (void *cls, + const struct SendOkMessage *okm) { struct GNUNET_TRANSPORT_Handle *h = cls; - const struct DisconnectInfoMessage *dim; - const struct ConnectInfoMessage *cim; - const struct InboundMessage *im; - const struct GNUNET_MessageHeader *imm; - const struct SendOkMessage *okm; - const struct QuotaSetMessage *qm; - struct GNUNET_TRANSPORT_GetHelloHandle *hwl; - struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl; struct Neighbour *n; - struct GNUNET_PeerIdentity me; - uint16_t size; uint32_t bytes_msg; uint32_t bytes_physical; - GNUNET_assert (NULL != h->client); - if (GNUNET_YES == h->reconnecting) + bytes_msg = ntohl (okm->bytes_msg); + bytes_physical = ntohl (okm->bytes_physical); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving SEND_OK message, transmission to %s %s.\n", + GNUNET_i2s (&okm->peer), + ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); + + n = neighbour_find (h, + &okm->peer); + if (NULL == n) { + /* We should never get a 'SEND_OK' for a peer that we are not + connected to */ + GNUNET_break (0); + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); return; } - if (NULL == msg) + if (bytes_physical > bytes_msg) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Error receiving from transport service, disconnecting temporarily.\n"); + "Overhead for %u byte message was %u\n", + bytes_msg, + bytes_physical - bytes_msg); + n->traffic_overhead += bytes_physical - bytes_msg; + } + GNUNET_break (GNUNET_NO == n->is_ready); + n->is_ready = GNUNET_YES; + if (NULL != n->unready_warn_task) + { + GNUNET_SCHEDULER_cancel (n->unready_warn_task); + n->unready_warn_task = NULL; + } + if ((NULL != n->th) && (NULL == n->hn)) + { + GNUNET_assert (NULL != n->th->timeout_task); + GNUNET_SCHEDULER_cancel (n->th->timeout_task); + n->th->timeout_task = NULL; + /* we've been waiting for this (congestion, not quota, + * caused delayed transmission) */ + n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, + n, + 0); + } + schedule_transmission (h); +} + + +/** + * Function we use for checking incoming "inbound" messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param im message received + */ +static int +check_recv (void *cls, + const struct InboundMessage *im) +{ + const struct GNUNET_MessageHeader *imm; + uint16_t size; + + size = ntohs (im->header.size); + if (size < + sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + imm = (const struct GNUNET_MessageHeader *) &im[1]; + if (ntohs (imm->size) + sizeof (struct InboundMessage) != size) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Function we use for handling incoming messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param im message received + */ +static void +handle_recv (void *cls, + const struct InboundMessage *im) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + const struct GNUNET_MessageHeader *imm + = (const struct GNUNET_MessageHeader *) &im[1]; + struct Neighbour *n; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u with %u bytes from `%s'.\n", + (unsigned int) ntohs (imm->type), + (unsigned int) ntohs (imm->size), + GNUNET_i2s (&im->peer)); + n = neighbour_find (h, &im->peer); + if (NULL == n) + { + GNUNET_break (0); h->reconnecting = GNUNET_YES; disconnect_and_schedule_reconnect (h); return; } - GNUNET_CLIENT_receive (h->client, - &demultiplexer, - h, - GNUNET_TIME_UNIT_FOREVER_REL); - size = ntohs (msg->size); - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_HELLO: - if (GNUNET_OK != - GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, - &me)) - { - GNUNET_break (0); - break; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n", - (unsigned int) size, - GNUNET_i2s (&me)); - GNUNET_free_non_null (h->my_hello); - h->my_hello = NULL; - if (size < sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break (0); - break; - } - h->my_hello = GNUNET_copy_message (msg); - hwl = h->hwl_head; - while (NULL != hwl) - { - next_hwl = hwl->next; - hwl->rec (hwl->rec_cls, - h->my_hello); - hwl = next_hwl; - } - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT: - if (size < sizeof (struct ConnectInfoMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - cim = (const struct ConnectInfoMessage *) msg; - if (size != - sizeof (struct ConnectInfoMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving CONNECT message for `%s'.\n", - GNUNET_i2s (&cim->id)); - n = neighbour_find (h, &cim->id); - if (NULL != n) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - n = neighbour_add (h, - &cim->id); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving CONNECT message for `%s' with quota %u\n", - GNUNET_i2s (&cim->id), - ntohl (cim->quota_out.value__)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - cim->quota_out); - if (NULL != h->nc_cb) - h->nc_cb (h->cls, - &n->id); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT: - if (size != sizeof (struct DisconnectInfoMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - dim = (const struct DisconnectInfoMessage *) msg; - GNUNET_break (ntohl (dim->reserved) == 0); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving DISCONNECT message for `%s'.\n", - GNUNET_i2s (&dim->peer)); - n = neighbour_find (h, &dim->peer); - if (NULL == n) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - neighbour_delete (h, - &dim->peer, - n); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK: - if (size != sizeof (struct SendOkMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - okm = (const struct SendOkMessage *) msg; - bytes_msg = ntohl (okm->bytes_msg); - bytes_physical = ntohl (okm->bytes_physical); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving SEND_OK message, transmission to %s %s.\n", - GNUNET_i2s (&okm->peer), - ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); + if (NULL != h->rec) + h->rec (h->cls, + &im->peer, + imm); +} - n = neighbour_find (h, - &okm->peer); - if (NULL == n) - { - /* We should never get a 'SEND_OK' for a peer that we are not - connected to */ - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - if (bytes_physical > bytes_msg) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Overhead for %u byte message was %u\n", - bytes_msg, - bytes_physical - bytes_msg); - n->traffic_overhead += bytes_physical - bytes_msg; - } - GNUNET_break (GNUNET_NO == n->is_ready); - n->is_ready = GNUNET_YES; - if (NULL != n->unready_warn_task) - { - GNUNET_SCHEDULER_cancel (n->unready_warn_task); - n->unready_warn_task = NULL; - } - if ((NULL != n->th) && (NULL == n->hn)) - { - GNUNET_assert (NULL != n->th->timeout_task); - GNUNET_SCHEDULER_cancel (n->th->timeout_task); - n->th->timeout_task = NULL; - /* we've been waiting for this (congestion, not quota, - * caused delayed transmission) */ - n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, - n, - 0); - } - schedule_transmission (h); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV: - if (size < - sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - im = (const struct InboundMessage *) msg; - imm = (const struct GNUNET_MessageHeader *) &im[1]; - if (ntohs (imm->size) + sizeof (struct InboundMessage) != size) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u with %u bytes from `%s'.\n", - (unsigned int) ntohs (imm->type), - (unsigned int) ntohs (imm->size), - GNUNET_i2s (&im->peer)); - n = neighbour_find (h, &im->peer); - if (NULL == n) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - if (NULL != h->rec) - h->rec (h->cls, - &im->peer, - imm); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA: - if (size != sizeof (struct QuotaSetMessage)) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - qm = (const struct QuotaSetMessage *) msg; - n = neighbour_find (h, &qm->peer); - if (NULL == n) - { - GNUNET_break (0); - h->reconnecting = GNUNET_YES; - disconnect_and_schedule_reconnect (h); - break; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving SET_QUOTA message for `%s' with quota %u\n", - GNUNET_i2s (&qm->peer), - ntohl (qm->quota.value__)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - qm->quota); - break; - default: - LOG (GNUNET_ERROR_TYPE_ERROR, - _("Received unexpected message of type %u in %s:%u\n"), - ntohs (msg->type), - __FILE__, - __LINE__); + +/** + * Function we use for handling incoming set quota messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param msg message received + */ +static void +handle_set_quota (void *cls, + const struct QuotaSetMessage *qm) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + struct Neighbour *n; + + n = neighbour_find (h, &qm->peer); + if (NULL == n) + { GNUNET_break (0); - break; + h->reconnecting = GNUNET_YES; + disconnect_and_schedule_reconnect (h); + return; } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving SET_QUOTA message for `%s' with quota %u\n", + GNUNET_i2s (&qm->peer), + ntohl (qm->quota.value__)); + GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, + qm->quota); } @@ -854,104 +768,53 @@ timeout_request_due_to_congestion (void *cls) /** - * Transmit message(s) to service. + * Transmit ready message(s) to service. * - * @param cls handle to transport - * @param size number of bytes available in @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf + * @param h handle to transport */ -static size_t -transport_notify_ready (void *cls, - size_t size, - void *buf) +static void +transmit_ready (struct GNUNET_TRANSPORT_Handle *h) { - struct GNUNET_TRANSPORT_Handle *h = cls; struct GNUNET_TRANSPORT_TransmitHandle *th; struct GNUNET_TIME_Relative delay; struct Neighbour *n; - char *cbuf; - struct OutboundMessage obm; - size_t ret; - size_t nret; + struct OutboundMessage *obm; + struct GNUNET_MQ_Envelope *env; size_t mret; - GNUNET_assert (NULL != h->client); - h->cth = NULL; - if (NULL == buf) - { - /* transmission failed */ - disconnect_and_schedule_reconnect (h); - return 0; - } - - cbuf = buf; - ret = 0; - /* first send control messages */ - while ( (NULL != (th = h->control_head)) && - (th->notify_size <= size) ) - { - GNUNET_CONTAINER_DLL_remove (h->control_head, - h->control_tail, - th); - nret = th->notify (th->notify_cls, - size, - &cbuf[ret]); - delay = GNUNET_TIME_absolute_get_duration (th->request_start); - if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) - LOG (GNUNET_ERROR_TYPE_WARNING, - "Added %u bytes of control message at %u after %s delay\n", - nret, - ret, - GNUNET_STRINGS_relative_time_to_string (delay, - GNUNET_YES)); - else - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Added %u bytes of control message at %u after %s delay\n", - nret, - ret, - GNUNET_STRINGS_relative_time_to_string (delay, - GNUNET_YES)); - GNUNET_free (th); - ret += nret; - size -= nret; - } - - /* then, if possible and no control messages pending, send data messages */ - while ( (NULL == h->control_head) && - (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) ) + GNUNET_assert (NULL != h->mq); + while (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) { + th = n->th; if (GNUNET_YES != n->is_ready) { /* peer not ready, wait for notification! */ GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap)); n->hn = NULL; GNUNET_assert (NULL == n->th->timeout_task); - n->th->timeout_task + th->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining - (n->th->timeout), + (th->timeout), &timeout_request_due_to_congestion, - n->th); + th); continue; } - th = n->th; - if (th->notify_size + sizeof (struct OutboundMessage) > size) - break; /* does not fit */ - if (GNUNET_BANDWIDTH_tracker_get_delay - (&n->out_tracker, - th->notify_size).rel_value_us > 0) + if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, + th->notify_size).rel_value_us > 0) break; /* too early */ GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap)); n->hn = NULL; n->th = NULL; - GNUNET_assert (size >= sizeof (struct OutboundMessage)); + env = GNUNET_MQ_msg_extra (obm, + th->notify_size, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); mret = th->notify (th->notify_cls, - size - sizeof (struct OutboundMessage), - &cbuf[ret + sizeof (struct OutboundMessage)]); - GNUNET_assert (mret <= size - sizeof (struct OutboundMessage)); + th->notify_size, + &obm[1]); if (0 == mret) { GNUNET_free (th); + GNUNET_MQ_discard (env); continue; } if (NULL != n->unready_warn_task) @@ -961,20 +824,13 @@ transport_notify_ready (void *cls, n); n->last_payload = GNUNET_TIME_absolute_get (); n->is_ready = GNUNET_NO; - GNUNET_assert (mret + sizeof (struct OutboundMessage) < - GNUNET_SERVER_MAX_MESSAGE_SIZE); - obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); - obm.header.size = htons (mret + sizeof (struct OutboundMessage)); - obm.reserved = htonl (0); - obm.timeout = + obm->reserved = htonl (0); + obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (th->timeout)); - obm.peer = n->id; - memcpy (&cbuf[ret], - &obm, - sizeof (struct OutboundMessage)); - ret += (mret + sizeof (struct OutboundMessage)); - size -= (mret + sizeof (struct OutboundMessage)); + obm->peer = n->id; + GNUNET_MQ_send (h->mq, + env); GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret); delay = GNUNET_TIME_absolute_get_duration (th->request_start); @@ -995,14 +851,9 @@ transport_notify_ready (void *cls, GNUNET_YES), (unsigned int) n->out_tracker.available_bytes_per_s__); GNUNET_free (th); - break; } /* if there are more pending messages, try to schedule those */ schedule_transmission (h); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u bytes to transport service\n", - ret); - return ret; } @@ -1016,12 +867,11 @@ static void schedule_transmission_task (void *cls) { struct GNUNET_TRANSPORT_Handle *h = cls; - size_t size; struct GNUNET_TRANSPORT_TransmitHandle *th; struct Neighbour *n; h->quota_task = NULL; - GNUNET_assert (NULL != h->client); + GNUNET_assert (NULL != h->mq); /* destroy all requests that have timed out */ while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) && (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) ) @@ -1040,29 +890,12 @@ schedule_transmission_task (void *cls) NULL)); GNUNET_free (th); } - if (NULL != h->cth) - return; - if (NULL != h->control_head) - { - size = h->control_head->notify_size; - } - else - { - n = GNUNET_CONTAINER_heap_peek (h->ready_heap); - if (NULL == n) - return; /* no pending messages */ - size = n->th->notify_size + sizeof (struct OutboundMessage); - } + n = GNUNET_CONTAINER_heap_peek (h->ready_heap); + if (NULL == n) + return; /* no pending messages */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling notify_transmit_ready\n"); - h->cth - = GNUNET_CLIENT_notify_transmit_ready (h->client, - size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &transport_notify_ready, - h); - GNUNET_assert (NULL != h->cth); + transmit_ready (h); } @@ -1078,15 +911,13 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h) struct GNUNET_TIME_Relative delay; struct Neighbour *n; - GNUNET_assert (NULL != h->client); + GNUNET_assert (NULL != h->mq); if (NULL != h->quota_task) { GNUNET_SCHEDULER_cancel (h->quota_task); h->quota_task = NULL; } - if (NULL != h->control_head) - delay = GNUNET_TIME_UNIT_ZERO; - else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) + if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) { delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, @@ -1110,83 +941,6 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h) } -/** - * Queue control request for transmission to the transport - * service. - * - * @param h handle to the transport service - * @param size number of bytes to be transmitted - * @param notify function to call to get the content - * @param notify_cls closure for @a notify - * @return a `struct GNUNET_TRANSPORT_TransmitHandle` - */ -static struct GNUNET_TRANSPORT_TransmitHandle * -schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h, - size_t size, - GNUNET_TRANSPORT_TransmitReadyNotify notify, - void *notify_cls) -{ - struct GNUNET_TRANSPORT_TransmitHandle *th; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Control transmit of %u bytes requested\n", - size); - th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle); - th->notify = notify; - th->notify_cls = notify_cls; - th->notify_size = size; - th->request_start = GNUNET_TIME_absolute_get (); - GNUNET_CONTAINER_DLL_insert_tail (h->control_head, - h->control_tail, - th); - schedule_transmission (h); - return th; -} - - -/** - * Transmit START message to service. - * - * @param cls unused - * @param size number of bytes available in @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf - */ -static size_t -send_start (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_TRANSPORT_Handle *h = cls; - struct StartMessage s; - uint32_t options; - - if (NULL == buf) - { - /* Can only be shutdown, just give up */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Shutdown while trying to transmit START request.\n"); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting START request.\n"); - GNUNET_assert (size >= sizeof (struct StartMessage)); - s.header.size = htons (sizeof (struct StartMessage)); - s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START); - options = 0; - if (h->check_self) - options |= 1; - if (NULL != h->rec) - options |= 2; - s.options = htonl (options); - s.self = h->self; - memcpy (buf, &s, sizeof (struct StartMessage)); - GNUNET_CLIENT_receive (h->client, &demultiplexer, h, - GNUNET_TIME_UNIT_FOREVER_REL); - return sizeof (struct StartMessage); -} - - /** * Try again to connect to transport service. * @@ -1195,20 +949,61 @@ send_start (void *cls, static void reconnect (void *cls) { + GNUNET_MQ_hd_var_size (hello, + GNUNET_MESSAGE_TYPE_HELLO, + struct GNUNET_MessageHeader); + GNUNET_MQ_hd_fixed_size (connect, + GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, + struct ConnectInfoMessage); + GNUNET_MQ_hd_fixed_size (disconnect, + GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, + struct DisconnectInfoMessage); + GNUNET_MQ_hd_fixed_size (send_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, + struct SendOkMessage); + GNUNET_MQ_hd_var_size (recv, + GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, + struct InboundMessage); + GNUNET_MQ_hd_fixed_size (set_quota, + GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, + struct QuotaSetMessage); struct GNUNET_TRANSPORT_Handle *h = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_hello_handler (h), + make_connect_handler (h), + make_disconnect_handler (h), + make_send_ok_handler (h), + make_recv_handler (h), + make_set_quota_handler (h), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; + struct StartMessage *s; + uint32_t options; h->reconnect_task = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); - GNUNET_assert (NULL == h->client); - GNUNET_assert (NULL == h->control_head); - GNUNET_assert (NULL == h->control_tail); + GNUNET_assert (NULL == h->mq); h->reconnecting = GNUNET_NO; - h->client = GNUNET_CLIENT_connect ("transport", h->cfg); - - GNUNET_assert (NULL != h->client); - schedule_control_transmit (h, sizeof (struct StartMessage), - &send_start, h); + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "transport", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) + return; + env = GNUNET_MQ_msg (s, + GNUNET_MESSAGE_TYPE_TRANSPORT_START); + options = 0; + if (h->check_self) + options |= 1; + if (NULL != h->rec) + options |= 2; + s->options = htonl (options); + s->self = h->self; + GNUNET_MQ_send (h->mq, + env); } @@ -1221,20 +1016,11 @@ reconnect (void *cls) static void disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) { - struct GNUNET_TRANSPORT_TransmitHandle *th; - GNUNET_assert (NULL == h->reconnect_task); - if (NULL != h->cth) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); - h->cth = NULL; - } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; -/* LOG (GNUNET_ERROR_TYPE_ERROR, - "Client disconnect done \n");*/ + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } /* Forget about all neighbours that we used to be connected to */ GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, @@ -1245,16 +1031,6 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) GNUNET_SCHEDULER_cancel (h->quota_task); h->quota_task = NULL; } - while ((NULL != (th = h->control_head))) - { - GNUNET_CONTAINER_DLL_remove (h->control_head, - h->control_tail, - th); - th->notify (th->notify_cls, - 0, - NULL); - GNUNET_free (th); - } LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduling task to reconnect to transport service in %s.\n", GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, @@ -1268,109 +1044,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) /** - * Cancel control request for transmission to the transport service. - * - * @param th handle to the transport service - * @param tth transmit handle to cancel - */ -static void -cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th, - struct GNUNET_TRANSPORT_TransmitHandle *tth) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Canceling transmit of contral transmission requested\n"); - GNUNET_CONTAINER_DLL_remove (th->control_head, - th->control_tail, - tth); - GNUNET_free (tth); -} - - -/** - * Send HELLO message to the service. - * - * @param cls the HELLO message to send - * @param size number of bytes available in @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf - */ -static size_t -send_hello (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls; - struct GNUNET_MessageHeader *msg = ohh->msg; - uint16_t ssize; - - if (NULL == buf) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout while trying to transmit `%s' request.\n", - "HELLO"); - if (NULL != ohh->cont) - ohh->cont (ohh->cls); - GNUNET_free (msg); - GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head, - ohh->th->oh_tail, - ohh); - GNUNET_free (ohh); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting `%s' request.\n", - "HELLO"); - ssize = ntohs (msg->size); - GNUNET_assert (size >= ssize); - memcpy (buf, - msg, - ssize); - GNUNET_free (msg); - if (NULL != ohh->cont) - ohh->cont (ohh->cls); - GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head, - ohh->th->oh_tail, - ohh); - GNUNET_free (ohh); - return ssize; -} - - -/** - * Send traffic metric message to the service. - * - * @param cls the message to send - * @param size number of bytes available in @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf - */ -static size_t -send_metric (void *cls, - size_t size, - void *buf) -{ - struct TrafficMetricMessage *msg = cls; - uint16_t ssize; - - if (NULL == buf) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout while trying to transmit TRAFFIC_METRIC request.\n"); - GNUNET_free (msg); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting TRAFFIC_METRIC request.\n"); - ssize = ntohs (msg->header.size); - GNUNET_assert (size >= ssize); - memcpy (buf, msg, ssize); - GNUNET_free (msg); - return ssize; -} - - -/** - * Set transport metrics for a peer and a direction + * Set transport metrics for a peer and a direction. * * @param handle transport handle * @param peer the peer to set the metric for @@ -1388,101 +1062,21 @@ GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle, struct GNUNET_TIME_Relative delay_in, struct GNUNET_TIME_Relative delay_out) { + struct GNUNET_MQ_Envelope *env; struct TrafficMetricMessage *msg; - msg = GNUNET_new (struct TrafficMetricMessage); - msg->header.size = htons (sizeof (struct TrafficMetricMessage)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC); + if (NULL == handle->mq) + return; + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC); msg->reserved = htonl (0); msg->peer = *peer; GNUNET_ATS_properties_hton (&msg->properties, prop); msg->delay_in = GNUNET_TIME_relative_hton (delay_in); msg->delay_out = GNUNET_TIME_relative_hton (delay_out); - schedule_control_transmit (handle, - sizeof (struct TrafficMetricMessage), - &send_metric, - msg); -} - - -/** - * Offer the transport service the HELLO of another peer. Note that - * the transport service may just ignore this message if the HELLO is - * malformed or useless due to our local configuration. - * - * @param handle connection to transport service - * @param hello the hello message - * @param cont continuation to call when HELLO has been sent, - * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail - * tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success - * @param cont_cls closure for @a cont - * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure, - * in case of failure @a cont will not be called - * - */ -struct GNUNET_TRANSPORT_OfferHelloHandle * -GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, - const struct GNUNET_MessageHeader *hello, - GNUNET_SCHEDULER_TaskCallback cont, - void *cont_cls) -{ - struct GNUNET_TRANSPORT_OfferHelloHandle *ohh; - struct GNUNET_MessageHeader *msg; - struct GNUNET_PeerIdentity peer; - uint16_t size; - - if (NULL == handle->client) - return NULL; - GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); - size = ntohs (hello->size); - GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader)); - if (GNUNET_OK != - GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello, - &peer)) - { - GNUNET_break (0); - return NULL; - } - - msg = GNUNET_malloc (size); - memcpy (msg, hello, size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Offering HELLO message of `%s' to transport for validation.\n", - GNUNET_i2s (&peer)); - - ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle); - ohh->th = handle; - ohh->cont = cont; - ohh->cls = cont_cls; - ohh->msg = msg; - ohh->tth = schedule_control_transmit (handle, - size, - &send_hello, - ohh); - GNUNET_CONTAINER_DLL_insert (handle->oh_head, - handle->oh_tail, - ohh); - return ohh; -} - - -/** - * Cancel the request to transport to offer the HELLO message - * - * @param ohh the handle for the operation to cancel - */ -void -GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh) -{ - struct GNUNET_TRANSPORT_Handle *th = ohh->th; - - cancel_control_transmit (ohh->th, ohh->tth); - GNUNET_CONTAINER_DLL_remove (th->oh_head, - th->oh_tail, - ohh); - GNUNET_free (ohh->msg); - GNUNET_free (ohh); + GNUNET_MQ_send (handle->mq, + env); } @@ -1505,76 +1099,6 @@ GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle, } -/** - * Task to call the HelloUpdateCallback of the GetHelloHandle - * - * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle` - */ -static void -call_hello_update_cb_async (void *cls) -{ - struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls; - - GNUNET_assert (NULL != ghh->handle->my_hello); - GNUNET_assert (NULL != ghh->notify_task); - ghh->notify_task = NULL; - ghh->rec (ghh->rec_cls, - ghh->handle->my_hello); -} - - -/** - * Obtain the HELLO message for this peer. The callback given in this function - * is never called synchronously. - * - * @param handle connection to transport service - * @param rec function to call with the HELLO, sender will be our peer - * identity; message and sender will be NULL on timeout - * (handshake with transport service pending/failed). - * cost estimate will be 0. - * @param rec_cls closure for @a rec - * @return handle to cancel the operation - */ -struct GNUNET_TRANSPORT_GetHelloHandle * -GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, - GNUNET_TRANSPORT_HelloUpdateCallback rec, - void *rec_cls) -{ - struct GNUNET_TRANSPORT_GetHelloHandle *hwl; - - hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle); - hwl->rec = rec; - hwl->rec_cls = rec_cls; - hwl->handle = handle; - GNUNET_CONTAINER_DLL_insert (handle->hwl_head, - handle->hwl_tail, - hwl); - if (NULL != handle->my_hello) - hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async, - hwl); - return hwl; -} - - -/** - * Stop receiving updates about changes to our HELLO message. - * - * @param ghh handle to cancel - */ -void -GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh) -{ - struct GNUNET_TRANSPORT_Handle *handle = ghh->handle; - - if (NULL != ghh->notify_task) - GNUNET_SCHEDULER_cancel (ghh->notify_task); - GNUNET_CONTAINER_DLL_remove (handle->hwl_head, - handle->hwl_tail, - ghh); - GNUNET_free (ghh); -} - - /** * Connect to the transport service. Note that the connection may * complete (or fail) asynchronously. @@ -1629,40 +1153,35 @@ GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_TRANSPORT_NotifyDisconnect nd, GNUNET_TRANSPORT_NotifyExcessBandwidth neb) { - struct GNUNET_TRANSPORT_Handle *ret; + struct GNUNET_TRANSPORT_Handle *h; - ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle); + h = GNUNET_new (struct GNUNET_TRANSPORT_Handle); if (NULL != self) { - ret->self = *self; - ret->check_self = GNUNET_YES; + h->self = *self; + h->check_self = GNUNET_YES; } - ret->cfg = cfg; - ret->cls = cls; - ret->rec = rec; - ret->nc_cb = nc; - ret->nd_cb = nd; - ret->neb_cb = neb; - ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO; + h->cfg = cfg; + h->cls = cls; + h->rec = rec; + h->nc_cb = nc; + h->nd_cb = nd; + h->neb_cb = neb; + h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); - ret->client = GNUNET_CLIENT_connect ("transport", - cfg); - if (NULL == ret->client) + reconnect (h); + if (NULL == h->mq) { - GNUNET_free (ret); + GNUNET_free (h); return NULL; } - ret->neighbours = + h->neighbours = GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES); - ret->ready_heap = + h->ready_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - schedule_control_transmit (ret, - sizeof (struct StartMessage), - &send_start, - ret); - return ret; + return h; } @@ -1694,8 +1213,6 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) } GNUNET_free_non_null (handle->my_hello); handle->my_hello = NULL; - GNUNET_assert (NULL == handle->hwl_head); - GNUNET_assert (NULL == handle->hwl_tail); GNUNET_CONTAINER_heap_destroy (handle->ready_heap); handle->ready_heap = NULL; GNUNET_free (handle); diff --git a/src/transport/transport_api_get_hello.c b/src/transport/transport_api_get_hello.c index 8087159c6..9a65616a9 100644 --- a/src/transport/transport_api_get_hello.c +++ b/src/transport/transport_api_get_hello.c @@ -34,60 +34,179 @@ /** - * Linked list of functions to call whenever our HELLO is updated. + * Functions to call with this peer's HELLO. */ struct GNUNET_TRANSPORT_GetHelloHandle { /** - * This is a doubly linked list. + * Our configuration. */ - struct GNUNET_TRANSPORT_GetHelloHandle *next; - - /** - * This is a doubly linked list. - */ - struct GNUNET_TRANSPORT_GetHelloHandle *prev; + const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Transport handle. */ - struct GNUNET_TRANSPORT_Handle *handle; + struct GNUNET_MQ_Handle *mq; /** * Callback to call once we got our HELLO. */ GNUNET_TRANSPORT_HelloUpdateCallback rec; + /** + * Closure for @e rec. + */ + void *rec_cls; + /** * Task for calling the HelloUpdateCallback when we already have a HELLO */ struct GNUNET_SCHEDULER_Task *notify_task; /** - * Closure for @e rec. + * ID of the task trying to reconnect to the service. */ - void *rec_cls; + struct GNUNET_SCHEDULER_Task *reconnect_task; + + /** + * Delay until we try to reconnect. + */ + struct GNUNET_TIME_Relative reconnect_delay; }; +/** + * Function we use for checking incoming HELLO messages. + * + * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` + * @param msg message received + * @return #GNUNET_OK if message is well-formed + */ +static int +check_hello (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_PeerIdentity me; + + if (GNUNET_OK != + GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, + &me)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n", + (unsigned int) ntohs (msg->size), + GNUNET_i2s (&me)); + return GNUNET_OK; +} + /** - * Task to call the HelloUpdateCallback of the GetHelloHandle + * Function we use for handling incoming HELLO messages. * - * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle` + * @param cls closure, a `struct GNUNET_TRANSPORT_GetHelloHandle *` + * @param msg message received */ static void -call_hello_update_cb_async (void *cls) +handle_hello (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls; - GNUNET_assert (NULL != ghh->handle->my_hello); - GNUNET_assert (NULL != ghh->notify_task); - ghh->notify_task = NULL; ghh->rec (ghh->rec_cls, - ghh->handle->my_hello); + msg); +} + + +/** + * Function that will schedule the job that will try + * to connect us again to the client. + * + * @param ghh transport service to reconnect + */ +static void +schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh); + + +/** + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Error receiving from transport service, disconnecting temporarily.\n"); + GNUNET_MQ_destroy (ghh->mq); + ghh->mq = NULL; + schedule_reconnect (ghh); +} + + +/** + * Try again to connect to transport service. + * + * @param cls the handle to the transport service + */ +static void +reconnect (void *cls) +{ + GNUNET_MQ_hd_var_size (hello, + GNUNET_MESSAGE_TYPE_HELLO, + struct GNUNET_MessageHeader); + struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_hello_handler (ghh), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; + struct StartMessage *s; + + ghh->reconnect_task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connecting to transport service.\n"); + GNUNET_assert (NULL == ghh->mq); + ghh->mq = GNUNET_CLIENT_connecT (ghh->cfg, + "transport", + handlers, + &mq_error_handler, + ghh); + if (NULL == ghh->mq) + return; + env = GNUNET_MQ_msg (s, + GNUNET_MESSAGE_TYPE_TRANSPORT_START); + s->options = htonl (0); + GNUNET_MQ_send (ghh->mq, + env); +} + + +/** + * Function that will schedule the job that will try + * to connect us again to the client. + * + * @param ghh transport service to reconnect + */ +static void +schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh) +{ + ghh->reconnect_task = + GNUNET_SCHEDULER_add_delayed (ghh->reconnect_delay, + &reconnect, + ghh); + ghh->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ghh->reconnect_delay); } @@ -95,7 +214,7 @@ call_hello_update_cb_async (void *cls) * Obtain the HELLO message for this peer. The callback given in this function * is never called synchronously. * - * @param handle connection to transport service + * @param cfg configuration * @param rec function to call with the HELLO, sender will be our peer * identity; message and sender will be NULL on timeout * (handshake with transport service pending/failed). @@ -104,23 +223,23 @@ call_hello_update_cb_async (void *cls) * @return handle to cancel the operation */ struct GNUNET_TRANSPORT_GetHelloHandle * -GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, +GNUNET_TRANSPORT_get_hello (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_TRANSPORT_HelloUpdateCallback rec, void *rec_cls) { - struct GNUNET_TRANSPORT_GetHelloHandle *hwl; - - hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle); - hwl->rec = rec; - hwl->rec_cls = rec_cls; - hwl->handle = handle; - GNUNET_CONTAINER_DLL_insert (handle->hwl_head, - handle->hwl_tail, - hwl); - if (NULL != handle->my_hello) - hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async, - hwl); - return hwl; + struct GNUNET_TRANSPORT_GetHelloHandle *ghh; + + ghh = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle); + ghh->rec = rec; + ghh->rec_cls = rec_cls; + ghh->cfg = cfg; + reconnect (ghh); + if (NULL == ghh->mq) + { + GNUNET_free (ghh); + return NULL; + } + return ghh; } @@ -132,15 +251,13 @@ GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle, void GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh) { - struct GNUNET_TRANSPORT_Handle *handle = ghh->handle; - - if (NULL != ghh->notify_task) - GNUNET_SCHEDULER_cancel (ghh->notify_task); - GNUNET_CONTAINER_DLL_remove (handle->hwl_head, - handle->hwl_tail, - ghh); + if (NULL != ghh->mq) + { + GNUNET_MQ_destroy (ghh->mq); + ghh->mq = NULL; + } GNUNET_free (ghh); } -/* end of transport_api_hello.c */ +/* end of transport_api_get_hello.c */ diff --git a/src/transport/transport_api_offer_hello.c b/src/transport/transport_api_offer_hello.c index 0abce2d62..951ab9ba4 100644 --- a/src/transport/transport_api_offer_hello.c +++ b/src/transport/transport_api_offer_hello.c @@ -23,31 +23,23 @@ * @brief library to offer HELLOs to transport service * @author Christian Grothoff */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_hello_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_transport_service.h" + /** * Entry in linked list for all offer-HELLO requests. */ struct GNUNET_TRANSPORT_OfferHelloHandle { - /** - * For the DLL. - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *prev; - - /** - * For the DLL. - */ - struct GNUNET_TRANSPORT_OfferHelloHandle *next; /** * Transport service handle we use for transmission. */ - struct GNUNET_TRANSPORT_Handle *th; - - /** - * Transmission handle for this request. - */ - struct GNUNET_TRANSPORT_TransmitHandle *tth; + struct GNUNET_MQ_Handle *mq; /** * Function to call once we are done. @@ -59,20 +51,31 @@ struct GNUNET_TRANSPORT_OfferHelloHandle */ void *cls; - /** - * The HELLO message to be transmitted. - */ - struct GNUNET_MessageHeader *msg; }; +/** + * Done sending HELLO message to the service, notify application. + * + * @param cls the handle for the operation + */ +static void +finished_hello (void *cls) +{ + struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls; + + if (NULL != ohh->cont) + ohh->cont (ohh->cls); + GNUNET_TRANSPORT_offer_hello_cancel (ohh); +} + /** * Offer the transport service the HELLO of another peer. Note that * the transport service may just ignore this message if the HELLO is * malformed or useless due to our local configuration. * - * @param handle connection to transport service + * @param cfg configuration * @param hello the hello message * @param cont continuation to call when HELLO has been sent, * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail @@ -83,46 +86,43 @@ struct GNUNET_TRANSPORT_OfferHelloHandle * */ struct GNUNET_TRANSPORT_OfferHelloHandle * -GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, +GNUNET_TRANSPORT_offer_hello (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_MessageHeader *hello, GNUNET_SCHEDULER_TaskCallback cont, void *cont_cls) { - struct GNUNET_TRANSPORT_OfferHelloHandle *ohh; - struct GNUNET_MessageHeader *msg; + struct GNUNET_TRANSPORT_OfferHelloHandle *ohh + = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle); + struct GNUNET_MQ_Envelope *env; struct GNUNET_PeerIdentity peer; - uint16_t size; - if (NULL == handle->mq) - return NULL; - GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); - size = ntohs (hello->size); - GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader)); if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello, &peer)) { GNUNET_break (0); + GNUNET_free (ohh); + return NULL; + } + ohh->mq = GNUNET_CLIENT_connecT (cfg, + "transport", + NULL, + NULL, + ohh); + if (NULL == ohh->mq) + { + GNUNET_free (ohh); return NULL; } - - msg = GNUNET_malloc (size); - memcpy (msg, hello, size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Offering HELLO message of `%s' to transport for validation.\n", - GNUNET_i2s (&peer)); - ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle); - ohh->th = handle; ohh->cont = cont; ohh->cls = cont_cls; - ohh->msg = msg; - ohh->tth = schedule_control_transmit (handle, - size, - &send_hello, - ohh); - GNUNET_CONTAINER_DLL_insert (handle->oh_head, - handle->oh_tail, - ohh); + GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); + env = GNUNET_MQ_msg_copy (hello); + GNUNET_MQ_notify_sent (env, + &finished_hello, + ohh); + GNUNET_MQ_send (ohh->mq, + env); return ohh; } @@ -135,13 +135,7 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, void GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh) { - struct GNUNET_TRANSPORT_Handle *th = ohh->th; - - cancel_control_transmit (ohh->th, ohh->tth); - GNUNET_CONTAINER_DLL_remove (th->oh_head, - th->oh_tail, - ohh); - GNUNET_free (ohh->msg); + GNUNET_MQ_destroy (ohh->mq); GNUNET_free (ohh); } -- cgit v1.2.3