summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/cadet/gnunet-service-cadet_peer.c39
-rw-r--r--src/core/test_core_api.c6
-rw-r--r--src/core/test_core_api_reliability.c10
-rw-r--r--src/core/test_core_quota_compliance.c10
-rw-r--r--src/dht/gnunet-service-dht.c22
-rw-r--r--src/dht/gnunet-service-dht.h4
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c8
-rw-r--r--src/hostlist/gnunet-daemon-hostlist_client.c76
-rw-r--r--src/hostlist/test_gnunet_daemon_hostlist.c2
-rw-r--r--src/hostlist/test_gnunet_daemon_hostlist_reconnect.c2
-rw-r--r--src/include/gnunet_transport_core_service.h94
-rw-r--r--src/include/gnunet_transport_service.h15
-rw-r--r--src/peerinfo-tool/gnunet-peerinfo.c18
-rw-r--r--src/testbed/gnunet-service-testbed_connectionpool.c3
-rw-r--r--src/testbed/gnunet-service-testbed_connectionpool.h4
-rw-r--r--src/testbed/gnunet-service-testbed_oc.c35
-rw-r--r--src/topology/gnunet-daemon-topology.c55
-rw-r--r--src/transport/Makefile.am6
-rw-r--r--src/transport/transport-testing.c6
-rw-r--r--src/transport/transport_api.c1245
-rw-r--r--src/transport/transport_api_get_hello.c199
-rw-r--r--src/transport/transport_api_offer_hello.c98
22 files changed, 787 insertions, 1170 deletions
diff --git a/src/cadet/gnunet-service-cadet_peer.c b/src/cadet/gnunet-service-cadet_peer.c
index fa16db4bb..101b9e22a 100644
--- a/src/cadet/gnunet-service-cadet_peer.c
+++ b/src/cadet/gnunet-service-cadet_peer.c
@@ -245,14 +245,14 @@ static unsigned long long drop_percent;
static struct GNUNET_CORE_Handle *core_handle;
/**
- * Handle to communicate with ATS.
+ * Our configuration;
*/
-static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Handle to try to start new connections.
+ * Handle to communicate with ATS.
*/
-static struct GNUNET_TRANSPORT_Handle *transport_handle;
+static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
/**
* Shutdown falg.
@@ -557,6 +557,7 @@ core_init (void *cls,
const struct GNUNET_CONFIGURATION_Handle *c = cls;
static int i = 0;
+ cfg = c;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n");
if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id)))
{
@@ -1841,24 +1842,6 @@ GCP_init (const struct GNUNET_CONFIGURATION_Handle *c)
NULL, /* Don't notify about all outbound messages */
GNUNET_NO, /* For header-only out notification */
core_handlers); /* Register these handlers */
- if (GNUNET_YES !=
- GNUNET_CONFIGURATION_get_value_yesno (c, "CADET", "DISABLE_TRY_CONNECT"))
- {
- transport_handle = GNUNET_TRANSPORT_connect (c, &my_full_id, NULL, /* cls */
- /* Notify callbacks */
- NULL, NULL, NULL);
- }
- else
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "* DISABLE TRYING CONNECT in config *\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "* Use this only for test purposes. *\n");
- LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
- transport_handle = NULL;
- }
-
-
-
if (NULL == core_handle)
{
GNUNET_break (0);
@@ -1886,11 +1869,6 @@ GCP_shutdown (void)
GNUNET_CORE_disconnect (core_handle);
core_handle = NULL;
}
- if (NULL != transport_handle)
- {
- GNUNET_TRANSPORT_disconnect (transport_handle);
- transport_handle = NULL;
- }
if (NULL != ats_ch)
{
GNUNET_ATS_connectivity_done (ats_ch);
@@ -2591,7 +2569,10 @@ GCP_try_connect (struct CadetPeer *peer)
struct GNUNET_HELLO_Message *hello;
struct GNUNET_MessageHeader *mh;
- if (NULL == transport_handle)
+ if (GNUNET_YES !=
+ GNUNET_CONFIGURATION_get_value_yesno (cfg,
+ "CADET",
+ "DISABLE_TRY_CONNECT"))
return;
GCC_check_connections ();
if (GNUNET_YES == GCP_is_neighbor (peer))
@@ -2606,7 +2587,7 @@ GCP_try_connect (struct CadetPeer *peer)
GNUNET_TRANSPORT_offer_hello_cancel (peer->hello_offer);
peer->hello_offer = NULL;
}
- peer->hello_offer = GNUNET_TRANSPORT_offer_hello (transport_handle,
+ peer->hello_offer = GNUNET_TRANSPORT_offer_hello (cfg,
mh,
&hello_offer_done,
peer);
diff --git a/src/core/test_core_api.c b/src/core/test_core_api.c
index 92ee038da..43f4c421e 100644
--- a/src/core/test_core_api.c
+++ b/src/core/test_core_api.c
@@ -65,9 +65,9 @@ process_hello (void *cls,
"Received (my) `%s' from transport service\n", "HELLO");
GNUNET_assert (message != NULL);
if ((p == &p1) && (p2.th != NULL))
- GNUNET_TRANSPORT_offer_hello (p2.th, message, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p2.cfg, message, NULL, NULL);
if ((p == &p2) && (p1.th != NULL))
- GNUNET_TRANSPORT_offer_hello (p1.th, message, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p1.cfg, message, NULL, NULL);
}
@@ -280,7 +280,7 @@ setup_peer (struct PeerContext *p,
GNUNET_assert (NULL != p->th);
p->ats = GNUNET_ATS_connectivity_init (p->cfg);
GNUNET_assert (NULL != p->ats);
- p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+ p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
GNUNET_free (binary);
}
diff --git a/src/core/test_core_api_reliability.c b/src/core/test_core_api_reliability.c
index c7672afdb..94c223b74 100644
--- a/src/core/test_core_api_reliability.c
+++ b/src/core/test_core_api_reliability.c
@@ -412,14 +412,14 @@ process_hello (void *cls,
GNUNET_assert (message != NULL);
p->hello = GNUNET_copy_message (message);
if ((p == &p1) && (p2.th != NULL))
- GNUNET_TRANSPORT_offer_hello (p2.th, message, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p2.cfg, message, NULL, NULL);
if ((p == &p2) && (p1.th != NULL))
- GNUNET_TRANSPORT_offer_hello (p1.th, message, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p1.cfg, message, NULL, NULL);
if ((p == &p1) && (p2.hello != NULL))
- GNUNET_TRANSPORT_offer_hello (p1.th, p2.hello, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p1.cfg, p2.hello, NULL, NULL);
if ((p == &p2) && (p1.hello != NULL))
- GNUNET_TRANSPORT_offer_hello (p2.th, p1.hello, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p2.cfg, p1.hello, NULL, NULL);
}
@@ -442,7 +442,7 @@ setup_peer (struct PeerContext *p,
GNUNET_assert (p->th != NULL);
p->ats = GNUNET_ATS_connectivity_init (p->cfg);
GNUNET_assert (NULL != p->ats);
- p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+ p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
GNUNET_free (binary);
}
diff --git a/src/core/test_core_quota_compliance.c b/src/core/test_core_quota_compliance.c
index 05b1ae3d9..28d836e2e 100644
--- a/src/core/test_core_quota_compliance.c
+++ b/src/core/test_core_quota_compliance.c
@@ -547,14 +547,14 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message)
p->hello = GNUNET_malloc (ntohs (message->size));
memcpy (p->hello, message, ntohs (message->size));
if ((p == &p1) && (p2.th != NULL))
- GNUNET_TRANSPORT_offer_hello (p2.th, message, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p2.cfg, message, NULL, NULL);
if ((p == &p2) && (p1.th != NULL))
- GNUNET_TRANSPORT_offer_hello (p1.th, message, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p1.cfg, message, NULL, NULL);
if ((p == &p1) && (p2.hello != NULL))
- GNUNET_TRANSPORT_offer_hello (p1.th, p2.hello, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p1.cfg, p2.hello, NULL, NULL);
if ((p == &p2) && (p1.hello != NULL))
- GNUNET_TRANSPORT_offer_hello (p2.th, p1.hello, NULL, NULL);
+ GNUNET_TRANSPORT_offer_hello (p2.cfg, p1.hello, NULL, NULL);
}
@@ -579,7 +579,7 @@ setup_peer (struct PeerContext *p, const char *cfgname)
GNUNET_assert (p->th != NULL);
p->ats = GNUNET_ATS_connectivity_init (p->cfg);
GNUNET_assert (NULL != p->ats);
- p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+ p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
GNUNET_free (binary);
}
diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c
index abdd77548..e3b9d59a4 100644
--- a/src/dht/gnunet-service-dht.c
+++ b/src/dht/gnunet-service-dht.c
@@ -67,11 +67,6 @@ struct GNUNET_SERVER_Handle *GDS_server;
struct GNUNET_MessageHeader *GDS_my_hello;
/**
- * Handle to the transport service, for getting our hello
- */
-struct GNUNET_TRANSPORT_Handle *GDS_transport_handle;
-
-/**
* Handle to get our current HELLO.
*/
static struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
@@ -112,11 +107,6 @@ shutdown_task (void *cls)
GNUNET_TRANSPORT_get_hello_cancel (ghh);
ghh = NULL;
}
- if (GDS_transport_handle != NULL)
- {
- GNUNET_TRANSPORT_disconnect (GDS_transport_handle);
- GDS_transport_handle = NULL;
- }
GDS_NEIGHBOURS_done ();
GDS_DATACACHE_done ();
GDS_ROUTING_done ();
@@ -170,15 +160,9 @@ run (void *cls,
}
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
NULL);
- GDS_transport_handle =
- GNUNET_TRANSPORT_connect (GDS_cfg, NULL, NULL, NULL, NULL, NULL);
- if (GDS_transport_handle == NULL)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Failed to connect to transport service!\n"));
- return;
- }
- ghh = GNUNET_TRANSPORT_get_hello (GDS_transport_handle, &process_hello, NULL);
+ ghh = GNUNET_TRANSPORT_get_hello (GDS_cfg,
+ &process_hello,
+ NULL);
}
diff --git a/src/dht/gnunet-service-dht.h b/src/dht/gnunet-service-dht.h
index 6f641cb96..4684c2324 100644
--- a/src/dht/gnunet-service-dht.h
+++ b/src/dht/gnunet-service-dht.h
@@ -57,9 +57,5 @@ extern struct GNUNET_SERVER_Handle *GDS_server;
*/
extern struct GNUNET_MessageHeader *GDS_my_hello;
-/**
- * Handle to the transport service, for getting our hello
- */
-extern struct GNUNET_TRANSPORT_Handle *GDS_transport_handle;
#endif
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index 4add3c4ae..b24a95ab2 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -592,13 +592,11 @@ try_connect (const struct GNUNET_PeerIdentity *pid,
ci,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
}
- if ( (NULL != GDS_transport_handle) &&
- (NULL != ci->oh) &&
+ if ( (NULL != ci->oh) &&
(NULL != h) )
GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
- if ( (NULL != GDS_transport_handle) &&
- (NULL != h) )
- ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
+ if (NULL != h)
+ ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
h,
&offer_hello_done,
ci);
diff --git a/src/hostlist/gnunet-daemon-hostlist_client.c b/src/hostlist/gnunet-daemon-hostlist_client.c
index df0cabe1d..c8c74a9ba 100644
--- a/src/hostlist/gnunet-daemon-hostlist_client.c
+++ b/src/hostlist/gnunet-daemon-hostlist_client.c
@@ -142,6 +142,14 @@ struct Hostlist
};
+struct HelloOffer
+{
+ struct HelloOffer *next;
+ struct HelloOffer *prev;
+ struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
+};
+
+
/**
* Our configuration.
*/
@@ -153,11 +161,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
static struct GNUNET_STATISTICS_Handle *stats;
/**
- * Transport handle.
- */
-static struct GNUNET_TRANSPORT_Handle *transport;
-
-/**
* Proxy hostname or ip we are using (can be NULL).
*/
static char *proxy;
@@ -312,6 +315,25 @@ static unsigned int stat_hellos_obtained;
*/
static unsigned int stat_connection_count;
+static struct HelloOffer *ho_head;
+
+static struct HelloOffer *ho_tail;
+
+
+/**
+ * Hello offer complete. Clean up.
+ */
+static void
+done_offer_hello (void *cls)
+{
+ struct HelloOffer *ho = cls;
+
+ GNUNET_CONTAINER_DLL_remove (ho_head,
+ ho_tail,
+ ho);
+ GNUNET_free (ho);
+}
+
/**
* Process downloaded bits by calling callback on each HELLO.
@@ -331,6 +353,7 @@ callback_download (void *ptr,
static char download_buffer[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
const char *cbuf = ptr;
const struct GNUNET_MessageHeader *msg;
+ struct HelloOffer *ho;
size_t total;
size_t cpy;
size_t left;
@@ -390,7 +413,22 @@ callback_download (void *ptr,
("# valid HELLOs downloaded from hostlist servers"),
1, GNUNET_NO);
stat_hellos_obtained++;
- GNUNET_TRANSPORT_offer_hello (transport, msg, NULL, NULL);
+
+ ho = GNUNET_new (struct HelloOffer);
+ ho->ohh = GNUNET_TRANSPORT_offer_hello (cfg,
+ msg,
+ &done_offer_hello,
+ ho);
+ if (NULL == ho->ohh)
+ {
+ GNUNET_free (ho);
+ }
+ else
+ {
+ GNUNET_CONTAINER_DLL_insert (ho_head,
+ ho_tail,
+ ho);
+ }
}
else
{
@@ -405,7 +443,9 @@ callback_download (void *ptr,
stat_hellos_obtained++;
return total;
}
- memmove (download_buffer, &download_buffer[msize], download_pos - msize);
+ memmove (download_buffer,
+ &download_buffer[msize],
+ download_pos - msize);
download_pos -= msize;
}
return total;
@@ -1532,13 +1572,6 @@ GNUNET_HOSTLIST_client_start (const struct GNUNET_CONFIGURATION_Handle *c,
GNUNET_break (0);
return GNUNET_SYSERR;
}
- transport = GNUNET_TRANSPORT_connect (c, NULL, NULL, NULL, NULL, NULL);
- if (NULL == transport)
- {
- GNUNET_break (0);
- curl_global_cleanup ();
- return GNUNET_SYSERR;
- }
cfg = c;
stats = st;
@@ -1687,8 +1720,18 @@ GNUNET_HOSTLIST_client_start (const struct GNUNET_CONFIGURATION_Handle *c,
void
GNUNET_HOSTLIST_client_stop ()
{
+ struct HelloOffer *ho;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Hostlist client shutdown\n");
+ while (NULL != (ho = ho_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (ho_head,
+ ho_tail,
+ ho);
+ GNUNET_TRANSPORT_offer_hello_cancel (ho->ohh);
+ GNUNET_free (ho);
+ }
if (NULL != sget)
{
GNUNET_STATISTICS_get_cancel (sget);
@@ -1725,11 +1768,6 @@ GNUNET_HOSTLIST_client_stop ()
ti_check_download = NULL;
curl_global_cleanup ();
}
- if (NULL != transport)
- {
- GNUNET_TRANSPORT_disconnect (transport);
- transport = NULL;
- }
GNUNET_free_non_null (proxy);
proxy = NULL;
GNUNET_free_non_null (proxy_username);
diff --git a/src/hostlist/test_gnunet_daemon_hostlist.c b/src/hostlist/test_gnunet_daemon_hostlist.c
index 5f8ece9b8..6a5850c4d 100644
--- a/src/hostlist/test_gnunet_daemon_hostlist.c
+++ b/src/hostlist/test_gnunet_daemon_hostlist.c
@@ -147,7 +147,7 @@ setup_peer (struct PeerContext *p, const char *cfgname)
p->th =
GNUNET_TRANSPORT_connect (p->cfg, NULL, p, NULL, &notify_connect, NULL);
GNUNET_assert (p->th != NULL);
- p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+ p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
GNUNET_free (binary);
}
diff --git a/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c b/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c
index 3dad137a2..30f26717f 100644
--- a/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c
+++ b/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c
@@ -116,7 +116,7 @@ setup_peer (struct PeerContext *p, const char *cfgname)
p->th =
GNUNET_TRANSPORT_connect (p->cfg, NULL, p, NULL, &notify_connect, NULL);
GNUNET_assert (p->th != NULL);
- p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+ p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
GNUNET_free (binary);
}
diff --git a/src/include/gnunet_transport_core_service.h b/src/include/gnunet_transport_core_service.h
index 816d5efaa..6dada4f54 100644
--- a/src/include/gnunet_transport_core_service.h
+++ b/src/include/gnunet_transport_core_service.h
@@ -50,49 +50,41 @@ extern "C"
/**
- * Function called by the transport for each received message.
- *
- * @param cls closure
- * @param peer (claimed) identity of the other peer
- * @param message the message
- */
-typedef void
-(*GNUNET_TRANSPORT_ReceiveCallback) (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message);
-
-
-/**
* Opaque handle to the service.
*/
-struct GNUNET_TRANSPORT_Handle;
+struct GNUNET_TRANSPORT_CoreHandle;
/**
- * Function called to notify CORE service that another
- * @a peer connected to us.
+ * Function called to notify transport users that another
+ * peer connected to us.
*
* @param cls closure
- * @param peer the peer that connected, never NULL
- * @param mq message queue for sending messages to this peer
+ * @param peer the peer that connected
+ * @param mq message queue to use to transmit to @a peer
+ * @return closure to use in MQ handlers
*/
-typedef void
-(*GNUNET_TRANSPORT_NotifyConnect) (void *cls,
+typedef void *
+(*GNUNET_TRANSPORT_NotifyConnecT) (void *cls,
const struct GNUNET_PeerIdentity *peer,
struct GNUNET_MQ_Handle *mq);
/**
- * Function called to notify CORE service that another
- * @a peer disconnected from us. The associated message
- * queue must not be used henceforth.
+ * Function called to notify transport users that another peer
+ * disconnected from us. The message queue that was given to the
+ * connect notification will be destroyed and must not be used
+ * henceforth.
*
- * @param cls closure
- * @param peer the peer that disconnected, never NULL
+ * @param cls closure from #GNUNET_TRANSPORT_connecT
+ * @param peer the peer that disconnected
+ * @param handlers_cls closure of the handlers, was returned from the
+ * connect notification callback
*/
typedef void
-(*GNUNET_TRANSPORT_NotifyDisconnect) (void *cls,
- const struct GNUNET_PeerIdentity *peer);
+(*GNUNET_TRANSPORT_NotifyDisconnecT) (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ void *handler_cls);
/**
@@ -108,34 +100,41 @@ typedef void
*
* @param cls the closure
* @param neighbour peer that we have excess bandwidth to
+ * @param handlers_cls closure of the handlers, was returned from the
+ * connect notification callback
*/
typedef void
-(*GNUNET_TRANSPORT_NotifyExcessBandwidth)(void *cls,
- const struct GNUNET_PeerIdentity *neighbour);
+(*GNUNET_TRANSPORT_NotifyExcessBandwidtH)(void *cls,
+ const struct GNUNET_PeerIdentity *neighbour,
+ void *handlers_cls);
+
/**
- * Connect to the transport service.
+ * Connect to the transport service. Note that the connection may
+ * complete (or fail) asynchronously.
*
* @param cfg configuration to use
- * @param self our own identity (if API should check that it matches
+ * @param self our own identity (API should check that it matches
* the identity found by transport), or NULL (no check)
- * @param cls closure for the callbacks
- * @param rec_handlers NULL-terminated array of handlers for incoming
- * messages, or NULL
+ * @param handlers array of message handlers; note that the
+ * closures provided will be ignored and replaced
+ * with the respective return value from @a nc
+ * @param handlers array with handlers to call when we receive messages, or NULL
+ * @param cls closure for the @a nc, @a nd and @a neb callbacks
* @param nc function to call on connect events, or NULL
* @param nd function to call on disconnect events, or NULL
- * @param neb function to call if we have excess bandwidth to a peer
+ * @param neb function to call if we have excess bandwidth to a peer, or NULL
* @return NULL on error
*/
-struct GNUNET_TRANSPORT_Handle *
+struct GNUNET_TRANSPORT_CoreHandle *
GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_PeerIdentity *self,
+ const struct GNUNET_MQ_MessageHandler *handlers,
void *cls,
- GNUNET_MQ_MessageHandler *rec_handlers,
- GNUNET_TRANSPORT_NotifyConnect nc,
- GNUNET_TRANSPORT_NotifyDisconnect nd,
- GNUNET_TRANSPORT_NotifyExcessBandwidth neb);
+ GNUNET_TRANSPORT_NotifyConnecT nc,
+ GNUNET_TRANSPORT_NotifyDisconnecT nd,
+ GNUNET_TRANSPORT_NotifyExcessBandwidtH neb);
/**
@@ -144,22 +143,19 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
* @param handle handle returned from connect
*/
void
-GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle);
+GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle);
/**
- * Checks if a given peer is connected to us. Convenience
- * API in case a client does not track connect/disconnect
- * events internally.
+ * Checks if a given peer is connected to us and get the message queue.
*
* @param handle connection to transport service
* @param peer the peer to check
- * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
+ * @return NULL if disconnected, otherwise message queue for @a peer
*/
-int
-GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
- const struct GNUNET_PeerIdentity *peer);
-
+struct GNUNET_MQ_Handle *
+GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
+ const struct GNUNET_PeerIdentity *peer);
#if 0 /* keep Emacsens' auto-indent happy */
diff --git a/src/include/gnunet_transport_service.h b/src/include/gnunet_transport_service.h
index 96182424e..b40763b92 100644
--- a/src/include/gnunet_transport_service.h
+++ b/src/include/gnunet_transport_service.h
@@ -66,12 +66,6 @@ typedef void
/**
- * Opaque handle to the service.
- */
-struct GNUNET_TRANSPORT_Handle;
-
-
-/**
* Function called to notify transport users that another
* peer connected to us.
*
@@ -82,6 +76,7 @@ typedef void
(*GNUNET_TRANSPORT_NotifyConnect) (void *cls,
const struct GNUNET_PeerIdentity *peer);
+
/**
* Function called to notify transport users that another
* peer disconnected from us.
@@ -288,13 +283,13 @@ struct GNUNET_TRANSPORT_GetHelloHandle;
* Obtain updates on changes to the HELLO message for this peer. The callback
* given in this function is never called synchronously.
*
- * @param handle connection to transport service
+ * @param cfg configuration
* @param rec function to call with the HELLO
* @param rec_cls closure for @a rec
* @return handle to cancel the operation
*/
struct GNUNET_TRANSPORT_GetHelloHandle *
-GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
+GNUNET_TRANSPORT_get_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
GNUNET_TRANSPORT_HelloUpdateCallback rec,
void *rec_cls);
@@ -319,7 +314,7 @@ struct GNUNET_TRANSPORT_OfferHelloHandle;
* the transport service may just ignore this message if the HELLO is
* malformed or useless due to our local configuration.
*
- * @param handle connection to transport service
+ * @param cfg configuration
* @param hello the hello message
* @param cont continuation to call when HELLO has been sent,
* tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
@@ -330,7 +325,7 @@ struct GNUNET_TRANSPORT_OfferHelloHandle;
*
*/
struct GNUNET_TRANSPORT_OfferHelloHandle *
-GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
+GNUNET_TRANSPORT_offer_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_MessageHeader *hello,
GNUNET_SCHEDULER_TaskCallback cont,
void *cont_cls);
diff --git a/src/peerinfo-tool/gnunet-peerinfo.c b/src/peerinfo-tool/gnunet-peerinfo.c
index b6aa224fd..14f1e4604 100644
--- a/src/peerinfo-tool/gnunet-peerinfo.c
+++ b/src/peerinfo-tool/gnunet-peerinfo.c
@@ -183,11 +183,6 @@ static struct GNUNET_SCHEDULER_Task * tt;
static struct GNUNET_TRANSPORT_GetHelloHandle *gh;
/**
- * Connection to transport service.
- */
-static struct GNUNET_TRANSPORT_Handle *transport;
-
-/**
* Current iterator context (if active, otherwise NULL).
*/
static struct GNUNET_PEERINFO_IteratorContext *pic;
@@ -641,11 +636,6 @@ shutdown_task (void *cls)
GNUNET_TRANSPORT_get_hello_cancel (gh);
gh = NULL;
}
- if (NULL != transport)
- {
- GNUNET_TRANSPORT_disconnect (transport);
- transport = NULL;
- }
while (NULL != (pc = pc_head))
{
GNUNET_CONTAINER_DLL_remove (pc_head,
@@ -702,8 +692,6 @@ hello_callback (void *cls,
&my_peer_identity));
GNUNET_TRANSPORT_get_hello_cancel (gh);
gh = NULL;
- GNUNET_TRANSPORT_disconnect (transport);
- transport = NULL;
if (NULL != dump_hello)
dump_my_hello ();
tt = GNUNET_SCHEDULER_add_now (&state_machine, NULL);
@@ -740,11 +728,7 @@ testservice_task (void *cls,
(GNUNET_YES == get_uri) ||
(NULL != dump_hello) )
{
- transport = GNUNET_TRANSPORT_connect (cfg,
- NULL,
- NULL,
- NULL, NULL, NULL);
- gh = GNUNET_TRANSPORT_get_hello (transport,
+ gh = GNUNET_TRANSPORT_get_hello (cfg,
&hello_callback,
NULL);
}
diff --git a/src/testbed/gnunet-service-testbed_connectionpool.c b/src/testbed/gnunet-service-testbed_connectionpool.c
index 0fa2a6456..47b6fab08 100644
--- a/src/testbed/gnunet-service-testbed_connectionpool.c
+++ b/src/testbed/gnunet-service-testbed_connectionpool.c
@@ -463,7 +463,8 @@ connection_ready (void *cls)
entry->handle_core,
entry->handle_transport,
entry->handle_ats_connectivity,
- entry->peer_identity);
+ entry->peer_identity,
+ entry->cfg);
}
diff --git a/src/testbed/gnunet-service-testbed_connectionpool.h b/src/testbed/gnunet-service-testbed_connectionpool.h
index 589421840..54b37f6d5 100644
--- a/src/testbed/gnunet-service-testbed_connectionpool.h
+++ b/src/testbed/gnunet-service-testbed_connectionpool.h
@@ -85,13 +85,15 @@ GST_connection_pool_destroy (void);
* @param ac the handle to ATS, can be NULL if it is not requested
* @param peer_id the identity of the peer. Will be NULL if ch is NULL. In other
* cases, its value being NULL means that CORE connection has failed.
+ * @param cfg configuration of the peer
*/
typedef void
(*GST_connection_pool_connection_ready_cb) (void *cls,
struct GNUNET_CORE_Handle *ch,
struct GNUNET_TRANSPORT_Handle *th,
struct GNUNET_ATS_ConnectivityHandle *ac,
- const struct GNUNET_PeerIdentity *peer_id);
+ const struct GNUNET_PeerIdentity *peer_id,
+ const struct GNUNET_CONFIGURATION_Handle *cfg);
/**
diff --git a/src/testbed/gnunet-service-testbed_oc.c b/src/testbed/gnunet-service-testbed_oc.c
index de462da7a..8902a359c 100644
--- a/src/testbed/gnunet-service-testbed_oc.c
+++ b/src/testbed/gnunet-service-testbed_oc.c
@@ -49,6 +49,11 @@ struct ConnectivitySuggestContext
struct GNUNET_TRANSPORT_Handle *th_;
/**
+ * Configuration of the peer from cache. Do not free!
+ */
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ /**
* The GetCacheHandle for the peer2's transport handle
* (used to offer the HELLO to the peer).
*/
@@ -699,13 +704,15 @@ overlay_connect_notify (void *cls,
* @param th the handle to TRANSPORT. Can be NULL if it is not requested
* @param ac the handle to ATS. Can be NULL if it is not requested
* @param my_identity the identity of our peer
+ * @param cfg configuration of the peer
*/
static void
occ_cache_get_handle_ats_occ_cb (void *cls,
struct GNUNET_CORE_Handle *ch,
struct GNUNET_TRANSPORT_Handle *th,
struct GNUNET_ATS_ConnectivityHandle *ac,
- const struct GNUNET_PeerIdentity *my_identity)
+ const struct GNUNET_PeerIdentity *my_identity,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
struct OverlayConnectContext *occ = cls;
struct LocalPeer2Context *lp2c;
@@ -754,7 +761,8 @@ occ_cache_get_handle_ats_rocc_cb (void *cls,
struct GNUNET_CORE_Handle *ch,
struct GNUNET_TRANSPORT_Handle *th,
struct GNUNET_ATS_ConnectivityHandle *ac,
- const struct GNUNET_PeerIdentity *my_identity)
+ const struct GNUNET_PeerIdentity *my_identity,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
struct RemoteOverlayConnectCtx *rocc = cls;
@@ -896,7 +904,7 @@ send_hello (void *cls)
other_peer_str);
GNUNET_free (other_peer_str);
lp2c->ohh =
- GNUNET_TRANSPORT_offer_hello (lp2c->tcc.th_,
+ GNUNET_TRANSPORT_offer_hello (lp2c->tcc.cfg,
occ->hello,
occ_hello_sent_cb,
occ);
@@ -922,13 +930,15 @@ send_hello (void *cls)
* @param th the handle to TRANSPORT. Can be NULL if it is not requested
* @param ac the handle to ATS. Can be NULL if it is not requested
* @param ignore_ peer identity which is ignored in this callback
- */
+ * @param cfg configuration of the peer
+*/
static void
p2_transport_connect_cache_callback (void *cls,
struct GNUNET_CORE_Handle *ch,
struct GNUNET_TRANSPORT_Handle *th,
struct GNUNET_ATS_ConnectivityHandle *ac,
- const struct GNUNET_PeerIdentity *ignore_)
+ const struct GNUNET_PeerIdentity *ignore_,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
struct OverlayConnectContext *occ = cls;
@@ -945,6 +955,7 @@ p2_transport_connect_cache_callback (void *cls,
return;
}
occ->p2ctx.local.tcc.th_ = th;
+ occ->p2ctx.local.tcc.cfg = cfg;
GNUNET_asprintf (&occ->emsg,
"0x%llx: Timeout while offering HELLO to %s",
occ->op_id,
@@ -1068,7 +1079,8 @@ p1_transport_connect_cache_callback (void *cls,
struct GNUNET_CORE_Handle *ch,
struct GNUNET_TRANSPORT_Handle *th,
struct GNUNET_ATS_ConnectivityHandle *ac,
- const struct GNUNET_PeerIdentity *ignore_)
+ const struct GNUNET_PeerIdentity *ignore_,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
struct OverlayConnectContext *occ = cls;
@@ -1092,7 +1104,7 @@ p1_transport_connect_cache_callback (void *cls,
"0x%llx: Timeout while acquiring HELLO of peer %s",
occ->op_id,
GNUNET_i2s (&occ->peer_identity));
- occ->ghh = GNUNET_TRANSPORT_get_hello (occ->p1th_,
+ occ->ghh = GNUNET_TRANSPORT_get_hello (cfg,
&hello_update_cb,
occ);
}
@@ -1112,7 +1124,8 @@ occ_cache_get_handle_core_cb (void *cls,
struct GNUNET_CORE_Handle *ch,
struct GNUNET_TRANSPORT_Handle *th,
struct GNUNET_ATS_ConnectivityHandle *ac,
- const struct GNUNET_PeerIdentity *my_identity)
+ const struct GNUNET_PeerIdentity *my_identity,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
struct OverlayConnectContext *occ = cls;
const struct GNUNET_MessageHeader *hello;
@@ -1743,7 +1756,7 @@ attempt_connect_task (void *cls)
GNUNET_i2s (&rocc->a_id),
rocc->peer->id);
rocc->ohh =
- GNUNET_TRANSPORT_offer_hello (rocc->tcc.th_,
+ GNUNET_TRANSPORT_offer_hello (rocc->tcc.cfg,
rocc->hello,
&rocc_hello_sent_cb,
rocc);
@@ -1772,7 +1785,8 @@ rocc_cache_get_handle_transport_cb (void *cls,
struct GNUNET_CORE_Handle *ch,
struct GNUNET_TRANSPORT_Handle *th,
struct GNUNET_ATS_ConnectivityHandle *ac,
- const struct GNUNET_PeerIdentity *ignore_)
+ const struct GNUNET_PeerIdentity *ignore_,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
struct RemoteOverlayConnectCtx *rocc = cls;
@@ -1783,6 +1797,7 @@ rocc_cache_get_handle_transport_cb (void *cls,
return;
}
rocc->tcc.th_ = th;
+ rocc->tcc.cfg = cfg;
if (GNUNET_YES ==
GNUNET_TRANSPORT_check_peer_connected (rocc->tcc.th_,
&rocc->a_id))
diff --git a/src/topology/gnunet-daemon-topology.c b/src/topology/gnunet-daemon-topology.c
index eddac8c8a..9baaf513d 100644
--- a/src/topology/gnunet-daemon-topology.c
+++ b/src/topology/gnunet-daemon-topology.c
@@ -142,11 +142,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
static struct GNUNET_CORE_Handle *handle;
/**
- * Handle to the TRANSPORT service.
- */
-static struct GNUNET_TRANSPORT_Handle *transport;
-
-/**
* Handle to the ATS service.
*/
static struct GNUNET_ATS_ConnectivityHandle *ats;
@@ -180,6 +175,11 @@ static struct GNUNET_TRANSPORT_Blacklist *blacklist;
static struct GNUNET_SCHEDULER_Task *add_task;
/**
+ * Active HELLO offering to transport service.
+ */
+static struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
+
+/**
* Flag to disallow non-friend connections (pure F2F mode).
*/
static int friends_only;
@@ -1008,6 +1008,16 @@ read_friends_file (const struct GNUNET_CONFIGURATION_Handle *cfg)
/**
+ * Hello offer complete. Clean up.
+ */
+static void
+done_offer_hello (void *cls)
+{
+ oh = NULL;
+}
+
+
+/**
* This function is called whenever an encrypted HELLO message is
* received.
*
@@ -1055,11 +1065,12 @@ handle_encrypted_hello (void *cls,
(friend_count < minimum_friend_count))
return GNUNET_OK;
}
- if (NULL != transport)
- GNUNET_TRANSPORT_offer_hello (transport,
- message,
- NULL,
- NULL);
+ if (NULL != oh)
+ GNUNET_TRANSPORT_offer_hello_cancel (oh);
+ oh = GNUNET_TRANSPORT_offer_hello (cfg,
+ message,
+ &done_offer_hello,
+ NULL);
return GNUNET_OK;
}
@@ -1136,11 +1147,6 @@ cleaning_task (void *cls)
GNUNET_PEERINFO_notify_cancel (peerinfo_notify);
peerinfo_notify = NULL;
}
- if (NULL != transport)
- {
- GNUNET_TRANSPORT_disconnect (transport);
- transport = NULL;
- }
if (NULL != handle)
{
GNUNET_CORE_disconnect (handle);
@@ -1152,6 +1158,11 @@ cleaning_task (void *cls)
GNUNET_SCHEDULER_cancel (add_task);
add_task = NULL;
}
+ if (NULL != oh)
+ {
+ GNUNET_TRANSPORT_offer_hello_cancel (oh);
+ oh = NULL;
+ }
GNUNET_CONTAINER_multipeermap_iterate (peers,
&free_peer,
NULL);
@@ -1223,12 +1234,6 @@ run (void *cls,
&blacklist_check,
NULL);
ats = GNUNET_ATS_connectivity_init (cfg);
- transport = GNUNET_TRANSPORT_connect (cfg,
- NULL,
- NULL,
- NULL,
- NULL,
- NULL);
handle =
GNUNET_CORE_connect (cfg, NULL,
&core_init,
@@ -1239,14 +1244,6 @@ run (void *cls,
handlers);
GNUNET_SCHEDULER_add_shutdown (&cleaning_task,
NULL);
- if (NULL == transport)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Failed to connect to `%s' service.\n"),
- "transport");
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
if (NULL == handle)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index 3a1170c10..48793bd87 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -163,10 +163,12 @@ libgnunettransporttesting_la_LDFLAGS = \
libgnunettransport_la_SOURCES = \
transport_api.c transport.h \
- transport_api_blacklist.c \
transport_api_address_to_string.c \
+ transport_api_blacklist.c \
+ transport_api_get_hello.c \
transport_api_monitor_peers.c \
- transport_api_monitor_plugins.c
+ transport_api_monitor_plugins.c \
+ transport_api_offer_hello.c
libgnunettransport_la_LIBADD = \
$(top_builddir)/src/hello/libgnunethello.la \
diff --git a/src/transport/transport-testing.c b/src/transport/transport-testing.c
index 4a514ea72..4a3bf3c3e 100644
--- a/src/transport/transport-testing.c
+++ b/src/transport/transport-testing.c
@@ -246,7 +246,7 @@ offer_hello (void *cls)
if (NULL != cc->oh)
GNUNET_TRANSPORT_offer_hello_cancel (cc->oh);
cc->oh =
- GNUNET_TRANSPORT_offer_hello (cc->p1->th,
+ GNUNET_TRANSPORT_offer_hello (cc->p1->cfg,
(const struct GNUNET_MessageHeader *) cc->p2->hello,
&hello_offered,
cc);
@@ -380,7 +380,7 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_handle *tth
GNUNET_TRANSPORT_TESTING_stop_peer (tth, p);
return NULL;
}
- p->ghh = GNUNET_TRANSPORT_get_hello (p->th,
+ p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg,
&get_hello,
p);
GNUNET_assert (p->ghh != NULL);
@@ -465,7 +465,7 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct PeerContext *p,
&notify_disconnect);
GNUNET_assert (NULL != p->th);
p->ats = GNUNET_ATS_connectivity_init (p->cfg);
- p->ghh = GNUNET_TRANSPORT_get_hello (p->th,
+ p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg,
&get_hello,
p);
GNUNET_assert (NULL != p->ghh);
diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c
index 59f249686..e7db5493e 100644
--- a/src/transport/transport_api.c
+++ b/src/transport/transport_api.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- Copyright (C) 2009-2013 GNUnet e.V.
+ Copyright (C) 2009-2013, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -163,86 +163,6 @@ struct Neighbour
};
-/**
- * Linked list of functions to call whenever our HELLO is updated.
- */
-struct GNUNET_TRANSPORT_GetHelloHandle
-{
-
- /**
- * This is a doubly linked list.
- */
- struct GNUNET_TRANSPORT_GetHelloHandle *next;
-
- /**
- * This is a doubly linked list.
- */
- struct GNUNET_TRANSPORT_GetHelloHandle *prev;
-
- /**
- * Transport handle.
- */
- struct GNUNET_TRANSPORT_Handle *handle;
-
- /**
- * Callback to call once we got our HELLO.
- */
- GNUNET_TRANSPORT_HelloUpdateCallback rec;
-
- /**
- * Task for calling the HelloUpdateCallback when we already have a HELLO
- */
- struct GNUNET_SCHEDULER_Task *notify_task;
-
- /**
- * Closure for @e rec.
- */
- void *rec_cls;
-
-};
-
-
-/**
- * Entry in linked list for all offer-HELLO requests.
- */
-struct GNUNET_TRANSPORT_OfferHelloHandle
-{
- /**
- * For the DLL.
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
-
- /**
- * For the DLL.
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *next;
-
- /**
- * Transport service handle we use for transmission.
- */
- struct GNUNET_TRANSPORT_Handle *th;
-
- /**
- * Transmission handle for this request.
- */
- struct GNUNET_TRANSPORT_TransmitHandle *tth;
-
- /**
- * Function to call once we are done.
- */
- GNUNET_SCHEDULER_TaskCallback cont;
-
- /**
- * Closure for @e cont
- */
- void *cls;
-
- /**
- * The HELLO message to be transmitted.
- */
- struct GNUNET_MessageHeader *msg;
-};
-
/**
* Handle for the transport service (includes all of the
@@ -277,16 +197,6 @@ struct GNUNET_TRANSPORT_Handle
GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
/**
- * Head of DLL of control messages.
- */
- struct GNUNET_TRANSPORT_TransmitHandle *control_head;
-
- /**
- * Tail of DLL of control messages.
- */
- struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
-
- /**
* The current HELLO message for this peer. Updated
* whenever transports change their addresses.
*/
@@ -295,32 +205,7 @@ struct GNUNET_TRANSPORT_Handle
/**
* My client connection to the transport service.
*/
- struct GNUNET_CLIENT_Connection *client;
-
- /**
- * Handle to our registration with the client for notification.
- */
- struct GNUNET_CLIENT_TransmitHandle *cth;
-
- /**
- * Linked list of pending requests for our HELLO.
- */
- struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;
-
- /**
- * Linked list of pending requests for our HELLO.
- */
- struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;
-
- /**
- * Linked list of pending offer HELLO requests head
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;
-
- /**
- * Linked list of pending offer HELLO requests tail
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;
+ struct GNUNET_MQ_Handle *mq;
/**
* My configuration.
@@ -458,7 +343,8 @@ outbound_bw_tracker_update (void *cls)
GNUNET_STRINGS_relative_time_to_string (delay,
GNUNET_NO));
GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
- n->hn, delay.rel_value_us);
+ n->hn,
+ delay.rel_value_us);
schedule_transmission (n->h);
}
@@ -558,268 +444,296 @@ neighbour_delete (void *cls,
/**
- * Function we use for handling incoming messages.
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Error receiving from transport service, disconnecting temporarily.\n");
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
+}
+
+
+/**
+ * Function we use for checking incoming HELLO messages.
*
* @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
- * @param msg message received, NULL on timeout or fatal error
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_hello (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PeerIdentity me;
+
+ if (GNUNET_OK !=
+ GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
+ &me))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_i2s (&me));
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function we use for handling incoming HELLO messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param msg message received
*/
static void
-demultiplexer (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_hello (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+
+ GNUNET_free_non_null (h->my_hello);
+ h->my_hello = GNUNET_copy_message (msg);
+}
+
+
+/**
+ * Function we use for handling incoming connect messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param cim message received
+ */
+static void
+handle_connect (void *cls,
+ const struct ConnectInfoMessage *cim)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+ struct Neighbour *n;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving CONNECT message for `%s'.\n",
+ GNUNET_i2s (&cim->id));
+ n = neighbour_find (h, &cim->id);
+ if (NULL != n)
+ {
+ GNUNET_break (0);
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
+ return;
+ }
+ n = neighbour_add (h,
+ &cim->id);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving CONNECT message for `%s' with quota %u\n",
+ GNUNET_i2s (&cim->id),
+ ntohl (cim->quota_out.value__));
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
+ cim->quota_out);
+ if (NULL != h->nc_cb)
+ h->nc_cb (h->cls,
+ &n->id);
+}
+
+
+/**
+ * Function we use for handling incoming disconnect messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param dim message received
+ */
+static void
+handle_disconnect (void *cls,
+ const struct DisconnectInfoMessage *dim)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+ struct Neighbour *n;
+
+ GNUNET_break (ntohl (dim->reserved) == 0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving DISCONNECT message for `%s'.\n",
+ GNUNET_i2s (&dim->peer));
+ n = neighbour_find (h, &dim->peer);
+ if (NULL == n)
+ {
+ GNUNET_break (0);
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
+ return;
+ }
+ neighbour_delete (h,
+ &dim->peer,
+ n);
+}
+
+
+/**
+ * Function we use for handling incoming send-ok messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param okm message received
+ */
+static void
+handle_send_ok (void *cls,
+ const struct SendOkMessage *okm)
{
struct GNUNET_TRANSPORT_Handle *h = cls;
- const struct DisconnectInfoMessage *dim;
- const struct ConnectInfoMessage *cim;
- const struct InboundMessage *im;
- const struct GNUNET_MessageHeader *imm;
- const struct SendOkMessage *okm;
- const struct QuotaSetMessage *qm;
- struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
- struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
struct Neighbour *n;
- struct GNUNET_PeerIdentity me;
- uint16_t size;
uint32_t bytes_msg;
uint32_t bytes_physical;
- GNUNET_assert (NULL != h->client);
- if (GNUNET_YES == h->reconnecting)
+ bytes_msg = ntohl (okm->bytes_msg);
+ bytes_physical = ntohl (okm->bytes_physical);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving SEND_OK message, transmission to %s %s.\n",
+ GNUNET_i2s (&okm->peer),
+ ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+
+ n = neighbour_find (h,
+ &okm->peer);
+ if (NULL == n)
{
+ /* We should never get a 'SEND_OK' for a peer that we are not
+ connected to */
+ GNUNET_break (0);
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
return;
}
- if (NULL == msg)
+ if (bytes_physical > bytes_msg)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Error receiving from transport service, disconnecting temporarily.\n");
+ "Overhead for %u byte message was %u\n",
+ bytes_msg,
+ bytes_physical - bytes_msg);
+ n->traffic_overhead += bytes_physical - bytes_msg;
+ }
+ GNUNET_break (GNUNET_NO == n->is_ready);
+ n->is_ready = GNUNET_YES;
+ if (NULL != n->unready_warn_task)
+ {
+ GNUNET_SCHEDULER_cancel (n->unready_warn_task);
+ n->unready_warn_task = NULL;
+ }
+ if ((NULL != n->th) && (NULL == n->hn))
+ {
+ GNUNET_assert (NULL != n->th->timeout_task);
+ GNUNET_SCHEDULER_cancel (n->th->timeout_task);
+ n->th->timeout_task = NULL;
+ /* we've been waiting for this (congestion, not quota,
+ * caused delayed transmission) */
+ n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
+ n,
+ 0);
+ }
+ schedule_transmission (h);
+}
+
+
+/**
+ * Function we use for checking incoming "inbound" messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param im message received
+ */
+static int
+check_recv (void *cls,
+ const struct InboundMessage *im)
+{
+ const struct GNUNET_MessageHeader *imm;
+ uint16_t size;
+
+ size = ntohs (im->header.size);
+ if (size <
+ sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ imm = (const struct GNUNET_MessageHeader *) &im[1];
+ if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function we use for handling incoming messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param im message received
+ */
+static void
+handle_recv (void *cls,
+ const struct InboundMessage *im)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+ const struct GNUNET_MessageHeader *imm
+ = (const struct GNUNET_MessageHeader *) &im[1];
+ struct Neighbour *n;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u with %u bytes from `%s'.\n",
+ (unsigned int) ntohs (imm->type),
+ (unsigned int) ntohs (imm->size),
+ GNUNET_i2s (&im->peer));
+ n = neighbour_find (h, &im->peer);
+ if (NULL == n)
+ {
+ GNUNET_break (0);
h->reconnecting = GNUNET_YES;
disconnect_and_schedule_reconnect (h);
return;
}
- GNUNET_CLIENT_receive (h->client,
- &demultiplexer,
- h,
- GNUNET_TIME_UNIT_FOREVER_REL);
- size = ntohs (msg->size);
- switch (ntohs (msg->type))
- {
- case GNUNET_MESSAGE_TYPE_HELLO:
- if (GNUNET_OK !=
- GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
- &me))
- {
- GNUNET_break (0);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
- (unsigned int) size,
- GNUNET_i2s (&me));
- GNUNET_free_non_null (h->my_hello);
- h->my_hello = NULL;
- if (size < sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break (0);
- break;
- }
- h->my_hello = GNUNET_copy_message (msg);
- hwl = h->hwl_head;
- while (NULL != hwl)
- {
- next_hwl = hwl->next;
- hwl->rec (hwl->rec_cls,
- h->my_hello);
- hwl = next_hwl;
- }
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
- if (size < sizeof (struct ConnectInfoMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- cim = (const struct ConnectInfoMessage *) msg;
- if (size !=
- sizeof (struct ConnectInfoMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving CONNECT message for `%s'.\n",
- GNUNET_i2s (&cim->id));
- n = neighbour_find (h, &cim->id);
- if (NULL != n)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- n = neighbour_add (h,
- &cim->id);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving CONNECT message for `%s' with quota %u\n",
- GNUNET_i2s (&cim->id),
- ntohl (cim->quota_out.value__));
- GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
- cim->quota_out);
- if (NULL != h->nc_cb)
- h->nc_cb (h->cls,
- &n->id);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
- if (size != sizeof (struct DisconnectInfoMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- dim = (const struct DisconnectInfoMessage *) msg;
- GNUNET_break (ntohl (dim->reserved) == 0);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving DISCONNECT message for `%s'.\n",
- GNUNET_i2s (&dim->peer));
- n = neighbour_find (h, &dim->peer);
- if (NULL == n)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- neighbour_delete (h,
- &dim->peer,
- n);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
- if (size != sizeof (struct SendOkMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- okm = (const struct SendOkMessage *) msg;
- bytes_msg = ntohl (okm->bytes_msg);
- bytes_physical = ntohl (okm->bytes_physical);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving SEND_OK message, transmission to %s %s.\n",
- GNUNET_i2s (&okm->peer),
- ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+ if (NULL != h->rec)
+ h->rec (h->cls,
+ &im->peer,
+ imm);
+}
- n = neighbour_find (h,
- &okm->peer);
- if (NULL == n)
- {
- /* We should never get a 'SEND_OK' for a peer that we are not
- connected to */
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- if (bytes_physical > bytes_msg)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Overhead for %u byte message was %u\n",
- bytes_msg,
- bytes_physical - bytes_msg);
- n->traffic_overhead += bytes_physical - bytes_msg;
- }
- GNUNET_break (GNUNET_NO == n->is_ready);
- n->is_ready = GNUNET_YES;
- if (NULL != n->unready_warn_task)
- {
- GNUNET_SCHEDULER_cancel (n->unready_warn_task);
- n->unready_warn_task = NULL;
- }
- if ((NULL != n->th) && (NULL == n->hn))
- {
- GNUNET_assert (NULL != n->th->timeout_task);
- GNUNET_SCHEDULER_cancel (n->th->timeout_task);
- n->th->timeout_task = NULL;
- /* we've been waiting for this (congestion, not quota,
- * caused delayed transmission) */
- n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
- n,
- 0);
- }
- schedule_transmission (h);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
- if (size <
- sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- im = (const struct InboundMessage *) msg;
- imm = (const struct GNUNET_MessageHeader *) &im[1];
- if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u with %u bytes from `%s'.\n",
- (unsigned int) ntohs (imm->type),
- (unsigned int) ntohs (imm->size),
- GNUNET_i2s (&im->peer));
- n = neighbour_find (h, &im->peer);
- if (NULL == n)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- if (NULL != h->rec)
- h->rec (h->cls,
- &im->peer,
- imm);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
- if (size != sizeof (struct QuotaSetMessage))
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- qm = (const struct QuotaSetMessage *) msg;
- n = neighbour_find (h, &qm->peer);
- if (NULL == n)
- {
- GNUNET_break (0);
- h->reconnecting = GNUNET_YES;
- disconnect_and_schedule_reconnect (h);
- break;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving SET_QUOTA message for `%s' with quota %u\n",
- GNUNET_i2s (&qm->peer),
- ntohl (qm->quota.value__));
- GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
- qm->quota);
- break;
- default:
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _("Received unexpected message of type %u in %s:%u\n"),
- ntohs (msg->type),
- __FILE__,
- __LINE__);
+
+/**
+ * Function we use for handling incoming set quota messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param msg message received
+ */
+static void
+handle_set_quota (void *cls,
+ const struct QuotaSetMessage *qm)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+ struct Neighbour *n;
+
+ n = neighbour_find (h, &qm->peer);
+ if (NULL == n)
+ {
GNUNET_break (0);
- break;
+ h->reconnecting = GNUNET_YES;
+ disconnect_and_schedule_reconnect (h);
+ return;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving SET_QUOTA message for `%s' with quota %u\n",
+ GNUNET_i2s (&qm->peer),
+ ntohl (qm->quota.value__));
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
+ qm->quota);
}
@@ -854,104 +768,53 @@ timeout_request_due_to_congestion (void *cls)
/**
- * Transmit message(s) to service.
+ * Transmit ready message(s) to service.
*
- * @param cls handle to transport
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
+ * @param h handle to transport
*/
-static size_t
-transport_notify_ready (void *cls,
- size_t size,
- void *buf)
+static void
+transmit_ready (struct GNUNET_TRANSPORT_Handle *h)
{
- struct GNUNET_TRANSPORT_Handle *h = cls;
struct GNUNET_TRANSPORT_TransmitHandle *th;
struct GNUNET_TIME_Relative delay;
struct Neighbour *n;
- char *cbuf;
- struct OutboundMessage obm;
- size_t ret;
- size_t nret;
+ struct OutboundMessage *obm;
+ struct GNUNET_MQ_Envelope *env;
size_t mret;
- GNUNET_assert (NULL != h->client);
- h->cth = NULL;
- if (NULL == buf)
- {
- /* transmission failed */
- disconnect_and_schedule_reconnect (h);
- return 0;
- }
-
- cbuf = buf;
- ret = 0;
- /* first send control messages */
- while ( (NULL != (th = h->control_head)) &&
- (th->notify_size <= size) )
- {
- GNUNET_CONTAINER_DLL_remove (h->control_head,
- h->control_tail,
- th);
- nret = th->notify (th->notify_cls,
- size,
- &cbuf[ret]);
- delay = GNUNET_TIME_absolute_get_duration (th->request_start);
- if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Added %u bytes of control message at %u after %s delay\n",
- nret,
- ret,
- GNUNET_STRINGS_relative_time_to_string (delay,
- GNUNET_YES));
- else
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Added %u bytes of control message at %u after %s delay\n",
- nret,
- ret,
- GNUNET_STRINGS_relative_time_to_string (delay,
- GNUNET_YES));
- GNUNET_free (th);
- ret += nret;
- size -= nret;
- }
-
- /* then, if possible and no control messages pending, send data messages */
- while ( (NULL == h->control_head) &&
- (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
+ GNUNET_assert (NULL != h->mq);
+ while (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
{
+ th = n->th;
if (GNUNET_YES != n->is_ready)
{
/* peer not ready, wait for notification! */
GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
n->hn = NULL;
GNUNET_assert (NULL == n->th->timeout_task);
- n->th->timeout_task
+ th->timeout_task
= GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
- (n->th->timeout),
+ (th->timeout),
&timeout_request_due_to_congestion,
- n->th);
+ th);
continue;
}
- th = n->th;
- if (th->notify_size + sizeof (struct OutboundMessage) > size)
- break; /* does not fit */
- if (GNUNET_BANDWIDTH_tracker_get_delay
- (&n->out_tracker,
- th->notify_size).rel_value_us > 0)
+ if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
+ th->notify_size).rel_value_us > 0)
break; /* too early */
GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
n->hn = NULL;
n->th = NULL;
- GNUNET_assert (size >= sizeof (struct OutboundMessage));
+ env = GNUNET_MQ_msg_extra (obm,
+ th->notify_size,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
mret = th->notify (th->notify_cls,
- size - sizeof (struct OutboundMessage),
- &cbuf[ret + sizeof (struct OutboundMessage)]);
- GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
+ th->notify_size,
+ &obm[1]);
if (0 == mret)
{
GNUNET_free (th);
+ GNUNET_MQ_discard (env);
continue;
}
if (NULL != n->unready_warn_task)
@@ -961,20 +824,13 @@ transport_notify_ready (void *cls,
n);
n->last_payload = GNUNET_TIME_absolute_get ();
n->is_ready = GNUNET_NO;
- GNUNET_assert (mret + sizeof (struct OutboundMessage) <
- GNUNET_SERVER_MAX_MESSAGE_SIZE);
- obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
- obm.header.size = htons (mret + sizeof (struct OutboundMessage));
- obm.reserved = htonl (0);
- obm.timeout =
+ obm->reserved = htonl (0);
+ obm->timeout =
GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
(th->timeout));
- obm.peer = n->id;
- memcpy (&cbuf[ret],
- &obm,
- sizeof (struct OutboundMessage));
- ret += (mret + sizeof (struct OutboundMessage));
- size -= (mret + sizeof (struct OutboundMessage));
+ obm->peer = n->id;
+ GNUNET_MQ_send (h->mq,
+ env);
GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
mret);
delay = GNUNET_TIME_absolute_get_duration (th->request_start);
@@ -995,14 +851,9 @@ transport_notify_ready (void *cls,
GNUNET_YES),
(unsigned int) n->out_tracker.available_bytes_per_s__);
GNUNET_free (th);
- break;
}
/* if there are more pending messages, try to schedule those */
schedule_transmission (h);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes to transport service\n",
- ret);
- return ret;
}
@@ -1016,12 +867,11 @@ static void
schedule_transmission_task (void *cls)
{
struct GNUNET_TRANSPORT_Handle *h = cls;
- size_t size;
struct GNUNET_TRANSPORT_TransmitHandle *th;
struct Neighbour *n;
h->quota_task = NULL;
- GNUNET_assert (NULL != h->client);
+ GNUNET_assert (NULL != h->mq);
/* destroy all requests that have timed out */
while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
(0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
@@ -1040,29 +890,12 @@ schedule_transmission_task (void *cls)
NULL));
GNUNET_free (th);
}
- if (NULL != h->cth)
- return;
- if (NULL != h->control_head)
- {
- size = h->control_head->notify_size;
- }
- else
- {
- n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
- if (NULL == n)
- return; /* no pending messages */
- size = n->th->notify_size + sizeof (struct OutboundMessage);
- }
+ n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
+ if (NULL == n)
+ return; /* no pending messages */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Calling notify_transmit_ready\n");
- h->cth
- = GNUNET_CLIENT_notify_transmit_ready (h->client,
- size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &transport_notify_ready,
- h);
- GNUNET_assert (NULL != h->cth);
+ transmit_ready (h);
}
@@ -1078,15 +911,13 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
struct GNUNET_TIME_Relative delay;
struct Neighbour *n;
- GNUNET_assert (NULL != h->client);
+ GNUNET_assert (NULL != h->mq);
if (NULL != h->quota_task)
{
GNUNET_SCHEDULER_cancel (h->quota_task);
h->quota_task = NULL;
}
- if (NULL != h->control_head)
- delay = GNUNET_TIME_UNIT_ZERO;
- else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
+ if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
{
delay =
GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
@@ -1111,83 +942,6 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
/**
- * Queue control request for transmission to the transport
- * service.
- *
- * @param h handle to the transport service
- * @param size number of bytes to be transmitted
- * @param notify function to call to get the content
- * @param notify_cls closure for @a notify
- * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
- */
-static struct GNUNET_TRANSPORT_TransmitHandle *
-schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
- size_t size,
- GNUNET_TRANSPORT_TransmitReadyNotify notify,
- void *notify_cls)
-{
- struct GNUNET_TRANSPORT_TransmitHandle *th;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Control transmit of %u bytes requested\n",
- size);
- th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
- th->notify = notify;
- th->notify_cls = notify_cls;
- th->notify_size = size;
- th->request_start = GNUNET_TIME_absolute_get ();
- GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
- h->control_tail,
- th);
- schedule_transmission (h);
- return th;
-}
-
-
-/**
- * Transmit START message to service.
- *
- * @param cls unused
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-send_start (void *cls,
- size_t size,
- void *buf)
-{
- struct GNUNET_TRANSPORT_Handle *h = cls;
- struct StartMessage s;
- uint32_t options;
-
- if (NULL == buf)
- {
- /* Can only be shutdown, just give up */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Shutdown while trying to transmit START request.\n");
- return 0;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting START request.\n");
- GNUNET_assert (size >= sizeof (struct StartMessage));
- s.header.size = htons (sizeof (struct StartMessage));
- s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
- options = 0;
- if (h->check_self)
- options |= 1;
- if (NULL != h->rec)
- options |= 2;
- s.options = htonl (options);
- s.self = h->self;
- memcpy (buf, &s, sizeof (struct StartMessage));
- GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
- GNUNET_TIME_UNIT_FOREVER_REL);
- return sizeof (struct StartMessage);
-}
-
-
-/**
* Try again to connect to transport service.
*
* @param cls the handle to the transport service
@@ -1195,20 +949,61 @@ send_start (void *cls,
static void
reconnect (void *cls)
{
+ GNUNET_MQ_hd_var_size (hello,
+ GNUNET_MESSAGE_TYPE_HELLO,
+ struct GNUNET_MessageHeader);
+ GNUNET_MQ_hd_fixed_size (connect,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
+ struct ConnectInfoMessage);
+ GNUNET_MQ_hd_fixed_size (disconnect,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
+ struct DisconnectInfoMessage);
+ GNUNET_MQ_hd_fixed_size (send_ok,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
+ struct SendOkMessage);
+ GNUNET_MQ_hd_var_size (recv,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
+ struct InboundMessage);
+ GNUNET_MQ_hd_fixed_size (set_quota,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
+ struct QuotaSetMessage);
struct GNUNET_TRANSPORT_Handle *h = cls;
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_hello_handler (h),
+ make_connect_handler (h),
+ make_disconnect_handler (h),
+ make_send_ok_handler (h),
+ make_recv_handler (h),
+ make_set_quota_handler (h),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *env;
+ struct StartMessage *s;
+ uint32_t options;
h->reconnect_task = NULL;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connecting to transport service.\n");
- GNUNET_assert (NULL == h->client);
- GNUNET_assert (NULL == h->control_head);
- GNUNET_assert (NULL == h->control_tail);
+ GNUNET_assert (NULL == h->mq);
h->reconnecting = GNUNET_NO;
- h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
-
- GNUNET_assert (NULL != h->client);
- schedule_control_transmit (h, sizeof (struct StartMessage),
- &send_start, h);
+ h->mq = GNUNET_CLIENT_connecT (h->cfg,
+ "transport",
+ handlers,
+ &mq_error_handler,
+ h);
+ if (NULL == h->mq)
+ return;
+ env = GNUNET_MQ_msg (s,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_START);
+ options = 0;
+ if (h->check_self)
+ options |= 1;
+ if (NULL != h->rec)
+ options |= 2;
+ s->options = htonl (options);
+ s->self = h->self;
+ GNUNET_MQ_send (h->mq,
+ env);
}
@@ -1221,20 +1016,11 @@ reconnect (void *cls)
static void
disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
{
- struct GNUNET_TRANSPORT_TransmitHandle *th;
-
GNUNET_assert (NULL == h->reconnect_task);
- if (NULL != h->cth)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
- h->cth = NULL;
- }
- if (NULL != h->client)
+ if (NULL != h->mq)
{
- GNUNET_CLIENT_disconnect (h->client);
- h->client = NULL;
-/* LOG (GNUNET_ERROR_TYPE_ERROR,
- "Client disconnect done \n");*/
+ GNUNET_MQ_destroy (h->mq);
+ h->mq = NULL;
}
/* Forget about all neighbours that we used to be connected to */
GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
@@ -1245,16 +1031,6 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
GNUNET_SCHEDULER_cancel (h->quota_task);
h->quota_task = NULL;
}
- while ((NULL != (th = h->control_head)))
- {
- GNUNET_CONTAINER_DLL_remove (h->control_head,
- h->control_tail,
- th);
- th->notify (th->notify_cls,
- 0,
- NULL);
- GNUNET_free (th);
- }
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Scheduling task to reconnect to transport service in %s.\n",
GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
@@ -1268,109 +1044,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
/**
- * Cancel control request for transmission to the transport service.
- *
- * @param th handle to the transport service
- * @param tth transmit handle to cancel
- */
-static void
-cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
- struct GNUNET_TRANSPORT_TransmitHandle *tth)
-{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Canceling transmit of contral transmission requested\n");
- GNUNET_CONTAINER_DLL_remove (th->control_head,
- th->control_tail,
- tth);
- GNUNET_free (tth);
-}
-
-
-/**
- * Send HELLO message to the service.
- *
- * @param cls the HELLO message to send
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-send_hello (void *cls,
- size_t size,
- void *buf)
-{
- struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
- struct GNUNET_MessageHeader *msg = ohh->msg;
- uint16_t ssize;
-
- if (NULL == buf)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout while trying to transmit `%s' request.\n",
- "HELLO");
- if (NULL != ohh->cont)
- ohh->cont (ohh->cls);
- GNUNET_free (msg);
- GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
- ohh->th->oh_tail,
- ohh);
- GNUNET_free (ohh);
- return 0;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting `%s' request.\n",
- "HELLO");
- ssize = ntohs (msg->size);
- GNUNET_assert (size >= ssize);
- memcpy (buf,
- msg,
- ssize);
- GNUNET_free (msg);
- if (NULL != ohh->cont)
- ohh->cont (ohh->cls);
- GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
- ohh->th->oh_tail,
- ohh);
- GNUNET_free (ohh);
- return ssize;
-}
-
-
-/**
- * Send traffic metric message to the service.
- *
- * @param cls the message to send
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-send_metric (void *cls,
- size_t size,
- void *buf)
-{
- struct TrafficMetricMessage *msg = cls;
- uint16_t ssize;
-
- if (NULL == buf)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
- GNUNET_free (msg);
- return 0;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting TRAFFIC_METRIC request.\n");
- ssize = ntohs (msg->header.size);
- GNUNET_assert (size >= ssize);
- memcpy (buf, msg, ssize);
- GNUNET_free (msg);
- return ssize;
-}
-
-
-/**
- * Set transport metrics for a peer and a direction
+ * Set transport metrics for a peer and a direction.
*
* @param handle transport handle
* @param peer the peer to set the metric for
@@ -1388,101 +1062,21 @@ GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
struct GNUNET_TIME_Relative delay_in,
struct GNUNET_TIME_Relative delay_out)
{
+ struct GNUNET_MQ_Envelope *env;
struct TrafficMetricMessage *msg;
- msg = GNUNET_new (struct TrafficMetricMessage);
- msg->header.size = htons (sizeof (struct TrafficMetricMessage));
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
+ if (NULL == handle->mq)
+ return;
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
msg->reserved = htonl (0);
msg->peer = *peer;
GNUNET_ATS_properties_hton (&msg->properties,
prop);
msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
- schedule_control_transmit (handle,
- sizeof (struct TrafficMetricMessage),
- &send_metric,
- msg);
-}
-
-
-/**
- * Offer the transport service the HELLO of another peer. Note that
- * the transport service may just ignore this message if the HELLO is
- * malformed or useless due to our local configuration.
- *
- * @param handle connection to transport service
- * @param hello the hello message
- * @param cont continuation to call when HELLO has been sent,
- * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
- * tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
- * @param cont_cls closure for @a cont
- * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
- * in case of failure @a cont will not be called
- *
- */
-struct GNUNET_TRANSPORT_OfferHelloHandle *
-GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
- const struct GNUNET_MessageHeader *hello,
- GNUNET_SCHEDULER_TaskCallback cont,
- void *cont_cls)
-{
- struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
- struct GNUNET_MessageHeader *msg;
- struct GNUNET_PeerIdentity peer;
- uint16_t size;
-
- if (NULL == handle->client)
- return NULL;
- GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
- size = ntohs (hello->size);
- GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
- if (GNUNET_OK !=
- GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
- &peer))
- {
- GNUNET_break (0);
- return NULL;
- }
-
- msg = GNUNET_malloc (size);
- memcpy (msg, hello, size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Offering HELLO message of `%s' to transport for validation.\n",
- GNUNET_i2s (&peer));
-
- ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
- ohh->th = handle;
- ohh->cont = cont;
- ohh->cls = cont_cls;
- ohh->msg = msg;
- ohh->tth = schedule_control_transmit (handle,
- size,
- &send_hello,
- ohh);
- GNUNET_CONTAINER_DLL_insert (handle->oh_head,
- handle->oh_tail,
- ohh);
- return ohh;
-}
-
-
-/**
- * Cancel the request to transport to offer the HELLO message
- *
- * @param ohh the handle for the operation to cancel
- */
-void
-GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
-{
- struct GNUNET_TRANSPORT_Handle *th = ohh->th;
-
- cancel_control_transmit (ohh->th, ohh->tth);
- GNUNET_CONTAINER_DLL_remove (th->oh_head,
- th->oh_tail,
- ohh);
- GNUNET_free (ohh->msg);
- GNUNET_free (ohh);
+ GNUNET_MQ_send (handle->mq,
+ env);
}
@@ -1506,76 +1100,6 @@ GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
/**
- * Task to call the HelloUpdateCallback of the GetHelloHandle
- *
- * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
- */
-static void
-call_hello_update_cb_async (void *cls)
-{
- struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
-
- GNUNET_assert (NULL != ghh->handle->my_hello);
- GNUNET_assert (NULL != ghh->notify_task);
- ghh->notify_task = NULL;
- ghh->rec (ghh->rec_cls,
- ghh->handle->my_hello);
-}
-
-
-/**
- * Obtain the HELLO message for this peer. The callback given in this function
- * is never called synchronously.
- *
- * @param handle connection to transport service
- * @param rec function to call with the HELLO, sender will be our peer
- * identity; message and sender will be NULL on timeout
- * (handshake with transport service pending/failed).
- * cost estimate will be 0.
- * @param rec_cls closure for @a rec
- * @return handle to cancel the operation
- */
-struct GNUNET_TRANSPORT_GetHelloHandle *
-GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
- GNUNET_TRANSPORT_HelloUpdateCallback rec,
- void *rec_cls)
-{
- struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
-
- hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
- hwl->rec = rec;
- hwl->rec_cls = rec_cls;
- hwl->handle = handle;
- GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
- handle->hwl_tail,
- hwl);
- if (NULL != handle->my_hello)
- hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
- hwl);
- return hwl;
-}
-
-
-/**
- * Stop receiving updates about changes to our HELLO message.
- *
- * @param ghh handle to cancel
- */
-void
-GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
-{
- struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
-
- if (NULL != ghh->notify_task)
- GNUNET_SCHEDULER_cancel (ghh->notify_task);
- GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
- handle->hwl_tail,
- ghh);
- GNUNET_free (ghh);
-}
-
-
-/**
* Connect to the transport service. Note that the connection may
* complete (or fail) asynchronously.
*
@@ -1629,40 +1153,35 @@ GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
GNUNET_TRANSPORT_NotifyDisconnect nd,
GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
{
- struct GNUNET_TRANSPORT_Handle *ret;
+ struct GNUNET_TRANSPORT_Handle *h;
- ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
+ h = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
if (NULL != self)
{
- ret->self = *self;
- ret->check_self = GNUNET_YES;
+ h->self = *self;
+ h->check_self = GNUNET_YES;
}
- ret->cfg = cfg;
- ret->cls = cls;
- ret->rec = rec;
- ret->nc_cb = nc;
- ret->nd_cb = nd;
- ret->neb_cb = neb;
- ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
+ h->cfg = cfg;
+ h->cls = cls;
+ h->rec = rec;
+ h->nc_cb = nc;
+ h->nd_cb = nd;
+ h->neb_cb = neb;
+ h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Connecting to transport service.\n");
- ret->client = GNUNET_CLIENT_connect ("transport",
- cfg);
- if (NULL == ret->client)
+ reconnect (h);
+ if (NULL == h->mq)
{
- GNUNET_free (ret);
+ GNUNET_free (h);
return NULL;
}
- ret->neighbours =
+ h->neighbours =
GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
GNUNET_YES);
- ret->ready_heap =
+ h->ready_heap =
GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- schedule_control_transmit (ret,
- sizeof (struct StartMessage),
- &send_start,
- ret);
- return ret;
+ return h;
}
@@ -1694,8 +1213,6 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
}
GNUNET_free_non_null (handle->my_hello);
handle->my_hello = NULL;
- GNUNET_assert (NULL == handle->hwl_head);
- GNUNET_assert (NULL == handle->hwl_tail);
GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
handle->ready_heap = NULL;
GNUNET_free (handle);
diff --git a/src/transport/transport_api_get_hello.c b/src/transport/transport_api_get_hello.c
index 8087159c6..9a65616a9 100644
--- a/src/transport/transport_api_get_hello.c
+++ b/src/transport/transport_api_get_hello.c
@@ -34,25 +34,20 @@
/**
- * Linked list of functions to call whenever our HELLO is updated.
+ * Functions to call with this peer's HELLO.
*/
struct GNUNET_TRANSPORT_GetHelloHandle
{
/**
- * This is a doubly linked list.
+ * Our configuration.
*/
- struct GNUNET_TRANSPORT_GetHelloHandle *next;
-
- /**
- * This is a doubly linked list.
- */
- struct GNUNET_TRANSPORT_GetHelloHandle *prev;
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
* Transport handle.
*/
- struct GNUNET_TRANSPORT_Handle *handle;
+ struct GNUNET_MQ_Handle *mq;
/**
* Callback to call once we got our HELLO.
@@ -60,34 +55,158 @@ struct GNUNET_TRANSPORT_GetHelloHandle
GNUNET_TRANSPORT_HelloUpdateCallback rec;
/**
+ * Closure for @e rec.
+ */
+ void *rec_cls;
+
+ /**
* Task for calling the HelloUpdateCallback when we already have a HELLO
*/
struct GNUNET_SCHEDULER_Task *notify_task;
/**
- * Closure for @e rec.
+ * ID of the task trying to reconnect to the service.
*/
- void *rec_cls;
+ struct GNUNET_SCHEDULER_Task *reconnect_task;
+
+ /**
+ * Delay until we try to reconnect.
+ */
+ struct GNUNET_TIME_Relative reconnect_delay;
};
+/**
+ * Function we use for checking incoming HELLO messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_hello (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PeerIdentity me;
+
+ if (GNUNET_OK !=
+ GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
+ &me))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_i2s (&me));
+ return GNUNET_OK;
+}
+
/**
- * Task to call the HelloUpdateCallback of the GetHelloHandle
+ * Function we use for handling incoming HELLO messages.
*
- * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
+ * @param cls closure, a `struct GNUNET_TRANSPORT_GetHelloHandle *`
+ * @param msg message received
*/
static void
-call_hello_update_cb_async (void *cls)
+handle_hello (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
- GNUNET_assert (NULL != ghh->handle->my_hello);
- GNUNET_assert (NULL != ghh->notify_task);
- ghh->notify_task = NULL;
ghh->rec (ghh->rec_cls,
- ghh->handle->my_hello);
+ msg);
+}
+
+
+/**
+ * Function that will schedule the job that will try
+ * to connect us again to the client.
+ *
+ * @param ghh transport service to reconnect
+ */
+static void
+schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh);
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Error receiving from transport service, disconnecting temporarily.\n");
+ GNUNET_MQ_destroy (ghh->mq);
+ ghh->mq = NULL;
+ schedule_reconnect (ghh);
+}
+
+
+/**
+ * Try again to connect to transport service.
+ *
+ * @param cls the handle to the transport service
+ */
+static void
+reconnect (void *cls)
+{
+ GNUNET_MQ_hd_var_size (hello,
+ GNUNET_MESSAGE_TYPE_HELLO,
+ struct GNUNET_MessageHeader);
+ struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_hello_handler (ghh),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *env;
+ struct StartMessage *s;
+
+ ghh->reconnect_task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Connecting to transport service.\n");
+ GNUNET_assert (NULL == ghh->mq);
+ ghh->mq = GNUNET_CLIENT_connecT (ghh->cfg,
+ "transport",
+ handlers,
+ &mq_error_handler,
+ ghh);
+ if (NULL == ghh->mq)
+ return;
+ env = GNUNET_MQ_msg (s,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_START);
+ s->options = htonl (0);
+ GNUNET_MQ_send (ghh->mq,
+ env);
+}
+
+
+/**
+ * Function that will schedule the job that will try
+ * to connect us again to the client.
+ *
+ * @param ghh transport service to reconnect
+ */
+static void
+schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
+{
+ ghh->reconnect_task =
+ GNUNET_SCHEDULER_add_delayed (ghh->reconnect_delay,
+ &reconnect,
+ ghh);
+ ghh->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ghh->reconnect_delay);
}
@@ -95,7 +214,7 @@ call_hello_update_cb_async (void *cls)
* Obtain the HELLO message for this peer. The callback given in this function
* is never called synchronously.
*
- * @param handle connection to transport service
+ * @param cfg configuration
* @param rec function to call with the HELLO, sender will be our peer
* identity; message and sender will be NULL on timeout
* (handshake with transport service pending/failed).
@@ -104,23 +223,23 @@ call_hello_update_cb_async (void *cls)
* @return handle to cancel the operation
*/
struct GNUNET_TRANSPORT_GetHelloHandle *
-GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
+GNUNET_TRANSPORT_get_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
GNUNET_TRANSPORT_HelloUpdateCallback rec,
void *rec_cls)
{
- struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
-
- hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
- hwl->rec = rec;
- hwl->rec_cls = rec_cls;
- hwl->handle = handle;
- GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
- handle->hwl_tail,
- hwl);
- if (NULL != handle->my_hello)
- hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
- hwl);
- return hwl;
+ struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
+
+ ghh = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
+ ghh->rec = rec;
+ ghh->rec_cls = rec_cls;
+ ghh->cfg = cfg;
+ reconnect (ghh);
+ if (NULL == ghh->mq)
+ {
+ GNUNET_free (ghh);
+ return NULL;
+ }
+ return ghh;
}
@@ -132,15 +251,13 @@ GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
void
GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
{
- struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
-
- if (NULL != ghh->notify_task)
- GNUNET_SCHEDULER_cancel (ghh->notify_task);
- GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
- handle->hwl_tail,
- ghh);
+ if (NULL != ghh->mq)
+ {
+ GNUNET_MQ_destroy (ghh->mq);
+ ghh->mq = NULL;
+ }
GNUNET_free (ghh);
}
-/* end of transport_api_hello.c */
+/* end of transport_api_get_hello.c */
diff --git a/src/transport/transport_api_offer_hello.c b/src/transport/transport_api_offer_hello.c
index 0abce2d62..951ab9ba4 100644
--- a/src/transport/transport_api_offer_hello.c
+++ b/src/transport/transport_api_offer_hello.c
@@ -23,31 +23,23 @@
* @brief library to offer HELLOs to transport service
* @author Christian Grothoff
*/
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_hello_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_transport_service.h"
+
/**
* Entry in linked list for all offer-HELLO requests.
*/
struct GNUNET_TRANSPORT_OfferHelloHandle
{
- /**
- * For the DLL.
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
-
- /**
- * For the DLL.
- */
- struct GNUNET_TRANSPORT_OfferHelloHandle *next;
/**
* Transport service handle we use for transmission.
*/
- struct GNUNET_TRANSPORT_Handle *th;
-
- /**
- * Transmission handle for this request.
- */
- struct GNUNET_TRANSPORT_TransmitHandle *tth;
+ struct GNUNET_MQ_Handle *mq;
/**
* Function to call once we are done.
@@ -59,20 +51,31 @@ struct GNUNET_TRANSPORT_OfferHelloHandle
*/
void *cls;
- /**
- * The HELLO message to be transmitted.
- */
- struct GNUNET_MessageHeader *msg;
};
+/**
+ * Done sending HELLO message to the service, notify application.
+ *
+ * @param cls the handle for the operation
+ */
+static void
+finished_hello (void *cls)
+{
+ struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
+
+ if (NULL != ohh->cont)
+ ohh->cont (ohh->cls);
+ GNUNET_TRANSPORT_offer_hello_cancel (ohh);
+}
+
/**
* Offer the transport service the HELLO of another peer. Note that
* the transport service may just ignore this message if the HELLO is
* malformed or useless due to our local configuration.
*
- * @param handle connection to transport service
+ * @param cfg configuration
* @param hello the hello message
* @param cont continuation to call when HELLO has been sent,
* tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
@@ -83,46 +86,43 @@ struct GNUNET_TRANSPORT_OfferHelloHandle
*
*/
struct GNUNET_TRANSPORT_OfferHelloHandle *
-GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
+GNUNET_TRANSPORT_offer_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_MessageHeader *hello,
GNUNET_SCHEDULER_TaskCallback cont,
void *cont_cls)
{
- struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
- struct GNUNET_MessageHeader *msg;
+ struct GNUNET_TRANSPORT_OfferHelloHandle *ohh
+ = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
+ struct GNUNET_MQ_Envelope *env;
struct GNUNET_PeerIdentity peer;
- uint16_t size;
- if (NULL == handle->mq)
- return NULL;
- GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
- size = ntohs (hello->size);
- GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
if (GNUNET_OK !=
GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
&peer))
{
GNUNET_break (0);
+ GNUNET_free (ohh);
+ return NULL;
+ }
+ ohh->mq = GNUNET_CLIENT_connecT (cfg,
+ "transport",
+ NULL,
+ NULL,
+ ohh);
+ if (NULL == ohh->mq)
+ {
+ GNUNET_free (ohh);
return NULL;
}
-
- msg = GNUNET_malloc (size);
- memcpy (msg, hello, size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Offering HELLO message of `%s' to transport for validation.\n",
- GNUNET_i2s (&peer));
- ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
- ohh->th = handle;
ohh->cont = cont;
ohh->cls = cont_cls;
- ohh->msg = msg;
- ohh->tth = schedule_control_transmit (handle,
- size,
- &send_hello,
- ohh);
- GNUNET_CONTAINER_DLL_insert (handle->oh_head,
- handle->oh_tail,
- ohh);
+ GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
+ env = GNUNET_MQ_msg_copy (hello);
+ GNUNET_MQ_notify_sent (env,
+ &finished_hello,
+ ohh);
+ GNUNET_MQ_send (ohh->mq,
+ env);
return ohh;
}
@@ -135,13 +135,7 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
void
GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
{
- struct GNUNET_TRANSPORT_Handle *th = ohh->th;
-
- cancel_control_transmit (ohh->th, ohh->tth);
- GNUNET_CONTAINER_DLL_remove (th->oh_head,
- th->oh_tail,
- ohh);
- GNUNET_free (ohh->msg);
+ GNUNET_MQ_destroy (ohh->mq);
GNUNET_free (ohh);
}