summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcontrib/netjail/netjail_core.sh5
-rwxr-xr-xcontrib/netjail/netjail_start.sh4
-rw-r--r--src/transport/Makefile.am2
-rw-r--r--src/transport/gnunet-communicator-udp.c29
-rw-r--r--src/transport/gnunet-service-tng.c699
-rw-r--r--src/transport/test_transport_distance_vector_circle_topo.conf2
-rw-r--r--src/transport/test_transport_plugin_cmd_simple_send_dv.c79
-rw-r--r--src/transport/transport_api_cmd_start_peer.c2
-rw-r--r--src/util/mq.c11
9 files changed, 581 insertions, 252 deletions
diff --git a/contrib/netjail/netjail_core.sh b/contrib/netjail/netjail_core.sh
index ed363cf35..da784fa5e 100755
--- a/contrib/netjail/netjail_core.sh
+++ b/contrib/netjail/netjail_core.sh
@@ -188,7 +188,10 @@ netjail_node_add_nat() {
local ADDRESS=$2
local MASK=$3
- ip netns exec $NODE iptables -t nat -A POSTROUTING -s "$ADDRESS/$MASK" -j MASQUERADE
+ ip netns exec $NODE nft add table nat
+ ip netns exec $NODE nft add chain nat postrouting { type nat hook postrouting priority 0 \; }
+ ip netns exec $NODE nft add rule ip nat postrouting ip saddr "$ADDRESS/$MASK" counter masquerade
+ # ip netns exec $NODE iptables -t nat -A POSTROUTING -s "$ADDRESS/$MASK" -j MASQUERADE
}
netjail_node_add_default() {
diff --git a/contrib/netjail/netjail_start.sh b/contrib/netjail/netjail_start.sh
index f7c417c27..e2d5fd634 100755
--- a/contrib/netjail/netjail_start.sh
+++ b/contrib/netjail/netjail_start.sh
@@ -77,11 +77,15 @@ for N in $(seq $GLOBAL_N); do
if [ "1" == "${R_TCP[$N]}" ]
then
+ #ip netns exec ${ROUTERS[$N]} nft add rule ip nat prerouting ip daddr $GLOBAL_GROUP.$N tcp dport 60002 counter dnat to $LOCAL_GROUP.1
+ #ip netns exec ${ROUTERS[$N]} nft add rule ip filter FORWARD ip daddr $LOCAL_GROUP.1 ct state new,related,established counter accept
ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p tcp -d $GLOBAL_GROUP.$N --dport 60002 -j DNAT --to $LOCAL_GROUP.1
ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1 -m state --state NEW,RELATED,ESTABLISHED -j ACCEPT
fi
if [ "1" == "${R_UDP[$N]}" ]
then
+ #ip netns exec ${ROUTERS[$N]} nft add rule ip nat prerouting ip daddr $GLOBAL_GROUP.$N udp dport $PORT counter dnat to $LOCAL_GROUP.1
+ #ip netns exec ${ROUTERS[$N]} nft add rule ip filter FORWARD ip daddr $LOCAL_GROUP.1 ct state new,related,established counter accept
ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p udp -d $GLOBAL_GROUP.$N --dport $PORT -j DNAT --to $LOCAL_GROUP.1
ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1 -m state --state NEW,RELATED,ESTABLISHED -j ACCEPT
fi
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index 5d6fb8421..ec00bc917 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -778,7 +778,7 @@ check_SCRIPTS= \
test_transport_simple_send.sh \
test_transport_simple_send_broadcast.sh \
test_transport_udp_backchannel.sh \
- # test_transport_simple_send_dv_circle.sh
+ test_transport_simple_send_dv_circle.sh
# test_transport_simple_send_dv_inverse.sh
test_transport_start_with_config_SOURCES = \
diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c
index b6edff485..70848ff79 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -1273,7 +1273,7 @@ pass_plaintext_to_core (struct SenderAddress *sender,
const struct GNUNET_MessageHeader *hdr = plaintext;
const char *pos = plaintext;
- while (ntohs (hdr->size) < plaintext_len)
+ while (ntohs (hdr->size) <= plaintext_len)
{
GNUNET_STATISTICS_update (stats,
"# bytes given to core",
@@ -1722,6 +1722,12 @@ try_handle_plaintext (struct SenderAddress *sender,
const struct UDPAck *ack = (const struct UDPAck *) buf;
uint16_t type;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "try_handle_plaintext of size %u (%u %u) and type %u\n",
+ buf_size,
+ ntohs (hdr->size),
+ sizeof(*hdr),
+ ntohs (hdr->type));
if (sizeof(*hdr) > buf_size)
return; /* not even a header */
if (ntohs (hdr->size) > buf_size)
@@ -2202,7 +2208,8 @@ verify_confirmation (const struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral,
{
struct UdpHandshakeSignature uhs;
- uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
+ uhs.purpose.purpose = htonl (
+ GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
uhs.purpose.size = htonl (sizeof(uhs));
uhs.sender = uc->sender;
uhs.receiver = my_identity;
@@ -2350,7 +2357,8 @@ sock_read (void *cls)
"received UDPBroadcast from %s\n",
GNUNET_a2s ((const struct sockaddr *) addr_verify, salen));
ub = (const struct UDPBroadcast *) buf;
- uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
+ uhs.purpose.purpose = htonl (
+ GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
uhs.purpose.size = htonl (sizeof(uhs));
uhs.sender = ub->sender;
sender = ub->sender;
@@ -2366,10 +2374,11 @@ sock_read (void *cls)
GNUNET_i2s (&sender));
GNUNET_CRYPTO_hash ((struct sockaddr *) addr_verify, salen, &uhs.h_address);
if (GNUNET_OK ==
- GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST,
- &uhs,
- &ub->sender_sig,
- &ub->sender.public_key))
+ GNUNET_CRYPTO_eddsa_verify (
+ GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST,
+ &uhs,
+ &ub->sender_sig,
+ &ub->sender.public_key))
{
char *addr_s;
enum GNUNET_NetworkType nt;
@@ -2699,7 +2708,8 @@ mq_send_kx (struct GNUNET_MQ_Handle *mq,
uc.sender = my_identity;
uc.monotonic_time =
GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
- uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
+ uhs.purpose.purpose = htonl (
+ GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
uhs.purpose.size = htonl (sizeof(uhs));
uhs.sender = my_identity;
uhs.receiver = receiver->target;
@@ -3644,7 +3654,8 @@ iface_proc (void *cls,
bi->salen = addrlen;
bi->found = GNUNET_YES;
bi->bcm.sender = my_identity;
- ubs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
+ ubs.purpose.purpose = htonl (
+ GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
ubs.purpose.size = htonl (sizeof(ubs));
ubs.sender = my_identity;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 56a854a70..0427bd229 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -82,6 +82,11 @@
#include "transport.h"
/**
+ * Maximum number of FC retransmissions for a runing retransmission task.
+ */
+#define MAX_FC_RETRANSMIT_COUNT 1000
+
+/**
* Maximum number of messages we acknowledge together in one
* cumulative ACK. Larger values may save a bit of bandwidth.
*/
@@ -186,6 +191,12 @@
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
/**
+ * Default value for how long we wait for reliability ack.
+ */
+#define DEFAULT_ACK_WAIT_DURATION \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
+
+/**
* We only consider queues as "quality" connections when
* suppressing the generation of DV initiation messages if
* the latency of the queue is below this threshold.
@@ -781,6 +792,16 @@ struct TransportDVBoxMessage
*/
struct GNUNET_HashCode hmac;
+ /**
+ * Size this msg had initially. This is needed to calculate the hmac at the target.
+ * The header size can not be used for that, because the box size is getting smaller at each hop.
+ */
+ /**
+ * The length of the struct (in bytes, including the length field itself),
+ * in big-endian format.
+ */
+ uint16_t orig_size GNUNET_PACKED;
+
/* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
excluding the @e origin and the current peer, the last must be
the ultimate target; if @e num_hops is zero, the receiver of this
@@ -1342,6 +1363,17 @@ struct VirtualLink
struct GNUNET_SCHEDULER_Task *fc_retransmit_task;
/**
+ * Number of FC retransmissions for this running task.
+ */
+ unsigned int fc_retransmit_count;
+
+ /**
+ * Is this VirtualLink confirmed.
+ * A unconfirmed VirtualLink might exist, if we got a FC from that target.
+ */
+ unsigned int confirmed;
+
+ /**
* Neighbour used by this virtual link, NULL if @e dv is used.
*/
struct Neighbour *n;
@@ -2845,8 +2877,8 @@ free_pending_acknowledgement (struct PendingAcknowledgement *pa)
struct PendingMessage *pm = pa->pm;
struct DistanceVectorHop *dvh = pa->dvh;
- GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa);
- pa_count--;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "free_pending_acknowledgement\n");
if (NULL != q)
{
GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
@@ -2854,6 +2886,17 @@ free_pending_acknowledgement (struct PendingAcknowledgement *pa)
}
if (NULL != pm)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "remove pa from message\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "remove pa from message %llu\n",
+ pm->logging_uuid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "remove pa from message %u\n",
+ pm->pmt);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "remove pa from message %s\n",
+ GNUNET_uuid2s (&pa->ack_uuid.value));
GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
pa->pm = NULL;
}
@@ -2920,8 +2963,11 @@ free_pending_message (struct PendingMessage *pm)
tc->details.core.pending_msg_tail,
pm);
}
- if (NULL != vl)
+ if ((NULL != vl) && (NULL == pm->frag_parent))
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Removing pm %lu\n",
+ pm->logging_uuid);
GNUNET_CONTAINER_MDLL_remove (vl,
vl->pending_msg_head,
vl->pending_msg_tail,
@@ -2929,6 +2975,18 @@ free_pending_message (struct PendingMessage *pm)
}
while (NULL != (pa = pm->pa_head))
{
+ if (NULL == pa)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "free pending pa null\n");
+ if (NULL == pm->pa_tail)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "free pending pa_tail null\n");
+ if (NULL == pa->prev_pa)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "free pending pa prev null\n");
+ if (NULL == pa->next_pa)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "free pending pa next null\n");
GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
pa->pm = NULL;
}
@@ -2944,6 +3002,9 @@ free_pending_message (struct PendingMessage *pm)
free_fragment_tree (pm->bpm);
GNUNET_free (pm->bpm);
}
+ if (NULL == pm)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "free pending pm null\n");
GNUNET_free (pm);
}
@@ -3028,6 +3089,9 @@ free_virtual_link (struct VirtualLink *vl)
struct PendingMessage *pm;
struct CoreSentContext *csc;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "free virtual link\n");
+
if (NULL != vl->reassembly_map)
{
GNUNET_CONTAINER_multihashmap32_iterate (vl->reassembly_map,
@@ -3084,6 +3148,8 @@ free_validation_state (struct ValidationState *vs)
vs->hn = NULL;
if (NULL != vs->sc)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "store cancel\n");
GNUNET_PEERSTORE_store_cancel (vs->sc);
vs->sc = NULL;
}
@@ -3392,6 +3458,8 @@ free_neighbour (struct Neighbour *neighbour)
}
if (NULL != neighbour->sc)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "store cancel\n");
GNUNET_PEERSTORE_store_cancel (neighbour->sc);
neighbour->sc = NULL;
}
@@ -3503,7 +3571,8 @@ check_for_queue_with_higher_prio (struct Queue *queue, struct Queue *queue_head)
* @param p task priority to use, if @a queue is scheduled
*/
static void
-schedule_transmit_on_queue (struct Queue *queue,
+schedule_transmit_on_queue (struct GNUNET_TIME_Relative delay,
+ struct Queue *queue,
enum GNUNET_SCHEDULER_Priority p)
{
if (check_for_queue_with_higher_prio (queue,
@@ -3552,7 +3621,8 @@ schedule_transmit_on_queue (struct Queue *queue,
if (NULL != queue->transmit_task)
GNUNET_SCHEDULER_cancel (queue->transmit_task);
queue->transmit_task =
- GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue);
+ GNUNET_SCHEDULER_add_delayed_with_priority (delay, p, &transmit_on_queue,
+ queue);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Considering transmission on queue `%s' to %s\n",
queue->address,
@@ -3677,13 +3747,15 @@ free_queue (struct Queue *queue)
GNUNET_NO);
for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
s = s->next_client)
- schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ s,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
GNUNET_free (queue);
vl = lookup_virtual_link (&neighbour->pid);
- if ((NULL != vl) && (neighbour == vl->n))
+ if ((NULL != vl) && (GNUNET_YES == vl->confirmed) && (neighbour == vl->n))
{
GNUNET_SCHEDULER_cancel (vl->visibility_task);
check_link_down (vl);
@@ -3710,6 +3782,8 @@ free_address_list_entry (struct AddressListEntry *ale)
ale);
if (NULL != ale->sc)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "store cancel\n");
GNUNET_PEERSTORE_store_cancel (ale->sc);
ale->sc = NULL;
}
@@ -3954,6 +4028,8 @@ client_send_response (struct PendingMessage *pm)
struct TransportClient *tc = pm->client;
struct VirtualLink *vl = pm->vl;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "client send response\n");
if (NULL != tc)
{
struct GNUNET_MQ_Envelope *env;
@@ -4127,7 +4203,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
return;
}
vl = lookup_virtual_link (&rom->peer);
- if (NULL == vl)
+ if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
{
GNUNET_STATISTICS_update (GST_stats,
"# RECV_OK dropped: virtual link unknown",
@@ -4323,6 +4399,12 @@ queue_send_msg (struct Queue *queue,
queue->idle = GNUNET_NO;
if (0 == queue->q_capacity)
queue->idle = GNUNET_NO;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message of type %u (%u) and size %u with MQ %p\n",
+ ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
+ ntohs (smt->header.size),
+ payload_size,
+ queue->tc->mq);
GNUNET_MQ_send (queue->tc->mq, env);
}
}
@@ -4684,6 +4766,7 @@ encapsulate_for_dv (struct DistanceVector *dv,
struct GNUNET_PeerIdentity *dhops;
box_hdr.header.size = htons (sizeof(buf));
+ box_hdr.orig_size = htons (sizeof(buf));
box_hdr.num_hops = htons (num_hops);
memcpy (buf, &box_hdr, sizeof(box_hdr));
dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
@@ -4741,7 +4824,7 @@ send_dv_to_neighbour (void *cls,
enum RouteMessageOptions options)
{
(void) cls;
- (void) route_via_neighbour (next_hop, hdr, options);
+ (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
}
@@ -4776,7 +4859,7 @@ route_control_message_without_fc (struct VirtualLink *vl,
// TODO Do this elsewhere. vl should be given as parameter to method.
// vl = lookup_virtual_link (target);
- GNUNET_assert (NULL != vl);
+ GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
if (NULL == vl)
return GNUNET_TIME_UNIT_FOREVER_REL;
n = vl->n;
@@ -4916,7 +4999,7 @@ consider_sending_fc (void *cls)
fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used);
fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size);
fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
- rtt = route_control_message_without_fc (vl, &fc.header, RMO_NONE);
+ rtt = route_control_message_without_fc (vl, &fc.header, RMO_DV_ALLOWED);
if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
{
rtt = GNUNET_TIME_UNIT_SECONDS;
@@ -4933,8 +5016,14 @@ consider_sending_fc (void *cls)
}
if (NULL != vl->fc_retransmit_task)
GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+ if (MAX_FC_RETRANSMIT_COUNT == vl->fc_retransmit_count)
+ {
+ rtt = GNUNET_TIME_UNIT_MINUTES;
+ vl->fc_retransmit_count = 0;
+ }
vl->fc_retransmit_task =
GNUNET_SCHEDULER_add_delayed (rtt, &task_consider_sending_fc, vl);
+ vl->fc_retransmit_count++;
}
@@ -4960,14 +5049,20 @@ check_vl_transmission (struct VirtualLink *vl)
struct Neighbour *n = vl->n;
struct DistanceVector *dv = vl->dv;
struct GNUNET_TIME_Absolute now;
+ struct VirtualLink *vl_next_hop;
int elig;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "check_vl_transmission to target %s\n",
+ GNUNET_i2s (&vl->target));
/* Check that we have an eligible pending message!
(cheaper than having #transmit_on_queue() find out!) */
elig = GNUNET_NO;
for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
pm = pm->next_vl)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "check_vl_transmission loop\n");
if (NULL != pm->qe)
continue; /* not eligible, is in a queue! */
if (pm->bytes_msg + vl->outbound_fc_window_size_used >
@@ -4983,62 +5078,96 @@ check_vl_transmission (struct VirtualLink *vl)
consider_sending_fc (vl);
return; /* We have a message, but flow control says "nope" */
}
- elig = GNUNET_YES;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Eligible message %lu of size %llu to %s: %llu/%llu\n",
- pm->logging_uuid,
- pm->bytes_msg,
- GNUNET_i2s (&vl->target),
- (unsigned long long) vl->outbound_fc_window_size,
- (unsigned long long) (pm->bytes_msg
- + vl->outbound_fc_window_size_used));
- break;
- }
- if (GNUNET_NO == elig)
- return;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Not stalled. Scheduling transmission on queue\n");
- /* Notify queues at direct neighbours that we are interested */
- now = GNUNET_TIME_absolute_get ();
- if (NULL != n)
- {
- for (struct Queue *queue = n->queue_head; NULL != queue;
- queue = queue->next_neighbour)
- {
- if ((GNUNET_YES == queue->idle) &&
- (queue->validated_until.abs_value_us > now.abs_value_us))
- schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Neighbour Queue QID: %u (%u) busy or invalid\n",
- queue->qid,
- queue->idle);
- }
- }
- /* Notify queues via DV that we are interested */
- if (NULL != dv)
- {
- /* Do DV with lower scheduler priority, which effectively means that
- IF a neighbour exists and is available, we prefer it. */
- for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
- pos = pos->next_dv)
+ "Target window on VL %s not stalled. Scheduling transmission on queue\n",
+ GNUNET_i2s (&vl->target));
+ /* Notify queues at direct neighbours that we are interested */
+ now = GNUNET_TIME_absolute_get ();
+ if (NULL != n)
{
- struct Neighbour *nh = pos->next_hop;
-
- if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
- continue; /* skip this one: path not validated */
- for (struct Queue *queue = nh->queue_head; NULL != queue;
+ for (struct Queue *queue = n->queue_head; NULL != queue;
queue = queue->next_neighbour)
+ {
if ((GNUNET_YES == queue->idle) &&
(queue->validated_until.abs_value_us > now.abs_value_us))
- schedule_transmit_on_queue (queue,
- GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Direct neighbour %s not stalled\n",
+ GNUNET_i2s (&n->pid));
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ queue,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ elig = GNUNET_YES;
+ }
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "DV Queue QID: %u (%u) busy or invalid\n",
+ "Neighbour Queue QID: %u (%u) busy or invalid\n",
queue->qid,
queue->idle);
+ }
}
+ /* Notify queues via DV that we are interested */
+ if (NULL != dv)
+ {
+ /* Do DV with lower scheduler priority, which effectively means that
+ IF a neighbour exists and is available, we prefer it. */
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ {
+ struct Neighbour *nh = pos->next_hop;
+
+
+ if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
+ continue; /* skip this one: path not validated */
+ else
+ {
+ vl_next_hop = lookup_virtual_link (&nh->pid);
+ if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
+ vl_next_hop->outbound_fc_window_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Stalled message %lu transmission on next hop %s due to flow control: %llu < %llu\n",
+ pm->logging_uuid,
+ GNUNET_i2s (&vl_next_hop->target),
+ (unsigned long
+ long) vl_next_hop->outbound_fc_window_size,
+ (unsigned long long) (pm->bytes_msg
+ + vl_next_hop->
+ outbound_fc_window_size_used));
+ consider_sending_fc (vl_next_hop);
+ continue; /* We have a message, but flow control says "nope" for the first hop of this path */
+ }
+ for (struct Queue *queue = nh->queue_head; NULL != queue;
+ queue = queue->next_neighbour)
+ if ((GNUNET_YES == queue->idle) &&
+ (queue->validated_until.abs_value_us > now.abs_value_us))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Next hop neighbour %s not stalled\n",
+ GNUNET_i2s (&nh->pid));
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ queue,
+ GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+ elig = GNUNET_YES;
+ }
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "DV Queue QID: %u (%u) busy or invalid\n",
+ queue->qid,
+ queue->idle);
+ }
+ }
+ }
+ if (GNUNET_YES == elig)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Eligible message %lu of size %llu to %s: %llu/%llu\n",
+ pm->logging_uuid,
+ pm->bytes_msg,
+ GNUNET_i2s (&vl->target),
+ (unsigned long long) vl->outbound_fc_window_size,
+ (unsigned long long) (pm->bytes_msg
+ + vl->outbound_fc_window_size_used));
+ break;
}
}
@@ -5064,7 +5193,7 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
bytes_msg = ntohs (obmm->size);
pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
vl = lookup_virtual_link (&obm->peer);
- if (NULL == vl)
+ if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Don't have %s as a neighbour (anymore).\n",
@@ -5154,7 +5283,7 @@ handle_communicator_backchannel (
strlen (is) + 1);
// route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
vl = lookup_virtual_link (&cb->pid);
- if (NULL != vl)
+ if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
{
route_control_message_without_fc (vl, &be->header, RMO_DV_ALLOWED);
}
@@ -5428,7 +5557,7 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
return;
}
vl = lookup_virtual_link (&cmc->im.sender);
- if (NULL == vl)
+ if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
{
/* FIXME: sender is giving us messages for CORE but we don't have
the link up yet! I *suspect* this can happen right now (i.e.
@@ -5624,7 +5753,7 @@ transmit_cummulative_ack_cb (void *cls)
&ack->header,
RMO_DV_ALLOWED);*/
vl = lookup_virtual_link (&ac->target);
- if (NULL != vl)
+ if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
{
route_control_message_without_fc (
vl,
@@ -5765,7 +5894,7 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
struct FindByMessageUuidContext fc;
vl = lookup_virtual_link (&cmc->im.sender);
- if (NULL == vl)
+ if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
{
struct GNUNET_SERVICE_Client *client = cmc->tc->client;
@@ -5794,6 +5923,7 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
fb->msg_uuid.uuid,
&find_by_message_uuid,
&fc);
+ fsize = ntohs (fb->header.size) - sizeof(*fb);
if (NULL == (rc = fc.rc))
{
rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
@@ -5815,11 +5945,16 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
target = (char *) &rc[1];
rc->bitfield = (uint8_t *) (target + rc->msg_size);
- rc->msg_missing = rc->msg_size;
+ if (fsize != rc->msg_size)
+ rc->msg_missing = rc->msg_size;
+ else
+ rc->msg_missing = 0;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received fragment at offset %u/%u from %s for NEW message %u\n",
+ "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %u\n",
+ fsize,
ntohs (fb->frag_off),
msize,
+ rc->msg_missing,
GNUNET_i2s (&cmc->im.sender),
(unsigned int) fb->msg_uuid.uuid);
}
@@ -5841,7 +5976,6 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
}
/* reassemble */
- fsize = ntohs (fb->header.size) - sizeof(*fb);
if (0 == fsize)
{
GNUNET_break (0);
@@ -5918,6 +6052,16 @@ check_reliability_box (void *cls,
const struct TransportReliabilityBoxMessage *rb)
{
(void) cls;
+ const struct GNUNET_MessageHeader *inbox = (const struct
+ GNUNET_MessageHeader *) &rb[1];
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "check_send_msg with size %u: inner msg type %u and size %u (%u %u)\n",
+ ntohs (rb->header.size),
+ ntohs (inbox->type),
+ ntohs (inbox->size),
+ sizeof (struct TransportReliabilityBoxMessage),
+ sizeof (struct GNUNET_MessageHeader));
GNUNET_MQ_check_boxed_message (rb);
return GNUNET_YES;
}
@@ -6060,6 +6204,10 @@ completed_pending_message (struct PendingMessage *pm)
{
struct PendingMessage *pos;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Complete transmission of message %llu %u\n",
+ pm->logging_uuid,
+ pm->pmt);
switch (pm->pmt)
{
case PMT_CORE:
@@ -6069,7 +6217,7 @@ completed_pending_message (struct PendingMessage *pm)
return;
case PMT_FRAGMENT_BOX:
- /* Fragment sent over reliabile channel */
+ /* Fragment sent over reliable channel */
free_fragment_tree (pm);
pos = pm->frag_parent;
GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
@@ -6080,11 +6228,17 @@ completed_pending_message (struct PendingMessage *pm)
{
pm = pos;
pos = pm->frag_parent;
+ if (PMT_DV_BOX == pm->pmt)
+ {
+ GNUNET_free (pm);
+ client_send_response (pos);
+ return;
+ }
GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
GNUNET_free (pm);
}
- /* Was this the last applicable fragmment? */
+ /* Was this the last applicable fragment? */
if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
(pos->frag_off == pos->bytes_msg))
client_send_response (pos);
@@ -6293,6 +6447,7 @@ handle_backchannel_encapsulation (
target_communicator);
GNUNET_STATISTICS_update (GST_stats, stastr, 1, GNUNET_NO);
GNUNET_free (stastr);
+ finish_cmc_handling (cmc);
return;
}
/* Finally, deliver backchannel message to communicator */
@@ -6309,6 +6464,7 @@ handle_backchannel_encapsulation (
cbi->pid = cmc->im.sender;
memcpy (&cbi[1], inbox, isize);
GNUNET_MQ_send (tc->mq, env);
+ finish_cmc_handling (cmc);
}
@@ -6359,39 +6515,54 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
struct VirtualLink *vl;
vl = lookup_virtual_link (&dv->target);
- if (NULL != vl)
+ if (NULL == vl)
{
- /* Link was already up, remember dv is also now available and we are done */
- vl->dv = dv;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Virtual link to %s could now also use DV!\n",
+ "Creating new virtual link to %s using DV!\n",
GNUNET_i2s (&dv->target));
- return;
+ vl = GNUNET_new (struct VirtualLink);
+ vl->confirmed = GNUNET_YES;
+ vl->message_uuid_ctr =
+ GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
+ vl->target = dv->target;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
+ vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ vl->dv = dv;
+ dv->vl = vl;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+ consider_sending_fc (vl);
+ /* We lacked a confirmed connection to the target
+ before, so tell CORE about it (finally!) */
+ cores_send_connect_info (&dv->target);
+ }
+ else
+ {
+ /* Link was already up, remember dv is also now available and we are done */
+ vl->dv = dv;
+ dv->vl = vl;
+ if (GNUNET_NO == vl->confirmed)
+ {
+ vl->confirmed = GNUNET_YES;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+ consider_sending_fc (vl);
+ /* We lacked a confirmed connection to the target
+ before, so tell CORE about it (finally!) */
+ cores_send_connect_info (&dv->target);
+ }
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Virtual link to %s could now also use DV!\n",
+ GNUNET_i2s (&dv->target));
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating new virtual link to %s using DV!\n",
- GNUNET_i2s (&dv->target));
- vl = GNUNET_new (struct VirtualLink);
- vl->message_uuid_ctr =
- GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
- vl->target = dv->target;
- vl->dv = dv;
- dv->vl = vl;
- vl->core_recv_window = RECV_WINDOW_SIZE;
- vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
- vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
- vl->visibility_task =
- GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_put (
- links,
- &vl->target,
- vl,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- consider_sending_fc (vl);
- /* We lacked a confirmed connection to the target
- before, so tell CORE about it (finally!) */
- cores_send_connect_info (&dv->target);
}
@@ -6530,8 +6701,10 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
return GNUNET_NO;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Refreshed known path to %s, forwarding further\n",
- GNUNET_i2s (&dv->target));
+ "Refreshed known path to %s valid until %s, forwarding further\n",
+ GNUNET_i2s (&dv->target),
+ GNUNET_STRINGS_absolute_time_to_string (
+ pos->path_valid_until));
return GNUNET_YES;
}
}
@@ -6548,8 +6721,9 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
}
/* create new DV path entry */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Discovered new DV path to %s\n",
- GNUNET_i2s (&dv->target));
+ "Discovered new DV path to %s valid until %s\n",
+ GNUNET_i2s (&dv->target),
+ GNUNET_STRINGS_absolute_time_to_string (path_valid_until));
hop = GNUNET_malloc (sizeof(struct DistanceVectorHop)
+ sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
hop->next_hop = next_hop;
@@ -6680,7 +6854,7 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
&fwd->header,
RMO_UNCONFIRMED_ALLOWED);*/
vl = lookup_virtual_link (next_hop);
- if (NULL != vl)
+ if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
{
route_control_message_without_fc (vl,
&fwd->header,
@@ -6977,13 +7151,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
struct GNUNET_TIME_Absolute in_time;
struct Neighbour *n;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "handle dv learn message from %s\n",
- GNUNET_i2s (&dvl->initiator));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "handle dv learn message sender %s\n",
- GNUNET_i2s (&cmc->im.sender));
-
nhops = ntohs (dvl->num_hops); /* 0 = sender is initiator */
bi_history = ntohs (dvl->bidirectional);
hops = (const struct DVPathEntryP *) &dvl[1];
@@ -7017,10 +7184,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
cc); // FIXME: add bi-directional flag to cc?
in_time = GNUNET_TIME_absolute_get ();
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "2 handle dv learn message from %s\n",
- GNUNET_i2s (&dvl->initiator));
-
/* continue communicator here, everything else can happen asynchronous! */
finish_cmc_handling (cmc);
@@ -7055,7 +7218,11 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
if (GNUNET_YES == n->dv_monotime_available)
{
if (NULL != n->sc)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "store cancel\n");
GNUNET_PEERSTORE_store_cancel (n->sc);
+ }
n->sc =
GNUNET_PEERSTORE_store (peerstore,
"transport",
@@ -7069,9 +7236,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
n);
}
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "3 handle dv learn message from %s\n",
- GNUNET_i2s (&dvl->initiator));
/* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
If signature verification load too high, implement random drop strategy */
for (unsigned int i = 0; i < nhops; i++)
@@ -7110,9 +7274,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
return;
}
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "4 handle dv learn message from %s\n",
- GNUNET_i2s (&dvl->initiator));
if (GNUNET_EXTRA_LOGGING > 0)
{
char *path;
@@ -7137,9 +7298,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
GNUNET_i2s (&GST_my_identity));
GNUNET_free (path);
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "5 handle dv learn message from %s\n",
- GNUNET_i2s (&dvl->initiator));
do_fwd = GNUNET_YES;
if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
{
@@ -7147,6 +7305,7 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
struct GNUNET_TIME_Relative host_latency_sum;
struct GNUNET_TIME_Relative latency;
struct GNUNET_TIME_Relative network_latency;
+ struct GNUNET_TIME_Absolute now;
/* We initiated this, learn the forward path! */
path[0] = GST_my_identity;
@@ -7155,7 +7314,11 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
// Need also something to lookup initiation time
// to compute RTT! -> add RTT argument here?
- latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
+ now = GNUNET_TIME_absolute_get ();
+ latency = GNUNET_TIME_absolute_get_duration (GNUNET_TIME_absolute_ntoh (
+ dvl->monotonic_time));
+ GNUNET_assert (latency.rel_value_us >= host_latency_sum.rel_value_us);
+ // latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
// (based on dvl->challenge, we can identify time of origin!)
network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
@@ -7184,9 +7347,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
do_fwd = GNUNET_NO;
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "6 handle dv learn message from %s\n",
- GNUNET_i2s (&dvl->initiator));
if (bi_hop)
{
/* last hop was bi-directional, we could learn something here! */
@@ -7243,9 +7403,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
}
}
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "7 handle dv learn message from %s\n",
- GNUNET_i2s (&dvl->initiator));
if (MAX_DV_HOPS_ALLOWED == nhops)
{
/* At limit, we're out of here! */
@@ -7305,9 +7462,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
&dv_neighbour_transmission,
&nsc);
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "9 handle dv learn message from %s\n",
- GNUNET_i2s (&dvl->initiator));
}
@@ -7374,10 +7528,17 @@ forward_dv_box (struct Neighbour *next_hop,
char msg_buf[msg_size] GNUNET_ALIGN;
struct GNUNET_PeerIdentity *dhops;
+ if (GNUNET_NO == ntohs (hdr->without_fc))
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "forward dv box without fc\n");
+ if (NULL == vl)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "forward dv box vl null\n");
GNUNET_assert (GNUNET_YES == ntohs (hdr->without_fc) || NULL != vl);
hdr->num_hops = htons (num_hops);
hdr->total_hops = htons (total_hops);
+ hdr->header.size = htons (msg_size);
memcpy (msg_buf, hdr, sizeof(*hdr));
dhops = (struct GNUNET_PeerIdentity *) &msg_buf[sizeof(struct
TransportDVBoxMessage)];
@@ -7387,7 +7548,8 @@ forward_dv_box (struct Neighbour *next_hop,
if (GNUNET_YES == ntohs (hdr->without_fc))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Forwarding control message in DV Box to next hop %s (%u/%u) \n",
+ "Forwarding control message (payload size %u) in DV Box to next hop %s (%u/%u) \n",
+ enc_payload_size,
GNUNET_i2s (&next_hop->pid),
(unsigned int) num_hops,
(unsigned int) total_hops);
@@ -7444,6 +7606,8 @@ free_backtalker (struct Backtalker *b)
}
if (NULL != b->sc)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "store cancel\n");
GNUNET_PEERSTORE_store_cancel (b->sc);
b->sc = NULL;
}
@@ -7486,6 +7650,8 @@ backtalker_timeout_cb (void *cls)
{
struct Backtalker *b = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "backtalker timeout.\n");
b->task = NULL;
if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us)
{
@@ -7589,6 +7755,8 @@ update_backtalker_monotime (struct Backtalker *b)
if (NULL != b->sc)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "store cancel\n");
GNUNET_PEERSTORE_store_cancel (b->sc);
b->sc = NULL;
}
@@ -7714,8 +7882,8 @@ handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
dh_key_derive_eph_pub (&dvb->ephemeral_key, &dvb->iv, key);
hdr = (const char *) &dvb[1];
- hdr_len = ntohs (dvb->header.size) - sizeof(*dvb) - sizeof(struct
- GNUNET_PeerIdentity)
+ hdr_len = ntohs (dvb->orig_size) - sizeof(*dvb) - sizeof(struct
+ GNUNET_PeerIdentity)
* ntohs (dvb->total_hops);
dv_hmac (key, &hmac, hdr, hdr_len);
@@ -8112,7 +8280,7 @@ handle_validation_challenge (
}
sender = cmc->im.sender;
vl = lookup_virtual_link (&sender);
- if (NULL != vl)
+ if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
{
// route_control_message_without_fc (&cmc->im.sender,
route_control_message_without_fc (vl,
@@ -8380,46 +8548,63 @@ handle_validation_response (
q->pd.aged_rtt = vs->validation_rtt;
n = q->neighbour;
vl = lookup_virtual_link (&vs->pid);
- if (NULL != vl)
+ if (NULL == vl)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new virtual link to %s using direct neighbour!\n",
+ GNUNET_i2s (&vs->pid));
+ vl = GNUNET_new (struct VirtualLink);
+ vl->confirmed = GNUNET_YES;
+ vl->message_uuid_ctr =
+ GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
+ vl->target = n->pid;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
+ vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ vl->n = n;
+ n->vl = vl;
+ q->idle = GNUNET_YES;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+ consider_sending_fc (vl);
+ /* We lacked a confirmed connection to the target
+ before, so tell CORE about it (finally!) */
+ cores_send_connect_info (&n->pid);
+ }
+ else
{
/* Link was already up, remember n is also now available and we are done */
if (NULL == vl->n)
{
vl->n = n;
n->vl = vl;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Virtual link to %s could now also direct neighbour!\n",
- GNUNET_i2s (&vs->pid));
+ if (GNUNET_YES == vl->confirmed)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Virtual link to %s could now also direct neighbour!\n",
+ GNUNET_i2s (&vs->pid));
}
else
{
GNUNET_assert (n == vl->n);
}
- return;
+ if (GNUNET_NO == vl->confirmed)
+ {
+ vl->confirmed = GNUNET_YES;
+ q->idle = GNUNET_YES;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+ consider_sending_fc (vl);
+ /* We lacked a confirmed connection to the target
+ before, so tell CORE about it (finally!) */
+ cores_send_connect_info (&n->pid);
+ }
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating new virtual link to %s using direct neighbour!\n",
- GNUNET_i2s (&vs->pid));
- vl = GNUNET_new (struct VirtualLink);
- vl->target = n->pid;
- vl->n = n;
- n->vl = vl;
- q->idle = GNUNET_YES;
- vl->core_recv_window = RECV_WINDOW_SIZE;
- vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
- vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
- vl->visibility_task =
- GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_put (
- links,
- &vl->target,
- vl,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- consider_sending_fc (vl);
- /* We lacked a confirmed connection to the target
- before, so tell CORE about it (finally!) */
- cores_send_connect_info (&n->pid);
}
@@ -8462,6 +8647,7 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
struct GNUNET_TIME_Absolute st;
uint64_t os;
uint64_t wnd;
+ uint32_t random;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received FC from %s\n", GNUNET_i2s (&cmc->im.sender));
@@ -8469,13 +8655,22 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
if (NULL == vl)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "FC dropped: VL unknown\n");
- GNUNET_STATISTICS_update (GST_stats,
- "# FC dropped: Virtual link unknown",
- 1,
- GNUNET_NO);
- finish_cmc_handling (cmc);
- return;
+ "No virtual link for FC creating new unconfirmed virtual link to %s!\n",
+ GNUNET_i2s (&cmc->im.sender));
+ vl = GNUNET_new (struct VirtualLink);
+ vl->confirmed = GNUNET_NO;
+ vl->message_uuid_ctr =
+ GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
+ vl->target = cmc->im.sender;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
+ vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
}
st = GNUNET_TIME_absolute_ntoh (fc->sender_time);
if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us)
@@ -8509,15 +8704,20 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
(unsigned long long) vl->outbound_fc_window_size,
(long long) vl->incoming_fc_window_size_loss);
wnd = GNUNET_ntohll (fc->outbound_window_size);
- if ((wnd < vl->incoming_fc_window_size) ||
- (vl->last_outbound_window_size_received != wnd) ||
- (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX)
- % FC_NO_CHANGE_REPLY_PROBABILITY))
+ random = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
+ if ((GNUNET_YES == vl->confirmed) && ((wnd < vl->incoming_fc_window_size) ||
+ (vl->last_outbound_window_size_received
+ != wnd) ||
+ (0 == random
+ % FC_NO_CHANGE_REPLY_PROBABILITY)))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Consider re-sending our FC message, as clearly the other peer's idea of the window is not up-to-date (%llu vs %llu)\n",
+ "Consider re-sending our FC message, as clearly the other peer's idea of the window is not up-to-date (%llu vs %llu) or %llu last received differs, or random reply %lu\n",
(unsigned long long) wnd,
- (unsigned long long) vl->incoming_fc_window_size);
+ (unsigned long long) vl->incoming_fc_window_size,
+ (unsigned long long) vl->last_outbound_window_size_received,
+ random % FC_NO_CHANGE_REPLY_PROBABILITY);
consider_sending_fc (vl);
}
if ((wnd == vl->incoming_fc_window_size) &&
@@ -8530,6 +8730,7 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
(unsigned long long) wnd);
GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
vl->fc_retransmit_task = NULL;
+ vl->fc_retransmit_count = 0;
}
vl->last_outbound_window_size_received = wnd;
/* FC window likely increased, check transmission possibilities! */
@@ -8729,7 +8930,6 @@ fragment_message (struct Queue *queue,
pm->bytes_msg,
GNUNET_i2s (&pm->vl->target),
(unsigned int) mtu);
- pa = prepare_pending_acknowledgement (queue, dvh, pm);
/* This invariant is established in #handle_add_queue_message() */
GNUNET_assert (mtu > sizeof(struct TransportFragmentBoxMessage));
@@ -8745,7 +8945,7 @@ fragment_message (struct Queue *queue,
ff = ff->head_frag; /* descent into fragmented fragments */
}
- if (((ff->bytes_msg > mtu) || (pm == ff)) && (pm->frag_off < pm->bytes_msg))
+ if (((ff->bytes_msg > mtu) || (pm == ff)) && (ff->frag_off < ff->bytes_msg))
{
/* Did not yet calculate all fragments, calculate next fragment */
struct PendingMessage *frag;
@@ -8783,13 +8983,15 @@ fragment_message (struct Queue *queue,
tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
tfb.header.size =
htons (sizeof(struct TransportFragmentBoxMessage) + fragsize);
+ pa = prepare_pending_acknowledgement (queue, dvh, frag);
tfb.ack_uuid = pa->ack_uuid;
tfb.msg_uuid = pm->msg_uuid;
tfb.frag_off = htons (ff->frag_off + xoff);
tfb.msg_size = htons (pm->bytes_msg);
memcpy (msg, &tfb, sizeof(tfb));
memcpy (&msg[sizeof(tfb)], &orig[ff->frag_off], fragsize);
- GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag, ff->tail_frag, frag);
+ GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag,
+ ff->tail_frag, frag);
ff->frag_off += fragsize;
ff = frag;
}
@@ -8803,6 +9005,7 @@ fragment_message (struct Queue *queue,
ff->frag_parent->head_frag,
ff->frag_parent->tail_frag,
ff);
+
return ff;
}
@@ -8829,7 +9032,7 @@ reliability_box_message (struct Queue *queue,
struct PendingMessage *bpm;
char *msg;
- if (PMT_CORE != pm->pmt)
+ if ((PMT_CORE != pm->pmt) && (PMT_DV_BOX != pm->pmt))
return pm; /* already fragmented or reliability boxed, or control message:
do nothing */
if (NULL != pm->bpm)
@@ -8842,11 +9045,7 @@ reliability_box_message (struct Queue *queue,
client_send_response (pm);
return NULL;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Preparing reliability box for message <%llu> to %s on queue %s\n",
- pm->logging_uuid,
- GNUNET_i2s (&pm->vl->target),
- queue->address);
+
pa = prepare_pending_acknowledgement (queue, dvh, pm);
bpm = GNUNET_malloc (sizeof(struct PendingMessage) + sizeof(rbox)
@@ -8869,6 +9068,13 @@ reliability_box_message (struct Queue *queue,
memcpy (msg, &rbox, sizeof(rbox));
memcpy (&msg[sizeof(rbox)], &pm[1], pm->bytes_msg);
pm->bpm = bpm;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Preparing reliability box for message <%llu> of size %lu (%lu) to %s on queue %s\n",
+ pm->logging_uuid,
+ pm->bytes_msg,
+ ntohs (((const struct GNUNET_MessageHeader *) &pm[1])->size),
+ GNUNET_i2s (&pm->vl->target),
+ queue->address);
return bpm;
}
@@ -8911,6 +9117,7 @@ update_pm_next_attempt (struct PendingMessage *pm,
{
struct VirtualLink *vl = pm->vl;
+ // TODO Do we really need a next_attempt value for PendingMessage other than the root Pending Message?
pm->next_attempt = next_attempt;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Next attempt for message <%llu> set to %s\n",
@@ -8921,9 +9128,17 @@ update_pm_next_attempt (struct PendingMessage *pm,
{
reorder_root_pm (pm, next_attempt);
}
- else if ((PMT_RELIABILITY_BOX == pm->pmt)||(PMT_DV_BOX == pm->pmt))
+ else if ((PMT_RELIABILITY_BOX == pm->pmt) || (PMT_DV_BOX == pm->pmt))
{
- reorder_root_pm (pm->frag_parent, next_attempt);
+ struct PendingMessage *root = pm->frag_parent;
+
+ while (NULL != root->frag_parent)
+ root = root->frag_parent;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Next attempt for root message <%llu> set to %s\n",
+ root->logging_uuid);
+ root->next_attempt = next_attempt;
+ reorder_root_pm (root, next_attempt);
}
else
{
@@ -8988,6 +9203,16 @@ struct PendingMessageScoreContext
* Did we have to reliability box?
*/
int relb;
+
+ /**
+ * There are pending messages, but it was to early to send one of them.
+ */
+ int to_early;
+
+ /**
+ * When will we try to transmit the message again for which it was to early to retry.
+ */
+ struct GNUNET_TIME_Relative to_early_retry_delay;
};
@@ -9020,14 +9245,32 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
int relb;
if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "DV messages must not be DV-routed to next hop!\n");
continue; /* DV messages must not be DV-routed to next hop! */
+ }
if (pos->next_attempt.abs_value_us > now.abs_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "too early for all messages, they are sorted by next_attempt\n");
+ sc->to_early = GNUNET_YES;
+
break; /* too early for all messages, they are sorted by next_attempt */
+ }
+ sc->to_early = GNUNET_NO;
if (NULL != pos->qe)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "not eligible\n");
continue; /* not eligible */
+ }
sc->consideration_counter++;
/* determine if we have to fragment, if so add fragmentation
overhead! */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "check %u for sc->best\n",
+ pos->logging_uuid);
frag = GNUNET_NO;
if (((0 != queue->mtu) &&
(pos->bytes_msg + real_overhead > queue->mtu)) ||
@@ -9038,6 +9281,10 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
respect that even if MTU is UINT16_MAX for
this queue */))
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "fragment msg with size %u, realoverhead is %u\n",
+ pos->bytes_msg,
+ real_overhead);
frag = GNUNET_YES;
if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
{
@@ -9064,7 +9311,10 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
{
relb = GNUNET_YES;
}
-
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Create reliability box of msg with size %u, realoverhead is %u\n",
+ pos->bytes_msg,
+ real_overhead);
}
/* Finally, compare to existing 'best' in sc to see if this 'pos' pending
@@ -9107,7 +9357,13 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
pm_score -= queue->mtu - (real_overhead + pos->bytes_msg);
}
if (sc_score + time_delta > pm_score)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "sc_score of %u larger, keep sc->best %u\n",
+ pos->logging_uuid,
+ sc->best->logging_uuid);
continue; /* sc_score larger, keep sc->best */
+ }
}
sc->best = pos;
sc->dvh = dvh;
@@ -9216,6 +9472,10 @@ transmit_on_queue (void *cls)
"No pending messages, queue `%s' to %s now idle\n",
queue->address,
GNUNET_i2s (&n->pid));
+ if (GNUNET_YES == sc.to_early)
+ schedule_transmit_on_queue (sc.to_early_retry_delay,
+ queue,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT);
queue->idle = GNUNET_YES;
return;
}
@@ -9226,9 +9486,17 @@ transmit_on_queue (void *cls)
pm = sc.best;
if (NULL != sc.dvh)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Is this %u a DV box?\n",
+ pm->pmt);
GNUNET_assert (PMT_DV_BOX != pm->pmt);
if (NULL != sc.best->bpm)
{
+ const struct DVPathEntryP *hops_old;
+ const struct DVPathEntryP *hops_selected;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Discard old box\n");
/* We did this boxing before, but possibly for a different path!
Discard old DV box! OPTIMIZE-ME: we might want to check if
it is the same and then not re-build the message... */
@@ -9246,6 +9514,13 @@ transmit_on_queue (void *cls)
RMO_NONE,
GNUNET_NO);
GNUNET_assert (NULL != sc.best->bpm);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%u %u %u %u %u\n",
+ sizeof(struct GNUNET_PeerIdentity),
+ sizeof(struct TransportDVBoxMessage),
+ sizeof(struct TransportDVBoxPayloadP),
+ sizeof(struct TransportFragmentBoxMessage),
+ ((const struct GNUNET_MessageHeader *) &sc.best[1])->size);
pm = sc.best->bpm;
}
if (GNUNET_YES == sc.frag)
@@ -9258,7 +9533,9 @@ transmit_on_queue (void *cls)
queue->address,
GNUNET_i2s (&n->pid),
sc.best->logging_uuid);
- schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ queue,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT);
return;
}
}
@@ -9274,7 +9551,9 @@ transmit_on_queue (void *cls)
queue->address,
GNUNET_i2s (&n->pid),
sc.best->logging_uuid);
- schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ queue,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT);
return;
}
}
@@ -9323,6 +9602,8 @@ transmit_on_queue (void *cls)
}
else
{
+ struct GNUNET_TIME_Relative wait_duration;
+
/* Message not finished, waiting for acknowledgement.
Update time by which we might retransmit 's' based on queue
characteristics (i.e. RTT); it takes one RTT for the message to
@@ -9333,18 +9614,33 @@ transmit_on_queue (void *cls)
OPTIMIZE: Note that in the future this heuristic should likely
be improved further (measure RTT stability, consider message
urgency and size when delaying ACKs, etc.) */
+
+ if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us !=
+ queue->pd.aged_rtt.rel_value_us)
+ wait_duration = queue->pd.aged_rtt;
+ else
+ wait_duration = DEFAULT_ACK_WAIT_DURATION;
+ struct GNUNET_TIME_Absolute next = GNUNET_TIME_relative_to_absolute (
+ GNUNET_TIME_relative_multiply (
+ wait_duration, 4));
+ struct GNUNET_TIME_Relative plus = GNUNET_TIME_relative_multiply (
+ wait_duration, 4);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Waiting %s for ACK\n",
+ "Waiting %s (%llu) for ACK until %llu\n",
GNUNET_STRINGS_relative_time_to_string (
GNUNET_TIME_relative_multiply (
- queue->pd.aged_rtt, 4), GNUNET_NO));
+ queue->pd.aged_rtt, 4), GNUNET_NO),
+ plus,
+ next);
update_pm_next_attempt (pm,
GNUNET_TIME_relative_to_absolute (
- GNUNET_TIME_relative_multiply (queue->pd.aged_rtt,
+ GNUNET_TIME_relative_multiply (wait_duration,
4)));
}
/* finally, re-schedule queue transmission task itself */
- schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ queue,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
@@ -9476,7 +9772,9 @@ handle_send_message_ack (void *cls,
NULL != queue;
queue = queue->next_client)
{
- schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ queue,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
}
else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
@@ -9486,7 +9784,9 @@ handle_send_message_ack (void *cls,
"# Transmission throttled due to queue queue limit",
-1,
GNUNET_NO);
- schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ qe->queue,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
else if (1 == qe->queue->q_capacity)
{
@@ -9500,7 +9800,9 @@ handle_send_message_ack (void *cls,
"# Transmission throttled due to message queue capacity",
-1,
GNUNET_NO);
- schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ qe->queue,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT);
}
if (NULL != (pm = qe->pm))
@@ -10090,7 +10392,8 @@ handle_add_queue_message (void *cls,
&check_validation_request_pending,
queue);
/* look for traffic for this queue */
- schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+ queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
/* might be our first queue, try launching DV learning */
if (NULL == dvlearn_task)
dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);
diff --git a/src/transport/test_transport_distance_vector_circle_topo.conf b/src/transport/test_transport_distance_vector_circle_topo.conf
index ce26bf923..210179291 100644
--- a/src/transport/test_transport_distance_vector_circle_topo.conf
+++ b/src/transport/test_transport_distance_vector_circle_topo.conf
@@ -2,7 +2,7 @@ M:1
N:3
X:0
AC:1
-B:1
+B:0
T:libgnunet_test_transport_plugin_cmd_simple_send_dv
R:1|{tcp_port:0}|{udp_port:1}
R:2|{tcp_port:0}|{udp_port:1}
diff --git a/src/transport/test_transport_plugin_cmd_simple_send_dv.c b/src/transport/test_transport_plugin_cmd_simple_send_dv.c
index 167120e2b..f1f168102 100644
--- a/src/transport/test_transport_plugin_cmd_simple_send_dv.c
+++ b/src/transport/test_transport_plugin_cmd_simple_send_dv.c
@@ -65,12 +65,12 @@ struct TestState
*/
struct GNUNET_TESTING_NetjailTopology *topology;
- /**
+};
+
+/**
* The number of messages received.
*/
- unsigned int number_received;
-
-};
+static unsigned int number_received;
static struct GNUNET_TESTING_Command block_send;
@@ -105,52 +105,61 @@ static void
handle_test (void *cls,
const struct GNUNET_TRANSPORT_TESTING_TestMessage *message)
{
- struct TestState *ts = cls;
+ struct GNUNET_PeerIdentity *peer = cls;
const struct GNUNET_TESTING_AsyncContext *ac_block;
const struct GNUNET_TESTING_AsyncContext *ac_start;
const struct GNUNET_TESTING_Command *cmd;
const struct GNUNET_CONTAINER_MultiShortmap *connected_peers_map;
unsigned int connected;
struct BlockState *bs;
+ struct GNUNET_TRANSPORT_CoreHandle *ch;
+ const struct StartPeerState *sps;
-
+ GNUNET_TRANSPORT_get_trait_state (&start_peer,
+ &sps);
+ ch = sps->th;
GNUNET_TRANSPORT_get_trait_connected_peers_map (&start_peer,
&connected_peers_map);
- connected = GNUNET_CONTAINER_multishortmap_size (
- connected_peers_map);
+ if (NULL != connected_peers_map)
+ {
+ connected = GNUNET_CONTAINER_multishortmap_size (
+ connected_peers_map);
- ts->number_received++;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received %u test message(s) from %u connected peer(s)\n",
- ts->number_received,
- connected);
+ number_received++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %u test message(s) from %s, %u connected peer(s)\n",
+ number_received,
+ GNUNET_i2s (peer),
+ connected);
- GNUNET_TESTING_get_trait_async_context (&block_receive,
- &ac_block);
+ GNUNET_TESTING_get_trait_async_context (&block_receive,
+ &ac_block);
- if ( connected == ts->number_received)
- {
- if (NULL != ac_block->is)
+ if ( connected == number_received)
{
- GNUNET_assert (NULL != ac_block);
- if (NULL == ac_block->cont)
- GNUNET_TESTING_async_fail ((struct
- GNUNET_TESTING_AsyncContext *) ac_block);
- else
- GNUNET_TESTING_async_finish ((struct
+ if (NULL != ac_block->is)
+ {
+ GNUNET_assert (NULL != ac_block);
+ if (NULL == ac_block->cont)
+ GNUNET_TESTING_async_fail ((struct
GNUNET_TESTING_AsyncContext *) ac_block);
- }
- else
- {
- GNUNET_TESTING_get_trait_block_state (
- &block_receive,
- (const struct BlockState **) &bs);
- bs->asynchronous_finish = GNUNET_YES;
- }
+ else
+ GNUNET_TESTING_async_finish ((struct
+ GNUNET_TESTING_AsyncContext *) ac_block);
+ }
+ else
+ {
+ GNUNET_TESTING_get_trait_block_state (
+ &block_receive,
+ (const struct BlockState **) &bs);
+ bs->asynchronous_finish = GNUNET_YES;
+ }
+ }
}
+ GNUNET_TRANSPORT_core_receive_continue (ch, peer);
}
@@ -284,10 +293,6 @@ start_testcase (TESTING_CMD_HELPER_write_cb write_message, char *router_ip,
GNUNET_MQ_handler_end ()
};
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "number_received %u\n",
- ts->number_received);
-
if (GNUNET_YES == *read_file)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -347,7 +352,7 @@ start_testcase (TESTING_CMD_HELPER_write_cb write_message, char *router_ip,
handlers,
ts->cfgname,
notify_connect,
- GNUNET_YES);
+ GNUNET_NO);
struct GNUNET_TESTING_Command commands[] = {
GNUNET_TESTING_cmd_system_create ("system-create",
ts->testdir),
diff --git a/src/transport/transport_api_cmd_start_peer.c b/src/transport/transport_api_cmd_start_peer.c
index e305e24e1..7448eff5a 100644
--- a/src/transport/transport_api_cmd_start_peer.c
+++ b/src/transport/transport_api_cmd_start_peer.c
@@ -124,7 +124,7 @@ notify_connect (void *cls,
struct GNUNET_HashCode hc;
struct GNUNET_CRYPTO_EddsaPublicKey public_key = peer->public_key;
- void *ret = sps->handlers;
+ void *ret = (struct GNUNET_PeerIdentity *) peer;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"This Peer %s \n",
diff --git a/src/util/mq.c b/src/util/mq.c
index b09837459..40ac97bbe 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -273,7 +273,7 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
break;
}
}
-done:
+ done:
if (GNUNET_NO == handled)
{
LOG (GNUNET_ERROR_TYPE_INFO,
@@ -384,8 +384,9 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
mq->current_envelope = ev;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "sending message of type %u, queue empty (MQ: %p)\n",
+ "sending message of type %u and size %u, queue empty (MQ: %p)\n",
ntohs (ev->mh->type),
+ ntohs (ev->mh->size),
mq);
mq->send_impl (mq,
@@ -479,8 +480,10 @@ impl_send_continue (void *cls)
mq->current_envelope);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "sending message of type %u from queue\n",
- ntohs (mq->current_envelope->mh->type));
+ "sending message of type %u and size %u from queue (MQ: %p)\n",
+ ntohs (mq->current_envelope->mh->type),
+ ntohs (mq->current_envelope->mh->size),
+ mq);
mq->send_impl (mq,
mq->current_envelope->mh,