From 67935982582a31e4a75dc431feceee9664bca839 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 22 Nov 2018 18:37:35 +0100 Subject: more work on TNG --- src/transport/gnunet-communicator-unix.c | 11 +- src/transport/gnunet-service-tng.c | 274 +++++++++++++++++++++++++-- src/transport/transport.h | 18 +- src/transport/transport_api2_communication.c | 28 ++- src/transport/transport_api2_monitor.c | 2 +- 5 files changed, 305 insertions(+), 28 deletions(-) (limited to 'src/transport') diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c index b2eebbe20..a9a75f779 100644 --- a/src/transport/gnunet-communicator-unix.c +++ b/src/transport/gnunet-communicator-unix.c @@ -50,6 +50,10 @@ */ #define COMMUNICATOR_CONFIG_SECTION "communicator-unix" +/** + * Our MTU. + */ +#define UNIX_MTU UINT16_MAX GNUNET_NETWORK_STRUCT_BEGIN @@ -619,12 +623,14 @@ mq_error (void *cls, * data to another peer. * * @param peer the target peer + * @param cs inbound or outbound queue * @param un the address * @param un_len number of bytes in @a un * @return the queue or NULL of max connections exceeded */ static struct Queue * setup_queue (const struct GNUNET_PeerIdentity *target, + enum GNUNET_TRANSPORT_ConnectionStatus cs, const struct sockaddr_un *un, socklen_t un_len) { @@ -673,7 +679,9 @@ setup_queue (const struct GNUNET_PeerIdentity *target, = GNUNET_TRANSPORT_communicator_mq_add (ch, &queue->target, foreign_addr, + UNIX_MTU, GNUNET_ATS_NET_LOOPBACK, + cs, queue->mq); GNUNET_free (foreign_addr); } @@ -779,6 +787,7 @@ select_read_cb (void *cls) addrlen); if (NULL == queue) queue = setup_queue (&msg->sender, + GNUNET_TRANSPORT_CS_INBOUND, &un, addrlen); else @@ -885,6 +894,7 @@ mq_init (void *cls, return GNUNET_OK; } queue = setup_queue (peer, + GNUNET_TRANSPORT_CS_OUTBOUND, un, un_len); GNUNET_free (un); @@ -1072,7 +1082,6 @@ run (void *cls, ch = GNUNET_TRANSPORT_communicator_connect (cfg, COMMUNICATOR_CONFIG_SECTION, COMMUNICATOR_ADDRESS_PREFIX, - 65535, &mq_init, NULL); if (NULL == ch) diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index e5fb51bd2..efbaf6fc9 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -21,16 +21,17 @@ * @author Christian Grothoff * * TODO: - * - MTU information is missing for queues! - * - start supporting monitor logic (add functions to signal monitors!) + * - monitor start: iterate to inform monitor about all existing queues! * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc. - * - ask ATS about bandwidth allocation + * - inform ATS about RTT, goodput/loss, overheads, etc. + * - ask ATS about bandwidth allocation! * - */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_statistics_service.h" #include "gnunet_transport_service.h" +#include "gnunet_transport_monitor_service.h" #include "gnunet_peerstore_service.h" #include "gnunet_ats_service.h" #include "gnunet-service-transport.h" @@ -136,6 +137,11 @@ struct Queue */ uint32_t qid; + /** + * Maximum transmission unit supported by this queue. + */ + uint32_t mtu; + /** * Network type offered by this queue. */ @@ -453,6 +459,127 @@ lookup_neighbour (const struct GNUNET_PeerIdentity *pid) } +/** + * Details about what to notify monitors about. + */ +struct MonitorEvent +{ + /** + * @deprecated To be discussed if we keep these... + */ + struct GNUNET_TIME_Absolute last_validation; + struct GNUNET_TIME_Absolute valid_until; + struct GNUNET_TIME_Absolute next_validation; + + /** + * Current round-trip time estimate. + */ + struct GNUNET_TIME_Relative rtt; + + /** + * Connection status. + */ + enum GNUNET_TRANSPORT_ConnectionStatus cs; + + /** + * Messages pending. + */ + uint32_t num_msg_pending; + + /** + * Bytes pending. + */ + uint32_t num_bytes_pending; + + +}; + + +/** + * Notify monitor @a tc about an event. That @a tc + * cares about the event has already been checked. + * + * Send @a tc information in @a me about a @a peer's status with + * respect to some @a address to all monitors that care. + * + * @param tc monitor to inform + * @param peer peer the information is about + * @param address address the information is about + * @param nt network type associated with @a address + * @param me detailed information to transmit + */ +static void +notify_monitor (struct TransportClient *tc, + const struct GNUNET_PeerIdentity *peer, + const char *address, + enum GNUNET_ATS_Network_Type nt, + const struct MonitorEvent *me) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TRANSPORT_MonitorData *md; + size_t addr_len = strlen (address) + 1; + + env = GNUNET_MQ_msg_extra (md, + addr_len, + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA); + md->nt = htonl ((uint32_t) nt); + md->peer = *peer; + md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation); + md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until); + md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation); + md->rtt = GNUNET_TIME_relative_hton (me->rtt); + md->cs = htonl ((uint32_t) me->cs); + md->num_msg_pending = htonl (me->num_msg_pending); + md->num_bytes_pending = htonl (me->num_bytes_pending); + memcpy (&md[1], + address, + addr_len); + GNUNET_MQ_send (tc->mq, + env); +} + + +/** + * Send information in @a me about a @a peer's status with respect + * to some @a address to all monitors that care. + * + * @param peer peer the information is about + * @param address address the information is about + * @param nt network type associated with @a address + * @param me detailed information to transmit + */ +static void +notify_monitors (const struct GNUNET_PeerIdentity *peer, + const char *address, + enum GNUNET_ATS_Network_Type nt, + const struct MonitorEvent *me) +{ + static struct GNUNET_PeerIdentity zero; + + for (struct TransportClient *tc = clients_head; + NULL != tc; + tc = tc->next) + { + if (CT_MONITOR != tc->type) + continue; + if (tc->details.monitor.one_shot) + continue; + if ( (0 != memcmp (&tc->details.monitor.peer, + &zero, + sizeof (zero))) && + (0 != memcmp (&tc->details.monitor.peer, + peer, + sizeof (*peer))) ) + continue; + notify_monitor (tc, + peer, + address, + nt, + me); + } +} + + /** * Called whenever a client connects. Allocates our * data structures associated with that client. @@ -499,6 +626,80 @@ free_neighbour (struct Neighbour *neighbour) } +/** + * Send message to CORE clients that we lost a connection. + * + * @param tc client to inform (must be CORE client) + * @param pid peer the connection is for + * @param quota_out current quota for the peer + */ +static void +core_send_connect_info (struct TransportClient *tc, + const struct GNUNET_PeerIdentity *pid, + struct GNUNET_BANDWIDTH_Value32NBO quota_out) +{ + struct GNUNET_MQ_Envelope *env; + struct ConnectInfoMessage *cim; + + GNUNET_assert (CT_CORE == tc->type); + env = GNUNET_MQ_msg (cim, + GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); + cim->quota_out = quota_out; + cim->id = *pid; + GNUNET_MQ_send (tc->mq, + env); +} + + +/** + * Send message to CORE clients that we gained a connection + * + * @param pid peer the queue was for + * @param quota_out current quota for the peer + */ +static void +cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, + struct GNUNET_BANDWIDTH_Value32NBO quota_out) +{ + for (struct TransportClient *tc = clients_head; + NULL != tc; + tc = tc->next) + { + if (CT_CORE != tc->type) + continue; + core_send_connect_info (tc, + pid, + quota_out); + } +} + + +/** + * Send message to CORE clients that we lost a connection. + * + * @param pid peer the connection was for + */ +static void +cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) +{ + for (struct TransportClient *tc = clients_head; + NULL != tc; + tc = tc->next) + { + struct GNUNET_MQ_Envelope *env; + struct DisconnectInfoMessage *dim; + + if (CT_CORE != tc->type) + continue; + env = GNUNET_MQ_msg (dim, + GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); + dim->peer = *pid; + GNUNET_MQ_send (tc->mq, + env); + } +} + + /** * Free @a queue. * @@ -509,6 +710,10 @@ free_queue (struct Queue *queue) { struct Neighbour *neighbour = queue->neighbour; struct TransportClient *tc = queue->tc; + struct MonitorEvent me = { + .cs = GNUNET_TRANSPORT_CS_DOWN, + .rtt = GNUNET_TIME_UNIT_FOREVER_REL + }; GNUNET_CONTAINER_MDLL_remove (neighbour, neighbour->queue_head, @@ -518,10 +723,15 @@ free_queue (struct Queue *queue) tc->details.communicator.queue_head, tc->details.communicator.queue_tail, queue); + + notify_monitors (&neighbour->pid, + queue->address, + queue->nt, + &me); GNUNET_free (queue); if (NULL == neighbour->queue_head) { - // FIXME: notify cores/monitors! + cores_send_disconnect_info (&neighbour->pid); free_neighbour (neighbour); } } @@ -612,6 +822,30 @@ client_disconnect_cb (void *cls, } +/** + * Iterator telling new CORE client about all existing + * connections to peers. + * + * @param cls the new `struct TransportClient` + * @param pid a connected peer + * @param value the `struct Neighbour` with more information + * @return #GNUNET_OK (continue to iterate) + */ +static int +notify_client_connect_info (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct TransportClient *tc = cls; + struct Neighbour *neighbour = value; + + core_send_connect_info (tc, + pid, + neighbour->quota_out); + return GNUNET_OK; +} + + /** * Initialize a "CORE" client. We got a start message from this * client, so add it to the list of clients for broadcasting of @@ -646,6 +880,9 @@ handle_client_start (void *cls, return; } tc->type = CT_CORE; + GNUNET_CONTAINER_multipeermap_iterate (neighbours, + ¬ify_client_connect_info, + tc); GNUNET_SERVICE_client_continue (tc->client); } @@ -786,7 +1023,7 @@ handle_client_send (void *cls, tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); - // FIXME: do the work, continuation with: + // FIXME: do the work, final continuation with call to: client_send_response (pm, GNUNET_NO, 0); @@ -930,6 +1167,7 @@ store_pi (void *cls) ale->st = NULL; expiration = GNUNET_TIME_relative_to_absolute (ale->expiration); GNUNET_HELLO_sign_address (ale->address, + ale->nt, expiration, GST_my_private_key, &addr, @@ -1123,7 +1361,7 @@ handle_add_queue_message (void *cls, struct Queue *queue; struct Neighbour *neighbour; const char *addr; - uint16_t addr_len; + uint16_t addr_len; neighbour = lookup_neighbour (&aqm->receiver); if (NULL == neighbour) @@ -1135,12 +1373,15 @@ handle_add_queue_message (void *cls, &neighbour->pid, neighbour, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - // FIXME: notify ATS/COREs/monitors! + cores_send_connect_info (&neighbour->pid, + GNUNET_BANDWIDTH_ZERO); + // FIXME: notify ATS! } addr_len = ntohs (aqm->header.size) - sizeof (*aqm); addr = (const char *) &aqm[1]; queue = GNUNET_malloc (sizeof (struct Queue) + addr_len); + queue->mtu = ntohl (aqm->mtu); queue->qid = aqm->qid; queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt); queue->tc = tc; @@ -1149,6 +1390,17 @@ handle_add_queue_message (void *cls, memcpy (&queue[1], addr, addr_len); + /* notify monitors about new queue */ + { + struct MonitorEvent me = { + .cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs) + }; + + notify_monitors (&neighbour->pid, + queue->address, + queue->nt, + &me); + } GNUNET_CONTAINER_MDLL_insert (neighbour, neighbour->queue_head, neighbour->queue_tail, @@ -1244,6 +1496,8 @@ handle_monitor_start (void *cls, tc->details.monitor.peer = start->peer; tc->details.monitor.one_shot = ntohl (start->one_shot); // FIXME: do work! + + GNUNET_SERVICE_client_mark_monitor (tc->client); GNUNET_SERVICE_client_continue (tc->client); } @@ -1286,10 +1540,6 @@ do_shutdown (void *cls) GNUNET_CONTAINER_multipeermap_iterate (neighbours, &free_neighbour_cb, NULL); - /* FIXME: if this assertion fails (likely!), make sure we - clean up clients *before* doing the rest of the - shutdown! (i.e. by scheduling rest asynchronously!) */ - GNUNET_assert (NULL == clients_head); if (NULL != peerstore) { GNUNET_PEERSTORE_disconnect (peerstore, @@ -1362,7 +1612,7 @@ run (void *cls, */ GNUNET_SERVICE_MAIN ("transport", - GNUNET_SERVICE_OPTION_NONE, + GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, &run, &client_connect_cb, &client_disconnect_cb, diff --git a/src/transport/transport.h b/src/transport/transport.h index 423d3cefa..e8c276342 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -800,7 +800,15 @@ struct GNUNET_TRANSPORT_AddQueueMessage */ uint32_t nt; - // FIXME: add MTU? + /** + * Maximum transmission unit, in NBO. UINT32_MAX for unlimited. + */ + uint32_t mtu; + + /** + * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. + */ + uint32_t cs; /* followed by UTF-8 encoded, 0-terminated human-readable address */ }; @@ -992,9 +1000,9 @@ struct GNUNET_TRANSPORT_MonitorData struct GNUNET_TIME_RelativeNBO rtt; /** - * Is inbound (in NBO). + * Connection status (in NBO). */ - uint32_t is_inbound GNUNET_PACKED; + uint32_t cs GNUNET_PACKED; /** * Messages pending (in NBO). @@ -1006,9 +1014,7 @@ struct GNUNET_TRANSPORT_MonitorData */ uint32_t num_bytes_pending GNUNET_PACKED; - /* Followed by 0-terminated address of the peer - (TODO: do we allow no address? If so, - adjust transport_api2_monitor!) */ + /* Followed by 0-terminated address of the peer */ }; diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index 3a68c6eba..6704f0cd8 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c @@ -192,11 +192,6 @@ struct GNUNET_TRANSPORT_CommunicatorHandle */ uint64_t fc_gen; - /** - * MTU of the communicator - */ - size_t mtu; - /** * Internal UUID for the address used in communication with the * transport service. @@ -248,6 +243,11 @@ struct GNUNET_TRANSPORT_QueueHandle */ enum GNUNET_ATS_Network_Type nt; + /** + * Communication status of the queue. + */ + enum GNUNET_TRANSPORT_ConnectionStatus cs; + /** * The queue itself. */ @@ -257,6 +257,11 @@ struct GNUNET_TRANSPORT_QueueHandle * ID for this queue when talking to the transport service. */ uint32_t queue_id; + + /** + * Maximum transmission unit for the queue. + */ + uint32_t mtu; }; @@ -383,9 +388,11 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) env = GNUNET_MQ_msg_extra (aqm, strlen (qh->address) + 1, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); + aqm->qid = htonl (qh->queue_id); aqm->receiver = qh->peer; aqm->nt = htonl ((uint32_t) qh->nt); - aqm->qid = htonl (qh->queue_id); + aqm->mtu = htonl (qh->mtu); + aqm->cs = htonl ((uint32_t) qh->cs); memcpy (&aqm[1], qh->address, strlen (qh->address) + 1); @@ -776,7 +783,6 @@ struct GNUNET_TRANSPORT_CommunicatorHandle * GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, const char *config_section, const char *addr_prefix, - size_t mtu, GNUNET_TRANSPORT_CommunicatorMqInit mq_init, void *mq_init_cls) { @@ -786,7 +792,6 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle ch->cfg = cfg; ch->config_section = config_section; ch->addr_prefix = addr_prefix; - ch->mtu = mtu; ch->mq_init = mq_init; ch->mq_init_cls = mq_init_cls; reconnect (ch); @@ -910,7 +915,10 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl * @param ch connection to transport service * @param peer peer with which we can now communicate * @param address address in human-readable format, 0-terminated, UTF-8 + * @param mtu maximum message size supported by queue, 0 if + * sending is not supported, SIZE_MAX for no MTU * @param nt which network type does the @a address belong to? + * @param cs what is the connection status of the queue? * @param mq message queue of the @a peer * @return API handle identifying the new MQ */ @@ -918,7 +926,9 @@ struct GNUNET_TRANSPORT_QueueHandle * GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, const struct GNUNET_PeerIdentity *peer, const char *address, + uint32_t mtu, enum GNUNET_ATS_Network_Type nt, + enum GNUNET_TRANSPORT_ConnectionStatus cs, struct GNUNET_MQ_Handle *mq) { struct GNUNET_TRANSPORT_QueueHandle *qh; @@ -928,6 +938,8 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle qh->peer = *peer; qh->address = GNUNET_strdup (address); qh->nt = nt; + qh->mtu = mtu; + qh->cs = cs; qh->mq = mq; qh->queue_id = ch->queue_gen++; GNUNET_CONTAINER_DLL_insert (ch->queue_head, diff --git a/src/transport/transport_api2_monitor.c b/src/transport/transport_api2_monitor.c index d7b13ec74..3798296c1 100644 --- a/src/transport/transport_api2_monitor.c +++ b/src/transport/transport_api2_monitor.c @@ -175,7 +175,7 @@ handle_monitor_data (void *cls, mi.address = (const char *) &md[1]; mi.nt = (enum GNUNET_ATS_Network_Type) ntohl (md->nt); - mi.is_inbound = (int) ntohl (md->is_inbound); + mi.cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (md->cs); mi.num_msg_pending = ntohl (md->num_msg_pending); mi.num_bytes_pending = ntohl (md->num_bytes_pending); mi.last_validation = GNUNET_TIME_absolute_ntoh (md->last_validation); -- cgit v1.2.3