aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormarshall <stmr@umich.edu>2023-07-12 14:21:57 -0400
committermarshall <stmr@umich.edu>2023-07-18 11:12:18 -0400
commitb37caeb2b2b453aa4cf8c704f2ce84eec4dbc170 (patch)
tree7bd98cd351101aa3d3a2f920158a5f2c5ac913bd
parentcdb725f662c8372de06b3a2bf46e783a5305ef6a (diff)
downloadgnunet-b37caeb2b2b453aa4cf8c704f2ce84eec4dbc170.tar.gz
gnunet-b37caeb2b2b453aa4cf8c704f2ce84eec4dbc170.zip
transport (quic): mq handling fixes
-rw-r--r--src/transport/gnunet-communicator-quic.c196
1 files changed, 180 insertions, 16 deletions
diff --git a/src/transport/gnunet-communicator-quic.c b/src/transport/gnunet-communicator-quic.c
index cdbd05477..ab1b7b20b 100644
--- a/src/transport/gnunet-communicator-quic.c
+++ b/src/transport/gnunet-communicator-quic.c
@@ -71,6 +71,11 @@ struct PeerAddress
71 socklen_t address_len; 71 socklen_t address_len;
72 72
73 /** 73 /**
74 * The QUIC connection associated with this peer
75 */
76 struct quic_conn *conn;
77
78 /**
74 * Default message queue we are providing for the #ch. 79 * Default message queue we are providing for the #ch.
75 */ 80 */
76 struct GNUNET_MQ_Handle *d_mq; 81 struct GNUNET_MQ_Handle *d_mq;
@@ -170,7 +175,7 @@ struct QUIC_header
170 * ASSUMES: connection is established to peer 175 * ASSUMES: connection is established to peer
171*/ 176*/
172static void 177static void
173recv_from_streams (quiche_conn *conn, char*stream_buf, size_t buf_size) 178recv_from_streams (quiche_conn *conn, char *stream_buf, size_t buf_size)
174{ 179{
175 uint64_t s = 0; 180 uint64_t s = 0;
176 quiche_stream_iter *readable; 181 quiche_stream_iter *readable;
@@ -337,6 +342,55 @@ flush_egress (struct quic_conn *conn)
337 342
338 343
339/** 344/**
345 * Increment receiver timeout due to activity.
346 *
347 * @param receiver address for which the timeout should be rescheduled
348 */
349static void
350reschedule_peer_timeout (struct PeerAddress *peer)
351{
352 peer->timeout =
353 GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
354 GNUNET_CONTAINER_heap_update_cost (peer->hn,
355 peer->timeout.abs_value_us);
356}
357
358
359/**
360 * Destroys a receiving state due to timeout or shutdown.
361 *
362 * @param receiver entity to close down
363 */
364static void
365peer_destroy (struct PeerAddress *peer)
366{
367
368 peer->peer_destroy_called = GNUNET_YES;
369
370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
371 "Disconnecting peer for peer `%s'\n",
372 GNUNET_i2s (&peer->target));
373 if (NULL != peer->d_qh)
374 {
375 GNUNET_TRANSPORT_communicator_mq_del (peer->d_qh);
376 peer->d_qh = NULL;
377 }
378 GNUNET_assert (GNUNET_YES ==
379 GNUNET_CONTAINER_multipeermap_remove (peers,
380 &peer->target,
381 peer));
382 GNUNET_assert (peer == GNUNET_CONTAINER_heap_remove_node (peer->hn));
383 GNUNET_STATISTICS_set (stats,
384 "# peers active",
385 GNUNET_CONTAINER_multipeermap_size (peers),
386 GNUNET_NO);
387 GNUNET_free (peer->address);
388 GNUNET_free (peer->foreign_addr);
389 GNUNET_free (peer);
390}
391
392
393/**
340 * Signature of functions implementing the sending functionality of a 394 * Signature of functions implementing the sending functionality of a
341 * message queue. 395 * message queue.
342 * 396 *
@@ -351,6 +405,14 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
351{ 405{
352 struct PeerAddress *peer = impl_state; 406 struct PeerAddress *peer = impl_state;
353 uint16_t msize = ntohs (msg->size); 407 uint16_t msize = ntohs (msg->size);
408 struct quic_conn *q_conn = peer->conn;
409
410 if (NULL == q_conn->conn)
411 {
412 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
413 "peer never established quic connection\n");
414 return;
415 }
354 416
355 GNUNET_assert (mq == peer->d_mq); 417 GNUNET_assert (mq == peer->d_mq);
356 if (msize > peer->d_mtu) 418 if (msize > peer->d_mtu)
@@ -365,6 +427,7 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
365 return; 427 return;
366 } 428 }
367 reschedule_peer_timeout (peer); 429 reschedule_peer_timeout (peer);
430
368 // if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, 431 // if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
369 // dgram, 432 // dgram,
370 // sizeof(dgram), 433 // sizeof(dgram),
@@ -381,6 +444,65 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
381 444
382 445
383/** 446/**
447 * Signature of functions implementing the destruction of a message
448 * queue. Implementations must not free @a mq, but should take care
449 * of @a impl_state.
450 *
451 * @param mq the message queue to destroy
452 * @param impl_state our `struct PeerAddress`
453 */
454static void
455mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state)
456{
457 struct PeerAddress *peer = impl_state;
458 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
459 "Default MQ destroyed\n");
460 if (mq == peer->d_mq)
461 {
462 peer->d_mq = NULL;
463 if (GNUNET_YES != peer->peer_destroy_called)
464 peer_destroy (peer);
465 }
466}
467
468
469/**
470 * Implementation function that cancels the currently sent message.
471 *
472 * @param mq message queue
473 * @param impl_state our `struct PeerAddress`
474 */
475static void
476mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
477{
478 /* Cancellation is impossible with QUIC; bail */
479 GNUNET_assert (0);
480}
481
482
483/**
484 * Generic error handler, called with the appropriate
485 * error code and the same closure specified at the creation of
486 * the message queue.
487 * Not every message queue implementation supports an error handler.
488 *
489 * @param cls our `struct ReceiverAddress`
490 * @param error error code
491 */
492static void
493mq_error (void *cls, enum GNUNET_MQ_Error error)
494{
495 struct PeerAddress *peer = cls;
496
497 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
498 "MQ error in queue to %s: %d\n",
499 GNUNET_i2s (&peer->target),
500 (int) error);
501 peer_destroy (peer);
502}
503
504
505/**
384 * Convert UDP bind specification to a `struct sockaddr *` 506 * Convert UDP bind specification to a `struct sockaddr *`
385 * 507 *
386 * @param bindto bind specification to convert 508 * @param bindto bind specification to convert
@@ -557,10 +679,10 @@ setup_peer_mq (struct PeerAddress *peer)
557 peer->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d, 679 peer->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
558 &mq_destroy_d, 680 &mq_destroy_d,
559 &mq_cancel, 681 &mq_cancel,
560 receiver, 682 peer,
561 NULL, 683 NULL,
562 &mq_error, 684 &mq_error,
563 receiver); 685 peer);
564 peer->d_qh = 686 peer->d_qh =
565 GNUNET_TRANSPORT_communicator_mq_add (ch, 687 GNUNET_TRANSPORT_communicator_mq_add (ch,
566 &peer->target, 688 &peer->target,
@@ -575,6 +697,44 @@ setup_peer_mq (struct PeerAddress *peer)
575 697
576 698
577/** 699/**
700 * Taken from: UDP communicator
701 * Converts @a address to the address string format used by this
702 * communicator in HELLOs.
703 *
704 * @param address the address to convert, must be AF_INET or AF_INET6.
705 * @param address_len number of bytes in @a address
706 * @return string representation of @a address
707 */
708static char *
709sockaddr_to_udpaddr_string (const struct sockaddr *address,
710 socklen_t address_len)
711{
712 char *ret;
713
714 switch (address->sa_family)
715 {
716 case AF_INET:
717 GNUNET_asprintf (&ret,
718 "%s-%s",
719 COMMUNICATOR_ADDRESS_PREFIX,
720 GNUNET_a2s (address, address_len));
721 break;
722
723 case AF_INET6:
724 GNUNET_asprintf (&ret,
725 "%s-%s",
726 COMMUNICATOR_ADDRESS_PREFIX,
727 GNUNET_a2s (address, address_len));
728 break;
729
730 default:
731 GNUNET_assert (0);
732 }
733 return ret;
734}
735
736
737/**
578 * Function called when the transport service has received a 738 * Function called when the transport service has received a
579 * backchannel message for this communicator (!) via a different return 739 * backchannel message for this communicator (!) via a different return
580 * path. Should be an acknowledgement. 740 * path. Should be an acknowledgement.
@@ -682,9 +842,9 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity *peer_id, const
682 socklen_t in_len; 842 socklen_t in_len;
683 uint8_t scid[LOCAL_CONN_ID_LEN]; 843 uint8_t scid[LOCAL_CONN_ID_LEN];
684 844
685 struct quic_conn *conn; 845 struct quic_conn *q_conn;
686 char *bindto; 846 char *bindto;
687 socklen_t in_len; 847 socklen_t local_in_len;
688 struct sockaddr *local_addr; 848 struct sockaddr *local_addr;
689 849
690 if (GNUNET_OK != 850 if (GNUNET_OK !=
@@ -696,9 +856,9 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity *peer_id, const
696 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 856 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
697 COMMUNICATOR_CONFIG_SECTION, 857 COMMUNICATOR_CONFIG_SECTION,
698 "BINDTO"); 858 "BINDTO");
699 return; 859 return GNUNET_SYSERR;
700 } 860 }
701 local_addr = udp_address_to_sockaddr (bindto, &in_len); 861 local_addr = udp_address_to_sockaddr (bindto, &local_in_len);
702 862
703 if (0 != strncmp (address, 863 if (0 != strncmp (address,
704 COMMUNICATOR_ADDRESS_PREFIX "-", 864 COMMUNICATOR_ADDRESS_PREFIX "-",
@@ -739,24 +899,27 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity *peer_id, const
739 */ 899 */
740 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_STRONG, scid, 900 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_STRONG, scid,
741 LOCAL_CONN_ID_LEN); 901 LOCAL_CONN_ID_LEN);
742 conn = GNUNET_new (struct quic_conn); 902 q_conn = GNUNET_new (struct quic_conn);
743 GNUNET_memcpy (conn->cid, scid, LOCAL_CONN_ID_LEN); 903 GNUNET_memcpy (q_conn->cid, scid, LOCAL_CONN_ID_LEN);
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 904 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745 "Attempting to perform handshake with peer\n"); 905 "Attempting to perform QUIC handshake with peer\n");
746 conn->conn = quiche_connect (peer->foreign_addr, scid, LOCAL_CONN_ID_LEN, 906 q_conn->conn = quiche_connect (peer->foreign_addr, scid, LOCAL_CONN_ID_LEN,
747 local_addr, 907 local_addr,
748 in_len, peer->address, peer->address_len, 908 local_in_len, peer->address, peer->address_len,
749 config); 909 config);
910
911 peer->conn = q_conn;
750 /** 912 /**
751 * Insert connection into hashmap 913 * Insert connection into hashmap
752 */ 914 */
753 struct GNUNET_HashCode key; 915 struct GNUNET_HashCode key;
754 GNUNET_CRYPTO_hash (conn->cid, LOCAL_CONN_ID_LEN, &key); 916 GNUNET_CRYPTO_hash (q_conn->cid, LOCAL_CONN_ID_LEN, &key);
755 GNUNET_CONTAINER_multihashmap_put (conn_map, &key, conn, 917 GNUNET_CONTAINER_multihashmap_put (conn_map, &key, q_conn,
756 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 918 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
757 setup_peer_mq (peer); 919 setup_peer_mq (peer);
758 if (NULL == timeout_task) 920 if (NULL == timeout_task)
759 timeout_task = GNUNET_SCHEDULER_add_now (&check_timeouts, NULL); 921 timeout_task = GNUNET_SCHEDULER_add_now (&check_timeouts, NULL);
922 GNUNET_free (local_addr);
760 return GNUNET_OK; 923 return GNUNET_OK;
761} 924}
762 925
@@ -1035,6 +1198,7 @@ sock_read (void *cls)
1035 quiche_conn_free (conn->conn); 1198 quiche_conn_free (conn->conn);
1036 GNUNET_free (conn); 1199 GNUNET_free (conn);
1037 } 1200 }
1201 GNUNET_free (local_addr);
1038} 1202}
1039 1203
1040 1204