aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport-testing2.c
diff options
context:
space:
mode:
authorMartin Schanzenbach <mschanzenbach@posteo.de>2020-06-01 16:39:35 +0200
committerMartin Schanzenbach <mschanzenbach@posteo.de>2020-06-01 16:39:35 +0200
commit198c09654354d09a9b33f27cf095e0295f70826c (patch)
tree07aa088c8e9664dc76915cc6b664654da59359f4 /src/transport/transport-testing2.c
parenta325c3eaa8450d325fe57959eac29da5496cfd6d (diff)
downloadgnunet-198c09654354d09a9b33f27cf095e0295f70826c.tar.gz
gnunet-198c09654354d09a9b33f27cf095e0295f70826c.zip
tng: more UDP communicator backchannels
Added a new message for queue updates to indicate queue length. Queues now may also have a priority parameter.
Diffstat (limited to 'src/transport/transport-testing2.c')
-rw-r--r--src/transport/transport-testing2.c126
1 files changed, 105 insertions, 21 deletions
diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c
index fc6d13590..8250027f7 100644
--- a/src/transport/transport-testing2.c
+++ b/src/transport/transport-testing2.c
@@ -33,7 +33,7 @@
33#include "gnunet_hello_lib.h" 33#include "gnunet_hello_lib.h"
34#include "gnunet_signatures.h" 34#include "gnunet_signatures.h"
35#include "transport.h" 35#include "transport.h"
36 36#include <inttypes.h>
37 37
38#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__) 38#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
39 39
@@ -227,11 +227,21 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
227 uint32_t nt; 227 uint32_t nt;
228 228
229 /** 229 /**
230 * Maximum transmission unit, in NBO. UINT32_MAX for unlimited. 230 * Maximum transmission unit. UINT32_MAX for unlimited.
231 */ 231 */
232 uint32_t mtu; 232 uint32_t mtu;
233 233
234 /** 234 /**
235 * Queue length. UINT64_MAX for unlimited.
236 */
237 uint64_t q_len;
238
239 /**
240 * Queue prio
241 */
242 uint32_t priority;
243
244 /**
235 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. 245 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
236 */ 246 */
237 uint32_t cs; 247 uint32_t cs;
@@ -370,8 +380,8 @@ handle_communicator_backchannel (void *cls,
370 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; 380 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
371 struct GNUNET_MQ_Envelope *env; 381 struct GNUNET_MQ_Envelope *env;
372 382
373 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 383 LOG (GNUNET_ERROR_TYPE_DEBUG,
374 "Received backchannel message\n"); 384 "Received backchannel message\n");
375 if (tc_h->bc_enabled != GNUNET_YES) 385 if (tc_h->bc_enabled != GNUNET_YES)
376 { 386 {
377 GNUNET_SERVICE_client_continue (client->client); 387 GNUNET_SERVICE_client_continue (client->client);
@@ -379,10 +389,10 @@ handle_communicator_backchannel (void *cls,
379 } 389 }
380 /* Find client providing this communicator */ 390 /* Find client providing this communicator */
381 /* Finally, deliver backchannel message to communicator */ 391 /* Finally, deliver backchannel message to communicator */
382 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 392 LOG (GNUNET_ERROR_TYPE_DEBUG,
383 "Delivering backchannel message of type %u to %s\n", 393 "Delivering backchannel message of type %u to %s\n",
384 ntohs (msg->type), 394 ntohs (msg->type),
385 target_communicator); 395 target_communicator);
386 other_tc_h = tc_h->bc_cb (tc_h, msg, (struct 396 other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
387 GNUNET_PeerIdentity*) &bc_msg->pid); 397 GNUNET_PeerIdentity*) &bc_msg->pid);
388 env = GNUNET_MQ_msg_extra ( 398 env = GNUNET_MQ_msg_extra (
@@ -496,9 +506,6 @@ handle_incoming_msg (void *cls,
496 msg = (struct GNUNET_MessageHeader *) &inc_msg[1]; 506 msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
497 size_t payload_len = ntohs (msg->size) - sizeof (struct 507 size_t payload_len = ntohs (msg->size) - sizeof (struct
498 GNUNET_MessageHeader); 508 GNUNET_MessageHeader);
499 LOG (GNUNET_ERROR_TYPE_DEBUG,
500 "Incoming message from communicator!\n");
501
502 if (NULL != tc_h->incoming_msg_cb) 509 if (NULL != tc_h->incoming_msg_cb)
503 { 510 {
504 tc_h->incoming_msg_cb (tc_h->cb_cls, 511 tc_h->incoming_msg_cb (tc_h->cb_cls,
@@ -608,15 +615,14 @@ handle_add_queue_message (void *cls,
608 client->tc; 615 client->tc;
609 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; 616 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
610 617
611 tc_queue = tc_h->queue_head; 618 LOG (GNUNET_ERROR_TYPE_DEBUG,
612 if (NULL != tc_queue) 619 "Got queue with ID %u\n", msg->qid);
620 for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next)
613 { 621 {
614 while (tc_queue->qid != msg->qid) 622 if (tc_queue->qid == msg->qid)
615 { 623 break;
616 tc_queue = tc_queue->next;
617 }
618 } 624 }
619 else 625 if (NULL == tc_queue)
620 { 626 {
621 tc_queue = 627 tc_queue =
622 GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue); 628 GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
@@ -628,17 +634,59 @@ handle_add_queue_message (void *cls,
628 GNUNET_assert (tc_queue->qid == msg->qid); 634 GNUNET_assert (tc_queue->qid == msg->qid);
629 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); 635 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
630 tc_queue->nt = msg->nt; 636 tc_queue->nt = msg->nt;
631 tc_queue->mtu = msg->mtu; 637 tc_queue->mtu = ntohl (msg->mtu);
632 tc_queue->cs = msg->cs; 638 tc_queue->cs = msg->cs;
639 tc_queue->priority = ntohl (msg->priority);
640 tc_queue->q_len = GNUNET_ntohll (msg->q_len);
633 if (NULL != tc_h->add_queue_cb) 641 if (NULL != tc_h->add_queue_cb)
634 { 642 {
635 tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue); 643 tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
636 } 644 }
637 GNUNET_SERVICE_client_continue (client->client); 645 GNUNET_SERVICE_client_continue (client->client);
638} 646}
639 647
640 648
641/** 649/**
650 * @brief Handle new queue
651 *
652 * Store context and call client callback.
653 *
654 * @param cls Closure - communicator handle
655 * @param msg Message struct
656 */
657static void
658handle_update_queue_message (void *cls,
659 const struct
660 GNUNET_TRANSPORT_UpdateQueueMessage *msg)
661{
662 struct MyClient *client = cls;
663 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
664 client->tc;
665 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
666
667 LOG (GNUNET_ERROR_TYPE_DEBUG,
668 "Received queue update message for %u with q_len %"PRIu64"\n",
669 msg->qid, GNUNET_ntohll(msg->q_len));
670 tc_queue = tc_h->queue_head;
671 if (NULL != tc_queue)
672 {
673 while (tc_queue->qid != msg->qid)
674 {
675 tc_queue = tc_queue->next;
676 }
677 }
678 GNUNET_assert (tc_queue->qid == msg->qid);
679 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
680 tc_queue->nt = msg->nt;
681 tc_queue->mtu = ntohl (msg->mtu);
682 tc_queue->cs = msg->cs;
683 tc_queue->priority = ntohl (msg->priority);
684 tc_queue->q_len += GNUNET_ntohll (msg->q_len);
685 GNUNET_SERVICE_client_continue (client->client);
686}
687
688
689/**
642 * @brief Shut down the service 690 * @brief Shut down the service
643 * 691 *
644 * @param cls Closure - Handle to the service 692 * @param cls Closure - Handle to the service
@@ -789,6 +837,10 @@ transport_communicator_start (
789 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, 837 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
790 struct GNUNET_TRANSPORT_AddQueueMessage, 838 struct GNUNET_TRANSPORT_AddQueueMessage,
791 tc_h), 839 tc_h),
840 GNUNET_MQ_hd_fixed_size (update_queue_message,
841 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
842 struct GNUNET_TRANSPORT_UpdateQueueMessage,
843 tc_h),
792 // GNUNET_MQ_hd_fixed_size (del_queue_message, 844 // GNUNET_MQ_hd_fixed_size (del_queue_message,
793 // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, 845 // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
794 // struct GNUNET_TRANSPORT_DelQueueMessage, 846 // struct GNUNET_TRANSPORT_DelQueueMessage,
@@ -1063,7 +1115,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (
1063 */ 1115 */
1064void 1116void
1065GNUNET_TRANSPORT_TESTING_transport_communicator_send 1117GNUNET_TRANSPORT_TESTING_transport_communicator_send
1066 (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue, 1118 (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
1067 GNUNET_SCHEDULER_TaskCallback cont, 1119 GNUNET_SCHEDULER_TaskCallback cont,
1068 void *cont_cls, 1120 void *cont_cls,
1069 const void *payload, 1121 const void *payload,
@@ -1073,7 +1125,39 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
1073 struct GNUNET_TRANSPORT_SendMessageTo *msg; 1125 struct GNUNET_TRANSPORT_SendMessageTo *msg;
1074 struct GNUNET_MQ_Envelope *env; 1126 struct GNUNET_MQ_Envelope *env;
1075 size_t inbox_size; 1127 size_t inbox_size;
1128 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
1129 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
1076 1130
1131 tc_queue = NULL;
1132 for (tc_queue_tmp = tc_h->queue_head;
1133 NULL != tc_queue_tmp;
1134 tc_queue_tmp = tc_queue_tmp->next)
1135 {
1136 if (tc_queue_tmp->q_len <= 0)
1137 continue;
1138 if (NULL == tc_queue)
1139 {
1140 LOG (GNUNET_ERROR_TYPE_DEBUG,
1141 "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1142 tc_queue_tmp->priority,
1143 tc_queue_tmp->q_len,
1144 tc_queue_tmp->mtu);
1145 tc_queue = tc_queue_tmp;
1146 continue;
1147 }
1148 if (tc_queue->priority < tc_queue_tmp->priority)
1149 {
1150 LOG (GNUNET_ERROR_TYPE_DEBUG,
1151 "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1152 tc_queue_tmp->priority,
1153 tc_queue_tmp->q_len,
1154 tc_queue_tmp->mtu);
1155 tc_queue = tc_queue_tmp;
1156 }
1157 }
1158 GNUNET_assert (NULL != tc_queue);
1159 if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
1160 tc_queue->q_len--;
1077 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078 "Sending message\n"); 1162 "Sending message\n");
1079 inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size; 1163 inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;