From 07533eec5c7b1637374ea1496595918861ac8b6d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 28 Jan 2019 12:43:09 +0100 Subject: more work on TCP communicator, almost there --- src/transport/gnunet-communicator-tcp.c | 498 ++++++++++++++++++++++++++++---- 1 file changed, 437 insertions(+), 61 deletions(-) (limited to 'src/transport/gnunet-communicator-tcp.c') diff --git a/src/transport/gnunet-communicator-tcp.c b/src/transport/gnunet-communicator-tcp.c index a94559bd2..050a5f225 100644 --- a/src/transport/gnunet-communicator-tcp.c +++ b/src/transport/gnunet-communicator-tcp.c @@ -24,14 +24,8 @@ * @author Christian Grothoff * * TODO: - * - lots of basic adaptations (see FIXMEs), need NAT service - * to determine our own listen IPs! Parsing of bindto spec! - * - actual decryption and handling of boxes and rekeys! - * - message queue management: flow control towards CORE! - * (stop reading from socket until MQ send to core is done; - * will need a counter as ONE read from socket may generate - * multiple messages en route to CORE; tricky bit: queue - * may die before we get MQ sent-done callbacks!) + * - NAT service API change to handle address stops! + * - handling of rekeys! */ #include "platform.h" #include "gnunet_util_lib.h" @@ -39,6 +33,7 @@ #include "gnunet_signatures.h" #include "gnunet_constants.h" #include "gnunet_nt_lib.h" +#include "gnunet_nat_service.h" #include "gnunet_statistics_service.h" #include "gnunet_transport_communication_service.h" @@ -390,11 +385,20 @@ struct Queue */ struct GNUNET_TIME_Absolute timeout; + /** + * How may messages did we pass from this queue to CORE for which we + * have yet to receive an acknoweldgement that CORE is done with + * them? If "large" (or even just non-zero), we should throttle + * reading to provide flow control. See also #DEFAULT_MAX_QUEUE_LENGTH + * and #max_queue_length. + */ + unsigned int backpressure; + /** * Which network type does this queue use? */ enum GNUNET_NetworkType nt; - + /** * Is MQ awaiting a #GNUNET_MQ_impl_send_continue() call? */ @@ -405,6 +409,14 @@ struct Queue */ int finishing; + /** + * Did we technically destroy this queue, but kept the allocation + * around because of @e backpressure not being zero yet? Used + * simply to delay the final #GNUNET_free() operation until + * #core_read_finished_cb() has been called. + */ + int destroyed; + /** * #GNUNET_YES after #inject_key() placed the rekey message into the * plaintext buffer. Once the plaintext buffer is drained, this @@ -474,11 +486,6 @@ struct ProtoQueue */ static struct GNUNET_SCHEDULER_Task *listen_task; -/** - * Number of messages we currently have in our queues towards the transport service. - */ -static unsigned long long delivering_messages; - /** * Maximum queue length before we stop reading towards the transport service. */ @@ -504,11 +511,6 @@ static struct GNUNET_CONTAINER_MultiPeerMap *queue_map; */ static struct GNUNET_NETWORK_Handle *listen_sock; -/** - * Handle to the operation that publishes our address. - */ -static struct GNUNET_TRANSPORT_AddressIdentifier *ai; - /** * Our public key. */ @@ -524,6 +526,11 @@ static struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key; */ static const struct GNUNET_CONFIGURATION_Handle *cfg; +/** + * Connection to NAT service. + */ +static struct GNUNET_NAT_Handle *nat; + /** * Protoqueues DLL head. */ @@ -588,7 +595,10 @@ queue_destroy (struct Queue *queue) gcry_cipher_close (queue->in_cipher); gcry_cipher_close (queue->out_cipher); GNUNET_free (queue->address); - GNUNET_free (queue); + if (0 != queue->backpressure) + queue->destroyed = GNUNET_YES; + else + GNUNET_free (queue); if (NULL == listen_task) listen_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, listen_sock, @@ -679,6 +689,213 @@ reschedule_queue_timeout (struct Queue *queue) } +/** + * Queue read task. If we hit the timeout, disconnect it + * + * @param cls the `struct Queue *` to disconnect + */ +static void +queue_read (void *cls); + + +/** + * Core tells us it is done processing a message that transport + * received on a queue with status @a success. + * + * @param cls a `struct Queue *` where the message originally came from + * @param success #GNUNET_OK on success + */ +static void +core_read_finished_cb (void *cls, + int success) +{ + struct Queue *queue = cls; + + if (GNUNET_OK != success) + GNUNET_STATISTICS_update (stats, + "# messages lost in communicator API towards CORE", + 1, + GNUNET_NO); + queue->backpressure--; + /* handle deferred queue destruction */ + if ( (queue->destroyed) && + (0 == queue->backpressure) ) + { + GNUNET_free (queue); + return; + } + reschedule_queue_timeout (queue); + /* possibly unchoke reading, now that CORE made progress */ + if (NULL == queue->read_task) + queue->read_task + = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining (queue->timeout), + queue->sock, + &queue_read, + queue); +} + + +/** + * We received @a plaintext_len bytes of @a plaintext on @a queue. + * Pass it on to CORE. If transmission is actually happening, + * increase backpressure counter. + * + * @param queue the queue that received the plaintext + * @param plaintext the plaintext that was received + * @param plaintext_len number of bytes of plaintext received + */ +static void +pass_plaintext_to_core (struct Queue *queue, + const void *plaintext, + size_t plaintext_len) +{ + const struct GNUNET_MessageHeader *hdr = plaintext; + int ret; + + if (ntohs (hdr->size) != plaintext_len) + { + /* NOTE: If we ever allow multiple CORE messages in one + BOX, this will have to change! */ + GNUNET_break (0); + return; + } + ret = GNUNET_TRANSPORT_communicator_receive (ch, + &queue->target, + hdr, + &core_read_finished_cb, + queue); + if (GNUNET_OK == ret) + queue->backpressure++; + GNUNET_break (GNUNET_NO != ret); /* backpressure not working!? */ + if (GNUNET_SYSERR == ret) + GNUNET_STATISTICS_update (stats, + "# bytes lost due to CORE not running", + plaintext_len, + GNUNET_NO); +} + + +/** + * Test if we have received a full message in plaintext. + * If so, handle it. + * + * @param queue queue to process inbound plaintext for + */ +static void +try_handle_plaintext (struct Queue *queue) +{ + const struct GNUNET_MessageHeader *hdr + = (const struct GNUNET_MessageHeader *) queue->pread_buf; + const struct TCPBox *box + = (const struct TCPBox *) queue->pread_buf; + const struct TCPRekey *rekey + = (const struct TCPRekey *) queue->pread_buf; + const struct TCPFinish *fin + = (const struct TCPFinish *) queue->pread_buf; + struct TCPRekey rekeyz; + struct TCPFinish finz; + struct GNUNET_ShortHashCode tmac; + uint16_t type; + size_t size = 0; /* make compiler happy */ + + if (sizeof (*hdr) > queue->pread_off) + return; /* not even a header */ + type = ntohs (hdr->type); + switch (type) + { + case GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_BOX: + /* Special case: header size excludes box itself! */ + if (ntohs (hdr->size) + sizeof (struct TCPBox) > queue->pread_off) + return; + hmac (&queue->in_hmac, + &box[1], + ntohs (hdr->size), + &tmac); + if (0 != memcmp (&tmac, + &box->hmac, + sizeof (tmac))) + { + GNUNET_break_op (0); + queue_finish (queue); + return; + } + pass_plaintext_to_core (queue, + (const void *) &box[1], + ntohs (hdr->size)); + size = ntohs (hdr->size) + sizeof (*box); + break; + case GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_REKEY: + if (sizeof (*rekey) > queue->pread_off) + return; + if (ntohs (hdr->size) != sizeof (*rekey)) + { + GNUNET_break_op (0); + queue_finish (queue); + return; + } + rekeyz = *rekey; + memset (&rekeyz.hmac, + 0, + sizeof (rekeyz.hmac)); + hmac (&queue->in_hmac, + &rekeyz, + sizeof (rekeyz), + &tmac); + if (0 != memcmp (&tmac, + &box->hmac, + sizeof (tmac))) + { + GNUNET_break_op (0); + queue_finish (queue); + return; + } + // FIXME: handle rekey! + + size = ntohs (hdr->size); + break; + case GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_FINISH: + if (sizeof (*fin) > queue->pread_off) + return; + if (ntohs (hdr->size) != sizeof (*fin)) + { + GNUNET_break_op (0); + queue_finish (queue); + return; + } + finz = *fin; + memset (&finz.hmac, + 0, + sizeof (finz.hmac)); + hmac (&queue->in_hmac, + &rekeyz, + sizeof (rekeyz), + &tmac); + if (0 != memcmp (&tmac, + &fin->hmac, + sizeof (tmac))) + { + GNUNET_break_op (0); + queue_finish (queue); + return; + } + /* handle FINISH by destroying queue */ + queue_destroy (queue); + break; + default: + GNUNET_break_op (0); + queue_finish (queue); + return; + } + GNUNET_assert (0 != size); + /* 'size' bytes of plaintext were used, shift buffer */ + GNUNET_assert (size <= queue->pread_off); + memmove (queue->pread_buf, + &queue->pread_buf[size], + queue->pread_off - size); + queue->pread_off -= size; +} + + /** * Queue read task. If we hit the timeout, disconnect it * @@ -718,10 +935,20 @@ queue_read (void *cls) queue->cread_off += rcvd; if (queue->pread_off < sizeof (queue->pread_buf)) { - /* FIXME: decrypt */ - - /* FIXME: check plaintext for complete messages, if complete, hand to CORE */ - /* FIXME: CORE flow control: suspend doing more until CORE has ACKed */ + size_t max = GNUNET_MIN (sizeof (queue->pread_buf) - queue->pread_off, + queue->cread_off); + GNUNET_assert (0 == + gcry_cipher_decrypt (queue->in_cipher, + &queue->pread_buf[queue->pread_off], + max, + queue->cread_buf, + max)); + queue->pread_off += max; + memmove (queue->cread_buf, + &queue->cread_buf[max], + queue->cread_off - max); + queue->cread_off -= max; + try_handle_plaintext (queue); } if (BUF_SIZE == queue->cread_off) @@ -729,14 +956,15 @@ queue_read (void *cls) left = GNUNET_TIME_absolute_get_remaining (queue->timeout); if (0 != left.rel_value_us) { - /* not actually our turn yet, but let's at least update - the monitor, it may think we're about to die ... */ - queue->read_task - = GNUNET_SCHEDULER_add_read_net (left, - queue->sock, - &queue_read, - queue); - + if (max_queue_length < queue->backpressure) + { + /* continue reading */ + queue->read_task + = GNUNET_SCHEDULER_add_read_net (left, + queue->sock, + &queue_read, + queue); + } return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -760,9 +988,119 @@ tcp_address_to_sockaddr (const char *bindto, socklen_t *sock_len) { struct sockaddr *in; - size_t slen; + unsigned int port; + char dummy[2]; + char *colon; + char *cp; + + if (1 == SSCANF (bindto, + "%u%1s", + &port, + dummy)) + { + /* interpreting value as just a PORT number */ + if (port > UINT16_MAX) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "BINDTO specification `%s' invalid: value too large for port\n", + bindto); + return NULL; + } + if (GNUNET_YES == + GNUNET_CONFIGURATION_get_value_yesno (cfg, + COMMUNICATOR_CONFIG_SECTION, + "DISABLE_V6")) + { + struct sockaddr_in *i4; + + i4 = GNUNET_malloc (sizeof (struct sockaddr_in)); + i4->sin_family = AF_INET; + i4->sin_port = htons ((uint16_t) port); + *sock_len = sizeof (struct sockaddr_in); + in = (struct sockaddr *) i4; + } + else + { + struct sockaddr_in6 *i6; + + i6 = GNUNET_malloc (sizeof (struct sockaddr_in6)); + i6->sin6_family = AF_INET6; + i6->sin6_port = htons ((uint16_t) port); + *sock_len = sizeof (struct sockaddr_in6); + in = (struct sockaddr *) i6; + } + return in; + } + cp = GNUNET_strdup (bindto); + colon = strrchr (cp, ':'); + if (NULL != colon) + { + /* interpet value after colon as port */ + *colon = '\0'; + colon++; + if (1 == SSCANF (colon, + "%u%1s", + &port, + dummy)) + { + /* interpreting value as just a PORT number */ + if (port > UINT16_MAX) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "BINDTO specification `%s' invalid: value too large for port\n", + bindto); + GNUNET_free (cp); + return NULL; + } + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "BINDTO specification `%s' invalid: last ':' not followed by number\n", + bindto); + GNUNET_free (cp); + return NULL; + } + } + else + { + /* interpret missing port as 0, aka pick any free one */ + port = 0; + } + { + /* try IPv4 */ + struct sockaddr_in v4; - /* FIXME: parse, allocate, return! */ + if (1 == inet_pton (AF_INET, + cp, + &v4)) + { + v4.sin_port = htons ((uint16_t) port); + in = GNUNET_memdup (&v4, + sizeof (v4)); + *sock_len = sizeof (v4); + GNUNET_free (cp); + return in; + } + } + { + /* try IPv6 */ + struct sockaddr_in6 v6; + + if (1 == inet_pton (AF_INET6, + cp, + &v6)) + { + v6.sin6_port = htons ((uint16_t) port); + in = GNUNET_memdup (&v6, + sizeof (v6)); + *sock_len = sizeof (v6); + GNUNET_free (cp); + return in; + } + } + /* FIXME (feature!): maybe also try getnameinfo()? */ + GNUNET_free (cp); return NULL; } @@ -966,8 +1304,8 @@ queue_write (void *cls) size_t usent = (size_t) sent; memmove (queue->cwrite_buf, - &queue->cwrite_buf[sent], - queue->cwrite_off - sent); + &queue->cwrite_buf[usent], + queue->cwrite_off - usent); reschedule_queue_timeout (queue); } /* can we encrypt more? (always encrypt full messages, needed @@ -1670,6 +2008,11 @@ get_queue_delete_it (void *cls, static void do_shutdown (void *cls) { + if (NULL != nat) + { + GNUNET_NAT_unregister (nat); + nat = NULL; + } if (NULL != listen_task) { GNUNET_SCHEDULER_cancel (listen_task); @@ -1685,11 +2028,6 @@ do_shutdown (void *cls) &get_queue_delete_it, NULL); GNUNET_CONTAINER_multipeermap_destroy (queue_map); - if (NULL != ai) - { - GNUNET_TRANSPORT_communicator_address_remove (ai); - ai = NULL; - } if (NULL != ch) { GNUNET_TRANSPORT_communicator_disconnect (ch); @@ -1732,6 +2070,51 @@ enc_notify_cb (void *cls, } +/** + * Signature of the callback passed to #GNUNET_NAT_register() for + * a function to call whenever our set of 'valid' addresses changes. + * + * @param cls closure + * @param add_remove #GNUNET_YES to add a new public IP address, + * #GNUNET_NO to remove a previous (now invalid) one + * @param ac address class the address belongs to + * @param addr either the previous or the new public IP address + * @param addrlen actual length of the @a addr + */ +static void +nat_address_cb (void *cls, + int add_remove, + enum GNUNET_NAT_AddressClass ac, + const struct sockaddr *addr, + socklen_t addrlen) +{ + char *my_addr; + static struct GNUNET_TRANSPORT_AddressIdentifier *ai; // FIXME: store in *ctx of NAT! + + if (GNUNET_YES == add_remove) + { + // FIXME: do better job at stringification of @a addr? + GNUNET_asprintf (&my_addr, + "%s-%s", + COMMUNICATOR_ADDRESS_PREFIX, + GNUNET_a2s (addr, + addrlen)); + // FIXME: translate 'ac' to 'nt'? + ai = GNUNET_TRANSPORT_communicator_address_add (ch, + my_addr, + GNUNET_NT_LOOPBACK, // FIXME: wrong NT! + GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_free (my_addr); + } + else + { + // FIXME: support removal! => improve NAT API! + GNUNET_TRANSPORT_communicator_address_remove (ai); + ai = NULL; + } +} + + /** * Setup communicator and launch network interactions. * @@ -1749,9 +2132,8 @@ run (void *cls, char *bindto; struct sockaddr *in; socklen_t in_len; - char *my_addr; - (void) cls; + (void) cls; cfg = c; if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (cfg, @@ -1810,6 +2192,7 @@ run (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", bindto); + GNUNET_free (bindto); stats = GNUNET_STATISTICS_create ("C-TCP", cfg); GNUNET_SCHEDULER_add_shutdown (&do_shutdown, @@ -1824,13 +2207,13 @@ run (void *cls, } GNUNET_CRYPTO_eddsa_key_get_public (my_private_key, &my_identity.public_key); - + /* start listening */ listen_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, listen_sock, &listen_cb, NULL); queue_map = GNUNET_CONTAINER_multipeermap_create (10, - GNUNET_NO); + GNUNET_NO); ch = GNUNET_TRANSPORT_communicator_connect (cfg, COMMUNICATOR_CONFIG_SECTION, COMMUNICATOR_ADDRESS_PREFIX, @@ -1843,24 +2226,17 @@ run (void *cls, { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); - GNUNET_free (bindto); return; } - // FIXME: bindto is wrong here, we MUST get our external - // IP address and really look at 'in' here as we might - // be bound to loopback or some other specific IP address! - GNUNET_asprintf (&my_addr, - "%s-%s", - COMMUNICATOR_ADDRESS_PREFIX, - bindto); - GNUNET_free (bindto); - // FIXME: based on our bindto, we might not be able to tell the - // network type yet! What to do here!? - ai = GNUNET_TRANSPORT_communicator_address_add (ch, - my_addr, - GNUNET_NT_LOOPBACK, // FIXME: wrong NT! - GNUNET_TIME_UNIT_FOREVER_REL); - GNUNET_free (my_addr); + nat = GNUNET_NAT_register (cfg, + COMMUNICATOR_CONFIG_SECTION, + IPPROTO_TCP, + 1 /* one address */, + (const struct sockaddr **) &in, + &in_len, + &nat_address_cb, + NULL /* FIXME: support reversal! */, + NULL /* closure */); } -- cgit v1.2.3