summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-04-03 21:25:59 +0200
committerChristian Grothoff <christian@grothoff.org>2019-04-03 21:26:08 +0200
commit670ebb20b9570120df1021e467b575a212743125 (patch)
treee94b1e8e2bb52da66b426c861f34f8e950dcf8ed
parentf13792325fc3f7e49ec2b0880eb4f1aa978e00d7 (diff)
allow applications expressing connection preferences directly to TNG, collect HELLOs from PEERSTORE for expressed prefs
-rw-r--r--src/ats/gnunet-service-ats-new.c16
-rw-r--r--src/include/gnunet_ats_application_service.h6
-rw-r--r--src/include/gnunet_peerstore_service.h4
-rw-r--r--src/include/gnunet_protocols.h12
-rw-r--r--src/include/gnunet_transport_application_service.h100
-rw-r--r--src/transport/Makefile.am10
-rw-r--r--src/transport/gnunet-service-tng.c628
-rw-r--r--src/transport/transport.h32
-rw-r--r--src/transport/transport_api2_application.c366
9 files changed, 908 insertions, 266 deletions
diff --git a/src/ats/gnunet-service-ats-new.c b/src/ats/gnunet-service-ats-new.c
index a1666d8d3..f2bc1de7f 100644
--- a/src/ats/gnunet-service-ats-new.c
+++ b/src/ats/gnunet-service-ats-new.c
@@ -304,7 +304,7 @@ prop_ntoh (const struct PropertiesNBO *properties,
*/
static void
handle_suggest (void *cls,
- const struct ExpressPreferenceMessage *msg)
+ const struct ExpressPreferenceMessage *msg)
{
struct Client *c = cls;
struct ClientPreference *cp;
@@ -344,7 +344,7 @@ handle_suggest (void *cls,
*/
static void
handle_suggest_cancel (void *cls,
- const struct ExpressPreferenceMessage *msg)
+ const struct ExpressPreferenceMessage *msg)
{
struct Client *c = cls;
struct ClientPreference *cp;
@@ -772,13 +772,13 @@ GNUNET_SERVICE_MAIN
&client_disconnect_cb,
NULL,
GNUNET_MQ_hd_fixed_size (suggest,
- GNUNET_MESSAGE_TYPE_ATS_SUGGEST,
- struct ExpressPreferenceMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_ATS_SUGGEST,
+ struct ExpressPreferenceMessage,
+ NULL),
GNUNET_MQ_hd_fixed_size (suggest_cancel,
- GNUNET_MESSAGE_TYPE_ATS_SUGGEST_CANCEL,
- struct ExpressPreferenceMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_ATS_SUGGEST_CANCEL,
+ struct ExpressPreferenceMessage,
+ NULL),
GNUNET_MQ_hd_fixed_size (start,
GNUNET_MESSAGE_TYPE_ATS_START,
struct GNUNET_MessageHeader,
diff --git a/src/include/gnunet_ats_application_service.h b/src/include/gnunet_ats_application_service.h
index e942ca4d8..fbc6f48ac 100644
--- a/src/include/gnunet_ats_application_service.h
+++ b/src/include/gnunet_ats_application_service.h
@@ -83,9 +83,9 @@ struct GNUNET_ATS_ApplicationSuggestHandle;
*/
struct GNUNET_ATS_ApplicationSuggestHandle *
GNUNET_ATS_application_suggest (struct GNUNET_ATS_ApplicationHandle *ch,
- const struct GNUNET_PeerIdentity *peer,
- enum GNUNET_MQ_PreferenceKind pk,
- struct GNUNET_BANDWIDTH_Value32NBO bw);
+ const struct GNUNET_PeerIdentity *peer,
+ enum GNUNET_MQ_PreferenceKind pk,
+ struct GNUNET_BANDWIDTH_Value32NBO bw);
/**
diff --git a/src/include/gnunet_peerstore_service.h b/src/include/gnunet_peerstore_service.h
index 55f371399..31567c004 100644
--- a/src/include/gnunet_peerstore_service.h
+++ b/src/include/gnunet_peerstore_service.h
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- Copyright (C)
+ Copyright (C) GNUnet e.V. 2004--2019
GNUnet is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
@@ -11,7 +11,7 @@
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 46620b829..7f1667d51 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3177,6 +3177,18 @@ extern "C"
*/
#define GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_FC_LIMITS 1276
+/**
+ * Type of the 'struct ExpressPreferenceMessage' send by clients to TRANSPORT
+ * to establish bandwidth preference.
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST 1300
+
+/**
+ * Type of the 'struct ExpressPreferenceMessage' send by clients to TRANSPORT
+ * to abandon bandwidth preference.
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL 1301
+
/* ************** NEW (NG) ATS Messages ************* */
diff --git a/src/include/gnunet_transport_application_service.h b/src/include/gnunet_transport_application_service.h
new file mode 100644
index 000000000..31097b88e
--- /dev/null
+++ b/src/include/gnunet_transport_application_service.h
@@ -0,0 +1,100 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2010-2015, 2018, 2019 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file
+ * Bandwidth allocation API for applications to interact with
+ *
+ * @author Christian Grothoff
+ * @author Matthias Wachs
+ *
+ * @defgroup TRANSPORT service
+ * Bandwidth allocation
+ *
+ * @{
+ */
+#ifndef GNUNET_TRANSPORT_APPLICATION_SERVICE_H
+#define GNUNET_TRANSPORT_APPLICATION_SERVICE_H
+
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+
+/**
+ * Handle to the TRANSPORT subsystem for making suggestions about
+ * connections the peer would like to have.
+ */
+struct GNUNET_TRANSPORT_ApplicationHandle;
+
+
+/**
+ * Initialize the TRANSPORT application client handle.
+ *
+ * @param cfg configuration to use
+ * @return ats application handle, NULL on error
+ */
+struct GNUNET_TRANSPORT_ApplicationHandle *
+GNUNET_TRANSPORT_application_init (const struct GNUNET_CONFIGURATION_Handle *cfg);
+
+
+/**
+ * Shutdown TRANSPORT application client.
+ *
+ * @param ch handle to destroy
+ */
+void
+GNUNET_TRANSPORT_application_done (struct GNUNET_TRANSPORT_ApplicationHandle *ch);
+
+
+/**
+ * Handle for suggestion requests.
+ */
+struct GNUNET_TRANSPORT_ApplicationSuggestHandle;
+
+
+/**
+ * An application would like to communicate with a peer. TRANSPORT should
+ * allocate bandwith using a suitable address for requiremetns @a pk
+ * to transport.
+ *
+ * @param ch handle
+ * @param peer identity of the peer we need an address for
+ * @param pk what kind of application will the application require (can be
+ * #GNUNET_MQ_PREFERENCE_NONE, we will still try to connect)
+ * @param bw desired bandwith, can be zero (we will still try to connect)
+ * @return suggestion handle, NULL if request is already pending
+ */
+struct GNUNET_TRANSPORT_ApplicationSuggestHandle *
+GNUNET_TRANSPORT_application_suggest (struct GNUNET_TRANSPORT_ApplicationHandle *ch,
+ const struct GNUNET_PeerIdentity *peer,
+ enum GNUNET_MQ_PreferenceKind pk,
+ struct GNUNET_BANDWIDTH_Value32NBO bw);
+
+
+/**
+ * We no longer care about communicating with a peer.
+ *
+ * @param sh handle
+ */
+void
+GNUNET_TRANSPORT_application_suggest_cancel (struct GNUNET_TRANSPORT_ApplicationSuggestHandle *sh);
+
+/** @} */ /* end of group */
+
+#endif
+/* end of file gnunet_ats_application_service.h */
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index cd31f7cd7..2865460fd 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -155,6 +155,7 @@ endif
lib_LTLIBRARIES = \
libgnunettransport.la \
libgnunettransportaddress.la \
+ libgnunettransportapplication.la \
libgnunettransportcore.la \
libgnunettransportcommunicator.la \
libgnunettransportmonitor.la \
@@ -196,6 +197,14 @@ libgnunettransport_la_LDFLAGS = \
$(GN_LIB_LDFLAGS) $(WINFLAGS) \
-version-info 4:0:2
+libgnunettransportapplication_la_SOURCES = \
+ transport_api2_application.c
+libgnunettransportapplication_la_LIBADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(LTLIBINTL)
+libgnunettransportapplication_la_LDFLAGS = \
+ $(GN_LIB_LDFLAGS) $(WINFLAGS) \
+ -version-info 0:0:0
libgnunettransportaddress_la_SOURCES = \
@@ -360,7 +369,6 @@ gnunet_service_transport_CFLAGS = \
gnunet_service_tng_SOURCES = \
gnunet-service-tng.c
gnunet_service_tng_LDADD = \
- $(top_builddir)/src/ats/libgnunetatstransport.la \
$(top_builddir)/src/peerstore/libgnunetpeerstore.la \
$(top_builddir)/src/hello/libgnunethello.la \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index b64bfb182..6494a5dfd 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -41,7 +41,7 @@
* #3 transport should use validation to also establish
* effective flow control (for uni-directional transports!)
* #4 UDP broadcasting logic must be extended to use the new API
- * #5 only validated addresses go to ATS for scheduling; that
+ * #5 only validated addresses are selected for scheduling; that
* also ensures we know the RTT
* #6 to ensure flow control and RTT are OK, we always do the
* 'validation', even if address comes from PEERSTORE
@@ -59,10 +59,7 @@
* -
*
* Easy:
- * - use ATS bandwidth allocation callback and schedule transmissions!
- *
- * Plan:
- * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
+ * - figure out how to call XXX_suggestion_cb!
*
* Later:
* - change transport-core API to provide proper flow control in both
@@ -98,8 +95,6 @@
* "latest timestamps seen" data
* - if transport implements DV, we likely need a 3rd peermap
* in addition to ephemerals and (direct) neighbours
- * => in this data structure, we should track ATS metrics (distance, RTT, etc.)
- * as well as latest timestamps seen, goodput, fragments for transmission, etc.
* ==> check if stuff needs to be moved out of "Neighbour"
* - transport should encapsualte core-level messages and do its
* own ACKing for RTT/goodput/loss measurements _and_ fragment
@@ -111,7 +106,6 @@
#include "gnunet_transport_monitor_service.h"
#include "gnunet_peerstore_service.h"
#include "gnunet_hello_lib.h"
-#include "gnunet_ats_transport_service.h"
#include "gnunet_signatures.h"
#include "transport.h"
@@ -148,18 +142,11 @@
#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
/**
- * How many messages can we have pending for a given session (queue to
+ * How many messages can we have pending for a given queue (queue to
* a particular peer via a communicator) process before we start to
* throttle that queue?
- *
- * Used if ATS assigns more bandwidth to a particular transmission
- * method than that transmission method can right now handle. (Yes,
- * ATS should eventually notice utilization below allocation and
- * adjust, but we don't want to queue up tons of messages in the
- * meantime). Must be significantly below
- * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
*/
-#define SESSION_QUEUE_LIMIT 32
+#define QUEUE_LENGTH_LIMIT 32
GNUNET_NETWORK_STRUCT_BEGIN
@@ -547,7 +534,6 @@ struct TransportDVBox
GNUNET_NETWORK_STRUCT_END
-
/**
* What type of client is the `struct TransportClient` about?
*/
@@ -571,7 +557,12 @@ enum ClientType
/**
* It is a communicator, use for communication.
*/
- CT_COMMUNICATOR = 3
+ CT_COMMUNICATOR = 3,
+
+ /**
+ * "Application" telling us where to connect (i.e. TOPOLOGY, DHT or CADET).
+ */
+ CT_APPLICATION = 4
};
@@ -725,11 +716,18 @@ struct DistanceVector
/**
+ * A queue is a message queue provided by a communicator
+ * via which we can reach a particular neighbour.
+ */
+struct Queue;
+
+
+/**
* Entry identifying transmission in one of our `struct
- * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to
+ * Queue` which still awaits an ACK. This is used to
* ensure we do not overwhelm a communicator and limit the number of
* messages outstanding per communicator (say in case communicator is
- * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
+ * CPU bound) and per queue (in case bandwidth allocation exceeds
* what the communicator can actually provide towards a particular
* peer/target).
*/
@@ -747,9 +745,9 @@ struct QueueEntry
struct QueueEntry *prev;
/**
- * ATS session this entry is queued with.
+ * Queue this entry is queued with.
*/
- struct GNUNET_ATS_Session *session;
+ struct Queue *queue;
/**
* Message ID used for this message with the queue used for transmission.
@@ -759,30 +757,30 @@ struct QueueEntry
/**
- * An ATS session is a message queue provided by a communicator
+ * A queue is a message queue provided by a communicator
* via which we can reach a particular neighbour.
*/
-struct GNUNET_ATS_Session
+struct Queue
{
/**
* Kept in a MDLL.
*/
- struct GNUNET_ATS_Session *next_neighbour;
+ struct Queue *next_neighbour;
/**
* Kept in a MDLL.
*/
- struct GNUNET_ATS_Session *prev_neighbour;
+ struct Queue *prev_neighbour;
/**
* Kept in a MDLL.
*/
- struct GNUNET_ATS_Session *prev_client;
+ struct Queue *prev_client;
/**
* Kept in a MDLL.
*/
- struct GNUNET_ATS_Session *next_client;
+ struct Queue *next_client;
/**
* Head of DLL of unacked transmission requests.
@@ -795,33 +793,28 @@ struct GNUNET_ATS_Session
struct QueueEntry *queue_tail;
/**
- * Which neighbour is this ATS session for?
+ * Which neighbour is this queue for?
*/
struct Neighbour *neighbour;
/**
- * Which communicator offers this ATS session?
+ * Which communicator offers this queue?
*/
struct TransportClient *tc;
/**
- * Address served by the ATS session.
+ * Address served by the queue.
*/
const char *address;
/**
- * Handle by which we inform ATS about this queue.
- */
- struct GNUNET_ATS_SessionRecord *sr;
-
- /**
* Task scheduled for the time when this queue can (likely) transmit the
* next message. Still needs to check with the @e tracker_out to be sure.
*/
struct GNUNET_SCHEDULER_Task *transmit_task;
/**
- * Our current RTT estimate for this ATS session.
+ * Our current RTT estimate for this queue.
*/
struct GNUNET_TIME_Relative rtt;
@@ -831,17 +824,17 @@ struct GNUNET_ATS_Session
uint64_t mid_gen;
/**
- * Unique identifier of this ATS session with the communicator.
+ * Unique identifier of this queue with the communicator.
*/
uint32_t qid;
/**
- * Maximum transmission unit supported by this ATS session.
+ * Maximum transmission unit supported by this queue.
*/
uint32_t mtu;
/**
- * Distance to the target of this ATS session.
+ * Distance to the target of this queue.
*/
uint32_t distance;
@@ -861,22 +854,22 @@ struct GNUNET_ATS_Session
unsigned int queue_length;
/**
- * Network type offered by this ATS session.
+ * Network type offered by this queue.
*/
enum GNUNET_NetworkType nt;
/**
- * Connection status for this ATS session.
+ * Connection status for this queue.
*/
enum GNUNET_TRANSPORT_ConnectionStatus cs;
/**
- * How much outbound bandwidth do we have available for this session?
+ * How much outbound bandwidth do we have available for this queue?
*/
struct GNUNET_BANDWIDTH_Tracker tracker_out;
/**
- * How much inbound bandwidth do we have available for this session?
+ * How much inbound bandwidth do we have available for this queue?
*/
struct GNUNET_BANDWIDTH_Tracker tracker_in;
};
@@ -1025,14 +1018,14 @@ struct Neighbour
struct DistanceVectorHop *dv_tail;
/**
- * Head of DLL of ATS sessions to this peer.
+ * Head of DLL of queues to this peer.
*/
- struct GNUNET_ATS_Session *session_head;
+ struct Queue *queue_head;
/**
- * Tail of DLL of ATS sessions to this peer.
+ * Tail of DLL of queues to this peer.
*/
- struct GNUNET_ATS_Session *session_tail;
+ struct Queue *queue_tail;
/**
* Task run to cleanup pending messages that have exceeded their timeout.
@@ -1040,13 +1033,12 @@ struct Neighbour
struct GNUNET_SCHEDULER_Task *timeout_task;
/**
- * Quota at which CORE is allowed to transmit to this peer
- * according to ATS.
+ * Quota at which CORE is allowed to transmit to this peer.
*
* FIXME: not yet used, tricky to get right given multiple queues!
- * (=> Idea: let ATS set a quota per queue and we add them up here?)
+ * (=> Idea: measure???)
* FIXME: how do we set this value initially when we tell CORE?
- * Options: start at a minimum value or at literally zero (before ATS?)
+ * Options: start at a minimum value or at literally zero?
* (=> Current thought: clean would be zero!)
*/
struct GNUNET_BANDWIDTH_Value32NBO quota_out;
@@ -1060,6 +1052,40 @@ struct Neighbour
/**
+ * A peer that an application (client) would like us to talk to directly.
+ */
+struct PeerRequest
+{
+
+ /**
+ * Which peer is this about?
+ */
+ struct GNUNET_PeerIdentity pid;
+
+ /**
+ * Client responsible for the request.
+ */
+ struct TransportClient *tc;
+
+ /**
+ * Handle for watching the peerstore for HELLOs for this peer.
+ */
+ struct GNUNET_PEERSTORE_WatchContext *wc;
+
+ /**
+ * What kind of performance preference does this @e tc have?
+ */
+ enum GNUNET_MQ_PreferenceKind pk;
+
+ /**
+ * How much bandwidth would this @e tc like to see?
+ */
+ struct GNUNET_BANDWIDTH_Value32NBO bw;
+
+};
+
+
+/**
* Types of different pending messages.
*/
enum PendingMessageType
@@ -1362,12 +1388,12 @@ struct TransportClient
/**
* Head of DLL of queues offered by this communicator.
*/
- struct GNUNET_ATS_Session *session_head;
+ struct Queue *queue_head;
/**
* Tail of DLL of queues offered by this communicator.
*/
- struct GNUNET_ATS_Session *session_tail;
+ struct Queue *queue_tail;
/**
* Head of list of the addresses of this peer offered by this communicator.
@@ -1393,6 +1419,19 @@ struct TransportClient
} communicator;
+ /**
+ * Information for @e type #CT_APPLICATION
+ */
+ struct {
+
+ /**
+ * Map of requests for peers the given client application would like to
+ * see connections for. Maps from PIDs to `struct PeerRequest`.
+ */
+ struct GNUNET_CONTAINER_MultiPeerMap *requests;
+
+ } application;
+
} details;
};
@@ -1465,11 +1504,6 @@ static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
*/
static struct GNUNET_SCHEDULER_Task *ephemeral_task;
-/**
- * Our connection to ATS for allocation and bootstrapping.
- */
-static struct GNUNET_ATS_TransportHandle *ats;
-
/**
* Free cached ephemeral key.
@@ -1781,7 +1815,7 @@ free_neighbour (struct Neighbour *neighbour)
{
struct DistanceVectorHop *dvh;
- GNUNET_assert (NULL == neighbour->session_head);
+ GNUNET_assert (NULL == neighbour->queue_head);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (neighbours,
&neighbour->pid,
@@ -1886,7 +1920,7 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
* communicator for transmission (updating the tracker, and re-scheduling
* itself if applicable).
*
- * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
+ * @param cls the `struct Queue` to process transmissions for
*/
static void
transmit_on_queue (void *cls);
@@ -1902,7 +1936,7 @@ transmit_on_queue (void *cls);
* @param queue the queue to do scheduling for
*/
static void
-schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
+schedule_transmit_on_queue (struct Queue *queue)
{
struct Neighbour *n = queue->neighbour;
struct PendingMessage *pm = n->pending_msg_head;
@@ -1919,10 +1953,10 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
GNUNET_NO);
return;
}
- if (queue->queue_length >= SESSION_QUEUE_LIMIT)
+ if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
{
GNUNET_STATISTICS_update (GST_stats,
- "# Transmission throttled due to session queue limit",
+ "# Transmission throttled due to queue queue limit",
1,
GNUNET_NO);
return;
@@ -1958,15 +1992,15 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
/**
- * Free @a session.
+ * Free @a queue.
*
- * @param session the session to free
+ * @param queue the queue to free
*/
static void
-free_session (struct GNUNET_ATS_Session *session)
+free_queue (struct Queue *queue)
{
- struct Neighbour *neighbour = session->neighbour;
- struct TransportClient *tc = session->tc;
+ struct Neighbour *neighbour = queue->neighbour;
+ struct TransportClient *tc = queue->tc;
struct MonitorEvent me = {
.cs = GNUNET_TRANSPORT_CS_DOWN,
.rtt = GNUNET_TIME_UNIT_FOREVER_REL
@@ -1974,30 +2008,30 @@ free_session (struct GNUNET_ATS_Session *session)
struct QueueEntry *qe;
int maxxed;
- if (NULL != session->transmit_task)
+ if (NULL != queue->transmit_task)
{
- GNUNET_SCHEDULER_cancel (session->transmit_task);
- session->transmit_task = NULL;
+ GNUNET_SCHEDULER_cancel (queue->transmit_task);
+ queue->transmit_task = NULL;
}
GNUNET_CONTAINER_MDLL_remove (neighbour,
- neighbour->session_head,
- neighbour->session_tail,
- session);
+ neighbour->queue_head,
+ neighbour->queue_tail,
+ queue);
GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.communicator.session_head,
- tc->details.communicator.session_tail,
- session);
+ tc->details.communicator.queue_head,
+ tc->details.communicator.queue_tail,
+ queue);
maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
- while (NULL != (qe = session->queue_head))
+ while (NULL != (qe = queue->queue_head))
{
- GNUNET_CONTAINER_DLL_remove (session->queue_head,
- session->queue_tail,
+ GNUNET_CONTAINER_DLL_remove (queue->queue_head,
+ queue->queue_tail,
qe);
- session->queue_length--;
+ queue->queue_length--;
tc->details.communicator.total_queue_length--;
GNUNET_free (qe);
}
- GNUNET_assert (0 == session->queue_length);
+ GNUNET_assert (0 == queue->queue_length);
if ( (maxxed) &&
(COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
{
@@ -2006,20 +2040,19 @@ free_session (struct GNUNET_ATS_Session *session)
"# Transmission throttled due to communicator queue limit",
-1,
GNUNET_NO);
- for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
+ for (struct Queue *s = tc->details.communicator.queue_head;
NULL != s;
s = s->next_client)
schedule_transmit_on_queue (s);
}
notify_monitors (&neighbour->pid,
- session->address,
- session->nt,
+ queue->address,
+ queue->nt,
&me);
- GNUNET_ATS_session_del (session->sr);
- GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
- GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
- GNUNET_free (session);
- if (NULL == neighbour->session_head)
+ GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
+ GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
+ GNUNET_free (queue);
+ if (NULL == neighbour->queue_head)
{
cores_send_disconnect_info (&neighbour->pid);
free_neighbour (neighbour);
@@ -2055,6 +2088,33 @@ free_address_list_entry (struct AddressListEntry *ale)
/**
+ * Stop the peer request in @a value.
+ *
+ * @param cls a `struct TransportClient` that no longer makes the request
+ * @param pid the peer's identity
+ * @param value a `struct PeerRequest`
+ * @return #GNUNET_YES (always)
+ */
+static int
+stop_peer_request (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct TransportClient *tc = cls;
+ struct PeerRequest *pr = value;
+
+ GNUNET_PEERSTORE_watch_cancel (pr->wc);
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
+ pid,
+ pr));
+ GNUNET_free (pr);
+
+ return GNUNET_OK;
+}
+
+
+/**
* Called whenever a client is disconnected. Frees our
* resources associated with that client.
*
@@ -2097,16 +2157,22 @@ client_disconnect_cb (void *cls,
break;
case CT_COMMUNICATOR:
{
- struct GNUNET_ATS_Session *q;
+ struct Queue *q;
struct AddressListEntry *ale;
- while (NULL != (q = tc->details.communicator.session_head))
- free_session (q);
+ while (NULL != (q = tc->details.communicator.queue_head))
+ free_queue (q);
while (NULL != (ale = tc->details.communicator.addr_head))
- free_address_list_entry (ale);
+ free_address_list_entry (ale);
GNUNET_free (tc->details.communicator.address_prefix);
}
break;
+ case CT_APPLICATION:
+ GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
+ &stop_peer_request,
+ tc);
+ GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
+ break;
}
GNUNET_free (tc);
}
@@ -2419,7 +2485,7 @@ handle_client_send (void *cls,
}
if (! was_empty)
return; /* all queues must already be busy */
- for (struct GNUNET_ATS_Session *queue = target->session_head;
+ for (struct Queue *queue = target->queue_head;
NULL != queue;
queue = queue->next_neighbour)
{
@@ -2491,7 +2557,7 @@ handle_communicator_available (void *cls,
*/
static int
check_communicator_backchannel (void *cls,
- const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
+ const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
{
const struct GNUNET_MessageHeader *inbox;
const char *is;
@@ -2565,10 +2631,10 @@ expire_ephemerals (void *cls)
*/
static void
lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
- struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
- struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
- struct GNUNET_TIME_Absolute *ephemeral_validity)
+ struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
+ struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
+ struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
+ struct GNUNET_TIME_Absolute *ephemeral_validity)
{
struct EphemeralCacheEntry *ece;
struct EphemeralConfirmation ec;
@@ -2643,7 +2709,7 @@ route_message (const struct GNUNET_PeerIdentity *target,
*/
static void
handle_communicator_backchannel (void *cls,
- const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
+ const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
{
struct TransportClient *tc = cls;
struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
@@ -2729,7 +2795,7 @@ store_pi (void *cls);
*/
static void
peerstore_store_cb (void *cls,
- int success)
+ int success)
{
struct AddressListEntry *ale = cls;
@@ -3178,7 +3244,7 @@ handle_fragment_box (void *cls,
if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
ack_now = GNUNET_YES; /* maximum acks received */
// FIXME: possibly also ACK based on RTT (but for that we'd need to
- // determine the session used for the ACK first!)
+ // determine the queue used for the ACK first!)
/* is reassembly complete? */
if (0 != rc->msg_missing)
@@ -3289,7 +3355,7 @@ handle_reliability_box (void *cls,
*/
static void
handle_reliability_ack (void *cls,
- const struct TransportReliabilityAckMessage *ra)
+ const struct TransportReliabilityAckMessage *ra)
{
struct CommunicatorMessageContext *cmc = cls;
@@ -3308,7 +3374,7 @@ handle_reliability_ack (void *cls,
*/
static int
check_backchannel_encapsulation (void *cls,
- const struct TransportBackchannelEncapsulationMessage *be)
+ const struct TransportBackchannelEncapsulationMessage *be)
{
uint16_t size = ntohs (be->header.size);
@@ -3329,7 +3395,7 @@ check_backchannel_encapsulation (void *cls,
*/
static void
handle_backchannel_encapsulation (void *cls,
- const struct TransportBackchannelEncapsulationMessage *be)
+ const struct TransportBackchannelEncapsulationMessage *be)
{
struct CommunicatorMessageContext *cmc = cls;
@@ -3361,7 +3427,7 @@ handle_backchannel_encapsulation (void *cls,
*/
static int
check_dv_learn (void *cls,
- const struct TransportDVLearn *dvl)
+ const struct TransportDVLearn *dvl)
{
uint16_t size = ntohs (dvl->header.size);
uint16_t num_hops = ntohs (dvl->num_hops);
@@ -3375,15 +3441,15 @@ check_dv_learn (void *cls,
for (unsigned int i=0;i<num_hops;i++)
{
if (0 == memcmp (&dvl->initiator,
- &hops[i],
- sizeof (struct GNUNET_PeerIdentity)))
+ &hops[i],
+ sizeof (struct GNUNET_PeerIdentity)))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
if (0 == memcmp (&GST_my_identity,
- &hops[i],
- sizeof (struct GNUNET_PeerIdentity)))
+ &hops[i],
+ sizeof (struct GNUNET_PeerIdentity)))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
@@ -3401,7 +3467,7 @@ check_dv_learn (void *cls,
*/
static void
handle_dv_learn (void *cls,
- const struct TransportDVLearn *dvl)
+ const struct TransportDVLearn *dvl)
{
struct CommunicatorMessageContext *cmc = cls;
@@ -3420,7 +3486,7 @@ handle_dv_learn (void *cls,
*/
static int
check_dv_box (void *cls,
- const struct TransportDVBox *dvb)
+ const struct TransportDVBox *dvb)
{
uint16_t size = ntohs (dvb->header.size);
uint16_t num_hops = ntohs (dvb->num_hops);
@@ -3614,12 +3680,12 @@ check_add_queue_message (void *cls,
* Bandwidth tracker informs us that the delay until we should receive
* more has changed.
*
- * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
+ * @param cls a `struct Queue` for which the delay changed
*/
static void
tracker_update_in_cb (void *cls)
{
- struct GNUNET_ATS_Session *queue = cls;
+ struct Queue *queue = cls;
struct GNUNET_TIME_Relative in_delay;
unsigned int rsize;
@@ -3816,12 +3882,12 @@ reliability_box_message (struct PendingMessage *pm)
* communicator for transmission (updating the tracker, and re-scheduling
* itself if applicable).
*
- * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
+ * @param cls the `struct Queue` to process transmissions for
*/
static void
transmit_on_queue (void *cls)
{
- struct GNUNET_ATS_Session *queue = cls;
+ struct Queue *queue = cls;
struct Neighbour *n = queue->neighbour;
struct QueueEntry *qe;
struct PendingMessage *pm;
@@ -3871,7 +3937,7 @@ transmit_on_queue (void *cls)
/* Pass 's' for transission to the communicator */
qe = GNUNET_new (struct QueueEntry);
qe->mid = queue->mid_gen++;
- qe->session = queue;
+ qe->queue = queue;
// qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
GNUNET_CONTAINER_DLL_insert (queue->queue_head,
queue->queue_tail,
@@ -4007,12 +4073,12 @@ transmit_on_queue (void *cls)
* Bandwidth tracker informs us that the delay until we
* can transmit again changed.
*
- * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
+ * @param cls a `struct Queue` for which the delay changed
*/
static void
tracker_update_out_cb (void *cls)
{
- struct GNUNET_ATS_Session *queue = cls;
+ struct Queue *queue = cls;
struct Neighbour *n = queue->neighbour;
if (NULL == n->pending_msg_head)
@@ -4032,7 +4098,7 @@ tracker_update_out_cb (void *cls)
* Bandwidth tracker informs us that excessive outbound bandwidth was
* allocated which is not being used.
*
- * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
+ * @param cls a `struct Queue` for which the excess was noted
*/
static void
tracker_excess_out_cb (void *cls)
@@ -4041,7 +4107,7 @@ tracker_excess_out_cb (void *cls)
this is done internally within transport_api2_core already,
but we probably want to change the logic and trigger it
from here via a message instead! */
- /* TODO: maybe inform ATS at this point? */
+ /* TODO: maybe inform someone at this point? */
GNUNET_STATISTICS_update (GST_stats,
"# Excess outbound bandwidth reported",
1,
@@ -4054,12 +4120,12 @@ tracker_excess_out_cb (void *cls)
* Bandwidth tracker informs us that excessive inbound bandwidth was allocated
* which is not being used.
*
- * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
+ * @param cls a `struct Queue` for which the excess was noted
*/
static void
tracker_excess_in_cb (void *cls)
{
- /* TODO: maybe inform ATS at this point? */
+ /* TODO: maybe inform somone at this point? */
GNUNET_STATISTICS_update (GST_stats,
"# Excess inbound bandwidth reported",
1,
@@ -4078,7 +4144,7 @@ handle_add_queue_message (void *cls,
const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
{
struct TransportClient *tc = cls;
- struct GNUNET_ATS_Session *queue;
+ struct Queue *queue;
struct Neighbour *neighbour;
const char *addr;
uint16_t addr_len;
@@ -4108,7 +4174,7 @@ handle_add_queue_message (void *cls,
addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
addr = (const char *) &aqm[1];
- queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
+ queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
queue->tc = tc;
queue->address = (const char *) &queue[1];
queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
@@ -4134,38 +4200,6 @@ handle_add_queue_message (void *cls,
memcpy (&queue[1],
addr,
addr_len);
- /* notify ATS about new queue */
- {
- struct GNUNET_ATS_Properties prop = {
- .delay = GNUNET_TIME_UNIT_FOREVER_REL,
- .mtu = queue->mtu,
- .nt = queue->nt,
- .cc = tc->details.communicator.cc
- };
-
- queue->sr = GNUNET_ATS_session_add (ats,
- &neighbour->pid,
- queue->address,
- queue,
- &prop);
- if (NULL == queue->sr)
- {
- /* This can only happen if the 'address' was way too long for ATS
- (approaching 64k in strlen()!). In this case, the communicator
- must be buggy and we drop it. */
- GNUNET_break (0);
- GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
- GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
- GNUNET_free (queue);
- if (NULL == neighbour->session_head)
- {
- cores_send_disconnect_info (&neighbour->pid);
- free_neighbour (neighbour);
- }
- GNUNET_SERVICE_client_drop (tc->client);
- return;
- }
- }
/* notify monitors about new queue */
{
struct MonitorEvent me = {
@@ -4179,12 +4213,12 @@ handle_add_queue_message (void *cls,
&me);
}
GNUNET_CONTAINER_MDLL_insert (neighbour,
- neighbour->session_head,
- neighbour->session_tail,
+ neighbour->queue_head,
+ neighbour->queue_tail,
queue);
GNUNET_CONTAINER_MDLL_insert (client,
- tc->details.communicator.session_head,
- tc->details.communicator.session_tail,
+ tc->details.communicator.queue_head,
+ tc->details.communicator.queue_tail,
queue);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -4208,18 +4242,18 @@ handle_del_queue_message (void *cls,
GNUNET_SERVICE_client_drop (tc->client);
return;
}
- for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
- NULL != session;
- session = session->next_client)
+ for (struct Queue *queue = tc->details.communicator.queue_head;
+ NULL != queue;
+ queue = queue->next_client)
{
- struct Neighbour *neighbour = session->neighbour;
+ struct Neighbour *neighbour = queue->neighbour;
- if ( (dqm->qid != session->qid) ||
+ if ( (dqm->qid != queue->qid) ||
(0 != memcmp (&dqm->receiver,
&neighbour->pid,
sizeof (struct GNUNET_PeerIdentity))) )
continue;
- free_session (session);
+ free_queue (queue);
GNUNET_SERVICE_client_continue (tc->client);
return;
}
@@ -4239,7 +4273,7 @@ handle_send_message_ack (void *cls,
const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
{
struct TransportClient *tc = cls;
- struct QueueEntry *queue;
+ struct QueueEntry *qe;
if (CT_COMMUNICATOR != tc->type)
{
@@ -4249,37 +4283,37 @@ handle_send_message_ack (void *cls,
}
/* find our queue entry matching the ACK */
- queue = NULL;
- for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
- NULL != session;
- session = session->next_client)
+ qe = NULL;
+ for (struct Queue *queue = tc->details.communicator.queue_head;
+ NULL != queue;
+ queue = queue->next_client)
{
- if (0 != memcmp (&session->neighbour->pid,
+ if (0 != memcmp (&queue->neighbour->pid,
&sma->receiver,
sizeof (struct GNUNET_PeerIdentity)))
continue;
- for (struct QueueEntry *qe = session->queue_head;
- NULL != qe;
- qe = qe->next)
+ for (struct QueueEntry *qep = queue->queue_head;
+ NULL != qep;
+ qep = qep->next)
{
- if (qe->mid != sma->mid)
- continue;
- queue = qe;
+ if (qep->mid != sma->mid)
+ continue;
+ qe = qep;
break;
}
break;
}
- if (NULL == queue)
+ if (NULL == qe)
{
/* this should never happen */
GNUNET_break (0);
GNUNET_SERVICE_client_drop (tc->client);
return;
}
- GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
- queue->session->queue_tail,
- queue);
- queue->session->queue_length--;
+ GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
+ qe->queue->queue_tail,
+ qe);
+ qe->queue->queue_length--;
tc->details.communicator.total_queue_length--;
GNUNET_SERVICE_client_continue (tc->client);
@@ -4291,19 +4325,19 @@ handle_send_message_ack (void *cls,
"# Transmission throttled due to communicator queue limit",
-1,
GNUNET_NO);
- for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
- NULL != session;
- session = session->next_client)
- schedule_transmit_on_queue (session);
+ for (struct Queue *queue = tc->details.communicator.queue_head;
+ NULL != queue;
+ queue = queue->next_client)
+ schedule_transmit_on_queue (queue);
}
- else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
+ else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
{
/* queue dropped below threshold; only resume this one queue */
GNUNET_STATISTICS_update (GST_stats,
- "# Transmission throttled due to session queue limit",
+ "# Transmission throttled due to queue queue limit",
-1,
GNUNET_NO);
- schedule_transmit_on_queue (queue->session);
+ schedule_transmit_on_queue (qe->queue);
}
/* TODO: we also should react on the status! */
@@ -4311,7 +4345,7 @@ handle_send_message_ack (void *cls,
// FIXME: react to communicator status about transmission request. We got:
sma->status; // OK success, SYSERR failure
- GNUNET_free (queue);
+ GNUNET_free (qe);
}
@@ -4333,7 +4367,7 @@ notify_client_queues (void *cls,
struct Neighbour *neighbour = value;
GNUNET_assert (CT_MONITOR == tc->type);
- for (struct GNUNET_ATS_Session *q = neighbour->session_head;
+ for (struct Queue *q = neighbour->queue_head;
NULL != q;
q = q->next_neighbour)
{
@@ -4384,31 +4418,6 @@ handle_monitor_start (void *cls,
/**
- * Signature of a function called by ATS with the current bandwidth
- * allocation to be used as determined by ATS.
- *
- * @param cls closure, NULL
- * @param session session this is about
- * @param bandwidth_out assigned outbound bandwidth for the connection,
- * 0 to signal disconnect
- * @param bandwidth_in assigned inbound bandwidth for the connection,
- * 0 to signal disconnect
- */
-static void
-ats_allocation_cb (void *cls,
- struct GNUNET_ATS_Session *session,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
-{
- (void) cls;
- GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
- bandwidth_out);
- GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
- bandwidth_in);
-}
-
-
-/**
* Find transport client providing communication service
* for the protocol @a prefix.
*
@@ -4429,24 +4438,22 @@ lookup_communicator (const char *prefix)
return tc;
}
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
+ "Somone suggested use of communicator for `%s', but we do not have such a communicator!\n",
prefix);
return NULL;
}
/**
- * Signature of a function called by ATS suggesting transport to
- * try connecting with a particular address.
+ * Signature of a function called with a communicator @a address of a peer
+ * @a pid that an application wants us to connect to.
*
- * @param cls closure, NULL
* @param pid target peer
* @param address the address to try
*/
static void
-ats_suggestion_cb (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- const char *address)
+suggest_to_connect (const struct GNUNET_PeerIdentity *pid,
+ const char *address)
{
static uint32_t idgen;
struct TransportClient *tc;
@@ -4455,18 +4462,17 @@ ats_suggestion_cb (void *cls,
struct GNUNET_MQ_Envelope *env;
size_t alen;
- (void) cls;
prefix = GNUNET_HELLO_address_to_prefix (address);
if (NULL == prefix)
{
- GNUNET_break (0); /* ATS gave invalid address!? */
+ GNUNET_break (0); /* We got an invalid address!? */
return;
}
tc = lookup_communicator (prefix);
if (NULL == tc)
{
GNUNET_STATISTICS_update (GST_stats,
- "# ATS suggestions ignored due to missing communicator",
+ "# Suggestions ignored due to missing communicator",
1,
GNUNET_NO);
return;
@@ -4511,7 +4517,7 @@ handle_queue_create_ok (void *cls,
return;
}
GNUNET_STATISTICS_update (GST_stats,
- "# ATS suggestions succeeded at communicator",
+ "# Suggestions succeeded at communicator",
1,
GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -4531,7 +4537,7 @@ handle_queue_create_ok (void *cls,
*/
static void
handle_queue_create_fail (void *cls,
- const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
+ const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
{
struct TransportClient *tc = cls;
@@ -4545,7 +4551,7 @@ handle_queue_create_fail (void *cls,
"Request #%u for communicator to create queue failed\n",
(unsigned int) ntohs (cqr->request_id));
GNUNET_STATISTICS_update (GST_stats,
- "# ATS suggestions failed in queue creation at communicator",
+ "# Suggestions failed in queue creation at communicator",
1,
GNUNET_NO);
GNUNET_SERVICE_client_continue (tc->client);
@@ -4553,6 +4559,131 @@ handle_queue_create_fail (void *cls,
/**
+ * Function called by PEERSTORE for each matching record.
+ *
+ * @param cls closure
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
+ */
+static void
+handle_hello (void *cls,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
+{
+ struct PeerRequest *pr = cls;
+ const char *val;
+
+ if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Got failure from PEERSTORE: %s\n",
+ emsg);
+ return;
+ }
+ val = record->value;
+ if ( (0 == record->value_size) ||
+ ('\0' != val[record->value_size - 1]) )
+ {
+ GNUNET_break (0);
+ return;
+ }
+ suggest_to_connect (&pr->pid,
+ (const char *) record->value);
+}
+
+
+/**
+ * We have received a `struct ExpressPreferenceMessage` from an application client.
+ *
+ * @param cls handle to the client
+ * @param msg the start message
+ */
+static void
+handle_suggest (void *cls,
+ const struct ExpressPreferenceMessage *msg)
+{
+ struct TransportClient *tc = cls;
+ struct PeerRequest *pr;
+
+ if (CT_NONE == tc->type)
+ {
+ tc->type = CT_APPLICATION;
+ tc->details.application.requests
+ = GNUNET_CONTAINER_multipeermap_create (16,
+ GNUNET_YES);
+ }
+ if (CT_APPLICATION != tc->type)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client suggested we talk to %s with preference %d at rate %u\n",
+ GNUNET_i2s (&msg->peer),
+ (int) ntohl (msg->pk),
+ (int) ntohl (msg->bw.value__));
+ pr = GNUNET_new (struct PeerRequest);
+ pr->tc = tc;
+ pr->pid = msg->peer;
+ pr->bw = msg->bw;
+ pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk);
+ if (GNUNET_YES !=
+ GNUNET_CONTAINER_multipeermap_put (tc->details.application.requests,
+ &pr->pid,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
+ {
+ GNUNET_break (0);
+ GNUNET_free (pr);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ pr->wc = GNUNET_PEERSTORE_watch (peerstore,
+ "transport",
+ &pr->pid,
+ "hello",
+ &handle_hello,
+ pr);
+ GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
+/**
+ * We have received a `struct ExpressPreferenceMessage` from an application client.
+ *
+ * @param cls handle to the client
+ * @param msg the start message
+ */
+static void
+handle_suggest_cancel (void *cls,
+ const struct ExpressPreferenceMessage *msg)
+{
+ struct TransportClient *tc = cls;
+ struct PeerRequest *pr;
+
+ if (CT_APPLICATION != tc->type)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests,
+ &msg->peer);
+ if (NULL == pr)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ (void) stop_peer_request (tc,
+ &pr->pid,
+ pr);
+ GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
+/**
* Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
* messages. We do nothing here, real verification is done later.
*
@@ -4692,13 +4823,8 @@ do_shutdown (void *cls)
ephemeral_task = NULL;
}
GNUNET_CONTAINER_multipeermap_iterate (neighbours,
- &free_neighbour_cb,
- NULL);
- if (NULL != ats)
- {
- GNUNET_ATS_transport_done (ats);
- ats = NULL;
- }
+ &free_neighbour_cb,
+ NULL);
if (NULL != peerstore)
{
GNUNET_PEERSTORE_disconnect (peerstore,
@@ -4779,17 +4905,6 @@ run (void *cls,
GNUNET_SCHEDULER_shutdown ();
return;
}
- ats = GNUNET_ATS_transport_init (GST_cfg,
- &ats_allocation_cb,
- NULL,
- &ats_suggestion_cb,
- NULL);
- if (NULL == ats)
- {
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
}
@@ -4803,6 +4918,15 @@ GNUNET_SERVICE_MAIN
&client_connect_cb,
&client_disconnect_cb,
NULL,
+ /* communication with applications */
+ GNUNET_MQ_hd_fixed_size (suggest,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST,
+ struct ExpressPreferenceMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (suggest_cancel,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL,
+ struct ExpressPreferenceMessage,
+ NULL),
/* communication with core */
GNUNET_MQ_hd_fixed_size (client_start,
GNUNET_MESSAGE_TYPE_TRANSPORT_START,
diff --git a/src/transport/transport.h b/src/transport/transport.h
index c0e02c3d9..b231ea8ae 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -1107,6 +1107,38 @@ struct GNUNET_TRANSPORT_AddressToVerify
};
+/**
+ * Application client to TRANSPORT service: we would like to have
+ * address suggestions for this peer.
+ */
+struct ExpressPreferenceMessage
+{
+ /**
+ * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST or
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL to stop
+ * suggestions.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * What type of performance preference does the client have?
+ * A `enum GNUNET_MQ_PreferenceKind` in NBO.
+ */
+ uint32_t pk GNUNET_PACKED;
+
+ /**
+ * Peer to get address suggestions for.
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * How much bandwidth in bytes/second does the application expect?
+ */
+ struct GNUNET_BANDWIDTH_Value32NBO bw;
+
+};
+
+
#endif
GNUNET_NETWORK_STRUCT_END
diff --git a/src/transport/transport_api2_application.c b/src/transport/transport_api2_application.c
new file mode 100644
index 000000000..325438e11
--- /dev/null
+++ b/src/transport/transport_api2_application.c
@@ -0,0 +1,366 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2010--2019 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+*/
+/**
+ * @file transport/transport_api2_application.c
+ * @brief enable clients to ask TRANSPORT about establishing connections to peers
+ * @author Christian Grothoff
+ * @author Matthias Wachs
+ */
+#include "platform.h"
+#include "gnunet_transport_application_service.h"
+#include "gnunet_transport_core_service.h"
+#include "transport.h"
+
+
+#define LOG(kind,...) GNUNET_log_from(kind, "transport-application-api", __VA_ARGS__)
+
+
+/**
+ * Handle for TRANSPORT address suggestion requests.
+ */
+struct GNUNET_TRANSPORT_ApplicationSuggestHandle
+{
+ /**
+ * ID of the peer for which address suggestion was requested.
+ */
+ struct GNUNET_PeerIdentity id;
+
+ /**
+ * Connecitivity handle this suggestion handle belongs to.
+ */
+ struct GNUNET_TRANSPORT_ApplicationHandle *ch;
+
+ /**
+ * What preference is being expressed?
+ */
+ enum GNUNET_MQ_PreferenceKind pk;
+
+ /**
+ * How much bandwidth does the client expect?
+ */
+ struct GNUNET_BANDWIDTH_Value32NBO bw;
+};
+
+
+/**
+ * Handle to the TRANSPORT subsystem for application management.
+ */
+struct GNUNET_TRANSPORT_ApplicationHandle
+{
+
+ /**
+ * Our configuration.
+ */
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ /**
+ * Map with the identities of all the peers for which we would
+ * like to have address suggestions. The key is the PID, the
+ * value is currently the `struct GNUNET_TRANSPORT_ApplicationSuggestHandle`
+ */
+ struct GNUNET_CONTAINER_MultiPeerMap *sug_requests;
+
+ /**
+ * Message queue for sending requests to the TRANSPORT service.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Task to trigger reconnect.
+ */
+ struct GNUNET_SCHEDULER_Task *task;
+
+ /**
+ * Reconnect backoff delay.
+ */
+ struct GNUNET_TIME_Relative backoff;
+};
+
+
+/**
+ * Re-establish the connection to the TRANSPORT service.
+ *
+ * @param ch handle to use to re-connect.
+ */
+static void
+reconnect (struct GNUNET_TRANSPORT_ApplicationHandle *ch);
+
+
+/**
+ * Re-establish the connection to the TRANSPORT service.
+ *
+ * @param cls handle to use to re-connect.
+ */
+static void
+reconnect_task (void *cls)
+{
+ struct GNUNET_TRANSPORT_ApplicationHandle *ch = cls;
+
+ ch->task = NULL;
+ reconnect (ch);
+}
+
+
+/**
+ * Disconnect from TRANSPORT and then reconnect.
+ *
+ * @param ch our handle
+ */
+static void
+force_reconnect (struct GNUNET_TRANSPORT_ApplicationHandle *ch)
+{
+ if (NULL != ch->mq)
+ {
+ GNUNET_MQ_destroy (ch->mq);
+ ch->mq = NULL;
+ }
+ ch->backoff = GNUNET_TIME_STD_BACKOFF (ch->backoff);
+ ch->task = GNUNET_SCHEDULER_add_delayed (ch->backoff,
+ &reconnect_task,
+ ch);
+}
+
+
+/**
+ * We encountered an error handling the MQ to the
+ * TRANSPORT service. Reconnect.
+ *
+ * @param cls the `struct GNUNET_TRANSPORT_ApplicationHandle`
+ * @param error details about the error
+ */
+static void
+error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_TRANSPORT_ApplicationHandle *ch = cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "TRANSPORT connection died (code %d), reconnecting\n",
+ (int) error);
+ force_reconnect (ch);
+}
+
+
+/**
+ * Transmit request for an address suggestion.
+ *
+ * @param cls the `struct GNUNET_TRANSPORT_ApplicationHandle`
+ * @param peer peer to ask for an address suggestion for
+ * @param value the `struct GNUNET_TRANSPORT_SuggestHandle`
+ * @return #GNUNET_OK (continue to iterate), #GNUNET_SYSERR on
+ * failure (message queue no longer exists)
+ */
+static int
+transmit_suggestion (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ void *value)
+{
+ struct GNUNET_TRANSPORT_ApplicationHandle *ch = cls;
+ struct GNUNET_TRANSPORT_ApplicationSuggestHandle *sh = value;
+ struct GNUNET_MQ_Envelope *ev;
+ struct ExpressPreferenceMessage *m;
+
+ if (NULL == ch->mq)
+ return GNUNET_SYSERR;
+ ev = GNUNET_MQ_msg (m,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST);
+ m->pk = htonl ((uint32_t) sh->pk);
+ m->bw = sh->bw;
+ m->peer = *peer;
+ GNUNET_MQ_send (ch->mq, ev);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Re-establish the connection to the TRANSPORT service.
+ *
+ * @param ch handle to use to re-connect.
+ */
+static void
+reconnect (struct GNUNET_TRANSPORT_ApplicationHandle *ch)
+{
+ static const struct GNUNET_MQ_MessageHandler handlers[] = {
+ { NULL, 0, 0 }
+ };
+
+ GNUNET_assert (NULL == ch->mq);
+ ch->mq = GNUNET_CLIENT_connect (ch->cfg,
+ "transport",
+ handlers,
+ &error_handler,
+ ch);
+ if (NULL == ch->mq)
+ {
+ force_reconnect (ch);
+ return;
+ }
+ GNUNET_CONTAINER_multipeermap_iterate (ch->sug_requests,
+ &transmit_suggestion,
+ ch);
+}
+
+
+/**
+ * Initialize the TRANSPORT application suggestion client handle.
+ *
+ * @param cfg configuration to use
+ * @return transport application handle, NULL on error
+ */
+struct GNUNET_TRANSPORT_ApplicationHandle *
+GNUNET_TRANSPORT_application_init (const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct GNUNET_TRANSPORT_ApplicationHandle *ch;
+
+ ch = GNUNET_new (struct GNUNET_TRANSPORT_ApplicationHandle);
+ ch->cfg = cfg;
+ ch->sug_requests = GNUNET_CONTAINER_multipeermap_create (32,
+ GNUNET_YES);
+ reconnect (ch);
+ return ch;
+}
+
+
+/**
+ * Function called to free all `struct GNUNET_TRANSPORT_ApplicationSuggestHandle`s
+ * in the map.
+ *
+ * @param cls NULL
+ * @param key the key
+ * @param value the value to free
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+free_sug_handle (void *cls,
+ const struct GNUNET_PeerIdentity *key,
+ void *value)
+{
+ struct GNUNET_TRANSPORT_ApplicationSuggestHandle *cur = value;
+
+ GNUNET_free (cur);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client is done with TRANSPORT application management, release resources.
+ *
+ * @param ch handle to release
+ */
+void
+GNUNET_TRANSPORT_application_done (struct GNUNET_TRANSPORT_ApplicationHandle *ch)
+{
+ if (NULL != ch->mq)
+ {
+ GNUNET_MQ_destroy (ch->mq);
+ ch->mq = NULL;
+ }
+ if (NULL != ch->task)
+ {
+ GNUNET_SCHEDULER_cancel (ch->task);
+ ch->task = NULL;
+ }
+ GNUNET_CONTAINER_multipeermap_iterate (ch->sug_requests,
+ &free_sug_handle,
+ NULL);
+ GNUNET_CONTAINER_multipeermap_destroy (ch->sug_requests);
+ GNUNET_free (ch);
+}
+
+
+/**
+ * We would like to receive address suggestions for a peer. TRANSPORT will
+ * respond with a call to the continuation immediately containing an address or
+ * no address if none is available. TRANSPORT can suggest more addresses until we call
+ * #GNUNET_TRANSPORT_application_suggest_cancel().
+ *
+ * @param ch handle
+ * @param peer identity of the peer we need an address for
+ * @param pk what kind of application will the application require (can be
+ * #GNUNET_MQ_PREFERENCE_NONE, we will still try to connect)
+ * @param bw desired bandwith, can be zero (we will still try to connect)
+ * @return suggest handle, NULL if a request is already pending
+ */
+struct GNUNET_TRANSPORT_ApplicationSuggestHandle *
+GNUNET_TRANSPORT_application_suggest (struct GNUNET_TRANSPORT_ApplicationHandle *ch,
+ const struct GNUNET_PeerIdentity *peer,
+ enum GNUNET_MQ_PreferenceKind pk,
+ struct GNUNET_BANDWIDTH_Value32NBO bw)
+{
+ struct GNUNET_TRANSPORT_ApplicationSuggestHandle *s;
+
+ s = GNUNET_new (struct GNUNET_TRANSPORT_ApplicationSuggestHandle);
+ s->ch = ch;
+ s->id = *peer;
+ s->pk = pk;
+ s->bw = bw;
+ (void) GNUNET_CONTAINER_multipeermap_put (ch->sug_requests,
+ &s->id,
+ s,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Requesting TRANSPORT to suggest address for `%s'\n",
+ GNUNET_i2s (peer));
+ if (NULL == ch->mq)
+ return s;
+ GNUNET_assert (GNUNET_OK ==
+ transmit_suggestion (ch,
+ &s->id,
+ s));
+ return s;
+}
+
+
+/**
+ * We no longer care about being connected to a peer.
+ *
+ * @param sh handle to stop
+ */
+void
+GNUNET_TRANSPORT_application_suggest_cancel (struct GNUNET_TRANSPORT_ApplicationSuggestHandle *sh)
+{
+ struct GNUNET_TRANSPORT_ApplicationHandle *ch = sh->ch;
+ struct GNUNET_MQ_Envelope *ev;
+ struct ExpressPreferenceMessage *m;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Telling TRANSPORT we no longer care for an address for `%s'\n",
+ GNUNET_i2s (&sh->id));
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_remove (ch->sug_requests,
+ &sh->id,
+ sh));
+ if (NULL == ch->mq)
+ {
+ GNUNET_free (sh);
+ return;
+ }
+ ev = GNUNET_MQ_msg (m,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL);
+ m->pk = htonl ((uint32_t) sh->pk);
+ m->bw = sh->bw;
+ m->peer = sh->id;
+ GNUNET_MQ_send (ch->mq,
+ ev);
+ GNUNET_free (sh);
+}
+
+
+/* end of transport_api2_application.c */