From 1d82e0617ef9538824a0e58692ecb13808f4f829 Mon Sep 17 00:00:00 2001 From: marshall Date: Thu, 6 Jul 2023 15:40:56 -0400 Subject: transport (quic): add functions for mq handling --- src/transport/gnunet-communicator-quic.c | 462 ++++++++++++++++++++++++++----- 1 file changed, 399 insertions(+), 63 deletions(-) diff --git a/src/transport/gnunet-communicator-quic.c b/src/transport/gnunet-communicator-quic.c index 08bd35fc2..7cf95368d 100644 --- a/src/transport/gnunet-communicator-quic.c +++ b/src/transport/gnunet-communicator-quic.c @@ -10,16 +10,12 @@ #include "gnunet_transport_communication_service.h" #include "gnunet_nt_lib.h" #include "gnunet_nat_service.h" - #include "stdint.h" #include "inttypes.h" -#define DEFAULT_REKEY_TIME_INTERVAL GNUNET_TIME_UNIT_DAYS + #define COMMUNICATOR_CONFIG_SECTION "communicator-quic" -#define DEFAULT_REKEY_MAX_BYTES (1024LLU * 1024 * 1024 * 4LLU) #define COMMUNICATOR_ADDRESS_PREFIX "quic" #define MAX_DATAGRAM_SIZE 1350 -// #define STREAM_ID_MAX (UINT64_MAX - (0b11 << 62)) -// #define STREAM_ID_MAX UINT64_MAX - 0xC000000000000000 /* Currently equivalent to QUICHE_MAX_CONN_ID_LEN */ #define LOCAL_CONN_ID_LEN 20 @@ -27,14 +23,12 @@ sizeof("quiche") - 1 + \ sizeof(struct sockaddr_storage) + \ QUICHE_MAX_CONN_ID_LEN - #define CID_LEN sizeof(uint8_t) * QUICHE_MAX_CONN_ID_LEN #define TOKEN_LEN sizeof(uint8_t) * MAX_TOKEN_LEN /** * Map of DCID (uint8_t) -> quic_conn for quickly retrieving connections to other peers. */ struct GNUNET_CONTAINER_MultiHashMap *conn_map; - static const struct GNUNET_CONFIGURATION_Handle *cfg; static struct GNUNET_TIME_Relative rekey_interval; static struct GNUNET_NETWORK_Handle *udp_sock; @@ -44,8 +38,101 @@ static struct GNUNET_TRANSPORT_ApplicationHandle *ah; static int have_v6_socket; static uint16_t my_port; static unsigned long long rekey_max_bytes; - static quiche_config *config = NULL; + +/** + * Information we track per receiving address we have recently been + * in contact with (encryption to receiver). + */ +struct ReceiverAddress +{ + /** + * To whom are we talking to. + */ + struct GNUNET_PeerIdentity target; + + /** + * Address of the receiver in the human-readable format + * with the #COMMUNICATOR_ADDRESS_PREFIX. + */ + char *foreign_addr; + + /** + * Address of the other peer. + */ + struct sockaddr *address; + + /** + * Length of the address. + */ + socklen_t address_len; + + /** + * Default message queue we are providing for the #ch. + */ + struct GNUNET_MQ_Handle *d_mq; + + /** + * handle for default queue with the #ch. + */ + struct GNUNET_TRANSPORT_QueueHandle *d_qh; + + /** + * Timeout for this receiver address. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * MTU we allowed transport for this receiver's default queue. + */ + size_t d_mtu; + + /** + * Which network type does this queue use? + */ + enum GNUNET_NetworkType nt; + + /** + * receiver_destroy already called on receiver. + */ + int receiver_destroy_called; + + /** + * Entry in sender expiration heap. + */ + struct GNUNET_CONTAINER_HeapNode *hn; +}; + +/** + * Receivers (map from peer identity to `struct ReceiverAddress`) + */ +static struct GNUNET_CONTAINER_MultiPeerMap *receivers; + +/** + * Expiration heap for senders (contains `struct SenderAddress`) + */ +static struct GNUNET_CONTAINER_Heap *senders_heap; + +/** + * Expiration heap for receivers (contains `struct ReceiverAddress`) + */ +static struct GNUNET_CONTAINER_Heap *receivers_heap; + +/** + * ID of timeout task + */ +static struct GNUNET_SCHEDULER_Task *timeout_task; + +/** + * Network scanner to determine network types. + */ +static struct GNUNET_NT_InterfaceScanner *is; + +/** + * For logging statistics. + */ +static struct GNUNET_STATISTICS_Handle *stats; + /** * QUIC connection object. A connection has a unique SCID/DCID pair. Here we store our SCID * (incoming packet DCID field == outgoing packet SCID field) for a given connection. This @@ -251,42 +338,46 @@ flush_egress (struct quic_conn *conn) /** - * Shutdown the QUIC communicator. + * Signature of functions implementing the sending functionality of a + * message queue. * - * @param cls NULL (always) + * @param mq the message queue + * @param msg the message to send + * @param impl_state our `struct ReceiverAddress` */ static void -do_shutdown (void *cls) +mq_send_d (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "do_shutdown\n"); - - GNUNET_CONTAINER_multihashmap_destroy (conn_map); - quiche_config_free (config); + struct ReceiverAddress *receiver = impl_state; + uint16_t msize = ntohs (msg->size); - if (NULL != read_task) - { - GNUNET_SCHEDULER_cancel (read_task); - read_task = NULL; - } - if (NULL != udp_sock) - { - GNUNET_break (GNUNET_OK == - GNUNET_NETWORK_socket_close (udp_sock)); - udp_sock = NULL; - } - if (NULL != ch) + GNUNET_assert (mq == receiver->d_mq); + if (msize > receiver->d_mtu) { - GNUNET_TRANSPORT_communicator_disconnect (ch); - ch = NULL; - } - if (NULL != ah) - { - GNUNET_TRANSPORT_application_done (ah); - ah = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "msize: %u, mtu: %lu\n", + msize, + receiver->d_mtu); + GNUNET_break (0); + if (GNUNET_YES != receiver->receiver_destroy_called) + receiver_destroy (receiver); + return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "do_shutdown finished\n"); + reschedule_receiver_timeout (receiver); + // if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, + // dgram, + // sizeof(dgram), + // receiver->address, + // receiver->address_len)) + // GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); + // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + // "Sending UDPBox with payload size %u, %u acks left\n", + // msize, + // receiver->acks_available); + // GNUNET_MQ_impl_send_continue (mq); + // return; } @@ -425,6 +516,251 @@ udp_address_to_sockaddr (const char *bindto, socklen_t *sock_len) } +/** + * Setup the MQ for the @a receiver. If a queue exists, + * the existing one is destroyed. Then the MTU is + * recalculated and a fresh queue is initialized. + * + * @param receiver receiver to setup MQ for + */ +static void +setup_receiver_mq (struct ReceiverAddress *receiver) +{ + size_t base_mtu; + + switch (receiver->address->sa_family) + { + case AF_INET: + base_mtu = 1480 /* Ethernet MTU, 1500 - Ethernet header - VLAN tag */ + - sizeof(struct GNUNET_TUN_IPv4Header) /* 20 */ + - sizeof(struct GNUNET_TUN_UdpHeader) /* 8 */; + break; + + case AF_INET6: + base_mtu = 1280 /* Minimum MTU required by IPv6 */ + - sizeof(struct GNUNET_TUN_IPv6Header) /* 40 */ + - sizeof(struct GNUNET_TUN_UdpHeader) /* 8 */; + break; + + default: + GNUNET_assert (0); + break; + } + /* MTU == base_mtu */ + receiver->d_mtu = base_mtu; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Setting up MQs and QHs\n"); + /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to + 1404 (IPv4 + Box) bytes, depending on circumstances... */ + if (NULL == receiver->d_mq) + receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d, + &mq_destroy_d, + &mq_cancel, + receiver, + NULL, + &mq_error, + receiver); + receiver->d_qh = + GNUNET_TRANSPORT_communicator_mq_add (ch, + &receiver->target, + receiver->foreign_addr, + receiver->d_mtu, + GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, + 0, /* Priority */ + receiver->nt, + GNUNET_TRANSPORT_CS_OUTBOUND, + receiver->d_mq); +} + + +/** + * Function called when the transport service has received a + * backchannel message for this communicator (!) via a different return + * path. Should be an acknowledgement. + * + * @param cls closure, NULL + * @param sender which peer sent the notification + * @param msg payload + */ +static void +notify_cb (void *cls, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *msg) +{ + // const struct UDPAck *ack; + + // (void) cls; + // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + // "Storing UDPAck received from backchannel from %s\n", + // GNUNET_i2s_full (sender)); + // if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_COMMUNICATOR_UDP_ACK) || + // (ntohs (msg->size) != sizeof(struct UDPAck))) + // { + // GNUNET_break_op (0); + // return; + // } + // ack = (const struct UDPAck *) msg; + // GNUNET_CONTAINER_multipeermap_get_multiple (receivers, + // sender, + // &handle_ack, + // (void *) ack); +} + + +/** + * Task run to check #receiver_heap and #sender_heap for timeouts. + * + * @param cls unused, NULL + */ +static void +check_timeouts (void *cls) +{ + // struct GNUNET_TIME_Relative st; + // struct GNUNET_TIME_Relative rt; + // struct GNUNET_TIME_Relative delay; + // struct ReceiverAddress *receiver; + // struct SenderAddress *sender; + + // (void) cls; + // timeout_task = NULL; + // rt = GNUNET_TIME_UNIT_FOREVER_REL; + // while (NULL != (receiver = GNUNET_CONTAINER_heap_peek (receivers_heap))) + // { + // /* if (GNUNET_YES != receiver->receiver_destroy_called) */ + // /* { */ + // rt = GNUNET_TIME_absolute_get_remaining (receiver->timeout); + // if (0 != rt.rel_value_us) + // break; + // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + // "Receiver timed out\n"); + // receiver_destroy (receiver); + // // } + // } + // st = GNUNET_TIME_UNIT_FOREVER_REL; + // while (NULL != (sender = GNUNET_CONTAINER_heap_peek (senders_heap))) + // { + // if (GNUNET_YES != sender->sender_destroy_called) + // { + // st = GNUNET_TIME_absolute_get_remaining (sender->timeout); + // if (0 != st.rel_value_us) + // break; + // sender_destroy (sender); + // } + // } + // delay = GNUNET_TIME_relative_min (rt, st); + // if (delay.rel_value_us < GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us) + // timeout_task = GNUNET_SCHEDULER_add_delayed (delay, &check_timeouts, NULL); +} + + +/** + * Function called by the transport service to initialize a + * message queue given address information about another peer. + * If and when the communication channel is established, the + * communicator must call #GNUNET_TRANSPORT_communicator_mq_add() + * to notify the service that the channel is now up. It is + * the responsibility of the communicator to manage sane + * retries and timeouts for any @a peer/@a address combination + * provided by the transport service. Timeouts and retries + * do not need to be signalled to the transport service. + * + * @param cls closure + * @param peer identity of the other peer + * @param address where to send the message, human-readable + * communicator-specific format, 0-terminated, UTF-8 + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is + * invalid + */ +static int +mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address) +{ + struct ReceiverAddress *receiver; + const char *path; + struct sockaddr *in; + socklen_t in_len; + + if (0 != strncmp (address, + COMMUNICATOR_ADDRESS_PREFIX "-", + strlen (COMMUNICATOR_ADDRESS_PREFIX "-"))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")]; + in = udp_address_to_sockaddr (path, &in_len); + + receiver = GNUNET_new (struct ReceiverAddress); + receiver->address = in; + receiver->address_len = in_len; + receiver->target = *peer; + receiver->nt = GNUNET_NT_scanner_get_type (is, in, in_len); + (void) GNUNET_CONTAINER_multipeermap_put ( + receivers, + &receiver->target, + receiver, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Added %s to receivers\n", + GNUNET_i2s_full (&receiver->target)); + receiver->timeout = + GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + receiver->hn = GNUNET_CONTAINER_heap_insert (receivers_heap, + receiver, + receiver->timeout.abs_value_us); + GNUNET_STATISTICS_set (stats, + "# receivers active", + GNUNET_CONTAINER_multipeermap_size (receivers), + GNUNET_NO); + receiver->foreign_addr = + sockaddr_to_udpaddr_string (receiver->address, receiver->address_len); + setup_receiver_mq (receiver); + if (NULL == timeout_task) + timeout_task = GNUNET_SCHEDULER_add_now (&check_timeouts, NULL); + return GNUNET_OK; +} + + +/** + * Shutdown the QUIC communicator. + * + * @param cls NULL (always) + */ +static void +do_shutdown (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "do_shutdown\n"); + + GNUNET_CONTAINER_multihashmap_destroy (conn_map); + quiche_config_free (config); + + if (NULL != read_task) + { + GNUNET_SCHEDULER_cancel (read_task); + read_task = NULL; + } + if (NULL != udp_sock) + { + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (udp_sock)); + udp_sock = NULL; + } + if (NULL != ch) + { + GNUNET_TRANSPORT_communicator_disconnect (ch); + ch = NULL; + } + if (NULL != ah) + { + GNUNET_TRANSPORT_application_done (ah); + ah = NULL; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "do_shutdown finished\n"); +} + + static void sock_read (void *cls) { @@ -704,19 +1040,19 @@ run (void *cls, return; } - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_time (cfg, - COMMUNICATOR_CONFIG_SECTION, - "REKEY_INTERVAL", - &rekey_interval)) - rekey_interval = DEFAULT_REKEY_TIME_INTERVAL; + // if (GNUNET_OK != + // GNUNET_CONFIGURATION_get_value_time (cfg, + // COMMUNICATOR_CONFIG_SECTION, + // "REKEY_INTERVAL", + // &rekey_interval)) + // rekey_interval = DEFAULT_REKEY_TIME_INTERVAL; - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_size (cfg, - COMMUNICATOR_CONFIG_SECTION, - "REKEY_MAX_BYTES", - &rekey_max_bytes)) - rekey_max_bytes = DEFAULT_REKEY_MAX_BYTES; + // if (GNUNET_OK != + // GNUNET_CONFIGURATION_get_value_size (cfg, + // COMMUNICATOR_CONFIG_SECTION, + // "REKEY_MAX_BYTES", + // &rekey_max_bytes)) + // rekey_max_bytes = DEFAULT_REKEY_MAX_BYTES; in = udp_address_to_sockaddr (bindto, &in_len); @@ -792,20 +1128,20 @@ run (void *cls, udp_sock, &sock_read, NULL); - // ch = GNUNET_TRANSPORT_communicator_connect (cfg, - // COMMUNICATOR_CONFIG_SECTION, - // COMMUNICATOR_ADDRESS_PREFIX, - // GNUNET_TRANSPORT_CC_UNRELIABLE, - // &mq_init, - // NULL, - // &enc_notify_cb, - // NULL); - // if (NULL == ch) - // { - // GNUNET_break (0); - // GNUNET_SCHEDULER_shutdown (); - // return; - // } + ch = GNUNET_TRANSPORT_communicator_connect (cfg, + COMMUNICATOR_CONFIG_SECTION, + COMMUNICATOR_ADDRESS_PREFIX, + GNUNET_TRANSPORT_CC_RELIABLE, + &mq_init, + NULL, + ¬ify_cb, + NULL); + if (NULL == ch) + { + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + } ah = GNUNET_TRANSPORT_application_init (cfg); if (NULL == ah) { -- cgit v1.2.3