aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormarshall <stmr@umich.edu>2023-07-06 15:40:56 -0400
committermarshall <stmr@umich.edu>2023-07-18 11:12:18 -0400
commit1d82e0617ef9538824a0e58692ecb13808f4f829 (patch)
treea10c71a9a12475ffd6738ca6293f1bfaa2c413a2
parentc96d793732120a652fef861cad961e0250b787bc (diff)
downloadgnunet-1d82e0617ef9538824a0e58692ecb13808f4f829.tar.gz
gnunet-1d82e0617ef9538824a0e58692ecb13808f4f829.zip
transport (quic): add functions for mq handling
-rw-r--r--src/transport/gnunet-communicator-quic.c462
1 files changed, 399 insertions, 63 deletions
diff --git a/src/transport/gnunet-communicator-quic.c b/src/transport/gnunet-communicator-quic.c
index 08bd35fc2..7cf95368d 100644
--- a/src/transport/gnunet-communicator-quic.c
+++ b/src/transport/gnunet-communicator-quic.c
@@ -10,16 +10,12 @@
10#include "gnunet_transport_communication_service.h" 10#include "gnunet_transport_communication_service.h"
11#include "gnunet_nt_lib.h" 11#include "gnunet_nt_lib.h"
12#include "gnunet_nat_service.h" 12#include "gnunet_nat_service.h"
13
14#include "stdint.h" 13#include "stdint.h"
15#include "inttypes.h" 14#include "inttypes.h"
16#define DEFAULT_REKEY_TIME_INTERVAL GNUNET_TIME_UNIT_DAYS 15
17#define COMMUNICATOR_CONFIG_SECTION "communicator-quic" 16#define COMMUNICATOR_CONFIG_SECTION "communicator-quic"
18#define DEFAULT_REKEY_MAX_BYTES (1024LLU * 1024 * 1024 * 4LLU)
19#define COMMUNICATOR_ADDRESS_PREFIX "quic" 17#define COMMUNICATOR_ADDRESS_PREFIX "quic"
20#define MAX_DATAGRAM_SIZE 1350 18#define MAX_DATAGRAM_SIZE 1350
21// #define STREAM_ID_MAX (UINT64_MAX - (0b11 << 62))
22// #define STREAM_ID_MAX UINT64_MAX - 0xC000000000000000
23 19
24/* Currently equivalent to QUICHE_MAX_CONN_ID_LEN */ 20/* Currently equivalent to QUICHE_MAX_CONN_ID_LEN */
25#define LOCAL_CONN_ID_LEN 20 21#define LOCAL_CONN_ID_LEN 20
@@ -27,14 +23,12 @@
27 sizeof("quiche") - 1 + \ 23 sizeof("quiche") - 1 + \
28 sizeof(struct sockaddr_storage) + \ 24 sizeof(struct sockaddr_storage) + \
29 QUICHE_MAX_CONN_ID_LEN 25 QUICHE_MAX_CONN_ID_LEN
30
31#define CID_LEN sizeof(uint8_t) * QUICHE_MAX_CONN_ID_LEN 26#define CID_LEN sizeof(uint8_t) * QUICHE_MAX_CONN_ID_LEN
32#define TOKEN_LEN sizeof(uint8_t) * MAX_TOKEN_LEN 27#define TOKEN_LEN sizeof(uint8_t) * MAX_TOKEN_LEN
33/** 28/**
34 * Map of DCID (uint8_t) -> quic_conn for quickly retrieving connections to other peers. 29 * Map of DCID (uint8_t) -> quic_conn for quickly retrieving connections to other peers.
35 */ 30 */
36struct GNUNET_CONTAINER_MultiHashMap *conn_map; 31struct GNUNET_CONTAINER_MultiHashMap *conn_map;
37
38static const struct GNUNET_CONFIGURATION_Handle *cfg; 32static const struct GNUNET_CONFIGURATION_Handle *cfg;
39static struct GNUNET_TIME_Relative rekey_interval; 33static struct GNUNET_TIME_Relative rekey_interval;
40static struct GNUNET_NETWORK_Handle *udp_sock; 34static struct GNUNET_NETWORK_Handle *udp_sock;
@@ -44,8 +38,101 @@ static struct GNUNET_TRANSPORT_ApplicationHandle *ah;
44static int have_v6_socket; 38static int have_v6_socket;
45static uint16_t my_port; 39static uint16_t my_port;
46static unsigned long long rekey_max_bytes; 40static unsigned long long rekey_max_bytes;
47
48static quiche_config *config = NULL; 41static quiche_config *config = NULL;
42
43/**
44 * Information we track per receiving address we have recently been
45 * in contact with (encryption to receiver).
46 */
47struct ReceiverAddress
48{
49 /**
50 * To whom are we talking to.
51 */
52 struct GNUNET_PeerIdentity target;
53
54 /**
55 * Address of the receiver in the human-readable format
56 * with the #COMMUNICATOR_ADDRESS_PREFIX.
57 */
58 char *foreign_addr;
59
60 /**
61 * Address of the other peer.
62 */
63 struct sockaddr *address;
64
65 /**
66 * Length of the address.
67 */
68 socklen_t address_len;
69
70 /**
71 * Default message queue we are providing for the #ch.
72 */
73 struct GNUNET_MQ_Handle *d_mq;
74
75 /**
76 * handle for default queue with the #ch.
77 */
78 struct GNUNET_TRANSPORT_QueueHandle *d_qh;
79
80 /**
81 * Timeout for this receiver address.
82 */
83 struct GNUNET_TIME_Absolute timeout;
84
85 /**
86 * MTU we allowed transport for this receiver's default queue.
87 */
88 size_t d_mtu;
89
90 /**
91 * Which network type does this queue use?
92 */
93 enum GNUNET_NetworkType nt;
94
95 /**
96 * receiver_destroy already called on receiver.
97 */
98 int receiver_destroy_called;
99
100 /**
101 * Entry in sender expiration heap.
102 */
103 struct GNUNET_CONTAINER_HeapNode *hn;
104};
105
106/**
107 * Receivers (map from peer identity to `struct ReceiverAddress`)
108 */
109static struct GNUNET_CONTAINER_MultiPeerMap *receivers;
110
111/**
112 * Expiration heap for senders (contains `struct SenderAddress`)
113 */
114static struct GNUNET_CONTAINER_Heap *senders_heap;
115
116/**
117 * Expiration heap for receivers (contains `struct ReceiverAddress`)
118 */
119static struct GNUNET_CONTAINER_Heap *receivers_heap;
120
121/**
122 * ID of timeout task
123 */
124static struct GNUNET_SCHEDULER_Task *timeout_task;
125
126/**
127 * Network scanner to determine network types.
128 */
129static struct GNUNET_NT_InterfaceScanner *is;
130
131/**
132 * For logging statistics.
133 */
134static struct GNUNET_STATISTICS_Handle *stats;
135
49/** 136/**
50 * QUIC connection object. A connection has a unique SCID/DCID pair. Here we store our SCID 137 * QUIC connection object. A connection has a unique SCID/DCID pair. Here we store our SCID
51 * (incoming packet DCID field == outgoing packet SCID field) for a given connection. This 138 * (incoming packet DCID field == outgoing packet SCID field) for a given connection. This
@@ -251,42 +338,46 @@ flush_egress (struct quic_conn *conn)
251 338
252 339
253/** 340/**
254 * Shutdown the QUIC communicator. 341 * Signature of functions implementing the sending functionality of a
342 * message queue.
255 * 343 *
256 * @param cls NULL (always) 344 * @param mq the message queue
345 * @param msg the message to send
346 * @param impl_state our `struct ReceiverAddress`
257 */ 347 */
258static void 348static void
259do_shutdown (void *cls) 349mq_send_d (struct GNUNET_MQ_Handle *mq,
350 const struct GNUNET_MessageHeader *msg,
351 void *impl_state)
260{ 352{
261 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 353 struct ReceiverAddress *receiver = impl_state;
262 "do_shutdown\n"); 354 uint16_t msize = ntohs (msg->size);
263
264 GNUNET_CONTAINER_multihashmap_destroy (conn_map);
265 quiche_config_free (config);
266 355
267 if (NULL != read_task) 356 GNUNET_assert (mq == receiver->d_mq);
268 { 357 if (msize > receiver->d_mtu)
269 GNUNET_SCHEDULER_cancel (read_task);
270 read_task = NULL;
271 }
272 if (NULL != udp_sock)
273 {
274 GNUNET_break (GNUNET_OK ==
275 GNUNET_NETWORK_socket_close (udp_sock));
276 udp_sock = NULL;
277 }
278 if (NULL != ch)
279 { 358 {
280 GNUNET_TRANSPORT_communicator_disconnect (ch); 359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
281 ch = NULL; 360 "msize: %u, mtu: %lu\n",
282 } 361 msize,
283 if (NULL != ah) 362 receiver->d_mtu);
284 { 363 GNUNET_break (0);
285 GNUNET_TRANSPORT_application_done (ah); 364 if (GNUNET_YES != receiver->receiver_destroy_called)
286 ah = NULL; 365 receiver_destroy (receiver);
366 return;
287 } 367 }
288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 368 reschedule_receiver_timeout (receiver);
289 "do_shutdown finished\n"); 369 // if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
370 // dgram,
371 // sizeof(dgram),
372 // receiver->address,
373 // receiver->address_len))
374 // GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
375 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
376 // "Sending UDPBox with payload size %u, %u acks left\n",
377 // msize,
378 // receiver->acks_available);
379 // GNUNET_MQ_impl_send_continue (mq);
380 // return;
290} 381}
291 382
292 383
@@ -425,6 +516,251 @@ udp_address_to_sockaddr (const char *bindto, socklen_t *sock_len)
425} 516}
426 517
427 518
519/**
520 * Setup the MQ for the @a receiver. If a queue exists,
521 * the existing one is destroyed. Then the MTU is
522 * recalculated and a fresh queue is initialized.
523 *
524 * @param receiver receiver to setup MQ for
525 */
526static void
527setup_receiver_mq (struct ReceiverAddress *receiver)
528{
529 size_t base_mtu;
530
531 switch (receiver->address->sa_family)
532 {
533 case AF_INET:
534 base_mtu = 1480 /* Ethernet MTU, 1500 - Ethernet header - VLAN tag */
535 - sizeof(struct GNUNET_TUN_IPv4Header) /* 20 */
536 - sizeof(struct GNUNET_TUN_UdpHeader) /* 8 */;
537 break;
538
539 case AF_INET6:
540 base_mtu = 1280 /* Minimum MTU required by IPv6 */
541 - sizeof(struct GNUNET_TUN_IPv6Header) /* 40 */
542 - sizeof(struct GNUNET_TUN_UdpHeader) /* 8 */;
543 break;
544
545 default:
546 GNUNET_assert (0);
547 break;
548 }
549 /* MTU == base_mtu */
550 receiver->d_mtu = base_mtu;
551
552 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553 "Setting up MQs and QHs\n");
554 /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to
555 1404 (IPv4 + Box) bytes, depending on circumstances... */
556 if (NULL == receiver->d_mq)
557 receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
558 &mq_destroy_d,
559 &mq_cancel,
560 receiver,
561 NULL,
562 &mq_error,
563 receiver);
564 receiver->d_qh =
565 GNUNET_TRANSPORT_communicator_mq_add (ch,
566 &receiver->target,
567 receiver->foreign_addr,
568 receiver->d_mtu,
569 GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
570 0, /* Priority */
571 receiver->nt,
572 GNUNET_TRANSPORT_CS_OUTBOUND,
573 receiver->d_mq);
574}
575
576
577/**
578 * Function called when the transport service has received a
579 * backchannel message for this communicator (!) via a different return
580 * path. Should be an acknowledgement.
581 *
582 * @param cls closure, NULL
583 * @param sender which peer sent the notification
584 * @param msg payload
585 */
586static void
587notify_cb (void *cls,
588 const struct GNUNET_PeerIdentity *sender,
589 const struct GNUNET_MessageHeader *msg)
590{
591 // const struct UDPAck *ack;
592
593 // (void) cls;
594 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595 // "Storing UDPAck received from backchannel from %s\n",
596 // GNUNET_i2s_full (sender));
597 // if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_COMMUNICATOR_UDP_ACK) ||
598 // (ntohs (msg->size) != sizeof(struct UDPAck)))
599 // {
600 // GNUNET_break_op (0);
601 // return;
602 // }
603 // ack = (const struct UDPAck *) msg;
604 // GNUNET_CONTAINER_multipeermap_get_multiple (receivers,
605 // sender,
606 // &handle_ack,
607 // (void *) ack);
608}
609
610
611/**
612 * Task run to check #receiver_heap and #sender_heap for timeouts.
613 *
614 * @param cls unused, NULL
615 */
616static void
617check_timeouts (void *cls)
618{
619 // struct GNUNET_TIME_Relative st;
620 // struct GNUNET_TIME_Relative rt;
621 // struct GNUNET_TIME_Relative delay;
622 // struct ReceiverAddress *receiver;
623 // struct SenderAddress *sender;
624
625 // (void) cls;
626 // timeout_task = NULL;
627 // rt = GNUNET_TIME_UNIT_FOREVER_REL;
628 // while (NULL != (receiver = GNUNET_CONTAINER_heap_peek (receivers_heap)))
629 // {
630 // /* if (GNUNET_YES != receiver->receiver_destroy_called) */
631 // /* { */
632 // rt = GNUNET_TIME_absolute_get_remaining (receiver->timeout);
633 // if (0 != rt.rel_value_us)
634 // break;
635 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636 // "Receiver timed out\n");
637 // receiver_destroy (receiver);
638 // // }
639 // }
640 // st = GNUNET_TIME_UNIT_FOREVER_REL;
641 // while (NULL != (sender = GNUNET_CONTAINER_heap_peek (senders_heap)))
642 // {
643 // if (GNUNET_YES != sender->sender_destroy_called)
644 // {
645 // st = GNUNET_TIME_absolute_get_remaining (sender->timeout);
646 // if (0 != st.rel_value_us)
647 // break;
648 // sender_destroy (sender);
649 // }
650 // }
651 // delay = GNUNET_TIME_relative_min (rt, st);
652 // if (delay.rel_value_us < GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
653 // timeout_task = GNUNET_SCHEDULER_add_delayed (delay, &check_timeouts, NULL);
654}
655
656
657/**
658 * Function called by the transport service to initialize a
659 * message queue given address information about another peer.
660 * If and when the communication channel is established, the
661 * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
662 * to notify the service that the channel is now up. It is
663 * the responsibility of the communicator to manage sane
664 * retries and timeouts for any @a peer/@a address combination
665 * provided by the transport service. Timeouts and retries
666 * do not need to be signalled to the transport service.
667 *
668 * @param cls closure
669 * @param peer identity of the other peer
670 * @param address where to send the message, human-readable
671 * communicator-specific format, 0-terminated, UTF-8
672 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is
673 * invalid
674 */
675static int
676mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address)
677{
678 struct ReceiverAddress *receiver;
679 const char *path;
680 struct sockaddr *in;
681 socklen_t in_len;
682
683 if (0 != strncmp (address,
684 COMMUNICATOR_ADDRESS_PREFIX "-",
685 strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
686 {
687 GNUNET_break_op (0);
688 return GNUNET_SYSERR;
689 }
690 path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
691 in = udp_address_to_sockaddr (path, &in_len);
692
693 receiver = GNUNET_new (struct ReceiverAddress);
694 receiver->address = in;
695 receiver->address_len = in_len;
696 receiver->target = *peer;
697 receiver->nt = GNUNET_NT_scanner_get_type (is, in, in_len);
698 (void) GNUNET_CONTAINER_multipeermap_put (
699 receivers,
700 &receiver->target,
701 receiver,
702 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
703 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704 "Added %s to receivers\n",
705 GNUNET_i2s_full (&receiver->target));
706 receiver->timeout =
707 GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
708 receiver->hn = GNUNET_CONTAINER_heap_insert (receivers_heap,
709 receiver,
710 receiver->timeout.abs_value_us);
711 GNUNET_STATISTICS_set (stats,
712 "# receivers active",
713 GNUNET_CONTAINER_multipeermap_size (receivers),
714 GNUNET_NO);
715 receiver->foreign_addr =
716 sockaddr_to_udpaddr_string (receiver->address, receiver->address_len);
717 setup_receiver_mq (receiver);
718 if (NULL == timeout_task)
719 timeout_task = GNUNET_SCHEDULER_add_now (&check_timeouts, NULL);
720 return GNUNET_OK;
721}
722
723
724/**
725 * Shutdown the QUIC communicator.
726 *
727 * @param cls NULL (always)
728 */
729static void
730do_shutdown (void *cls)
731{
732 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
733 "do_shutdown\n");
734
735 GNUNET_CONTAINER_multihashmap_destroy (conn_map);
736 quiche_config_free (config);
737
738 if (NULL != read_task)
739 {
740 GNUNET_SCHEDULER_cancel (read_task);
741 read_task = NULL;
742 }
743 if (NULL != udp_sock)
744 {
745 GNUNET_break (GNUNET_OK ==
746 GNUNET_NETWORK_socket_close (udp_sock));
747 udp_sock = NULL;
748 }
749 if (NULL != ch)
750 {
751 GNUNET_TRANSPORT_communicator_disconnect (ch);
752 ch = NULL;
753 }
754 if (NULL != ah)
755 {
756 GNUNET_TRANSPORT_application_done (ah);
757 ah = NULL;
758 }
759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760 "do_shutdown finished\n");
761}
762
763
428static void 764static void
429sock_read (void *cls) 765sock_read (void *cls)
430{ 766{
@@ -704,19 +1040,19 @@ run (void *cls,
704 return; 1040 return;
705 } 1041 }
706 1042
707 if (GNUNET_OK != 1043 // if (GNUNET_OK !=
708 GNUNET_CONFIGURATION_get_value_time (cfg, 1044 // GNUNET_CONFIGURATION_get_value_time (cfg,
709 COMMUNICATOR_CONFIG_SECTION, 1045 // COMMUNICATOR_CONFIG_SECTION,
710 "REKEY_INTERVAL", 1046 // "REKEY_INTERVAL",
711 &rekey_interval)) 1047 // &rekey_interval))
712 rekey_interval = DEFAULT_REKEY_TIME_INTERVAL; 1048 // rekey_interval = DEFAULT_REKEY_TIME_INTERVAL;
713 1049
714 if (GNUNET_OK != 1050 // if (GNUNET_OK !=
715 GNUNET_CONFIGURATION_get_value_size (cfg, 1051 // GNUNET_CONFIGURATION_get_value_size (cfg,
716 COMMUNICATOR_CONFIG_SECTION, 1052 // COMMUNICATOR_CONFIG_SECTION,
717 "REKEY_MAX_BYTES", 1053 // "REKEY_MAX_BYTES",
718 &rekey_max_bytes)) 1054 // &rekey_max_bytes))
719 rekey_max_bytes = DEFAULT_REKEY_MAX_BYTES; 1055 // rekey_max_bytes = DEFAULT_REKEY_MAX_BYTES;
720 1056
721 in = udp_address_to_sockaddr (bindto, &in_len); 1057 in = udp_address_to_sockaddr (bindto, &in_len);
722 1058
@@ -792,20 +1128,20 @@ run (void *cls,
792 udp_sock, 1128 udp_sock,
793 &sock_read, 1129 &sock_read,
794 NULL); 1130 NULL);
795 // ch = GNUNET_TRANSPORT_communicator_connect (cfg, 1131 ch = GNUNET_TRANSPORT_communicator_connect (cfg,
796 // COMMUNICATOR_CONFIG_SECTION, 1132 COMMUNICATOR_CONFIG_SECTION,
797 // COMMUNICATOR_ADDRESS_PREFIX, 1133 COMMUNICATOR_ADDRESS_PREFIX,
798 // GNUNET_TRANSPORT_CC_UNRELIABLE, 1134 GNUNET_TRANSPORT_CC_RELIABLE,
799 // &mq_init, 1135 &mq_init,
800 // NULL, 1136 NULL,
801 // &enc_notify_cb, 1137 &notify_cb,
802 // NULL); 1138 NULL);
803 // if (NULL == ch) 1139 if (NULL == ch)
804 // { 1140 {
805 // GNUNET_break (0); 1141 GNUNET_break (0);
806 // GNUNET_SCHEDULER_shutdown (); 1142 GNUNET_SCHEDULER_shutdown ();
807 // return; 1143 return;
808 // } 1144 }
809 ah = GNUNET_TRANSPORT_application_init (cfg); 1145 ah = GNUNET_TRANSPORT_application_init (cfg);
810 if (NULL == ah) 1146 if (NULL == ah)
811 { 1147 {