summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-01-26 12:27:50 +0100
committerChristian Grothoff <christian@grothoff.org>2019-01-26 12:27:50 +0100
commitcb26df28be6f46898c34d7e8957baa86fa56ed11 (patch)
tree04fb4a82c0b3f823ff6c577efa10d36c74275b3e /src/transport/gnunet-service-tng.c
parentd1a4a8f64bba3399d16b2717c67f00957963983b (diff)
data structures for defragmentation
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c623
1 files changed, 595 insertions, 28 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index cb6fcebdc..ac4a262d7 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -33,8 +33,11 @@
* transport-to-transport traffic)
*
* Implement:
- * - manage defragmentation, retransmission, track RTT, loss, etc.
- * - DV data structures, learning, forgetting, using them!
+ * - data structures for defragmentation
+ * - manage defragmentation
+ * - ACK handling / retransmission
+ * - track RTT, distance, loss, etc.
+ * - DV data structures, learning, forgetting & using them!
*
* Easy:
* - use ATS bandwidth allocation callback and schedule transmissions!
@@ -83,6 +86,7 @@
#include "gnunet_peerstore_service.h"
#include "gnunet_hello_lib.h"
#include "gnunet_ats_transport_service.h"
+#include "gnunet_signatures.h"
#include "transport.h"
@@ -100,6 +104,16 @@
#define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
/**
+ * How long are ephemeral keys valid?
+ */
+#define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
+
+/**
+ * How long do we keep partially reassembled messages around before giving up?
+ */
+#define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
+
+/**
* How many messages can we have pending for a given communicator
* process before we start to throttle that communicator?
*
@@ -169,8 +183,7 @@ struct TransportBackchannelEncapsulationMessage
/**
- * Body by which a peqer confirms that it is using an ephemeral
- * key.
+ * Body by which a peer confirms that it is using an ephemeral key.
*/
struct EphemeralConfirmation
{
@@ -182,10 +195,24 @@ struct EphemeralConfirmation
/**
* How long is this signature over the ephemeral key valid?
+ * Note that the receiver MUST IGNORE the absolute time, and
+ * only interpret the value as a mononic time and reject
+ * "older" values than the last one observed. Even with this,
+ * there is no real guarantee against replay achieved here,
+ * as the latest timestamp is not persisted. This is
+ * necessary as we do not want to require synchronized
+ * clocks and may not have a bidirectional communication
+ * channel. Communicators must protect against replay
+ * attacks when using backchannel communication!
*/
struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
/**
+ * Target's peer identity.
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
* Ephemeral key setup by the sender for @e target, used
* to encrypt the payload.
*/
@@ -376,6 +403,12 @@ struct TransportFragmentAckMessage
* average transmission time of the sender minus this value.
*/
struct GNUNET_TIME_RelativeNBO avg_ack_delay;
+
+ /**
+ * How long until the receiver will stop trying reassembly
+ * of this message?
+ */
+ struct GNUNET_TIME_RelativeNBO reassembly_timeout;
};
@@ -549,6 +582,11 @@ struct EphemeralCacheEntry
struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
/**
+ * Our private ephemeral key.
+ */
+ struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
+
+ /**
* Node in the ephemeral cache for this entry.
* Used for expiration.
*/
@@ -727,6 +765,96 @@ struct GNUNET_ATS_Session
/**
+ * Information we keep for a message that we are reassembling.
+ */
+struct ReassemblyContext
+{
+
+ /**
+ * Original message ID for of the message that all the
+ * fragments belong to.
+ */
+ struct GNUNET_ShortHashCode msg_uuid;
+
+ /**
+ * Which neighbour is this context for?
+ */
+ struct Neighbour *neighbour;
+
+ /**
+ * Entry in the reassembly heap (sorted by expiration).
+ */
+ struct GNUNET_CONTAINER_HeapNode *hn;
+
+ /**
+ * Bitfield with @e msg_size bits representing the positions
+ * where we have received fragments. When we receive a fragment,
+ * we check the bits in @e bitfield before incrementing @e msg_missing.
+ *
+ * Allocated after the reassembled message.
+ */
+ uint8_t *bitfield;
+
+ /**
+ * Task for sending ACK. We may send ACKs either because of hitting
+ * the @e extra_acks limit, or based on time and @e num_acks. This
+ * task is for the latter case.
+ */
+ struct GNUNET_SCHEDULER_Task *ack_task;
+
+ /**
+ * At what time will we give up reassembly of this message?
+ */
+ struct GNUNET_TIME_Absolute reassembly_timeout;
+
+ /**
+ * Average delay of all acks in @e extra_acks and @e frag_uuid.
+ * Should be reset to zero when @e num_acks is set to 0.
+ */
+ struct GNUNET_TIME_Relative avg_ack_delay;
+
+ /**
+ * Time we received the last fragment. @e avg_ack_delay must be
+ * incremented by now - @e last_frag multiplied by @e num_acks.
+ */
+ struct GNUNET_TIME_Absolute last_frag;
+
+ /**
+ * Bitfield of up to 64 additional fragments following @e frag_uuid
+ * to be acknowledged in the next cummulative ACK.
+ */
+ uint64_t extra_acks;
+
+ /**
+ * Unique ID of the lowest fragment UUID to be acknowledged in the
+ * next cummulative ACK. Only valid if @e num_acks > 0.
+ */
+ uint32_t frag_uuid;
+
+ /**
+ * Number of ACKs we have accumulated so far. Reset to 0
+ * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
+ */
+ unsigned int num_acks;
+
+ /**
+ * How big is the message we are reassembling in total?
+ */
+ uint16_t msg_size;
+
+ /**
+ * How many bytes of the message are still missing? Defragmentation
+ * is complete when @e msg_missing == 0.
+ */
+ uint16_t msg_missing;
+
+ /* Followed by @e msg_size bytes of the (partially) defragmented original message */
+
+ /* Followed by @e bitfield data */
+};
+
+
+/**
* A neighbour that at least one communicator is connected to.
*/
struct Neighbour
@@ -738,6 +866,25 @@ struct Neighbour
struct GNUNET_PeerIdentity pid;
/**
+ * Map with `struct ReassemblyContext` structs for fragments under
+ * reassembly. May be NULL if we currently have no fragments from
+ * this @e pid (lazy initialization).
+ */
+ struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
+
+ /**
+ * Heap with `struct ReassemblyContext` structs for fragments under
+ * reassembly. May be NULL if we currently have no fragments from
+ * this @e pid (lazy initialization).
+ */
+ struct GNUNET_CONTAINER_Heap *reassembly_heap;
+
+ /**
+ * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
+ */
+ struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
+
+ /**
* Head of list of messages pending for this neighbour.
*/
struct PendingMessage *pending_msg_head;
@@ -1178,6 +1325,11 @@ static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
/**
+ * Task to free expired ephemerals.
+ */
+static struct GNUNET_SCHEDULER_Task *ephemeral_task;
+
+/**
* Our connection to ATS for allocation and bootstrapping.
*/
static struct GNUNET_ATS_TransportHandle *ats;
@@ -1364,6 +1516,76 @@ client_connect_cb (void *cls,
/**
+ * Free @a rc
+ *
+ * @param rc data structure to free
+ */
+static void
+free_reassembly_context (struct ReassemblyContext *rc)
+{
+ struct Neighbour *n = rc->neighbour;
+
+ GNUNET_assert (rc ==
+ GNUNET_CONTAINER_heap_remove_node (rc->hn));
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
+ &rc->msg_uuid,
+ rc));
+ GNUNET_free (rc);
+}
+
+
+/**
+ * Task run to clean up reassembly context of a neighbour that have expired.
+ *
+ * @param cls a `struct Neighbour`
+ */
+static void
+reassembly_cleanup_task (void *cls)
+{
+ struct Neighbour *n = cls;
+ struct ReassemblyContext *rc;
+
+ n->reassembly_timeout_task = NULL;
+ while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
+ {
+ if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
+ {
+ free_reassembly_context (rc);
+ continue;
+ }
+ GNUNET_assert (NULL == n->reassembly_timeout_task);
+ n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
+ &reassembly_cleanup_task,
+ n);
+ return;
+ }
+}
+
+
+/**
+ * function called to #free_reassembly_context().
+ *
+ * @param cls NULL
+ * @param key unused
+ * @param value a `struct ReassemblyContext` to free
+ * @return #GNUNET_OK (continue iteration)
+ */
+static int
+free_reassembly_cb (void *cls,
+ const struct GNUNET_ShortHashCode *key,
+ void *value)
+{
+ struct ReassemblyContext *rc = value;
+ (void) cls;
+ (void) key;
+
+ free_reassembly_context (rc);
+ return GNUNET_OK;
+}
+
+
+/**
* Release memory used by @a neighbour.
*
* @param neighbour neighbour entry to free
@@ -1378,6 +1600,18 @@ free_neighbour (struct Neighbour *neighbour)
neighbour));
if (NULL != neighbour->timeout_task)
GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
+ if (NULL != neighbour->reassembly_map)
+ {
+ GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
+ &free_reassembly_cb,
+ NULL);
+ GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
+ neighbour->reassembly_map = NULL;
+ GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
+ neighbour->reassembly_heap = NULL;
+ }
+ if (NULL != neighbour->reassembly_timeout_task)
+ GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
GNUNET_free (neighbour);
}
@@ -2054,11 +2288,147 @@ static int
check_communicator_backchannel (void *cls,
const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
{
- // FIXME: check encapsulated message
- // FIXME: check 0-termination of communcator at target
+ const struct GNUNET_MessageHeader *inbox;
+ const char *is;
+ uint16_t msize;
+ uint16_t isize;
+
+ msize = ntohs (cb->header.size) - sizeof (*cb);
+ if (UINT16_MAX - msize >
+ sizeof (struct TransportBackchannelEncapsulationMessage) +
+ sizeof (struct TransportBackchannelRequestPayload) )
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ inbox = (const struct GNUNET_MessageHeader *) &cb[1];
+ isize = ntohs (inbox->size);
+ if (isize >= msize)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ is = (const char *) inbox;
+ is += isize;
+ msize -= isize;
+ GNUNET_assert (msize > 0);
+ if ('\0' != is[msize-1])
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
return GNUNET_OK;
}
+
+/**
+ * Remove memory used by expired ephemeral keys.
+ *
+ * @param cls NULL
+ */
+static void
+expire_ephemerals (void *cls)
+{
+ struct EphemeralCacheEntry *ece;
+
+ (void) cls;
+ ephemeral_task = NULL;
+ while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
+ {
+ if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
+ {
+ free_ephemeral (ece);
+ continue;
+ }
+ ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
+ &expire_ephemerals,
+ NULL);
+ return;
+ }
+}
+
+
+/**
+ * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
+ * one, cache it and return it.
+ *
+ * @param pid peer to look up ephemeral for
+ * @param private_key[out] set to the private key
+ * @param ephemeral_key[out] set to the key
+ * @param ephemeral_sender_sig[out] set to the signature
+ * @param ephemeral_validity[out] set to the validity expiration time
+ */
+static void
+lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
+ struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
+ struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
+ struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
+ struct GNUNET_TIME_Absolute *ephemeral_validity)
+{
+ struct EphemeralCacheEntry *ece;
+ struct EphemeralConfirmation ec;
+
+ ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
+ pid);
+ if ( (NULL != ece) &&
+ (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
+ {
+ free_ephemeral (ece);
+ ece = NULL;
+ }
+ if (NULL == ece)
+ {
+ ece = GNUNET_new (struct EphemeralCacheEntry);
+ ece->target = *pid;
+ ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
+ EPHEMERAL_VALIDITY);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
+ GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
+ &ece->ephemeral_key);
+ ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
+ ec.purpose.size = htonl (sizeof (ec));
+ ec.target = *pid;
+ ec.ephemeral_key = ece->ephemeral_key;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
+ &ec.purpose,
+ &ece->sender_sig));
+ ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
+ ece,
+ ece->ephemeral_validity.abs_value_us);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
+ &ece->target,
+ ece,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ if (NULL == ephemeral_task)
+ ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
+ &expire_ephemerals,
+ NULL);
+ }
+ *private_key = ece->private_key;
+ *ephemeral_key = ece->ephemeral_key;
+ *ephemeral_sender_sig = ece->sender_sig;
+ *ephemeral_validity = ece->ephemeral_validity;
+}
+
+
+/**
+ * We need to transmit @a hdr to @a target. If necessary, this may
+ * involve DV routing or even broadcasting and fragmentation.
+ *
+ * @param target peer to receive @a hdr
+ * @param hdr header of the message to route
+ */
+static void
+route_message (const struct GNUNET_PeerIdentity *target,
+ struct GNUNET_MessageHeader *hdr)
+{
+ // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
+ GNUNET_free (hdr);
+}
+
/**
* Communicator requests backchannel transmission. Process the request.
@@ -2071,11 +2441,48 @@ handle_communicator_backchannel (void *cls,
const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
{
struct TransportClient *tc = cls;
-
- // FIXME: determine path (possibly DV)! to target peer
- // FIXME: encapsulate message, encrypt message!
- // FIXME: possibly fragment message
- // FIXME: possibly DV-route message!
+ struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
+ struct GNUNET_TIME_Absolute ephemeral_validity;
+ struct TransportBackchannelEncapsulationMessage *enc;
+ struct TransportBackchannelRequestPayload ppay;
+ char *mpos;
+ uint16_t msize;
+
+ /* encapsulate and encrypt message */
+ msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
+ enc = GNUNET_malloc (sizeof (*enc) + msize);
+ enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
+ enc->header.size = htons (sizeof (*enc) + msize);
+ enc->target = cb->pid;
+ lookup_ephemeral (&cb->pid,
+ &private_key,
+ &enc->ephemeral_key,
+ &ppay.sender_sig,
+ &ephemeral_validity);
+ // FIXME: setup 'iv'
+#if FIXME
+ dh_key_derive (&private_key,
+ &cb->pid,
+ &enc->iv,
+ &key);
+#endif
+ ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
+ ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
+ mpos = (char *) &enc[1];
+#if FIXME
+ encrypt (key,
+ &ppay,
+ &mpos,
+ sizeof (ppay));
+ encrypt (key,
+ &cb[1],
+ &mpos,
+ ntohs (cb->header.size) - sizeof (*cb));
+ hmac (key,
+ &enc->hmac);
+#endif
+ route_message (&cb->pid,
+ &enc->header);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -2260,10 +2667,28 @@ struct CommunicatorMessageContext
* Additional information for flow control and about the sender.
*/
struct GNUNET_TRANSPORT_IncomingMessage im;
+
+ /**
+ * Number of hops the message has travelled (if DV-routed).
+ * FIXME: make use of this in ACK handling!
+ */
+ uint16_t total_hops;
};
/**
+ * Given an inbound message @a msg from a communicator @a cmc,
+ * demultiplex it based on the type calling the right handler.
+ *
+ * @param cmc context for demultiplexing
+ * @param msg message to demultiplex
+ */
+static void
+demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
+ const struct GNUNET_MessageHeader *msg);
+
+
+/**
* Send ACK to communicator (if requested) and free @a cmc.
*
* @param cmc context for which we are done handling the message
@@ -2385,9 +2810,89 @@ handle_fragment_box (void *cls,
const struct TransportFragmentBox *fb)
{
struct CommunicatorMessageContext *cmc = cls;
+ struct Neighbour *n;
+ struct ReassemblyContext *rc;
+ const struct GNUNET_MessageHeader *msg;
+ uint16_t msize;
+
+ n = GNUNET_CONTAINER_multipeermap_get (neighbours,
+ &cmc->im.sender);
+ if (NULL == n)
+ {
+ struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+
+ GNUNET_break (0);
+ finish_cmc_handling (cmc);
+ GNUNET_SERVICE_client_drop (client);
+ return;
+ }
+ if (NULL == n->reassembly_map)
+ {
+ n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
+ GNUNET_YES);
+ n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
+ &reassembly_cleanup_task,
+ n);
+ }
+ msize = ntohs (fb->msg_size);
+ rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
+ &fb->msg_uuid);
+ if (NULL == rc)
+ {
+ rc = GNUNET_malloc (sizeof (*rc) +
+ msize + /* reassembly payload buffer */
+ (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
+ rc->msg_uuid = fb->msg_uuid;
+ rc->neighbour = n;
+ rc->msg_size = msize;
+ rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
+ rc->last_frag = GNUNET_TIME_absolute_get ();
+ rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
+ rc,
+ rc->reassembly_timeout.abs_value_us);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
+ &rc->msg_uuid,
+ rc,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ rc->bitfield = (uint8_t *) (((char *) &rc[1]) + rc->msg_size);
+ rc->msg_missing = rc->msg_size;
+ }
+ if (msize != rc->msg_size)
+ {
+ GNUNET_break (0);
+ finish_cmc_handling (cmc);
+ return;
+ }
- // FIXME: do work!
- finish_cmc_handling (cmc);
+ // FIXME: do work: reassemble
+
+ /* is reassembly complete? */
+ if (0 != rc->msg_missing)
+ {
+ /* FIXME: possibly send ACK! */
+ finish_cmc_handling (cmc);
+ return;
+ }
+ /* reassembly is complete, verify result */
+ msg = (const struct GNUNET_MessageHeader *) &rc[1];
+ if (ntohs (msg->size) != rc->msg_size)
+ {
+ GNUNET_break (0);
+ free_reassembly_context (rc);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ /* successful reassembly */
+ /* FIXME: definitively send ACK! */
+ demultiplex_with_cmc (cmc,
+ msg);
+ /* FIXME: really free here? Might be bad if fragments are still
+ en-route and we forget that we finished this reassembly immediately!
+ -> keep around until timeout?
+ -> shorten timeout based on ACK? */
+ free_reassembly_context (rc);
}
@@ -2436,11 +2941,27 @@ handle_reliability_box (void *cls,
{
struct CommunicatorMessageContext *cmc = cls;
const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
-
- // FIXME: send back reliability ACK (possibly conditional)
- /* forward encapsulated message to CORE */
- handle_raw_message (cmc,
- inbox);
+
+ if (0 == ntohl (rb->ack_countdown))
+ {
+ struct TransportReliabilityAckMessage *ack;
+
+ /* FIXME: implement cummulative ACKs and ack_countdown,
+ then setting the avg_ack_delay field below: */
+ ack = GNUNET_malloc (sizeof (*ack) +
+ sizeof (struct GNUNET_ShortHashCode));
+ ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
+ ack->header.size = htons (sizeof (*ack) +
+ sizeof (struct GNUNET_ShortHashCode));
+ memcpy (&ack[1],
+ &rb->msg_uuid,
+ sizeof (struct GNUNET_ShortHashCode));
+ route_message (&cmc->im.sender,
+ &ack->header);
+ }
+ /* continue with inner message */
+ demultiplex_with_cmc (cmc,
+ inbox);
}
@@ -2496,7 +3017,16 @@ handle_backchannel_encapsulation (void *cls,
{
struct CommunicatorMessageContext *cmc = cls;
- // FIMXE: test if it is for me, if not, try to forward to target (DV routes!)
+ if (0 != memcmp (&be->target,
+ &GST_my_identity,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ /* not for me, try to route to target */
+ route_message (&be->target,
+ GNUNET_copy_message (&be->header));
+ finish_cmc_handling (cmc);
+ return;
+ }
// FIXME: compute shared secret
// FIXME: check HMAC
// FIXME: decrypt payload
@@ -2616,10 +3146,25 @@ handle_dv_box (void *cls,
const struct TransportDVBox *dvb)
{
struct CommunicatorMessageContext *cmc = cls;
-
- // FIXME: are we the target? Then unbox and handle message.
- // FIXME: if we are not the target, shorten path and forward along.
- finish_cmc_handling (cmc);
+ uint16_t size = ntohs (dvb->header.size);
+ uint16_t num_hops = ntohs (dvb->num_hops);
+ const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
+ const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
+
+ if (num_hops > 0)
+ {
+ // FIXME: if we are not the target, shorten path and forward along.
+ // Try from the _end_ of hops array if we know the given
+ // neighbour (shortening the path!).
+ // NOTE: increment total_hops!
+ finish_cmc_handling (cmc);
+ return;
+ }
+ /* We are the target. Unbox and handle message. */
+ cmc->im.sender = dvb->origin;
+ cmc->total_hops = ntohs (dvb->total_hops);
+ demultiplex_with_cmc (cmc,
+ inbox);
}
@@ -2657,6 +3202,25 @@ handle_incoming_msg (void *cls,
{
struct TransportClient *tc = cls;
struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
+
+ cmc->tc = tc;
+ cmc->im = *im;
+ demultiplex_with_cmc (cmc,
+ (const struct GNUNET_MessageHeader *) &im[1]);
+}
+
+
+/**
+ * Given an inbound message @a msg from a communicator @a cmc,
+ * demultiplex it based on the type calling the right handler.
+ *
+ * @param cmc context for demultiplexing
+ * @param msg message to demultiplex
+ */
+static void
+demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
+ const struct GNUNET_MessageHeader *msg)
+{
struct GNUNET_MQ_MessageHandler handlers[] = {
GNUNET_MQ_hd_var_size (fragment_box,
GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
@@ -2690,14 +3254,12 @@ handle_incoming_msg (void *cls,
};
int ret;
- cmc->tc = tc;
- cmc->im = *im;
ret = GNUNET_MQ_handle_message (handlers,
- (const struct GNUNET_MessageHeader *) &im[1]);
+ msg);
if (GNUNET_SYSERR == ret)
{
GNUNET_break (0);
- GNUNET_SERVICE_client_drop (tc->client);
+ GNUNET_SERVICE_client_drop (cmc->tc->client);
GNUNET_free (cmc);
return;
}
@@ -2705,7 +3267,7 @@ handle_incoming_msg (void *cls,
{
/* unencapsulated 'raw' message */
handle_raw_message (&cmc,
- (const struct GNUNET_MessageHeader *) &im[1]);
+ msg);
}
}
@@ -3731,6 +4293,11 @@ do_shutdown (void *cls)
{
(void) cls;
+ if (NULL != ephemeral_task)
+ {
+ GNUNET_SCHEDULER_cancel (ephemeral_task);
+ ephemeral_task = NULL;
+ }
GNUNET_CONTAINER_multipeermap_iterate (neighbours,
&free_neighbour_cb,
NULL);