From 0c8d3480c3cc8bbc1b6bc0a6ae155510aae8f1f3 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 24 Jan 2019 21:08:12 +0100 Subject: tng: towards communicator flow control: --- src/transport/gnunet-service-tng.c | 407 ++++++++++++++++++++++++++----------- 1 file changed, 292 insertions(+), 115 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 6c3373013..8febbdfff 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -99,18 +99,26 @@ #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) /** - * How many messages can we have pending for a given client process - * before we start to drop incoming messages? We typically should - * have only one client and so this would be the primary buffer for - * messages, so the number should be chosen rather generously. - * - * The expectation here is that most of the time the queue is large - * enough so that a drop is virtually never required. Note that - * this value must be about as large as 'TOTAL_MSGS' in the - * 'test_transport_api_reliability.c', otherwise that testcase may - * fail. + * How many messages can we have pending for a given communicator + * process before we start to throttle that communicator? + * + * Used if a communicator might be CPU-bound and cannot handle the traffic. + */ +#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512 + +/** + * How many messages can we have pending for a given session (queue to + * a particular peer via a communicator) process before we start to + * throttle that queue? + * + * Used if ATS assigns more bandwidth to a particular transmission + * method than that transmission method can right now handle. (Yes, + * ATS should eventually notice utilization below allocation and + * adjust, but we don't want to queue up tons of messages in the + * meantime). Must be significantly below + * #COMMUNICATOR_TOTAL_QUEUE_LIMIT. */ -#define MAX_PENDING (128 * 1024) +#define SESSION_QUEUE_LIMIT 32 GNUNET_NETWORK_STRUCT_BEGIN @@ -554,6 +562,40 @@ struct TransportClient; struct Neighbour; +/** + * Entry identifying transmission in one of our `struct + * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to + * ensure we do not overwhelm a communicator and limit the number of + * messages outstanding per communicator (say in case communicator is + * CPU bound) and per queue (in case ATS bandwidth allocation exceeds + * what the communicator can actually provide towards a particular + * peer/target). + */ +struct QueueEntry +{ + + /** + * Kept as a DLL. + */ + struct QueueEntry *next; + + /** + * Kept as a DLL. + */ + struct QueueEntry *prev; + + /** + * ATS session this entry is queued with. + */ + struct GNUNET_ATS_Session *session; + + /** + * Message ID used for this message with the queue used for transmission. + */ + uint64_t mid; +}; + + /** * An ATS session is a message queue provided by a communicator * via which we can reach a particular neighbour. @@ -580,6 +622,16 @@ struct GNUNET_ATS_Session */ struct GNUNET_ATS_Session *next_client; + /** + * Head of DLL of unacked transmission requests. + */ + struct QueueEntry *queue_head; + + /** + * End of DLL of unacked transmission requests. + */ + struct QueueEntry *queue_tail; + /** * Which neighbour is this ATS session for? */ @@ -611,6 +663,11 @@ struct GNUNET_ATS_Session */ struct GNUNET_TIME_Relative rtt; + /** + * Message ID generator for transmissions on this queue. + */ + uint64_t mid_gen; + /** * Unique identifier of this ATS session with the communicator. */ @@ -627,24 +684,29 @@ struct GNUNET_ATS_Session uint32_t distance; /** - * Network type offered by this ATS session. + * Messages pending. */ - enum GNUNET_NetworkType nt; + uint32_t num_msg_pending; /** - * Connection status for this ATS session. + * Bytes pending. */ - enum GNUNET_TRANSPORT_ConnectionStatus cs; + uint32_t num_bytes_pending; /** - * Messages pending. + * Length of the DLL starting at @e queue_head. */ - uint32_t num_msg_pending; + unsigned int queue_length; + + /** + * Network type offered by this ATS session. + */ + enum GNUNET_NetworkType nt; /** - * Bytes pending. + * Connection status for this ATS session. */ - uint32_t num_bytes_pending; + enum GNUNET_TRANSPORT_ConnectionStatus cs; /** * How much outbound bandwidth do we have available for this session? @@ -842,11 +904,6 @@ struct PendingMessage * initialized if @e msg_uuid_set is #GNUNET_YES). */ struct GNUNET_ShortHashCode msg_uuid; - - /** - * Message ID used for this message with the queue used for transmission. - */ - uint64_t mid; /** * Counter incremented per generated fragment. @@ -1034,6 +1091,13 @@ struct TransportClient */ struct AddressListEntry *addr_tail; + /** + * Number of queue entries in all queues to this communicator. Used + * throttle sending to a communicator if we see that the communicator + * is globally unable to keep up. + */ + unsigned int total_queue_length; + /** * Characteristics of this communicator. */ @@ -1382,41 +1446,142 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) /** - * Free @a queue. + * We believe we are ready to transmit a message on a queue. Double-checks + * with the queue's "tracker_out" and then gives the message to the + * communicator for transmission (updating the tracker, and re-scheduling + * itself if applicable). + * + * @param cls the `struct GNUNET_ATS_Session` to process transmissions for + */ +static void +transmit_on_queue (void *cls); + + +/** + * Schedule next run of #transmit_on_queue(). Does NOTHING if + * we should run immediately or if the message queue is empty. + * Test for no task being added AND queue not being empty to + * transmit immediately afterwards! This function must only + * be called if the message queue is non-empty! * - * @param queue the queue to free + * @param queue the queue to do scheduling for + */ +static void +schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) +{ + struct Neighbour *n = queue->neighbour; + struct PendingMessage *pm = n->pending_msg_head; + struct GNUNET_TIME_Relative out_delay; + unsigned int wsize; + + GNUNET_assert (NULL != pm); + if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT) + { + GNUNET_STATISTICS_update (GST_stats, + "# Transmission throttled due to communicator queue limit", + 1, + GNUNET_NO); + return; + } + if (queue->queue_length >= SESSION_QUEUE_LIMIT) + { + GNUNET_STATISTICS_update (GST_stats, + "# Transmission throttled due to session queue limit", + 1, + GNUNET_NO); + return; + } + + wsize = (0 == queue->mtu) + ? pm->bytes_msg /* FIXME: add overheads? */ + : queue->mtu; + out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, + wsize); + out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt), + out_delay); + if (0 == out_delay.rel_value_us) + return; /* we should run immediately! */ + /* queue has changed since we were scheduled, reschedule again */ + queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, + &transmit_on_queue, + queue); + if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Next transmission on queue `%s' in %s (high delay)\n", + queue->address, + GNUNET_STRINGS_relative_time_to_string (out_delay, + GNUNET_YES)); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Next transmission on queue `%s' in %s\n", + queue->address, + GNUNET_STRINGS_relative_time_to_string (out_delay, + GNUNET_YES)); +} + + +/** + * Free @a session. + * + * @param session the session to free */ static void -free_queue (struct GNUNET_ATS_Session *queue) +free_session (struct GNUNET_ATS_Session *session) { - struct Neighbour *neighbour = queue->neighbour; - struct TransportClient *tc = queue->tc; + struct Neighbour *neighbour = session->neighbour; + struct TransportClient *tc = session->tc; struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN, .rtt = GNUNET_TIME_UNIT_FOREVER_REL }; + struct QueueEntry *qe; + int maxxed; - if (NULL != queue->transmit_task) + if (NULL != session->transmit_task) { - GNUNET_SCHEDULER_cancel (queue->transmit_task); - queue->transmit_task = NULL; + GNUNET_SCHEDULER_cancel (session->transmit_task); + session->transmit_task = NULL; } GNUNET_CONTAINER_MDLL_remove (neighbour, neighbour->session_head, neighbour->session_tail, - queue); + session); GNUNET_CONTAINER_MDLL_remove (client, tc->details.communicator.session_head, tc->details.communicator.session_tail, - queue); + session); + maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length); + while (NULL != (qe = session->queue_head)) + { + GNUNET_CONTAINER_DLL_remove (session->queue_head, + session->queue_tail, + qe); + session->queue_length--; + tc->details.communicator.total_queue_length--; + GNUNET_free (qe); + } + GNUNET_assert (0 == session->queue_length); + if ( (maxxed) && + (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) ) + { + /* Communicator dropped below threshold, resume all queues */ + GNUNET_STATISTICS_update (GST_stats, + "# Transmission throttled due to communicator queue limit", + -1, + GNUNET_NO); + for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head; + NULL != s; + s = s->next_client) + schedule_transmit_on_queue (s); + } notify_monitors (&neighbour->pid, - queue->address, - queue->nt, + session->address, + session->nt, &me); - GNUNET_ATS_session_del (queue->sr); - GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); - GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); - GNUNET_free (queue); + GNUNET_ATS_session_del (session->sr); + GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in); + GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out); + GNUNET_free (session); if (NULL == neighbour->session_head) { cores_send_disconnect_info (&neighbour->pid); @@ -1499,7 +1664,7 @@ client_disconnect_cb (void *cls, struct AddressListEntry *ale; while (NULL != (q = tc->details.communicator.session_head)) - free_queue (q); + free_session (q); while (NULL != (ale = tc->details.communicator.addr_head)) free_address_list_entry (ale); GNUNET_free (tc->details.communicator.address_prefix); @@ -2103,64 +2268,6 @@ tracker_update_in_cb (void *cls) } -/** - * We believe we are ready to transmit a message on a queue. Double-checks - * with the queue's "tracker_out" and then gives the message to the - * communicator for transmission (updating the tracker, and re-scheduling - * itself if applicable). - * - * @param cls the `struct GNUNET_ATS_Session` to process transmissions for - */ -static void -transmit_on_queue (void *cls); - - -/** - * Schedule next run of #transmit_on_queue(). Does NOTHING if - * we should run immediately or if the message queue is empty. - * Test for no task being added AND queue not being empty to - * transmit immediately afterwards! This function must only - * be called if the message queue is non-empty! - * - * @param queue the queue to do scheduling for - */ -static void -schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) -{ - struct Neighbour *n = queue->neighbour; - struct PendingMessage *pm = n->pending_msg_head; - struct GNUNET_TIME_Relative out_delay; - unsigned int wsize; - - GNUNET_assert (NULL != pm); - wsize = (0 == queue->mtu) - ? pm->bytes_msg /* FIXME: add overheads? */ - : queue->mtu; - out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, - wsize); - out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt), - out_delay); - if (0 == out_delay.rel_value_us) - return; /* we should run immediately! */ - /* queue has changed since we were scheduled, reschedule again */ - queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, - &transmit_on_queue, - queue); - if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Next transmission on queue `%s' in %s (high delay)\n", - queue->address, - GNUNET_STRINGS_relative_time_to_string (out_delay, - GNUNET_YES)); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Next transmission on queue `%s' in %s\n", - queue->address, - GNUNET_STRINGS_relative_time_to_string (out_delay, - GNUNET_YES)); -} - - /** * Fragment the given @a pm to the given @a mtu. Adds * additional fragments to the neighbour as well. If the @@ -2317,6 +2424,7 @@ transmit_on_queue (void *cls) { struct GNUNET_ATS_Session *queue = cls; struct Neighbour *n = queue->neighbour; + struct QueueEntry *qe; struct PendingMessage *pm; struct PendingMessage *s; uint32_t overhead; @@ -2361,19 +2469,29 @@ transmit_on_queue (void *cls) return; } - // pm->mid = queue->mid_gen++; + /* Pass 's' for transission to the communicator */ + qe = GNUNET_new (struct QueueEntry); + qe->mid = queue->mid_gen++; + qe->session = queue; + // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'! + GNUNET_CONTAINER_DLL_insert (queue->queue_head, + queue->queue_tail, + qe); env = GNUNET_MQ_msg_extra (smt, s->bytes_msg, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); smt->qid = queue->qid; - // smt->mid = pm->mid; - // smt->receiver = pid; + smt->mid = qe->mid; + smt->receiver = n->pid; memcpy (&smt[1], &s[1], s->bytes_msg); + GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); + queue->queue_length++; + queue->tc->details.communicator.total_queue_length++; + GNUNET_MQ_send (queue->tc->mq, + env); - // FIXME: actually give 's' to communicator for transmission here! - // FIXME: do something similar to the logic below // in defragmentation / reliability ACK handling! @@ -2653,18 +2771,18 @@ handle_del_queue_message (void *cls, GNUNET_SERVICE_client_drop (tc->client); return; } - for (struct GNUNET_ATS_Session *queue = tc->details.communicator.session_head; - NULL != queue; - queue = queue->next_client) + for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; + NULL != session; + session = session->next_client) { - struct Neighbour *neighbour = queue->neighbour; + struct Neighbour *neighbour = session->neighbour; - if ( (dqm->qid != queue->qid) || + if ( (dqm->qid != session->qid) || (0 != memcmp (&dqm->receiver, &neighbour->pid, sizeof (struct GNUNET_PeerIdentity))) ) continue; - free_queue (queue); + free_session (session); GNUNET_SERVICE_client_continue (tc->client); return; } @@ -2684,20 +2802,79 @@ handle_send_message_ack (void *cls, const struct GNUNET_TRANSPORT_SendMessageToAck *sma) { struct TransportClient *tc = cls; - + struct QueueEntry *queue; + if (CT_COMMUNICATOR != tc->type) { GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } + + /* find our queue entry matching the ACK */ + queue = NULL; + for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; + NULL != session; + session = session->next_client) + { + if (0 != memcmp (&session->neighbour->pid, + &sma->receiver, + sizeof (struct GNUNET_PeerIdentity))) + continue; + for (struct QueueEntry *qe = session->queue_head; + NULL != qe; + qe = qe->next) + { + if (qe->mid != sma->mid) + continue; + queue = qe; + break; + } + break; + } + if (NULL == queue) + { + /* this should never happen */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + GNUNET_CONTAINER_DLL_remove (queue->session->queue_head, + queue->session->queue_tail, + queue); + queue->session->queue_length--; + tc->details.communicator.total_queue_length--; + GNUNET_SERVICE_client_continue (tc->client); + + /* if applicable, resume transmissions that waited on ACK */ + if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length) + { + /* Communicator dropped below threshold, resume all queues */ + GNUNET_STATISTICS_update (GST_stats, + "# Transmission throttled due to communicator queue limit", + -1, + GNUNET_NO); + for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; + NULL != session; + session = session->next_client) + schedule_transmit_on_queue (session); + } + else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length) + { + /* queue dropped below threshold; only resume this one queue */ + GNUNET_STATISTICS_update (GST_stats, + "# Transmission throttled due to session queue limit", + -1, + GNUNET_NO); + schedule_transmit_on_queue (queue->session); + } + + /* TODO: we also should react on the status! */ + // FIXME: this probably requires queue->pm = s assignment! // FIXME: react to communicator status about transmission request. We got: sma->status; // OK success, SYSERR failure - sma->mid; // message ID of original message - sma->receiver; // receiver of original message - - GNUNET_SERVICE_client_continue (tc->client); + GNUNET_free (queue); } -- cgit v1.2.3