From dd7379052f3749d87d8c35969ec94b4580e998b5 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 22 Nov 2018 21:42:57 +0100 Subject: implement monitor start logic --- src/transport/gnunet-service-tng.c | 116 +++++++++++++++++++++++++++++-------- 1 file changed, 93 insertions(+), 23 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index efbaf6fc9..ca8838380 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -21,11 +21,18 @@ * @author Christian Grothoff * * TODO: - * - monitor start: iterate to inform monitor about all existing queues! + * - design ATS-NG API + * - figure out how to transmit (selective) ACKs in case of uni-directional + * communicators (with/without core? DV-only?) When do we use ACKs? + * How/where do we distinguish between TCP/HTTP and unreliable communicators? + * => Should communicator provide reliable/unreliable ("flags") information? * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc. * - inform ATS about RTT, goodput/loss, overheads, etc. * - ask ATS about bandwidth allocation! - * - + * - change transport-core API to provide proper flow control in both + * directions, allow multiple messages per peer simultaneously (tag + * confirmations with unique message ID), and replace quota-out with + * proper flow control; */ #include "platform.h" #include "gnunet_util_lib.h" @@ -132,6 +139,11 @@ struct Queue */ const char *address; + /** + * Our current RTT estimate for this queue. + */ + struct GNUNET_TIME_Relative rtt; + /** * Unique identifier of this queue with the communicator. */ @@ -141,12 +153,27 @@ struct Queue * Maximum transmission unit supported by this queue. */ uint32_t mtu; - + /** * Network type offered by this queue. */ enum GNUNET_ATS_Network_Type nt; + /** + * Connection status for this queue. + */ + enum GNUNET_TRANSPORT_ConnectionStatus cs; + + /** + * Messages pending. + */ + uint32_t num_msg_pending; + + /** + * Bytes pending. + */ + uint32_t num_bytes_pending; + // FIXME: add ATS-specific fields here! }; @@ -516,7 +543,7 @@ notify_monitor (struct TransportClient *tc, const struct MonitorEvent *me) { struct GNUNET_MQ_Envelope *env; - struct GNUNET_TRANSPORT_MonitorData *md; + struct GNUNET_TRANSPORT_MonitorData *md; size_t addr_len = strlen (address) + 1; env = GNUNET_MQ_msg_extra (md, @@ -577,7 +604,7 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer, nt, me); } -} +} /** @@ -640,7 +667,7 @@ core_send_connect_info (struct TransportClient *tc, { struct GNUNET_MQ_Envelope *env; struct ConnectInfoMessage *cim; - + GNUNET_assert (CT_CORE == tc->type); env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); @@ -664,7 +691,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) - { + { if (CT_CORE != tc->type) continue; core_send_connect_info (tc, @@ -688,7 +715,7 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) { struct GNUNET_MQ_Envelope *env; struct DisconnectInfoMessage *dim; - + if (CT_CORE != tc->type) continue; env = GNUNET_MQ_msg (dim, @@ -739,14 +766,14 @@ free_queue (struct Queue *queue) /** * Free @a ale - * + * * @param ale address list entry to free */ static void free_address_list_entry (struct AddressListEntry *ale) { struct TransportClient *tc = ale->tc; - + GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head, tc->details.communicator.addr_tail, ale); @@ -838,7 +865,7 @@ notify_client_connect_info (void *cls, { struct TransportClient *tc = cls; struct Neighbour *neighbour = value; - + core_send_connect_info (tc, pid, neighbour->quota_out); @@ -1142,7 +1169,7 @@ peerstore_store_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to store our own address `%s' in peerstore!\n", ale->address); - /* refresh period is 1/4 of expiration time, that should be plenty + /* refresh period is 1/4 of expiration time, that should be plenty without being excessive. */ ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration, 4ULL), @@ -1163,7 +1190,7 @@ store_pi (void *cls) void *addr; size_t addr_len; struct GNUNET_TIME_Absolute expiration; - + ale->st = NULL; expiration = GNUNET_TIME_relative_to_absolute (ale->expiration); GNUNET_HELLO_sign_address (ale->address, @@ -1171,7 +1198,7 @@ store_pi (void *cls) expiration, GST_my_private_key, &addr, - &addr_len); + &addr_len); ale->sc = GNUNET_PEERSTORE_store (peerstore, "transport", &GST_my_identity, @@ -1361,7 +1388,7 @@ handle_add_queue_message (void *cls, struct Queue *queue; struct Neighbour *neighbour; const char *addr; - uint16_t addr_len; + uint16_t addr_len; neighbour = lookup_neighbour (&aqm->receiver); if (NULL == neighbour) @@ -1381,21 +1408,24 @@ handle_add_queue_message (void *cls, addr = (const char *) &aqm[1]; queue = GNUNET_malloc (sizeof (struct Queue) + addr_len); - queue->mtu = ntohl (aqm->mtu); + queue->tc = tc; + queue->address = (const char *) &queue[1]; + queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL; queue->qid = aqm->qid; + queue->mtu = ntohl (aqm->mtu); queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt); - queue->tc = tc; + queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); queue->neighbour = neighbour; - queue->address = (const char *) &queue[1]; memcpy (&queue[1], addr, addr_len); /* notify monitors about new queue */ { struct MonitorEvent me = { - .cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs) + .rtt = queue->rtt, + .cs = queue->cs }; - + notify_monitors (&neighbour->pid, queue->address, queue->nt, @@ -1474,6 +1504,45 @@ handle_send_message_ack (void *cls, } +/** + * Iterator telling new MONITOR client about all existing + * queues to peers. + * + * @param cls the new `struct TransportClient` + * @param pid a connected peer + * @param value the `struct Neighbour` with more information + * @return #GNUNET_OK (continue to iterate) + */ +static int +notify_client_queues (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct TransportClient *tc = cls; + struct Neighbour *neighbour = value; + + GNUNET_assert (CT_MONITOR == tc->type); + for (struct Queue *q = neighbour->queue_head; + NULL != q; + q = q->next_neighbour) + { + struct MonitorEvent me = { + .rtt = q->rtt, + .cs = q->cs, + .num_msg_pending = q->num_msg_pending, + .num_bytes_pending = q->num_bytes_pending + }; + + notify_monitor (tc, + pid, + q->address, + q->nt, + &me); + } + return GNUNET_OK; +} + + /** * Initialize a monitor client. * @@ -1495,8 +1564,9 @@ handle_monitor_start (void *cls, tc->type = CT_MONITOR; tc->details.monitor.peer = start->peer; tc->details.monitor.one_shot = ntohl (start->one_shot); - // FIXME: do work! - + GNUNET_CONTAINER_multipeermap_iterate (neighbours, + ¬ify_client_queues, + tc); GNUNET_SERVICE_client_mark_monitor (tc->client); GNUNET_SERVICE_client_continue (tc->client); } @@ -1551,7 +1621,7 @@ do_shutdown (void *cls) GNUNET_STATISTICS_destroy (GST_stats, GNUNET_NO); GST_stats = NULL; - } + } if (NULL != GST_my_private_key) { GNUNET_free (GST_my_private_key); -- cgit v1.2.3