aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcontrib/netjail/netjail_core.sh5
-rwxr-xr-xcontrib/netjail/netjail_start.sh4
-rw-r--r--src/transport/Makefile.am2
-rw-r--r--src/transport/gnunet-communicator-udp.c29
-rw-r--r--src/transport/gnunet-service-tng.c699
-rw-r--r--src/transport/test_transport_distance_vector_circle_topo.conf2
-rw-r--r--src/transport/test_transport_plugin_cmd_simple_send_dv.c79
-rw-r--r--src/transport/transport_api_cmd_start_peer.c2
-rw-r--r--src/util/mq.c11
9 files changed, 581 insertions, 252 deletions
diff --git a/contrib/netjail/netjail_core.sh b/contrib/netjail/netjail_core.sh
index ed363cf35..da784fa5e 100755
--- a/contrib/netjail/netjail_core.sh
+++ b/contrib/netjail/netjail_core.sh
@@ -188,7 +188,10 @@ netjail_node_add_nat() {
188 local ADDRESS=$2 188 local ADDRESS=$2
189 local MASK=$3 189 local MASK=$3
190 190
191 ip netns exec $NODE iptables -t nat -A POSTROUTING -s "$ADDRESS/$MASK" -j MASQUERADE 191 ip netns exec $NODE nft add table nat
192 ip netns exec $NODE nft add chain nat postrouting { type nat hook postrouting priority 0 \; }
193 ip netns exec $NODE nft add rule ip nat postrouting ip saddr "$ADDRESS/$MASK" counter masquerade
194 # ip netns exec $NODE iptables -t nat -A POSTROUTING -s "$ADDRESS/$MASK" -j MASQUERADE
192} 195}
193 196
194netjail_node_add_default() { 197netjail_node_add_default() {
diff --git a/contrib/netjail/netjail_start.sh b/contrib/netjail/netjail_start.sh
index f7c417c27..e2d5fd634 100755
--- a/contrib/netjail/netjail_start.sh
+++ b/contrib/netjail/netjail_start.sh
@@ -77,11 +77,15 @@ for N in $(seq $GLOBAL_N); do
77 77
78 if [ "1" == "${R_TCP[$N]}" ] 78 if [ "1" == "${R_TCP[$N]}" ]
79 then 79 then
80 #ip netns exec ${ROUTERS[$N]} nft add rule ip nat prerouting ip daddr $GLOBAL_GROUP.$N tcp dport 60002 counter dnat to $LOCAL_GROUP.1
81 #ip netns exec ${ROUTERS[$N]} nft add rule ip filter FORWARD ip daddr $LOCAL_GROUP.1 ct state new,related,established counter accept
80 ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p tcp -d $GLOBAL_GROUP.$N --dport 60002 -j DNAT --to $LOCAL_GROUP.1 82 ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p tcp -d $GLOBAL_GROUP.$N --dport 60002 -j DNAT --to $LOCAL_GROUP.1
81 ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1 -m state --state NEW,RELATED,ESTABLISHED -j ACCEPT 83 ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1 -m state --state NEW,RELATED,ESTABLISHED -j ACCEPT
82 fi 84 fi
83 if [ "1" == "${R_UDP[$N]}" ] 85 if [ "1" == "${R_UDP[$N]}" ]
84 then 86 then
87 #ip netns exec ${ROUTERS[$N]} nft add rule ip nat prerouting ip daddr $GLOBAL_GROUP.$N udp dport $PORT counter dnat to $LOCAL_GROUP.1
88 #ip netns exec ${ROUTERS[$N]} nft add rule ip filter FORWARD ip daddr $LOCAL_GROUP.1 ct state new,related,established counter accept
85 ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p udp -d $GLOBAL_GROUP.$N --dport $PORT -j DNAT --to $LOCAL_GROUP.1 89 ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p udp -d $GLOBAL_GROUP.$N --dport $PORT -j DNAT --to $LOCAL_GROUP.1
86 ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1 -m state --state NEW,RELATED,ESTABLISHED -j ACCEPT 90 ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1 -m state --state NEW,RELATED,ESTABLISHED -j ACCEPT
87 fi 91 fi
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index 5d6fb8421..ec00bc917 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -778,7 +778,7 @@ check_SCRIPTS= \
778 test_transport_simple_send.sh \ 778 test_transport_simple_send.sh \
779 test_transport_simple_send_broadcast.sh \ 779 test_transport_simple_send_broadcast.sh \
780 test_transport_udp_backchannel.sh \ 780 test_transport_udp_backchannel.sh \
781 # test_transport_simple_send_dv_circle.sh 781 test_transport_simple_send_dv_circle.sh
782 # test_transport_simple_send_dv_inverse.sh 782 # test_transport_simple_send_dv_inverse.sh
783 783
784test_transport_start_with_config_SOURCES = \ 784test_transport_start_with_config_SOURCES = \
diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c
index b6edff485..70848ff79 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -1273,7 +1273,7 @@ pass_plaintext_to_core (struct SenderAddress *sender,
1273 const struct GNUNET_MessageHeader *hdr = plaintext; 1273 const struct GNUNET_MessageHeader *hdr = plaintext;
1274 const char *pos = plaintext; 1274 const char *pos = plaintext;
1275 1275
1276 while (ntohs (hdr->size) < plaintext_len) 1276 while (ntohs (hdr->size) <= plaintext_len)
1277 { 1277 {
1278 GNUNET_STATISTICS_update (stats, 1278 GNUNET_STATISTICS_update (stats,
1279 "# bytes given to core", 1279 "# bytes given to core",
@@ -1722,6 +1722,12 @@ try_handle_plaintext (struct SenderAddress *sender,
1722 const struct UDPAck *ack = (const struct UDPAck *) buf; 1722 const struct UDPAck *ack = (const struct UDPAck *) buf;
1723 uint16_t type; 1723 uint16_t type;
1724 1724
1725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1726 "try_handle_plaintext of size %u (%u %u) and type %u\n",
1727 buf_size,
1728 ntohs (hdr->size),
1729 sizeof(*hdr),
1730 ntohs (hdr->type));
1725 if (sizeof(*hdr) > buf_size) 1731 if (sizeof(*hdr) > buf_size)
1726 return; /* not even a header */ 1732 return; /* not even a header */
1727 if (ntohs (hdr->size) > buf_size) 1733 if (ntohs (hdr->size) > buf_size)
@@ -2202,7 +2208,8 @@ verify_confirmation (const struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral,
2202{ 2208{
2203 struct UdpHandshakeSignature uhs; 2209 struct UdpHandshakeSignature uhs;
2204 2210
2205 uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE); 2211 uhs.purpose.purpose = htonl (
2212 GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
2206 uhs.purpose.size = htonl (sizeof(uhs)); 2213 uhs.purpose.size = htonl (sizeof(uhs));
2207 uhs.sender = uc->sender; 2214 uhs.sender = uc->sender;
2208 uhs.receiver = my_identity; 2215 uhs.receiver = my_identity;
@@ -2350,7 +2357,8 @@ sock_read (void *cls)
2350 "received UDPBroadcast from %s\n", 2357 "received UDPBroadcast from %s\n",
2351 GNUNET_a2s ((const struct sockaddr *) addr_verify, salen)); 2358 GNUNET_a2s ((const struct sockaddr *) addr_verify, salen));
2352 ub = (const struct UDPBroadcast *) buf; 2359 ub = (const struct UDPBroadcast *) buf;
2353 uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST); 2360 uhs.purpose.purpose = htonl (
2361 GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
2354 uhs.purpose.size = htonl (sizeof(uhs)); 2362 uhs.purpose.size = htonl (sizeof(uhs));
2355 uhs.sender = ub->sender; 2363 uhs.sender = ub->sender;
2356 sender = ub->sender; 2364 sender = ub->sender;
@@ -2366,10 +2374,11 @@ sock_read (void *cls)
2366 GNUNET_i2s (&sender)); 2374 GNUNET_i2s (&sender));
2367 GNUNET_CRYPTO_hash ((struct sockaddr *) addr_verify, salen, &uhs.h_address); 2375 GNUNET_CRYPTO_hash ((struct sockaddr *) addr_verify, salen, &uhs.h_address);
2368 if (GNUNET_OK == 2376 if (GNUNET_OK ==
2369 GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST, 2377 GNUNET_CRYPTO_eddsa_verify (
2370 &uhs, 2378 GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST,
2371 &ub->sender_sig, 2379 &uhs,
2372 &ub->sender.public_key)) 2380 &ub->sender_sig,
2381 &ub->sender.public_key))
2373 { 2382 {
2374 char *addr_s; 2383 char *addr_s;
2375 enum GNUNET_NetworkType nt; 2384 enum GNUNET_NetworkType nt;
@@ -2699,7 +2708,8 @@ mq_send_kx (struct GNUNET_MQ_Handle *mq,
2699 uc.sender = my_identity; 2708 uc.sender = my_identity;
2700 uc.monotonic_time = 2709 uc.monotonic_time =
2701 GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); 2710 GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
2702 uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE); 2711 uhs.purpose.purpose = htonl (
2712 GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
2703 uhs.purpose.size = htonl (sizeof(uhs)); 2713 uhs.purpose.size = htonl (sizeof(uhs));
2704 uhs.sender = my_identity; 2714 uhs.sender = my_identity;
2705 uhs.receiver = receiver->target; 2715 uhs.receiver = receiver->target;
@@ -3644,7 +3654,8 @@ iface_proc (void *cls,
3644 bi->salen = addrlen; 3654 bi->salen = addrlen;
3645 bi->found = GNUNET_YES; 3655 bi->found = GNUNET_YES;
3646 bi->bcm.sender = my_identity; 3656 bi->bcm.sender = my_identity;
3647 ubs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST); 3657 ubs.purpose.purpose = htonl (
3658 GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
3648 ubs.purpose.size = htonl (sizeof(ubs)); 3659 ubs.purpose.size = htonl (sizeof(ubs));
3649 ubs.sender = my_identity; 3660 ubs.sender = my_identity;
3650 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 56a854a70..0427bd229 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -82,6 +82,11 @@
82#include "transport.h" 82#include "transport.h"
83 83
84/** 84/**
85 * Maximum number of FC retransmissions for a runing retransmission task.
86 */
87#define MAX_FC_RETRANSMIT_COUNT 1000
88
89/**
85 * Maximum number of messages we acknowledge together in one 90 * Maximum number of messages we acknowledge together in one
86 * cumulative ACK. Larger values may save a bit of bandwidth. 91 * cumulative ACK. Larger values may save a bit of bandwidth.
87 */ 92 */
@@ -186,6 +191,12 @@
186 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60) 191 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
187 192
188/** 193/**
194 * Default value for how long we wait for reliability ack.
195 */
196#define DEFAULT_ACK_WAIT_DURATION \
197 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
198
199/**
189 * We only consider queues as "quality" connections when 200 * We only consider queues as "quality" connections when
190 * suppressing the generation of DV initiation messages if 201 * suppressing the generation of DV initiation messages if
191 * the latency of the queue is below this threshold. 202 * the latency of the queue is below this threshold.
@@ -781,6 +792,16 @@ struct TransportDVBoxMessage
781 */ 792 */
782 struct GNUNET_HashCode hmac; 793 struct GNUNET_HashCode hmac;
783 794
795 /**
796 * Size this msg had initially. This is needed to calculate the hmac at the target.
797 * The header size can not be used for that, because the box size is getting smaller at each hop.
798 */
799 /**
800 * The length of the struct (in bytes, including the length field itself),
801 * in big-endian format.
802 */
803 uint16_t orig_size GNUNET_PACKED;
804
784 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values; 805 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
785 excluding the @e origin and the current peer, the last must be 806 excluding the @e origin and the current peer, the last must be
786 the ultimate target; if @e num_hops is zero, the receiver of this 807 the ultimate target; if @e num_hops is zero, the receiver of this
@@ -1342,6 +1363,17 @@ struct VirtualLink
1342 struct GNUNET_SCHEDULER_Task *fc_retransmit_task; 1363 struct GNUNET_SCHEDULER_Task *fc_retransmit_task;
1343 1364
1344 /** 1365 /**
1366 * Number of FC retransmissions for this running task.
1367 */
1368 unsigned int fc_retransmit_count;
1369
1370 /**
1371 * Is this VirtualLink confirmed.
1372 * A unconfirmed VirtualLink might exist, if we got a FC from that target.
1373 */
1374 unsigned int confirmed;
1375
1376 /**
1345 * Neighbour used by this virtual link, NULL if @e dv is used. 1377 * Neighbour used by this virtual link, NULL if @e dv is used.
1346 */ 1378 */
1347 struct Neighbour *n; 1379 struct Neighbour *n;
@@ -2845,8 +2877,8 @@ free_pending_acknowledgement (struct PendingAcknowledgement *pa)
2845 struct PendingMessage *pm = pa->pm; 2877 struct PendingMessage *pm = pa->pm;
2846 struct DistanceVectorHop *dvh = pa->dvh; 2878 struct DistanceVectorHop *dvh = pa->dvh;
2847 2879
2848 GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa); 2880 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2849 pa_count--; 2881 "free_pending_acknowledgement\n");
2850 if (NULL != q) 2882 if (NULL != q)
2851 { 2883 {
2852 GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa); 2884 GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
@@ -2854,6 +2886,17 @@ free_pending_acknowledgement (struct PendingAcknowledgement *pa)
2854 } 2886 }
2855 if (NULL != pm) 2887 if (NULL != pm)
2856 { 2888 {
2889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2890 "remove pa from message\n");
2891 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2892 "remove pa from message %llu\n",
2893 pm->logging_uuid);
2894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2895 "remove pa from message %u\n",
2896 pm->pmt);
2897 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2898 "remove pa from message %s\n",
2899 GNUNET_uuid2s (&pa->ack_uuid.value));
2857 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa); 2900 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
2858 pa->pm = NULL; 2901 pa->pm = NULL;
2859 } 2902 }
@@ -2920,8 +2963,11 @@ free_pending_message (struct PendingMessage *pm)
2920 tc->details.core.pending_msg_tail, 2963 tc->details.core.pending_msg_tail,
2921 pm); 2964 pm);
2922 } 2965 }
2923 if (NULL != vl) 2966 if ((NULL != vl) && (NULL == pm->frag_parent))
2924 { 2967 {
2968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2969 "Removing pm %lu\n",
2970 pm->logging_uuid);
2925 GNUNET_CONTAINER_MDLL_remove (vl, 2971 GNUNET_CONTAINER_MDLL_remove (vl,
2926 vl->pending_msg_head, 2972 vl->pending_msg_head,
2927 vl->pending_msg_tail, 2973 vl->pending_msg_tail,
@@ -2929,6 +2975,18 @@ free_pending_message (struct PendingMessage *pm)
2929 } 2975 }
2930 while (NULL != (pa = pm->pa_head)) 2976 while (NULL != (pa = pm->pa_head))
2931 { 2977 {
2978 if (NULL == pa)
2979 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2980 "free pending pa null\n");
2981 if (NULL == pm->pa_tail)
2982 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2983 "free pending pa_tail null\n");
2984 if (NULL == pa->prev_pa)
2985 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2986 "free pending pa prev null\n");
2987 if (NULL == pa->next_pa)
2988 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2989 "free pending pa next null\n");
2932 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa); 2990 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
2933 pa->pm = NULL; 2991 pa->pm = NULL;
2934 } 2992 }
@@ -2944,6 +3002,9 @@ free_pending_message (struct PendingMessage *pm)
2944 free_fragment_tree (pm->bpm); 3002 free_fragment_tree (pm->bpm);
2945 GNUNET_free (pm->bpm); 3003 GNUNET_free (pm->bpm);
2946 } 3004 }
3005 if (NULL == pm)
3006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3007 "free pending pm null\n");
2947 GNUNET_free (pm); 3008 GNUNET_free (pm);
2948} 3009}
2949 3010
@@ -3028,6 +3089,9 @@ free_virtual_link (struct VirtualLink *vl)
3028 struct PendingMessage *pm; 3089 struct PendingMessage *pm;
3029 struct CoreSentContext *csc; 3090 struct CoreSentContext *csc;
3030 3091
3092 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3093 "free virtual link\n");
3094
3031 if (NULL != vl->reassembly_map) 3095 if (NULL != vl->reassembly_map)
3032 { 3096 {
3033 GNUNET_CONTAINER_multihashmap32_iterate (vl->reassembly_map, 3097 GNUNET_CONTAINER_multihashmap32_iterate (vl->reassembly_map,
@@ -3084,6 +3148,8 @@ free_validation_state (struct ValidationState *vs)
3084 vs->hn = NULL; 3148 vs->hn = NULL;
3085 if (NULL != vs->sc) 3149 if (NULL != vs->sc)
3086 { 3150 {
3151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3152 "store cancel\n");
3087 GNUNET_PEERSTORE_store_cancel (vs->sc); 3153 GNUNET_PEERSTORE_store_cancel (vs->sc);
3088 vs->sc = NULL; 3154 vs->sc = NULL;
3089 } 3155 }
@@ -3392,6 +3458,8 @@ free_neighbour (struct Neighbour *neighbour)
3392 } 3458 }
3393 if (NULL != neighbour->sc) 3459 if (NULL != neighbour->sc)
3394 { 3460 {
3461 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3462 "store cancel\n");
3395 GNUNET_PEERSTORE_store_cancel (neighbour->sc); 3463 GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3396 neighbour->sc = NULL; 3464 neighbour->sc = NULL;
3397 } 3465 }
@@ -3503,7 +3571,8 @@ check_for_queue_with_higher_prio (struct Queue *queue, struct Queue *queue_head)
3503 * @param p task priority to use, if @a queue is scheduled 3571 * @param p task priority to use, if @a queue is scheduled
3504 */ 3572 */
3505static void 3573static void
3506schedule_transmit_on_queue (struct Queue *queue, 3574schedule_transmit_on_queue (struct GNUNET_TIME_Relative delay,
3575 struct Queue *queue,
3507 enum GNUNET_SCHEDULER_Priority p) 3576 enum GNUNET_SCHEDULER_Priority p)
3508{ 3577{
3509 if (check_for_queue_with_higher_prio (queue, 3578 if (check_for_queue_with_higher_prio (queue,
@@ -3552,7 +3621,8 @@ schedule_transmit_on_queue (struct Queue *queue,
3552 if (NULL != queue->transmit_task) 3621 if (NULL != queue->transmit_task)
3553 GNUNET_SCHEDULER_cancel (queue->transmit_task); 3622 GNUNET_SCHEDULER_cancel (queue->transmit_task);
3554 queue->transmit_task = 3623 queue->transmit_task =
3555 GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue); 3624 GNUNET_SCHEDULER_add_delayed_with_priority (delay, p, &transmit_on_queue,
3625 queue);
3556 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3557 "Considering transmission on queue `%s' to %s\n", 3627 "Considering transmission on queue `%s' to %s\n",
3558 queue->address, 3628 queue->address,
@@ -3677,13 +3747,15 @@ free_queue (struct Queue *queue)
3677 GNUNET_NO); 3747 GNUNET_NO);
3678 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; 3748 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
3679 s = s->next_client) 3749 s = s->next_client)
3680 schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 3750 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
3751 s,
3752 GNUNET_SCHEDULER_PRIORITY_DEFAULT);
3681 } 3753 }
3682 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); 3754 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3683 GNUNET_free (queue); 3755 GNUNET_free (queue);
3684 3756
3685 vl = lookup_virtual_link (&neighbour->pid); 3757 vl = lookup_virtual_link (&neighbour->pid);
3686 if ((NULL != vl) && (neighbour == vl->n)) 3758 if ((NULL != vl) && (GNUNET_YES == vl->confirmed) && (neighbour == vl->n))
3687 { 3759 {
3688 GNUNET_SCHEDULER_cancel (vl->visibility_task); 3760 GNUNET_SCHEDULER_cancel (vl->visibility_task);
3689 check_link_down (vl); 3761 check_link_down (vl);
@@ -3710,6 +3782,8 @@ free_address_list_entry (struct AddressListEntry *ale)
3710 ale); 3782 ale);
3711 if (NULL != ale->sc) 3783 if (NULL != ale->sc)
3712 { 3784 {
3785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3786 "store cancel\n");
3713 GNUNET_PEERSTORE_store_cancel (ale->sc); 3787 GNUNET_PEERSTORE_store_cancel (ale->sc);
3714 ale->sc = NULL; 3788 ale->sc = NULL;
3715 } 3789 }
@@ -3954,6 +4028,8 @@ client_send_response (struct PendingMessage *pm)
3954 struct TransportClient *tc = pm->client; 4028 struct TransportClient *tc = pm->client;
3955 struct VirtualLink *vl = pm->vl; 4029 struct VirtualLink *vl = pm->vl;
3956 4030
4031 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4032 "client send response\n");
3957 if (NULL != tc) 4033 if (NULL != tc)
3958 { 4034 {
3959 struct GNUNET_MQ_Envelope *env; 4035 struct GNUNET_MQ_Envelope *env;
@@ -4127,7 +4203,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4127 return; 4203 return;
4128 } 4204 }
4129 vl = lookup_virtual_link (&rom->peer); 4205 vl = lookup_virtual_link (&rom->peer);
4130 if (NULL == vl) 4206 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4131 { 4207 {
4132 GNUNET_STATISTICS_update (GST_stats, 4208 GNUNET_STATISTICS_update (GST_stats,
4133 "# RECV_OK dropped: virtual link unknown", 4209 "# RECV_OK dropped: virtual link unknown",
@@ -4323,6 +4399,12 @@ queue_send_msg (struct Queue *queue,
4323 queue->idle = GNUNET_NO; 4399 queue->idle = GNUNET_NO;
4324 if (0 == queue->q_capacity) 4400 if (0 == queue->q_capacity)
4325 queue->idle = GNUNET_NO; 4401 queue->idle = GNUNET_NO;
4402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4403 "Sending message of type %u (%u) and size %u with MQ %p\n",
4404 ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4405 ntohs (smt->header.size),
4406 payload_size,
4407 queue->tc->mq);
4326 GNUNET_MQ_send (queue->tc->mq, env); 4408 GNUNET_MQ_send (queue->tc->mq, env);
4327 } 4409 }
4328} 4410}
@@ -4684,6 +4766,7 @@ encapsulate_for_dv (struct DistanceVector *dv,
4684 struct GNUNET_PeerIdentity *dhops; 4766 struct GNUNET_PeerIdentity *dhops;
4685 4767
4686 box_hdr.header.size = htons (sizeof(buf)); 4768 box_hdr.header.size = htons (sizeof(buf));
4769 box_hdr.orig_size = htons (sizeof(buf));
4687 box_hdr.num_hops = htons (num_hops); 4770 box_hdr.num_hops = htons (num_hops);
4688 memcpy (buf, &box_hdr, sizeof(box_hdr)); 4771 memcpy (buf, &box_hdr, sizeof(box_hdr));
4689 dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)]; 4772 dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
@@ -4741,7 +4824,7 @@ send_dv_to_neighbour (void *cls,
4741 enum RouteMessageOptions options) 4824 enum RouteMessageOptions options)
4742{ 4825{
4743 (void) cls; 4826 (void) cls;
4744 (void) route_via_neighbour (next_hop, hdr, options); 4827 (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
4745} 4828}
4746 4829
4747 4830
@@ -4776,7 +4859,7 @@ route_control_message_without_fc (struct VirtualLink *vl,
4776 4859
4777 // TODO Do this elsewhere. vl should be given as parameter to method. 4860 // TODO Do this elsewhere. vl should be given as parameter to method.
4778 // vl = lookup_virtual_link (target); 4861 // vl = lookup_virtual_link (target);
4779 GNUNET_assert (NULL != vl); 4862 GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
4780 if (NULL == vl) 4863 if (NULL == vl)
4781 return GNUNET_TIME_UNIT_FOREVER_REL; 4864 return GNUNET_TIME_UNIT_FOREVER_REL;
4782 n = vl->n; 4865 n = vl->n;
@@ -4916,7 +4999,7 @@ consider_sending_fc (void *cls)
4916 fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used); 4999 fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used);
4917 fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size); 5000 fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size);
4918 fc.sender_time = GNUNET_TIME_absolute_hton (monotime); 5001 fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
4919 rtt = route_control_message_without_fc (vl, &fc.header, RMO_NONE); 5002 rtt = route_control_message_without_fc (vl, &fc.header, RMO_DV_ALLOWED);
4920 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us) 5003 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
4921 { 5004 {
4922 rtt = GNUNET_TIME_UNIT_SECONDS; 5005 rtt = GNUNET_TIME_UNIT_SECONDS;
@@ -4933,8 +5016,14 @@ consider_sending_fc (void *cls)
4933 } 5016 }
4934 if (NULL != vl->fc_retransmit_task) 5017 if (NULL != vl->fc_retransmit_task)
4935 GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task); 5018 GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
5019 if (MAX_FC_RETRANSMIT_COUNT == vl->fc_retransmit_count)
5020 {
5021 rtt = GNUNET_TIME_UNIT_MINUTES;
5022 vl->fc_retransmit_count = 0;
5023 }
4936 vl->fc_retransmit_task = 5024 vl->fc_retransmit_task =
4937 GNUNET_SCHEDULER_add_delayed (rtt, &task_consider_sending_fc, vl); 5025 GNUNET_SCHEDULER_add_delayed (rtt, &task_consider_sending_fc, vl);
5026 vl->fc_retransmit_count++;
4938} 5027}
4939 5028
4940 5029
@@ -4960,14 +5049,20 @@ check_vl_transmission (struct VirtualLink *vl)
4960 struct Neighbour *n = vl->n; 5049 struct Neighbour *n = vl->n;
4961 struct DistanceVector *dv = vl->dv; 5050 struct DistanceVector *dv = vl->dv;
4962 struct GNUNET_TIME_Absolute now; 5051 struct GNUNET_TIME_Absolute now;
5052 struct VirtualLink *vl_next_hop;
4963 int elig; 5053 int elig;
4964 5054
5055 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5056 "check_vl_transmission to target %s\n",
5057 GNUNET_i2s (&vl->target));
4965 /* Check that we have an eligible pending message! 5058 /* Check that we have an eligible pending message!
4966 (cheaper than having #transmit_on_queue() find out!) */ 5059 (cheaper than having #transmit_on_queue() find out!) */
4967 elig = GNUNET_NO; 5060 elig = GNUNET_NO;
4968 for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm; 5061 for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
4969 pm = pm->next_vl) 5062 pm = pm->next_vl)
4970 { 5063 {
5064 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5065 "check_vl_transmission loop\n");
4971 if (NULL != pm->qe) 5066 if (NULL != pm->qe)
4972 continue; /* not eligible, is in a queue! */ 5067 continue; /* not eligible, is in a queue! */
4973 if (pm->bytes_msg + vl->outbound_fc_window_size_used > 5068 if (pm->bytes_msg + vl->outbound_fc_window_size_used >
@@ -4983,62 +5078,96 @@ check_vl_transmission (struct VirtualLink *vl)
4983 consider_sending_fc (vl); 5078 consider_sending_fc (vl);
4984 return; /* We have a message, but flow control says "nope" */ 5079 return; /* We have a message, but flow control says "nope" */
4985 } 5080 }
4986 elig = GNUNET_YES;
4987 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4988 "Eligible message %lu of size %llu to %s: %llu/%llu\n", 5082 "Target window on VL %s not stalled. Scheduling transmission on queue\n",
4989 pm->logging_uuid, 5083 GNUNET_i2s (&vl->target));
4990 pm->bytes_msg, 5084 /* Notify queues at direct neighbours that we are interested */
4991 GNUNET_i2s (&vl->target), 5085 now = GNUNET_TIME_absolute_get ();
4992 (unsigned long long) vl->outbound_fc_window_size, 5086 if (NULL != n)
4993 (unsigned long long) (pm->bytes_msg
4994 + vl->outbound_fc_window_size_used));
4995 break;
4996 }
4997 if (GNUNET_NO == elig)
4998 return;
4999 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5000 "Not stalled. Scheduling transmission on queue\n");
5001 /* Notify queues at direct neighbours that we are interested */
5002 now = GNUNET_TIME_absolute_get ();
5003 if (NULL != n)
5004 {
5005 for (struct Queue *queue = n->queue_head; NULL != queue;
5006 queue = queue->next_neighbour)
5007 {
5008 if ((GNUNET_YES == queue->idle) &&
5009 (queue->validated_until.abs_value_us > now.abs_value_us))
5010 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
5011 else
5012 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5013 "Neighbour Queue QID: %u (%u) busy or invalid\n",
5014 queue->qid,
5015 queue->idle);
5016 }
5017 }
5018 /* Notify queues via DV that we are interested */
5019 if (NULL != dv)
5020 {
5021 /* Do DV with lower scheduler priority, which effectively means that
5022 IF a neighbour exists and is available, we prefer it. */
5023 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5024 pos = pos->next_dv)
5025 { 5087 {
5026 struct Neighbour *nh = pos->next_hop; 5088 for (struct Queue *queue = n->queue_head; NULL != queue;
5027
5028 if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5029 continue; /* skip this one: path not validated */
5030 for (struct Queue *queue = nh->queue_head; NULL != queue;
5031 queue = queue->next_neighbour) 5089 queue = queue->next_neighbour)
5090 {
5032 if ((GNUNET_YES == queue->idle) && 5091 if ((GNUNET_YES == queue->idle) &&
5033 (queue->validated_until.abs_value_us > now.abs_value_us)) 5092 (queue->validated_until.abs_value_us > now.abs_value_us))
5034 schedule_transmit_on_queue (queue, 5093 {
5035 GNUNET_SCHEDULER_PRIORITY_BACKGROUND); 5094 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5095 "Direct neighbour %s not stalled\n",
5096 GNUNET_i2s (&n->pid));
5097 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
5098 queue,
5099 GNUNET_SCHEDULER_PRIORITY_DEFAULT);
5100 elig = GNUNET_YES;
5101 }
5036 else 5102 else
5037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5103 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5038 "DV Queue QID: %u (%u) busy or invalid\n", 5104 "Neighbour Queue QID: %u (%u) busy or invalid\n",
5039 queue->qid, 5105 queue->qid,
5040 queue->idle); 5106 queue->idle);
5107 }
5041 } 5108 }
5109 /* Notify queues via DV that we are interested */
5110 if (NULL != dv)
5111 {
5112 /* Do DV with lower scheduler priority, which effectively means that
5113 IF a neighbour exists and is available, we prefer it. */
5114 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5115 pos = pos->next_dv)
5116 {
5117 struct Neighbour *nh = pos->next_hop;
5118
5119
5120 if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5121 continue; /* skip this one: path not validated */
5122 else
5123 {
5124 vl_next_hop = lookup_virtual_link (&nh->pid);
5125 if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
5126 vl_next_hop->outbound_fc_window_size)
5127 {
5128 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5129 "Stalled message %lu transmission on next hop %s due to flow control: %llu < %llu\n",
5130 pm->logging_uuid,
5131 GNUNET_i2s (&vl_next_hop->target),
5132 (unsigned long
5133 long) vl_next_hop->outbound_fc_window_size,
5134 (unsigned long long) (pm->bytes_msg
5135 + vl_next_hop->
5136 outbound_fc_window_size_used));
5137 consider_sending_fc (vl_next_hop);
5138 continue; /* We have a message, but flow control says "nope" for the first hop of this path */
5139 }
5140 for (struct Queue *queue = nh->queue_head; NULL != queue;
5141 queue = queue->next_neighbour)
5142 if ((GNUNET_YES == queue->idle) &&
5143 (queue->validated_until.abs_value_us > now.abs_value_us))
5144 {
5145 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5146 "Next hop neighbour %s not stalled\n",
5147 GNUNET_i2s (&nh->pid));
5148 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
5149 queue,
5150 GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
5151 elig = GNUNET_YES;
5152 }
5153 else
5154 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5155 "DV Queue QID: %u (%u) busy or invalid\n",
5156 queue->qid,
5157 queue->idle);
5158 }
5159 }
5160 }
5161 if (GNUNET_YES == elig)
5162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5163 "Eligible message %lu of size %llu to %s: %llu/%llu\n",
5164 pm->logging_uuid,
5165 pm->bytes_msg,
5166 GNUNET_i2s (&vl->target),
5167 (unsigned long long) vl->outbound_fc_window_size,
5168 (unsigned long long) (pm->bytes_msg
5169 + vl->outbound_fc_window_size_used));
5170 break;
5042 } 5171 }
5043} 5172}
5044 5173
@@ -5064,7 +5193,7 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
5064 bytes_msg = ntohs (obmm->size); 5193 bytes_msg = ntohs (obmm->size);
5065 pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority); 5194 pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
5066 vl = lookup_virtual_link (&obm->peer); 5195 vl = lookup_virtual_link (&obm->peer);
5067 if (NULL == vl) 5196 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5068 { 5197 {
5069 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5070 "Don't have %s as a neighbour (anymore).\n", 5199 "Don't have %s as a neighbour (anymore).\n",
@@ -5154,7 +5283,7 @@ handle_communicator_backchannel (
5154 strlen (is) + 1); 5283 strlen (is) + 1);
5155 // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED); 5284 // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
5156 vl = lookup_virtual_link (&cb->pid); 5285 vl = lookup_virtual_link (&cb->pid);
5157 if (NULL != vl) 5286 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5158 { 5287 {
5159 route_control_message_without_fc (vl, &be->header, RMO_DV_ALLOWED); 5288 route_control_message_without_fc (vl, &be->header, RMO_DV_ALLOWED);
5160 } 5289 }
@@ -5428,7 +5557,7 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5428 return; 5557 return;
5429 } 5558 }
5430 vl = lookup_virtual_link (&cmc->im.sender); 5559 vl = lookup_virtual_link (&cmc->im.sender);
5431 if (NULL == vl) 5560 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5432 { 5561 {
5433 /* FIXME: sender is giving us messages for CORE but we don't have 5562 /* FIXME: sender is giving us messages for CORE but we don't have
5434 the link up yet! I *suspect* this can happen right now (i.e. 5563 the link up yet! I *suspect* this can happen right now (i.e.
@@ -5624,7 +5753,7 @@ transmit_cummulative_ack_cb (void *cls)
5624 &ack->header, 5753 &ack->header,
5625 RMO_DV_ALLOWED);*/ 5754 RMO_DV_ALLOWED);*/
5626 vl = lookup_virtual_link (&ac->target); 5755 vl = lookup_virtual_link (&ac->target);
5627 if (NULL != vl) 5756 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5628 { 5757 {
5629 route_control_message_without_fc ( 5758 route_control_message_without_fc (
5630 vl, 5759 vl,
@@ -5765,7 +5894,7 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
5765 struct FindByMessageUuidContext fc; 5894 struct FindByMessageUuidContext fc;
5766 5895
5767 vl = lookup_virtual_link (&cmc->im.sender); 5896 vl = lookup_virtual_link (&cmc->im.sender);
5768 if (NULL == vl) 5897 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5769 { 5898 {
5770 struct GNUNET_SERVICE_Client *client = cmc->tc->client; 5899 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
5771 5900
@@ -5794,6 +5923,7 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
5794 fb->msg_uuid.uuid, 5923 fb->msg_uuid.uuid,
5795 &find_by_message_uuid, 5924 &find_by_message_uuid,
5796 &fc); 5925 &fc);
5926 fsize = ntohs (fb->header.size) - sizeof(*fb);
5797 if (NULL == (rc = fc.rc)) 5927 if (NULL == (rc = fc.rc))
5798 { 5928 {
5799 rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */ 5929 rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
@@ -5815,11 +5945,16 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
5815 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 5945 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
5816 target = (char *) &rc[1]; 5946 target = (char *) &rc[1];
5817 rc->bitfield = (uint8_t *) (target + rc->msg_size); 5947 rc->bitfield = (uint8_t *) (target + rc->msg_size);
5818 rc->msg_missing = rc->msg_size; 5948 if (fsize != rc->msg_size)
5949 rc->msg_missing = rc->msg_size;
5950 else
5951 rc->msg_missing = 0;
5819 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 5952 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5820 "Received fragment at offset %u/%u from %s for NEW message %u\n", 5953 "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %u\n",
5954 fsize,
5821 ntohs (fb->frag_off), 5955 ntohs (fb->frag_off),
5822 msize, 5956 msize,
5957 rc->msg_missing,
5823 GNUNET_i2s (&cmc->im.sender), 5958 GNUNET_i2s (&cmc->im.sender),
5824 (unsigned int) fb->msg_uuid.uuid); 5959 (unsigned int) fb->msg_uuid.uuid);
5825 } 5960 }
@@ -5841,7 +5976,6 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
5841 } 5976 }
5842 5977
5843 /* reassemble */ 5978 /* reassemble */
5844 fsize = ntohs (fb->header.size) - sizeof(*fb);
5845 if (0 == fsize) 5979 if (0 == fsize)
5846 { 5980 {
5847 GNUNET_break (0); 5981 GNUNET_break (0);
@@ -5918,6 +6052,16 @@ check_reliability_box (void *cls,
5918 const struct TransportReliabilityBoxMessage *rb) 6052 const struct TransportReliabilityBoxMessage *rb)
5919{ 6053{
5920 (void) cls; 6054 (void) cls;
6055 const struct GNUNET_MessageHeader *inbox = (const struct
6056 GNUNET_MessageHeader *) &rb[1];
6057
6058 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6059 "check_send_msg with size %u: inner msg type %u and size %u (%u %u)\n",
6060 ntohs (rb->header.size),
6061 ntohs (inbox->type),
6062 ntohs (inbox->size),
6063 sizeof (struct TransportReliabilityBoxMessage),
6064 sizeof (struct GNUNET_MessageHeader));
5921 GNUNET_MQ_check_boxed_message (rb); 6065 GNUNET_MQ_check_boxed_message (rb);
5922 return GNUNET_YES; 6066 return GNUNET_YES;
5923} 6067}
@@ -6060,6 +6204,10 @@ completed_pending_message (struct PendingMessage *pm)
6060{ 6204{
6061 struct PendingMessage *pos; 6205 struct PendingMessage *pos;
6062 6206
6207 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6208 "Complete transmission of message %llu %u\n",
6209 pm->logging_uuid,
6210 pm->pmt);
6063 switch (pm->pmt) 6211 switch (pm->pmt)
6064 { 6212 {
6065 case PMT_CORE: 6213 case PMT_CORE:
@@ -6069,7 +6217,7 @@ completed_pending_message (struct PendingMessage *pm)
6069 return; 6217 return;
6070 6218
6071 case PMT_FRAGMENT_BOX: 6219 case PMT_FRAGMENT_BOX:
6072 /* Fragment sent over reliabile channel */ 6220 /* Fragment sent over reliable channel */
6073 free_fragment_tree (pm); 6221 free_fragment_tree (pm);
6074 pos = pm->frag_parent; 6222 pos = pm->frag_parent;
6075 GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm); 6223 GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
@@ -6080,11 +6228,17 @@ completed_pending_message (struct PendingMessage *pm)
6080 { 6228 {
6081 pm = pos; 6229 pm = pos;
6082 pos = pm->frag_parent; 6230 pos = pm->frag_parent;
6231 if (PMT_DV_BOX == pm->pmt)
6232 {
6233 GNUNET_free (pm);
6234 client_send_response (pos);
6235 return;
6236 }
6083 GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm); 6237 GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
6084 GNUNET_free (pm); 6238 GNUNET_free (pm);
6085 } 6239 }
6086 6240
6087 /* Was this the last applicable fragmment? */ 6241 /* Was this the last applicable fragment? */
6088 if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) && 6242 if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
6089 (pos->frag_off == pos->bytes_msg)) 6243 (pos->frag_off == pos->bytes_msg))
6090 client_send_response (pos); 6244 client_send_response (pos);
@@ -6293,6 +6447,7 @@ handle_backchannel_encapsulation (
6293 target_communicator); 6447 target_communicator);
6294 GNUNET_STATISTICS_update (GST_stats, stastr, 1, GNUNET_NO); 6448 GNUNET_STATISTICS_update (GST_stats, stastr, 1, GNUNET_NO);
6295 GNUNET_free (stastr); 6449 GNUNET_free (stastr);
6450 finish_cmc_handling (cmc);
6296 return; 6451 return;
6297 } 6452 }
6298 /* Finally, deliver backchannel message to communicator */ 6453 /* Finally, deliver backchannel message to communicator */
@@ -6309,6 +6464,7 @@ handle_backchannel_encapsulation (
6309 cbi->pid = cmc->im.sender; 6464 cbi->pid = cmc->im.sender;
6310 memcpy (&cbi[1], inbox, isize); 6465 memcpy (&cbi[1], inbox, isize);
6311 GNUNET_MQ_send (tc->mq, env); 6466 GNUNET_MQ_send (tc->mq, env);
6467 finish_cmc_handling (cmc);
6312} 6468}
6313 6469
6314 6470
@@ -6359,39 +6515,54 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
6359 struct VirtualLink *vl; 6515 struct VirtualLink *vl;
6360 6516
6361 vl = lookup_virtual_link (&dv->target); 6517 vl = lookup_virtual_link (&dv->target);
6362 if (NULL != vl) 6518 if (NULL == vl)
6363 { 6519 {
6364 /* Link was already up, remember dv is also now available and we are done */
6365 vl->dv = dv;
6366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 6520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6367 "Virtual link to %s could now also use DV!\n", 6521 "Creating new virtual link to %s using DV!\n",
6368 GNUNET_i2s (&dv->target)); 6522 GNUNET_i2s (&dv->target));
6369 return; 6523 vl = GNUNET_new (struct VirtualLink);
6524 vl->confirmed = GNUNET_YES;
6525 vl->message_uuid_ctr =
6526 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
6527 vl->target = dv->target;
6528 vl->core_recv_window = RECV_WINDOW_SIZE;
6529 vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
6530 vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
6531 GNUNET_break (GNUNET_YES ==
6532 GNUNET_CONTAINER_multipeermap_put (
6533 links,
6534 &vl->target,
6535 vl,
6536 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6537 vl->dv = dv;
6538 dv->vl = vl;
6539 vl->visibility_task =
6540 GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
6541 consider_sending_fc (vl);
6542 /* We lacked a confirmed connection to the target
6543 before, so tell CORE about it (finally!) */
6544 cores_send_connect_info (&dv->target);
6545 }
6546 else
6547 {
6548 /* Link was already up, remember dv is also now available and we are done */
6549 vl->dv = dv;
6550 dv->vl = vl;
6551 if (GNUNET_NO == vl->confirmed)
6552 {
6553 vl->confirmed = GNUNET_YES;
6554 vl->visibility_task =
6555 GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
6556 consider_sending_fc (vl);
6557 /* We lacked a confirmed connection to the target
6558 before, so tell CORE about it (finally!) */
6559 cores_send_connect_info (&dv->target);
6560 }
6561 else
6562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6563 "Virtual link to %s could now also use DV!\n",
6564 GNUNET_i2s (&dv->target));
6370 } 6565 }
6371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6372 "Creating new virtual link to %s using DV!\n",
6373 GNUNET_i2s (&dv->target));
6374 vl = GNUNET_new (struct VirtualLink);
6375 vl->message_uuid_ctr =
6376 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
6377 vl->target = dv->target;
6378 vl->dv = dv;
6379 dv->vl = vl;
6380 vl->core_recv_window = RECV_WINDOW_SIZE;
6381 vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
6382 vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
6383 vl->visibility_task =
6384 GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
6385 GNUNET_break (GNUNET_YES ==
6386 GNUNET_CONTAINER_multipeermap_put (
6387 links,
6388 &vl->target,
6389 vl,
6390 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6391 consider_sending_fc (vl);
6392 /* We lacked a confirmed connection to the target
6393 before, so tell CORE about it (finally!) */
6394 cores_send_connect_info (&dv->target);
6395} 6566}
6396 6567
6397 6568
@@ -6530,8 +6701,10 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
6530 return GNUNET_NO; 6701 return GNUNET_NO;
6531 } 6702 }
6532 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 6703 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6533 "Refreshed known path to %s, forwarding further\n", 6704 "Refreshed known path to %s valid until %s, forwarding further\n",
6534 GNUNET_i2s (&dv->target)); 6705 GNUNET_i2s (&dv->target),
6706 GNUNET_STRINGS_absolute_time_to_string (
6707 pos->path_valid_until));
6535 return GNUNET_YES; 6708 return GNUNET_YES;
6536 } 6709 }
6537 } 6710 }
@@ -6548,8 +6721,9 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
6548 } 6721 }
6549 /* create new DV path entry */ 6722 /* create new DV path entry */
6550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 6723 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6551 "Discovered new DV path to %s\n", 6724 "Discovered new DV path to %s valid until %s\n",
6552 GNUNET_i2s (&dv->target)); 6725 GNUNET_i2s (&dv->target),
6726 GNUNET_STRINGS_absolute_time_to_string (path_valid_until));
6553 hop = GNUNET_malloc (sizeof(struct DistanceVectorHop) 6727 hop = GNUNET_malloc (sizeof(struct DistanceVectorHop)
6554 + sizeof(struct GNUNET_PeerIdentity) * (path_len - 3)); 6728 + sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
6555 hop->next_hop = next_hop; 6729 hop->next_hop = next_hop;
@@ -6680,7 +6854,7 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
6680 &fwd->header, 6854 &fwd->header,
6681 RMO_UNCONFIRMED_ALLOWED);*/ 6855 RMO_UNCONFIRMED_ALLOWED);*/
6682 vl = lookup_virtual_link (next_hop); 6856 vl = lookup_virtual_link (next_hop);
6683 if (NULL != vl) 6857 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
6684 { 6858 {
6685 route_control_message_without_fc (vl, 6859 route_control_message_without_fc (vl,
6686 &fwd->header, 6860 &fwd->header,
@@ -6977,13 +7151,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
6977 struct GNUNET_TIME_Absolute in_time; 7151 struct GNUNET_TIME_Absolute in_time;
6978 struct Neighbour *n; 7152 struct Neighbour *n;
6979 7153
6980 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6981 "handle dv learn message from %s\n",
6982 GNUNET_i2s (&dvl->initiator));
6983 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6984 "handle dv learn message sender %s\n",
6985 GNUNET_i2s (&cmc->im.sender));
6986
6987 nhops = ntohs (dvl->num_hops); /* 0 = sender is initiator */ 7154 nhops = ntohs (dvl->num_hops); /* 0 = sender is initiator */
6988 bi_history = ntohs (dvl->bidirectional); 7155 bi_history = ntohs (dvl->bidirectional);
6989 hops = (const struct DVPathEntryP *) &dvl[1]; 7156 hops = (const struct DVPathEntryP *) &dvl[1];
@@ -7017,10 +7184,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7017 cc); // FIXME: add bi-directional flag to cc? 7184 cc); // FIXME: add bi-directional flag to cc?
7018 in_time = GNUNET_TIME_absolute_get (); 7185 in_time = GNUNET_TIME_absolute_get ();
7019 7186
7020 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7021 "2 handle dv learn message from %s\n",
7022 GNUNET_i2s (&dvl->initiator));
7023
7024 /* continue communicator here, everything else can happen asynchronous! */ 7187 /* continue communicator here, everything else can happen asynchronous! */
7025 finish_cmc_handling (cmc); 7188 finish_cmc_handling (cmc);
7026 7189
@@ -7055,7 +7218,11 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7055 if (GNUNET_YES == n->dv_monotime_available) 7218 if (GNUNET_YES == n->dv_monotime_available)
7056 { 7219 {
7057 if (NULL != n->sc) 7220 if (NULL != n->sc)
7221 {
7222 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7223 "store cancel\n");
7058 GNUNET_PEERSTORE_store_cancel (n->sc); 7224 GNUNET_PEERSTORE_store_cancel (n->sc);
7225 }
7059 n->sc = 7226 n->sc =
7060 GNUNET_PEERSTORE_store (peerstore, 7227 GNUNET_PEERSTORE_store (peerstore,
7061 "transport", 7228 "transport",
@@ -7069,9 +7236,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7069 n); 7236 n);
7070 } 7237 }
7071 } 7238 }
7072 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7073 "3 handle dv learn message from %s\n",
7074 GNUNET_i2s (&dvl->initiator));
7075 /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!, 7239 /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
7076 If signature verification load too high, implement random drop strategy */ 7240 If signature verification load too high, implement random drop strategy */
7077 for (unsigned int i = 0; i < nhops; i++) 7241 for (unsigned int i = 0; i < nhops; i++)
@@ -7110,9 +7274,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7110 return; 7274 return;
7111 } 7275 }
7112 } 7276 }
7113 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7114 "4 handle dv learn message from %s\n",
7115 GNUNET_i2s (&dvl->initiator));
7116 if (GNUNET_EXTRA_LOGGING > 0) 7277 if (GNUNET_EXTRA_LOGGING > 0)
7117 { 7278 {
7118 char *path; 7279 char *path;
@@ -7137,9 +7298,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7137 GNUNET_i2s (&GST_my_identity)); 7298 GNUNET_i2s (&GST_my_identity));
7138 GNUNET_free (path); 7299 GNUNET_free (path);
7139 } 7300 }
7140 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7141 "5 handle dv learn message from %s\n",
7142 GNUNET_i2s (&dvl->initiator));
7143 do_fwd = GNUNET_YES; 7301 do_fwd = GNUNET_YES;
7144 if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator)) 7302 if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
7145 { 7303 {
@@ -7147,6 +7305,7 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7147 struct GNUNET_TIME_Relative host_latency_sum; 7305 struct GNUNET_TIME_Relative host_latency_sum;
7148 struct GNUNET_TIME_Relative latency; 7306 struct GNUNET_TIME_Relative latency;
7149 struct GNUNET_TIME_Relative network_latency; 7307 struct GNUNET_TIME_Relative network_latency;
7308 struct GNUNET_TIME_Absolute now;
7150 7309
7151 /* We initiated this, learn the forward path! */ 7310 /* We initiated this, learn the forward path! */
7152 path[0] = GST_my_identity; 7311 path[0] = GST_my_identity;
@@ -7155,7 +7314,11 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7155 7314
7156 // Need also something to lookup initiation time 7315 // Need also something to lookup initiation time
7157 // to compute RTT! -> add RTT argument here? 7316 // to compute RTT! -> add RTT argument here?
7158 latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly 7317 now = GNUNET_TIME_absolute_get ();
7318 latency = GNUNET_TIME_absolute_get_duration (GNUNET_TIME_absolute_ntoh (
7319 dvl->monotonic_time));
7320 GNUNET_assert (latency.rel_value_us >= host_latency_sum.rel_value_us);
7321 // latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
7159 // (based on dvl->challenge, we can identify time of origin!) 7322 // (based on dvl->challenge, we can identify time of origin!)
7160 7323
7161 network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum); 7324 network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
@@ -7184,9 +7347,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7184 do_fwd = GNUNET_NO; 7347 do_fwd = GNUNET_NO;
7185 return; 7348 return;
7186 } 7349 }
7187 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7188 "6 handle dv learn message from %s\n",
7189 GNUNET_i2s (&dvl->initiator));
7190 if (bi_hop) 7350 if (bi_hop)
7191 { 7351 {
7192 /* last hop was bi-directional, we could learn something here! */ 7352 /* last hop was bi-directional, we could learn something here! */
@@ -7243,9 +7403,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7243 } 7403 }
7244 } 7404 }
7245 } 7405 }
7246 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7247 "7 handle dv learn message from %s\n",
7248 GNUNET_i2s (&dvl->initiator));
7249 if (MAX_DV_HOPS_ALLOWED == nhops) 7406 if (MAX_DV_HOPS_ALLOWED == nhops)
7250 { 7407 {
7251 /* At limit, we're out of here! */ 7408 /* At limit, we're out of here! */
@@ -7305,9 +7462,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7305 &dv_neighbour_transmission, 7462 &dv_neighbour_transmission,
7306 &nsc); 7463 &nsc);
7307 } 7464 }
7308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7309 "9 handle dv learn message from %s\n",
7310 GNUNET_i2s (&dvl->initiator));
7311} 7465}
7312 7466
7313 7467
@@ -7374,10 +7528,17 @@ forward_dv_box (struct Neighbour *next_hop,
7374 char msg_buf[msg_size] GNUNET_ALIGN; 7528 char msg_buf[msg_size] GNUNET_ALIGN;
7375 struct GNUNET_PeerIdentity *dhops; 7529 struct GNUNET_PeerIdentity *dhops;
7376 7530
7531 if (GNUNET_NO == ntohs (hdr->without_fc))
7532 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7533 "forward dv box without fc\n");
7534 if (NULL == vl)
7535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7536 "forward dv box vl null\n");
7377 GNUNET_assert (GNUNET_YES == ntohs (hdr->without_fc) || NULL != vl); 7537 GNUNET_assert (GNUNET_YES == ntohs (hdr->without_fc) || NULL != vl);
7378 7538
7379 hdr->num_hops = htons (num_hops); 7539 hdr->num_hops = htons (num_hops);
7380 hdr->total_hops = htons (total_hops); 7540 hdr->total_hops = htons (total_hops);
7541 hdr->header.size = htons (msg_size);
7381 memcpy (msg_buf, hdr, sizeof(*hdr)); 7542 memcpy (msg_buf, hdr, sizeof(*hdr));
7382 dhops = (struct GNUNET_PeerIdentity *) &msg_buf[sizeof(struct 7543 dhops = (struct GNUNET_PeerIdentity *) &msg_buf[sizeof(struct
7383 TransportDVBoxMessage)]; 7544 TransportDVBoxMessage)];
@@ -7387,7 +7548,8 @@ forward_dv_box (struct Neighbour *next_hop,
7387 if (GNUNET_YES == ntohs (hdr->without_fc)) 7548 if (GNUNET_YES == ntohs (hdr->without_fc))
7388 { 7549 {
7389 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 7550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7390 "Forwarding control message in DV Box to next hop %s (%u/%u) \n", 7551 "Forwarding control message (payload size %u) in DV Box to next hop %s (%u/%u) \n",
7552 enc_payload_size,
7391 GNUNET_i2s (&next_hop->pid), 7553 GNUNET_i2s (&next_hop->pid),
7392 (unsigned int) num_hops, 7554 (unsigned int) num_hops,
7393 (unsigned int) total_hops); 7555 (unsigned int) total_hops);
@@ -7444,6 +7606,8 @@ free_backtalker (struct Backtalker *b)
7444 } 7606 }
7445 if (NULL != b->sc) 7607 if (NULL != b->sc)
7446 { 7608 {
7609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7610 "store cancel\n");
7447 GNUNET_PEERSTORE_store_cancel (b->sc); 7611 GNUNET_PEERSTORE_store_cancel (b->sc);
7448 b->sc = NULL; 7612 b->sc = NULL;
7449 } 7613 }
@@ -7486,6 +7650,8 @@ backtalker_timeout_cb (void *cls)
7486{ 7650{
7487 struct Backtalker *b = cls; 7651 struct Backtalker *b = cls;
7488 7652
7653 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7654 "backtalker timeout.\n");
7489 b->task = NULL; 7655 b->task = NULL;
7490 if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us) 7656 if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us)
7491 { 7657 {
@@ -7589,6 +7755,8 @@ update_backtalker_monotime (struct Backtalker *b)
7589 7755
7590 if (NULL != b->sc) 7756 if (NULL != b->sc)
7591 { 7757 {
7758 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7759 "store cancel\n");
7592 GNUNET_PEERSTORE_store_cancel (b->sc); 7760 GNUNET_PEERSTORE_store_cancel (b->sc);
7593 b->sc = NULL; 7761 b->sc = NULL;
7594 } 7762 }
@@ -7714,8 +7882,8 @@ handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
7714 7882
7715 dh_key_derive_eph_pub (&dvb->ephemeral_key, &dvb->iv, key); 7883 dh_key_derive_eph_pub (&dvb->ephemeral_key, &dvb->iv, key);
7716 hdr = (const char *) &dvb[1]; 7884 hdr = (const char *) &dvb[1];
7717 hdr_len = ntohs (dvb->header.size) - sizeof(*dvb) - sizeof(struct 7885 hdr_len = ntohs (dvb->orig_size) - sizeof(*dvb) - sizeof(struct
7718 GNUNET_PeerIdentity) 7886 GNUNET_PeerIdentity)
7719 * ntohs (dvb->total_hops); 7887 * ntohs (dvb->total_hops);
7720 7888
7721 dv_hmac (key, &hmac, hdr, hdr_len); 7889 dv_hmac (key, &hmac, hdr, hdr_len);
@@ -8112,7 +8280,7 @@ handle_validation_challenge (
8112 } 8280 }
8113 sender = cmc->im.sender; 8281 sender = cmc->im.sender;
8114 vl = lookup_virtual_link (&sender); 8282 vl = lookup_virtual_link (&sender);
8115 if (NULL != vl) 8283 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
8116 { 8284 {
8117 // route_control_message_without_fc (&cmc->im.sender, 8285 // route_control_message_without_fc (&cmc->im.sender,
8118 route_control_message_without_fc (vl, 8286 route_control_message_without_fc (vl,
@@ -8380,46 +8548,63 @@ handle_validation_response (
8380 q->pd.aged_rtt = vs->validation_rtt; 8548 q->pd.aged_rtt = vs->validation_rtt;
8381 n = q->neighbour; 8549 n = q->neighbour;
8382 vl = lookup_virtual_link (&vs->pid); 8550 vl = lookup_virtual_link (&vs->pid);
8383 if (NULL != vl) 8551 if (NULL == vl)
8552 {
8553 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
8554 "Creating new virtual link to %s using direct neighbour!\n",
8555 GNUNET_i2s (&vs->pid));
8556 vl = GNUNET_new (struct VirtualLink);
8557 vl->confirmed = GNUNET_YES;
8558 vl->message_uuid_ctr =
8559 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
8560 vl->target = n->pid;
8561 vl->core_recv_window = RECV_WINDOW_SIZE;
8562 vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
8563 vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
8564 GNUNET_break (GNUNET_YES ==
8565 GNUNET_CONTAINER_multipeermap_put (
8566 links,
8567 &vl->target,
8568 vl,
8569 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
8570 vl->n = n;
8571 n->vl = vl;
8572 q->idle = GNUNET_YES;
8573 vl->visibility_task =
8574 GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
8575 consider_sending_fc (vl);
8576 /* We lacked a confirmed connection to the target
8577 before, so tell CORE about it (finally!) */
8578 cores_send_connect_info (&n->pid);
8579 }
8580 else
8384 { 8581 {
8385 /* Link was already up, remember n is also now available and we are done */ 8582 /* Link was already up, remember n is also now available and we are done */
8386 if (NULL == vl->n) 8583 if (NULL == vl->n)
8387 { 8584 {
8388 vl->n = n; 8585 vl->n = n;
8389 n->vl = vl; 8586 n->vl = vl;
8390 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 8587 if (GNUNET_YES == vl->confirmed)
8391 "Virtual link to %s could now also direct neighbour!\n", 8588 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
8392 GNUNET_i2s (&vs->pid)); 8589 "Virtual link to %s could now also direct neighbour!\n",
8590 GNUNET_i2s (&vs->pid));
8393 } 8591 }
8394 else 8592 else
8395 { 8593 {
8396 GNUNET_assert (n == vl->n); 8594 GNUNET_assert (n == vl->n);
8397 } 8595 }
8398 return; 8596 if (GNUNET_NO == vl->confirmed)
8597 {
8598 vl->confirmed = GNUNET_YES;
8599 q->idle = GNUNET_YES;
8600 vl->visibility_task =
8601 GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
8602 consider_sending_fc (vl);
8603 /* We lacked a confirmed connection to the target
8604 before, so tell CORE about it (finally!) */
8605 cores_send_connect_info (&n->pid);
8606 }
8399 } 8607 }
8400 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
8401 "Creating new virtual link to %s using direct neighbour!\n",
8402 GNUNET_i2s (&vs->pid));
8403 vl = GNUNET_new (struct VirtualLink);
8404 vl->target = n->pid;
8405 vl->n = n;
8406 n->vl = vl;
8407 q->idle = GNUNET_YES;
8408 vl->core_recv_window = RECV_WINDOW_SIZE;
8409 vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
8410 vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
8411 vl->visibility_task =
8412 GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
8413 GNUNET_break (GNUNET_YES ==
8414 GNUNET_CONTAINER_multipeermap_put (
8415 links,
8416 &vl->target,
8417 vl,
8418 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
8419 consider_sending_fc (vl);
8420 /* We lacked a confirmed connection to the target
8421 before, so tell CORE about it (finally!) */
8422 cores_send_connect_info (&n->pid);
8423} 8608}
8424 8609
8425 8610
@@ -8462,6 +8647,7 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
8462 struct GNUNET_TIME_Absolute st; 8647 struct GNUNET_TIME_Absolute st;
8463 uint64_t os; 8648 uint64_t os;
8464 uint64_t wnd; 8649 uint64_t wnd;
8650 uint32_t random;
8465 8651
8466 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 8652 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
8467 "Received FC from %s\n", GNUNET_i2s (&cmc->im.sender)); 8653 "Received FC from %s\n", GNUNET_i2s (&cmc->im.sender));
@@ -8469,13 +8655,22 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
8469 if (NULL == vl) 8655 if (NULL == vl)
8470 { 8656 {
8471 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 8657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
8472 "FC dropped: VL unknown\n"); 8658 "No virtual link for FC creating new unconfirmed virtual link to %s!\n",
8473 GNUNET_STATISTICS_update (GST_stats, 8659 GNUNET_i2s (&cmc->im.sender));
8474 "# FC dropped: Virtual link unknown", 8660 vl = GNUNET_new (struct VirtualLink);
8475 1, 8661 vl->confirmed = GNUNET_NO;
8476 GNUNET_NO); 8662 vl->message_uuid_ctr =
8477 finish_cmc_handling (cmc); 8663 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
8478 return; 8664 vl->target = cmc->im.sender;
8665 vl->core_recv_window = RECV_WINDOW_SIZE;
8666 vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
8667 vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
8668 GNUNET_break (GNUNET_YES ==
8669 GNUNET_CONTAINER_multipeermap_put (
8670 links,
8671 &vl->target,
8672 vl,
8673 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
8479 } 8674 }
8480 st = GNUNET_TIME_absolute_ntoh (fc->sender_time); 8675 st = GNUNET_TIME_absolute_ntoh (fc->sender_time);
8481 if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us) 8676 if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us)
@@ -8509,15 +8704,20 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
8509 (unsigned long long) vl->outbound_fc_window_size, 8704 (unsigned long long) vl->outbound_fc_window_size,
8510 (long long) vl->incoming_fc_window_size_loss); 8705 (long long) vl->incoming_fc_window_size_loss);
8511 wnd = GNUNET_ntohll (fc->outbound_window_size); 8706 wnd = GNUNET_ntohll (fc->outbound_window_size);
8512 if ((wnd < vl->incoming_fc_window_size) || 8707 random = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
8513 (vl->last_outbound_window_size_received != wnd) || 8708 UINT32_MAX);
8514 (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX) 8709 if ((GNUNET_YES == vl->confirmed) && ((wnd < vl->incoming_fc_window_size) ||
8515 % FC_NO_CHANGE_REPLY_PROBABILITY)) 8710 (vl->last_outbound_window_size_received
8711 != wnd) ||
8712 (0 == random
8713 % FC_NO_CHANGE_REPLY_PROBABILITY)))
8516 { 8714 {
8517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 8715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
8518 "Consider re-sending our FC message, as clearly the other peer's idea of the window is not up-to-date (%llu vs %llu)\n", 8716 "Consider re-sending our FC message, as clearly the other peer's idea of the window is not up-to-date (%llu vs %llu) or %llu last received differs, or random reply %lu\n",
8519 (unsigned long long) wnd, 8717 (unsigned long long) wnd,
8520 (unsigned long long) vl->incoming_fc_window_size); 8718 (unsigned long long) vl->incoming_fc_window_size,
8719 (unsigned long long) vl->last_outbound_window_size_received,
8720 random % FC_NO_CHANGE_REPLY_PROBABILITY);
8521 consider_sending_fc (vl); 8721 consider_sending_fc (vl);
8522 } 8722 }
8523 if ((wnd == vl->incoming_fc_window_size) && 8723 if ((wnd == vl->incoming_fc_window_size) &&
@@ -8530,6 +8730,7 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
8530 (unsigned long long) wnd); 8730 (unsigned long long) wnd);
8531 GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task); 8731 GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
8532 vl->fc_retransmit_task = NULL; 8732 vl->fc_retransmit_task = NULL;
8733 vl->fc_retransmit_count = 0;
8533 } 8734 }
8534 vl->last_outbound_window_size_received = wnd; 8735 vl->last_outbound_window_size_received = wnd;
8535 /* FC window likely increased, check transmission possibilities! */ 8736 /* FC window likely increased, check transmission possibilities! */
@@ -8729,7 +8930,6 @@ fragment_message (struct Queue *queue,
8729 pm->bytes_msg, 8930 pm->bytes_msg,
8730 GNUNET_i2s (&pm->vl->target), 8931 GNUNET_i2s (&pm->vl->target),
8731 (unsigned int) mtu); 8932 (unsigned int) mtu);
8732 pa = prepare_pending_acknowledgement (queue, dvh, pm);
8733 8933
8734 /* This invariant is established in #handle_add_queue_message() */ 8934 /* This invariant is established in #handle_add_queue_message() */
8735 GNUNET_assert (mtu > sizeof(struct TransportFragmentBoxMessage)); 8935 GNUNET_assert (mtu > sizeof(struct TransportFragmentBoxMessage));
@@ -8745,7 +8945,7 @@ fragment_message (struct Queue *queue,
8745 ff = ff->head_frag; /* descent into fragmented fragments */ 8945 ff = ff->head_frag; /* descent into fragmented fragments */
8746 } 8946 }
8747 8947
8748 if (((ff->bytes_msg > mtu) || (pm == ff)) && (pm->frag_off < pm->bytes_msg)) 8948 if (((ff->bytes_msg > mtu) || (pm == ff)) && (ff->frag_off < ff->bytes_msg))
8749 { 8949 {
8750 /* Did not yet calculate all fragments, calculate next fragment */ 8950 /* Did not yet calculate all fragments, calculate next fragment */
8751 struct PendingMessage *frag; 8951 struct PendingMessage *frag;
@@ -8783,13 +8983,15 @@ fragment_message (struct Queue *queue,
8783 tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT); 8983 tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
8784 tfb.header.size = 8984 tfb.header.size =
8785 htons (sizeof(struct TransportFragmentBoxMessage) + fragsize); 8985 htons (sizeof(struct TransportFragmentBoxMessage) + fragsize);
8986 pa = prepare_pending_acknowledgement (queue, dvh, frag);
8786 tfb.ack_uuid = pa->ack_uuid; 8987 tfb.ack_uuid = pa->ack_uuid;
8787 tfb.msg_uuid = pm->msg_uuid; 8988 tfb.msg_uuid = pm->msg_uuid;
8788 tfb.frag_off = htons (ff->frag_off + xoff); 8989 tfb.frag_off = htons (ff->frag_off + xoff);
8789 tfb.msg_size = htons (pm->bytes_msg); 8990 tfb.msg_size = htons (pm->bytes_msg);
8790 memcpy (msg, &tfb, sizeof(tfb)); 8991 memcpy (msg, &tfb, sizeof(tfb));
8791 memcpy (&msg[sizeof(tfb)], &orig[ff->frag_off], fragsize); 8992 memcpy (&msg[sizeof(tfb)], &orig[ff->frag_off], fragsize);
8792 GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag, ff->tail_frag, frag); 8993 GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag,
8994 ff->tail_frag, frag);
8793 ff->frag_off += fragsize; 8995 ff->frag_off += fragsize;
8794 ff = frag; 8996 ff = frag;
8795 } 8997 }
@@ -8803,6 +9005,7 @@ fragment_message (struct Queue *queue,
8803 ff->frag_parent->head_frag, 9005 ff->frag_parent->head_frag,
8804 ff->frag_parent->tail_frag, 9006 ff->frag_parent->tail_frag,
8805 ff); 9007 ff);
9008
8806 return ff; 9009 return ff;
8807} 9010}
8808 9011
@@ -8829,7 +9032,7 @@ reliability_box_message (struct Queue *queue,
8829 struct PendingMessage *bpm; 9032 struct PendingMessage *bpm;
8830 char *msg; 9033 char *msg;
8831 9034
8832 if (PMT_CORE != pm->pmt) 9035 if ((PMT_CORE != pm->pmt) && (PMT_DV_BOX != pm->pmt))
8833 return pm; /* already fragmented or reliability boxed, or control message: 9036 return pm; /* already fragmented or reliability boxed, or control message:
8834 do nothing */ 9037 do nothing */
8835 if (NULL != pm->bpm) 9038 if (NULL != pm->bpm)
@@ -8842,11 +9045,7 @@ reliability_box_message (struct Queue *queue,
8842 client_send_response (pm); 9045 client_send_response (pm);
8843 return NULL; 9046 return NULL;
8844 } 9047 }
8845 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 9048
8846 "Preparing reliability box for message <%llu> to %s on queue %s\n",
8847 pm->logging_uuid,
8848 GNUNET_i2s (&pm->vl->target),
8849 queue->address);
8850 pa = prepare_pending_acknowledgement (queue, dvh, pm); 9049 pa = prepare_pending_acknowledgement (queue, dvh, pm);
8851 9050
8852 bpm = GNUNET_malloc (sizeof(struct PendingMessage) + sizeof(rbox) 9051 bpm = GNUNET_malloc (sizeof(struct PendingMessage) + sizeof(rbox)
@@ -8869,6 +9068,13 @@ reliability_box_message (struct Queue *queue,
8869 memcpy (msg, &rbox, sizeof(rbox)); 9068 memcpy (msg, &rbox, sizeof(rbox));
8870 memcpy (&msg[sizeof(rbox)], &pm[1], pm->bytes_msg); 9069 memcpy (&msg[sizeof(rbox)], &pm[1], pm->bytes_msg);
8871 pm->bpm = bpm; 9070 pm->bpm = bpm;
9071 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9072 "Preparing reliability box for message <%llu> of size %lu (%lu) to %s on queue %s\n",
9073 pm->logging_uuid,
9074 pm->bytes_msg,
9075 ntohs (((const struct GNUNET_MessageHeader *) &pm[1])->size),
9076 GNUNET_i2s (&pm->vl->target),
9077 queue->address);
8872 return bpm; 9078 return bpm;
8873} 9079}
8874 9080
@@ -8911,6 +9117,7 @@ update_pm_next_attempt (struct PendingMessage *pm,
8911{ 9117{
8912 struct VirtualLink *vl = pm->vl; 9118 struct VirtualLink *vl = pm->vl;
8913 9119
9120 // TODO Do we really need a next_attempt value for PendingMessage other than the root Pending Message?
8914 pm->next_attempt = next_attempt; 9121 pm->next_attempt = next_attempt;
8915 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 9122 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
8916 "Next attempt for message <%llu> set to %s\n", 9123 "Next attempt for message <%llu> set to %s\n",
@@ -8921,9 +9128,17 @@ update_pm_next_attempt (struct PendingMessage *pm,
8921 { 9128 {
8922 reorder_root_pm (pm, next_attempt); 9129 reorder_root_pm (pm, next_attempt);
8923 } 9130 }
8924 else if ((PMT_RELIABILITY_BOX == pm->pmt)||(PMT_DV_BOX == pm->pmt)) 9131 else if ((PMT_RELIABILITY_BOX == pm->pmt) || (PMT_DV_BOX == pm->pmt))
8925 { 9132 {
8926 reorder_root_pm (pm->frag_parent, next_attempt); 9133 struct PendingMessage *root = pm->frag_parent;
9134
9135 while (NULL != root->frag_parent)
9136 root = root->frag_parent;
9137 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9138 "Next attempt for root message <%llu> set to %s\n",
9139 root->logging_uuid);
9140 root->next_attempt = next_attempt;
9141 reorder_root_pm (root, next_attempt);
8927 } 9142 }
8928 else 9143 else
8929 { 9144 {
@@ -8988,6 +9203,16 @@ struct PendingMessageScoreContext
8988 * Did we have to reliability box? 9203 * Did we have to reliability box?
8989 */ 9204 */
8990 int relb; 9205 int relb;
9206
9207 /**
9208 * There are pending messages, but it was to early to send one of them.
9209 */
9210 int to_early;
9211
9212 /**
9213 * When will we try to transmit the message again for which it was to early to retry.
9214 */
9215 struct GNUNET_TIME_Relative to_early_retry_delay;
8991}; 9216};
8992 9217
8993 9218
@@ -9020,14 +9245,32 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
9020 int relb; 9245 int relb;
9021 9246
9022 if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt)) 9247 if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt))
9248 {
9249 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9250 "DV messages must not be DV-routed to next hop!\n");
9023 continue; /* DV messages must not be DV-routed to next hop! */ 9251 continue; /* DV messages must not be DV-routed to next hop! */
9252 }
9024 if (pos->next_attempt.abs_value_us > now.abs_value_us) 9253 if (pos->next_attempt.abs_value_us > now.abs_value_us)
9254 {
9255 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9256 "too early for all messages, they are sorted by next_attempt\n");
9257 sc->to_early = GNUNET_YES;
9258
9025 break; /* too early for all messages, they are sorted by next_attempt */ 9259 break; /* too early for all messages, they are sorted by next_attempt */
9260 }
9261 sc->to_early = GNUNET_NO;
9026 if (NULL != pos->qe) 9262 if (NULL != pos->qe)
9263 {
9264 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9265 "not eligible\n");
9027 continue; /* not eligible */ 9266 continue; /* not eligible */
9267 }
9028 sc->consideration_counter++; 9268 sc->consideration_counter++;
9029 /* determine if we have to fragment, if so add fragmentation 9269 /* determine if we have to fragment, if so add fragmentation
9030 overhead! */ 9270 overhead! */
9271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9272 "check %u for sc->best\n",
9273 pos->logging_uuid);
9031 frag = GNUNET_NO; 9274 frag = GNUNET_NO;
9032 if (((0 != queue->mtu) && 9275 if (((0 != queue->mtu) &&
9033 (pos->bytes_msg + real_overhead > queue->mtu)) || 9276 (pos->bytes_msg + real_overhead > queue->mtu)) ||
@@ -9038,6 +9281,10 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
9038 respect that even if MTU is UINT16_MAX for 9281 respect that even if MTU is UINT16_MAX for
9039 this queue */)) 9282 this queue */))
9040 { 9283 {
9284 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9285 "fragment msg with size %u, realoverhead is %u\n",
9286 pos->bytes_msg,
9287 real_overhead);
9041 frag = GNUNET_YES; 9288 frag = GNUNET_YES;
9042 if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) 9289 if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
9043 { 9290 {
@@ -9064,7 +9311,10 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
9064 { 9311 {
9065 relb = GNUNET_YES; 9312 relb = GNUNET_YES;
9066 } 9313 }
9067 9314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9315 "Create reliability box of msg with size %u, realoverhead is %u\n",
9316 pos->bytes_msg,
9317 real_overhead);
9068 } 9318 }
9069 9319
9070 /* Finally, compare to existing 'best' in sc to see if this 'pos' pending 9320 /* Finally, compare to existing 'best' in sc to see if this 'pos' pending
@@ -9107,7 +9357,13 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
9107 pm_score -= queue->mtu - (real_overhead + pos->bytes_msg); 9357 pm_score -= queue->mtu - (real_overhead + pos->bytes_msg);
9108 } 9358 }
9109 if (sc_score + time_delta > pm_score) 9359 if (sc_score + time_delta > pm_score)
9360 {
9361 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9362 "sc_score of %u larger, keep sc->best %u\n",
9363 pos->logging_uuid,
9364 sc->best->logging_uuid);
9110 continue; /* sc_score larger, keep sc->best */ 9365 continue; /* sc_score larger, keep sc->best */
9366 }
9111 } 9367 }
9112 sc->best = pos; 9368 sc->best = pos;
9113 sc->dvh = dvh; 9369 sc->dvh = dvh;
@@ -9216,6 +9472,10 @@ transmit_on_queue (void *cls)
9216 "No pending messages, queue `%s' to %s now idle\n", 9472 "No pending messages, queue `%s' to %s now idle\n",
9217 queue->address, 9473 queue->address,
9218 GNUNET_i2s (&n->pid)); 9474 GNUNET_i2s (&n->pid));
9475 if (GNUNET_YES == sc.to_early)
9476 schedule_transmit_on_queue (sc.to_early_retry_delay,
9477 queue,
9478 GNUNET_SCHEDULER_PRIORITY_DEFAULT);
9219 queue->idle = GNUNET_YES; 9479 queue->idle = GNUNET_YES;
9220 return; 9480 return;
9221 } 9481 }
@@ -9226,9 +9486,17 @@ transmit_on_queue (void *cls)
9226 pm = sc.best; 9486 pm = sc.best;
9227 if (NULL != sc.dvh) 9487 if (NULL != sc.dvh)
9228 { 9488 {
9489 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9490 "Is this %u a DV box?\n",
9491 pm->pmt);
9229 GNUNET_assert (PMT_DV_BOX != pm->pmt); 9492 GNUNET_assert (PMT_DV_BOX != pm->pmt);
9230 if (NULL != sc.best->bpm) 9493 if (NULL != sc.best->bpm)
9231 { 9494 {
9495 const struct DVPathEntryP *hops_old;
9496 const struct DVPathEntryP *hops_selected;
9497
9498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9499 "Discard old box\n");
9232 /* We did this boxing before, but possibly for a different path! 9500 /* We did this boxing before, but possibly for a different path!
9233 Discard old DV box! OPTIMIZE-ME: we might want to check if 9501 Discard old DV box! OPTIMIZE-ME: we might want to check if
9234 it is the same and then not re-build the message... */ 9502 it is the same and then not re-build the message... */
@@ -9246,6 +9514,13 @@ transmit_on_queue (void *cls)
9246 RMO_NONE, 9514 RMO_NONE,
9247 GNUNET_NO); 9515 GNUNET_NO);
9248 GNUNET_assert (NULL != sc.best->bpm); 9516 GNUNET_assert (NULL != sc.best->bpm);
9517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9518 "%u %u %u %u %u\n",
9519 sizeof(struct GNUNET_PeerIdentity),
9520 sizeof(struct TransportDVBoxMessage),
9521 sizeof(struct TransportDVBoxPayloadP),
9522 sizeof(struct TransportFragmentBoxMessage),
9523 ((const struct GNUNET_MessageHeader *) &sc.best[1])->size);
9249 pm = sc.best->bpm; 9524 pm = sc.best->bpm;
9250 } 9525 }
9251 if (GNUNET_YES == sc.frag) 9526 if (GNUNET_YES == sc.frag)
@@ -9258,7 +9533,9 @@ transmit_on_queue (void *cls)
9258 queue->address, 9533 queue->address,
9259 GNUNET_i2s (&n->pid), 9534 GNUNET_i2s (&n->pid),
9260 sc.best->logging_uuid); 9535 sc.best->logging_uuid);
9261 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 9536 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
9537 queue,
9538 GNUNET_SCHEDULER_PRIORITY_DEFAULT);
9262 return; 9539 return;
9263 } 9540 }
9264 } 9541 }
@@ -9274,7 +9551,9 @@ transmit_on_queue (void *cls)
9274 queue->address, 9551 queue->address,
9275 GNUNET_i2s (&n->pid), 9552 GNUNET_i2s (&n->pid),
9276 sc.best->logging_uuid); 9553 sc.best->logging_uuid);
9277 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 9554 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
9555 queue,
9556 GNUNET_SCHEDULER_PRIORITY_DEFAULT);
9278 return; 9557 return;
9279 } 9558 }
9280 } 9559 }
@@ -9323,6 +9602,8 @@ transmit_on_queue (void *cls)
9323 } 9602 }
9324 else 9603 else
9325 { 9604 {
9605 struct GNUNET_TIME_Relative wait_duration;
9606
9326 /* Message not finished, waiting for acknowledgement. 9607 /* Message not finished, waiting for acknowledgement.
9327 Update time by which we might retransmit 's' based on queue 9608 Update time by which we might retransmit 's' based on queue
9328 characteristics (i.e. RTT); it takes one RTT for the message to 9609 characteristics (i.e. RTT); it takes one RTT for the message to
@@ -9333,18 +9614,33 @@ transmit_on_queue (void *cls)
9333 OPTIMIZE: Note that in the future this heuristic should likely 9614 OPTIMIZE: Note that in the future this heuristic should likely
9334 be improved further (measure RTT stability, consider message 9615 be improved further (measure RTT stability, consider message
9335 urgency and size when delaying ACKs, etc.) */ 9616 urgency and size when delaying ACKs, etc.) */
9617
9618 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us !=
9619 queue->pd.aged_rtt.rel_value_us)
9620 wait_duration = queue->pd.aged_rtt;
9621 else
9622 wait_duration = DEFAULT_ACK_WAIT_DURATION;
9623 struct GNUNET_TIME_Absolute next = GNUNET_TIME_relative_to_absolute (
9624 GNUNET_TIME_relative_multiply (
9625 wait_duration, 4));
9626 struct GNUNET_TIME_Relative plus = GNUNET_TIME_relative_multiply (
9627 wait_duration, 4);
9336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 9628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
9337 "Waiting %s for ACK\n", 9629 "Waiting %s (%llu) for ACK until %llu\n",
9338 GNUNET_STRINGS_relative_time_to_string ( 9630 GNUNET_STRINGS_relative_time_to_string (
9339 GNUNET_TIME_relative_multiply ( 9631 GNUNET_TIME_relative_multiply (
9340 queue->pd.aged_rtt, 4), GNUNET_NO)); 9632 queue->pd.aged_rtt, 4), GNUNET_NO),
9633 plus,
9634 next);
9341 update_pm_next_attempt (pm, 9635 update_pm_next_attempt (pm,
9342 GNUNET_TIME_relative_to_absolute ( 9636 GNUNET_TIME_relative_to_absolute (
9343 GNUNET_TIME_relative_multiply (queue->pd.aged_rtt, 9637 GNUNET_TIME_relative_multiply (wait_duration,
9344 4))); 9638 4)));
9345 } 9639 }
9346 /* finally, re-schedule queue transmission task itself */ 9640 /* finally, re-schedule queue transmission task itself */
9347 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 9641 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
9642 queue,
9643 GNUNET_SCHEDULER_PRIORITY_DEFAULT);
9348} 9644}
9349 9645
9350 9646
@@ -9476,7 +9772,9 @@ handle_send_message_ack (void *cls,
9476 NULL != queue; 9772 NULL != queue;
9477 queue = queue->next_client) 9773 queue = queue->next_client)
9478 { 9774 {
9479 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 9775 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
9776 queue,
9777 GNUNET_SCHEDULER_PRIORITY_DEFAULT);
9480 } 9778 }
9481 } 9779 }
9482 else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) 9780 else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
@@ -9486,7 +9784,9 @@ handle_send_message_ack (void *cls,
9486 "# Transmission throttled due to queue queue limit", 9784 "# Transmission throttled due to queue queue limit",
9487 -1, 9785 -1,
9488 GNUNET_NO); 9786 GNUNET_NO);
9489 schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 9787 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
9788 qe->queue,
9789 GNUNET_SCHEDULER_PRIORITY_DEFAULT);
9490 } 9790 }
9491 else if (1 == qe->queue->q_capacity) 9791 else if (1 == qe->queue->q_capacity)
9492 { 9792 {
@@ -9500,7 +9800,9 @@ handle_send_message_ack (void *cls,
9500 "# Transmission throttled due to message queue capacity", 9800 "# Transmission throttled due to message queue capacity",
9501 -1, 9801 -1,
9502 GNUNET_NO); 9802 GNUNET_NO);
9503 schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 9803 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
9804 qe->queue,
9805 GNUNET_SCHEDULER_PRIORITY_DEFAULT);
9504 } 9806 }
9505 9807
9506 if (NULL != (pm = qe->pm)) 9808 if (NULL != (pm = qe->pm))
@@ -10090,7 +10392,8 @@ handle_add_queue_message (void *cls,
10090 &check_validation_request_pending, 10392 &check_validation_request_pending,
10091 queue); 10393 queue);
10092 /* look for traffic for this queue */ 10394 /* look for traffic for this queue */
10093 schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); 10395 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
10396 queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
10094 /* might be our first queue, try launching DV learning */ 10397 /* might be our first queue, try launching DV learning */
10095 if (NULL == dvlearn_task) 10398 if (NULL == dvlearn_task)
10096 dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL); 10399 dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);
diff --git a/src/transport/test_transport_distance_vector_circle_topo.conf b/src/transport/test_transport_distance_vector_circle_topo.conf
index ce26bf923..210179291 100644
--- a/src/transport/test_transport_distance_vector_circle_topo.conf
+++ b/src/transport/test_transport_distance_vector_circle_topo.conf
@@ -2,7 +2,7 @@ M:1
2N:3 2N:3
3X:0 3X:0
4AC:1 4AC:1
5B:1 5B:0
6T:libgnunet_test_transport_plugin_cmd_simple_send_dv 6T:libgnunet_test_transport_plugin_cmd_simple_send_dv
7R:1|{tcp_port:0}|{udp_port:1} 7R:1|{tcp_port:0}|{udp_port:1}
8R:2|{tcp_port:0}|{udp_port:1} 8R:2|{tcp_port:0}|{udp_port:1}
diff --git a/src/transport/test_transport_plugin_cmd_simple_send_dv.c b/src/transport/test_transport_plugin_cmd_simple_send_dv.c
index 167120e2b..f1f168102 100644
--- a/src/transport/test_transport_plugin_cmd_simple_send_dv.c
+++ b/src/transport/test_transport_plugin_cmd_simple_send_dv.c
@@ -65,12 +65,12 @@ struct TestState
65 */ 65 */
66 struct GNUNET_TESTING_NetjailTopology *topology; 66 struct GNUNET_TESTING_NetjailTopology *topology;
67 67
68 /** 68};
69
70/**
69 * The number of messages received. 71 * The number of messages received.
70 */ 72 */
71 unsigned int number_received; 73static unsigned int number_received;
72
73};
74 74
75static struct GNUNET_TESTING_Command block_send; 75static struct GNUNET_TESTING_Command block_send;
76 76
@@ -105,52 +105,61 @@ static void
105handle_test (void *cls, 105handle_test (void *cls,
106 const struct GNUNET_TRANSPORT_TESTING_TestMessage *message) 106 const struct GNUNET_TRANSPORT_TESTING_TestMessage *message)
107{ 107{
108 struct TestState *ts = cls; 108 struct GNUNET_PeerIdentity *peer = cls;
109 const struct GNUNET_TESTING_AsyncContext *ac_block; 109 const struct GNUNET_TESTING_AsyncContext *ac_block;
110 const struct GNUNET_TESTING_AsyncContext *ac_start; 110 const struct GNUNET_TESTING_AsyncContext *ac_start;
111 const struct GNUNET_TESTING_Command *cmd; 111 const struct GNUNET_TESTING_Command *cmd;
112 const struct GNUNET_CONTAINER_MultiShortmap *connected_peers_map; 112 const struct GNUNET_CONTAINER_MultiShortmap *connected_peers_map;
113 unsigned int connected; 113 unsigned int connected;
114 struct BlockState *bs; 114 struct BlockState *bs;
115 struct GNUNET_TRANSPORT_CoreHandle *ch;
116 const struct StartPeerState *sps;
115 117
116 118
117 119 GNUNET_TRANSPORT_get_trait_state (&start_peer,
120 &sps);
121 ch = sps->th;
118 GNUNET_TRANSPORT_get_trait_connected_peers_map (&start_peer, 122 GNUNET_TRANSPORT_get_trait_connected_peers_map (&start_peer,
119 &connected_peers_map); 123 &connected_peers_map);
120 124
121 connected = GNUNET_CONTAINER_multishortmap_size ( 125 if (NULL != connected_peers_map)
122 connected_peers_map); 126 {
127 connected = GNUNET_CONTAINER_multishortmap_size (
128 connected_peers_map);
123 129
124 ts->number_received++; 130 number_received++;
125 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 131 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
126 "Received %u test message(s) from %u connected peer(s)\n", 132 "Received %u test message(s) from %s, %u connected peer(s)\n",
127 ts->number_received, 133 number_received,
128 connected); 134 GNUNET_i2s (peer),
135 connected);
129 136
130 GNUNET_TESTING_get_trait_async_context (&block_receive, 137 GNUNET_TESTING_get_trait_async_context (&block_receive,
131 &ac_block); 138 &ac_block);
132 139
133 if ( connected == ts->number_received) 140 if ( connected == number_received)
134 {
135 if (NULL != ac_block->is)
136 { 141 {
137 GNUNET_assert (NULL != ac_block); 142 if (NULL != ac_block->is)
138 if (NULL == ac_block->cont) 143 {
139 GNUNET_TESTING_async_fail ((struct 144 GNUNET_assert (NULL != ac_block);
140 GNUNET_TESTING_AsyncContext *) ac_block); 145 if (NULL == ac_block->cont)
141 else 146 GNUNET_TESTING_async_fail ((struct
142 GNUNET_TESTING_async_finish ((struct
143 GNUNET_TESTING_AsyncContext *) ac_block); 147 GNUNET_TESTING_AsyncContext *) ac_block);
144 } 148 else
145 else 149 GNUNET_TESTING_async_finish ((struct
146 { 150 GNUNET_TESTING_AsyncContext *) ac_block);
147 GNUNET_TESTING_get_trait_block_state ( 151 }
148 &block_receive, 152 else
149 (const struct BlockState **) &bs); 153 {
150 bs->asynchronous_finish = GNUNET_YES; 154 GNUNET_TESTING_get_trait_block_state (
151 } 155 &block_receive,
156 (const struct BlockState **) &bs);
157 bs->asynchronous_finish = GNUNET_YES;
158 }
152 159
160 }
153 } 161 }
162 GNUNET_TRANSPORT_core_receive_continue (ch, peer);
154} 163}
155 164
156 165
@@ -284,10 +293,6 @@ start_testcase (TESTING_CMD_HELPER_write_cb write_message, char *router_ip,
284 GNUNET_MQ_handler_end () 293 GNUNET_MQ_handler_end ()
285 }; 294 };
286 295
287 LOG (GNUNET_ERROR_TYPE_DEBUG,
288 "number_received %u\n",
289 ts->number_received);
290
291 if (GNUNET_YES == *read_file) 296 if (GNUNET_YES == *read_file)
292 { 297 {
293 LOG (GNUNET_ERROR_TYPE_DEBUG, 298 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -347,7 +352,7 @@ start_testcase (TESTING_CMD_HELPER_write_cb write_message, char *router_ip,
347 handlers, 352 handlers,
348 ts->cfgname, 353 ts->cfgname,
349 notify_connect, 354 notify_connect,
350 GNUNET_YES); 355 GNUNET_NO);
351 struct GNUNET_TESTING_Command commands[] = { 356 struct GNUNET_TESTING_Command commands[] = {
352 GNUNET_TESTING_cmd_system_create ("system-create", 357 GNUNET_TESTING_cmd_system_create ("system-create",
353 ts->testdir), 358 ts->testdir),
diff --git a/src/transport/transport_api_cmd_start_peer.c b/src/transport/transport_api_cmd_start_peer.c
index e305e24e1..7448eff5a 100644
--- a/src/transport/transport_api_cmd_start_peer.c
+++ b/src/transport/transport_api_cmd_start_peer.c
@@ -124,7 +124,7 @@ notify_connect (void *cls,
124 struct GNUNET_HashCode hc; 124 struct GNUNET_HashCode hc;
125 struct GNUNET_CRYPTO_EddsaPublicKey public_key = peer->public_key; 125 struct GNUNET_CRYPTO_EddsaPublicKey public_key = peer->public_key;
126 126
127 void *ret = sps->handlers; 127 void *ret = (struct GNUNET_PeerIdentity *) peer;
128 128
129 LOG (GNUNET_ERROR_TYPE_DEBUG, 129 LOG (GNUNET_ERROR_TYPE_DEBUG,
130 "This Peer %s \n", 130 "This Peer %s \n",
diff --git a/src/util/mq.c b/src/util/mq.c
index b09837459..40ac97bbe 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -273,7 +273,7 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
273 break; 273 break;
274 } 274 }
275 } 275 }
276done: 276 done:
277 if (GNUNET_NO == handled) 277 if (GNUNET_NO == handled)
278 { 278 {
279 LOG (GNUNET_ERROR_TYPE_INFO, 279 LOG (GNUNET_ERROR_TYPE_INFO,
@@ -384,8 +384,9 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
384 mq->current_envelope = ev; 384 mq->current_envelope = ev;
385 385
386 LOG (GNUNET_ERROR_TYPE_DEBUG, 386 LOG (GNUNET_ERROR_TYPE_DEBUG,
387 "sending message of type %u, queue empty (MQ: %p)\n", 387 "sending message of type %u and size %u, queue empty (MQ: %p)\n",
388 ntohs (ev->mh->type), 388 ntohs (ev->mh->type),
389 ntohs (ev->mh->size),
389 mq); 390 mq);
390 391
391 mq->send_impl (mq, 392 mq->send_impl (mq,
@@ -479,8 +480,10 @@ impl_send_continue (void *cls)
479 mq->current_envelope); 480 mq->current_envelope);
480 481
481 LOG (GNUNET_ERROR_TYPE_DEBUG, 482 LOG (GNUNET_ERROR_TYPE_DEBUG,
482 "sending message of type %u from queue\n", 483 "sending message of type %u and size %u from queue (MQ: %p)\n",
483 ntohs (mq->current_envelope->mh->type)); 484 ntohs (mq->current_envelope->mh->type),
485 ntohs (mq->current_envelope->mh->size),
486 mq);
484 487
485 mq->send_impl (mq, 488 mq->send_impl (mq,
486 mq->current_envelope->mh, 489 mq->current_envelope->mh,