summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2018-11-22 21:42:57 +0100
committerChristian Grothoff <christian@grothoff.org>2018-11-22 21:42:57 +0100
commitdd7379052f3749d87d8c35969ec94b4580e998b5 (patch)
tree2f4babf46193a8eb6058be4713fadb4e0ac24ef7 /src
parenta3a300f43a354c786d74c1952aba6a7816157c97 (diff)
implement monitor start logic
Diffstat (limited to 'src')
-rw-r--r--src/transport/gnunet-service-tng.c116
1 files changed, 93 insertions, 23 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index efbaf6fc9..ca8838380 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -21,11 +21,18 @@
* @author Christian Grothoff
*
* TODO:
- * - monitor start: iterate to inform monitor about all existing queues!
+ * - design ATS-NG API
+ * - figure out how to transmit (selective) ACKs in case of uni-directional
+ * communicators (with/without core? DV-only?) When do we use ACKs?
+ * How/where do we distinguish between TCP/HTTP and unreliable communicators?
+ * => Should communicator provide reliable/unreliable ("flags") information?
* - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
* - inform ATS about RTT, goodput/loss, overheads, etc.
* - ask ATS about bandwidth allocation!
- * -
+ * - change transport-core API to provide proper flow control in both
+ * directions, allow multiple messages per peer simultaneously (tag
+ * confirmations with unique message ID), and replace quota-out with
+ * proper flow control;
*/
#include "platform.h"
#include "gnunet_util_lib.h"
@@ -133,6 +140,11 @@ struct Queue
const char *address;
/**
+ * Our current RTT estimate for this queue.
+ */
+ struct GNUNET_TIME_Relative rtt;
+
+ /**
* Unique identifier of this queue with the communicator.
*/
uint32_t qid;
@@ -141,12 +153,27 @@ struct Queue
* Maximum transmission unit supported by this queue.
*/
uint32_t mtu;
-
+
/**
* Network type offered by this queue.
*/
enum GNUNET_ATS_Network_Type nt;
+ /**
+ * Connection status for this queue.
+ */
+ enum GNUNET_TRANSPORT_ConnectionStatus cs;
+
+ /**
+ * Messages pending.
+ */
+ uint32_t num_msg_pending;
+
+ /**
+ * Bytes pending.
+ */
+ uint32_t num_bytes_pending;
+
// FIXME: add ATS-specific fields here!
};
@@ -516,7 +543,7 @@ notify_monitor (struct TransportClient *tc,
const struct MonitorEvent *me)
{
struct GNUNET_MQ_Envelope *env;
- struct GNUNET_TRANSPORT_MonitorData *md;
+ struct GNUNET_TRANSPORT_MonitorData *md;
size_t addr_len = strlen (address) + 1;
env = GNUNET_MQ_msg_extra (md,
@@ -577,7 +604,7 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer,
nt,
me);
}
-}
+}
/**
@@ -640,7 +667,7 @@ core_send_connect_info (struct TransportClient *tc,
{
struct GNUNET_MQ_Envelope *env;
struct ConnectInfoMessage *cim;
-
+
GNUNET_assert (CT_CORE == tc->type);
env = GNUNET_MQ_msg (cim,
GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
@@ -664,7 +691,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
for (struct TransportClient *tc = clients_head;
NULL != tc;
tc = tc->next)
- {
+ {
if (CT_CORE != tc->type)
continue;
core_send_connect_info (tc,
@@ -688,7 +715,7 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
{
struct GNUNET_MQ_Envelope *env;
struct DisconnectInfoMessage *dim;
-
+
if (CT_CORE != tc->type)
continue;
env = GNUNET_MQ_msg (dim,
@@ -739,14 +766,14 @@ free_queue (struct Queue *queue)
/**
* Free @a ale
- *
+ *
* @param ale address list entry to free
*/
static void
free_address_list_entry (struct AddressListEntry *ale)
{
struct TransportClient *tc = ale->tc;
-
+
GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
tc->details.communicator.addr_tail,
ale);
@@ -838,7 +865,7 @@ notify_client_connect_info (void *cls,
{
struct TransportClient *tc = cls;
struct Neighbour *neighbour = value;
-
+
core_send_connect_info (tc,
pid,
neighbour->quota_out);
@@ -1142,7 +1169,7 @@ peerstore_store_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to store our own address `%s' in peerstore!\n",
ale->address);
- /* refresh period is 1/4 of expiration time, that should be plenty
+ /* refresh period is 1/4 of expiration time, that should be plenty
without being excessive. */
ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
4ULL),
@@ -1163,7 +1190,7 @@ store_pi (void *cls)
void *addr;
size_t addr_len;
struct GNUNET_TIME_Absolute expiration;
-
+
ale->st = NULL;
expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
GNUNET_HELLO_sign_address (ale->address,
@@ -1171,7 +1198,7 @@ store_pi (void *cls)
expiration,
GST_my_private_key,
&addr,
- &addr_len);
+ &addr_len);
ale->sc = GNUNET_PEERSTORE_store (peerstore,
"transport",
&GST_my_identity,
@@ -1361,7 +1388,7 @@ handle_add_queue_message (void *cls,
struct Queue *queue;
struct Neighbour *neighbour;
const char *addr;
- uint16_t addr_len;
+ uint16_t addr_len;
neighbour = lookup_neighbour (&aqm->receiver);
if (NULL == neighbour)
@@ -1381,21 +1408,24 @@ handle_add_queue_message (void *cls,
addr = (const char *) &aqm[1];
queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
- queue->mtu = ntohl (aqm->mtu);
+ queue->tc = tc;
+ queue->address = (const char *) &queue[1];
+ queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
queue->qid = aqm->qid;
+ queue->mtu = ntohl (aqm->mtu);
queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt);
- queue->tc = tc;
+ queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
queue->neighbour = neighbour;
- queue->address = (const char *) &queue[1];
memcpy (&queue[1],
addr,
addr_len);
/* notify monitors about new queue */
{
struct MonitorEvent me = {
- .cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs)
+ .rtt = queue->rtt,
+ .cs = queue->cs
};
-
+
notify_monitors (&neighbour->pid,
queue->address,
queue->nt,
@@ -1475,6 +1505,45 @@ handle_send_message_ack (void *cls,
/**
+ * Iterator telling new MONITOR client about all existing
+ * queues to peers.
+ *
+ * @param cls the new `struct TransportClient`
+ * @param pid a connected peer
+ * @param value the `struct Neighbour` with more information
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+notify_client_queues (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct TransportClient *tc = cls;
+ struct Neighbour *neighbour = value;
+
+ GNUNET_assert (CT_MONITOR == tc->type);
+ for (struct Queue *q = neighbour->queue_head;
+ NULL != q;
+ q = q->next_neighbour)
+ {
+ struct MonitorEvent me = {
+ .rtt = q->rtt,
+ .cs = q->cs,
+ .num_msg_pending = q->num_msg_pending,
+ .num_bytes_pending = q->num_bytes_pending
+ };
+
+ notify_monitor (tc,
+ pid,
+ q->address,
+ q->nt,
+ &me);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
* Initialize a monitor client.
*
* @param cls the client
@@ -1495,8 +1564,9 @@ handle_monitor_start (void *cls,
tc->type = CT_MONITOR;
tc->details.monitor.peer = start->peer;
tc->details.monitor.one_shot = ntohl (start->one_shot);
- // FIXME: do work!
-
+ GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+ &notify_client_queues,
+ tc);
GNUNET_SERVICE_client_mark_monitor (tc->client);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -1551,7 +1621,7 @@ do_shutdown (void *cls)
GNUNET_STATISTICS_destroy (GST_stats,
GNUNET_NO);
GST_stats = NULL;
- }
+ }
if (NULL != GST_my_private_key)
{
GNUNET_free (GST_my_private_key);