summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-04-30 10:58:56 +0200
committerChristian Grothoff <christian@grothoff.org>2019-04-30 10:59:05 +0200
commita563820a60326473e74ac2431431e86e1963fd31 (patch)
tree5c69c43a9ff2867a83cd8a1b604e8bd6ff1892eb /src/transport/gnunet-service-tng.c
parenta7bb6a804cb3a598ad9957f4e1c0a7a716731fd2 (diff)
complete CORE flow control loop
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c234
1 files changed, 135 insertions, 99 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 825d45522..a8f70986b 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -24,40 +24,31 @@
*
* TODO:
* Implement next:
- * - complete flow control push back from CORE via TRANSPORT to communicators:
- * + resume communicators in handle_client_recv_ok (see FIXME)
- * + count transmissions to CORE and suspend communicators if window is full
- * - check flow control push back from TRANSPROT to CORE:
- * + check when to send ACKs
- * - change transport-core API to provide proper flow control in both
- * directions, allow multiple messages per peer simultaneously (tag
- * confirmations with unique message ID), and replace quota-out with
- * proper flow control; specify transmission preferences (latency,
+ * - add (more) logging
+ * - change transport-core API to specify transmission preferences (latency,
* reliability, etc.) per message!
- * - add logging
- *
- * Later:
* - review retransmission logic, right now there is no smartness there!
- * => congestion control, flow control, etc
+ * => congestion control, flow control, etc [PERFORMANCE-BASICS]
*
* Optimizations:
* - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
- * => Need 128 bit hash map though!
+ * => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
* - queue_send_msg and route_message both by API design have to make copies
* of the payload, and route_message on top of that requires a malloc/free.
- * Change design to approximate "zero" copy better...
+ * Change design to approximate "zero" copy better... [CPU]
* - could avoid copying body of message into each fragment and keep
* fragments as just pointers into the original message and only
* fully build fragments just before transmission (optimization, should
- * reduce CPU and memory use)
+ * reduce CPU and memory use) [CPU, MEMORY]
* - if messages are below MTU, consider adding ACKs and other stuff
- * (requires planning at receiver, and additional MST-style demultiplex
- * at receiver!)
+ * to the same transmission to avoid tiny messages (requires planning at
+ * receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
* - When we passively learned DV (with unconfirmed freshness), we
* right now add the path to our list but with a zero path_valid_until
* time and only use it for unconfirmed routes. However, we could consider
* triggering an explicit validation mechansim ourselves, specifically routing
- * a challenge-response message over the path (OPTIMIZATION-FIXME).
+ * a challenge-response message over the path [ROUTING]
+ * - Track ACK losses based on ACK-counter [ROUTING]
*
* Design realizations / discussion:
* - communicators do flow control by calling MQ "notify sent"
@@ -1115,6 +1106,44 @@ struct PendingMessage;
*/
struct DistanceVectorHop;
+
+/**
+ * Context from #handle_incoming_msg(). Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
+{
+
+ /**
+ * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+ * flow control to unchoke.
+ */
+ struct CommunicatorMessageContext *next;
+
+ /**
+ * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+ * flow control to unchoke.
+ */
+ struct CommunicatorMessageContext *prev;
+
+ /**
+ * Which communicator provided us with the message.
+ */
+ struct TransportClient *tc;
+
+ /**
+ * Additional information for flow control and about the sender.
+ */
+ struct GNUNET_TRANSPORT_IncomingMessage im;
+
+ /**
+ * Number of hops the message has travelled (if DV-routed).
+ * FIXME: make use of this in ACK handling!
+ */
+ uint16_t total_hops;
+};
+
+
/**
* A virtual link is another reachable peer that is known to CORE. It
* can be either a `struct Neighbour` with at least one confirmed
@@ -1131,6 +1160,18 @@ struct VirtualLink
struct GNUNET_PeerIdentity target;
/**
+ * Communicators blocked for receiving on @e target as we are waiting
+ * on the @e core_recv_window to increase.
+ */
+ struct CommunicatorMessageContext *cmc_head;
+
+ /**
+ * Communicators blocked for receiving on @e target as we are waiting
+ * on the @e core_recv_window to increase.
+ */
+ struct CommunicatorMessageContext *cmc_tail;
+
+ /**
* Task scheduled to possibly notfiy core that this peer is no
* longer counting as confirmed. Runs the #core_visibility_check(),
* which checks that some DV-path or a queue exists that is still
@@ -1152,9 +1193,11 @@ struct VirtualLink
* How many more messages can we send to core before we exhaust
* the receive window of CORE for this peer? If this hits zero,
* we must tell communicators to stop providing us more messages
- * for this peer.
+ * for this peer. In fact, the window can go negative as we
+ * have multiple communicators, so per communicator we can go
+ * down by one into the negative range.
*/
- unsigned int core_recv_window;
+ int core_recv_window;
};
@@ -3497,21 +3540,14 @@ free_pending_message (struct PendingMessage *pm)
/**
- * Send a response to the @a pm that we have processed a
- * "send" request with status @a success. We
- * transmitted @a bytes_physical on the actual wire.
- * Sends a confirmation to the "core" client responsible
- * for the original request and free's @a pm.
+ * Send a response to the @a pm that we have processed a "send"
+ * request. Sends a confirmation to the "core" client responsible for
+ * the original request and free's @a pm.
*
* @param pm handle to the original pending message
- * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
- * for transmission failure
- * @param bytes_physical amount of bandwidth consumed
*/
static void
-client_send_response (struct PendingMessage *pm,
- int success,
- uint32_t bytes_physical)
+client_send_response (struct PendingMessage *pm)
{
struct TransportClient *tc = pm->client;
struct Neighbour *target = pm->target;
@@ -3523,10 +3559,7 @@ client_send_response (struct PendingMessage *pm,
env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
som->peer = target->pid;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Confirming %s transmission of %u/%u bytes to %s\n",
- (GNUNET_OK == success) ? "successful" : "failed",
- (unsigned int) pm->bytes_msg,
- (unsigned int) bytes_physical,
+ "Confirming transmission to %s\n",
GNUNET_i2s (&pm->target->pid));
GNUNET_MQ_send (tc->mq, env);
}
@@ -3827,6 +3860,31 @@ check_communicator_available (
/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+ if (0 != ntohl (cmc->im.fc_on))
+ {
+ /* send ACK when done to communicator for flow control! */
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
+
+ env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
+ ack->reserved = htonl (0);
+ ack->fc_id = cmc->im.fc_id;
+ ack->sender = cmc->im.sender;
+ GNUNET_MQ_send (cmc->tc->mq, env);
+ }
+ GNUNET_SERVICE_client_continue (cmc->tc->client);
+ GNUNET_free (cmc);
+}
+
+
+/**
* Client confirms that it is done handling message(s) to a particular
* peer. We may now provide more messages to CORE for this peer.
*
@@ -3841,6 +3899,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
struct TransportClient *tc = cls;
struct VirtualLink *vl;
uint32_t delta;
+ struct CommunicatorMessageContext *cmc;
if (CT_CORE != tc->type)
{
@@ -3860,9 +3919,13 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
}
delta = ntohl (rom->increase_window_delta);
vl->core_recv_window += delta;
- if (delta == vl->core_recv_window)
+ if (vl->core_recv_window <= 0)
+ return;
+ /* resume communicators */
+ while (NULL != (cmc = vl->cmc_tail))
{
- // FIXME: resume communicators!
+ GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc);
+ finish_cmc_handling (cmc);
}
}
@@ -4684,30 +4747,6 @@ handle_del_address (void *cls,
/**
- * Context from #handle_incoming_msg(). Closure for many
- * message handlers below.
- */
-struct CommunicatorMessageContext
-{
- /**
- * Which communicator provided us with the message.
- */
- struct TransportClient *tc;
-
- /**
- * Additional information for flow control and about the sender.
- */
- struct GNUNET_TRANSPORT_IncomingMessage im;
-
- /**
- * Number of hops the message has travelled (if DV-routed).
- * FIXME: make use of this in ACK handling!
- */
- uint16_t total_hops;
-};
-
-
-/**
* Given an inbound message @a msg from a communicator @a cmc,
* demultiplex it based on the type calling the right handler.
*
@@ -4720,31 +4759,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
/**
- * Send ACK to communicator (if requested) and free @a cmc.
- *
- * @param cmc context for which we are done handling the message
- */
-static void
-finish_cmc_handling (struct CommunicatorMessageContext *cmc)
-{
- if (0 != ntohl (cmc->im.fc_on))
- {
- /* send ACK when done to communicator for flow control! */
- struct GNUNET_MQ_Envelope *env;
- struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
-
- env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
- ack->reserved = htonl (0);
- ack->fc_id = cmc->im.fc_id;
- ack->sender = cmc->im.sender;
- GNUNET_MQ_send (cmc->tc->mq, env);
- }
- GNUNET_SERVICE_client_continue (cmc->tc->client);
- GNUNET_free (cmc);
-}
-
-
-/**
* Communicator gave us an unencapsulated message to pass as-is to
* CORE. Process the request.
*
@@ -4756,6 +4770,7 @@ static void
handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
{
struct CommunicatorMessageContext *cmc = cls;
+ struct VirtualLink *vl;
uint16_t size = ntohs (mh->size);
if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
@@ -4768,6 +4783,25 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
GNUNET_SERVICE_client_drop (client);
return;
}
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+ if (NULL == vl)
+ {
+ /* FIXME: sender is giving us messages for CORE but we don't have
+ the link up yet! I *suspect* this can happen right now (i.e.
+ sender has verified us, but we didn't verify sender), but if
+ we pass this on, CORE would be confused (link down, messages
+ arrive). We should investigate more if this happens often,
+ or in a persistent manner, and possibly do "something" about
+ it. Thus logging as error for now. */
+ GNUNET_break_op (0);
+ GNUNET_STATISTICS_update (GST_stats,
+ "# CORE messages droped (virtual link still down)",
+ 1,
+ GNUNET_NO);
+
+ finish_cmc_handling (cmc);
+ return;
+ }
/* Forward to all CORE clients */
for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
{
@@ -4781,11 +4815,15 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
memcpy (&im[1], mh, size);
GNUNET_MQ_send (tc->mq, env);
}
- /* FIXME: consider doing this _only_ once the message
- was drained from the CORE MQs to extend flow control to CORE!
- (basically, increment counter in cmc, decrement on MQ send continuation!
- */
- finish_cmc_handling (cmc);
+ vl->core_recv_window--;
+ if (vl->core_recv_window > 0)
+ {
+ finish_cmc_handling (cmc);
+ return;
+ }
+ /* Wait with calling #finish_cmc_handling(cmc) until the message
+ was processed by CORE MQs (for CORE flow control)! */
+ GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc);
}
@@ -5345,7 +5383,8 @@ handle_reliability_ack (void *cls,
}
ack_counter = htonl (ra->ack_counter);
- // FIXME: track ACK losses based on ack_counter somewhere!
+ (void) ack_counter; /* silence compiler warning for now */
+ // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
// (DV and/or Neighbour?)
finish_cmc_handling (cmc);
}
@@ -7380,7 +7419,7 @@ reliability_box_message (struct Queue *queue,
{
/* failed hard */
GNUNET_break (0);
- client_send_response (pm, GNUNET_NO, 0);
+ client_send_response (pm);
return NULL;
}
pa = prepare_pending_acknowledgement (queue, dvh, pm);
@@ -7531,7 +7570,7 @@ transmit_on_queue (void *cls)
(GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
{
/* Full message sent, and over reliabile channel */
- client_send_response (pm, GNUNET_YES, pm->bytes_msg);
+ client_send_response (pm);
}
else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
queue->tc->details.communicator.cc) &&
@@ -7556,10 +7595,7 @@ transmit_on_queue (void *cls)
/* Was this the last applicable fragmment? */
if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
- client_send_response (
- pm,
- GNUNET_YES,
- pm->bytes_msg /* FIXME: calculate and add overheads! */);
+ client_send_response (pm);
}
else if (PMT_CORE != pm->pmt)
{