From bdc605b20237d49213d1ab37d62752a5f97d1d2e Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 15 Apr 2019 22:42:26 +0200 Subject: add basic handling of fragment acks --- src/transport/gnunet-service-tng.c | 176 +++++++++++++++++++++++++++++-------- 1 file changed, 141 insertions(+), 35 deletions(-) diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 568e5b1d7..6a8a3fc4d 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -34,8 +34,6 @@ * * Implement next: * - DV data structures: - * + initiation of DV learn (incl. RTT measurement logic!) - * - security considerations? add signatures to routes? initiator signature? * + using DV routes! * - handling of DV-boxed messages that need to be forwarded * - route_message implementation, including using DV data structures @@ -133,8 +131,8 @@ #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) /** - * We only consider queues as "quality" connections when - * suppressing the generation of DV initiation messages if + * We only consider queues as "quality" connections when + * suppressing the generation of DV initiation messages if * the latency of the queue is below this threshold. */ #define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) @@ -183,7 +181,7 @@ * do we need to have to suppress initiating DV learn messages? */ #define DV_LEARN_QUALITY_THRESHOLD 100 - + /** * When do we forget an invalid address for sure? */ @@ -817,7 +815,7 @@ enum ClientType /** - * When did we launch this DV learning activity? + * When did we launch this DV learning activity? */ struct LearnLaunchEntry { @@ -3737,6 +3735,49 @@ handle_fragment_box (void *cls, } +/** + * Check the @a fa against the fragments associated with @a pm. + * If it matches, remove the matching fragments from the transmission + * list. + * + * @param pm pending message to check against the ack + * @param fa the ack that was received + * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not + */ +static int +check_ack_against_pm (struct PendingMessage *pm, + const struct TransportFragmentAckMessage *fa) +{ + int match; + struct PendingMessage *nxt; + uint32_t fs = ntohl (fa->frag_uuid); + uint64_t xtra = GNUNET_ntohll (fa->extra_acks); + + match = GNUNET_NO; + for (struct PendingMessage *frag = pm->head_frag; + NULL != frag; + frag = nxt) + { + const struct TransportFragmentBox *tfb + = (const struct TransportFragmentBox *) &pm[1]; + uint32_t fu = ntohl (tfb->frag_uuid); + + GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt); + nxt = frag->next_frag; + /* Check for exact match or match in the 'xtra' bitmask */ + if ( (fu == fs) || + ( (fu > fs) && + (fu <= fs + 64) && + (0 != (1LLU << (fu - fs - 1) & xtra)) ) ) + { + match = GNUNET_YES; + free_fragment_tree (frag); + } + } + return match; +} + + /** * Communicator gave us a fragment acknowledgement. Process the request. * @@ -3748,11 +3789,76 @@ handle_fragment_ack (void *cls, const struct TransportFragmentAckMessage *fa) { struct CommunicatorMessageContext *cmc = cls; + struct Neighbour *n; + int matched; - // FIXME: do work: identify original message; then identify fragments being acked; - // remove those from the tree to prevent retransmission; - // compute RTT - // if entire message is ACKed, handle that as well. + n = GNUNET_CONTAINER_multipeermap_get (neighbours, + &cmc->im.sender); + if (NULL == n) + { + struct GNUNET_SERVICE_Client *client = cmc->tc->client; + + GNUNET_break (0); + finish_cmc_handling (cmc); + GNUNET_SERVICE_client_drop (client); + return; + } + /* FIXME-OPTIMIZE: maybe use another hash map here? */ + matched = GNUNET_NO; + for (struct PendingMessage *pm = n->pending_msg_head; + NULL != pm; + pm = pm->prev_neighbour) + { + if (0 != + GNUNET_memcmp (&fa->msg_uuid, + &pm->msg_uuid)) + continue; + matched = GNUNET_YES; + if (GNUNET_YES == + check_ack_against_pm (pm, + fa)) + { + struct GNUNET_TIME_Relative avg_ack_delay + = GNUNET_TIME_relative_ntoh (fa->avg_ack_delay); + // FIXME: update RTT and other reliability data! + // ISSUE: we don't know which of n's queues the message(s) + // took (and in fact the different messages might have gone + // over different queues and possibly over multiple). + // => track queues with PendingMessages, and update RTT only if + // the queue used is unique? + // -> how can we get loss rates? + // -> or, add extra state to Box and ACK to identify queue? + (void) avg_ack_delay; + } + else + { + GNUNET_STATISTICS_update (GST_stats, + "# FRAGMENT_ACKS dropped, no matching fragment", + 1, + GNUNET_NO); + } + if (NULL == pm->head_frag) + { + // if entire message is ACKed, handle that as well. + // => clean up PM, any post actions? + free_pending_message (pm); + } + else + { + struct GNUNET_TIME_Relative reassembly_timeout + = GNUNET_TIME_relative_ntoh (fa->reassembly_timeout); + // OPTIMIZE-FIXME: adjust retransmission strategy based on reassembly_timeout! + (void) reassembly_timeout; + } + break; + } + if (GNUNET_NO == matched) + { + GNUNET_STATISTICS_update (GST_stats, + "# FRAGMENT_ACKS dropped, no matching pending message", + 1, + GNUNET_NO); + } finish_cmc_handling (cmc); } @@ -4921,8 +5027,8 @@ set_pending_message_uuid (struct PendingMessage *pm) if (pm->msg_uuid_set) return; GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, - &pm->msg_uuid, - sizeof (pm->msg_uuid)); + &pm->msg_uuid, + sizeof (pm->msg_uuid)); pm->msg_uuid_set = GNUNET_YES; } @@ -4939,7 +5045,7 @@ set_pending_message_uuid (struct PendingMessage *pm) */ static struct PendingMessage * fragment_message (struct PendingMessage *pm, - uint16_t mtu) + uint16_t mtu) { struct PendingMessage *ff; @@ -4952,15 +5058,15 @@ fragment_message (struct PendingMessage *pm, been expanded until we are at a leaf or at a fragment that is small enough */ ff = pm; while ( ( (ff->bytes_msg > mtu) || - (pm == ff) ) && - (ff->frag_off == ff->bytes_msg) && - (NULL != ff->head_frag) ) + (pm == ff) ) && + (ff->frag_off == ff->bytes_msg) && + (NULL != ff->head_frag) ) { ff = ff->head_frag; /* descent into fragmented fragments */ } if ( ( (ff->bytes_msg > mtu) || - (pm == ff) ) && + (pm == ff) ) && (pm->frag_off < pm->bytes_msg) ) { /* Did not yet calculate all fragments, calculate next fragment */ @@ -4986,10 +5092,10 @@ fragment_message (struct PendingMessage *pm, } fragmax = mtu - sizeof (struct TransportFragmentBox); fragsize = GNUNET_MIN (msize - ff->frag_off, - fragmax); + fragmax); frag = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (struct TransportFragmentBox) + - fragsize); + sizeof (struct TransportFragmentBox) + + fragsize); frag->target = pm->target; frag->frag_parent = ff; frag->timeout = pm->timeout; @@ -4998,21 +5104,21 @@ fragment_message (struct PendingMessage *pm, msg = (char *) &frag[1]; tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT); tfb.header.size = htons (sizeof (struct TransportFragmentBox) + - fragsize); + fragsize); tfb.frag_uuid = htonl (pm->frag_uuidgen++); tfb.msg_uuid = pm->msg_uuid; tfb.frag_off = htons (ff->frag_off + xoff); tfb.msg_size = htons (pm->bytes_msg); memcpy (msg, - &tfb, - sizeof (tfb)); + &tfb, + sizeof (tfb)); memcpy (&msg[sizeof (tfb)], - &orig[ff->frag_off], - fragsize); + &orig[ff->frag_off], + fragsize); GNUNET_CONTAINER_MDLL_insert (frag, - ff->head_frag, - ff->tail_frag, - frag); + ff->head_frag, + ff->tail_frag, + frag); ff->frag_off += fragsize; ff = frag; } @@ -5322,7 +5428,7 @@ static void tracker_excess_out_cb (void *cls) { (void) cls; - + /* FIXME: trigger excess bandwidth report to core? Right now, this is done internally within transport_api2_core already, but we probably want to change the logic and trigger it @@ -5719,7 +5825,7 @@ struct QueueQualityContext { /** * Set to the @e k'th queue encountered. - */ + */ struct Queue *q; /** @@ -5729,7 +5835,7 @@ struct QueueQualityContext /** * Set to the total number of queues encountered. - */ + */ unsigned int num_queues; /** @@ -5784,7 +5890,7 @@ check_connection_quality (void *cls, /** - * Task run when we CONSIDER initiating a DV learn + * Task run when we CONSIDER initiating a DV learn * process. We first check that sending out a message is * even possible (queues exist), then that it is desirable * (if not, reschedule the task for later), and finally @@ -5882,15 +5988,15 @@ start_dv_learn (void *cls) &check_connection_quality, &qqc); GNUNET_assert (NULL != qqc.q); - + /* Do this as close to transmission time as possible! */ - lle->launch_time = GNUNET_TIME_absolute_get (); + lle->launch_time = GNUNET_TIME_absolute_get (); // FIXME: not so easy, need to BOX this message // in a transmission request! (mistake also done elsewhere!) GNUNET_MQ_send (qqc.q->tc->mq, env); - /* reschedule this job, randomizing the time it runs (but no + /* reschedule this job, randomizing the time it runs (but no actual backoff!) */ dvlearn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY), -- cgit v1.2.3