From 32b38707097f8dc9f7f39c526f67414f24283eca Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 21 Jan 2019 17:45:07 +0100 Subject: handle transmission timeouts --- src/transport/gnunet-service-tng.c | 192 +++++++++++++++++++++++++++++++------ 1 file changed, 163 insertions(+), 29 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 3673958ec..e205fa3d7 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -439,6 +439,11 @@ struct Neighbour */ struct GNUNET_ATS_Session *session_tail; + /** + * Task run to cleanup pending messages that have exceeded their timeout. + */ + struct GNUNET_SCHEDULER_Task *timeout_task; + /** * Quota at which CORE is allowed to transmit to this peer * according to ATS. @@ -451,6 +456,11 @@ struct Neighbour */ struct GNUNET_BANDWIDTH_Value32NBO quota_out; + /** + * What is the earliest timeout of any message in @e pending_msg_tail? + */ + struct GNUNET_TIME_Absolute earliest_timeout; + }; @@ -489,11 +499,22 @@ struct PendingMessage */ struct TransportClient *client; + /** + * At what time should we give up on the transmission (and no longer retry)? + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * What is the earliest time for us to retry transmission of this message? + */ + struct GNUNET_TIME_Absolute next_attempt; + /** * Size of the original message. */ uint32_t bytes_msg; + /* Followed by @e bytes_msg to transmit */ }; @@ -592,7 +613,8 @@ struct TransportClient struct { /** - * Head of list of messages pending for this client. + * Head of list of messages pending for this client, sorted by + * transmission time ("next_attempt" + possibly internal prioritization). */ struct PendingMessage *pending_msg_head; @@ -920,6 +942,8 @@ free_neighbour (struct Neighbour *neighbour) GNUNET_CONTAINER_multipeermap_remove (neighbours, &neighbour->pid, neighbour)); + if (NULL != neighbour->timeout_task) + GNUNET_SCHEDULER_cancel (neighbour->timeout_task); GNUNET_free (neighbour); } @@ -1272,6 +1296,50 @@ client_send_response (struct PendingMessage *pm, } +/** + * Checks the message queue for a neighbour for messages that have timed + * out and purges them. + * + * @param cls a `struct Neighbour` + */ +static void +check_queue_timeouts (void *cls) +{ + struct Neighbour *n = cls; + struct PendingMessage *pm; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Absolute earliest_timeout; + + n->timeout_task = NULL; + earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; + now = GNUNET_TIME_absolute_get (); + for (struct PendingMessage *pos = n->pending_msg_head; + NULL != pos; + pos = pm) + { + pm = pos->next_neighbour; + if (pos->timeout.abs_value_us <= now.abs_value_us) + { + GNUNET_STATISTICS_update (GST_stats, + "# messages dropped (timeout before confirmation)", + 1, + GNUNET_NO); + client_send_response (pm, + GNUNET_NO, + 0); + continue; + } + earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout, + pos->timeout); + } + n->earliest_timeout = earliest_timeout; + if (NULL != n->pending_msg_head) + n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout, + &check_queue_timeouts, + n); +} + + /** * Client asked for transmission to a peer. Process the request. * @@ -1316,10 +1384,14 @@ handle_client_send (void *cls, GNUNET_NO); return; } - pm = GNUNET_new (struct PendingMessage); + pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg); pm->client = tc; pm->target = target; pm->bytes_msg = bytes_msg; + pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout)); + memcpy (&pm[1], + &obm[1], + bytes_msg); GNUNET_CONTAINER_MDLL_insert (neighbour, target->pending_msg_head, target->pending_msg_tail, @@ -1328,10 +1400,16 @@ handle_client_send (void *cls, tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); - // FIXME: do the work, final continuation with call to: - client_send_response (pm, - GNUNET_NO, - 0); + if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) + { + target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us; + if (NULL != target->timeout_task) + GNUNET_SCHEDULER_cancel (target->timeout_task); + target->timeout_task + = GNUNET_SCHEDULER_add_at (target->earliest_timeout, + &check_queue_timeouts, + target); + } } @@ -1652,45 +1730,37 @@ tracker_update_in_cb (void *cls) * @param cls the `struct GNUNET_ATS_Session` to process transmissions for */ static void -transmit_on_queue (void *cls) -{ - struct GNUNET_ATS_Session *queue = cls; - - queue->transmit_task = NULL; - // FIXME: check if transmission is really ready - // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.) - // FIXME: re-schedule self -} +transmit_on_queue (void *cls); /** - * Bandwidth tracker informs us that the delay until we - * can transmit again changed. + * Schedule next run of #transmit_on_queue(). Does NOTHING if + * we should run immediately or if the message queue is empty. + * Test for no task being added AND queue not being empty to + * transmit immediately afterwards! This function must only + * be called if the message queue is non-empty! * - * @param cls a `struct GNUNET_ATS_Session` for which the delay changed - */ + * @param queue the queue to do scheduling for + */ static void -tracker_update_out_cb (void *cls) +schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) { - struct GNUNET_ATS_Session *queue = cls; struct Neighbour *n = queue->neighbour; struct PendingMessage *pm = n->pending_msg_head; struct GNUNET_TIME_Relative out_delay; unsigned int wsize; - if (NULL == pm) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Bandwidth allocation updated for empty transmission queue `%s'\n", - queue->address); - return; /* no message pending, nothing to do here! */ - } + GNUNET_assert (NULL != pm); wsize = (0 == queue->mtu) ? pm->bytes_msg /* FIXME: add overheads? */ : queue->mtu; out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, wsize); - GNUNET_SCHEDULER_cancel (queue->transmit_task); + out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt), + out_delay); + if (0 == out_delay.rel_value_us) + return; /* we should run immediately! */ + /* queue has changed since we were scheduled, reschedule again */ queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue); @@ -1709,6 +1779,69 @@ tracker_update_out_cb (void *cls) } +/** + * We believe we are ready to transmit a message on a queue. Double-checks + * with the queue's "tracker_out" and then gives the message to the + * communicator for transmission (updating the tracker, and re-scheduling + * itself if applicable). + * + * @param cls the `struct GNUNET_ATS_Session` to process transmissions for + */ +static void +transmit_on_queue (void *cls) +{ + struct GNUNET_ATS_Session *queue = cls; + struct Neighbour *n = queue->neighbour; + struct PendingMessage *pm; + + queue->transmit_task = NULL; + if (NULL == (pm = n->pending_msg_head)) + { + /* no message pending, nothing to do here! */ + return; + } + schedule_transmit_on_queue (queue); + if (NULL != queue->transmit_task) + return; /* do it later */ + + // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.) + // FIXME: upon success, do (not here in continuation!) + if (0) + { + client_send_response (pm, + GNUNET_YES, + 0); + } + /* finally, re-schedule self */ + schedule_transmit_on_queue (queue); +} + + +/** + * Bandwidth tracker informs us that the delay until we + * can transmit again changed. + * + * @param cls a `struct GNUNET_ATS_Session` for which the delay changed + */ +static void +tracker_update_out_cb (void *cls) +{ + struct GNUNET_ATS_Session *queue = cls; + struct Neighbour *n = queue->neighbour; + + if (NULL == n->pending_msg_head) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bandwidth allocation updated for empty transmission queue `%s'\n", + queue->address); + return; /* no message pending, nothing to do here! */ + } + GNUNET_SCHEDULER_cancel (queue->transmit_task); + queue->transmit_task = NULL; + schedule_transmit_on_queue (queue); +} + + /** * Bandwidth tracker informs us that excessive outbound bandwidth was * allocated which is not being used. @@ -1768,6 +1901,7 @@ handle_add_queue_message (void *cls, if (NULL == neighbour) { neighbour = GNUNET_new (struct Neighbour); + neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; neighbour->pid = aqm->receiver; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (neighbours, -- cgit v1.2.3