aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport-testing2.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport-testing2.c')
-rw-r--r--src/transport/transport-testing2.c188
1 files changed, 165 insertions, 23 deletions
diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c
index fe2f28f54..0dc1bb331 100644
--- a/src/transport/transport-testing2.c
+++ b/src/transport/transport-testing2.c
@@ -33,7 +33,7 @@
33#include "gnunet_hello_lib.h" 33#include "gnunet_hello_lib.h"
34#include "gnunet_signatures.h" 34#include "gnunet_signatures.h"
35#include "transport.h" 35#include "transport.h"
36 36#include <inttypes.h>
37 37
38#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__) 38#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
39 39
@@ -84,6 +84,8 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
84 */ 84 */
85 char *cfg_filename; 85 char *cfg_filename;
86 86
87 struct GNUNET_PeerIdentity peer_id;
88
87 /** 89 /**
88 * @brief Handle to the transport service 90 * @brief Handle to the transport service
89 */ 91 */
@@ -107,6 +109,11 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
107 struct GNUNET_OS_Process *nat_proc; 109 struct GNUNET_OS_Process *nat_proc;
108 110
109 /** 111 /**
112 * resolver service process
113 */
114 struct GNUNET_OS_Process *resolver_proc;
115
116 /**
110 * @brief Task that will be run on shutdown to stop and clean communicator 117 * @brief Task that will be run on shutdown to stop and clean communicator
111 */ 118 */
112 struct GNUNET_SCHEDULER_Task *c_shutdown_task; 119 struct GNUNET_SCHEDULER_Task *c_shutdown_task;
@@ -225,11 +232,21 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
225 uint32_t nt; 232 uint32_t nt;
226 233
227 /** 234 /**
228 * Maximum transmission unit, in NBO. UINT32_MAX for unlimited. 235 * Maximum transmission unit. UINT32_MAX for unlimited.
229 */ 236 */
230 uint32_t mtu; 237 uint32_t mtu;
231 238
232 /** 239 /**
240 * Queue length. UINT64_MAX for unlimited.
241 */
242 uint64_t q_len;
243
244 /**
245 * Queue prio
246 */
247 uint32_t priority;
248
249 /**
233 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO. 250 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
234 */ 251 */
235 uint32_t cs; 252 uint32_t cs;
@@ -368,7 +385,8 @@ handle_communicator_backchannel (void *cls,
368 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi; 385 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
369 struct GNUNET_MQ_Envelope *env; 386 struct GNUNET_MQ_Envelope *env;
370 387
371 388 LOG (GNUNET_ERROR_TYPE_DEBUG,
389 "Received backchannel message\n");
372 if (tc_h->bc_enabled != GNUNET_YES) 390 if (tc_h->bc_enabled != GNUNET_YES)
373 { 391 {
374 GNUNET_SERVICE_client_continue (client->client); 392 GNUNET_SERVICE_client_continue (client->client);
@@ -376,17 +394,17 @@ handle_communicator_backchannel (void *cls,
376 } 394 }
377 /* Find client providing this communicator */ 395 /* Find client providing this communicator */
378 /* Finally, deliver backchannel message to communicator */ 396 /* Finally, deliver backchannel message to communicator */
379 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 397 LOG (GNUNET_ERROR_TYPE_DEBUG,
380 "Delivering backchannel message of type %u to %s\n", 398 "Delivering backchannel message of type %u to %s\n",
381 ntohs (msg->type), 399 ntohs (msg->type),
382 target_communicator); 400 target_communicator);
383 other_tc_h = tc_h->bc_cb (tc_h, msg, (struct 401 other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
384 GNUNET_PeerIdentity*) &bc_msg->pid); 402 GNUNET_PeerIdentity*) &bc_msg->pid);
385 env = GNUNET_MQ_msg_extra ( 403 env = GNUNET_MQ_msg_extra (
386 cbi, 404 cbi,
387 isize, 405 isize,
388 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING); 406 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING);
389 cbi->pid = bc_msg->pid; 407 cbi->pid = tc_h->peer_id;
390 memcpy (&cbi[1], msg, isize); 408 memcpy (&cbi[1], msg, isize);
391 409
392 410
@@ -493,9 +511,6 @@ handle_incoming_msg (void *cls,
493 msg = (struct GNUNET_MessageHeader *) &inc_msg[1]; 511 msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
494 size_t payload_len = ntohs (msg->size) - sizeof (struct 512 size_t payload_len = ntohs (msg->size) - sizeof (struct
495 GNUNET_MessageHeader); 513 GNUNET_MessageHeader);
496 LOG (GNUNET_ERROR_TYPE_DEBUG,
497 "Incoming message from communicator!\n");
498
499 if (NULL != tc_h->incoming_msg_cb) 514 if (NULL != tc_h->incoming_msg_cb)
500 { 515 {
501 tc_h->incoming_msg_cb (tc_h->cb_cls, 516 tc_h->incoming_msg_cb (tc_h->cb_cls,
@@ -515,6 +530,7 @@ handle_incoming_msg (void *cls,
515 struct GNUNET_TRANSPORT_IncomingMessageAck *ack; 530 struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
516 531
517 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK); 532 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
533 GNUNET_assert (NULL != env);
518 ack->reserved = htonl (0); 534 ack->reserved = htonl (0);
519 ack->fc_id = inc_msg->fc_id; 535 ack->fc_id = inc_msg->fc_id;
520 ack->sender = inc_msg->sender; 536 ack->sender = inc_msg->sender;
@@ -605,15 +621,14 @@ handle_add_queue_message (void *cls,
605 client->tc; 621 client->tc;
606 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue; 622 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
607 623
608 tc_queue = tc_h->queue_head; 624 LOG (GNUNET_ERROR_TYPE_DEBUG,
609 if (NULL != tc_queue) 625 "Got queue with ID %u\n", msg->qid);
626 for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next)
610 { 627 {
611 while (tc_queue->qid != msg->qid) 628 if (tc_queue->qid == msg->qid)
612 { 629 break;
613 tc_queue = tc_queue->next;
614 }
615 } 630 }
616 else 631 if (NULL == tc_queue)
617 { 632 {
618 tc_queue = 633 tc_queue =
619 GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue); 634 GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
@@ -625,12 +640,54 @@ handle_add_queue_message (void *cls,
625 GNUNET_assert (tc_queue->qid == msg->qid); 640 GNUNET_assert (tc_queue->qid == msg->qid);
626 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver)); 641 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
627 tc_queue->nt = msg->nt; 642 tc_queue->nt = msg->nt;
628 tc_queue->mtu = msg->mtu; 643 tc_queue->mtu = ntohl (msg->mtu);
629 tc_queue->cs = msg->cs; 644 tc_queue->cs = msg->cs;
645 tc_queue->priority = ntohl (msg->priority);
646 tc_queue->q_len = GNUNET_ntohll (msg->q_len);
630 if (NULL != tc_h->add_queue_cb) 647 if (NULL != tc_h->add_queue_cb)
631 { 648 {
632 tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue); 649 tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
650 }
651 GNUNET_SERVICE_client_continue (client->client);
652}
653
654
655/**
656 * @brief Handle new queue
657 *
658 * Store context and call client callback.
659 *
660 * @param cls Closure - communicator handle
661 * @param msg Message struct
662 */
663static void
664handle_update_queue_message (void *cls,
665 const struct
666 GNUNET_TRANSPORT_UpdateQueueMessage *msg)
667{
668 struct MyClient *client = cls;
669 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
670 client->tc;
671 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
672
673 LOG (GNUNET_ERROR_TYPE_DEBUG,
674 "Received queue update message for %u with q_len %"PRIu64"\n",
675 msg->qid, GNUNET_ntohll(msg->q_len));
676 tc_queue = tc_h->queue_head;
677 if (NULL != tc_queue)
678 {
679 while (tc_queue->qid != msg->qid)
680 {
681 tc_queue = tc_queue->next;
682 }
633 } 683 }
684 GNUNET_assert (tc_queue->qid == msg->qid);
685 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
686 tc_queue->nt = msg->nt;
687 tc_queue->mtu = ntohl (msg->mtu);
688 tc_queue->cs = msg->cs;
689 tc_queue->priority = ntohl (msg->priority);
690 tc_queue->q_len += GNUNET_ntohll (msg->q_len);
634 GNUNET_SERVICE_client_continue (client->client); 691 GNUNET_SERVICE_client_continue (client->client);
635} 692}
636 693
@@ -719,6 +776,8 @@ disconnect_cb (void *cls,
719 GNUNET_CONTAINER_DLL_remove (tc_h->client_head, 776 GNUNET_CONTAINER_DLL_remove (tc_h->client_head,
720 tc_h->client_tail, 777 tc_h->client_tail,
721 cl); 778 cl);
779 if (cl->c_mq == tc_h->c_mq)
780 tc_h->c_mq = NULL;
722 GNUNET_free (cl); 781 GNUNET_free (cl);
723 break; 782 break;
724 } 783 }
@@ -786,6 +845,10 @@ transport_communicator_start (
786 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, 845 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
787 struct GNUNET_TRANSPORT_AddQueueMessage, 846 struct GNUNET_TRANSPORT_AddQueueMessage,
788 tc_h), 847 tc_h),
848 GNUNET_MQ_hd_fixed_size (update_queue_message,
849 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
850 struct GNUNET_TRANSPORT_UpdateQueueMessage,
851 tc_h),
789 // GNUNET_MQ_hd_fixed_size (del_queue_message, 852 // GNUNET_MQ_hd_fixed_size (del_queue_message,
790 // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, 853 // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
791 // struct GNUNET_TRANSPORT_DelQueueMessage, 854 // struct GNUNET_TRANSPORT_DelQueueMessage,
@@ -819,11 +882,11 @@ shutdown_process (struct GNUNET_OS_Process *proc)
819 if (0 != GNUNET_OS_process_kill (proc, SIGTERM)) 882 if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
820 { 883 {
821 LOG (GNUNET_ERROR_TYPE_WARNING, 884 LOG (GNUNET_ERROR_TYPE_WARNING,
822 "Error shutting down communicator with SIGERM, trying SIGKILL\n"); 885 "Error shutting down process with SIGERM, trying SIGKILL\n");
823 if (0 != GNUNET_OS_process_kill (proc, SIGKILL)) 886 if (0 != GNUNET_OS_process_kill (proc, SIGKILL))
824 { 887 {
825 LOG (GNUNET_ERROR_TYPE_ERROR, 888 LOG (GNUNET_ERROR_TYPE_ERROR,
826 "Error shutting down communicator with SIGERM and SIGKILL\n"); 889 "Error shutting down process with SIGERM and SIGKILL\n");
827 } 890 }
828 } 891 }
829 GNUNET_OS_process_destroy (proc); 892 GNUNET_OS_process_destroy (proc);
@@ -884,6 +947,45 @@ shutdown_nat (void *cls)
884 shutdown_process (proc); 947 shutdown_process (proc);
885} 948}
886 949
950/**
951 * @brief Task run at shutdown to kill the resolver process
952 *
953 * @param cls Closure - Process of communicator
954 */
955static void
956shutdown_resolver (void *cls)
957{
958 struct GNUNET_OS_Process *proc = cls;
959 shutdown_process (proc);
960}
961
962static void
963resolver_start (struct
964 GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
965{
966 char *binary;
967
968 LOG (GNUNET_ERROR_TYPE_DEBUG, "resolver_start\n");
969 binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-resolver");
970 tc_h->resolver_proc = GNUNET_OS_start_process (GNUNET_YES,
971 GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
972 NULL,
973 NULL,
974 NULL,
975 binary,
976 "gnunet-service-resolver",
977 "-c",
978 tc_h->cfg_filename,
979 NULL);
980 if (NULL == tc_h->resolver_proc)
981 {
982 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start resolver service!");
983 return;
984 }
985 LOG (GNUNET_ERROR_TYPE_INFO, "started resolver service\n");
986 GNUNET_free (binary);
987
988}
887 989
888/** 990/**
889 * @brief Start NAT 991 * @brief Start NAT
@@ -934,6 +1036,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (
934 const char *service_name, 1036 const char *service_name,
935 const char *binary_name, 1037 const char *binary_name,
936 const char *cfg_filename, 1038 const char *cfg_filename,
1039 const struct GNUNET_PeerIdentity *peer_id,
937 GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback 1040 GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback
938 communicator_available_cb, 1041 communicator_available_cb,
939 GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb, 1042 GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb,
@@ -971,12 +1074,15 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (
971 tc_h->add_queue_cb = add_queue_cb; 1074 tc_h->add_queue_cb = add_queue_cb;
972 tc_h->incoming_msg_cb = incoming_message_cb; 1075 tc_h->incoming_msg_cb = incoming_message_cb;
973 tc_h->bc_cb = bc_cb; 1076 tc_h->bc_cb = bc_cb;
1077 tc_h->peer_id = *peer_id;
974 tc_h->cb_cls = cb_cls; 1078 tc_h->cb_cls = cb_cls;
975 1079
976 /* Start communicator part of service */ 1080 /* Start communicator part of service */
977 transport_communicator_start (tc_h); 1081 transport_communicator_start (tc_h);
978 /* Start NAT */ 1082 /* Start NAT */
979 nat_start (tc_h); 1083 nat_start (tc_h);
1084 /* Start resolver service */
1085 resolver_start (tc_h);
980 /* Schedule start communicator */ 1086 /* Schedule start communicator */
981 communicator_start (tc_h, 1087 communicator_start (tc_h,
982 binary_name); 1088 binary_name);
@@ -991,6 +1097,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_stop (
991 shutdown_communicator (tc_h->c_proc); 1097 shutdown_communicator (tc_h->c_proc);
992 shutdown_service (tc_h->sh); 1098 shutdown_service (tc_h->sh);
993 shutdown_nat (tc_h->nat_proc); 1099 shutdown_nat (tc_h->nat_proc);
1100 shutdown_resolver (tc_h->resolver_proc);
994 GNUNET_CONFIGURATION_destroy (tc_h->cfg); 1101 GNUNET_CONFIGURATION_destroy (tc_h->cfg);
995 GNUNET_free (tc_h); 1102 GNUNET_free (tc_h);
996} 1103}
@@ -1058,7 +1165,7 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (
1058 */ 1165 */
1059void 1166void
1060GNUNET_TRANSPORT_TESTING_transport_communicator_send 1167GNUNET_TRANSPORT_TESTING_transport_communicator_send
1061 (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue, 1168 (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
1062 GNUNET_SCHEDULER_TaskCallback cont, 1169 GNUNET_SCHEDULER_TaskCallback cont,
1063 void *cont_cls, 1170 void *cont_cls,
1064 const void *payload, 1171 const void *payload,
@@ -1068,11 +1175,46 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
1068 struct GNUNET_TRANSPORT_SendMessageTo *msg; 1175 struct GNUNET_TRANSPORT_SendMessageTo *msg;
1069 struct GNUNET_MQ_Envelope *env; 1176 struct GNUNET_MQ_Envelope *env;
1070 size_t inbox_size; 1177 size_t inbox_size;
1178 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
1179 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
1071 1180
1181 tc_queue = NULL;
1182 for (tc_queue_tmp = tc_h->queue_head;
1183 NULL != tc_queue_tmp;
1184 tc_queue_tmp = tc_queue_tmp->next)
1185 {
1186 if (tc_queue_tmp->q_len <= 0)
1187 continue;
1188 if (NULL == tc_queue)
1189 {
1190 LOG (GNUNET_ERROR_TYPE_DEBUG,
1191 "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1192 tc_queue_tmp->priority,
1193 tc_queue_tmp->q_len,
1194 tc_queue_tmp->mtu);
1195 tc_queue = tc_queue_tmp;
1196 continue;
1197 }
1198 if (tc_queue->priority < tc_queue_tmp->priority)
1199 {
1200 LOG (GNUNET_ERROR_TYPE_DEBUG,
1201 "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1202 tc_queue_tmp->priority,
1203 tc_queue_tmp->q_len,
1204 tc_queue_tmp->mtu);
1205 tc_queue = tc_queue_tmp;
1206 }
1207 }
1208 GNUNET_assert (NULL != tc_queue);
1209 if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
1210 tc_queue->q_len--;
1211 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212 "Sending message\n");
1072 inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size; 1213 inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
1073 env = GNUNET_MQ_msg_extra (msg, 1214 env = GNUNET_MQ_msg_extra (msg,
1074 inbox_size, 1215 inbox_size,
1075 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); 1216 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
1217 GNUNET_assert (NULL != env);
1076 msg->qid = htonl (tc_queue->qid); 1218 msg->qid = htonl (tc_queue->qid);
1077 msg->mid = tc_queue->mid++; 1219 msg->mid = tc_queue->mid++;
1078 msg->receiver = tc_queue->peer_id; 1220 msg->receiver = tc_queue->peer_id;