From cb26df28be6f46898c34d7e8957baa86fa56ed11 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 26 Jan 2019 12:27:50 +0100 Subject: data structures for defragmentation --- src/transport/gnunet-service-tng.c | 623 +++++++++++++++++++++++++++++++++++-- 1 file changed, 595 insertions(+), 28 deletions(-) (limited to 'src/transport') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index cb6fcebdc..ac4a262d7 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -33,8 +33,11 @@ * transport-to-transport traffic) * * Implement: - * - manage defragmentation, retransmission, track RTT, loss, etc. - * - DV data structures, learning, forgetting, using them! + * - data structures for defragmentation + * - manage defragmentation + * - ACK handling / retransmission + * - track RTT, distance, loss, etc. + * - DV data structures, learning, forgetting & using them! * * Easy: * - use ATS bandwidth allocation callback and schedule transmissions! @@ -83,6 +86,7 @@ #include "gnunet_peerstore_service.h" #include "gnunet_hello_lib.h" #include "gnunet_ats_transport_service.h" +#include "gnunet_signatures.h" #include "transport.h" @@ -99,6 +103,16 @@ */ #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) +/** + * How long are ephemeral keys valid? + */ +#define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4) + +/** + * How long do we keep partially reassembled messages around before giving up? + */ +#define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4) + /** * How many messages can we have pending for a given communicator * process before we start to throttle that communicator? @@ -169,8 +183,7 @@ struct TransportBackchannelEncapsulationMessage /** - * Body by which a peqer confirms that it is using an ephemeral - * key. + * Body by which a peer confirms that it is using an ephemeral key. */ struct EphemeralConfirmation { @@ -182,9 +195,23 @@ struct EphemeralConfirmation /** * How long is this signature over the ephemeral key valid? + * Note that the receiver MUST IGNORE the absolute time, and + * only interpret the value as a mononic time and reject + * "older" values than the last one observed. Even with this, + * there is no real guarantee against replay achieved here, + * as the latest timestamp is not persisted. This is + * necessary as we do not want to require synchronized + * clocks and may not have a bidirectional communication + * channel. Communicators must protect against replay + * attacks when using backchannel communication! */ struct GNUNET_TIME_AbsoluteNBO ephemeral_validity; + /** + * Target's peer identity. + */ + struct GNUNET_PeerIdentity target; + /** * Ephemeral key setup by the sender for @e target, used * to encrypt the payload. @@ -376,6 +403,12 @@ struct TransportFragmentAckMessage * average transmission time of the sender minus this value. */ struct GNUNET_TIME_RelativeNBO avg_ack_delay; + + /** + * How long until the receiver will stop trying reassembly + * of this message? + */ + struct GNUNET_TIME_RelativeNBO reassembly_timeout; }; @@ -548,6 +581,11 @@ struct EphemeralCacheEntry */ struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key; + /** + * Our private ephemeral key. + */ + struct GNUNET_CRYPTO_EcdhePrivateKey private_key; + /** * Node in the ephemeral cache for this entry. * Used for expiration. @@ -726,6 +764,96 @@ struct GNUNET_ATS_Session }; +/** + * Information we keep for a message that we are reassembling. + */ +struct ReassemblyContext +{ + + /** + * Original message ID for of the message that all the + * fragments belong to. + */ + struct GNUNET_ShortHashCode msg_uuid; + + /** + * Which neighbour is this context for? + */ + struct Neighbour *neighbour; + + /** + * Entry in the reassembly heap (sorted by expiration). + */ + struct GNUNET_CONTAINER_HeapNode *hn; + + /** + * Bitfield with @e msg_size bits representing the positions + * where we have received fragments. When we receive a fragment, + * we check the bits in @e bitfield before incrementing @e msg_missing. + * + * Allocated after the reassembled message. + */ + uint8_t *bitfield; + + /** + * Task for sending ACK. We may send ACKs either because of hitting + * the @e extra_acks limit, or based on time and @e num_acks. This + * task is for the latter case. + */ + struct GNUNET_SCHEDULER_Task *ack_task; + + /** + * At what time will we give up reassembly of this message? + */ + struct GNUNET_TIME_Absolute reassembly_timeout; + + /** + * Average delay of all acks in @e extra_acks and @e frag_uuid. + * Should be reset to zero when @e num_acks is set to 0. + */ + struct GNUNET_TIME_Relative avg_ack_delay; + + /** + * Time we received the last fragment. @e avg_ack_delay must be + * incremented by now - @e last_frag multiplied by @e num_acks. + */ + struct GNUNET_TIME_Absolute last_frag; + + /** + * Bitfield of up to 64 additional fragments following @e frag_uuid + * to be acknowledged in the next cummulative ACK. + */ + uint64_t extra_acks; + + /** + * Unique ID of the lowest fragment UUID to be acknowledged in the + * next cummulative ACK. Only valid if @e num_acks > 0. + */ + uint32_t frag_uuid; + + /** + * Number of ACKs we have accumulated so far. Reset to 0 + * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK. + */ + unsigned int num_acks; + + /** + * How big is the message we are reassembling in total? + */ + uint16_t msg_size; + + /** + * How many bytes of the message are still missing? Defragmentation + * is complete when @e msg_missing == 0. + */ + uint16_t msg_missing; + + /* Followed by @e msg_size bytes of the (partially) defragmented original message */ + + /* Followed by @e bitfield data */ +}; + + /** * A neighbour that at least one communicator is connected to. */ @@ -737,6 +865,25 @@ struct Neighbour */ struct GNUNET_PeerIdentity pid; + /** + * Map with `struct ReassemblyContext` structs for fragments under + * reassembly. May be NULL if we currently have no fragments from + * this @e pid (lazy initialization). + */ + struct GNUNET_CONTAINER_MultiShortmap *reassembly_map; + + /** + * Heap with `struct ReassemblyContext` structs for fragments under + * reassembly. May be NULL if we currently have no fragments from + * this @e pid (lazy initialization). + */ + struct GNUNET_CONTAINER_Heap *reassembly_heap; + + /** + * Task to free old entries from the @e reassembly_heap and @e reassembly_map. + */ + struct GNUNET_SCHEDULER_Task *reassembly_timeout_task; + /** * Head of list of messages pending for this neighbour. */ @@ -1177,6 +1324,11 @@ static struct GNUNET_CONTAINER_Heap *ephemeral_heap; */ static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map; +/** + * Task to free expired ephemerals. + */ +static struct GNUNET_SCHEDULER_Task *ephemeral_task; + /** * Our connection to ATS for allocation and bootstrapping. */ @@ -1363,6 +1515,76 @@ client_connect_cb (void *cls, } +/** + * Free @a rc + * + * @param rc data structure to free + */ +static void +free_reassembly_context (struct ReassemblyContext *rc) +{ + struct Neighbour *n = rc->neighbour; + + GNUNET_assert (rc == + GNUNET_CONTAINER_heap_remove_node (rc->hn)); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map, + &rc->msg_uuid, + rc)); + GNUNET_free (rc); +} + + +/** + * Task run to clean up reassembly context of a neighbour that have expired. + * + * @param cls a `struct Neighbour` + */ +static void +reassembly_cleanup_task (void *cls) +{ + struct Neighbour *n = cls; + struct ReassemblyContext *rc; + + n->reassembly_timeout_task = NULL; + while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap))) + { + if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us) + { + free_reassembly_context (rc); + continue; + } + GNUNET_assert (NULL == n->reassembly_timeout_task); + n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout, + &reassembly_cleanup_task, + n); + return; + } +} + + +/** + * function called to #free_reassembly_context(). + * + * @param cls NULL + * @param key unused + * @param value a `struct ReassemblyContext` to free + * @return #GNUNET_OK (continue iteration) + */ +static int +free_reassembly_cb (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct ReassemblyContext *rc = value; + (void) cls; + (void) key; + + free_reassembly_context (rc); + return GNUNET_OK; +} + + /** * Release memory used by @a neighbour. * @@ -1378,6 +1600,18 @@ free_neighbour (struct Neighbour *neighbour) neighbour)); if (NULL != neighbour->timeout_task) GNUNET_SCHEDULER_cancel (neighbour->timeout_task); + if (NULL != neighbour->reassembly_map) + { + GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map, + &free_reassembly_cb, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map); + neighbour->reassembly_map = NULL; + GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap); + neighbour->reassembly_heap = NULL; + } + if (NULL != neighbour->reassembly_timeout_task) + GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task); GNUNET_free (neighbour); } @@ -2054,11 +2288,147 @@ static int check_communicator_backchannel (void *cls, const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb) { - // FIXME: check encapsulated message - // FIXME: check 0-termination of communcator at target + const struct GNUNET_MessageHeader *inbox; + const char *is; + uint16_t msize; + uint16_t isize; + + msize = ntohs (cb->header.size) - sizeof (*cb); + if (UINT16_MAX - msize > + sizeof (struct TransportBackchannelEncapsulationMessage) + + sizeof (struct TransportBackchannelRequestPayload) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + inbox = (const struct GNUNET_MessageHeader *) &cb[1]; + isize = ntohs (inbox->size); + if (isize >= msize) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + is = (const char *) inbox; + is += isize; + msize -= isize; + GNUNET_assert (msize > 0); + if ('\0' != is[msize-1]) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } return GNUNET_OK; } + +/** + * Remove memory used by expired ephemeral keys. + * + * @param cls NULL + */ +static void +expire_ephemerals (void *cls) +{ + struct EphemeralCacheEntry *ece; + + (void) cls; + ephemeral_task = NULL; + while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap))) + { + if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) + { + free_ephemeral (ece); + continue; + } + ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity, + &expire_ephemerals, + NULL); + return; + } +} + + +/** + * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate + * one, cache it and return it. + * + * @param pid peer to look up ephemeral for + * @param private_key[out] set to the private key + * @param ephemeral_key[out] set to the key + * @param ephemeral_sender_sig[out] set to the signature + * @param ephemeral_validity[out] set to the validity expiration time + */ +static void +lookup_ephemeral (const struct GNUNET_PeerIdentity *pid, + struct GNUNET_CRYPTO_EcdhePrivateKey *private_key, + struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key, + struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig, + struct GNUNET_TIME_Absolute *ephemeral_validity) +{ + struct EphemeralCacheEntry *ece; + struct EphemeralConfirmation ec; + + ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map, + pid); + if ( (NULL != ece) && + (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) ) + { + free_ephemeral (ece); + ece = NULL; + } + if (NULL == ece) + { + ece = GNUNET_new (struct EphemeralCacheEntry); + ece->target = *pid; + ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg), + EPHEMERAL_VALIDITY); + GNUNET_assert (GNUNET_OK == + GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key)); + GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key, + &ece->ephemeral_key); + ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL); + ec.purpose.size = htonl (sizeof (ec)); + ec.target = *pid; + ec.ephemeral_key = ece->ephemeral_key; + GNUNET_assert (GNUNET_OK == + GNUNET_CRYPTO_eddsa_sign (GST_my_private_key, + &ec.purpose, + &ece->sender_sig)); + ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap, + ece, + ece->ephemeral_validity.abs_value_us); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (ephemeral_map, + &ece->target, + ece, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + if (NULL == ephemeral_task) + ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity, + &expire_ephemerals, + NULL); + } + *private_key = ece->private_key; + *ephemeral_key = ece->ephemeral_key; + *ephemeral_sender_sig = ece->sender_sig; + *ephemeral_validity = ece->ephemeral_validity; +} + + +/** + * We need to transmit @a hdr to @a target. If necessary, this may + * involve DV routing or even broadcasting and fragmentation. + * + * @param target peer to receive @a hdr + * @param hdr header of the message to route + */ +static void +route_message (const struct GNUNET_PeerIdentity *target, + struct GNUNET_MessageHeader *hdr) +{ + // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting) + GNUNET_free (hdr); +} + /** * Communicator requests backchannel transmission. Process the request. @@ -2071,11 +2441,48 @@ handle_communicator_backchannel (void *cls, const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb) { struct TransportClient *tc = cls; - - // FIXME: determine path (possibly DV)! to target peer - // FIXME: encapsulate message, encrypt message! - // FIXME: possibly fragment message - // FIXME: possibly DV-route message! + struct GNUNET_CRYPTO_EcdhePrivateKey private_key; + struct GNUNET_TIME_Absolute ephemeral_validity; + struct TransportBackchannelEncapsulationMessage *enc; + struct TransportBackchannelRequestPayload ppay; + char *mpos; + uint16_t msize; + + /* encapsulate and encrypt message */ + msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload); + enc = GNUNET_malloc (sizeof (*enc) + msize); + enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION); + enc->header.size = htons (sizeof (*enc) + msize); + enc->target = cb->pid; + lookup_ephemeral (&cb->pid, + &private_key, + &enc->ephemeral_key, + &ppay.sender_sig, + &ephemeral_validity); + // FIXME: setup 'iv' +#if FIXME + dh_key_derive (&private_key, + &cb->pid, + &enc->iv, + &key); +#endif + ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity); + ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg)); + mpos = (char *) &enc[1]; +#if FIXME + encrypt (key, + &ppay, + &mpos, + sizeof (ppay)); + encrypt (key, + &cb[1], + &mpos, + ntohs (cb->header.size) - sizeof (*cb)); + hmac (key, + &enc->hmac); +#endif + route_message (&cb->pid, + &enc->header); GNUNET_SERVICE_client_continue (tc->client); } @@ -2260,9 +2667,27 @@ struct CommunicatorMessageContext * Additional information for flow control and about the sender. */ struct GNUNET_TRANSPORT_IncomingMessage im; + + /** + * Number of hops the message has travelled (if DV-routed). + * FIXME: make use of this in ACK handling! + */ + uint16_t total_hops; }; +/** + * Given an inbound message @a msg from a communicator @a cmc, + * demultiplex it based on the type calling the right handler. + * + * @param cmc context for demultiplexing + * @param msg message to demultiplex + */ +static void +demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, + const struct GNUNET_MessageHeader *msg); + + /** * Send ACK to communicator (if requested) and free @a cmc. * @@ -2385,9 +2810,89 @@ handle_fragment_box (void *cls, const struct TransportFragmentBox *fb) { struct CommunicatorMessageContext *cmc = cls; + struct Neighbour *n; + struct ReassemblyContext *rc; + const struct GNUNET_MessageHeader *msg; + uint16_t msize; + + n = GNUNET_CONTAINER_multipeermap_get (neighbours, + &cmc->im.sender); + if (NULL == n) + { + struct GNUNET_SERVICE_Client *client = cmc->tc->client; + + GNUNET_break (0); + finish_cmc_handling (cmc); + GNUNET_SERVICE_client_drop (client); + return; + } + if (NULL == n->reassembly_map) + { + n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8, + GNUNET_YES); + n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION, + &reassembly_cleanup_task, + n); + } + msize = ntohs (fb->msg_size); + rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map, + &fb->msg_uuid); + if (NULL == rc) + { + rc = GNUNET_malloc (sizeof (*rc) + + msize + /* reassembly payload buffer */ + (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */); + rc->msg_uuid = fb->msg_uuid; + rc->neighbour = n; + rc->msg_size = msize; + rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION); + rc->last_frag = GNUNET_TIME_absolute_get (); + rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap, + rc, + rc->reassembly_timeout.abs_value_us); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multishortmap_put (n->reassembly_map, + &rc->msg_uuid, + rc, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + rc->bitfield = (uint8_t *) (((char *) &rc[1]) + rc->msg_size); + rc->msg_missing = rc->msg_size; + } + if (msize != rc->msg_size) + { + GNUNET_break (0); + finish_cmc_handling (cmc); + return; + } - // FIXME: do work! - finish_cmc_handling (cmc); + // FIXME: do work: reassemble + + /* is reassembly complete? */ + if (0 != rc->msg_missing) + { + /* FIXME: possibly send ACK! */ + finish_cmc_handling (cmc); + return; + } + /* reassembly is complete, verify result */ + msg = (const struct GNUNET_MessageHeader *) &rc[1]; + if (ntohs (msg->size) != rc->msg_size) + { + GNUNET_break (0); + free_reassembly_context (rc); + finish_cmc_handling (cmc); + return; + } + /* successful reassembly */ + /* FIXME: definitively send ACK! */ + demultiplex_with_cmc (cmc, + msg); + /* FIXME: really free here? Might be bad if fragments are still + en-route and we forget that we finished this reassembly immediately! + -> keep around until timeout? + -> shorten timeout based on ACK? */ + free_reassembly_context (rc); } @@ -2436,11 +2941,27 @@ handle_reliability_box (void *cls, { struct CommunicatorMessageContext *cmc = cls; const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1]; - - // FIXME: send back reliability ACK (possibly conditional) - /* forward encapsulated message to CORE */ - handle_raw_message (cmc, - inbox); + + if (0 == ntohl (rb->ack_countdown)) + { + struct TransportReliabilityAckMessage *ack; + + /* FIXME: implement cummulative ACKs and ack_countdown, + then setting the avg_ack_delay field below: */ + ack = GNUNET_malloc (sizeof (*ack) + + sizeof (struct GNUNET_ShortHashCode)); + ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK); + ack->header.size = htons (sizeof (*ack) + + sizeof (struct GNUNET_ShortHashCode)); + memcpy (&ack[1], + &rb->msg_uuid, + sizeof (struct GNUNET_ShortHashCode)); + route_message (&cmc->im.sender, + &ack->header); + } + /* continue with inner message */ + demultiplex_with_cmc (cmc, + inbox); } @@ -2496,7 +3017,16 @@ handle_backchannel_encapsulation (void *cls, { struct CommunicatorMessageContext *cmc = cls; - // FIMXE: test if it is for me, if not, try to forward to target (DV routes!) + if (0 != memcmp (&be->target, + &GST_my_identity, + sizeof (struct GNUNET_PeerIdentity))) + { + /* not for me, try to route to target */ + route_message (&be->target, + GNUNET_copy_message (&be->header)); + finish_cmc_handling (cmc); + return; + } // FIXME: compute shared secret // FIXME: check HMAC // FIXME: decrypt payload @@ -2616,10 +3146,25 @@ handle_dv_box (void *cls, const struct TransportDVBox *dvb) { struct CommunicatorMessageContext *cmc = cls; - - // FIXME: are we the target? Then unbox and handle message. - // FIXME: if we are not the target, shorten path and forward along. - finish_cmc_handling (cmc); + uint16_t size = ntohs (dvb->header.size); + uint16_t num_hops = ntohs (dvb->num_hops); + const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1]; + const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops]; + + if (num_hops > 0) + { + // FIXME: if we are not the target, shorten path and forward along. + // Try from the _end_ of hops array if we know the given + // neighbour (shortening the path!). + // NOTE: increment total_hops! + finish_cmc_handling (cmc); + return; + } + /* We are the target. Unbox and handle message. */ + cmc->im.sender = dvb->origin; + cmc->total_hops = ntohs (dvb->total_hops); + demultiplex_with_cmc (cmc, + inbox); } @@ -2657,6 +3202,25 @@ handle_incoming_msg (void *cls, { struct TransportClient *tc = cls; struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext); + + cmc->tc = tc; + cmc->im = *im; + demultiplex_with_cmc (cmc, + (const struct GNUNET_MessageHeader *) &im[1]); +} + + +/** + * Given an inbound message @a msg from a communicator @a cmc, + * demultiplex it based on the type calling the right handler. + * + * @param cmc context for demultiplexing + * @param msg message to demultiplex + */ +static void +demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, + const struct GNUNET_MessageHeader *msg) +{ struct GNUNET_MQ_MessageHandler handlers[] = { GNUNET_MQ_hd_var_size (fragment_box, GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT, @@ -2690,14 +3254,12 @@ handle_incoming_msg (void *cls, }; int ret; - cmc->tc = tc; - cmc->im = *im; ret = GNUNET_MQ_handle_message (handlers, - (const struct GNUNET_MessageHeader *) &im[1]); + msg); if (GNUNET_SYSERR == ret) { GNUNET_break (0); - GNUNET_SERVICE_client_drop (tc->client); + GNUNET_SERVICE_client_drop (cmc->tc->client); GNUNET_free (cmc); return; } @@ -2705,7 +3267,7 @@ handle_incoming_msg (void *cls, { /* unencapsulated 'raw' message */ handle_raw_message (&cmc, - (const struct GNUNET_MessageHeader *) &im[1]); + msg); } } @@ -3731,6 +4293,11 @@ do_shutdown (void *cls) { (void) cls; + if (NULL != ephemeral_task) + { + GNUNET_SCHEDULER_cancel (ephemeral_task); + ephemeral_task = NULL; + } GNUNET_CONTAINER_multipeermap_iterate (neighbours, &free_neighbour_cb, NULL); -- cgit v1.2.3