From 84b3c87161116786074b16f54f2d22e526421db0 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 11 May 2019 22:13:47 +0200 Subject: clean up transmission logic to have queues 'pull' for pending messages while control traffic is 'pushed' into queues --- src/transport/gnunet-service-tng.c | 702 +++++++++++++++++++++++-------------- 1 file changed, 445 insertions(+), 257 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index f07e1c88d..56cf61c2b 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -24,15 +24,8 @@ * * TODO: * Implement next: - * - realize "pull" based logic (#handle_client_send()) for - * `struct PendingMessage` which waits for a queue on any - * applicable route to be 'ready', in contrast - * to the 'push' based routing we use for control messages. - * Basically, when a queue goes idle, it should "search" - * via its neighbour for either virtual links or DVH's that - * have it as first hop and then find messages in those - * virtual links! - * - realize transport-to-transport flow control (needed in case + * - FIXME-NEXT: logic to decide which pm to pick for a given queue (sorting!) + * - FIXME-FC: realize transport-to-transport flow control (needed in case * communicators do not offer flow control). Note that we may not * want to simply delay the ACKs as that may cause unnecessary * re-transmissions. => Introduce proper flow and congestion window(s)! @@ -1375,7 +1368,7 @@ struct DistanceVector * Do we have a confirmed working queue and are thus visible to * CORE? If so, this is the virtual link, otherwise NULL. */ - struct VirtualLink *link; + struct VirtualLink *vl; /** * Signature affirming @e ephemeral_key of type @@ -1565,6 +1558,12 @@ struct Queue * Connection status for this queue. */ enum GNUNET_TRANSPORT_ConnectionStatus cs; + + /** + * Set to #GNUNET_YES if this queue is idle waiting for some + * virtual link to give it a pending message. + */ + int idle; }; @@ -1696,7 +1695,7 @@ struct Neighbour * Do we have a confirmed working queue and are thus visible to * CORE? If so, this is the virtual link, otherwise NULL. */ - struct VirtualLink *link; + struct VirtualLink *vl; /** * Latest DVLearn monotonic time seen from this peer. Initialized only @@ -1766,17 +1765,7 @@ enum PendingMessageType /** * Reliability box. */ - PMT_RELIABILITY_BOX = 2, - - /** - * Any type of acknowledgement. - */ - PMT_ACKNOWLEDGEMENT = 3, - - /** - * Control traffic generated by the TRANSPORT service itself. - */ - PMT_CONTROL = 4 + PMT_RELIABILITY_BOX = 2 }; @@ -2751,6 +2740,41 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh) } +/** + * Task run to check whether the hops of the @a cls still + * are validated, or if we need to core about disconnection. + * + * @param cls a `struct VirtualLink` + */ +static void +check_link_down (void *cls); + + +/** + * Send message to CORE clients that we lost a connection. + * + * @param pid peer the connection was for + */ +static void +cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Informing CORE clients about disconnect from %s\n", + GNUNET_i2s (pid)); + for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) + { + struct GNUNET_MQ_Envelope *env; + struct DisconnectInfoMessage *dim; + + if (CT_CORE != tc->type) + continue; + env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); + dim->peer = *pid; + GNUNET_MQ_send (tc->mq, env); + } +} + + /** * Free entry in #dv_routes. First frees all hops to the target, and * if there are no entries left, frees @a dv as well. @@ -2766,11 +2790,33 @@ free_dv_route (struct DistanceVector *dv) free_distance_vector_hop (dvh); if (NULL == dv->dv_head) { + struct VirtualLink *vl; + GNUNET_assert ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); + if (NULL != (vl = dv->vl)) + { + GNUNET_assert (dv == vl->dv); + vl->dv = NULL; + if (NULL == vl->n) + { + cores_send_disconnect_info (&dv->target); + free_virtual_link (vl); + } + else + { + GNUNET_SCHEDULER_cancel (vl->visibility_task); + vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl); + } + dv->vl = NULL; + } + if (NULL != dv->timeout_task) + { GNUNET_SCHEDULER_cancel (dv->timeout_task); + dv->timeout_task = NULL; + } GNUNET_free (dv); } } @@ -2950,6 +2996,7 @@ static void free_neighbour (struct Neighbour *neighbour) { struct DistanceVectorHop *dvh; + struct VirtualLink *vl; GNUNET_assert (NULL == neighbour->queue_head); GNUNET_assert (GNUNET_YES == @@ -2989,6 +3036,22 @@ free_neighbour (struct Neighbour *neighbour) GNUNET_PEERSTORE_store_cancel (neighbour->sc); neighbour->sc = NULL; } + if (NULL != (vl = neighbour->vl)) + { + GNUNET_assert (neighbour == vl->n); + vl->n = NULL; + if (NULL == vl->dv) + { + cores_send_disconnect_info (&vl->target); + free_virtual_link (vl); + } + else + { + GNUNET_SCHEDULER_cancel (vl->visibility_task); + vl->visibility_task = GNUNET_SCHEDULER_add_now (&check_link_down, vl); + } + neighbour->vl = NULL; + } GNUNET_free (neighbour); } @@ -3033,31 +3096,6 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid) } -/** - * Send message to CORE clients that we lost a connection. - * - * @param pid peer the connection was for - */ -static void -cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) -{ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Informing CORE clients about disconnect from %s\n", - GNUNET_i2s (pid)); - for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) - { - struct GNUNET_MQ_Envelope *env; - struct DisconnectInfoMessage *dim; - - if (CT_CORE != tc->type) - continue; - env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); - dim->peer = *pid; - GNUNET_MQ_send (tc->mq, env); - } -} - - /** * We believe we are ready to transmit a message on a queue. Gives the * message to the communicator for transmission (updating the tracker, @@ -3070,19 +3108,15 @@ transmit_on_queue (void *cls); /** - * Schedule next run of #transmit_on_queue(). Does NOTHING if - * we should run immediately or if the message queue is empty. - * Test for no task being added AND queue not being empty to - * transmit immediately afterwards! This function must only - * be called if the message queue is non-empty! + * Called whenever something changed that might effect when we + * try to do the next transmission on @a queue using #transmit_on_queue(). * * @param queue the queue to do scheduling for - * @param inside_job set to #GNUNET_YES if called from - * #transmit_on_queue() itself and NOT setting - * the task means running immediately + * @param p task priority to use, if @a queue is scheduled */ static void -schedule_transmit_on_queue (struct Queue *queue, int inside_job) +schedule_transmit_on_queue (struct Queue *queue, + enum GNUNET_SCHEDULER_Priority p) { if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT) @@ -3092,6 +3126,7 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) "# Transmission throttled due to communicator queue limit", 1, GNUNET_NO); + queue->idle = GNUNET_NO; return; } if (queue->queue_length >= QUEUE_LENGTH_LIMIT) @@ -3100,38 +3135,18 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) "# Transmission throttled due to queue queue limit", 1, GNUNET_NO); + queue->idle = GNUNET_NO; return; } -#if FIXME - NEXT - struct Neighbour *n = queue->neighbour; - struct GNUNET_TIME_Relative out_delay; - - if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us)) - { - GNUNET_log ( - GNUNET_ERROR_TYPE_DEBUG, - "Schedule transmission <%llu> on queue %llu of %s decides to run immediately\n", - pm->logging_uuid, - (unsigned long long) queue->qid, - GNUNET_i2s (&n->pid)); - return; /* we should run immediately! */ - } - /* queue has changed since we were scheduled, reschedule again */ + /* queue might indeed be ready, schedule it */ + if (NULL != queue->transmit_task) + GNUNET_SCHEDULER_cancel (queue->transmit_task); queue->transmit_task = - GNUNET_SCHEDULER_add_delayed (out_delay, &transmit_on_queue, queue); - if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Next transmission <%llu> on queue `%s' in %s (high delay)\n", - pm->logging_uuid, - queue->address, - GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES)); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Next transmission <%llu> on queue `%s' in %s\n", - pm->logging_uuid, - queue->address, - GNUNET_STRINGS_relative_time_to_string (out_delay, GNUNET_YES)); -#endif + GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Considering transmission on queue `%s' to %s\n", + queue->address, + GNUNET_i2s (&queue->neighbour->pid)); } @@ -3156,15 +3171,21 @@ check_link_down (void *cls) pos = pos->next_dv) dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until); if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) + { + vl->dv->vl = NULL; vl->dv = NULL; + } q_timeout = GNUNET_TIME_UNIT_ZERO_ABS; for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until); - if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) + if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us) + { + vl->n->vl = NULL; vl->n = NULL; + } if ((NULL == vl->n) && (NULL == vl->dv)) { - cores_send_disconnect_info (&dv->target); + cores_send_disconnect_info (&vl->target); free_virtual_link (vl); return; } @@ -3229,7 +3250,7 @@ free_queue (struct Queue *queue) if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length)) { - /* Communicator dropped below threshold, resume all queues */ + /* Communicator dropped below threshold, resume all _other_ queues */ GNUNET_STATISTICS_update ( GST_stats, "# Transmission throttled due to communicator queue limit", @@ -3237,7 +3258,7 @@ free_queue (struct Queue *queue) GNUNET_NO); for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; s = s->next_client) - schedule_transmit_on_queue (s, GNUNET_NO); + schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT); } notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); GNUNET_free (queue); @@ -3579,6 +3600,79 @@ pick_random_dv_hops (const struct DistanceVector *dv, } +/** + * There is a message at the head of the pending messages for @a vl + * which may be ready for transmission. Check if a queue is ready to + * take it. + * + * This function must (1) check for flow control to ensure that we can + * right now send to @a vl, (2) check that the pending message in the + * queue is actually eligible, (3) determine if any applicable queue + * (direct neighbour or DVH path) is ready to accept messages, and + * (4) prioritize based on the preferences associated with the + * pending message. + * + * So yeah, easy. + * + * @param vl virtual link where we should check for transmission + */ +static void +check_vl_transmission (struct VirtualLink *vl) +{ + struct Neighbour *n = vl->n; + struct DistanceVector *dv = vl->dv; + struct GNUNET_TIME_Absolute now; + int elig; + + /* FIXME-FC: need to implement virtual link flow control! */ + + /* Check that we have an eligible pending message! + (cheaper than having #transmit_on_queue() find out!) */ + elig = GNUNET_NO; + for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm; + pm = pm->next_vl) + { + if (NULL != pm->qe) + continue; /* not eligible, is in a queue! */ + elig = GNUNET_YES; + break; + } + if (GNUNET_NO == elig) + return; + + /* Notify queues at direct neighbours that we are interested */ + now = GNUNET_TIME_absolute_get (); + if (NULL != n) + { + for (struct Queue *queue = n->queue_head; NULL != queue; + queue = queue->next_neighbour) + if ((GNUNET_YES == queue->idle) && + (queue->validated_until.abs_value_us > now.abs_value_us)) + schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + } + /* Notify queues via DV that we are interested */ + if (NULL != dv) + { + /* Do DV with lower scheduler priority, which effectively means that + IF a neighbour exists and is available, we prefer it. */ + for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; + pos = pos->next_dv) + { + struct Neighbour *nh = pos->next_hop; + + if (pos->path_valid_until.abs_value_us <= now.abs_value_us) + continue; /* skip this one: path not validated */ + for (struct Queue *queue = nh->queue_head; NULL != queue; + queue = queue->next_neighbour) + if ((GNUNET_YES == queue->idle) && + (queue->validated_until.abs_value_us > now.abs_value_us)) + schedule_transmit_on_queue (queue, + GNUNET_SCHEDULER_PRIORITY_BACKGROUND); + } + } +} + + /** * Client asked for transmission to a peer. Process the request. * @@ -3594,7 +3688,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) uint32_t bytes_msg; struct VirtualLink *vl; enum GNUNET_MQ_PriorityPreferences pp; - int was_empty; GNUNET_assert (CT_CORE == tc->type); obmm = (const struct GNUNET_MessageHeader *) &obm[1]; @@ -3631,32 +3724,11 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); - was_empty = (NULL == vl->pending_msg_head); GNUNET_CONTAINER_MDLL_insert (vl, vl->pending_msg_head, vl->pending_msg_tail, pm); - if (! was_empty) - return; /* all queues must already be busy */ -#if 0 - // FIXME: check if any DVH or neighbour queue of 'vl' - // is ready for transmission now. If so, encapsulate - // 'pm' accordingly and send! - for (struct Queue *queue = target->queue_head; NULL != queue; - queue = queue->next_neighbour) - { - /* try transmission on any queue that is idle */ - if (NULL == queue->transmit_task) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queue %llu to %s is idle, triggering transmission\n", - (unsigned long long) queue->qid, - GNUNET_i2s (&queue->neighbour->pid)); - queue->transmit_task = - GNUNET_SCHEDULER_add_now (&transmit_on_queue, queue); - } - } -#endif + check_vl_transmission (vl); } @@ -3861,7 +3933,7 @@ update_ephemeral (struct DistanceVector *dv) /** - * Send the control message @a payload on @a queue. + * Send the message @a payload on @a queue. * * @param queue the queue to use for transmission * @param pm pending message to update once transmission is done, may be NULL! @@ -3879,6 +3951,7 @@ queue_send_msg (struct Queue *queue, struct GNUNET_TRANSPORT_SendMessageTo *smt; struct GNUNET_MQ_Envelope *env; + queue->idle = GNUNET_NO; GNUNET_log ( GNUNET_ERROR_TYPE_DEBUG, "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n", @@ -3910,6 +3983,11 @@ queue_send_msg (struct Queue *queue, GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); queue->queue_length++; queue->tc->details.communicator.total_queue_length++; + if (COMMUNICATOR_TOTAL_QUEUE_LIMIT == + queue->tc->details.communicator.total_queue_length) + queue->idle = GNUNET_NO; + if (QUEUE_LENGTH_LIMIT == queue->queue_length) + queue->idle = GNUNET_NO; GNUNET_MQ_send (queue->tc->mq, env); } } @@ -5209,6 +5287,50 @@ update_dvh_performance (struct DistanceVectorHop *dvh, } +/** + * We have completed transmission of @a pm, remove it from + * the transmission queues (and if it is a fragment, continue + * up the tree as necessary). + * + * @param pm pending message that was transmitted + */ +static void +completed_pending_message (struct PendingMessage *pm) +{ + struct PendingMessage *pos; + + switch (pm->pmt) + { + case PMT_CORE: + case PMT_RELIABILITY_BOX: + /* Full message sent, we are done */ + client_send_response (pm); + return; + case PMT_FRAGMENT_BOX: + /* Fragment sent over reliabile channel */ + free_fragment_tree (pm); + pos = pm->frag_parent; + GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm); + GNUNET_free (pm); + /* check if subtree is done */ + while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) && + (pos != pm)) + { + pm = pos; + pos = pm->frag_parent; + GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm); + GNUNET_free (pm); + } + + /* Was this the last applicable fragmment? */ + if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) && + (pos->frag_off == pos->bytes_msg)) + client_send_response (pos); + return; + } +} + + /** * The @a pa was acknowledged, process the acknowledgement. * @@ -5220,7 +5342,6 @@ static void handle_acknowledged (struct PendingAcknowledgement *pa, struct GNUNET_TIME_Relative ack_delay) { - struct PendingMessage *pm = pa->pm; struct GNUNET_TIME_Relative delay; delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time); @@ -5232,25 +5353,8 @@ handle_acknowledged (struct PendingAcknowledgement *pa, update_queue_performance (pa->queue, delay, pa->message_size); if (NULL != pa->dvh) update_dvh_performance (pa->dvh, delay, pa->message_size); - if (NULL != pm) - { - if (NULL != pm->frag_parent) - { - pm = pm->frag_parent; - free_fragment_tree (pa->pm); - } - while ((NULL != pm->frag_parent) && (NULL == pm->head_frag)) - { - struct PendingMessage *parent = pm->frag_parent; - - free_fragment_tree (pm); - pm = parent; - } - if (NULL != pm->head_frag) - pm = NULL; /* we are done, otherwise free 'pm' below */ - } - if (NULL != pm) - free_pending_message (pm); + if (NULL != pa->pm) + completed_pending_message (pa->pm); free_pending_acknowledgement (pa); } @@ -5494,6 +5598,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop) GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); vl->target = dv->target; vl->dv = dv; + dv->vl = vl; vl->core_recv_window = RECV_WINDOW_SIZE; vl->visibility_task = GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); @@ -7185,6 +7290,7 @@ handle_validation_response ( vl = GNUNET_new (struct VirtualLink); vl->target = n->pid; vl->n = n; + n->vl = vl; vl->core_recv_window = RECV_WINDOW_SIZE; vl->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); @@ -7604,10 +7710,134 @@ update_pm_next_attempt (struct PendingMessage *pm, /** - * We believe we are ready to transmit a message on a queue. - * Gives the message to the - * communicator for transmission (updating the tracker, and re-scheduling - * itself if applicable). + * Context for #select_best_pending_from_link(). + */ +struct PendingMessageScoreContext +{ + /** + * Set to the best message that was found, NULL for none. + */ + struct PendingMessage *best; + + /** + * DVH that @e best should take, or NULL for direct transmission. + */ + struct DistanceVectorHop *dvh; + + /** + * What is the estimated total overhead for this message? + */ + size_t real_overhead; + + /** + * Number of pending messages we seriously considered this time. + */ + unsigned int consideration_counter; + + /** + * Did we have to fragment? + */ + int frag; + + /** + * Did we have to reliability box? + */ + int relb; +}; + + +/** + * Select the best pending message from @a vl for transmission + * via @a queue. + * + * @param sc[in,out] best message so far (NULL for none), plus scoring data + * @param queue the queue that will be used for transmission + * @param vl the virtual link providing the messages + * @param dvh path we are currently considering, or NULL for none + * @param overhead number of bytes of overhead to be expected + * from DV encapsulation (0 for without DV) + */ +static void +select_best_pending_from_link (struct PendingMessageScoreContext *sc, + struct Queue *queue, + struct VirtualLink *vl, + struct DistanceVectorHop *dvh, + size_t overhead) +{ + /* FIXME-NEXT: right now we ignore all the 'fancy' sorting + we do on the pending message list, resulting in a + linear time algorithm (PLUS linear time list management). + So we should probably either avoid keeping a sorted list, + or find a way to make the sorting useful here! */ + for (struct PendingMessage *pos = vl->pending_msg_head; NULL != pos; + pos = pos->next_vl) + { + size_t real_overhead = overhead; + int frag; + int relb; + + if (NULL != pos->qe) + continue; /* not eligible */ + sc->consideration_counter++; + /* determine if we have to reliability-box, if so add reliability box + overhead */ + relb = GNUNET_NO; + if ((GNUNET_NO == frag) && + (0 == (pos->prefs & GNUNET_MQ_PREF_UNRELIABLE)) && + (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)) + { + relb = GNUNET_YES; + real_overhead += sizeof (struct TransportReliabilityBoxMessage); + } + /* determine if we have to fragment, if so add fragmentation + overhead! */ + frag = GNUNET_NO; + if ( ( (0 != queue->mtu) && + (pos->bytes_msg + real_overhead > queue->mtu) ) || + (pos->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) || + (NULL != pos->head_frag /* fragments already exist, should + respect that even if MTU is 0 for + this queue */) ) + { + frag = GNUNET_YES; + relb = GNUNET_NO; /* if we fragment, we never also reliability box */ + if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) + { + /* FIXME-OPTIMIZE: we could use an optimized, shorter fragmentation + header without the ACK UUID when using a *reliable* channel! */ + } + real_overhead = overhead + sizeof (struct TransportFragmentBoxMessage); + } + + /* Finally, compare to existing 'best' in sc to see if this 'pos' pending + message would beat it! */ + if (NULL != sc->best) + { + /* FIXME-NEXT: CHECK if pos fits queue BETTER than pm, if not: + continue; */ + /* NOTE: use 'overhead' to estimate need for fragmentation, + prefer it if MTU is sufficient and close! */ + } + sc->best = pos; + sc->dvh = dvh; + sc->frag = frag; + sc->relb = relb; + } +} + + +/** + * We believe we are ready to transmit a `struct PendingMessage` on a + * queue, the big question is which one! We need to see if there is + * one pending that is allowed by flow control and congestion control + * and (ideally) matches our queue's performance profile. + * + * If such a message is found, we give the message to the communicator + * for transmission (updating the tracker, and re-scheduling ourselves + * if applicable). + * + * If no such message is found, the queue's `idle` field must be set + * to #GNUNET_YES. * * @param cls the `struct Queue` to process transmissions for */ @@ -7615,128 +7845,99 @@ static void transmit_on_queue (void *cls) { struct Queue *queue = cls; - - queue->transmit_task = NULL; -#if FIXME - NEXT struct Neighbour *n = queue->neighbour; + struct PendingMessageScoreContext sc; struct PendingMessage *pm; - struct PendingMessage *s; - uint32_t overhead; - if (NULL == (pm = n->pending_msg_head)) + queue->transmit_task = NULL; + if (NULL == n->vl) { - /* no message pending, nothing to do here! */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No messages waiting on queue %s to %s, going to sleep\n", - queue->address, - GNUNET_i2s (&n->pid)); + "Virtual link `%s' is down, cannot have PM for queue `%s'\n", + GNUNET_i2s (&n->pid), + queue->address); + queue->idle = GNUNET_YES; return; } - if (NULL != pm->qe) + memset (&sc, 0, sizeof (sc)); + select_best_pending_from_link (&sc, queue, n->vl, NULL, 0); + if (NULL == sc.best) + { + /* Also look at DVH that have the n as first hop! */ + for (struct DistanceVectorHop *dvh = n->dv_head; NULL != dvh; + dvh = dvh->next_neighbour) + { + select_best_pending_from_link (&sc, + queue, + dvh->dv->vl, + dvh, + sizeof (struct GNUNET_PeerIdentity) * + (1 + dvh->distance) + + sizeof (struct TransportDVBoxMessage) + + sizeof (struct TransportDVBoxPayloadP)); + } + } + if (NULL == sc.best) { - /* message still pending with communciator! - LOGGING-FIXME: Use stats? Should this not be rare? */ + /* no message pending, nothing to do here! */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Waiting on communicator for queue %s to %s, going to sleep\n", + "No pending messages, queue `%s' to %s now idle\n", queue->address, GNUNET_i2s (&n->pid)); + queue->idle = GNUNET_YES; return; } - schedule_transmit_on_queue (queue, GNUNET_YES); - if (NULL != queue->transmit_task) + + /* Given selection in `sc`, do transmission */ + pm = sc.best; + if (GNUNET_YES == sc.frag) { - GNUNET_log ( - GNUNET_ERROR_TYPE_DEBUG, - "Scheduled transmission on queue %s to %s for later, going to sleep\n", - queue->address, - GNUNET_i2s (&n->pid)); - return; /* do it later */ - } - overhead = 0; - if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) - overhead += sizeof (struct TransportReliabilityBoxMessage); - s = pm; - if ( ( (0 != queue->mtu) && - (pm->bytes_msg + overhead > queue->mtu) ) || - (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) || - (NULL != pm->head_frag /* fragments already exist, should - respect that even if MTU is 0 for - this queue */) ) - s = fragment_message (queue, pm->dvh, s); - if (NULL == s) - { - /* Fragmentation failed, try next message... */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Fragmentation failed queue %s to %s for <%llu>, trying again\n", - queue->address, - GNUNET_i2s (&n->pid), - pm->logging_uuid); - schedule_transmit_on_queue (queue, GNUNET_NO); - return; + pm = fragment_message (queue, sc.dvh, sc.best); + if (NULL == pm) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Fragmentation failed queue %s to %s for <%llu>, trying again\n", + queue->address, + GNUNET_i2s (&n->pid), + pm->logging_uuid); + schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + } } - if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) - // FIXME-OPTIMIZE: and if reliability was requested for 's' by core! - s = reliability_box_message (queue, pm->dvh, s); - if (NULL == s) + else if (GNUNET_YES == sc.relb) { - /* Reliability boxing failed, try next message... */ - GNUNET_log ( - GNUNET_ERROR_TYPE_DEBUG, - "Reliability boxing failed queue %s to %s for <%llu>, trying again\n", - queue->address, - GNUNET_i2s (&n->pid), - pm->logging_uuid); - schedule_transmit_on_queue (queue, GNUNET_NO); - return; + pm = reliability_box_message (queue, sc.dvh, sc.best); + if (NULL == pm) + { + /* Reliability boxing failed, try next message... */ + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "Reliability boxing failed queue %s to %s for <%llu>, trying again\n", + queue->address, + GNUNET_i2s (&n->pid), + pm->logging_uuid); + schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + return; + } } + else + pm = sc.best; /* no boxing required */ - /* Pass 's' for transission to the communicator */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Passing message <%llu> to queue %s for peer %s\n", - s->logging_uuid, - queue->address, - GNUNET_i2s (&n->pid)); - queue_send_msg (queue, s, &s[1], s->bytes_msg); - // FIXME: do something similar to the logic below - // in defragmentation / reliability ACK handling! + /* Pass 'pm' for transission to the communicator */ + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "Passing message <%llu> to queue %s for peer %s (considered %u others)\n", + pm->logging_uuid, + queue->address, + GNUNET_i2s (&n->pid), + sc.consideration_counter); + queue_send_msg (queue, pm, &pm[1], pm->bytes_msg); /* Check if this transmission somehow conclusively finished handing 'pm' even without any explicit ACKs */ - if ((PMT_CORE == s->pmt) && + if ((PMT_CORE == pm->pmt) || (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)) { - /* Full message sent, and over reliabile channel */ - client_send_response (pm); - } - else if ((GNUNET_TRANSPORT_CC_RELIABLE == - queue->tc->details.communicator.cc) && - (PMT_FRAGMENT_BOX == s->pmt)) - { - struct PendingMessage *pos; - - /* Fragment sent over reliabile channel */ - free_fragment_tree (s); - pos = s->frag_parent; - GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s); - GNUNET_free (s); - /* check if subtree is done */ - while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) && - (pos != pm)) - { - s = pos; - pos = s->frag_parent; - GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s); - GNUNET_free (s); - } - - /* Was this the last applicable fragmment? */ - if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg)) - client_send_response (pm); - } - else if (PMT_CORE != pm->pmt) - { - /* This was an acknowledgement of some type, always free */ - free_pending_message (pm); + completed_pending_message (pm); } else { @@ -7748,15 +7949,13 @@ transmit_on_queue (void *cls) retransmitting. Note that in the future this heuristic should likely be improved further (measure RTT stability, consider message urgency and size when delaying ACKs, etc.) */ - update_pm_next_attempt (s, + update_pm_next_attempt (pm, GNUNET_TIME_relative_to_absolute ( GNUNET_TIME_relative_multiply (queue->pd.aged_rtt, 4))); } - /* finally, re-schedule queue transmission task itself */ - schedule_transmit_on_queue (queue, GNUNET_NO); -#endif + schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); } @@ -7871,7 +8070,7 @@ handle_send_message_ack (void *cls, for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue; queue = queue->next_client) - schedule_transmit_on_queue (queue, GNUNET_NO); + schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); } else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) { @@ -7880,7 +8079,7 @@ handle_send_message_ack (void *cls, "# Transmission throttled due to queue queue limit", -1, GNUNET_NO); - schedule_transmit_on_queue (qe->queue, GNUNET_NO); + schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); } if (NULL != (pm = qe->pm)) @@ -7894,21 +8093,7 @@ handle_send_message_ack (void *cls, transmit on queue for queues of the neighbour */ vl = pm->vl; if (vl->pending_msg_head == pm) - { -#if FIXME - NEXT - for (struct Queue *queue = n->queue_head; NULL != queue; - queue = queue->next_neighbour) - schedule_transmit_on_queue (queue, GNUNET_NO); -#endif - } - if (GNUNET_OK != ntohl (sma->status)) - { - GNUNET_log ( - GNUNET_ERROR_TYPE_INFO, - "Queue failed in transmission <%llu>, will try retransmission immediately\n", - pm->logging_uuid); - update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS); - } + check_vl_transmission (vl); } GNUNET_free (qe); } @@ -8431,6 +8616,7 @@ handle_add_queue_message (void *cls, queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt); queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); queue->neighbour = neighbour; + queue->idle = GNUNET_YES; memcpy (&queue[1], addr, addr_len); /* notify monitors about new queue */ { @@ -8452,6 +8638,8 @@ handle_add_queue_message (void *cls, &aqm->receiver, &check_validation_request_pending, queue); + /* look for traffic for this queue */ + schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); /* might be our first queue, try launching DV learning */ if (NULL == dvlearn_task) dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL); -- cgit v1.2.3