aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Schanzenbach <mschanzenbach@posteo.de>2020-06-01 16:39:35 +0200
committerMartin Schanzenbach <mschanzenbach@posteo.de>2020-06-01 16:39:35 +0200
commit198c09654354d09a9b33f27cf095e0295f70826c (patch)
tree07aa088c8e9664dc76915cc6b664654da59359f4
parenta325c3eaa8450d325fe57959eac29da5496cfd6d (diff)
downloadgnunet-198c09654354d09a9b33f27cf095e0295f70826c.tar.gz
gnunet-198c09654354d09a9b33f27cf095e0295f70826c.zip
tng: more UDP communicator backchannels
Added a new message for queue updates to indicate queue length. Queues now may also have a priority parameter.
-rw-r--r--src/include/gnunet_protocols.h4
-rw-r--r--src/include/gnunet_transport_communication_service.h24
-rw-r--r--src/transport/gnunet-communicator-tcp.c2
-rw-r--r--src/transport/gnunet-communicator-udp.c384
-rw-r--r--src/transport/gnunet-communicator-unix.c2
-rw-r--r--src/transport/test_communicator_basic.c45
-rw-r--r--src/transport/transport-testing2.c126
-rw-r--r--src/transport/transport-testing2.h7
-rw-r--r--src/transport/transport.h60
-rw-r--r--src/transport/transport_api2_communication.c77
10 files changed, 543 insertions, 188 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 282bb53d1..a9cd7466a 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3161,6 +3161,10 @@ extern "C" {
3161 */ 3161 */
3162#define GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL 1221 3162#define GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL 1221
3163 3163
3164/**
3165 * @brief inform transport that a queue was updated
3166 */
3167#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE 1222
3164 3168
3165/** 3169/**
3166 * Message sent to indicate to the transport that a monitor 3170 * Message sent to indicate to the transport that a monitor
diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h
index 3ead03536..ea6b95e2d 100644
--- a/src/include/gnunet_transport_communication_service.h
+++ b/src/include/gnunet_transport_communication_service.h
@@ -50,6 +50,10 @@ extern "C" {
50 */ 50 */
51#define GNUNET_TRANSPORT_COMMUNICATION_VERSION 0x00000000 51#define GNUNET_TRANSPORT_COMMUNICATION_VERSION 0x00000000
52 52
53/**
54 * Queue length
55 */
56#define GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED UINT64_MAX
53 57
54/** 58/**
55 * Function called by the transport service to initialize a 59 * Function called by the transport service to initialize a
@@ -252,6 +256,9 @@ enum GNUNET_TRANSPORT_ConnectionStatus
252 * @param address address in human-readable format, 0-terminated, UTF-8 256 * @param address address in human-readable format, 0-terminated, UTF-8
253 * @param mtu maximum message size supported by queue, 0 if 257 * @param mtu maximum message size supported by queue, 0 if
254 * sending is not supported, SIZE_MAX for no MTU 258 * sending is not supported, SIZE_MAX for no MTU
259 * @param q_len number of messages that can be send through this queue
260 * @param priority queue priority. Queues with highest priority should be
261 * used
255 * @param nt which network type does the @a address belong to? 262 * @param nt which network type does the @a address belong to?
256 * @param cs what is the connection status of the queue? 263 * @param cs what is the connection status of the queue?
257 * @param mq message queue of the @a peer 264 * @param mq message queue of the @a peer
@@ -263,10 +270,27 @@ GNUNET_TRANSPORT_communicator_mq_add (
263 const struct GNUNET_PeerIdentity *peer, 270 const struct GNUNET_PeerIdentity *peer,
264 const char *address, 271 const char *address,
265 uint32_t mtu, 272 uint32_t mtu,
273 uint64_t q_len,
274 uint32_t priority,
266 enum GNUNET_NetworkType nt, 275 enum GNUNET_NetworkType nt,
267 enum GNUNET_TRANSPORT_ConnectionStatus cs, 276 enum GNUNET_TRANSPORT_ConnectionStatus cs,
268 struct GNUNET_MQ_Handle *mq); 277 struct GNUNET_MQ_Handle *mq);
269 278
279/**
280 * Notify transport service that an MQ was updated
281 *
282 * @param ch connection to transport service
283 * @param qh the queue to update
284 * @param q_len number of messages that can be send through this queue
285 * @param priority queue priority. Queues with highest priority should be
286 * used
287 */
288void
289GNUNET_TRANSPORT_communicator_mq_update (
290 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
291 const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
292 uint64_t q_len,
293 uint32_t priority);
270 294
271/** 295/**
272 * Notify transport service that an MQ became unavailable due to a 296 * Notify transport service that an MQ became unavailable due to a
diff --git a/src/transport/gnunet-communicator-tcp.c b/src/transport/gnunet-communicator-tcp.c
index bbfacbffd..7f70c55df 100644
--- a/src/transport/gnunet-communicator-tcp.c
+++ b/src/transport/gnunet-communicator-tcp.c
@@ -1547,6 +1547,8 @@ boot_queue (struct Queue *queue, enum GNUNET_TRANSPORT_ConnectionStatus cs)
1547 &queue->target, 1547 &queue->target,
1548 foreign_addr, 1548 foreign_addr,
1549 0 /* no MTU */, 1549 0 /* no MTU */,
1550 GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
1551 0, /* Priority */
1550 queue->nt, 1552 queue->nt,
1551 cs, 1553 cs,
1552 queue->mq); 1554 queue->mq);
diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c
index 344ba5180..46d9766d0 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -549,14 +549,24 @@ struct ReceiverAddress
549 struct GNUNET_CONTAINER_HeapNode *hn; 549 struct GNUNET_CONTAINER_HeapNode *hn;
550 550
551 /** 551 /**
552 * Message queue we are providing for the #ch. 552 * KX message queue we are providing for the #ch.
553 */ 553 */
554 struct GNUNET_MQ_Handle *mq; 554 struct GNUNET_MQ_Handle *kx_mq;
555
556 /**
557 * Default message queue we are providing for the #ch.
558 */
559 struct GNUNET_MQ_Handle *d_mq;
560
561 /**
562 * handle for KX queue with the #ch.
563 */
564 struct GNUNET_TRANSPORT_QueueHandle *kx_qh;
555 565
556 /** 566 /**
557 * handle for this queue with the #ch. 567 * handle for default queue with the #ch.
558 */ 568 */
559 struct GNUNET_TRANSPORT_QueueHandle *qh; 569 struct GNUNET_TRANSPORT_QueueHandle *d_qh;
560 570
561 /** 571 /**
562 * Timeout for this receiver address. 572 * Timeout for this receiver address.
@@ -564,9 +574,14 @@ struct ReceiverAddress
564 struct GNUNET_TIME_Absolute timeout; 574 struct GNUNET_TIME_Absolute timeout;
565 575
566 /** 576 /**
567 * MTU we allowed transport for this receiver right now. 577 * MTU we allowed transport for this receiver's KX queue.
568 */ 578 */
569 size_t mtu; 579 size_t kx_mtu;
580
581 /**
582 * MTU we allowed transport for this receiver's default queue.
583 */
584 size_t d_mtu;
570 585
571 /** 586 /**
572 * Length of the DLL at @a ss_head. 587 * Length of the DLL at @a ss_head.
@@ -786,15 +801,25 @@ receiver_destroy (struct ReceiverAddress *receiver)
786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787 "Disconnecting receiver for peer `%s'\n", 802 "Disconnecting receiver for peer `%s'\n",
788 GNUNET_i2s (&receiver->target)); 803 GNUNET_i2s (&receiver->target));
789 if (NULL != (mq = receiver->mq)) 804 if (NULL != (mq = receiver->kx_mq))
790 { 805 {
791 receiver->mq = NULL; 806 receiver->kx_mq = NULL;
792 GNUNET_MQ_destroy (mq); 807 GNUNET_MQ_destroy (mq);
793 } 808 }
794 if (NULL != receiver->qh) 809 if (NULL != receiver->kx_qh)
795 { 810 {
796 GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); 811 GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
797 receiver->qh = NULL; 812 receiver->kx_qh = NULL;
813 }
814 if (NULL != (mq = receiver->d_mq))
815 {
816 receiver->d_mq = NULL;
817 GNUNET_MQ_destroy (mq);
818 }
819 if (NULL != receiver->d_qh)
820 {
821 GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
822 receiver->d_qh = NULL;
798 } 823 }
799 GNUNET_assert (GNUNET_YES == 824 GNUNET_assert (GNUNET_YES ==
800 GNUNET_CONTAINER_multipeermap_remove (receivers, 825 GNUNET_CONTAINER_multipeermap_remove (receivers,
@@ -1265,30 +1290,27 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity *pid, void *value)
1265 (void) pid; 1290 (void) pid;
1266 for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next) 1291 for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next)
1267 { 1292 {
1268 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269 "Checking shared secrets\n");
1270 if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode))) 1293 if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode)))
1271 { 1294 {
1272 uint32_t allowed; 1295 uint32_t allowed;
1273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1274 "Found matching mac\n"); 1297 "Found matching mac\n");
1275 1298
1276 allowed = ntohl (ack->sequence_max); 1299 allowed = ntohl (ack->sequence_max);
1277 1300
1278 if (allowed > ss->sequence_allowed) 1301 if (allowed > ss->sequence_allowed)
1279 { 1302 {
1280 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1281 "%u > %u (%u)\n", allowed, ss->sequence_allowed, 1304 "%u > %u (%u)\n", allowed, ss->sequence_allowed,
1282 receiver->acks_available); 1305 receiver->acks_available);
1283 1306
1284 receiver->acks_available += (allowed - ss->sequence_allowed); 1307 receiver->acks_available += (allowed - ss->sequence_allowed);
1285 if ((allowed - ss->sequence_allowed) == receiver->acks_available) 1308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1286 { 1309 "Tell transport we have more acks!\n");
1287 /* we just incremented from zero => MTU change! */ 1310 GNUNET_TRANSPORT_communicator_mq_update (ch,
1288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1311 receiver->d_qh,
1289 "we just incremented from zero => MTU change!\n"); 1312 (allowed - ss->sequence_allowed),
1290 //TODO setup_receiver_mq (receiver); 1313 1);
1291 }
1292 ss->sequence_allowed = allowed; 1314 ss->sequence_allowed = allowed;
1293 /* move ss to head to avoid discarding it anytime soon! */ 1315 /* move ss to head to avoid discarding it anytime soon! */
1294 GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss); 1316 GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss);
@@ -1906,15 +1928,24 @@ do_pad (gcry_cipher_hd_t out_cipher, char *dgram, size_t pad_size)
1906 * @param impl_state our `struct ReceiverAddress` 1928 * @param impl_state our `struct ReceiverAddress`
1907 */ 1929 */
1908static void 1930static void
1909mq_send (struct GNUNET_MQ_Handle *mq, 1931mq_send_kx (struct GNUNET_MQ_Handle *mq,
1910 const struct GNUNET_MessageHeader *msg, 1932 const struct GNUNET_MessageHeader *msg,
1911 void *impl_state) 1933 void *impl_state)
1912{ 1934{
1913 struct ReceiverAddress *receiver = impl_state; 1935 struct ReceiverAddress *receiver = impl_state;
1914 uint16_t msize = ntohs (msg->size); 1936 uint16_t msize = ntohs (msg->size);
1937 struct UdpHandshakeSignature uhs;
1938 struct UDPConfirmation uc;
1939 struct InitialKX kx;
1940 struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
1941 char dgram[receiver->kx_mtu + sizeof(uc) + sizeof(kx)];
1942 size_t dpos;
1943 gcry_cipher_hd_t out_cipher;
1944 struct SharedSecret *ss;
1945
1915 1946
1916 GNUNET_assert (mq == receiver->mq); 1947 GNUNET_assert (mq == receiver->kx_mq);
1917 if (msize > receiver->mtu) 1948 if (msize > receiver->kx_mtu)
1918 { 1949 {
1919 GNUNET_break (0); 1950 GNUNET_break (0);
1920 receiver_destroy (receiver); 1951 receiver_destroy (receiver);
@@ -1922,117 +1953,124 @@ mq_send (struct GNUNET_MQ_Handle *mq,
1922 } 1953 }
1923 reschedule_receiver_timeout (receiver); 1954 reschedule_receiver_timeout (receiver);
1924 1955
1925 if (0 == receiver->acks_available) 1956 /* setup key material */
1957 GNUNET_CRYPTO_ecdhe_key_create (&epriv);
1958
1959 ss = setup_shared_secret_enc (&epriv, receiver);
1960 setup_cipher (&ss->master, 0, &out_cipher);
1961 /* compute 'uc' */
1962 uc.sender = my_identity;
1963 uc.monotonic_time =
1964 GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
1965 uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
1966 uhs.purpose.size = htonl (sizeof(uhs));
1967 uhs.sender = my_identity;
1968 uhs.receiver = receiver->target;
1969 GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
1970 uhs.monotonic_time = uc.monotonic_time;
1971 GNUNET_CRYPTO_eddsa_sign (my_private_key,
1972 &uhs,
1973 &uc.sender_sig);
1974 /* Leave space for kx */
1975 dpos = sizeof(kx);
1976 /* Append encrypted uc to dgram */
1977 GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
1978 &dgram[dpos],
1979 sizeof(uc),
1980 &uc,
1981 sizeof(uc)));
1982 dpos += sizeof(uc);
1983 /* Append encrypted payload to dgram */
1984 GNUNET_assert (
1985 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
1986 dpos += msize;
1987 do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
1988 /* Datagram starts with kx */
1989 kx.ephemeral = uhs.ephemeral;
1990 GNUNET_assert (
1991 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
1992 gcry_cipher_close (out_cipher);
1993 memcpy (dgram, &kx, sizeof(kx));
1994 if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
1995 dgram,
1996 sizeof(dgram),
1997 receiver->address,
1998 receiver->address_len))
1999 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
2000 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2001 "Sending KX to %s\n", GNUNET_a2s (receiver->address,
2002 receiver->address_len));
2003 GNUNET_MQ_impl_send_continue (mq);
2004}
2005
2006
2007/**
2008 * Signature of functions implementing the sending functionality of a
2009 * message queue.
2010 *
2011 * @param mq the message queue
2012 * @param msg the message to send
2013 * @param impl_state our `struct ReceiverAddress`
2014 */
2015static void
2016mq_send_d (struct GNUNET_MQ_Handle *mq,
2017 const struct GNUNET_MessageHeader *msg,
2018 void *impl_state)
2019{
2020 struct ReceiverAddress *receiver = impl_state;
2021 uint16_t msize = ntohs (msg->size);
2022
2023 GNUNET_assert (mq == receiver->d_mq);
2024 if ((msize > receiver->d_mtu) ||
2025 (0 == receiver->acks_available))
1926 { 2026 {
1927 /* use KX encryption method */ 2027 GNUNET_break (0);
1928 struct UdpHandshakeSignature uhs; 2028 receiver_destroy (receiver);
1929 struct UDPConfirmation uc; 2029 return;
1930 struct InitialKX kx; 2030 }
1931 struct GNUNET_CRYPTO_EcdhePrivateKey epriv; 2031 reschedule_receiver_timeout (receiver);
1932 char dgram[receiver->mtu + sizeof(uc) + sizeof(kx)];
1933 size_t dpos;
1934 gcry_cipher_hd_t out_cipher;
1935 struct SharedSecret *ss;
1936 2032
1937 /* setup key material */ 2033 /* begin "BOX" encryption method, scan for ACKs from tail! */
1938 GNUNET_CRYPTO_ecdhe_key_create (&epriv); 2034 for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
2035 {
2036 if (ss->sequence_used >= ss->sequence_allowed)
2037 {
2038 continue;
2039 }
2040 char dgram[sizeof(struct UDPBox) + receiver->d_mtu];
2041 struct UDPBox *box;
2042 gcry_cipher_hd_t out_cipher;
2043 size_t dpos;
1939 2044
1940 ss = setup_shared_secret_enc (&epriv, receiver); 2045 box = (struct UDPBox *) dgram;
1941 setup_cipher (&ss->master, 0, &out_cipher); 2046 ss->sequence_used++;
1942 /* compute 'uc' */ 2047 get_kid (&ss->master, ss->sequence_used, &box->kid);
1943 uc.sender = my_identity; 2048 setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
1944 uc.monotonic_time =
1945 GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
1946 uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
1947 uhs.purpose.size = htonl (sizeof(uhs));
1948 uhs.sender = my_identity;
1949 uhs.receiver = receiver->target;
1950 GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
1951 uhs.monotonic_time = uc.monotonic_time;
1952 GNUNET_CRYPTO_eddsa_sign (my_private_key,
1953 &uhs,
1954 &uc.sender_sig);
1955 /* Leave space for kx */
1956 dpos = sizeof(kx);
1957 /* Append encrypted uc to dgram */
1958 GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
1959 &dgram[dpos],
1960 sizeof(uc),
1961 &uc,
1962 sizeof(uc)));
1963 dpos += sizeof(uc);
1964 /* Append encrypted payload to dgram */ 2049 /* Append encrypted payload to dgram */
2050 dpos = sizeof(struct UDPBox);
1965 GNUNET_assert ( 2051 GNUNET_assert (
1966 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); 2052 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
1967 dpos += msize; 2053 dpos += msize;
1968 do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); 2054 do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
1969 /* Datagram starts with kx */ 2055 GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
1970 kx.ephemeral = uhs.ephemeral; 2056 box->gcm_tag,
1971 GNUNET_assert ( 2057 sizeof(box->gcm_tag)));
1972 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
1973 gcry_cipher_close (out_cipher); 2058 gcry_cipher_close (out_cipher);
1974 memcpy (dgram, &kx, sizeof(kx));
1975 if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, 2059 if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
1976 dgram, 2060 dgram,
1977 sizeof(dgram), 2061 sizeof(dgram),
1978 receiver->address, 2062 receiver->address,
1979 receiver->address_len)) 2063 receiver->address_len))
1980 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); 2064 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
1981 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1982 "Sending KX to %s\n", GNUNET_a2s (receiver->address,
1983 receiver->address_len));
1984 GNUNET_MQ_impl_send_continue (mq); 2065 GNUNET_MQ_impl_send_continue (mq);
1985 return; 2066 receiver->acks_available--;
1986 } /* End of KX encryption method */ 2067 if (0 == receiver->acks_available)
1987
1988 /* begin "BOX" encryption method, scan for ACKs from tail! */
1989 for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
1990 {
1991 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1992 "In non-kx mode...\n");
1993 if (ss->sequence_used < ss->sequence_allowed)
1994 { 2068 {
1995 char dgram[sizeof(struct UDPBox) + receiver->mtu]; 2069 /* We have no more ACKs */
1996 struct UDPBox *box;
1997 gcry_cipher_hd_t out_cipher;
1998 size_t dpos;
1999
2000 box = (struct UDPBox *) dgram;
2001 ss->sequence_used++;
2002 get_kid (&ss->master, ss->sequence_used, &box->kid);
2003 setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
2004 /* Append encrypted payload to dgram */
2005 dpos = sizeof(struct UDPBox);
2006 GNUNET_assert (
2007 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
2008 dpos += msize;
2009 do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
2010 GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
2011 box->gcm_tag,
2012 sizeof(box->gcm_tag)));
2013 gcry_cipher_close (out_cipher);
2014 if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
2015 dgram,
2016 sizeof(dgram),
2017 receiver->address,
2018 receiver->address_len))
2019 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
2020 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2070 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2021 "Sending data\n"); 2071 "No more acks\n");
2022
2023 GNUNET_MQ_impl_send_continue (mq);
2024 receiver->acks_available--;
2025 if (0 == receiver->acks_available)
2026 {
2027 /* We have no more ACKs => MTU change! */
2028 setup_receiver_mq (receiver);
2029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2030 "No more acks, MTU changed\n");
2031 }
2032 return;
2033 } 2072 }
2034 } 2073 }
2035 GNUNET_assert (0);
2036} 2074}
2037 2075
2038 2076
@@ -2045,15 +2083,37 @@ mq_send (struct GNUNET_MQ_Handle *mq,
2045 * @param impl_state our `struct ReceiverAddress` 2083 * @param impl_state our `struct ReceiverAddress`
2046 */ 2084 */
2047static void 2085static void
2048mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) 2086mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state)
2049{ 2087{
2050 struct ReceiverAddress *receiver = impl_state; 2088 struct ReceiverAddress *receiver = impl_state;
2051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2089 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052 "MQ destroyed\n"); 2090 "Default MQ destroyed\n");
2053 if (mq == receiver->mq) 2091 if (mq == receiver->d_mq)
2054 { 2092 {
2055 receiver->mq = NULL; 2093 receiver->d_mq = NULL;
2056 //receiver_destroy (receiver); 2094 receiver_destroy (receiver);
2095 }
2096}
2097
2098
2099/**
2100 * Signature of functions implementing the destruction of a message
2101 * queue. Implementations must not free @a mq, but should take care
2102 * of @a impl_state.
2103 *
2104 * @param mq the message queue to destroy
2105 * @param impl_state our `struct ReceiverAddress`
2106 */
2107static void
2108mq_destroy_kx (struct GNUNET_MQ_Handle *mq, void *impl_state)
2109{
2110 struct ReceiverAddress *receiver = impl_state;
2111 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2112 "KX MQ destroyed\n");
2113 if (mq == receiver->kx_mq)
2114 {
2115 receiver->kx_mq = NULL;
2116 receiver_destroy (receiver);
2057 } 2117 }
2058} 2118}
2059 2119
@@ -2106,12 +2166,17 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
2106{ 2166{
2107 size_t base_mtu; 2167 size_t base_mtu;
2108 2168
2109 if (NULL != receiver->qh) 2169 /*if (NULL != receiver->kx_qh)
2110 { 2170 {
2111 GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); 2171 GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
2112 receiver->qh = NULL; 2172 receiver->kx_qh = NULL;
2113 } 2173 }
2114 //GNUNET_assert (NULL == receiver->mq); 2174 if (NULL != receiver->d_qh)
2175 {
2176 GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
2177 receiver->d_qh = NULL;
2178 }*/
2179 // GNUNET_assert (NULL == receiver->mq);
2115 switch (receiver->address->sa_family) 2180 switch (receiver->address->sa_family)
2116 { 2181 {
2117 case AF_INET: 2182 case AF_INET:
@@ -2130,35 +2195,54 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
2130 GNUNET_assert (0); 2195 GNUNET_assert (0);
2131 break; 2196 break;
2132 } 2197 }
2133 if (0 == receiver->acks_available) 2198 /* MTU based on full KX messages */
2134 { 2199 receiver->kx_mtu = base_mtu - sizeof(struct InitialKX) /* 48 */
2135 /* MTU based on full KX messages */ 2200 - sizeof(struct UDPConfirmation); /* 104 */
2136 receiver->mtu = base_mtu - sizeof(struct InitialKX) /* 48 */ 2201 /* MTU based on BOXed messages */
2137 - sizeof(struct UDPConfirmation); /* 104 */ 2202 receiver->d_mtu = base_mtu - sizeof(struct UDPBox);
2138 } 2203
2139 else 2204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2140 { 2205 "Setting up MQs and QHs\n");
2141 /* MTU based on BOXed messages */
2142 receiver->mtu = base_mtu - sizeof(struct UDPBox);
2143 }
2144 /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to 2206 /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to
2145 1404 (IPv4 + Box) bytes, depending on circumstances... */ 2207 1404 (IPv4 + Box) bytes, depending on circumstances... */
2146 if (NULL == receiver->mq) 2208 if (NULL == receiver->kx_mq)
2147 receiver->mq = GNUNET_MQ_queue_for_callbacks (&mq_send, 2209 receiver->kx_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_kx,
2148 &mq_destroy, 2210 &mq_destroy_kx,
2149 &mq_cancel, 2211 &mq_cancel,
2150 receiver, 2212 receiver,
2151 NULL, 2213 NULL,
2152 &mq_error, 2214 &mq_error,
2153 receiver); 2215 receiver);
2154 receiver->qh = 2216 if (NULL == receiver->d_mq)
2217 receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
2218 &mq_destroy_d,
2219 &mq_cancel,
2220 receiver,
2221 NULL,
2222 &mq_error,
2223 receiver);
2224
2225 receiver->kx_qh =
2155 GNUNET_TRANSPORT_communicator_mq_add (ch, 2226 GNUNET_TRANSPORT_communicator_mq_add (ch,
2156 &receiver->target, 2227 &receiver->target,
2157 receiver->foreign_addr, 2228 receiver->foreign_addr,
2158 receiver->mtu, 2229 receiver->kx_mtu,
2230 GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
2231 0, /* Priority */
2159 receiver->nt, 2232 receiver->nt,
2160 GNUNET_TRANSPORT_CS_OUTBOUND, 2233 GNUNET_TRANSPORT_CS_OUTBOUND,
2161 receiver->mq); 2234 receiver->kx_mq);
2235 receiver->d_qh =
2236 GNUNET_TRANSPORT_communicator_mq_add (ch,
2237 &receiver->target,
2238 receiver->foreign_addr,
2239 receiver->d_mtu,
2240 0, /* Initialize with 0 acks */
2241 1, /* Priority */
2242 receiver->nt,
2243 GNUNET_TRANSPORT_CS_OUTBOUND,
2244 receiver->d_mq);
2245
2162} 2246}
2163 2247
2164 2248
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
index 31d2e4ed3..27dda7281 100644
--- a/src/transport/gnunet-communicator-unix.c
+++ b/src/transport/gnunet-communicator-unix.c
@@ -670,6 +670,8 @@ setup_queue (const struct GNUNET_PeerIdentity *target,
670 &queue->target, 670 &queue->target,
671 foreign_addr, 671 foreign_addr,
672 UNIX_MTU, 672 UNIX_MTU,
673 GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
674 0,
673 GNUNET_NT_LOOPBACK, 675 GNUNET_NT_LOOPBACK,
674 cs, 676 cs,
675 queue->mq); 677 queue->mq);
diff --git a/src/transport/test_communicator_basic.c b/src/transport/test_communicator_basic.c
index 1dfcf2371..1ea79fa19 100644
--- a/src/transport/test_communicator_basic.c
+++ b/src/transport/test_communicator_basic.c
@@ -58,19 +58,21 @@ static char *cfg_peers_name[NUM_PEERS];
58 58
59static int ret; 59static int ret;
60 60
61static size_t long_message_size;
62
61static struct GNUNET_TIME_Absolute start_short; 63static struct GNUNET_TIME_Absolute start_short;
62 64
63static struct GNUNET_TIME_Absolute start_long; 65static struct GNUNET_TIME_Absolute start_long;
64 66
65static struct GNUNET_TIME_Absolute timeout; 67static struct GNUNET_TIME_Absolute timeout;
66 68
67static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *my_tc; 69static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *my_tc;
68 70
69#define SHORT_MESSAGE_SIZE 128 71#define SHORT_MESSAGE_SIZE 128
70 72
71#define LONG_MESSAGE_SIZE 32000 73#define LONG_MESSAGE_SIZE 32000 /* FIXME */
72 74
73#define BURST_PACKETS 50 75#define BURST_PACKETS 500
74 76
75#define TOTAL_ITERATIONS 1 77#define TOTAL_ITERATIONS 1
76 78
@@ -88,6 +90,7 @@ static unsigned int iterations_left = TOTAL_ITERATIONS;
88 90
89enum TestPhase 91enum TestPhase
90{ 92{
93 TP_INIT,
91 TP_BURST_SHORT, 94 TP_BURST_SHORT,
92 TP_BURST_LONG, 95 TP_BURST_LONG,
93 TP_SIZE_CHECK 96 TP_SIZE_CHECK
@@ -230,15 +233,18 @@ static void
230size_test (void *cls) 233size_test (void *cls)
231{ 234{
232 char *payload; 235 char *payload;
236 size_t max_size = 64000;
233 237
234 GNUNET_assert (TP_SIZE_CHECK == phase); 238 GNUNET_assert (TP_SIZE_CHECK == phase);
235 if (ack >= 64000) 239 if (LONG_MESSAGE_SIZE != long_message_size)
240 max_size = long_message_size;
241 if (ack >= max_size)
236 return; /* Leave some room for our protocol, so not 2^16 exactly */ 242 return; /* Leave some room for our protocol, so not 2^16 exactly */
237 payload = make_payload (ack); 243 payload = make_payload (ack);
238 ack += 5; 244 ack += 5;
239 num_sent++; 245 num_sent++;
240 GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, 246 GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
241 (ack < 64000) 247 (ack < max_size)
242 ? &size_test 248 ? &size_test
243 : NULL, 249 : NULL,
244 NULL, 250 NULL,
@@ -254,7 +260,7 @@ long_test (void *cls)
254{ 260{
255 char *payload; 261 char *payload;
256 262
257 payload = make_payload (LONG_MESSAGE_SIZE); 263 payload = make_payload (long_message_size);
258 num_sent++; 264 num_sent++;
259 GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc, 265 GNUNET_TRANSPORT_TESTING_transport_communicator_send (my_tc,
260 (BURST_PACKETS == 266 (BURST_PACKETS ==
@@ -263,7 +269,7 @@ long_test (void *cls)
263 : &long_test, 269 : &long_test,
264 NULL, 270 NULL,
265 payload, 271 payload,
266 LONG_MESSAGE_SIZE); 272 long_message_size);
267 GNUNET_free (payload); 273 GNUNET_free (payload);
268 timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); 274 timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
269} 275}
@@ -288,6 +294,7 @@ short_test (void *cls)
288 timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); 294 timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
289} 295}
290 296
297
291static int test_prepared = GNUNET_NO; 298static int test_prepared = GNUNET_NO;
292 299
293/** 300/**
@@ -316,7 +323,6 @@ prepare_test (void *cls)
316} 323}
317 324
318 325
319
320/** 326/**
321 * @brief Handle opening of queue 327 * @brief Handle opening of queue
322 * 328 *
@@ -332,18 +338,25 @@ static void
332add_queue_cb (void *cls, 338add_queue_cb (void *cls,
333 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, 339 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
334 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue * 340 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *
335 tc_queue) 341 tc_queue,
342 size_t mtu)
336{ 343{
344 if (TP_INIT != phase)
345 return;
337 if (0 != strcmp ((char*) cls, cfg_peers_name[0])) 346 if (0 != strcmp ((char*) cls, cfg_peers_name[0]))
338 return; // TODO? 347 return; // TODO?
339 LOG (GNUNET_ERROR_TYPE_DEBUG, 348 LOG (GNUNET_ERROR_TYPE_DEBUG,
340 "Queue established, starting test...\n"); 349 "Queue established, starting test...\n");
341 start_short = GNUNET_TIME_absolute_get (); 350 start_short = GNUNET_TIME_absolute_get ();
342 my_tc = tc_queue; 351 my_tc = tc_h;
352 if (0 != mtu)
353 long_message_size = mtu;
354 else
355 long_message_size = LONG_MESSAGE_SIZE;
343 phase = TP_BURST_SHORT; 356 phase = TP_BURST_SHORT;
344 timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); 357 timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_MINUTES);
345 GNUNET_assert (NULL == to_task); 358 GNUNET_assert (NULL == to_task);
346 to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, 359 to_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
347 &latency_timeout, 360 &latency_timeout,
348 NULL); 361 NULL);
349 prepare_test (NULL); 362 prepare_test (NULL);
@@ -395,6 +408,9 @@ incoming_message_cb (void *cls,
395 timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS); 408 timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_SECONDS);
396 switch (phase) 409 switch (phase)
397 { 410 {
411 case TP_INIT:
412 GNUNET_break (0);
413 break;
398 case TP_BURST_SHORT: 414 case TP_BURST_SHORT:
399 { 415 {
400 GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len); 416 GNUNET_assert (SHORT_MESSAGE_SIZE == payload_len);
@@ -428,7 +444,7 @@ incoming_message_cb (void *cls,
428 } 444 }
429 case TP_BURST_LONG: 445 case TP_BURST_LONG:
430 { 446 {
431 if (LONG_MESSAGE_SIZE != payload_len) 447 if (long_message_size != payload_len)
432 { 448 {
433 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 449 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
434 "Ignoring packet with wrong length\n"); 450 "Ignoring packet with wrong length\n");
@@ -441,7 +457,7 @@ incoming_message_cb (void *cls,
441 { 457 {
442 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, 458 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
443 "Long size packet test done.\n"); 459 "Long size packet test done.\n");
444 char *goodput = GNUNET_STRINGS_byte_size_fancy ((LONG_MESSAGE_SIZE 460 char *goodput = GNUNET_STRINGS_byte_size_fancy ((long_message_size
445 * num_received * 1000 461 * num_received * 1000
446 * 1000) 462 * 1000)
447 / duration.rel_value_us); 463 / duration.rel_value_us);
@@ -553,6 +569,7 @@ main (int argc,
553 char *test_name; 569 char *test_name;
554 char *cfg_peer; 570 char *cfg_peer;
555 571
572 phase = TP_INIT;
556 ret = 1; 573 ret = 1;
557 test_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]); 574 test_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]);
558 communicator_name = strchr (test_name, '-'); 575 communicator_name = strchr (test_name, '-');
diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c
index fc6d13590..8250027f7 100644
--- a/src/transport/transport-testing2.c
+++ b/src/transport/transport-testing2.c
@@ -33,7 +33,7 @@
33#include "gnunet_hello_lib.h" 33#include "gnunet_hello_lib.h"
34#include "gnunet_signatures.h" 34#include "gnunet_signatures.h"
35#include "transport.h" 35#include "transport.h"
36 36#include <inttypes.h>
37 37
38#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__) 38#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
39 39
@@ -227,11 +227,21 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
227 uint32_t nt; 227 uint32_t nt;
228 228
229 /** 229 /**
230 * Maximum transmission unit, in NBO. UINT32_MAX for unlimited. 230 * Maximum transmission unit. UINT32_MAX for unlimited.
231 */ 231 */
232 uint32_t mtu; 232 uint32_t mtu;
233 233
234 /** 234 /**
235 * Queue length. UINT64_MAX for unlimited.
236 */
237 uint64_t q_len;
238
239 /**
240 * Queue prio
241 */
242 uint32_t priority;
243
244 /**
235 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. 245 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
236 */ 246 */
237 uint32_t cs; 247 uint32_t cs;
@@ -370,8 +380,8 @@ handle_communicator_backchannel (void *cls,
370 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; 380 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
371 struct GNUNET_MQ_Envelope *env; 381 struct GNUNET_MQ_Envelope *env;
372 382
373 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 383 LOG (GNUNET_ERROR_TYPE_DEBUG,
374 "Received backchannel message\n"); 384 "Received backchannel message\n");
375 if (tc_h->bc_enabled != GNUNET_YES) 385 if (tc_h->bc_enabled != GNUNET_YES)
376 { 386 {
377 GNUNET_SERVICE_client_continue (client->client); 387 GNUNET_SERVICE_client_continue (client->client);
@@ -379,10 +389,10 @@ handle_communicator_backchannel (void *cls,
379 } 389 }
380 /* Find client providing this communicator */ 390 /* Find client providing this communicator */
381 /* Finally, deliver backchannel message to communicator */ 391 /* Finally, deliver backchannel message to communicator */
382 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 392 LOG (GNUNET_ERROR_TYPE_DEBUG,
383 "Delivering backchannel message of type %u to %s\n", 393 "Delivering backchannel message of type %u to %s\n",
384 ntohs (msg->type), 394 ntohs (msg->type),
385 target_communicator); 395 target_communicator);
386 other_tc_h = tc_h->bc_cb (tc_h, msg, (struct 396 other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
387 GNUNET_PeerIdentity*) &bc_msg->pid); 397 GNUNET_PeerIdentity*) &bc_msg->pid);
388 env = GNUNET_MQ_msg_extra ( 398 env = GNUNET_MQ_msg_extra (
@@ -496,9 +506,6 @@ handle_incoming_msg (void *cls,
496 msg = (struct GNUNET_MessageHeader *) &inc_msg[1]; 506 msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
497 size_t payload_len = ntohs (msg->size) - sizeof (struct 507 size_t payload_len = ntohs (msg->size) - sizeof (struct
498 GNUNET_MessageHeader); 508 GNUNET_MessageHeader);
499 LOG (GNUNET_ERROR_TYPE_DEBUG,
500 "Incoming message from communicator!\n");
501
502 if (NULL != tc_h->incoming_msg_cb) 509 if (NULL != tc_h->incoming_msg_cb)
503 { 510 {
504 tc_h->incoming_msg_cb (tc_h->cb_cls, 511 tc_h->incoming_msg_cb (tc_h->cb_cls,
@@ -608,15 +615,14 @@ handle_add_queue_message (void *cls,
608 client->tc; 615 client->tc;
609 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; 616 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
610 617
611 tc_queue = tc_h->queue_head; 618 LOG (GNUNET_ERROR_TYPE_DEBUG,
612 if (NULL != tc_queue) 619 "Got queue with ID %u\n", msg->qid);
620 for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next)
613 { 621 {
614 while (tc_queue->qid != msg->qid) 622 if (tc_queue->qid == msg->qid)
615 { 623 break;
616 tc_queue = tc_queue->next;
617 }
618 } 624 }
619 else 625 if (NULL == tc_queue)
620 { 626 {
621 tc_queue = 627 tc_queue =
622 GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue); 628 GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
@@ -628,17 +634,59 @@ handle_add_queue_message (void *cls,
628 GNUNET_assert (tc_queue->qid == msg->qid); 634 GNUNET_assert (tc_queue->qid == msg->qid);
629 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); 635 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
630 tc_queue->nt = msg->nt; 636 tc_queue->nt = msg->nt;
631 tc_queue->mtu = msg->mtu; 637 tc_queue->mtu = ntohl (msg->mtu);
632 tc_queue->cs = msg->cs; 638 tc_queue->cs = msg->cs;
639 tc_queue->priority = ntohl (msg->priority);
640 tc_queue->q_len = GNUNET_ntohll (msg->q_len);
633 if (NULL != tc_h->add_queue_cb) 641 if (NULL != tc_h->add_queue_cb)
634 { 642 {
635 tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue); 643 tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
636 } 644 }
637 GNUNET_SERVICE_client_continue (client->client); 645 GNUNET_SERVICE_client_continue (client->client);
638} 646}
639 647
640 648
641/** 649/**
650 * @brief Handle new queue
651 *
652 * Store context and call client callback.
653 *
654 * @param cls Closure - communicator handle
655 * @param msg Message struct
656 */
657static void
658handle_update_queue_message (void *cls,
659 const struct
660 GNUNET_TRANSPORT_UpdateQueueMessage *msg)
661{
662 struct MyClient *client = cls;
663 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
664 client->tc;
665 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
666
667 LOG (GNUNET_ERROR_TYPE_DEBUG,
668 "Received queue update message for %u with q_len %"PRIu64"\n",
669 msg->qid, GNUNET_ntohll(msg->q_len));
670 tc_queue = tc_h->queue_head;
671 if (NULL != tc_queue)
672 {
673 while (tc_queue->qid != msg->qid)
674 {
675 tc_queue = tc_queue->next;
676 }
677 }
678 GNUNET_assert (tc_queue->qid == msg->qid);
679 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
680 tc_queue->nt = msg->nt;
681 tc_queue->mtu = ntohl (msg->mtu);
682 tc_queue->cs = msg->cs;
683 tc_queue->priority = ntohl (msg->priority);
684 tc_queue->q_len += GNUNET_ntohll (msg->q_len);
685 GNUNET_SERVICE_client_continue (client->client);
686}
687
688
689/**
642 * @brief Shut down the service 690 * @brief Shut down the service
643 * 691 *
644 * @param cls Closure - Handle to the service 692 * @param cls Closure - Handle to the service
@@ -789,6 +837,10 @@ transport_communicator_start (
789 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, 837 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
790 struct GNUNET_TRANSPORT_AddQueueMessage, 838 struct GNUNET_TRANSPORT_AddQueueMessage,
791 tc_h), 839 tc_h),
840 GNUNET_MQ_hd_fixed_size (update_queue_message,
841 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
842 struct GNUNET_TRANSPORT_UpdateQueueMessage,
843 tc_h),
792 // GNUNET_MQ_hd_fixed_size (del_queue_message, 844 // GNUNET_MQ_hd_fixed_size (del_queue_message,
793 // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, 845 // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
794 // struct GNUNET_TRANSPORT_DelQueueMessage, 846 // struct GNUNET_TRANSPORT_DelQueueMessage,
@@ -1063,7 +1115,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (
1063 */ 1115 */
1064void 1116void
1065GNUNET_TRANSPORT_TESTING_transport_communicator_send 1117GNUNET_TRANSPORT_TESTING_transport_communicator_send
1066 (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue, 1118 (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
1067 GNUNET_SCHEDULER_TaskCallback cont, 1119 GNUNET_SCHEDULER_TaskCallback cont,
1068 void *cont_cls, 1120 void *cont_cls,
1069 const void *payload, 1121 const void *payload,
@@ -1073,7 +1125,39 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
1073 struct GNUNET_TRANSPORT_SendMessageTo *msg; 1125 struct GNUNET_TRANSPORT_SendMessageTo *msg;
1074 struct GNUNET_MQ_Envelope *env; 1126 struct GNUNET_MQ_Envelope *env;
1075 size_t inbox_size; 1127 size_t inbox_size;
1128 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
1129 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
1076 1130
1131 tc_queue = NULL;
1132 for (tc_queue_tmp = tc_h->queue_head;
1133 NULL != tc_queue_tmp;
1134 tc_queue_tmp = tc_queue_tmp->next)
1135 {
1136 if (tc_queue_tmp->q_len <= 0)
1137 continue;
1138 if (NULL == tc_queue)
1139 {
1140 LOG (GNUNET_ERROR_TYPE_DEBUG,
1141 "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1142 tc_queue_tmp->priority,
1143 tc_queue_tmp->q_len,
1144 tc_queue_tmp->mtu);
1145 tc_queue = tc_queue_tmp;
1146 continue;
1147 }
1148 if (tc_queue->priority < tc_queue_tmp->priority)
1149 {
1150 LOG (GNUNET_ERROR_TYPE_DEBUG,
1151 "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1152 tc_queue_tmp->priority,
1153 tc_queue_tmp->q_len,
1154 tc_queue_tmp->mtu);
1155 tc_queue = tc_queue_tmp;
1156 }
1157 }
1158 GNUNET_assert (NULL != tc_queue);
1159 if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
1160 tc_queue->q_len--;
1077 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078 "Sending message\n"); 1162 "Sending message\n");
1079 inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size; 1163 inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
diff --git a/src/transport/transport-testing2.h b/src/transport/transport-testing2.h
index 7a449f081..b77125e82 100644
--- a/src/transport/transport-testing2.h
+++ b/src/transport/transport-testing2.h
@@ -132,7 +132,8 @@ typedef void
132 *tc_h, 132 *tc_h,
133 struct 133 struct
134 GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue 134 GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
135 *tc_queue); 135 *tc_queue,
136 size_t mtu);
136 137
137 138
138/** 139/**
@@ -215,8 +216,8 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (struct
215 */ 216 */
216void 217void
217GNUNET_TRANSPORT_TESTING_transport_communicator_send (struct 218GNUNET_TRANSPORT_TESTING_transport_communicator_send (struct
218 GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue 219 GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
219 *tc_queue, 220 *tc_h,
220 GNUNET_SCHEDULER_TaskCallback 221 GNUNET_SCHEDULER_TaskCallback
221 cont, 222 cont,
222 void *cont_cls, 223 void *cont_cls,
diff --git a/src/transport/transport.h b/src/transport/transport.h
index 36182d8d7..a64ffd5c6 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -836,6 +836,17 @@ struct GNUNET_TRANSPORT_AddQueueMessage
836 uint32_t mtu; 836 uint32_t mtu;
837 837
838 /** 838 /**
839 * Queue length, in NBO. Defines how many messages may be
840 * send through this queue. UINT64_MAX for unlimited.
841 */
842 uint64_t q_len;
843
844 /**
845 * Priority of the queue in relation to other queues.
846 */
847 uint32_t priority;
848
849 /**
839 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. 850 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
840 */ 851 */
841 uint32_t cs; 852 uint32_t cs;
@@ -845,6 +856,55 @@ struct GNUNET_TRANSPORT_AddQueueMessage
845 856
846 857
847/** 858/**
859 * Update queue
860 */
861struct GNUNET_TRANSPORT_UpdateQueueMessage
862{
863 /**
864 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP.
865 */
866 struct GNUNET_MessageHeader header;
867
868 /**
869 * Queue identifier (used to identify the queue).
870 */
871 uint32_t qid GNUNET_PACKED;
872
873 /**
874 * Receiver that can be addressed via the queue.
875 */
876 struct GNUNET_PeerIdentity receiver;
877
878 /**
879 * An `enum GNUNET_NetworkType` in NBO.
880 */
881 uint32_t nt;
882
883 /**
884 * Maximum transmission unit, in NBO. UINT32_MAX for unlimited.
885 */
886 uint32_t mtu;
887
888 /**
889 * Queue length, in NBO. Defines how many messages may be
890 * send through this queue. UINT64_MAX for unlimited.
891 */
892 uint64_t q_len;
893
894 /**
895 * Priority of the queue in relation to other queues.
896 */
897 uint32_t priority;
898
899 /**
900 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
901 */
902 uint32_t cs;
903};
904
905
906
907/**
848 * Remove queue, it is no longer available. 908 * Remove queue, it is no longer available.
849 */ 909 */
850struct GNUNET_TRANSPORT_DelQueueMessage 910struct GNUNET_TRANSPORT_DelQueueMessage
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index e80cd5c03..cfa144415 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -280,6 +280,15 @@ struct GNUNET_TRANSPORT_QueueHandle
280 * Maximum transmission unit for the queue. 280 * Maximum transmission unit for the queue.
281 */ 281 */
282 uint32_t mtu; 282 uint32_t mtu;
283
284 /**
285 * Queue length.
286 */
287 uint64_t q_len;
288 /**
289 * Queue priority.
290 */
291 uint32_t priority;
283}; 292};
284 293
285 294
@@ -395,6 +404,8 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
395 404
396 if (NULL == qh->ch->mq) 405 if (NULL == qh->ch->mq)
397 return; 406 return;
407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
408 "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n");
398 env = GNUNET_MQ_msg_extra (aqm, 409 env = GNUNET_MQ_msg_extra (aqm,
399 strlen (qh->address) + 1, 410 strlen (qh->address) + 1,
400 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); 411 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
@@ -402,11 +413,39 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
402 aqm->receiver = qh->peer; 413 aqm->receiver = qh->peer;
403 aqm->nt = htonl ((uint32_t) qh->nt); 414 aqm->nt = htonl ((uint32_t) qh->nt);
404 aqm->mtu = htonl (qh->mtu); 415 aqm->mtu = htonl (qh->mtu);
416 aqm->q_len = GNUNET_htonll (qh->q_len);
417 aqm->priority = htonl (qh->priority);
405 aqm->cs = htonl ((uint32_t) qh->cs); 418 aqm->cs = htonl ((uint32_t) qh->cs);
406 memcpy (&aqm[1], qh->address, strlen (qh->address) + 1); 419 memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
407 GNUNET_MQ_send (qh->ch->mq, env); 420 GNUNET_MQ_send (qh->ch->mq, env);
408} 421}
409 422
423/**
424 * Send message to the transport service about queue @a qh
425 * updated.
426 *
427 * @param qh queue to add
428 */
429static void
430send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
431{
432 struct GNUNET_MQ_Envelope *env;
433 struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm;
434
435 if (NULL == qh->ch->mq)
436 return;
437 env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE);
438 uqm->qid = htonl (qh->queue_id);
439 uqm->receiver = qh->peer;
440 uqm->nt = htonl ((uint32_t) qh->nt);
441 uqm->mtu = htonl (qh->mtu);
442 uqm->q_len = GNUNET_htonll (qh->q_len);
443 uqm->priority = htonl (qh->priority);
444 uqm->cs = htonl ((uint32_t) qh->cs);
445 GNUNET_MQ_send (qh->ch->mq, env);
446}
447
448
410 449
411/** 450/**
412 * Send message to the transport service about queue @a qh 451 * Send message to the transport service about queue @a qh
@@ -924,6 +963,9 @@ GNUNET_TRANSPORT_communicator_receive (
924 * @param address address in human-readable format, 0-terminated, UTF-8 963 * @param address address in human-readable format, 0-terminated, UTF-8
925 * @param mtu maximum message size supported by queue, 0 if 964 * @param mtu maximum message size supported by queue, 0 if
926 * sending is not supported, SIZE_MAX for no MTU 965 * sending is not supported, SIZE_MAX for no MTU
966 * @param q_len number of messages that can be send through this queue
967 * @param priority queue priority. Queues with highest priority should be
968 * used
927 * @param nt which network type does the @a address belong to? 969 * @param nt which network type does the @a address belong to?
928 * @param cc what characteristics does the communicator have? 970 * @param cc what characteristics does the communicator have?
929 * @param cs what is the connection status of the queue? 971 * @param cs what is the connection status of the queue?
@@ -936,6 +978,8 @@ GNUNET_TRANSPORT_communicator_mq_add (
936 const struct GNUNET_PeerIdentity *peer, 978 const struct GNUNET_PeerIdentity *peer,
937 const char *address, 979 const char *address,
938 uint32_t mtu, 980 uint32_t mtu,
981 uint64_t q_len,
982 uint32_t priority,
939 enum GNUNET_NetworkType nt, 983 enum GNUNET_NetworkType nt,
940 enum GNUNET_TRANSPORT_ConnectionStatus cs, 984 enum GNUNET_TRANSPORT_ConnectionStatus cs,
941 struct GNUNET_MQ_Handle *mq) 985 struct GNUNET_MQ_Handle *mq)
@@ -948,6 +992,8 @@ GNUNET_TRANSPORT_communicator_mq_add (
948 qh->address = GNUNET_strdup (address); 992 qh->address = GNUNET_strdup (address);
949 qh->nt = nt; 993 qh->nt = nt;
950 qh->mtu = mtu; 994 qh->mtu = mtu;
995 qh->q_len = q_len;
996 qh->priority = priority;
951 qh->cs = cs; 997 qh->cs = cs;
952 qh->mq = mq; 998 qh->mq = mq;
953 qh->queue_id = ch->queue_gen++; 999 qh->queue_id = ch->queue_gen++;
@@ -958,6 +1004,37 @@ GNUNET_TRANSPORT_communicator_mq_add (
958 1004
959 1005
960/** 1006/**
1007 * Notify transport service that an MQ was updated
1008 *
1009 * @param ch connection to transport service
1010 * @param qh the queue to update
1011 * @param q_len number of messages that can be send through this queue
1012 * @param priority queue priority. Queues with highest priority should be
1013 * used
1014 */
1015void
1016GNUNET_TRANSPORT_communicator_mq_update (
1017 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1018 const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
1019 uint64_t q_len,
1020 uint32_t priority)
1021{
1022 struct GNUNET_TRANSPORT_QueueHandle *qh;
1023
1024 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
1025 {
1026 if (u_qh == qh)
1027 break;
1028 }
1029 GNUNET_assert (NULL != qh);
1030 qh->q_len = q_len;
1031 qh->priority = priority;
1032 send_update_queue (qh);
1033}
1034
1035
1036
1037/**
961 * Notify transport service that an MQ became unavailable due to a 1038 * Notify transport service that an MQ became unavailable due to a
962 * disconnect or timeout. 1039 * disconnect or timeout.
963 * 1040 *