From b26f2bce0710217bf68fd829ba6a652ca09cbfa8 Mon Sep 17 00:00:00 2001 From: Julius Bünger Date: Sat, 20 Apr 2019 10:57:18 +0200 Subject: TNG testing: Add ability to open queue --- src/transport/test_communicator_unix.c | 79 +++++++++- src/transport/transport-testing2.c | 280 +++++++++++++++++++++++++++++---- src/transport/transport-testing2.h | 43 +++-- 3 files changed, 352 insertions(+), 50 deletions(-) (limited to 'src') diff --git a/src/transport/test_communicator_unix.c b/src/transport/test_communicator_unix.c index fd189659c..64ba1d5f5 100644 --- a/src/transport/test_communicator_unix.c +++ b/src/transport/test_communicator_unix.c @@ -30,6 +30,8 @@ #include "gnunet_signatures.h" #include "transport.h" +#include + /** * TODO * - start two communicators @@ -45,28 +47,91 @@ #define LOG(kind,...) GNUNET_log_from (kind, "test_transport_communicator_unix", __VA_ARGS__) +#define NUM_PEERS 2 + +static struct GNUNET_PeerIdentity peer_id[NUM_PEERS]; + +static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_hs[NUM_PEERS]; + +//static char *addresses[NUM_PEERS]; + +static void +communicator_available_cb (void *cls, + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, + enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc, + char *address_prefix) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Communicator available. (cc: %u, prefix: %s)\n", + cc, + address_prefix); +} + + +static void +add_address_cb (void *cls, + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, + const char *address, + struct GNUNET_TIME_Relative expiration, + uint32_t aid, + enum GNUNET_NetworkType nt) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "New address. (addr: %s, expir: %" PRIu32 ", ID: %" PRIu32 ", nt: %u\n", + address, + expiration.rel_value_us, + aid, + nt); + //addresses[1] = GNUNET_strdup (address); + GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (tc_hs[0], + &peer_id[1], + address); +} + + +static void +queue_create_reply_cb (void *cls, + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, + int success) +{ + if (GNUNET_YES == success) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got Queue!\n"); + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Failed getting queue!\n"); +} + + static void -communicator_available (void *cls, - const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg) +add_queue_cb (void *cls, + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "communicator_available()\n"); + "Got Queue!\n"); } + static void run (void *cls) { struct GNUNET_CONFIGURATION_Handle *cfg = cls; - GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( + tc_hs[0] = GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( "transport", "test_communicator_1.conf", - &communicator_available, + &communicator_available_cb, + NULL, + &queue_create_reply_cb, + &add_queue_cb, NULL); /* cls */ - GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( + tc_hs[1] = GNUNET_TRANSPORT_TESTING_transport_communicator_service_start ( "transport", "test_communicator_2.conf", - &communicator_available, + &communicator_available_cb, + &add_address_cb, + NULL, + &add_queue_cb, NULL); /* cls */ } diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c index 51791e981..9fa6b7761 100644 --- a/src/transport/transport-testing2.c +++ b/src/transport/transport-testing2.c @@ -30,6 +30,7 @@ #include "gnunet_constants.h" #include "transport-testing2.h" #include "gnunet_ats_transport_service.h" +#include "gnunet_hello_lib.h" #include "gnunet_signatures.h" #include "transport.h" @@ -65,6 +66,11 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle */ struct GNUNET_SERVICE_Client *client; + /** + * @brief Handle to the client + */ + struct GNUNET_MQ_Handle *c_mq; + /** * @brief Process of the communicator */ @@ -75,16 +81,51 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle */ struct GNUNET_SCHEDULER_Task *c_shutdown_task; + /** + * @brief Characteristics of the communicator + */ + enum GNUNET_TRANSPORT_CommunicatorCharacteristics c_characteristics; + + /** + * @brief Specifies supported addresses + */ + char *c_addr_prefix; + + /** + * @brief Specifies supported addresses + */ + char *c_address; + + /** + * @brief Task to request the opening of a view + */ + struct GNUNET_MQ_Envelope *open_queue_env; + /* Callbacks + Closures */ /** * @brief Callback called when a new communicator connects */ - GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available; + GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available_cb; + + /** + * @brief Callback called when a new communicator connects + */ + GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb; + + /** + * @brief Callback called when a new communicator connects + */ + GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb; + + /** + * @brief Callback called when a new communicator connects + */ + GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb; /** * @brief Closure to the callback */ - void *communicator_available_cls; + void *cb_cls; }; @@ -127,11 +168,133 @@ handle_communicator_available (void *cls, size = ntohs (msg->header.size) - sizeof (*msg); if (0 == size) return; /* receive-only communicator */ - if (NULL != tc_h->communicator_available) + tc_h->c_characteristics = ntohl (msg->cc); + tc_h->c_addr_prefix = GNUNET_strdup ((const char *) &msg[1]); + if (NULL != tc_h->communicator_available_cb) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "calling communicator_available_cb()\n"); + tc_h->communicator_available_cb (tc_h->cb_cls, + tc_h, + tc_h->c_characteristics, + tc_h->c_addr_prefix); + } + GNUNET_SERVICE_client_continue (tc_h->client); +} + + +/** + * Address of our peer added. Test message is well-formed. + * + * @param cls the client + * @param aam the send message that was sent + * @return #GNUNET_OK if message is well-formed + */ +static int +check_add_address (void *cls, + const struct GNUNET_TRANSPORT_AddAddressMessage *msg) +{ + struct TransportClient *tc = cls; + + //if (CT_COMMUNICATOR != tc->type) + //{ + // GNUNET_break (0); + // return GNUNET_SYSERR; + //} + GNUNET_MQ_check_zero_termination (msg); + return GNUNET_OK; +} + + +static void +handle_add_address (void *cls, + const struct GNUNET_TRANSPORT_AddAddressMessage *msg) +{ + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls; + uint16_t size; + + size = ntohs (msg->header.size) - sizeof (*msg); + if (0 == size) + return; /* receive-only communicator */ + tc_h->c_address = GNUNET_strdup ((const char *) &msg[1]); + if (NULL != tc_h->add_address_cb) { LOG (GNUNET_ERROR_TYPE_DEBUG, "calling communicator_available()\n"); - tc_h->communicator_available (tc_h->communicator_available_cls, msg); + tc_h->add_address_cb (tc_h->cb_cls, + tc_h, + tc_h->c_address, + GNUNET_TIME_relative_ntoh (msg->expiration), + msg->aid, + ntohl (msg->nt)); + } + GNUNET_SERVICE_client_continue (tc_h->client); +} + + +static void +handle_queue_create_ok (void *cls, + const struct GNUNET_TRANSPORT_CreateQueueResponse *msg) +{ + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls; + + if (NULL != tc_h->queue_create_reply_cb) + { + tc_h->queue_create_reply_cb (tc_h->cb_cls, + tc_h, + GNUNET_YES); + } + GNUNET_SERVICE_client_continue (tc_h->client); +} + + +static void +handle_queue_create_fail (void *cls, + const struct GNUNET_TRANSPORT_CreateQueueResponse *msg) +{ + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls; + + if (NULL != tc_h->queue_create_reply_cb) + { + tc_h->queue_create_reply_cb (tc_h->cb_cls, + tc_h, + GNUNET_NO); + } + GNUNET_SERVICE_client_continue (tc_h->client); +} + + +/** + * New queue became available. Check message. + * + * @param cls the client + * @param aqm the send message that was sent + */ +static int +check_add_queue_message (void *cls, + const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) +{ + GNUNET_MQ_check_zero_termination (aqm); + return GNUNET_OK; +} + + +/** + * @brief Handle new communicator + * + * @param cls Closure + * @param msg Message struct + */ +static void +handle_add_queue_message (void *cls, + const struct GNUNET_TRANSPORT_AddQueueMessage *msg) +{ + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls; + + if (NULL != tc_h->add_queue_cb) + { + tc_h->add_queue_cb (tc_h->cb_cls, + tc_h); } GNUNET_SERVICE_client_continue (tc_h->client); } @@ -170,6 +333,14 @@ connect_cb (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Client connected.\n"); tc_h->client = client; + tc_h->c_mq = mq; + + if (NULL != tc_h->open_queue_env) + { + GNUNET_MQ_send (tc_h->c_mq, + tc_h->open_queue_env); + tc_h->open_queue_env = NULL; + } return tc_h; } @@ -213,10 +384,10 @@ transport_communicator_start (struct GNUNET_TRANSPORT_TESTING_TransportCommunica // GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL, // struct GNUNET_TRANSPORT_CommunicatorBackchannel, // NULL), - //GNUNET_MQ_hd_var_size (add_address, - // GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS, - // struct GNUNET_TRANSPORT_AddAddressMessage, - // NULL), + GNUNET_MQ_hd_var_size (add_address, + GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS, + struct GNUNET_TRANSPORT_AddAddressMessage, + &tc_h), //GNUNET_MQ_hd_fixed_size (del_address, // GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS, // struct GNUNET_TRANSPORT_DelAddressMessage, @@ -225,18 +396,18 @@ transport_communicator_start (struct GNUNET_TRANSPORT_TESTING_TransportCommunica // GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG, // struct GNUNET_TRANSPORT_IncomingMessage, // NULL), - //GNUNET_MQ_hd_fixed_size (queue_create_ok, - // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK, - // struct GNUNET_TRANSPORT_CreateQueueResponse, - // NULL), - //GNUNET_MQ_hd_fixed_size (queue_create_fail, - // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL, - // struct GNUNET_TRANSPORT_CreateQueueResponse, - // NULL), - //GNUNET_MQ_hd_var_size (add_queue_message, - // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, - // struct GNUNET_TRANSPORT_AddQueueMessage, - // NULL), + GNUNET_MQ_hd_fixed_size (queue_create_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK, + struct GNUNET_TRANSPORT_CreateQueueResponse, + tc_h), + GNUNET_MQ_hd_fixed_size (queue_create_fail, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL, + struct GNUNET_TRANSPORT_CreateQueueResponse, + tc_h), + GNUNET_MQ_hd_var_size (add_queue_message, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, + struct GNUNET_TRANSPORT_AddQueueMessage, + NULL), //GNUNET_MQ_hd_fixed_size (del_queue_message, // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, // struct GNUNET_TRANSPORT_DelQueueMessage, @@ -333,9 +504,10 @@ communicator_start (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle * * @param service_name Name of the service * @param cfg Configuration handle - * @param communicator_available Callback that is called when a new + * @param communicator_available_cb Callback that is called when a new + * @param add_address_cb Callback that is called when a new * communicator becomes available - * @param cb_cls Closure to @p communicator_available + * @param cb_cls Closure to @a communicator_available_cb and @a * * @return Handle to the communicator duo */ @@ -343,10 +515,10 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle * GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (const char *service_name, const char *cfg_filename, - GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available, - //GNUNET_TRANSPORT_TESTING_Callback2 cb2, - //GNUNET_TRANSPORT_TESTING_Callback3 cb3, - //GNUNET_TRANSPORT_TESTING_Callback4 cb4, + GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available_cb, + GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb, + GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb, + GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb, void *cb_cls) { struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h; @@ -363,8 +535,11 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start cfg_filename); return NULL; } - tc_h->communicator_available = communicator_available; - tc_h->communicator_available_cls = cb_cls; + tc_h->communicator_available_cb = communicator_available_cb; + tc_h->add_address_cb = add_address_cb; + tc_h->queue_create_reply_cb = queue_create_reply_cb; + tc_h->add_queue_cb = add_queue_cb; + tc_h->cb_cls = cb_cls; /* Start communicator part of service */ transport_communicator_start (tc_h); @@ -374,11 +549,50 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_service_start return tc_h; } -//void -//GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue -// (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tch, -// const char *address); -// + +void +GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue + (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, + const struct GNUNET_PeerIdentity *peer_id, + const char *address) +{ + static uint32_t idgen; + char *prefix; + struct GNUNET_TRANSPORT_CreateQueue *msg; + struct GNUNET_MQ_Envelope *env; + size_t alen; + + if (NULL != tc_h->open_queue_env) + { + // FIXME: handle multiple queue requests + return; /* Already waiting for opening of queue */ + } + prefix = GNUNET_HELLO_address_to_prefix (address); + if (NULL == prefix) + { + GNUNET_break (0); /* We got an invalid address!? */ + return; + } + alen = strlen (address) + 1; + env = GNUNET_MQ_msg_extra (msg, + alen, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE); + msg->request_id = htonl (idgen++); + msg->receiver = *peer_id; + memcpy (&msg[1], + address, + alen); + if (NULL != tc_h->c_mq) + { + GNUNET_MQ_send (tc_h->c_mq, + env); + } + else + { + tc_h->open_queue_env = env; + } +} + //struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission * //GNUNET_TRANSPORT_TESTING_transport_communicator_send // (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tcq, diff --git a/src/transport/transport-testing2.h b/src/transport/transport-testing2.h index c5adda4eb..6aee919cd 100644 --- a/src/transport/transport-testing2.h +++ b/src/transport/transport-testing2.h @@ -40,7 +40,29 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle; */ typedef void (*GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback)(void *cls, - const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg); + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, + enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc, + char *address_prefix); + + +typedef void +(*GNUNET_TRANSPORT_TESTING_AddAddressCallback)(void *cls, + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, + const char *address, + struct GNUNET_TIME_Relative expiration, + uint32_t aid, + enum GNUNET_NetworkType nt); + + +typedef void +(*GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback)(void *cls, + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, + int will_try); + + +typedef void +(*GNUNET_TRANSPORT_TESTING_AddQueueCallback)(void *cls, + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h); /** @@ -58,17 +80,18 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle * GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (const char *service_name, const char *cfg_filename, - GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available, - //GNUNET_TRANSPORT_TESTING_Callback2 cb2, - //GNUNET_TRANSPORT_TESTING_Callback3 cb3, - //GNUNET_TRANSPORT_TESTING_Callback4 cb4, + GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback communicator_available_cb, + GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb, + GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb, + GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb, void *cb_cls); -//void -//GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue -// (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tch, -// const char *address); -// +void +GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue + (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h, + const struct GNUNET_PeerIdentity *peer_id, + const char *address); + //struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission * //GNUNET_TRANSPORT_TESTING_transport_communicator_send // (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tcq, -- cgit v1.2.3