summaryrefslogtreecommitdiff
path: root/src/transport/transport-testing2.c
diff options
context:
space:
mode:
authorMartin Schanzenbach <mschanzenbach@posteo.de>2020-06-01 16:39:35 +0200
committerMartin Schanzenbach <mschanzenbach@posteo.de>2020-06-01 16:39:35 +0200
commit198c09654354d09a9b33f27cf095e0295f70826c (patch)
tree07aa088c8e9664dc76915cc6b664654da59359f4 /src/transport/transport-testing2.c
parenta325c3eaa8450d325fe57959eac29da5496cfd6d (diff)
tng: more UDP communicator backchannels
Added a new message for queue updates to indicate queue length. Queues now may also have a priority parameter.
Diffstat (limited to 'src/transport/transport-testing2.c')
-rw-r--r--src/transport/transport-testing2.c126
1 files changed, 105 insertions, 21 deletions
diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c
index fc6d13590..8250027f7 100644
--- a/src/transport/transport-testing2.c
+++ b/src/transport/transport-testing2.c
@@ -33,7 +33,7 @@
#include "gnunet_hello_lib.h"
#include "gnunet_signatures.h"
#include "transport.h"
-
+#include <inttypes.h>
#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
@@ -227,11 +227,21 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
uint32_t nt;
/**
- * Maximum transmission unit, in NBO. UINT32_MAX for unlimited.
+ * Maximum transmission unit. UINT32_MAX for unlimited.
*/
uint32_t mtu;
/**
+ * Queue length. UINT64_MAX for unlimited.
+ */
+ uint64_t q_len;
+
+ /**
+ * Queue prio
+ */
+ uint32_t priority;
+
+ /**
* An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
*/
uint32_t cs;
@@ -370,8 +380,8 @@ handle_communicator_backchannel (void *cls,
struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
struct GNUNET_MQ_Envelope *env;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received backchannel message\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received backchannel message\n");
if (tc_h->bc_enabled != GNUNET_YES)
{
GNUNET_SERVICE_client_continue (client->client);
@@ -379,10 +389,10 @@ handle_communicator_backchannel (void *cls,
}
/* Find client providing this communicator */
/* Finally, deliver backchannel message to communicator */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Delivering backchannel message of type %u to %s\n",
- ntohs (msg->type),
- target_communicator);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Delivering backchannel message of type %u to %s\n",
+ ntohs (msg->type),
+ target_communicator);
other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
GNUNET_PeerIdentity*) &bc_msg->pid);
env = GNUNET_MQ_msg_extra (
@@ -496,9 +506,6 @@ handle_incoming_msg (void *cls,
msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
size_t payload_len = ntohs (msg->size) - sizeof (struct
GNUNET_MessageHeader);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Incoming message from communicator!\n");
-
if (NULL != tc_h->incoming_msg_cb)
{
tc_h->incoming_msg_cb (tc_h->cb_cls,
@@ -608,15 +615,14 @@ handle_add_queue_message (void *cls,
client->tc;
struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
- tc_queue = tc_h->queue_head;
- if (NULL != tc_queue)
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got queue with ID %u\n", msg->qid);
+ for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next)
{
- while (tc_queue->qid != msg->qid)
- {
- tc_queue = tc_queue->next;
- }
+ if (tc_queue->qid == msg->qid)
+ break;
}
- else
+ if (NULL == tc_queue)
{
tc_queue =
GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
@@ -628,17 +634,59 @@ handle_add_queue_message (void *cls,
GNUNET_assert (tc_queue->qid == msg->qid);
GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
tc_queue->nt = msg->nt;
- tc_queue->mtu = msg->mtu;
+ tc_queue->mtu = ntohl (msg->mtu);
tc_queue->cs = msg->cs;
+ tc_queue->priority = ntohl (msg->priority);
+ tc_queue->q_len = GNUNET_ntohll (msg->q_len);
if (NULL != tc_h->add_queue_cb)
{
- tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue);
+ tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
}
GNUNET_SERVICE_client_continue (client->client);
}
/**
+ * @brief Handle new queue
+ *
+ * Store context and call client callback.
+ *
+ * @param cls Closure - communicator handle
+ * @param msg Message struct
+ */
+static void
+handle_update_queue_message (void *cls,
+ const struct
+ GNUNET_TRANSPORT_UpdateQueueMessage *msg)
+{
+ struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received queue update message for %u with q_len %"PRIu64"\n",
+ msg->qid, GNUNET_ntohll(msg->q_len));
+ tc_queue = tc_h->queue_head;
+ if (NULL != tc_queue)
+ {
+ while (tc_queue->qid != msg->qid)
+ {
+ tc_queue = tc_queue->next;
+ }
+ }
+ GNUNET_assert (tc_queue->qid == msg->qid);
+ GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
+ tc_queue->nt = msg->nt;
+ tc_queue->mtu = ntohl (msg->mtu);
+ tc_queue->cs = msg->cs;
+ tc_queue->priority = ntohl (msg->priority);
+ tc_queue->q_len += GNUNET_ntohll (msg->q_len);
+ GNUNET_SERVICE_client_continue (client->client);
+}
+
+
+/**
* @brief Shut down the service
*
* @param cls Closure - Handle to the service
@@ -789,6 +837,10 @@ transport_communicator_start (
GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
struct GNUNET_TRANSPORT_AddQueueMessage,
tc_h),
+ GNUNET_MQ_hd_fixed_size (update_queue_message,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
+ struct GNUNET_TRANSPORT_UpdateQueueMessage,
+ tc_h),
// GNUNET_MQ_hd_fixed_size (del_queue_message,
// GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
// struct GNUNET_TRANSPORT_DelQueueMessage,
@@ -1063,7 +1115,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (
*/
void
GNUNET_TRANSPORT_TESTING_transport_communicator_send
- (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue,
+ (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
GNUNET_SCHEDULER_TaskCallback cont,
void *cont_cls,
const void *payload,
@@ -1073,7 +1125,39 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
struct GNUNET_TRANSPORT_SendMessageTo *msg;
struct GNUNET_MQ_Envelope *env;
size_t inbox_size;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
+ tc_queue = NULL;
+ for (tc_queue_tmp = tc_h->queue_head;
+ NULL != tc_queue_tmp;
+ tc_queue_tmp = tc_queue_tmp->next)
+ {
+ if (tc_queue_tmp->q_len <= 0)
+ continue;
+ if (NULL == tc_queue)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+ tc_queue_tmp->priority,
+ tc_queue_tmp->q_len,
+ tc_queue_tmp->mtu);
+ tc_queue = tc_queue_tmp;
+ continue;
+ }
+ if (tc_queue->priority < tc_queue_tmp->priority)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
+ tc_queue_tmp->priority,
+ tc_queue_tmp->q_len,
+ tc_queue_tmp->mtu);
+ tc_queue = tc_queue_tmp;
+ }
+ }
+ GNUNET_assert (NULL != tc_queue);
+ if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
+ tc_queue->q_len--;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending message\n");
inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;