From f8cea0ca2c5a683cdd0209e3027599c56b2ec4f3 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 26 Jan 2019 17:20:47 +0100 Subject: reassembly logic --- src/transport/gnunet-service-tng.c | 123 +++++++++++++++++++++++++++++++++++-- 1 file changed, 117 insertions(+), 6 deletions(-) diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index ac4a262d7..5a8bf5bc1 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -50,6 +50,9 @@ * directions, allow multiple messages per peer simultaneously (tag * confirmations with unique message ID), and replace quota-out with * proper flow control; + * - if messages are below MTU, consider adding ACKs and other stuff + * (requires planning at receiver, and additional MST-style demultiplex + * at receiver!) * * Design realizations / discussion: * - communicators do flow control by calling MQ "notify sent" @@ -2799,6 +2802,37 @@ check_fragment_box (void *cls, } +/** + * Generate a fragment acknowledgement for an @a rc. + * + * @param rc context to generate ACK for, @a rc ACK state is reset + */ +static void +send_fragment_ack (struct ReassemblyContext *rc) +{ + struct TransportFragmentAckMessage *ack; + + ack = GNUNET_new (struct TransportFragmentAckMessage); + ack->header.size = htons (sizeof (struct TransportFragmentAckMessage)); + ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK); + ack->frag_uuid = htonl (rc->frag_uuid); + ack->extra_acks = GNUNET_htonll (rc->extra_acks); + ack->msg_uuid = rc->msg_uuid; + ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay); + if (0 == rc->msg_missing) + ack->reassembly_timeout + = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */ + else + ack->reassembly_timeout + = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout)); + route_message (&rc->neighbour->pid, + &ack->header); + rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO; + rc->num_acks = 0; + rc->extra_acks = 0LLU; +} + + /** * Communicator gave us a fragment. Process the request. * @@ -2814,6 +2848,12 @@ handle_fragment_box (void *cls, struct ReassemblyContext *rc; const struct GNUNET_MessageHeader *msg; uint16_t msize; + uint16_t fsize; + uint16_t frag_off; + uint32_t frag_uuid; + char *target; + struct GNUNET_TIME_Relative cdelay; + int ack_now; n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender); @@ -2856,9 +2896,14 @@ handle_fragment_box (void *cls, &rc->msg_uuid, rc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - rc->bitfield = (uint8_t *) (((char *) &rc[1]) + rc->msg_size); + target = (char *) &rc[1]; + rc->bitfield = (uint8_t *) (target + rc->msg_size); rc->msg_missing = rc->msg_size; } + else + { + target = (char *) &rc[1]; + } if (msize != rc->msg_size) { GNUNET_break (0); @@ -2866,12 +2911,78 @@ handle_fragment_box (void *cls, return; } - // FIXME: do work: reassemble - + /* reassemble */ + fsize = ntohs (fb->header.size) - sizeof (*fb); + frag_off = ntohs (fb->frag_off); + memcpy (&target[frag_off], + &fb[1], + fsize); + /* update bitfield and msg_missing */ + for (unsigned int i=frag_off;ibitfield[i / 8] & (1 << (i % 8)))) + { + rc->bitfield[i / 8] |= (1 << (i % 8)); + rc->msg_missing--; + } + } + + /* Compute cummulative ACK */ + frag_uuid = ntohl (fb->frag_uuid); + cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag); + cdelay = GNUNET_TIME_relative_multiply (cdelay, + rc->num_acks); + rc->last_frag = GNUNET_TIME_absolute_get (); + rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, + cdelay); + ack_now = GNUNET_NO; + if (0 == rc->num_acks) + { + /* case one: first ack */ + rc->frag_uuid = frag_uuid; + rc->extra_acks = 0LLU; + rc->num_acks = 1; + } + else if ( (frag_uuid >= rc->frag_uuid) && + (frag_uuid <= rc->frag_uuid + 64) ) + { + /* case two: ack fits after existing min UUID */ + if ( (frag_uuid == rc->frag_uuid) || + (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) ) + { + /* duplicate fragment, ack now! */ + ack_now = GNUNET_YES; + } + else + { + rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1)); + rc->num_acks++; + } + } + else if ( (rc->frag_uuid > frag_uuid) && + ( ( (rc->frag_uuid == frag_uuid + 64) && + (0 == rc->extra_acks) ) || + ( (rc->frag_uuid < frag_uuid + 64) && + (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) ) + { + /* can fit ack by shifting extra acks and starting at + frag_uid, test above esured that the bits we will + shift 'extra_acks' by are all zero. */ + rc->extra_acks <<= (rc->frag_uuid - frag_uuid); + rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1)); + rc->frag_uuid = frag_uuid; + rc->num_acks++; + } + if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */ + ack_now = GNUNET_YES; /* maximum acks received */ + // FIXME: possibly also ACK based on RTT (but for that we'd need to + // determine the session used for the ACK first!) + /* is reassembly complete? */ if (0 != rc->msg_missing) { - /* FIXME: possibly send ACK! */ + if (ack_now) + send_fragment_ack (rc); finish_cmc_handling (cmc); return; } @@ -2885,7 +2996,7 @@ handle_fragment_box (void *cls, return; } /* successful reassembly */ - /* FIXME: definitively send ACK! */ + send_fragment_ack (rc); demultiplex_with_cmc (cmc, msg); /* FIXME: really free here? Might be bad if fragments are still @@ -3146,7 +3257,7 @@ handle_dv_box (void *cls, const struct TransportDVBox *dvb) { struct CommunicatorMessageContext *cmc = cls; - uint16_t size = ntohs (dvb->header.size); + uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb); uint16_t num_hops = ntohs (dvb->num_hops); const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1]; const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops]; -- cgit v1.2.3