/*
 This file is part of GNUnet.
 Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.

 GNUnet is free software: you can redistribute it and/or modify it
 under the terms of the GNU Affero General Public License as published
 by the Free Software Foundation, either version 3 of the License,
 or (at your option) any later version.

 GNUnet is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 Affero General Public License for more details.

 You should have received a copy of the GNU Affero General Public License
 along with this program.  If not, see <http://www.gnu.org/licenses/>.

 SPDX-License-Identifier: AGPL3.0-or-later
 */
/**
 * @file transport/gnunet-service-tng.c
 * @brief main for gnunet-service-tng
 * @author Christian Grothoff
 *
 * TODO:
 * - figure out how to transmit (selective) ACKs in case of uni-directional
 *   communicators (with/without core? DV-only?) When do we use ACKs?
 *   => communicators use selective ACKs for flow control
 *   => transport uses message-level ACKs for RTT, fragment confirmation
 *   => integrate DV into transport, use neither core nor communicators
 *      but rather give communicators transport-encapsulated messages
 *      (which could be core-data, background-channel traffic, or
 *      transport-to-transport traffic)
 *
 * Implement next:
 * - address validation: what is our plan here?
 *   #1 Peerstore only gets 'validated' addresses
 *   #2 transport needs another API to "trigger" validation!
 *      API may be used by core/application or communicators;
 *      => use yet another lib/MQ/connection?
 *   #3 transport should use validation to also establish
 *      effective flow control (for uni-directional transports!)
 *   #4 UDP broadcasting logic must be extended to use the new API
 *   #5 only validated addresses go to ATS for scheduling; that
 *      also ensures we know the RTT
 *   #6 to ensure flow control and RTT are OK, we always do the
 *      'validation', even if address comes from PEERSTORE
 *   #7
 * - ACK handling / retransmission
 * - address verification
 * - track RTT, distance, loss, etc.
 * - DV data structures:
 *   + learning
 *   + forgetting
 *   + using them!
 * - routing of messages (using DV data structures!)
 * - handling of DV-boxed messages that need to be forwarded
 * - backchannel message encryption & decryption
 * -
 *
 * Easy:
 * - use ATS bandwidth allocation callback and schedule transmissions!
 *
 * Plan:
 * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
 *
 * Later:
 * - change transport-core API to provide proper flow control in both
 *   directions, allow multiple messages per peer simultaneously (tag
 *   confirmations with unique message ID), and replace quota-out with
 *   proper flow control;
 * - if messages are below MTU, consider adding ACKs and other stuff
 *   (requires planning at receiver, and additional MST-style demultiplex
 *   at receiver!)
 * - could avoid copying body of message into each fragment and keep
 *   fragments as just pointers into the original message and only
 *   fully build fragments just before transmission (optimization, should
 *   reduce CPU and memory use)
 *
 * Design realizations / discussion:
 * - communicators do flow control by calling MQ "notify sent"
 *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
 *   or explicitly via background channel FC ACKs.  As long as the
 *   channel is not full, they may 'notify sent' even if the other
 *   peer has not yet confirmed receipt. The other peer confirming
 *   is _only_ for FC, not for more reliable transmission; reliable
 *   transmission (i.e. of fragments) is left to _transport_.
 * - ACKs sent back in uni-directional communicators are done via
 *   the background channel API; here transport _may_ initially
 *   broadcast (with bounded # hops) if no path is known;
 * - transport should _integrate_ DV-routing and build a view of
 *   the network; then background channel traffic can be
 *   routed via DV as well as explicit "DV" traffic.
 * - background channel is also used for ACKs and NAT traversal support
 * - transport service is responsible for AEAD'ing the background
 *   channel, timestamps and monotonic time are used against replay
 *   of old messages -> peerstore needs to be supplied with
 *   "latest timestamps seen" data
 * - if transport implements DV, we likely need a 3rd peermap
 *   in addition to ephemerals and (direct) neighbours
 *   => in this data structure, we should track ATS metrics (distance, RTT, etc.)
 *      as well as latest timestamps seen, goodput, fragments for transmission, etc.
 *   ==> check if stuff needs to be moved out of "Neighbour"
 * - transport should encapsulate core-level messages and do its
 *   own ACKing for RTT/goodput/loss measurements _and_ fragment
 *   for retransmission
 */
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_statistics_service.h"
#include "gnunet_transport_monitor_service.h"
#include "gnunet_peerstore_service.h"
#include "gnunet_hello_lib.h"
#include "gnunet_ats_transport_service.h"
#include "gnunet_signatures.h"
#include "transport.h"


/**
 * What is the size we assume for a read operation in the
 * absence of an MTU for the purpose of flow control?
 */
#define IN_PACKET_SIZE_WITHOUT_MTU 128

/**
 * If a queue delays the next message by more than this amount of
 * time (currently five seconds) we log a warning. Note: this is for
 * testing, the value chosen here might be too aggressively low!
 */
#define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)

/**
 * How long are ephemeral keys valid?
*/
#define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)

/**
 * How long do we keep partially reassembled messages around before
 * giving up?  (Currently four minutes.)
 */
#define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)

/**
 * How many messages can we have pending for a given communicator
 * process before we start to throttle that communicator?
 *
 * Used if a communicator might be CPU-bound and cannot handle the traffic.
 */
#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512

/**
 * How many messages can we have pending for a given session (queue to
 * a particular peer via a communicator) process before we start to
 * throttle that queue?
 *
 * Used if ATS assigns more bandwidth to a particular transmission
 * method than that transmission method can right now handle. (Yes,
 * ATS should eventually notice utilization below allocation and
 * adjust, but we don't want to queue up tons of messages in the
 * meantime). Must be significantly below
 * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
 */
#define SESSION_QUEUE_LIMIT 32


GNUNET_NETWORK_STRUCT_BEGIN

/**
 * Outer layer of an encapsulated backchannel message.
 */
struct TransportBackchannelEncapsulationMessage
{
  /**
   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
   */
  struct GNUNET_MessageHeader header;

  /**
   * Distance the backchannel message has traveled, to be updated at
   * each hop.  Used to bound the number of hops in case a backchannel
   * message is broadcast and thus travels without routing
   * information (during initial backchannel discovery).
   *
   * NOTE(review): unlike the other multi-byte integer fields of the
   * network structs in this file, this one carries no GNUNET_PACKED
   * marker and no byte-order remark -- confirm this is intended.
   */
  uint32_t distance;

  /**
   * Target's peer identity (as backchannels may be transmitted
   * indirectly, or even be broadcast).
   */
  struct GNUNET_PeerIdentity target;

  /**
   * Ephemeral key setup by the sender for @e target, used
   * to encrypt the payload.
   */
  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;

  // FIXME: probably should add random IV here as well,
  // especially if we re-use ephemeral keys!
/**
   * HMAC over the ciphertext of the encrypted, variable-size
   * body that follows.  Verified via DH of @e target and
   * @e ephemeral_key
   */
  struct GNUNET_HashCode hmac;

  /* Followed by encrypted, variable-size payload */
};


/**
 * Body by which a peer confirms that it is using an ephemeral key.
 * This is the data that gets signed; the resulting signature travels
 * as `sender_sig` in `struct TransportBackchannelRequestPayload`.
 */
struct EphemeralConfirmation
{

  /**
   * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
   */
  struct GNUNET_CRYPTO_EccSignaturePurpose purpose;

  /**
   * How long is this signature over the ephemeral key valid?
   * Note that the receiver MUST IGNORE the absolute time, and
   * only interpret the value as a monotonic time and reject
   * "older" values than the last one observed.  Even with this,
   * there is no real guarantee against replay achieved here,
   * as the latest timestamp is not persisted.  This is
   * necessary as we do not want to require synchronized
   * clocks and may not have a bidirectional communication
   * channel.  Communicators must protect against replay
   * attacks when using backchannel communication!
   */
  struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;

  /**
   * Target's peer identity.
   */
  struct GNUNET_PeerIdentity target;

  /**
   * Ephemeral key setup by the sender for @e target, used
   * to encrypt the payload.
   */
  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;

};


/**
 * Plaintext of the variable-size payload that is encrypted
 * within a `struct TransportBackchannelEncapsulationMessage`
 */
struct TransportBackchannelRequestPayload
{

  /**
   * Sender's peer identity.
   */
  struct GNUNET_PeerIdentity sender;

  /**
   * Signature of the sender over an
   * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
   */
  struct GNUNET_CRYPTO_EddsaSignature sender_sig;

  /**
   * How long is this signature over the ephemeral key
   * valid?
   */
  struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;

  /**
   * Current monotonic time of the sending transport service.  Used to
   * detect replayed messages.
Note that the receiver should remember
   * a list of the recently seen timestamps and only reject messages
   * if the timestamp is in the list, or the list is "full" and the
   * timestamp is smaller than the lowest in the list.  This list of
   * timestamps per peer should be persisted to guard against replays
   * after restarts.
   */
  struct GNUNET_TIME_AbsoluteNBO monotonic_time;

  /* Followed by a `struct GNUNET_MessageHeader` with a message
     for a communicator */

  /* Followed by a 0-terminated string specifying the name of
     the communicator which is to receive the message */

};


/**
 * Outer layer of an encapsulated unfragmented application message sent
 * over an unreliable channel.
 */
struct TransportReliabilityBox
{
  /**
   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
   */
  struct GNUNET_MessageHeader header;

  /**
   * Number of messages still to be sent before a cumulative
   * ACK is requested.  Zero if an ACK is requested immediately.
   * In NBO.  Note that the receiver may send the ACK faster
   * if it believes that is reasonable.
   */
  uint32_t ack_countdown GNUNET_PACKED;

  /**
   * Unique ID of the message used for signalling receipt of
   * messages sent over possibly unreliable channels.  Should
   * be random.
   */
  struct GNUNET_ShortHashCode msg_uuid;
};


/**
 * Confirmation that the receiver got a
 * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
 * confirmation may be transmitted over a completely different queue,
 * so ACKs are identified by a combination of PID of sender and
 * message UUID, without the queue playing any role!
 */
struct TransportReliabilityAckMessage
{
  /**
   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
   */
  struct GNUNET_MessageHeader header;

  /**
   * Reserved. Zero.
   */
  uint32_t reserved GNUNET_PACKED;

  /**
   * How long was the ACK delayed relative to the average time of
   * receipt of the messages being acknowledged?  Used to calculate
   * the average RTT by taking the receipt time of the ack minus the
   * average transmission time of the sender minus this value.
*/
  struct GNUNET_TIME_RelativeNBO avg_ack_delay;

  /* followed by any number of `struct GNUNET_ShortHashCode`
     messages providing ACKs */
};


/**
 * Outer layer of an encapsulated fragmented application message.
 */
struct TransportFragmentBox
{
  /**
   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
   */
  struct GNUNET_MessageHeader header;

  /**
   * Unique ID of this fragment (and fragment transmission!). Will
   * change even if a fragment is retransmitted to make each
   * transmission attempt unique! Should be incremented by one for
   * each fragment transmission. If a client receives a duplicate
   * fragment (same @e frag_off), it must send
   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
   */
  uint32_t frag_uuid GNUNET_PACKED;

  /**
   * Original message ID of the message that all the
   * fragments belong to.  Must be the same for all fragments.
   */
  struct GNUNET_ShortHashCode msg_uuid;

  /**
   * Offset of this fragment in the overall message.
   */
  uint16_t frag_off GNUNET_PACKED;

  /**
   * Total size of the message that is being fragmented.
   */
  uint16_t msg_size GNUNET_PACKED;

};


/**
 * Acknowledgement for (fragments of) a fragmented application message
 * sent over a queue with finite MTU.  When a
 * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
 * received, the receiver has two RTTs or 64 further fragments with
 * the same basic message time to send an acknowledgement, possibly
 * acknowledging up to 65 fragments in one ACK.  ACKs must also be
 * sent immediately once all fragments were sent.
 */
struct TransportFragmentAckMessage
{
  /**
   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
   */
  struct GNUNET_MessageHeader header;

  /**
   * Unique ID of the lowest fragment UUID being acknowledged.
   */
  uint32_t frag_uuid GNUNET_PACKED;

  /**
   * Bitfield of up to 64 additional fragments following the
   * @e frag_uuid being acknowledged by this message.
   */
  uint64_t extra_acks GNUNET_PACKED;

  /**
   * Original message ID for of the message that all the
   * fragments belong to.
*/
  struct GNUNET_ShortHashCode msg_uuid;

  /**
   * How long was the ACK delayed relative to the average time of
   * receipt of the fragments being acknowledged?  Used to calculate
   * the average RTT by taking the receipt time of the ack minus the
   * average transmission time of the sender minus this value.
   */
  struct GNUNET_TIME_RelativeNBO avg_ack_delay;

  /**
   * How long until the receiver will stop trying reassembly
   * of this message?
   */
  struct GNUNET_TIME_RelativeNBO reassembly_timeout;
};


/**
 * Internal message used by transport for distance vector learning.
 * If @e num_hops does not exceed the threshold, peers should append
 * themselves to the peer list and flood the message (possibly only
 * to a subset of their neighbours to limit discoverability of the
 * network topology).  To the extent that the @e bidirectional bits
 * are set, peers may learn the inverse paths even if they did not
 * initiate.
 *
 * Unless received on a bidirectional queue and @e num_hops just
 * zero, peers that can forward to the initiator should always try to
 * forward to the initiator.
 */
struct TransportDVLearn
{
  /**
   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
   */
  struct GNUNET_MessageHeader header;

  /**
   * Number of hops this message has travelled, in NBO. Zero if
   * sent by initiator.
   */
  uint16_t num_hops GNUNET_PACKED;

  /**
   * Bitmask of the last 16 hops indicating whether they are confirmed
   * available (without DV) in both directions or not, in NBO.  Used
   * to possibly instantly learn a path in both directions.  Each peer
   * should shift this value by one to the left, and then set the
   * lowest bit IF the current sender can be reached from it (without
   * DV routing).
   */
  uint16_t bidirectional GNUNET_PACKED;

  /**
   * Peers receiving this message and delaying forwarding to other
   * peers for any reason should increment this value such as to
   * enable the origin to determine the actual network-only delay
   * in addition to the real-time delay (assuming the message loops
   * back to the origin).
*/
  struct GNUNET_TIME_Relative cummulative_non_network_delay;
  /* NOTE(review): the field above is a host-byte-order
     `struct GNUNET_TIME_Relative` inside a network struct, while the
     other time fields of the wire messages in this file use the
     `...NBO` variants -- confirm whether this should be
     `struct GNUNET_TIME_RelativeNBO` before the message is put on
     the wire. */

  /**
   * Identity of the peer that started this learning activity.
   */
  struct GNUNET_PeerIdentity initiator;

  /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
     excluding the initiator of the DV trace; the last entry is the
     current sender; the current peer must not be included. */

};


/**
 * Outer layer of an encapsulated message sent over multiple hops.
 * The path given only includes the identities of the subsequent
 * peers, i.e. it will be empty if we are the receiver. Each
 * forwarding peer should scan the list from the end, and if it can,
 * forward to the respective peer. The list should then be shortened
 * by all the entries up to and including that peer.  Each hop should
 * also increment @e total_hops to allow the receiver to get a precise
 * estimate on the number of hops the message travelled.  Senders must
 * provide a learned path that thus should work, but if intermediaries
 * know of a shortcut, they are allowed to send the message via that
 * shortcut.
 *
 * If a peer finds itself still on the list, it must drop the message.
 */
struct TransportDVBox
{
  /**
   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
   */
  struct GNUNET_MessageHeader header;

  /**
   * Number of total hops this message travelled. In NBO.
   * @e origin sets this to zero, to be incremented at
   * each hop.
   */
  uint16_t total_hops GNUNET_PACKED;

  /**
   * Number of hops this message includes. In NBO.
   */
  uint16_t num_hops GNUNET_PACKED;

  /**
   * Identity of the peer that originated the message.
   */
  struct GNUNET_PeerIdentity origin;

  /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
     excluding the @e origin and the current peer, the last must be
     the ultimate target; if @e num_hops is zero, the receiver of this
     message is the ultimate target. */

  /* Followed by the actual message, which itself may be
     another box, but not a DV_LEARN or DV_BOX message!
*/
};


GNUNET_NETWORK_STRUCT_END


/**
 * What type of client is the `struct TransportClient` about?
 */
enum ClientType
{
  /**
   * We do not know yet (client is fresh).
   */
  CT_NONE = 0,

  /**
   * Is the CORE service, we need to forward traffic to it.
   */
  CT_CORE = 1,

  /**
   * It is a monitor, forward monitor data.
   */
  CT_MONITOR = 2,

  /**
   * It is a communicator, use for communication.
   */
  CT_COMMUNICATOR = 3
};


/**
 * Entry in our cache of ephemeral keys we currently use.
 * This way, we only sign an ephemeral once per @e target,
 * and then can re-use it over multiple
 * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
 * messages (as signing is expensive).
 */
struct EphemeralCacheEntry
{

  /**
   * Target's peer identity (we don't re-use ephemerals
   * to limit linkability of messages).
   */
  struct GNUNET_PeerIdentity target;

  /**
   * Signature affirming @e ephemeral_key of type
   * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
   */
  struct GNUNET_CRYPTO_EddsaSignature sender_sig;

  /**
   * How long is @e sender_sig valid
   */
  struct GNUNET_TIME_Absolute ephemeral_validity;

  /**
   * Our ephemeral key.
   */
  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;

  /**
   * Our private ephemeral key.
   */
  struct GNUNET_CRYPTO_EcdhePrivateKey private_key;

  /**
   * Node in the ephemeral cache for this entry.
   * Used for expiration.
   */
  struct GNUNET_CONTAINER_HeapNode *hn;

};


/**
 * Client connected to the transport service.
 */
struct TransportClient;


/**
 * A neighbour that at least one communicator is connected to.
 */
struct Neighbour;


/**
 * Entry in our #dv_routes table, representing a (set of) distance
 * vector routes to a particular peer.
 */
struct DistanceVector;

/**
 * One possible hop towards a DV target.
 */
struct DistanceVectorHop
{

  /**
   * Kept in a MDLL, sorted by @e timeout.
   */
  struct DistanceVectorHop *next_dv;

  /**
   * Kept in a MDLL, sorted by @e timeout.
   */
  struct DistanceVectorHop *prev_dv;

  /**
   * Kept in a MDLL.
   */
  struct DistanceVectorHop *next_neighbour;

  /**
   * Kept in a MDLL.
*/
  struct DistanceVectorHop *prev_neighbour;

  /**
   * What would be the next hop to @e target?
   */
  struct Neighbour *next_hop;

  /**
   * Distance vector entry this hop belongs with.
   */
  struct DistanceVector *dv;

  /**
   * Array of @e distance hops to the target, excluding @e next_hop.
   * NULL if the entire path is us to @e next_hop to `target`. Allocated
   * at the end of this struct.
   */
  const struct GNUNET_PeerIdentity *path;

  /**
   * At what time do we forget about this path unless we see it again
   * while learning?
   */
  struct GNUNET_TIME_Absolute timeout;

  /**
   * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
   * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
   */
  unsigned int distance;
};


/**
 * Entry in our #dv_routes table, representing a (set of) distance
 * vector routes to a particular peer.
 */
struct DistanceVector
{

  /**
   * To which peer is this a route?
   */
  struct GNUNET_PeerIdentity target;

  /**
   * Known paths to @e target (head of the MDLL).
   */
  struct DistanceVectorHop *dv_head;

  /**
   * Known paths to @e target (tail of the MDLL).
   */
  struct DistanceVectorHop *dv_tail;

  /**
   * Task scheduled to purge expired paths from @e dv_head MDLL.
   */
  struct GNUNET_SCHEDULER_Task *timeout_task;
};


/**
 * Entry identifying transmission in one of our `struct
 * GNUNET_ATS_Sessions` which still awaits an ACK.  This is used to
 * ensure we do not overwhelm a communicator and limit the number of
 * messages outstanding per communicator (say in case communicator is
 * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
 * what the communicator can actually provide towards a particular
 * peer/target).
 */
struct QueueEntry
{

  /**
   * Kept as a DLL.
   */
  struct QueueEntry *next;

  /**
   * Kept as a DLL.
   */
  struct QueueEntry *prev;

  /**
   * ATS session this entry is queued with.
   */
  struct GNUNET_ATS_Session *session;

  /**
   * Message ID used for this message with the queue used for transmission.
*/
  uint64_t mid;
};


/**
 * An ATS session is a message queue provided by a communicator
 * via which we can reach a particular neighbour.
 */
struct GNUNET_ATS_Session
{
  /**
   * Kept in a MDLL.
   */
  struct GNUNET_ATS_Session *next_neighbour;

  /**
   * Kept in a MDLL.
   */
  struct GNUNET_ATS_Session *prev_neighbour;

  /**
   * Kept in a MDLL.
   */
  struct GNUNET_ATS_Session *prev_client;

  /**
   * Kept in a MDLL.
   */
  struct GNUNET_ATS_Session *next_client;

  /**
   * Head of DLL of unacked transmission requests.
   */
  struct QueueEntry *queue_head;

  /**
   * End of DLL of unacked transmission requests.
   */
  struct QueueEntry *queue_tail;

  /**
   * Which neighbour is this ATS session for?
   */
  struct Neighbour *neighbour;

  /**
   * Which communicator offers this ATS session?
   */
  struct TransportClient *tc;

  /**
   * Address served by the ATS session.
   */
  const char *address;

  /**
   * Handle by which we inform ATS about this queue.
   */
  struct GNUNET_ATS_SessionRecord *sr;

  /**
   * Task scheduled for the time when this queue can (likely) transmit the
   * next message. Still needs to check with the @e tracker_out to be sure.
   */
  struct GNUNET_SCHEDULER_Task *transmit_task;

  /**
   * Our current RTT estimate for this ATS session.
   */
  struct GNUNET_TIME_Relative rtt;

  /**
   * Message ID generator for transmissions on this queue.
   */
  uint64_t mid_gen;

  /**
   * Unique identifier of this ATS session with the communicator.
   */
  uint32_t qid;

  /**
   * Maximum transmission unit supported by this ATS session.
   */
  uint32_t mtu;

  /**
   * Distance to the target of this ATS session.
   */
  uint32_t distance;

  /**
   * Messages pending.
   */
  uint32_t num_msg_pending;

  /**
   * Bytes pending.
   */
  uint32_t num_bytes_pending;

  /**
   * Length of the DLL starting at @e queue_head.
   */
  unsigned int queue_length;

  /**
   * Network type offered by this ATS session.
   */
  enum GNUNET_NetworkType nt;

  /**
   * Connection status for this ATS session.
   */
  enum GNUNET_TRANSPORT_ConnectionStatus cs;

  /**
   * How much outbound bandwidth do we have available for this session?
*/
  struct GNUNET_BANDWIDTH_Tracker tracker_out;

  /**
   * How much inbound bandwidth do we have available for this session?
   */
  struct GNUNET_BANDWIDTH_Tracker tracker_in;
};


/**
 * Information we keep for a message that we are reassembling.
 */
struct ReassemblyContext
{

  /**
   * Original message ID of the message that all the
   * fragments belong to.
   */
  struct GNUNET_ShortHashCode msg_uuid;

  /**
   * Which neighbour is this context for?
   */
  struct Neighbour *neighbour;

  /**
   * Entry in the reassembly heap (sorted by expiration).
   */
  struct GNUNET_CONTAINER_HeapNode *hn;

  /**
   * Bitfield with @e msg_size bits representing the positions
   * where we have received fragments.  When we receive a fragment,
   * we check the bits in @e bitfield before incrementing @e msg_missing.
   *
   * Allocated after the reassembled message.
   */
  uint8_t *bitfield;

  /**
   * Task for sending ACK. We may send ACKs either because of hitting
   * the @e extra_acks limit, or based on time and @e num_acks.  This
   * task is for the latter case.
   */
  struct GNUNET_SCHEDULER_Task *ack_task;

  /**
   * At what time will we give up reassembly of this message?
   */
  struct GNUNET_TIME_Absolute reassembly_timeout;

  /**
   * Average delay of all acks in @e extra_acks and @e frag_uuid.
   * Should be reset to zero when @e num_acks is set to 0.
   */
  struct GNUNET_TIME_Relative avg_ack_delay;

  /**
   * Time we received the last fragment.  @e avg_ack_delay must be
   * incremented by now - @e last_frag multiplied by @e num_acks.
   */
  struct GNUNET_TIME_Absolute last_frag;

  /**
   * Bitfield of up to 64 additional fragments following @e frag_uuid
   * to be acknowledged in the next cumulative ACK.
   */
  uint64_t extra_acks;

  /**
   * Unique ID of the lowest fragment UUID to be acknowledged in the
   * next cumulative ACK.  Only valid if @e num_acks > 0.
   */
  uint32_t frag_uuid;

  /**
   * Number of ACKs we have accumulated so far.  Reset to 0
   * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
   */
  unsigned int num_acks;

  /**
   * How big is the message we are reassembling in total?
*/
  uint16_t msg_size;

  /**
   * How many bytes of the message are still missing?  Defragmentation
   * is complete when @e msg_missing == 0.
   */
  uint16_t msg_missing;

  /* Followed by @e msg_size bytes of the (partially) defragmented original message */

  /* Followed by @e bitfield data */
};


/**
 * A neighbour that at least one communicator is connected to.
 */
struct Neighbour
{

  /**
   * Which peer is this about?
   */
  struct GNUNET_PeerIdentity pid;

  /**
   * Map with `struct ReassemblyContext` structs for fragments under
   * reassembly. May be NULL if we currently have no fragments from
   * this @e pid (lazy initialization).
   */
  struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;

  /**
   * Heap with `struct ReassemblyContext` structs for fragments under
   * reassembly. May be NULL if we currently have no fragments from
   * this @e pid (lazy initialization).
   */
  struct GNUNET_CONTAINER_Heap *reassembly_heap;

  /**
   * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
   */
  struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;

  /**
   * Head of list of messages pending for this neighbour.
   */
  struct PendingMessage *pending_msg_head;

  /**
   * Tail of list of messages pending for this neighbour.
   */
  struct PendingMessage *pending_msg_tail;

  /**
   * Head of MDLL of DV hops that have this neighbour as next hop. Must be
   * purged if this neighbour goes down.
   */
  struct DistanceVectorHop *dv_head;

  /**
   * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
   * purged if this neighbour goes down.
   */
  struct DistanceVectorHop *dv_tail;

  /**
   * Head of DLL of ATS sessions to this peer.
   */
  struct GNUNET_ATS_Session *session_head;

  /**
   * Tail of DLL of ATS sessions to this peer.
   */
  struct GNUNET_ATS_Session *session_tail;

  /**
   * Task run to cleanup pending messages that have exceeded their timeout.
   */
  struct GNUNET_SCHEDULER_Task *timeout_task;

  /**
   * Quota at which CORE is allowed to transmit to this peer
   * according to ATS.
   *
   * FIXME: not yet used, tricky to get right given multiple queues!
* (=> Idea: let ATS set a quota per queue and we add them up here?)
   * FIXME: how do we set this value initially when we tell CORE?
   * Options: start at a minimum value or at literally zero (before ATS?)
   * (=> Current thought: clean would be zero!)
   */
  struct GNUNET_BANDWIDTH_Value32NBO quota_out;

  /**
   * What is the earliest timeout of any message in @e pending_msg_tail?
   */
  struct GNUNET_TIME_Absolute earliest_timeout;

};


/**
 * Types of different pending messages.  Stored in the `pmt` field of
 * `struct PendingMessage`.
 */
enum PendingMessageType
{

  /**
   * Ordinary message received from the CORE service.
   */
  PMT_CORE = 0,

  /**
   * Fragment box.
   */
  PMT_FRAGMENT_BOX = 1,

  /**
   * Reliability box.
   */
  PMT_RELIABILITY_BOX = 2,

  /**
   * Any type of acknowledgement.
   */
  PMT_ACKNOWLEDGEMENT = 3

};


/**
 * Transmission request that is awaiting delivery.  The original
 * transmission requests from CORE may be too big for some queues.
 * In this case, a *tree* of fragments is created.  At each
 * level of the tree, fragments are kept in a DLL ordered by which
 * fragment should be sent next (at the head).  The tree is searched
 * top-down, with the original message at the root.
 *
 * To select a node for transmission, first it is checked if the
 * current node's message fits with the MTU.  If it does not, we
 * either calculate the next fragment (based on @e frag_off) from the
 * current node, or, if all fragments have already been created,
 * descend to the @e head_frag.  Even though the node was already
 * fragmented, the fragment may be too big if the fragment was
 * generated for a queue with a larger MTU. In this case, the node
 * may be fragmented again, thus creating a tree.
 *
 * When acknowledgements for fragments are received, the tree
 * must be pruned, removing those parts that were already
 * acknowledged.  When fragments are sent over a reliable
 * channel, they can be immediately removed.
 *
 * If a message is ever fragmented, then the original "full" message
 * is never again transmitted (even if it fits below the MTU), and
 * only (remaining) fragments are sent.
*/
struct PendingMessage
{
  /**
   * Kept in a MDLL of messages for this @a target.
   */
  struct PendingMessage *next_neighbour;

  /**
   * Kept in a MDLL of messages for this @a target.
   */
  struct PendingMessage *prev_neighbour;

  /**
   * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
   */
  struct PendingMessage *next_client;

  /**
   * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
   */
  struct PendingMessage *prev_client;

  /**
   * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
   */
  struct PendingMessage *next_frag;

  /**
   * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
   */
  struct PendingMessage *prev_frag;

  /**
   * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
   */
  struct PendingMessage *bpm;

  /**
   * Target of the request.
   */
  struct Neighbour *target;

  /**
   * Client that issued the transmission request, if @e pmt is #PMT_CORE.
   */
  struct TransportClient *client;

  /**
   * Head of a MDLL of fragments created for this core message.
   */
  struct PendingMessage *head_frag;

  /**
   * Tail of a MDLL of fragments created for this core message.
   */
  struct PendingMessage *tail_frag;

  /**
   * Our parent in the fragmentation tree.
   */
  struct PendingMessage *frag_parent;

  /**
   * At what time should we give up on the transmission (and no longer retry)?
   */
  struct GNUNET_TIME_Absolute timeout;

  /**
   * What is the earliest time for us to retry transmission of this message?
   */
  struct GNUNET_TIME_Absolute next_attempt;

  /**
   * UUID to use for this message (used for reassembly of fragments, only
   * initialized if @e msg_uuid_set is #GNUNET_YES).
   */
  struct GNUNET_ShortHashCode msg_uuid;

  /**
   * Counter incremented per generated fragment.
   */
  uint32_t frag_uuidgen;

  /**
   * Type of the pending message.
   */
  enum PendingMessageType pmt;

  /**
   * Size of the original message.
   */
  uint16_t bytes_msg;

  /**
   * Offset at which we should generate the next fragment.
*/
  uint16_t frag_off;

  /**
   * #GNUNET_YES once @e msg_uuid was initialized
   */
  int16_t msg_uuid_set;

  /* Followed by @e bytes_msg to transmit */
};


/**
 * One of the addresses of this peer.
 */
struct AddressListEntry
{

  /**
   * Kept in a DLL.
   */
  struct AddressListEntry *next;

  /**
   * Kept in a DLL.
   */
  struct AddressListEntry *prev;

  /**
   * Which communicator provides this address?
   */
  struct TransportClient *tc;

  /**
   * The actual address.
   */
  const char *address;

  /**
   * Current context for storing this address in the peerstore.
   */
  struct GNUNET_PEERSTORE_StoreContext *sc;

  /**
   * Task to periodically do @e st operation.
   */
  struct GNUNET_SCHEDULER_Task *st;

  /**
   * What is a typical lifetime the communicator expects this
   * address to have? (Always from now.)
   */
  struct GNUNET_TIME_Relative expiration;

  /**
   * Address identifier used by the communicator.
   */
  uint32_t aid;

  /**
   * Network type offered by this address.
   */
  enum GNUNET_NetworkType nt;

};


/**
 * Client connected to the transport service.  Which member of the
 * @e details union is meaningful is selected by @e type.
 */
struct TransportClient
{

  /**
   * Kept in a DLL.
   */
  struct TransportClient *next;

  /**
   * Kept in a DLL.
   */
  struct TransportClient *prev;

  /**
   * Handle to the client.
   */
  struct GNUNET_SERVICE_Client *client;

  /**
   * Message queue to the client.
   */
  struct GNUNET_MQ_Handle *mq;

  /**
   * What type of client is this?
   */
  enum ClientType type;

  union
  {

    /**
     * Information for @e type #CT_CORE.
     */
    struct {

      /**
       * Head of list of messages pending for this client, sorted by
       * transmission time ("next_attempt" + possibly internal prioritization).
       */
      struct PendingMessage *pending_msg_head;

      /**
       * Tail of list of messages pending for this client.
       */
      struct PendingMessage *pending_msg_tail;

    } core;

    /**
     * Information for @e type #CT_MONITOR.
     */
    struct {

      /**
       * Peer identity to monitor the addresses of.
       * Zero to monitor all neighbours.  Valid if
       * @e type is #CT_MONITOR.
       */
      struct GNUNET_PeerIdentity peer;

      /**
       * Is this a one-shot monitor?
       */
      int one_shot;

    } monitor;


    /**
     * Information for @e type #CT_COMMUNICATOR.
*/ struct { /** * If @e type is #CT_COMMUNICATOR, this communicator * supports communicating using these addresses. */ char *address_prefix; /** * Head of DLL of queues offered by this communicator. */ struct GNUNET_ATS_Session *session_head; /** * Tail of DLL of queues offered by this communicator. */ struct GNUNET_ATS_Session *session_tail; /** * Head of list of the addresses of this peer offered by this communicator. */ struct AddressListEntry *addr_head; /** * Tail of list of the addresses of this peer offered by this communicator. */ struct AddressListEntry *addr_tail; /** * Number of queue entries in all queues to this communicator. Used * throttle sending to a communicator if we see that the communicator * is globally unable to keep up. */ unsigned int total_queue_length; /** * Characteristics of this communicator. */ enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc; } communicator; } details; }; /** * Head of linked list of all clients to this service. */ static struct TransportClient *clients_head; /** * Tail of linked list of all clients to this service. */ static struct TransportClient *clients_tail; /** * Statistics handle. */ static struct GNUNET_STATISTICS_Handle *GST_stats; /** * Configuration handle. */ static const struct GNUNET_CONFIGURATION_Handle *GST_cfg; /** * Our public key. */ static struct GNUNET_PeerIdentity GST_my_identity; /** * Our private key. */ static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key; /** * Map from PIDs to `struct Neighbour` entries. A peer is * a neighbour if we have an MQ to it from some communicator. */ static struct GNUNET_CONTAINER_MultiPeerMap *neighbours; /** * Map from PIDs to `struct DistanceVector` entries describing * known paths to the peer. */ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes; /** * Database for peer's HELLOs. */ static struct GNUNET_PEERSTORE_Handle *peerstore; /** * Heap sorting `struct EphemeralCacheEntry` by their * key/signature validity. 
*/
static struct GNUNET_CONTAINER_Heap *ephemeral_heap;

/**
 * Hash map for looking up `struct EphemeralCacheEntry`s
 * by peer identity. (We may have ephemerals in our
 * cache for which we do not have a neighbour entry,
 * and similarly many neighbours may not need ephemerals,
 * so we use a second map.)
 */
static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;

/**
 * Task to free expired ephemerals.
 */
static struct GNUNET_SCHEDULER_Task *ephemeral_task;

/**
 * Our connection to ATS for allocation and bootstrapping.
 */
static struct GNUNET_ATS_TransportHandle *ats;


/**
 * Free cached ephemeral key. Removes the entry from both
 * #ephemeral_map and the heap it is stored in before releasing it.
 *
 * @param ece cached signature to free
 */
static void
free_ephemeral (struct EphemeralCacheEntry *ece)
{
  GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
                                        &ece->target,
                                        ece);
  GNUNET_CONTAINER_heap_remove_node (ece->hn);
  GNUNET_free (ece);
}


/**
 * Lookup neighbour record for peer @a pid.
 *
 * @param pid neighbour to look for
 * @return NULL if we do not have this peer as a neighbour
 */
static struct Neighbour *
lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
{
  return GNUNET_CONTAINER_multipeermap_get (neighbours,
                                            pid);
}


/**
 * Details about what to notify monitors about.
 */
struct MonitorEvent
{
  /**
   * @deprecated To be discussed if we keep these...
   */
  struct GNUNET_TIME_Absolute last_validation;
  struct GNUNET_TIME_Absolute valid_until;
  struct GNUNET_TIME_Absolute next_validation;

  /**
   * Current round-trip time estimate.
   */
  struct GNUNET_TIME_Relative rtt;

  /**
   * Connection status.
   */
  enum GNUNET_TRANSPORT_ConnectionStatus cs;

  /**
   * Messages pending.
   */
  uint32_t num_msg_pending;

  /**
   * Bytes pending.
   */
  uint32_t num_bytes_pending;
};


/**
 * Free a @a dvh, and if it is the last path to the `target`, also
 * free the associated DV entry in #dv_routes.
* * @param dvh hop to free */ static void free_distance_vector_hop (struct DistanceVectorHop *dvh) { struct Neighbour *n = dvh->next_hop; struct DistanceVector *dv = dvh->dv; GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh); GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh); GNUNET_free (dvh); if (NULL == dv->dv_head) { GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); if (NULL != dv->timeout_task) GNUNET_SCHEDULER_cancel (dv->timeout_task); GNUNET_free (dv); } } /** * Free entry in #dv_routes. First frees all hops to the target, and * the last target will implicitly free @a dv as well. * * @param dv route to free */ static void free_dv_route (struct DistanceVector *dv) { struct DistanceVectorHop *dvh; while (NULL != (dvh = dv->dv_head)) free_distance_vector_hop (dvh); } /** * Notify monitor @a tc about an event. That @a tc * cares about the event has already been checked. * * Send @a tc information in @a me about a @a peer's status with * respect to some @a address to all monitors that care. 
* * @param tc monitor to inform * @param peer peer the information is about * @param address address the information is about * @param nt network type associated with @a address * @param me detailed information to transmit */ static void notify_monitor (struct TransportClient *tc, const struct GNUNET_PeerIdentity *peer, const char *address, enum GNUNET_NetworkType nt, const struct MonitorEvent *me) { struct GNUNET_MQ_Envelope *env; struct GNUNET_TRANSPORT_MonitorData *md; size_t addr_len = strlen (address) + 1; env = GNUNET_MQ_msg_extra (md, addr_len, GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA); md->nt = htonl ((uint32_t) nt); md->peer = *peer; md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation); md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until); md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation); md->rtt = GNUNET_TIME_relative_hton (me->rtt); md->cs = htonl ((uint32_t) me->cs); md->num_msg_pending = htonl (me->num_msg_pending); md->num_bytes_pending = htonl (me->num_bytes_pending); memcpy (&md[1], address, addr_len); GNUNET_MQ_send (tc->mq, env); } /** * Send information in @a me about a @a peer's status with respect * to some @a address to all monitors that care. 
* * @param peer peer the information is about * @param address address the information is about * @param nt network type associated with @a address * @param me detailed information to transmit */ static void notify_monitors (const struct GNUNET_PeerIdentity *peer, const char *address, enum GNUNET_NetworkType nt, const struct MonitorEvent *me) { static struct GNUNET_PeerIdentity zero; for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) { if (CT_MONITOR != tc->type) continue; if (tc->details.monitor.one_shot) continue; if ( (0 != memcmp (&tc->details.monitor.peer, &zero, sizeof (zero))) && (0 != memcmp (&tc->details.monitor.peer, peer, sizeof (*peer))) ) continue; notify_monitor (tc, peer, address, nt, me); } } /** * Called whenever a client connects. Allocates our * data structures associated with that client. * * @param cls closure, NULL * @param client identification of the client * @param mq message queue for the client * @return our `struct TransportClient` */ static void * client_connect_cb (void *cls, struct GNUNET_SERVICE_Client *client, struct GNUNET_MQ_Handle *mq) { struct TransportClient *tc; tc = GNUNET_new (struct TransportClient); tc->client = client; tc->mq = mq; GNUNET_CONTAINER_DLL_insert (clients_head, clients_tail, tc); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p connected\n", tc); return tc; } /** * Free @a rc * * @param rc data structure to free */ static void free_reassembly_context (struct ReassemblyContext *rc) { struct Neighbour *n = rc->neighbour; GNUNET_assert (rc == GNUNET_CONTAINER_heap_remove_node (rc->hn)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map, &rc->msg_uuid, rc)); GNUNET_free (rc); } /** * Task run to clean up reassembly context of a neighbour that have expired. 
* * @param cls a `struct Neighbour` */ static void reassembly_cleanup_task (void *cls) { struct Neighbour *n = cls; struct ReassemblyContext *rc; n->reassembly_timeout_task = NULL; while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap))) { if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us) { free_reassembly_context (rc); continue; } GNUNET_assert (NULL == n->reassembly_timeout_task); n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout, &reassembly_cleanup_task, n); return; } } /** * function called to #free_reassembly_context(). * * @param cls NULL * @param key unused * @param value a `struct ReassemblyContext` to free * @return #GNUNET_OK (continue iteration) */ static int free_reassembly_cb (void *cls, const struct GNUNET_ShortHashCode *key, void *value) { struct ReassemblyContext *rc = value; (void) cls; (void) key; free_reassembly_context (rc); return GNUNET_OK; } /** * Release memory used by @a neighbour. * * @param neighbour neighbour entry to free */ static void free_neighbour (struct Neighbour *neighbour) { struct DistanceVectorHop *dvh; GNUNET_assert (NULL == neighbour->session_head); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (neighbours, &neighbour->pid, neighbour)); if (NULL != neighbour->timeout_task) GNUNET_SCHEDULER_cancel (neighbour->timeout_task); if (NULL != neighbour->reassembly_map) { GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map, &free_reassembly_cb, NULL); GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map); neighbour->reassembly_map = NULL; GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap); neighbour->reassembly_heap = NULL; } while (NULL != (dvh = neighbour->dv_head)) free_distance_vector_hop (dvh); if (NULL != neighbour->reassembly_timeout_task) GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task); GNUNET_free (neighbour); } /** * Send message to CORE clients that we lost a connection. 
* * @param tc client to inform (must be CORE client) * @param pid peer the connection is for * @param quota_out current quota for the peer */ static void core_send_connect_info (struct TransportClient *tc, const struct GNUNET_PeerIdentity *pid, struct GNUNET_BANDWIDTH_Value32NBO quota_out) { struct GNUNET_MQ_Envelope *env; struct ConnectInfoMessage *cim; GNUNET_assert (CT_CORE == tc->type); env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); cim->quota_out = quota_out; cim->id = *pid; GNUNET_MQ_send (tc->mq, env); } /** * Send message to CORE clients that we gained a connection * * @param pid peer the queue was for * @param quota_out current quota for the peer */ static void cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, struct GNUNET_BANDWIDTH_Value32NBO quota_out) { for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) { if (CT_CORE != tc->type) continue; core_send_connect_info (tc, pid, quota_out); } } /** * Send message to CORE clients that we lost a connection. * * @param pid peer the connection was for */ static void cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) { for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) { struct GNUNET_MQ_Envelope *env; struct DisconnectInfoMessage *dim; if (CT_CORE != tc->type) continue; env = GNUNET_MQ_msg (dim, GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); dim->peer = *pid; GNUNET_MQ_send (tc->mq, env); } } /** * We believe we are ready to transmit a message on a queue. Double-checks * with the queue's "tracker_out" and then gives the message to the * communicator for transmission (updating the tracker, and re-scheduling * itself if applicable). * * @param cls the `struct GNUNET_ATS_Session` to process transmissions for */ static void transmit_on_queue (void *cls); /** * Schedule next run of #transmit_on_queue(). Does NOTHING if * we should run immediately or if the message queue is empty. 
* Test for no task being added AND queue not being empty to
 * transmit immediately afterwards! This function must only
 * be called if the message queue is non-empty!
 *
 * @param queue the queue to do scheduling for
 */
static void
schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
{
  struct Neighbour *n = queue->neighbour;
  struct PendingMessage *pm = n->pending_msg_head;
  struct GNUNET_TIME_Relative bw_delay;
  struct GNUNET_TIME_Relative retry_delay;
  struct GNUNET_TIME_Relative out_delay;
  unsigned int wsize;

  GNUNET_assert (NULL != pm);
  /* back off while the communicator as a whole is saturated */
  if (queue->tc->details.communicator.total_queue_length >=
      COMMUNICATOR_TOTAL_QUEUE_LIMIT)
  {
    GNUNET_STATISTICS_update (GST_stats,
                              "# Transmission throttled due to communicator queue limit",
                              1,
                              GNUNET_NO);
    return;
  }
  /* back off while this particular session is saturated */
  if (queue->queue_length >= SESSION_QUEUE_LIMIT)
  {
    GNUNET_STATISTICS_update (GST_stats,
                              "# Transmission throttled due to session queue limit",
                              1,
                              GNUNET_NO);
    return;
  }

  /* without an MTU the whole message goes out at once */
  if (0 == queue->mtu)
    wsize = pm->bytes_msg; /* FIXME: add overheads? */
  else
    wsize = queue->mtu;
  bw_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
                                                 wsize);
  retry_delay = GNUNET_TIME_absolute_get_remaining (pm->next_attempt);
  out_delay = GNUNET_TIME_relative_max (retry_delay,
                                        bw_delay);
  if (0 == out_delay.rel_value_us)
    return; /* we should run immediately! */
  /* queue has changed since we were scheduled, reschedule again */
  queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
                                                       &transmit_on_queue,
                                                       queue);
  if (out_delay.rel_value_us <= DELAY_WARN_THRESHOLD.rel_value_us)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Next transmission on queue `%s' in %s\n",
                queue->address,
                GNUNET_STRINGS_relative_time_to_string (out_delay,
                                                        GNUNET_YES));
  }
  else
  {
    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                "Next transmission on queue `%s' in %s (high delay)\n",
                queue->address,
                GNUNET_STRINGS_relative_time_to_string (out_delay,
                                                        GNUNET_YES));
  }
}


/**
 * Free @a session.
* * @param session the session to free */ static void free_session (struct GNUNET_ATS_Session *session) { struct Neighbour *neighbour = session->neighbour; struct TransportClient *tc = session->tc; struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN, .rtt = GNUNET_TIME_UNIT_FOREVER_REL }; struct QueueEntry *qe; int maxxed; if (NULL != session->transmit_task) { GNUNET_SCHEDULER_cancel (session->transmit_task); session->transmit_task = NULL; } GNUNET_CONTAINER_MDLL_remove (neighbour, neighbour->session_head, neighbour->session_tail, session); GNUNET_CONTAINER_MDLL_remove (client, tc->details.communicator.session_head, tc->details.communicator.session_tail, session); maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length); while (NULL != (qe = session->queue_head)) { GNUNET_CONTAINER_DLL_remove (session->queue_head, session->queue_tail, qe); session->queue_length--; tc->details.communicator.total_queue_length--; GNUNET_free (qe); } GNUNET_assert (0 == session->queue_length); if ( (maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) ) { /* Communicator dropped below threshold, resume all queues */ GNUNET_STATISTICS_update (GST_stats, "# Transmission throttled due to communicator queue limit", -1, GNUNET_NO); for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head; NULL != s; s = s->next_client) schedule_transmit_on_queue (s); } notify_monitors (&neighbour->pid, session->address, session->nt, &me); GNUNET_ATS_session_del (session->sr); GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in); GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out); GNUNET_free (session); if (NULL == neighbour->session_head) { cores_send_disconnect_info (&neighbour->pid); free_neighbour (neighbour); } } /** * Free @a ale * * @param ale address list entry to free */ static void free_address_list_entry (struct AddressListEntry *ale) { struct TransportClient *tc = ale->tc; 
GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head, tc->details.communicator.addr_tail, ale); if (NULL != ale->sc) { GNUNET_PEERSTORE_store_cancel (ale->sc); ale->sc = NULL; } if (NULL != ale->st) { GNUNET_SCHEDULER_cancel (ale->st); ale->st = NULL; } GNUNET_free (ale); } /** * Called whenever a client is disconnected. Frees our * resources associated with that client. * * @param cls closure, NULL * @param client identification of the client * @param app_ctx our `struct TransportClient` */ static void client_disconnect_cb (void *cls, struct GNUNET_SERVICE_Client *client, void *app_ctx) { struct TransportClient *tc = app_ctx; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected, cleaning up.\n", tc); GNUNET_CONTAINER_DLL_remove (clients_head, clients_tail, tc); switch (tc->type) { case CT_NONE: break; case CT_CORE: { struct PendingMessage *pm; while (NULL != (pm = tc->details.core.pending_msg_head)) { GNUNET_CONTAINER_MDLL_remove (client, tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); pm->client = NULL; } } break; case CT_MONITOR: break; case CT_COMMUNICATOR: { struct GNUNET_ATS_Session *q; struct AddressListEntry *ale; while (NULL != (q = tc->details.communicator.session_head)) free_session (q); while (NULL != (ale = tc->details.communicator.addr_head)) free_address_list_entry (ale); GNUNET_free (tc->details.communicator.address_prefix); } break; } GNUNET_free (tc); } /** * Iterator telling new CORE client about all existing * connections to peers. * * @param cls the new `struct TransportClient` * @param pid a connected peer * @param value the `struct Neighbour` with more information * @return #GNUNET_OK (continue to iterate) */ static int notify_client_connect_info (void *cls, const struct GNUNET_PeerIdentity *pid, void *value) { struct TransportClient *tc = cls; struct Neighbour *neighbour = value; core_send_connect_info (tc, pid, neighbour->quota_out); return GNUNET_OK; } /** * Initialize a "CORE" client. 
We got a start message from this * client, so add it to the list of clients for broadcasting of * inbound messages. * * @param cls the client * @param start the start message that was sent */ static void handle_client_start (void *cls, const struct StartMessage *start) { struct TransportClient *tc = cls; uint32_t options; options = ntohl (start->options); if ( (0 != (1 & options)) && (0 != memcmp (&start->self, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)) ) ) { /* client thinks this is a different peer, reject */ GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } if (CT_NONE != tc->type) { GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } tc->type = CT_CORE; GNUNET_CONTAINER_multipeermap_iterate (neighbours, ¬ify_client_connect_info, tc); GNUNET_SERVICE_client_continue (tc->client); } /** * Client asked for transmission to a peer. Process the request. * * @param cls the client * @param obm the send message that was sent */ static int check_client_send (void *cls, const struct OutboundMessage *obm) { struct TransportClient *tc = cls; uint16_t size; const struct GNUNET_MessageHeader *obmm; if (CT_CORE != tc->type) { GNUNET_break (0); return GNUNET_SYSERR; } size = ntohs (obm->header.size) - sizeof (struct OutboundMessage); if (size < sizeof (struct GNUNET_MessageHeader)) { GNUNET_break (0); return GNUNET_SYSERR; } obmm = (const struct GNUNET_MessageHeader *) &obm[1]; if (size != ntohs (obmm->size)) { GNUNET_break (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Free fragment tree below @e root, excluding @e root itself. * * @param root root of the tree to free */ static void free_fragment_tree (struct PendingMessage *root) { struct PendingMessage *frag; while (NULL != (frag = root->head_frag)) { free_fragment_tree (frag); GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag); GNUNET_free (frag); } } /** * Release memory associated with @a pm and remove @a pm from associated * data structures. 
@a pm must be a top-level pending message and not * a fragment in the tree. The entire tree is freed (if applicable). * * @param pm the pending message to free */ static void free_pending_message (struct PendingMessage *pm) { struct TransportClient *tc = pm->client; struct Neighbour *target = pm->target; if (NULL != tc) { GNUNET_CONTAINER_MDLL_remove (client, tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); } GNUNET_CONTAINER_MDLL_remove (neighbour, target->pending_msg_head, target->pending_msg_tail, pm); free_fragment_tree (pm); GNUNET_free_non_null (pm->bpm); GNUNET_free (pm); } /** * Send a response to the @a pm that we have processed a * "send" request with status @a success. We * transmitted @a bytes_physical on the actual wire. * Sends a confirmation to the "core" client responsible * for the original request and free's @a pm. * * @param pm handle to the original pending message * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR * for transmission failure * @param bytes_physical amount of bandwidth consumed */ static void client_send_response (struct PendingMessage *pm, int success, uint32_t bytes_physical) { struct TransportClient *tc = pm->client; struct Neighbour *target = pm->target; struct GNUNET_MQ_Envelope *env; struct SendOkMessage *som; if (NULL != tc) { env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); som->success = htonl ((uint32_t) success); som->bytes_msg = htons (pm->bytes_msg); som->bytes_physical = htonl (bytes_physical); som->peer = target->pid; GNUNET_MQ_send (tc->mq, env); } free_pending_message (pm); } /** * Checks the message queue for a neighbour for messages that have timed * out and purges them. 
* * @param cls a `struct Neighbour` */ static void check_queue_timeouts (void *cls) { struct Neighbour *n = cls; struct PendingMessage *pm; struct GNUNET_TIME_Absolute now; struct GNUNET_TIME_Absolute earliest_timeout; n->timeout_task = NULL; earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; now = GNUNET_TIME_absolute_get (); for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm) { pm = pos->next_neighbour; if (pos->timeout.abs_value_us <= now.abs_value_us) { GNUNET_STATISTICS_update (GST_stats, "# messages dropped (timeout before confirmation)", 1, GNUNET_NO); client_send_response (pm, GNUNET_NO, 0); continue; } earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout); } n->earliest_timeout = earliest_timeout; if (NULL != n->pending_msg_head) n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n); } /** * Client asked for transmission to a peer. Process the request. * * @param cls the client * @param obm the send message that was sent */ static void handle_client_send (void *cls, const struct OutboundMessage *obm) { struct TransportClient *tc = cls; struct PendingMessage *pm; const struct GNUNET_MessageHeader *obmm; struct Neighbour *target; uint32_t bytes_msg; GNUNET_assert (CT_CORE == tc->type); obmm = (const struct GNUNET_MessageHeader *) &obm[1]; bytes_msg = ntohs (obmm->size); target = lookup_neighbour (&obm->peer); if (NULL == target) { /* Failure: don't have this peer as a neighbour (anymore). Might have gone down asynchronously, so this is NOT a protocol violation by CORE. Still count the event, as this should be rare. 
*/ struct GNUNET_MQ_Envelope *env; struct SendOkMessage *som; env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); som->success = htonl (GNUNET_SYSERR); som->bytes_msg = htonl (bytes_msg); som->bytes_physical = htonl (0); som->peer = obm->peer; GNUNET_MQ_send (tc->mq, env); GNUNET_SERVICE_client_continue (tc->client); GNUNET_STATISTICS_update (GST_stats, "# messages dropped (neighbour unknown)", 1, GNUNET_NO); return; } pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg); pm->client = tc; pm->target = target; pm->bytes_msg = bytes_msg; pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout)); memcpy (&pm[1], &obm[1], bytes_msg); GNUNET_CONTAINER_MDLL_insert (neighbour, target->pending_msg_head, target->pending_msg_tail, pm); GNUNET_CONTAINER_MDLL_insert (client, tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) { target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us; if (NULL != target->timeout_task) GNUNET_SCHEDULER_cancel (target->timeout_task); target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout, &check_queue_timeouts, target); } } /** * Communicator started. Test message is well-formed. * * @param cls the client * @param cam the send message that was sent */ static int check_communicator_available (void *cls, const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam) { struct TransportClient *tc = cls; uint16_t size; if (CT_NONE != tc->type) { GNUNET_break (0); return GNUNET_SYSERR; } tc->type = CT_COMMUNICATOR; size = ntohs (cam->header.size) - sizeof (*cam); if (0 == size) return GNUNET_OK; /* receive-only communicator */ GNUNET_MQ_check_zero_termination (cam); return GNUNET_OK; } /** * Communicator started. Process the request. 
* * @param cls the client * @param cam the send message that was sent */ static void handle_communicator_available (void *cls, const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam) { struct TransportClient *tc = cls; uint16_t size; size = ntohs (cam->header.size) - sizeof (*cam); if (0 == size) return; /* receive-only communicator */ tc->details.communicator.address_prefix = GNUNET_strdup ((const char *) &cam[1]); tc->details.communicator.cc = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc); GNUNET_SERVICE_client_continue (tc->client); } /** * Communicator requests backchannel transmission. Check the request. * * @param cls the client * @param cb the send message that was sent * @return #GNUNET_OK if message is well-formed */ static int check_communicator_backchannel (void *cls, const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb) { const struct GNUNET_MessageHeader *inbox; const char *is; uint16_t msize; uint16_t isize; msize = ntohs (cb->header.size) - sizeof (*cb); if (UINT16_MAX - msize > sizeof (struct TransportBackchannelEncapsulationMessage) + sizeof (struct TransportBackchannelRequestPayload) ) { GNUNET_break (0); return GNUNET_SYSERR; } inbox = (const struct GNUNET_MessageHeader *) &cb[1]; isize = ntohs (inbox->size); if (isize >= msize) { GNUNET_break (0); return GNUNET_SYSERR; } is = (const char *) inbox; is += isize; msize -= isize; GNUNET_assert (msize > 0); if ('\0' != is[msize-1]) { GNUNET_break (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Remove memory used by expired ephemeral keys. 
* * @param cls NULL */ static void expire_ephemerals (void *cls) { struct EphemeralCacheEntry *ece; (void) cls; ephemeral_task = NULL; while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap))) { if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) { free_ephemeral (ece); continue; } ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity, &expire_ephemerals, NULL); return; } } /** * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate * one, cache it and return it. * * @param pid peer to look up ephemeral for * @param private_key[out] set to the private key * @param ephemeral_key[out] set to the key * @param ephemeral_sender_sig[out] set to the signature * @param ephemeral_validity[out] set to the validity expiration time */ static void lookup_ephemeral (const struct GNUNET_PeerIdentity *pid, struct GNUNET_CRYPTO_EcdhePrivateKey *private_key, struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key, struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig, struct GNUNET_TIME_Absolute *ephemeral_validity) { struct EphemeralCacheEntry *ece; struct EphemeralConfirmation ec; ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map, pid); if ( (NULL != ece) && (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) ) { free_ephemeral (ece); ece = NULL; } if (NULL == ece) { ece = GNUNET_new (struct EphemeralCacheEntry); ece->target = *pid; ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg), EPHEMERAL_VALIDITY); GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key)); GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key, &ece->ephemeral_key); ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL); ec.purpose.size = htonl (sizeof (ec)); ec.target = *pid; ec.ephemeral_key = ece->ephemeral_key; GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key, &ec.purpose, 
&ece->sender_sig)); ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap, ece, ece->ephemeral_validity.abs_value_us); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (ephemeral_map, &ece->target, ece, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); if (NULL == ephemeral_task) ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity, &expire_ephemerals, NULL); } *private_key = ece->private_key; *ephemeral_key = ece->ephemeral_key; *ephemeral_sender_sig = ece->sender_sig; *ephemeral_validity = ece->ephemeral_validity; } /** * We need to transmit @a hdr to @a target. If necessary, this may * involve DV routing or even broadcasting and fragmentation. * * @param target peer to receive @a hdr * @param hdr header of the message to route */ static void route_message (const struct GNUNET_PeerIdentity *target, struct GNUNET_MessageHeader *hdr) { // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting) GNUNET_free (hdr); } /** * Communicator requests backchannel transmission. Process the request. 
* * @param cls the client * @param cb the send message that was sent */ static void handle_communicator_backchannel (void *cls, const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb) { struct TransportClient *tc = cls; struct GNUNET_CRYPTO_EcdhePrivateKey private_key; struct GNUNET_TIME_Absolute ephemeral_validity; struct TransportBackchannelEncapsulationMessage *enc; struct TransportBackchannelRequestPayload ppay; char *mpos; uint16_t msize; /* encapsulate and encrypt message */ msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload); enc = GNUNET_malloc (sizeof (*enc) + msize); enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION); enc->header.size = htons (sizeof (*enc) + msize); enc->target = cb->pid; lookup_ephemeral (&cb->pid, &private_key, &enc->ephemeral_key, &ppay.sender_sig, &ephemeral_validity); // FIXME: setup 'iv' #if FIXME dh_key_derive (&private_key, &cb->pid, &enc->iv, &key); #endif ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity); ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg)); mpos = (char *) &enc[1]; #if FIXME encrypt (key, &ppay, &mpos, sizeof (ppay)); encrypt (key, &cb[1], &mpos, ntohs (cb->header.size) - sizeof (*cb)); hmac (key, &enc->hmac); #endif route_message (&cb->pid, &enc->header); GNUNET_SERVICE_client_continue (tc->client); } /** * Address of our peer added. Test message is well-formed. * * @param cls the client * @param aam the send message that was sent * @return #GNUNET_OK if message is well-formed */ static int check_add_address (void *cls, const struct GNUNET_TRANSPORT_AddAddressMessage *aam) { struct TransportClient *tc = cls; if (CT_COMMUNICATOR != tc->type) { GNUNET_break (0); return GNUNET_SYSERR; } GNUNET_MQ_check_zero_termination (aam); return GNUNET_OK; } /** * Ask peerstore to store our address. 
* * @param cls an `struct AddressListEntry *` */ static void store_pi (void *cls); /** * Function called when peerstore is done storing our address. */ static void peerstore_store_cb (void *cls, int success) { struct AddressListEntry *ale = cls; ale->sc = NULL; if (GNUNET_YES != success) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to store our own address `%s' in peerstore!\n", ale->address); /* refresh period is 1/4 of expiration time, that should be plenty without being excessive. */ ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration, 4ULL), &store_pi, ale); } /** * Ask peerstore to store our address. * * @param cls an `struct AddressListEntry *` */ static void store_pi (void *cls) { struct AddressListEntry *ale = cls; void *addr; size_t addr_len; struct GNUNET_TIME_Absolute expiration; ale->st = NULL; expiration = GNUNET_TIME_relative_to_absolute (ale->expiration); GNUNET_HELLO_sign_address (ale->address, ale->nt, expiration, GST_my_private_key, &addr, &addr_len); ale->sc = GNUNET_PEERSTORE_store (peerstore, "transport", &GST_my_identity, GNUNET_HELLO_PEERSTORE_KEY, addr, addr_len, expiration, GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, &peerstore_store_cb, ale); GNUNET_free (addr); if (NULL == ale->sc) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to store our address `%s' with peerstore\n", ale->address); ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &store_pi, ale); } } /** * Address of our peer added. Process the request. 
* * @param cls the client * @param aam the send message that was sent */ static void handle_add_address (void *cls, const struct GNUNET_TRANSPORT_AddAddressMessage *aam) { struct TransportClient *tc = cls; struct AddressListEntry *ale; size_t slen; slen = ntohs (aam->header.size) - sizeof (*aam); ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen); ale->tc = tc; ale->address = (const char *) &ale[1]; ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration); ale->aid = aam->aid; ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt); memcpy (&ale[1], &aam[1], slen); GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head, tc->details.communicator.addr_tail, ale); ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale); GNUNET_SERVICE_client_continue (tc->client); } /** * Address of our peer deleted. Process the request. * * @param cls the client * @param dam the send message that was sent */ static void handle_del_address (void *cls, const struct GNUNET_TRANSPORT_DelAddressMessage *dam) { struct TransportClient *tc = cls; if (CT_COMMUNICATOR != tc->type) { GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } for (struct AddressListEntry *ale = tc->details.communicator.addr_head; NULL != ale; ale = ale->next) { if (dam->aid != ale->aid) continue; GNUNET_assert (ale->tc == tc); free_address_list_entry (ale); GNUNET_SERVICE_client_continue (tc->client); } GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); } /** * Context from #handle_incoming_msg(). Closure for many * message handlers below. */ struct CommunicatorMessageContext { /** * Which communicator provided us with the message. */ struct TransportClient *tc; /** * Additional information for flow control and about the sender. */ struct GNUNET_TRANSPORT_IncomingMessage im; /** * Number of hops the message has travelled (if DV-routed). * FIXME: make use of this in ACK handling! 
*/ uint16_t total_hops; }; /** * Given an inbound message @a msg from a communicator @a cmc, * demultiplex it based on the type calling the right handler. * * @param cmc context for demultiplexing * @param msg message to demultiplex */ static void demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, const struct GNUNET_MessageHeader *msg); /** * Send ACK to communicator (if requested) and free @a cmc. * * @param cmc context for which we are done handling the message */ static void finish_cmc_handling (struct CommunicatorMessageContext *cmc) { if (0 != ntohl (cmc->im.fc_on)) { /* send ACK when done to communicator for flow control! */ struct GNUNET_MQ_Envelope *env; struct GNUNET_TRANSPORT_IncomingMessageAck *ack; env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK); ack->reserved = htonl (0); ack->fc_id = cmc->im.fc_id; ack->sender = cmc->im.sender; GNUNET_MQ_send (cmc->tc->mq, env); } GNUNET_SERVICE_client_continue (cmc->tc->client); GNUNET_free (cmc); } /** * Communicator gave us an unencapsulated message to pass as-is to * CORE. Process the request. 
* * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) * @param mh the message that was received */ static void handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) { struct CommunicatorMessageContext *cmc = cls; uint16_t size = ntohs (mh->size); if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) || (size < sizeof (struct GNUNET_MessageHeader)) ) { struct GNUNET_SERVICE_Client *client = cmc->tc->client; GNUNET_break (0); finish_cmc_handling (cmc); GNUNET_SERVICE_client_drop (client); return; } /* Forward to all CORE clients */ for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) { struct GNUNET_MQ_Envelope *env; struct InboundMessage *im; if (CT_CORE != tc->type) continue; env = GNUNET_MQ_msg_extra (im, size, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); im->peer = cmc->im.sender; memcpy (&im[1], mh, size); GNUNET_MQ_send (tc->mq, env); } /* FIXME: consider doing this _only_ once the message was drained from the CORE MQs to extend flow control to CORE! (basically, increment counter in cmc, decrement on MQ send continuation! */ finish_cmc_handling (cmc); } /** * Communicator gave us a fragment box. Check the message. * * @param cls a `struct CommunicatorMessageContext` * @param fb the send message that was sent * @return #GNUNET_YES if message is well-formed */ static int check_fragment_box (void *cls, const struct TransportFragmentBox *fb) { uint16_t size = ntohs (fb->header.size); uint16_t bsize = size - sizeof (*fb); if (0 == bsize) { GNUNET_break_op (0); return GNUNET_SYSERR; } if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size)) { GNUNET_break_op (0); return GNUNET_SYSERR; } if (ntohs (fb->frag_off) >= ntohs (fb->msg_size)) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_YES; } /** * Generate a fragment acknowledgement for an @a rc. 
* * @param rc context to generate ACK for, @a rc ACK state is reset */ static void send_fragment_ack (struct ReassemblyContext *rc) { struct TransportFragmentAckMessage *ack; ack = GNUNET_new (struct TransportFragmentAckMessage); ack->header.size = htons (sizeof (struct TransportFragmentAckMessage)); ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK); ack->frag_uuid = htonl (rc->frag_uuid); ack->extra_acks = GNUNET_htonll (rc->extra_acks); ack->msg_uuid = rc->msg_uuid; ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay); if (0 == rc->msg_missing) ack->reassembly_timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */ else ack->reassembly_timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout)); route_message (&rc->neighbour->pid, &ack->header); rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO; rc->num_acks = 0; rc->extra_acks = 0LLU; } /** * Communicator gave us a fragment. Process the request. 
* * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) * @param fb the message that was received */ static void handle_fragment_box (void *cls, const struct TransportFragmentBox *fb) { struct CommunicatorMessageContext *cmc = cls; struct Neighbour *n; struct ReassemblyContext *rc; const struct GNUNET_MessageHeader *msg; uint16_t msize; uint16_t fsize; uint16_t frag_off; uint32_t frag_uuid; char *target; struct GNUNET_TIME_Relative cdelay; int ack_now; n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender); if (NULL == n) { struct GNUNET_SERVICE_Client *client = cmc->tc->client; GNUNET_break (0); finish_cmc_handling (cmc); GNUNET_SERVICE_client_drop (client); return; } if (NULL == n->reassembly_map) { n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8, GNUNET_YES); n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION, &reassembly_cleanup_task, n); } msize = ntohs (fb->msg_size); rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map, &fb->msg_uuid); if (NULL == rc) { rc = GNUNET_malloc (sizeof (*rc) + msize + /* reassembly payload buffer */ (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */); rc->msg_uuid = fb->msg_uuid; rc->neighbour = n; rc->msg_size = msize; rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION); rc->last_frag = GNUNET_TIME_absolute_get (); rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap, rc, rc->reassembly_timeout.abs_value_us); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multishortmap_put (n->reassembly_map, &rc->msg_uuid, rc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); target = (char *) &rc[1]; rc->bitfield = (uint8_t *) (target + rc->msg_size); rc->msg_missing = rc->msg_size; } else { target = (char *) &rc[1]; } if (msize != rc->msg_size) { GNUNET_break (0); finish_cmc_handling (cmc); return; } /* reassemble */ fsize = 
ntohs (fb->header.size) - sizeof (*fb); frag_off = ntohs (fb->frag_off); memcpy (&target[frag_off], &fb[1], fsize); /* update bitfield and msg_missing */ for (unsigned int i=frag_off;ibitfield[i / 8] & (1 << (i % 8)))) { rc->bitfield[i / 8] |= (1 << (i % 8)); rc->msg_missing--; } } /* Compute cummulative ACK */ frag_uuid = ntohl (fb->frag_uuid); cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag); cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->num_acks); rc->last_frag = GNUNET_TIME_absolute_get (); rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, cdelay); ack_now = GNUNET_NO; if (0 == rc->num_acks) { /* case one: first ack */ rc->frag_uuid = frag_uuid; rc->extra_acks = 0LLU; rc->num_acks = 1; } else if ( (frag_uuid >= rc->frag_uuid) && (frag_uuid <= rc->frag_uuid + 64) ) { /* case two: ack fits after existing min UUID */ if ( (frag_uuid == rc->frag_uuid) || (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) ) { /* duplicate fragment, ack now! */ ack_now = GNUNET_YES; } else { rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1)); rc->num_acks++; } } else if ( (rc->frag_uuid > frag_uuid) && ( ( (rc->frag_uuid == frag_uuid + 64) && (0 == rc->extra_acks) ) || ( (rc->frag_uuid < frag_uuid + 64) && (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) ) { /* can fit ack by shifting extra acks and starting at frag_uid, test above esured that the bits we will shift 'extra_acks' by are all zero. */ rc->extra_acks <<= (rc->frag_uuid - frag_uuid); rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1)); rc->frag_uuid = frag_uuid; rc->num_acks++; } if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */ ack_now = GNUNET_YES; /* maximum acks received */ // FIXME: possibly also ACK based on RTT (but for that we'd need to // determine the session used for the ACK first!) /* is reassembly complete? 
*/ if (0 != rc->msg_missing) { if (ack_now) send_fragment_ack (rc); finish_cmc_handling (cmc); return; } /* reassembly is complete, verify result */ msg = (const struct GNUNET_MessageHeader *) &rc[1]; if (ntohs (msg->size) != rc->msg_size) { GNUNET_break (0); free_reassembly_context (rc); finish_cmc_handling (cmc); return; } /* successful reassembly */ send_fragment_ack (rc); demultiplex_with_cmc (cmc, msg); /* FIXME: really free here? Might be bad if fragments are still en-route and we forget that we finished this reassembly immediately! -> keep around until timeout? -> shorten timeout based on ACK? */ free_reassembly_context (rc); } /** * Communicator gave us a fragment acknowledgement. Process the request. * * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) * @param fa the message that was received */ static void handle_fragment_ack (void *cls, const struct TransportFragmentAckMessage *fa) { struct CommunicatorMessageContext *cmc = cls; // FIXME: do work: identify original message; then identify fragments being acked; // remove those from the tree to prevent retransmission; // compute RTT // if entire message is ACKed, handle that as well. finish_cmc_handling (cmc); } /** * Communicator gave us a reliability box. Check the message. * * @param cls a `struct CommunicatorMessageContext` * @param rb the send message that was sent * @return #GNUNET_YES if message is well-formed */ static int check_reliability_box (void *cls, const struct TransportReliabilityBox *rb) { GNUNET_MQ_check_boxed_message (rb); return GNUNET_YES; } /** * Communicator gave us a reliability box. Process the request. 
* * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) * @param rb the message that was received */ static void handle_reliability_box (void *cls, const struct TransportReliabilityBox *rb) { struct CommunicatorMessageContext *cmc = cls; const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1]; if (0 == ntohl (rb->ack_countdown)) { struct TransportReliabilityAckMessage *ack; /* FIXME: implement cummulative ACKs and ack_countdown, then setting the avg_ack_delay field below: */ ack = GNUNET_malloc (sizeof (*ack) + sizeof (struct GNUNET_ShortHashCode)); ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK); ack->header.size = htons (sizeof (*ack) + sizeof (struct GNUNET_ShortHashCode)); memcpy (&ack[1], &rb->msg_uuid, sizeof (struct GNUNET_ShortHashCode)); route_message (&cmc->im.sender, &ack->header); } /* continue with inner message */ demultiplex_with_cmc (cmc, inbox); } /** * Communicator gave us a reliability ack. Process the request. * * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) * @param ra the message that was received */ static void handle_reliability_ack (void *cls, const struct TransportReliabilityAckMessage *ra) { struct CommunicatorMessageContext *cmc = cls; // FIXME: do work: find message that was acknowledged, and // remove from transmission queue; update RTT. finish_cmc_handling (cmc); } /** * Communicator gave us a backchannel encapsulation. Check the message. 
* * @param cls a `struct CommunicatorMessageContext` * @param be the send message that was sent * @return #GNUNET_YES if message is well-formed */ static int check_backchannel_encapsulation (void *cls, const struct TransportBackchannelEncapsulationMessage *be) { uint16_t size = ntohs (be->header.size); if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader)) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_YES; } /** * Communicator gave us a backchannel encapsulation. Process the request. * * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) * @param be the message that was received */ static void handle_backchannel_encapsulation (void *cls, const struct TransportBackchannelEncapsulationMessage *be) { struct CommunicatorMessageContext *cmc = cls; if (0 != memcmp (&be->target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity))) { /* not for me, try to route to target */ route_message (&be->target, GNUNET_copy_message (&be->header)); finish_cmc_handling (cmc); return; } // FIXME: compute shared secret // FIXME: check HMAC // FIXME: decrypt payload // FIXME: forward to specified communicator! // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING) finish_cmc_handling (cmc); } /** * Communicator gave us a DV learn message. Check the message. 
* * @param cls a `struct CommunicatorMessageContext` * @param dvl the send message that was sent * @return #GNUNET_YES if message is well-formed */ static int check_dv_learn (void *cls, const struct TransportDVLearn *dvl) { uint16_t size = ntohs (dvl->header.size); uint16_t num_hops = ntohs (dvl->num_hops); const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvl[1]; if (size != sizeof (*dvl) + num_hops * sizeof (struct GNUNET_PeerIdentity)) { GNUNET_break_op (0); return GNUNET_SYSERR; } for (unsigned int i=0;iinitiator, &hops[i], sizeof (struct GNUNET_PeerIdentity))) { GNUNET_break_op (0); return GNUNET_SYSERR; } if (0 == memcmp (&GST_my_identity, &hops[i], sizeof (struct GNUNET_PeerIdentity))) { GNUNET_break_op (0); return GNUNET_SYSERR; } } return GNUNET_YES; } /** * Communicator gave us a DV learn message. Process the request. * * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) * @param dvl the message that was received */ static void handle_dv_learn (void *cls, const struct TransportDVLearn *dvl) { struct CommunicatorMessageContext *cmc = cls; // FIXME: learn path from DV message (if bi-directional flags are set) // FIXME: expand DV message, forward on (unless path is getting too long) finish_cmc_handling (cmc); } /** * Communicator gave us a DV box. Check the message. 
* * @param cls a `struct CommunicatorMessageContext` * @param dvb the send message that was sent * @return #GNUNET_YES if message is well-formed */ static int check_dv_box (void *cls, const struct TransportDVBox *dvb) { uint16_t size = ntohs (dvb->header.size); uint16_t num_hops = ntohs (dvb->num_hops); const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1]; const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops]; uint16_t isize; uint16_t itype; if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader)) { GNUNET_break_op (0); return GNUNET_SYSERR; } isize = ntohs (inbox->size); if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize) { GNUNET_break_op (0); return GNUNET_SYSERR; } itype = ntohs (inbox->type); if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) || (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) ) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_YES; } /** * Communicator gave us a DV box. Process the request. * * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) * @param dvb the message that was received */ static void handle_dv_box (void *cls, const struct TransportDVBox *dvb) { struct CommunicatorMessageContext *cmc = cls; uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb); uint16_t num_hops = ntohs (dvb->num_hops); const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1]; const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops]; if (num_hops > 0) { // FIXME: if we are not the target, shorten path and forward along. // Try from the _end_ of hops array if we know the given // neighbour (shortening the path!). // NOTE: increment total_hops! finish_cmc_handling (cmc); return; } /* We are the target. Unbox and handle message. 
*/ cmc->im.sender = dvb->origin; cmc->total_hops = ntohs (dvb->total_hops); demultiplex_with_cmc (cmc, inbox); } /** * Client notified us about transmission from a peer. Process the request. * * @param cls a `struct TransportClient` which sent us the message * @param obm the send message that was sent * @return #GNUNET_YES if message is well-formed */ static int check_incoming_msg (void *cls, const struct GNUNET_TRANSPORT_IncomingMessage *im) { struct TransportClient *tc = cls; if (CT_COMMUNICATOR != tc->type) { GNUNET_break (0); return GNUNET_SYSERR; } GNUNET_MQ_check_boxed_message (im); return GNUNET_OK; } /** * Incoming meessage. Process the request. * * @param im the send message that was received */ static void handle_incoming_msg (void *cls, const struct GNUNET_TRANSPORT_IncomingMessage *im) { struct TransportClient *tc = cls; struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext); cmc->tc = tc; cmc->im = *im; demultiplex_with_cmc (cmc, (const struct GNUNET_MessageHeader *) &im[1]); } /** * Given an inbound message @a msg from a communicator @a cmc, * demultiplex it based on the type calling the right handler. 
* * @param cmc context for demultiplexing * @param msg message to demultiplex */ static void demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, const struct GNUNET_MessageHeader *msg) { struct GNUNET_MQ_MessageHandler handlers[] = { GNUNET_MQ_hd_var_size (fragment_box, GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT, struct TransportFragmentBox, &cmc), GNUNET_MQ_hd_fixed_size (fragment_ack, GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK, struct TransportFragmentAckMessage, &cmc), GNUNET_MQ_hd_var_size (reliability_box, GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX, struct TransportReliabilityBox, &cmc), GNUNET_MQ_hd_fixed_size (reliability_ack, GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK, struct TransportReliabilityAckMessage, &cmc), GNUNET_MQ_hd_var_size (backchannel_encapsulation, GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION, struct TransportBackchannelEncapsulationMessage, &cmc), GNUNET_MQ_hd_var_size (dv_learn, GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN, struct TransportDVLearn, &cmc), GNUNET_MQ_hd_var_size (dv_box, GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX, struct TransportDVBox, &cmc), GNUNET_MQ_handler_end() }; int ret; ret = GNUNET_MQ_handle_message (handlers, msg); if (GNUNET_SYSERR == ret) { GNUNET_break (0); GNUNET_SERVICE_client_drop (cmc->tc->client); GNUNET_free (cmc); return; } if (GNUNET_NO == ret) { /* unencapsulated 'raw' message */ handle_raw_message (&cmc, msg); } } /** * New queue became available. Check message. * * @param cls the client * @param aqm the send message that was sent */ static int check_add_queue_message (void *cls, const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) { struct TransportClient *tc = cls; if (CT_COMMUNICATOR != tc->type) { GNUNET_break (0); return GNUNET_SYSERR; } GNUNET_MQ_check_zero_termination (aqm); return GNUNET_OK; } /** * Bandwidth tracker informs us that the delay until we should receive * more has changed. 
*
 * Currently only computes the delay for the next inbound packet (of
 * MTU size, or of #IN_PACKET_SIZE_WITHOUT_MTU bytes for queues without
 * MTU); the result is not used for anything yet.
 *
 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
 */
static void
tracker_update_in_cb (void *cls)
{
  struct GNUNET_ATS_Session *queue = cls;
  struct GNUNET_TIME_Relative in_delay;
  unsigned int rsize;

  /* an MTU of 0 means the queue has no MTU; assume a default size */
  rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
  in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
                                                 rsize);
  // FIXME: how exactly do we do inbound flow control?
}


/**
 * If necessary, generates the UUID for a @a pm.  Does nothing if
 * the UUID of @a pm was already set.
 *
 * @param pm pending message to generate UUID for.
 */
static void
set_pending_message_uuid (struct PendingMessage *pm)
{
  if (pm->msg_uuid_set)
    return;
  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                              &pm->msg_uuid,
                              sizeof (pm->msg_uuid));
  pm->msg_uuid_set = GNUNET_YES;
}


/**
 * Fragment the given @a pm to the given @a mtu.  Adds
 * additional fragments to the neighbour as well. If the
 * @a mtu is too small, generates and error for the @a pm
 * and returns NULL.
 *
 * NOTE(review): the code below never returns NULL; an @a mtu that is
 * not larger than a `struct TransportFragmentBox` triggers the
 * assertion instead.
 *
 * Fragments are kept in a tree below @a pm (`head_frag`/`tail_frag`,
 * `frag_parent`); `frag_off` of a node tracks how many of its payload
 * bytes have been turned into child fragments so far.  Each call
 * either creates the next missing fragment or picks an existing one,
 * and moves the chosen fragment to the tail of its parent's list.
 *
 * @param pm pending message to fragment for transmission
 * @param mtu MTU to apply
 * @return new message to transmit
 */
static struct PendingMessage *
fragment_message (struct PendingMessage *pm,
                  uint16_t mtu)
{
  struct PendingMessage *ff;

  set_pending_message_uuid (pm);

  /* This invariant is established in #handle_add_queue_message() */
  GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));

  /* select fragment for transmission, descending the tree if it has
     been expanded until we are at a leaf or at a fragment that is small enough */
  ff = pm;
  while ( ( (ff->bytes_msg > mtu) ||
            (pm == ff) ) &&
          (ff->frag_off == ff->bytes_msg) &&
          (NULL != ff->head_frag) )
  {
    ff = ff->head_frag; /* descent into fragmented fragments */
  }

  if ( ( (ff->bytes_msg > mtu) ||
         (pm == ff) ) &&
       (pm->frag_off < pm->bytes_msg) )
  {
    /* Did not yet calculate all fragments, calculate next fragment */
    struct PendingMessage *frag;
    struct TransportFragmentBox tfb;
    const char *orig;
    char *msg;
    uint16_t fragmax;
    uint16_t fragsize;
    uint16_t msize;
    uint16_t xoff = 0;

    orig = (const char *) &ff[1];
    msize = ff->bytes_msg;
    if (pm != ff)
    {
      /* 'ff' is itself a fragment: skip its fragment box header and
         remember its offset within the original message */
      const struct TransportFragmentBox *tfbo;

      tfbo = (const struct TransportFragmentBox *) orig;
      orig += sizeof (struct TransportFragmentBox);
      msize -= sizeof (struct TransportFragmentBox);
      xoff = ntohs (tfbo->frag_off);
    }
    /* payload bytes that fit into one fragment of size 'mtu' */
    fragmax = mtu - sizeof (struct TransportFragmentBox);
    fragsize = GNUNET_MIN (msize - ff->frag_off,
                           fragmax);
    frag = GNUNET_malloc (sizeof (struct PendingMessage) +
                          sizeof (struct TransportFragmentBox) +
                          fragsize);
    frag->target = pm->target;
    frag->frag_parent = ff;
    frag->timeout = pm->timeout;
    frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
    frag->pmt = PMT_FRAGMENT_BOX;
    msg = (char *) &frag[1];
    tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
    tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
                             fragsize);
    tfb.frag_uuid = htonl (pm->frag_uuidgen++);
    tfb.msg_uuid = pm->msg_uuid;
    /* offset is relative to the original (top-level) message */
    tfb.frag_off = htons (ff->frag_off + xoff);
    tfb.msg_size = htons (pm->bytes_msg);
    memcpy (msg,
            &tfb,
            sizeof (tfb));
    memcpy (&msg[sizeof (tfb)],
            &orig[ff->frag_off],
            fragsize);
    GNUNET_CONTAINER_MDLL_insert (frag,
                                  ff->head_frag,
                                  ff->tail_frag,
                                  frag);
    ff->frag_off += fragsize;
    ff = frag;
  }

  /* Move head to the tail and return it */
  GNUNET_CONTAINER_MDLL_remove (frag,
                                ff->frag_parent->head_frag,
                                ff->frag_parent->tail_frag,
                                ff);
  GNUNET_CONTAINER_MDLL_insert_tail (frag,
                                     ff->frag_parent->head_frag,
                                     ff->frag_parent->tail_frag,
                                     ff);
  return ff;
}


/**
 * Reliability-box the given @a pm. On error (can there be any), NULL
 * may be returned, otherwise the "replacement" for @a pm (which
 * should then be added to the respective neighbour's queue instead of
 * @a pm).  If the @a pm is already fragmented or reliability boxed,
 * or itself an ACK, this function simply returns @a pm.
* * @param pm pending message to box for transmission over unreliabile queue * @return new message to transmit */ static struct PendingMessage * reliability_box_message (struct PendingMessage *pm) { struct TransportReliabilityBox rbox; struct PendingMessage *bpm; char *msg; if (PMT_CORE != pm->pmt) return pm; /* already fragmented or reliability boxed, or control message: do nothing */ if (NULL != pm->bpm) return pm->bpm; /* already computed earlier: do nothing */ GNUNET_assert (NULL == pm->head_frag); if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX) { /* failed hard */ GNUNET_break (0); client_send_response (pm, GNUNET_NO, 0); return NULL; } bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) + pm->bytes_msg); bpm->target = pm->target; bpm->frag_parent = pm; GNUNET_CONTAINER_MDLL_insert (frag, pm->head_frag, pm->tail_frag, bpm); bpm->timeout = pm->timeout; bpm->pmt = PMT_RELIABILITY_BOX; bpm->bytes_msg = pm->bytes_msg + sizeof (rbox); set_pending_message_uuid (bpm); rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX); rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg); rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support rbox.msg_uuid = pm->msg_uuid; msg = (char *) &bpm[1]; memcpy (msg, &rbox, sizeof (rbox)); memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg); pm->bpm = bpm; return bpm; } /** * We believe we are ready to transmit a message on a queue. Double-checks * with the queue's "tracker_out" and then gives the message to the * communicator for transmission (updating the tracker, and re-scheduling * itself if applicable). 
* * @param cls the `struct GNUNET_ATS_Session` to process transmissions for */ static void transmit_on_queue (void *cls) { struct GNUNET_ATS_Session *queue = cls; struct Neighbour *n = queue->neighbour; struct QueueEntry *qe; struct PendingMessage *pm; struct PendingMessage *s; uint32_t overhead; struct GNUNET_TRANSPORT_SendMessageTo *smt; struct GNUNET_MQ_Envelope *env; queue->transmit_task = NULL; if (NULL == (pm = n->pending_msg_head)) { /* no message pending, nothing to do here! */ return; } schedule_transmit_on_queue (queue); if (NULL != queue->transmit_task) return; /* do it later */ overhead = 0; if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) overhead += sizeof (struct TransportReliabilityBox); s = pm; if ( ( (0 != queue->mtu) && (pm->bytes_msg + overhead > queue->mtu) ) || (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) || (NULL != pm->head_frag /* fragments already exist, should respect that even if MTU is 0 for this queue */) ) s = fragment_message (s, (0 == queue->mtu) ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo) : queue->mtu); if (NULL == s) { /* Fragmentation failed, try next message... */ schedule_transmit_on_queue (queue); return; } if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) s = reliability_box_message (s); if (NULL == s) { /* Reliability boxing failed, try next message... */ schedule_transmit_on_queue (queue); return; } /* Pass 's' for transission to the communicator */ qe = GNUNET_new (struct QueueEntry); qe->mid = queue->mid_gen++; qe->session = queue; // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'! 
GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe); env = GNUNET_MQ_msg_extra (smt, s->bytes_msg, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); smt->qid = queue->qid; smt->mid = qe->mid; smt->receiver = n->pid; memcpy (&smt[1], &s[1], s->bytes_msg); GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); queue->queue_length++; queue->tc->details.communicator.total_queue_length++; GNUNET_MQ_send (queue->tc->mq, env); // FIXME: do something similar to the logic below // in defragmentation / reliability ACK handling! /* Check if this transmission somehow conclusively finished handing 'pm' even without any explicit ACKs */ if ( (PMT_CORE == s->pmt) && (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) ) { /* Full message sent, and over reliabile channel */ client_send_response (pm, GNUNET_YES, pm->bytes_msg); } else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) && (PMT_FRAGMENT_BOX == s->pmt) ) { struct PendingMessage *pos; /* Fragment sent over reliabile channel */ free_fragment_tree (s); pos = s->frag_parent; GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s); GNUNET_free (s); /* check if subtree is done */ while ( (NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) && (pos != pm) ) { s = pos; pos = s->frag_parent; GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, s); GNUNET_free (s); } /* Was this the last applicable fragmment? */ if ( (NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg) ) client_send_response (pm, GNUNET_YES, pm->bytes_msg /* FIXME: calculate and add overheads! */); } else if (PMT_CORE != pm->pmt) { /* This was an acknowledgement of some type, always free */ free_pending_message (pm); } else { /* message not finished, waiting for acknowledgement */ struct Neighbour *neighbour = pm->target; /* Update time by which we might retransmit 's' based on queue characteristics (i.e. 
RTT); it takes one RTT for the message to arrive and the ACK to come back in the best case; but the other side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before retransmitting. Note that in the future this heuristic should likely be improved further (measure RTT stability, consider message urgency and size when delaying ACKs, etc.) */ s->next_attempt = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (queue->rtt, 4)); if (s == pm) { struct PendingMessage *pos; /* re-insert sort in neighbour list */ GNUNET_CONTAINER_MDLL_remove (neighbour, neighbour->pending_msg_head, neighbour->pending_msg_tail, pm); pos = neighbour->pending_msg_tail; while ( (NULL != pos) && (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) ) pos = pos->prev_neighbour; GNUNET_CONTAINER_MDLL_insert_after (neighbour, neighbour->pending_msg_head, neighbour->pending_msg_tail, pos, pm); } else { /* re-insert sort in fragment list */ struct PendingMessage *fp = s->frag_parent; struct PendingMessage *pos; GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, s); pos = fp->tail_frag; while ( (NULL != pos) && (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) ) pos = pos->prev_frag; GNUNET_CONTAINER_MDLL_insert_after (frag, fp->head_frag, fp->tail_frag, pos, s); } } /* finally, re-schedule queue transmission task itself */ schedule_transmit_on_queue (queue); } /** * Bandwidth tracker informs us that the delay until we * can transmit again changed. * * @param cls a `struct GNUNET_ATS_Session` for which the delay changed */ static void tracker_update_out_cb (void *cls) { struct GNUNET_ATS_Session *queue = cls; struct Neighbour *n = queue->neighbour; if (NULL == n->pending_msg_head) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bandwidth allocation updated for empty transmission queue `%s'\n", queue->address); return; /* no message pending, nothing to do here! 
*/ } GNUNET_SCHEDULER_cancel (queue->transmit_task); queue->transmit_task = NULL; schedule_transmit_on_queue (queue); } /** * Bandwidth tracker informs us that excessive outbound bandwidth was * allocated which is not being used. * * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted */ static void tracker_excess_out_cb (void *cls) { /* FIXME: trigger excess bandwidth report to core? Right now, this is done internally within transport_api2_core already, but we probably want to change the logic and trigger it from here via a message instead! */ /* TODO: maybe inform ATS at this point? */ GNUNET_STATISTICS_update (GST_stats, "# Excess outbound bandwidth reported", 1, GNUNET_NO); } /** * Bandwidth tracker informs us that excessive inbound bandwidth was allocated * which is not being used. * * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted */ static void tracker_excess_in_cb (void *cls) { /* TODO: maybe inform ATS at this point? */ GNUNET_STATISTICS_update (GST_stats, "# Excess inbound bandwidth reported", 1, GNUNET_NO); } /** * New queue became available. Process the request. * * @param cls the client * @param aqm the send message that was sent */ static void handle_add_queue_message (void *cls, const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) { struct TransportClient *tc = cls; struct GNUNET_ATS_Session *queue; struct Neighbour *neighbour; const char *addr; uint16_t addr_len; if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox)) { /* MTU so small as to be useless for transmissions, required for #fragment_message()! 
*/ GNUNET_break_op (0); GNUNET_SERVICE_client_drop (tc->client); return; } neighbour = lookup_neighbour (&aqm->receiver); if (NULL == neighbour) { neighbour = GNUNET_new (struct Neighbour); neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; neighbour->pid = aqm->receiver; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (neighbours, &neighbour->pid, neighbour, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); cores_send_connect_info (&neighbour->pid, GNUNET_BANDWIDTH_ZERO); } addr_len = ntohs (aqm->header.size) - sizeof (*aqm); addr = (const char *) &aqm[1]; queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len); queue->tc = tc; queue->address = (const char *) &queue[1]; queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL; queue->qid = aqm->qid; queue->mtu = ntohl (aqm->mtu); queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt); queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); queue->neighbour = neighbour; GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in, &tracker_update_in_cb, queue, GNUNET_BANDWIDTH_ZERO, GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S, &tracker_excess_in_cb, queue); GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out, &tracker_update_out_cb, queue, GNUNET_BANDWIDTH_ZERO, GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S, &tracker_excess_out_cb, queue); memcpy (&queue[1], addr, addr_len); /* notify ATS about new queue */ { struct GNUNET_ATS_Properties prop = { .delay = GNUNET_TIME_UNIT_FOREVER_REL, .mtu = queue->mtu, .nt = queue->nt, .cc = tc->details.communicator.cc }; queue->sr = GNUNET_ATS_session_add (ats, &neighbour->pid, queue->address, queue, &prop); if (NULL == queue->sr) { /* This can only happen if the 'address' was way too long for ATS (approaching 64k in strlen()!). In this case, the communicator must be buggy and we drop it. 
*/ GNUNET_break (0); GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); GNUNET_free (queue); if (NULL == neighbour->session_head) { cores_send_disconnect_info (&neighbour->pid); free_neighbour (neighbour); } GNUNET_SERVICE_client_drop (tc->client); return; } } /* notify monitors about new queue */ { struct MonitorEvent me = { .rtt = queue->rtt, .cs = queue->cs }; notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); } GNUNET_CONTAINER_MDLL_insert (neighbour, neighbour->session_head, neighbour->session_tail, queue); GNUNET_CONTAINER_MDLL_insert (client, tc->details.communicator.session_head, tc->details.communicator.session_tail, queue); GNUNET_SERVICE_client_continue (tc->client); } /** * Queue to a peer went down. Process the request. * * @param cls the client * @param dqm the send message that was sent */ static void handle_del_queue_message (void *cls, const struct GNUNET_TRANSPORT_DelQueueMessage *dqm) { struct TransportClient *tc = cls; if (CT_COMMUNICATOR != tc->type) { GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; NULL != session; session = session->next_client) { struct Neighbour *neighbour = session->neighbour; if ( (dqm->qid != session->qid) || (0 != memcmp (&dqm->receiver, &neighbour->pid, sizeof (struct GNUNET_PeerIdentity))) ) continue; free_session (session); GNUNET_SERVICE_client_continue (tc->client); return; } GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); } /** * Message was transmitted. Process the request. 
* * @param cls the client * @param sma the send message that was sent */ static void handle_send_message_ack (void *cls, const struct GNUNET_TRANSPORT_SendMessageToAck *sma) { struct TransportClient *tc = cls; struct QueueEntry *queue; if (CT_COMMUNICATOR != tc->type) { GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } /* find our queue entry matching the ACK */ queue = NULL; for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; NULL != session; session = session->next_client) { if (0 != memcmp (&session->neighbour->pid, &sma->receiver, sizeof (struct GNUNET_PeerIdentity))) continue; for (struct QueueEntry *qe = session->queue_head; NULL != qe; qe = qe->next) { if (qe->mid != sma->mid) continue; queue = qe; break; } break; } if (NULL == queue) { /* this should never happen */ GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } GNUNET_CONTAINER_DLL_remove (queue->session->queue_head, queue->session->queue_tail, queue); queue->session->queue_length--; tc->details.communicator.total_queue_length--; GNUNET_SERVICE_client_continue (tc->client); /* if applicable, resume transmissions that waited on ACK */ if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length) { /* Communicator dropped below threshold, resume all queues */ GNUNET_STATISTICS_update (GST_stats, "# Transmission throttled due to communicator queue limit", -1, GNUNET_NO); for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; NULL != session; session = session->next_client) schedule_transmit_on_queue (session); } else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length) { /* queue dropped below threshold; only resume this one queue */ GNUNET_STATISTICS_update (GST_stats, "# Transmission throttled due to session queue limit", -1, GNUNET_NO); schedule_transmit_on_queue (queue->session); } /* TODO: we also should react on the status! 
*/ // FIXME: this probably requires queue->pm = s assignment! // FIXME: react to communicator status about transmission request. We got: sma->status; // OK success, SYSERR failure GNUNET_free (queue); } /** * Iterator telling new MONITOR client about all existing * queues to peers. * * @param cls the new `struct TransportClient` * @param pid a connected peer * @param value the `struct Neighbour` with more information * @return #GNUNET_OK (continue to iterate) */ static int notify_client_queues (void *cls, const struct GNUNET_PeerIdentity *pid, void *value) { struct TransportClient *tc = cls; struct Neighbour *neighbour = value; GNUNET_assert (CT_MONITOR == tc->type); for (struct GNUNET_ATS_Session *q = neighbour->session_head; NULL != q; q = q->next_neighbour) { struct MonitorEvent me = { .rtt = q->rtt, .cs = q->cs, .num_msg_pending = q->num_msg_pending, .num_bytes_pending = q->num_bytes_pending }; notify_monitor (tc, pid, q->address, q->nt, &me); } return GNUNET_OK; } /** * Initialize a monitor client. * * @param cls the client * @param start the start message that was sent */ static void handle_monitor_start (void *cls, const struct GNUNET_TRANSPORT_MonitorStart *start) { struct TransportClient *tc = cls; if (CT_NONE != tc->type) { GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } tc->type = CT_MONITOR; tc->details.monitor.peer = start->peer; tc->details.monitor.one_shot = ntohl (start->one_shot); GNUNET_CONTAINER_multipeermap_iterate (neighbours, ¬ify_client_queues, tc); GNUNET_SERVICE_client_mark_monitor (tc->client); GNUNET_SERVICE_client_continue (tc->client); } /** * Signature of a function called by ATS with the current bandwidth * allocation to be used as determined by ATS. 
* * @param cls closure, NULL * @param session session this is about * @param bandwidth_out assigned outbound bandwidth for the connection, * 0 to signal disconnect * @param bandwidth_in assigned inbound bandwidth for the connection, * 0 to signal disconnect */ static void ats_allocation_cb (void *cls, struct GNUNET_ATS_Session *session, struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in) { (void) cls; GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out, bandwidth_out); GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in, bandwidth_in); } /** * Find transport client providing communication service * for the protocol @a prefix. * * @param prefix communicator name * @return NULL if no such transport client is available */ static struct TransportClient * lookup_communicator (const char *prefix) { for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) { if (CT_COMMUNICATOR != tc->type) continue; if (0 == strcmp (prefix, tc->details.communicator.address_prefix)) return tc; } GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n", prefix); return NULL; } /** * Signature of a function called by ATS suggesting transport to * try connecting with a particular address. * * @param cls closure, NULL * @param pid target peer * @param address the address to try */ static void ats_suggestion_cb (void *cls, const struct GNUNET_PeerIdentity *pid, const char *address) { static uint32_t idgen; struct TransportClient *tc; char *prefix; struct GNUNET_TRANSPORT_CreateQueue *cqm; struct GNUNET_MQ_Envelope *env; size_t alen; (void) cls; prefix = GNUNET_HELLO_address_to_prefix (address); if (NULL == prefix) { GNUNET_break (0); /* ATS gave invalid address!? 
*/ return; } tc = lookup_communicator (prefix); if (NULL == tc) { GNUNET_STATISTICS_update (GST_stats, "# ATS suggestions ignored due to missing communicator", 1, GNUNET_NO); return; } /* forward suggestion for queue creation to communicator */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Request #%u for `%s' communicator to create queue to `%s'\n", (unsigned int) idgen, prefix, address); alen = strlen (address) + 1; env = GNUNET_MQ_msg_extra (cqm, alen, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE); cqm->request_id = htonl (idgen++); cqm->receiver = *pid; memcpy (&cqm[1], address, alen); GNUNET_MQ_send (tc->mq, env); } /** * Communicator tells us that our request to create a queue "worked", that * is setting up the queue is now in process. * * @param cls the `struct TransportClient` * @param cqr confirmation message */ static void handle_queue_create_ok (void *cls, const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) { struct TransportClient *tc = cls; if (CT_COMMUNICATOR != tc->type) { GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } GNUNET_STATISTICS_update (GST_stats, "# ATS suggestions succeeded at communicator", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Request #%u for communicator to create queue succeeded\n", (unsigned int) ntohs (cqr->request_id)); GNUNET_SERVICE_client_continue (tc->client); } /** * Communicator tells us that our request to create a queue failed. This usually * indicates that the provided address is simply invalid or that the communicator's * resources are exhausted. 
* * @param cls the `struct TransportClient` * @param cqr failure message */ static void handle_queue_create_fail (void *cls, const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) { struct TransportClient *tc = cls; if (CT_COMMUNICATOR != tc->type) { GNUNET_break (0); GNUNET_SERVICE_client_drop (tc->client); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Request #%u for communicator to create queue failed\n", (unsigned int) ntohs (cqr->request_id)); GNUNET_STATISTICS_update (GST_stats, "# ATS suggestions failed in queue creation at communicator", 1, GNUNET_NO); GNUNET_SERVICE_client_continue (tc->client); } /** * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY * messages. We do nothing here, real verification is done later. * * @param cls a `struct TransportClient *` * @param msg message to verify * @return #GNUNET_OK */ static int check_address_consider_verify (void *cls, const struct GNUNET_TRANSPORT_AddressToVerify *hdr) { (void) cls; (void) hdr; return GNUNET_OK; } /** * Given another peers address, consider checking it for validity * and then adding it to the Peerstore. * * @param cls a `struct TransportClient` * @param hdr message containing the raw address data and * signature in the body, see #GNUNET_HELLO_extract_address() */ static void handle_address_consider_verify (void *cls, const struct GNUNET_TRANSPORT_AddressToVerify *hdr) { char *address; enum GNUNET_NetworkType nt; struct GNUNET_TIME_Absolute expiration; (void) cls; // FIXME: pre-check: do we know this address already? // FIXME: pre-check: rate-limit signature verification / validation! address = GNUNET_HELLO_extract_address (&hdr[1], ntohs (hdr->header.size) - sizeof (*hdr), &hdr->peer, &nt, &expiration); if (NULL == address) { GNUNET_break_op (0); return; } if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us) return; /* expired */ // FIXME: do begin actual verification here! GNUNET_free (address); } /** * Free neighbour entry. 
* * @param cls NULL * @param pid unused * @param value a `struct Neighbour` * @return #GNUNET_OK (always) */ static int free_neighbour_cb (void *cls, const struct GNUNET_PeerIdentity *pid, void *value) { struct Neighbour *neighbour = value; (void) cls; (void) pid; GNUNET_break (0); // should this ever happen? free_neighbour (neighbour); return GNUNET_OK; } /** * Free DV route entry. * * @param cls NULL * @param pid unused * @param value a `struct DistanceVector` * @return #GNUNET_OK (always) */ static int free_dv_routes_cb (void *cls, const struct GNUNET_PeerIdentity *pid, void *value) { struct DistanceVector *dv = value; (void) cls; (void) pid; free_dv_route (dv); return GNUNET_OK; } /** * Free ephemeral entry. * * @param cls NULL * @param pid unused * @param value a `struct Neighbour` * @return #GNUNET_OK (always) */ static int free_ephemeral_cb (void *cls, const struct GNUNET_PeerIdentity *pid, void *value) { struct EphemeralCacheEntry *ece = value; (void) cls; (void) pid; free_ephemeral (ece); return GNUNET_OK; } /** * Function called when the service shuts down. Unloads our plugins * and cancels pending validations. 
*
 * Concretely: cancels the ephemeral task (if scheduled), frees all
 * neighbours, disconnects from ATS, PEERSTORE and STATISTICS, releases
 * our private key and finally frees and destroys the neighbour,
 * DV-route and ephemeral containers.
 *
 * @param cls closure, unused
 */
static void
do_shutdown (void *cls)
{
  (void) cls;

  if (NULL != ephemeral_task)
  {
    GNUNET_SCHEDULER_cancel (ephemeral_task);
    ephemeral_task = NULL;
  }
  /* neighbours are released first, while the ATS, PEERSTORE and
     STATISTICS handles below are still connected */
  GNUNET_CONTAINER_multipeermap_iterate (neighbours,
                                         &free_neighbour_cb,
                                         NULL);
  if (NULL != ats)
  {
    GNUNET_ATS_transport_done (ats);
    ats = NULL;
  }
  if (NULL != peerstore)
  {
    GNUNET_PEERSTORE_disconnect (peerstore,
                                 GNUNET_NO);
    peerstore = NULL;
  }
  if (NULL != GST_stats)
  {
    GNUNET_STATISTICS_destroy (GST_stats,
                               GNUNET_NO);
    GST_stats = NULL;
  }
  if (NULL != GST_my_private_key)
  {
    GNUNET_free (GST_my_private_key);
    GST_my_private_key = NULL;
  }
  /* the neighbour entries were already freed by the iteration above */
  GNUNET_CONTAINER_multipeermap_destroy (neighbours);
  neighbours = NULL;
  GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
                                         &free_dv_routes_cb,
                                         NULL);
  GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
  dv_routes = NULL;
  GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
                                         &free_ephemeral_cb,
                                         NULL);
  GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
  ephemeral_map = NULL;
  GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
  ephemeral_heap = NULL;
}


/**
 * Initiate transport service.
 *
 * Creates the global containers, loads our private key and derives our
 * identity from it, then connects to STATISTICS, PEERSTORE and ATS.  If
 * the key, PEERSTORE or ATS is unavailable, a scheduler shutdown is
 * requested and the function returns early.
 *
 * @param cls closure
 * @param c configuration to use
 * @param service the initialized service
 */
static void
run (void *cls,
     const struct GNUNET_CONFIGURATION_Handle *c,
     struct GNUNET_SERVICE_Handle *service)
{
  (void) cls;
  /* setup globals */
  GST_cfg = c;
  neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
                                                     GNUNET_YES);
  dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
                                                    GNUNET_YES);
  ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
                                                        GNUNET_YES);
  ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
  if (NULL == GST_my_private_key)
  {
    /* note: do_shutdown() is not yet registered at this point */
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                _("Transport service is lacking key configuration settings. Exiting.\n"));
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
                                      &GST_my_identity.public_key);
  GNUNET_log(GNUNET_ERROR_TYPE_INFO,
             "My identity is `%s'\n",
             GNUNET_i2s_full (&GST_my_identity));
  GST_stats = GNUNET_STATISTICS_create ("transport",
                                        GST_cfg);
  /* from here on, do_shutdown() performs the cleanup on shutdown */
  GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
                                 NULL);
  peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
  if (NULL == peerstore)
  {
    GNUNET_break (0);
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  ats = GNUNET_ATS_transport_init (GST_cfg,
                                   &ats_allocation_cb,
                                   NULL,
                                   &ats_suggestion_cb,
                                   NULL);
  if (NULL == ats)
  {
    GNUNET_break (0);
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
}


/**
 * Define "main" method using service macro.
 *
 * The handler table below names the handlers by suffix; the
 * "hd_var_size" entries are for variable-size messages and the
 * "hd_fixed_size" entries for fixed-size messages.
 */
GNUNET_SERVICE_MAIN
("transport",
 GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
 &run,
 &client_connect_cb,
 &client_disconnect_cb,
 NULL,
 /* communication with core */
 GNUNET_MQ_hd_fixed_size (client_start,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_START,
                          struct StartMessage,
                          NULL),
 GNUNET_MQ_hd_var_size (client_send,
                        GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
                        struct OutboundMessage,
                        NULL),
 /* communication with communicators */
 GNUNET_MQ_hd_var_size (communicator_available,
                        GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
                        struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
                        NULL),
 GNUNET_MQ_hd_var_size (communicator_backchannel,
                        GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
                        struct GNUNET_TRANSPORT_CommunicatorBackchannel,
                        NULL),
 GNUNET_MQ_hd_var_size (add_address,
                        GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
                        struct GNUNET_TRANSPORT_AddAddressMessage,
                        NULL),
 GNUNET_MQ_hd_fixed_size (del_address,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
                          struct GNUNET_TRANSPORT_DelAddressMessage,
                          NULL),
 GNUNET_MQ_hd_var_size (incoming_msg,
                        GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
                        struct GNUNET_TRANSPORT_IncomingMessage,
                        NULL),
 GNUNET_MQ_hd_fixed_size (queue_create_ok,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
                          struct GNUNET_TRANSPORT_CreateQueueResponse,
                          NULL),
 GNUNET_MQ_hd_fixed_size (queue_create_fail,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
                          struct GNUNET_TRANSPORT_CreateQueueResponse,
                          NULL),
 GNUNET_MQ_hd_var_size (add_queue_message,
                        GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
                        struct GNUNET_TRANSPORT_AddQueueMessage,
                        NULL),
 GNUNET_MQ_hd_var_size (address_consider_verify,
                        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
                        struct GNUNET_TRANSPORT_AddressToVerify,
                        NULL),
 GNUNET_MQ_hd_fixed_size (del_queue_message,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
                          struct GNUNET_TRANSPORT_DelQueueMessage,
                          NULL),
 GNUNET_MQ_hd_fixed_size (send_message_ack,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
                          struct GNUNET_TRANSPORT_SendMessageToAck,
                          NULL),
 /* communication with monitors */
 GNUNET_MQ_hd_fixed_size (monitor_start,
                          GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
                          struct GNUNET_TRANSPORT_MonitorStart,
                          NULL),
 GNUNET_MQ_handler_end ());


/* end of file gnunet-service-tng.c */