From 198c09654354d09a9b33f27cf095e0295f70826c Mon Sep 17 00:00:00 2001 From: Martin Schanzenbach Date: Mon, 1 Jun 2020 16:39:35 +0200 Subject: tng: more UDP communicator backchannels Added a new message for queue updates to indicate queue length. Queues now may also have a priority parameter. --- src/transport/transport_api2_communication.c | 77 ++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) (limited to 'src/transport/transport_api2_communication.c') diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index e80cd5c03..cfa144415 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c @@ -280,6 +280,15 @@ struct GNUNET_TRANSPORT_QueueHandle * Maximum transmission unit for the queue. */ uint32_t mtu; + + /** + * Queue length. + */ + uint64_t q_len; + /** + * Queue priority. + */ + uint32_t priority; }; @@ -395,6 +404,8 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) if (NULL == qh->ch->mq) return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n"); env = GNUNET_MQ_msg_extra (aqm, strlen (qh->address) + 1, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); @@ -402,11 +413,39 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) aqm->receiver = qh->peer; aqm->nt = htonl ((uint32_t) qh->nt); aqm->mtu = htonl (qh->mtu); + aqm->q_len = GNUNET_htonll (qh->q_len); + aqm->priority = htonl (qh->priority); aqm->cs = htonl ((uint32_t) qh->cs); memcpy (&aqm[1], qh->address, strlen (qh->address) + 1); GNUNET_MQ_send (qh->ch->mq, env); } +/** + * Send message to the transport service about queue @a qh + * updated. + * + * @param qh queue to add + */ +static void +send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm; + + if (NULL == qh->ch->mq) + return; + env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE); + uqm->qid = htonl (qh->queue_id); + uqm->receiver = qh->peer; + uqm->nt = htonl ((uint32_t) qh->nt); + uqm->mtu = htonl (qh->mtu); + uqm->q_len = GNUNET_htonll (qh->q_len); + uqm->priority = htonl (qh->priority); + uqm->cs = htonl ((uint32_t) qh->cs); + GNUNET_MQ_send (qh->ch->mq, env); +} + + /** * Send message to the transport service about queue @a qh @@ -924,6 +963,9 @@ GNUNET_TRANSPORT_communicator_receive ( * @param address address in human-readable format, 0-terminated, UTF-8 * @param mtu maximum message size supported by queue, 0 if * sending is not supported, SIZE_MAX for no MTU + * @param q_len number of messages that can be send through this queue + * @param priority queue priority. Queues with highest priority should be + * used * @param nt which network type does the @a address belong to? * @param cc what characteristics does the communicator have? * @param cs what is the connection status of the queue? @@ -936,6 +978,8 @@ GNUNET_TRANSPORT_communicator_mq_add ( const struct GNUNET_PeerIdentity *peer, const char *address, uint32_t mtu, + uint64_t q_len, + uint32_t priority, enum GNUNET_NetworkType nt, enum GNUNET_TRANSPORT_ConnectionStatus cs, struct GNUNET_MQ_Handle *mq) @@ -948,6 +992,8 @@ GNUNET_TRANSPORT_communicator_mq_add ( qh->address = GNUNET_strdup (address); qh->nt = nt; qh->mtu = mtu; + qh->q_len = q_len; + qh->priority = priority; qh->cs = cs; qh->mq = mq; qh->queue_id = ch->queue_gen++; @@ -957,6 +1003,37 @@ GNUNET_TRANSPORT_communicator_mq_add ( } +/** + * Notify transport service that an MQ was updated + * + * @param ch connection to transport service + * @param qh the queue to update + * @param q_len number of messages that can be send through this queue + * @param priority queue priority. Queues with highest priority should be + * used + */ +void +GNUNET_TRANSPORT_communicator_mq_update ( + struct GNUNET_TRANSPORT_CommunicatorHandle *ch, + const struct GNUNET_TRANSPORT_QueueHandle *u_qh, + uint64_t q_len, + uint32_t priority) +{ + struct GNUNET_TRANSPORT_QueueHandle *qh; + + for (qh = ch->queue_head; NULL != qh; qh = qh->next) + { + if (u_qh == qh) + break; + } + GNUNET_assert (NULL != qh); + qh->q_len = q_len; + qh->priority = priority; + send_update_queue (qh); +} + + + /** * Notify transport service that an MQ became unavailable due to a * disconnect or timeout. -- cgit v1.2.3