From 95a1edacccd9b3bf769a144a12d41946d0ac25dc Mon Sep 17 00:00:00 2001 From: t3sserakt Date: Thu, 17 Mar 2022 14:28:40 +0100 Subject: - Trying to exchange iptables with nft, first shot failed. - Fixed small bug in UDP communicator. - Fixed bug in DV circle test case - Introduced a default value to wait for a reliability ack. - Introduced a FC retransmission threshold together with a retransmission count. - Introduced a original size value for TransportDVBoxMessage - Checking if we have the root pending messge, when removing the pending message from virtual link. - Added delay value to schedule_transmit_on_queue to wait for retransmitting. - Checking for confirmed virtual link, before routing. - Allow unconfirmed queues or DV routes when doing dv encapsulation for control traffic. - Changed check_vl_transmission to also check window size for DV next hop peer. - Fixed fragment box handling to also handle reliability boxed message which needed to be fragmented. - Fixed completing a message which was not only fragmented but also DV boxed. - Added logic to notify core about a new virtual link using distance vector without having validated next neighbour. - Added logic to create a virtual link to handle flow control messages. - fixed several smaller bugs in fragmentation logic. - Changed logic for adding the next_attempt value of PendingMessage. --- contrib/netjail/netjail_core.sh | 5 +- contrib/netjail/netjail_start.sh | 4 + src/transport/Makefile.am | 2 +- src/transport/gnunet-communicator-udp.c | 29 +- src/transport/gnunet-service-tng.c | 699 +++++++++++++++------ ...test_transport_distance_vector_circle_topo.conf | 2 +- .../test_transport_plugin_cmd_simple_send_dv.c | 79 +-- src/transport/transport_api_cmd_start_peer.c | 2 +- src/util/mq.c | 11 +- 9 files changed, 581 insertions(+), 252 deletions(-) diff --git a/contrib/netjail/netjail_core.sh b/contrib/netjail/netjail_core.sh index ed363cf35..da784fa5e 100755 --- a/contrib/netjail/netjail_core.sh +++ b/contrib/netjail/netjail_core.sh @@ -188,7 +188,10 @@ netjail_node_add_nat() { local ADDRESS=$2 local MASK=$3 - ip netns exec $NODE iptables -t nat -A POSTROUTING -s "$ADDRESS/$MASK" -j MASQUERADE + ip netns exec $NODE nft add table nat + ip netns exec $NODE nft add chain nat postrouting { type nat hook postrouting priority 0 \; } + ip netns exec $NODE nft add rule ip nat postrouting ip saddr "$ADDRESS/$MASK" counter masquerade + # ip netns exec $NODE iptables -t nat -A POSTROUTING -s "$ADDRESS/$MASK" -j MASQUERADE } netjail_node_add_default() { diff --git a/contrib/netjail/netjail_start.sh b/contrib/netjail/netjail_start.sh index f7c417c27..e2d5fd634 100755 --- a/contrib/netjail/netjail_start.sh +++ b/contrib/netjail/netjail_start.sh @@ -77,11 +77,15 @@ for N in $(seq $GLOBAL_N); do if [ "1" == "${R_TCP[$N]}" ] then + #ip netns exec ${ROUTERS[$N]} nft add rule ip nat prerouting ip daddr $GLOBAL_GROUP.$N tcp dport 60002 counter dnat to $LOCAL_GROUP.1 + #ip netns exec ${ROUTERS[$N]} nft add rule ip filter FORWARD ip daddr $LOCAL_GROUP.1 ct state new,related,established counter accept ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p tcp -d $GLOBAL_GROUP.$N --dport 60002 -j DNAT --to $LOCAL_GROUP.1 ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1 -m state --state NEW,RELATED,ESTABLISHED -j ACCEPT fi if [ "1" == "${R_UDP[$N]}" ] then + #ip netns exec ${ROUTERS[$N]} nft add rule ip nat prerouting ip daddr $GLOBAL_GROUP.$N udp dport $PORT counter dnat to $LOCAL_GROUP.1 + #ip netns exec ${ROUTERS[$N]} nft add rule ip filter FORWARD ip daddr $LOCAL_GROUP.1 ct state new,related,established counter accept ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p udp -d $GLOBAL_GROUP.$N --dport $PORT -j DNAT --to $LOCAL_GROUP.1 ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1 -m state --state NEW,RELATED,ESTABLISHED -j ACCEPT fi diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am index 5d6fb8421..ec00bc917 100644 --- a/src/transport/Makefile.am +++ b/src/transport/Makefile.am @@ -778,7 +778,7 @@ check_SCRIPTS= \ test_transport_simple_send.sh \ test_transport_simple_send_broadcast.sh \ test_transport_udp_backchannel.sh \ - # test_transport_simple_send_dv_circle.sh + test_transport_simple_send_dv_circle.sh # test_transport_simple_send_dv_inverse.sh test_transport_start_with_config_SOURCES = \ diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c index b6edff485..70848ff79 100644 --- a/src/transport/gnunet-communicator-udp.c +++ b/src/transport/gnunet-communicator-udp.c @@ -1273,7 +1273,7 @@ pass_plaintext_to_core (struct SenderAddress *sender, const struct GNUNET_MessageHeader *hdr = plaintext; const char *pos = plaintext; - while (ntohs (hdr->size) < plaintext_len) + while (ntohs (hdr->size) <= plaintext_len) { GNUNET_STATISTICS_update (stats, "# bytes given to core", @@ -1722,6 +1722,12 @@ try_handle_plaintext (struct SenderAddress *sender, const struct UDPAck *ack = (const struct UDPAck *) buf; uint16_t type; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "try_handle_plaintext of size %u (%u %u) and type %u\n", + buf_size, + ntohs (hdr->size), + sizeof(*hdr), + ntohs (hdr->type)); if (sizeof(*hdr) > buf_size) return; /* not even a header */ if (ntohs (hdr->size) > buf_size) @@ -2202,7 +2208,8 @@ verify_confirmation (const struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral, { struct UdpHandshakeSignature uhs; - uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE); + uhs.purpose.purpose = htonl ( + GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE); uhs.purpose.size = htonl (sizeof(uhs)); uhs.sender = uc->sender; uhs.receiver = my_identity; @@ -2350,7 +2357,8 @@ sock_read (void *cls) "received UDPBroadcast from %s\n", GNUNET_a2s ((const struct sockaddr *) addr_verify, salen)); ub = (const struct UDPBroadcast *) buf; - uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST); + uhs.purpose.purpose = htonl ( + GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST); uhs.purpose.size = htonl (sizeof(uhs)); uhs.sender = ub->sender; sender = ub->sender; @@ -2366,10 +2374,11 @@ sock_read (void *cls) GNUNET_i2s (&sender)); GNUNET_CRYPTO_hash ((struct sockaddr *) addr_verify, salen, &uhs.h_address); if (GNUNET_OK == - GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST, - &uhs, - &ub->sender_sig, - &ub->sender.public_key)) + GNUNET_CRYPTO_eddsa_verify ( + GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST, + &uhs, + &ub->sender_sig, + &ub->sender.public_key)) { char *addr_s; enum GNUNET_NetworkType nt; @@ -2699,7 +2708,8 @@ mq_send_kx (struct GNUNET_MQ_Handle *mq, uc.sender = my_identity; uc.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); - uhs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE); + uhs.purpose.purpose = htonl ( + GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE); uhs.purpose.size = htonl (sizeof(uhs)); uhs.sender = my_identity; uhs.receiver = receiver->target; @@ -3644,7 +3654,8 @@ iface_proc (void *cls, bi->salen = addrlen; bi->found = GNUNET_YES; bi->bcm.sender = my_identity; - ubs.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST); + ubs.purpose.purpose = htonl ( + GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST); ubs.purpose.size = htonl (sizeof(ubs)); ubs.sender = my_identity; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 56a854a70..0427bd229 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -81,6 +81,11 @@ #include "gnunet_signatures.h" #include "transport.h" +/** + * Maximum number of FC retransmissions for a runing retransmission task. + */ +#define MAX_FC_RETRANSMIT_COUNT 1000 + /** * Maximum number of messages we acknowledge together in one * cumulative ACK. Larger values may save a bit of bandwidth. @@ -185,6 +190,12 @@ #define DV_FORWARD_TIMEOUT \ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60) +/** + * Default value for how long we wait for reliability ack. + */ +#define DEFAULT_ACK_WAIT_DURATION \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) + /** * We only consider queues as "quality" connections when * suppressing the generation of DV initiation messages if @@ -781,6 +792,16 @@ struct TransportDVBoxMessage */ struct GNUNET_HashCode hmac; + /** + * Size this msg had initially. This is needed to calculate the hmac at the target. + * The header size can not be used for that, because the box size is getting smaller at each hop. + */ + /** + * The length of the struct (in bytes, including the length field itself), + * in big-endian format. + */ + uint16_t orig_size GNUNET_PACKED; + /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values; excluding the @e origin and the current peer, the last must be the ultimate target; if @e num_hops is zero, the receiver of this @@ -1341,6 +1362,17 @@ struct VirtualLink */ struct GNUNET_SCHEDULER_Task *fc_retransmit_task; + /** + * Number of FC retransmissions for this running task. + */ + unsigned int fc_retransmit_count; + + /** + * Is this VirtualLink confirmed. + * A unconfirmed VirtualLink might exist, if we got a FC from that target. + */ + unsigned int confirmed; + /** * Neighbour used by this virtual link, NULL if @e dv is used. */ @@ -2845,8 +2877,8 @@ free_pending_acknowledgement (struct PendingAcknowledgement *pa) struct PendingMessage *pm = pa->pm; struct DistanceVectorHop *dvh = pa->dvh; - GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa); - pa_count--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "free_pending_acknowledgement\n"); if (NULL != q) { GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa); @@ -2854,6 +2886,17 @@ free_pending_acknowledgement (struct PendingAcknowledgement *pa) } if (NULL != pm) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "remove pa from message\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "remove pa from message %llu\n", + pm->logging_uuid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "remove pa from message %u\n", + pm->pmt); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "remove pa from message %s\n", + GNUNET_uuid2s (&pa->ack_uuid.value)); GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa); pa->pm = NULL; } @@ -2920,8 +2963,11 @@ free_pending_message (struct PendingMessage *pm) tc->details.core.pending_msg_tail, pm); } - if (NULL != vl) + if ((NULL != vl) && (NULL == pm->frag_parent)) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Removing pm %lu\n", + pm->logging_uuid); GNUNET_CONTAINER_MDLL_remove (vl, vl->pending_msg_head, vl->pending_msg_tail, @@ -2929,6 +2975,18 @@ free_pending_message (struct PendingMessage *pm) } while (NULL != (pa = pm->pa_head)) { + if (NULL == pa) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "free pending pa null\n"); + if (NULL == pm->pa_tail) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "free pending pa_tail null\n"); + if (NULL == pa->prev_pa) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "free pending pa prev null\n"); + if (NULL == pa->next_pa) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "free pending pa next null\n"); GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa); pa->pm = NULL; } @@ -2944,6 +3002,9 @@ free_pending_message (struct PendingMessage *pm) free_fragment_tree (pm->bpm); GNUNET_free (pm->bpm); } + if (NULL == pm) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "free pending pm null\n"); GNUNET_free (pm); } @@ -3028,6 +3089,9 @@ free_virtual_link (struct VirtualLink *vl) struct PendingMessage *pm; struct CoreSentContext *csc; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "free virtual link\n"); + if (NULL != vl->reassembly_map) { GNUNET_CONTAINER_multihashmap32_iterate (vl->reassembly_map, @@ -3084,6 +3148,8 @@ free_validation_state (struct ValidationState *vs) vs->hn = NULL; if (NULL != vs->sc) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "store cancel\n"); GNUNET_PEERSTORE_store_cancel (vs->sc); vs->sc = NULL; } @@ -3392,6 +3458,8 @@ free_neighbour (struct Neighbour *neighbour) } if (NULL != neighbour->sc) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "store cancel\n"); GNUNET_PEERSTORE_store_cancel (neighbour->sc); neighbour->sc = NULL; } @@ -3503,7 +3571,8 @@ check_for_queue_with_higher_prio (struct Queue *queue, struct Queue *queue_head) * @param p task priority to use, if @a queue is scheduled */ static void -schedule_transmit_on_queue (struct Queue *queue, +schedule_transmit_on_queue (struct GNUNET_TIME_Relative delay, + struct Queue *queue, enum GNUNET_SCHEDULER_Priority p) { if (check_for_queue_with_higher_prio (queue, @@ -3552,7 +3621,8 @@ schedule_transmit_on_queue (struct Queue *queue, if (NULL != queue->transmit_task) GNUNET_SCHEDULER_cancel (queue->transmit_task); queue->transmit_task = - GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue); + GNUNET_SCHEDULER_add_delayed_with_priority (delay, p, &transmit_on_queue, + queue); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Considering transmission on queue `%s' to %s\n", queue->address, @@ -3677,13 +3747,15 @@ free_queue (struct Queue *queue) GNUNET_NO); for (struct Queue *s = tc->details.communicator.queue_head; NULL != s; s = s->next_client) - schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + s, + GNUNET_SCHEDULER_PRIORITY_DEFAULT); } notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); GNUNET_free (queue); vl = lookup_virtual_link (&neighbour->pid); - if ((NULL != vl) && (neighbour == vl->n)) + if ((NULL != vl) && (GNUNET_YES == vl->confirmed) && (neighbour == vl->n)) { GNUNET_SCHEDULER_cancel (vl->visibility_task); check_link_down (vl); @@ -3710,6 +3782,8 @@ free_address_list_entry (struct AddressListEntry *ale) ale); if (NULL != ale->sc) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "store cancel\n"); GNUNET_PEERSTORE_store_cancel (ale->sc); ale->sc = NULL; } @@ -3954,6 +4028,8 @@ client_send_response (struct PendingMessage *pm) struct TransportClient *tc = pm->client; struct VirtualLink *vl = pm->vl; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "client send response\n"); if (NULL != tc) { struct GNUNET_MQ_Envelope *env; @@ -4127,7 +4203,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom) return; } vl = lookup_virtual_link (&rom->peer); - if (NULL == vl) + if ((NULL == vl) || (GNUNET_NO == vl->confirmed)) { GNUNET_STATISTICS_update (GST_stats, "# RECV_OK dropped: virtual link unknown", @@ -4323,6 +4399,12 @@ queue_send_msg (struct Queue *queue, queue->idle = GNUNET_NO; if (0 == queue->q_capacity) queue->idle = GNUNET_NO; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending message of type %u (%u) and size %u with MQ %p\n", + ntohs (((const struct GNUNET_MessageHeader *) payload)->type), + ntohs (smt->header.size), + payload_size, + queue->tc->mq); GNUNET_MQ_send (queue->tc->mq, env); } } @@ -4684,6 +4766,7 @@ encapsulate_for_dv (struct DistanceVector *dv, struct GNUNET_PeerIdentity *dhops; box_hdr.header.size = htons (sizeof(buf)); + box_hdr.orig_size = htons (sizeof(buf)); box_hdr.num_hops = htons (num_hops); memcpy (buf, &box_hdr, sizeof(box_hdr)); dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)]; @@ -4741,7 +4824,7 @@ send_dv_to_neighbour (void *cls, enum RouteMessageOptions options) { (void) cls; - (void) route_via_neighbour (next_hop, hdr, options); + (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED); } @@ -4776,7 +4859,7 @@ route_control_message_without_fc (struct VirtualLink *vl, // TODO Do this elsewhere. vl should be given as parameter to method. // vl = lookup_virtual_link (target); - GNUNET_assert (NULL != vl); + GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed); if (NULL == vl) return GNUNET_TIME_UNIT_FOREVER_REL; n = vl->n; @@ -4916,7 +4999,7 @@ consider_sending_fc (void *cls) fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used); fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size); fc.sender_time = GNUNET_TIME_absolute_hton (monotime); - rtt = route_control_message_without_fc (vl, &fc.header, RMO_NONE); + rtt = route_control_message_without_fc (vl, &fc.header, RMO_DV_ALLOWED); if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us) { rtt = GNUNET_TIME_UNIT_SECONDS; @@ -4933,8 +5016,14 @@ consider_sending_fc (void *cls) } if (NULL != vl->fc_retransmit_task) GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task); + if (MAX_FC_RETRANSMIT_COUNT == vl->fc_retransmit_count) + { + rtt = GNUNET_TIME_UNIT_MINUTES; + vl->fc_retransmit_count = 0; + } vl->fc_retransmit_task = GNUNET_SCHEDULER_add_delayed (rtt, &task_consider_sending_fc, vl); + vl->fc_retransmit_count++; } @@ -4960,14 +5049,20 @@ check_vl_transmission (struct VirtualLink *vl) struct Neighbour *n = vl->n; struct DistanceVector *dv = vl->dv; struct GNUNET_TIME_Absolute now; + struct VirtualLink *vl_next_hop; int elig; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "check_vl_transmission to target %s\n", + GNUNET_i2s (&vl->target)); /* Check that we have an eligible pending message! (cheaper than having #transmit_on_queue() find out!) */ elig = GNUNET_NO; for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm; pm = pm->next_vl) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "check_vl_transmission loop\n"); if (NULL != pm->qe) continue; /* not eligible, is in a queue! */ if (pm->bytes_msg + vl->outbound_fc_window_size_used > @@ -4983,62 +5078,96 @@ check_vl_transmission (struct VirtualLink *vl) consider_sending_fc (vl); return; /* We have a message, but flow control says "nope" */ } - elig = GNUNET_YES; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Eligible message %lu of size %llu to %s: %llu/%llu\n", - pm->logging_uuid, - pm->bytes_msg, - GNUNET_i2s (&vl->target), - (unsigned long long) vl->outbound_fc_window_size, - (unsigned long long) (pm->bytes_msg - + vl->outbound_fc_window_size_used)); - break; - } - if (GNUNET_NO == elig) - return; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not stalled. Scheduling transmission on queue\n"); - /* Notify queues at direct neighbours that we are interested */ - now = GNUNET_TIME_absolute_get (); - if (NULL != n) - { - for (struct Queue *queue = n->queue_head; NULL != queue; - queue = queue->next_neighbour) - { - if ((GNUNET_YES == queue->idle) && - (queue->validated_until.abs_value_us > now.abs_value_us)) - schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Neighbour Queue QID: %u (%u) busy or invalid\n", - queue->qid, - queue->idle); - } - } - /* Notify queues via DV that we are interested */ - if (NULL != dv) - { - /* Do DV with lower scheduler priority, which effectively means that - IF a neighbour exists and is available, we prefer it. */ - for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; - pos = pos->next_dv) + "Target window on VL %s not stalled. Scheduling transmission on queue\n", + GNUNET_i2s (&vl->target)); + /* Notify queues at direct neighbours that we are interested */ + now = GNUNET_TIME_absolute_get (); + if (NULL != n) { - struct Neighbour *nh = pos->next_hop; - - if (pos->path_valid_until.abs_value_us <= now.abs_value_us) - continue; /* skip this one: path not validated */ - for (struct Queue *queue = nh->queue_head; NULL != queue; + for (struct Queue *queue = n->queue_head; NULL != queue; queue = queue->next_neighbour) + { if ((GNUNET_YES == queue->idle) && (queue->validated_until.abs_value_us > now.abs_value_us)) - schedule_transmit_on_queue (queue, - GNUNET_SCHEDULER_PRIORITY_BACKGROUND); + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Direct neighbour %s not stalled\n", + GNUNET_i2s (&n->pid)); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + queue, + GNUNET_SCHEDULER_PRIORITY_DEFAULT); + elig = GNUNET_YES; + } else GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "DV Queue QID: %u (%u) busy or invalid\n", + "Neighbour Queue QID: %u (%u) busy or invalid\n", queue->qid, queue->idle); + } } + /* Notify queues via DV that we are interested */ + if (NULL != dv) + { + /* Do DV with lower scheduler priority, which effectively means that + IF a neighbour exists and is available, we prefer it. */ + for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; + pos = pos->next_dv) + { + struct Neighbour *nh = pos->next_hop; + + + if (pos->path_valid_until.abs_value_us <= now.abs_value_us) + continue; /* skip this one: path not validated */ + else + { + vl_next_hop = lookup_virtual_link (&nh->pid); + if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used > + vl_next_hop->outbound_fc_window_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Stalled message %lu transmission on next hop %s due to flow control: %llu < %llu\n", + pm->logging_uuid, + GNUNET_i2s (&vl_next_hop->target), + (unsigned long + long) vl_next_hop->outbound_fc_window_size, + (unsigned long long) (pm->bytes_msg + + vl_next_hop-> + outbound_fc_window_size_used)); + consider_sending_fc (vl_next_hop); + continue; /* We have a message, but flow control says "nope" for the first hop of this path */ + } + for (struct Queue *queue = nh->queue_head; NULL != queue; + queue = queue->next_neighbour) + if ((GNUNET_YES == queue->idle) && + (queue->validated_until.abs_value_us > now.abs_value_us)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Next hop neighbour %s not stalled\n", + GNUNET_i2s (&nh->pid)); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + queue, + GNUNET_SCHEDULER_PRIORITY_BACKGROUND); + elig = GNUNET_YES; + } + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "DV Queue QID: %u (%u) busy or invalid\n", + queue->qid, + queue->idle); + } + } + } + if (GNUNET_YES == elig) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Eligible message %lu of size %llu to %s: %llu/%llu\n", + pm->logging_uuid, + pm->bytes_msg, + GNUNET_i2s (&vl->target), + (unsigned long long) vl->outbound_fc_window_size, + (unsigned long long) (pm->bytes_msg + + vl->outbound_fc_window_size_used)); + break; } } @@ -5064,7 +5193,7 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) bytes_msg = ntohs (obmm->size); pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority); vl = lookup_virtual_link (&obm->peer); - if (NULL == vl) + if ((NULL == vl) || (GNUNET_NO == vl->confirmed)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Don't have %s as a neighbour (anymore).\n", @@ -5154,7 +5283,7 @@ handle_communicator_backchannel ( strlen (is) + 1); // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED); vl = lookup_virtual_link (&cb->pid); - if (NULL != vl) + if ((NULL != vl) && (GNUNET_YES == vl->confirmed)) { route_control_message_without_fc (vl, &be->header, RMO_DV_ALLOWED); } @@ -5428,7 +5557,7 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) return; } vl = lookup_virtual_link (&cmc->im.sender); - if (NULL == vl) + if ((NULL == vl) || (GNUNET_NO == vl->confirmed)) { /* FIXME: sender is giving us messages for CORE but we don't have the link up yet! I *suspect* this can happen right now (i.e. @@ -5624,7 +5753,7 @@ transmit_cummulative_ack_cb (void *cls) &ack->header, RMO_DV_ALLOWED);*/ vl = lookup_virtual_link (&ac->target); - if (NULL != vl) + if ((NULL != vl) && (GNUNET_YES == vl->confirmed)) { route_control_message_without_fc ( vl, @@ -5765,7 +5894,7 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) struct FindByMessageUuidContext fc; vl = lookup_virtual_link (&cmc->im.sender); - if (NULL == vl) + if ((NULL == vl) || (GNUNET_NO == vl->confirmed)) { struct GNUNET_SERVICE_Client *client = cmc->tc->client; @@ -5794,6 +5923,7 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) fb->msg_uuid.uuid, &find_by_message_uuid, &fc); + fsize = ntohs (fb->header.size) - sizeof(*fb); if (NULL == (rc = fc.rc)) { rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */ @@ -5815,11 +5945,16 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); target = (char *) &rc[1]; rc->bitfield = (uint8_t *) (target + rc->msg_size); - rc->msg_missing = rc->msg_size; + if (fsize != rc->msg_size) + rc->msg_missing = rc->msg_size; + else + rc->msg_missing = 0; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received fragment at offset %u/%u from %s for NEW message %u\n", + "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %u\n", + fsize, ntohs (fb->frag_off), msize, + rc->msg_missing, GNUNET_i2s (&cmc->im.sender), (unsigned int) fb->msg_uuid.uuid); } @@ -5841,7 +5976,6 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) } /* reassemble */ - fsize = ntohs (fb->header.size) - sizeof(*fb); if (0 == fsize) { GNUNET_break (0); @@ -5918,6 +6052,16 @@ check_reliability_box (void *cls, const struct TransportReliabilityBoxMessage *rb) { (void) cls; + const struct GNUNET_MessageHeader *inbox = (const struct + GNUNET_MessageHeader *) &rb[1]; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "check_send_msg with size %u: inner msg type %u and size %u (%u %u)\n", + ntohs (rb->header.size), + ntohs (inbox->type), + ntohs (inbox->size), + sizeof (struct TransportReliabilityBoxMessage), + sizeof (struct GNUNET_MessageHeader)); GNUNET_MQ_check_boxed_message (rb); return GNUNET_YES; } @@ -6060,6 +6204,10 @@ completed_pending_message (struct PendingMessage *pm) { struct PendingMessage *pos; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Complete transmission of message %llu %u\n", + pm->logging_uuid, + pm->pmt); switch (pm->pmt) { case PMT_CORE: @@ -6069,7 +6217,7 @@ completed_pending_message (struct PendingMessage *pm) return; case PMT_FRAGMENT_BOX: - /* Fragment sent over reliabile channel */ + /* Fragment sent over reliable channel */ free_fragment_tree (pm); pos = pm->frag_parent; GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm); @@ -6080,11 +6228,17 @@ completed_pending_message (struct PendingMessage *pm) { pm = pos; pos = pm->frag_parent; + if (PMT_DV_BOX == pm->pmt) + { + GNUNET_free (pm); + client_send_response (pos); + return; + } GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm); GNUNET_free (pm); } - /* Was this the last applicable fragmment? */ + /* Was this the last applicable fragment? */ if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) && (pos->frag_off == pos->bytes_msg)) client_send_response (pos); @@ -6293,6 +6447,7 @@ handle_backchannel_encapsulation ( target_communicator); GNUNET_STATISTICS_update (GST_stats, stastr, 1, GNUNET_NO); GNUNET_free (stastr); + finish_cmc_handling (cmc); return; } /* Finally, deliver backchannel message to communicator */ @@ -6309,6 +6464,7 @@ handle_backchannel_encapsulation ( cbi->pid = cmc->im.sender; memcpy (&cbi[1], inbox, isize); GNUNET_MQ_send (tc->mq, env); + finish_cmc_handling (cmc); } @@ -6359,39 +6515,54 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop) struct VirtualLink *vl; vl = lookup_virtual_link (&dv->target); - if (NULL != vl) + if (NULL == vl) { - /* Link was already up, remember dv is also now available and we are done */ - vl->dv = dv; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Virtual link to %s could now also use DV!\n", + "Creating new virtual link to %s using DV!\n", GNUNET_i2s (&dv->target)); - return; + vl = GNUNET_new (struct VirtualLink); + vl->confirmed = GNUNET_YES; + vl->message_uuid_ctr = + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); + vl->target = dv->target; + vl->core_recv_window = RECV_WINDOW_SIZE; + vl->available_fc_window_size = DEFAULT_WINDOW_SIZE; + vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + links, + &vl->target, + vl, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + vl->dv = dv; + dv->vl = vl; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); + consider_sending_fc (vl); + /* We lacked a confirmed connection to the target + before, so tell CORE about it (finally!) */ + cores_send_connect_info (&dv->target); + } + else + { + /* Link was already up, remember dv is also now available and we are done */ + vl->dv = dv; + dv->vl = vl; + if (GNUNET_NO == vl->confirmed) + { + vl->confirmed = GNUNET_YES; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); + consider_sending_fc (vl); + /* We lacked a confirmed connection to the target + before, so tell CORE about it (finally!) */ + cores_send_connect_info (&dv->target); + } + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Virtual link to %s could now also use DV!\n", + GNUNET_i2s (&dv->target)); } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating new virtual link to %s using DV!\n", - GNUNET_i2s (&dv->target)); - vl = GNUNET_new (struct VirtualLink); - vl->message_uuid_ctr = - GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); - vl->target = dv->target; - vl->dv = dv; - dv->vl = vl; - vl->core_recv_window = RECV_WINDOW_SIZE; - vl->available_fc_window_size = DEFAULT_WINDOW_SIZE; - vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE; - vl->visibility_task = - GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_put ( - links, - &vl->target, - vl, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - consider_sending_fc (vl); - /* We lacked a confirmed connection to the target - before, so tell CORE about it (finally!) */ - cores_send_connect_info (&dv->target); } @@ -6530,8 +6701,10 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, return GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Refreshed known path to %s, forwarding further\n", - GNUNET_i2s (&dv->target)); + "Refreshed known path to %s valid until %s, forwarding further\n", + GNUNET_i2s (&dv->target), + GNUNET_STRINGS_absolute_time_to_string ( + pos->path_valid_until)); return GNUNET_YES; } } @@ -6548,8 +6721,9 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, } /* create new DV path entry */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Discovered new DV path to %s\n", - GNUNET_i2s (&dv->target)); + "Discovered new DV path to %s valid until %s\n", + GNUNET_i2s (&dv->target), + GNUNET_STRINGS_absolute_time_to_string (path_valid_until)); hop = GNUNET_malloc (sizeof(struct DistanceVectorHop) + sizeof(struct GNUNET_PeerIdentity) * (path_len - 3)); hop->next_hop = next_hop; @@ -6680,7 +6854,7 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);*/ vl = lookup_virtual_link (next_hop); - if (NULL != vl) + if ((NULL != vl) && (GNUNET_YES == vl->confirmed)) { route_control_message_without_fc (vl, &fwd->header, @@ -6977,13 +7151,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) struct GNUNET_TIME_Absolute in_time; struct Neighbour *n; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "handle dv learn message from %s\n", - GNUNET_i2s (&dvl->initiator)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "handle dv learn message sender %s\n", - GNUNET_i2s (&cmc->im.sender)); - nhops = ntohs (dvl->num_hops); /* 0 = sender is initiator */ bi_history = ntohs (dvl->bidirectional); hops = (const struct DVPathEntryP *) &dvl[1]; @@ -7017,10 +7184,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) cc); // FIXME: add bi-directional flag to cc? in_time = GNUNET_TIME_absolute_get (); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "2 handle dv learn message from %s\n", - GNUNET_i2s (&dvl->initiator)); - /* continue communicator here, everything else can happen asynchronous! */ finish_cmc_handling (cmc); @@ -7055,7 +7218,11 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) if (GNUNET_YES == n->dv_monotime_available) { if (NULL != n->sc) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "store cancel\n"); GNUNET_PEERSTORE_store_cancel (n->sc); + } n->sc = GNUNET_PEERSTORE_store (peerstore, "transport", @@ -7069,9 +7236,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) n); } } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "3 handle dv learn message from %s\n", - GNUNET_i2s (&dvl->initiator)); /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!, If signature verification load too high, implement random drop strategy */ for (unsigned int i = 0; i < nhops; i++) @@ -7110,9 +7274,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) return; } } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "4 handle dv learn message from %s\n", - GNUNET_i2s (&dvl->initiator)); if (GNUNET_EXTRA_LOGGING > 0) { char *path; @@ -7137,9 +7298,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) GNUNET_i2s (&GST_my_identity)); GNUNET_free (path); } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "5 handle dv learn message from %s\n", - GNUNET_i2s (&dvl->initiator)); do_fwd = GNUNET_YES; if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator)) { @@ -7147,6 +7305,7 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) struct GNUNET_TIME_Relative host_latency_sum; struct GNUNET_TIME_Relative latency; struct GNUNET_TIME_Relative network_latency; + struct GNUNET_TIME_Absolute now; /* We initiated this, learn the forward path! */ path[0] = GST_my_identity; @@ -7155,7 +7314,11 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) // Need also something to lookup initiation time // to compute RTT! -> add RTT argument here? - latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly + now = GNUNET_TIME_absolute_get (); + latency = GNUNET_TIME_absolute_get_duration (GNUNET_TIME_absolute_ntoh ( + dvl->monotonic_time)); + GNUNET_assert (latency.rel_value_us >= host_latency_sum.rel_value_us); + // latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly // (based on dvl->challenge, we can identify time of origin!) network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum); @@ -7184,9 +7347,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) do_fwd = GNUNET_NO; return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "6 handle dv learn message from %s\n", - GNUNET_i2s (&dvl->initiator)); if (bi_hop) { /* last hop was bi-directional, we could learn something here! */ @@ -7243,9 +7403,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) } } } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "7 handle dv learn message from %s\n", - GNUNET_i2s (&dvl->initiator)); if (MAX_DV_HOPS_ALLOWED == nhops) { /* At limit, we're out of here! */ @@ -7305,9 +7462,6 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) &dv_neighbour_transmission, &nsc); } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "9 handle dv learn message from %s\n", - GNUNET_i2s (&dvl->initiator)); } @@ -7374,10 +7528,17 @@ forward_dv_box (struct Neighbour *next_hop, char msg_buf[msg_size] GNUNET_ALIGN; struct GNUNET_PeerIdentity *dhops; + if (GNUNET_NO == ntohs (hdr->without_fc)) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "forward dv box without fc\n"); + if (NULL == vl) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "forward dv box vl null\n"); GNUNET_assert (GNUNET_YES == ntohs (hdr->without_fc) || NULL != vl); hdr->num_hops = htons (num_hops); hdr->total_hops = htons (total_hops); + hdr->header.size = htons (msg_size); memcpy (msg_buf, hdr, sizeof(*hdr)); dhops = (struct GNUNET_PeerIdentity *) &msg_buf[sizeof(struct TransportDVBoxMessage)]; @@ -7387,7 +7548,8 @@ forward_dv_box (struct Neighbour *next_hop, if (GNUNET_YES == ntohs (hdr->without_fc)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Forwarding control message in DV Box to next hop %s (%u/%u) \n", + "Forwarding control message (payload size %u) in DV Box to next hop %s (%u/%u) \n", + enc_payload_size, GNUNET_i2s (&next_hop->pid), (unsigned int) num_hops, (unsigned int) total_hops); @@ -7444,6 +7606,8 @@ free_backtalker (struct Backtalker *b) } if (NULL != b->sc) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "store cancel\n"); GNUNET_PEERSTORE_store_cancel (b->sc); b->sc = NULL; } @@ -7486,6 +7650,8 @@ backtalker_timeout_cb (void *cls) { struct Backtalker *b = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "backtalker timeout.\n"); b->task = NULL; if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us) { @@ -7589,6 +7755,8 @@ update_backtalker_monotime (struct Backtalker *b) if (NULL != b->sc) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "store cancel\n"); GNUNET_PEERSTORE_store_cancel (b->sc); b->sc = NULL; } @@ -7714,8 +7882,8 @@ handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb) dh_key_derive_eph_pub (&dvb->ephemeral_key, &dvb->iv, key); hdr = (const char *) &dvb[1]; - hdr_len = ntohs (dvb->header.size) - sizeof(*dvb) - sizeof(struct - GNUNET_PeerIdentity) + hdr_len = ntohs (dvb->orig_size) - sizeof(*dvb) - sizeof(struct + GNUNET_PeerIdentity) * ntohs (dvb->total_hops); dv_hmac (key, &hmac, hdr, hdr_len); @@ -8112,7 +8280,7 @@ handle_validation_challenge ( } sender = cmc->im.sender; vl = lookup_virtual_link (&sender); - if (NULL != vl) + if ((NULL != vl) && (GNUNET_YES == vl->confirmed)) { // route_control_message_without_fc (&cmc->im.sender, route_control_message_without_fc (vl, @@ -8380,46 +8548,63 @@ handle_validation_response ( q->pd.aged_rtt = vs->validation_rtt; n = q->neighbour; vl = lookup_virtual_link (&vs->pid); - if (NULL != vl) + if (NULL == vl) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating new virtual link to %s using direct neighbour!\n", + GNUNET_i2s (&vs->pid)); + vl = GNUNET_new (struct VirtualLink); + vl->confirmed = GNUNET_YES; + vl->message_uuid_ctr = + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); + vl->target = n->pid; + vl->core_recv_window = RECV_WINDOW_SIZE; + vl->available_fc_window_size = DEFAULT_WINDOW_SIZE; + vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + links, + &vl->target, + vl, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + vl->n = n; + n->vl = vl; + q->idle = GNUNET_YES; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); + consider_sending_fc (vl); + /* We lacked a confirmed connection to the target + before, so tell CORE about it (finally!) */ + cores_send_connect_info (&n->pid); + } + else { /* Link was already up, remember n is also now available and we are done */ if (NULL == vl->n) { vl->n = n; n->vl = vl; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Virtual link to %s could now also direct neighbour!\n", - GNUNET_i2s (&vs->pid)); + if (GNUNET_YES == vl->confirmed) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Virtual link to %s could now also direct neighbour!\n", + GNUNET_i2s (&vs->pid)); } else { GNUNET_assert (n == vl->n); } - return; + if (GNUNET_NO == vl->confirmed) + { + vl->confirmed = GNUNET_YES; + q->idle = GNUNET_YES; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); + consider_sending_fc (vl); + /* We lacked a confirmed connection to the target + before, so tell CORE about it (finally!) */ + cores_send_connect_info (&n->pid); + } } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating new virtual link to %s using direct neighbour!\n", - GNUNET_i2s (&vs->pid)); - vl = GNUNET_new (struct VirtualLink); - vl->target = n->pid; - vl->n = n; - n->vl = vl; - q->idle = GNUNET_YES; - vl->core_recv_window = RECV_WINDOW_SIZE; - vl->available_fc_window_size = DEFAULT_WINDOW_SIZE; - vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE; - vl->visibility_task = - GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_put ( - links, - &vl->target, - vl, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - consider_sending_fc (vl); - /* We lacked a confirmed connection to the target - before, so tell CORE about it (finally!) */ - cores_send_connect_info (&n->pid); } @@ -8462,6 +8647,7 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc) struct GNUNET_TIME_Absolute st; uint64_t os; uint64_t wnd; + uint32_t random; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received FC from %s\n", GNUNET_i2s (&cmc->im.sender)); @@ -8469,13 +8655,22 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc) if (NULL == vl) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "FC dropped: VL unknown\n"); - GNUNET_STATISTICS_update (GST_stats, - "# FC dropped: Virtual link unknown", - 1, - GNUNET_NO); - finish_cmc_handling (cmc); - return; + "No virtual link for FC creating new unconfirmed virtual link to %s!\n", + GNUNET_i2s (&cmc->im.sender)); + vl = GNUNET_new (struct VirtualLink); + vl->confirmed = GNUNET_NO; + vl->message_uuid_ctr = + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); + vl->target = cmc->im.sender; + vl->core_recv_window = RECV_WINDOW_SIZE; + vl->available_fc_window_size = DEFAULT_WINDOW_SIZE; + vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + links, + &vl->target, + vl, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); } st = GNUNET_TIME_absolute_ntoh (fc->sender_time); if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us) @@ -8509,15 +8704,20 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc) (unsigned long long) vl->outbound_fc_window_size, (long long) vl->incoming_fc_window_size_loss); wnd = GNUNET_ntohll (fc->outbound_window_size); - if ((wnd < vl->incoming_fc_window_size) || - (vl->last_outbound_window_size_received != wnd) || - (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX) - % FC_NO_CHANGE_REPLY_PROBABILITY)) + random = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT32_MAX); + if ((GNUNET_YES == vl->confirmed) && ((wnd < vl->incoming_fc_window_size) || + (vl->last_outbound_window_size_received + != wnd) || + (0 == random + % FC_NO_CHANGE_REPLY_PROBABILITY))) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Consider re-sending our FC message, as clearly the other peer's idea of the window is not up-to-date (%llu vs %llu)\n", + "Consider re-sending our FC message, as clearly the other peer's idea of the window is not up-to-date (%llu vs %llu) or %llu last received differs, or random reply %lu\n", (unsigned long long) wnd, - (unsigned long long) vl->incoming_fc_window_size); + (unsigned long long) vl->incoming_fc_window_size, + (unsigned long long) vl->last_outbound_window_size_received, + random % FC_NO_CHANGE_REPLY_PROBABILITY); consider_sending_fc (vl); } if ((wnd == vl->incoming_fc_window_size) && @@ -8530,6 +8730,7 @@ handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc) (unsigned long long) wnd); GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task); vl->fc_retransmit_task = NULL; + vl->fc_retransmit_count = 0; } vl->last_outbound_window_size_received = wnd; /* FC window likely increased, check transmission possibilities! */ @@ -8729,7 +8930,6 @@ fragment_message (struct Queue *queue, pm->bytes_msg, GNUNET_i2s (&pm->vl->target), (unsigned int) mtu); - pa = prepare_pending_acknowledgement (queue, dvh, pm); /* This invariant is established in #handle_add_queue_message() */ GNUNET_assert (mtu > sizeof(struct TransportFragmentBoxMessage)); @@ -8745,7 +8945,7 @@ fragment_message (struct Queue *queue, ff = ff->head_frag; /* descent into fragmented fragments */ } - if (((ff->bytes_msg > mtu) || (pm == ff)) && (pm->frag_off < pm->bytes_msg)) + if (((ff->bytes_msg > mtu) || (pm == ff)) && (ff->frag_off < ff->bytes_msg)) { /* Did not yet calculate all fragments, calculate next fragment */ struct PendingMessage *frag; @@ -8783,13 +8983,15 @@ fragment_message (struct Queue *queue, tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT); tfb.header.size = htons (sizeof(struct TransportFragmentBoxMessage) + fragsize); + pa = prepare_pending_acknowledgement (queue, dvh, frag); tfb.ack_uuid = pa->ack_uuid; tfb.msg_uuid = pm->msg_uuid; tfb.frag_off = htons (ff->frag_off + xoff); tfb.msg_size = htons (pm->bytes_msg); memcpy (msg, &tfb, sizeof(tfb)); memcpy (&msg[sizeof(tfb)], &orig[ff->frag_off], fragsize); - GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag, ff->tail_frag, frag); + GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag, + ff->tail_frag, frag); ff->frag_off += fragsize; ff = frag; } @@ -8803,6 +9005,7 @@ fragment_message (struct Queue *queue, ff->frag_parent->head_frag, ff->frag_parent->tail_frag, ff); + return ff; } @@ -8829,7 +9032,7 @@ reliability_box_message (struct Queue *queue, struct PendingMessage *bpm; char *msg; - if (PMT_CORE != pm->pmt) + if ((PMT_CORE != pm->pmt) && (PMT_DV_BOX != pm->pmt)) return pm; /* already fragmented or reliability boxed, or control message: do nothing */ if (NULL != pm->bpm) @@ -8842,11 +9045,7 @@ reliability_box_message (struct Queue *queue, client_send_response (pm); return NULL; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Preparing reliability box for message <%llu> to %s on queue %s\n", - pm->logging_uuid, - GNUNET_i2s (&pm->vl->target), - queue->address); + pa = prepare_pending_acknowledgement (queue, dvh, pm); bpm = GNUNET_malloc (sizeof(struct PendingMessage) + sizeof(rbox) @@ -8869,6 +9068,13 @@ reliability_box_message (struct Queue *queue, memcpy (msg, &rbox, sizeof(rbox)); memcpy (&msg[sizeof(rbox)], &pm[1], pm->bytes_msg); pm->bpm = bpm; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Preparing reliability box for message <%llu> of size %lu (%lu) to %s on queue %s\n", + pm->logging_uuid, + pm->bytes_msg, + ntohs (((const struct GNUNET_MessageHeader *) &pm[1])->size), + GNUNET_i2s (&pm->vl->target), + queue->address); return bpm; } @@ -8911,6 +9117,7 @@ update_pm_next_attempt (struct PendingMessage *pm, { struct VirtualLink *vl = pm->vl; + // TODO Do we really need a next_attempt value for PendingMessage other than the root Pending Message? pm->next_attempt = next_attempt; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Next attempt for message <%llu> set to %s\n", @@ -8921,9 +9128,17 @@ update_pm_next_attempt (struct PendingMessage *pm, { reorder_root_pm (pm, next_attempt); } - else if ((PMT_RELIABILITY_BOX == pm->pmt)||(PMT_DV_BOX == pm->pmt)) + else if ((PMT_RELIABILITY_BOX == pm->pmt) || (PMT_DV_BOX == pm->pmt)) { - reorder_root_pm (pm->frag_parent, next_attempt); + struct PendingMessage *root = pm->frag_parent; + + while (NULL != root->frag_parent) + root = root->frag_parent; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Next attempt for root message <%llu> set to %s\n", + root->logging_uuid); + root->next_attempt = next_attempt; + reorder_root_pm (root, next_attempt); } else { @@ -8988,6 +9203,16 @@ struct PendingMessageScoreContext * Did we have to reliability box? */ int relb; + + /** + * There are pending messages, but it was to early to send one of them. + */ + int to_early; + + /** + * When will we try to transmit the message again for which it was to early to retry. + */ + struct GNUNET_TIME_Relative to_early_retry_delay; }; @@ -9020,14 +9245,32 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc, int relb; if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "DV messages must not be DV-routed to next hop!\n"); continue; /* DV messages must not be DV-routed to next hop! */ + } if (pos->next_attempt.abs_value_us > now.abs_value_us) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "too early for all messages, they are sorted by next_attempt\n"); + sc->to_early = GNUNET_YES; + break; /* too early for all messages, they are sorted by next_attempt */ + } + sc->to_early = GNUNET_NO; if (NULL != pos->qe) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "not eligible\n"); continue; /* not eligible */ + } sc->consideration_counter++; /* determine if we have to fragment, if so add fragmentation overhead! */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "check %u for sc->best\n", + pos->logging_uuid); frag = GNUNET_NO; if (((0 != queue->mtu) && (pos->bytes_msg + real_overhead > queue->mtu)) || @@ -9038,6 +9281,10 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc, respect that even if MTU is UINT16_MAX for this queue */)) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "fragment msg with size %u, realoverhead is %u\n", + pos->bytes_msg, + real_overhead); frag = GNUNET_YES; if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) { @@ -9064,7 +9311,10 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc, { relb = GNUNET_YES; } - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Create reliability box of msg with size %u, realoverhead is %u\n", + pos->bytes_msg, + real_overhead); } /* Finally, compare to existing 'best' in sc to see if this 'pos' pending @@ -9107,7 +9357,13 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc, pm_score -= queue->mtu - (real_overhead + pos->bytes_msg); } if (sc_score + time_delta > pm_score) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "sc_score of %u larger, keep sc->best %u\n", + pos->logging_uuid, + sc->best->logging_uuid); continue; /* sc_score larger, keep sc->best */ + } } sc->best = pos; sc->dvh = dvh; @@ -9216,6 +9472,10 @@ transmit_on_queue (void *cls) "No pending messages, queue `%s' to %s now idle\n", queue->address, GNUNET_i2s (&n->pid)); + if (GNUNET_YES == sc.to_early) + schedule_transmit_on_queue (sc.to_early_retry_delay, + queue, + GNUNET_SCHEDULER_PRIORITY_DEFAULT); queue->idle = GNUNET_YES; return; } @@ -9226,9 +9486,17 @@ transmit_on_queue (void *cls) pm = sc.best; if (NULL != sc.dvh) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Is this %u a DV box?\n", + pm->pmt); GNUNET_assert (PMT_DV_BOX != pm->pmt); if (NULL != sc.best->bpm) { + const struct DVPathEntryP *hops_old; + const struct DVPathEntryP *hops_selected; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Discard old box\n"); /* We did this boxing before, but possibly for a different path! Discard old DV box! OPTIMIZE-ME: we might want to check if it is the same and then not re-build the message... */ @@ -9246,6 +9514,13 @@ transmit_on_queue (void *cls) RMO_NONE, GNUNET_NO); GNUNET_assert (NULL != sc.best->bpm); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%u %u %u %u %u\n", + sizeof(struct GNUNET_PeerIdentity), + sizeof(struct TransportDVBoxMessage), + sizeof(struct TransportDVBoxPayloadP), + sizeof(struct TransportFragmentBoxMessage), + ((const struct GNUNET_MessageHeader *) &sc.best[1])->size); pm = sc.best->bpm; } if (GNUNET_YES == sc.frag) @@ -9258,7 +9533,9 @@ transmit_on_queue (void *cls) queue->address, GNUNET_i2s (&n->pid), sc.best->logging_uuid); - schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + queue, + GNUNET_SCHEDULER_PRIORITY_DEFAULT); return; } } @@ -9274,7 +9551,9 @@ transmit_on_queue (void *cls) queue->address, GNUNET_i2s (&n->pid), sc.best->logging_uuid); - schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + queue, + GNUNET_SCHEDULER_PRIORITY_DEFAULT); return; } } @@ -9323,6 +9602,8 @@ transmit_on_queue (void *cls) } else { + struct GNUNET_TIME_Relative wait_duration; + /* Message not finished, waiting for acknowledgement. Update time by which we might retransmit 's' based on queue characteristics (i.e. RTT); it takes one RTT for the message to @@ -9333,18 +9614,33 @@ transmit_on_queue (void *cls) OPTIMIZE: Note that in the future this heuristic should likely be improved further (measure RTT stability, consider message urgency and size when delaying ACKs, etc.) */ + + if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != + queue->pd.aged_rtt.rel_value_us) + wait_duration = queue->pd.aged_rtt; + else + wait_duration = DEFAULT_ACK_WAIT_DURATION; + struct GNUNET_TIME_Absolute next = GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_relative_multiply ( + wait_duration, 4)); + struct GNUNET_TIME_Relative plus = GNUNET_TIME_relative_multiply ( + wait_duration, 4); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Waiting %s for ACK\n", + "Waiting %s (%llu) for ACK until %llu\n", GNUNET_STRINGS_relative_time_to_string ( GNUNET_TIME_relative_multiply ( - queue->pd.aged_rtt, 4), GNUNET_NO)); + queue->pd.aged_rtt, 4), GNUNET_NO), + plus, + next); update_pm_next_attempt (pm, GNUNET_TIME_relative_to_absolute ( - GNUNET_TIME_relative_multiply (queue->pd.aged_rtt, + GNUNET_TIME_relative_multiply (wait_duration, 4))); } /* finally, re-schedule queue transmission task itself */ - schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + queue, + GNUNET_SCHEDULER_PRIORITY_DEFAULT); } @@ -9476,7 +9772,9 @@ handle_send_message_ack (void *cls, NULL != queue; queue = queue->next_client) { - schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + queue, + GNUNET_SCHEDULER_PRIORITY_DEFAULT); } } else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length) @@ -9486,7 +9784,9 @@ handle_send_message_ack (void *cls, "# Transmission throttled due to queue queue limit", -1, GNUNET_NO); - schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + qe->queue, + GNUNET_SCHEDULER_PRIORITY_DEFAULT); } else if (1 == qe->queue->q_capacity) { @@ -9500,7 +9800,9 @@ handle_send_message_ack (void *cls, "# Transmission throttled due to message queue capacity", -1, GNUNET_NO); - schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + qe->queue, + GNUNET_SCHEDULER_PRIORITY_DEFAULT); } if (NULL != (pm = qe->pm)) @@ -10090,7 +10392,8 @@ handle_add_queue_message (void *cls, &check_validation_request_pending, queue); /* look for traffic for this queue */ - schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); + schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, + queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT); /* might be our first queue, try launching DV learning */ if (NULL == dvlearn_task) dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL); diff --git a/src/transport/test_transport_distance_vector_circle_topo.conf b/src/transport/test_transport_distance_vector_circle_topo.conf index ce26bf923..210179291 100644 --- a/src/transport/test_transport_distance_vector_circle_topo.conf +++ b/src/transport/test_transport_distance_vector_circle_topo.conf @@ -2,7 +2,7 @@ M:1 N:3 X:0 AC:1 -B:1 +B:0 T:libgnunet_test_transport_plugin_cmd_simple_send_dv R:1|{tcp_port:0}|{udp_port:1} R:2|{tcp_port:0}|{udp_port:1} diff --git a/src/transport/test_transport_plugin_cmd_simple_send_dv.c b/src/transport/test_transport_plugin_cmd_simple_send_dv.c index 167120e2b..f1f168102 100644 --- a/src/transport/test_transport_plugin_cmd_simple_send_dv.c +++ b/src/transport/test_transport_plugin_cmd_simple_send_dv.c @@ -65,12 +65,12 @@ struct TestState */ struct GNUNET_TESTING_NetjailTopology *topology; - /** +}; + +/** * The number of messages received. */ - unsigned int number_received; - -}; +static unsigned int number_received; static struct GNUNET_TESTING_Command block_send; @@ -105,52 +105,61 @@ static void handle_test (void *cls, const struct GNUNET_TRANSPORT_TESTING_TestMessage *message) { - struct TestState *ts = cls; + struct GNUNET_PeerIdentity *peer = cls; const struct GNUNET_TESTING_AsyncContext *ac_block; const struct GNUNET_TESTING_AsyncContext *ac_start; const struct GNUNET_TESTING_Command *cmd; const struct GNUNET_CONTAINER_MultiShortmap *connected_peers_map; unsigned int connected; struct BlockState *bs; + struct GNUNET_TRANSPORT_CoreHandle *ch; + const struct StartPeerState *sps; - + GNUNET_TRANSPORT_get_trait_state (&start_peer, + &sps); + ch = sps->th; GNUNET_TRANSPORT_get_trait_connected_peers_map (&start_peer, &connected_peers_map); - connected = GNUNET_CONTAINER_multishortmap_size ( - connected_peers_map); + if (NULL != connected_peers_map) + { + connected = GNUNET_CONTAINER_multishortmap_size ( + connected_peers_map); - ts->number_received++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received %u test message(s) from %u connected peer(s)\n", - ts->number_received, - connected); + number_received++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received %u test message(s) from %s, %u connected peer(s)\n", + number_received, + GNUNET_i2s (peer), + connected); - GNUNET_TESTING_get_trait_async_context (&block_receive, - &ac_block); + GNUNET_TESTING_get_trait_async_context (&block_receive, + &ac_block); - if ( connected == ts->number_received) - { - if (NULL != ac_block->is) + if ( connected == number_received) { - GNUNET_assert (NULL != ac_block); - if (NULL == ac_block->cont) - GNUNET_TESTING_async_fail ((struct - GNUNET_TESTING_AsyncContext *) ac_block); - else - GNUNET_TESTING_async_finish ((struct + if (NULL != ac_block->is) + { + GNUNET_assert (NULL != ac_block); + if (NULL == ac_block->cont) + GNUNET_TESTING_async_fail ((struct GNUNET_TESTING_AsyncContext *) ac_block); - } - else - { - GNUNET_TESTING_get_trait_block_state ( - &block_receive, - (const struct BlockState **) &bs); - bs->asynchronous_finish = GNUNET_YES; - } + else + GNUNET_TESTING_async_finish ((struct + GNUNET_TESTING_AsyncContext *) ac_block); + } + else + { + GNUNET_TESTING_get_trait_block_state ( + &block_receive, + (const struct BlockState **) &bs); + bs->asynchronous_finish = GNUNET_YES; + } + } } + GNUNET_TRANSPORT_core_receive_continue (ch, peer); } @@ -284,10 +293,6 @@ start_testcase (TESTING_CMD_HELPER_write_cb write_message, char *router_ip, GNUNET_MQ_handler_end () }; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "number_received %u\n", - ts->number_received); - if (GNUNET_YES == *read_file) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -347,7 +352,7 @@ start_testcase (TESTING_CMD_HELPER_write_cb write_message, char *router_ip, handlers, ts->cfgname, notify_connect, - GNUNET_YES); + GNUNET_NO); struct GNUNET_TESTING_Command commands[] = { GNUNET_TESTING_cmd_system_create ("system-create", ts->testdir), diff --git a/src/transport/transport_api_cmd_start_peer.c b/src/transport/transport_api_cmd_start_peer.c index e305e24e1..7448eff5a 100644 --- a/src/transport/transport_api_cmd_start_peer.c +++ b/src/transport/transport_api_cmd_start_peer.c @@ -124,7 +124,7 @@ notify_connect (void *cls, struct GNUNET_HashCode hc; struct GNUNET_CRYPTO_EddsaPublicKey public_key = peer->public_key; - void *ret = sps->handlers; + void *ret = (struct GNUNET_PeerIdentity *) peer; LOG (GNUNET_ERROR_TYPE_DEBUG, "This Peer %s \n", diff --git a/src/util/mq.c b/src/util/mq.c index b09837459..40ac97bbe 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -273,7 +273,7 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers, break; } } -done: + done: if (GNUNET_NO == handled) { LOG (GNUNET_ERROR_TYPE_INFO, @@ -384,8 +384,9 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, mq->current_envelope = ev; LOG (GNUNET_ERROR_TYPE_DEBUG, - "sending message of type %u, queue empty (MQ: %p)\n", + "sending message of type %u and size %u, queue empty (MQ: %p)\n", ntohs (ev->mh->type), + ntohs (ev->mh->size), mq); mq->send_impl (mq, @@ -479,8 +480,10 @@ impl_send_continue (void *cls) mq->current_envelope); LOG (GNUNET_ERROR_TYPE_DEBUG, - "sending message of type %u from queue\n", - ntohs (mq->current_envelope->mh->type)); + "sending message of type %u and size %u from queue (MQ: %p)\n", + ntohs (mq->current_envelope->mh->type), + ntohs (mq->current_envelope->mh->size), + mq); mq->send_impl (mq, mq->current_envelope->mh, -- cgit v1.2.3