summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/transport/gnunet-service-tng.c195
1 files changed, 161 insertions, 34 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 83c057795..471ded644 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -28,7 +28,6 @@
* communicators do not offer flow control).
* We do transmit FC window sizes now. Left:
* for SENDING)
- * - Increment "outbound_fc_window_size_used" on transmission
* - Throttle sending if "outbound_fc_window_size_used" reaches limit
* - Send *new* challenge when we get close to the limit (including
* at the beginning when the limit is zero!)
@@ -203,6 +202,13 @@
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
/**
+ * If a DVBox could not be forwarded after this number of
+ * seconds we drop it.
+ */
+#define DV_FORWARD_TIMEOUT \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+
+/**
* We only consider queues as "quality" connections when
* suppressing the generation of DV initiation messages if
* the latency of the queue is below this threshold.
@@ -939,9 +945,9 @@ struct TransportFlowControlMessage
* Used to detect one-sided connection drops. On wrap-around, the
* flow control counters will be reset as if the connection had
* dropped.
- */
+ */
uint32_t seq GNUNET_PACKED;
-
+
/**
* Flow control window size in bytes, in NBO.
* The receiver can send this many bytes at most.
@@ -974,7 +980,6 @@ struct TransportFlowControlMessage
* reset the counters for the number of bytes sent!
*/
struct GNUNET_TIME_AbsoluteNBO sender_time;
-
};
@@ -1300,7 +1305,7 @@ struct VirtualLink
* Distance vector used by this virtual link, NULL if @e n is used.
*/
struct DistanceVector *dv;
-
+
/**
* Last challenge we received from @a n.
* FIXME: where do we need this?
@@ -1308,7 +1313,7 @@ struct VirtualLink
struct ChallengeNonceP n_challenge;
/**
- * Last challenge we used with @a n for flow control.
+ * Last challenge we used with @a n for flow control.
* FIXME: where do we need this?
*/
struct ChallengeNonceP my_challenge;
@@ -1373,7 +1378,7 @@ struct VirtualLink
* Based on the difference between how much the sender sent according
* to the last #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message
* (@e outbound_sent field) and how much we actually received at that
- * time (@e incoming_fc_window_size_used). This delta is then
+ * time (@e incoming_fc_window_size_used). This delta is then
* added onto the @e incoming_fc_window_size when determining the
* @e outbound_window_size we send to the other peer. Initially zero.
* May be negative if we (due to out-of-order delivery) actually received
@@ -1407,7 +1412,7 @@ struct VirtualLink
* received.
*/
uint32_t last_fc_seq;
-
+
/**
* How many more messages can we send to CORE before we exhaust
* the receive window of CORE for this peer? If this hits zero,
@@ -2057,7 +2062,12 @@ enum PendingMessageType
/**
* Reliability box.
*/
- PMT_RELIABILITY_BOX = 2
+ PMT_RELIABILITY_BOX = 2,
+
+ /**
+ * Pending message created during #forward_dv_box().
+ */
+ PMT_DV_BOX = 3
};
@@ -2133,8 +2143,8 @@ struct PendingMessage
struct PendingAcknowledgement *pa_tail;
/**
- * This message, reliability boxed. Only possibly available if @e pmt is
- * #PMT_CORE.
+ * This message, reliability *or* DV-boxed. Only possibly available
+ * if @e pmt is #PMT_CORE.
*/
struct PendingMessage *bpm;
@@ -2949,7 +2959,11 @@ free_pending_message (struct PendingMessage *pm)
GNUNET_assert (pm == pm->qe->pm);
pm->qe->pm = NULL;
}
- GNUNET_free_non_null (pm->bpm);
+ if (NULL != pm->bpm)
+ {
+ free_fragment_tree (pm->bpm);
+ GNUNET_free (pm->bpm);
+ }
GNUNET_free (pm);
}
@@ -4751,16 +4765,18 @@ send_dv_to_neighbour (void *cls,
/**
* We need to transmit @a hdr to @a target. If necessary, this may
- * involve DV routing.
+ * involve DV routing. This function routes without applying flow
+ * control or congestion control and should only be used for control
+ * traffic.
*
* @param target peer to receive @a hdr
* @param hdr header of the message to route and #GNUNET_free()
* @param options which transmission channels are allowed
*/
static void
-route_message (const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_MessageHeader *hdr,
- enum RouteMessageOptions options)
+route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
+ const struct GNUNET_MessageHeader *hdr,
+ enum RouteMessageOptions options)
{
struct VirtualLink *vl;
struct Neighbour *n;
@@ -4878,7 +4894,7 @@ handle_communicator_backchannel (
isize],
is,
strlen (is) + 1);
- route_message (&cb->pid, &be->header, RMO_DV_ALLOWED);
+ route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -5303,7 +5319,7 @@ transmit_cummulative_ack_cb (void *cls)
ap[i].ack_delay = GNUNET_TIME_relative_hton (
GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
}
- route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
+ route_control_message_without_fc (&ac->target, &ack->header, RMO_DV_ALLOWED);
ac->num_acks = 0;
ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
&destroy_ack_cummulator,
@@ -5742,6 +5758,12 @@ completed_pending_message (struct PendingMessage *pm)
(pos->frag_off == pos->bytes_msg))
client_send_response (pos);
return;
+ case PMT_DV_BOX:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Completed transmission of message %llu (DV Box)\n",
+ pm->logging_uuid);
+ free_pending_message (pm);
+ return;
}
}
@@ -6309,7 +6331,9 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
&dhp.purpose,
&dhops[nhops].hop_sig));
}
- route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED);
+ route_control_message_without_fc (next_hop,
+ &fwd->header,
+ RMO_UNCONFIRMED_ALLOWED);
}
@@ -6924,17 +6948,39 @@ forward_dv_box (struct Neighbour *next_hop,
const void *enc_payload,
uint16_t enc_payload_size)
{
- char buf[sizeof (struct TransportDVBoxMessage) +
- num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size];
- struct GNUNET_PeerIdentity *dhops =
- (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)];
-
+ struct VirtualLink *vl = next_hop->vl;
+ struct PendingMessage *pm;
+ size_t msg_size;
+ char *buf;
+ struct GNUNET_PeerIdentity *dhops;
+
+ GNUNET_assert (NULL != vl);
+ msg_size = sizeof (struct TransportDVBoxMessage) +
+ num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size;
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size);
+ pm->pmt = PMT_DV_BOX;
+ pm->vl = vl;
+ pm->timeout = GNUNET_TIME_relative_to_absolute (DV_FORWARD_TIMEOUT);
+ pm->logging_uuid = logging_uuid_gen++;
+ pm->prefs = GNUNET_MQ_PRIO_BACKGROUND;
+ pm->bytes_msg = msg_size;
+ buf = (char *) &pm[1];
memcpy (buf, hdr, sizeof (*hdr));
+ dhops =
+ (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)];
memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
memcpy (&dhops[num_hops], enc_payload, enc_payload_size);
- route_message (&next_hop->pid,
- (const struct GNUNET_MessageHeader *) buf,
- RMO_NONE);
+ GNUNET_CONTAINER_MDLL_insert (vl,
+ vl->pending_msg_head,
+ vl->pending_msg_tail,
+ pm);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Created pending message %llu for DV Box with next hop %s (%u/%u)\n",
+ pm->logging_uuid,
+ GNUNET_i2s (&next_hop->pid),
+ (unsigned int) num_hops,
+ (unsigned int) total_hops);
+ check_vl_transmission (vl);
}
@@ -7696,9 +7742,9 @@ handle_validation_challenge (
&tvp.purpose,
&tvr.signature));
}
- route_message (&cmc->im.sender,
- &tvr.header,
- RMO_ANYTHING_GOES | RMO_REDUNDANT);
+ route_control_message_without_fc (&cmc->im.sender,
+ &tvr.header,
+ RMO_ANYTHING_GOES | RMO_REDUNDANT);
finish_cmc_handling (cmc);
vl = lookup_virtual_link (&cmc->im.sender);
@@ -8459,6 +8505,8 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
int frag;
int relb;
+ if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt))
+ continue; /* DV messages must not be DV-routed to next hop! */
if (pos->next_attempt.abs_value_us > now.abs_value_us)
break; /* too early for all messages, they are sorted by next_attempt */
if (NULL != pos->qe)
@@ -8547,6 +8595,41 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
/**
+ * Function to call to further operate on the now DV encapsulated
+ * message @a hdr, forwarding it via @a next_hop under respect of
+ * @a options.
+ *
+ * @param cls a `struct PendingMessageScoreContext`
+ * @param next_hop next hop of the DV path
+ * @param hdr encapsulated message, technically a `struct TransportDFBoxMessage`
+ * @param options options of the original message
+ */
+static void
+extract_box_cb (void *cls,
+ struct Neighbour *next_hop,
+ const struct GNUNET_MessageHeader *hdr,
+ enum RouteMessageOptions options)
+{
+ struct PendingMessageScoreContext *sc = cls;
+ struct PendingMessage *pm = sc->best;
+ struct PendingMessage *bpm;
+ uint16_t bsize = ntohs (hdr->size);
+
+ GNUNET_assert (NULL == pm->bpm);
+ bpm = GNUNET_malloc (sizeof (struct PendingMessage) + bsize);
+ bpm->logging_uuid = logging_uuid_gen++;
+ bpm->pmt = PMT_DV_BOX;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating DV Box %llu for original message %llu (next hop is %s)\n",
+ bpm->logging_uuid,
+ pm->logging_uuid,
+ GNUNET_i2s (&next_hop->pid));
+ memcpy (&bpm[1], hdr, bsize);
+ pm->bpm = bpm;
+}
+
+
+/**
* We believe we are ready to transmit a `struct PendingMessage` on a
* queue, the big question is which one! We need to see if there is
* one pending that is allowed by flow control and congestion control
@@ -8610,9 +8693,30 @@ transmit_on_queue (void *cls)
/* Given selection in `sc`, do transmission */
pm = sc.best;
+ if (NULL != sc.dvh)
+ {
+ GNUNET_assert (PMT_DV_BOX != pm->pmt);
+ if (NULL != sc.best->bpm)
+ {
+ /* We did this boxing before, but possibly for a different path!
+ Discard old DV box! OPTIMIZE-ME: we might want to check if
+ it is the same and then not re-build the message... */
+ free_pending_message (sc.best->bpm);
+ sc.best->bpm = NULL;
+ }
+ encapsulate_for_dv (sc.dvh->dv,
+ 1,
+ &sc.dvh,
+ (const struct GNUNET_MessageHeader *) &sc.best[1],
+ &extract_box_cb,
+ &sc,
+ RMO_NONE);
+ GNUNET_assert (NULL != sc.best->bpm);
+ pm = sc.best->bpm;
+ }
if (GNUNET_YES == sc.frag)
{
- pm = fragment_message (queue, sc.dvh, sc.best);
+ pm = fragment_message (queue, sc.dvh, pm);
if (NULL == pm)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -8625,7 +8729,7 @@ transmit_on_queue (void *cls)
}
else if (GNUNET_YES == sc.relb)
{
- pm = reliability_box_message (queue, sc.dvh, sc.best);
+ pm = reliability_box_message (queue, sc.dvh, pm);
if (NULL == pm)
{
/* Reliability boxing failed, try next message... */
@@ -8639,8 +8743,6 @@ transmit_on_queue (void *cls)
return;
}
}
- else
- pm = sc.best; /* no boxing required */
/* Pass 'pm' for transission to the communicator */
GNUNET_log (
@@ -8650,6 +8752,31 @@ transmit_on_queue (void *cls)
queue->address,
GNUNET_i2s (&n->pid),
sc.consideration_counter);
+
+ /* Flow control: increment amount of traffic sent; if we are routing
+ via DV (and thus the ultimate target of the pending message is for
+ a different virtual link than the one of the queue), then we need
+ to use up not only the window of the direct link but also the
+ flow control window for the DV link! */
+ pm->vl->outbound_fc_window_size_used += pm->bytes_msg;
+
+ if (pm->vl != queue->neighbour->vl)
+ {
+ /* If the virtual link of the queue differs, this better be distance
+ vector routing! */
+ GNUNET_assert (NULL != sc.dvh);
+ /* If we do distance vector routing, we better not do this for a
+ message that was itself DV-routed */
+ GNUNET_assert (PMT_DV_BOX != sc.best->pmt);
+ /* We use the size of the unboxed message here, to avoid counting
+ the DV-Box header which is eaten up on the way by intermediaries */
+ queue->neighbour->vl->outbound_fc_window_size_used += sc.best->bytes_msg;
+ }
+ else
+ {
+ GNUNET_assert (NULL == sc.dvh);
+ }
+
queue_send_msg (queue, pm, &pm[1], pm->bytes_msg);
/* Check if this transmission somehow conclusively finished handing 'pm'