summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c123
1 files changed, 117 insertions, 6 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index ac4a262d7..5a8bf5bc1 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -50,6 +50,9 @@
* directions, allow multiple messages per peer simultaneously (tag
* confirmations with unique message ID), and replace quota-out with
* proper flow control;
+ * - if messages are below MTU, consider adding ACKs and other stuff
+ * (requires planning at receiver, and additional MST-style demultiplex
+ * at receiver!)
*
* Design realizations / discussion:
* - communicators do flow control by calling MQ "notify sent"
@@ -2800,6 +2803,37 @@ check_fragment_box (void *cls,
/**
+ * Generate a fragment acknowledgement for an @a rc.
+ *
+ * @param rc context to generate ACK for, @a rc ACK state is reset
+ */
+static void
+send_fragment_ack (struct ReassemblyContext *rc)
+{
+ struct TransportFragmentAckMessage *ack;
+
+ ack = GNUNET_new (struct TransportFragmentAckMessage);
+ ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
+ ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
+ ack->frag_uuid = htonl (rc->frag_uuid);
+ ack->extra_acks = GNUNET_htonll (rc->extra_acks);
+ ack->msg_uuid = rc->msg_uuid;
+ ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
+ if (0 == rc->msg_missing)
+ ack->reassembly_timeout
+ = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
+ else
+ ack->reassembly_timeout
+ = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
+ route_message (&rc->neighbour->pid,
+ &ack->header);
+ rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
+ rc->num_acks = 0;
+ rc->extra_acks = 0LLU;
+}
+
+
+/**
* Communicator gave us a fragment. Process the request.
*
* @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
@@ -2814,6 +2848,12 @@ handle_fragment_box (void *cls,
struct ReassemblyContext *rc;
const struct GNUNET_MessageHeader *msg;
uint16_t msize;
+ uint16_t fsize;
+ uint16_t frag_off;
+ uint32_t frag_uuid;
+ char *target;
+ struct GNUNET_TIME_Relative cdelay;
+ int ack_now;
n = GNUNET_CONTAINER_multipeermap_get (neighbours,
&cmc->im.sender);
@@ -2856,9 +2896,14 @@ handle_fragment_box (void *cls,
&rc->msg_uuid,
rc,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- rc->bitfield = (uint8_t *) (((char *) &rc[1]) + rc->msg_size);
+ target = (char *) &rc[1];
+ rc->bitfield = (uint8_t *) (target + rc->msg_size);
rc->msg_missing = rc->msg_size;
}
+ else
+ {
+ target = (char *) &rc[1];
+ }
if (msize != rc->msg_size)
{
GNUNET_break (0);
@@ -2866,12 +2911,78 @@ handle_fragment_box (void *cls,
return;
}
- // FIXME: do work: reassemble
-
+ /* reassemble */
+ fsize = ntohs (fb->header.size) - sizeof (*fb);
+ frag_off = ntohs (fb->frag_off);
+ memcpy (&target[frag_off],
+ &fb[1],
+ fsize);
+ /* update bitfield and msg_missing */
+ for (unsigned int i=frag_off;i<frag_off+fsize;i++)
+ {
+ if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
+ {
+ rc->bitfield[i / 8] |= (1 << (i % 8));
+ rc->msg_missing--;
+ }
+ }
+
+ /* Compute cummulative ACK */
+ frag_uuid = ntohl (fb->frag_uuid);
+ cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
+ cdelay = GNUNET_TIME_relative_multiply (cdelay,
+ rc->num_acks);
+ rc->last_frag = GNUNET_TIME_absolute_get ();
+ rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
+ cdelay);
+ ack_now = GNUNET_NO;
+ if (0 == rc->num_acks)
+ {
+ /* case one: first ack */
+ rc->frag_uuid = frag_uuid;
+ rc->extra_acks = 0LLU;
+ rc->num_acks = 1;
+ }
+ else if ( (frag_uuid >= rc->frag_uuid) &&
+ (frag_uuid <= rc->frag_uuid + 64) )
+ {
+ /* case two: ack fits after existing min UUID */
+ if ( (frag_uuid == rc->frag_uuid) ||
+ (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
+ {
+ /* duplicate fragment, ack now! */
+ ack_now = GNUNET_YES;
+ }
+ else
+ {
+ rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
+ rc->num_acks++;
+ }
+ }
+ else if ( (rc->frag_uuid > frag_uuid) &&
+ ( ( (rc->frag_uuid == frag_uuid + 64) &&
+ (0 == rc->extra_acks) ) ||
+ ( (rc->frag_uuid < frag_uuid + 64) &&
+ (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
+ {
+ /* can fit ack by shifting extra acks and starting at
+ frag_uid, test above esured that the bits we will
+ shift 'extra_acks' by are all zero. */
+ rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
+ rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
+ rc->frag_uuid = frag_uuid;
+ rc->num_acks++;
+ }
+ if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
+ ack_now = GNUNET_YES; /* maximum acks received */
+ // FIXME: possibly also ACK based on RTT (but for that we'd need to
+ // determine the session used for the ACK first!)
+
/* is reassembly complete? */
if (0 != rc->msg_missing)
{
- /* FIXME: possibly send ACK! */
+ if (ack_now)
+ send_fragment_ack (rc);
finish_cmc_handling (cmc);
return;
}
@@ -2885,7 +2996,7 @@ handle_fragment_box (void *cls,
return;
}
/* successful reassembly */
- /* FIXME: definitively send ACK! */
+ send_fragment_ack (rc);
demultiplex_with_cmc (cmc,
msg);
/* FIXME: really free here? Might be bad if fragments are still
@@ -3146,7 +3257,7 @@ handle_dv_box (void *cls,
const struct TransportDVBox *dvb)
{
struct CommunicatorMessageContext *cmc = cls;
- uint16_t size = ntohs (dvb->header.size);
+ uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
uint16_t num_hops = ntohs (dvb->num_hops);
const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];