From 8ed6d64262665ba9ce306823f569213feabba669 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 24 Jan 2017 21:00:23 +0100 Subject: fix client-client loopback flow control --- src/cadet/gnunet-service-cadet-new_channel.c | 316 ++++++++++++++++----------- 1 file changed, 192 insertions(+), 124 deletions(-) (limited to 'src/cadet/gnunet-service-cadet-new_channel.c') diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c index 74aafe5a1..98cfa8383 100644 --- a/src/cadet/gnunet-service-cadet-new_channel.c +++ b/src/cadet/gnunet-service-cadet-new_channel.c @@ -25,6 +25,8 @@ * @author Christian Grothoff * * TODO: + * - FIXME: send ACKs back to loopback clients! + * * - introduce shutdown so we can have half-closed channels, modify * destroy to include MID to have FIN-ACK equivalents, etc. * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! @@ -159,6 +161,46 @@ struct CadetOutOfOrderMessage }; +/** + * Client endpoint of a `struct CadetChannel`. A channel may be a + * loopback channel, in which case it has two of these endpoints. + * Note that flow control also is required in both directions. + */ +struct CadetChannelClient +{ + /** + * Client handle. Not by itself sufficient to designate + * the client endpoint, as the same client handle may + * be used for both the owner and the destination, and + * we thus also need the channel ID to identify the client. + */ + struct CadetClient *c; + + /** + * Head of DLL of messages received out of order or while client was unready. + */ + struct CadetOutOfOrderMessage *head_recv; + + /** + * Tail DLL of messages received out of order or while client was unready. + */ + struct CadetOutOfOrderMessage *tail_recv; + + /** + * Local tunnel number for this client. + * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI, + * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) + */ + struct GNUNET_CADET_ClientChannelNumber ccn; + + /** + * Can we send data to the client? + */ + int client_ready; + +}; + + /** * Struct containing all information regarding a channel to a remote client. */ @@ -173,13 +215,13 @@ struct CadetChannel * Client owner of the tunnel, if any. * (Used if this channel represends the initiating end of the tunnel.) */ - struct CadetClient *owner; + struct CadetChannelClient *owner; /** * Client destination of the tunnel, if any. * (Used if this channel represents the listening end of the tunnel.) */ - struct CadetClient *dest; + struct CadetChannelClient *dest; /** * Last entry in the tunnel's queue relating to control messages @@ -199,16 +241,6 @@ struct CadetChannel */ struct CadetReliableMessage *tail_sent; - /** - * Head of DLL of messages received out of order or while client was unready. - */ - struct CadetOutOfOrderMessage *head_recv; - - /** - * Tail DLL of messages received out of order or while client was unready. - */ - struct CadetOutOfOrderMessage *tail_recv; - /** * Task to resend/poll in case no ACK is received. */ @@ -270,28 +302,11 @@ struct CadetChannel */ struct GNUNET_CADET_ChannelTunnelNumber ctn; - /** - * Local tunnel number for local client @e owner owning the channel. - * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) - */ - struct GNUNET_CADET_ClientChannelNumber ccn_owner; - - /** - * Local tunnel number for local client @e dest owning the channel. - * (< #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) - */ - struct GNUNET_CADET_ClientChannelNumber ccn_dest; - /** * Channel state. */ enum CadetChannelState state; - /** - * Can we send data to the client? - */ - int client_ready; - /** * Is the tunnel bufferless (minimum latency)? */ @@ -342,8 +357,8 @@ GCCH_2s (const struct CadetChannel *ch) : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))), GNUNET_h2s (&ch->port), ch->ctn, - ntohl (ch->ccn_owner.channel_of_client), - ntohl (ch->ccn_dest.channel_of_client)); + (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client), + (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client)); return buf; } @@ -362,6 +377,28 @@ GCCH_get_id (const struct CadetChannel *ch) } +/** + * Release memory associated with @a ccc + * + * @param ccc data structure to clean up + */ +static void +free_channel_client (struct CadetChannelClient *ccc) +{ + struct CadetOutOfOrderMessage *com; + + while (NULL != (com = ccc->head_recv)) + { + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, + com); + GNUNET_MQ_discard (com->env); + GNUNET_free (com); + } + GNUNET_free (ccc); +} + + /** * Destroy the given channel. * @@ -371,7 +408,6 @@ static void channel_destroy (struct CadetChannel *ch) { struct CadetReliableMessage *crm; - struct CadetOutOfOrderMessage *com; while (NULL != (crm = ch->head_sent)) { @@ -386,13 +422,15 @@ channel_destroy (struct CadetChannel *ch) crm); GNUNET_free (crm); } - while (NULL != (com = ch->head_recv)) + if (NULL != ch->owner) { - GNUNET_CONTAINER_DLL_remove (ch->head_recv, - ch->tail_recv, - com); - GNUNET_MQ_discard (com->env); - GNUNET_free (com); + free_channel_client (ch->owner); + ch->owner = NULL; + } + if (NULL != ch->dest) + { + free_channel_client (ch->dest); + ch->dest = NULL; } if (NULL != ch->last_control_qe) { @@ -444,7 +482,7 @@ channel_open_sent_cb (void *cls) ch->last_control_qe = NULL; ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sent CHANNEL_OPEN on %s, retrying in %s\n", + "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n", GCCH_2s (ch), GNUNET_STRINGS_relative_time_to_string (ch->retry_time, GNUNET_YES)); @@ -532,23 +570,30 @@ GCCH_channel_local_new (struct CadetClient *owner, uint32_t options) { struct CadetChannel *ch; + struct CadetChannelClient *ccco; + + ccco = GNUNET_new (struct CadetChannelClient); + ccco->c = owner; + ccco->ccn = ccn; + ccco->client_ready = GNUNET_YES; ch = GNUNET_new (struct CadetChannel); ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */ - ch->owner = owner; - ch->ccn_owner = ccn; + ch->owner = ccco; ch->port = *port; if (0 == memcmp (&my_full_id, GCP_get_id (destination), sizeof (struct GNUNET_PeerIdentity))) { + struct CadetClient *c; + ch->is_loopback = GNUNET_YES; - ch->dest = GNUNET_CONTAINER_multihashmap_get (open_ports, - port); - if (NULL == ch->dest) + c = GNUNET_CONTAINER_multihashmap_get (open_ports, + port); + if (NULL == c) { /* port closed, wait for it to possibly open */ (void) GNUNET_CONTAINER_multihashmap_put (loose_channels, @@ -561,8 +606,11 @@ GCCH_channel_local_new (struct CadetClient *owner, } else { + ch->dest = GNUNET_new (struct CadetChannelClient); + ch->dest->c = c; + ch->dest->client_ready = GNUNET_YES; GCCH_bind (ch, - ch->dest); + ch->dest->c); } } else @@ -786,20 +834,18 @@ send_ack_to_client (struct CadetChannel *ch, { struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalAck *ack; - struct CadetClient *c; + struct CadetChannelClient *ccc; env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); - ack->ccn = (GNUNET_YES == to_owner) ? ch->ccn_owner : ch->ccn_dest; - c = (GNUNET_YES == to_owner) - ? ch->owner - : ch->dest; + ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest; + ack->ccn = ccc->ccn; LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n", - GSC_2s (c), + GSC_2s (ccc->c), (GNUNET_YES == to_owner) ? "owner" : "dest", ntohl (ack->ccn.channel_of_client)); - GSC_send_to_client (c, + GSC_send_to_client (ccc->c, env); } @@ -817,6 +863,7 @@ GCCH_bind (struct CadetChannel *ch, struct CadetClient *c) { uint32_t options; + struct CadetChannelClient *cccd; LOG (GNUNET_ERROR_TYPE_DEBUG, "Binding %s from %s to port %s of %s\n", @@ -837,16 +884,19 @@ GCCH_bind (struct CadetChannel *ch, options |= GNUNET_CADET_OPTION_RELIABLE; if (ch->out_of_order) options |= GNUNET_CADET_OPTION_OUT_OF_ORDER; - ch->dest = c; - ch->ccn_dest = GSC_bind (c, - ch, - (GNUNET_YES == ch->is_loopback) - ? GCP_get (&my_full_id, - GNUNET_YES) - : GCT_get_destination (ch->t), - &ch->port, - options); - GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < + cccd = GNUNET_new (struct CadetChannelClient); + ch->dest = cccd; + cccd->c = c; + cccd->client_ready = GNUNET_YES; + cccd->ccn = GSC_bind (c, + ch, + (GNUNET_YES == ch->is_loopback) + ? GCP_get (&my_full_id, + GNUNET_YES) + : GCT_get_destination (ch->t), + &ch->port, + options); + GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */ if (GNUNET_YES == ch->is_loopback) @@ -862,7 +912,7 @@ GCCH_bind (struct CadetChannel *ch, ch); } /* give client it's initial supply of ACKs */ - GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) < + GNUNET_assert (ntohl (cccd->ccn.channel_of_client) < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); for (unsigned int i=0;imax_pending_messages;i++) send_ack_to_client (ch, @@ -876,22 +926,37 @@ GCCH_bind (struct CadetChannel *ch, * * @param ch channel to destroy * @param c client that caused the destruction + * @param ccn client number of the client @a c */ void GCCH_channel_local_destroy (struct CadetChannel *ch, - struct CadetClient *c) + struct CadetClient *c, + struct GNUNET_CADET_ClientChannelNumber ccn) { LOG (GNUNET_ERROR_TYPE_DEBUG, "%s asks for destruction of %s\n", GSC_2s (c), GCCH_2s (ch)); GNUNET_assert (NULL != c); - if (c == ch->owner) + if ( (NULL != ch->owner) && + (c == ch->owner->c) && + (ccn.channel_of_client == ch->owner->ccn.channel_of_client) ) + { + free_channel_client (ch->owner); ch->owner = NULL; - else if (c == ch->dest) + } + else if ( (NULL != ch->dest) && + (c == ch->dest->c) && + (ccn.channel_of_client == ch->dest->ccn.channel_of_client) ) + { + free_channel_client (ch->dest); ch->dest = NULL; + } else + { GNUNET_assert (0); + } + if (GNUNET_YES == ch->destroy) { /* other end already destroyed, with the local client gone, no need @@ -1008,6 +1073,7 @@ is_before (void *cls, * and send an ACK to the other end (once flow control allows it!) * * @param ch channel that got data + * @param msg message that was received */ void GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, @@ -1015,6 +1081,7 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, { struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalData *ld; + struct CadetChannelClient *ccc; struct CadetOutOfOrderMessage *com; size_t payload_size; @@ -1023,11 +1090,12 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, env = GNUNET_MQ_msg_extra (ld, payload_size, GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); - ld->ccn = (NULL == ch->dest) ? ch->ccn_owner : ch->ccn_dest; + ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn; GNUNET_memcpy (&ld[1], &msg[1], payload_size); - if ( (GNUNET_YES == ch->client_ready) && + ccc = (NULL != ch->owner) ? ch->owner : ch->dest; + if ( (GNUNET_YES == ccc->client_ready) && ( (GNUNET_YES == ch->out_of_order) || (msg->mid.mid == ch->mid_recv.mid) ) ) { @@ -1035,8 +1103,9 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, "Giving %u bytes of payload from %s to client %s\n", (unsigned int) payload_size, GCCH_2s (ch), - GSC_2s (ch->owner ? ch->owner : ch->dest)); - GSC_send_to_client (ch->owner ? ch->owner : ch->dest, + GSC_2s (ccc->c)); + ccc->client_ready = GNUNET_NO; + GSC_send_to_client (ccc->c, env); ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid)); ch->mid_futures >>= 1; @@ -1047,7 +1116,7 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, drop it (can't buffer too much!) */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n", - (GNUNET_YES == ch->client_ready) + (GNUNET_YES == ccc->client_ready) ? "out-of-order" : "client-not-ready", (unsigned int) payload_size, @@ -1059,36 +1128,36 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, com->mid = msg->mid; com->env = env; /* sort into list ordered by "is_before" */ - if ( (NULL == ch->head_recv) || + if ( (NULL == ccc->head_recv) || (GNUNET_YES == is_before (ch, com, - ch->head_recv)) ) + ccc->head_recv)) ) { - GNUNET_CONTAINER_DLL_insert (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_insert (ccc->head_recv, + ccc->tail_recv, com); } else { struct CadetOutOfOrderMessage *pos; - for (pos = ch->head_recv; + for (pos = ccc->head_recv; NULL != pos; pos = pos->next) { if (GNUNET_YES != - is_before (ch, + is_before (NULL, pos, com)) break; } if (NULL == pos) - GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv, + ccc->tail_recv, com); else - GNUNET_CONTAINER_DLL_insert_after (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv, + ccc->tail_recv, com, pos->prev); } @@ -1166,6 +1235,8 @@ GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, void GCCH_handle_remote_destroy (struct CadetChannel *ch) { + struct CadetChannelClient *ccc; + GNUNET_assert (GNUNET_NO == ch->is_loopback); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received remote channel DESTROY for %s\n", @@ -1176,7 +1247,8 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch) channel_destroy (ch); return; } - if (NULL != ch->head_recv) + ccc = (NULL != ch->owner) ? ch->owner : ch->dest; + if (NULL != ccc->head_recv) { LOG (GNUNET_ERROR_TYPE_WARNING, "Lost end of transmission due to remote shutdown on %s\n", @@ -1184,8 +1256,8 @@ GCCH_handle_remote_destroy (struct CadetChannel *ch) /* FIXME: change API to notify client about truncated transmission! */ } ch->destroy = GNUNET_YES; - GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest, - (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest, + GSC_handle_remote_channel_destroy (ccc->c, + ccc->ccn, ch); channel_destroy (ch); } @@ -1326,7 +1398,7 @@ GCCH_handle_local_data (struct CadetChannel *ch, if (GNUNET_YES == ch->is_loopback) { - struct CadetClient *receiver; + struct CadetChannelClient *receiver; struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_LocalData *ld; int to_owner; @@ -1335,25 +1407,24 @@ GCCH_handle_local_data (struct CadetChannel *ch, buf_len, GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); if (sender_ccn.channel_of_client == - ch->ccn_owner.channel_of_client) + ch->owner->ccn.channel_of_client) { receiver = ch->dest; - ld->ccn = ch->ccn_dest; to_owner = GNUNET_NO; } else { GNUNET_assert (sender_ccn.channel_of_client == - ch->ccn_dest.channel_of_client); + ch->dest->ccn.channel_of_client); receiver = ch->owner; - ld->ccn = ch->ccn_owner; to_owner = GNUNET_YES; } + ld->ccn = receiver->ccn; GNUNET_memcpy (&ld[1], buf, buf_len); /* FIXME: this does not provide for flow control! */ - GSC_send_to_client (receiver, + GSC_send_to_client (receiver->c, env); send_ack_to_client (ch, to_owner); @@ -1387,18 +1458,31 @@ GCCH_handle_local_data (struct CadetChannel *ch, /** - * Try to deliver messages to the local client, if it is ready for more. + * Handle ACK from client on local channel. Means the client is ready + * for more data, see if we have any for it. * - * @param ch channel to process + * @param ch channel to destroy + * @param client_ccn ccn of the client sending the ack */ -static void -send_client_buffered_data (struct CadetChannel *ch) +void +GCCH_handle_local_ack (struct CadetChannel *ch, + struct GNUNET_CADET_ClientChannelNumber client_ccn) { + struct CadetChannelClient *ccc; struct CadetOutOfOrderMessage *com; - if (GNUNET_NO == ch->client_ready) - return; /* client not ready */ - com = ch->head_recv; + if ( (NULL != ch->owner) && + (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) ) + ccc = ch->owner; + else if ( (NULL != ch->dest) && + (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) ) + ccc = ch->dest; + else + GNUNET_assert (0); + ccc->client_ready = GNUNET_YES; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got LOCAL_ACK, client ready to receive more data!\n"); + com = ccc->head_recv; if (NULL == com) return; /* none pending */ if ( (com->mid.mid != ch->mid_recv.mid) && @@ -1410,14 +1494,15 @@ send_client_buffered_data (struct CadetChannel *ch) GCCH_2s (ch)); /* all good, pass next message to client */ - GNUNET_CONTAINER_DLL_remove (ch->head_recv, - ch->tail_recv, + GNUNET_CONTAINER_DLL_remove (ccc->head_recv, + ccc->tail_recv, com); /* FIXME: if unreliable, this is not aggressive enough, as it would be OK to have lost some! */ ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); ch->mid_futures >>= 1; /* equivalent to division by 2 */ - GSC_send_to_client (ch->owner ? ch->owner : ch->dest, + ccc->client_ready = GNUNET_NO; + GSC_send_to_client (ccc->c, com->env); GNUNET_free (com); if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) && @@ -1435,7 +1520,7 @@ send_client_buffered_data (struct CadetChannel *ch) send_channel_data_ack (ch); } - if (NULL != ch->head_recv) + if (NULL != ccc->head_recv) return; if (GNUNET_NO == ch->destroy) return; @@ -1445,23 +1530,6 @@ send_client_buffered_data (struct CadetChannel *ch) } -/** - * Handle ACK from client on local channel. - * - * @param ch channel to destroy - * @param client_ccn ccn of the client sending the ack - */ -void -GCCH_handle_local_ack (struct CadetChannel *ch, - struct GNUNET_CADET_ClientChannelNumber client_ccn) -{ - ch->client_ready = GNUNET_YES; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got LOCAL_ACK, client ready to receive more data!\n"); - send_client_buffered_data (ch); -} - - #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__) @@ -1497,17 +1565,17 @@ GCCH_debug (struct CadetChannel *ch, { LOG2 (level, "CHN origin %s ready %s local-id: %u\n", - GSC_2s (ch->owner), - ch->client_ready ? "YES" : "NO", - ntohl (ch->ccn_owner.channel_of_client)); + GSC_2s (ch->owner->c), + ch->owner->client_ready ? "YES" : "NO", + ntohl (ch->owner->ccn.channel_of_client)); } if (NULL != ch->dest) { LOG2 (level, "CHN destination %s ready %s local-id: %u\n", - GSC_2s (ch->dest), - ch->client_ready ? "YES" : "NO", - ntohl (ch->ccn_dest.channel_of_client)); + GSC_2s (ch->dest->c), + ch->dest->client_ready ? "YES" : "NO", + ntohl (ch->dest->ccn.channel_of_client)); } LOG2 (level, "CHN Message IDs recv: %d (%LLX), send: %d\n", -- cgit v1.2.3