From d97224045fe41e824406f771e24c46fb89514942 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 21 Apr 2019 12:13:33 +0200 Subject: handle communicator status, address a few FIXMEs --- src/transport/gnunet-service-tng.c | 197 ++++++++++++++++++++++++++----------- 1 file changed, 139 insertions(+), 58 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 62e9c0d8e..697c43f0d 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -1097,6 +1097,10 @@ struct DistanceVector */ struct Queue; +/** + * Message awaiting transmission. See detailed comments below. + */ +struct PendingMessage; /** * Entry identifying transmission in one of our `struct @@ -1125,6 +1129,11 @@ struct QueueEntry */ struct Queue *queue; + /** + * Pending message this entry is for, or NULL for none. + */ + struct PendingMessage *pm; + /** * Message ID used for this message with the queue used for transmission. */ @@ -1584,6 +1593,16 @@ struct PendingMessage */ struct Neighbour *target; + /** + * Set to non-NULL value if this message is currently being given to a + * communicator and we are awaiting that communicator's acknowledgement. + * Note that we must not retransmit a pending message while we're still + * in the process of giving it to a communicator. If a pending message + * is free'd while this entry is non-NULL, the @e qe reference to us + * should simply be set to NULL. + */ + struct QueueEntry *qe; + /** * Client that issued the transmission request, if @e pmt is #PMT_CORE. */ @@ -2472,9 +2491,12 @@ transmit_on_queue (void *cls); * be called if the message queue is non-empty! * * @param queue the queue to do scheduling for + * @param inside_job set to #GNUNET_YES if called from + * #transmit_on_queue() itself and NOT setting + * the task means running immediately */ static void -schedule_transmit_on_queue (struct Queue *queue) +schedule_transmit_on_queue (struct Queue *queue, int inside_job) { struct Neighbour *n = queue->neighbour; struct PendingMessage *pm = n->pending_msg_head; @@ -2507,7 +2529,7 @@ schedule_transmit_on_queue (struct Queue *queue) out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining ( pm->next_attempt), out_delay); - if (0 == out_delay.rel_value_us) + if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us)) return; /* we should run immediately! */ /* queue has changed since we were scheduled, reschedule again */ queue->transmit_task = @@ -2575,6 +2597,11 @@ free_queue (struct Queue *queue) GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe); queue->queue_length--; tc->details.communicator.total_queue_length--; + if (NULL != qe->pm) + { + GNUNET_assert (qe == qe->pm->qe); + qe->pm->qe = NULL; + } GNUNET_free (qe); } GNUNET_assert (0 == queue->queue_length); @@ -2589,7 +2616,7 @@ free_queue (struct Queue *queue) GNUNET_NO); for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; s = s->next_client) - schedule_transmit_on_queue (s); + schedule_transmit_on_queue (s, GNUNET_NO); } notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); @@ -2859,6 +2886,11 @@ free_pending_message (struct PendingMessage *pm) target->pending_msg_tail, pm); free_fragment_tree (pm); + if (NULL != pm->qe) + { + GNUNET_assert (pm == pm->qe->pm); + pm->qe->pm = NULL; + } GNUNET_free_non_null (pm->bpm); GNUNET_free (pm); } @@ -3245,8 +3277,12 @@ queue_send_msg (struct Queue *queue, qe = GNUNET_new (struct QueueEntry); qe->mid = queue->mid_gen++; qe->queue = queue; - // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'! - // (also, note that pm may be NULL!) + if (NULL != pm) + { + qe->pm = pm; + GNUNET_assert (NULL == pm->qe); + pm->qe = qe; + } GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe); GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); queue->queue_length++; @@ -6037,6 +6073,60 @@ reliability_box_message (struct PendingMessage *pm) } +/** + * Change the value of the `next_attempt` field of @a pm + * to @a next_attempt and re-order @a pm in the transmission + * list as required by the new timestmap. + * + * @param pm a pending message to update + * @param next_attempt timestamp to use + */ +static void +update_pm_next_attempt (struct PendingMessage *pm, + struct GNUNET_TIME_Absolute next_attempt) +{ + struct Neighbour *neighbour = pm->target; + + pm->next_attempt = next_attempt; + if (NULL == pm->frag_parent) + { + struct PendingMessage *pos; + + /* re-insert sort in neighbour list */ + GNUNET_CONTAINER_MDLL_remove (neighbour, + neighbour->pending_msg_head, + neighbour->pending_msg_tail, + pm); + pos = neighbour->pending_msg_tail; + while ((NULL != pos) && + (next_attempt.abs_value_us > pos->next_attempt.abs_value_us)) + pos = pos->prev_neighbour; + GNUNET_CONTAINER_MDLL_insert_after (neighbour, + neighbour->pending_msg_head, + neighbour->pending_msg_tail, + pos, + pm); + } + else + { + /* re-insert sort in fragment list */ + struct PendingMessage *fp = pm->frag_parent; + struct PendingMessage *pos; + + GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, pm); + pos = fp->tail_frag; + while ((NULL != pos) && + (next_attempt.abs_value_us > pos->next_attempt.abs_value_us)) + pos = pos->prev_frag; + GNUNET_CONTAINER_MDLL_insert_after (frag, + fp->head_frag, + fp->tail_frag, + pos, + pm); + } +} + + /** * We believe we are ready to transmit a message on a queue. Double-checks * with the queue's "tracker_out" and then gives the message to the @@ -6060,7 +6150,13 @@ transmit_on_queue (void *cls) /* no message pending, nothing to do here! */ return; } - schedule_transmit_on_queue (queue); + if (NULL != pm->qe) + { + /* message still pending with communciator! + LOGGING-FIXME: Use stats? logging? Should this not be rare? */ + return; + } + schedule_transmit_on_queue (queue, GNUNET_YES); if (NULL != queue->transmit_task) return; /* do it later */ overhead = 0; @@ -6081,7 +6177,7 @@ transmit_on_queue (void *cls) if (NULL == s) { /* Fragmentation failed, try next message... */ - schedule_transmit_on_queue (queue); + schedule_transmit_on_queue (queue, GNUNET_NO); return; } if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) @@ -6089,7 +6185,7 @@ transmit_on_queue (void *cls) if (NULL == s) { /* Reliability boxing failed, try next message... */ - schedule_transmit_on_queue (queue); + schedule_transmit_on_queue (queue, GNUNET_NO); return; } @@ -6141,57 +6237,21 @@ transmit_on_queue (void *cls) } else { - /* message not finished, waiting for acknowledgement */ - struct Neighbour *neighbour = pm->target; - /* Update time by which we might retransmit 's' based on queue + /* Message not finished, waiting for acknowledgement. + Update time by which we might retransmit 's' based on queue characteristics (i.e. RTT); it takes one RTT for the message to arrive and the ACK to come back in the best case; but the other side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before retransmitting. Note that in the future this heuristic should likely be improved further (measure RTT stability, consider message urgency and size when delaying ACKs, etc.) */ - s->next_attempt = GNUNET_TIME_relative_to_absolute ( - GNUNET_TIME_relative_multiply (queue->rtt, 4)); - if (s == pm) - { - struct PendingMessage *pos; - - /* re-insert sort in neighbour list */ - GNUNET_CONTAINER_MDLL_remove (neighbour, - neighbour->pending_msg_head, - neighbour->pending_msg_tail, - pm); - pos = neighbour->pending_msg_tail; - while ((NULL != pos) && - (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us)) - pos = pos->prev_neighbour; - GNUNET_CONTAINER_MDLL_insert_after (neighbour, - neighbour->pending_msg_head, - neighbour->pending_msg_tail, - pos, - pm); - } - else - { - /* re-insert sort in fragment list */ - struct PendingMessage *fp = s->frag_parent; - struct PendingMessage *pos; - - GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, s); - pos = fp->tail_frag; - while ((NULL != pos) && - (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us)) - pos = pos->prev_frag; - GNUNET_CONTAINER_MDLL_insert_after (frag, - fp->head_frag, - fp->tail_frag, - pos, - s); - } + update_pm_next_attempt (s, + GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_relative_multiply (queue->rtt, 4))); } /* finally, re-schedule queue transmission task itself */ - schedule_transmit_on_queue (queue); + schedule_transmit_on_queue (queue, GNUNET_NO); } @@ -6216,7 +6276,7 @@ tracker_update_out_cb (void *cls) } GNUNET_SCHEDULER_cancel (queue->transmit_task); queue->transmit_task = NULL; - schedule_transmit_on_queue (queue); + schedule_transmit_on_queue (queue, GNUNET_NO); } @@ -6309,6 +6369,7 @@ handle_send_message_ack (void *cls, { struct TransportClient *tc = cls; struct QueueEntry *qe; + struct PendingMessage *pm; if (CT_COMMUNICATOR != tc->type) { @@ -6352,7 +6413,8 @@ handle_send_message_ack (void *cls, if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length) { - /* Communicator dropped below threshold, resume all queues */ + /* Communicator dropped below threshold, resume all queues + incident with this client! */ GNUNET_STATISTICS_update ( GST_stats, "# Transmission throttled due to communicator queue limit", @@ -6361,7 +6423,7 @@ handle_send_message_ack (void *cls, for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue; queue = queue->next_client) - schedule_transmit_on_queue (queue); + schedule_transmit_on_queue (queue, GNUNET_NO); } else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) { @@ -6370,14 +6432,33 @@ handle_send_message_ack (void *cls, "# Transmission throttled due to queue queue limit", -1, GNUNET_NO); - schedule_transmit_on_queue (qe->queue); + schedule_transmit_on_queue (qe->queue, GNUNET_NO); } - /* TODO: we also should react on the status! */ - // FIXME: this probably requires queue->pm = s assignment! - // FIXME: react to communicator status about transmission request. We got: - sma->status; // OK success, SYSERR failure + if (NULL != (pm = qe->pm)) + { + struct Neighbour *n; + GNUNET_assert (qe == pm->qe); + pm->qe = NULL; + /* If waiting for this communicator may have blocked transmission + of pm on other queues for this neighbour, force schedule + transmit on queue for queues of the neighbour */ + n = pm->target; + if (n->pending_msg_head == pm) + { + for (struct Queue *queue = n->queue_head; NULL != queue; + queue = queue->next_neighbour) + schedule_transmit_on_queue (queue, GNUNET_NO); + } + if (GNUNET_OK != ntohl (sma->status)) + { + GNUNET_log ( + GNUNET_ERROR_TYPE_INFO, + "Queue failed in transmission, will try retransmission immediately\n"); + update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS); + } + } GNUNET_free (qe); } -- cgit v1.2.3