From bd337cf7c6993eb8e97976ec0b088b317c57da0e Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 17 Apr 2019 22:15:43 +0200 Subject: only notify core about validated queues --- src/transport/gnunet-service-tng.c | 382 +++++++++++++++++++++++++++++-------- 1 file changed, 298 insertions(+), 84 deletions(-) diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 29bf3bf95..f3874724a 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -35,7 +35,25 @@ * Implement next: * - route_message() implementation, including using DV data structures * (but not when routing certain message types, like DV learn, - * MUST pay attention to content here -- or pass extra flags?) + * looks like now like we need two flags (DV/no-DV, confirmed-only, + * unconfirmed OK) + * + NOTE: do NOT use PendingMessage for route_message(), as that is + * for fragmentation/reliability and ultimately core flow control! + * => route_message() should pick the queue + * => in case of DV routing, route_message should BOX the message, too. + * - We currently do NEVER tell CORE also about DV-connections (core_visible + * of `struct DistanceVector` is simply never set!) + * + When? Easy if we initiated the DV and got the challenge; do that NOW + * BUT what we passively learned DV (unconfirmed freshness) + * => Do we trigger Challenge->Response there as well, or 'wait' for + * our own DV initiations to discover? + * => What about DV routes that expire? Do we also only count on + * our own DV initiations for maintenance here, or do we + * try to specifically re-confirm the existence of a particular path? + * => OPITMIZATION-FIXME! + * + Where do we track what we told core? Careful: need to check + * the "core_visible' flag in both neighbours and DV before + * sending out notifications to CORE! * - retransmission logic * - track RTT, distance, loss, etc. => requires extra data structures! * @@ -55,11 +73,16 @@ * FIXME (without marks in the code!): * - proper use/initialization of timestamps in messages exchanged * during DV learning + * - persistence of monotonic time obtained from other peers + * in PEERSTORE (by message type) * * Optimizations: * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs * against our pending message queue (requires additional per neighbour * hash map to be maintained, avoids possible linear scan on pending msgs) + * - queue_send_msg and route_message both by API design have to make copies + * of the payload, and route_message on top of that requires a malloc/free. + * Change design to approximate "zero" copy better... * * Design realizations / discussion: * - communicators do flow control by calling MQ "notify sent" @@ -1060,6 +1083,13 @@ struct DistanceVector * Task scheduled to purge expired paths from @e dv_head MDLL. */ struct GNUNET_SCHEDULER_Task *timeout_task; + + /** + * Is one of the DV paths in this struct 'confirmed' and thus + * the cause for CORE to see this peer as connected? (Note that + * the same may apply to a `struct Neighbour` at the same time.) + */ + int core_visible; }; @@ -1161,11 +1191,26 @@ struct Queue */ struct GNUNET_SCHEDULER_Task *transmit_task; + /** + * Task scheduled to possibly notfiy core that this queue is no longer + * counting as confirmed. Runs the #core_queue_visibility_check(). + */ + struct GNUNET_SCHEDULER_Task *visibility_task; + /** * Our current RTT estimate for this queue. */ struct GNUNET_TIME_Relative rtt; + /** + * How long do *we* consider this @e address to be valid? In the past or + * zero if we have not yet validated it. Can be updated based on + * challenge-response validations (via address validation logic), or when we + * receive ACKs that we can definitively map to transmissions via this + * queue. + */ + struct GNUNET_TIME_Absolute validated_until; + /** * Message ID generator for transmissions on this queue. */ @@ -1397,6 +1442,11 @@ struct Neighbour */ struct GNUNET_TIME_Absolute earliest_timeout; + /** + * Do we have a confirmed working queue and are thus visible to + * CORE? + */ + int core_visible; }; @@ -2513,6 +2563,16 @@ schedule_transmit_on_queue (struct Queue *queue) } +/** + * Check whether the CORE visibility of @a n changed. If so, + * check whether we need to notify CORE. + * + * @param n neighbour to perform the check for + */ +static void +update_neighbour_core_visibility (struct Neighbour *n); + + /** * Free @a queue. * @@ -2535,6 +2595,11 @@ free_queue (struct Queue *queue) GNUNET_SCHEDULER_cancel (queue->transmit_task); queue->transmit_task = NULL; } + if (NULL != queue->visibility_task) + { + GNUNET_SCHEDULER_cancel (queue->visibility_task); + queue->visibility_task = NULL; + } GNUNET_CONTAINER_MDLL_remove (neighbour, neighbour->queue_head, neighbour->queue_tail, @@ -2574,9 +2639,12 @@ free_queue (struct Queue *queue) GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in); GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out); GNUNET_free (queue); + + update_neighbour_core_visibility (neighbour); + cores_send_disconnect_info (&neighbour->pid); + if (NULL == neighbour->queue_head) { - cores_send_disconnect_info (&neighbour->pid); free_neighbour (neighbour); } } @@ -3208,17 +3276,90 @@ lookup_ephemeral (const struct GNUNET_PeerIdentity *pid, } +/** + * Send the control message @a payload on @a queue. + * + * @param queue the queue to use for transmission + * @param pm pending message to update once transmission is done, may be NULL! + * @param payload the payload to send (encapsulated in a + * #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG). + * @param payload_size number of bytes in @a payload + */ +static void +queue_send_msg (struct Queue *queue, + struct PendingMessage *pm, + const void *payload, + size_t payload_size) +{ + struct Neighbour *n = queue->neighbour; + struct GNUNET_TRANSPORT_SendMessageTo *smt; + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg_extra (smt, + payload_size, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); + smt->qid = queue->qid; + smt->mid = queue->mid_gen; + smt->receiver = n->pid; + memcpy (&smt[1], + payload, + payload_size); + { + /* Pass the env to the communicator of queue for transmission. */ + struct QueueEntry *qe; + + qe = GNUNET_new (struct QueueEntry); + qe->mid = queue->mid_gen++; + qe->queue = queue; + // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'! + // (also, note that pm may be NULL!) + GNUNET_CONTAINER_DLL_insert (queue->queue_head, + queue->queue_tail, + qe); + GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); + queue->queue_length++; + queue->tc->details.communicator.total_queue_length++; + GNUNET_MQ_send (queue->tc->mq, + env); + } +} + + /** * We need to transmit @a hdr to @a target. If necessary, this may * involve DV routing or even broadcasting and fragmentation. * * @param target peer to receive @a hdr - * @param hdr header of the message to route + * @param hdr header of the message to route and #GNUNET_free() */ static void route_message (const struct GNUNET_PeerIdentity *target, struct GNUNET_MessageHeader *hdr) { + // Cases: + // 1: called to transmit backchannel message we initiated + // 2: called to transmit fragment ack + // 3: called to transmit reliability box + // 4: called to forward backchannel message + // 5: called to forward DV learn message (caller already picked random neighbour(s))! + // 6: called to forward DV Box message + // 7: called to forward valdiation response + + // Choices: + // a) Send ONLY to a *confirmed* direct neighbour + // b) Send allowed to *unconfirmed* direct neighbour + // c) Route also via *confirmed* DV to target + // c) Route allowed via *unconfirmed DV to target + // => One BIT "dv allowed or not", plus one BIT "confirmed/unconfirmed" might do! + + // Case analysis: + // 1 2 3 4 5 6 7 + // a X X X X X X X + // b X X + // c X X X X X + // d X + // + // FIXME: this one is tricky: // - we could try a direct, reliable channel // - if that is unavailable / for load balancing, we may try: @@ -4305,6 +4446,8 @@ check_backchannel_encapsulation (void *cls, /** * Communicator gave us a backchannel encapsulation. Process the request. + * (We are not the origin of the backchannel here, the communicator simply + * received a backchannel message and we are expected to forward it.) * * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) * @param be the message that was received @@ -4811,6 +4954,10 @@ handle_dv_learn (void *cls, ilat = GNUNET_TIME_relative_multiply (network_latency, i); path[i] = hops[i-1].hop; + // FIXME: mark ALL of these as *confirmed* (with what timeout?) + // -- and schedule a job for the confirmation to time out! -- + // and possibly do #cores_send_connect_info() if + // the respective neighbour is NOT confirmed yet! learn_dv_path (path, i, ilat); @@ -5262,6 +5409,107 @@ update_next_challenge_time (struct ValidationState *vs, } +/** + * Find the queue matching @a pid and @a address. + * + * @param pid peer the queue must go to + * @param address address the queue must use + * @return NULL if no such queue exists + */ +static struct Queue * +find_queue (const struct GNUNET_PeerIdentity *pid, + const char *address) +{ + struct Neighbour *n; + + n = GNUNET_CONTAINER_multipeermap_get (neighbours, + pid); + if (NULL == n) + return NULL; + for (struct Queue *pos = n->queue_head; + NULL != pos; + pos = pos->next_neighbour) + { + if (0 == strcmp (pos->address, + address)) + return pos; + } + return NULL; +} + + +/** + * Task run periodically to check whether the validity of the given queue has + * run its course. If so, finds either another queue to take over, or clears + * the neighbour's `core_visible` flag. In the latter case, gives DV routes a + * chance to take over, and if that fails, notifies CORE about the disconnect. + * + * @param cls a `struct Queue` + */ +static void +core_queue_visibility_check (void *cls) +{ + struct Queue *q = cls; + + q->visibility_task = NULL; + if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) + { + q->visibility_task + = GNUNET_SCHEDULER_add_at (q->validated_until, + &core_queue_visibility_check, + q); + return; + } + update_neighbour_core_visibility (q->neighbour); +} + + +/** + * Check whether the CORE visibility of @a n should change. Finds either a + * queue to preserve the visibility, or clears the neighbour's `core_visible` + * flag. In the latter case, gives DV routes a chance to take over, and if + * that fails, notifies CORE about the disconnect. If so, check whether we + * need to notify CORE. + * + * @param n neighbour to perform the check for + */ +static void +update_neighbour_core_visibility (struct Neighbour *n) +{ + struct DistanceVector *dv; + + GNUNET_assert (GNUNET_YES == n->core_visible); + /* Check if _any_ queue of this neighbour is still valid, if so, schedule + the #core_queue_visibility_check() task for that queue */ + for (struct Queue *q = n->queue_head; + NULL != q; + q = q->next_neighbour) + { + if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) + { + /* found a valid queue, use this one */ + q->visibility_task + = GNUNET_SCHEDULER_add_at (q->validated_until, + &core_queue_visibility_check, + q); + return; + } + } + n->core_visible = GNUNET_NO; + + /* Check if _any_ DV route to this neighbour is currently + valid, if so, do NOT tell core about the loss of direct + connectivity (DV still counts!) */ + dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, + &n->pid); + if (GNUNET_YES == dv->core_visible) + return; + /* Nothing works anymore, need to tell CORE about the loss of + connectivity! */ + cores_send_disconnect_info (&n->pid); +} + + /** * Communicator gave us a transport address validation response. Process the request. * @@ -5279,6 +5527,8 @@ handle_validation_response (void *cls, .vs = NULL }; struct GNUNET_TIME_Absolute origin_time; + struct Queue *q; + struct DistanceVector *dv; /* check this is one of our challenges */ (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, @@ -5357,8 +5607,39 @@ handle_validation_response (void *cls, GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, &peerstore_store_validation_cb, vs); - // FIXME: should we find the matching queue and update the RTT? finish_cmc_handling (cmc); + + /* Finally, we now possibly have a confirmed (!) working queue, + update queue status (if queue still is around) */ + q = find_queue (&vs->pid, + vs->address); + if (NULL == q) + { + GNUNET_STATISTICS_update (GST_stats, + "# Queues lost at time of successful validation", + 1, + GNUNET_NO); + return; + } + q->validated_until = vs->validated_until; + q->rtt = vs->validation_rtt; + if (GNUNET_NO != q->neighbour->core_visible) + return; /* nothing changed, we are done here */ + q->neighbour->core_visible = GNUNET_YES; + q->visibility_task + = GNUNET_SCHEDULER_add_at (q->validated_until, + &core_queue_visibility_check, + q); + /* Check if _any_ DV route to this neighbour is + currently valid, if so, do NOT tell core anything! */ + dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, + &q->neighbour->pid); + if (GNUNET_YES == dv->core_visible) + return; /* nothing changed, done */ + /* We lacked a confirmed connection to the neighbour + before, so tell CORE about it (finally!) */ + cores_send_connect_info (&q->neighbour->pid, + GNUNET_BANDWIDTH_ZERO); } @@ -5640,19 +5921,19 @@ reliability_box_message (struct PendingMessage *pm) /* failed hard */ GNUNET_break (0); client_send_response (pm, - GNUNET_NO, - 0); + GNUNET_NO, + 0); return NULL; } bpm = GNUNET_malloc (sizeof (struct PendingMessage) + - sizeof (rbox) + - pm->bytes_msg); + sizeof (rbox) + + pm->bytes_msg); bpm->target = pm->target; bpm->frag_parent = pm; GNUNET_CONTAINER_MDLL_insert (frag, - pm->head_frag, - pm->tail_frag, - bpm); + pm->head_frag, + pm->tail_frag, + bpm); bpm->timeout = pm->timeout; bpm->pmt = PMT_RELIABILITY_BOX; bpm->bytes_msg = pm->bytes_msg + sizeof (rbox); @@ -5663,65 +5944,16 @@ reliability_box_message (struct PendingMessage *pm) rbox.msg_uuid = pm->msg_uuid; msg = (char *) &bpm[1]; memcpy (msg, - &rbox, - sizeof (rbox)); + &rbox, + sizeof (rbox)); memcpy (&msg[sizeof (rbox)], - &pm[1], - pm->bytes_msg); + &pm[1], + pm->bytes_msg); pm->bpm = bpm; return bpm; } -/** - * Send the control message @a payload on @a queue. - * - * @param queue the queue to use for transmission - * @param pm pending message to update once transmission is done, may be NULL! - * @param payload the payload to send (encapsulated in a - * #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG). - * @param payload_size number of bytes in @a payload - */ -static void -queue_send_msg (struct Queue *queue, - struct PendingMessage *pm, - const void *payload, - size_t payload_size) -{ - struct Neighbour *n = queue->neighbour; - struct GNUNET_TRANSPORT_SendMessageTo *smt; - struct GNUNET_MQ_Envelope *env; - - env = GNUNET_MQ_msg_extra (smt, - payload_size, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); - smt->qid = queue->qid; - smt->mid = queue->mid_gen; - smt->receiver = n->pid; - memcpy (&smt[1], - payload, - payload_size); - { - /* Pass the env to the communicator of queue for transmission. */ - struct QueueEntry *qe; - - qe = GNUNET_new (struct QueueEntry); - qe->mid = queue->mid_gen++; - qe->queue = queue; - // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'! - // (also, note that pm may be NULL!) - GNUNET_CONTAINER_DLL_insert (queue->queue_head, - queue->queue_tail, - qe); - GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); - queue->queue_length++; - queue->tc->details.communicator.total_queue_length++; - GNUNET_MQ_send (queue->tc->mq, - env); - } -} - - /** * We believe we are ready to transmit a message on a queue. Double-checks * with the queue's "tracker_out" and then gives the message to the @@ -6268,7 +6500,6 @@ static void validation_start_cb (void *cls) { struct ValidationState *vs; - struct Neighbour *n; struct Queue *q; (void) cls; @@ -6284,23 +6515,8 @@ validation_start_cb (void *cls) if (NULL == vs) return; /* woopsie, no more addresses known, should only happen if we're really a lonely peer */ - n = GNUNET_CONTAINER_multipeermap_get (neighbours, - &vs->pid); - q = NULL; - if (NULL != n) - { - for (struct Queue *pos = n->queue_head; - NULL != pos; - pos = pos->next_neighbour) - { - if (0 == strcmp (pos->address, - vs->address)) - { - q = pos; - break; - } - } - } + q = find_queue (&vs->pid, + vs->address); if (NULL == q) { vs->awaiting_queue = GNUNET_YES; @@ -6570,8 +6786,6 @@ handle_add_queue_message (void *cls, &neighbour->pid, neighbour, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - cores_send_connect_info (&neighbour->pid, - GNUNET_BANDWIDTH_ZERO); } addr_len = ntohs (aqm->header.size) - sizeof (*aqm); addr = (const char *) &aqm[1]; -- cgit v1.2.3