summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-05-11 22:13:47 +0200
committerChristian Grothoff <christian@grothoff.org>2019-05-11 22:13:47 +0200
commit84b3c87161116786074b16f54f2d22e526421db0 (patch)
treee29ac20eaf0b84c357a6ed99134b05f54804f187
parentdc4902a536daa8e46dbbd79a9fa98498b5e848b3 (diff)
clean up transmission logic to have queues 'pull' for pending messages while control traffic is 'pushed' into queues
-rw-r--r--src/transport/gnunet-service-tng.c702
1 files changed, 445 insertions, 257 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index f07e1c88d..56cf61c2b 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -24,15 +24,8 @@
*
* TODO:
* Implement next:
- * - realize "pull" based logic (#handle_client_send()) for
- * `struct PendingMessage` which waits for a queue on any
- * applicable route to be 'ready', in contrast
- * to the 'push' based routing we use for control messages.
- * Basically, when a queue goes idle, it should "search"
- * via its neighbour for either virtual links or DVH's that
- * have it as first hop and then find messages in those
- * virtual links!
- * - realize transport-to-transport flow control (needed in case
+ * - FIXME-NEXT: logic to decide which pm to pick for a given queue (sorting!)
+ * - FIXME-FC: realize transport-to-transport flow control (needed in case
* communicators do not offer flow control). Note that we may not
* want to simply delay the ACKs as that may cause unnecessary
* re-transmissions. => Introduce proper flow and congestion window(s)!
@@ -1375,7 +1368,7 @@ struct DistanceVector
* Do we have a confirmed working queue and are thus visible to
* CORE? If so, this is the virtual link, otherwise NULL.
*/
- struct VirtualLink *link;
+ struct VirtualLink *vl;
/**
* Signature affirming @e ephemeral_key of type
@@ -1565,6 +1558,12 @@ struct Queue
* Connection status for this queue.
*/
enum GNUNET_TRANSPORT_ConnectionStatus cs;
+
+ /**
+ * Set to #GNUNET_YES if this queue is idle waiting for some
+ * virtual link to give it a pending message.
+ */
+ int idle;
};
@@ -1696,7 +1695,7 @@ struct Neighbour
* Do we have a confirmed working queue and are thus visible to
* CORE? If so, this is the virtual link, otherwise NULL.
*/
- struct VirtualLink *link;
+ struct VirtualLink *vl;
/**
* Latest DVLearn monotonic time seen from this peer. Initialized only
@@ -1766,17 +1765,7 @@ enum PendingMessageType
/**
* Reliability box.
*/
- PMT_RELIABILITY_BOX = 2,
-
- /**
- * Any type of acknowledgement.
- */
- PMT_ACKNOWLEDGEMENT = 3,
-
- /**
- * Control traffic generated by the TRANSPORT service itself.
- */
- PMT_CONTROL = 4
+ PMT_RELIABILITY_BOX = 2
};
@@ -2752,6 +2741,41 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
/**
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
+ *
+ * @param cls a `struct VirtualLink`
+ */
+static void
+check_link_down (void *cls);
+
+
+/**
+ * Send message to CORE clients that we lost a connection.
+ *
+ * @param pid peer the connection was for
+ */
+static void
+cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Informing CORE clients about disconnect from %s\n",
+ GNUNET_i2s (pid));
+ for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
+ {
+ struct GNUNET_MQ_Envelope *env;
+ struct DisconnectInfoMessage *dim;
+
+ if (CT_CORE != tc->type)
+ continue;
+ env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
+ dim->peer = *pid;
+ GNUNET_MQ_send (tc->mq, env);
+ }
+}
+
+
+/**
* Free entry in #dv_routes. First frees all hops to the target, and
* if there are no entries left, frees @a dv as well.
*
@@ -2766,11 +2790,33 @@ free_dv_route (struct DistanceVector *dv)
free_distance_vector_hop (dvh);
if (NULL == dv->dv_head)
{
+ struct VirtualLink *vl;
+
GNUNET_assert (
GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
+ if (NULL != (vl = dv->vl))
+ {
+ GNUNET_assert (dv == vl->dv);
+ vl->dv = NULL;
+ if (NULL == vl->n)
+ {
+ cores_send_disconnect_info (&dv->target);
+ free_virtual_link (vl);
+ }
+ else
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+ }
+ dv->vl = NULL;
+ }
+
if (NULL != dv->timeout_task)
+ {
GNUNET_SCHEDULER_cancel (dv->timeout_task);
+ dv->timeout_task = NULL;
+ }
GNUNET_free (dv);
}
}
@@ -2950,6 +2996,7 @@ static void
free_neighbour (struct Neighbour *neighbour)
{
struct DistanceVectorHop *dvh;
+ struct VirtualLink *vl;
GNUNET_assert (NULL == neighbour->queue_head);
GNUNET_assert (GNUNET_YES ==
@@ -2989,6 +3036,22 @@ free_neighbour (struct Neighbour *neighbour)
GNUNET_PEERSTORE_store_cancel (neighbour->sc);
neighbour->sc = NULL;
}
+ if (NULL != (vl = neighbour->vl))
+ {
+ GNUNET_assert (neighbour == vl->n);
+ vl->n = NULL;
+ if (NULL == vl->dv)
+ {
+ cores_send_disconnect_info (&vl->target);
+ free_virtual_link (vl);
+ }
+ else
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl);
+ }
+ neighbour->vl = NULL;
+ }
GNUNET_free (neighbour);
}
@@ -3034,31 +3097,6 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
/**
- * Send message to CORE clients that we lost a connection.
- *
- * @param pid peer the connection was for
- */
-static void
-cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
-{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Informing CORE clients about disconnect from %s\n",
- GNUNET_i2s (pid));
- for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
- {
- struct GNUNET_MQ_Envelope *env;
- struct DisconnectInfoMessage *dim;
-
- if (CT_CORE != tc->type)
- continue;
- env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
- dim->peer = *pid;
- GNUNET_MQ_send (tc->mq, env);
- }
-}
-
-
-/**
* We believe we are ready to transmit a message on a queue. Gives the
* message to the communicator for transmission (updating the tracker,
* and re-scheduling itself if applicable).
@@ -3070,19 +3108,15 @@ transmit_on_queue (void *cls);
/**
- * Schedule next run of #transmit_on_queue(). Does NOTHING if
- * we should run immediately or if the message queue is empty.
- * Test for no task being added AND queue not being empty to
- * transmit immediately afterwards! This function must only
- * be called if the message queue is non-empty!
+ * Called whenever something changed that might effect when we
+ * try to do the next transmission on @a queue using #transmit_on_queue().
*
* @param queue the queue to do scheduling for
- * @param inside_job set to #GNUNET_YES if called from
- * #transmit_on_queue() itself and NOT setting
- * the task means running immediately
+ * @param p task priority to use, if @a queue is scheduled
*/
static void
-schedule_transmit_on_queue (struct Queue *queue, int inside_job)
+schedule_transmit_on_queue (struct Queue *queue,
+ enum GNUNET_SCHEDULER_Priority p)
{
if (queue->tc->details.communicator.total_queue_length >=
COMMUNICATOR_TOTAL_QUEUE_LIMIT)
@@ -3092,6 +3126,7 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
"# Transmission throttled due to communicator queue limit",
1,
GNUNET_NO);
+ queue->idle = GNUNET_NO;
return;
}
if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
@@ -3100,38 +3135,18 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
"# Transmission throttled due to queue queue limit",
1,
GNUNET_NO);
+ queue->idle = GNUNET_NO;
return;
}
-#if FIXME - NEXT
- struct Neighbour *n = queue->neighbour;
- struct GNUNET_TIME_Relative out_delay;
-
- if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
- {
- GNUNET_log (
- GNUNET_ERROR_TYPE_DEBUG,
- "Schedule transmission <%llu> on queue %llu of %s decides to run immediately\n",
- pm->logging_uuid,
- (unsigned long long) queue->qid,
- GNUNET_i2s (&n->pid));
- return; /* we should run immediately! */
- }
- /* queue has changed since we were scheduled, reschedule again */
+ /* queue might indeed be ready, schedule it */
+ if (NULL != queue->transmit_task)
+ GNUNET_SCHEDULER_cancel (queue->transmit_task);
queue->transmit_task =
- GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue);
- if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Next transmission <%llu> on queue `%s' in %s (high delay)\n",
- pm->logging_uuid,
- queue->address,
- GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Next transmission <%llu> on queue `%s' in %s\n",
- pm->logging_uuid,
- queue->address,
- GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES));
-#endif
+ GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Considering transmission on queue `%s' to %s\n",
+ queue->address,
+ GNUNET_i2s (&queue->neighbour->pid));
}
@@ -3156,15 +3171,21 @@ check_link_down (void *cls)
pos = pos->next_dv)
dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ {
+ vl->dv->vl = NULL;
vl->dv = NULL;
+ }
q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
- if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
+ {
+ vl->n->vl = NULL;
vl->n = NULL;
+ }
if ((NULL == vl->n) && (NULL == vl->dv))
{
- cores_send_disconnect_info (&dv->target);
+ cores_send_disconnect_info (&vl->target);
free_virtual_link (vl);
return;
}
@@ -3229,7 +3250,7 @@ free_queue (struct Queue *queue)
if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT <
tc->details.communicator.total_queue_length))
{
- /* Communicator dropped below threshold, resume all queues */
+ /* Communicator dropped below threshold, resume all _other_ queues */
GNUNET_STATISTICS_update (
GST_stats,
"# Transmission throttled due to communicator queue limit",
@@ -3237,7 +3258,7 @@ free_queue (struct Queue *queue)
GNUNET_NO);
for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
s = s->next_client)
- schedule_transmit_on_queue (s, GNUNET_NO);
+ schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
GNUNET_free (queue);
@@ -3580,6 +3601,79 @@ pick_random_dv_hops (const struct DistanceVector *dv,
/**
+ * There is a message at the head of the pending messages for @a vl
+ * which may be ready for transmission. Check if a queue is ready to
+ * take it.
+ *
+ * This function must (1) check for flow control to ensure that we can
+ * right now send to @a vl, (2) check that the pending message in the
+ * queue is actually eligible, (3) determine if any applicable queue
+ * (direct neighbour or DVH path) is ready to accept messages, and
+ * (4) prioritize based on the preferences associated with the
+ * pending message.
+ *
+ * So yeah, easy.
+ *
+ * @param vl virtual link where we should check for transmission
+ */
+static void
+check_vl_transmission (struct VirtualLink *vl)
+{
+ struct Neighbour *n = vl->n;
+ struct DistanceVector *dv = vl->dv;
+ struct GNUNET_TIME_Absolute now;
+ int elig;
+
+ /* FIXME-FC: need to implement virtual link flow control! */
+
+ /* Check that we have an eligible pending message!
+ (cheaper than having #transmit_on_queue() find out!) */
+ elig = GNUNET_NO;
+ for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
+ pm = pm->next_vl)
+ {
+ if (NULL != pm->qe)
+ continue; /* not eligible, is in a queue! */
+ elig = GNUNET_YES;
+ break;
+ }
+ if (GNUNET_NO == elig)
+ return;
+
+ /* Notify queues at direct neighbours that we are interested */
+ now = GNUNET_TIME_absolute_get ();
+ if (NULL != n)
+ {
+ for (struct Queue *queue = n->queue_head; NULL != queue;
+ queue = queue->next_neighbour)
+ if ((GNUNET_YES == queue->idle) &&
+ (queue->validated_until.abs_value_us > now.abs_value_us))
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ }
+ /* Notify queues via DV that we are interested */
+ if (NULL != dv)
+ {
+ /* Do DV with lower scheduler priority, which effectively means that
+ IF a neighbour exists and is available, we prefer it. */
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ {
+ struct Neighbour *nh = pos->next_hop;
+
+ if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
+ continue; /* skip this one: path not validated */
+ for (struct Queue *queue = nh->queue_head; NULL != queue;
+ queue = queue->next_neighbour)
+ if ((GNUNET_YES == queue->idle) &&
+ (queue->validated_until.abs_value_us > now.abs_value_us))
+ schedule_transmit_on_queue (queue,
+ GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+ }
+ }
+}
+
+
+/**
* Client asked for transmission to a peer. Process the request.
*
* @param cls the client
@@ -3594,7 +3688,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
uint32_t bytes_msg;
struct VirtualLink *vl;
enum GNUNET_MQ_PriorityPreferences pp;
- int was_empty;
GNUNET_assert (CT_CORE == tc->type);
obmm = (const struct GNUNET_MessageHeader *) &obm[1];
@@ -3631,32 +3724,11 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
tc->details.core.pending_msg_head,
tc->details.core.pending_msg_tail,
pm);
- was_empty = (NULL == vl->pending_msg_head);
GNUNET_CONTAINER_MDLL_insert (vl,
vl->pending_msg_head,
vl->pending_msg_tail,
pm);
- if (! was_empty)
- return; /* all queues must already be busy */
-#if 0
- // FIXME: check if any DVH or neighbour queue of 'vl'
- // is ready for transmission now. If so, encapsulate
- // 'pm' accordingly and send!
- for (struct Queue *queue = target->queue_head; NULL != queue;
- queue = queue->next_neighbour)
- {
- /* try transmission on any queue that is idle */
- if (NULL == queue->transmit_task)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queue %llu to %s is idle, triggering transmission\n",
- (unsigned long long) queue->qid,
- GNUNET_i2s (&queue->neighbour->pid));
- queue->transmit_task =
- GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue);
- }
- }
-#endif
+ check_vl_transmission (vl);
}
@@ -3861,7 +3933,7 @@ update_ephemeral (struct DistanceVector *dv)
/**
- * Send the control message @a payload on @a queue.
+ * Send the message @a payload on @a queue.
*
* @param queue the queue to use for transmission
* @param pm pending message to update once transmission is done, may be NULL!
@@ -3879,6 +3951,7 @@ queue_send_msg (struct Queue *queue,
struct GNUNET_TRANSPORT_SendMessageTo *smt;
struct GNUNET_MQ_Envelope *env;
+ queue->idle = GNUNET_NO;
GNUNET_log (
GNUNET_ERROR_TYPE_DEBUG,
"Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
@@ -3910,6 +3983,11 @@ queue_send_msg (struct Queue *queue,
GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
queue->queue_length++;
queue->tc->details.communicator.total_queue_length++;
+ if (COMMUNICATOR_TOTAL_QUEUE_LIMIT ==
+ queue->tc->details.communicator.total_queue_length)
+ queue->idle = GNUNET_NO;
+ if (QUEUE_LENGTH_LIMIT == queue->queue_length)
+ queue->idle = GNUNET_NO;
GNUNET_MQ_send (queue->tc->mq, env);
}
}
@@ -5210,6 +5288,50 @@ update_dvh_performance (struct DistanceVectorHop *dvh,
/**
+ * We have completed transmission of @a pm, remove it from
+ * the transmission queues (and if it is a fragment, continue
+ * up the tree as necessary).
+ *
+ * @param pm pending message that was transmitted
+ */
+static void
+completed_pending_message (struct PendingMessage *pm)
+{
+ struct PendingMessage *pos;
+
+ switch (pm->pmt)
+ {
+ case PMT_CORE:
+ case PMT_RELIABILITY_BOX:
+ /* Full message sent, we are done */
+ client_send_response (pm);
+ return;
+ case PMT_FRAGMENT_BOX:
+ /* Fragment sent over reliabile channel */
+ free_fragment_tree (pm);
+ pos = pm->frag_parent;
+ GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+ GNUNET_free (pm);
+ /* check if subtree is done */
+ while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
+ (pos != pm))
+ {
+ pm = pos;
+ pos = pm->frag_parent;
+ GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
+ GNUNET_free (pm);
+ }
+
+ /* Was this the last applicable fragmment? */
+ if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
+ (pos->frag_off == pos->bytes_msg))
+ client_send_response (pos);
+ return;
+ }
+}
+
+
+/**
* The @a pa was acknowledged, process the acknowledgement.
*
* @param pa the pending acknowledgement that was satisfied
@@ -5220,7 +5342,6 @@ static void
handle_acknowledged (struct PendingAcknowledgement *pa,
struct GNUNET_TIME_Relative ack_delay)
{
- struct PendingMessage *pm = pa->pm;
struct GNUNET_TIME_Relative delay;
delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
@@ -5232,25 +5353,8 @@ handle_acknowledged (struct PendingAcknowledgement *pa,
update_queue_performance (pa->queue, delay, pa->message_size);
if (NULL != pa->dvh)
update_dvh_performance (pa->dvh, delay, pa->message_size);
- if (NULL != pm)
- {
- if (NULL != pm->frag_parent)
- {
- pm = pm->frag_parent;
- free_fragment_tree (pa->pm);
- }
- while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
- {
- struct PendingMessage *parent = pm->frag_parent;
-
- free_fragment_tree (pm);
- pm = parent;
- }
- if (NULL != pm->head_frag)
- pm = NULL; /* we are done, otherwise free 'pm' below */
- }
- if (NULL != pm)
- free_pending_message (pm);
+ if (NULL != pa->pm)
+ completed_pending_message (pa->pm);
free_pending_acknowledgement (pa);
}
@@ -5494,6 +5598,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
vl->target = dv->target;
vl->dv = dv;
+ dv->vl = vl;
vl->core_recv_window = RECV_WINDOW_SIZE;
vl->visibility_task =
GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
@@ -7185,6 +7290,7 @@ handle_validation_response (
vl = GNUNET_new (struct VirtualLink);
vl->target = n->pid;
vl->n = n;
+ n->vl = vl;
vl->core_recv_window = RECV_WINDOW_SIZE;
vl->visibility_task =
GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
@@ -7604,10 +7710,134 @@ update_pm_next_attempt (struct PendingMessage *pm,
/**
- * We believe we are ready to transmit a message on a queue.
- * Gives the message to the
- * communicator for transmission (updating the tracker, and re-scheduling
- * itself if applicable).
+ * Context for #select_best_pending_from_link().
+ */
+struct PendingMessageScoreContext
+{
+ /**
+ * Set to the best message that was found, NULL for none.
+ */
+ struct PendingMessage *best;
+
+ /**
+ * DVH that @e best should take, or NULL for direct transmission.
+ */
+ struct DistanceVectorHop *dvh;
+
+ /**
+ * What is the estimated total overhead for this message?
+ */
+ size_t real_overhead;
+
+ /**
+ * Number of pending messages we seriously considered this time.
+ */
+ unsigned int consideration_counter;
+
+ /**
+ * Did we have to fragment?
+ */
+ int frag;
+
+ /**
+ * Did we have to reliability box?
+ */
+ int relb;
+};
+
+
+/**
+ * Select the best pending message from @a vl for transmission
+ * via @a queue.
+ *
+ * @param sc[in,out] best message so far (NULL for none), plus scoring data
+ * @param queue the queue that will be used for transmission
+ * @param vl the virtual link providing the messages
+ * @param dvh path we are currently considering, or NULL for none
+ * @param overhead number of bytes of overhead to be expected
+ * from DV encapsulation (0 for without DV)
+ */
+static void
+select_best_pending_from_link (struct PendingMessageScoreContext *sc,
+ struct Queue *queue,
+ struct VirtualLink *vl,
+ struct DistanceVectorHop *dvh,
+ size_t overhead)
+{
+ /* FIXME-NEXT: right now we ignore all the 'fancy' sorting
+ we do on the pending message list, resulting in a
+ linear time algorithm (PLUS linear time list management).
+ So we should probably either avoid keeping a sorted list,
+ or find a way to make the sorting useful here! */
+ for (struct PendingMessage *pos = vl->pending_msg_head; NULL != pos;
+ pos = pos->next_vl)
+ {
+ size_t real_overhead = overhead;
+ int frag;
+ int relb;
+
+ if (NULL != pos->qe)
+ continue; /* not eligible */
+ sc->consideration_counter++;
+ /* determine if we have to reliability-box, if so add reliability box
+ overhead */
+ relb = GNUNET_NO;
+ if ((GNUNET_NO == frag) &&
+ (0 == (pos->prefs & GNUNET_MQ_PREF_UNRELIABLE)) &&
+ (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc))
+ {
+ relb = GNUNET_YES;
+ real_overhead += sizeof (struct TransportReliabilityBoxMessage);
+ }
+ /* determine if we have to fragment, if so add fragmentation
+ overhead! */
+ frag = GNUNET_NO;
+ if ( ( (0 != queue->mtu) &&
+ (pos->bytes_msg + real_overhead > queue->mtu) ) ||
+ (pos->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
+ (NULL != pos->head_frag /* fragments already exist, should
+ respect that even if MTU is 0 for
+ this queue */) )
+ {
+ frag = GNUNET_YES;
+ relb = GNUNET_NO; /* if we fragment, we never also reliability box */
+ if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
+ {
+ /* FIXME-OPTIMIZE: we could use an optimized, shorter fragmentation
+ header without the ACK UUID when using a *reliable* channel! */
+ }
+ real_overhead = overhead + sizeof (struct TransportFragmentBoxMessage);
+ }
+
+ /* Finally, compare to existing 'best' in sc to see if this 'pos' pending
+ message would beat it! */
+ if (NULL != sc->best)
+ {
+ /* FIXME-NEXT: CHECK if pos fits queue BETTER than pm, if not:
+ continue; */
+ /* NOTE: use 'overhead' to estimate need for fragmentation,
+ prefer it if MTU is sufficient and close! */
+ }
+ sc->best = pos;
+ sc->dvh = dvh;
+ sc->frag = frag;
+ sc->relb = relb;
+ }
+}
+
+
+/**
+ * We believe we are ready to transmit a `struct PendingMessage` on a
+ * queue, the big question is which one! We need to see if there is
+ * one pending that is allowed by flow control and congestion control
+ * and (ideally) matches our queue's performance profile.
+ *
+ * If such a message is found, we give the message to the communicator
+ * for transmission (updating the tracker, and re-scheduling ourselves
+ * if applicable).
+ *
+ * If no such message is found, the queue's `idle` field must be set
+ * to #GNUNET_YES.
*
* @param cls the `struct Queue` to process transmissions for
*/
@@ -7615,128 +7845,99 @@ static void
transmit_on_queue (void *cls)
{
struct Queue *queue = cls;
-
- queue->transmit_task = NULL;
-#if FIXME - NEXT
struct Neighbour *n = queue->neighbour;
+ struct PendingMessageScoreContext sc;
struct PendingMessage *pm;
- struct PendingMessage *s;
- uint32_t overhead;
- if (NULL == (pm = n->pending_msg_head))
+ queue->transmit_task = NULL;
+ if (NULL == n->vl)
{
- /* no message pending, nothing to do here! */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No messages waiting on queue %s to %s, going to sleep\n",
- queue->address,
- GNUNET_i2s (&n->pid));
+ "Virtual link `%s' is down, cannot have PM for queue `%s'\n",
+ GNUNET_i2s (&n->pid),
+ queue->address);
+ queue->idle = GNUNET_YES;
return;
}
- if (NULL != pm->qe)
+ memset (&sc, 0, sizeof (sc));
+ select_best_pending_from_link (&sc, queue, n->vl, NULL, 0);
+ if (NULL == sc.best)
+ {
+ /* Also look at DVH that have the n as first hop! */
+ for (struct DistanceVectorHop *dvh = n->dv_head; NULL != dvh;
+ dvh = dvh->next_neighbour)
+ {
+ select_best_pending_from_link (&sc,
+ queue,
+ dvh->dv->vl,
+ dvh,
+ sizeof (struct GNUNET_PeerIdentity) *
+ (1 + dvh->distance) +
+ sizeof (struct TransportDVBoxMessage) +
+ sizeof (struct TransportDVBoxPayloadP));
+ }
+ }
+ if (NULL == sc.best)
{
- /* message still pending with communciator!
- LOGGING-FIXME: Use stats? Should this not be rare? */
+ /* no message pending, nothing to do here! */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Waiting on communicator for queue %s to %s, going to sleep\n",
+ "No pending messages, queue `%s' to %s now idle\n",
queue->address,
GNUNET_i2s (&n->pid));
+ queue->idle = GNUNET_YES;
return;
}
- schedule_transmit_on_queue (queue, GNUNET_YES);
- if (NULL != queue->transmit_task)
+
+ /* Given selection in `sc`, do transmission */
+ pm = sc.best;
+ if (GNUNET_YES == sc.frag)
{
- GNUNET_log (
- GNUNET_ERROR_TYPE_DEBUG,
- "Scheduled transmission on queue %s to %s for later, going to sleep\n",
- queue->address,
- GNUNET_i2s (&n->pid));
- return; /* do it later */
- }
- overhead = 0;
- if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
- overhead += sizeof (struct TransportReliabilityBoxMessage);
- s = pm;
- if ( ( (0 != queue->mtu) &&
- (pm->bytes_msg + overhead > queue->mtu) ) ||
- (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
- (NULL != pm->head_frag /* fragments already exist, should
- respect that even if MTU is 0 for
- this queue */) )
- s = fragment_message (queue, pm->dvh, s);
- if (NULL == s)
- {
- /* Fragmentation failed, try next message... */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
- queue->address,
- GNUNET_i2s (&n->pid),
- pm->logging_uuid);
- schedule_transmit_on_queue (queue, GNUNET_NO);
- return;
+ pm = fragment_message (queue, sc.dvh, sc.best);
+ if (NULL == pm)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Fragmentation failed queue %s to %s for <%llu>, trying again\n",
+ queue->address,
+ GNUNET_i2s (&n->pid),
+ pm->logging_uuid);
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ }
}
- if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
- // FIXME-OPTIMIZE: and if reliability was requested for 's' by core!
- s = reliability_box_message (queue, pm->dvh, s);
- if (NULL == s)
+ else if (GNUNET_YES == sc.relb)
{
- /* Reliability boxing failed, try next message... */
- GNUNET_log (
- GNUNET_ERROR_TYPE_DEBUG,
- "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
- queue->address,
- GNUNET_i2s (&n->pid),
- pm->logging_uuid);
- schedule_transmit_on_queue (queue, GNUNET_NO);
- return;
+ pm = reliability_box_message (queue, sc.dvh, sc.best);
+ if (NULL == pm)
+ {
+ /* Reliability boxing failed, try next message... */
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_DEBUG,
+ "Reliability boxing failed queue %s to %s for <%llu>, trying again\n",
+ queue->address,
+ GNUNET_i2s (&n->pid),
+ pm->logging_uuid);
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ return;
+ }
}
+ else
+ pm = sc.best; /* no boxing required */
- /* Pass 's' for transission to the communicator */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Passing message <%llu> to queue %s for peer %s\n",
- s->logging_uuid,
- queue->address,
- GNUNET_i2s (&n->pid));
- queue_send_msg (queue, s, &s[1], s->bytes_msg);
- // FIXME: do something similar to the logic below
- // in defragmentation / reliability ACK handling!
+ /* Pass 'pm' for transission to the communicator */
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_DEBUG,
+ "Passing message <%llu> to queue %s for peer %s (considered %u others)\n",
+ pm->logging_uuid,
+ queue->address,
+ GNUNET_i2s (&n->pid),
+ sc.consideration_counter);
+ queue_send_msg (queue, pm, &pm[1], pm->bytes_msg);
/* Check if this transmission somehow conclusively finished handing 'pm'
even without any explicit ACKs */
- if ((PMT_CORE == s->pmt) &&
+ if ((PMT_CORE == pm->pmt) ||
(GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
{
- /* Full message sent, and over reliabile channel */
- client_send_response (pm);
- }
- else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
- queue->tc->details.communicator.cc) &&
- (PMT_FRAGMENT_BOX == s->pmt))
- {
- struct PendingMessage *pos;
-
- /* Fragment sent over reliabile channel */
- free_fragment_tree (s);
- pos = s->frag_parent;
- GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
- GNUNET_free (s);
- /* check if subtree is done */
- while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
- (pos != pm))
- {
- s = pos;
- pos = s->frag_parent;
- GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s);
- GNUNET_free (s);
- }
-
- /* Was this the last applicable fragmment? */
- if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
- client_send_response (pm);
- }
- else if (PMT_CORE != pm->pmt)
- {
- /* This was an acknowledgement of some type, always free */
- free_pending_message (pm);
+ completed_pending_message (pm);
}
else
{
@@ -7748,15 +7949,13 @@ transmit_on_queue (void *cls)
retransmitting. Note that in the future this heuristic should
likely be improved further (measure RTT stability, consider
message urgency and size when delaying ACKs, etc.) */
- update_pm_next_attempt (s,
+ update_pm_next_attempt (pm,
GNUNET_TIME_relative_to_absolute (
GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
4)));
}
-
/* finally, re-schedule queue transmission task itself */
- schedule_transmit_on_queue (queue, GNUNET_NO);
-#endif
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
@@ -7871,7 +8070,7 @@ handle_send_message_ack (void *cls,
for (struct Queue *queue = tc->details.communicator.queue_head;
NULL != queue;
queue = queue->next_client)
- schedule_transmit_on_queue (queue, GNUNET_NO);
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
{
@@ -7880,7 +8079,7 @@ handle_send_message_ack (void *cls,
"# Transmission throttled due to queue queue limit",
-1,
GNUNET_NO);
- schedule_transmit_on_queue (qe->queue, GNUNET_NO);
+ schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
if (NULL != (pm = qe->pm))
@@ -7894,21 +8093,7 @@ handle_send_message_ack (void *cls,
transmit on queue for queues of the neighbour */
vl = pm->vl;
if (vl->pending_msg_head == pm)
- {
-#if FIXME - NEXT
- for (struct Queue *queue = n->queue_head; NULL != queue;
- queue = queue->next_neighbour)
- schedule_transmit_on_queue (queue, GNUNET_NO);
-#endif
- }
- if (GNUNET_OK != ntohl (sma->status))
- {
- GNUNET_log (
- GNUNET_ERROR_TYPE_INFO,
- "Queue failed in transmission <%llu>, will try retransmission immediately\n",
- pm->logging_uuid);
- update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
- }
+ check_vl_transmission (vl);
}
GNUNET_free (qe);
}
@@ -8431,6 +8616,7 @@ handle_add_queue_message (void *cls,
queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
queue->neighbour = neighbour;
+ queue->idle = GNUNET_YES;
memcpy (&queue[1], addr, addr_len);
/* notify monitors about new queue */
{
@@ -8452,6 +8638,8 @@ handle_add_queue_message (void *cls,
&aqm->receiver,
&check_validation_request_pending,
queue);
+ /* look for traffic for this queue */
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
/* might be our first queue, try launching DV learning */
if (NULL == dvlearn_task)
dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);