summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-01-22 22:55:05 +0100
committerChristian Grothoff <christian@grothoff.org>2019-01-22 22:55:05 +0100
commitff10602f5ab7df06dc850206159e76bd7a7891ea (patch)
treeb58cfeacdbd851eeac2cc772ce77fec73bd4760b /src/transport/gnunet-service-tng.c
parent25b14e096d09acfee96219de359ecaa0b56e2a34 (diff)
working on crazy fragmentation logic
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c380
1 files changed, 340 insertions, 40 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 3365ea5d5..76d5265a8 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -428,31 +428,50 @@ struct TransportDVLearn
/**
* Outer layer of an encapsulated message send over multiple hops.
+ * The path given only includes the identities of the subsequent
+ * peers, i.e. it will be empty if we are the receiver. Each
+ * forwarding peer should scan the list from the end, and if it can,
+ * forward to the respective peer. The list should then be shortened
+ * by all the entries up to and including that peer. Each hop should
+ * also increment @e total_hops to allow the receiver to get a precise
+ * estimate on the number of hops the message travelled. Senders must
+ * provide a learned path that thus should work, but intermediaries
+ * know of a shortcut, they are allowed to send the message via that
+ * shortcut.
+ *
+ * If a peer finds itself still on the list, it must drop the message.
*/
struct TransportDVBox
{
/**
- * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV
+ * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
*/
struct GNUNET_MessageHeader header;
/**
+ * Number of total hops this messages travelled. In NBO.
+ * @e origin sets this to zero, to be incremented at
+ * each hop.
+ */
+ uint16_t total_hops GNUNET_PACKED;
+
+ /**
* Number of hops this messages includes. In NBO.
*/
uint16_t num_hops GNUNET_PACKED;
/**
- * Position of our peer in the sequence.
- * To be incremented at each hop. In NBO.
+ * Identity of the peer that originated the message.
*/
- uint16_t current_hop GNUNET_PACKED;
+ struct GNUNET_PeerIdentity origin;
/* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
- the first is the sender, the last the receiver; the current
- peer may be one in the middle. */
+ excluding the @e origin and the current peer, the last must be
+ the ultimate target; if @e num_hops is zero, the receiver of this
+ message is the ultimate target. */
/* Followed by the actual message, which itself may be
- another box, but not a DV_LEARN message! */
+ another box, but not a DV_LEARN or DV_BOX message! */
};
@@ -696,7 +715,60 @@ struct Neighbour
/**
- * Transmission request from CORE that is awaiting delivery.
+ * Types of different pending messages.
+ */
+enum PendingMessageType
+{
+
+ /**
+ * Ordinary message received from the CORE service.
+ */
+ PMT_CORE = 0,
+
+ /**
+ * Fragment box.
+ */
+ PMT_FRAGMENT_BOX = 1,
+
+ /**
+ * Reliability box.
+ */
+ PMT_RELIABILITY_BOX = 2,
+
+ /**
+ * Any type of acknowledgement.
+ */
+ PMT_ACKNOWLEDGEMENT = 3
+
+
+};
+
+
+/**
+ * Transmission request that is awaiting delivery. The original
+ * transmission requests from CORE may be too big for some queues.
+ * In this case, a *tree* of fragments is created. At each
+ * level of the tree, fragments are kept in a DLL ordered by which
+ * fragment should be sent next (at the head). The tree is searched
+ * top-down, with the original message at the root.
+ *
+ * To select a node for transmission, first it is checked if the
+ * current node's message fits with the MTU. If it does not, we
+ * either calculate the next fragment (based on @e frag_off) from the
+ * current node, or, if all fragments have already been created,
+ * descend to the @e head_frag. Even though the node was already
+ * fragmented, the fragment may be too big if the fragment was
+ * generated for a queue with a larger MTU. In this case, the node
+ * may be fragmented again, thus creating a tree.
+ *
+ * When acknowledgements for fragments are received, the tree
+ * must be pruned, removing those parts that were already
+ * acknowledged. When fragments are sent over a reliable
+ * channel, they can be immediately removed.
+ *
+ * If a message is ever fragmented, then the original "full" message
+ * is never again transmitted (even if it fits below the MTU), and
+ * only (remaining) fragments are sent.
*/
struct PendingMessage
{
@@ -711,26 +783,51 @@ struct PendingMessage
struct PendingMessage *prev_neighbour;
/**
- * Kept in a MDLL of messages from this @a client.
+ * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
*/
struct PendingMessage *next_client;
-
+
/**
- * Kept in a MDLL of messages from this @a client.
+ * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
*/
struct PendingMessage *prev_client;
/**
+ * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
+ */
+ struct PendingMessage *next_frag;
+
+ /**
+ * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
+ */
+ struct PendingMessage *prev_frag;
+
+ /**
* Target of the request.
*/
struct Neighbour *target;
-
+
/**
- * Client that issued the transmission request.
+ * Client that issued the transmission request, if @e pmt is #PMT_CORE.
*/
struct TransportClient *client;
+
+ /**
+ * Head of a MDLL of fragments created for this core message.
+ */
+ struct PendingMessage *head_frag;
+
+ /**
+ * Tail of a MDLL of fragments created for this core message.
+ */
+ struct PendingMessage *tail_frag;
/**
+ * Our parent in the fragmentation tree.
+ */
+ struct PendingMessage *frag_parent;
+
+ /**
* At what time should we give up on the transmission (and no longer retry)?
*/
struct GNUNET_TIME_Absolute timeout;
@@ -739,12 +836,38 @@ struct PendingMessage
* What is the earliest time for us to retry transmission of this message?
*/
struct GNUNET_TIME_Absolute next_attempt;
+
+ /**
+ * UUID to use for this message (used for reassembly of fragments, only
+ * initialized if @e msg_uuid_set is #GNUNET_YES).
+ */
+ struct GNUNET_ShortHashCode msg_uuid;
+
+ /**
+ * Counter incremented per generated fragment.
+ */
+ uint32_t frag_uuidgen;
/**
+ * Type of the pending message.
+ */
+ enum PendingMessageType pmt;
+
+ /**
* Size of the original message.
*/
- uint32_t bytes_msg;
+ uint16_t bytes_msg;
+ /**
+ * Offset at which we should generate the next fragment.
+ */
+ uint16_t frag_off;
+
+ /**
+ * #GNUNET_YES once @e msg_uuid was initialized
+ */
+ int16_t msg_uuid_set;
+
/* Followed by @e bytes_msg to transmit */
};
@@ -1483,6 +1606,28 @@ check_client_send (void *cls,
/**
+ * Free fragment tree below @e root, excluding @e root itself.
+ *
+ * @param root root of the tree to free
+ */
+static void
+free_fragment_tree (struct PendingMessage *root)
+{
+ struct PendingMessage *frag;
+
+ while (NULL != (frag = root->head_frag))
+ {
+ free_fragment_tree (frag);
+ GNUNET_CONTAINER_MDLL_remove (frag,
+ root->head_frag,
+ root->tail_frag,
+ frag);
+ GNUNET_free (frag);
+ }
+}
+
+
+/**
* Send a response to the @a pm that we have processed a
* "send" request with status @a success. We
* transmitted @a bytes_physical on the actual wire.
@@ -1509,7 +1654,7 @@ client_send_response (struct PendingMessage *pm,
env = GNUNET_MQ_msg (som,
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
som->success = htonl ((uint32_t) success);
- som->bytes_msg = htonl (pm->bytes_msg);
+ som->bytes_msg = htons (pm->bytes_msg);
som->bytes_physical = htonl (bytes_physical);
som->peer = target->pid;
GNUNET_MQ_send (tc->mq,
@@ -1523,6 +1668,7 @@ client_send_response (struct PendingMessage *pm,
target->pending_msg_head,
target->pending_msg_tail,
pm);
+ free_fragment_tree (pm);
GNUNET_free (pm);
}
@@ -2024,18 +2170,98 @@ static struct PendingMessage *
fragment_message (struct PendingMessage *pm,
uint16_t mtu)
{
- if (0)
+ struct PendingMessage *ff;
+
+ if (GNUNET_NO == pm->msg_uuid_set)
{
- /* mtu too small */
- // FIMXE: bitch
- client_send_response (pm,
- GNUNET_NO,
- 0);
- return NULL;
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &pm->msg_uuid,
+ sizeof (pm->msg_uuid));
+ pm->msg_uuid_set = GNUNET_YES;
+ }
+
+ /* This invariant is established in #handle_add_queue_message() */
+ GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
+
+ /* select fragment for transmission, descending the tree if it has
+ been expanded until we are at a leaf or at a fragment that is small enough */
+ ff = pm;
+ while ( ( (ff->bytes_msg > mtu) ||
+ (pm == ff) ) &&
+ (ff->frag_off == ff->bytes_msg) &&
+ (NULL != ff->head_frag) )
+ {
+ ff = ff->head_frag; /* descent into fragmented fragments */
}
- /* FIXME: return first fragment here! */
- return NULL;
+ if ( ( (ff->bytes_msg > mtu) ||
+ (pm == ff) ) &&
+ (pm->frag_off < pm->bytes_msg) )
+ {
+ /* Did not yet calculate all fragments, calculate next fragment */
+ struct PendingMessage *frag;
+ struct TransportFragmentBox tfb;
+ const char *orig;
+ char *msg;
+ uint16_t fragmax;
+ uint16_t fragsize;
+ uint16_t msize;
+ uint16_t xoff = 0;
+
+ orig = (const char *) &ff[1];
+ msize = ff->bytes_msg;
+ if (pm != ff)
+ {
+ const struct TransportFragmentBox *tfbo;
+
+ tfbo = (const struct TransportFragmentBox *) orig;
+ orig += sizeof (struct TransportFragmentBox);
+ msize -= sizeof (struct TransportFragmentBox);
+ xoff = ntohs (tfbo->frag_off);
+ }
+ fragmax = mtu - sizeof (struct TransportFragmentBox);
+ fragsize = GNUNET_MIN (msize - ff->frag_off,
+ fragmax);
+ frag = GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (struct TransportFragmentBox) +
+ fragsize);
+ frag->target = pm->target;
+ frag->frag_parent = ff;
+ frag->timeout = pm->timeout;
+ frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
+ frag->pmt = PMT_FRAGMENT_BOX;
+ msg = (char *) &frag[1];
+ tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
+ tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
+ fragsize);
+ tfb.frag_uuid = htonl (pm->frag_uuidgen++);
+ tfb.msg_uuid = pm->msg_uuid;
+ tfb.frag_off = htons (ff->frag_off + xoff);
+ tfb.msg_size = htons (pm->bytes_msg);
+ memcpy (msg,
+ &tfb,
+ sizeof (tfb));
+ memcpy (&msg[sizeof (tfb)],
+ &orig[ff->frag_off],
+ fragsize);
+ GNUNET_CONTAINER_MDLL_insert (frag,
+ ff->head_frag,
+ ff->tail_frag,
+ frag);
+ ff->frag_off += fragsize;
+ ff = frag;
+ }
+
+ /* Move head to the tail and return it */
+ GNUNET_CONTAINER_MDLL_remove (frag,
+ ff->frag_parent->head_frag,
+ ff->frag_parent->tail_frag,
+ ff);
+ GNUNET_CONTAINER_MDLL_insert_tail (frag,
+ ff->frag_parent->head_frag,
+ ff->frag_parent->tail_frag,
+ ff);
+ return ff;
}
@@ -2052,11 +2278,12 @@ fragment_message (struct PendingMessage *pm,
static struct PendingMessage *
reliability_box_message (struct PendingMessage *pm)
{
- if (0) // FIXME
+ if (PMT_CORE != pm->pmt)
{
- /* already fragmented or reliability boxed, do nothing */
+ /* already fragmented or reliability boxed, or control message: do nothing */
return pm;
}
+
if (0) // FIXME
{
/* failed hard */
@@ -2086,6 +2313,7 @@ transmit_on_queue (void *cls)
struct GNUNET_ATS_Session *queue = cls;
struct Neighbour *n = queue->neighbour;
struct PendingMessage *pm;
+ struct PendingMessage *s;
uint32_t overhead;
queue->transmit_task = NULL;
@@ -2100,38 +2328,102 @@ transmit_on_queue (void *cls)
overhead = 0;
if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
overhead += sizeof (struct TransportReliabilityBox);
- if ( (0 != queue->mtu) &&
- (pm->bytes_msg + overhead > queue->mtu) )
- pm = fragment_message (pm,
- queue->mtu);
- if (NULL == pm)
+ s = pm;
+ if ( ( (0 != queue->mtu) &&
+ (pm->bytes_msg + overhead > queue->mtu) ) ||
+ (NULL != pm->head_frag /* fragments already exist, should
+ respect that even if MTU is 0 for
+ this queue */) )
+ s = fragment_message (s,
+ (0 == queue->mtu)
+ ? UINT16_MAX /* no real maximum */
+ : queue->mtu);
+ if (NULL == s)
{
/* Fragmentation failed, try next message... */
schedule_transmit_on_queue (queue);
return;
}
if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
- pm = reliability_box_message (pm);
- if (NULL == pm)
+ s = reliability_box_message (s);
+ if (NULL == s)
{
/* Reliability boxing failed, try next message... */
schedule_transmit_on_queue (queue);
return;
}
- // FIXME: actually do transmission
+ // FIXME: actually give 's' to communicator for transmission here!
- // FIXME: unless 'pm' is an ACK or control, move 'pm' back in the
- // transmission queue (simplistic: to the end, better: with position
- // depending on type, timeout, etc.)
-
- // FIXME: do something similar in defragmentation / reliability ACK handling!
- if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
+ // FIXME: do something similar to the logic below
+ // in defragmentation / reliability ACK handling!
+
+ /* Check if this transmission somehow conclusively finished handing 'pm'
+ even without any explicit ACKs */
+ if ( (PMT_CORE == s->pmt) &&
+ (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
{
+ /* Full message sent, and over reliabile channel */
client_send_response (pm,
GNUNET_YES,
pm->bytes_msg);
}
+ else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
+ (PMT_FRAGMENT_BOX == s->pmt) )
+ {
+ struct PendingMessage *pos;
+
+ /* Fragment sent over reliabile channel */
+ free_fragment_tree (s);
+ pos = s->frag_parent;
+ GNUNET_CONTAINER_MDLL_remove (frag,
+ pos->head_frag,
+ pos->tail_frag,
+ s);
+ GNUNET_free (s);
+ /* check if subtree is done */
+ while ( (NULL == pos->head_frag) &&
+ (pos->frag_off == pos->bytes_msg) &&
+ (pos != pm) )
+ {
+ s = pos;
+ pos = s->frag_parent;
+ GNUNET_CONTAINER_MDLL_remove (frag,
+ pos->head_frag,
+ pos->tail_frag,
+ s);
+ GNUNET_free (s);
+ }
+
+ /* Was this the last applicable fragmment? */
+ if ( (NULL == pm->head_frag) &&
+ (pm->frag_off == pm->bytes_msg) )
+ client_send_response (pm,
+ GNUNET_YES,
+ pm->bytes_msg /* FIXME: calculate and add overheads! */);
+ }
+ else if (PMT_CORE != pm->pmt)
+ {
+ /* This was an acknowledgement of some type, always free */
+
+ struct Neighbour *neighbour = pm->target;
+ GNUNET_CONTAINER_MDLL_remove (neighbour,
+ neighbour->pending_msg_head,
+ neighbour->pending_msg_tail,
+ pm);
+ GNUNET_free (pm);
+ }
+ else
+ {
+ /* message not finished, waiting for acknowledgement */
+ // FIXME: update time by which we might retransmit 's' based on
+ // queue characteristics (i.e. RTT)
+
+ // FIXME: move 'pm' back in the transmission queue (simplistic: to
+ // the end, better: with position depending on type, timeout,
+ // etc.)
+ }
+
/* finally, re-schedule self */
schedule_transmit_on_queue (queue);
}
@@ -2217,6 +2509,14 @@ handle_add_queue_message (void *cls,
const char *addr;
uint16_t addr_len;
+ if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
+ {
+ /* MTU so small as to be useless for transmissions,
+ required for #fragment_message()! */
+ GNUNET_break_op (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
neighbour = lookup_neighbour (&aqm->receiver);
if (NULL == neighbour)
{