From 67935982582a31e4a75dc431feceee9664bca839 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 22 Nov 2018 18:37:35 +0100 Subject: more work on TNG --- src/transport/gnunet-service-tng.c | 274 +++++++++++++++++++++++++++++++++++-- 1 file changed, 262 insertions(+), 12 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index e5fb51bd2..efbaf6fc9 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -21,16 +21,17 @@ * @author Christian Grothoff * * TODO: - * - MTU information is missing for queues! - * - start supporting monitor logic (add functions to signal monitors!) + * - monitor start: iterate to inform monitor about all existing queues! * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc. - * - ask ATS about bandwidth allocation + * - inform ATS about RTT, goodput/loss, overheads, etc. + * - ask ATS about bandwidth allocation! * - */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_statistics_service.h" #include "gnunet_transport_service.h" +#include "gnunet_transport_monitor_service.h" #include "gnunet_peerstore_service.h" #include "gnunet_ats_service.h" #include "gnunet-service-transport.h" @@ -136,6 +137,11 @@ struct Queue */ uint32_t qid; + /** + * Maximum transmission unit supported by this queue. + */ + uint32_t mtu; + /** * Network type offered by this queue. */ @@ -453,6 +459,127 @@ lookup_neighbour (const struct GNUNET_PeerIdentity *pid) } +/** + * Details about what to notify monitors about. + */ +struct MonitorEvent +{ + /** + * @deprecated To be discussed if we keep these... + */ + struct GNUNET_TIME_Absolute last_validation; + struct GNUNET_TIME_Absolute valid_until; + struct GNUNET_TIME_Absolute next_validation; + + /** + * Current round-trip time estimate. + */ + struct GNUNET_TIME_Relative rtt; + + /** + * Connection status. + */ + enum GNUNET_TRANSPORT_ConnectionStatus cs; + + /** + * Messages pending. + */ + uint32_t num_msg_pending; + + /** + * Bytes pending. + */ + uint32_t num_bytes_pending; + + +}; + + +/** + * Notify monitor @a tc about an event. That @a tc + * cares about the event has already been checked. + * + * Send @a tc information in @a me about a @a peer's status with + * respect to some @a address to all monitors that care. + * + * @param tc monitor to inform + * @param peer peer the information is about + * @param address address the information is about + * @param nt network type associated with @a address + * @param me detailed information to transmit + */ +static void +notify_monitor (struct TransportClient *tc, + const struct GNUNET_PeerIdentity *peer, + const char *address, + enum GNUNET_ATS_Network_Type nt, + const struct MonitorEvent *me) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TRANSPORT_MonitorData *md; + size_t addr_len = strlen (address) + 1; + + env = GNUNET_MQ_msg_extra (md, + addr_len, + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA); + md->nt = htonl ((uint32_t) nt); + md->peer = *peer; + md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation); + md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until); + md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation); + md->rtt = GNUNET_TIME_relative_hton (me->rtt); + md->cs = htonl ((uint32_t) me->cs); + md->num_msg_pending = htonl (me->num_msg_pending); + md->num_bytes_pending = htonl (me->num_bytes_pending); + memcpy (&md[1], + address, + addr_len); + GNUNET_MQ_send (tc->mq, + env); +} + + +/** + * Send information in @a me about a @a peer's status with respect + * to some @a address to all monitors that care. + * + * @param peer peer the information is about + * @param address address the information is about + * @param nt network type associated with @a address + * @param me detailed information to transmit + */ +static void +notify_monitors (const struct GNUNET_PeerIdentity *peer, + const char *address, + enum GNUNET_ATS_Network_Type nt, + const struct MonitorEvent *me) +{ + static struct GNUNET_PeerIdentity zero; + + for (struct TransportClient *tc = clients_head; + NULL != tc; + tc = tc->next) + { + if (CT_MONITOR != tc->type) + continue; + if (tc->details.monitor.one_shot) + continue; + if ( (0 != memcmp (&tc->details.monitor.peer, + &zero, + sizeof (zero))) && + (0 != memcmp (&tc->details.monitor.peer, + peer, + sizeof (*peer))) ) + continue; + notify_monitor (tc, + peer, + address, + nt, + me); + } +} + + /** * Called whenever a client connects. Allocates our * data structures associated with that client. @@ -499,6 +626,80 @@ free_neighbour (struct Neighbour *neighbour) } +/** + * Send message to CORE clients that we lost a connection. + * + * @param tc client to inform (must be CORE client) + * @param pid peer the connection is for + * @param quota_out current quota for the peer + */ +static void +core_send_connect_info (struct TransportClient *tc, + const struct GNUNET_PeerIdentity *pid, + struct GNUNET_BANDWIDTH_Value32NBO quota_out) +{ + struct GNUNET_MQ_Envelope *env; + struct ConnectInfoMessage *cim; + + GNUNET_assert (CT_CORE == tc->type); + env = GNUNET_MQ_msg (cim, + GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); + cim->quota_out = quota_out; + cim->id = *pid; + GNUNET_MQ_send (tc->mq, + env); +} + + +/** + * Send message to CORE clients that we gained a connection + * + * @param pid peer the queue was for + * @param quota_out current quota for the peer + */ +static void +cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, + struct GNUNET_BANDWIDTH_Value32NBO quota_out) +{ + for (struct TransportClient *tc = clients_head; + NULL != tc; + tc = tc->next) + { + if (CT_CORE != tc->type) + continue; + core_send_connect_info (tc, + pid, + quota_out); + } +} + + +/** + * Send message to CORE clients that we lost a connection. + * + * @param pid peer the connection was for + */ +static void +cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) +{ + for (struct TransportClient *tc = clients_head; + NULL != tc; + tc = tc->next) + { + struct GNUNET_MQ_Envelope *env; + struct DisconnectInfoMessage *dim; + + if (CT_CORE != tc->type) + continue; + env = GNUNET_MQ_msg (dim, + GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); + dim->peer = *pid; + GNUNET_MQ_send (tc->mq, + env); + } +} + + /** * Free @a queue. * @@ -509,6 +710,10 @@ free_queue (struct Queue *queue) { struct Neighbour *neighbour = queue->neighbour; struct TransportClient *tc = queue->tc; + struct MonitorEvent me = { + .cs = GNUNET_TRANSPORT_CS_DOWN, + .rtt = GNUNET_TIME_UNIT_FOREVER_REL + }; GNUNET_CONTAINER_MDLL_remove (neighbour, neighbour->queue_head, @@ -518,10 +723,15 @@ free_queue (struct Queue *queue) tc->details.communicator.queue_head, tc->details.communicator.queue_tail, queue); + + notify_monitors (&neighbour->pid, + queue->address, + queue->nt, + &me); GNUNET_free (queue); if (NULL == neighbour->queue_head) { - // FIXME: notify cores/monitors! + cores_send_disconnect_info (&neighbour->pid); free_neighbour (neighbour); } } @@ -612,6 +822,30 @@ client_disconnect_cb (void *cls, } +/** + * Iterator telling new CORE client about all existing + * connections to peers. + * + * @param cls the new `struct TransportClient` + * @param pid a connected peer + * @param value the `struct Neighbour` with more information + * @return #GNUNET_OK (continue to iterate) + */ +static int +notify_client_connect_info (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct TransportClient *tc = cls; + struct Neighbour *neighbour = value; + + core_send_connect_info (tc, + pid, + neighbour->quota_out); + return GNUNET_OK; +} + + /** * Initialize a "CORE" client. We got a start message from this * client, so add it to the list of clients for broadcasting of @@ -646,6 +880,9 @@ handle_client_start (void *cls, return; } tc->type = CT_CORE; + GNUNET_CONTAINER_multipeermap_iterate (neighbours, + ¬ify_client_connect_info, + tc); GNUNET_SERVICE_client_continue (tc->client); } @@ -786,7 +1023,7 @@ handle_client_send (void *cls, tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); - // FIXME: do the work, continuation with: + // FIXME: do the work, final continuation with call to: client_send_response (pm, GNUNET_NO, 0); @@ -930,6 +1167,7 @@ store_pi (void *cls) ale->st = NULL; expiration = GNUNET_TIME_relative_to_absolute (ale->expiration); GNUNET_HELLO_sign_address (ale->address, + ale->nt, expiration, GST_my_private_key, &addr, @@ -1123,7 +1361,7 @@ handle_add_queue_message (void *cls, struct Queue *queue; struct Neighbour *neighbour; const char *addr; - uint16_t addr_len; + uint16_t addr_len; neighbour = lookup_neighbour (&aqm->receiver); if (NULL == neighbour) @@ -1135,12 +1373,15 @@ handle_add_queue_message (void *cls, &neighbour->pid, neighbour, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - // FIXME: notify ATS/COREs/monitors! + cores_send_connect_info (&neighbour->pid, + GNUNET_BANDWIDTH_ZERO); + // FIXME: notify ATS! } addr_len = ntohs (aqm->header.size) - sizeof (*aqm); addr = (const char *) &aqm[1]; queue = GNUNET_malloc (sizeof (struct Queue) + addr_len); + queue->mtu = ntohl (aqm->mtu); queue->qid = aqm->qid; queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt); queue->tc = tc; @@ -1149,6 +1390,17 @@ handle_add_queue_message (void *cls, memcpy (&queue[1], addr, addr_len); + /* notify monitors about new queue */ + { + struct MonitorEvent me = { + .cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs) + }; + + notify_monitors (&neighbour->pid, + queue->address, + queue->nt, + &me); + } GNUNET_CONTAINER_MDLL_insert (neighbour, neighbour->queue_head, neighbour->queue_tail, @@ -1244,6 +1496,8 @@ handle_monitor_start (void *cls, tc->details.monitor.peer = start->peer; tc->details.monitor.one_shot = ntohl (start->one_shot); // FIXME: do work! + + GNUNET_SERVICE_client_mark_monitor (tc->client); GNUNET_SERVICE_client_continue (tc->client); } @@ -1286,10 +1540,6 @@ do_shutdown (void *cls) GNUNET_CONTAINER_multipeermap_iterate (neighbours, &free_neighbour_cb, NULL); - /* FIXME: if this assertion fails (likely!), make sure we - clean up clients *before* doing the rest of the - shutdown! (i.e. by scheduling rest asynchronously!) */ - GNUNET_assert (NULL == clients_head); if (NULL != peerstore) { GNUNET_PEERSTORE_disconnect (peerstore, @@ -1362,7 +1612,7 @@ run (void *cls, */ GNUNET_SERVICE_MAIN ("transport", - GNUNET_SERVICE_OPTION_NONE, + GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, &run, &client_connect_cb, &client_disconnect_cb, -- cgit v1.2.3