summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c628
1 files changed, 376 insertions, 252 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index b64bfb182..6494a5dfd 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -41,7 +41,7 @@
* #3 transport should use validation to also establish
* effective flow control (for uni-directional transports!)
* #4 UDP broadcasting logic must be extended to use the new API
- * #5 only validated addresses go to ATS for scheduling; that
+ * #5 only validated addresses are selected for scheduling; that
* also ensures we know the RTT
* #6 to ensure flow control and RTT are OK, we always do the
* 'validation', even if address comes from PEERSTORE
@@ -59,10 +59,7 @@
* -
*
* Easy:
- * - use ATS bandwidth allocation callback and schedule transmissions!
- *
- * Plan:
- * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
+ * - figure out how to call XXX_suggestion_cb!
*
* Later:
* - change transport-core API to provide proper flow control in both
@@ -98,8 +95,6 @@
* "latest timestamps seen" data
* - if transport implements DV, we likely need a 3rd peermap
* in addition to ephemerals and (direct) neighbours
- * => in this data structure, we should track ATS metrics (distance, RTT, etc.)
- * as well as latest timestamps seen, goodput, fragments for transmission, etc.
* ==> check if stuff needs to be moved out of "Neighbour"
* - transport should encapsualte core-level messages and do its
* own ACKing for RTT/goodput/loss measurements _and_ fragment
@@ -111,7 +106,6 @@
#include "gnunet_transport_monitor_service.h"
#include "gnunet_peerstore_service.h"
#include "gnunet_hello_lib.h"
-#include "gnunet_ats_transport_service.h"
#include "gnunet_signatures.h"
#include "transport.h"
@@ -148,18 +142,11 @@
#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
/**
- * How many messages can we have pending for a given session (queue to
+ * How many messages can we have pending for a given queue (queue to
* a particular peer via a communicator) process before we start to
* throttle that queue?
- *
- * Used if ATS assigns more bandwidth to a particular transmission
- * method than that transmission method can right now handle. (Yes,
- * ATS should eventually notice utilization below allocation and
- * adjust, but we don't want to queue up tons of messages in the
- * meantime). Must be significantly below
- * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
*/
-#define SESSION_QUEUE_LIMIT 32
+#define QUEUE_LENGTH_LIMIT 32
GNUNET_NETWORK_STRUCT_BEGIN
@@ -547,7 +534,6 @@ struct TransportDVBox
GNUNET_NETWORK_STRUCT_END
-
/**
* What type of client is the `struct TransportClient` about?
*/
@@ -571,7 +557,12 @@ enum ClientType
/**
* It is a communicator, use for communication.
*/
- CT_COMMUNICATOR = 3
+ CT_COMMUNICATOR = 3,
+
+ /**
+ * "Application" telling us where to connect (i.e. TOPOLOGY, DHT or CADET).
+ */
+ CT_APPLICATION = 4
};
@@ -725,11 +716,18 @@ struct DistanceVector
/**
+ * A queue is a message queue provided by a communicator
+ * via which we can reach a particular neighbour.
+ */
+struct Queue;
+
+
+/**
* Entry identifying transmission in one of our `struct
- * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to
+ * Queue` which still awaits an ACK. This is used to
* ensure we do not overwhelm a communicator and limit the number of
* messages outstanding per communicator (say in case communicator is
- * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
+ * CPU bound) and per queue (in case bandwidth allocation exceeds
* what the communicator can actually provide towards a particular
* peer/target).
*/
@@ -747,9 +745,9 @@ struct QueueEntry
struct QueueEntry *prev;
/**
- * ATS session this entry is queued with.
+ * Queue this entry is queued with.
*/
- struct GNUNET_ATS_Session *session;
+ struct Queue *queue;
/**
* Message ID used for this message with the queue used for transmission.
@@ -759,30 +757,30 @@ struct QueueEntry
/**
- * An ATS session is a message queue provided by a communicator
+ * A queue is a message queue provided by a communicator
* via which we can reach a particular neighbour.
*/
-struct GNUNET_ATS_Session
+struct Queue
{
/**
* Kept in a MDLL.
*/
- struct GNUNET_ATS_Session *next_neighbour;
+ struct Queue *next_neighbour;
/**
* Kept in a MDLL.
*/
- struct GNUNET_ATS_Session *prev_neighbour;
+ struct Queue *prev_neighbour;
/**
* Kept in a MDLL.
*/
- struct GNUNET_ATS_Session *prev_client;
+ struct Queue *prev_client;
/**
* Kept in a MDLL.
*/
- struct GNUNET_ATS_Session *next_client;
+ struct Queue *next_client;
/**
* Head of DLL of unacked transmission requests.
@@ -795,33 +793,28 @@ struct GNUNET_ATS_Session
struct QueueEntry *queue_tail;
/**
- * Which neighbour is this ATS session for?
+ * Which neighbour is this queue for?
*/
struct Neighbour *neighbour;
/**
- * Which communicator offers this ATS session?
+ * Which communicator offers this queue?
*/
struct TransportClient *tc;
/**
- * Address served by the ATS session.
+ * Address served by the queue.
*/
const char *address;
/**
- * Handle by which we inform ATS about this queue.
- */
- struct GNUNET_ATS_SessionRecord *sr;
-
- /**
* Task scheduled for the time when this queue can (likely) transmit the
* next message. Still needs to check with the @e tracker_out to be sure.
*/
struct GNUNET_SCHEDULER_Task *transmit_task;
/**
- * Our current RTT estimate for this ATS session.
+ * Our current RTT estimate for this queue.
*/
struct GNUNET_TIME_Relative rtt;
@@ -831,17 +824,17 @@ struct GNUNET_ATS_Session
uint64_t mid_gen;
/**
- * Unique identifier of this ATS session with the communicator.
+ * Unique identifier of this queue with the communicator.
*/
uint32_t qid;
/**
- * Maximum transmission unit supported by this ATS session.
+ * Maximum transmission unit supported by this queue.
*/
uint32_t mtu;
/**
- * Distance to the target of this ATS session.
+ * Distance to the target of this queue.
*/
uint32_t distance;
@@ -861,22 +854,22 @@ struct GNUNET_ATS_Session
unsigned int queue_length;
/**
- * Network type offered by this ATS session.
+ * Network type offered by this queue.
*/
enum GNUNET_NetworkType nt;
/**
- * Connection status for this ATS session.
+ * Connection status for this queue.
*/
enum GNUNET_TRANSPORT_ConnectionStatus cs;
/**
- * How much outbound bandwidth do we have available for this session?
+ * How much outbound bandwidth do we have available for this queue?
*/
struct GNUNET_BANDWIDTH_Tracker tracker_out;
/**
- * How much inbound bandwidth do we have available for this session?
+ * How much inbound bandwidth do we have available for this queue?
*/
struct GNUNET_BANDWIDTH_Tracker tracker_in;
};
@@ -1025,14 +1018,14 @@ struct Neighbour
struct DistanceVectorHop *dv_tail;
/**
- * Head of DLL of ATS sessions to this peer.
+ * Head of DLL of queues to this peer.
*/
- struct GNUNET_ATS_Session *session_head;
+ struct Queue *queue_head;
/**
- * Tail of DLL of ATS sessions to this peer.
+ * Tail of DLL of queues to this peer.
*/
- struct GNUNET_ATS_Session *session_tail;
+ struct Queue *queue_tail;
/**
* Task run to cleanup pending messages that have exceeded their timeout.
@@ -1040,13 +1033,12 @@ struct Neighbour
struct GNUNET_SCHEDULER_Task *timeout_task;
/**
- * Quota at which CORE is allowed to transmit to this peer
- * according to ATS.
+ * Quota at which CORE is allowed to transmit to this peer.
*
* FIXME: not yet used, tricky to get right given multiple queues!
- * (=> Idea: let ATS set a quota per queue and we add them up here?)
+ * (=> Idea: measure???)
* FIXME: how do we set this value initially when we tell CORE?
- * Options: start at a minimum value or at literally zero (before ATS?)
+ * Options: start at a minimum value or at literally zero?
* (=> Current thought: clean would be zero!)
*/
struct GNUNET_BANDWIDTH_Value32NBO quota_out;
@@ -1060,6 +1052,40 @@ struct Neighbour
/**
+ * A peer that an application (client) would like us to talk to directly.
+ */
+struct PeerRequest
+{
+
+ /**
+ * Which peer is this about?
+ */
+ struct GNUNET_PeerIdentity pid;
+
+ /**
+ * Client responsible for the request.
+ */
+ struct TransportClient *tc;
+
+ /**
+ * Handle for watching the peerstore for HELLOs for this peer.
+ */
+ struct GNUNET_PEERSTORE_WatchContext *wc;
+
+ /**
+ * What kind of performance preference does this @e tc have?
+ */
+ enum GNUNET_MQ_PreferenceKind pk;
+
+ /**
+ * How much bandwidth would this @e tc like to see?
+ */
+ struct GNUNET_BANDWIDTH_Value32NBO bw;
+
+};
+
+
+/**
* Types of different pending messages.
*/
enum PendingMessageType
@@ -1362,12 +1388,12 @@ struct TransportClient
/**
* Head of DLL of queues offered by this communicator.
*/
- struct GNUNET_ATS_Session *session_head;
+ struct Queue *queue_head;
/**
* Tail of DLL of queues offered by this communicator.
*/
- struct GNUNET_ATS_Session *session_tail;
+ struct Queue *queue_tail;
/**
* Head of list of the addresses of this peer offered by this communicator.
@@ -1393,6 +1419,19 @@ struct TransportClient
} communicator;
+ /**
+ * Information for @e type #CT_APPLICATION
+ */
+ struct {
+
+ /**
+ * Map of requests for peers the given client application would like to
+ * see connections for. Maps from PIDs to `struct PeerRequest`.
+ */
+ struct GNUNET_CONTAINER_MultiPeerMap *requests;
+
+ } application;
+
} details;
};
@@ -1465,11 +1504,6 @@ static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
*/
static struct GNUNET_SCHEDULER_Task *ephemeral_task;
-/**
- * Our connection to ATS for allocation and bootstrapping.
- */
-static struct GNUNET_ATS_TransportHandle *ats;
-
/**
* Free cached ephemeral key.
@@ -1781,7 +1815,7 @@ free_neighbour (struct Neighbour *neighbour)
{
struct DistanceVectorHop *dvh;
- GNUNET_assert (NULL == neighbour->session_head);
+ GNUNET_assert (NULL == neighbour->queue_head);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (neighbours,
&neighbour->pid,
@@ -1886,7 +1920,7 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
* communicator for transmission (updating the tracker, and re-scheduling
* itself if applicable).
*
- * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
+ * @param cls the `struct Queue` to process transmissions for
*/
static void
transmit_on_queue (void *cls);
@@ -1902,7 +1936,7 @@ transmit_on_queue (void *cls);
* @param queue the queue to do scheduling for
*/
static void
-schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
+schedule_transmit_on_queue (struct Queue *queue)
{
struct Neighbour *n = queue->neighbour;
struct PendingMessage *pm = n->pending_msg_head;
@@ -1919,10 +1953,10 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
GNUNET_NO);
return;
}
- if (queue->queue_length >= SESSION_QUEUE_LIMIT)
+ if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
{
GNUNET_STATISTICS_update (GST_stats,
- "# Transmission throttled due to session queue limit",
+ "# Transmission throttled due to queue queue limit",
1,
GNUNET_NO);
return;
@@ -1958,15 +1992,15 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
/**
- * Free @a session.
+ * Free @a queue.
*
- * @param session the session to free
+ * @param queue the queue to free
*/
static void
-free_session (struct GNUNET_ATS_Session *session)
+free_queue (struct Queue *queue)
{
- struct Neighbour *neighbour = session->neighbour;
- struct TransportClient *tc = session->tc;
+ struct Neighbour *neighbour = queue->neighbour;
+ struct TransportClient *tc = queue->tc;
struct MonitorEvent me = {
.cs = GNUNET_TRANSPORT_CS_DOWN,
.rtt = GNUNET_TIME_UNIT_FOREVER_REL
@@ -1974,30 +2008,30 @@ free_session (struct GNUNET_ATS_Session *session)
struct QueueEntry *qe;
int maxxed;
- if (NULL != session->transmit_task)
+ if (NULL != queue->transmit_task)
{
- GNUNET_SCHEDULER_cancel (session->transmit_task);
- session->transmit_task = NULL;
+ GNUNET_SCHEDULER_cancel (queue->transmit_task);
+ queue->transmit_task = NULL;
}
GNUNET_CONTAINER_MDLL_remove (neighbour,
- neighbour->session_head,
- neighbour->session_tail,
- session);
+ neighbour->queue_head,
+ neighbour->queue_tail,
+ queue);
GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.communicator.session_head,
- tc->details.communicator.session_tail,
- session);
+ tc->details.communicator.queue_head,
+ tc->details.communicator.queue_tail,
+ queue);
maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
- while (NULL != (qe = session->queue_head))
+ while (NULL != (qe = queue->queue_head))
{
- GNUNET_CONTAINER_DLL_remove (session->queue_head,
- session->queue_tail,
+ GNUNET_CONTAINER_DLL_remove (queue->queue_head,
+ queue->queue_tail,
qe);
- session->queue_length--;
+ queue->queue_length--;
tc->details.communicator.total_queue_length--;
GNUNET_free (qe);
}
- GNUNET_assert (0 == session->queue_length);
+ GNUNET_assert (0 == queue->queue_length);
if ( (maxxed) &&
(COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
{
@@ -2006,20 +2040,19 @@ free_session (struct GNUNET_ATS_Session *session)
"# Transmission throttled due to communicator queue limit",
-1,
GNUNET_NO);
- for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
+ for (struct Queue *s = tc->details.communicator.queue_head;
NULL != s;
s = s->next_client)
schedule_transmit_on_queue (s);
}
notify_monitors (&neighbour->pid,
- session->address,
- session->nt,
+ queue->address,
+ queue->nt,
&me);
- GNUNET_ATS_session_del (session->sr);
- GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
- GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
- GNUNET_free (session);
- if (NULL == neighbour->session_head)
+ GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
+ GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
+ GNUNET_free (queue);
+ if (NULL == neighbour->queue_head)
{
cores_send_disconnect_info (&neighbour->pid);
free_neighbour (neighbour);
@@ -2055,6 +2088,33 @@ free_address_list_entry (struct AddressListEntry *ale)
/**
+ * Stop the peer request in @a value.
+ *
+ * @param cls a `struct TransportClient` that no longer makes the request
+ * @param pid the peer's identity
+ * @param value a `struct PeerRequest`
+ * @return #GNUNET_YES (always)
+ */
+static int
+stop_peer_request (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct TransportClient *tc = cls;
+ struct PeerRequest *pr = value;
+
+ GNUNET_PEERSTORE_watch_cancel (pr->wc);
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
+ pid,
+ pr));
+ GNUNET_free (pr);
+
+ return GNUNET_OK;
+}
+
+
+/**
* Called whenever a client is disconnected. Frees our
* resources associated with that client.
*
@@ -2097,16 +2157,22 @@ client_disconnect_cb (void *cls,
break;
case CT_COMMUNICATOR:
{
- struct GNUNET_ATS_Session *q;
+ struct Queue *q;
struct AddressListEntry *ale;
- while (NULL != (q = tc->details.communicator.session_head))
- free_session (q);
+ while (NULL != (q = tc->details.communicator.queue_head))
+ free_queue (q);
while (NULL != (ale = tc->details.communicator.addr_head))
- free_address_list_entry (ale);
+ free_address_list_entry (ale);
GNUNET_free (tc->details.communicator.address_prefix);
}
break;
+ case CT_APPLICATION:
+ GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
+ &stop_peer_request,
+ tc);
+ GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
+ break;
}
GNUNET_free (tc);
}
@@ -2419,7 +2485,7 @@ handle_client_send (void *cls,
}
if (! was_empty)
return; /* all queues must already be busy */
- for (struct GNUNET_ATS_Session *queue = target->session_head;
+ for (struct Queue *queue = target->queue_head;
NULL != queue;
queue = queue->next_neighbour)
{
@@ -2491,7 +2557,7 @@ handle_communicator_available (void *cls,
*/
static int
check_communicator_backchannel (void *cls,
- const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
+ const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
{
const struct GNUNET_MessageHeader *inbox;
const char *is;
@@ -2565,10 +2631,10 @@ expire_ephemerals (void *cls)
*/
static void
lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
- struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
- struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
- struct GNUNET_TIME_Absolute *ephemeral_validity)
+ struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
+ struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
+ struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
+ struct GNUNET_TIME_Absolute *ephemeral_validity)
{
struct EphemeralCacheEntry *ece;
struct EphemeralConfirmation ec;
@@ -2643,7 +2709,7 @@ route_message (const struct GNUNET_PeerIdentity *target,
*/
static void
handle_communicator_backchannel (void *cls,
- const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
+ const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
{
struct TransportClient *tc = cls;
struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
@@ -2729,7 +2795,7 @@ store_pi (void *cls);
*/
static void
peerstore_store_cb (void *cls,
- int success)
+ int success)
{
struct AddressListEntry *ale = cls;
@@ -3178,7 +3244,7 @@ handle_fragment_box (void *cls,
if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
ack_now = GNUNET_YES; /* maximum acks received */
// FIXME: possibly also ACK based on RTT (but for that we'd need to
- // determine the session used for the ACK first!)
+ // determine the queue used for the ACK first!)
/* is reassembly complete? */
if (0 != rc->msg_missing)
@@ -3289,7 +3355,7 @@ handle_reliability_box (void *cls,
*/
static void
handle_reliability_ack (void *cls,
- const struct TransportReliabilityAckMessage *ra)
+ const struct TransportReliabilityAckMessage *ra)
{
struct CommunicatorMessageContext *cmc = cls;
@@ -3308,7 +3374,7 @@ handle_reliability_ack (void *cls,
*/
static int
check_backchannel_encapsulation (void *cls,
- const struct TransportBackchannelEncapsulationMessage *be)
+ const struct TransportBackchannelEncapsulationMessage *be)
{
uint16_t size = ntohs (be->header.size);
@@ -3329,7 +3395,7 @@ check_backchannel_encapsulation (void *cls,
*/
static void
handle_backchannel_encapsulation (void *cls,
- const struct TransportBackchannelEncapsulationMessage *be)
+ const struct TransportBackchannelEncapsulationMessage *be)
{
struct CommunicatorMessageContext *cmc = cls;
@@ -3361,7 +3427,7 @@ handle_backchannel_encapsulation (void *cls,
*/
static int
check_dv_learn (void *cls,
- const struct TransportDVLearn *dvl)
+ const struct TransportDVLearn *dvl)
{
uint16_t size = ntohs (dvl->header.size);
uint16_t num_hops = ntohs (dvl->num_hops);
@@ -3375,15 +3441,15 @@ check_dv_learn (void *cls,
for (unsigned int i=0;i<num_hops;i++)
{
if (0 == memcmp (&dvl->initiator,
- &hops[i],
- sizeof (struct GNUNET_PeerIdentity)))
+ &hops[i],
+ sizeof (struct GNUNET_PeerIdentity)))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
if (0 == memcmp (&GST_my_identity,
- &hops[i],
- sizeof (struct GNUNET_PeerIdentity)))
+ &hops[i],
+ sizeof (struct GNUNET_PeerIdentity)))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
@@ -3401,7 +3467,7 @@ check_dv_learn (void *cls,
*/
static void
handle_dv_learn (void *cls,
- const struct TransportDVLearn *dvl)
+ const struct TransportDVLearn *dvl)
{
struct CommunicatorMessageContext *cmc = cls;
@@ -3420,7 +3486,7 @@ handle_dv_learn (void *cls,
*/
static int
check_dv_box (void *cls,
- const struct TransportDVBox *dvb)
+ const struct TransportDVBox *dvb)
{
uint16_t size = ntohs (dvb->header.size);
uint16_t num_hops = ntohs (dvb->num_hops);
@@ -3614,12 +3680,12 @@ check_add_queue_message (void *cls,
* Bandwidth tracker informs us that the delay until we should receive
* more has changed.
*
- * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
+ * @param cls a `struct Queue` for which the delay changed
*/
static void
tracker_update_in_cb (void *cls)
{
- struct GNUNET_ATS_Session *queue = cls;
+ struct Queue *queue = cls;
struct GNUNET_TIME_Relative in_delay;
unsigned int rsize;
@@ -3816,12 +3882,12 @@ reliability_box_message (struct PendingMessage *pm)
* communicator for transmission (updating the tracker, and re-scheduling
* itself if applicable).
*
- * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
+ * @param cls the `struct Queue` to process transmissions for
*/
static void
transmit_on_queue (void *cls)
{
- struct GNUNET_ATS_Session *queue = cls;
+ struct Queue *queue = cls;
struct Neighbour *n = queue->neighbour;
struct QueueEntry *qe;
struct PendingMessage *pm;
@@ -3871,7 +3937,7 @@ transmit_on_queue (void *cls)
/* Pass 's' for transission to the communicator */
qe = GNUNET_new (struct QueueEntry);
qe->mid = queue->mid_gen++;
- qe->session = queue;
+ qe->queue = queue;
// qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
GNUNET_CONTAINER_DLL_insert (queue->queue_head,
queue->queue_tail,
@@ -4007,12 +4073,12 @@ transmit_on_queue (void *cls)
* Bandwidth tracker informs us that the delay until we
* can transmit again changed.
*
- * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
+ * @param cls a `struct Queue` for which the delay changed
*/
static void
tracker_update_out_cb (void *cls)
{
- struct GNUNET_ATS_Session *queue = cls;
+ struct Queue *queue = cls;
struct Neighbour *n = queue->neighbour;
if (NULL == n->pending_msg_head)
@@ -4032,7 +4098,7 @@ tracker_update_out_cb (void *cls)
* Bandwidth tracker informs us that excessive outbound bandwidth was
* allocated which is not being used.
*
- * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
+ * @param cls a `struct Queue` for which the excess was noted
*/
static void
tracker_excess_out_cb (void *cls)
@@ -4041,7 +4107,7 @@ tracker_excess_out_cb (void *cls)
this is done internally within transport_api2_core already,
but we probably want to change the logic and trigger it
from here via a message instead! */
- /* TODO: maybe inform ATS at this point? */
+ /* TODO: maybe inform someone at this point? */
GNUNET_STATISTICS_update (GST_stats,
"# Excess outbound bandwidth reported",
1,
@@ -4054,12 +4120,12 @@ tracker_excess_out_cb (void *cls)
* Bandwidth tracker informs us that excessive inbound bandwidth was allocated
* which is not being used.
*
- * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
+ * @param cls a `struct Queue` for which the excess was noted
*/
static void
tracker_excess_in_cb (void *cls)
{
- /* TODO: maybe inform ATS at this point? */
+ /* TODO: maybe inform somone at this point? */
GNUNET_STATISTICS_update (GST_stats,
"# Excess inbound bandwidth reported",
1,
@@ -4078,7 +4144,7 @@ handle_add_queue_message (void *cls,
const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
{
struct TransportClient *tc = cls;
- struct GNUNET_ATS_Session *queue;
+ struct Queue *queue;
struct Neighbour *neighbour;
const char *addr;
uint16_t addr_len;
@@ -4108,7 +4174,7 @@ handle_add_queue_message (void *cls,
addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
addr = (const char *) &aqm[1];
- queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
+ queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
queue->tc = tc;
queue->address = (const char *) &queue[1];
queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
@@ -4134,38 +4200,6 @@ handle_add_queue_message (void *cls,
memcpy (&queue[1],
addr,
addr_len);
- /* notify ATS about new queue */
- {
- struct GNUNET_ATS_Properties prop = {
- .delay = GNUNET_TIME_UNIT_FOREVER_REL,
- .mtu = queue->mtu,
- .nt = queue->nt,
- .cc = tc->details.communicator.cc
- };
-
- queue->sr = GNUNET_ATS_session_add (ats,
- &neighbour->pid,
- queue->address,
- queue,
- &prop);
- if (NULL == queue->sr)
- {
- /* This can only happen if the 'address' was way too long for ATS
- (approaching 64k in strlen()!). In this case, the communicator
- must be buggy and we drop it. */
- GNUNET_break (0);
- GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
- GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
- GNUNET_free (queue);
- if (NULL == neighbour->session_head)
- {
- cores_send_disconnect_info (&neighbour->pid);
- free_neighbour (neighbour);
- }
- GNUNET_SERVICE_client_drop (tc->client);
- return;
- }
- }
/* notify monitors about new queue */
{
struct MonitorEvent me = {
@@ -4179,12 +4213,12 @@ handle_add_queue_message (void *cls,
&me);
}
GNUNET_CONTAINER_MDLL_insert (neighbour,
- neighbour->session_head,
- neighbour->session_tail,
+ neighbour->queue_head,
+ neighbour->queue_tail,
queue);
GNUNET_CONTAINER_MDLL_insert (client,
- tc->details.communicator.session_head,
- tc->details.communicator.session_tail,
+ tc->details.communicator.queue_head,
+ tc->details.communicator.queue_tail,
queue);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -4208,18 +4242,18 @@ handle_del_queue_message (void *cls,
GNUNET_SERVICE_client_drop (tc->client);
return;
}
- for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
- NULL != session;
- session = session->next_client)
+ for (struct Queue *queue = tc->details.communicator.queue_head;
+ NULL != queue;
+ queue = queue->next_client)
{
- struct Neighbour *neighbour = session->neighbour;
+ struct Neighbour *neighbour = queue->neighbour;
- if ( (dqm->qid != session->qid) ||
+ if ( (dqm->qid != queue->qid) ||
(0 != memcmp (&dqm->receiver,
&neighbour->pid,
sizeof (struct GNUNET_PeerIdentity))) )
continue;
- free_session (session);
+ free_queue (queue);
GNUNET_SERVICE_client_continue (tc->client);
return;
}
@@ -4239,7 +4273,7 @@ handle_send_message_ack (void *cls,
const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
{
struct TransportClient *tc = cls;
- struct QueueEntry *queue;
+ struct QueueEntry *qe;
if (CT_COMMUNICATOR != tc->type)
{
@@ -4249,37 +4283,37 @@ handle_send_message_ack (void *cls,
}
/* find our queue entry matching the ACK */
- queue = NULL;
- for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
- NULL != session;
- session = session->next_client)
+ qe = NULL;
+ for (struct Queue *queue = tc->details.communicator.queue_head;
+ NULL != queue;
+ queue = queue->next_client)
{
- if (0 != memcmp (&session->neighbour->pid,
+ if (0 != memcmp (&queue->neighbour->pid,
&sma->receiver,
sizeof (struct GNUNET_PeerIdentity)))
continue;
- for (struct QueueEntry *qe = session->queue_head;
- NULL != qe;
- qe = qe->next)
+ for (struct QueueEntry *qep = queue->queue_head;
+ NULL != qep;
+ qep = qep->next)
{
- if (qe->mid != sma->mid)
- continue;
- queue = qe;
+ if (qep->mid != sma->mid)
+ continue;
+ qe = qep;
break;
}
break;
}
- if (NULL == queue)
+ if (NULL == qe)
{
/* this should never happen */
GNUNET_break (0);
GNUNET_SERVICE_client_drop (tc->client);
return;
}
- GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
- queue->session->queue_tail,
- queue);
- queue->session->queue_length--;
+ GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
+ qe->queue->queue_tail,
+ qe);
+ qe->queue->queue_length--;
tc->details.communicator.total_queue_length--;
GNUNET_SERVICE_client_continue (tc->client);
@@ -4291,19 +4325,19 @@ handle_send_message_ack (void *cls,
"# Transmission throttled due to communicator queue limit",
-1,
GNUNET_NO);
- for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
- NULL != session;
- session = session->next_client)
- schedule_transmit_on_queue (session);
+ for (struct Queue *queue = tc->details.communicator.queue_head;
+ NULL != queue;
+ queue = queue->next_client)
+ schedule_transmit_on_queue (queue);
}
- else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
+ else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
{
/* queue dropped below threshold; only resume this one queue */
GNUNET_STATISTICS_update (GST_stats,
- "# Transmission throttled due to session queue limit",
+ "# Transmission throttled due to queue queue limit",
-1,
GNUNET_NO);
- schedule_transmit_on_queue (queue->session);
+ schedule_transmit_on_queue (qe->queue);
}
/* TODO: we also should react on the status! */
@@ -4311,7 +4345,7 @@ handle_send_message_ack (void *cls,
// FIXME: react to communicator status about transmission request. We got:
sma->status; // OK success, SYSERR failure
- GNUNET_free (queue);
+ GNUNET_free (qe);
}
@@ -4333,7 +4367,7 @@ notify_client_queues (void *cls,
struct Neighbour *neighbour = value;
GNUNET_assert (CT_MONITOR == tc->type);
- for (struct GNUNET_ATS_Session *q = neighbour->session_head;
+ for (struct Queue *q = neighbour->queue_head;
NULL != q;
q = q->next_neighbour)
{
@@ -4384,31 +4418,6 @@ handle_monitor_start (void *cls,
/**
- * Signature of a function called by ATS with the current bandwidth
- * allocation to be used as determined by ATS.
- *
- * @param cls closure, NULL
- * @param session session this is about
- * @param bandwidth_out assigned outbound bandwidth for the connection,
- * 0 to signal disconnect
- * @param bandwidth_in assigned inbound bandwidth for the connection,
- * 0 to signal disconnect
- */
-static void
-ats_allocation_cb (void *cls,
- struct GNUNET_ATS_Session *session,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
- struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
-{
- (void) cls;
- GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
- bandwidth_out);
- GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
- bandwidth_in);
-}
-
-
-/**
* Find transport client providing communication service
* for the protocol @a prefix.
*
@@ -4429,24 +4438,22 @@ lookup_communicator (const char *prefix)
return tc;
}
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
+ "Somone suggested use of communicator for `%s', but we do not have such a communicator!\n",
prefix);
return NULL;
}
/**
- * Signature of a function called by ATS suggesting transport to
- * try connecting with a particular address.
+ * Signature of a function called with a communicator @a address of a peer
+ * @a pid that an application wants us to connect to.
*
- * @param cls closure, NULL
* @param pid target peer
* @param address the address to try
*/
static void
-ats_suggestion_cb (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- const char *address)
+suggest_to_connect (const struct GNUNET_PeerIdentity *pid,
+ const char *address)
{
static uint32_t idgen;
struct TransportClient *tc;
@@ -4455,18 +4462,17 @@ ats_suggestion_cb (void *cls,
struct GNUNET_MQ_Envelope *env;
size_t alen;
- (void) cls;
prefix = GNUNET_HELLO_address_to_prefix (address);
if (NULL == prefix)
{
- GNUNET_break (0); /* ATS gave invalid address!? */
+ GNUNET_break (0); /* We got an invalid address!? */
return;
}
tc = lookup_communicator (prefix);
if (NULL == tc)
{
GNUNET_STATISTICS_update (GST_stats,
- "# ATS suggestions ignored due to missing communicator",
+ "# Suggestions ignored due to missing communicator",
1,
GNUNET_NO);
return;
@@ -4511,7 +4517,7 @@ handle_queue_create_ok (void *cls,
return;
}
GNUNET_STATISTICS_update (GST_stats,
- "# ATS suggestions succeeded at communicator",
+ "# Suggestions succeeded at communicator",
1,
GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -4531,7 +4537,7 @@ handle_queue_create_ok (void *cls,
*/
static void
handle_queue_create_fail (void *cls,
- const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
+ const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
{
struct TransportClient *tc = cls;
@@ -4545,7 +4551,7 @@ handle_queue_create_fail (void *cls,
"Request #%u for communicator to create queue failed\n",
(unsigned int) ntohs (cqr->request_id));
GNUNET_STATISTICS_update (GST_stats,
- "# ATS suggestions failed in queue creation at communicator",
+ "# Suggestions failed in queue creation at communicator",
1,
GNUNET_NO);
GNUNET_SERVICE_client_continue (tc->client);
@@ -4553,6 +4559,131 @@ handle_queue_create_fail (void *cls,
/**
+ * Function called by PEERSTORE for each matching record.
+ *
+ * @param cls closure
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
+ */
+static void
+handle_hello (void *cls,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
+{
+ struct PeerRequest *pr = cls;
+ const char *val;
+
+ if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Got failure from PEERSTORE: %s\n",
+ emsg);
+ return;
+ }
+ val = record->value;
+ if ( (0 == record->value_size) ||
+ ('\0' != val[record->value_size - 1]) )
+ {
+ GNUNET_break (0);
+ return;
+ }
+ suggest_to_connect (&pr->pid,
+ (const char *) record->value);
+}
+
+
+/**
+ * We have received a `struct ExpressPreferenceMessage` from an application client.
+ *
+ * @param cls handle to the client
+ * @param msg the start message
+ */
+static void
+handle_suggest (void *cls,
+ const struct ExpressPreferenceMessage *msg)
+{
+ struct TransportClient *tc = cls;
+ struct PeerRequest *pr;
+
+ if (CT_NONE == tc->type)
+ {
+ tc->type = CT_APPLICATION;
+ tc->details.application.requests
+ = GNUNET_CONTAINER_multipeermap_create (16,
+ GNUNET_YES);
+ }
+ if (CT_APPLICATION != tc->type)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client suggested we talk to %s with preference %d at rate %u\n",
+ GNUNET_i2s (&msg->peer),
+ (int) ntohl (msg->pk),
+ (int) ntohl (msg->bw.value__));
+ pr = GNUNET_new (struct PeerRequest);
+ pr->tc = tc;
+ pr->pid = msg->peer;
+ pr->bw = msg->bw;
+ pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk);
+ if (GNUNET_YES !=
+ GNUNET_CONTAINER_multipeermap_put (tc->details.application.requests,
+ &pr->pid,
+ pr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
+ {
+ GNUNET_break (0);
+ GNUNET_free (pr);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ pr->wc = GNUNET_PEERSTORE_watch (peerstore,
+ "transport",
+ &pr->pid,
+ "hello",
+ &handle_hello,
+ pr);
+ GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
+/**
+ * We have received a `struct ExpressPreferenceMessage` from an application client.
+ *
+ * @param cls handle to the client
+ * @param msg the start message
+ */
+static void
+handle_suggest_cancel (void *cls,
+ const struct ExpressPreferenceMessage *msg)
+{
+ struct TransportClient *tc = cls;
+ struct PeerRequest *pr;
+
+ if (CT_APPLICATION != tc->type)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests,
+ &msg->peer);
+ if (NULL == pr)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ (void) stop_peer_request (tc,
+ &pr->pid,
+ pr);
+ GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
+/**
* Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
* messages. We do nothing here, real verification is done later.
*
@@ -4692,13 +4823,8 @@ do_shutdown (void *cls)
ephemeral_task = NULL;
}
GNUNET_CONTAINER_multipeermap_iterate (neighbours,
- &free_neighbour_cb,
- NULL);
- if (NULL != ats)
- {
- GNUNET_ATS_transport_done (ats);
- ats = NULL;
- }
+ &free_neighbour_cb,
+ NULL);
if (NULL != peerstore)
{
GNUNET_PEERSTORE_disconnect (peerstore,
@@ -4779,17 +4905,6 @@ run (void *cls,
GNUNET_SCHEDULER_shutdown ();
return;
}
- ats = GNUNET_ATS_transport_init (GST_cfg,
- &ats_allocation_cb,
- NULL,
- &ats_suggestion_cb,
- NULL);
- if (NULL == ats)
- {
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
}
@@ -4803,6 +4918,15 @@ GNUNET_SERVICE_MAIN
&client_connect_cb,
&client_disconnect_cb,
NULL,
+ /* communication with applications */
+ GNUNET_MQ_hd_fixed_size (suggest,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST,
+ struct ExpressPreferenceMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (suggest_cancel,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL,
+ struct ExpressPreferenceMessage,
+ NULL),
/* communication with core */
GNUNET_MQ_hd_fixed_size (client_start,
GNUNET_MESSAGE_TYPE_TRANSPORT_START,