From 3f945e6798d8d736ceb104b59ea1269a7abdfe8a Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 28 Apr 2019 19:32:10 +0200 Subject: towards flow control in TNG --- src/transport/gnunet-service-tng.c | 575 +++++++++++++++++-------------------- 1 file changed, 259 insertions(+), 316 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index c2922dd7e..825d45522 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -24,6 +24,11 @@ * * TODO: * Implement next: + * - complete flow control push back from CORE via TRANSPORT to communicators: + * + resume communicators in handle_client_recv_ok (see FIXME) + * + count transmissions to CORE and suspend communicators if window is full + * - check flow control push back from TRANSPROT to CORE: + * + check when to send ACKs * - change transport-core API to provide proper flow control in both * directions, allow multiple messages per peer simultaneously (tag * confirmations with unique message ID), and replace quota-out with @@ -113,6 +118,16 @@ */ #define MAX_DV_DISCOVERY_SELECTION 16 +/** + * Window size. How many messages to the same target do we pass + * to CORE without a RECV_OK in between? Small values limit + * thoughput, large values will increase latency. + * + * FIXME-OPTIMIZE: find out what good values are experimentally, + * maybe set adaptively (i.e. to observed available bandwidth). + */ +#define RECV_WINDOW_SIZE 4 + /** * Minimum number of hops we should forward DV learn messages * even if they are NOT useful for us in hope of looping @@ -1100,6 +1115,48 @@ struct PendingMessage; */ struct DistanceVectorHop; +/** + * A virtual link is another reachable peer that is known to CORE. It + * can be either a `struct Neighbour` with at least one confirmed + * `struct Queue`, or a `struct DistanceVector` with at least one + * confirmed `struct DistanceVectorHop`. With a virtual link we track + * data that is per neighbour that is not specific to how the + * connectivity is established. + */ +struct VirtualLink +{ + /** + * Identity of the peer at the other end of the link. + */ + struct GNUNET_PeerIdentity target; + + /** + * Task scheduled to possibly notfiy core that this peer is no + * longer counting as confirmed. Runs the #core_visibility_check(), + * which checks that some DV-path or a queue exists that is still + * considered confirmed. + */ + struct GNUNET_SCHEDULER_Task *visibility_task; + + /** + * Neighbour used by this virtual link, NULL if @e dv is used. + */ + struct Neighbour *n; + + /** + * Distance vector used by this virtual link, NULL if @e n is used. + */ + struct DistanceVector *dv; + + /** + * How many more messages can we send to core before we exhaust + * the receive window of CORE for this peer? If this hits zero, + * we must tell communicators to stop providing us more messages + * for this peer. + */ + unsigned int core_recv_window; +}; + /** * Data structure kept when we are waiting for an acknowledgement. @@ -1316,31 +1373,10 @@ struct DistanceVector struct GNUNET_SCHEDULER_Task *timeout_task; /** - * Task scheduled to possibly notfiy core that this queue is no longer - * counting as confirmed. Runs the #core_queue_visibility_check(). - */ - struct GNUNET_SCHEDULER_Task *visibility_task; - - /** - * Quota at which CORE is allowed to transmit to this peer - * (note that the value CORE should actually be told is this - * value plus the respective value in `struct Neighbour`). - * Should match the sum of the quotas of all of the paths. - * - * FIXME: not yet set, tricky to get right given multiple paths, - * many of which may be inactive! (=> Idea: measure???) - * FIXME: how do we set this value initially when we tell CORE? - * Options: start at a minimum value or at literally zero? - * (=> Current thought: clean would be zero!) - */ - struct GNUNET_BANDWIDTH_Value32NBO quota_out; - - /** - * Is one of the DV paths in this struct 'confirmed' and thus - * the cause for CORE to see this peer as connected? (Note that - * the same may apply to a `struct Neighbour` at the same time.) + * Do we have a confirmed working queue and are thus visible to + * CORE? If so, this is the virtual link, otherwise NULL. */ - int core_visible; + struct VirtualLink *link; }; @@ -1450,12 +1486,6 @@ struct Queue */ struct GNUNET_SCHEDULER_Task *transmit_task; - /** - * Task scheduled to possibly notfiy core that this queue is no longer - * counting as confirmed. Runs the #core_queue_visibility_check(). - */ - struct GNUNET_SCHEDULER_Task *visibility_task; - /** * How long do *we* consider this @e address to be valid? In the past or * zero if we have not yet validated it. Can be updated based on @@ -1642,11 +1672,6 @@ struct Neighbour */ struct Queue *queue_tail; - /** - * Task run to cleanup pending messages that have exceeded their timeout. - */ - struct GNUNET_SCHEDULER_Task *timeout_task; - /** * Handle for an operation to fetch @e last_dv_learn_monotime information from * the PEERSTORE, or NULL. @@ -1660,18 +1685,10 @@ struct Neighbour struct GNUNET_PEERSTORE_StoreContext *sc; /** - * Quota at which CORE is allowed to transmit to this peer - * (note that the value CORE should actually be told is this - * value plus the respective value in `struct DistanceVector`). - * Should match the sum of the quotas of all of the queues. - * - * FIXME: not yet set, tricky to get right given multiple queues! - * (=> Idea: measure???) - * FIXME: how do we set this value initially when we tell CORE? - * Options: start at a minimum value or at literally zero? - * (=> Current thought: clean would be zero!) + * Do we have a confirmed working queue and are thus visible to + * CORE? If so, this is the virtual link, otherwise NULL. */ - struct GNUNET_BANDWIDTH_Value32NBO quota_out; + struct VirtualLink *link; /** * Latest DVLearn monotonic time seen from this peer. Initialized only @@ -1679,17 +1696,6 @@ struct Neighbour */ struct GNUNET_TIME_Absolute last_dv_learn_monotime; - /** - * What is the earliest timeout of any message in @e pending_msg_tail? - */ - struct GNUNET_TIME_Absolute earliest_timeout; - - /** - * Do we have a confirmed working queue and are thus visible to - * CORE? - */ - int core_visible; - /** * Do we have the lastest value for @e last_dv_learn_monotime from * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE? @@ -2416,6 +2422,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes; */ static struct GNUNET_CONTAINER_MultiPeerMap *validation_map; +/** + * Map from PIDs to `struct VirtualLink` entries describing + * links CORE knows to exist. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *links; + /** * Map from challenges to `struct LearnLaunchEntry` values. */ @@ -2563,6 +2575,26 @@ free_ephemeral (struct EphemeralCacheEntry *ece) } +/** + * Free virtual link. + * + * @param vl link data to free + */ +static void +free_virtual_link (struct VirtualLink *vl) +{ + GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl); + if (NULL != vl->visibility_task) + { + GNUNET_SCHEDULER_cancel (vl->visibility_task); + vl->visibility_task = NULL; + } + GNUNET_break (NULL == vl->n); + GNUNET_break (NULL == vl->dv); + GNUNET_free (vl); +} + + /** * Free validation state. * @@ -2684,8 +2716,6 @@ free_dv_route (struct DistanceVector *dv) GNUNET_assert ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); - if (NULL != dv->visibility_task) - GNUNET_SCHEDULER_cancel (dv->visibility_task); if (NULL != dv->timeout_task) GNUNET_SCHEDULER_cancel (dv->timeout_task); GNUNET_free (dv); @@ -2873,8 +2903,6 @@ free_neighbour (struct Neighbour *neighbour) GNUNET_CONTAINER_multipeermap_remove (neighbours, &neighbour->pid, neighbour)); - if (NULL != neighbour->timeout_task) - GNUNET_SCHEDULER_cancel (neighbour->timeout_task); if (NULL != neighbour->reassembly_map) { GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map, @@ -2917,19 +2945,16 @@ free_neighbour (struct Neighbour *neighbour) * * @param tc client to inform (must be CORE client) * @param pid peer the connection is for - * @param quota_out current quota for the peer */ static void core_send_connect_info (struct TransportClient *tc, - const struct GNUNET_PeerIdentity *pid, - struct GNUNET_BANDWIDTH_Value32NBO quota_out) + const struct GNUNET_PeerIdentity *pid) { struct GNUNET_MQ_Envelope *env; struct ConnectInfoMessage *cim; GNUNET_assert (CT_CORE == tc->type); env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim->quota_out = quota_out; cim->id = *pid; GNUNET_MQ_send (tc->mq, env); } @@ -2939,11 +2964,9 @@ core_send_connect_info (struct TransportClient *tc, * Send message to CORE clients that we gained a connection * * @param pid peer the queue was for - * @param quota_out current quota for the peer */ static void -cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, - struct GNUNET_BANDWIDTH_Value32NBO quota_out) +cores_send_connect_info (const struct GNUNET_PeerIdentity *pid) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Informing CORE clients about connection to %s\n", @@ -2952,7 +2975,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, { if (CT_CORE != tc->type) continue; - core_send_connect_info (tc, pid, quota_out); + core_send_connect_info (tc, pid); } } @@ -3059,13 +3082,43 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) /** - * Check whether the CORE visibility of @a n changed. If so, - * check whether we need to notify CORE. + * Task run to check whether the hops of the @a cls still + * are validated, or if we need to core about disconnection. * - * @param n neighbour to perform the check for + * @param cls a `struct VirtualLink` */ static void -update_neighbour_core_visibility (struct Neighbour *n); +check_link_down (void *cls) +{ + struct VirtualLink *vl = cls; + struct DistanceVector *dv = vl->dv; + struct Neighbour *n = vl->n; + struct GNUNET_TIME_Absolute dvh_timeout; + struct GNUNET_TIME_Absolute q_timeout; + + vl->visibility_task = NULL; + dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS; + for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; + pos = pos->next_dv) + dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until); + if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) + vl->dv = NULL; + q_timeout = GNUNET_TIME_UNIT_ZERO_ABS; + for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) + q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until); + if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) + vl->n = NULL; + if ((NULL == vl->n) && (NULL == vl->dv)) + { + cores_send_disconnect_info (&dv->target); + free_virtual_link (vl); + return; + } + vl->visibility_task = + GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout), + &check_link_down, + vl); +} /** @@ -3083,17 +3136,13 @@ free_queue (struct Queue *queue) struct QueueEntry *qe; int maxxed; struct PendingAcknowledgement *pa; + struct VirtualLink *vl; if (NULL != queue->transmit_task) { GNUNET_SCHEDULER_cancel (queue->transmit_task); queue->transmit_task = NULL; } - if (NULL != queue->visibility_task) - { - GNUNET_SCHEDULER_cancel (queue->visibility_task); - queue->visibility_task = NULL; - } while (NULL != (pa = queue->pa_head)) { GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa); @@ -3139,9 +3188,12 @@ free_queue (struct Queue *queue) notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); GNUNET_free (queue); - update_neighbour_core_visibility (neighbour); - cores_send_disconnect_info (&neighbour->pid); - + vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid); + if ((NULL != vl) && (neighbour == vl->n)) + { + GNUNET_SCHEDULER_cancel (vl->visibility_task); + check_link_down (vl); + } if (NULL == neighbour->queue_head) { free_neighbour (neighbour); @@ -3281,12 +3333,12 @@ notify_client_connect_info (void *cls, void *value) { struct TransportClient *tc = cls; - struct Neighbour *neighbour = value; + (void) value; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Telling new CORE client about existing connection to %s\n", GNUNET_i2s (pid)); - core_send_connect_info (tc, pid, neighbour->quota_out); + core_send_connect_info (tc, pid); return GNUNET_OK; } @@ -3469,9 +3521,6 @@ client_send_response (struct PendingMessage *pm, if (NULL != tc) { env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - som->success = htonl ((uint32_t) success); - som->bytes_msg = htons (pm->bytes_msg); - som->bytes_physical = htonl (bytes_physical); som->peer = target->pid; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Confirming %s transmission of %u/%u bytes to %s\n", @@ -3485,45 +3534,6 @@ client_send_response (struct PendingMessage *pm, } -/** - * Checks the message queue for a neighbour for messages that have timed - * out and purges them. - * - * @param cls a `struct Neighbour` - */ -static void -check_queue_timeouts (void *cls) -{ - struct Neighbour *n = cls; - struct PendingMessage *pm; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Absolute earliest_timeout; - - n->timeout_task = NULL; - earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; - now = GNUNET_TIME_absolute_get (); - for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm) - { - pm = pos->next_neighbour; - if (pos->timeout.abs_value_us <= now.abs_value_us) - { - GNUNET_STATISTICS_update (GST_stats, - "# messages dropped (timeout before confirmation)", - 1, - GNUNET_NO); - client_send_response (pm, GNUNET_NO, 0); - continue; - } - earliest_timeout = - GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout); - } - n->earliest_timeout = earliest_timeout; - if (NULL != n->pending_msg_head) - n->timeout_task = - GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n); -} - - /** * Create a DV Box message. * @@ -3689,30 +3699,18 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) const void *payload; size_t payload_size; struct TransportDVBoxMessage *dvb; + struct VirtualLink *vl; GNUNET_assert (CT_CORE == tc->type); obmm = (const struct GNUNET_MessageHeader *) &obm[1]; bytes_msg = ntohs (obmm->size); - target = lookup_neighbour (&obm->peer); - if (NULL == target) - dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer); - else - dv = NULL; - if ((NULL == target) && ((NULL == dv) || (GNUNET_NO == dv->core_visible))) + vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer); + if (NULL == vl) { /* Failure: don't have this peer as a neighbour (anymore). Might have gone down asynchronously, so this is NOT a protocol violation by CORE. Still count the event, as this should be rare. */ - struct GNUNET_MQ_Envelope *env; - struct SendOkMessage *som; - - env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - som->success = htonl (GNUNET_SYSERR); - som->bytes_msg = htonl (bytes_msg); - som->bytes_physical = htonl (0); - som->peer = obm->peer; - GNUNET_MQ_send (tc->mq, env); GNUNET_SERVICE_client_continue (tc->client); GNUNET_STATISTICS_update (GST_stats, "# messages dropped (neighbour unknown)", @@ -3720,6 +3718,12 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) GNUNET_NO); return; } + target = lookup_neighbour (&obm->peer); + if (NULL == target) + dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer); + else + dv = NULL; + GNUNET_assert ((NULL != target) || (NULL != dv)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending %u bytes to %s using %s\n", bytes_msg, @@ -3756,8 +3760,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) pm->client = tc; pm->target = target; pm->bytes_msg = payload_size; - pm->timeout = - GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout)); memcpy (&pm[1], payload, payload_size); GNUNET_free_non_null (dvb); dvb = NULL; @@ -3777,15 +3779,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); - if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) - { - target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us; - if (NULL != target->timeout_task) - GNUNET_SCHEDULER_cancel (target->timeout_task); - target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout, - &check_queue_timeouts, - target); - } if (! was_empty) return; /* all queues must already be busy */ for (struct Queue *queue = target->queue_head; NULL != queue; @@ -3833,6 +3826,47 @@ check_communicator_available ( } +/** + * Client confirms that it is done handling message(s) to a particular + * peer. We may now provide more messages to CORE for this peer. + * + * Notifies the respective queues that more messages can now be received. + * + * @param cls the client + * @param rom the message that was sent + */ +static void +handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom) +{ + struct TransportClient *tc = cls; + struct VirtualLink *vl; + uint32_t delta; + + if (CT_CORE != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer); + if (NULL == vl) + { + GNUNET_STATISTICS_update (GST_stats, + "# RECV_OK dropped: virtual link unknown", + 1, + GNUNET_NO); + GNUNET_SERVICE_client_continue (tc->client); + return; + } + delta = ntohl (rom->increase_window_delta); + vl->core_recv_window += delta; + if (delta == vl->core_recv_window) + { + // FIXME: resume communicators! + } +} + + /** * Communicator started. Process the request. * @@ -4090,20 +4124,18 @@ route_via_neighbour (const struct Neighbour *n, for (struct Queue *pos = n->queue_head; NULL != pos; pos = pos->next_neighbour) { - /* Count the queue with the visibility task in all cases, as - otherwise we may end up with no queues just because the - time for the visibility task just expired but the scheduler - just ran this task first */ if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || - (pos->validated_until.abs_value_us > now.abs_value_us) || - (NULL != pos->visibility_task)) + (pos->validated_until.abs_value_us > now.abs_value_us)) candidates++; } if (0 == candidates) { - /* Given that we above check for pos->visibility task, - this should be strictly impossible. */ - GNUNET_break (0); + /* This can happen rarely if the last confirmed queue timed + out just as we were beginning to process this message. */ + GNUNET_STATISTICS_update (GST_stats, + "# route selection failed (all no valid queue)", + 1, + GNUNET_NO); return; } sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates); @@ -4115,12 +4147,8 @@ route_via_neighbour (const struct Neighbour *n, for (struct Queue *pos = n->queue_head; NULL != pos; pos = pos->next_neighbour) { - /* Count the queue with the visibility task in all cases, as - otherwise we may end up with no queues just because the - time for the visibility task just expired but the scheduler - just ran this task first */ - if ((pos->validated_until.abs_value_us > now.abs_value_us) || - (NULL != pos->visibility_task)) + if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || + (pos->validated_until.abs_value_us > now.abs_value_us)) { if ((sel1 == candidates) || (sel2 == candidates)) queue_send_msg (pos, NULL, hdr, ntohs (hdr->size)); @@ -4197,21 +4225,21 @@ route_message (const struct GNUNET_PeerIdentity *target, struct GNUNET_MessageHeader *hdr, enum RouteMessageOptions options) { + struct VirtualLink *vl; struct Neighbour *n; struct DistanceVector *dv; - n = lookup_neighbour (target); - dv = (0 != (options & RMO_DV_ALLOWED)) - ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target) - : NULL; + vl = GNUNET_CONTAINER_multipeermap_get (links, target); + n = vl->n; + dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL; if (0 == (options & RMO_UNCONFIRMED_ALLOWED)) { /* if confirmed is required, and we do not have anything confirmed, drop respective options */ - if ((NULL != n) && (GNUNET_NO == n->core_visible)) - n = NULL; - if ((NULL != dv) && (GNUNET_NO == dv->core_visible)) - dv = NULL; + if (NULL == n) + n = lookup_neighbour (target); + if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED))) + dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target); } if ((NULL == n) && (NULL == dv)) { @@ -5758,40 +5786,6 @@ path_cleanup_cb (void *cls) GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv); } -/** - * Task run to check whether the hops of the @a cls still - * are validated, or if we need to core about disconnection. - * - * @param cls a `struct DistanceVector` (with core_visible set!) - */ -static void -check_dv_path_down (void *cls) -{ - struct DistanceVector *dv = cls; - struct Neighbour *n; - - dv->visibility_task = NULL; - GNUNET_assert (GNUNET_YES == dv->core_visible); - for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; - pos = pos->next_dv) - { - if (0 < - GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us) - { - dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until, - &check_dv_path_down, - dv); - return; - } - } - /* all paths invalid, make dv core-invisible */ - dv->core_visible = GNUNET_NO; - n = lookup_neighbour (&dv->target); - if ((NULL != n) && (GNUNET_YES == n->core_visible)) - return; /* no need to tell core, connection still up! */ - cores_send_disconnect_info (&dv->target); -} - /** * The @a hop is a validated path to the respective target @@ -5804,22 +5798,30 @@ static void activate_core_visible_dv_path (struct DistanceVectorHop *hop) { struct DistanceVector *dv = hop->dv; - struct Neighbour *n; - - GNUNET_assert (GNUNET_NO == dv->core_visible); - GNUNET_assert (NULL == dv->visibility_task); + struct VirtualLink *vl; - dv->core_visible = GNUNET_YES; - dv->visibility_task = - GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv); - n = lookup_neighbour (&dv->target); - if ((NULL != n) && (GNUNET_YES == n->core_visible)) - return; /* no need to tell core, connection already up! */ - cores_send_connect_info (&dv->target, - (NULL != n) - ? GNUNET_BANDWIDTH_value_sum (n->quota_out, - dv->quota_out) - : dv->quota_out); + vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target); + if (NULL != vl) + { + /* Link was already up, remember dv is also now available and we are done */ + vl->dv = dv; + return; + } + vl = GNUNET_new (struct VirtualLink); + vl->target = dv->target; + vl->dv = dv; + vl->core_recv_window = RECV_WINDOW_SIZE; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + links, + &vl->target, + vl, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + /* We lacked a confirmed connection to the target + before, so tell CORE about it (finally!) */ + cores_send_connect_info (&dv->target); } @@ -5934,9 +5936,8 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until); GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos); GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos); - if ((GNUNET_NO == dv->core_visible) && - (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until) - .rel_value_us)) + if (0 < + GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us) activate_core_visible_dv_path (pos); if (last_timeout.rel_value_us < GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT, @@ -5976,8 +5977,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, next_hop->dv_head, next_hop->dv_tail, hop); - if ((GNUNET_NO == dv->core_visible) && - (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)) + if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us) activate_core_visible_dv_path (hop); return GNUNET_YES; } @@ -6942,75 +6942,6 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const char *address) } -/** - * Task run periodically to check whether the validity of the given queue has - * run its course. If so, finds either another queue to take over, or clears - * the neighbour's `core_visible` flag. In the latter case, gives DV routes a - * chance to take over, and if that fails, notifies CORE about the disconnect. - * - * @param cls a `struct Queue` - */ -static void -core_queue_visibility_check (void *cls) -{ - struct Queue *q = cls; - - q->visibility_task = NULL; - if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) - { - q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, - &core_queue_visibility_check, - q); - return; - } - update_neighbour_core_visibility (q->neighbour); -} - - -/** - * Check whether the CORE visibility of @a n should change. Finds either a - * queue to preserve the visibility, or clears the neighbour's `core_visible` - * flag. In the latter case, gives DV routes a chance to take over, and if - * that fails, notifies CORE about the disconnect. If so, check whether we - * need to notify CORE. - * - * @param n neighbour to perform the check for - */ -static void -update_neighbour_core_visibility (struct Neighbour *n) -{ - struct DistanceVector *dv; - - GNUNET_assert (GNUNET_YES == n->core_visible); - /* Check if _any_ queue of this neighbour is still valid, if so, schedule - the #core_queue_visibility_check() task for that queue */ - for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) - { - if (0 != - GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) - { - /* found a valid queue, use this one */ - q->visibility_task = - GNUNET_SCHEDULER_add_at (q->validated_until, - &core_queue_visibility_check, - q); - return; - } - } - n->core_visible = GNUNET_NO; - - /* Check if _any_ DV route to this neighbour is currently - valid, if so, do NOT tell core about the loss of direct - connectivity (DV still counts!) */ - dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid); - if (GNUNET_YES == dv->core_visible) - return; - /* Nothing works anymore, need to tell CORE about the loss of - connectivity! */ - cores_send_disconnect_info (&n->pid); -} - - /** * Communicator gave us a transport address validation response. Process the * request. @@ -7030,8 +6961,8 @@ handle_validation_response ( .vs = NULL}; struct GNUNET_TIME_Absolute origin_time; struct Queue *q; - struct DistanceVector *dv; struct Neighbour *n; + struct VirtualLink *vl; /* check this is one of our challenges */ (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, @@ -7129,24 +7060,28 @@ handle_validation_response ( q->validated_until = vs->validated_until; q->pd.aged_rtt = vs->validation_rtt; n = q->neighbour; - if (GNUNET_NO != n->core_visible) - return; /* nothing changed, we are done here */ - n->core_visible = GNUNET_YES; - q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, - &core_queue_visibility_check, - q); - /* Check if _any_ DV route to this neighbour is - currently valid, if so, do NOT tell core anything! */ - dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid); - if ((NULL != dv) && (GNUNET_YES == dv->core_visible)) - return; /* nothing changed, done */ - /* We lacked a confirmed connection to the neighbour + vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid); + if (NULL != vl) + { + /* Link was already up, remember n is also now available and we are done */ + vl->n = n; + return; + } + vl = GNUNET_new (struct VirtualLink); + vl->target = n->pid; + vl->n = n; + vl->core_recv_window = RECV_WINDOW_SIZE; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + links, + &vl->target, + vl, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + /* We lacked a confirmed connection to the target before, so tell CORE about it (finally!) */ - cores_send_connect_info (&n->pid, - (NULL != dv) - ? GNUNET_BANDWIDTH_value_sum (dv->quota_out, - n->quota_out) - : n->quota_out); + cores_send_connect_info (&n->pid); } @@ -8256,7 +8191,6 @@ handle_add_queue_message (void *cls, if (NULL == neighbour) { neighbour = GNUNET_new (struct Neighbour); - neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; neighbour->pid = aqm->receiver; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put ( @@ -8872,8 +8806,12 @@ do_shutdown (void *cls) NULL); GNUNET_CONTAINER_multishortmap_destroy (pending_acks); pending_acks = NULL; + GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours)); GNUNET_CONTAINER_multipeermap_destroy (neighbours); neighbours = NULL; + GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links)); + GNUNET_CONTAINER_multipeermap_destroy (links); + links = NULL; GNUNET_CONTAINER_multipeermap_iterate (backtalkers, &free_backtalker_cb, NULL); @@ -8926,6 +8864,7 @@ run (void *cls, pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES); ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); + links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES); dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES); ephemeral_heap = @@ -8995,6 +8934,10 @@ GNUNET_SERVICE_MAIN ( GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, struct OutboundMessage, NULL), + GNUNET_MQ_hd_fixed_size (client_recv_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK, + struct RecvOkMessage, + NULL), /* communication with communicators */ GNUNET_MQ_hd_var_size (communicator_available, GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR, -- cgit v1.2.3