summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-01-21 17:45:07 +0100
committerChristian Grothoff <christian@grothoff.org>2019-01-21 17:45:07 +0100
commit32b38707097f8dc9f7f39c526f67414f24283eca (patch)
treec390919ff3dcc21a0328367afc05d6592278f5da /src/transport/gnunet-service-tng.c
parent5391d3d34f3bf7f40f37f9e6038466002f422bb3 (diff)
handle transmission timeouts
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c192
1 files changed, 163 insertions, 29 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 3673958ec..e205fa3d7 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -440,6 +440,11 @@ struct Neighbour
struct GNUNET_ATS_Session *session_tail;
/**
+ * Task run to cleanup pending messages that have exceeded their timeout.
+ */
+ struct GNUNET_SCHEDULER_Task *timeout_task;
+
+ /**
* Quota at which CORE is allowed to transmit to this peer
* according to ATS.
*
@@ -451,6 +456,11 @@ struct Neighbour
*/
struct GNUNET_BANDWIDTH_Value32NBO quota_out;
+ /**
+ * What is the earliest timeout of any message in @e pending_msg_tail?
+ */
+ struct GNUNET_TIME_Absolute earliest_timeout;
+
};
@@ -490,10 +500,21 @@ struct PendingMessage
struct TransportClient *client;
/**
+ * At what time should we give up on the transmission (and no longer retry)?
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * What is the earliest time for us to retry transmission of this message?
+ */
+ struct GNUNET_TIME_Absolute next_attempt;
+
+ /**
* Size of the original message.
*/
uint32_t bytes_msg;
+ /* Followed by @e bytes_msg to transmit */
};
@@ -592,7 +613,8 @@ struct TransportClient
struct {
/**
- * Head of list of messages pending for this client.
+ * Head of list of messages pending for this client, sorted by
+ * transmission time ("next_attempt" + possibly internal prioritization).
*/
struct PendingMessage *pending_msg_head;
@@ -920,6 +942,8 @@ free_neighbour (struct Neighbour *neighbour)
GNUNET_CONTAINER_multipeermap_remove (neighbours,
&neighbour->pid,
neighbour));
+ if (NULL != neighbour->timeout_task)
+ GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
GNUNET_free (neighbour);
}
@@ -1273,6 +1297,50 @@ client_send_response (struct PendingMessage *pm,
/**
+ * Checks the message queue for a neighbour for messages that have timed
+ * out and purges them.
+ *
+ * @param cls a `struct Neighbour`
+ */
+static void
+check_queue_timeouts (void *cls)
+{
+ struct Neighbour *n = cls;
+ struct PendingMessage *pm;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Absolute earliest_timeout;
+
+ n->timeout_task = NULL;
+ earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
+ now = GNUNET_TIME_absolute_get ();
+ for (struct PendingMessage *pos = n->pending_msg_head;
+ NULL != pos;
+ pos = pm)
+ {
+ pm = pos->next_neighbour;
+ if (pos->timeout.abs_value_us <= now.abs_value_us)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# messages dropped (timeout before confirmation)",
+ 1,
+ GNUNET_NO);
+ client_send_response (pm,
+ GNUNET_NO,
+ 0);
+ continue;
+ }
+ earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
+ pos->timeout);
+ }
+ n->earliest_timeout = earliest_timeout;
+ if (NULL != n->pending_msg_head)
+ n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
+ &check_queue_timeouts,
+ n);
+}
+
+
+/**
* Client asked for transmission to a peer. Process the request.
*
* @param cls the client
@@ -1316,10 +1384,14 @@ handle_client_send (void *cls,
GNUNET_NO);
return;
}
- pm = GNUNET_new (struct PendingMessage);
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
pm->client = tc;
pm->target = target;
pm->bytes_msg = bytes_msg;
+ pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
+ memcpy (&pm[1],
+ &obm[1],
+ bytes_msg);
GNUNET_CONTAINER_MDLL_insert (neighbour,
target->pending_msg_head,
target->pending_msg_tail,
@@ -1328,10 +1400,16 @@ handle_client_send (void *cls,
tc->details.core.pending_msg_head,
tc->details.core.pending_msg_tail,
pm);
- // FIXME: do the work, final continuation with call to:
- client_send_response (pm,
- GNUNET_NO,
- 0);
+ if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
+ {
+ target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
+ if (NULL != target->timeout_task)
+ GNUNET_SCHEDULER_cancel (target->timeout_task);
+ target->timeout_task
+ = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
+ &check_queue_timeouts,
+ target);
+ }
}
@@ -1652,45 +1730,37 @@ tracker_update_in_cb (void *cls)
* @param cls the `struct GNUNET_ATS_Session` to process transmissions for
*/
static void
-transmit_on_queue (void *cls)
-{
- struct GNUNET_ATS_Session *queue = cls;
-
- queue->transmit_task = NULL;
- // FIXME: check if transmission is really ready
- // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.)
- // FIXME: re-schedule self
-}
+transmit_on_queue (void *cls);
/**
- * Bandwidth tracker informs us that the delay until we
- * can transmit again changed.
+ * Schedule next run of #transmit_on_queue(). Does NOTHING if
+ * we should run immediately or if the message queue is empty.
+ * Test for no task being added AND queue not being empty to
+ * transmit immediately afterwards! This function must only
+ * be called if the message queue is non-empty!
*
- * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
- */
+ * @param queue the queue to do scheduling for
+ */
static void
-tracker_update_out_cb (void *cls)
+schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
{
- struct GNUNET_ATS_Session *queue = cls;
struct Neighbour *n = queue->neighbour;
struct PendingMessage *pm = n->pending_msg_head;
struct GNUNET_TIME_Relative out_delay;
unsigned int wsize;
- if (NULL == pm)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Bandwidth allocation updated for empty transmission queue `%s'\n",
- queue->address);
- return; /* no message pending, nothing to do here! */
- }
+ GNUNET_assert (NULL != pm);
wsize = (0 == queue->mtu)
? pm->bytes_msg /* FIXME: add overheads? */
: queue->mtu;
out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
wsize);
- GNUNET_SCHEDULER_cancel (queue->transmit_task);
+ out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
+ out_delay);
+ if (0 == out_delay.rel_value_us)
+ return; /* we should run immediately! */
+ /* queue has changed since we were scheduled, reschedule again */
queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
&transmit_on_queue,
queue);
@@ -1710,6 +1780,69 @@ tracker_update_out_cb (void *cls)
/**
+ * We believe we are ready to transmit a message on a queue. Double-checks
+ * with the queue's "tracker_out" and then gives the message to the
+ * communicator for transmission (updating the tracker, and re-scheduling
+ * itself if applicable).
+ *
+ * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
+ */
+static void
+transmit_on_queue (void *cls)
+{
+ struct GNUNET_ATS_Session *queue = cls;
+ struct Neighbour *n = queue->neighbour;
+ struct PendingMessage *pm;
+
+ queue->transmit_task = NULL;
+ if (NULL == (pm = n->pending_msg_head))
+ {
+ /* no message pending, nothing to do here! */
+ return;
+ }
+ schedule_transmit_on_queue (queue);
+ if (NULL != queue->transmit_task)
+ return; /* do it later */
+
+ // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.)
+ // FIXME: upon success, do (not here in continuation!)
+ if (0)
+ {
+ client_send_response (pm,
+ GNUNET_YES,
+ 0);
+ }
+ /* finally, re-schedule self */
+ schedule_transmit_on_queue (queue);
+}
+
+
+/**
+ * Bandwidth tracker informs us that the delay until we
+ * can transmit again changed.
+ *
+ * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
+ */
+static void
+tracker_update_out_cb (void *cls)
+{
+ struct GNUNET_ATS_Session *queue = cls;
+ struct Neighbour *n = queue->neighbour;
+
+ if (NULL == n->pending_msg_head)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Bandwidth allocation updated for empty transmission queue `%s'\n",
+ queue->address);
+ return; /* no message pending, nothing to do here! */
+ }
+ GNUNET_SCHEDULER_cancel (queue->transmit_task);
+ queue->transmit_task = NULL;
+ schedule_transmit_on_queue (queue);
+}
+
+
+/**
* Bandwidth tracker informs us that excessive outbound bandwidth was
* allocated which is not being used.
*
@@ -1768,6 +1901,7 @@ handle_add_queue_message (void *cls,
if (NULL == neighbour)
{
neighbour = GNUNET_new (struct Neighbour);
+ neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
neighbour->pid = aqm->receiver;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multipeermap_put (neighbours,