summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-01-24 21:08:12 +0100
committerChristian Grothoff <christian@grothoff.org>2019-01-24 21:08:12 +0100
commit0c8d3480c3cc8bbc1b6bc0a6ae155510aae8f1f3 (patch)
tree25c42af3f33ed869451e83e46ed88e2adaaae5f3 /src/transport/gnunet-service-tng.c
parent4aef73f70ec3bd117236b81b777022dc8cca966c (diff)
tng: towards communicator flow control:
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c407
1 files changed, 292 insertions, 115 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 6c3373013..8febbdfff 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -99,18 +99,26 @@
#define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
/**
- * How many messages can we have pending for a given client process
- * before we start to drop incoming messages? We typically should
- * have only one client and so this would be the primary buffer for
- * messages, so the number should be chosen rather generously.
- *
- * The expectation here is that most of the time the queue is large
- * enough so that a drop is virtually never required. Note that
- * this value must be about as large as 'TOTAL_MSGS' in the
- * 'test_transport_api_reliability.c', otherwise that testcase may
- * fail.
+ * How many messages can we have pending for a given communicator
+ * process before we start to throttle that communicator?
+ *
+ * Used if a communicator might be CPU-bound and cannot handle the traffic.
+ */
+#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
+
+/**
+ * How many messages can we have pending for a given session (queue to
+ * a particular peer via a communicator) process before we start to
+ * throttle that queue?
+ *
+ * Used if ATS assigns more bandwidth to a particular transmission
+ * method than that transmission method can right now handle. (Yes,
+ * ATS should eventually notice utilization below allocation and
+ * adjust, but we don't want to queue up tons of messages in the
+ * meantime). Must be significantly below
+ * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
*/
-#define MAX_PENDING (128 * 1024)
+#define SESSION_QUEUE_LIMIT 32
GNUNET_NETWORK_STRUCT_BEGIN
@@ -555,6 +563,40 @@ struct Neighbour;
/**
+ * Entry identifying transmission in one of our `struct
+ * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to
+ * ensure we do not overwhelm a communicator and limit the number of
+ * messages outstanding per communicator (say in case communicator is
+ * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
+ * what the communicator can actually provide towards a particular
+ * peer/target).
+ */
+struct QueueEntry
+{
+
+ /**
+ * Kept as a DLL.
+ */
+ struct QueueEntry *next;
+
+ /**
+ * Kept as a DLL.
+ */
+ struct QueueEntry *prev;
+
+ /**
+ * ATS session this entry is queued with.
+ */
+ struct GNUNET_ATS_Session *session;
+
+ /**
+ * Message ID used for this message with the queue used for transmission.
+ */
+ uint64_t mid;
+};
+
+
+/**
* An ATS session is a message queue provided by a communicator
* via which we can reach a particular neighbour.
*/
@@ -581,6 +623,16 @@ struct GNUNET_ATS_Session
struct GNUNET_ATS_Session *next_client;
/**
+ * Head of DLL of unacked transmission requests.
+ */
+ struct QueueEntry *queue_head;
+
+ /**
+ * End of DLL of unacked transmission requests.
+ */
+ struct QueueEntry *queue_tail;
+
+ /**
* Which neighbour is this ATS session for?
*/
struct Neighbour *neighbour;
@@ -612,6 +664,11 @@ struct GNUNET_ATS_Session
struct GNUNET_TIME_Relative rtt;
/**
+ * Message ID generator for transmissions on this queue.
+ */
+ uint64_t mid_gen;
+
+ /**
* Unique identifier of this ATS session with the communicator.
*/
uint32_t qid;
@@ -627,24 +684,29 @@ struct GNUNET_ATS_Session
uint32_t distance;
/**
- * Network type offered by this ATS session.
+ * Messages pending.
*/
- enum GNUNET_NetworkType nt;
+ uint32_t num_msg_pending;
/**
- * Connection status for this ATS session.
+ * Bytes pending.
*/
- enum GNUNET_TRANSPORT_ConnectionStatus cs;
+ uint32_t num_bytes_pending;
/**
- * Messages pending.
+ * Length of the DLL starting at @e queue_head.
*/
- uint32_t num_msg_pending;
+ unsigned int queue_length;
+
+ /**
+ * Network type offered by this ATS session.
+ */
+ enum GNUNET_NetworkType nt;
/**
- * Bytes pending.
+ * Connection status for this ATS session.
*/
- uint32_t num_bytes_pending;
+ enum GNUNET_TRANSPORT_ConnectionStatus cs;
/**
* How much outbound bandwidth do we have available for this session?
@@ -842,11 +904,6 @@ struct PendingMessage
* initialized if @e msg_uuid_set is #GNUNET_YES).
*/
struct GNUNET_ShortHashCode msg_uuid;
-
- /**
- * Message ID used for this message with the queue used for transmission.
- */
- uint64_t mid;
/**
* Counter incremented per generated fragment.
@@ -1035,6 +1092,13 @@ struct TransportClient
struct AddressListEntry *addr_tail;
/**
+ * Number of queue entries in all queues to this communicator. Used
+ * throttle sending to a communicator if we see that the communicator
+ * is globally unable to keep up.
+ */
+ unsigned int total_queue_length;
+
+ /**
* Characteristics of this communicator.
*/
enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
@@ -1382,41 +1446,142 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
/**
- * Free @a queue.
+ * We believe we are ready to transmit a message on a queue. Double-checks
+ * with the queue's "tracker_out" and then gives the message to the
+ * communicator for transmission (updating the tracker, and re-scheduling
+ * itself if applicable).
+ *
+ * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
+ */
+static void
+transmit_on_queue (void *cls);
+
+
+/**
+ * Schedule next run of #transmit_on_queue(). Does NOTHING if
+ * we should run immediately or if the message queue is empty.
+ * Test for no task being added AND queue not being empty to
+ * transmit immediately afterwards! This function must only
+ * be called if the message queue is non-empty!
*
- * @param queue the queue to free
+ * @param queue the queue to do scheduling for
+ */
+static void
+schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
+{
+ struct Neighbour *n = queue->neighbour;
+ struct PendingMessage *pm = n->pending_msg_head;
+ struct GNUNET_TIME_Relative out_delay;
+ unsigned int wsize;
+
+ GNUNET_assert (NULL != pm);
+ if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Transmission throttled due to communicator queue limit",
+ 1,
+ GNUNET_NO);
+ return;
+ }
+ if (queue->queue_length >= SESSION_QUEUE_LIMIT)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Transmission throttled due to session queue limit",
+ 1,
+ GNUNET_NO);
+ return;
+ }
+
+ wsize = (0 == queue->mtu)
+ ? pm->bytes_msg /* FIXME: add overheads? */
+ : queue->mtu;
+ out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
+ wsize);
+ out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
+ out_delay);
+ if (0 == out_delay.rel_value_us)
+ return; /* we should run immediately! */
+ /* queue has changed since we were scheduled, reschedule again */
+ queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
+ &transmit_on_queue,
+ queue);
+ if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Next transmission on queue `%s' in %s (high delay)\n",
+ queue->address,
+ GNUNET_STRINGS_relative_time_to_string (out_delay,
+ GNUNET_YES));
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Next transmission on queue `%s' in %s\n",
+ queue->address,
+ GNUNET_STRINGS_relative_time_to_string (out_delay,
+ GNUNET_YES));
+}
+
+
+/**
+ * Free @a session.
+ *
+ * @param session the session to free
*/
static void
-free_queue (struct GNUNET_ATS_Session *queue)
+free_session (struct GNUNET_ATS_Session *session)
{
- struct Neighbour *neighbour = queue->neighbour;
- struct TransportClient *tc = queue->tc;
+ struct Neighbour *neighbour = session->neighbour;
+ struct TransportClient *tc = session->tc;
struct MonitorEvent me = {
.cs = GNUNET_TRANSPORT_CS_DOWN,
.rtt = GNUNET_TIME_UNIT_FOREVER_REL
};
+ struct QueueEntry *qe;
+ int maxxed;
- if (NULL != queue->transmit_task)
+ if (NULL != session->transmit_task)
{
- GNUNET_SCHEDULER_cancel (queue->transmit_task);
- queue->transmit_task = NULL;
+ GNUNET_SCHEDULER_cancel (session->transmit_task);
+ session->transmit_task = NULL;
}
GNUNET_CONTAINER_MDLL_remove (neighbour,
neighbour->session_head,
neighbour->session_tail,
- queue);
+ session);
GNUNET_CONTAINER_MDLL_remove (client,
tc->details.communicator.session_head,
tc->details.communicator.session_tail,
- queue);
+ session);
+ maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
+ while (NULL != (qe = session->queue_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (session->queue_head,
+ session->queue_tail,
+ qe);
+ session->queue_length--;
+ tc->details.communicator.total_queue_length--;
+ GNUNET_free (qe);
+ }
+ GNUNET_assert (0 == session->queue_length);
+ if ( (maxxed) &&
+ (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
+ {
+ /* Communicator dropped below threshold, resume all queues */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Transmission throttled due to communicator queue limit",
+ -1,
+ GNUNET_NO);
+ for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
+ NULL != s;
+ s = s->next_client)
+ schedule_transmit_on_queue (s);
+ }
notify_monitors (&neighbour->pid,
- queue->address,
- queue->nt,
+ session->address,
+ session->nt,
&me);
- GNUNET_ATS_session_del (queue->sr);
- GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
- GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
- GNUNET_free (queue);
+ GNUNET_ATS_session_del (session->sr);
+ GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
+ GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
+ GNUNET_free (session);
if (NULL == neighbour->session_head)
{
cores_send_disconnect_info (&neighbour->pid);
@@ -1499,7 +1664,7 @@ client_disconnect_cb (void *cls,
struct AddressListEntry *ale;
while (NULL != (q = tc->details.communicator.session_head))
- free_queue (q);
+ free_session (q);
while (NULL != (ale = tc->details.communicator.addr_head))
free_address_list_entry (ale);
GNUNET_free (tc->details.communicator.address_prefix);
@@ -2104,64 +2269,6 @@ tracker_update_in_cb (void *cls)
/**
- * We believe we are ready to transmit a message on a queue. Double-checks
- * with the queue's "tracker_out" and then gives the message to the
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).
- *
- * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
- */
-static void
-transmit_on_queue (void *cls);
-
-
-/**
- * Schedule next run of #transmit_on_queue(). Does NOTHING if
- * we should run immediately or if the message queue is empty.
- * Test for no task being added AND queue not being empty to
- * transmit immediately afterwards! This function must only
- * be called if the message queue is non-empty!
- *
- * @param queue the queue to do scheduling for
- */
-static void
-schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
-{
- struct Neighbour *n = queue->neighbour;
- struct PendingMessage *pm = n->pending_msg_head;
- struct GNUNET_TIME_Relative out_delay;
- unsigned int wsize;
-
- GNUNET_assert (NULL != pm);
- wsize = (0 == queue->mtu)
- ? pm->bytes_msg /* FIXME: add overheads? */
- : queue->mtu;
- out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
- wsize);
- out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
- out_delay);
- if (0 == out_delay.rel_value_us)
- return; /* we should run immediately! */
- /* queue has changed since we were scheduled, reschedule again */
- queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
- &transmit_on_queue,
- queue);
- if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Next transmission on queue `%s' in %s (high delay)\n",
- queue->address,
- GNUNET_STRINGS_relative_time_to_string (out_delay,
- GNUNET_YES));
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Next transmission on queue `%s' in %s\n",
- queue->address,
- GNUNET_STRINGS_relative_time_to_string (out_delay,
- GNUNET_YES));
-}
-
-
-/**
* Fragment the given @a pm to the given @a mtu. Adds
* additional fragments to the neighbour as well. If the
* @a mtu is too small, generates and error for the @a pm
@@ -2317,6 +2424,7 @@ transmit_on_queue (void *cls)
{
struct GNUNET_ATS_Session *queue = cls;
struct Neighbour *n = queue->neighbour;
+ struct QueueEntry *qe;
struct PendingMessage *pm;
struct PendingMessage *s;
uint32_t overhead;
@@ -2361,19 +2469,29 @@ transmit_on_queue (void *cls)
return;
}
- // pm->mid = queue->mid_gen++;
+ /* Pass 's' for transission to the communicator */
+ qe = GNUNET_new (struct QueueEntry);
+ qe->mid = queue->mid_gen++;
+ qe->session = queue;
+ // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
+ GNUNET_CONTAINER_DLL_insert (queue->queue_head,
+ queue->queue_tail,
+ qe);
env = GNUNET_MQ_msg_extra (smt,
s->bytes_msg,
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
smt->qid = queue->qid;
- // smt->mid = pm->mid;
- // smt->receiver = pid;
+ smt->mid = qe->mid;
+ smt->receiver = n->pid;
memcpy (&smt[1],
&s[1],
s->bytes_msg);
+ GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
+ queue->queue_length++;
+ queue->tc->details.communicator.total_queue_length++;
+ GNUNET_MQ_send (queue->tc->mq,
+ env);
- // FIXME: actually give 's' to communicator for transmission here!
-
// FIXME: do something similar to the logic below
// in defragmentation / reliability ACK handling!
@@ -2653,18 +2771,18 @@ handle_del_queue_message (void *cls,
GNUNET_SERVICE_client_drop (tc->client);
return;
}
- for (struct GNUNET_ATS_Session *queue = tc->details.communicator.session_head;
- NULL != queue;
- queue = queue->next_client)
+ for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
+ NULL != session;
+ session = session->next_client)
{
- struct Neighbour *neighbour = queue->neighbour;
+ struct Neighbour *neighbour = session->neighbour;
- if ( (dqm->qid != queue->qid) ||
+ if ( (dqm->qid != session->qid) ||
(0 != memcmp (&dqm->receiver,
&neighbour->pid,
sizeof (struct GNUNET_PeerIdentity))) )
continue;
- free_queue (queue);
+ free_session (session);
GNUNET_SERVICE_client_continue (tc->client);
return;
}
@@ -2684,20 +2802,79 @@ handle_send_message_ack (void *cls,
const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
{
struct TransportClient *tc = cls;
-
+ struct QueueEntry *queue;
+
if (CT_COMMUNICATOR != tc->type)
{
GNUNET_break (0);
GNUNET_SERVICE_client_drop (tc->client);
return;
}
+
+ /* find our queue entry matching the ACK */
+ queue = NULL;
+ for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
+ NULL != session;
+ session = session->next_client)
+ {
+ if (0 != memcmp (&session->neighbour->pid,
+ &sma->receiver,
+ sizeof (struct GNUNET_PeerIdentity)))
+ continue;
+ for (struct QueueEntry *qe = session->queue_head;
+ NULL != qe;
+ qe = qe->next)
+ {
+ if (qe->mid != sma->mid)
+ continue;
+ queue = qe;
+ break;
+ }
+ break;
+ }
+ if (NULL == queue)
+ {
+ /* this should never happen */
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
+ queue->session->queue_tail,
+ queue);
+ queue->session->queue_length--;
+ tc->details.communicator.total_queue_length--;
+ GNUNET_SERVICE_client_continue (tc->client);
+
+ /* if applicable, resume transmissions that waited on ACK */
+ if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
+ {
+ /* Communicator dropped below threshold, resume all queues */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Transmission throttled due to communicator queue limit",
+ -1,
+ GNUNET_NO);
+ for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
+ NULL != session;
+ session = session->next_client)
+ schedule_transmit_on_queue (session);
+ }
+ else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
+ {
+ /* queue dropped below threshold; only resume this one queue */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Transmission throttled due to session queue limit",
+ -1,
+ GNUNET_NO);
+ schedule_transmit_on_queue (queue->session);
+ }
+
+ /* TODO: we also should react on the status! */
+ // FIXME: this probably requires queue->pm = s assignment!
// FIXME: react to communicator status about transmission request. We got:
sma->status; // OK success, SYSERR failure
- sma->mid; // message ID of original message
- sma->receiver; // receiver of original message
-
- GNUNET_SERVICE_client_continue (tc->client);
+ GNUNET_free (queue);
}