From cadf559899f7dfaf24ed27cab923414058f207b3 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 28 Jan 2019 09:29:42 +0100 Subject: more work on TCP communicator --- src/transport/gnunet-communicator-tcp.c | 596 +++++++++++++++++++++++++------- 1 file changed, 480 insertions(+), 116 deletions(-) (limited to 'src/transport/gnunet-communicator-tcp.c') diff --git a/src/transport/gnunet-communicator-tcp.c b/src/transport/gnunet-communicator-tcp.c index 5a397c296..a94559bd2 100644 --- a/src/transport/gnunet-communicator-tcp.c +++ b/src/transport/gnunet-communicator-tcp.c @@ -24,11 +24,14 @@ * @author Christian Grothoff * * TODO: - * - lots of basic adaptations (see FIXMEs) - * - better message queue management - * - actually encrypt, hmac, decrypt - * - actually transmit - * - + * - lots of basic adaptations (see FIXMEs), need NAT service + * to determine our own listen IPs! Parsing of bindto spec! + * - actual decryption and handling of boxes and rekeys! + * - message queue management: flow control towards CORE! + * (stop reading from socket until MQ send to core is done; + * will need a counter as ONE read from socket may generate + * multiple messages en route to CORE; tricky bit: queue + * may die before we get MQ sent-done callbacks!) */ #include "platform.h" #include "gnunet_util_lib.h" @@ -60,12 +63,24 @@ */ #define REKEY_TIME_INTERVAL GNUNET_TIME_UNIT_DAYS +/** + * How long do we wait until we must have received the initial KX? + */ +#define PROTO_QUEUE_TIMEOUT GNUNET_TIME_UNIT_MINUTES + /** * How often do we rekey based on number of bytes transmitted? * (additionally randomized). */ #define REKEY_MAX_BYTES (1024LLU * 1024 * 1024 * 4LLU) +/** + * Size of the initial key exchange message sent first in both + * directions. + */ +#define INITIAL_KX_SIZE (sizeof (struct GNUNET_CRYPTO_EcdhePublicKey)+sizeof (struct TCPConfirmation)) + + /** * Address prefix used by the communicator. */ @@ -399,6 +414,61 @@ struct Queue }; +/** + * Handle for an incoming connection where we do not yet have enough + * information to setup a full queue. + */ +struct ProtoQueue +{ + + /** + * Kept in a DLL. + */ + struct ProtoQueue *next; + + /** + * Kept in a DLL. + */ + struct ProtoQueue *prev; + + /** + * socket that we transmit all data with on this queue + */ + struct GNUNET_NETWORK_Handle *sock; + + /** + * ID of read task for this connection. + */ + struct GNUNET_SCHEDULER_Task *read_task; + + /** + * Address of the other peer. + */ + struct sockaddr *address; + + /** + * Length of the address. + */ + socklen_t address_len; + + /** + * Timeout for this protoqueue. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Buffer for reading all the information we need to upgrade from + * protoqueue to queue. + */ + char ibuf[INITIAL_KX_SIZE]; + + /** + * Current offset for reading into @e ibuf. + */ + size_t ibuf_off; +}; + + /** * ID of listen task */ @@ -454,6 +524,16 @@ static struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key; */ static const struct GNUNET_CONFIGURATION_Handle *cfg; +/** + * Protoqueues DLL head. + */ +static struct ProtoQueue *proto_head; + +/** + * Protoqueues DLL tail. + */ +static struct ProtoQueue *proto_tail; + /** * We have been notified that our listen socket has something to @@ -514,7 +594,6 @@ queue_destroy (struct Queue *queue) listen_sock, &listen_cb, NULL); - } @@ -559,8 +638,44 @@ hmac (struct GNUNET_HashCode *hmac_secret, static void queue_finish (struct Queue *queue) { - // FIXME: try to send 'finish' message first!? - queue_destroy (queue); + struct TCPFinish fin; + + memset (&fin, + 0, + sizeof (fin)); + fin.header.size = htons (sizeof (fin)); + fin.header.type = htons (GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_FINISH); + hmac (&queue->out_hmac, + &fin, + sizeof (fin), + &fin.hmac); + /* if there is any message left in pwrite_buf, we + overwrite it (possibly dropping the last message + from CORE hard here) */ + memcpy (queue->pwrite_buf, + &fin, + sizeof (fin)); + queue->pwrite_off = sizeof (fin); + /* This flag will ensure that #queue_write() no longer + notifies CORE about the possibility of sending + more data, and that #queue_write() will call + #queue_destroy() once the @c fin was fully written. */ + queue->finishing = GNUNET_YES; +} + + +/** + * Increment queue timeout due to activity. We do not immediately + * notify the monitor here as that might generate excessive + * signalling. + * + * @param queue queue for which the timeout should be rescheduled + */ +static void +reschedule_queue_timeout (struct Queue *queue) +{ + queue->timeout + = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); } @@ -577,16 +692,29 @@ queue_read (void *cls) ssize_t rcvd; queue->read_task = NULL; - /* FIXME: perform read! */ rcvd = GNUNET_NETWORK_socket_recv (queue->sock, &queue->cread_buf[queue->cread_off], BUF_SIZE - queue->cread_off); if (-1 == rcvd) { - // FIXME: error handling... + if ( (EAGAIN != errno) && + (EINTR != errno) ) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, + "recv"); + queue_finish (queue); + return; + } + /* try again */ + queue->read_task + = GNUNET_SCHEDULER_add_read_net (left, + queue->sock, + &queue_read, + queue); + return; } if (0 != rcvd) - /* update queue timeout */ + reschedule_queue_timeout (queue); queue->cread_off += rcvd; if (queue->pread_off < sizeof (queue->pread_buf)) { @@ -620,22 +748,6 @@ queue_read (void *cls) } -/** - * Increment queue timeout due to activity. We do not immediately - * notify the monitor here as that might generate excessive - * signalling. - * - * @param queue queue for which the timeout should be rescheduled - */ -static void -reschedule_queue_timeout (struct Queue *queue) -{ - GNUNET_assert (NULL != queue->read_task); - queue->timeout - = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); -} - - /** * Convert TCP bind specification to a `struct sockaddr *` * @@ -856,7 +968,7 @@ queue_write (void *cls) memmove (queue->cwrite_buf, &queue->cwrite_buf[sent], queue->cwrite_off - sent); - /* FIXME: update queue timeout */ + reschedule_queue_timeout (queue); } /* can we encrypt more? (always encrypt full messages, needed such that #mq_cancel() can work!) */ @@ -889,6 +1001,13 @@ queue_write (void *cls) queue->mq_awaits_continue = GNUNET_NO; GNUNET_MQ_impl_send_continue (queue->mq); } + /* did we just finish writing 'finish'? */ + if ( (0 == queue->cwrite_off) && + (GNUNET_YES == queue->finishing) ) + { + queue_destroy (queue); + return; + } /* do we care to write more? */ if (0 < queue->cwrite_off) queue->write_task @@ -917,6 +1036,8 @@ mq_send (struct GNUNET_MQ_Handle *mq, struct TCPBox box; GNUNET_assert (mq == queue->mq); + if (GNUNET_YES == queue->finishing) + return; /* this queue is dying, drop msg */ GNUNET_assert (0 == queue->pread_off); box.header.type = htons (GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_BOX); box.header.size = htons (msize); @@ -1005,31 +1126,16 @@ mq_error (void *cls, /** - * Creates a new outbound queue the transport service will use to send - * data to another peer. + * Add the given @a queue to our internal data structure. Setup the + * MQ processing and inform transport that the queue is ready. Must + * be called after the KX for outgoing messages has been bootstrapped. * - * @param sock the queue's socket - * @param target the target peer - * @param cs inbound or outbound queue - * @param in the address - * @param in_len number of bytes in @a in - * @return the queue or NULL of max connections exceeded - */ -static struct Queue * -setup_queue (struct GNUNET_NETWORK_Handle *sock, - const struct GNUNET_PeerIdentity *target, - enum GNUNET_TRANSPORT_ConnectionStatus cs, - const struct sockaddr *in, - socklen_t in_len) + * @param queue queue to boot + */ +static void +boot_queue (struct Queue *queue, + enum GNUNET_TRANSPORT_ConnectionStatus cs) { - struct Queue *queue; - - queue = GNUNET_new (struct Queue); - queue->target = *target; - queue->address = GNUNET_memdup (in, - in_len); - queue->address_len = in_len; - queue->sock = sock; queue->nt = 0; // FIXME: determine NT! (void) GNUNET_CONTAINER_multipeermap_put (queue_map, &queue->target, @@ -1041,11 +1147,6 @@ setup_queue (struct GNUNET_NETWORK_Handle *sock, GNUNET_NO); queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - queue->read_task - = GNUNET_SCHEDULER_add_read_net (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - queue->sock, - &queue_read, - queue); queue->mq = GNUNET_MQ_queue_for_callbacks (&mq_send, &mq_destroy, @@ -1086,19 +1187,217 @@ setup_queue (struct GNUNET_NETWORK_Handle *sock, queue->mq); GNUNET_free (foreign_addr); } - return queue; } /** - * We have been notified that our listen socket has something to - * read. Do the read and reschedule this function to be called again - * once more is available. + * Generate and transmit our ephemeral key and the signature for + * the initial KX with the other peer. Must be called first, before + * any other bytes are ever written to the output buffer. Note that + * our cipher must already be initialized when calling this function. + * Helper function for #start_initial_kx_out(). * - * @param cls NULL + * @param queue queue to do KX for + * @param epub our public key for the KX */ static void -listen_cb (void *cls); +transmit_kx (struct Queue *queue, + const struct GNUNET_CRYPTO_EcdhePublicKey *epub) +{ + struct TcpHandshakeSignature ths; + struct TCPConfirmation tc; + + memcpy (queue->cwrite_buf, + epub, + sizeof (*epub)); + queue->cwrite_off = sizeof (epub); + /* compute 'tc' and append in encrypted format to cwrite_buf */ + tc.sender = my_identity; + tc.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); + ths.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE); + ths.purpose.size = htonl (sizeof (ths)); + ths.sender = my_identity; + ths.receiver = queue->target; + ths.ephemeral = *epub; + ths.monotonic_time = tc.monotonic_time; + GNUNET_assert (GNUNET_OK == + GNUNET_CRYPTO_eddsa_sign (my_private_key, + &ths.purpose, + &tc.sender_sig)); + GNUNET_assert (0 == + gcry_cipher_encrypt (queue->out_cipher, + &queue->cwrite_buf[queue->cwrite_off], + sizeof (tc), + &tc, + sizeof (tc))); + queue->cwrite_off += sizeof (tc); +} + + +/** + * Initialize our key material for outgoing transmissions and + * inform the other peer about it. Must be called first before + * any data is sent. + * + * @param queue the queue to setup + */ +static void +start_initial_kx_out (struct Queue *queue) +{ + struct GNUNET_CRYPTO_EcdhePublicKey epub; + + GNUNET_assert (GNUNET_OK == + GNUNET_CRYPTO_ecdhe_key_create2 (&queue->ephemeral)); + GNUNET_CRYPTO_ecdhe_key_get_public (&queue->ephemeral, + &epub); + setup_out_cipher (queue); + transmit_kx (queue, + &epub); +} + + +/** + * We have received the first bytes from the other side on a @a queue. + * Decrypt the @a tc contained in @a ibuf and check the signature. + * Note that #setup_in_cipher() must have already been called. + * + * @param queue queue to decrypt initial bytes from other peer for + * @param tc[out] where to store the result + * @param ibuf incoming data, of size + * `INITIAL_KX_SIZE` + * @return #GNUNET_OK if the signature was OK, #GNUNET_SYSERR if not + */ +static int +decrypt_and_check_tc (struct Queue *queue, + struct TCPConfirmation *tc, + char *ibuf) +{ + struct TcpHandshakeSignature ths; + + GNUNET_assert (0 == + gcry_cipher_decrypt (queue->in_cipher, + tc, + sizeof (*tc), + &ibuf[sizeof (struct GNUNET_CRYPTO_EcdhePublicKey)], + sizeof (tc))); + ths.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE); + ths.purpose.size = htonl (sizeof (ths)); + ths.sender = tc->sender; + ths.receiver = my_identity; + memcpy (&ths.ephemeral, + ibuf, + sizeof (struct GNUNET_CRYPTO_EcdhePublicKey)); + ths.monotonic_time = tc->monotonic_time; + return GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE, + &ths.purpose, + &tc->sender_sig, + &tc->sender.public_key); +} + + +/** + * Closes socket and frees memory associated with @a pq. + * + * @param pq proto queue to free + */ +static void +free_proto_queue (struct ProtoQueue *pq) +{ + GNUNET_NETWORK_socket_close (pq->sock); + GNUNET_free (pq->address); + GNUNET_CONTAINER_DLL_remove (proto_head, + proto_tail, + pq); + GNUNET_free (pq); +} + + +/** + * Read from the socket of the proto queue until we have enough data + * to upgrade to full queue. + * + * @param cls a `struct ProtoQueue` + */ +static void +proto_read_kx (void *cls) +{ + struct ProtoQueue *pq = cls; + ssize_t rcvd; + struct GNUNET_TIME_Relative left; + struct Queue *queue; + struct TCPConfirmation tc; + + pq->read_task = NULL; + left = GNUNET_TIME_absolute_get_remaining (pq->timeout); + if (0 == left.rel_value_us) + { + free_proto_queue (pq); + return; + } + rcvd = GNUNET_NETWORK_socket_recv (pq->sock, + &pq->ibuf[pq->ibuf_off], + sizeof (pq->ibuf) - pq->ibuf_off); + if (-1 == rcvd) + { + if ( (EAGAIN != errno) && + (EINTR != errno) ) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, + "recv"); + free_proto_queue (pq); + return; + } + /* try again */ + pq->read_task = GNUNET_SCHEDULER_add_read_net (left, + pq->sock, + &proto_read_kx, + pq); + return; + } + pq->ibuf_off += rcvd; + if (pq->ibuf_off > sizeof (pq->ibuf)) + { + /* read more */ + pq->read_task = GNUNET_SCHEDULER_add_read_net (left, + pq->sock, + &proto_read_kx, + pq); + return; + } + /* we got all the data, let's find out who we are talking to! */ + queue = GNUNET_new (struct Queue); + setup_in_cipher ((const struct GNUNET_CRYPTO_EcdhePublicKey *) pq->ibuf, + queue); + if (GNUNET_OK != + decrypt_and_check_tc (queue, + &tc, + pq->ibuf)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Invalid TCP KX received from %s\n", + GNUNET_a2s (queue->address, + queue->address_len)); + gcry_cipher_close (queue->in_cipher); + GNUNET_free (queue); + free_proto_queue (pq); + return; + } + queue->address = pq->address; /* steals reference */ + queue->address_len = pq->address_len; + queue->target = tc.sender; + start_initial_kx_out (queue); + boot_queue (queue, + GNUNET_TRANSPORT_CS_INBOUND); + queue->read_task + = GNUNET_SCHEDULER_add_read_net (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + queue->sock, + &queue_read, + queue); + GNUNET_CONTAINER_DLL_remove (proto_head, + proto_tail, + pq); + GNUNET_free (pq); +} /** @@ -1111,10 +1410,10 @@ listen_cb (void *cls); static void listen_cb (void *cls) { - struct Queue *queue; struct sockaddr_storage in; socklen_t addrlen; struct GNUNET_NETWORK_Handle *sock; + struct ProtoQueue *pq; listen_task = NULL; GNUNET_assert (NULL != listen_sock); @@ -1143,22 +1442,112 @@ listen_cb (void *cls) "accept"); return; } -#if 0 - // FIXME: setup proto-queue first here, until we have received the starting - // messages! - queue = setup_queue (sock, - GNUNET_TRANSPORT_CS_INBOUND, - (struct sockaddr *) &in, - addrlen); - if (NULL == queue) + pq = GNUNET_new (struct ProtoQueue); + pq->address_len = addrlen; + pq->address = GNUNET_memdup (&in, + addrlen); + pq->timeout = GNUNET_TIME_relative_to_absolute (PROTO_QUEUE_TIMEOUT); + pq->sock = sock; + pq->read_task = GNUNET_SCHEDULER_add_read_net (PROTO_QUEUE_TIMEOUT, + pq->sock, + &proto_read_kx, + pq); + GNUNET_CONTAINER_DLL_insert (proto_head, + proto_tail, + pq); +} + + +/** + * Read from the socket of the queue until we have enough data + * to initialize the decryption logic and can switch to regular + * reading. + * + * @param cls a `struct Queue` + */ +static void +queue_read_kx (void *cls) +{ + struct Queue *queue = cls; + ssize_t rcvd; + struct GNUNET_TIME_Relative left; + struct TCPConfirmation tc; + + queue->read_task = NULL; + left = GNUNET_TIME_absolute_get_remaining (queue->timeout); + if (0 == left.rel_value_us) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Maximum number of TCP connections exceeded, dropping incoming connection\n")); + queue_destroy (queue); + return; + } + rcvd = GNUNET_NETWORK_socket_recv (queue->sock, + &queue->cread_buf[queue->cread_off], + BUF_SIZE - queue->cread_off); + if (-1 == rcvd) + { + if ( (EAGAIN != errno) && + (EINTR != errno) ) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, + "recv"); + queue_destroy (queue); + return; + } + queue->read_task = GNUNET_SCHEDULER_add_read_net (left, + queue->sock, + &queue_read_kx, + queue); + return; + } + queue->cread_off += rcvd; + if (queue->cread_off < + INITIAL_KX_SIZE) + { + /* read more */ + queue->read_task = GNUNET_SCHEDULER_add_read_net (left, + queue->sock, + &queue_read_kx, + queue); + return; + } + /* we got all the data, let's find out who we are talking to! */ + setup_in_cipher ((const struct GNUNET_CRYPTO_EcdhePublicKey *) queue->cread_buf, + queue); + if (GNUNET_OK != + decrypt_and_check_tc (queue, + &tc, + queue->cread_buf)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Invalid TCP KX received from %s\n", + GNUNET_a2s (queue->address, + queue->address_len)); + queue_destroy (queue); + return; + } + if (0 != memcmp (&tc.sender, + &queue->target, + sizeof (struct GNUNET_PeerIdentity))) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Invalid sender in TCP KX received from %s\n", + GNUNET_a2s (queue->address, + queue->address_len)); + queue_destroy (queue); return; } -#endif -} + /* update queue timeout */ + reschedule_queue_timeout (queue); + /* prepare to continue with regular read task immediately */ + memmove (queue->cread_buf, + &queue->cread_buf[INITIAL_KX_SIZE], + queue->cread_off - (INITIAL_KX_SIZE)); + queue->cread_off -= INITIAL_KX_SIZE; + queue->read_task = GNUNET_SCHEDULER_add_now (&queue_read, + queue); +} + /** * Function called by the transport service to initialize a @@ -1187,10 +1576,7 @@ mq_init (void *cls, struct sockaddr *in; socklen_t in_len; struct GNUNET_NETWORK_Handle *sock; - struct GNUNET_CRYPTO_EcdhePublicKey epub; - struct TcpHandshakeSignature ths; - struct TCPConfirmation tc; - + if (0 != strncmp (address, COMMUNICATOR_ADDRESS_PREFIX "-", strlen (COMMUNICATOR_ADDRESS_PREFIX "-"))) @@ -1227,12 +1613,19 @@ mq_init (void *cls, GNUNET_free (in); return GNUNET_SYSERR; } - queue = setup_queue (sock, - peer, - GNUNET_TRANSPORT_CS_OUTBOUND, - in, - in_len); - GNUNET_free (in); + + queue = GNUNET_new (struct Queue); + queue->target = *peer; + queue->address = in; + queue->address_len = in_len; + queue->sock = sock; + boot_queue (queue, + GNUNET_TRANSPORT_CS_OUTBOUND); + queue->read_task + = GNUNET_SCHEDULER_add_read_net (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + queue->sock, + &queue_read_kx, + queue); if (NULL == queue) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1242,37 +1635,8 @@ mq_init (void *cls, GNUNET_NETWORK_socket_close (sock); return GNUNET_NO; } - GNUNET_assert (GNUNET_OK == - GNUNET_CRYPTO_ecdhe_key_create2 (&queue->ephemeral)); - GNUNET_CRYPTO_ecdhe_key_get_public (&queue->ephemeral, - &epub); - setup_out_cipher (queue); - memcpy (queue->cwrite_buf, - &epub, - sizeof (epub)); - queue->cwrite_off = sizeof (epub); - /* compute 'tc' and append in encrypted format to cwrite_buf */ - tc.sender = my_identity; - tc.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); - ths.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE); - ths.purpose.size = htonl (sizeof (ths)); - ths.sender = my_identity; - ths.receiver = queue->target; - ths.ephemeral = epub; - ths.monotonic_time = tc.monotonic_time; - GNUNET_assert (GNUNET_OK == - GNUNET_CRYPTO_eddsa_sign (my_private_key, - &ths.purpose, - &tc.sender_sig)); - GNUNET_assert (0 == - gcry_cipher_encrypt (queue->out_cipher, - &queue->cwrite_buf[queue->cwrite_off], - sizeof (tc), - &tc, - sizeof (tc))); - queue->cwrite_off += sizeof (tc); - - return GNUNET_OK; + start_initial_kx_out (queue); + return GNUNET_OK; } -- cgit v1.2.3