aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-communicator-udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-communicator-udp.c')
-rw-r--r--src/transport/gnunet-communicator-udp.c384
1 files changed, 234 insertions, 150 deletions
diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c
index 344ba5180..46d9766d0 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -549,14 +549,24 @@ struct ReceiverAddress
549 struct GNUNET_CONTAINER_HeapNode *hn; 549 struct GNUNET_CONTAINER_HeapNode *hn;
550 550
551 /** 551 /**
552 * Message queue we are providing for the #ch. 552 * KX message queue we are providing for the #ch.
553 */ 553 */
554 struct GNUNET_MQ_Handle *mq; 554 struct GNUNET_MQ_Handle *kx_mq;
555
556 /**
557 * Default message queue we are providing for the #ch.
558 */
559 struct GNUNET_MQ_Handle *d_mq;
560
561 /**
562 * handle for KX queue with the #ch.
563 */
564 struct GNUNET_TRANSPORT_QueueHandle *kx_qh;
555 565
556 /** 566 /**
557 * handle for this queue with the #ch. 567 * handle for default queue with the #ch.
558 */ 568 */
559 struct GNUNET_TRANSPORT_QueueHandle *qh; 569 struct GNUNET_TRANSPORT_QueueHandle *d_qh;
560 570
561 /** 571 /**
562 * Timeout for this receiver address. 572 * Timeout for this receiver address.
@@ -564,9 +574,14 @@ struct ReceiverAddress
564 struct GNUNET_TIME_Absolute timeout; 574 struct GNUNET_TIME_Absolute timeout;
565 575
566 /** 576 /**
567 * MTU we allowed transport for this receiver right now. 577 * MTU we allowed transport for this receiver's KX queue.
568 */ 578 */
569 size_t mtu; 579 size_t kx_mtu;
580
581 /**
582 * MTU we allowed transport for this receiver's default queue.
583 */
584 size_t d_mtu;
570 585
571 /** 586 /**
572 * Length of the DLL at @a ss_head. 587 * Length of the DLL at @a ss_head.
@@ -786,15 +801,25 @@ receiver_destroy (struct ReceiverAddress *receiver)
786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787 "Disconnecting receiver for peer `%s'\n", 802 "Disconnecting receiver for peer `%s'\n",
788 GNUNET_i2s (&receiver->target)); 803 GNUNET_i2s (&receiver->target));
789 if (NULL != (mq = receiver->mq)) 804 if (NULL != (mq = receiver->kx_mq))
790 { 805 {
791 receiver->mq = NULL; 806 receiver->kx_mq = NULL;
792 GNUNET_MQ_destroy (mq); 807 GNUNET_MQ_destroy (mq);
793 } 808 }
794 if (NULL != receiver->qh) 809 if (NULL != receiver->kx_qh)
795 { 810 {
796 GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); 811 GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
797 receiver->qh = NULL; 812 receiver->kx_qh = NULL;
813 }
814 if (NULL != (mq = receiver->d_mq))
815 {
816 receiver->d_mq = NULL;
817 GNUNET_MQ_destroy (mq);
818 }
819 if (NULL != receiver->d_qh)
820 {
821 GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
822 receiver->d_qh = NULL;
798 } 823 }
799 GNUNET_assert (GNUNET_YES == 824 GNUNET_assert (GNUNET_YES ==
800 GNUNET_CONTAINER_multipeermap_remove (receivers, 825 GNUNET_CONTAINER_multipeermap_remove (receivers,
@@ -1265,30 +1290,27 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity *pid, void *value)
1265 (void) pid; 1290 (void) pid;
1266 for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next) 1291 for (struct SharedSecret *ss = receiver->ss_head; NULL != ss; ss = ss->next)
1267 { 1292 {
1268 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269 "Checking shared secrets\n");
1270 if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode))) 1293 if (0 == memcmp (&ack->cmac, &ss->cmac, sizeof(struct GNUNET_HashCode)))
1271 { 1294 {
1272 uint32_t allowed; 1295 uint32_t allowed;
1273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1274 "Found matching mac\n"); 1297 "Found matching mac\n");
1275 1298
1276 allowed = ntohl (ack->sequence_max); 1299 allowed = ntohl (ack->sequence_max);
1277 1300
1278 if (allowed > ss->sequence_allowed) 1301 if (allowed > ss->sequence_allowed)
1279 { 1302 {
1280 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1281 "%u > %u (%u)\n", allowed, ss->sequence_allowed, 1304 "%u > %u (%u)\n", allowed, ss->sequence_allowed,
1282 receiver->acks_available); 1305 receiver->acks_available);
1283 1306
1284 receiver->acks_available += (allowed - ss->sequence_allowed); 1307 receiver->acks_available += (allowed - ss->sequence_allowed);
1285 if ((allowed - ss->sequence_allowed) == receiver->acks_available) 1308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1286 { 1309 "Tell transport we have more acks!\n");
1287 /* we just incremented from zero => MTU change! */ 1310 GNUNET_TRANSPORT_communicator_mq_update (ch,
1288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1311 receiver->d_qh,
1289 "we just incremented from zero => MTU change!\n"); 1312 (allowed - ss->sequence_allowed),
1290 //TODO setup_receiver_mq (receiver); 1313 1);
1291 }
1292 ss->sequence_allowed = allowed; 1314 ss->sequence_allowed = allowed;
1293 /* move ss to head to avoid discarding it anytime soon! */ 1315 /* move ss to head to avoid discarding it anytime soon! */
1294 GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss); 1316 GNUNET_CONTAINER_DLL_remove (receiver->ss_head, receiver->ss_tail, ss);
@@ -1906,15 +1928,24 @@ do_pad (gcry_cipher_hd_t out_cipher, char *dgram, size_t pad_size)
1906 * @param impl_state our `struct ReceiverAddress` 1928 * @param impl_state our `struct ReceiverAddress`
1907 */ 1929 */
1908static void 1930static void
1909mq_send (struct GNUNET_MQ_Handle *mq, 1931mq_send_kx (struct GNUNET_MQ_Handle *mq,
1910 const struct GNUNET_MessageHeader *msg, 1932 const struct GNUNET_MessageHeader *msg,
1911 void *impl_state) 1933 void *impl_state)
1912{ 1934{
1913 struct ReceiverAddress *receiver = impl_state; 1935 struct ReceiverAddress *receiver = impl_state;
1914 uint16_t msize = ntohs (msg->size); 1936 uint16_t msize = ntohs (msg->size);
1937 struct UdpHandshakeSignature uhs;
1938 struct UDPConfirmation uc;
1939 struct InitialKX kx;
1940 struct GNUNET_CRYPTO_EcdhePrivateKey epriv;
1941 char dgram[receiver->kx_mtu + sizeof(uc) + sizeof(kx)];
1942 size_t dpos;
1943 gcry_cipher_hd_t out_cipher;
1944 struct SharedSecret *ss;
1945
1915 1946
1916 GNUNET_assert (mq == receiver->mq); 1947 GNUNET_assert (mq == receiver->kx_mq);
1917 if (msize > receiver->mtu) 1948 if (msize > receiver->kx_mtu)
1918 { 1949 {
1919 GNUNET_break (0); 1950 GNUNET_break (0);
1920 receiver_destroy (receiver); 1951 receiver_destroy (receiver);
@@ -1922,117 +1953,124 @@ mq_send (struct GNUNET_MQ_Handle *mq,
1922 } 1953 }
1923 reschedule_receiver_timeout (receiver); 1954 reschedule_receiver_timeout (receiver);
1924 1955
1925 if (0 == receiver->acks_available) 1956 /* setup key material */
1957 GNUNET_CRYPTO_ecdhe_key_create (&epriv);
1958
1959 ss = setup_shared_secret_enc (&epriv, receiver);
1960 setup_cipher (&ss->master, 0, &out_cipher);
1961 /* compute 'uc' */
1962 uc.sender = my_identity;
1963 uc.monotonic_time =
1964 GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
1965 uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
1966 uhs.purpose.size = htonl (sizeof(uhs));
1967 uhs.sender = my_identity;
1968 uhs.receiver = receiver->target;
1969 GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
1970 uhs.monotonic_time = uc.monotonic_time;
1971 GNUNET_CRYPTO_eddsa_sign (my_private_key,
1972 &uhs,
1973 &uc.sender_sig);
1974 /* Leave space for kx */
1975 dpos = sizeof(kx);
1976 /* Append encrypted uc to dgram */
1977 GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
1978 &dgram[dpos],
1979 sizeof(uc),
1980 &uc,
1981 sizeof(uc)));
1982 dpos += sizeof(uc);
1983 /* Append encrypted payload to dgram */
1984 GNUNET_assert (
1985 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
1986 dpos += msize;
1987 do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
1988 /* Datagram starts with kx */
1989 kx.ephemeral = uhs.ephemeral;
1990 GNUNET_assert (
1991 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
1992 gcry_cipher_close (out_cipher);
1993 memcpy (dgram, &kx, sizeof(kx));
1994 if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
1995 dgram,
1996 sizeof(dgram),
1997 receiver->address,
1998 receiver->address_len))
1999 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
2000 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2001 "Sending KX to %s\n", GNUNET_a2s (receiver->address,
2002 receiver->address_len));
2003 GNUNET_MQ_impl_send_continue (mq);
2004}
2005
2006
2007/**
2008 * Signature of functions implementing the sending functionality of a
2009 * message queue.
2010 *
2011 * @param mq the message queue
2012 * @param msg the message to send
2013 * @param impl_state our `struct ReceiverAddress`
2014 */
2015static void
2016mq_send_d (struct GNUNET_MQ_Handle *mq,
2017 const struct GNUNET_MessageHeader *msg,
2018 void *impl_state)
2019{
2020 struct ReceiverAddress *receiver = impl_state;
2021 uint16_t msize = ntohs (msg->size);
2022
2023 GNUNET_assert (mq == receiver->d_mq);
2024 if ((msize > receiver->d_mtu) ||
2025 (0 == receiver->acks_available))
1926 { 2026 {
1927 /* use KX encryption method */ 2027 GNUNET_break (0);
1928 struct UdpHandshakeSignature uhs; 2028 receiver_destroy (receiver);
1929 struct UDPConfirmation uc; 2029 return;
1930 struct InitialKX kx; 2030 }
1931 struct GNUNET_CRYPTO_EcdhePrivateKey epriv; 2031 reschedule_receiver_timeout (receiver);
1932 char dgram[receiver->mtu + sizeof(uc) + sizeof(kx)];
1933 size_t dpos;
1934 gcry_cipher_hd_t out_cipher;
1935 struct SharedSecret *ss;
1936 2032
1937 /* setup key material */ 2033 /* begin "BOX" encryption method, scan for ACKs from tail! */
1938 GNUNET_CRYPTO_ecdhe_key_create (&epriv); 2034 for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
2035 {
2036 if (ss->sequence_used >= ss->sequence_allowed)
2037 {
2038 continue;
2039 }
2040 char dgram[sizeof(struct UDPBox) + receiver->d_mtu];
2041 struct UDPBox *box;
2042 gcry_cipher_hd_t out_cipher;
2043 size_t dpos;
1939 2044
1940 ss = setup_shared_secret_enc (&epriv, receiver); 2045 box = (struct UDPBox *) dgram;
1941 setup_cipher (&ss->master, 0, &out_cipher); 2046 ss->sequence_used++;
1942 /* compute 'uc' */ 2047 get_kid (&ss->master, ss->sequence_used, &box->kid);
1943 uc.sender = my_identity; 2048 setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
1944 uc.monotonic_time =
1945 GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
1946 uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_UDP_HANDSHAKE);
1947 uhs.purpose.size = htonl (sizeof(uhs));
1948 uhs.sender = my_identity;
1949 uhs.receiver = receiver->target;
1950 GNUNET_CRYPTO_ecdhe_key_get_public (&epriv, &uhs.ephemeral);
1951 uhs.monotonic_time = uc.monotonic_time;
1952 GNUNET_CRYPTO_eddsa_sign (my_private_key,
1953 &uhs,
1954 &uc.sender_sig);
1955 /* Leave space for kx */
1956 dpos = sizeof(kx);
1957 /* Append encrypted uc to dgram */
1958 GNUNET_assert (0 == gcry_cipher_encrypt (out_cipher,
1959 &dgram[dpos],
1960 sizeof(uc),
1961 &uc,
1962 sizeof(uc)));
1963 dpos += sizeof(uc);
1964 /* Append encrypted payload to dgram */ 2049 /* Append encrypted payload to dgram */
2050 dpos = sizeof(struct UDPBox);
1965 GNUNET_assert ( 2051 GNUNET_assert (
1966 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize)); 2052 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
1967 dpos += msize; 2053 dpos += msize;
1968 do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos); 2054 do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
1969 /* Datagram starts with kx */ 2055 GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
1970 kx.ephemeral = uhs.ephemeral; 2056 box->gcm_tag,
1971 GNUNET_assert ( 2057 sizeof(box->gcm_tag)));
1972 0 == gcry_cipher_gettag (out_cipher, kx.gcm_tag, sizeof(kx.gcm_tag)));
1973 gcry_cipher_close (out_cipher); 2058 gcry_cipher_close (out_cipher);
1974 memcpy (dgram, &kx, sizeof(kx));
1975 if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock, 2059 if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
1976 dgram, 2060 dgram,
1977 sizeof(dgram), 2061 sizeof(dgram),
1978 receiver->address, 2062 receiver->address,
1979 receiver->address_len)) 2063 receiver->address_len))
1980 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send"); 2064 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
1981 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1982 "Sending KX to %s\n", GNUNET_a2s (receiver->address,
1983 receiver->address_len));
1984 GNUNET_MQ_impl_send_continue (mq); 2065 GNUNET_MQ_impl_send_continue (mq);
1985 return; 2066 receiver->acks_available--;
1986 } /* End of KX encryption method */ 2067 if (0 == receiver->acks_available)
1987
1988 /* begin "BOX" encryption method, scan for ACKs from tail! */
1989 for (struct SharedSecret *ss = receiver->ss_tail; NULL != ss; ss = ss->prev)
1990 {
1991 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1992 "In non-kx mode...\n");
1993 if (ss->sequence_used < ss->sequence_allowed)
1994 { 2068 {
1995 char dgram[sizeof(struct UDPBox) + receiver->mtu]; 2069 /* We have no more ACKs */
1996 struct UDPBox *box;
1997 gcry_cipher_hd_t out_cipher;
1998 size_t dpos;
1999
2000 box = (struct UDPBox *) dgram;
2001 ss->sequence_used++;
2002 get_kid (&ss->master, ss->sequence_used, &box->kid);
2003 setup_cipher (&ss->master, ss->sequence_used, &out_cipher);
2004 /* Append encrypted payload to dgram */
2005 dpos = sizeof(struct UDPBox);
2006 GNUNET_assert (
2007 0 == gcry_cipher_encrypt (out_cipher, &dgram[dpos], msize, msg, msize));
2008 dpos += msize;
2009 do_pad (out_cipher, &dgram[dpos], sizeof(dgram) - dpos);
2010 GNUNET_assert (0 == gcry_cipher_gettag (out_cipher,
2011 box->gcm_tag,
2012 sizeof(box->gcm_tag)));
2013 gcry_cipher_close (out_cipher);
2014 if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
2015 dgram,
2016 sizeof(dgram),
2017 receiver->address,
2018 receiver->address_len))
2019 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "send");
2020 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2070 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2021 "Sending data\n"); 2071 "No more acks\n");
2022
2023 GNUNET_MQ_impl_send_continue (mq);
2024 receiver->acks_available--;
2025 if (0 == receiver->acks_available)
2026 {
2027 /* We have no more ACKs => MTU change! */
2028 setup_receiver_mq (receiver);
2029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2030 "No more acks, MTU changed\n");
2031 }
2032 return;
2033 } 2072 }
2034 } 2073 }
2035 GNUNET_assert (0);
2036} 2074}
2037 2075
2038 2076
@@ -2045,15 +2083,37 @@ mq_send (struct GNUNET_MQ_Handle *mq,
2045 * @param impl_state our `struct ReceiverAddress` 2083 * @param impl_state our `struct ReceiverAddress`
2046 */ 2084 */
2047static void 2085static void
2048mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) 2086mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state)
2049{ 2087{
2050 struct ReceiverAddress *receiver = impl_state; 2088 struct ReceiverAddress *receiver = impl_state;
2051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2089 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052 "MQ destroyed\n"); 2090 "Default MQ destroyed\n");
2053 if (mq == receiver->mq) 2091 if (mq == receiver->d_mq)
2054 { 2092 {
2055 receiver->mq = NULL; 2093 receiver->d_mq = NULL;
2056 //receiver_destroy (receiver); 2094 receiver_destroy (receiver);
2095 }
2096}
2097
2098
2099/**
2100 * Signature of functions implementing the destruction of a message
2101 * queue. Implementations must not free @a mq, but should take care
2102 * of @a impl_state.
2103 *
2104 * @param mq the message queue to destroy
2105 * @param impl_state our `struct ReceiverAddress`
2106 */
2107static void
2108mq_destroy_kx (struct GNUNET_MQ_Handle *mq, void *impl_state)
2109{
2110 struct ReceiverAddress *receiver = impl_state;
2111 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2112 "KX MQ destroyed\n");
2113 if (mq == receiver->kx_mq)
2114 {
2115 receiver->kx_mq = NULL;
2116 receiver_destroy (receiver);
2057 } 2117 }
2058} 2118}
2059 2119
@@ -2106,12 +2166,17 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
2106{ 2166{
2107 size_t base_mtu; 2167 size_t base_mtu;
2108 2168
2109 if (NULL != receiver->qh) 2169 /*if (NULL != receiver->kx_qh)
2110 { 2170 {
2111 GNUNET_TRANSPORT_communicator_mq_del (receiver->qh); 2171 GNUNET_TRANSPORT_communicator_mq_del (receiver->kx_qh);
2112 receiver->qh = NULL; 2172 receiver->kx_qh = NULL;
2113 } 2173 }
2114 //GNUNET_assert (NULL == receiver->mq); 2174 if (NULL != receiver->d_qh)
2175 {
2176 GNUNET_TRANSPORT_communicator_mq_del (receiver->d_qh);
2177 receiver->d_qh = NULL;
2178 }*/
2179 // GNUNET_assert (NULL == receiver->mq);
2115 switch (receiver->address->sa_family) 2180 switch (receiver->address->sa_family)
2116 { 2181 {
2117 case AF_INET: 2182 case AF_INET:
@@ -2130,35 +2195,54 @@ setup_receiver_mq (struct ReceiverAddress *receiver)
2130 GNUNET_assert (0); 2195 GNUNET_assert (0);
2131 break; 2196 break;
2132 } 2197 }
2133 if (0 == receiver->acks_available) 2198 /* MTU based on full KX messages */
2134 { 2199 receiver->kx_mtu = base_mtu - sizeof(struct InitialKX) /* 48 */
2135 /* MTU based on full KX messages */ 2200 - sizeof(struct UDPConfirmation); /* 104 */
2136 receiver->mtu = base_mtu - sizeof(struct InitialKX) /* 48 */ 2201 /* MTU based on BOXed messages */
2137 - sizeof(struct UDPConfirmation); /* 104 */ 2202 receiver->d_mtu = base_mtu - sizeof(struct UDPBox);
2138 } 2203
2139 else 2204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2140 { 2205 "Setting up MQs and QHs\n");
2141 /* MTU based on BOXed messages */
2142 receiver->mtu = base_mtu - sizeof(struct UDPBox);
2143 }
2144 /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to 2206 /* => Effective MTU for CORE will range from 1080 (IPv6 + KX) to
2145 1404 (IPv4 + Box) bytes, depending on circumstances... */ 2207 1404 (IPv4 + Box) bytes, depending on circumstances... */
2146 if (NULL == receiver->mq) 2208 if (NULL == receiver->kx_mq)
2147 receiver->mq = GNUNET_MQ_queue_for_callbacks (&mq_send, 2209 receiver->kx_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_kx,
2148 &mq_destroy, 2210 &mq_destroy_kx,
2149 &mq_cancel, 2211 &mq_cancel,
2150 receiver, 2212 receiver,
2151 NULL, 2213 NULL,
2152 &mq_error, 2214 &mq_error,
2153 receiver); 2215 receiver);
2154 receiver->qh = 2216 if (NULL == receiver->d_mq)
2217 receiver->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
2218 &mq_destroy_d,
2219 &mq_cancel,
2220 receiver,
2221 NULL,
2222 &mq_error,
2223 receiver);
2224
2225 receiver->kx_qh =
2155 GNUNET_TRANSPORT_communicator_mq_add (ch, 2226 GNUNET_TRANSPORT_communicator_mq_add (ch,
2156 &receiver->target, 2227 &receiver->target,
2157 receiver->foreign_addr, 2228 receiver->foreign_addr,
2158 receiver->mtu, 2229 receiver->kx_mtu,
2230 GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
2231 0, /* Priority */
2159 receiver->nt, 2232 receiver->nt,
2160 GNUNET_TRANSPORT_CS_OUTBOUND, 2233 GNUNET_TRANSPORT_CS_OUTBOUND,
2161 receiver->mq); 2234 receiver->kx_mq);
2235 receiver->d_qh =
2236 GNUNET_TRANSPORT_communicator_mq_add (ch,
2237 &receiver->target,
2238 receiver->foreign_addr,
2239 receiver->d_mtu,
2240 0, /* Initialize with 0 acks */
2241 1, /* Priority */
2242 receiver->nt,
2243 GNUNET_TRANSPORT_CS_OUTBOUND,
2244 receiver->d_mq);
2245
2162} 2246}
2163 2247
2164 2248