summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-communicator-udp.c
diff options
context:
space:
mode:
authorMartin Schanzenbach <mschanzenbach@posteo.de>2020-06-01 16:39:35 +0200
committerMartin Schanzenbach <mschanzenbach@posteo.de>2020-06-01 16:39:35 +0200
commit198c09654354d09a9b33f27cf095e0295f70826c (patch)
tree07aa088c8e9664dc76915cc6b664654da59359f4 /src/transport/gnunet-communicator-udp.c
parenta325c3eaa8450d325fe57959eac29da5496cfd6d (diff)
tng: more UDP communicator backchannels
Added a new message for queue updates to indicate queue length. Queues now may also have a priority parameter.
Diffstat (limited to 'src/transport/gnunet-communicator-udp.c')
-rw-r--r--src/transport/gnunet-communicator-udp.c384
1 files changed, 234 insertions, 150 deletions
diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c
index 344ba5180..46d9766d0 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -549,14 +549,24 @@ struct ReceiverAddress
struct GNUNET_CONTAINER_HeapNode *hn;
/**
- * Message queue we are providing for the #ch.
+ * KX message queue we are providing for the #ch.
*/
- struct GNUNET_MQ_Handle *mq;
+ struct GNUNET_MQ_Handle *kx_mq;
+
+ /**
+ * Default message queue we are providing for the #ch.
+ */
+ struct GNUNET_MQ_Handle *d_mq;
+
+ /**
+ * handle for KX queue with the #ch.
+ */
+ struct GNUNET_TRANSPORT_QueueHandle *kx_qh;
/**
- * handle for this queue with the #ch.
+ * handle for default queue with the #ch.
*/
- struct GNUNET_TRANSPORT_QueueHandle *qh;
+ struct GNUNET_TRANSPORT_QueueHandle *d_qh;
/**
* Timeout for this receiver address.
@@ -564,9 +574,14 @@ struct ReceiverAddress
struct GNUNET_TIME_Absolute timeout;
/**
- * MTU we allowed transport for this receiver right now.
+ * MTU we allowed transport for this receiver's KX queue.
*/
- size_t mtu;
+ size_t kx_mtu;
+
+ /**
+ * MTU we allowed transport for this receiver's default queue.
+ */
+ size_t d_mtu;
/**
* Length of the DLL at @a ss_head.
@@ -786,15 +801,25 @@ receiver_destroy (struct ReceiverAddress *receiver)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Disconnecting receiver for peer `%s'\n",
GNUNET_i2s (&receiver->target));
- if (NULL != (mq = receiver->mq))
+ if (NULL != (mq = receiver->kx_mq))
{
- receiver->mq = NULL;
+ receiver->kx_mq = NULL;
GNUNET_MQ_destroy (mq);
}
- if (NULL != receiver->qh)
+ if (NULL != receiver->kx_qh)
{
- GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
- receiver->qh = NULL;
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
+ receiver->kx_qh = NULL;
+ }
+ if (NULL != (mq = receiver->d_mq))
+ {
+ receiver->d_mq = NULL;
+ GNUNET_MQ_destroy (mq);
+ }
+ if (NULL != receiver->d_qh)
+ {
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
+ receiver->d_qh = NULL;
}
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (receivers,
@@ -1265,30 +1290,27 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity *pid, void *value)
(void) pid;
for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Checking shared secrets\n");
if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode)))
{
uint32_t allowed;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found matching mac\n");
+ "Found matching mac\n");
allowed = ntohl (ack->sequence_max);
if (allowed > ss->sequence_allowed)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%u > %u (%u)\n", allowed, ss->sequence_allowed,
- receiver->acks_available);
+ "%u > %u (%u)\n", allowed, ss->sequence_allowed,
+ receiver->acks_available);
receiver->acks_available += (allowed - ss->sequence_allowed);
- if ((allowed - ss->sequence_allowed) == receiver->acks_available)
- {
- /* we just incremented from zero => MTU change! */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "we just incremented from zero => MTU change!\n");
- //TODO setup_receiver_mq (receiver);
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Tell transport we have more acks!\n");
+ GNUNET_TRANSPORT_communicator_mq_update (ch,
+ receiver->d_qh,
+ (allowed - ss->sequence_allowed),
+ 1);
ss->sequence_allowed = allowed;
/* move ss to head to avoid discarding it anytime soon! */
GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss);
@@ -1906,15 +1928,24 @@ do_pad (gcry_cipher_hd_t out_cipher, char *dgram, size_t pad_size)
* @param impl_state our `struct ReceiverAddress`
*/
static void
-mq_send (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MessageHeader *msg,
- void *impl_state)
+mq_send_kx (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
{
struct ReceiverAddress *receiver = impl_state;
uint16_t msize = ntohs (msg->size);
+ struct UdpHandshakeSignature uhs;
+ struct UDPConfirmation uc;
+ struct InitialKX kx;
+ struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
+ char dgram[receiver->kx_mtu + sizeof(uc) + sizeof(kx)];
+ size_t dpos;
+ gcry_cipher_hd_t out_cipher;
+ struct SharedSecret *ss;
+
- GNUNET_assert (mq == receiver->mq);
- if (msize > receiver->mtu)
+ GNUNET_assert (mq == receiver->kx_mq);
+ if (msize > receiver->kx_mtu)
{
GNUNET_break (0);
receiver_destroy (receiver);
@@ -1922,117 +1953,124 @@ mq_send (struct GNUNET_MQ_Handle *mq,
}
reschedule_receiver_timeout (receiver);
- if (0 == receiver->acks_available)
+ /* setup key material */
+ GNUNET_CRYPTO_ecdhe_key_create (&epriv);
+
+ ss = setup_shared_secret_enc (&epriv, receiver);
+ setup_cipher (&ss->master, 0, &out_cipher);
+ /* compute 'uc' */
+ uc.sender = my_identity;
+ uc.monotonic_time =
+ GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
+ uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
+ uhs.purpose.size = htonl (sizeof(uhs));
+ uhs.sender = my_identity;
+ uhs.receiver = receiver->target;
+ GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
+ uhs.monotonic_time = uc.monotonic_time;
+ GNUNET_CRYPTO_eddsa_sign (my_private_key,
+ &uhs,
+ &uc.sender_sig);
+ /* Leave space for kx */
+ dpos = sizeof(kx);
+ /* Append encrypted uc to dgram */
+ GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
+ &dgram[dpos],
+ sizeof(uc),
+ &uc,
+ sizeof(uc)));
+ dpos += sizeof(uc);
+ /* Append encrypted payload to dgram */
+ GNUNET_assert (
+ 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
+ dpos += msize;
+ do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
+ /* Datagram starts with kx */
+ kx.ephemeral = uhs.ephemeral;
+ GNUNET_assert (
+ 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
+ gcry_cipher_close (out_cipher);
+ memcpy (dgram, &kx, sizeof(kx));
+ if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
+ dgram,
+ sizeof(dgram),
+ receiver->address,
+ receiver->address_len))
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending KX to %s\n", GNUNET_a2s (receiver->address,
+ receiver->address_len));
+ GNUNET_MQ_impl_send_continue (mq);
+}
+
+
+/**
+ * Signature of functions implementing the sending functionality of a
+ * message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state our `struct ReceiverAddress`
+ */
+static void
+mq_send_d (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
+{
+ struct ReceiverAddress *receiver = impl_state;
+ uint16_t msize = ntohs (msg->size);
+
+ GNUNET_assert (mq == receiver->d_mq);
+ if ((msize > receiver->d_mtu) ||
+ (0 == receiver->acks_available))
{
- /* use KX encryption method */
- struct UdpHandshakeSignature uhs;
- struct UDPConfirmation uc;
- struct InitialKX kx;
- struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
- char dgram[receiver->mtu + sizeof(uc) + sizeof(kx)];
- size_t dpos;
- gcry_cipher_hd_t out_cipher;
- struct SharedSecret *ss;
+ GNUNET_break (0);
+ receiver_destroy (receiver);
+ return;
+ }
+ reschedule_receiver_timeout (receiver);
- /* setup key material */
- GNUNET_CRYPTO_ecdhe_key_create (&epriv);
+ /* begin "BOX" encryption method, scan for ACKs from tail! */
+ for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
+ {
+ if (ss->sequence_used >= ss->sequence_allowed)
+ {
+ continue;
+ }
+ char dgram[sizeof(struct UDPBox) + receiver->d_mtu];
+ struct UDPBox *box;
+ gcry_cipher_hd_t out_cipher;
+ size_t dpos;
- ss = setup_shared_secret_enc (&epriv, receiver);
- setup_cipher (&ss->master, 0, &out_cipher);
- /* compute 'uc' */
- uc.sender = my_identity;
- uc.monotonic_time =
- GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
- uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
- uhs.purpose.size = htonl (sizeof(uhs));
- uhs.sender = my_identity;
- uhs.receiver = receiver->target;
- GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
- uhs.monotonic_time = uc.monotonic_time;
- GNUNET_CRYPTO_eddsa_sign (my_private_key,
- &uhs,
- &uc.sender_sig);
- /* Leave space for kx */
- dpos = sizeof(kx);
- /* Append encrypted uc to dgram */
- GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
- &dgram[dpos],
- sizeof(uc),
- &uc,
- sizeof(uc)));
- dpos += sizeof(uc);
+ box = (struct UDPBox *) dgram;
+ ss->sequence_used++;
+ get_kid (&ss->master, ss->sequence_used, &box->kid);
+ setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
/* Append encrypted payload to dgram */
+ dpos = sizeof(struct UDPBox);
GNUNET_assert (
0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
dpos += msize;
do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
- /* Datagram starts with kx */
- kx.ephemeral = uhs.ephemeral;
- GNUNET_assert (
- 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
+ GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
+ box->gcm_tag,
+ sizeof(box->gcm_tag)));
gcry_cipher_close (out_cipher);
- memcpy (dgram, &kx, sizeof(kx));
if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
dgram,
sizeof(dgram),
receiver->address,
receiver->address_len))
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending KX to %s\n", GNUNET_a2s (receiver->address,
- receiver->address_len));
GNUNET_MQ_impl_send_continue (mq);
- return;
- } /* End of KX encryption method */
-
- /* begin "BOX" encryption method, scan for ACKs from tail! */
- for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "In non-kx mode...\n");
- if (ss->sequence_used < ss->sequence_allowed)
+ receiver->acks_available--;
+ if (0 == receiver->acks_available)
{
- char dgram[sizeof(struct UDPBox) + receiver->mtu];
- struct UDPBox *box;
- gcry_cipher_hd_t out_cipher;
- size_t dpos;
-
- box = (struct UDPBox *) dgram;
- ss->sequence_used++;
- get_kid (&ss->master, ss->sequence_used, &box->kid);
- setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
- /* Append encrypted payload to dgram */
- dpos = sizeof(struct UDPBox);
- GNUNET_assert (
- 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
- dpos += msize;
- do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
- GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
- box->gcm_tag,
- sizeof(box->gcm_tag)));
- gcry_cipher_close (out_cipher);
- if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
- dgram,
- sizeof(dgram),
- receiver->address,
- receiver->address_len))
- GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
+ /* We have no more ACKs */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending data\n");
-
- GNUNET_MQ_impl_send_continue (mq);
- receiver->acks_available--;
- if (0 == receiver->acks_available)
- {
- /* We have no more ACKs => MTU change! */
- setup_receiver_mq (receiver);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No more acks, MTU changed\n");
- }
- return;
+ "No more acks\n");
}
}
- GNUNET_assert (0);
}
@@ -2045,15 +2083,37 @@ mq_send (struct GNUNET_MQ_Handle *mq,
* @param impl_state our `struct ReceiverAddress`
*/
static void
-mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
+mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
struct ReceiverAddress *receiver = impl_state;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "MQ destroyed\n");
- if (mq == receiver->mq)
+ "Default MQ destroyed\n");
+ if (mq == receiver->d_mq)
{
- receiver->mq = NULL;
- //receiver_destroy (receiver);
+ receiver->d_mq = NULL;
+ receiver_destroy (receiver);
+ }
+}
+
+
+/**
+ * Signature of functions implementing the destruction of a message
+ * queue. Implementations must not free @a mq, but should take care
+ * of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state our `struct ReceiverAddress`
+ */
+static void
+mq_destroy_kx (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+ struct ReceiverAddress *receiver = impl_state;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "KX MQ destroyed\n");
+ if (mq == receiver->kx_mq)
+ {
+ receiver->kx_mq = NULL;
+ receiver_destroy (receiver);
}
}
@@ -2106,12 +2166,17 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
{
size_t base_mtu;
- if (NULL != receiver->qh)
+ /*if (NULL != receiver->kx_qh)
{
- GNUNET_TRANSPORT_communicator_mq_del (receiver->qh);
- receiver->qh = NULL;
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
+ receiver->kx_qh = NULL;
}
- //GNUNET_assert (NULL == receiver->mq);
+ if (NULL != receiver->d_qh)
+ {
+ GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
+ receiver->d_qh = NULL;
+ }*/
+ // GNUNET_assert (NULL == receiver->mq);
switch (receiver->address->sa_family)
{
case AF_INET:
@@ -2130,35 +2195,54 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
GNUNET_assert (0);
break;
}
- if (0 == receiver->acks_available)
- {
- /* MTU based on full KX messages */
- receiver->mtu = base_mtu - sizeof(struct InitialKX) /* 48 */
- - sizeof(struct UDPConfirmation); /* 104 */
- }
- else
- {
- /* MTU based on BOXed messages */
- receiver->mtu = base_mtu - sizeof(struct UDPBox);
- }
+ /* MTU based on full KX messages */
+ receiver->kx_mtu = base_mtu - sizeof(struct InitialKX) /* 48 */
+ - sizeof(struct UDPConfirmation); /* 104 */
+ /* MTU based on BOXed messages */
+ receiver->d_mtu = base_mtu - sizeof(struct UDPBox);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Setting up MQs and QHs\n");
/* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to
1404 (IPv4 + Box) bytes, depending on circumstances... */
- if (NULL == receiver->mq)
- receiver->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
- &mq_destroy,
- &mq_cancel,
- receiver,
- NULL,
- &mq_error,
- receiver);
- receiver->qh =
+ if (NULL == receiver->kx_mq)
+ receiver->kx_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_kx,
+ &mq_destroy_kx,
+ &mq_cancel,
+ receiver,
+ NULL,
+ &mq_error,
+ receiver);
+ if (NULL == receiver->d_mq)
+ receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
+ &mq_destroy_d,
+ &mq_cancel,
+ receiver,
+ NULL,
+ &mq_error,
+ receiver);
+
+ receiver->kx_qh =
GNUNET_TRANSPORT_communicator_mq_add (ch,
&receiver->target,
receiver->foreign_addr,
- receiver->mtu,
+ receiver->kx_mtu,
+ GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
+ 0, /* Priority */
receiver->nt,
GNUNET_TRANSPORT_CS_OUTBOUND,
- receiver->mq);
+ receiver->kx_mq);
+ receiver->d_qh =
+ GNUNET_TRANSPORT_communicator_mq_add (ch,
+ &receiver->target,
+ receiver->foreign_addr,
+ receiver->d_mtu,
+ 0, /* Initialize with 0 acks */
+ 1, /* Priority */
+ receiver->nt,
+ GNUNET_TRANSPORT_CS_OUTBOUND,
+ receiver->d_mq);
+
}