From 198c09654354d09a9b33f27cf095e0295f70826c Mon Sep 17 00:00:00 2001 From: Martin Schanzenbach Date: Mon, 1 Jun 2020 16:39:35 +0200 Subject: tng: more UDP communicator backchannels Added a new message for queue updates to indicate queue length. Queues now may also have a priority parameter. --- src/transport/transport-testing2.c | 126 ++++++++++++++++++++++++++++++------- 1 file changed, 105 insertions(+), 21 deletions(-) (limited to 'src/transport/transport-testing2.c') diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c index fc6d13590..8250027f7 100644 --- a/src/transport/transport-testing2.c +++ b/src/transport/transport-testing2.c @@ -33,7 +33,7 @@ #include "gnunet_hello_lib.h" #include "gnunet_signatures.h" #include "transport.h" - +#include #define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__) @@ -227,10 +227,20 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue uint32_t nt; /** - * Maximum transmission unit, in NBO. UINT32_MAX for unlimited. + * Maximum transmission unit. UINT32_MAX for unlimited. */ uint32_t mtu; + /** + * Queue length. UINT64_MAX for unlimited. + */ + uint64_t q_len; + + /** + * Queue prio + */ + uint32_t priority; + /** * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. */ @@ -370,8 +380,8 @@ handle_communicator_backchannel (void *cls, struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; struct GNUNET_MQ_Envelope *env; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received backchannel message\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received backchannel message\n"); if (tc_h->bc_enabled != GNUNET_YES) { GNUNET_SERVICE_client_continue (client->client); @@ -379,10 +389,10 @@ handle_communicator_backchannel (void *cls, } /* Find client providing this communicator */ /* Finally, deliver backchannel message to communicator */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Delivering backchannel message of type %u to %s\n", - ntohs (msg->type), - target_communicator); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Delivering backchannel message of type %u to %s\n", + ntohs (msg->type), + target_communicator); other_tc_h = tc_h->bc_cb (tc_h, msg, (struct GNUNET_PeerIdentity*) &bc_msg->pid); env = GNUNET_MQ_msg_extra ( @@ -496,9 +506,6 @@ handle_incoming_msg (void *cls, msg = (struct GNUNET_MessageHeader *) &inc_msg[1]; size_t payload_len = ntohs (msg->size) - sizeof (struct GNUNET_MessageHeader); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Incoming message from communicator!\n"); - if (NULL != tc_h->incoming_msg_cb) { tc_h->incoming_msg_cb (tc_h->cb_cls, @@ -608,15 +615,14 @@ handle_add_queue_message (void *cls, client->tc; struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; - tc_queue = tc_h->queue_head; - if (NULL != tc_queue) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got queue with ID %u\n", msg->qid); + for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next) { - while (tc_queue->qid != msg->qid) - { - tc_queue = tc_queue->next; - } + if (tc_queue->qid == msg->qid) + break; } - else + if (NULL == tc_queue) { tc_queue = GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue); @@ -628,16 +634,58 @@ handle_add_queue_message (void *cls, GNUNET_assert (tc_queue->qid == msg->qid); GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); tc_queue->nt = msg->nt; - tc_queue->mtu = msg->mtu; + tc_queue->mtu = ntohl (msg->mtu); tc_queue->cs = msg->cs; + tc_queue->priority = ntohl (msg->priority); + tc_queue->q_len = GNUNET_ntohll (msg->q_len); if (NULL != tc_h->add_queue_cb) { - tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue); + tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu); } GNUNET_SERVICE_client_continue (client->client); } +/** + * @brief Handle new queue + * + * Store context and call client callback. + * + * @param cls Closure - communicator handle + * @param msg Message struct + */ +static void +handle_update_queue_message (void *cls, + const struct + GNUNET_TRANSPORT_UpdateQueueMessage *msg) +{ + struct MyClient *client = cls; + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = + client->tc; + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received queue update message for %u with q_len %"PRIu64"\n", + msg->qid, GNUNET_ntohll(msg->q_len)); + tc_queue = tc_h->queue_head; + if (NULL != tc_queue) + { + while (tc_queue->qid != msg->qid) + { + tc_queue = tc_queue->next; + } + } + GNUNET_assert (tc_queue->qid == msg->qid); + GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); + tc_queue->nt = msg->nt; + tc_queue->mtu = ntohl (msg->mtu); + tc_queue->cs = msg->cs; + tc_queue->priority = ntohl (msg->priority); + tc_queue->q_len += GNUNET_ntohll (msg->q_len); + GNUNET_SERVICE_client_continue (client->client); +} + + /** * @brief Shut down the service * @@ -789,6 +837,10 @@ transport_communicator_start ( GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, struct GNUNET_TRANSPORT_AddQueueMessage, tc_h), + GNUNET_MQ_hd_fixed_size (update_queue_message, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE, + struct GNUNET_TRANSPORT_UpdateQueueMessage, + tc_h), // GNUNET_MQ_hd_fixed_size (del_queue_message, // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, // struct GNUNET_TRANSPORT_DelQueueMessage, @@ -1063,7 +1115,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue ( */ void GNUNET_TRANSPORT_TESTING_transport_communicator_send - (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue, + (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, GNUNET_SCHEDULER_TaskCallback cont, void *cont_cls, const void *payload, @@ -1073,7 +1125,39 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send struct GNUNET_TRANSPORT_SendMessageTo *msg; struct GNUNET_MQ_Envelope *env; size_t inbox_size; + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp; + tc_queue = NULL; + for (tc_queue_tmp = tc_h->queue_head; + NULL != tc_queue_tmp; + tc_queue_tmp = tc_queue_tmp->next) + { + if (tc_queue_tmp->q_len <= 0) + continue; + if (NULL == tc_queue) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n", + tc_queue_tmp->priority, + tc_queue_tmp->q_len, + tc_queue_tmp->mtu); + tc_queue = tc_queue_tmp; + continue; + } + if (tc_queue->priority < tc_queue_tmp->priority) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n", + tc_queue_tmp->priority, + tc_queue_tmp->q_len, + tc_queue_tmp->mtu); + tc_queue = tc_queue_tmp; + } + } + GNUNET_assert (NULL != tc_queue); + if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED) + tc_queue->q_len--; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message\n"); inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size; -- cgit v1.2.3