summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-communicator-tcp.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-01-28 12:43:09 +0100
committerChristian Grothoff <christian@grothoff.org>2019-01-28 12:43:09 +0100
commit07533eec5c7b1637374ea1496595918861ac8b6d (patch)
treefe97a71907b96fe28c477526901895f9eaffcc87 /src/transport/gnunet-communicator-tcp.c
parentcadf559899f7dfaf24ed27cab923414058f207b3 (diff)
more work on TCP communicator, almost there
Diffstat (limited to 'src/transport/gnunet-communicator-tcp.c')
-rw-r--r--src/transport/gnunet-communicator-tcp.c498
1 files changed, 437 insertions, 61 deletions
diff --git a/src/transport/gnunet-communicator-tcp.c b/src/transport/gnunet-communicator-tcp.c
index a94559bd2..050a5f225 100644
--- a/src/transport/gnunet-communicator-tcp.c
+++ b/src/transport/gnunet-communicator-tcp.c
@@ -24,14 +24,8 @@
* @author Christian Grothoff
*
* TODO:
- * - lots of basic adaptations (see FIXMEs), need NAT service
- * to determine our own listen IPs! Parsing of bindto spec!
- * - actual decryption and handling of boxes and rekeys!
- * - message queue management: flow control towards CORE!
- * (stop reading from socket until MQ send to core is done;
- * will need a counter as ONE read from socket may generate
- * multiple messages en route to CORE; tricky bit: queue
- * may die before we get MQ sent-done callbacks!)
+ * - NAT service API change to handle address stops!
+ * - handling of rekeys!
*/
#include "platform.h"
#include "gnunet_util_lib.h"
@@ -39,6 +33,7 @@
#include "gnunet_signatures.h"
#include "gnunet_constants.h"
#include "gnunet_nt_lib.h"
+#include "gnunet_nat_service.h"
#include "gnunet_statistics_service.h"
#include "gnunet_transport_communication_service.h"
@@ -391,10 +386,19 @@ struct Queue
struct GNUNET_TIME_Absolute timeout;
/**
+ * How may messages did we pass from this queue to CORE for which we
+ * have yet to receive an acknoweldgement that CORE is done with
+ * them? If "large" (or even just non-zero), we should throttle
+ * reading to provide flow control. See also #DEFAULT_MAX_QUEUE_LENGTH
+ * and #max_queue_length.
+ */
+ unsigned int backpressure;
+
+ /**
* Which network type does this queue use?
*/
enum GNUNET_NetworkType nt;
-
+
/**
* Is MQ awaiting a #GNUNET_MQ_impl_send_continue() call?
*/
@@ -406,6 +410,14 @@ struct Queue
int finishing;
/**
+ * Did we technically destroy this queue, but kept the allocation
+ * around because of @e backpressure not being zero yet? Used
+ * simply to delay the final #GNUNET_free() operation until
+ * #core_read_finished_cb() has been called.
+ */
+ int destroyed;
+
+ /**
* #GNUNET_YES after #inject_key() placed the rekey message into the
* plaintext buffer. Once the plaintext buffer is drained, this
* means we must switch to the new key material.
@@ -475,11 +487,6 @@ struct ProtoQueue
static struct GNUNET_SCHEDULER_Task *listen_task;
/**
- * Number of messages we currently have in our queues towards the transport service.
- */
-static unsigned long long delivering_messages;
-
-/**
* Maximum queue length before we stop reading towards the transport service.
*/
static unsigned long long max_queue_length;
@@ -505,11 +512,6 @@ static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
static struct GNUNET_NETWORK_Handle *listen_sock;
/**
- * Handle to the operation that publishes our address.
- */
-static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
-
-/**
* Our public key.
*/
static struct GNUNET_PeerIdentity my_identity;
@@ -525,6 +527,11 @@ static struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key;
static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
+ * Connection to NAT service.
+ */
+static struct GNUNET_NAT_Handle *nat;
+
+/**
* Protoqueues DLL head.
*/
static struct ProtoQueue *proto_head;
@@ -588,7 +595,10 @@ queue_destroy (struct Queue *queue)
gcry_cipher_close (queue->in_cipher);
gcry_cipher_close (queue->out_cipher);
GNUNET_free (queue->address);
- GNUNET_free (queue);
+ if (0 != queue->backpressure)
+ queue->destroyed = GNUNET_YES;
+ else
+ GNUNET_free (queue);
if (NULL == listen_task)
listen_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
listen_sock,
@@ -685,6 +695,213 @@ reschedule_queue_timeout (struct Queue *queue)
* @param cls the `struct Queue *` to disconnect
*/
static void
+queue_read (void *cls);
+
+
+/**
+ * Core tells us it is done processing a message that transport
+ * received on a queue with status @a success.
+ *
+ * @param cls a `struct Queue *` where the message originally came from
+ * @param success #GNUNET_OK on success
+ */
+static void
+core_read_finished_cb (void *cls,
+ int success)
+{
+ struct Queue *queue = cls;
+
+ if (GNUNET_OK != success)
+ GNUNET_STATISTICS_update (stats,
+ "# messages lost in communicator API towards CORE",
+ 1,
+ GNUNET_NO);
+ queue->backpressure--;
+ /* handle deferred queue destruction */
+ if ( (queue->destroyed) &&
+ (0 == queue->backpressure) )
+ {
+ GNUNET_free (queue);
+ return;
+ }
+ reschedule_queue_timeout (queue);
+ /* possibly unchoke reading, now that CORE made progress */
+ if (NULL == queue->read_task)
+ queue->read_task
+ = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining (queue->timeout),
+ queue->sock,
+ &queue_read,
+ queue);
+}
+
+
+/**
+ * We received @a plaintext_len bytes of @a plaintext on @a queue.
+ * Pass it on to CORE. If transmission is actually happening,
+ * increase backpressure counter.
+ *
+ * @param queue the queue that received the plaintext
+ * @param plaintext the plaintext that was received
+ * @param plaintext_len number of bytes of plaintext received
+ */
+static void
+pass_plaintext_to_core (struct Queue *queue,
+ const void *plaintext,
+ size_t plaintext_len)
+{
+ const struct GNUNET_MessageHeader *hdr = plaintext;
+ int ret;
+
+ if (ntohs (hdr->size) != plaintext_len)
+ {
+ /* NOTE: If we ever allow multiple CORE messages in one
+ BOX, this will have to change! */
+ GNUNET_break (0);
+ return;
+ }
+ ret = GNUNET_TRANSPORT_communicator_receive (ch,
+ &queue->target,
+ hdr,
+ &core_read_finished_cb,
+ queue);
+ if (GNUNET_OK == ret)
+ queue->backpressure++;
+ GNUNET_break (GNUNET_NO != ret); /* backpressure not working!? */
+ if (GNUNET_SYSERR == ret)
+ GNUNET_STATISTICS_update (stats,
+ "# bytes lost due to CORE not running",
+ plaintext_len,
+ GNUNET_NO);
+}
+
+
+/**
+ * Test if we have received a full message in plaintext.
+ * If so, handle it.
+ *
+ * @param queue queue to process inbound plaintext for
+ */
+static void
+try_handle_plaintext (struct Queue *queue)
+{
+ const struct GNUNET_MessageHeader *hdr
+ = (const struct GNUNET_MessageHeader *) queue->pread_buf;
+ const struct TCPBox *box
+ = (const struct TCPBox *) queue->pread_buf;
+ const struct TCPRekey *rekey
+ = (const struct TCPRekey *) queue->pread_buf;
+ const struct TCPFinish *fin
+ = (const struct TCPFinish *) queue->pread_buf;
+ struct TCPRekey rekeyz;
+ struct TCPFinish finz;
+ struct GNUNET_ShortHashCode tmac;
+ uint16_t type;
+ size_t size = 0; /* make compiler happy */
+
+ if (sizeof (*hdr) > queue->pread_off)
+ return; /* not even a header */
+ type = ntohs (hdr->type);
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_BOX:
+ /* Special case: header size excludes box itself! */
+ if (ntohs (hdr->size) + sizeof (struct TCPBox) > queue->pread_off)
+ return;
+ hmac (&queue->in_hmac,
+ &box[1],
+ ntohs (hdr->size),
+ &tmac);
+ if (0 != memcmp (&tmac,
+ &box->hmac,
+ sizeof (tmac)))
+ {
+ GNUNET_break_op (0);
+ queue_finish (queue);
+ return;
+ }
+ pass_plaintext_to_core (queue,
+ (const void *) &box[1],
+ ntohs (hdr->size));
+ size = ntohs (hdr->size) + sizeof (*box);
+ break;
+ case GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_REKEY:
+ if (sizeof (*rekey) > queue->pread_off)
+ return;
+ if (ntohs (hdr->size) != sizeof (*rekey))
+ {
+ GNUNET_break_op (0);
+ queue_finish (queue);
+ return;
+ }
+ rekeyz = *rekey;
+ memset (&rekeyz.hmac,
+ 0,
+ sizeof (rekeyz.hmac));
+ hmac (&queue->in_hmac,
+ &rekeyz,
+ sizeof (rekeyz),
+ &tmac);
+ if (0 != memcmp (&tmac,
+ &box->hmac,
+ sizeof (tmac)))
+ {
+ GNUNET_break_op (0);
+ queue_finish (queue);
+ return;
+ }
+ // FIXME: handle rekey!
+
+ size = ntohs (hdr->size);
+ break;
+ case GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_FINISH:
+ if (sizeof (*fin) > queue->pread_off)
+ return;
+ if (ntohs (hdr->size) != sizeof (*fin))
+ {
+ GNUNET_break_op (0);
+ queue_finish (queue);
+ return;
+ }
+ finz = *fin;
+ memset (&finz.hmac,
+ 0,
+ sizeof (finz.hmac));
+ hmac (&queue->in_hmac,
+ &rekeyz,
+ sizeof (rekeyz),
+ &tmac);
+ if (0 != memcmp (&tmac,
+ &fin->hmac,
+ sizeof (tmac)))
+ {
+ GNUNET_break_op (0);
+ queue_finish (queue);
+ return;
+ }
+ /* handle FINISH by destroying queue */
+ queue_destroy (queue);
+ break;
+ default:
+ GNUNET_break_op (0);
+ queue_finish (queue);
+ return;
+ }
+ GNUNET_assert (0 != size);
+ /* 'size' bytes of plaintext were used, shift buffer */
+ GNUNET_assert (size <= queue->pread_off);
+ memmove (queue->pread_buf,
+ &queue->pread_buf[size],
+ queue->pread_off - size);
+ queue->pread_off -= size;
+}
+
+
+/**
+ * Queue read task. If we hit the timeout, disconnect it
+ *
+ * @param cls the `struct Queue *` to disconnect
+ */
+static void
queue_read (void *cls)
{
struct Queue *queue = cls;
@@ -718,10 +935,20 @@ queue_read (void *cls)
queue->cread_off += rcvd;
if (queue->pread_off < sizeof (queue->pread_buf))
{
- /* FIXME: decrypt */
-
- /* FIXME: check plaintext for complete messages, if complete, hand to CORE */
- /* FIXME: CORE flow control: suspend doing more until CORE has ACKed */
+ size_t max = GNUNET_MIN (sizeof (queue->pread_buf) - queue->pread_off,
+ queue->cread_off);
+ GNUNET_assert (0 ==
+ gcry_cipher_decrypt (queue->in_cipher,
+ &queue->pread_buf[queue->pread_off],
+ max,
+ queue->cread_buf,
+ max));
+ queue->pread_off += max;
+ memmove (queue->cread_buf,
+ &queue->cread_buf[max],
+ queue->cread_off - max);
+ queue->cread_off -= max;
+ try_handle_plaintext (queue);
}
if (BUF_SIZE == queue->cread_off)
@@ -729,14 +956,15 @@ queue_read (void *cls)
left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
if (0 != left.rel_value_us)
{
- /* not actually our turn yet, but let's at least update
- the monitor, it may think we're about to die ... */
- queue->read_task
- = GNUNET_SCHEDULER_add_read_net (left,
- queue->sock,
- &queue_read,
- queue);
-
+ if (max_queue_length < queue->backpressure)
+ {
+ /* continue reading */
+ queue->read_task
+ = GNUNET_SCHEDULER_add_read_net (left,
+ queue->sock,
+ &queue_read,
+ queue);
+ }
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -760,9 +988,119 @@ tcp_address_to_sockaddr (const char *bindto,
socklen_t *sock_len)
{
struct sockaddr *in;
- size_t slen;
+ unsigned int port;
+ char dummy[2];
+ char *colon;
+ char *cp;
+
+ if (1 == SSCANF (bindto,
+ "%u%1s",
+ &port,
+ dummy))
+ {
+ /* interpreting value as just a PORT number */
+ if (port > UINT16_MAX)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "BINDTO specification `%s' invalid: value too large for port\n",
+ bindto);
+ return NULL;
+ }
+ if (GNUNET_YES ==
+ GNUNET_CONFIGURATION_get_value_yesno (cfg,
+ COMMUNICATOR_CONFIG_SECTION,
+ "DISABLE_V6"))
+ {
+ struct sockaddr_in *i4;
+
+ i4 = GNUNET_malloc (sizeof (struct sockaddr_in));
+ i4->sin_family = AF_INET;
+ i4->sin_port = htons ((uint16_t) port);
+ *sock_len = sizeof (struct sockaddr_in);
+ in = (struct sockaddr *) i4;
+ }
+ else
+ {
+ struct sockaddr_in6 *i6;
+
+ i6 = GNUNET_malloc (sizeof (struct sockaddr_in6));
+ i6->sin6_family = AF_INET6;
+ i6->sin6_port = htons ((uint16_t) port);
+ *sock_len = sizeof (struct sockaddr_in6);
+ in = (struct sockaddr *) i6;
+ }
+ return in;
+ }
+ cp = GNUNET_strdup (bindto);
+ colon = strrchr (cp, ':');
+ if (NULL != colon)
+ {
+ /* interpet value after colon as port */
+ *colon = '\0';
+ colon++;
+ if (1 == SSCANF (colon,
+ "%u%1s",
+ &port,
+ dummy))
+ {
+ /* interpreting value as just a PORT number */
+ if (port > UINT16_MAX)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "BINDTO specification `%s' invalid: value too large for port\n",
+ bindto);
+ GNUNET_free (cp);
+ return NULL;
+ }
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "BINDTO specification `%s' invalid: last ':' not followed by number\n",
+ bindto);
+ GNUNET_free (cp);
+ return NULL;
+ }
+ }
+ else
+ {
+ /* interpret missing port as 0, aka pick any free one */
+ port = 0;
+ }
+ {
+ /* try IPv4 */
+ struct sockaddr_in v4;
- /* FIXME: parse, allocate, return! */
+ if (1 == inet_pton (AF_INET,
+ cp,
+ &v4))
+ {
+ v4.sin_port = htons ((uint16_t) port);
+ in = GNUNET_memdup (&v4,
+ sizeof (v4));
+ *sock_len = sizeof (v4);
+ GNUNET_free (cp);
+ return in;
+ }
+ }
+ {
+ /* try IPv6 */
+ struct sockaddr_in6 v6;
+
+ if (1 == inet_pton (AF_INET6,
+ cp,
+ &v6))
+ {
+ v6.sin6_port = htons ((uint16_t) port);
+ in = GNUNET_memdup (&v6,
+ sizeof (v6));
+ *sock_len = sizeof (v6);
+ GNUNET_free (cp);
+ return in;
+ }
+ }
+ /* FIXME (feature!): maybe also try getnameinfo()? */
+ GNUNET_free (cp);
return NULL;
}
@@ -966,8 +1304,8 @@ queue_write (void *cls)
size_t usent = (size_t) sent;
memmove (queue->cwrite_buf,
- &queue->cwrite_buf[sent],
- queue->cwrite_off - sent);
+ &queue->cwrite_buf[usent],
+ queue->cwrite_off - usent);
reschedule_queue_timeout (queue);
}
/* can we encrypt more? (always encrypt full messages, needed
@@ -1670,6 +2008,11 @@ get_queue_delete_it (void *cls,
static void
do_shutdown (void *cls)
{
+ if (NULL != nat)
+ {
+ GNUNET_NAT_unregister (nat);
+ nat = NULL;
+ }
if (NULL != listen_task)
{
GNUNET_SCHEDULER_cancel (listen_task);
@@ -1685,11 +2028,6 @@ do_shutdown (void *cls)
&get_queue_delete_it,
NULL);
GNUNET_CONTAINER_multipeermap_destroy (queue_map);
- if (NULL != ai)
- {
- GNUNET_TRANSPORT_communicator_address_remove (ai);
- ai = NULL;
- }
if (NULL != ch)
{
GNUNET_TRANSPORT_communicator_disconnect (ch);
@@ -1733,6 +2071,51 @@ enc_notify_cb (void *cls,
/**
+ * Signature of the callback passed to #GNUNET_NAT_register() for
+ * a function to call whenever our set of 'valid' addresses changes.
+ *
+ * @param cls closure
+ * @param add_remove #GNUNET_YES to add a new public IP address,
+ * #GNUNET_NO to remove a previous (now invalid) one
+ * @param ac address class the address belongs to
+ * @param addr either the previous or the new public IP address
+ * @param addrlen actual length of the @a addr
+ */
+static void
+nat_address_cb (void *cls,
+ int add_remove,
+ enum GNUNET_NAT_AddressClass ac,
+ const struct sockaddr *addr,
+ socklen_t addrlen)
+{
+ char *my_addr;
+ static struct GNUNET_TRANSPORT_AddressIdentifier *ai; // FIXME: store in *ctx of NAT!
+
+ if (GNUNET_YES == add_remove)
+ {
+ // FIXME: do better job at stringification of @a addr?
+ GNUNET_asprintf (&my_addr,
+ "%s-%s",
+ COMMUNICATOR_ADDRESS_PREFIX,
+ GNUNET_a2s (addr,
+ addrlen));
+ // FIXME: translate 'ac' to 'nt'?
+ ai = GNUNET_TRANSPORT_communicator_address_add (ch,
+ my_addr,
+ GNUNET_NT_LOOPBACK, // FIXME: wrong NT!
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ GNUNET_free (my_addr);
+ }
+ else
+ {
+ // FIXME: support removal! => improve NAT API!
+ GNUNET_TRANSPORT_communicator_address_remove (ai);
+ ai = NULL;
+ }
+}
+
+
+/**
* Setup communicator and launch network interactions.
*
* @param cls NULL (always)
@@ -1749,9 +2132,8 @@ run (void *cls,
char *bindto;
struct sockaddr *in;
socklen_t in_len;
- char *my_addr;
- (void) cls;
+ (void) cls;
cfg = c;
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_filename (cfg,
@@ -1810,6 +2192,7 @@ run (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Bound to `%s'\n",
bindto);
+ GNUNET_free (bindto);
stats = GNUNET_STATISTICS_create ("C-TCP",
cfg);
GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
@@ -1824,13 +2207,13 @@ run (void *cls,
}
GNUNET_CRYPTO_eddsa_key_get_public (my_private_key,
&my_identity.public_key);
-
+ /* start listening */
listen_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
listen_sock,
&listen_cb,
NULL);
queue_map = GNUNET_CONTAINER_multipeermap_create (10,
- GNUNET_NO);
+ GNUNET_NO);
ch = GNUNET_TRANSPORT_communicator_connect (cfg,
COMMUNICATOR_CONFIG_SECTION,
COMMUNICATOR_ADDRESS_PREFIX,
@@ -1843,24 +2226,17 @@ run (void *cls,
{
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (bindto);
return;
}
- // FIXME: bindto is wrong here, we MUST get our external
- // IP address and really look at 'in' here as we might
- // be bound to loopback or some other specific IP address!
- GNUNET_asprintf (&my_addr,
- "%s-%s",
- COMMUNICATOR_ADDRESS_PREFIX,
- bindto);
- GNUNET_free (bindto);
- // FIXME: based on our bindto, we might not be able to tell the
- // network type yet! What to do here!?
- ai = GNUNET_TRANSPORT_communicator_address_add (ch,
- my_addr,
- GNUNET_NT_LOOPBACK, // FIXME: wrong NT!
- GNUNET_TIME_UNIT_FOREVER_REL);
- GNUNET_free (my_addr);
+ nat = GNUNET_NAT_register (cfg,
+ COMMUNICATOR_CONFIG_SECTION,
+ IPPROTO_TCP,
+ 1 /* one address */,
+ (const struct sockaddr **) &in,
+ &in_len,
+ &nat_address_cb,
+ NULL /* FIXME: support reversal! */,
+ NULL /* closure */);
}