/*
This file is part of GNUnet.
Copyright (C) 2001-2017 GNUnet e.V.
GNUnet is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
*/
/**
* @file cadet/gnunet-service-cadet_channel.c
* @brief logical links between CADET clients
* @author Bartlomiej Polot
* @author Christian Grothoff
*
* TODO:
* - Congestion/flow control:
* + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
* (and figure out how/where to use this!)
* + figure out flow control without ACKs (unreliable traffic!)
* - revisit handling of 'unbuffered' traffic!
* (need to push down through tunnel into connection selection)
* - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
* reserve more bits in 'options' to allow for buffer size control?
*/
#include "platform.h"
#include "cadet.h"
#include "gnunet_statistics_service.h"
#include "gnunet-service-cadet_channel.h"
#include "gnunet-service-cadet_connection.h"
#include "gnunet-service-cadet_tunnels.h"
#include "gnunet-service-cadet_paths.h"
#define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
/**
* How long do we initially wait before retransmitting?
*/
#define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
/**
* How long do we wait before dropping state about incoming
* connection to closed port?
*/
#define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
/**
* How long do we wait at least before retransmitting ever?
*/
#define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
/**
* Maximum message ID into the future we accept for out-of-order messages.
* If the message is more than this into the future, we drop it. This is
* important both to detect values that are actually in the past, as well
* as to limit adversarially triggerable memory consumption.
*
* Note that right now we have "max_pending_messages = 4" hard-coded in
* the logic below, so a value of 4 would suffice here. But we plan to
* allow larger windows in the future...
*/
#define MAX_OUT_OF_ORDER_DISTANCE 1024
/**
* All the states a channel can be in.
*/
enum CadetChannelState
{
/**
* Uninitialized status, should never appear in operation.
*/
CADET_CHANNEL_NEW,
/**
* Channel is to a port that is not open, we're waiting for the
* port to be opened.
*/
CADET_CHANNEL_LOOSE,
/**
* CHANNEL_OPEN message sent, waiting for CHANNEL_OPEN_ACK.
*/
CADET_CHANNEL_OPEN_SENT,
/**
* Connection confirmed, ready to carry traffic.
*/
CADET_CHANNEL_READY
};
/**
* Info needed to retry a message in case it gets lost.
* Note that we DO use this structure also for unreliable
* messages.
*/
struct CadetReliableMessage
{
/**
* Double linked list, FIFO style
*/
struct CadetReliableMessage *next;
/**
* Double linked list, FIFO style
*/
struct CadetReliableMessage *prev;
/**
* Which channel is this message in?
*/
struct CadetChannel *ch;
/**
* Entry in the tunnels queue for this message, NULL if it has left
* the tunnel. Used to cancel transmission in case we receive an
* ACK in time.
*/
struct CadetTunnelQueueEntry *qe;
/**
* Data message we are trying to send.
*/
struct GNUNET_CADET_ChannelAppDataMessage *data_message;
/**
* How soon should we retry if we fail to get an ACK?
* Messages in the queue are sorted by this value.
*/
struct GNUNET_TIME_Absolute next_retry;
/**
* How long do we wait for an ACK after transmission?
* Use for the back-off calculation.
*/
struct GNUNET_TIME_Relative retry_delay;
/**
* Time when we first successfully transmitted the message
* (that is, set @e num_transmissions to 1).
*/
struct GNUNET_TIME_Absolute first_transmission_time;
/**
* Identifier of the connection that this message took when it
* was first transmitted. Only useful if @e num_transmissions is 1.
*/
struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken;
/**
* How often was this message transmitted? #GNUNET_SYSERR if there
* was an error transmitting the message, #GNUNET_NO if it was not
* yet transmitted ever, otherwise the number of (re) transmissions.
*/
int num_transmissions;
};
/**
* List of received out-of-order data messages.
*/
struct CadetOutOfOrderMessage
{
/**
* Double linked list, FIFO style
*/
struct CadetOutOfOrderMessage *next;
/**
* Double linked list, FIFO style
*/
struct CadetOutOfOrderMessage *prev;
/**
* ID of the message (messages up to this point needed
* before we give this one to the client).
*/
struct ChannelMessageIdentifier mid;
/**
* The envelope with the payload of the out-of-order message
*/
struct GNUNET_MQ_Envelope *env;
};
/**
* Client endpoint of a `struct CadetChannel`. A channel may be a
* loopback channel, in which case it has two of these endpoints.
* Note that flow control also is required in both directions.
*/
struct CadetChannelClient
{
/**
* Client handle. Not by itself sufficient to designate
* the client endpoint, as the same client handle may
* be used for both the owner and the destination, and
* we thus also need the channel ID to identify the client.
*/
struct CadetClient *c;
/**
* Head of DLL of messages received out of order or while client was unready.
*/
struct CadetOutOfOrderMessage *head_recv;
/**
* Tail DLL of messages received out of order or while client was unready.
*/
struct CadetOutOfOrderMessage *tail_recv;
/**
* Local tunnel number for this client.
* (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
* otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
*/
struct GNUNET_CADET_ClientChannelNumber ccn;
/**
* Number of entries currently in @a head_recv DLL.
*/
unsigned int num_recv;
/**
* Can we send data to the client?
*/
int client_ready;
};
/**
* Struct containing all information regarding a channel to a remote client.
*/
struct CadetChannel
{
/**
* Tunnel this channel is in.
*/
struct CadetTunnel *t;
/**
* Client owner of the tunnel, if any.
* (Used if this channel represends the initiating end of the tunnel.)
*/
struct CadetChannelClient *owner;
/**
* Client destination of the tunnel, if any.
* (Used if this channel represents the listening end of the tunnel.)
*/
struct CadetChannelClient *dest;
/**
* Last entry in the tunnel's queue relating to control messages
* (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
* #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
* transmission in case we receive updated information.
*/
struct CadetTunnelQueueEntry *last_control_qe;
/**
* Head of DLL of messages sent and not yet ACK'd.
*/
struct CadetReliableMessage *head_sent;
/**
* Tail of DLL of messages sent and not yet ACK'd.
*/
struct CadetReliableMessage *tail_sent;
/**
* Task to resend/poll in case no ACK is received.
*/
struct GNUNET_SCHEDULER_Task *retry_control_task;
/**
* Task to resend/poll in case no ACK is received.
*/
struct GNUNET_SCHEDULER_Task *retry_data_task;
/**
* Last time the channel was used
*/
struct GNUNET_TIME_Absolute timestamp;
/**
* Destination port of the channel.
*/
struct GNUNET_HashCode port;
/**
* Hash'ed port of the channel with initiator and destination PID.
*/
struct GNUNET_HashCode h_port;
/**
* Counter for exponential backoff.
*/
struct GNUNET_TIME_Relative retry_time;
/**
* Bitfield of already-received messages past @e mid_recv.
*/
uint64_t mid_futures;
/**
* Next MID expected for incoming traffic.
*/
struct ChannelMessageIdentifier mid_recv;
/**
* Next MID to use for outgoing traffic.
*/
struct ChannelMessageIdentifier mid_send;
/**
* Total (reliable) messages pending ACK for this channel.
*/
unsigned int pending_messages;
/**
* Maximum (reliable) messages pending ACK for this channel
* before we throttle the client.
*/
unsigned int max_pending_messages;
/**
* Number identifying this channel in its tunnel.
*/
struct GNUNET_CADET_ChannelTunnelNumber ctn;
/**
* Channel state.
*/
enum CadetChannelState state;
/**
* Count how many ACKs we skipped, used to prevent long
* sequences of ACK skipping.
*/
unsigned int skip_ack_series;
/**
* Is the tunnel bufferless (minimum latency)?
*/
int nobuffer;
/**
* Is the tunnel reliable?
*/
int reliable;
/**
* Is the tunnel out-of-order?
*/
int out_of_order;
/**
* Is this channel a loopback channel, where the destination is us again?
*/
int is_loopback;
/**
* Flag to signal the destruction of the channel. If this is set to
* #GNUNET_YES the channel will be destroyed once the queue is
* empty.
*/
int destroy;
};
/**
* Get the static string for identification of the channel.
*
* @param ch Channel.
*
* @return Static string with the channel IDs.
*/
const char *
GCCH_2s (const struct CadetChannel *ch)
{
static char buf[128];
GNUNET_snprintf (buf,
sizeof (buf),
"Channel %s:%s ctn:%X(%X/%X)",
(GNUNET_YES == ch->is_loopback)
? "loopback"
: GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
GNUNET_h2s (&ch->port),
ch->ctn,
(NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
(NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
return buf;
}
/**
* Hash the @a port and @a initiator and @a listener to
* calculate the "challenge" @a h_port we send to the other
* peer on #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN.
*
* @param[out] h_port set to the hash of @a port, @a initiator and @a listener
* @param port cadet port, as seen by CADET clients
* @param listener peer that is listining on @a port
*/
void
GCCH_hash_port (struct GNUNET_HashCode *h_port,
const struct GNUNET_HashCode *port,
const struct GNUNET_PeerIdentity *listener)
{
struct GNUNET_HashContext *hc;
hc = GNUNET_CRYPTO_hash_context_start ();
GNUNET_CRYPTO_hash_context_read (hc,
port,
sizeof (*port));
GNUNET_CRYPTO_hash_context_read (hc,
listener,
sizeof (*listener));
GNUNET_CRYPTO_hash_context_finish (hc,
h_port);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Calculated port hash %s\n",
GNUNET_h2s (h_port));
}
/**
* Get the channel's public ID.
*
* @param ch Channel.
*
* @return ID used to identify the channel with the remote peer.
*/
struct GNUNET_CADET_ChannelTunnelNumber
GCCH_get_id (const struct CadetChannel *ch)
{
return ch->ctn;
}
/**
* Release memory associated with @a ccc
*
* @param ccc data structure to clean up
*/
static void
free_channel_client (struct CadetChannelClient *ccc)
{
struct CadetOutOfOrderMessage *com;
while (NULL != (com = ccc->head_recv))
{
GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
ccc->tail_recv,
com);
ccc->num_recv--;
GNUNET_MQ_discard (com->env);
GNUNET_free (com);
}
GNUNET_free (ccc);
}
/**
* Destroy the given channel.
*
* @param ch channel to destroy
*/
static void
channel_destroy (struct CadetChannel *ch)
{
struct CadetReliableMessage *crm;
while (NULL != (crm = ch->head_sent))
{
GNUNET_assert (ch == crm->ch);
if (NULL != crm->qe)
{
GCT_send_cancel (crm->qe);
crm->qe = NULL;
}
GNUNET_CONTAINER_DLL_remove (ch->head_sent,
ch->tail_sent,
crm);
GNUNET_free (crm->data_message);
GNUNET_free (crm);
}
if (NULL != ch->owner)
{
free_channel_client (ch->owner);
ch->owner = NULL;
}
if (NULL != ch->dest)
{
free_channel_client (ch->dest);
ch->dest = NULL;
}
if (NULL != ch->last_control_qe)
{
GCT_send_cancel (ch->last_control_qe);
ch->last_control_qe = NULL;
}
if (NULL != ch->retry_data_task)
{
GNUNET_SCHEDULER_cancel (ch->retry_data_task);
ch->retry_data_task = NULL;
}
if (NULL != ch->retry_control_task)
{
GNUNET_SCHEDULER_cancel (ch->retry_control_task);
ch->retry_control_task = NULL;
}
if (GNUNET_NO == ch->is_loopback)
{
GCT_remove_channel (ch->t,
ch,
ch->ctn);
ch->t = NULL;
}
GNUNET_free (ch);
}
/**
* Send a channel create message.
*
* @param cls Channel for which to send.
*/
static void
send_channel_open (void *cls);
/**
* Function called once the tunnel confirms that we sent the
* create message. Delays for a bit until we retry.
*
* @param cls our `struct CadetChannel`.
* @param cid identifier of the connection within the tunnel, NULL
* if transmission failed
*/
static void
channel_open_sent_cb (void *cls,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
struct CadetChannel *ch = cls;
GNUNET_assert (NULL != ch->last_control_qe);
ch->last_control_qe = NULL;
ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
GCCH_2s (ch),
GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
GNUNET_YES));
ch->retry_control_task
= GNUNET_SCHEDULER_add_delayed (ch->retry_time,
&send_channel_open,
ch);
}
/**
* Send a channel open message.
*
* @param cls Channel for which to send.
*/
static void
send_channel_open (void *cls)
{
struct CadetChannel *ch = cls;
struct GNUNET_CADET_ChannelOpenMessage msgcc;
uint32_t options;
ch->retry_control_task = NULL;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sending CHANNEL_OPEN message for %s\n",
GCCH_2s (ch));
options = 0;
if (ch->nobuffer)
options |= GNUNET_CADET_OPTION_NOBUFFER;
if (ch->reliable)
options |= GNUNET_CADET_OPTION_RELIABLE;
if (ch->out_of_order)
options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
msgcc.header.size = htons (sizeof (msgcc));
msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
msgcc.opt = htonl (options);
msgcc.h_port = ch->h_port;
msgcc.ctn = ch->ctn;
ch->state = CADET_CHANNEL_OPEN_SENT;
if (NULL != ch->last_control_qe)
GCT_send_cancel (ch->last_control_qe);
ch->last_control_qe = GCT_send (ch->t,
&msgcc.header,
&channel_open_sent_cb,
ch);
GNUNET_assert (NULL == ch->retry_control_task);
}
/**
* Function called once and only once after a channel was bound
* to its tunnel via #GCT_add_channel() is ready for transmission.
* Note that this is only the case for channels that this peer
* initiates, as for incoming channels we assume that they are
* ready for transmission immediately upon receiving the open
* message. Used to bootstrap the #GCT_send() process.
*
* @param ch the channel for which the tunnel is now ready
*/
void
GCCH_tunnel_up (struct CadetChannel *ch)
{
GNUNET_assert (NULL == ch->retry_control_task);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Tunnel up, sending CHANNEL_OPEN on %s now\n",
GCCH_2s (ch));
ch->retry_control_task
= GNUNET_SCHEDULER_add_now (&send_channel_open,
ch);
}
/**
* Create a new channel.
*
* @param owner local client owning the channel
* @param ccn local number of this channel at the @a owner
* @param destination peer to which we should build the channel
* @param port desired port at @a destination
* @param options options for the channel
* @return handle to the new channel
*/
struct CadetChannel *
GCCH_channel_local_new (struct CadetClient *owner,
struct GNUNET_CADET_ClientChannelNumber ccn,
struct CadetPeer *destination,
const struct GNUNET_HashCode *port,
uint32_t options)
{
struct CadetChannel *ch;
struct CadetChannelClient *ccco;
ccco = GNUNET_new (struct CadetChannelClient);
ccco->c = owner;
ccco->ccn = ccn;
ccco->client_ready = GNUNET_YES;
ch = GNUNET_new (struct CadetChannel);
ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
ch->owner = ccco;
ch->port = *port;
GCCH_hash_port (&ch->h_port,
port,
GCP_get_id (destination));
if (0 == memcmp (&my_full_id,
GCP_get_id (destination),
sizeof (struct GNUNET_PeerIdentity)))
{
struct OpenPort *op;
ch->is_loopback = GNUNET_YES;
op = GNUNET_CONTAINER_multihashmap_get (open_ports,
&ch->h_port);
if (NULL == op)
{
/* port closed, wait for it to possibly open */
ch->state = CADET_CHANNEL_LOOSE;
(void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
&ch->h_port,
ch,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Created loose incoming loopback channel to port %s\n",
GNUNET_h2s (&ch->port));
}
else
{
GCCH_bind (ch,
op->c,
&op->port);
}
}
else
{
ch->t = GCP_get_tunnel (destination,
GNUNET_YES);
ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
ch->ctn = GCT_add_channel (ch->t,
ch);
}
GNUNET_STATISTICS_update (stats,
"# channels",
1,
GNUNET_NO);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Created channel to port %s at peer %s for %s using %s\n",
GNUNET_h2s (port),
GCP_2s (destination),
GSC_2s (owner),
(GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
return ch;
}
/**
* We had an incoming channel to a port that is closed.
* It has not been opened for a while, drop it.
*
* @param cls the channel to drop
*/
static void
timeout_closed_cb (void *cls)
{
struct CadetChannel *ch = cls;
ch->retry_control_task = NULL;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Closing incoming channel to port %s from peer %s due to timeout\n",
GNUNET_h2s (&ch->port),
GCP_2s (GCT_get_destination (ch->t)));
channel_destroy (ch);
}
/**
* Create a new channel based on a request coming in over the network.
*
* @param t tunnel to the remote peer
* @param ctn identifier of this channel in the tunnel
* @param h_port desired hash of local port
* @param options options for the channel
* @return handle to the new channel
*/
struct CadetChannel *
GCCH_channel_incoming_new (struct CadetTunnel *t,
struct GNUNET_CADET_ChannelTunnelNumber ctn,
const struct GNUNET_HashCode *h_port,
uint32_t options)
{
struct CadetChannel *ch;
struct OpenPort *op;
ch = GNUNET_new (struct CadetChannel);
ch->h_port = *h_port;
ch->t = t;
ch->ctn = ctn;
ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
GNUNET_STATISTICS_update (stats,
"# channels",
1,
GNUNET_NO);
op = GNUNET_CONTAINER_multihashmap_get (open_ports,
h_port);
if (NULL == op)
{
/* port closed, wait for it to possibly open */
ch->state = CADET_CHANNEL_LOOSE;
(void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
&ch->h_port,
ch,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_assert (NULL == ch->retry_control_task);
ch->retry_control_task
= GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
&timeout_closed_cb,
ch);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Created loose incoming channel to port %s from peer %s\n",
GNUNET_h2s (&ch->port),
GCP_2s (GCT_get_destination (ch->t)));
}
else
{
GCCH_bind (ch,
op->c,
&op->port);
}
GNUNET_STATISTICS_update (stats,
"# channels",
1,
GNUNET_NO);
return ch;
}
/**
* Function called once the tunnel confirms that we sent the
* ACK message. Just remembers it was sent, we do not expect
* ACKs for ACKs ;-).
*
* @param cls our `struct CadetChannel`.
* @param cid identifier of the connection within the tunnel, NULL
* if transmission failed
*/
static void
send_ack_cb (void *cls,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
struct CadetChannel *ch = cls;
GNUNET_assert (NULL != ch->last_control_qe);
ch->last_control_qe = NULL;
}
/**
* Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
*
* @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
*/
static void
send_channel_data_ack (struct CadetChannel *ch)
{
struct GNUNET_CADET_ChannelDataAckMessage msg;
if (GNUNET_NO == ch->reliable)
return; /* no ACKs */
msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
msg.header.size = htons (sizeof (msg));
msg.ctn = ch->ctn;
msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
msg.futures = GNUNET_htonll (ch->mid_futures);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sending DATA_ACK %u:%llX via %s\n",
(unsigned int) ntohl (msg.mid.mid),
(unsigned long long) ch->mid_futures,
GCCH_2s (ch));
if (NULL != ch->last_control_qe)
GCT_send_cancel (ch->last_control_qe);
ch->last_control_qe = GCT_send (ch->t,
&msg.header,
&send_ack_cb,
ch);
}
/**
* Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
* connection is up.
*
* @param cls the `struct CadetChannel`
*/
static void
send_open_ack (void *cls)
{
struct CadetChannel *ch = cls;
struct GNUNET_CADET_ChannelOpenAckMessage msg;
ch->retry_control_task = NULL;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sending CHANNEL_OPEN_ACK on %s\n",
GCCH_2s (ch));
msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
msg.header.size = htons (sizeof (msg));
msg.reserved = htonl (0);
msg.ctn = ch->ctn;
msg.port = ch->port;
if (NULL != ch->last_control_qe)
GCT_send_cancel (ch->last_control_qe);
ch->last_control_qe = GCT_send (ch->t,
&msg.header,
&send_ack_cb,
ch);
}
/**
* We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
* this channel. If the binding was successful, (re)transmit the
* #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
*
* @param ch channel that got the duplicate open
* @param cti identifier of the connection that delivered the message
*/
void
GCCH_handle_duplicate_open (struct CadetChannel *ch,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
{
if (NULL == ch->dest)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n",
GCCH_2s (ch));
return;
}
if (NULL != ch->retry_control_task)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n",
GCCH_2s (ch));
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Retransmitting CHANNEL_OPEN_ACK on %s\n",
GCCH_2s (ch));
ch->retry_control_task
= GNUNET_SCHEDULER_add_now (&send_open_ack,
ch);
}
/**
* Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
*
* @param ch channel the ack is for
* @param to_owner #GNUNET_YES to send to owner,
* #GNUNET_NO to send to dest
*/
static void
send_ack_to_client (struct CadetChannel *ch,
int to_owner)
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_CADET_LocalAck *ack;
struct CadetChannelClient *ccc;
ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
if (NULL == ccc)
{
/* This can happen if we are just getting ACKs after
our local client already disconnected. */
GNUNET_assert (GNUNET_YES == ch->destroy);
return;
}
env = GNUNET_MQ_msg (ack,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
ack->ccn = ccc->ccn;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
GSC_2s (ccc->c),
(GNUNET_YES == to_owner) ? "owner" : "dest",
ntohl (ack->ccn.channel_of_client),
ch->pending_messages,
ch->max_pending_messages);
GSC_send_to_client (ccc->c,
env);
}
/**
* A client is bound to the port that we have a channel
* open to. Send the acknowledgement for the connection
* request and establish the link with the client.
*
* @param ch open incoming channel
* @param c client listening on the respective @a port
* @param port the port @a is listening on
*/
void
GCCH_bind (struct CadetChannel *ch,
struct CadetClient *c,
const struct GNUNET_HashCode *port)
{
uint32_t options;
struct CadetChannelClient *cccd;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Binding %s from %s to port %s of %s\n",
GCCH_2s (ch),
GCT_2s (ch->t),
GNUNET_h2s (&ch->port),
GSC_2s (c));
if (NULL != ch->retry_control_task)
{
/* there might be a timeout task here */
GNUNET_SCHEDULER_cancel (ch->retry_control_task);
ch->retry_control_task = NULL;
}
options = 0;
if (ch->nobuffer)
options |= GNUNET_CADET_OPTION_NOBUFFER;
if (ch->reliable)
options |= GNUNET_CADET_OPTION_RELIABLE;
if (ch->out_of_order)
options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
cccd = GNUNET_new (struct CadetChannelClient);
GNUNET_assert (NULL == ch->dest);
ch->dest = cccd;
ch->port = *port;
cccd->c = c;
cccd->client_ready = GNUNET_YES;
cccd->ccn = GSC_bind (c,
ch,
(GNUNET_YES == ch->is_loopback)
? GCP_get (&my_full_id,
GNUNET_YES)
: GCT_get_destination (ch->t),
port,
options);
GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
if (GNUNET_YES == ch->is_loopback)
{
ch->state = CADET_CHANNEL_OPEN_SENT;
GCCH_handle_channel_open_ack (ch,
NULL,
port);
}
else
{
/* notify other peer that we accepted the connection */
ch->state = CADET_CHANNEL_READY;
ch->retry_control_task
= GNUNET_SCHEDULER_add_now (&send_open_ack,
ch);
}
/* give client it's initial supply of ACKs */
GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
for (unsigned int i=0;imax_pending_messages;i++)
send_ack_to_client (ch,
GNUNET_NO);
}
/**
* One of our clients has disconnected, tell the other one that we
* are finished. Done asynchronously to avoid concurrent modification
* issues if this is the same client.
*
* @param cls the `struct CadetChannel` where one of the ends is now dead
*/
static void
signal_remote_destroy_cb (void *cls)
{
struct CadetChannel *ch = cls;
struct CadetChannelClient *ccc;
/* Find which end is left... */
ch->retry_control_task = NULL;
ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
GSC_handle_remote_channel_destroy (ccc->c,
ccc->ccn,
ch);
channel_destroy (ch);
}
/**
* Destroy locally created channel. Called by the local client, so no
* need to tell the client.
*
* @param ch channel to destroy
* @param c client that caused the destruction
* @param ccn client number of the client @a c
*/
void
GCCH_channel_local_destroy (struct CadetChannel *ch,
struct CadetClient *c,
struct GNUNET_CADET_ClientChannelNumber ccn)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s asks for destruction of %s\n",
GSC_2s (c),
GCCH_2s (ch));
GNUNET_assert (NULL != c);
if ( (NULL != ch->owner) &&
(c == ch->owner->c) &&
(ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
{
free_channel_client (ch->owner);
ch->owner = NULL;
}
else if ( (NULL != ch->dest) &&
(c == ch->dest->c) &&
(ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
{
free_channel_client (ch->dest);
ch->dest = NULL;
}
else
{
GNUNET_assert (0);
}
if (GNUNET_YES == ch->destroy)
{
/* other end already destroyed, with the local client gone, no need
to finish transmissions, just destroy immediately. */
channel_destroy (ch);
return;
}
if ( (NULL != ch->head_sent) &&
( (NULL != ch->owner) ||
(NULL != ch->dest) ) )
{
/* Wait for other end to destroy us as well,
and otherwise allow send queue to be transmitted first */
ch->destroy = GNUNET_YES;
return;
}
if ( (GNUNET_YES == ch->is_loopback) &&
( (NULL != ch->owner) ||
(NULL != ch->dest) ) )
{
if (NULL != ch->retry_control_task)
GNUNET_SCHEDULER_cancel (ch->retry_control_task);
ch->retry_control_task
= GNUNET_SCHEDULER_add_now (&signal_remote_destroy_cb,
ch);
return;
}
if (GNUNET_NO == ch->is_loopback)
{
/* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
switch (ch->state)
{
case CADET_CHANNEL_NEW:
/* We gave up on a channel that we created as a client to a remote
target, but that never went anywhere. Nothing to do here. */
break;
case CADET_CHANNEL_LOOSE:
GSC_drop_loose_channel (&ch->h_port,
ch);
break;
default:
GCT_send_channel_destroy (ch->t,
ch->ctn);
}
}
/* Nothing left to do, just finish destruction */
channel_destroy (ch);
}
/**
* We got an acknowledgement for the creation of the channel
* (the port is open on the other side). Verify that the
* other end really has the right port, and begin transmissions.
*
* @param ch channel to destroy
* @param cti identifier of the connection that delivered the message
* @param port port number (needed to verify receiver knows the port)
*/
void
GCCH_handle_channel_open_ack (struct CadetChannel *ch,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
const struct GNUNET_HashCode *port)
{
switch (ch->state)
{
case CADET_CHANNEL_NEW:
/* this should be impossible */
GNUNET_break (0);
break;
case CADET_CHANNEL_LOOSE:
/* This makes no sense. */
GNUNET_break_op (0);
break;
case CADET_CHANNEL_OPEN_SENT:
if (NULL == ch->owner)
{
/* We're not the owner, wrong direction! */
GNUNET_break_op (0);
return;
}
if (0 != memcmp (&ch->port,
port,
sizeof (struct GNUNET_HashCode)))
{
/* Other peer failed to provide the right port,
refuse connection. */
GNUNET_break_op (0);
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
GCCH_2s (ch));
if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
{
GNUNET_SCHEDULER_cancel (ch->retry_control_task);
ch->retry_control_task = NULL;
}
ch->state = CADET_CHANNEL_READY;
/* On first connect, send client as many ACKs as we allow messages
to be buffered! */
for (unsigned int i=0;imax_pending_messages;i++)
send_ack_to_client (ch,
GNUNET_YES);
break;
case CADET_CHANNEL_READY:
/* duplicate ACK, maybe we retried the CREATE. Ignore. */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received duplicate channel OPEN_ACK for %s\n",
GCCH_2s (ch));
GNUNET_STATISTICS_update (stats,
"# duplicate CREATE_ACKs",
1,
GNUNET_NO);
break;
}
}
/**
* Test if element @a e1 comes before element @a e2.
*
* @param cls closure, to a flag where we indicate duplicate packets
* @param m1 a message of to sort
* @param m2 another message to sort
* @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
*/
static int
is_before (void *cls,
struct CadetOutOfOrderMessage *m1,
struct CadetOutOfOrderMessage *m2)
{
int *duplicate = cls;
uint32_t v1 = ntohl (m1->mid.mid);
uint32_t v2 = ntohl (m2->mid.mid);
uint32_t delta;
delta = v2 - v1;
if (0 == delta)
*duplicate = GNUNET_YES;
if (delta > (uint32_t) INT_MAX)
{
/* in overflow range, we can safely assume we wrapped around */
return GNUNET_NO;
}
else
{
/* result is small, thus v2 > v1, thus m1 < m2 */
return GNUNET_YES;
}
}
/**
* We got payload data for a channel. Pass it on to the client
* and send an ACK to the other end (once flow control allows it!)
*
* @param ch channel that got data
* @param cti identifier of the connection that delivered the message
* @param msg message that was received
*/
void
GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
const struct GNUNET_CADET_ChannelAppDataMessage *msg)
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_CADET_LocalData *ld;
struct CadetChannelClient *ccc;
size_t payload_size;
struct CadetOutOfOrderMessage *com;
int duplicate;
uint32_t mid_min;
uint32_t mid_max;
uint32_t mid_msg;
uint32_t delta;
GNUNET_assert (GNUNET_NO == ch->is_loopback);
if ( (NULL == ch->owner) &&
(NULL == ch->dest) )
{
/* This client is gone, but we still have messages to send to
the other end (which is why @a ch is not yet dead). However,
we cannot pass messages to our client anymore. */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Dropping incoming payload on %s as this end is already closed\n",
GCCH_2s (ch));
/* send back DESTROY notification to stop further retransmissions! */
if (GNUNET_YES == ch->destroy)
GCT_send_channel_destroy (ch->t,
ch->ctn);
return;
}
payload_size = ntohs (msg->header.size) - sizeof (*msg);
env = GNUNET_MQ_msg_extra (ld,
payload_size,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
GNUNET_memcpy (&ld[1],
&msg[1],
payload_size);
ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
if ( (GNUNET_YES == ccc->client_ready) &&
( (GNUNET_YES == ch->out_of_order) ||
(msg->mid.mid == ch->mid_recv.mid) ) )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Giving %u bytes of payload with MID %u from %s to client %s\n",
(unsigned int) payload_size,
ntohl (msg->mid.mid),
GCCH_2s (ch),
GSC_2s (ccc->c));
ccc->client_ready = GNUNET_NO;
GSC_send_to_client (ccc->c,
env);
ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
ch->mid_futures >>= 1;
send_channel_data_ack (ch);
return;
}
if (GNUNET_YES == ch->reliable)
{
/* check if message ought to be dropped because it is ancient/too distant/duplicate */
mid_min = ntohl (ch->mid_recv.mid);
mid_max = mid_min + ch->max_pending_messages;
mid_msg = ntohl (msg->mid.mid);
if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s at %u drops ancient or far-future message %u\n",
GCCH_2s (ch),
(unsigned int) mid_min,
ntohl (msg->mid.mid));
GNUNET_STATISTICS_update (stats,
"# duplicate DATA (ancient or future)",
1,
GNUNET_NO);
GNUNET_MQ_discard (env);
send_channel_data_ack (ch);
return;
}
/* mark bit for future ACKs */
delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
if (delta < 64)
{
if (0 != (ch->mid_futures & (1LLU << delta)))
{
/* Duplicate within the queue, drop also */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Duplicate payload of %u bytes on %s (mid %u) dropped\n",
(unsigned int) payload_size,
GCCH_2s (ch),
ntohl (msg->mid.mid));
GNUNET_STATISTICS_update (stats,
"# duplicate DATA",
1,
GNUNET_NO);
GNUNET_MQ_discard (env);
send_channel_data_ack (ch);
return;
}
ch->mid_futures |= (1LLU << delta);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Marked bit %llX for mid %u (base: %u); now: %llX\n",
(1LLU << delta),
mid_msg,
mid_min,
ch->mid_futures);
}
}
else /* ! ch->reliable */
{
/* Channel is unreliable, so we do not ACK. But we also cannot
allow buffering everything, so check if we have space... */
if (ccc->num_recv >= ch->max_pending_messages)
{
struct CadetOutOfOrderMessage *drop;
/* Yep, need to drop. Drop the oldest message in
the buffer. */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Queue full due slow client on %s, dropping oldest message\n",
GCCH_2s (ch));
GNUNET_STATISTICS_update (stats,
"# messages dropped due to slow client",
1,
GNUNET_NO);
drop = ccc->head_recv;
GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
ccc->tail_recv,
drop);
ccc->num_recv--;
GNUNET_MQ_discard (drop->env);
GNUNET_free (drop);
}
}
/* Insert message into sorted out-of-order queue */
com = GNUNET_new (struct CadetOutOfOrderMessage);
com->mid = msg->mid;
com->env = env;
duplicate = GNUNET_NO;
GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
is_before,
&duplicate,
ccc->head_recv,
ccc->tail_recv,
com);
ccc->num_recv++;
if (GNUNET_YES == duplicate)
{
/* Duplicate within the queue, drop also (this is not covered by
the case above if "delta" >= 64, which could be the case if
max_pending_messages is also >= 64 or if our client is unready
and we are seeing retransmissions of the message our client is
blocked on. */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Duplicate payload of %u bytes on %s (mid %u) dropped\n",
(unsigned int) payload_size,
GCCH_2s (ch),
ntohl (msg->mid.mid));
GNUNET_STATISTICS_update (stats,
"# duplicate DATA",
1,
GNUNET_NO);
GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
ccc->tail_recv,
com);
ccc->num_recv--;
GNUNET_MQ_discard (com->env);
GNUNET_free (com);
send_channel_data_ack (ch);
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
(GNUNET_YES == ccc->client_ready)
? "out-of-order"
: "client-not-ready",
(unsigned int) payload_size,
GCCH_2s (ch),
ntohl (ccc->ccn.channel_of_client),
ccc,
ntohl (msg->mid.mid),
ntohl (ch->mid_recv.mid));
/* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
the sender may already be transmitting the previous one. Needs
experimental evaluation to see if/when this ACK helps or
hurts. (We might even want another option.) */
send_channel_data_ack (ch);
}
/**
* Function called once the tunnel has sent one of our messages.
* If the message is unreliable, simply frees the `crm`. If the
* message was reliable, calculate retransmission time and
* wait for ACK (or retransmit).
*
* @param cls the `struct CadetReliableMessage` that was sent
* @param cid identifier of the connection within the tunnel, NULL
* if transmission failed
*/
static void
data_sent_cb (void *cls,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid);
/**
* We need to retry a transmission, the last one took too long to
* be acknowledged.
*
* @param cls the `struct CadetChannel` where we need to retransmit
*/
static void
retry_transmission (void *cls)
{
struct CadetChannel *ch = cls;
struct CadetReliableMessage *crm = ch->head_sent;
ch->retry_data_task = NULL;
GNUNET_assert (NULL == crm->qe);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Retrying transmission on %s of message %u\n",
GCCH_2s (ch),
(unsigned int) ntohl (crm->data_message->mid.mid));
crm->qe = GCT_send (ch->t,
&crm->data_message->header,
&data_sent_cb,
crm);
GNUNET_assert (NULL == ch->retry_data_task);
}
/**
* We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
* the queue and tell our client that it can send more.
*
* @param ch the channel that got the PLAINTEXT_DATA_ACK
* @param cti identifier of the connection that delivered the message
* @param crm the message that got acknowledged
*/
static void
handle_matching_ack (struct CadetChannel *ch,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
struct CadetReliableMessage *crm)
{
GNUNET_CONTAINER_DLL_remove (ch->head_sent,
ch->tail_sent,
crm);
ch->pending_messages--;
GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
GCCH_2s (ch),
(unsigned int) ntohl (crm->data_message->mid.mid),
ch->pending_messages);
if (NULL != crm->qe)
{
GCT_send_cancel (crm->qe);
crm->qe = NULL;
}
if ( (1 == crm->num_transmissions) &&
(NULL != cti) )
{
GCC_ack_observed (cti);
if (0 == memcmp (cti,
&crm->connection_taken,
sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier)))
{
GCC_latency_observed (cti,
GNUNET_TIME_absolute_get_duration (crm->first_transmission_time));
}
}
GNUNET_free (crm->data_message);
GNUNET_free (crm);
send_ack_to_client (ch,
(NULL == ch->owner)
? GNUNET_NO
: GNUNET_YES);
}
/**
* We got an acknowledgement for payload data for a channel.
* Possibly resume transmissions.
*
* @param ch channel that got the ack
* @param cti identifier of the connection that delivered the message
* @param ack details about what was received
*/
void
GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
const struct GNUNET_CADET_ChannelDataAckMessage *ack)
{
struct CadetReliableMessage *crm;
struct CadetReliableMessage *crmn;
int found;
uint32_t mid_base;
uint64_t mid_mask;
unsigned int delta;
GNUNET_break (GNUNET_NO == ch->is_loopback);
if (GNUNET_NO == ch->reliable)
{
/* not expecting ACKs on unreliable channel, odd */
GNUNET_break_op (0);
return;
}
/* mid_base is the MID of the next message that the
other peer expects (i.e. that is missing!), everything
LOWER (but excluding mid_base itself) was received. */
mid_base = ntohl (ack->mid.mid);
mid_mask = GNUNET_htonll (ack->futures);
found = GNUNET_NO;
for (crm = ch->head_sent;
NULL != crm;
crm = crmn)
{
crmn = crm->next;
delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
if (delta >= UINT_MAX - ch->max_pending_messages)
{
/* overflow, means crm was a bit in the past, so this ACK counts for it. */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got DATA_ACK with base %u satisfying past message %u on %s\n",
(unsigned int) mid_base,
ntohl (crm->data_message->mid.mid),
GCCH_2s (ch));
handle_matching_ack (ch,
cti,
crm);
found = GNUNET_YES;
continue;
}
delta--;
if (delta >= 64)
continue;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Testing bit %llX for mid %u (base: %u)\n",
(1LLU << delta),
ntohl (crm->data_message->mid.mid),
mid_base);
if (0 != (mid_mask & (1LLU << delta)))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got DATA_ACK with mask for %u on %s\n",
ntohl (crm->data_message->mid.mid),
GCCH_2s (ch));
handle_matching_ack (ch,
cti,
crm);
found = GNUNET_YES;
}
}
if (GNUNET_NO == found)
{
/* ACK for message we already dropped, might have been a
duplicate ACK? Ignore. */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Duplicate DATA_ACK on %s, ignoring\n",
GCCH_2s (ch));
GNUNET_STATISTICS_update (stats,
"# duplicate DATA_ACKs",
1,
GNUNET_NO);
return;
}
if (NULL != ch->retry_data_task)
{
GNUNET_SCHEDULER_cancel (ch->retry_data_task);
ch->retry_data_task = NULL;
}
if ( (NULL != ch->head_sent) &&
(NULL == ch->head_sent->qe) )
ch->retry_data_task
= GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
&retry_transmission,
ch);
}
/**
* Destroy channel, based on the other peer closing the
* connection. Also needs to remove this channel from
* the tunnel.
*
* @param ch channel to destroy
* @param cti identifier of the connection that delivered the message,
* NULL if we are simulating receiving a destroy due to shutdown
*/
void
GCCH_handle_remote_destroy (struct CadetChannel *ch,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
{
struct CadetChannelClient *ccc;
GNUNET_assert (GNUNET_NO == ch->is_loopback);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received remote channel DESTROY for %s\n",
GCCH_2s (ch));
if (GNUNET_YES == ch->destroy)
{
/* Local client already gone, this is instant-death. */
channel_destroy (ch);
return;
}
ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
if ( (NULL != ccc) &&
(NULL != ccc->head_recv) )
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Lost end of transmission due to remote shutdown on %s\n",
GCCH_2s (ch));
/* FIXME: change API to notify client about truncated transmission! */
}
ch->destroy = GNUNET_YES;
if (NULL != ccc)
GSC_handle_remote_channel_destroy (ccc->c,
ccc->ccn,
ch);
channel_destroy (ch);
}
/**
* Test if element @a e1 comes before element @a e2.
*
* @param cls closure, to a flag where we indicate duplicate packets
* @param crm1 an element of to sort
* @param crm2 another element to sort
* @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
*/
static int
cmp_crm_by_next_retry (void *cls,
struct CadetReliableMessage *crm1,
struct CadetReliableMessage *crm2)
{
if (crm1->next_retry.abs_value_us <
crm2->next_retry.abs_value_us)
return GNUNET_YES;
return GNUNET_NO;
}
/**
* Function called once the tunnel has sent one of our messages.
* If the message is unreliable, simply frees the `crm`. If the
* message was reliable, calculate retransmission time and
* wait for ACK (or retransmit).
*
* @param cls the `struct CadetReliableMessage` that was sent
* @param cid identifier of the connection within the tunnel, NULL
* if transmission failed
*/
static void
data_sent_cb (void *cls,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
struct CadetReliableMessage *crm = cls;
struct CadetChannel *ch = crm->ch;
GNUNET_assert (GNUNET_NO == ch->is_loopback);
GNUNET_assert (NULL != crm->qe);
crm->qe = NULL;
GNUNET_CONTAINER_DLL_remove (ch->head_sent,
ch->tail_sent,
crm);
if (GNUNET_NO == ch->reliable)
{
GNUNET_free (crm->data_message);
GNUNET_free (crm);
ch->pending_messages--;
send_ack_to_client (ch,
(NULL == ch->owner)
? GNUNET_NO
: GNUNET_YES);
return;
}
if (NULL == cid)
{
/* There was an error sending. */
crm->num_transmissions = GNUNET_SYSERR;
}
else if (GNUNET_SYSERR != crm->num_transmissions)
{
/* Increment transmission counter, and possibly store @a cid
if this was the first transmission. */
crm->num_transmissions++;
if (1 == crm->num_transmissions)
{
crm->first_transmission_time = GNUNET_TIME_absolute_get ();
crm->connection_taken = *cid;
GCC_ack_expected (cid);
}
}
if ( (0 == crm->retry_delay.rel_value_us) &&
(NULL != cid) )
{
struct CadetConnection *cc = GCC_lookup (cid);
if (NULL != cc)
crm->retry_delay = GCC_get_metrics (cc)->aged_latency;
else
crm->retry_delay = ch->retry_time;
}
crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
MIN_RTT_DELAY);
crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
cmp_crm_by_next_retry,
NULL,
ch->head_sent,
ch->tail_sent,
crm);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Message %u sent, next transmission on %s in %s\n",
(unsigned int) ntohl (crm->data_message->mid.mid),
GCCH_2s (ch),
GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
GNUNET_YES));
if (NULL == ch->head_sent->qe)
{
if (NULL != ch->retry_data_task)
GNUNET_SCHEDULER_cancel (ch->retry_data_task);
ch->retry_data_task
= GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
&retry_transmission,
ch);
}
}
/**
* Handle data given by a client.
*
* Check whether the client is allowed to send in this tunnel, save if
* channel is reliable and send an ACK to the client if there is still
* buffer space in the tunnel.
*
* @param ch Channel.
* @param sender_ccn ccn of the sender
* @param buf payload to transmit.
* @param buf_len number of bytes in @a buf
* @return #GNUNET_OK if everything goes well,
* #GNUNET_SYSERR in case of an error.
*/
int
GCCH_handle_local_data (struct CadetChannel *ch,
struct GNUNET_CADET_ClientChannelNumber sender_ccn,
const char *buf,
size_t buf_len)
{
struct CadetReliableMessage *crm;
if (ch->pending_messages >= ch->max_pending_messages)
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
if (GNUNET_YES == ch->destroy)
{
/* we are going down, drop messages */
return GNUNET_OK;
}
ch->pending_messages++;
if (GNUNET_YES == ch->is_loopback)
{
struct CadetChannelClient *receiver;
struct GNUNET_MQ_Envelope *env;
struct GNUNET_CADET_LocalData *ld;
int ack_to_owner;
env = GNUNET_MQ_msg_extra (ld,
buf_len,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
if ( (NULL != ch->owner) &&
(sender_ccn.channel_of_client ==
ch->owner->ccn.channel_of_client) )
{
receiver = ch->dest;
ack_to_owner = GNUNET_YES;
}
else if ( (NULL != ch->dest) &&
(sender_ccn.channel_of_client ==
ch->dest->ccn.channel_of_client) )
{
receiver = ch->owner;
ack_to_owner = GNUNET_NO;
}
else
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
GNUNET_assert (NULL != receiver);
ld->ccn = receiver->ccn;
GNUNET_memcpy (&ld[1],
buf,
buf_len);
if (GNUNET_YES == receiver->client_ready)
{
ch->pending_messages--;
GSC_send_to_client (receiver->c,
env);
send_ack_to_client (ch,
ack_to_owner);
}
else
{
struct CadetOutOfOrderMessage *oom;
oom = GNUNET_new (struct CadetOutOfOrderMessage);
oom->env = env;
GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
receiver->tail_recv,
oom);
receiver->num_recv++;
}
return GNUNET_OK;
}
/* Everything is correct, send the message. */
crm = GNUNET_malloc (sizeof (*crm));
crm->ch = ch;
crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
+ buf_len);
crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
crm->data_message->mid = ch->mid_send;
crm->data_message->ctn = ch->ctn;
GNUNET_memcpy (&crm->data_message[1],
buf,
buf_len);
GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
ch->tail_sent,
crm);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sending message %u from local client to %s with %u bytes\n",
ntohl (crm->data_message->mid.mid),
GCCH_2s (ch),
buf_len);
if (NULL != ch->retry_data_task)
{
GNUNET_SCHEDULER_cancel (ch->retry_data_task);
ch->retry_data_task = NULL;
}
crm->qe = GCT_send (ch->t,
&crm->data_message->header,
&data_sent_cb,
crm);
GNUNET_assert (NULL == ch->retry_data_task);
return GNUNET_OK;
}
/**
* Handle ACK from client on local channel. Means the client is ready
* for more data, see if we have any for it.
*
* @param ch channel to destroy
* @param client_ccn ccn of the client sending the ack
*/
void
GCCH_handle_local_ack (struct CadetChannel *ch,
struct GNUNET_CADET_ClientChannelNumber client_ccn)
{
struct CadetChannelClient *ccc;
struct CadetOutOfOrderMessage *com;
if ( (NULL != ch->owner) &&
(ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
ccc = ch->owner;
else if ( (NULL != ch->dest) &&
(ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
ccc = ch->dest;
else
GNUNET_assert (0);
ccc->client_ready = GNUNET_YES;
com = ccc->head_recv;
if (NULL == com)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
GSC_2s (ccc->c),
ntohl (client_ccn.channel_of_client),
GCCH_2s (ch),
ntohl (ccc->ccn.channel_of_client),
ccc);
return; /* none pending */
}
if (GNUNET_YES == ch->is_loopback)
{
int to_owner;
/* Messages are always in-order, just send */
GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
ccc->tail_recv,
com);
ccc->num_recv--;
GSC_send_to_client (ccc->c,
com->env);
/* Notify sender that we can receive more */
if ( (NULL != ch->owner) &&
(ccc->ccn.channel_of_client ==
ch->owner->ccn.channel_of_client) )
{
to_owner = GNUNET_NO;
}
else
{
GNUNET_assert ( (NULL != ch->dest) &&
(ccc->ccn.channel_of_client ==
ch->dest->ccn.channel_of_client) );
to_owner = GNUNET_YES;
}
send_ack_to_client (ch,
to_owner);
GNUNET_free (com);
return;
}
if ( (com->mid.mid != ch->mid_recv.mid) &&
(GNUNET_NO == ch->out_of_order) &&
(GNUNET_YES == ch->reliable) )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
GSC_2s (ccc->c),
ntohl (ccc->ccn.channel_of_client),
ntohl (com->mid.mid),
ntohl (ch->mid_recv.mid));
return; /* missing next one in-order */
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
ntohl (com->mid.mid),
GSC_2s (ccc->c),
ntohl (ccc->ccn.channel_of_client),
GCCH_2s (ch));
/* all good, pass next message to client */
GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
ccc->tail_recv,
com);
ccc->num_recv--;
/* FIXME: if unreliable, this is not aggressive
enough, as it would be OK to have lost some! */
ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
ch->mid_futures >>= 1; /* equivalent to division by 2 */
ccc->client_ready = GNUNET_NO;
GSC_send_to_client (ccc->c,
com->env);
GNUNET_free (com);
send_channel_data_ack (ch);
if (NULL != ccc->head_recv)
return;
if (GNUNET_NO == ch->destroy)
return;
GCT_send_channel_destroy (ch->t,
ch->ctn);
channel_destroy (ch);
}
#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
/**
* Log channel info.
*
* @param ch Channel.
* @param level Debug level to use.
*/
void
GCCH_debug (struct CadetChannel *ch,
enum GNUNET_ErrorType level)
{
#if !defined(GNUNET_CULL_LOGGING)
int do_log;
do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
"cadet-chn",
__FILE__, __FUNCTION__, __LINE__);
if (0 == do_log)
return;
if (NULL == ch)
{
LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
return;
}
LOG2 (level,
"CHN %s:%X (%p)\n",
GCT_2s (ch->t),
ch->ctn,
ch);
if (NULL != ch->owner)
{
LOG2 (level,
"CHN origin %s ready %s local-id: %u\n",
GSC_2s (ch->owner->c),
ch->owner->client_ready ? "YES" : "NO",
ntohl (ch->owner->ccn.channel_of_client));
}
if (NULL != ch->dest)
{
LOG2 (level,
"CHN destination %s ready %s local-id: %u\n",
GSC_2s (ch->dest->c),
ch->dest->client_ready ? "YES" : "NO",
ntohl (ch->dest->ccn.channel_of_client));
}
LOG2 (level,
"CHN Message IDs recv: %d (%LLX), send: %d\n",
ntohl (ch->mid_recv.mid),
(unsigned long long) ch->mid_futures,
ntohl (ch->mid_send.mid));
#endif
}
/* end of gnunet-service-cadet-new_channel.c */