From 198c09654354d09a9b33f27cf095e0295f70826c Mon Sep 17 00:00:00 2001 From: Martin Schanzenbach Date: Mon, 1 Jun 2020 16:39:35 +0200 Subject: tng: more UDP communicator backchannels Added a new message for queue updates to indicate queue length. Queues now may also have a priority parameter. --- src/transport/gnunet-communicator-udp.c | 384 +++++++++++++++++++------------- 1 file changed, 234 insertions(+), 150 deletions(-) (limited to 'src/transport/gnunet-communicator-udp.c') diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c index 344ba5180..46d9766d0 100644 --- a/src/transport/gnunet-communicator-udp.c +++ b/src/transport/gnunet-communicator-udp.c @@ -549,14 +549,24 @@ struct ReceiverAddress struct GNUNET_CONTAINER_HeapNode *hn; /** - * Message queue we are providing for the #ch. + * KX message queue we are providing for the #ch. */ - struct GNUNET_MQ_Handle *mq; + struct GNUNET_MQ_Handle *kx_mq; + + /** + * Default message queue we are providing for the #ch. + */ + struct GNUNET_MQ_Handle *d_mq; + + /** + * handle for KX queue with the #ch. + */ + struct GNUNET_TRANSPORT_QueueHandle *kx_qh; /** - * handle for this queue with the #ch. + * handle for default queue with the #ch. */ - struct GNUNET_TRANSPORT_QueueHandle *qh; + struct GNUNET_TRANSPORT_QueueHandle *d_qh; /** * Timeout for this receiver address. @@ -564,9 +574,14 @@ struct ReceiverAddress struct GNUNET_TIME_Absolute timeout; /** - * MTU we allowed transport for this receiver right now. + * MTU we allowed transport for this receiver's KX queue. */ - size_t mtu; + size_t kx_mtu; + + /** + * MTU we allowed transport for this receiver's default queue. + */ + size_t d_mtu; /** * Length of the DLL at @a ss_head. @@ -786,15 +801,25 @@ receiver_destroy (struct ReceiverAddress *receiver) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting receiver for peer `%s'\n", GNUNET_i2s (&receiver->target)); - if (NULL != (mq = receiver->mq)) + if (NULL != (mq = receiver->kx_mq)) { - receiver->mq = NULL; + receiver->kx_mq = NULL; GNUNET_MQ_destroy (mq); } - if (NULL != receiver->qh) + if (NULL != receiver->kx_qh) { - GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); - receiver->qh = NULL; + GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh); + receiver->kx_qh = NULL; + } + if (NULL != (mq = receiver->d_mq)) + { + receiver->d_mq = NULL; + GNUNET_MQ_destroy (mq); + } + if (NULL != receiver->d_qh) + { + GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh); + receiver->d_qh = NULL; } GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (receivers, @@ -1265,30 +1290,27 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity *pid, void *value) (void) pid; for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Checking shared secrets\n"); if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode))) { uint32_t allowed; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found matching mac\n"); + "Found matching mac\n"); allowed = ntohl (ack->sequence_max); if (allowed > ss->sequence_allowed) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u > %u (%u)\n", allowed, ss->sequence_allowed, - receiver->acks_available); + "%u > %u (%u)\n", allowed, ss->sequence_allowed, + receiver->acks_available); receiver->acks_available += (allowed - ss->sequence_allowed); - if ((allowed - ss->sequence_allowed) == receiver->acks_available) - { - /* we just incremented from zero => MTU change! */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "we just incremented from zero => MTU change!\n"); - //TODO setup_receiver_mq (receiver); - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Tell transport we have more acks!\n"); + GNUNET_TRANSPORT_communicator_mq_update (ch, + receiver->d_qh, + (allowed - ss->sequence_allowed), + 1); ss->sequence_allowed = allowed; /* move ss to head to avoid discarding it anytime soon! */ GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss); @@ -1906,15 +1928,24 @@ do_pad (gcry_cipher_hd_t out_cipher, char *dgram, size_t pad_size) * @param impl_state our `struct ReceiverAddress` */ static void -mq_send (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, - void *impl_state) +mq_send_kx (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) { struct ReceiverAddress *receiver = impl_state; uint16_t msize = ntohs (msg->size); + struct UdpHandshakeSignature uhs; + struct UDPConfirmation uc; + struct InitialKX kx; + struct GNUNET_CRYPTO_EcdhePrivateKey epriv; + char dgram[receiver->kx_mtu + sizeof(uc) + sizeof(kx)]; + size_t dpos; + gcry_cipher_hd_t out_cipher; + struct SharedSecret *ss; + - GNUNET_assert (mq == receiver->mq); - if (msize > receiver->mtu) + GNUNET_assert (mq == receiver->kx_mq); + if (msize > receiver->kx_mtu) { GNUNET_break (0); receiver_destroy (receiver); @@ -1922,117 +1953,124 @@ mq_send (struct GNUNET_MQ_Handle *mq, } reschedule_receiver_timeout (receiver); - if (0 == receiver->acks_available) + /* setup key material */ + GNUNET_CRYPTO_ecdhe_key_create (&epriv); + + ss = setup_shared_secret_enc (&epriv, receiver); + setup_cipher (&ss->master, 0, &out_cipher); + /* compute 'uc' */ + uc.sender = my_identity; + uc.monotonic_time = + GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); + uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE); + uhs.purpose.size = htonl (sizeof(uhs)); + uhs.sender = my_identity; + uhs.receiver = receiver->target; + GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral); + uhs.monotonic_time = uc.monotonic_time; + GNUNET_CRYPTO_eddsa_sign (my_private_key, + &uhs, + &uc.sender_sig); + /* Leave space for kx */ + dpos = sizeof(kx); + /* Append encrypted uc to dgram */ + GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher, + &dgram[dpos], + sizeof(uc), + &uc, + sizeof(uc))); + dpos += sizeof(uc); + /* Append encrypted payload to dgram */ + GNUNET_assert ( + 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); + dpos += msize; + do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); + /* Datagram starts with kx */ + kx.ephemeral = uhs.ephemeral; + GNUNET_assert ( + 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag))); + gcry_cipher_close (out_cipher); + memcpy (dgram, &kx, sizeof(kx)); + if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, + dgram, + sizeof(dgram), + receiver->address, + receiver->address_len)) + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending KX to %s\n", GNUNET_a2s (receiver->address, + receiver->address_len)); + GNUNET_MQ_impl_send_continue (mq); +} + + +/** + * Signature of functions implementing the sending functionality of a + * message queue. + * + * @param mq the message queue + * @param msg the message to send + * @param impl_state our `struct ReceiverAddress` + */ +static void +mq_send_d (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) +{ + struct ReceiverAddress *receiver = impl_state; + uint16_t msize = ntohs (msg->size); + + GNUNET_assert (mq == receiver->d_mq); + if ((msize > receiver->d_mtu) || + (0 == receiver->acks_available)) { - /* use KX encryption method */ - struct UdpHandshakeSignature uhs; - struct UDPConfirmation uc; - struct InitialKX kx; - struct GNUNET_CRYPTO_EcdhePrivateKey epriv; - char dgram[receiver->mtu + sizeof(uc) + sizeof(kx)]; - size_t dpos; - gcry_cipher_hd_t out_cipher; - struct SharedSecret *ss; + GNUNET_break (0); + receiver_destroy (receiver); + return; + } + reschedule_receiver_timeout (receiver); - /* setup key material */ - GNUNET_CRYPTO_ecdhe_key_create (&epriv); + /* begin "BOX" encryption method, scan for ACKs from tail! */ + for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev) + { + if (ss->sequence_used >= ss->sequence_allowed) + { + continue; + } + char dgram[sizeof(struct UDPBox) + receiver->d_mtu]; + struct UDPBox *box; + gcry_cipher_hd_t out_cipher; + size_t dpos; - ss = setup_shared_secret_enc (&epriv, receiver); - setup_cipher (&ss->master, 0, &out_cipher); - /* compute 'uc' */ - uc.sender = my_identity; - uc.monotonic_time = - GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); - uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE); - uhs.purpose.size = htonl (sizeof(uhs)); - uhs.sender = my_identity; - uhs.receiver = receiver->target; - GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral); - uhs.monotonic_time = uc.monotonic_time; - GNUNET_CRYPTO_eddsa_sign (my_private_key, - &uhs, - &uc.sender_sig); - /* Leave space for kx */ - dpos = sizeof(kx); - /* Append encrypted uc to dgram */ - GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher, - &dgram[dpos], - sizeof(uc), - &uc, - sizeof(uc))); - dpos += sizeof(uc); + box = (struct UDPBox *) dgram; + ss->sequence_used++; + get_kid (&ss->master, ss->sequence_used, &box->kid); + setup_cipher (&ss->master, ss->sequence_used, &out_cipher); /* Append encrypted payload to dgram */ + dpos = sizeof(struct UDPBox); GNUNET_assert ( 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); dpos += msize; do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); - /* Datagram starts with kx */ - kx.ephemeral = uhs.ephemeral; - GNUNET_assert ( - 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag))); + GNUNET_assert (0 == gcry_cipher_gettag (out_cipher, + box->gcm_tag, + sizeof(box->gcm_tag))); gcry_cipher_close (out_cipher); - memcpy (dgram, &kx, sizeof(kx)); if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, dgram, sizeof(dgram), receiver->address, receiver->address_len)) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending KX to %s\n", GNUNET_a2s (receiver->address, - receiver->address_len)); GNUNET_MQ_impl_send_continue (mq); - return; - } /* End of KX encryption method */ - - /* begin "BOX" encryption method, scan for ACKs from tail! */ - for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "In non-kx mode...\n"); - if (ss->sequence_used < ss->sequence_allowed) + receiver->acks_available--; + if (0 == receiver->acks_available) { - char dgram[sizeof(struct UDPBox) + receiver->mtu]; - struct UDPBox *box; - gcry_cipher_hd_t out_cipher; - size_t dpos; - - box = (struct UDPBox *) dgram; - ss->sequence_used++; - get_kid (&ss->master, ss->sequence_used, &box->kid); - setup_cipher (&ss->master, ss->sequence_used, &out_cipher); - /* Append encrypted payload to dgram */ - dpos = sizeof(struct UDPBox); - GNUNET_assert ( - 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); - dpos += msize; - do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); - GNUNET_assert (0 == gcry_cipher_gettag (out_cipher, - box->gcm_tag, - sizeof(box->gcm_tag))); - gcry_cipher_close (out_cipher); - if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, - dgram, - sizeof(dgram), - receiver->address, - receiver->address_len)) - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); + /* We have no more ACKs */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending data\n"); - - GNUNET_MQ_impl_send_continue (mq); - receiver->acks_available--; - if (0 == receiver->acks_available) - { - /* We have no more ACKs => MTU change! */ - setup_receiver_mq (receiver); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No more acks, MTU changed\n"); - } - return; + "No more acks\n"); } } - GNUNET_assert (0); } @@ -2045,15 +2083,37 @@ mq_send (struct GNUNET_MQ_Handle *mq, * @param impl_state our `struct ReceiverAddress` */ static void -mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) +mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct ReceiverAddress *receiver = impl_state; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "MQ destroyed\n"); - if (mq == receiver->mq) + "Default MQ destroyed\n"); + if (mq == receiver->d_mq) { - receiver->mq = NULL; - //receiver_destroy (receiver); + receiver->d_mq = NULL; + receiver_destroy (receiver); + } +} + + +/** + * Signature of functions implementing the destruction of a message + * queue. Implementations must not free @a mq, but should take care + * of @a impl_state. + * + * @param mq the message queue to destroy + * @param impl_state our `struct ReceiverAddress` + */ +static void +mq_destroy_kx (struct GNUNET_MQ_Handle *mq, void *impl_state) +{ + struct ReceiverAddress *receiver = impl_state; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "KX MQ destroyed\n"); + if (mq == receiver->kx_mq) + { + receiver->kx_mq = NULL; + receiver_destroy (receiver); } } @@ -2106,12 +2166,17 @@ setup_receiver_mq (struct ReceiverAddress *receiver) { size_t base_mtu; - if (NULL != receiver->qh) + /*if (NULL != receiver->kx_qh) { - GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); - receiver->qh = NULL; + GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh); + receiver->kx_qh = NULL; } - //GNUNET_assert (NULL == receiver->mq); + if (NULL != receiver->d_qh) + { + GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh); + receiver->d_qh = NULL; + }*/ + // GNUNET_assert (NULL == receiver->mq); switch (receiver->address->sa_family) { case AF_INET: @@ -2130,35 +2195,54 @@ setup_receiver_mq (struct ReceiverAddress *receiver) GNUNET_assert (0); break; } - if (0 == receiver->acks_available) - { - /* MTU based on full KX messages */ - receiver->mtu = base_mtu - sizeof(struct InitialKX) /* 48 */ - - sizeof(struct UDPConfirmation); /* 104 */ - } - else - { - /* MTU based on BOXed messages */ - receiver->mtu = base_mtu - sizeof(struct UDPBox); - } + /* MTU based on full KX messages */ + receiver->kx_mtu = base_mtu - sizeof(struct InitialKX) /* 48 */ + - sizeof(struct UDPConfirmation); /* 104 */ + /* MTU based on BOXed messages */ + receiver->d_mtu = base_mtu - sizeof(struct UDPBox); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Setting up MQs and QHs\n"); /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to 1404 (IPv4 + Box) bytes, depending on circumstances... */ - if (NULL == receiver->mq) - receiver->mq = GNUNET_MQ_queue_for_callbacks (&mq_send, - &mq_destroy, - &mq_cancel, - receiver, - NULL, - &mq_error, - receiver); - receiver->qh = + if (NULL == receiver->kx_mq) + receiver->kx_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_kx, + &mq_destroy_kx, + &mq_cancel, + receiver, + NULL, + &mq_error, + receiver); + if (NULL == receiver->d_mq) + receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d, + &mq_destroy_d, + &mq_cancel, + receiver, + NULL, + &mq_error, + receiver); + + receiver->kx_qh = GNUNET_TRANSPORT_communicator_mq_add (ch, &receiver->target, receiver->foreign_addr, - receiver->mtu, + receiver->kx_mtu, + GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, + 0, /* Priority */ receiver->nt, GNUNET_TRANSPORT_CS_OUTBOUND, - receiver->mq); + receiver->kx_mq); + receiver->d_qh = + GNUNET_TRANSPORT_communicator_mq_add (ch, + &receiver->target, + receiver->foreign_addr, + receiver->d_mtu, + 0, /* Initialize with 0 acks */ + 1, /* Priority */ + receiver->nt, + GNUNET_TRANSPORT_CS_OUTBOUND, + receiver->d_mq); + } -- cgit v1.2.3