diff options
author | Martin Schanzenbach <mschanzenbach@posteo.de> | 2020-06-01 16:39:35 +0200 |
---|---|---|
committer | Martin Schanzenbach <mschanzenbach@posteo.de> | 2020-06-01 16:39:35 +0200 |
commit | 198c09654354d09a9b33f27cf095e0295f70826c (patch) | |
tree | 07aa088c8e9664dc76915cc6b664654da59359f4 /src/transport | |
parent | a325c3eaa8450d325fe57959eac29da5496cfd6d (diff) | |
download | gnunet-198c09654354d09a9b33f27cf095e0295f70826c.tar.gz gnunet-198c09654354d09a9b33f27cf095e0295f70826c.zip |
tng: more UDP communicator backchannels
Added a new message for queue updates to indicate queue length.
Queues now may also have a priority parameter.
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/gnunet-communicator-tcp.c | 2 | ||||
-rw-r--r-- | src/transport/gnunet-communicator-udp.c | 384 | ||||
-rw-r--r-- | src/transport/gnunet-communicator-unix.c | 2 | ||||
-rw-r--r-- | src/transport/test_communicator_basic.c | 45 | ||||
-rw-r--r-- | src/transport/transport-testing2.c | 126 | ||||
-rw-r--r-- | src/transport/transport-testing2.h | 7 | ||||
-rw-r--r-- | src/transport/transport.h | 60 | ||||
-rw-r--r-- | src/transport/transport_api2_communication.c | 77 |
8 files changed, 515 insertions, 188 deletions
diff --git a/src/transport/gnunet-communicator-tcp.c b/src/transport/gnunet-communicator-tcp.c index bbfacbffd..7f70c55df 100644 --- a/src/transport/gnunet-communicator-tcp.c +++ b/src/transport/gnunet-communicator-tcp.c | |||
@@ -1547,6 +1547,8 @@ boot_queue (struct Queue *queue, enum GNUNET_TRANSPORT_ConnectionStatus cs) | |||
1547 | &queue->target, | 1547 | &queue->target, |
1548 | foreign_addr, | 1548 | foreign_addr, |
1549 | 0 /* no MTU */, | 1549 | 0 /* no MTU */, |
1550 | GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, | ||
1551 | 0, /* Priority */ | ||
1550 | queue->nt, | 1552 | queue->nt, |
1551 | cs, | 1553 | cs, |
1552 | queue->mq); | 1554 | queue->mq); |
diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c index 344ba5180..46d9766d0 100644 --- a/src/transport/gnunet-communicator-udp.c +++ b/src/transport/gnunet-communicator-udp.c | |||
@@ -549,14 +549,24 @@ struct ReceiverAddress | |||
549 | struct GNUNET_CONTAINER_HeapNode *hn; | 549 | struct GNUNET_CONTAINER_HeapNode *hn; |
550 | 550 | ||
551 | /** | 551 | /** |
552 | * Message queue we are providing for the #ch. | 552 | * KX message queue we are providing for the #ch. |
553 | */ | 553 | */ |
554 | struct GNUNET_MQ_Handle *mq; | 554 | struct GNUNET_MQ_Handle *kx_mq; |
555 | |||
556 | /** | ||
557 | * Default message queue we are providing for the #ch. | ||
558 | */ | ||
559 | struct GNUNET_MQ_Handle *d_mq; | ||
560 | |||
561 | /** | ||
562 | * handle for KX queue with the #ch. | ||
563 | */ | ||
564 | struct GNUNET_TRANSPORT_QueueHandle *kx_qh; | ||
555 | 565 | ||
556 | /** | 566 | /** |
557 | * handle for this queue with the #ch. | 567 | * handle for default queue with the #ch. |
558 | */ | 568 | */ |
559 | struct GNUNET_TRANSPORT_QueueHandle *qh; | 569 | struct GNUNET_TRANSPORT_QueueHandle *d_qh; |
560 | 570 | ||
561 | /** | 571 | /** |
562 | * Timeout for this receiver address. | 572 | * Timeout for this receiver address. |
@@ -564,9 +574,14 @@ struct ReceiverAddress | |||
564 | struct GNUNET_TIME_Absolute timeout; | 574 | struct GNUNET_TIME_Absolute timeout; |
565 | 575 | ||
566 | /** | 576 | /** |
567 | * MTU we allowed transport for this receiver right now. | 577 | * MTU we allowed transport for this receiver's KX queue. |
568 | */ | 578 | */ |
569 | size_t mtu; | 579 | size_t kx_mtu; |
580 | |||
581 | /** | ||
582 | * MTU we allowed transport for this receiver's default queue. | ||
583 | */ | ||
584 | size_t d_mtu; | ||
570 | 585 | ||
571 | /** | 586 | /** |
572 | * Length of the DLL at @a ss_head. | 587 | * Length of the DLL at @a ss_head. |
@@ -786,15 +801,25 @@ receiver_destroy (struct ReceiverAddress *receiver) | |||
786 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 801 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
787 | "Disconnecting receiver for peer `%s'\n", | 802 | "Disconnecting receiver for peer `%s'\n", |
788 | GNUNET_i2s (&receiver->target)); | 803 | GNUNET_i2s (&receiver->target)); |
789 | if (NULL != (mq = receiver->mq)) | 804 | if (NULL != (mq = receiver->kx_mq)) |
790 | { | 805 | { |
791 | receiver->mq = NULL; | 806 | receiver->kx_mq = NULL; |
792 | GNUNET_MQ_destroy (mq); | 807 | GNUNET_MQ_destroy (mq); |
793 | } | 808 | } |
794 | if (NULL != receiver->qh) | 809 | if (NULL != receiver->kx_qh) |
795 | { | 810 | { |
796 | GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); | 811 | GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh); |
797 | receiver->qh = NULL; | 812 | receiver->kx_qh = NULL; |
813 | } | ||
814 | if (NULL != (mq = receiver->d_mq)) | ||
815 | { | ||
816 | receiver->d_mq = NULL; | ||
817 | GNUNET_MQ_destroy (mq); | ||
818 | } | ||
819 | if (NULL != receiver->d_qh) | ||
820 | { | ||
821 | GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh); | ||
822 | receiver->d_qh = NULL; | ||
798 | } | 823 | } |
799 | GNUNET_assert (GNUNET_YES == | 824 | GNUNET_assert (GNUNET_YES == |
800 | GNUNET_CONTAINER_multipeermap_remove (receivers, | 825 | GNUNET_CONTAINER_multipeermap_remove (receivers, |
@@ -1265,30 +1290,27 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity *pid, void *value) | |||
1265 | (void) pid; | 1290 | (void) pid; |
1266 | for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next) | 1291 | for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next) |
1267 | { | 1292 | { |
1268 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1269 | "Checking shared secrets\n"); | ||
1270 | if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode))) | 1293 | if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode))) |
1271 | { | 1294 | { |
1272 | uint32_t allowed; | 1295 | uint32_t allowed; |
1273 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1296 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1274 | "Found matching mac\n"); | 1297 | "Found matching mac\n"); |
1275 | 1298 | ||
1276 | allowed = ntohl (ack->sequence_max); | 1299 | allowed = ntohl (ack->sequence_max); |
1277 | 1300 | ||
1278 | if (allowed > ss->sequence_allowed) | 1301 | if (allowed > ss->sequence_allowed) |
1279 | { | 1302 | { |
1280 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1303 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1281 | "%u > %u (%u)\n", allowed, ss->sequence_allowed, | 1304 | "%u > %u (%u)\n", allowed, ss->sequence_allowed, |
1282 | receiver->acks_available); | 1305 | receiver->acks_available); |
1283 | 1306 | ||
1284 | receiver->acks_available += (allowed - ss->sequence_allowed); | 1307 | receiver->acks_available += (allowed - ss->sequence_allowed); |
1285 | if ((allowed - ss->sequence_allowed) == receiver->acks_available) | 1308 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1286 | { | 1309 | "Tell transport we have more acks!\n"); |
1287 | /* we just incremented from zero => MTU change! */ | 1310 | GNUNET_TRANSPORT_communicator_mq_update (ch, |
1288 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1311 | receiver->d_qh, |
1289 | "we just incremented from zero => MTU change!\n"); | 1312 | (allowed - ss->sequence_allowed), |
1290 | //TODO setup_receiver_mq (receiver); | 1313 | 1); |
1291 | } | ||
1292 | ss->sequence_allowed = allowed; | 1314 | ss->sequence_allowed = allowed; |
1293 | /* move ss to head to avoid discarding it anytime soon! */ | 1315 | /* move ss to head to avoid discarding it anytime soon! */ |
1294 | GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss); | 1316 | GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss); |
@@ -1906,15 +1928,24 @@ do_pad (gcry_cipher_hd_t out_cipher, char *dgram, size_t pad_size) | |||
1906 | * @param impl_state our `struct ReceiverAddress` | 1928 | * @param impl_state our `struct ReceiverAddress` |
1907 | */ | 1929 | */ |
1908 | static void | 1930 | static void |
1909 | mq_send (struct GNUNET_MQ_Handle *mq, | 1931 | mq_send_kx (struct GNUNET_MQ_Handle *mq, |
1910 | const struct GNUNET_MessageHeader *msg, | 1932 | const struct GNUNET_MessageHeader *msg, |
1911 | void *impl_state) | 1933 | void *impl_state) |
1912 | { | 1934 | { |
1913 | struct ReceiverAddress *receiver = impl_state; | 1935 | struct ReceiverAddress *receiver = impl_state; |
1914 | uint16_t msize = ntohs (msg->size); | 1936 | uint16_t msize = ntohs (msg->size); |
1937 | struct UdpHandshakeSignature uhs; | ||
1938 | struct UDPConfirmation uc; | ||
1939 | struct InitialKX kx; | ||
1940 | struct GNUNET_CRYPTO_EcdhePrivateKey epriv; | ||
1941 | char dgram[receiver->kx_mtu + sizeof(uc) + sizeof(kx)]; | ||
1942 | size_t dpos; | ||
1943 | gcry_cipher_hd_t out_cipher; | ||
1944 | struct SharedSecret *ss; | ||
1945 | |||
1915 | 1946 | ||
1916 | GNUNET_assert (mq == receiver->mq); | 1947 | GNUNET_assert (mq == receiver->kx_mq); |
1917 | if (msize > receiver->mtu) | 1948 | if (msize > receiver->kx_mtu) |
1918 | { | 1949 | { |
1919 | GNUNET_break (0); | 1950 | GNUNET_break (0); |
1920 | receiver_destroy (receiver); | 1951 | receiver_destroy (receiver); |
@@ -1922,117 +1953,124 @@ mq_send (struct GNUNET_MQ_Handle *mq, | |||
1922 | } | 1953 | } |
1923 | reschedule_receiver_timeout (receiver); | 1954 | reschedule_receiver_timeout (receiver); |
1924 | 1955 | ||
1925 | if (0 == receiver->acks_available) | 1956 | /* setup key material */ |
1957 | GNUNET_CRYPTO_ecdhe_key_create (&epriv); | ||
1958 | |||
1959 | ss = setup_shared_secret_enc (&epriv, receiver); | ||
1960 | setup_cipher (&ss->master, 0, &out_cipher); | ||
1961 | /* compute 'uc' */ | ||
1962 | uc.sender = my_identity; | ||
1963 | uc.monotonic_time = | ||
1964 | GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); | ||
1965 | uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE); | ||
1966 | uhs.purpose.size = htonl (sizeof(uhs)); | ||
1967 | uhs.sender = my_identity; | ||
1968 | uhs.receiver = receiver->target; | ||
1969 | GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral); | ||
1970 | uhs.monotonic_time = uc.monotonic_time; | ||
1971 | GNUNET_CRYPTO_eddsa_sign (my_private_key, | ||
1972 | &uhs, | ||
1973 | &uc.sender_sig); | ||
1974 | /* Leave space for kx */ | ||
1975 | dpos = sizeof(kx); | ||
1976 | /* Append encrypted uc to dgram */ | ||
1977 | GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher, | ||
1978 | &dgram[dpos], | ||
1979 | sizeof(uc), | ||
1980 | &uc, | ||
1981 | sizeof(uc))); | ||
1982 | dpos += sizeof(uc); | ||
1983 | /* Append encrypted payload to dgram */ | ||
1984 | GNUNET_assert ( | ||
1985 | 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); | ||
1986 | dpos += msize; | ||
1987 | do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); | ||
1988 | /* Datagram starts with kx */ | ||
1989 | kx.ephemeral = uhs.ephemeral; | ||
1990 | GNUNET_assert ( | ||
1991 | 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag))); | ||
1992 | gcry_cipher_close (out_cipher); | ||
1993 | memcpy (dgram, &kx, sizeof(kx)); | ||
1994 | if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, | ||
1995 | dgram, | ||
1996 | sizeof(dgram), | ||
1997 | receiver->address, | ||
1998 | receiver->address_len)) | ||
1999 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); | ||
2000 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2001 | "Sending KX to %s\n", GNUNET_a2s (receiver->address, | ||
2002 | receiver->address_len)); | ||
2003 | GNUNET_MQ_impl_send_continue (mq); | ||
2004 | } | ||
2005 | |||
2006 | |||
2007 | /** | ||
2008 | * Signature of functions implementing the sending functionality of a | ||
2009 | * message queue. | ||
2010 | * | ||
2011 | * @param mq the message queue | ||
2012 | * @param msg the message to send | ||
2013 | * @param impl_state our `struct ReceiverAddress` | ||
2014 | */ | ||
2015 | static void | ||
2016 | mq_send_d (struct GNUNET_MQ_Handle *mq, | ||
2017 | const struct GNUNET_MessageHeader *msg, | ||
2018 | void *impl_state) | ||
2019 | { | ||
2020 | struct ReceiverAddress *receiver = impl_state; | ||
2021 | uint16_t msize = ntohs (msg->size); | ||
2022 | |||
2023 | GNUNET_assert (mq == receiver->d_mq); | ||
2024 | if ((msize > receiver->d_mtu) || | ||
2025 | (0 == receiver->acks_available)) | ||
1926 | { | 2026 | { |
1927 | /* use KX encryption method */ | 2027 | GNUNET_break (0); |
1928 | struct UdpHandshakeSignature uhs; | 2028 | receiver_destroy (receiver); |
1929 | struct UDPConfirmation uc; | 2029 | return; |
1930 | struct InitialKX kx; | 2030 | } |
1931 | struct GNUNET_CRYPTO_EcdhePrivateKey epriv; | 2031 | reschedule_receiver_timeout (receiver); |
1932 | char dgram[receiver->mtu + sizeof(uc) + sizeof(kx)]; | ||
1933 | size_t dpos; | ||
1934 | gcry_cipher_hd_t out_cipher; | ||
1935 | struct SharedSecret *ss; | ||
1936 | 2032 | ||
1937 | /* setup key material */ | 2033 | /* begin "BOX" encryption method, scan for ACKs from tail! */ |
1938 | GNUNET_CRYPTO_ecdhe_key_create (&epriv); | 2034 | for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev) |
2035 | { | ||
2036 | if (ss->sequence_used >= ss->sequence_allowed) | ||
2037 | { | ||
2038 | continue; | ||
2039 | } | ||
2040 | char dgram[sizeof(struct UDPBox) + receiver->d_mtu]; | ||
2041 | struct UDPBox *box; | ||
2042 | gcry_cipher_hd_t out_cipher; | ||
2043 | size_t dpos; | ||
1939 | 2044 | ||
1940 | ss = setup_shared_secret_enc (&epriv, receiver); | 2045 | box = (struct UDPBox *) dgram; |
1941 | setup_cipher (&ss->master, 0, &out_cipher); | 2046 | ss->sequence_used++; |
1942 | /* compute 'uc' */ | 2047 | get_kid (&ss->master, ss->sequence_used, &box->kid); |
1943 | uc.sender = my_identity; | 2048 | setup_cipher (&ss->master, ss->sequence_used, &out_cipher); |
1944 | uc.monotonic_time = | ||
1945 | GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); | ||
1946 | uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE); | ||
1947 | uhs.purpose.size = htonl (sizeof(uhs)); | ||
1948 | uhs.sender = my_identity; | ||
1949 | uhs.receiver = receiver->target; | ||
1950 | GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral); | ||
1951 | uhs.monotonic_time = uc.monotonic_time; | ||
1952 | GNUNET_CRYPTO_eddsa_sign (my_private_key, | ||
1953 | &uhs, | ||
1954 | &uc.sender_sig); | ||
1955 | /* Leave space for kx */ | ||
1956 | dpos = sizeof(kx); | ||
1957 | /* Append encrypted uc to dgram */ | ||
1958 | GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher, | ||
1959 | &dgram[dpos], | ||
1960 | sizeof(uc), | ||
1961 | &uc, | ||
1962 | sizeof(uc))); | ||
1963 | dpos += sizeof(uc); | ||
1964 | /* Append encrypted payload to dgram */ | 2049 | /* Append encrypted payload to dgram */ |
2050 | dpos = sizeof(struct UDPBox); | ||
1965 | GNUNET_assert ( | 2051 | GNUNET_assert ( |
1966 | 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); | 2052 | 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); |
1967 | dpos += msize; | 2053 | dpos += msize; |
1968 | do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); | 2054 | do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); |
1969 | /* Datagram starts with kx */ | 2055 | GNUNET_assert (0 == gcry_cipher_gettag (out_cipher, |
1970 | kx.ephemeral = uhs.ephemeral; | 2056 | box->gcm_tag, |
1971 | GNUNET_assert ( | 2057 | sizeof(box->gcm_tag))); |
1972 | 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag))); | ||
1973 | gcry_cipher_close (out_cipher); | 2058 | gcry_cipher_close (out_cipher); |
1974 | memcpy (dgram, &kx, sizeof(kx)); | ||
1975 | if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, | 2059 | if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, |
1976 | dgram, | 2060 | dgram, |
1977 | sizeof(dgram), | 2061 | sizeof(dgram), |
1978 | receiver->address, | 2062 | receiver->address, |
1979 | receiver->address_len)) | 2063 | receiver->address_len)) |
1980 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); | 2064 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); |
1981 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1982 | "Sending KX to %s\n", GNUNET_a2s (receiver->address, | ||
1983 | receiver->address_len)); | ||
1984 | GNUNET_MQ_impl_send_continue (mq); | 2065 | GNUNET_MQ_impl_send_continue (mq); |
1985 | return; | 2066 | receiver->acks_available--; |
1986 | } /* End of KX encryption method */ | 2067 | if (0 == receiver->acks_available) |
1987 | |||
1988 | /* begin "BOX" encryption method, scan for ACKs from tail! */ | ||
1989 | for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev) | ||
1990 | { | ||
1991 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1992 | "In non-kx mode...\n"); | ||
1993 | if (ss->sequence_used < ss->sequence_allowed) | ||
1994 | { | 2068 | { |
1995 | char dgram[sizeof(struct UDPBox) + receiver->mtu]; | 2069 | /* We have no more ACKs */ |
1996 | struct UDPBox *box; | ||
1997 | gcry_cipher_hd_t out_cipher; | ||
1998 | size_t dpos; | ||
1999 | |||
2000 | box = (struct UDPBox *) dgram; | ||
2001 | ss->sequence_used++; | ||
2002 | get_kid (&ss->master, ss->sequence_used, &box->kid); | ||
2003 | setup_cipher (&ss->master, ss->sequence_used, &out_cipher); | ||
2004 | /* Append encrypted payload to dgram */ | ||
2005 | dpos = sizeof(struct UDPBox); | ||
2006 | GNUNET_assert ( | ||
2007 | 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); | ||
2008 | dpos += msize; | ||
2009 | do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); | ||
2010 | GNUNET_assert (0 == gcry_cipher_gettag (out_cipher, | ||
2011 | box->gcm_tag, | ||
2012 | sizeof(box->gcm_tag))); | ||
2013 | gcry_cipher_close (out_cipher); | ||
2014 | if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, | ||
2015 | dgram, | ||
2016 | sizeof(dgram), | ||
2017 | receiver->address, | ||
2018 | receiver->address_len)) | ||
2019 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); | ||
2020 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2070 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2021 | "Sending data\n"); | 2071 | "No more acks\n"); |
2022 | |||
2023 | GNUNET_MQ_impl_send_continue (mq); | ||
2024 | receiver->acks_available--; | ||
2025 | if (0 == receiver->acks_available) | ||
2026 | { | ||
2027 | /* We have no more ACKs => MTU change! */ | ||
2028 | setup_receiver_mq (receiver); | ||
2029 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2030 | "No more acks, MTU changed\n"); | ||
2031 | } | ||
2032 | return; | ||
2033 | } | 2072 | } |
2034 | } | 2073 | } |
2035 | GNUNET_assert (0); | ||
2036 | } | 2074 | } |
2037 | 2075 | ||
2038 | 2076 | ||
@@ -2045,15 +2083,37 @@ mq_send (struct GNUNET_MQ_Handle *mq, | |||
2045 | * @param impl_state our `struct ReceiverAddress` | 2083 | * @param impl_state our `struct ReceiverAddress` |
2046 | */ | 2084 | */ |
2047 | static void | 2085 | static void |
2048 | mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) | 2086 | mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state) |
2049 | { | 2087 | { |
2050 | struct ReceiverAddress *receiver = impl_state; | 2088 | struct ReceiverAddress *receiver = impl_state; |
2051 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2089 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2052 | "MQ destroyed\n"); | 2090 | "Default MQ destroyed\n"); |
2053 | if (mq == receiver->mq) | 2091 | if (mq == receiver->d_mq) |
2054 | { | 2092 | { |
2055 | receiver->mq = NULL; | 2093 | receiver->d_mq = NULL; |
2056 | //receiver_destroy (receiver); | 2094 | receiver_destroy (receiver); |
2095 | } | ||
2096 | } | ||
2097 | |||
2098 | |||
2099 | /** | ||
2100 | * Signature of functions implementing the destruction of a message | ||
2101 | * queue. Implementations must not free @a mq, but should take care | ||
2102 | * of @a impl_state. | ||
2103 | * | ||
2104 | * @param mq the message queue to destroy | ||
2105 | * @param impl_state our `struct ReceiverAddress` | ||
2106 | */ | ||
2107 | static void | ||
2108 | mq_destroy_kx (struct GNUNET_MQ_Handle *mq, void *impl_state) | ||
2109 | { | ||
2110 | struct ReceiverAddress *receiver = impl_state; | ||
2111 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2112 | "KX MQ destroyed\n"); | ||
2113 | if (mq == receiver->kx_mq) | ||
2114 | { | ||
2115 | receiver->kx_mq = NULL; | ||
2116 | receiver_destroy (receiver); | ||
2057 | } | 2117 | } |
2058 | } | 2118 | } |
2059 | 2119 | ||
@@ -2106,12 +2166,17 @@ setup_receiver_mq (struct ReceiverAddress *receiver) | |||
2106 | { | 2166 | { |
2107 | size_t base_mtu; | 2167 | size_t base_mtu; |
2108 | 2168 | ||
2109 | if (NULL != receiver->qh) | 2169 | /*if (NULL != receiver->kx_qh) |
2110 | { | 2170 | { |
2111 | GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); | 2171 | GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh); |
2112 | receiver->qh = NULL; | 2172 | receiver->kx_qh = NULL; |
2113 | } | 2173 | } |
2114 | //GNUNET_assert (NULL == receiver->mq); | 2174 | if (NULL != receiver->d_qh) |
2175 | { | ||
2176 | GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh); | ||
2177 | receiver->d_qh = NULL; | ||
2178 | }*/ | ||
2179 | // GNUNET_assert (NULL == receiver->mq); | ||
2115 | switch (receiver->address->sa_family) | 2180 | switch (receiver->address->sa_family) |
2116 | { | 2181 | { |
2117 | case AF_INET: | 2182 | case AF_INET: |
@@ -2130,35 +2195,54 @@ setup_receiver_mq (struct ReceiverAddress *receiver) | |||
2130 | GNUNET_assert (0); | 2195 | GNUNET_assert (0); |
2131 | break; | 2196 | break; |
2132 | } | 2197 | } |
2133 | if (0 == receiver->acks_available) | 2198 | /* MTU based on full KX messages */ |
2134 | { | 2199 | receiver->kx_mtu = base_mtu - sizeof(struct InitialKX) /* 48 */ |
2135 | /* MTU based on full KX messages */ | 2200 | - sizeof(struct UDPConfirmation); /* 104 */ |
2136 | receiver->mtu = base_mtu - sizeof(struct InitialKX) /* 48 */ | 2201 | /* MTU based on BOXed messages */ |
2137 | - sizeof(struct UDPConfirmation); /* 104 */ | 2202 | receiver->d_mtu = base_mtu - sizeof(struct UDPBox); |
2138 | } | 2203 | |
2139 | else | 2204 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2140 | { | 2205 | "Setting up MQs and QHs\n"); |
2141 | /* MTU based on BOXed messages */ | ||
2142 | receiver->mtu = base_mtu - sizeof(struct UDPBox); | ||
2143 | } | ||
2144 | /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to | 2206 | /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to |
2145 | 1404 (IPv4 + Box) bytes, depending on circumstances... */ | 2207 | 1404 (IPv4 + Box) bytes, depending on circumstances... */ |
2146 | if (NULL == receiver->mq) | 2208 | if (NULL == receiver->kx_mq) |
2147 | receiver->mq = GNUNET_MQ_queue_for_callbacks (&mq_send, | 2209 | receiver->kx_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_kx, |
2148 | &mq_destroy, | 2210 | &mq_destroy_kx, |
2149 | &mq_cancel, | 2211 | &mq_cancel, |
2150 | receiver, | 2212 | receiver, |
2151 | NULL, | 2213 | NULL, |
2152 | &mq_error, | 2214 | &mq_error, |
2153 | receiver); | 2215 | receiver); |
2154 | receiver->qh = | 2216 | if (NULL == receiver->d_mq) |
2217 | receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d, | ||
2218 | &mq_destroy_d, | ||
2219 | &mq_cancel, | ||
2220 | receiver, | ||
2221 | NULL, | ||
2222 | &mq_error, | ||
2223 | receiver); | ||
2224 | |||
2225 | receiver->kx_qh = | ||
2155 | GNUNET_TRANSPORT_communicator_mq_add (ch, | 2226 | GNUNET_TRANSPORT_communicator_mq_add (ch, |
2156 | &receiver->target, | 2227 | &receiver->target, |
2157 | receiver->foreign_addr, | 2228 | receiver->foreign_addr, |
2158 | receiver->mtu, | 2229 | receiver->kx_mtu, |
2230 | GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, | ||
2231 | 0, /* Priority */ | ||
2159 | receiver->nt, | 2232 | receiver->nt, |
2160 | GNUNET_TRANSPORT_CS_OUTBOUND, | 2233 | GNUNET_TRANSPORT_CS_OUTBOUND, |
2161 | receiver->mq); | 2234 | receiver->kx_mq); |
2235 | receiver->d_qh = | ||
2236 | GNUNET_TRANSPORT_communicator_mq_add (ch, | ||
2237 | &receiver->target, | ||
2238 | receiver->foreign_addr, | ||
2239 | receiver->d_mtu, | ||
2240 | 0, /* Initialize with 0 acks */ | ||
2241 | 1, /* Priority */ | ||
2242 | receiver->nt, | ||
2243 | GNUNET_TRANSPORT_CS_OUTBOUND, | ||
2244 | receiver->d_mq); | ||
2245 | |||
2162 | } | 2246 | } |
2163 | 2247 | ||
2164 | 2248 | ||
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c index 31d2e4ed3..27dda7281 100644 --- a/src/transport/gnunet-communicator-unix.c +++ b/src/transport/gnunet-communicator-unix.c | |||
@@ -670,6 +670,8 @@ setup_queue (const struct GNUNET_PeerIdentity *target, | |||
670 | &queue->target, | 670 | &queue->target, |
671 | foreign_addr, | 671 | foreign_addr, |
672 | UNIX_MTU, | 672 | UNIX_MTU, |
673 | GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED, | ||
674 | 0, | ||
673 | GNUNET_NT_LOOPBACK, | 675 | GNUNET_NT_LOOPBACK, |
674 | cs, | 676 | cs, |
675 | queue->mq); | 677 | queue->mq); |
diff --git a/src/transport/test_communicator_basic.c b/src/transport/test_communicator_basic.c index 1dfcf2371..1ea79fa19 100644 --- a/src/transport/test_communicator_basic.c +++ b/src/transport/test_communicator_basic.c | |||
@@ -58,19 +58,21 @@ static char *cfg_peers_name[NUM_PEERS]; | |||
58 | 58 | ||
59 | static int ret; | 59 | static int ret; |
60 | 60 | ||
61 | static size_t long_message_size; | ||
62 | |||
61 | static struct GNUNET_TIME_Absolute start_short; | 63 | static struct GNUNET_TIME_Absolute start_short; |
62 | 64 | ||
63 | static struct GNUNET_TIME_Absolute start_long; | 65 | static struct GNUNET_TIME_Absolute start_long; |
64 | 66 | ||
65 | static struct GNUNET_TIME_Absolute timeout; | 67 | static struct GNUNET_TIME_Absolute timeout; |
66 | 68 | ||
67 | static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc; | 69 | static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *my_tc; |
68 | 70 | ||
69 | #define SHORT_MESSAGE_SIZE 128 | 71 | #define SHORT_MESSAGE_SIZE 128 |
70 | 72 | ||
71 | #define LONG_MESSAGE_SIZE 32000 | 73 | #define LONG_MESSAGE_SIZE 32000 /* FIXME */ |
72 | 74 | ||
73 | #define BURST_PACKETS 50 | 75 | #define BURST_PACKETS 500 |
74 | 76 | ||
75 | #define TOTAL_ITERATIONS 1 | 77 | #define TOTAL_ITERATIONS 1 |
76 | 78 | ||
@@ -88,6 +90,7 @@ static unsigned int iterations_left = TOTAL_ITERATIONS; | |||
88 | 90 | ||
89 | enum TestPhase | 91 | enum TestPhase |
90 | { | 92 | { |
93 | TP_INIT, | ||
91 | TP_BURST_SHORT, | 94 | TP_BURST_SHORT, |
92 | TP_BURST_LONG, | 95 | TP_BURST_LONG, |
93 | TP_SIZE_CHECK | 96 | TP_SIZE_CHECK |
@@ -230,15 +233,18 @@ static void | |||
230 | size_test (void *cls) | 233 | size_test (void *cls) |
231 | { | 234 | { |
232 | char *payload; | 235 | char *payload; |
236 | size_t max_size = 64000; | ||
233 | 237 | ||
234 | GNUNET_assert (TP_SIZE_CHECK == phase); | 238 | GNUNET_assert (TP_SIZE_CHECK == phase); |
235 | if (ack >= 64000) | 239 | if (LONG_MESSAGE_SIZE != long_message_size) |
240 | max_size = long_message_size; | ||
241 | if (ack >= max_size) | ||
236 | return; /* Leave some room for our protocol, so not 2^16 exactly */ | 242 | return; /* Leave some room for our protocol, so not 2^16 exactly */ |
237 | payload = make_payload (ack); | 243 | payload = make_payload (ack); |
238 | ack += 5; | 244 | ack += 5; |
239 | num_sent++; | 245 | num_sent++; |
240 | GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, | 246 | GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, |
241 | (ack < 64000) | 247 | (ack < max_size) |
242 | ? &size_test | 248 | ? &size_test |
243 | : NULL, | 249 | : NULL, |
244 | NULL, | 250 | NULL, |
@@ -254,7 +260,7 @@ long_test (void *cls) | |||
254 | { | 260 | { |
255 | char *payload; | 261 | char *payload; |
256 | 262 | ||
257 | payload = make_payload (LONG_MESSAGE_SIZE); | 263 | payload = make_payload (long_message_size); |
258 | num_sent++; | 264 | num_sent++; |
259 | GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, | 265 | GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, |
260 | (BURST_PACKETS == | 266 | (BURST_PACKETS == |
@@ -263,7 +269,7 @@ long_test (void *cls) | |||
263 | : &long_test, | 269 | : &long_test, |
264 | NULL, | 270 | NULL, |
265 | payload, | 271 | payload, |
266 | LONG_MESSAGE_SIZE); | 272 | long_message_size); |
267 | GNUNET_free (payload); | 273 | GNUNET_free (payload); |
268 | timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); | 274 | timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); |
269 | } | 275 | } |
@@ -288,6 +294,7 @@ short_test (void *cls) | |||
288 | timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); | 294 | timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); |
289 | } | 295 | } |
290 | 296 | ||
297 | |||
291 | static int test_prepared = GNUNET_NO; | 298 | static int test_prepared = GNUNET_NO; |
292 | 299 | ||
293 | /** | 300 | /** |
@@ -316,7 +323,6 @@ prepare_test (void *cls) | |||
316 | } | 323 | } |
317 | 324 | ||
318 | 325 | ||
319 | |||
320 | /** | 326 | /** |
321 | * @brief Handle opening of queue | 327 | * @brief Handle opening of queue |
322 | * | 328 | * |
@@ -332,18 +338,25 @@ static void | |||
332 | add_queue_cb (void *cls, | 338 | add_queue_cb (void *cls, |
333 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, | 339 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, |
334 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue * | 340 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue * |
335 | tc_queue) | 341 | tc_queue, |
342 | size_t mtu) | ||
336 | { | 343 | { |
344 | if (TP_INIT != phase) | ||
345 | return; | ||
337 | if (0 != strcmp ((char*) cls, cfg_peers_name[0])) | 346 | if (0 != strcmp ((char*) cls, cfg_peers_name[0])) |
338 | return; // TODO? | 347 | return; // TODO? |
339 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 348 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
340 | "Queue established, starting test...\n"); | 349 | "Queue established, starting test...\n"); |
341 | start_short = GNUNET_TIME_absolute_get (); | 350 | start_short = GNUNET_TIME_absolute_get (); |
342 | my_tc = tc_queue; | 351 | my_tc = tc_h; |
352 | if (0 != mtu) | ||
353 | long_message_size = mtu; | ||
354 | else | ||
355 | long_message_size = LONG_MESSAGE_SIZE; | ||
343 | phase = TP_BURST_SHORT; | 356 | phase = TP_BURST_SHORT; |
344 | timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); | 357 | timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_MINUTES); |
345 | GNUNET_assert (NULL == to_task); | 358 | GNUNET_assert (NULL == to_task); |
346 | to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, | 359 | to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, |
347 | &latency_timeout, | 360 | &latency_timeout, |
348 | NULL); | 361 | NULL); |
349 | prepare_test (NULL); | 362 | prepare_test (NULL); |
@@ -395,6 +408,9 @@ incoming_message_cb (void *cls, | |||
395 | timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); | 408 | timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); |
396 | switch (phase) | 409 | switch (phase) |
397 | { | 410 | { |
411 | case TP_INIT: | ||
412 | GNUNET_break (0); | ||
413 | break; | ||
398 | case TP_BURST_SHORT: | 414 | case TP_BURST_SHORT: |
399 | { | 415 | { |
400 | GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len); | 416 | GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len); |
@@ -428,7 +444,7 @@ incoming_message_cb (void *cls, | |||
428 | } | 444 | } |
429 | case TP_BURST_LONG: | 445 | case TP_BURST_LONG: |
430 | { | 446 | { |
431 | if (LONG_MESSAGE_SIZE != payload_len) | 447 | if (long_message_size != payload_len) |
432 | { | 448 | { |
433 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 449 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
434 | "Ignoring packet with wrong length\n"); | 450 | "Ignoring packet with wrong length\n"); |
@@ -441,7 +457,7 @@ incoming_message_cb (void *cls, | |||
441 | { | 457 | { |
442 | GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, | 458 | GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, |
443 | "Long size packet test done.\n"); | 459 | "Long size packet test done.\n"); |
444 | char *goodput = GNUNET_STRINGS_byte_size_fancy ((LONG_MESSAGE_SIZE | 460 | char *goodput = GNUNET_STRINGS_byte_size_fancy ((long_message_size |
445 | * num_received * 1000 | 461 | * num_received * 1000 |
446 | * 1000) | 462 | * 1000) |
447 | / duration.rel_value_us); | 463 | / duration.rel_value_us); |
@@ -553,6 +569,7 @@ main (int argc, | |||
553 | char *test_name; | 569 | char *test_name; |
554 | char *cfg_peer; | 570 | char *cfg_peer; |
555 | 571 | ||
572 | phase = TP_INIT; | ||
556 | ret = 1; | 573 | ret = 1; |
557 | test_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]); | 574 | test_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]); |
558 | communicator_name = strchr (test_name, '-'); | 575 | communicator_name = strchr (test_name, '-'); |
diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c index fc6d13590..8250027f7 100644 --- a/src/transport/transport-testing2.c +++ b/src/transport/transport-testing2.c | |||
@@ -33,7 +33,7 @@ | |||
33 | #include "gnunet_hello_lib.h" | 33 | #include "gnunet_hello_lib.h" |
34 | #include "gnunet_signatures.h" | 34 | #include "gnunet_signatures.h" |
35 | #include "transport.h" | 35 | #include "transport.h" |
36 | 36 | #include <inttypes.h> | |
37 | 37 | ||
38 | #define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__) | 38 | #define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__) |
39 | 39 | ||
@@ -227,11 +227,21 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue | |||
227 | uint32_t nt; | 227 | uint32_t nt; |
228 | 228 | ||
229 | /** | 229 | /** |
230 | * Maximum transmission unit, in NBO. UINT32_MAX for unlimited. | 230 | * Maximum transmission unit. UINT32_MAX for unlimited. |
231 | */ | 231 | */ |
232 | uint32_t mtu; | 232 | uint32_t mtu; |
233 | 233 | ||
234 | /** | 234 | /** |
235 | * Queue length. UINT64_MAX for unlimited. | ||
236 | */ | ||
237 | uint64_t q_len; | ||
238 | |||
239 | /** | ||
240 | * Queue prio | ||
241 | */ | ||
242 | uint32_t priority; | ||
243 | |||
244 | /** | ||
235 | * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. | 245 | * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. |
236 | */ | 246 | */ |
237 | uint32_t cs; | 247 | uint32_t cs; |
@@ -370,8 +380,8 @@ handle_communicator_backchannel (void *cls, | |||
370 | struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; | 380 | struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; |
371 | struct GNUNET_MQ_Envelope *env; | 381 | struct GNUNET_MQ_Envelope *env; |
372 | 382 | ||
373 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 383 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
374 | "Received backchannel message\n"); | 384 | "Received backchannel message\n"); |
375 | if (tc_h->bc_enabled != GNUNET_YES) | 385 | if (tc_h->bc_enabled != GNUNET_YES) |
376 | { | 386 | { |
377 | GNUNET_SERVICE_client_continue (client->client); | 387 | GNUNET_SERVICE_client_continue (client->client); |
@@ -379,10 +389,10 @@ handle_communicator_backchannel (void *cls, | |||
379 | } | 389 | } |
380 | /* Find client providing this communicator */ | 390 | /* Find client providing this communicator */ |
381 | /* Finally, deliver backchannel message to communicator */ | 391 | /* Finally, deliver backchannel message to communicator */ |
382 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 392 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
383 | "Delivering backchannel message of type %u to %s\n", | 393 | "Delivering backchannel message of type %u to %s\n", |
384 | ntohs (msg->type), | 394 | ntohs (msg->type), |
385 | target_communicator); | 395 | target_communicator); |
386 | other_tc_h = tc_h->bc_cb (tc_h, msg, (struct | 396 | other_tc_h = tc_h->bc_cb (tc_h, msg, (struct |
387 | GNUNET_PeerIdentity*) &bc_msg->pid); | 397 | GNUNET_PeerIdentity*) &bc_msg->pid); |
388 | env = GNUNET_MQ_msg_extra ( | 398 | env = GNUNET_MQ_msg_extra ( |
@@ -496,9 +506,6 @@ handle_incoming_msg (void *cls, | |||
496 | msg = (struct GNUNET_MessageHeader *) &inc_msg[1]; | 506 | msg = (struct GNUNET_MessageHeader *) &inc_msg[1]; |
497 | size_t payload_len = ntohs (msg->size) - sizeof (struct | 507 | size_t payload_len = ntohs (msg->size) - sizeof (struct |
498 | GNUNET_MessageHeader); | 508 | GNUNET_MessageHeader); |
499 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
500 | "Incoming message from communicator!\n"); | ||
501 | |||
502 | if (NULL != tc_h->incoming_msg_cb) | 509 | if (NULL != tc_h->incoming_msg_cb) |
503 | { | 510 | { |
504 | tc_h->incoming_msg_cb (tc_h->cb_cls, | 511 | tc_h->incoming_msg_cb (tc_h->cb_cls, |
@@ -608,15 +615,14 @@ handle_add_queue_message (void *cls, | |||
608 | client->tc; | 615 | client->tc; |
609 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; | 616 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; |
610 | 617 | ||
611 | tc_queue = tc_h->queue_head; | 618 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
612 | if (NULL != tc_queue) | 619 | "Got queue with ID %u\n", msg->qid); |
620 | for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next) | ||
613 | { | 621 | { |
614 | while (tc_queue->qid != msg->qid) | 622 | if (tc_queue->qid == msg->qid) |
615 | { | 623 | break; |
616 | tc_queue = tc_queue->next; | ||
617 | } | ||
618 | } | 624 | } |
619 | else | 625 | if (NULL == tc_queue) |
620 | { | 626 | { |
621 | tc_queue = | 627 | tc_queue = |
622 | GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue); | 628 | GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue); |
@@ -628,17 +634,59 @@ handle_add_queue_message (void *cls, | |||
628 | GNUNET_assert (tc_queue->qid == msg->qid); | 634 | GNUNET_assert (tc_queue->qid == msg->qid); |
629 | GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); | 635 | GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); |
630 | tc_queue->nt = msg->nt; | 636 | tc_queue->nt = msg->nt; |
631 | tc_queue->mtu = msg->mtu; | 637 | tc_queue->mtu = ntohl (msg->mtu); |
632 | tc_queue->cs = msg->cs; | 638 | tc_queue->cs = msg->cs; |
639 | tc_queue->priority = ntohl (msg->priority); | ||
640 | tc_queue->q_len = GNUNET_ntohll (msg->q_len); | ||
633 | if (NULL != tc_h->add_queue_cb) | 641 | if (NULL != tc_h->add_queue_cb) |
634 | { | 642 | { |
635 | tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue); | 643 | tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu); |
636 | } | 644 | } |
637 | GNUNET_SERVICE_client_continue (client->client); | 645 | GNUNET_SERVICE_client_continue (client->client); |
638 | } | 646 | } |
639 | 647 | ||
640 | 648 | ||
641 | /** | 649 | /** |
650 | * @brief Handle new queue | ||
651 | * | ||
652 | * Store context and call client callback. | ||
653 | * | ||
654 | * @param cls Closure - communicator handle | ||
655 | * @param msg Message struct | ||
656 | */ | ||
657 | static void | ||
658 | handle_update_queue_message (void *cls, | ||
659 | const struct | ||
660 | GNUNET_TRANSPORT_UpdateQueueMessage *msg) | ||
661 | { | ||
662 | struct MyClient *client = cls; | ||
663 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = | ||
664 | client->tc; | ||
665 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; | ||
666 | |||
667 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
668 | "Received queue update message for %u with q_len %"PRIu64"\n", | ||
669 | msg->qid, GNUNET_ntohll(msg->q_len)); | ||
670 | tc_queue = tc_h->queue_head; | ||
671 | if (NULL != tc_queue) | ||
672 | { | ||
673 | while (tc_queue->qid != msg->qid) | ||
674 | { | ||
675 | tc_queue = tc_queue->next; | ||
676 | } | ||
677 | } | ||
678 | GNUNET_assert (tc_queue->qid == msg->qid); | ||
679 | GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); | ||
680 | tc_queue->nt = msg->nt; | ||
681 | tc_queue->mtu = ntohl (msg->mtu); | ||
682 | tc_queue->cs = msg->cs; | ||
683 | tc_queue->priority = ntohl (msg->priority); | ||
684 | tc_queue->q_len += GNUNET_ntohll (msg->q_len); | ||
685 | GNUNET_SERVICE_client_continue (client->client); | ||
686 | } | ||
687 | |||
688 | |||
689 | /** | ||
642 | * @brief Shut down the service | 690 | * @brief Shut down the service |
643 | * | 691 | * |
644 | * @param cls Closure - Handle to the service | 692 | * @param cls Closure - Handle to the service |
@@ -789,6 +837,10 @@ transport_communicator_start ( | |||
789 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, | 837 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, |
790 | struct GNUNET_TRANSPORT_AddQueueMessage, | 838 | struct GNUNET_TRANSPORT_AddQueueMessage, |
791 | tc_h), | 839 | tc_h), |
840 | GNUNET_MQ_hd_fixed_size (update_queue_message, | ||
841 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE, | ||
842 | struct GNUNET_TRANSPORT_UpdateQueueMessage, | ||
843 | tc_h), | ||
792 | // GNUNET_MQ_hd_fixed_size (del_queue_message, | 844 | // GNUNET_MQ_hd_fixed_size (del_queue_message, |
793 | // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, | 845 | // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, |
794 | // struct GNUNET_TRANSPORT_DelQueueMessage, | 846 | // struct GNUNET_TRANSPORT_DelQueueMessage, |
@@ -1063,7 +1115,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue ( | |||
1063 | */ | 1115 | */ |
1064 | void | 1116 | void |
1065 | GNUNET_TRANSPORT_TESTING_transport_communicator_send | 1117 | GNUNET_TRANSPORT_TESTING_transport_communicator_send |
1066 | (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue, | 1118 | (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, |
1067 | GNUNET_SCHEDULER_TaskCallback cont, | 1119 | GNUNET_SCHEDULER_TaskCallback cont, |
1068 | void *cont_cls, | 1120 | void *cont_cls, |
1069 | const void *payload, | 1121 | const void *payload, |
@@ -1073,7 +1125,39 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send | |||
1073 | struct GNUNET_TRANSPORT_SendMessageTo *msg; | 1125 | struct GNUNET_TRANSPORT_SendMessageTo *msg; |
1074 | struct GNUNET_MQ_Envelope *env; | 1126 | struct GNUNET_MQ_Envelope *env; |
1075 | size_t inbox_size; | 1127 | size_t inbox_size; |
1128 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; | ||
1129 | struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp; | ||
1076 | 1130 | ||
1131 | tc_queue = NULL; | ||
1132 | for (tc_queue_tmp = tc_h->queue_head; | ||
1133 | NULL != tc_queue_tmp; | ||
1134 | tc_queue_tmp = tc_queue_tmp->next) | ||
1135 | { | ||
1136 | if (tc_queue_tmp->q_len <= 0) | ||
1137 | continue; | ||
1138 | if (NULL == tc_queue) | ||
1139 | { | ||
1140 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1141 | "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n", | ||
1142 | tc_queue_tmp->priority, | ||
1143 | tc_queue_tmp->q_len, | ||
1144 | tc_queue_tmp->mtu); | ||
1145 | tc_queue = tc_queue_tmp; | ||
1146 | continue; | ||
1147 | } | ||
1148 | if (tc_queue->priority < tc_queue_tmp->priority) | ||
1149 | { | ||
1150 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1151 | "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n", | ||
1152 | tc_queue_tmp->priority, | ||
1153 | tc_queue_tmp->q_len, | ||
1154 | tc_queue_tmp->mtu); | ||
1155 | tc_queue = tc_queue_tmp; | ||
1156 | } | ||
1157 | } | ||
1158 | GNUNET_assert (NULL != tc_queue); | ||
1159 | if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED) | ||
1160 | tc_queue->q_len--; | ||
1077 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1161 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1078 | "Sending message\n"); | 1162 | "Sending message\n"); |
1079 | inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size; | 1163 | inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size; |
diff --git a/src/transport/transport-testing2.h b/src/transport/transport-testing2.h index 7a449f081..b77125e82 100644 --- a/src/transport/transport-testing2.h +++ b/src/transport/transport-testing2.h | |||
@@ -132,7 +132,8 @@ typedef void | |||
132 | *tc_h, | 132 | *tc_h, |
133 | struct | 133 | struct |
134 | GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue | 134 | GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue |
135 | *tc_queue); | 135 | *tc_queue, |
136 | size_t mtu); | ||
136 | 137 | ||
137 | 138 | ||
138 | /** | 139 | /** |
@@ -215,8 +216,8 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (struct | |||
215 | */ | 216 | */ |
216 | void | 217 | void |
217 | GNUNET_TRANSPORT_TESTING_transport_communicator_send (struct | 218 | GNUNET_TRANSPORT_TESTING_transport_communicator_send (struct |
218 | GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue | 219 | GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle |
219 | *tc_queue, | 220 | *tc_h, |
220 | GNUNET_SCHEDULER_TaskCallback | 221 | GNUNET_SCHEDULER_TaskCallback |
221 | cont, | 222 | cont, |
222 | void *cont_cls, | 223 | void *cont_cls, |
diff --git a/src/transport/transport.h b/src/transport/transport.h index 36182d8d7..a64ffd5c6 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h | |||
@@ -836,6 +836,17 @@ struct GNUNET_TRANSPORT_AddQueueMessage | |||
836 | uint32_t mtu; | 836 | uint32_t mtu; |
837 | 837 | ||
838 | /** | 838 | /** |
839 | * Queue length, in NBO. Defines how many messages may be | ||
840 | * send through this queue. UINT64_MAX for unlimited. | ||
841 | */ | ||
842 | uint64_t q_len; | ||
843 | |||
844 | /** | ||
845 | * Priority of the queue in relation to other queues. | ||
846 | */ | ||
847 | uint32_t priority; | ||
848 | |||
849 | /** | ||
839 | * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. | 850 | * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. |
840 | */ | 851 | */ |
841 | uint32_t cs; | 852 | uint32_t cs; |
@@ -845,6 +856,55 @@ struct GNUNET_TRANSPORT_AddQueueMessage | |||
845 | 856 | ||
846 | 857 | ||
847 | /** | 858 | /** |
859 | * Update queue | ||
860 | */ | ||
861 | struct GNUNET_TRANSPORT_UpdateQueueMessage | ||
862 | { | ||
863 | /** | ||
864 | * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP. | ||
865 | */ | ||
866 | struct GNUNET_MessageHeader header; | ||
867 | |||
868 | /** | ||
869 | * Queue identifier (used to identify the queue). | ||
870 | */ | ||
871 | uint32_t qid GNUNET_PACKED; | ||
872 | |||
873 | /** | ||
874 | * Receiver that can be addressed via the queue. | ||
875 | */ | ||
876 | struct GNUNET_PeerIdentity receiver; | ||
877 | |||
878 | /** | ||
879 | * An `enum GNUNET_NetworkType` in NBO. | ||
880 | */ | ||
881 | uint32_t nt; | ||
882 | |||
883 | /** | ||
884 | * Maximum transmission unit, in NBO. UINT32_MAX for unlimited. | ||
885 | */ | ||
886 | uint32_t mtu; | ||
887 | |||
888 | /** | ||
889 | * Queue length, in NBO. Defines how many messages may be | ||
890 | * send through this queue. UINT64_MAX for unlimited. | ||
891 | */ | ||
892 | uint64_t q_len; | ||
893 | |||
894 | /** | ||
895 | * Priority of the queue in relation to other queues. | ||
896 | */ | ||
897 | uint32_t priority; | ||
898 | |||
899 | /** | ||
900 | * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. | ||
901 | */ | ||
902 | uint32_t cs; | ||
903 | }; | ||
904 | |||
905 | |||
906 | |||
907 | /** | ||
848 | * Remove queue, it is no longer available. | 908 | * Remove queue, it is no longer available. |
849 | */ | 909 | */ |
850 | struct GNUNET_TRANSPORT_DelQueueMessage | 910 | struct GNUNET_TRANSPORT_DelQueueMessage |
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index e80cd5c03..cfa144415 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c | |||
@@ -280,6 +280,15 @@ struct GNUNET_TRANSPORT_QueueHandle | |||
280 | * Maximum transmission unit for the queue. | 280 | * Maximum transmission unit for the queue. |
281 | */ | 281 | */ |
282 | uint32_t mtu; | 282 | uint32_t mtu; |
283 | |||
284 | /** | ||
285 | * Queue length. | ||
286 | */ | ||
287 | uint64_t q_len; | ||
288 | /** | ||
289 | * Queue priority. | ||
290 | */ | ||
291 | uint32_t priority; | ||
283 | }; | 292 | }; |
284 | 293 | ||
285 | 294 | ||
@@ -395,6 +404,8 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | |||
395 | 404 | ||
396 | if (NULL == qh->ch->mq) | 405 | if (NULL == qh->ch->mq) |
397 | return; | 406 | return; |
407 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
408 | "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n"); | ||
398 | env = GNUNET_MQ_msg_extra (aqm, | 409 | env = GNUNET_MQ_msg_extra (aqm, |
399 | strlen (qh->address) + 1, | 410 | strlen (qh->address) + 1, |
400 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); | 411 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); |
@@ -402,11 +413,39 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | |||
402 | aqm->receiver = qh->peer; | 413 | aqm->receiver = qh->peer; |
403 | aqm->nt = htonl ((uint32_t) qh->nt); | 414 | aqm->nt = htonl ((uint32_t) qh->nt); |
404 | aqm->mtu = htonl (qh->mtu); | 415 | aqm->mtu = htonl (qh->mtu); |
416 | aqm->q_len = GNUNET_htonll (qh->q_len); | ||
417 | aqm->priority = htonl (qh->priority); | ||
405 | aqm->cs = htonl ((uint32_t) qh->cs); | 418 | aqm->cs = htonl ((uint32_t) qh->cs); |
406 | memcpy (&aqm[1], qh->address, strlen (qh->address) + 1); | 419 | memcpy (&aqm[1], qh->address, strlen (qh->address) + 1); |
407 | GNUNET_MQ_send (qh->ch->mq, env); | 420 | GNUNET_MQ_send (qh->ch->mq, env); |
408 | } | 421 | } |
409 | 422 | ||
423 | /** | ||
424 | * Send message to the transport service about queue @a qh | ||
425 | * updated. | ||
426 | * | ||
427 | * @param qh queue to add | ||
428 | */ | ||
429 | static void | ||
430 | send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | ||
431 | { | ||
432 | struct GNUNET_MQ_Envelope *env; | ||
433 | struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm; | ||
434 | |||
435 | if (NULL == qh->ch->mq) | ||
436 | return; | ||
437 | env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE); | ||
438 | uqm->qid = htonl (qh->queue_id); | ||
439 | uqm->receiver = qh->peer; | ||
440 | uqm->nt = htonl ((uint32_t) qh->nt); | ||
441 | uqm->mtu = htonl (qh->mtu); | ||
442 | uqm->q_len = GNUNET_htonll (qh->q_len); | ||
443 | uqm->priority = htonl (qh->priority); | ||
444 | uqm->cs = htonl ((uint32_t) qh->cs); | ||
445 | GNUNET_MQ_send (qh->ch->mq, env); | ||
446 | } | ||
447 | |||
448 | |||
410 | 449 | ||
411 | /** | 450 | /** |
412 | * Send message to the transport service about queue @a qh | 451 | * Send message to the transport service about queue @a qh |
@@ -924,6 +963,9 @@ GNUNET_TRANSPORT_communicator_receive ( | |||
924 | * @param address address in human-readable format, 0-terminated, UTF-8 | 963 | * @param address address in human-readable format, 0-terminated, UTF-8 |
925 | * @param mtu maximum message size supported by queue, 0 if | 964 | * @param mtu maximum message size supported by queue, 0 if |
926 | * sending is not supported, SIZE_MAX for no MTU | 965 | * sending is not supported, SIZE_MAX for no MTU |
966 | * @param q_len number of messages that can be send through this queue | ||
967 | * @param priority queue priority. Queues with highest priority should be | ||
968 | * used | ||
927 | * @param nt which network type does the @a address belong to? | 969 | * @param nt which network type does the @a address belong to? |
928 | * @param cc what characteristics does the communicator have? | 970 | * @param cc what characteristics does the communicator have? |
929 | * @param cs what is the connection status of the queue? | 971 | * @param cs what is the connection status of the queue? |
@@ -936,6 +978,8 @@ GNUNET_TRANSPORT_communicator_mq_add ( | |||
936 | const struct GNUNET_PeerIdentity *peer, | 978 | const struct GNUNET_PeerIdentity *peer, |
937 | const char *address, | 979 | const char *address, |
938 | uint32_t mtu, | 980 | uint32_t mtu, |
981 | uint64_t q_len, | ||
982 | uint32_t priority, | ||
939 | enum GNUNET_NetworkType nt, | 983 | enum GNUNET_NetworkType nt, |
940 | enum GNUNET_TRANSPORT_ConnectionStatus cs, | 984 | enum GNUNET_TRANSPORT_ConnectionStatus cs, |
941 | struct GNUNET_MQ_Handle *mq) | 985 | struct GNUNET_MQ_Handle *mq) |
@@ -948,6 +992,8 @@ GNUNET_TRANSPORT_communicator_mq_add ( | |||
948 | qh->address = GNUNET_strdup (address); | 992 | qh->address = GNUNET_strdup (address); |
949 | qh->nt = nt; | 993 | qh->nt = nt; |
950 | qh->mtu = mtu; | 994 | qh->mtu = mtu; |
995 | qh->q_len = q_len; | ||
996 | qh->priority = priority; | ||
951 | qh->cs = cs; | 997 | qh->cs = cs; |
952 | qh->mq = mq; | 998 | qh->mq = mq; |
953 | qh->queue_id = ch->queue_gen++; | 999 | qh->queue_id = ch->queue_gen++; |
@@ -958,6 +1004,37 @@ GNUNET_TRANSPORT_communicator_mq_add ( | |||
958 | 1004 | ||
959 | 1005 | ||
960 | /** | 1006 | /** |
1007 | * Notify transport service that an MQ was updated | ||
1008 | * | ||
1009 | * @param ch connection to transport service | ||
1010 | * @param qh the queue to update | ||
1011 | * @param q_len number of messages that can be send through this queue | ||
1012 | * @param priority queue priority. Queues with highest priority should be | ||
1013 | * used | ||
1014 | */ | ||
1015 | void | ||
1016 | GNUNET_TRANSPORT_communicator_mq_update ( | ||
1017 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | ||
1018 | const struct GNUNET_TRANSPORT_QueueHandle *u_qh, | ||
1019 | uint64_t q_len, | ||
1020 | uint32_t priority) | ||
1021 | { | ||
1022 | struct GNUNET_TRANSPORT_QueueHandle *qh; | ||
1023 | |||
1024 | for (qh = ch->queue_head; NULL != qh; qh = qh->next) | ||
1025 | { | ||
1026 | if (u_qh == qh) | ||
1027 | break; | ||
1028 | } | ||
1029 | GNUNET_assert (NULL != qh); | ||
1030 | qh->q_len = q_len; | ||
1031 | qh->priority = priority; | ||
1032 | send_update_queue (qh); | ||
1033 | } | ||
1034 | |||
1035 | |||
1036 | |||
1037 | /** | ||
961 | * Notify transport service that an MQ became unavailable due to a | 1038 | * Notify transport service that an MQ became unavailable due to a |
962 | * disconnect or timeout. | 1039 | * disconnect or timeout. |
963 | * | 1040 | * |