From fbc5f3876a2ed52f18e2a2810e3cdda497cc99ea Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 22 Nov 2018 22:46:43 +0100 Subject: add design sketch for new ATS API --- src/transport/gnunet-communicator-unix.c | 44 +++++------ src/transport/gnunet-service-tng.c | 16 +++- src/transport/transport.h | 10 +++ src/transport/transport_api2_communication.c | 105 ++++++++++++++++----------- 4 files changed, 110 insertions(+), 65 deletions(-) (limited to 'src/transport') diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c index a9a75f779..a97560ad4 100644 --- a/src/transport/gnunet-communicator-unix.c +++ b/src/transport/gnunet-communicator-unix.c @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ @@ -31,8 +31,8 @@ #include "gnunet_transport_communication_service.h" /** - * How many messages do we keep at most in the queue to the - * transport service before we start to drop (default, + * How many messages do we keep at most in the queue to the + * transport service before we start to drop (default, * can be changed via the configuration file). * Should be _below_ the level of the communicator API, as * otherwise we may read messages just to have them dropped @@ -113,17 +113,17 @@ struct Queue * if this queue is in the #queue_head DLL. */ const struct GNUNET_MessageHeader *msg; - + /** * Message queue we are providing for the #ch. */ struct GNUNET_MQ_Handle *mq; - + /** * handle for this queue with the #ch. - */ + */ struct GNUNET_TRANSPORT_QueueHandle *qh; - + /** * Number of bytes we currently have in our write queue. */ @@ -211,14 +211,14 @@ queue_destroy (struct Queue *queue) struct GNUNET_MQ_Handle *mq; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Disconnecting queue for peer `%s'\n", + "Disconnecting queue for peer `%s'\n", GNUNET_i2s (&queue->target)); if (0 != queue->bytes_in_queue) { GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue); - queue->bytes_in_queue = 0; + queue->bytes_in_queue = 0; } if (NULL != (mq = queue->mq)) { @@ -439,7 +439,7 @@ select_write_cb (void *cls) queue->address_len); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "UNIX transmitted message to %s (%d/%u: %s)\n", - GNUNET_i2s (&queue->target), + GNUNET_i2s (&queue->target), (int) sent, (unsigned int) msg_size, (sent < 0) ? STRERROR (errno) : "ok"); @@ -463,7 +463,7 @@ select_write_cb (void *cls) /* We should retry later... */ GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send"); - return; + return; case EMSGSIZE: { socklen_t size = 0; @@ -533,7 +533,7 @@ mq_send (struct GNUNET_MQ_Handle *mq, GNUNET_assert (mq == queue->mq); GNUNET_assert (NULL == queue->msg); - queue->msg = msg; + queue->msg = msg; GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue); @@ -664,7 +664,7 @@ setup_queue (const struct GNUNET_PeerIdentity *target, queue); { char *foreign_addr; - + if ('\0' == un->sun_path[0]) GNUNET_asprintf (&foreign_addr, "%s-@%s", @@ -679,8 +679,9 @@ setup_queue (const struct GNUNET_PeerIdentity *target, = GNUNET_TRANSPORT_communicator_mq_add (ch, &queue->target, foreign_addr, - UNIX_MTU, + UNIX_MTU, GNUNET_ATS_NET_LOOPBACK, + 0 /* distance */, cs, queue->mq); GNUNET_free (foreign_addr); @@ -798,12 +799,12 @@ select_read_cb (void *cls) _("Maximum number of UNIX connections exceeded, dropping incoming message\n")); return; } - + { uint16_t offset = 0; uint16_t tsize = msize - sizeof (struct UNIXMessage); const char *msgbuf = (const char *) &msg[1]; - + while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) { const struct GNUNET_MessageHeader *currhdr; @@ -870,7 +871,7 @@ mq_init (void *cls, const char *path; struct sockaddr_un *un; socklen_t un_len; - + if (0 != strncmp (address, COMMUNICATOR_ADDRESS_PREFIX "-", strlen (COMMUNICATOR_ADDRESS_PREFIX "-"))) @@ -902,7 +903,7 @@ mq_init (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Failed to setup queue to %s at `%s'\n", - GNUNET_i2s (peer), + GNUNET_i2s (peer), path); return GNUNET_NO; } @@ -981,7 +982,7 @@ do_shutdown (void *cls) /** * Setup communicator and launch network interactions. - * + * * @param cls NULL (always) * @param args remaining command-line arguments * @param cfgfile name of the configuration file used (for saving, can be NULL!) @@ -998,7 +999,7 @@ run (void *cls, socklen_t un_len; char *my_addr; (void) cls; - + if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (cfg, COMMUNICATOR_CONFIG_SECTION, @@ -1016,7 +1017,7 @@ run (void *cls, "MAX_QUEUE_LENGTH", &max_queue_length)) max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; - + un = unix_address_to_sockaddr (unix_socket_path, &un_len); if (NULL == un) @@ -1082,6 +1083,7 @@ run (void *cls, ch = GNUNET_TRANSPORT_communicator_connect (cfg, COMMUNICATOR_CONFIG_SECTION, COMMUNICATOR_ADDRESS_PREFIX, + GNUNET_TRANSPORT_CC_RELIABLE, &mq_init, NULL); if (NULL == ch) diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index ca8838380..3e08900bb 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -154,6 +154,11 @@ struct Queue */ uint32_t mtu; + /** + * Distance to the target of this queue. + */ + uint32_t distance; + /** * Network type offered by this queue. */ @@ -423,6 +428,11 @@ struct TransportClient */ struct AddressListEntry *addr_tail; + /** + * Characteristics of this communicator. + */ + enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc; + } communicator; } details; @@ -1106,7 +1116,10 @@ handle_communicator_available (void *cls, size = ntohs (cam->header.size) - sizeof (*cam); if (0 == size) return; /* receive-only communicator */ - tc->details.communicator.address_prefix = GNUNET_strdup ((const char *) &cam[1]); + tc->details.communicator.address_prefix + = GNUNET_strdup ((const char *) &cam[1]); + tc->details.communicator.cc + = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc); GNUNET_SERVICE_client_continue (tc->client); } @@ -1413,6 +1426,7 @@ handle_add_queue_message (void *cls, queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL; queue->qid = aqm->qid; queue->mtu = ntohl (aqm->mtu); + queue->distance = ntohl (aqm->distance); queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt); queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); queue->neighbour = neighbour; diff --git a/src/transport/transport.h b/src/transport/transport.h index e8c276342..515c178f4 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -661,6 +661,11 @@ struct GNUNET_TRANSPORT_CommunicatorAvailableMessage */ struct GNUNET_MessageHeader header; + /** + * NBO encoding of `enum GNUNET_TRANSPORT_CommunicatorCharacteristics` + */ + uint32_t cc; + /* Followed by the address prefix of the communicator */ }; @@ -810,6 +815,11 @@ struct GNUNET_TRANSPORT_AddQueueMessage */ uint32_t cs; + /** + * Hops to the target (DV-only), in NBO. + */ + uint32_t distance; + /* followed by UTF-8 encoded, 0-terminated human-readable address */ }; diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index 6704f0cd8..a89802ddd 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ @@ -29,8 +29,8 @@ /** - * How many messages do we keep at most in the queue to the - * transport service before we start to drop (default, + * How many messages do we keep at most in the queue to the + * transport service before we start to drop (default, * can be changed via the configuration file). */ #define DEFAULT_MAX_QUEUE_LENGTH 16 @@ -60,7 +60,7 @@ struct FlowControl * Closure for @e cb */ void *cb_cls; - + /** * Which peer is this about? */ @@ -93,7 +93,7 @@ struct AckPending * Communicator this entry belongs to. */ struct GNUNET_TRANSPORT_CommunicatorHandle *ch; - + /** * Which peer is this about? */ @@ -145,12 +145,12 @@ struct GNUNET_TRANSPORT_CommunicatorHandle * DLL of queues we offer. */ struct GNUNET_TRANSPORT_QueueHandle *queue_head; - + /** * DLL of queues we offer. */ struct GNUNET_TRANSPORT_QueueHandle *queue_tail; - + /** * Our configuration. */ @@ -181,12 +181,12 @@ struct GNUNET_TRANSPORT_CommunicatorHandle * Queue to talk to the transport service. */ struct GNUNET_MQ_Handle *mq; - + /** * Maximum permissable queue length. */ unsigned long long max_queue_length; - + /** * Flow-control identifier generator. */ @@ -202,7 +202,12 @@ struct GNUNET_TRANSPORT_CommunicatorHandle * Queue identifier generator. */ uint32_t queue_gen; - + + /** + * Characteristics of the communicator. + */ + enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc; + }; @@ -222,21 +227,26 @@ struct GNUNET_TRANSPORT_QueueHandle * Kept in a DLL. */ struct GNUNET_TRANSPORT_QueueHandle *prev; - + /** * Handle this queue belongs to. */ struct GNUNET_TRANSPORT_CommunicatorHandle *ch; /** - * Which peer we can communciate with. + * Address used by the communication queue. */ - struct GNUNET_PeerIdentity peer; + char *address; /** - * Address used by the communication queue. - */ - char *address; + * The queue itself. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Which peer we can communciate with. + */ + struct GNUNET_PeerIdentity peer; /** * Network type of the communciation queue. @@ -247,11 +257,11 @@ struct GNUNET_TRANSPORT_QueueHandle * Communication status of the queue. */ enum GNUNET_TRANSPORT_ConnectionStatus cs; - + /** - * The queue itself. - */ - struct GNUNET_MQ_Handle *mq; + * How many hops is the target away (DV-only) + */ + uint32_t distance; /** * ID for this queue when talking to the transport service. @@ -262,7 +272,7 @@ struct GNUNET_TRANSPORT_QueueHandle * Maximum transmission unit for the queue. */ uint32_t mtu; - + }; @@ -282,7 +292,7 @@ struct GNUNET_TRANSPORT_AddressIdentifier * Kept in a DLL. */ struct GNUNET_TRANSPORT_AddressIdentifier *prev; - + /** * Transport handle where the address was added. */ @@ -298,7 +308,7 @@ struct GNUNET_TRANSPORT_AddressIdentifier * address.) */ struct GNUNET_TIME_Relative expiration; - + /** * Internal UUID for the address used in communication with the * transport service. @@ -309,7 +319,7 @@ struct GNUNET_TRANSPORT_AddressIdentifier * Network type for the address. */ enum GNUNET_ATS_Network_Type nt; - + }; @@ -333,7 +343,7 @@ send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) { struct GNUNET_MQ_Envelope *env; struct GNUNET_TRANSPORT_AddAddressMessage *aam; - + if (NULL == ai->ch->mq) return; env = GNUNET_MQ_msg_extra (aam, @@ -360,10 +370,10 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) { struct GNUNET_MQ_Envelope *env; struct GNUNET_TRANSPORT_DelAddressMessage *dam; - + if (NULL == ai->ch->mq) return; - env = GNUNET_MQ_msg (dam, + env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS); dam->aid = htonl (ai->aid); GNUNET_MQ_send (ai->ch->mq, @@ -382,7 +392,7 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) { struct GNUNET_MQ_Envelope *env; struct GNUNET_TRANSPORT_AddQueueMessage *aqm; - + if (NULL == qh->ch->mq) return; env = GNUNET_MQ_msg_extra (aqm, @@ -393,6 +403,7 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) aqm->nt = htonl ((uint32_t) qh->nt); aqm->mtu = htonl (qh->mtu); aqm->cs = htonl ((uint32_t) qh->cs); + aqm->distance = htonl (qh->distance); memcpy (&aqm[1], qh->address, strlen (qh->address) + 1); @@ -412,10 +423,10 @@ send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) { struct GNUNET_MQ_Envelope *env; struct GNUNET_TRANSPORT_DelQueueMessage *dqm; - + if (NULL == qh->ch->mq) return; - env = GNUNET_MQ_msg (dqm, + env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN); dqm->qid = htonl (qh->queue_id); dqm->receiver = qh->peer; @@ -437,7 +448,7 @@ disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) { struct FlowControl *fcn; struct AckPending *apn; - + for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn) @@ -497,7 +508,7 @@ handle_incoming_ack (void *cls, const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) { struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; - + for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next) @@ -563,7 +574,7 @@ handle_create_queue (void *cls, const char *addr = (const char *) &cq[1]; struct GNUNET_TRANSPORT_CreateQueueResponse *cqr; struct GNUNET_MQ_Envelope *env; - + if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, @@ -573,12 +584,12 @@ handle_create_queue (void *cls, "Address `%s' invalid for this communicator\n", addr); env = GNUNET_MQ_msg (cqr, - GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL); + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL); } else { env = GNUNET_MQ_msg (cqr, - GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK); + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK); } cqr->request_id = cq->request_id; GNUNET_MQ_send (ch->mq, @@ -678,12 +689,12 @@ handle_send_msg (void *cls, struct AckPending *ap; struct GNUNET_TRANSPORT_QueueHandle *qh; - for (qh = ch->queue_head;NULL != qh; qh = qh->next) + for (qh = ch->queue_head;NULL != qh; qh = qh->next) if ( (qh->queue_id == smt->qid) && (0 == memcmp (&qh->peer, &smt->receiver, sizeof (struct GNUNET_PeerIdentity))) ) - break; + break; if (NULL == qh) { /* queue is already gone, tell transport this one failed */ @@ -737,7 +748,7 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) }; struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam; struct GNUNET_MQ_Envelope *env; - + ch->mq = GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, @@ -745,9 +756,10 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) ch); if (NULL == ch->mq) return; - env = GNUNET_MQ_msg_extra (cam, + env = GNUNET_MQ_msg_extra (cam, strlen (ch->addr_prefix) + 1, GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR); + cam->cc = htonl ((uint32_t) ch->cc); memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1); @@ -771,6 +783,7 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) * @param config_section section of the configuration to use for options * @param addr_prefix address prefix for addresses supported by this * communicator, could be NULL for incoming-only communicators + * @param cc what characteristics does the communicator have? * @param mtu maximum message size supported by communicator, 0 if * sending is not supported, SIZE_MAX for no MTU * @param mq_init function to call to initialize a message queue given @@ -783,17 +796,19 @@ struct GNUNET_TRANSPORT_CommunicatorHandle * GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, const char *config_section, const char *addr_prefix, + enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc, GNUNET_TRANSPORT_CommunicatorMqInit mq_init, void *mq_init_cls) { struct GNUNET_TRANSPORT_CommunicatorHandle *ch; - + ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle); ch->cfg = cfg; ch->config_section = config_section; ch->addr_prefix = addr_prefix; ch->mq_init = mq_init; ch->mq_init_cls = mq_init_cls; + ch->cc = cc; reconnect (ch); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, @@ -858,7 +873,7 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl struct GNUNET_MQ_Envelope *env; struct GNUNET_TRANSPORT_IncomingMessage *im; uint16_t msize; - + if (NULL == ch->mq) return GNUNET_SYSERR; if ( (NULL == cb) && @@ -869,7 +884,7 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl ch->max_queue_length); return GNUNET_NO; } - + msize = ntohs (msg->size); env = GNUNET_MQ_msg_extra (im, msize, @@ -918,6 +933,8 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl * @param mtu maximum message size supported by queue, 0 if * sending is not supported, SIZE_MAX for no MTU * @param nt which network type does the @a address belong to? + * @param cc what characteristics does the communicator have? + * @param distance how many hops does this queue use (DV-only)? * @param cs what is the connection status of the queue? * @param mq message queue of the @a peer * @return API handle identifying the new MQ @@ -928,6 +945,7 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle const char *address, uint32_t mtu, enum GNUNET_ATS_Network_Type nt, + uint32_t distance, enum GNUNET_TRANSPORT_ConnectionStatus cs, struct GNUNET_MQ_Handle *mq) { @@ -939,6 +957,7 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle qh->address = GNUNET_strdup (address); qh->nt = nt; qh->mtu = mtu; + qh->distance = distance; qh->cs = cs; qh->mq = mq; qh->queue_id = ch->queue_gen++; @@ -960,7 +979,7 @@ void GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh) { struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch; - + send_del_queue (qh); GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, -- cgit v1.2.3