summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/transport/gnunet-service-tng.c382
1 files changed, 298 insertions, 84 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 29bf3bf95..f3874724a 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -35,7 +35,25 @@
* Implement next:
* - route_message() implementation, including using DV data structures
* (but not when routing certain message types, like DV learn,
- * MUST pay attention to content here -- or pass extra flags?)
+ * looks like now like we need two flags (DV/no-DV, confirmed-only,
+ * unconfirmed OK)
+ * + NOTE: do NOT use PendingMessage for route_message(), as that is
+ * for fragmentation/reliability and ultimately core flow control!
+ * => route_message() should pick the queue
+ * => in case of DV routing, route_message should BOX the message, too.
+ * - We currently do NEVER tell CORE also about DV-connections (core_visible
+ * of `struct DistanceVector` is simply never set!)
+ * + When? Easy if we initiated the DV and got the challenge; do that NOW
+ * BUT what we passively learned DV (unconfirmed freshness)
+ * => Do we trigger Challenge->Response there as well, or 'wait' for
+ * our own DV initiations to discover?
+ * => What about DV routes that expire? Do we also only count on
+ * our own DV initiations for maintenance here, or do we
+ * try to specifically re-confirm the existence of a particular path?
+ * => OPITMIZATION-FIXME!
+ * + Where do we track what we told core? Careful: need to check
+ * the "core_visible' flag in both neighbours and DV before
+ * sending out notifications to CORE!
* - retransmission logic
* - track RTT, distance, loss, etc. => requires extra data structures!
*
@@ -55,11 +73,16 @@
* FIXME (without marks in the code!):
* - proper use/initialization of timestamps in messages exchanged
* during DV learning
+ * - persistence of monotonic time obtained from other peers
+ * in PEERSTORE (by message type)
*
* Optimizations:
* - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs
* against our pending message queue (requires additional per neighbour
* hash map to be maintained, avoids possible linear scan on pending msgs)
+ * - queue_send_msg and route_message both by API design have to make copies
+ * of the payload, and route_message on top of that requires a malloc/free.
+ * Change design to approximate "zero" copy better...
*
* Design realizations / discussion:
* - communicators do flow control by calling MQ "notify sent"
@@ -1060,6 +1083,13 @@ struct DistanceVector
* Task scheduled to purge expired paths from @e dv_head MDLL.
*/
struct GNUNET_SCHEDULER_Task *timeout_task;
+
+ /**
+ * Is one of the DV paths in this struct 'confirmed' and thus
+ * the cause for CORE to see this peer as connected? (Note that
+ * the same may apply to a `struct Neighbour` at the same time.)
+ */
+ int core_visible;
};
@@ -1162,11 +1192,26 @@ struct Queue
struct GNUNET_SCHEDULER_Task *transmit_task;
/**
+ * Task scheduled to possibly notfiy core that this queue is no longer
+ * counting as confirmed. Runs the #core_queue_visibility_check().
+ */
+ struct GNUNET_SCHEDULER_Task *visibility_task;
+
+ /**
* Our current RTT estimate for this queue.
*/
struct GNUNET_TIME_Relative rtt;
/**
+ * How long do *we* consider this @e address to be valid? In the past or
+ * zero if we have not yet validated it. Can be updated based on
+ * challenge-response validations (via address validation logic), or when we
+ * receive ACKs that we can definitively map to transmissions via this
+ * queue.
+ */
+ struct GNUNET_TIME_Absolute validated_until;
+
+ /**
* Message ID generator for transmissions on this queue.
*/
uint64_t mid_gen;
@@ -1397,6 +1442,11 @@ struct Neighbour
*/
struct GNUNET_TIME_Absolute earliest_timeout;
+ /**
+ * Do we have a confirmed working queue and are thus visible to
+ * CORE?
+ */
+ int core_visible;
};
@@ -2514,6 +2564,16 @@ schedule_transmit_on_queue (struct Queue *queue)
/**
+ * Check whether the CORE visibility of @a n changed. If so,
+ * check whether we need to notify CORE.
+ *
+ * @param n neighbour to perform the check for
+ */
+static void
+update_neighbour_core_visibility (struct Neighbour *n);
+
+
+/**
* Free @a queue.
*
* @param queue the queue to free
@@ -2535,6 +2595,11 @@ free_queue (struct Queue *queue)
GNUNET_SCHEDULER_cancel (queue->transmit_task);
queue->transmit_task = NULL;
}
+ if (NULL != queue->visibility_task)
+ {
+ GNUNET_SCHEDULER_cancel (queue->visibility_task);
+ queue->visibility_task = NULL;
+ }
GNUNET_CONTAINER_MDLL_remove (neighbour,
neighbour->queue_head,
neighbour->queue_tail,
@@ -2574,9 +2639,12 @@ free_queue (struct Queue *queue)
GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
GNUNET_free (queue);
+
+ update_neighbour_core_visibility (neighbour);
+ cores_send_disconnect_info (&neighbour->pid);
+
if (NULL == neighbour->queue_head)
{
- cores_send_disconnect_info (&neighbour->pid);
free_neighbour (neighbour);
}
}
@@ -3209,16 +3277,89 @@ lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
/**
+ * Send the control message @a payload on @a queue.
+ *
+ * @param queue the queue to use for transmission
+ * @param pm pending message to update once transmission is done, may be NULL!
+ * @param payload the payload to send (encapsulated in a
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG).
+ * @param payload_size number of bytes in @a payload
+ */
+static void
+queue_send_msg (struct Queue *queue,
+ struct PendingMessage *pm,
+ const void *payload,
+ size_t payload_size)
+{
+ struct Neighbour *n = queue->neighbour;
+ struct GNUNET_TRANSPORT_SendMessageTo *smt;
+ struct GNUNET_MQ_Envelope *env;
+
+ env = GNUNET_MQ_msg_extra (smt,
+ payload_size,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
+ smt->qid = queue->qid;
+ smt->mid = queue->mid_gen;
+ smt->receiver = n->pid;
+ memcpy (&smt[1],
+ payload,
+ payload_size);
+ {
+ /* Pass the env to the communicator of queue for transmission. */
+ struct QueueEntry *qe;
+
+ qe = GNUNET_new (struct QueueEntry);
+ qe->mid = queue->mid_gen++;
+ qe->queue = queue;
+ // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'!
+ // (also, note that pm may be NULL!)
+ GNUNET_CONTAINER_DLL_insert (queue->queue_head,
+ queue->queue_tail,
+ qe);
+ GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
+ queue->queue_length++;
+ queue->tc->details.communicator.total_queue_length++;
+ GNUNET_MQ_send (queue->tc->mq,
+ env);
+ }
+}
+
+
+/**
* We need to transmit @a hdr to @a target. If necessary, this may
* involve DV routing or even broadcasting and fragmentation.
*
* @param target peer to receive @a hdr
- * @param hdr header of the message to route
+ * @param hdr header of the message to route and #GNUNET_free()
*/
static void
route_message (const struct GNUNET_PeerIdentity *target,
struct GNUNET_MessageHeader *hdr)
{
+ // Cases:
+ // 1: called to transmit backchannel message we initiated
+ // 2: called to transmit fragment ack
+ // 3: called to transmit reliability box
+ // 4: called to forward backchannel message
+ // 5: called to forward DV learn message (caller already picked random neighbour(s))!
+ // 6: called to forward DV Box message
+ // 7: called to forward valdiation response
+
+ // Choices:
+ // a) Send ONLY to a *confirmed* direct neighbour
+ // b) Send allowed to *unconfirmed* direct neighbour
+ // c) Route also via *confirmed* DV to target
+ // c) Route allowed via *unconfirmed DV to target
+ // => One BIT "dv allowed or not", plus one BIT "confirmed/unconfirmed" might do!
+
+ // Case analysis:
+ // 1 2 3 4 5 6 7
+ // a X X X X X X X
+ // b X X
+ // c X X X X X
+ // d X
+ //
+
// FIXME: this one is tricky:
// - we could try a direct, reliable channel
// - if that is unavailable / for load balancing, we may try:
@@ -4305,6 +4446,8 @@ check_backchannel_encapsulation (void *cls,
/**
* Communicator gave us a backchannel encapsulation. Process the request.
+ * (We are not the origin of the backchannel here, the communicator simply
+ * received a backchannel message and we are expected to forward it.)
*
* @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
* @param be the message that was received
@@ -4811,6 +4954,10 @@ handle_dv_learn (void *cls,
ilat = GNUNET_TIME_relative_multiply (network_latency,
i);
path[i] = hops[i-1].hop;
+ // FIXME: mark ALL of these as *confirmed* (with what timeout?)
+ // -- and schedule a job for the confirmation to time out! --
+ // and possibly do #cores_send_connect_info() if
+ // the respective neighbour is NOT confirmed yet!
learn_dv_path (path,
i,
ilat);
@@ -5263,6 +5410,107 @@ update_next_challenge_time (struct ValidationState *vs,
/**
+ * Find the queue matching @a pid and @a address.
+ *
+ * @param pid peer the queue must go to
+ * @param address address the queue must use
+ * @return NULL if no such queue exists
+ */
+static struct Queue *
+find_queue (const struct GNUNET_PeerIdentity *pid,
+ const char *address)
+{
+ struct Neighbour *n;
+
+ n = GNUNET_CONTAINER_multipeermap_get (neighbours,
+ pid);
+ if (NULL == n)
+ return NULL;
+ for (struct Queue *pos = n->queue_head;
+ NULL != pos;
+ pos = pos->next_neighbour)
+ {
+ if (0 == strcmp (pos->address,
+ address))
+ return pos;
+ }
+ return NULL;
+}
+
+
+/**
+ * Task run periodically to check whether the validity of the given queue has
+ * run its course. If so, finds either another queue to take over, or clears
+ * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
+ * chance to take over, and if that fails, notifies CORE about the disconnect.
+ *
+ * @param cls a `struct Queue`
+ */
+static void
+core_queue_visibility_check (void *cls)
+{
+ struct Queue *q = cls;
+
+ q->visibility_task = NULL;
+ if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
+ {
+ q->visibility_task
+ = GNUNET_SCHEDULER_add_at (q->validated_until,
+ &core_queue_visibility_check,
+ q);
+ return;
+ }
+ update_neighbour_core_visibility (q->neighbour);
+}
+
+
+/**
+ * Check whether the CORE visibility of @a n should change. Finds either a
+ * queue to preserve the visibility, or clears the neighbour's `core_visible`
+ * flag. In the latter case, gives DV routes a chance to take over, and if
+ * that fails, notifies CORE about the disconnect. If so, check whether we
+ * need to notify CORE.
+ *
+ * @param n neighbour to perform the check for
+ */
+static void
+update_neighbour_core_visibility (struct Neighbour *n)
+{
+ struct DistanceVector *dv;
+
+ GNUNET_assert (GNUNET_YES == n->core_visible);
+ /* Check if _any_ queue of this neighbour is still valid, if so, schedule
+ the #core_queue_visibility_check() task for that queue */
+ for (struct Queue *q = n->queue_head;
+ NULL != q;
+ q = q->next_neighbour)
+ {
+ if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
+ {
+ /* found a valid queue, use this one */
+ q->visibility_task
+ = GNUNET_SCHEDULER_add_at (q->validated_until,
+ &core_queue_visibility_check,
+ q);
+ return;
+ }
+ }
+ n->core_visible = GNUNET_NO;
+
+ /* Check if _any_ DV route to this neighbour is currently
+ valid, if so, do NOT tell core about the loss of direct
+ connectivity (DV still counts!) */
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
+ &n->pid);
+ if (GNUNET_YES == dv->core_visible)
+ return;
+ /* Nothing works anymore, need to tell CORE about the loss of
+ connectivity! */
+ cores_send_disconnect_info (&n->pid);
+}
+
+
+/**
* Communicator gave us a transport address validation response. Process the request.
*
* @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
@@ -5279,6 +5527,8 @@ handle_validation_response (void *cls,
.vs = NULL
};
struct GNUNET_TIME_Absolute origin_time;
+ struct Queue *q;
+ struct DistanceVector *dv;
/* check this is one of our challenges */
(void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
@@ -5357,8 +5607,39 @@ handle_validation_response (void *cls,
GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
&peerstore_store_validation_cb,
vs);
- // FIXME: should we find the matching queue and update the RTT?
finish_cmc_handling (cmc);
+
+ /* Finally, we now possibly have a confirmed (!) working queue,
+ update queue status (if queue still is around) */
+ q = find_queue (&vs->pid,
+ vs->address);
+ if (NULL == q)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# Queues lost at time of successful validation",
+ 1,
+ GNUNET_NO);
+ return;
+ }
+ q->validated_until = vs->validated_until;
+ q->rtt = vs->validation_rtt;
+ if (GNUNET_NO != q->neighbour->core_visible)
+ return; /* nothing changed, we are done here */
+ q->neighbour->core_visible = GNUNET_YES;
+ q->visibility_task
+ = GNUNET_SCHEDULER_add_at (q->validated_until,
+ &core_queue_visibility_check,
+ q);
+ /* Check if _any_ DV route to this neighbour is
+ currently valid, if so, do NOT tell core anything! */
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
+ &q->neighbour->pid);
+ if (GNUNET_YES == dv->core_visible)
+ return; /* nothing changed, done */
+ /* We lacked a confirmed connection to the neighbour
+ before, so tell CORE about it (finally!) */
+ cores_send_connect_info (&q->neighbour->pid,
+ GNUNET_BANDWIDTH_ZERO);
}
@@ -5640,19 +5921,19 @@ reliability_box_message (struct PendingMessage *pm)
/* failed hard */
GNUNET_break (0);
client_send_response (pm,
- GNUNET_NO,
- 0);
+ GNUNET_NO,
+ 0);
return NULL;
}
bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
- sizeof (rbox) +
- pm->bytes_msg);
+ sizeof (rbox) +
+ pm->bytes_msg);
bpm->target = pm->target;
bpm->frag_parent = pm;
GNUNET_CONTAINER_MDLL_insert (frag,
- pm->head_frag,
- pm->tail_frag,
- bpm);
+ pm->head_frag,
+ pm->tail_frag,
+ bpm);
bpm->timeout = pm->timeout;
bpm->pmt = PMT_RELIABILITY_BOX;
bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
@@ -5663,66 +5944,17 @@ reliability_box_message (struct PendingMessage *pm)
rbox.msg_uuid = pm->msg_uuid;
msg = (char *) &bpm[1];
memcpy (msg,
- &rbox,
- sizeof (rbox));
+ &rbox,
+ sizeof (rbox));
memcpy (&msg[sizeof (rbox)],
- &pm[1],
- pm->bytes_msg);
+ &pm[1],
+ pm->bytes_msg);
pm->bpm = bpm;
return bpm;
}
/**
- * Send the control message @a payload on @a queue.
- *
- * @param queue the queue to use for transmission
- * @param pm pending message to update once transmission is done, may be NULL!
- * @param payload the payload to send (encapsulated in a
- * #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG).
- * @param payload_size number of bytes in @a payload
- */
-static void
-queue_send_msg (struct Queue *queue,
- struct PendingMessage *pm,
- const void *payload,
- size_t payload_size)
-{
- struct Neighbour *n = queue->neighbour;
- struct GNUNET_TRANSPORT_SendMessageTo *smt;
- struct GNUNET_MQ_Envelope *env;
-
- env = GNUNET_MQ_msg_extra (smt,
- payload_size,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
- smt->qid = queue->qid;
- smt->mid = queue->mid_gen;
- smt->receiver = n->pid;
- memcpy (&smt[1],
- payload,
- payload_size);
- {
- /* Pass the env to the communicator of queue for transmission. */
- struct QueueEntry *qe;
-
- qe = GNUNET_new (struct QueueEntry);
- qe->mid = queue->mid_gen++;
- qe->queue = queue;
- // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'!
- // (also, note that pm may be NULL!)
- GNUNET_CONTAINER_DLL_insert (queue->queue_head,
- queue->queue_tail,
- qe);
- GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
- queue->queue_length++;
- queue->tc->details.communicator.total_queue_length++;
- GNUNET_MQ_send (queue->tc->mq,
- env);
- }
-}
-
-
-/**
* We believe we are ready to transmit a message on a queue. Double-checks
* with the queue's "tracker_out" and then gives the message to the
* communicator for transmission (updating the tracker, and re-scheduling
@@ -6268,7 +6500,6 @@ static void
validation_start_cb (void *cls)
{
struct ValidationState *vs;
- struct Neighbour *n;
struct Queue *q;
(void) cls;
@@ -6284,23 +6515,8 @@ validation_start_cb (void *cls)
if (NULL == vs)
return; /* woopsie, no more addresses known, should only
happen if we're really a lonely peer */
- n = GNUNET_CONTAINER_multipeermap_get (neighbours,
- &vs->pid);
- q = NULL;
- if (NULL != n)
- {
- for (struct Queue *pos = n->queue_head;
- NULL != pos;
- pos = pos->next_neighbour)
- {
- if (0 == strcmp (pos->address,
- vs->address))
- {
- q = pos;
- break;
- }
- }
- }
+ q = find_queue (&vs->pid,
+ vs->address);
if (NULL == q)
{
vs->awaiting_queue = GNUNET_YES;
@@ -6570,8 +6786,6 @@ handle_add_queue_message (void *cls,
&neighbour->pid,
neighbour,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- cores_send_connect_info (&neighbour->pid,
- GNUNET_BANDWIDTH_ZERO);
}
addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
addr = (const char *) &aqm[1];